Apache Airflow w MLOPS i ETL — opis, korzyści i przykłady

Dobry framework do ETS/MLOPS z użyciem Pythona

Page content

Apache Airflow to otwarty platforma programowy, zaprojektowana do programistycznej tworzenia, harmonogramowania i monitorowania przepływów pracy – całkowicie w kodzie Pythona, oferując elastyczne i potężne alternatywy dla tradycyjnych, ręcznych lub opartych na interfejsie graficznym narzędzi do zarządzania przepływami pracy.

Apache Airflow został pierwotnie opracowany w Airbnb w 2014 roku w celu zarządzania coraz bardziej złożonymi przepływami pracy i został w 2019 roku projektem poziomu województwa Apache Software Foundation.

Airflow jest najbardziej korzystny, gdy masz złożone przepływy pracy z wieloma zadaniami, które muszą być wykonywane w określonej kolejności, z zależnościami i mechanizmami obsługi błędów. Jest szczególnie przydatny dla organizacji prowadzących wiele zadań związanych z danymi, które wymagają orchestracji, monitorowania i mechanizmów ponownego uruchamiania, zamiast prostych skryptów lub zadań cron.

Ciąg wydarzeń cybernetycznych

Kluczowe funkcje i typowe przypadki użycia Apache Airflow

  • Definicja przepływu pracy oparta na Pythonie: Przepływy pracy są definiowane jako kod Pythona, umożliwiając dynamiczne generowanie potoków przy użyciu standardowych konstrukcji programistycznych, takich jak pętle i logika warunkowa.
  • Skierowane Grafy Acykliczne (DAGs): Airflow korzysta z DAGów do reprezentowania przepływów pracy, gdzie każdy węzeł to zadanie, a krawędzie definiują zależności. Ta struktura zapewnia, że zadania są wykonywane w określonej kolejności bez cykli.
  • Robustny interfejs użytkownika: Airflow oferuje nowoczesny interfejs sieciowy do monitorowania, harmonogramowania i zarządzania przepływami pracy, zapewniając widoczność statusu zadań i logów.
  • Rozszerzalne integracje: Obejmuje wiele wbudowanych operatorów i hooków do łączenia się z usługami chmurowymi (AWS, GCP, Azure), bazami danych i innymi narzędziami, co czyni go bardzo elastycznym.
  • Skalowalność i elastyczność: Airflow może orchestrować przepływy pracy na dużą skalę, wspierając zarówno wdrożenia lokalne, jak i w chmurze, a także jest odpowiedni dla szerokiego zakresu przypadków użycia od ETL do uczenia maszynowego.

Typowe przypadki użycia

  • Orchestracja potoków ETL (Extract, Transform, Load)
  • Harmonogramowanie i monitorowanie przepływów danych
  • Automatyzacja trenowania i wdrażania modeli uczenia maszynowego
  • Zarządzanie zadaniami infrastruktury

Zarządzane usługi Airflow

Wiele dostawców chmurowych oferuje zarządzane usługi Airflow, zmniejszając obciążenie operacyjne związane z konfiguracją i utrzymaniem:

  • Amazon Managed Workflows for Apache Airflow (MWAA): Pełnie zarządzana usługa, która obsługuje skalowanie, bezpieczeństwo i dostępność, pozwalając użytkownikom skupić się na tworzeniu przepływów pracy.
  • Google Cloud Composer: Zarządzany Airflow na platformie Google Cloud.
  • Microsoft Fabric Data Factory: Oferuje zadania Apache Airflow jako rozwiązanie zarządzanej orchestracji w ekosystemie Azure.

Instalacja i pierwsze kroki

Airflow można zainstalować za pomocą menedżera pakietów Pythona (pip install apache-airflow) lub narzędzi zarządzających, takich jak Astro CLI, które upraszczają konfigurację i zarządzanie projektami. Po zainstalowaniu użytkownicy definiują DAGi w skryptach Pythona i zarządzają nimi przez interfejs użytkownika Airflow.

Podsumowanie

Funkcja Opis
Definicja przepływu pracy Kod Pythona, oparty na DAGach
Interfejs użytkownika Webowy, nowoczesny, solidny
Integracje Chmura (AWS, GCP, Azure), bazy danych, wewnętrzne wtyczki
Zarządzane usługi AWS MWAA, Google Cloud Composer, Microsoft Fabric
Przypadki użycia ETL, potoki danych, przepływy uczenia maszynowego, zadania infrastruktury

