Apache Airflow

Опубликовано: 15 марта 2024 г. · Обновлено: 10 апреля 2025 г.
#airflow #etl #data engineering #devops

Введение

Apache Airflow произвел революцию в оркестрации процессов обработки данных с момента своего появления в 2014 году в компании Airbnb. Сегодня эта платформа с открытым исходным кодом стала стандартом де-факто для автоматизации, планирования и мониторинга рабочих процессов данных. Особенно ценно то, как Airflow интегрируется с современными системами хранения и анализа данных, включая PostgreSQL, ClickHouse, хранилища данных (DWH) и инструменты бизнес-аналитики (BI).

Основы Apache Airflow

Концепции и архитектура

Airflow строится вокруг концепции DAG (Directed Acyclic Graph) — направленных ациклических графов, которые представляют последовательность задач и их зависимости.

Ключевые компоненты экосистемы Airflow:

  1. DAG — Python-файлы, определяющие структуру рабочего процесса
  2. Операторы — шаблоны для выполнения конкретных типов задач
  3. Задачи (Tasks) — экземпляры операторов внутри DAG
  4. Планировщик (Scheduler) — управляет выполнением задач
  5. Веб-сервер — интерфейс для мониторинга и управления
  6. Исполнители (Executors) — определяют способ выполнения задач

Настройка и развертывание

Современные подходы к развертыванию Airflow включают:

  • Контейнеризацию с Docker и оркестрацию с Kubernetes
  • Управляемые сервисы от облачных провайдеров (AWS MWAA, GCP Cloud Composer)
  • Локальные установки для разработки и небольших проектов

Интеграция Airflow с PostgreSQL

PostgreSQL может выполнять две роли в экосистеме Airflow:

PostgreSQL как метаданные Airflow

По умолчанию Airflow использует SQLite, но для производственных сред рекомендуется PostgreSQL:

# Конфигурация подключения к PostgreSQL в airflow.cfg
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow

PostgreSQL как источник или назначение данных

Airflow предоставляет специальные операторы для работы с PostgreSQL:

from airflow.providers.postgres.operators.postgres import PostgresOperator

create_table = PostgresOperator(
    task_id="create_table",
    postgres_conn_id="postgres_default",
    sql="""
        CREATE TABLE IF NOT EXISTS sales (
            id SERIAL PRIMARY KEY,
            date DATE NOT NULL,
            amount DECIMAL(10,2) NOT NULL
        );
    """,
    dag=dag
)

Для передачи данных между системами:

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator

def transfer_data(**kwargs):
    source_hook = PostgresHook(postgres_conn_id='postgres_source')
    target_hook = PostgresHook(postgres_conn_id='postgres_target')
    
    data = source_hook.get_records("SELECT * FROM source_table")
    
    for row in data:
        target_hook.run(f"INSERT INTO target_table VALUES {row}")

transfer_task = PythonOperator(
    task_id='transfer_postgres_data',
    python_callable=transfer_data,
    dag=dag
)

Интеграция Airflow с ClickHouse

ClickHouse — высокопроизводительная колоночная СУБД, оптимизированная для аналитических запросов, становится всё более популярной для обработки больших объемов данных.

Подключение к ClickHouse из Airflow

Для работы с ClickHouse в Airflow требуется установить дополнительные пакеты:

pip install apache-airflow-providers-clickhouse

Затем можно использовать ClickHouseHook и ClickHouseOperator:

from airflow.providers.clickhouse.operators.clickhouse import ClickHouseOperator
from airflow.providers.clickhouse.hooks.clickhouse import ClickHouseHook

# Выполнение SQL-запроса в ClickHouse
clickhouse_query = ClickHouseOperator(
    task_id='execute_clickhouse_query',
    clickhouse_conn_id='clickhouse_conn',
    sql="""
        SELECT 
            toDate(time) AS date,
            count() AS hits,
            uniqExact(user_id) AS users
        FROM events
        WHERE date >= '{{ ds }}' AND date <= '{{ ds }}'
        GROUP BY date
    """,
    dag=dag
)

# Использование хука для более сложной логики
def process_clickhouse_data(**kwargs):
    ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_conn')
    results = ch_hook.get_records("SELECT count() FROM events")
    return results[0][0]

