Apache Airflow voor MLOPS en ETL - Omschrijving, voordelen en voorbeelden
Mooi kader voor ETS/MLOPS met Python
Apache Airflow is een open-source platform dat is ontworpen om workflows programmatisch te schrijven, te plannen en te monitoren, volledig in Python-code, en biedt een flexibele en krachtige alternatief voor traditionele, manuele of UI-gebaseerde workflow-tools.
Apache Airflow is oorspronkelijk ontwikkeld bij Airbnb in 2014 om steeds complexere workflows te beheren en werd in 2019 een top-level project van de Apache Software Foundation.
Airflow is het meest nuttig wanneer je complexe workflows hebt met meerdere taken die in een specifieke volgorde moeten worden uitgevoerd, met afhankelijkheden en foutafhandeling. Het is vooral handig voor organisaties die veel data-taken uitvoeren die orkestratie, monitoring en herstartmechanismen vereisen, in plaats van eenvoudige scripts of cron-taken.
Belangrijke functies van Apache Airflow en typische toepassingen
- Workflowdefinitie op basis van Python: Workflows worden gedefinieerd als Python-code, waardoor dynamische pipelinegeneratie mogelijk is met behulp van standaard programmeerconstructies zoals lussen en voorwaardelijke logica.
- Gerichte acyclische grafieken (DAGs): Airflow gebruikt DAGs om workflows weer te geven, waarbij elk knooppunt een taak is en de randen afhankelijkheden definiëren. Deze structuur zorgt ervoor dat taken in een opgegeven volgorde worden uitgevoerd zonder cycli.
- Robuuste gebruikersinterface: Airflow biedt een moderne webinterface voor het monitoren, plannen en beheren van workflows, met inzicht in de status van taken en logboeken.
- Uitgebreide integraties: Het bevat veel ingebouwde operatoren en hooks om verbinding te maken met cloudservices (AWS, GCP, Azure), databases en andere tools, waardoor het zeer uitbreidbaar is.
- Schaalbaarheid en flexibiliteit: Airflow kan workflows op schaal orkestreren, ondersteunt zowel on-premises als cloudimplementaties, en is geschikt voor een breed scala aan toepassingen, van ETL tot machine learning.
Typische toepassingen
- Orkestratie van ETL (Extract, Transform, Load)-pipelines
- Plannen en monitoren van data-workflows
- Automatiseren van het trainen en implementeren van machine learningmodellen
- Beheren van infrastructuurtaken
Beheerde Airflow-diensten
Verschillende cloudproviders bieden beheerde Airflow-diensten aan, die de operationele last van installatie en onderhoud verminderen:
- Amazon Managed Workflows for Apache Airflow (MWAA): Een volledig beheerde dienst die schaalbaarheid, beveiliging en beschikbaarheid afhandelt, waardoor gebruikers zich kunnen richten op workflowontwikkeling.
- Google Cloud Composer: Beheerde Airflow op Google Cloud Platform.
- Microsoft Fabric Data Factory: Biedt Apache Airflow-taken aan als een beheerde orkestratiedienst binnen het Azure-ecosysteem.
Installatie en aan de slag gaan
Airflow kan worden geïnstalleerd via Python’s pakketbeheerder (pip install apache-airflow
) of beheerhulpmiddelen zoals Astro CLI, wat de installatie en projectbeheer vereenvoudigt. Na installatie definiëren gebruikers DAGs in Pythonscripts en beheren ze deze via de Airflow-gebruikersinterface.
Samenvatting
Kenmerk | Beschrijving |
---|---|
Workflowdefinitie | Python-code, DAG-gebaseerd |
Gebruikersinterface | Webgebaseerd, modern, robuust |
Integraties | Cloud (AWS, GCP, Azure), databases, aangepaste plugins |
Beheerde diensten | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
Toepassingen | ETL, datapipelines, ML-workflows, infrastructuurtaken |
Airflow wordt breed gebruikt in de data-engineeringgemeenschap vanwege zijn flexibiliteit, schaalbaarheid en sterke ecosystem, waardoor het een vooraanstaand keuze is voor workfloworkestratie in productieomgevingen.
Belangrijkste manieren waarop Airflow workflowautomatisering vereenvoudigt
-
Workflows als code (Python)
Airflow-workflows worden gedefinieerd als Pythonscripts, met behulp van gerichte acyclische grafieken (DAGs) om taken en hun afhankelijkheden weer te geven. Deze “workflows als code” aanpak maakt dynamische generatie, parameterisatie en eenvoudige versiebeheer van pipelines mogelijk, waardoor ze zeer onderhoudbaar en aanpasbaar zijn aan veranderende eisen. -
Dynamische en uitbreidbare pipelines
Door gebruik te maken van de volledige mogelijkheden van Python kunnen gebruikers lussen, voorwaarden en aangepaste logica direct in hun workflowdefinities opnemen. Dit maakt het mogelijk om dynamische taakcreatie en geavanceerde parameterisatie te realiseren, wat in statische configuratiebestanden of GUI-tools lastig of onmogelijk zou zijn. -
Python-achtige taakbeheer
Airflow’s TaskFlow API (geïntroduceerd in Airflow 2.0) maakt het definiëren van taken nog Python-achtiger. Ontwikkelaars schrijven gewone Pythonfuncties, markeren deze, en Airflow zorgt automatisch voor taakcreatie, afhankelijkheidsschakeling en datapassing tussen taken, wat leidt tot schoner en onderhoudbaarder code. -
Aangepaste operatoren en integraties
Gebruikers kunnen aangepaste Python-operatoren, sensoren en hooks aanmaken om interactie mogelijk te maken met vrijwel elk extern systeem, API of database. Deze uitbreidbaarheid maakt een naadloze integratie met het brede Python-ecosysteem en externe diensten mogelijk. -
Integratie met het native Python-ecosysteem
Omdat workflows in Python worden geschreven, kunnen gebruikers gebruik maken van de enorme hoeveelheid Pythonbibliotheken (zoals Pandas, NumPy of machine learningframeworks) binnen hun taken, wat de automatiseringsmogelijkheden verder versterkt. -
Leesbaar en onderhoudbaar
De leesbaarheid en populariteit van Python maken Airflow toegankelijk voor een breed scala aan gebruikers, van data-engineers tot analisten. De codegebaseerde aanpak ondersteunt ook standaard software-engineeringpraktijken zoals codebeoordeling en versiebeheer.
Enkele voordelen van Apache Airflow
Kenmerk | Voordelen voor workflowautomatisering |
---|---|
Python-gebaseerde DAGs | Dynamische, flexibele en onderhoudbare workflows |
TaskFlow API | Schoner, meer Python-achtige workflowdefinities |
Aangepaste operatoren/sensoren | Integreer met elk systeem of API |
Native Python integratie | Gebruik elke Pythonbibliothek of tool |
Robuuste gebruikersinterface | Monitor en beheer workflows visueel |
De diepe integratie van Airflow met Python vereenvoudigt niet alleen workflowautomatisering, maar geeft ook teams de mogelijkheid om robuuste, schaalbare en zeer aangepaste datapipelines efficiënt te bouwen.
Voorbeeld: Eenvoudige ETL-workflow in Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("Data extraheren")
def transform_data():
print("Data transformeren")
def load_data():
print("Data laden")
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
extract_task >> transform_task >> load_task
Dit voorbeeld laat zien hoe elk stadium van een ETL-pipeline als een Pythonfunctie wordt gedefinieerd en wordt georkestratieerd door Airflow met behulp van Python-operatoren en DAGs:
extract_task >> transform_task >> load_task
Een ander voorbeeld van een Apache Airflow DAG
Hieronder volgt een basisvoorbeeld van het coderen van een DAG (Directed Acyclic Graph) in Apache Airflow met behulp van Python. Dit voorbeeld laat zien hoe je een workflow kunt definiëren met twee eenvoudige taken met behulp van de BashOperator, die bash-opdrachten uitvoert:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='Mijn eerste Airflow DAG!',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# Taak 1: Druk de huidige datum af
print_date = BashOperator(
task_id='print_date',
bash_command='date'
)
# Taak 2: Zeg Hallo
say_hello = BashOperator(
task_id='say_hello',
bash_command='echo "Hallo, Airflow!"'
)
# Definieer taakafhankelijkheden
print_date >> say_hello
Hoe dit werkt:
- De DAG wordt gedefinieerd met behulp van een contextmanager (
with DAG(...) as dag:
). - Er worden twee taken gecreëerd:
print_date
ensay_hello
. - De afhankelijkheid
print_date >> say_hello
zorgt ervoor datsay_hello
pas wordt uitgevoerd nadatprint_date
is voltooid. - Sla deze code op als
my_first_dag.py
in je Airflowdags/
-map, en Airflow zal deze automatisch detecteren en plannen.
Je kunt dit sjabloon uitbreiden door PythonOperator-taken, takkingen of complexere logica toe te voegen naarmate je workflows groeien.
Belangrijke parameters bij het maken van een DAG-object in Python
Bij het definiëren van een DAG in Apache Airflow moeten verschillende belangrijke parameters worden ingesteld om ervoor te zorgen dat je workflow correct wordt gepland, geïdentificeerd en gedragen.
Essentiële parameters:
-
dag_id
Het unieke identificatienummer voor je DAG. Elke DAG in je Airflow-omgeving moet een uniekdag_id
hebben. -
start_date
De datum en tijd waarop de DAG geschikt is voor uitvoering. Dit bepaalt wanneer de planning van de DAG begint. -
schedule (of
schedule_interval
)
Definieert hoe vaak de DAG moet worden uitgevoerd (bijvoorbeeld"@daily"
,"0 12 * * *"
, ofNone
voor handmatige uitvoering). -
catchup
Een boolean die bepaalt of Airflow moet compenseren voor gemiste uitvoeringen tussen destart_date
en de huidige datum wanneer de DAG voor het eerst wordt ingeschakeld. Standaard is ditFalse
.
Andere veelgebruikte parameters:
-
default_args
Een dictionary van standaardargumenten die worden toegepast op alle taken binnen de DAG (bijvoorbeeldowner
,email
,retries
,retry_delay
). Dit is optioneel, maar wordt aanbevolen voor DRY (Don’t Repeat Yourself) code. -
params
Een dictionary voor runtimeconfiguratieparameters, waarmee je waarden kunt doorgeven aan taken tijdens runtime en je DAGs flexibeler kunt maken.
Voorbeeld:
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'jdoe',
'email': ['jdoe@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='example_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
default_args=default_args,
params={'example_param': 'value'}
) as dag:
# Definieer taken hier
pass
Tabel van DAG-parameters
Parameter | Beschrijving | Vereist |
---|---|---|
dag_id | Unieke naam voor de DAG | Ja |
start_date | Wanneer de DAG geschikt is om te starten | Ja |
schedule | Frequentie/tijd voor DAG-uitvoering | Ja |
catchup | Of gemiste uitvoeringen moeten worden gecompenseerd | Nee |
default_args | Standaardargumenten voor alle taken in de DAG | Nee |
params | Runtimeparameters voor dynamische configuratie | Nee |
Instellen van deze parameters zorgt ervoor dat je DAG uniek wordt geïdentificeerd, correct wordt gepland en zich zoals bedoeld gedraagt in Airflow.
Inclusie van aangepaste runtimeparameters met behulp van het params
-argument in Airflow
Het params
-argument van Apache Airflow stelt je in staat om aangepaste runtimeparameters in te voeren in je DAGs en taken, waardoor dynamische en flexibele workflowconfiguraties mogelijk worden. Params geven je de mogelijkheid om runtimeconfiguratie aan taken door te geven. Je kunt standaard Params definiëren in je DAG-code en extra Params toevoegen of Paramwaarden overschrijven bij het activeren van een DAG-uitvoering.
Deze aanpak maakt je Airflow-workflows dynamischer en configurabeler, waardoor een breed scala aan automatiseringsscenario’s ondersteund wordt.
Hoe je aangepaste runtimeparameters kunt instellen
- Definieer
params
in de DAG:
Bij het maken van een DAG-object, voeg hetparams
-argument toe als een dictionary. Elke sleutel-waarde-paar vertegenwoordigt een parameter naam en zijn standaardwaarde.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_param(**kwargs):
# Toegang tot de parameter via kwargs['params']
my_param = kwargs['params']['my_param']
print(f"Mijn aangepaste parameter is: {my_param}")
with DAG(
dag_id='custom_params_example',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
params={'my_param': 'default_value'} # Aangepaste runtimeparameter
) as dag:
task = PythonOperator(
task_id='print_param_task',
python_callable=print_param,
provide_context=True
)
- Overschrijf tijdens runtime:
Bij het handmatig activeren van een DAG-uitvoering (via de Airflow-gebruikersinterface, CLI of API), kun jeparams
-waarden overschrijven voor die specifieke uitvoering.
Toegang tot parameters in taken
- In je taak’s Python-functie, toegang krijgen tot params via
kwargs['params']
. - Voor sjabloonvelden (zoals in BashOperator), gebruik
{{ params.my_param }}
in de sjabloonstring.
Nuttige links
- https://airflow.apache.org
- https://github.com/apache/airflow
- https://en.wikipedia.org/wiki/Apache_Airflow
- uv - Nieuw Python-pakket, project en omgevingsbeheer
- Python Cheat Sheet
- AWS lambda prestaties: JavaScript vs Python vs Golang
- AWS SAM + AWS SQS + Python PowerTools
- venv Cheat Sheet
- PDF genereren in Python - Bibliotheken en voorbeelden"