Apache Airflow voor MLOPS en ETL - Omschrijving, voordelen en voorbeelden

Mooi kader voor ETS/MLOPS met Python

Inhoud

Apache Airflow is een open-source platform dat is ontworpen om workflows programmatisch te schrijven, te plannen en te monitoren, volledig in Python-code, en biedt een flexibele en krachtige alternatief voor traditionele, manuele of UI-gebaseerde workflow-tools.

Apache Airflow is oorspronkelijk ontwikkeld bij Airbnb in 2014 om steeds complexere workflows te beheren en werd in 2019 een top-level project van de Apache Software Foundation.

Airflow is het meest nuttig wanneer je complexe workflows hebt met meerdere taken die in een specifieke volgorde moeten worden uitgevoerd, met afhankelijkheden en foutafhandeling. Het is vooral handig voor organisaties die veel data-taken uitvoeren die orkestratie, monitoring en herstartmechanismen vereisen, in plaats van eenvoudige scripts of cron-taken.

Chain oof cyber events

Belangrijke functies van Apache Airflow en typische toepassingen

  • Workflowdefinitie op basis van Python: Workflows worden gedefinieerd als Python-code, waardoor dynamische pipelinegeneratie mogelijk is met behulp van standaard programmeerconstructies zoals lussen en voorwaardelijke logica.
  • Gerichte acyclische grafieken (DAGs): Airflow gebruikt DAGs om workflows weer te geven, waarbij elk knooppunt een taak is en de randen afhankelijkheden definiëren. Deze structuur zorgt ervoor dat taken in een opgegeven volgorde worden uitgevoerd zonder cycli.
  • Robuuste gebruikersinterface: Airflow biedt een moderne webinterface voor het monitoren, plannen en beheren van workflows, met inzicht in de status van taken en logboeken.
  • Uitgebreide integraties: Het bevat veel ingebouwde operatoren en hooks om verbinding te maken met cloudservices (AWS, GCP, Azure), databases en andere tools, waardoor het zeer uitbreidbaar is.
  • Schaalbaarheid en flexibiliteit: Airflow kan workflows op schaal orkestreren, ondersteunt zowel on-premises als cloudimplementaties, en is geschikt voor een breed scala aan toepassingen, van ETL tot machine learning.

Typische toepassingen

  • Orkestratie van ETL (Extract, Transform, Load)-pipelines
  • Plannen en monitoren van data-workflows
  • Automatiseren van het trainen en implementeren van machine learningmodellen
  • Beheren van infrastructuurtaken

Beheerde Airflow-diensten

Verschillende cloudproviders bieden beheerde Airflow-diensten aan, die de operationele last van installatie en onderhoud verminderen:

  • Amazon Managed Workflows for Apache Airflow (MWAA): Een volledig beheerde dienst die schaalbaarheid, beveiliging en beschikbaarheid afhandelt, waardoor gebruikers zich kunnen richten op workflowontwikkeling.
  • Google Cloud Composer: Beheerde Airflow op Google Cloud Platform.
  • Microsoft Fabric Data Factory: Biedt Apache Airflow-taken aan als een beheerde orkestratiedienst binnen het Azure-ecosysteem.

Installatie en aan de slag gaan

Airflow kan worden geïnstalleerd via Python’s pakketbeheerder (pip install apache-airflow) of beheerhulpmiddelen zoals Astro CLI, wat de installatie en projectbeheer vereenvoudigt. Na installatie definiëren gebruikers DAGs in Pythonscripts en beheren ze deze via de Airflow-gebruikersinterface.

Samenvatting

Kenmerk Beschrijving
Workflowdefinitie Python-code, DAG-gebaseerd
Gebruikersinterface Webgebaseerd, modern, robuust
Integraties Cloud (AWS, GCP, Azure), databases, aangepaste plugins
Beheerde diensten AWS MWAA, Google Cloud Composer, Microsoft Fabric
Toepassingen ETL, datapipelines, ML-workflows, infrastructuurtaken

Airflow wordt breed gebruikt in de data-engineeringgemeenschap vanwege zijn flexibiliteit, schaalbaarheid en sterke ecosystem, waardoor het een vooraanstaand keuze is voor workfloworkestratie in productieomgevingen.

