Apache Airflow för MLOPS och ETL - Beskrivning, Fördelar och Exempel
Bra ramverk för ETS/MLOPS med Python
Apache Airflow är en öppen källkodplattform som är utformad för att programmatiskt skriva, schemalägga och övervaka flöden - helt i Python-kod, vilket erbjuder en flexibel och kraftfull alternativ till traditionella, manuella eller gränssnittsbaserade flödesverktyg.
Apache Airflow utvecklades ursprungligen på Airbnb 2014 för att hantera alltmer komplexa flöden och blev ett toppnivåprojekt inom Apache Software Foundation 2019.
Airflow är mest fördelaktigt när du har komplexa flöden med flera uppgifter som måste köras i en specifik ordning, med beroenden och felhantering. Det är särskilt användbart för organisationer som kör många datajobb som kräver orkestration, övervakning och återförsöksmekanismer, snarare än enkla skript eller cron-jobb.

Apache Airflows nyckelfunktioner och typiska användningsområden
- Pythonbaserad flödesdefinition: Flöden definieras som Python-kod, vilket tillåter dynamisk generering av pipeliner med hjälp av standardprogrammeringskonstruktioner som loopar och villkorlig logik.
- Directed Acyclic Graphs (DAGs): Airflow använder DAGs för att representera flöden, där varje nod är en uppgift och kanter definierar beroenden. Denna struktur säkerställer att uppgifter körs i en specificerad ordning utan cykler.
- Robust UI: Airflow erbjuder ett modernt webbgränssnitt för övervakning, schemaläggning och hantering av flöden, vilket ger synlighet i uppgiftsstatus och loggar.
- Uppföljande integreringar: Den inkluderar många inbyggda operatörer och krokar för att ansluta till molntjänster (AWS, GCP, Azure), databaser och andra verktyg, vilket gör den högst utökbar.
- Skalbarhet och flexibilitet: Airflow kan orkestrera flöden i stor skala, stödja både lokala och molnbaserade distributioner, och är lämplig för ett brett spektrum av användningsområden från ETL till maskininlärning.
Typiska användningsområden
- Orkestrera ETL (Extract, Transform, Load) pipeliner
- Schemaläggning och övervakning av dataflöden
- Automatisering av maskininlärningsmodelltränings- och distributionsprocesser
- Hantering av infrastrukturuppgifter
Hanterade Airflow-tjänster
Flera molntjänsteleverantörer erbjuder hanterade Airflow-tjänster, vilket minskar den operationella bördan för uppsättning och underhåll:
- Amazon Managed Workflows for Apache Airflow (MWAA): En helt hanterad tjänst som hanterar skalning, säkerhet och tillgänglighet, vilket låter användare fokusera på flödesutveckling.
- Google Cloud Composer: Hanterad Airflow på Google Cloud Platform.
- Microsoft Fabric Data Factory: Erbjuder Apache Airflow-jobb som en hanterad orkestrationslösning inom Azure-ecosystemet.
Installation och introduktion
Airflow kan installeras via Python-pakethanteraren (pip install apache-airflow) eller hanterade verktyg som Astro CLI, vilket förenklar uppsättningen och projekthanteringen. Efter installationen definierar användare DAGs i Python-skript och hanterar dem via Airflow-gränssnittet.
Sammanfattning
| Funktion | Beskrivning |
|---|---|
| Flödesdefinition | Python-kod, DAG-baserad |
| UI | Webb-baserad, modern, robust |
| Integreringar | Moln (AWS, GCP, Azure), databaser, anpassade moduler |
| Hanterade tjänster | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
| Användningsområden | ETL, dataflöden, ML-flöden, infrastrukturuppgifter |
Airflow är vida antaget inom dataengineeringssamhället för sin flexibilitet, skalbarhet och starka ekosystem, vilket gör den till ett ledande val för flödesorkestration i produktionsmiljöer.
Nyckelmetoder som Airflow förenklar flödesautomatisering
-
Flöden som kod (Python) Airflow-flöden definieras som Python-skript, med Directed Acyclic Graphs (DAGs) för att representera uppgifter och deras beroenden. Denna “flöden som kod”-metod möjliggör dynamisk generering, parameterisering och enkel versionskontroll av pipeliner, vilket gör dem högst underhållningsbara och anpassningsbara till förändrade krav.
-
Dynamiska och utökbara pipeliner Genom att utnyttja Python fulla kapacitet kan användare integrera loopar, villkor och anpassad logik direkt i sina flödesdefinitioner. Detta möjliggör dynamisk uppgiftsgenerering och avancerad parameterisering som skulle vara svår eller omöjlig i statiska konfigurationsfiler eller gränssnittsbaserade verktyg.
-
Pythonisk uppgiftshantering Airflows TaskFlow API (introducerad i Airflow 2.0) gör det ännu mer pythoniskt att definiera uppgifter. Utvecklare skriver rena Python-funktioner, dekorerar dem och Airflow hanterar automatiskt uppgiftsgenerering, beroendekoppling och datapassering mellan uppgifter, vilket resulterar i renare och mer underhållningsbar kod.
-
Anpassade operatörer och integreringar Användare kan skapa anpassade Python-operatörer, sensorer och krokar för att interagera med nästan vilket externt system, API eller databas som helst. Denna utökbarhet möjliggör smidig integrering med det bredare Python-ecosystemet och externa tjänster.
-
Naturlig integrering med Python-ecosystemet Eftersom flöden skrivs i Python kan användare utnyttja den stora mängden Python-bibliotek (som Pandas, NumPy eller maskininlärningsramverk) inom sina uppgifter, vilket ytterligare förbättrar automatiseringsmöjligheterna.
-
Läsbar och underhållningsbar Pythons läsbarhet och popularitet gör Airflow tillgängligt för ett brett spektrum av användare, från dataingenjörer till analytiker. Den kodbaserade metoden stöder också standardprogrammeringspraktiker som kodgranskning och versionskontroll.
Några Apache Airflow-fördelar
| Funktion | Fördel för flödesautomatisering |
|---|---|
| Python-baserade DAGs | Dynamiska, flexibla och underhållningsbara flöden |
| TaskFlow API | Renare, mer pythoniska flödesdefinitioner |
| Anpassade operatörer/sensorer | Integrera med vilket system eller API som helst |
| Naturlig Python-integrering | Använd vilket Python-bibliotek eller verktyg som helst |
| Robust UI | Övervaka och hantera flöden visuellt |
Airflows djupa integrering med Python förenklar inte bara flödesautomatisering utan ger också team möjlighet att bygga robusta, skalbara och högst anpassade datapipeliner effektivt.
Exempel: Enkel ETL-pipeline i Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("Extraherar data")
def transform_data():
print("Transformerar data")
def load_data():
print("Laddar data")
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
extract_task >> transform_task >> load_task
Detta exempel visar hur varje steg i en ETL-pipeline definieras som en Python-funktion och orkestreras av Airflow med hjälp av Python-operatörer och DAGs:
extract_task >> transform_task >> load_task
Ett annat exempel på Apache Airflow DAG
Här är ett grundläggande exempel på hur man kodar en DAG (Directed Acyclic Graph) i Apache Airflow med Python. Det här exemplet demonstrerar hur man definierar ett flöde med två enkla uppgifter med BashOperator, som kör bash-kommandon:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='Min första Airflow DAG!',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# Uppgift 1: Skriv ut det aktuella datumet
print_date = BashOperator(
task_id='print_date',
bash_command='date'
)
# Uppgift 2: Säg Hej
say_hello = BashOperator(
task_id='say_hello',
bash_command='echo "Hej, Airflow!"'
)
# Definiera uppgiftsberoenden
print_date >> say_hello
Hur detta fungerar:
- DAG definieras med en kontexthanterare (
with DAG(...) as dag:). - Två uppgifter skapas:
print_dateochsay_hello. - Beroendet
print_date >> say_hellosäkerställer attsay_hellobara körs efter attprint_datehar slutförts. - Spara den här koden som
my_first_dag.pyi din Airflowdags/katalog, och Airflow kommer automatiskt att upptäcka och schemalägga den.
Du kan bygga vidare på den här mallar genom att lägga till PythonOperator-uppgifter, grenar eller mer komplex logik när dina flöden växer.
Nyckelparametrar när du skapar ett DAG-objekt i Python
När du definierar ett DAG i Apache Airflow bör flera viktiga parametrar anges för att säkerställa korrekt schemaläggning, identifiering och beteende för ditt arbetsflöde.
Viktiga parametrar:
-
dag_id Den unika identifieraren för ditt DAG. Varje DAG i din Airflow-miljö måste ha en distinkt
dag_id. -
start_date Datum och tid när DAG:en blir berättigad för exekvering. Detta bestämmer när schemaläggningen börjar för DAG:en.
-
schedule (eller
schedule_interval) Definierar hur ofta DAG:en ska köras (t.ex.,"@daily","0 12 * * *", ellerNoneför manuella körningar). -
catchup En boolean som bestämmer om Airflow ska fylla i missade körningar mellan
start_dateoch det aktuella datumet när DAG:en först aktiveras. Standardvärdet ärFalse.
Andra vanliga parametrar:
-
default_args En ordbok med standardargument som tillämpas på alla uppgifter inom DAG:en (t.ex.,
owner,email,retries,retry_delay). Detta är valfritt men rekommenderas för DRY (Don’t Repeat Yourself) kod. -
params En ordbok för konfigurationsparametrar vid körning, som låter dig skicka värden till uppgifter vid körning och gör dina DAG:er mer flexibla.
Exempel:
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'jdoe',
'email': ['jdoe@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='example_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
default_args=default_args,
params={'example_param': 'value'}
) as dag:
# Definiera uppgifter här
pass
Tabell över DAG-parametrar
| Parameter | Beskrivning | Obligatorisk |
|---|---|---|
| dag_id | Unikt namn för DAG:en | Ja |
| start_date | När DAG:en är berättigad att börja köra | Ja |
| schedule | Frekvens/tidpunkt för DAG-körningar | Ja |
| catchup | Om missade körningar ska fyllas i | Nej |
| default_args | Standardargument för alla uppgifter i DAG:en | Nej |
| params | Parametrar vid körning för dynamisk konfiguration | Nej |
Att ställa in dessa parametrar säkerställer att ditt DAG är unikt identifierat, schemalagt korrekt och beteer sig som avsett i Airflow.
Inkludering av anpassade körningsparametrar med hjälp av params-argumentet i Airflow
Apache Airflows params-argument låter dig injicera anpassade körningsparametrar i dina DAG:er och uppgifter, vilket möjliggör dynamiska och flexibla arbetsflödeskonfigurationer. Params låter dig ge konfigurationsparametrar vid körning till uppgifter. Du kan konfigurera standard-Params i din DAG-kod och tillhandahålla ytterligare Params, eller skriva över Param-värden när du utlöser en DAG-körning.
Denna metod gör dina Airflow-arbetsflöden mer dynamiska och konfigurerbara, och stöder en bred variation av automatiseringsscenarier.
Hur man ställer in anpassade körningsparametrar
- Definiera
paramsi DAG:en: När du skapar ett DAG-objekt, inkluderaparams-argumentet som en ordbok. Varje nyckel-värde-par representerar ett parameternamn och dess standardvärde.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_param(**kwargs):
# Åtkomst till parametern via kwargs['params']
my_param = kwargs['params']['my_param']
print(f"Min anpassade parameter är: {my_param}")
with DAG(
dag_id='custom_params_example',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
params={'my_param': 'default_value'} # Anpassad körningsparameter
) as dag:
task = PythonOperator(
task_id='print_param_task',
python_callable=print_param,
provide_context=True
)
- Överskriv vid körning:
När du manuellt utlöser en DAG-körning (via Airflow-gränssnittet, CLI eller API) kan du tillhandahålla eller överskriva
params-värden för den specifika körningen.
Åtkomst till parametrar i uppgifter
- I din uppgifts Python-kallbarhet, åtkomst till params med
kwargs['params']. - För mallade fält (som i BashOperator), använd
{{ params.my_param }}i mallsträngen.
Användbara länkar
- https://airflow.apache.org
- https://github.com/apache/airflow
- https://en.wikipedia.org/wiki/Apache_Airflow
- uv - Ny Python-pakethanterare, projekt- och miljöhanterare
- Python Cheatsheet
- AWS lambda prestanda: JavaScript vs Python vs Golang
- AWS SAM + AWS SQS + Python PowerTools
- venv Cheatsheet
- Generering av PDF i Python - Bibliotek och exempel