← Назад к курсу

Пособие по Apache Airflow: Полное руководство по автоматизации

Что такое Apache Airflow?

Apache Airflow — это платформа с открытым исходным кодом для программирования, планирования и мониторинга рабочих процессов, созданная Airbnb в 2014 году и переданная в Apache Foundation.

Философия и особенности

  1. Код как конфигурация:

    • Workflow определяются в Python-коде (DAGs)
    • Версионный контроль через Git
    • Тестирование и отладка как обычного кода
  2. Масштабируемая архитектура:

    • Модульная архитектура: Scheduler, Worker, Web Server
    • Поддержка распределенного выполнения через Celery, Kubernetes
    • Горизонтальное масштабирование
  3. Экосистема для данных:

    • 80+ операторов для популярных сервисов
    • Интеграция с Hadoop, Spark, Snowflake, BigQuery
    • Поддержка контейнеризации (Docker, Kubernetes)

Ключевые концепции

1. DAG (Directed Acyclic Graph):

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    dag_id="my_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
) as dag:
    
    def extract():
        return "Data extracted"
    
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract
    )

2. Операторы (Operators):

  • Action: Выполняют действие (PythonOperator, BashOperator)
  • Transfer: Перемещают данные (S3ToRedshiftOperator)
  • Sensor: Ожидают условия (FileSensor, HttpSensor)

3. Контекст выполнения:

  • XCom: Обмен данными между задачами
  • Variables: Глобальные переменные
  • Connections: Учетные данные для внешних систем

Архитектура компонентов

1. Scheduler:

  • Планирует выполнение задач
  • Определяет зависимости
  • Отправляет задачи в очередь

2. Executor:

  • LocalExecutor: Для разработки
  • CeleryExecutor: Распределенное выполнение
  • KubernetesExecutor: Нативный запуск в K8s

3. Web Server:

  • UI для мониторинга и управления
  • Просмотр логов
  • Ручной запуск задач

Практические примеры

Пример 1: ETL пайплайн

with DAG("etl_pipeline", schedule_interval="@daily") as dag:
    
    extract = PythonOperator(task_id="extract", ...)
    transform = PythonOperator(task_id="transform", ...)
    load = PythonOperator(task_id="load", ...)
    
    extract >> transform >> load

Пример 2: Пайплайн машинного обучения

with DAG("ml_pipeline", schedule_interval="@weekly") as dag:
    
    get_data = PythonOperator(task_id="get_data")
    clean_data = PythonOperator(task_id="clean_data")
    train_model = PythonOperator(task_id="train_model")
    evaluate = PythonOperator(task_id="evaluate")
    deploy = PythonOperator(task_id="deploy")
    
    get_data >> clean_data >> train_model >> evaluate >> deploy

Установка и настройка

Вариант 1: Docker Compose (рекомендуется)

# docker-compose.yml
version: '3'
services:
  postgres:
    image: postgres:13
  redis:
    image: redis:latest
  webserver:
    image: apache/airflow:2.5.0
  scheduler:
    image: apache/airflow:2.5.0
  worker:
    image: apache/airflow:2.5.0

Вариант 2: Установка из PyPI

pip install apache-airflow
airflow db init
airflow users create --username admin --password admin
airflow webserver --port 8080
airflow scheduler

Продвинутые возможности

1. Динамические DAGs:

def create_dag(dag_id, schedule, default_args):
    with DAG(dag_id, schedule_interval=schedule, default_args=default_args) as dag:
        # Динамическое создание задач
        pass
    return dag

# Создание нескольких DAGs
for i in range(5):
    dag_id = f'dynamic_dag_{i}'
    globals()[dag_id] = create_dag(dag_id, '@daily', default_args)

2. Custom Operators:

from airflow.models.baseoperator import BaseOperator

class MyCustomOperator(BaseOperator):
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param
    
    def execute(self, context):
        # Логика оператора
        return f"Hello {self.my_param}"

3. Поддержка Kubernetes:

  • Запуск каждой задачи в отдельном поде
  • Динамическое выделение ресурсов
  • Использование custom Docker-образов

Мониторинг и алертинг

1. Логирование:

  • Централизованные логи в Cloudwatch, ELK
  • Кастомные обработчики логов
  • Уровни логирования (DEBUG, INFO, WARNING, ERROR)

2. Метрики:

  • Экспорт метрик в Prometheus
  • Мониторинг через Grafana
  • Кастомные метрики

3. Алерты:

  • Уведомления в Slack, Email, PagerDuty
  • На основе статуса задач
  • Кастомные условия

Best Practices

1. Проектирование DAGs:

  • Одна бизнес-логика = один DAG
  • Идемпотентность задач
  • Избегайте длинных цепочек (>20 задач)

2. Производительность:

  • Используйте подDAGs для организации
  • Оптимизируйте расписание
  • Настройка пулов параллелизма

3. Безопасность:

  • Храните секреты в Vault, AWS Secrets Manager
  • RBAC для доступа к UI
  • Изоляция окружений

Ограничения и решения

1. Нет built-in dependency management:

  • Решение: Используйте Docker или Kubernetes
  • Решение: Виртуальные окружения

2. Сложность мониторинга:

  • Решение: Airflow + Prometheus + Grafana
  • Решение: Datadog integration

3. Версионирование данных:

  • Решение: DBT для трансформаций
  • Решение: Great Expectations для валидации

Когда НЕ использовать Airflow?

  • Простые cron-задачи (используйте систему планирования ОС)
  • Real-time обработка данных (используйте Spark Streaming, Flink)
  • No-code требования (используйте n8n, Zapier)

Альтернативы

  • Prefect: Modern alternative with Python-native API
  • Dagster: Data-aware orchestrator
  • Luigi: От Spotify, проще но менее функциональный

Полезные ресурсы

  • Официальная документация: airflow.apache.org
  • Примеры DAGs: github.com/apache/airflow/tree/main/airflow/example_dags
  • Best Practices: airflow.apache.org/docs/apache-airflow/stable/best-practices.html
  • Сообщество: astronomer.io/community