Belangrijkste manieren waarop Airflow workflowautomatisering vereenvoudigt

  • Workflows als code (Python)
    Airflow-workflows worden gedefinieerd als Pythonscripts, met behulp van gerichte acyclische grafieken (DAGs) om taken en hun afhankelijkheden weer te geven. Deze “workflows als code” aanpak maakt dynamische generatie, parameterisatie en eenvoudige versiebeheer van pipelines mogelijk, waardoor ze zeer onderhoudbaar en aanpasbaar zijn aan veranderende eisen.

  • Dynamische en uitbreidbare pipelines
    Door gebruik te maken van de volledige mogelijkheden van Python kunnen gebruikers lussen, voorwaarden en aangepaste logica direct in hun workflowdefinities opnemen. Dit maakt het mogelijk om dynamische taakcreatie en geavanceerde parameterisatie te realiseren, wat in statische configuratiebestanden of GUI-tools lastig of onmogelijk zou zijn.

  • Python-achtige taakbeheer
    Airflow’s TaskFlow API (geïntroduceerd in Airflow 2.0) maakt het definiëren van taken nog Python-achtiger. Ontwikkelaars schrijven gewone Pythonfuncties, markeren deze, en Airflow zorgt automatisch voor taakcreatie, afhankelijkheidsschakeling en datapassing tussen taken, wat leidt tot schoner en onderhoudbaarder code.

  • Aangepaste operatoren en integraties
    Gebruikers kunnen aangepaste Python-operatoren, sensoren en hooks aanmaken om interactie mogelijk te maken met vrijwel elk extern systeem, API of database. Deze uitbreidbaarheid maakt een naadloze integratie met het brede Python-ecosysteem en externe diensten mogelijk.

  • Integratie met het native Python-ecosysteem
    Omdat workflows in Python worden geschreven, kunnen gebruikers gebruik maken van de enorme hoeveelheid Pythonbibliotheken (zoals Pandas, NumPy of machine learningframeworks) binnen hun taken, wat de automatiseringsmogelijkheden verder versterkt.

  • Leesbaar en onderhoudbaar
    De leesbaarheid en populariteit van Python maken Airflow toegankelijk voor een breed scala aan gebruikers, van data-engineers tot analisten. De codegebaseerde aanpak ondersteunt ook standaard software-engineeringpraktijken zoals codebeoordeling en versiebeheer.

Enkele voordelen van Apache Airflow

Kenmerk Voordelen voor workflowautomatisering
Python-gebaseerde DAGs Dynamische, flexibele en onderhoudbare workflows
TaskFlow API Schoner, meer Python-achtige workflowdefinities
Aangepaste operatoren/sensoren Integreer met elk systeem of API
Native Python integratie Gebruik elke Pythonbibliothek of tool
Robuuste gebruikersinterface Monitor en beheer workflows visueel

De diepe integratie van Airflow met Python vereenvoudigt niet alleen workflowautomatisering, maar geeft ook teams de mogelijkheid om robuuste, schaalbare en zeer aangepaste datapipelines efficiënt te bouwen.

Voorbeeld: Eenvoudige ETL-workflow in Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Data extraheren")

def transform_data():
    print("Data transformeren")

def load_data():
    print("Data laden")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

Dit voorbeeld laat zien hoe elk stadium van een ETL-pipeline als een Pythonfunctie wordt gedefinieerd en wordt georkestratieerd door Airflow met behulp van Python-operatoren en DAGs: extract_task >> transform_task >> load_task

Een ander voorbeeld van een Apache Airflow DAG

Hieronder volgt een basisvoorbeeld van het coderen van een DAG (Directed Acyclic Graph) in Apache Airflow met behulp van Python. Dit voorbeeld laat zien hoe je een workflow kunt definiëren met twee eenvoudige taken met behulp van de BashOperator, die bash-opdrachten uitvoert:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='Mijn eerste Airflow DAG!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Taak 1: Druk de huidige datum af
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Taak 2: Zeg Hallo
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hallo, Airflow!"'
    )

    # Definieer taakafhankelijkheden
    print_date >> say_hello

