Apache Airflow для MLOps и ETL - Описание, Преимущества и Примеры

Отличный фреймворк для ETS/MLOPS на Python

Содержимое страницы

Apache Airflow — это открытая платформа, предназначенная для программного создания, планирования и мониторинга рабочих процессов — полностью на языке Python, предлагающая гибкую и мощную альтернативу традиционным, ручным или основанным на интерфейсе инструментам для работы с рабочими процессами.

Apache Airflow был разработан в Airbnb в 2014 году для управления все более сложными рабочими процессами и стал проектом верхнего уровня Apache Software Foundation в 2019 году.

Airflow наиболее полезен, когда у вас есть сложные рабочие процессы с множеством задач, которые необходимо выполнять в определенном порядке, с зависимостями и обработкой ошибок. Он особенно полезен для организаций, выполняющих множество задач с данными, требующих оркестрации, мониторинга и механизмов повторного запуска, а не простых скриптов или cron-задач.

Цепочка киберсобытий

Ключевые особенности Apache Airflow и типичные сценарии использования

  • Определение рабочих процессов на Python: Рабочие процессы определяются как код на Python, что позволяет динамически генерировать конвейеры с использованием стандартных программных конструкций, таких как циклы и условная логика.
  • Направленные ациклические графы (DAGs): Airflow использует DAGs для представления рабочих процессов, где каждый узел — это задача, а ребра определяют зависимости. Эта структура обеспечивает выполнение задач в указанном порядке без циклов.
  • Удобный интерфейс: Airflow предоставляет современный веб-интерфейс для мониторинга, планирования и управления рабочими процессами, предлагая видимость статуса задач и журналов.
  • Обширные интеграции: Он включает множество встроенных операторов и хуков для подключения к облачным сервисам (AWS, GCP, Azure), базам данных и другим инструментам, делая его высоко масштабируемым.
  • Масштабируемость и гибкость: Airflow может оркестрировать рабочие процессы в большом масштабе, поддерживая как локальные, так и облачные развертывания, и подходит для широкого спектра сценариев использования, от ETL до машинного обучения.

Типичные сценарии использования

  • Оркестрация ETL (Extract, Transform, Load) конвейеров
  • Планирование и мониторинг рабочих процессов с данными
  • Автоматизация обучения и развертывания моделей машинного обучения
  • Управление инфраструктурными задачами

Управляемые сервисы Airflow

Несколько облачных провайдеров предлагают управляемые сервисы Airflow, снижая операционную нагрузку на настройку и обслуживание:

  • Amazon Managed Workflows for Apache Airflow (MWAA): Полностью управляемый сервис, который обрабатывает масштабируемость, безопасность и доступность, позволяя пользователям сосредоточиться на разработке рабочих процессов.
  • Google Cloud Composer: Управляемый Airflow на Google Cloud Platform.
  • Microsoft Fabric Data Factory: Предлагает задачи Apache Airflow в качестве управляемого решения для оркестрации в экосистеме Azure.

Установка и начало работы

Airflow можно установить через менеджер пакетов Python (pip install apache-airflow) или управляемые инструменты, такие как Astro CLI, которые упрощают настройку и управление проектами. После установки пользователи определяют DAGs в скриптах Python и управляют ими через интерфейс Airflow.

Итог

Особенность Описание
Определение рабочих процессов Код на Python, на основе DAGs
Интерфейс Веб-интерфейс, современный, удобный
Интеграции Облачные сервисы (AWS, GCP, Azure), базы данных, пользовательские плагины
Управляемые сервисы AWS MWAA, Google Cloud Composer, Microsoft Fabric
Сценарии использования ETL, конвейеры данных, рабочие процессы ML, инфраструктурные задачи

Airflow широко используется в сообществе инженерии данных благодаря своей гибкости, масштабируемости и мощному экосистеме, делая его ведущим выбором для оркестрации рабочих процессов в производственных средах.

Ключевые способы, которыми Airflow упрощает автоматизацию рабочих процессов

  • Рабочие процессы как код (Python) Рабочие процессы Airflow определяются как скрипты Python, использующие направленные ациклические графы (DAGs) для представления задач и их зависимостей. Этот подход «рабочие процессы как код» позволяет динамически генерировать, параметризовать и легко управлять версиями конвейеров, делая их высоко поддерживаемыми и адаптируемыми к изменяющимся требованиям.

  • Динамические и расширяемые конвейеры Используя полный потенциал Python, пользователи могут включать циклы, условные операторы и пользовательскую логику непосредственно в определения рабочих процессов. Это позволяет динамически создавать задачи и продвинутую параметризацию, что было бы сложно или невозможно в статических файлах конфигурации или инструментах на основе графического интерфейса.

  • Управление задачами на Python TaskFlow API Airflow (введен в Airflow 2.0) делает определение задач еще более «питоновским». Разработчики пишут обычные функции Python, декорируют их, и Airflow автоматически обрабатывает создание задач, подключение зависимостей и передачу данных между задачами, что приводит к более чистому и поддерживаемому коду.

  • Пользовательские операторы и интеграции Пользователи могут создавать пользовательские операторы, сенсоры и хуки на Python для взаимодействия практически с любой внешней системой, API или базой данных. Эта расширяемость обеспечивает беспрепятственную интеграцию с более широкой экосистемой Python и внешними сервисами.

  • Естественная интеграция с экосистемой Python Поскольку рабочие процессы написаны на Python, пользователи могут использовать огромное количество библиотек Python (таких как Pandas, NumPy или фреймворки машинного обучения) в своих задачах, еще больше повышая возможности автоматизации.

  • Читаемость и поддерживаемость Читаемость и популярность Python делают Airflow доступным для широкого круга пользователей, от инженеров данных до аналитиков. Подход на основе кода также поддерживает стандартные практики разработки программного обеспечения, такие как проверка кода и управление версиями.