count_task = PythonOperator(
    task_id='count_records',
    python_callable=process_clickhouse_data,
    dag=dag
)

Типичные сценарии использования ClickHouse с Airflow

  1. Агрегация данных для отчетов:

    aggregate_data = ClickHouseOperator(
        task_id='aggregate_daily_stats',
        clickhouse_conn_id='clickhouse_conn',
        sql="""
            INSERT INTO daily_stats
            SELECT 
                toDate(timestamp) AS date,
                countIf(action='view') AS views,
                countIf(action='purchase') AS purchases,
                uniqExact(user_id) AS unique_users
            FROM user_events
            WHERE toDate(timestamp) = '{{ ds }}'
            GROUP BY date
        """,
        dag=dag
    )
  2. Передача данных между PostgreSQL и ClickHouse:

    def postgres_to_clickhouse(**kwargs):
        pg_hook = PostgresHook(postgres_conn_id='postgres_conn')
        ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_conn')
        
        # Получение данных из PostgreSQL
        data = pg_hook.get_pandas_df("SELECT * FROM transactions WHERE date = %s", parameters=(kwargs['ds'],))
        
        # Запись данных в ClickHouse через pandas
        if not data.empty:
            ch_hook.insert_dataframe(
                'transactions',
                data,
                target_fields=['id', 'date', 'amount', 'user_id', 'product_id']
            )
    
    transfer_task = PythonOperator(
        task_id='transfer_postgres_to_clickhouse',
        python_callable=postgres_to_clickhouse,
        provide_context=True,
        dag=dag
    )

Интеграция с хранилищами данных (DWH)

Хранилища данных (Data Warehouses) являются центральными компонентами современных архитектур данных, и Airflow отлично подходит для автоматизации ETL-процессов.

Типы хранилищ данных

  1. Облачные DWH:

    • Google BigQuery
    • Amazon Redshift
    • Snowflake
    • Azure Synapse Analytics
  2. Локальные/Гибридные решения:

    • Greenplum
    • Teradata
    • Vertica
    • ClickHouse (как аналитическое хранилище)

ETL-паттерны с Airflow для DWH

Инкрементальная загрузка данных

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
import pandas as pd
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1
}

