Apache Airflow pour MLOPS et ETL - Description, avantages et exemples

Belle framework pour ETS/MLOPS avec Python

Sommaire

Apache Airflow est une plateforme open source conçue pour créer, planifier et surveiller des workflows de manière programmée, entièrement en code Python, offrant une alternative flexible et puissante aux outils traditionnels, manuels ou basés sur une interface graphique.

Apache Airflow a été initialement développé chez Airbnb en 2014 pour gérer des workflows de plus en plus complexes et est devenu un projet de niveau supérieur de la fondation Apache Software en 2019.

Airflow est le plus bénéfique lorsque vous avez des workflows complexes avec plusieurs tâches qui doivent être exécutées dans un ordre spécifique, avec des dépendances et un traitement des erreurs. Il est particulièrement utile pour les organisations qui exécutent de nombreuses tâches de données nécessitant une orchestration, un suivi et des mécanismes de réessai, plutôt que des scripts ou des tâches cron simples.

Chaîne d’événements cyber

Fonctionnalités principales d’Apache Airflow et cas d’usage typiques

  • Définition des workflows basée sur Python : Les workflows sont définis comme du code Python, permettant une génération dynamique de pipelines à l’aide de constructions de programmation standard telles que les boucles et la logique conditionnelle.
  • Graphes acycliques dirigés (DAGs) : Airflow utilise des DAGs pour représenter les workflows, où chaque nœud est une tâche et les arêtes définissent les dépendances. Cette structure garantit que les tâches s’exécutent dans un ordre spécifié sans cycles.
  • Interface utilisateur robuste : Airflow fournit une interface web moderne pour surveiller, planifier et gérer les workflows, offrant une visibilité sur l’état des tâches et les journaux.
  • Intégrations étendues : Il inclut de nombreux opérateurs et hooks intégrés pour se connecter aux services cloud (AWS, GCP, Azure), aux bases de données et à d’autres outils, le rendant hautement extensible.
  • Évolutivité et flexibilité : Airflow peut orchestrer des workflows à grande échelle, en soutenant à la fois les déploiements locaux et cloud, et convient à une large gamme de cas d’usage allant de l’ETL à l’apprentissage automatique.

Cas d’usage typiques

  • Orchestration de pipelines ETL (Extraction, Transformation, Chargement)
  • Planification et suivi de workflows de données
  • Automatisation de l’entraînement et du déploiement de modèles d’apprentissage automatique
  • Gestion des tâches d’infrastructure

Services d’Airflow gérés

Plusieurs fournisseurs de cloud proposent des services d’Airflow gérés, réduisant la charge opérationnelle liée à l’installation et à l’entretien :

  • Amazon Managed Workflows for Apache Airflow (MWAA) : Un service entièrement géré qui gère l’évolutivité, la sécurité et la disponibilité, permettant aux utilisateurs de se concentrer sur le développement des workflows.
  • Google Cloud Composer : Airflow géré sur Google Cloud Platform.
  • Microsoft Fabric Data Factory : Propose des tâches d’Airflow comme solution d’orchestration gérée au sein de l’écosystème Azure.

Installation et mise en route

Airflow peut être installé via le gestionnaire de paquets Python (pip install apache-airflow) ou des outils gérés comme Astro CLI, qui simplifient l’installation et la gestion des projets. Après l’installation, les utilisateurs définissent des DAGs dans des scripts Python et les gèrent via l’interface utilisateur d’Airflow.

Résumé

Fonctionnalité Description
Définition des workflows Code Python, basé sur DAGs
Interface utilisateur Web, moderne, robuste
Intégrations Cloud (AWS, GCP, Azure), bases de données, plugins personnalisés
Services gérés AWS MWAA, Google Cloud Composer, Microsoft Fabric
Cas d’usage ETL, pipelines de données, workflows d’apprentissage automatique, tâches d’infrastructure

Airflow est largement adopté dans la communauté d’ingénierie des données pour sa flexibilité, son évolutivité et son écosystème solide, le rendant un choix majeur pour l’orchestration de workflows dans les environnements de production.

