Apache Airflow per MLOPS e ETL - Descrizione, vantaggi ed esempi

Buon framework per ETS/MLOPS con Python

Indice

Apache Airflow è una piattaforma open source progettata per creare, pianificare e monitorare i flussi di lavoro in modo programmabile, interamente in codice Python, offrendo un’alternativa flessibile e potente rispetto agli strumenti tradizionali, manuali o basati sull’interfaccia utente.

Apache Airflow è stato originariamente sviluppato da Airbnb nel 2014 per gestire flussi di lavoro sempre più complessi e è diventato un progetto di livello superiore dell’Apache Software Foundation nel 2019.

Airflow è particolarmente utile quando si hanno flussi di lavoro complessi con molte attività che devono essere eseguite in un ordine specifico, con dipendenze e gestione degli errori. È particolarmente utile per le organizzazioni che eseguono molti lavori sui dati che richiedono l’orchestrazione, il monitoraggio e i meccanismi di riprovata, piuttosto che semplici script o lavori cron.

Catena di eventi informatici

Funzionalità principali di Apache Airflow e casi d’uso tipici

  • Definizione del flusso di lavoro basata su Python: I flussi di lavoro vengono definiti come codice Python, permettendo la generazione dinamica di pipeline utilizzando costrutti di programmazione standard come cicli e logica condizionale.
  • Grafi aciclici diretti (DAG): Airflow utilizza DAG per rappresentare i flussi di lavoro, dove ogni nodo è un’attività e gli archi definiscono le dipendenze. Questa struttura garantisce che le attività vengano eseguite in un ordine specifico senza cicli.
  • Interfaccia utente robusta: Airflow fornisce un’interfaccia web moderna per il monitoraggio, la pianificazione e la gestione dei flussi di lavoro, offrendo visibilità sullo stato delle attività e sui log.
  • Integrazioni estese: Include molti operatori e hook predefiniti per connettersi a servizi cloud (AWS, GCP, Azure), database e ad altri strumenti, rendendolo altamente estensibile.
  • Scalabilità e flessibilità: Airflow può orchestrare flussi di lavoro su larga scala, supportando sia le distribuzioni on-premises che quelle in cloud, e si adatta a una vasta gamma di casi d’uso, dall’ETL all’apprendimento automatico.

Casi d’uso tipici

  • Orchestrare pipeline ETL (Extract, Transform, Load)
  • Pianificare e monitorare flussi di lavoro sui dati
  • Automatizzare l’addestramento e il deployment di modelli di machine learning
  • Gestire attività di infrastruttura

Servizi di Airflow gestiti

Vari fornitori di cloud offrono servizi gestiti di Airflow, riducendo il carico operativo di installazione e manutenzione:

  • Amazon Managed Workflows for Apache Airflow (MWAA): Un servizio completamente gestito che gestisce scalabilità, sicurezza e disponibilità, permettendo agli utenti di concentrarsi sulla creazione dei flussi di lavoro.
  • Google Cloud Composer: Airflow gestito su Google Cloud Platform.
  • Microsoft Fabric Data Factory: Offre lavori di Airflow come soluzione di orchestrazione gestita all’interno dell’ecosistema Azure.

Installazione e primo utilizzo

Airflow può essere installato tramite il gestore di pacchetti Python (pip install apache-airflow) o strumenti gestiti come Astro CLI, che semplificano l’installazione e la gestione dei progetti. Dopo l’installazione, gli utenti definiscono DAG in script Python e li gestiscono tramite l’interfaccia utente di Airflow.

Riassunto

Funzionalità Descrizione
Definizione del flusso di lavoro Codice Python, basato su DAG
Interfaccia utente Web-based, moderna, robusta
Integrazioni Cloud (AWS, GCP, Azure), database, plugin personalizzati
Servizi gestiti AWS MWAA, Google Cloud Composer, Microsoft Fabric
Casi d’uso ETL, pipeline dati, flussi di lavoro ML, attività di infrastruttura