dag = DAG(
    'incremental_load_to_dwh',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

def get_last_loaded_date():
    # Получаем последнюю загруженную дату из переменных Airflow
    last_date = Variable.get('last_load_date', default_var='2023-01-01')
    return last_date

def update_last_loaded_date(**kwargs):
    # Обновляем дату после успешной загрузки
    Variable.set('last_load_date', kwargs['ds'])

def incremental_load(**kwargs):
    source_hook = PostgresHook(postgres_conn_id='source_postgres')
    target_hook = PostgresHook(postgres_conn_id='dwh_postgres')
    
    last_date = get_last_loaded_date()
    current_date = kwargs['ds']
    
    # Загружаем только новые данные
    query = f"""
        SELECT * FROM transactions 
        WHERE transaction_date > '{last_date}' 
        AND transaction_date <= '{current_date}'
    """
    
    df = source_hook.get_pandas_df(query)
    
    if not df.empty:
        # Преобразование данных (T в ETL)
        df['amount'] = df['amount'].astype(float)
        df['processed_timestamp'] = datetime.now()
        
        # Загрузка в DWH
        target_hook.insert_rows(
            table='fact_transactions',
            rows=df.values.tolist(),
            target_fields=df.columns.tolist()
        )
    
    return current_date

extract_transform_load = PythonOperator(
    task_id='extract_transform_load',
    python_callable=incremental_load,
    provide_context=True,
    dag=dag
)

update_checkpoint = PythonOperator(
    task_id='update_checkpoint',
    python_callable=update_last_loaded_date,
    provide_context=True,
    dag=dag
)

extract_transform_load >> update_checkpoint

Звездообразная схема и измерения

Создание типичной звездообразной схемы (Star Schema) для DWH:

# Создание таблиц измерений и фактов
create_dim_date = PostgresOperator(
    task_id='create_dim_date',
    postgres_conn_id='dwh_postgres',
    sql="""
        CREATE TABLE IF NOT EXISTS dim_date (
            date_id SERIAL PRIMARY KEY,
            full_date DATE NOT NULL,
            day INT NOT NULL,
            month INT NOT NULL,
            year INT NOT NULL,
            quarter INT NOT NULL,
            day_of_week INT NOT NULL,
            is_weekend BOOLEAN NOT NULL
        )
    """,
    dag=dag
)

create_dim_product = PostgresOperator(
    task_id='create_dim_product',
    postgres_conn_id='dwh_postgres',
    sql="""
        CREATE TABLE IF NOT EXISTS dim_product (
            product_id SERIAL PRIMARY KEY,
            product_name VARCHAR(100) NOT NULL,
            category VARCHAR(50) NOT NULL,
            price DECIMAL(10,2) NOT NULL,
            created_at TIMESTAMP NOT NULL,
            updated_at TIMESTAMP NOT NULL
        )
    """,
    dag=dag
)

create_fact_sales = PostgresOperator(
    task_id='create_fact_sales',
    postgres_conn_id='dwh_postgres',
    sql="""
        CREATE TABLE IF NOT EXISTS fact_sales (
            sale_id SERIAL PRIMARY KEY,
            date_id INT REFERENCES dim_date(date_id),
            product_id INT REFERENCES dim_product(product_id),
            quantity INT NOT NULL,
            total_amount DECIMAL(10,2) NOT NULL,
            discount_amount DECIMAL(10,2) NOT NULL,
            transaction_id VARCHAR(50) NOT NULL,
            created_at TIMESTAMP NOT NULL
        )
    """,
    dag=dag
)

# Определение порядка выполнения
create_dim_date >> create_dim_product >> create_fact_sales

Интеграция с инструментами бизнес-аналитики (BI)

Airflow может не только загружать данные в хранилища, но и автоматизировать генерацию отчетов и дашбордов BI-систем.

Популярные BI-инструменты для интеграции с Airflow

  1. Tableau
  2. Power BI
  3. Looker
  4. Metabase
  5. Superset
  6. Redash

Автоматизация BI-процессов с помощью Airflow

Обновление извлечений Tableau

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1
}

dag = DAG(
    'refresh_tableau_extracts',
    default_args=default_args,
    schedule_interval='0 2 * * *',  # Каждый день в 2:00
    catchup=False
)

# Обновление экстрактов Tableau с помощью табкмд
refresh_extract = BashOperator(
    task_id='refresh_tableau_extract',
    bash_command='tabcmd refreshextracts --project "Sales Reports" --workbook "Daily Sales Dashboard"',
    dag=dag
)

Генерация отчетов и их доставка

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
import pandas as pd
import io
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1
}

dag = DAG(
    'generate_and_email_report',
    default_args=default_args,
    schedule_interval='0 7 * * 1',  # Каждый понедельник в 7:00
    catchup=False
)

def generate_weekly_report(**kwargs):
    dwh_hook = PostgresHook(postgres_conn_id='dwh_postgres')
    
    # Запрос для отчета о продажах за предыдущую неделю
    query = """
        SELECT 
            d.full_date AS date,
            p.category,
            p.product_name,
            SUM(f.quantity) AS total_quantity,
            SUM(f.total_amount) AS total_sales
        FROM fact_sales f
        JOIN dim_date d ON f.date_id = d.date_id
        JOIN dim_product p ON f.product_id = p.product_id
        WHERE d.full_date BETWEEN 
            DATE '{{ ds }}' - INTERVAL '7 days' AND 
            DATE '{{ ds }}'
        GROUP BY d.full_date, p.category, p.product_name
        ORDER BY d.full_date, total_sales DESC
    """
    
    df = dwh_hook.get_pandas_df(query)
    
    # Формирование Excel-отчета
    buffer = io.BytesIO()
    with pd.ExcelWriter(buffer) as writer:
        df.to_excel(writer, sheet_name='Weekly Sales', index=False)
    
    buffer.seek(0)
    
    # Сохранение отчета для передачи в следующую задачу
    kwargs['ti'].xcom_push(key='weekly_report', value=buffer.getvalue())
    
    return 'Weekly sales report generated successfully'