Principales façons dont Airflow simplifie l’automatisation des workflows

  • Workflows comme code (Python)
    Les workflows d’Airflow sont définis comme des scripts Python, en utilisant des Graphes acycliques dirigés (DAGs) pour représenter les tâches et leurs dépendances. Cette approche “workflows comme code” permet une génération dynamique, une paramétrisation et un contrôle de version facile des pipelines, les rendant très maintenables et adaptables aux exigences changeantes.

  • Pipelines dynamiques et extensibles
    En exploitant pleinement les capacités de Python, les utilisateurs peuvent intégrer des boucles, des conditions et de la logique personnalisée directement dans leurs définitions de workflow. Cela permet une création dynamique de tâches et une paramétrisation avancée qui serait difficile ou impossible à réaliser avec des fichiers de configuration statiques ou des outils basés sur une interface graphique.

  • Gestion des tâches Pythonique
    L’API TaskFlow d’Airflow (introduite avec Airflow 2.0) rend la définition des tâches encore plus Pythonique. Les développeurs écrivent des fonctions Python simples, les décorent, et Airflow gère automatiquement la création de tâches, le câblage des dépendances et le passage de données entre les tâches, ce qui donne un code plus propre et plus maintenable.

  • Opérateurs et intégrations personnalisés
    Les utilisateurs peuvent créer des opérateurs, des capteurs et des hooks personnalisés en Python pour interagir avec presque tout système externe, API ou base de données. Cette extensibilité permet une intégration fluide avec l’écosystème Python plus large et les services externes.

  • Intégration native avec l’écosystème Python
    Puisque les workflows sont écrits en Python, les utilisateurs peuvent exploiter l’immense variété de bibliothèques Python (comme Pandas, NumPy ou des cadres d’apprentissage automatique) au sein de leurs tâches, renforçant ainsi les capacités d’automatisation.

  • Lisible et maintenable
    La lisibilité de Python et sa popularité rendent Airflow accessible à un large éventail d’utilisateurs, des ingénieurs en données aux analystes. L’approche basée sur le code soutient également les pratiques standard de l’ingénierie logicielle comme les revues de code et le contrôle de version.

Quelques avantages d’Apache Airflow

Fonctionnalité Avantage pour l’automatisation des workflows
DAGs basés sur Python Workflows dynamiques, flexibles et maintenables
API TaskFlow Définitions de workflows plus propres et plus Pythoniques
Opérateurs/Senseurs personnalisés Intégration avec tout système ou API
Intégration native avec Python Utilisation de toute bibliothèque ou outil Python
Interface utilisateur robuste Surveillance et gestion visuelle des workflows

L’intégration approfondie d’Airflow avec Python simplifie non seulement l’automatisation des workflows, mais permet également aux équipes de construire efficacement des pipelines de données robustes, évolutifs et hautement personnalisés.

Exemple : workflow ETL simple en Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extraction des données")

def transform_data():
    print("Transformation des données")

def load_data():
    print("Chargement des données")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

Cet exemple montre comment chaque étape d’un pipeline ETL est définie comme une fonction Python et orchestrée par Airflow à l’aide d’opérateurs et de DAGs :
extract_task >> transform_task >> load_task

Autre exemple d’un DAG d’Apache Airflow

Voici un exemple de base de codage d’un DAG (Directed Acyclic Graph) dans Apache Airflow à l’aide de Python. Cet exemple illustre comment définir un workflow avec deux tâches simples à l’aide de l’opérateur BashOperator, qui exécute des commandes bash :

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='Mon premier DAG Airflow !',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Tâche 1 : Imprimer la date actuelle
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Tâche 2 : Dire bonjour
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Bonjour, Airflow !"'
    )

    # Définir les dépendances des tâches
    print_date >> say_hello

Fonctionnement :

  • Le DAG est défini à l’aide d’un gestionnaire de contexte (with DAG(...) as dag:).
  • Deux tâches sont créées : print_date et say_hello.
  • La dépendance print_date >> say_hello garantit que say_hello s’exécute uniquement après que print_date ait terminé.
  • Enregistrez ce code comme my_first_dag.py dans votre répertoire dags/ d’Airflow, et Airflow détectera et planifiera automatiquement ce DAG.