Некоторые преимущества Apache Airflow

Особенность Преимущество для автоматизации рабочих процессов
DAGs на Python Динамичные, гибкие и поддерживаемые рабочие процессы
TaskFlow API Чище и более «питоновские» определения рабочих процессов
Пользовательские операторы/сенсоры Интеграция с любой системой или API
Естественная интеграция с Python Использование любой библиотеки или инструмента Python
Удобный интерфейс Мониторинг и управление рабочими процессами визуально

Глубокая интеграция Airflow с Python не только упрощает автоматизацию рабочих процессов, но и позволяет командам эффективно создавать надежные, масштабируемые и высоконастраиваемые конвейеры данных.

Пример: Простой ETL рабочий процесс на Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Извлечение данных")

def transform_data():
    print("Трансформация данных")

def load_data():
    print("Загрузка данных")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

Этот пример показывает, как каждый этап конвейера ETL определяется как функция Python и оркестрируется Airflow с использованием Python операторов и DAGs: extract_task >> transform_task >> load_task

Еще один пример DAG в Apache Airflow

Вот базовый пример кодирования DAG (Directed Acyclic Graph) в Apache Airflow с использованием Python. Этот пример демонстрирует, как определить рабочий процесс с двумя простыми задачами с использованием BashOperator, который выполняет команды bash:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='Мой первый DAG в Airflow!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Задача 1: Печать текущей даты
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Задача 2: Приветствие
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Привет, Airflow!"'
    )

    # Определение зависимостей задач
    print_date >> say_hello

Как это работает:

  • DAG определяется с использованием менеджера контекста (with DAG(...) as dag:).
  • Создаются две задачи: print_date и say_hello.
  • Зависимость print_date >> say_hello обеспечивает выполнение say_hello только после завершения print_date.
  • Сохраните этот код как my_first_dag.py в каталоге dags/ вашего Airflow, и Airflow автоматически обнаружит и запланирует его.

Вы можете расширять этот шаблон, добавляя задачи PythonOperator, ветвление или более сложную логику по мере роста ваших рабочих процессов.

Ключевые параметры при создании объекта DAG в Python

При определении DAG в Apache Airflow необходимо установить несколько ключевых параметров, чтобы обеспечить правильное планирование, идентификацию и поведение вашего рабочего процесса.

Основные параметры:

  • dag_id Уникальный идентификатор для вашего DAG. Каждый DAG в вашей среде Airflow должен иметь уникальный dag_id.

  • start_date Дата и время, когда DAG становится доступным для выполнения. Это определяет, когда начинается планирование для DAG.

  • schedule (или schedule_interval) Определяет, как часто должен запускаться DAG (например, "@daily", "0 12 * * *", или None для ручных запусков).

  • catchup Булево значение, определяющее, должен ли Airflow выполнять пропущенные запуски между start_date и текущей датой при первом включении DAG. По умолчанию False.

Другие распространенные параметры:

  • default_args Словарь аргументов по умолчанию, применяемых ко всем задачам в DAG (например, owner, email, retries, retry_delay). Это необязательно, но рекомендуется для DRY (Don’t Repeat Yourself) кода.

  • params Словарь для параметров конфигурации времени выполнения, позволяющий передавать значения задачам во время выполнения и делающий ваши DAG более гибкими.

Пример:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Определите задачи здесь
    pass

Таблица параметров DAG

Параметр Описание Обязательно
dag_id Уникальное имя для DAG Да
start_date Когда DAG становится доступным для запуска Да
schedule Частота/время запусков DAG Да
catchup Нужно ли выполнять пропущенные запуски Нет
default_args Аргументы по умолчанию для всех задач в DAG Нет
params Параметры времени выполнения для динамической конфигурации Нет

Установка этих параметров гарантирует, что ваш DAG имеет уникальное имя, правильно запланирован и ведет себя так, как задумано в Airflow.

Включение пользовательских параметров времени выполнения с помощью аргумента params в Airflow

Аргумент params в Apache Airflow позволяет встраивать пользовательские параметры времени выполнения в ваши DAG и задачи, обеспечивая динамическую и гибкую конфигурацию рабочих процессов. Params позволяют предоставлять конфигурацию времени выполнения задачам. Вы можете настроить параметры Params по умолчанию в коде DAG и передавать дополнительные Params или переопределять значения Param при запуске DAG.

Этот подход делает ваши рабочие процессы Airflow более динамичными и настраиваемыми, поддерживая широкий спектр сценариев автоматизации.

Как установить пользовательские параметры времени выполнения

  • Определите params в DAG: При создании объекта DAG включите аргумент params в виде словаря. Каждая пара ключ-значение представляет имя параметра и его значение по умолчанию.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Доступ к параметру через kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"Мой пользовательский параметр: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Пользовательский параметр времени выполнения
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • Переопределение во время выполнения: При ручном запуске DAG (через интерфейс Airflow, CLI или API) вы можете передавать или переопределять значения params для этого конкретного запуска.

Доступ к параметрам в задачах

  • В Python-функции вашей задачи получите доступ к params с помощью kwargs['params'].
  • Для шаблонных полей (например, в BashOperator) используйте {{ params.my_param }} в строке шаблона.

Полезные ссылки