Пособие по Apache Airflow: Полное руководство по автоматизации
Что такое Apache Airflow?
Apache Airflow — это платформа с открытым исходным кодом для программирования, планирования и мониторинга рабочих процессов, созданная Airbnb в 2014 году и переданная в Apache Foundation.
Философия и особенности
-
Код как конфигурация:
- Workflow определяются в Python-коде (DAGs)
- Версионный контроль через Git
- Тестирование и отладка как обычного кода
-
Масштабируемая архитектура:
- Модульная архитектура: Scheduler, Worker, Web Server
- Поддержка распределенного выполнения через Celery, Kubernetes
- Горизонтальное масштабирование
-
Экосистема для данных:
- 80+ операторов для популярных сервисов
- Интеграция с Hadoop, Spark, Snowflake, BigQuery
- Поддержка контейнеризации (Docker, Kubernetes)
Ключевые концепции
1. DAG (Directed Acyclic Graph):
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
with DAG(
dag_id="my_dag",
start_date=datetime(2024, 1, 1),
schedule_interval="@daily",
) as dag:
def extract():
return "Data extracted"
extract_task = PythonOperator(
task_id="extract",
python_callable=extract
)
2. Операторы (Operators):
- Action: Выполняют действие (PythonOperator, BashOperator)
- Transfer: Перемещают данные (S3ToRedshiftOperator)
- Sensor: Ожидают условия (FileSensor, HttpSensor)
3. Контекст выполнения:
- XCom: Обмен данными между задачами
- Variables: Глобальные переменные
- Connections: Учетные данные для внешних систем
Архитектура компонентов
1. Scheduler:
- Планирует выполнение задач
- Определяет зависимости
- Отправляет задачи в очередь
2. Executor:
- LocalExecutor: Для разработки
- CeleryExecutor: Распределенное выполнение
- KubernetesExecutor: Нативный запуск в K8s
3. Web Server:
- UI для мониторинга и управления
- Просмотр логов
- Ручной запуск задач
Практические примеры
Пример 1: ETL пайплайн
with DAG("etl_pipeline", schedule_interval="@daily") as dag:
extract = PythonOperator(task_id="extract", ...)
transform = PythonOperator(task_id="transform", ...)
load = PythonOperator(task_id="load", ...)
extract >> transform >> load
Пример 2: Пайплайн машинного обучения
with DAG("ml_pipeline", schedule_interval="@weekly") as dag:
get_data = PythonOperator(task_id="get_data")
clean_data = PythonOperator(task_id="clean_data")
train_model = PythonOperator(task_id="train_model")
evaluate = PythonOperator(task_id="evaluate")
deploy = PythonOperator(task_id="deploy")
get_data >> clean_data >> train_model >> evaluate >> deploy
Установка и настройка
Вариант 1: Docker Compose (рекомендуется)
# docker-compose.yml
version: '3'
services:
postgres:
image: postgres:13
redis:
image: redis:latest
webserver:
image: apache/airflow:2.5.0
scheduler:
image: apache/airflow:2.5.0
worker:
image: apache/airflow:2.5.0
Вариант 2: Установка из PyPI
pip install apache-airflow airflow db init airflow users create --username admin --password admin airflow webserver --port 8080 airflow scheduler
Продвинутые возможности
1. Динамические DAGs:
def create_dag(dag_id, schedule, default_args):
with DAG(dag_id, schedule_interval=schedule, default_args=default_args) as dag:
# Динамическое создание задач
pass
return dag
# Создание нескольких DAGs
for i in range(5):
dag_id = f'dynamic_dag_{i}'
globals()[dag_id] = create_dag(dag_id, '@daily', default_args)
2. Custom Operators:
from airflow.models.baseoperator import BaseOperator
class MyCustomOperator(BaseOperator):
def __init__(self, my_param, **kwargs):
super().__init__(**kwargs)
self.my_param = my_param
def execute(self, context):
# Логика оператора
return f"Hello {self.my_param}"
3. Поддержка Kubernetes:
- Запуск каждой задачи в отдельном поде
- Динамическое выделение ресурсов
- Использование custom Docker-образов
Мониторинг и алертинг
1. Логирование:
- Централизованные логи в Cloudwatch, ELK
- Кастомные обработчики логов
- Уровни логирования (DEBUG, INFO, WARNING, ERROR)
2. Метрики:
- Экспорт метрик в Prometheus
- Мониторинг через Grafana
- Кастомные метрики
3. Алерты:
- Уведомления в Slack, Email, PagerDuty
- На основе статуса задач
- Кастомные условия
Best Practices
1. Проектирование DAGs:
- Одна бизнес-логика = один DAG
- Идемпотентность задач
- Избегайте длинных цепочек (>20 задач)
2. Производительность:
- Используйте подDAGs для организации
- Оптимизируйте расписание
- Настройка пулов параллелизма
3. Безопасность:
- Храните секреты в Vault, AWS Secrets Manager
- RBAC для доступа к UI
- Изоляция окружений
Ограничения и решения
1. Нет built-in dependency management:
- Решение: Используйте Docker или Kubernetes
- Решение: Виртуальные окружения
2. Сложность мониторинга:
- Решение: Airflow + Prometheus + Grafana
- Решение: Datadog integration
3. Версионирование данных:
- Решение: DBT для трансформаций
- Решение: Great Expectations для валидации
Когда НЕ использовать Airflow?
- Простые cron-задачи (используйте систему планирования ОС)
- Real-time обработка данных (используйте Spark Streaming, Flink)
- No-code требования (используйте n8n, Zapier)
Альтернативы
- Prefect: Modern alternative with Python-native API
- Dagster: Data-aware orchestrator
- Luigi: От Spotify, проще но менее функциональный
Полезные ресурсы
- Официальная документация: airflow.apache.org
- Примеры DAGs: github.com/apache/airflow/tree/main/airflow/example_dags
- Best Practices: airflow.apache.org/docs/apache-airflow/stable/best-practices.html
- Сообщество: astronomer.io/community