
Apache Airflow
Введение
Apache Airflow произвел революцию в оркестрации процессов обработки данных с момента своего появления в 2014 году в компании Airbnb. Сегодня эта платформа с открытым исходным кодом стала стандартом де-факто для автоматизации, планирования и мониторинга рабочих процессов данных. Особенно ценно то, как Airflow интегрируется с современными системами хранения и анализа данных, включая PostgreSQL, ClickHouse, хранилища данных (DWH) и инструменты бизнес-аналитики (BI).
Основы Apache Airflow
Концепции и архитектура
Airflow строится вокруг концепции DAG (Directed Acyclic Graph) — направленных ациклических графов, которые представляют последовательность задач и их зависимости.
Ключевые компоненты экосистемы Airflow:
- DAG — Python-файлы, определяющие структуру рабочего процесса
- Операторы — шаблоны для выполнения конкретных типов задач
- Задачи (Tasks) — экземпляры операторов внутри DAG
- Планировщик (Scheduler) — управляет выполнением задач
- Веб-сервер — интерфейс для мониторинга и управления
- Исполнители (Executors) — определяют способ выполнения задач
Настройка и развертывание
Современные подходы к развертыванию Airflow включают:
- Контейнеризацию с Docker и оркестрацию с Kubernetes
- Управляемые сервисы от облачных провайдеров (AWS MWAA, GCP Cloud Composer)
- Локальные установки для разработки и небольших проектов
Интеграция Airflow с PostgreSQL
PostgreSQL может выполнять две роли в экосистеме Airflow:
PostgreSQL как метаданные Airflow
По умолчанию Airflow использует SQLite, но для производственных сред рекомендуется PostgreSQL:
# Конфигурация подключения к PostgreSQL в airflow.cfg
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
PostgreSQL как источник или назначение данных
Airflow предоставляет специальные операторы для работы с PostgreSQL:
from airflow.providers.postgres.operators.postgres import PostgresOperator
create_table = PostgresOperator(
task_id="create_table",
postgres_conn_id="postgres_default",
sql="""
CREATE TABLE IF NOT EXISTS sales (
id SERIAL PRIMARY KEY,
date DATE NOT NULL,
amount DECIMAL(10,2) NOT NULL
);
""",
dag=dag
)
Для передачи данных между системами:
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
def transfer_data(**kwargs):
source_hook = PostgresHook(postgres_conn_id='postgres_source')
target_hook = PostgresHook(postgres_conn_id='postgres_target')
data = source_hook.get_records("SELECT * FROM source_table")
for row in data:
target_hook.run(f"INSERT INTO target_table VALUES {row}")
transfer_task = PythonOperator(
task_id='transfer_postgres_data',
python_callable=transfer_data,
dag=dag
)
Интеграция Airflow с ClickHouse
ClickHouse — высокопроизводительная колоночная СУБД, оптимизированная для аналитических запросов, становится всё более популярной для обработки больших объемов данных.
Подключение к ClickHouse из Airflow
Для работы с ClickHouse в Airflow требуется установить дополнительные пакеты:
pip install apache-airflow-providers-clickhouse
Затем можно использовать ClickHouseHook и ClickHouseOperator:
from airflow.providers.clickhouse.operators.clickhouse import ClickHouseOperator
from airflow.providers.clickhouse.hooks.clickhouse import ClickHouseHook
# Выполнение SQL-запроса в ClickHouse
clickhouse_query = ClickHouseOperator(
task_id='execute_clickhouse_query',
clickhouse_conn_id='clickhouse_conn',
sql="""
SELECT
toDate(time) AS date,
count() AS hits,
uniqExact(user_id) AS users
FROM events
WHERE date >= '{{ ds }}' AND date <= '{{ ds }}'
GROUP BY date
""",
dag=dag
)
# Использование хука для более сложной логики
def process_clickhouse_data(**kwargs):
ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_conn')
results = ch_hook.get_records("SELECT count() FROM events")
return results[0][0]
count_task = PythonOperator(
task_id='count_records',
python_callable=process_clickhouse_data,
dag=dag
)
Типичные сценарии использования ClickHouse с Airflow
-
Агрегация данных для отчетов:
aggregate_data = ClickHouseOperator( task_id='aggregate_daily_stats', clickhouse_conn_id='clickhouse_conn', sql=""" INSERT INTO daily_stats SELECT toDate(timestamp) AS date, countIf(action='view') AS views, countIf(action='purchase') AS purchases, uniqExact(user_id) AS unique_users FROM user_events WHERE toDate(timestamp) = '{{ ds }}' GROUP BY date """, dag=dag )
-
Передача данных между PostgreSQL и ClickHouse:
def postgres_to_clickhouse(**kwargs): pg_hook = PostgresHook(postgres_conn_id='postgres_conn') ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_conn') # Получение данных из PostgreSQL data = pg_hook.get_pandas_df("SELECT * FROM transactions WHERE date = %s", parameters=(kwargs['ds'],)) # Запись данных в ClickHouse через pandas if not data.empty: ch_hook.insert_dataframe( 'transactions', data, target_fields=['id', 'date', 'amount', 'user_id', 'product_id'] ) transfer_task = PythonOperator( task_id='transfer_postgres_to_clickhouse', python_callable=postgres_to_clickhouse, provide_context=True, dag=dag )
Интеграция с хранилищами данных (DWH)
Хранилища данных (Data Warehouses) являются центральными компонентами современных архитектур данных, и Airflow отлично подходит для автоматизации ETL-процессов.
Типы хранилищ данных
-
Облачные DWH:
- Google BigQuery
- Amazon Redshift
- Snowflake
- Azure Synapse Analytics
-
Локальные/Гибридные решения:
- Greenplum
- Teradata
- Vertica
- ClickHouse (как аналитическое хранилище)
ETL-паттерны с Airflow для DWH
Инкрементальная загрузка данных
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import Variable
import pandas as pd
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1
}
dag = DAG(
'incremental_load_to_dwh',
default_args=default_args,
schedule_interval='@daily',
catchup=False
)
def get_last_loaded_date():
# Получаем последнюю загруженную дату из переменных Airflow
last_date = Variable.get('last_load_date', default_var='2023-01-01')
return last_date
def update_last_loaded_date(**kwargs):
# Обновляем дату после успешной загрузки
Variable.set('last_load_date', kwargs['ds'])
def incremental_load(**kwargs):
source_hook = PostgresHook(postgres_conn_id='source_postgres')
target_hook = PostgresHook(postgres_conn_id='dwh_postgres')
last_date = get_last_loaded_date()
current_date = kwargs['ds']
# Загружаем только новые данные
query = f"""
SELECT * FROM transactions
WHERE transaction_date > '{last_date}'
AND transaction_date <= '{current_date}'
"""
df = source_hook.get_pandas_df(query)
if not df.empty:
# Преобразование данных (T в ETL)
df['amount'] = df['amount'].astype(float)
df['processed_timestamp'] = datetime.now()
# Загрузка в DWH
target_hook.insert_rows(
table='fact_transactions',
rows=df.values.tolist(),
target_fields=df.columns.tolist()
)
return current_date
extract_transform_load = PythonOperator(
task_id='extract_transform_load',
python_callable=incremental_load,
provide_context=True,
dag=dag
)
update_checkpoint = PythonOperator(
task_id='update_checkpoint',
python_callable=update_last_loaded_date,
provide_context=True,
dag=dag
)
extract_transform_load >> update_checkpoint
Звездообразная схема и измерения
Создание типичной звездообразной схемы (Star Schema) для DWH:
# Создание таблиц измерений и фактов
create_dim_date = PostgresOperator(
task_id='create_dim_date',
postgres_conn_id='dwh_postgres',
sql="""
CREATE TABLE IF NOT EXISTS dim_date (
date_id SERIAL PRIMARY KEY,
full_date DATE NOT NULL,
day INT NOT NULL,
month INT NOT NULL,
year INT NOT NULL,
quarter INT NOT NULL,
day_of_week INT NOT NULL,
is_weekend BOOLEAN NOT NULL
)
""",
dag=dag
)
create_dim_product = PostgresOperator(
task_id='create_dim_product',
postgres_conn_id='dwh_postgres',
sql="""
CREATE TABLE IF NOT EXISTS dim_product (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(100) NOT NULL,
category VARCHAR(50) NOT NULL,
price DECIMAL(10,2) NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
)
""",
dag=dag
)
create_fact_sales = PostgresOperator(
task_id='create_fact_sales',
postgres_conn_id='dwh_postgres',
sql="""
CREATE TABLE IF NOT EXISTS fact_sales (
sale_id SERIAL PRIMARY KEY,
date_id INT REFERENCES dim_date(date_id),
product_id INT REFERENCES dim_product(product_id),
quantity INT NOT NULL,
total_amount DECIMAL(10,2) NOT NULL,
discount_amount DECIMAL(10,2) NOT NULL,
transaction_id VARCHAR(50) NOT NULL,
created_at TIMESTAMP NOT NULL
)
""",
dag=dag
)
# Определение порядка выполнения
create_dim_date >> create_dim_product >> create_fact_sales
Интеграция с инструментами бизнес-аналитики (BI)
Airflow может не только загружать данные в хранилища, но и автоматизировать генерацию отчетов и дашбордов BI-систем.
Популярные BI-инструменты для интеграции с Airflow
- Tableau
- Power BI
- Looker
- Metabase
- Superset
- Redash
Автоматизация BI-процессов с помощью Airflow
Обновление извлечений Tableau
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1
}
dag = DAG(
'refresh_tableau_extracts',
default_args=default_args,
schedule_interval='0 2 * * *', # Каждый день в 2:00
catchup=False
)
# Обновление экстрактов Tableau с помощью табкмд
refresh_extract = BashOperator(
task_id='refresh_tableau_extract',
bash_command='tabcmd refreshextracts --project "Sales Reports" --workbook "Daily Sales Dashboard"',
dag=dag
)
Генерация отчетов и их доставка
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
import pandas as pd
import io
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1
}
dag = DAG(
'generate_and_email_report',
default_args=default_args,
schedule_interval='0 7 * * 1', # Каждый понедельник в 7:00
catchup=False
)
def generate_weekly_report(**kwargs):
dwh_hook = PostgresHook(postgres_conn_id='dwh_postgres')
# Запрос для отчета о продажах за предыдущую неделю
query = """
SELECT
d.full_date AS date,
p.category,
p.product_name,
SUM(f.quantity) AS total_quantity,
SUM(f.total_amount) AS total_sales
FROM fact_sales f
JOIN dim_date d ON f.date_id = d.date_id
JOIN dim_product p ON f.product_id = p.product_id
WHERE d.full_date BETWEEN
DATE '{{ ds }}' - INTERVAL '7 days' AND
DATE '{{ ds }}'
GROUP BY d.full_date, p.category, p.product_name
ORDER BY d.full_date, total_sales DESC
"""
df = dwh_hook.get_pandas_df(query)
# Формирование Excel-отчета
buffer = io.BytesIO()
with pd.ExcelWriter(buffer) as writer:
df.to_excel(writer, sheet_name='Weekly Sales', index=False)
buffer.seek(0)
# Сохранение отчета для передачи в следующую задачу
kwargs['ti'].xcom_push(key='weekly_report', value=buffer.getvalue())
return 'Weekly sales report generated successfully'
# Создание задачи для генерации отчета
generate_report = PythonOperator(
task_id='generate_weekly_report',
python_callable=generate_weekly_report,
provide_context=True,
dag=dag
)
# Отправка отчета по электронной почте
email_report = EmailOperator(
task_id='email_weekly_report',
to=['analytics@example.com', 'sales@example.com'],
subject='Weekly Sales Report - {{ ds }}',
html_content="""
<p>Добрый день,</p>
<p>Прикрепляю еженедельный отчет по продажам за период до {{ ds }}.</p>
<p>С уважением,<br>Система аналитической отчетности</p>
""",
files=['/tmp/weekly_sales_{{ ds }}.xlsx'],
dag=dag
)
generate_report >> email_report
Объединение всех компонентов: комплексный пример
Создадим комплексный пример DAG, который:
- Извлекает данные из PostgreSQL
- Преобразует их и загружает в ClickHouse для аналитики
- Создает агрегированные таблицы в формате DWH
- Обновляет BI-дашборды
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.clickhouse.hooks.clickhouse import ClickHouseHook
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import pandas as pd
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email': ['data_team@example.com']
}
dag = DAG(
'end_to_end_data_pipeline',
default_args=default_args,
description='Комплексный ETL процесс с PostgreSQL, ClickHouse и BI',
schedule_interval='0 1 * * *', # Ежедневно в 1:00
catchup=False
)
def extract_from_postgres(**kwargs):
"""Извлечение данных из PostgreSQL"""
execution_date = kwargs['ds']
pg_hook = PostgresHook(postgres_conn_id='postgres_source')
# Загрузка новых транзакций
transactions_query = f"""
SELECT
t.transaction_id,
t.user_id,
t.product_id,
t.quantity,
t.amount,
t.discount,
t.transaction_date,
u.email,
u.country,
p.product_name,
p.category,
p.price
FROM transactions t
JOIN users u ON t.user_id = u.user_id
JOIN products p ON t.product_id = p.product_id
WHERE t.transaction_date = '{execution_date}'
"""
df = pg_hook.get_pandas_df(transactions_query)
# Сохраняем DataFrame во временной переменной
kwargs['ti'].xcom_push(key='raw_transactions', value=df.to_json())
return f"Extracted {len(df)} transactions for {execution_date}"
def transform_load_to_clickhouse(**kwargs):
"""Преобразование и загрузка данных в ClickHouse"""
ti = kwargs['ti']
raw_data_json = ti.xcom_pull(task_ids='extract_from_postgres', key='raw_transactions')
df = pd.read_json(raw_data_json)
if df.empty:
return "No data to load"
# Преобразование данных
df['total_amount'] = df['amount'] - df['discount']
df['processing_timestamp'] = datetime.now()
# Загрузка в ClickHouse
ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_conn')
# Создаем таблицу, если она не существует
create_table_sql = """
CREATE TABLE IF NOT EXISTS transactions (
transaction_id String,
user_id UInt32,
product_id UInt32,
quantity UInt16,
amount Decimal(10,2),
discount Decimal(10,2),
total_amount Decimal(10,2),
transaction_date Date,
email String,
country String,
product_name String,
category String,
price Decimal(10,2),
processing_timestamp DateTime
) ENGINE = MergeTree()
ORDER BY (transaction_date, user_id)
"""
ch_hook.run(create_table_sql)
# Вставка данных
ch_hook.insert_dataframe(
'transactions',
df,
target_fields=[
'transaction_id', 'user_id', 'product_id', 'quantity',
'amount', 'discount', 'total_amount', 'transaction_date',
'email', 'country', 'product_name', 'category',
'price', 'processing_timestamp'
]
)
return f"Loaded {len(df)} records to ClickHouse"
def create_dwh_aggregations(**kwargs):
"""Создание агрегированных таблиц в DWH (ClickHouse)"""
execution_date = kwargs['ds']
ch_hook = ClickHouseHook(clickhouse_conn_id='clickhouse_conn')
# Создаем агрегированную таблицу для дневных продаж по категориям
daily_agg_sql = f"""
CREATE TABLE IF NOT EXISTS daily_sales_by_category (
report_date Date,
category String,
total_quantity UInt32,
total_sales Decimal(15,2),
avg_discount_percentage Decimal(5,2),
unique_customers UInt32
) ENGINE = SummingMergeTree()
ORDER BY (report_date, category)
"""
ch_hook.run(daily_agg_sql)
# Заполнение агрегированной таблицы
insert_agg_sql = f"""
INSERT INTO daily_sales_by_category
SELECT
transaction_date AS report_date,
category,
sum(quantity) AS total_quantity,
sum(total_amount) AS total_sales,
avg(discount / amount * 100) AS avg_discount_percentage,
uniqExact(user_id) AS unique_customers
FROM transactions
WHERE transaction_date = '{execution_date}'
GROUP BY transaction_date, category
"""
ch_hook.run(insert_agg_sql)
return f"Created DWH aggregations for {execution_date}"
def update_bi_dashboards(**kwargs):
"""Обновление BI дашбордов"""
# В реальном сценарии здесь может быть API-вызов к BI-системе
# Для примера используем bash-команду
return "BI dashboards updated successfully"
# Определение задач
extract_task = PythonOperator(
task_id='extract_from_postgres',
python_callable=extract_from_postgres,
provide_context=True,
dag=dag
)
transform_load_task = PythonOperator(
task_id='transform_load_to_clickhouse',
python_callable=transform_load_to_clickhouse,
provide_context=True,
dag=dag
)
create_aggregations_task = PythonOperator(
task_id='create_dwh_aggregations',
python_callable=create_dwh_aggregations,
provide_context=True,
dag=dag
)
update_bi_task = PythonOperator(
task_id='update_bi_dashboards',
python_callable=update_bi_dashboards,
provide_context=True,
dag=dag
)
# Определение зависимостей между задачами (последовательность выполнения)
extract_task >> transform_load_task >> create_aggregations_task >> update_bi_task
Лучшие практики для работы с Airflow в современных стеках данных
Организация кода
-
Модульная структура DAG:
- Отдельные файлы для общих функций и операторов
- Использование шаблонов для типовых задач
-
Обработка ошибок и восстановление:
- Настройка политик повторных попыток для каждого DAG
- Реализация идемпотентных операций
- Логирование на всех этапах
Оптимизация производительности
-
Для PostgreSQL:
- Использование bulk-операций вместо отдельных запросов
- Индексирование таблиц для оптимизации запросов
- Ограничение размеров выборок для обработки в памяти
-
Для ClickHouse:
- Использование правильной структуры таблиц (MergeTree семейство)
- Применение предварительной агрегации
- Использование distributed-таблиц для масштабирования
Мониторинг и отладка
-
Логирование задач:
def task_with_logging(**kwargs): import logging logger = logging.getLogger("airflow.task") logger.info("Starting task execution") # ... выполнение задачи ... logger.info("Task completed successfully")
-
Метрики выполнения:
- Интеграция с Prometheus/Grafana
- Отслеживание времени выполнения задач
Заключение
Apache Airflow представляет собой мощную и гибкую платформу для оркестрации процессов обработки данных. В сочетании с такими технологиями, как PostgreSQL, ClickHouse, современные хранилища данных и инструменты бизнес-аналитики, Airflow образует основу для построения надежных и масштабируемых пайплайнов данных.
Правильная интеграция этих компонентов позволяет организациям:
- Автоматизировать сложные процессы обработки данных
- Обеспечить надежность и отказоустойчивость ETL-пайплайнов
- Сократить время от получения сырых данных до аналитических выводов
- Масштабировать системы обработки данных вместе с ростом организации
При проектировании архитектуры данных с использованием Airflow и других рассмотренных технологий важно учитывать особенности каждого компонента и следовать отраслевым лучшим практикам, чтобы создать эффективную и устойчивую систему.