Airflow è ampiamente adottato nella comunità degli ingegneri dei dati per la sua flessibilità, scalabilità e forte ecosistema, rendendolo una scelta leader per l’orchestrazione dei flussi di lavoro in ambienti di produzione.

Modo principale in cui Airflow semplifica l’automazione dei flussi di lavoro

  • Flussi di lavoro come codice (Python)
    I flussi di lavoro di Airflow vengono definiti come script Python, utilizzando Grafi aciclici diretti (DAG) per rappresentare le attività e le loro dipendenze. Questo approccio “flussi di lavoro come codice” consente la generazione dinamica, la parametrizzazione e il controllo versione facile delle pipeline, rendendole altamente mantenibili e adattabili ai requisiti in cambiamento.

  • Pipeline dinamiche ed estensibili
    Sfruttando le capacità complete di Python, gli utenti possono incorporare cicli, condizionali e logica personalizzata direttamente nelle definizioni dei flussi di lavoro. Questo permette la creazione dinamica di attività e una parametrizzazione avanzata che sarebbe difficile o impossibile in file di configurazione statici o strumenti basati su interfaccia grafica.

  • Gestione delle attività Pythonic
    L’API TaskFlow di Airflow (introdotta in Airflow 2.0) rende la definizione delle attività ancora più Pythonic. Gli sviluppatori scrivono semplici funzioni Python, le decorano e Airflow gestisce automaticamente la creazione delle attività, il collegamento delle dipendenze e il passaggio dei dati tra le attività, ottenendo un codice più pulito e mantenibile.

  • Operatori e integrazioni personalizzati
    Gli utenti possono creare operatori, sensori e hook personalizzati in Python per interagire con qualsiasi sistema esterno, API o database. Questa estensibilità consente un’integrazione senza sforzo con l’ampio ecosistema Python e i servizi esterni.

  • Integrazione nativa con l’ecosistema Python
    Poiché i flussi di lavoro vengono scritti in Python, gli utenti possono sfruttare l’ampia gamma di librerie Python (come Pandas, NumPy o framework di machine learning) all’interno delle attività, migliorando ulteriormente le capacità di automazione.

  • Leggibilità e manutenibilità
    La leggibilità di Python e la sua popolarità rendono Airflow accessibile a un ampio numero di utenti, dagli ingegneri dei dati agli analisti. L’approccio basato su codice supporta anche pratiche standard dell’ingegneria del software come le revisioni del codice e il controllo delle versioni.

Alcuni vantaggi di Apache Airflow

Funzionalità Beneficio per l’automazione dei flussi di lavoro
DAG basati su Python Flussi di lavoro dinamici, flessibili e mantenibili
API TaskFlow Definizioni dei flussi di lavoro più pulite e Pythonic
Operatori/Sensori personalizzati Integrazione con qualsiasi sistema o API
Integrazione nativa con Python Utilizzo di qualsiasi libreria o strumento Python
Interfaccia utente robusta Monitoraggio e gestione visiva dei flussi di lavoro

L’integrazione profonda di Airflow con Python non solo semplifica l’automazione dei flussi di lavoro, ma consente anche ai team di costruire facilmente pipeline dati robuste, scalabili e altamente personalizzate.

Esempio: semplice pipeline ETL in Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Estrazione dei dati")

def transform_data():
    print("Trasformazione dei dati")

def load_data():
    print("Caricamento dei dati")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

Questo esempio mostra come ogni fase di una pipeline ETL venga definita come una funzione Python e orchestrata da Airflow utilizzando operatori Python e DAG: extract_task >> transform_task >> load_task

Un altro esempio di DAG di Apache Airflow

Ecco un esempio base di codifica di un DAG (Grafo aciclico diretto) in Apache Airflow utilizzando Python. Questo esempio dimostra come definire un flusso di lavoro con due attività semplici utilizzando l’BashOperator, che esegue comandi bash:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='Il mio primo DAG di Airflow!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Stampa la data corrente
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Task 2: Dì ciao
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Ciao, Airflow!"'
    )

    # Definisci le dipendenze tra i task
    print_date >> say_hello

