Представьте себе: вы инженер данных и в 2 часа ночи смотрите на экран, гадая, почему ваш пакетный процесс решил сделать незапланированный перерыв на кофе где-то между извлечением данных о клиентах и их загрузкой в ваше хранилище. Звучит знакомо? Добро пожаловать в вечную борьбу за управление рабочими процессами, где выбор правильного инструмента оркестрации может означать разницу между спокойными ночами и тесным общением с панелью мониторинга.

Сегодня мы погрузимся в давнее противостояние между двумя тяжеловесами на базе Python: Apache Airflow и Luigi. Представьте это как выбор в области инженерии данных между швейцарским армейским ножом и прецизионным скальпелем — оба справятся с задачей, но совершенно по-разному.

Повесть о двух архитектурах

Прежде чем засучить рукава и окунуться в код, давайте поймём, что движет этими инструментами. Это как познакомиться с партнёром по танцам перед выходом на танцпол — нужно понимать их движения.

Luigi: минималист, ориентированный на цель

Luigi работает по тому, что я называю философией «следу из хлебных крошек». Это целенаправленный подход, означающий, что каждая задача точно знает, что ей нужно произвести и от чего она зависит. Представьте Luigi как друга, у которого всегда есть чёткий план: «Мне нужно купить молоко в магазине, но сначала нужно проверить, есть ли у нас деньги, а перед этим найти кошелёк».

Архитектура Luigi строится вокруг трёх основных концепций:

  • Задачи, которые определяют, что нужно сделать.
  • Цели, которые представляют результат (ваш след из хлебных крошек).
  • Зависимости, которые создают порядок выполнения.

Airflow: дирижёр DAG

Airflow, напротив, мыслит в терминах направленных ациклических графов (DAG). Если Luigi — это след из хлебных крошек, то Airflow больше похож на дирижирование оркестром — он видит всю симфонию и координирует, когда должен играть каждый инструмент. Это ориентированный на рабочий процесс подход, а не на цель, сосредоточенный на взаимоотношениях и синхронизации между задачами.

Вот визуальное представление того, как различаются эти архитектуры:

graph TD subgraph "Архитектура Luigi" A1[Задача A] --> B1[Результат задачи A] B1 --> C1[Задача B требует результата задачи A] C1 --> D1[Результат задачи B] D1 --> E1[Задача C требует результата задачи B] end subgraph "Архитектура Airflow" A2[Задача A] --> B2[Задача B] A2 --> C2[Задача C] B2 --> D2[Задача D] C2 --> D2 end

Приступаем к работе: примеры кода

Нет ничего лучше, чем увидеть эти инструменты в действии. Давайте создадим простой конвейер данных, который извлекает пользовательские данные, обрабатывает их и загружает в базу данных. Представьте это как «Hello World» в области инженерии данных.

Реализация Luigi

Подход Luigi напоминает сборку из кубиков LEGO — каждый элемент имеет чёткую цель и предсказуемо соединяется с другими:

import luigi
import pandas as pd
from datetime import datetime
class ExtractUserData(luigi.Task):
    date = luigi.DateParameter(default=datetime.now().date())
    def output(self):
        return luigi.LocalTarget(f'data/raw_users_{self.date}.csv')
    def run(self):
        # Имитация извлечения данных
        users_data = {
            'user_id': [1, 2, 3, 4, 5],
            'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
            'signup_date': [self.date] * 5
        }
        df = pd.DataFrame(users_data)
        with self.output().open('w') as f:
            df.to_csv(f, index=False)
class ProcessUserData(luigi.Task):
    date = luigi.DateParameter(default=datetime.now().date())
    def requires(self):
        return ExtractUserData(self.date)
    def output(self):
        return luigi.LocalTarget(f'data/processed_users_{self.date}.csv')
    def run(self):
        with self.input().open('r') as f:
            df = pd.read_csv(f)
        # Добавляем некоторую магию обработки
        df['user_category'] = df['user_id'].apply(
            lambda x: 'premium' if x % 2 == 0 else 'standard'
        )
        df['days_since_signup'] = (
            datetime.now().date() - pd.to_datetime(df['signup_date']).dt.date
        ).dt.days
        with self.output().open('w') as f:
            df.to_csv(f, index=False)
class LoadUserData(luigi.Task):
    date = luigi.DateParameter(default=datetime.now().date())
    def requires(self):
        return ProcessUserData(self.date)
    def output(self):
        return luigi.LocalTarget(f'data/loaded_users_{self.date}.success')
    def run(self):
        with self.input().open('r') as f:
            df = pd.read_csv(f)
        # Имитация загрузки в базу данных
        print(f"Загружаем {len(df)} пользователей в базу данных...")
        # db.load_data(df)  # Ваша логика загрузки в базу данных здесь
        # Создаём маркер успешной загрузки
        with self.output().open('w') as f:
            f.write(f"Успешно загружено {len(df)} пользователей в {datetime.now()}")
# Запускаем конвейер
if __name__ == '__main__':
    luigi.run(['LoadUserData', '--local-scheduler'])

Реализация Airflow

Подход Airflow больше похож на написание рецепта — вы определяете все шаги заранее и позволяете планировщику решать, когда готовить каждый ингредиент:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
def extract_user_data(**context):
    """Извлекаем данные о пользователях и сохраняем в файл"""
    execution_date = context['ds']
    users_data = {
        'user_id': [1, 2, 3, 4, 5],
        'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
        'signup_date': [execution_date] * 5
    }
    df = pd.DataFrame(users_data)
    file_path = f'/tmp/raw_users_{execution_date}.csv'
    df.to_csv(file_path, index=False)
    return file_path
def process_user_data(**context):
    """Обрабатываем извлечённые данные о пользователях"""
    execution_date = context['ds']
    # Получаем входной файл из предыдущей задачи
    input_file = f'/tmp/raw_users_{execution_date}.csv'
    df = pd.read_csv(input_file)
    # Добавляем магию обработки
    df['user_category'] = df['user_id'].apply(
        lambda x: 'premium' if x % 2 == 0 else 'standard'
    )
    df['days_since_signup'] = (
        datetime.now().date() - pd.to_datetime(df['signup_date']).dt.date
    ).dt.days
    output_file = f'/tmp/processed_users_{execution_date}.csv'
    df.to_csv(output_file, index=False)
    return output_file
def load_user_data(**context):
    """Загружаем обработанные данные в базу данных"""
    execution_date = context['ds']
    input_file = f'/tmp/processed_users_{execution_date}.csv'
    df = pd.read_csv(input_file)
    # Имитация загрузки в базу данных
    print(f"Загружаем {len(df)} пользователей в базу данных...")
    # db.load_data(df)  # Ваша логика загрузки в базу данных здесь
    return f"Успешно загружено {len(df)} пользователей"
# Определяем DAG
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 9, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(
    'user_data_