Vous pouvez étendre ce modèle en ajoutant des tâches PythonOperator, des branches ou de la logique plus complexe à mesure que vos workflows s’élargissent.

Paramètres clés lors de la création d’un objet DAG en Python

Lors de la définition d’un DAG dans Apache Airflow, plusieurs paramètres clés doivent être configurés pour garantir un planification, une identification et un comportement corrects de votre workflow.

Paramètres essentiels :

  • dag_id
    L’identifiant unique de votre DAG. Chaque DAG dans votre environnement Airflow doit avoir un dag_id distinct.

  • start_date
    La date et l’heure à partir desquelles le DAG devient éligible à l’exécution. Cela détermine à partir de quand le planification commence pour le DAG.

  • schedule (ou schedule_interval)
    Définit aussi souvent le DAG doit s’exécuter (par exemple, "@daily", "0 12 * * *", ou None pour les exécutions manuelles).

  • catchup
    Un booléen qui détermine si Airflow doit rattraper les exécutions manquées entre la start_date et la date actuelle lorsque le DAG est activé pour la première fois. Par défaut, il est False.

Autres paramètres courants :

  • default_args
    Un dictionnaire de paramètres par défaut appliqués à toutes les tâches du DAG (par exemple, owner, email, retries, retry_delay). C’est optionnel mais recommandé pour le code DRY (Don’t Repeat Yourself).

  • params
    Un dictionnaire pour les paramètres de configuration en temps réel, permettant de passer des valeurs aux tâches en temps réel et rendant vos DAGs plus flexibles.

Exemple :

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Définir les tâches ici
    pass

Tableau des paramètres de DAG

Paramètre Description Obligatoire
dag_id Nom unique pour le DAG Oui
start_date Quand le DAG est éligible pour commencer à s’exécuter Oui
schedule Fréquence/temps d’exécution du DAG Oui
catchup Si les exécutions manquées doivent être rattrapées Non
default_args Arguments par défaut pour toutes les tâches du DAG Non
params Paramètres de configuration en temps réel pour une flexibilité accrue Non

La configuration de ces paramètres garantit que votre DAG est correctement identifié, planifié et se comporte comme prévu dans Airflow.

Inclusion de paramètres de temps d’exécution personnalisés à l’aide de l’argument params dans Airflow

L’argument params d’Apache Airflow permet d’injecter des paramètres de temps d’exécution personnalisés dans vos DAGs et tâches, permettant des configurations de workflow dynamiques et flexibles. Les paramètres permettent de fournir une configuration de temps d’exécution aux tâches. Vous pouvez définir des paramètres par défaut dans votre code DAG et fournir des paramètres supplémentaires, ou remplacer les valeurs des paramètres lors de la déclenchement d’une exécution de DAG.

Cette approche rend vos workflows Airflow plus dynamiques et configurables, soutenant une large variété de scénarios d’automatisation.

Comment définir des paramètres de temps d’exécution personnalisés

  • Définir params dans le DAG :
    Lors de la création d’un objet DAG, incluez l’argument params comme un dictionnaire. Chaque paire clé-valeur représente le nom d’un paramètre et sa valeur par défaut.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Accédez au paramètre via kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"Mon paramètre personnalisé est : {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Paramètre de temps d'exécution personnalisé
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • Remplacer en temps d’exécution :
    Lorsque vous déclenchez manuellement une exécution de DAG (via l’interface utilisateur d’Airflow, le CLI ou l’API), vous pouvez fournir ou remplacer les valeurs params pour cette exécution spécifique.

Accès aux paramètres dans les tâches

  • Dans votre fonction callable Python de la tâche, accédez aux paramètres à l’aide de kwargs['params'].
  • Pour les champs modèles (comme dans BashOperator), utilisez {{ params.my_param }} dans la chaîne de modèle.

Liens utiles