Apache Airflow för MLOPS och ETL - Beskrivning, Fördelar och Exempel

Bra ramverk för ETS/MLOPS med Python

Sidinnehåll

Apache Airflow är en öppen källkodplattform som är utformad för att programmatiskt skriva, schemalägga och övervaka flöden - helt i Python-kod, vilket erbjuder en flexibel och kraftfull alternativ till traditionella, manuella eller gränssnittsbaserade flödesverktyg.

Apache Airflow utvecklades ursprungligen på Airbnb 2014 för att hantera alltmer komplexa flöden och blev ett toppnivåprojekt inom Apache Software Foundation 2019.

Airflow är mest fördelaktigt när du har komplexa flöden med flera uppgifter som måste köras i en specifik ordning, med beroenden och felhantering. Det är särskilt användbart för organisationer som kör många datajobb som kräver orkestration, övervakning och återförsöksmekanismer, snarare än enkla skript eller cron-jobb.

Kedja av cyberevents

Apache Airflows nyckelfunktioner och typiska användningsområden

  • Pythonbaserad flödesdefinition: Flöden definieras som Python-kod, vilket tillåter dynamisk generering av pipeliner med hjälp av standardprogrammeringskonstruktioner som loopar och villkorlig logik.
  • Directed Acyclic Graphs (DAGs): Airflow använder DAGs för att representera flöden, där varje nod är en uppgift och kanter definierar beroenden. Denna struktur säkerställer att uppgifter körs i en specificerad ordning utan cykler.
  • Robust UI: Airflow erbjuder ett modernt webbgränssnitt för övervakning, schemaläggning och hantering av flöden, vilket ger synlighet i uppgiftsstatus och loggar.
  • Uppföljande integreringar: Den inkluderar många inbyggda operatörer och krokar för att ansluta till molntjänster (AWS, GCP, Azure), databaser och andra verktyg, vilket gör den högst utökbar.
  • Skalbarhet och flexibilitet: Airflow kan orkestrera flöden i stor skala, stödja både lokala och molnbaserade distributioner, och är lämplig för ett brett spektrum av användningsområden från ETL till maskininlärning.

Typiska användningsområden

  • Orkestrera ETL (Extract, Transform, Load) pipeliner
  • Schemaläggning och övervakning av dataflöden
  • Automatisering av maskininlärningsmodelltränings- och distributionsprocesser
  • Hantering av infrastrukturuppgifter

Hanterade Airflow-tjänster

Flera molntjänsteleverantörer erbjuder hanterade Airflow-tjänster, vilket minskar den operationella bördan för uppsättning och underhåll:

  • Amazon Managed Workflows for Apache Airflow (MWAA): En helt hanterad tjänst som hanterar skalning, säkerhet och tillgänglighet, vilket låter användare fokusera på flödesutveckling.
  • Google Cloud Composer: Hanterad Airflow på Google Cloud Platform.
  • Microsoft Fabric Data Factory: Erbjuder Apache Airflow-jobb som en hanterad orkestrationslösning inom Azure-ecosystemet.

Installation och introduktion

Airflow kan installeras via Python-pakethanteraren (pip install apache-airflow) eller hanterade verktyg som Astro CLI, vilket förenklar uppsättningen och projekthanteringen. Efter installationen definierar användare DAGs i Python-skript och hanterar dem via Airflow-gränssnittet.

Sammanfattning

Funktion Beskrivning
Flödesdefinition Python-kod, DAG-baserad
UI Webb-baserad, modern, robust
Integreringar Moln (AWS, GCP, Azure), databaser, anpassade moduler
Hanterade tjänster AWS MWAA, Google Cloud Composer, Microsoft Fabric
Användningsområden ETL, dataflöden, ML-flöden, infrastrukturuppgifter

Airflow är vida antaget inom dataengineeringssamhället för sin flexibilitet, skalbarhet och starka ekosystem, vilket gör den till ett ledande val för flödesorkestration i produktionsmiljöer.

Nyckelmetoder som Airflow förenklar flödesautomatisering

  • Flöden som kod (Python) Airflow-flöden definieras som Python-skript, med Directed Acyclic Graphs (DAGs) för att representera uppgifter och deras beroenden. Denna “flöden som kod”-metod möjliggör dynamisk generering, parameterisering och enkel versionskontroll av pipeliner, vilket gör dem högst underhållningsbara och anpassningsbara till förändrade krav.

  • Dynamiska och utökbara pipeliner Genom att utnyttja Python fulla kapacitet kan användare integrera loopar, villkor och anpassad logik direkt i sina flödesdefinitioner. Detta möjliggör dynamisk uppgiftsgenerering och avancerad parameterisering som skulle vara svår eller omöjlig i statiska konfigurationsfiler eller gränssnittsbaserade verktyg.

  • Pythonisk uppgiftshantering Airflows TaskFlow API (introducerad i Airflow 2.0) gör det ännu mer pythoniskt att definiera uppgifter. Utvecklare skriver rena Python-funktioner, dekorerar dem och Airflow hanterar automatiskt uppgiftsgenerering, beroendekoppling och datapassering mellan uppgifter, vilket resulterar i renare och mer underhållningsbar kod.

  • Anpassade operatörer och integreringar Användare kan skapa anpassade Python-operatörer, sensorer och krokar för att interagera med nästan vilket externt system, API eller databas som helst. Denna utökbarhet möjliggör smidig integrering med det bredare Python-ecosystemet och externa tjänster.

  • Naturlig integrering med Python-ecosystemet Eftersom flöden skrivs i Python kan användare utnyttja den stora mängden Python-bibliotek (som Pandas, NumPy eller maskininlärningsramverk) inom sina uppgifter, vilket ytterligare förbättrar automatiseringsmöjligheterna.

  • Läsbar och underhållningsbar Pythons läsbarhet och popularitet gör Airflow tillgängligt för ett brett spektrum av användare, från dataingenjörer till analytiker. Den kodbaserade metoden stöder också standardprogrammeringspraktiker som kodgranskning och versionskontroll.