# Создание задачи для генерации отчета
generate_report = PythonOperator(
    task_id='generate_weekly_report',
    python_callable=generate_weekly_report,
    provide_context=True,
    dag=dag
)

# Отправка отчета по электронной почте
email_report = EmailOperator(
    task_id='email_weekly_report',
    to=['analytics@example.com', 'sales@example.com'],
    subject='Weekly Sales Report - {{ ds }}',
    html_content="""
        <p>Добрый день,</p>
        <p>Прикрепляю еженедельный отчет по продажам за период до {{ ds }}.</p>
        <p>С уважением,<br>Система аналитической отчетности</p>
    """,
    files=['/tmp/weekly_sales_{{ ds }}.xlsx'],
    dag=dag
)

generate_report >> email_report

Объединение всех компонентов: комплексный пример

Создадим комплексный пример DAG, который:

  1. Извлекает данные из PostgreSQL
  2. Преобразует их и загружает в ClickHouse для аналитики
  3. Создает агрегированные таблицы в формате DWH
  4. Обновляет BI-дашборды
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.clickhouse.hooks.clickhouse import ClickHouseHook
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data_team@example.com']
}

dag = DAG(
    'end_to_end_data_pipeline',
    default_args=default_args,
    description='Комплексный ETL процесс с PostgreSQL, ClickHouse и BI',
    schedule_interval='0 1 * * *',  # Ежедневно в 1:00
    catchup=False
)

def extract_from_postgres(**kwargs):
    """Извлечение данных из PostgreSQL"""
    execution_date = kwargs['ds']
    pg_hook = PostgresHook(postgres_conn_id='postgres_source')
    
    # Загрузка новых транзакций
    transactions_query = f"""
        SELECT 
            t.transaction_id,
            t.user_id,
            t.product_id,
            t.quantity,
            t.amount,
            t.discount,
            t.transaction_date,
            u.email,
            u.country,
            p.product_name,
            p.category,
            p.price
        FROM transactions t
        JOIN users u ON t.user_id = u.user_id
        JOIN products p ON t.product_id = p.product_id
        WHERE t.transaction_date = '{execution_date}'
    """
    
    df = pg_hook.get_pandas_df(transactions_query)
    
    # Сохраняем DataFrame во временной переменной
    kwargs['ti'].xcom_push(key='raw_transactions', value=df.to_json())
    return f"Extracted {len(df)} transactions for {execution_date}"

