Если вы когда-либо пытались организовать сложные data pipelines, вы, вероятно, слышали извечный вопрос: «Стоит ли выбрать Airflow или Luigi?» Это равносильно дебатам о кофе — оба варианта широко любимы, у обоих есть свои страстные сторонники, и оба точно справятся с задачей. Разница в том, что один — это элегантная эспрессо-машина, а другой — надёжная кофеварка.

В этом руководстве мы разберём Apache Airflow и Luigi не просто для того, чтобы сказать, какой из них «лучше» (спойлер: это зависит от ситуации), но и дадим вам практические знания, чтобы вы могли принять обоснованное решение для вашего конкретного случая использования. Мы погрузимся в реальные примеры кода, изучим их архитектуры, и я поделюсь некоторыми проверенными на практике советами, которые помогут вам избежать распространённых ошибок, подстерегающих большинство команд.

Основы понимания

Прежде чем начать сравнение, установим базовые понятия. И Apache Airflow, и Luigi — это системы управления рабочими процессами на основе Python с открытым исходным кодом, предназначенные для решения схожих задач: оркестрации сложных data pipelines, управления зависимостями и мониторинга выполнения. Представьте их как разные философии, решающие одну и ту же проблему.

Apache Airflow был создан Airbnb в 2014 году и с тех пор стал проектом верхнего уровня Apache. Это универсальный инструмент для оркестрации рабочих процессов, имеющий более 28 800 звёзд на GitHub и активное сообщество из более чем 2300 участников. Airflow представляет рабочие процессы в виде ориентированных ацикличных графов (DAG), что является элегантным способом сказать «визуальное представление задач, которые выполняются в одном направлении без циклов».

Luigi появился в Spotify в 2012 году и предлагает более минималистичный подход. Он фокусируется на определениях задач и их результатах, выстраивая конвейеры через зависимости задач. Если Airflow — это полнофункциональная платформа оркестрации, то Luigi — это элегантная, лёгкая альтернатива, которая говорит: «давайте упростим».

Архитектура: фундамент имеет значение

Архитектурные различия между этими инструментами — это то, где всё начинает существенно различаться.

Архитектура Airflow на основе DAG

Основная концепция Airflow вращается вокруг DAG. Каждый DAG — это набор задач с явно определёнными зависимостями между ними. Вот что делает это элегантным:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2025, 1, 1),
}
dag = DAG(
    'etl_pipeline_example',
    default_args=default_args,
    description='A simple ETL pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False,
)
def extract_data(**context):
    print("Extracting data from source...")
    return {'records': 1000}
def transform_data(**context):
    ti = context['task_instance']
    records = ti.xcom_pull(task_ids='extract')
    print(f"Transforming {records['records']} records...")
    return {'processed': records['records']}
def load_data(**context):
    ti = context['task_instance']
    processed = ti.xcom_pull(task_ids='transform')
    print(f"Loading {processed['processed']} records to warehouse...")
extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load',
    python_callable=load_data,
    dag=dag,
)
extract_task >> transform_task >> load_task

Этот код создаёт линейный ETL-конвейер с чёткими зависимостями. Оператор >> определяет направление потока — извлечение должно завершиться до преобразования, а преобразование должно завершиться до загрузки.

Архитектура Luigi на основе задач

Luigi использует другой подход. Он определяет рабочие процессы через классы задач, где зависимости выражаются через переопределение методов:

import luigi
import json
class ExtractDataTask(luigi.Task):
    date = luigi.DateParameter(default=luigi.date.today())
    def output(self):
        return luigi.LocalTarget(f'data/raw_{self.date}.json')
    def run(self):
        print("Extracting data from source...")
        data = {'records': 1000, 'timestamp': str(self.date)}
        with self.output().open('w') as f:
            json.dump(data, f)
class TransformDataTask(luigi.Task):
    date = luigi.DateParameter(default=luigi.date.today())
    def requires(self):
        return ExtractDataTask(date=self.date)
    def output(self):
        return luigi.LocalTarget(f'data/transformed_{self.date}.json')
    def run(self):
        print("Transforming data...")
        with self.input().open('r') as f:
            data = json.load(f)
        processed = {
            'processed': data['records'] * 2,
            'timestamp': data['timestamp']
        }
        with self.output().open('w') as f:
            json.dump(processed, f)
class LoadDataTask(luigi.Task):
    date = luigi.DateParameter(default=luigi.date.today())
    def requires(self):
        return TransformDataTask(date=self.date)
    def output(self):
        return luigi.LocalTarget(f'data/loaded_{self.date}.txt')
    def run(self):
        print("Loading data to warehouse...")
        with self.input().open('r') as f:
            data = json.load(f)
        with self.output().open('w') as f:
            f.write(f"Loaded {data['processed']} records successfully")
if __name__ == '__main__':
    luigi.build([LoadDataTask()], local_scheduler=True)

Обратите внимание на философскую разницу: Luigi подчёркивает результаты и их зависимости. Каждая задача явно объявляет, что она производит (output()) и что ей нужно (requires()). Luigi проверяет, существуют ли результаты, прежде чем повторно выполнять задачи — это может быть невероятно полезно для идемпотентных рабочих процессов.

Расписание: сердце оркестрации

Именно здесь становится особенно заметна разница между двумя инструментами.

Возможности планирования Airflow надёжны и ориентированы на предприятия. Его встроенный планировщик может:

  • Выполнять рабочие процессы через заданные интервалы с использованием выражений cron или пользовательских интервалов.
  • Запускать несколько DAG одновременно.
  • Поддерживать динамическое создание DAG.
  • Запускать рабочие процессы на основе внешних событий.
  • Выполнять задачи параллельно с продвинутым управлением зависимостями.

Luigi, напротив, не имеет встроенного планирования. Рабочие процессы должны запускаться вручную или через внешние планировщики, такие как cron. Это не обязательно слабость для определённых случаев использования, но требует дополнительной настройки инфраструктуры для производственных развёртываний.

# Airflow: Простое планирование с выражениями cron
dag = DAG(
    'my_pipeline',
    schedule_interval='0 2 * * 1-5',  # По будням в 2 часа ночи
    start_date=datetime(2025, 1, 1),
)
# Luigi: Требует внешнего планирования (пример cron)
# 0 2 * * 1-5 /usr/bin/python -m luigi --module my_tasks LoadDataTask

Пользовательский интерфейс и мониторинг

Веб-интерфейс Airflow обширен. Вы получаете визуальное представление своего DAG, можете отслеживать выполнение задач в режиме реального времени, запускать выполнения вручную, повторять неудачные задачи и просматривать журналы прямо из браузера. Это многофункциональная панель, которая упрощает устранение неполадок.

Интерфейс Luigi, скажем так, минималистичен. Есть базовый веб-интерфейс, но ему не хватает глубины и интерактивности предложения Airflow. Если вам важна визуальная обратная связь и быстрые выводы о состоянии конвейера, Airflow здесь побеждает.

Масштабируемость: когда всё становится серьёзным

Вот критический вопрос: что происходит, когда ваши конвейеры вырастают с 10 задач до 10 000?

Масштабирование Airflow: Airflow создан для масштабирования с нуля. Он использует исполнителей (например, Celery или Kubernetes) для распределения выполнения задач по нескольким машинам. Вы можете запускать сотни DAG с тысячами задач одновременно. Архитектура естественным образом обрабатывает распределённую обработку.

Масштабирование Luigi: Luigi выполняет