Представьте себе: вы инженер данных и в 2 часа ночи смотрите на экран, гадая, почему ваш пакетный процесс решил сделать незапланированный перерыв на кофе где-то между извлечением данных о клиентах и их загрузкой в ваше хранилище. Звучит знакомо? Добро пожаловать в вечную борьбу за управление рабочими процессами, где выбор правильного инструмента оркестрации может означать разницу между спокойными ночами и тесным общением с панелью мониторинга.
Сегодня мы погрузимся в давнее противостояние между двумя тяжеловесами на базе Python: Apache Airflow и Luigi. Представьте это как выбор в области инженерии данных между швейцарским армейским ножом и прецизионным скальпелем — оба справятся с задачей, но совершенно по-разному.
Повесть о двух архитектурах
Прежде чем засучить рукава и окунуться в код, давайте поймём, что движет этими инструментами. Это как познакомиться с партнёром по танцам перед выходом на танцпол — нужно понимать их движения.
Luigi: минималист, ориентированный на цель
Luigi работает по тому, что я называю философией «следу из хлебных крошек». Это целенаправленный подход, означающий, что каждая задача точно знает, что ей нужно произвести и от чего она зависит. Представьте Luigi как друга, у которого всегда есть чёткий план: «Мне нужно купить молоко в магазине, но сначала нужно проверить, есть ли у нас деньги, а перед этим найти кошелёк».
Архитектура Luigi строится вокруг трёх основных концепций:
- Задачи, которые определяют, что нужно сделать.
- Цели, которые представляют результат (ваш след из хлебных крошек).
- Зависимости, которые создают порядок выполнения.
Airflow: дирижёр DAG
Airflow, напротив, мыслит в терминах направленных ациклических графов (DAG). Если Luigi — это след из хлебных крошек, то Airflow больше похож на дирижирование оркестром — он видит всю симфонию и координирует, когда должен играть каждый инструмент. Это ориентированный на рабочий процесс подход, а не на цель, сосредоточенный на взаимоотношениях и синхронизации между задачами.
Вот визуальное представление того, как различаются эти архитектуры:
Приступаем к работе: примеры кода
Нет ничего лучше, чем увидеть эти инструменты в действии. Давайте создадим простой конвейер данных, который извлекает пользовательские данные, обрабатывает их и загружает в базу данных. Представьте это как «Hello World» в области инженерии данных.
Реализация Luigi
Подход Luigi напоминает сборку из кубиков LEGO — каждый элемент имеет чёткую цель и предсказуемо соединяется с другими:
import luigi
import pandas as pd
from datetime import datetime
class ExtractUserData(luigi.Task):
date = luigi.DateParameter(default=datetime.now().date())
def output(self):
return luigi.LocalTarget(f'data/raw_users_{self.date}.csv')
def run(self):
# Имитация извлечения данных
users_data = {
'user_id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'signup_date': [self.date] * 5
}
df = pd.DataFrame(users_data)
with self.output().open('w') as f:
df.to_csv(f, index=False)
class ProcessUserData(luigi.Task):
date = luigi.DateParameter(default=datetime.now().date())
def requires(self):
return ExtractUserData(self.date)
def output(self):
return luigi.LocalTarget(f'data/processed_users_{self.date}.csv')
def run(self):
with self.input().open('r') as f:
df = pd.read_csv(f)
# Добавляем некоторую магию обработки
df['user_category'] = df['user_id'].apply(
lambda x: 'premium' if x % 2 == 0 else 'standard'
)
df['days_since_signup'] = (
datetime.now().date() - pd.to_datetime(df['signup_date']).dt.date
).dt.days
with self.output().open('w') as f:
df.to_csv(f, index=False)
class LoadUserData(luigi.Task):
date = luigi.DateParameter(default=datetime.now().date())
def requires(self):
return ProcessUserData(self.date)
def output(self):
return luigi.LocalTarget(f'data/loaded_users_{self.date}.success')
def run(self):
with self.input().open('r') as f:
df = pd.read_csv(f)
# Имитация загрузки в базу данных
print(f"Загружаем {len(df)} пользователей в базу данных...")
# db.load_data(df) # Ваша логика загрузки в базу данных здесь
# Создаём маркер успешной загрузки
with self.output().open('w') as f:
f.write(f"Успешно загружено {len(df)} пользователей в {datetime.now()}")
# Запускаем конвейер
if __name__ == '__main__':
luigi.run(['LoadUserData', '--local-scheduler'])
Реализация Airflow
Подход Airflow больше похож на написание рецепта — вы определяете все шаги заранее и позволяете планировщику решать, когда готовить каждый ингредиент:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
def extract_user_data(**context):
"""Извлекаем данные о пользователях и сохраняем в файл"""
execution_date = context['ds']
users_data = {
'user_id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'signup_date': [execution_date] * 5
}
df = pd.DataFrame(users_data)
file_path = f'/tmp/raw_users_{execution_date}.csv'
df.to_csv(file_path, index=False)
return file_path
def process_user_data(**context):
"""Обрабатываем извлечённые данные о пользователях"""
execution_date = context['ds']
# Получаем входной файл из предыдущей задачи
input_file = f'/tmp/raw_users_{execution_date}.csv'
df = pd.read_csv(input_file)
# Добавляем магию обработки
df['user_category'] = df['user_id'].apply(
lambda x: 'premium' if x % 2 == 0 else 'standard'
)
df['days_since_signup'] = (
datetime.now().date() - pd.to_datetime(df['signup_date']).dt.date
).dt.days
output_file = f'/tmp/processed_users_{execution_date}.csv'
df.to_csv(output_file, index=False)
return output_file
def load_user_data(**context):
"""Загружаем обработанные данные в базу данных"""
execution_date = context['ds']
input_file = f'/tmp/processed_users_{execution_date}.csv'
df = pd.read_csv(input_file)
# Имитация загрузки в базу данных
print(f"Загружаем {len(df)} пользователей в базу данных...")
# db.load_data(df) # Ваша логика загрузки в базу данных здесь
return f"Успешно загружено {len(df)} пользователей"
# Определяем DAG
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2025, 9, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'user_data_