Hoe dit werkt:

  • De DAG wordt gedefinieerd met behulp van een contextmanager (with DAG(...) as dag:).
  • Er worden twee taken gecreëerd: print_date en say_hello.
  • De afhankelijkheid print_date >> say_hello zorgt ervoor dat say_hello pas wordt uitgevoerd nadat print_date is voltooid.
  • Sla deze code op als my_first_dag.py in je Airflow dags/-map, en Airflow zal deze automatisch detecteren en plannen.

Je kunt dit sjabloon uitbreiden door PythonOperator-taken, takkingen of complexere logica toe te voegen naarmate je workflows groeien.

Belangrijke parameters bij het maken van een DAG-object in Python

Bij het definiëren van een DAG in Apache Airflow moeten verschillende belangrijke parameters worden ingesteld om ervoor te zorgen dat je workflow correct wordt gepland, geïdentificeerd en gedragen.

Essentiële parameters:

  • dag_id
    Het unieke identificatienummer voor je DAG. Elke DAG in je Airflow-omgeving moet een uniek dag_id hebben.

  • start_date
    De datum en tijd waarop de DAG geschikt is voor uitvoering. Dit bepaalt wanneer de planning van de DAG begint.

  • schedule (of schedule_interval)
    Definieert hoe vaak de DAG moet worden uitgevoerd (bijvoorbeeld "@daily", "0 12 * * *", of None voor handmatige uitvoering).

  • catchup
    Een boolean die bepaalt of Airflow moet compenseren voor gemiste uitvoeringen tussen de start_date en de huidige datum wanneer de DAG voor het eerst wordt ingeschakeld. Standaard is dit False.

Andere veelgebruikte parameters:

  • default_args
    Een dictionary van standaardargumenten die worden toegepast op alle taken binnen de DAG (bijvoorbeeld owner, email, retries, retry_delay). Dit is optioneel, maar wordt aanbevolen voor DRY (Don’t Repeat Yourself) code.

  • params
    Een dictionary voor runtimeconfiguratieparameters, waarmee je waarden kunt doorgeven aan taken tijdens runtime en je DAGs flexibeler kunt maken.

Voorbeeld:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Definieer taken hier
    pass

Tabel van DAG-parameters

Parameter Beschrijving Vereist
dag_id Unieke naam voor de DAG Ja
start_date Wanneer de DAG geschikt is om te starten Ja
schedule Frequentie/tijd voor DAG-uitvoering Ja
catchup Of gemiste uitvoeringen moeten worden gecompenseerd Nee
default_args Standaardargumenten voor alle taken in de DAG Nee
params Runtimeparameters voor dynamische configuratie Nee

Instellen van deze parameters zorgt ervoor dat je DAG uniek wordt geïdentificeerd, correct wordt gepland en zich zoals bedoeld gedraagt in Airflow.

Inclusie van aangepaste runtimeparameters met behulp van het params-argument in Airflow

Het params-argument van Apache Airflow stelt je in staat om aangepaste runtimeparameters in te voeren in je DAGs en taken, waardoor dynamische en flexibele workflowconfiguraties mogelijk worden. Params geven je de mogelijkheid om runtimeconfiguratie aan taken door te geven. Je kunt standaard Params definiëren in je DAG-code en extra Params toevoegen of Paramwaarden overschrijven bij het activeren van een DAG-uitvoering.

Deze aanpak maakt je Airflow-workflows dynamischer en configurabeler, waardoor een breed scala aan automatiseringsscenario’s ondersteund wordt.

Hoe je aangepaste runtimeparameters kunt instellen

  • Definieer params in de DAG:
    Bij het maken van een DAG-object, voeg het params-argument toe als een dictionary. Elke sleutel-waarde-paar vertegenwoordigt een parameter naam en zijn standaardwaarde.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Toegang tot de parameter via kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"Mijn aangepaste parameter is: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Aangepaste runtimeparameter
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • Overschrijf tijdens runtime:
    Bij het handmatig activeren van een DAG-uitvoering (via de Airflow-gebruikersinterface, CLI of API), kun je params-waarden overschrijven voor die specifieke uitvoering.

Toegang tot parameters in taken

  • In je taak’s Python-functie, toegang krijgen tot params via kwargs['params'].
  • Voor sjabloonvelden (zoals in BashOperator), gebruik {{ params.my_param }} in de sjabloonstring.