Airflow jest szeroko stosowany w społeczności inżynierii danych ze względu na swoją elastyczność, skalowalność i silny ekosystem, czyniąc go najlepszym wyborem do orchestracji przepływów pracy w środowiskach produkcyjnych.

Kluczowe sposoby, w jakie Airflow upraszcza automatyzację przepływów pracy

  • Przepływy pracy jako kod (Python)
    Przepływy pracy są definiowane jako skrypty Pythona, korzystając z Skierowanych Grafów Acyklicznych (DAGs) do reprezentowania zadań i ich zależności. Ten „przepływy pracy jako kod” podejście umożliwia dynamiczne generowanie, parametryzowanie i łatwe kontrolowanie wersji potoków, czyniąc je bardzo utrzyjmowalnymi i elastycznymi wobec zmieniających się wymagań.

  • Dynamiczne i rozszerzalne potoki
    Korzystając z pełnych możliwości Pythona, użytkownicy mogą włączać pętle, warunki i niestandardową logikę bezpośrednio do definicji przepływu pracy. To umożliwia dynamiczne tworzenie zadań i zaawansowane parametryzowanie, które byłyby trudne lub niemożliwe w statycznych plikach konfiguracyjnych lub narzędziach opartych na interfejsie graficznym.

  • Zarządzanie zadaniami w stylu Pythona
    API TaskFlow (wprowadzone w wersji Airflow 2.0) czyni definiowanie zadań jeszcze bardziej Pythonowym. Programiści piszą zwykłe funkcje Pythona, dekorują je, a Airflow automatycznie obsługuje tworzenie zadań, przewiązanie zależności i przekazywanie danych między zadaniami, co prowadzi do czystszej i bardziej utrzyjmowalnej kodowej struktury.

  • Niestandardowe operatory i integracje
    Użytkownicy mogą tworzyć niestandardowe operatory, czujniki i hooki do interakcji z niemal każdym zewnętrznym systemem, API lub bazą danych. Ta elastyczność umożliwia płynną integrację z większym ekosystemem Pythona i usługami zewnętrznych.

  • Integracja z natywnym ekosystemem Pythona
    Ponieważ przepływy pracy są pisane w Pythonie, użytkownicy mogą korzystać z ogromnej liczby bibliotek Pythona (takich jak Pandas, NumPy lub ramów uczenia maszynowego) w swoich zadaniach, co dodatkowo wzmocni możliwości automatyzacji.

  • Czytelność i utrzyjmowalność
    Czytelność Pythona i jego popularność czynią Airflow dostępny dla szerokiego zakresu użytkowników, od inżynierów danych po analityków. Podejście oparte na kodzie również wspiera standardowe praktyki inżynierii oprogramowania, takie jak recenzje kodu i kontrola wersji.

Niektóre korzyści z użycia Apache Airflow

Funkcja Korzyść dla automatyzacji przepływów pracy
DAGi oparte na Pythonie Dynamiczne, elastyczne i utrzyjmowalne przepływy pracy
API TaskFlow Czystsze, bardziej Pythonowe definicje przepływów pracy
Niestandardowe operatory/Czujniki Integracja z dowolnym systemem lub API
Integracja z natywnym ekosystemem Pythona Użycie dowolnej biblioteki lub narzędzia Pythona
Solidny interfejs użytkownika Monitorowanie i zarządzanie przepływami pracy wizualnie

Głęboka integracja Airflow z Pythonem nie tylko upraszcza automatyzację przepływów pracy, ale również umożliwia zespołom budowanie solidnych, skalowalnych i bardzo dopasowanych potoków danych z dużą wydajnością.

Przykład: Prosty potok ETL w Pythonie

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Wyodrębnianie danych")

def transform_data():
    print("Przetwarzanie danych")

def load_data():
    print("Ładowanie danych")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

Ten przykład pokazuje, jak każdy etap potoku ETL jest definiowany jako funkcja Pythona i orchestrowany przez Airflow przy użyciu operatorów Pythona i DAGów: extract_task >> transform_task >> load_task

Inny przykład DAGa Apache Airflow

Oto podstawowy przykład kodowania DAGa (Directed Acyclic Graph) w Apache Airflow przy użyciu Pythona. Ten przykład demonstruje, jak zdefiniować przepływ pracy z dwoma prostymi zadaniami przy użyciu BashOperator, który uruchamia polecenia bash:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='Mój pierwszy DAG Airflow!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Zadanie 1: Wypisz aktualną datę
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Zadanie 2: Powiedz "Cześć"
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Cześć, Airflow!"'
    )

    # Zdefiniuj zależności zadań
    print_date >> say_hello