def transform_load_to_clickhouse(**kwargs):
    """Преобразование и загрузка данных в ClickHouse"""
    ti = kwargs['ti']
    raw_data_json = ti.xcom_pull(task_ids='extract_from_postgres', key='raw_transactions')
    df = pd.read_json(raw_data_json)
    
    if df.empty:
        return "No data to load"
    
    # Преобразование данных
    df['total_amount'] = df['amount'] - df['discount']
    df['processing_timestamp'] = datetime.now()
    
    # Загрузка в ClickHouse
    ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_conn')
    
    # Создаем таблицу, если она не существует
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS transactions (
            transaction_id String,
            user_id UInt32,
            product_id UInt32,
            quantity UInt16,
            amount Decimal(10,2),
            discount Decimal(10,2),
            total_amount Decimal(10,2),
            transaction_date Date,
            email String,
            country String,
            product_name String,
            category String,
            price Decimal(10,2),
            processing_timestamp DateTime
        ) ENGINE = MergeTree()
        ORDER BY (transaction_date, user_id)
    """
    ch_hook.run(create_table_sql)
    
    # Вставка данных
    ch_hook.insert_dataframe(
        'transactions',
        df,
        target_fields=[
            'transaction_id', 'user_id', 'product_id', 'quantity', 
            'amount', 'discount', 'total_amount', 'transaction_date', 
            'email', 'country', 'product_name', 'category', 
            'price', 'processing_timestamp'
        ]
    )
    
    return f"Loaded {len(df)} records to ClickHouse"

def create_dwh_aggregations(**kwargs):
    """Создание агрегированных таблиц в DWH (ClickHouse)"""
    execution_date = kwargs['ds']
    ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_conn')
    
    # Создаем агрегированную таблицу для дневных продаж по категориям
    daily_agg_sql = f"""
        CREATE TABLE IF NOT EXISTS daily_sales_by_category (
            report_date Date,
            category String,
            total_quantity UInt32,
            total_sales Decimal(15,2),
            avg_discount_percentage Decimal(5,2),
            unique_customers UInt32
        ) ENGINE = SummingMergeTree()
        ORDER BY (report_date, category)
    """
    ch_hook.run(daily_agg_sql)
    
    # Заполнение агрегированной таблицы
    insert_agg_sql = f"""
        INSERT INTO daily_sales_by_category
        SELECT
            transaction_date AS report_date,
            category,
            sum(quantity) AS total_quantity,
            sum(total_amount) AS total_sales,
            avg(discount / amount * 100) AS avg_discount_percentage,
            uniqExact(user_id) AS unique_customers
        FROM transactions
        WHERE transaction_date = '{execution_date}'
        GROUP BY transaction_date, category
    """
    ch_hook.run(insert_agg_sql)
    
    return f"Created DWH aggregations for {execution_date}"

def update_bi_dashboards(**kwargs):
    """Обновление BI дашбордов"""
    # В реальном сценарии здесь может быть API-вызов к BI-системе
    # Для примера используем bash-команду
    return "BI dashboards updated successfully"

# Определение задач
extract_task = PythonOperator(
    task_id='extract_from_postgres',
    python_callable=extract_from_postgres,
    provide_context=True,
    dag=dag
)

transform_load_task = PythonOperator(
    task_id='transform_load_to_clickhouse',
    python_callable=transform_load_to_clickhouse,
    provide_context=True,
    dag=dag
)

create_aggregations_task = PythonOperator(
    task_id='create_dwh_aggregations',
    python_callable=create_dwh_aggregations,
    provide_context=True, 
    dag=dag
)

update_bi_task = PythonOperator(
    task_id='update_bi_dashboards',
    python_callable=update_bi_dashboards,
    provide_context=True,
    dag=dag
)

# Определение зависимостей между задачами (последовательность выполнения)
extract_task >> transform_load_task >> create_aggregations_task >> update_bi_task

Лучшие практики для работы с Airflow в современных стеках данных

Организация кода

  1. Модульная структура DAG:

    • Отдельные файлы для общих функций и операторов
    • Использование шаблонов для типовых задач
  2. Обработка ошибок и восстановление:

    • Настройка политик повторных попыток для каждого DAG
    • Реализация идемпотентных операций
    • Логирование на всех этапах

Оптимизация производительности

  1. Для PostgreSQL:

    • Использование bulk-операций вместо отдельных запросов
    • Индексирование таблиц для оптимизации запросов
    • Ограничение размеров выборок для обработки в памяти
  2. Для ClickHouse:

    • Использование правильной структуры таблиц (MergeTree семейство)
    • Применение предварительной агрегации
    • Использование distributed-таблиц для масштабирования

Мониторинг и отладка

  1. Логирование задач:

    def task_with_logging(**kwargs):
        import logging
        logger = logging.getLogger("airflow.task")
        logger.info("Starting task execution")
        # ... выполнение задачи ...
        logger.info("Task completed successfully")
  2. Метрики выполнения:

    • Интеграция с Prometheus/Grafana
    • Отслеживание времени выполнения задач

Заключение

Apache Airflow представляет собой мощную и гибкую платформу для оркестрации процессов обработки данных. В сочетании с такими технологиями, как PostgreSQL, ClickHouse, современные хранилища данных и инструменты бизнес-аналитики, Airflow образует основу для построения надежных и масштабируемых пайплайнов данных.

Правильная интеграция этих компонентов позволяет организациям:

  1. Автоматизировать сложные процессы обработки данных
  2. Обеспечить надежность и отказоустойчивость ETL-пайплайнов
  3. Сократить время от получения сырых данных до аналитических выводов
  4. Масштабировать системы обработки данных вместе с ростом организации

При проектировании архитектуры данных с использованием Airflow и других рассмотренных технологий важно учитывать особенности каждого компонента и следовать отраслевым лучшим практикам, чтобы создать эффективную и устойчивую систему.