Några Apache Airflow-fördelar

Funktion Fördel för flödesautomatisering
Python-baserade DAGs Dynamiska, flexibla och underhållningsbara flöden
TaskFlow API Renare, mer pythoniska flödesdefinitioner
Anpassade operatörer/sensorer Integrera med vilket system eller API som helst
Naturlig Python-integrering Använd vilket Python-bibliotek eller verktyg som helst
Robust UI Övervaka och hantera flöden visuellt

Airflows djupa integrering med Python förenklar inte bara flödesautomatisering utan ger också team möjlighet att bygga robusta, skalbara och högst anpassade datapipeliner effektivt.

Exempel: Enkel ETL-pipeline i Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extraherar data")

def transform_data():
    print("Transformerar data")

def load_data():
    print("Laddar data")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

Detta exempel visar hur varje steg i en ETL-pipeline definieras som en Python-funktion och orkestreras av Airflow med hjälp av Python-operatörer och DAGs: extract_task >> transform_task >> load_task

Ett annat exempel på Apache Airflow DAG

Här är ett grundläggande exempel på hur man kodar en DAG (Directed Acyclic Graph) i Apache Airflow med Python. Det här exemplet demonstrerar hur man definierar ett flöde med två enkla uppgifter med BashOperator, som kör bash-kommandon:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='Min första Airflow DAG!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Uppgift 1: Skriv ut det aktuella datumet
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Uppgift 2: Säg Hej
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hej, Airflow!"'
    )

    # Definiera uppgiftsberoenden
    print_date >> say_hello

Hur detta fungerar:

  • DAG definieras med en kontexthanterare (with DAG(...) as dag:).
  • Två uppgifter skapas: print_date och say_hello.
  • Beroendet print_date >> say_hello säkerställer att say_hello bara körs efter att print_date har slutförts.
  • Spara den här koden som my_first_dag.py i din Airflow dags/ katalog, och Airflow kommer automatiskt att upptäcka och schemalägga den.

Du kan bygga vidare på den här mallar genom att lägga till PythonOperator-uppgifter, grenar eller mer komplex logik när dina flöden växer.

Nyckelparametrar när du skapar ett DAG-objekt i Python

När du definierar ett DAG i Apache Airflow bör flera viktiga parametrar anges för att säkerställa korrekt schemaläggning, identifiering och beteende för ditt arbetsflöde.

Viktiga parametrar:

  • dag_id Den unika identifieraren för ditt DAG. Varje DAG i din Airflow-miljö måste ha en distinkt dag_id.

  • start_date Datum och tid när DAG:en blir berättigad för exekvering. Detta bestämmer när schemaläggningen börjar för DAG:en.

  • schedule (eller schedule_interval) Definierar hur ofta DAG:en ska köras (t.ex., "@daily", "0 12 * * *", eller None för manuella körningar).

  • catchup En boolean som bestämmer om Airflow ska fylla i missade körningar mellan start_date och det aktuella datumet när DAG:en först aktiveras. Standardvärdet är False.

Andra vanliga parametrar:

  • default_args En ordbok med standardargument som tillämpas på alla uppgifter inom DAG:en (t.ex., owner, email, retries, retry_delay). Detta är valfritt men rekommenderas för DRY (Don’t Repeat Yourself) kod.

  • params En ordbok för konfigurationsparametrar vid körning, som låter dig skicka värden till uppgifter vid körning och gör dina DAG:er mer flexibla.

Exempel:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Definiera uppgifter här
    pass

Tabell över DAG-parametrar

Parameter Beskrivning Obligatorisk
dag_id Unikt namn för DAG:en Ja
start_date När DAG:en är berättigad att börja köra Ja
schedule Frekvens/tidpunkt för DAG-körningar Ja
catchup Om missade körningar ska fyllas i Nej
default_args Standardargument för alla uppgifter i DAG:en Nej
params Parametrar vid körning för dynamisk konfiguration Nej

Att ställa in dessa parametrar säkerställer att ditt DAG är unikt identifierat, schemalagt korrekt och beteer sig som avsett i Airflow.

Inkludering av anpassade körningsparametrar med hjälp av params-argumentet i Airflow

Apache Airflows params-argument låter dig injicera anpassade körningsparametrar i dina DAG:er och uppgifter, vilket möjliggör dynamiska och flexibla arbetsflödeskonfigurationer. Params låter dig ge konfigurationsparametrar vid körning till uppgifter. Du kan konfigurera standard-Params i din DAG-kod och tillhandahålla ytterligare Params, eller skriva över Param-värden när du utlöser en DAG-körning.

Denna metod gör dina Airflow-arbetsflöden mer dynamiska och konfigurerbara, och stöder en bred variation av automatiseringsscenarier.

Hur man ställer in anpassade körningsparametrar

  • Definiera params i DAG:en: När du skapar ett DAG-objekt, inkludera params-argumentet som en ordbok. Varje nyckel-värde-par representerar ett parameternamn och dess standardvärde.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Åtkomst till parametern via kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"Min anpassade parameter är: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Anpassad körningsparameter
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • Överskriv vid körning: När du manuellt utlöser en DAG-körning (via Airflow-gränssnittet, CLI eller API) kan du tillhandahålla eller överskriva params-värden för den specifika körningen.

Åtkomst till parametrar i uppgifter

  • I din uppgifts Python-kallbarhet, åtkomst till params med kwargs['params'].
  • För mallade fält (som i BashOperator), använd {{ params.my_param }} i mallsträngen.

Användbara länkar