Jak to działa:

  • DAG jest definiowany przy użyciu menedżera kontekstu (with DAG(...) as dag:).
  • Utworzono dwa zadania: print_date i say_hello.
  • Zależność print_date >> say_hello zapewnia, że say_hello uruchamia się tylko po zakończeniu print_date.
  • Zapisz ten kod jako my_first_dag.py w katalogu dags/ Airflow, a Airflow automatycznie wykryje i zaplanuje go.

Możesz rozszerzyć ten szablon dodając zadania PythonOperator, rozgałęzienia lub bardziej złożoną logikę wraz z rozwojem swoich przepływów pracy.

Kluczowe parametry przy tworzeniu obiektu DAG w Pythonie

Podczas definiowania DAGa w Apache Airflow należy ustawić kilka kluczowych parametrów, aby zapewnić poprawne harmonogramowanie, identyfikację i zachowanie przepływu pracy.

Niezbędne parametry:

  • dag_id
    Unikalny identyfikator dla DAGa. Każdy DAG w środowisku Airflow musi mieć unikalny dag_id.

  • start_date
    Data i czas, w którym DAG staje się elegible do wykonania. Określa, kiedy rozpoczyna się harmonogramowanie DAGa.

  • schedule (lub schedule_interval)
    Definiuje, jak często powinien być uruchamiany DAG (np. "@daily", "0 12 * * *", lub None dla ręcznych uruchomień).

  • catchup
    Wartość logiczna określająca, czy Airflow powinien uzupełniać brakujące uruchomienia między start_date a aktualną datą, gdy DAG zostanie po raz pierwszy włączony. Domyślnie False.

Inne powszechne parametry:

  • default_args
    Słownik domyślnych argumentów stosowanych do wszystkich zadań w DAGu (np. owner, email, retries, retry_delay). Jest opcjonalny, ale zalecany w celu uniknięcia powtarzania się kodu (DRY).

  • params
    Słownik do konfiguracji parametrów uruchamiania, pozwalający przekazywać wartości do zadań w czasie uruchamiania i zwiększający elastyczność DAGów.

Przykład:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Zdefiniuj zadania tutaj
    pass

Tabela parametrów DAGa

Parametr Opis Wymagane
dag_id Unikalna nazwa DAGa Tak
start_date Kiedy DAG staje się elegible do uruchomienia Tak
schedule Częstotliwość/timing uruchamiania DAGa Tak
catchup Czy uzupełniać brakujące uruchomienia Nie
default_args Domyślne argumenty dla wszystkich zadań w DAGu Nie
params Parametry uruchamiania do dynamicznej konfiguracji Nie

Ustawienie tych parametrów zapewnia, że DAG jest unikalnie identyfikowany, poprawnie harmonogramowany i zachowuje się zgodnie z oczekiwaniami w Airflow.

Włączanie niestandardowych parametrów uruchamiania przy użyciu argumentu params w Airflow

Argument params w Apache Airflow umożliwia wstrzykiwanie niestandardowych parametrów uruchamiania do DAGów i zadań, umożliwiając dynamiczne i elastyczne konfiguracje przepływów pracy. Parametry umożliwiają dostarczanie konfiguracji uruchamiania do zadań. Można skonfigurować domyślne Parametry w kodzie DAGa i dostarczyć dodatkowe Parametry lub nadpisać wartości Parametrów podczas uruchamiania DAGa.

Ten podejście czyni przepływy pracy Airflow bardziej dynamicznymi i konfigurowalnymi, wspierając szeroki zakres scenariuszy automatyzacji.

Jak ustawić niestandardowe parametry uruchamiania

  • Zdefiniuj params w DAGu:
    Podczas tworzenia obiektu DAG, dołącz argument params jako słownik. Każda para klucz-wartość reprezentuje nazwę parametru i jego wartość domyślną.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Dostęp do parametru przez kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"Mój niestandardowy parametr to: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Niestandardowy parametr uruchamiania
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • Nadpisz w czasie uruchamiania:
    Podczas ręcznego uruchamiania działania DAGa (przez interfejs użytkownika Airflow, CLI lub API), możesz dostarczyć lub nadpisać wartości params dla tej konkretnej sesji uruchamiania.

Dostęp do parametrów w zadaniach

  • W Twojej funkcji Pythona zadania, uzyskaj dostęp do parametrów za pomocą kwargs['params'].
  • Dla pól szablonowanych (np. w BashOperator), użyj {{ params.my_param }} w ciągu szablonu.

Przydatne linki