Come funziona:

  • Il DAG viene definito utilizzando un gestore di contesto (with DAG(...) as dag:).
  • Vengono create due attività: print_date e say_hello.
  • La dipendenza print_date >> say_hello garantisce che say_hello venga eseguito solo dopo che print_date è completata.
  • Salva questo codice come my_first_dag.py nella directory dags/ di Airflow, e Airflow rileverà automaticamente e pianificherà il DAG.

Puoi espandere questo modello aggiungendo attività con PythonOperator, ramificazioni o logica più complessa man mano che i tuoi flussi di lavoro crescono.

Parametri chiave quando si crea un oggetto DAG in Python

Quando si definisce un DAG in Apache Airflow, devono essere impostati diversi parametri chiave per garantire una corretta pianificazione, identificazione e comportamento del flusso di lavoro.

Parametri essenziali:

  • dag_id
    L’identificatore unico per il tuo DAG. Ogni DAG nell’ambiente Airflow deve avere un dag_id distinto.

  • start_date
    La data e l’ora in cui il DAG diventa idoneo all’esecuzione. Questo determina quando inizia la pianificazione del DAG.

  • schedule (o schedule_interval)
    Definisce con quale frequenza il DAG dovrebbe essere eseguito (es. "@daily", "0 12 * * *", o None per esecuzioni manuali).

  • catchup
    Un booleano che determina se Airflow deve recuperare le esecuzioni mancanti tra la start_date e la data corrente quando il DAG viene abilitato per la prima volta. Predefinito a False.

Altri parametri comuni:

  • default_args
    Un dizionario di argomenti predefiniti applicati a tutte le attività all’interno del DAG (es. owner, email, retries, retry_delay). Questo è opzionale ma consigliato per il principio DRY (Don’t Repeat Yourself).

  • params
    Un dizionario per i parametri di configurazione in tempo reale, permettendo di passare valori alle attività in tempo reale e rendendo i DAG più flessibili.

Esempio:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Definisci le attività qui
    pass

Tabella dei parametri del DAG

Parametro Descrizione Obbligatorio
dag_id Nome unico per il DAG
start_date Quando il DAG è idoneo per l’esecuzione
schedule Frequenza/tempo per le esecuzioni del DAG
catchup Se recuperare le esecuzioni mancanti No
default_args Argomenti predefiniti per tutte le attività nel DAG No
params Parametri di configurazione in tempo reale per una configurazione dinamica No

Impostare questi parametri garantisce che il tuo DAG venga identificato in modo univoco, pianificato correttamente e si comporti come previsto in Airflow.

Inclusione di parametri di runtime personalizzati utilizzando l’argomento params in Airflow

L’argomento params di Apache Airflow permette di iniettare parametri di runtime personalizzati nei tuoi DAG e attività, abilitando configurazioni di flusso di lavoro dinamiche e flessibili. I parametri permettono di fornire configurazioni di runtime alle attività. Puoi configurare parametri predefiniti nel codice del tuo DAG e fornire parametri aggiuntivi, o sovrascrivere i valori dei parametri quando si avvia un’esecuzione del DAG.

Questo approccio rende i tuoi flussi di lavoro di Airflow più dinamici e configurabili, supportando una vasta gamma di scenari di automazione.

Come impostare parametri di runtime personalizzati

  • Definisci params nel DAG:
    Quando si crea un oggetto DAG, includi l’argomento params come un dizionario. Ogni coppia chiave-valore rappresenta il nome di un parametro e il valore predefinito.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Accedi al parametro tramite kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"Il mio parametro personalizzato è: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Parametro di runtime personalizzato
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • Sovrascrivi in tempo reale:
    Quando si avvia manualmente un’esecuzione del DAG (tramite l’interfaccia utente di Airflow, CLI o API), puoi fornire o sovrascrivere i valori params per quell’esecuzione specifica.

Accesso ai parametri nelle attività

  • Nella funzione Python dell’attività, accedi ai parametri utilizzando kwargs['params'].
  • Per i campi modellati (come in BashOperator), utilizza {{ params.my_param }} nella stringa del modello.