Apache Airflow for MLOps and ETL - Description, Benefits, and Examples

A nice framework for ETL and MLOps with Python

Apache Airflow is an open-source platform designed to programmatically author, schedule, and monitor workflows entirely in Python code, offering a flexible and powerful alternative to traditional manual or UI-based workflow tools.

Apache Airflow was originally developed at Airbnb in 2014 to manage increasingly complex workflows and became an Apache Software Foundation top-level project in 2019.

Airflow is most beneficial when you have complex workflows with multiple tasks that need to be executed in a specific order, with dependencies and error handling. It is particularly useful for organizations running many data jobs that require orchestration, monitoring, and retry mechanisms, rather than simple scripts or cron jobs.

Apache Airflow Key Features and Typical Use Cases

  • Python-Based Workflow Definition: Workflows are defined as Python code, allowing for dynamic pipeline generation using standard programming constructs such as loops and conditional logic (see the short sketch after this list).
  • Directed Acyclic Graphs (DAGs): Airflow uses DAGs to represent workflows, where each node is a task and edges define dependencies. This structure ensures tasks run in a specified order without cycles.
  • Robust UI: Airflow provides a modern web interface for monitoring, scheduling, and managing workflows, offering visibility into task status and logs.
  • Extensive Integrations: It includes many built-in operators and hooks to connect with cloud services (AWS, GCP, Azure), databases, and other tools, making it highly extensible.
  • Scalability and Flexibility: Airflow can orchestrate workflows at scale, supporting both on-premises and cloud deployments, and is suitable for a wide range of use cases from ETL to machine learning.
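
To make the dynamic pipeline generation point concrete, here is a minimal sketch (assuming Airflow 2.x; the dag_id, table names, and process function are illustrative, not part of any real project) that uses an ordinary Python for loop to create one task per table:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process(table_name):
    print(f"Processing {table_name}")

with DAG(
    dag_id='dynamic_tasks_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # A regular Python loop generates one task per table when Airflow parses the file
    for table in ['users', 'orders', 'payments']:
        PythonOperator(
            task_id=f'process_{table}',
            python_callable=process,
            op_args=[table],
        )

Because the loop runs at parse time, adding a new table to the list adds a new task without any other changes.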

Typical Use Cases

  • Orchestrating ETL (Extract, Transform, Load) pipelines
  • Scheduling and monitoring data workflows
  • Automating machine learning model training and deployment
  • Managing infrastructure tasks

Managed Airflow Services

Several cloud providers offer managed Airflow services, reducing the operational burden of setup and maintenance:

  • Amazon Managed Workflows for Apache Airflow (MWAA): A fully managed service that handles scaling, security, and availability, allowing users to focus on workflow development.
  • Google Cloud Composer: Managed Airflow on Google Cloud Platform.
  • Microsoft Fabric Data Factory: Offers Apache Airflow jobs as a managed orchestration solution within the Azure ecosystem.

Installation and Getting Started

Airflow can be installed with Python’s package manager (pip install apache-airflow) or with tooling such as the Astro CLI, which simplifies setup and project management. After installation, users define DAGs in Python scripts and manage them through the Airflow UI.

Summary

Feature | Description
Workflow Definition | Python code, DAG-based
UI | Web-based, modern, robust
Integrations | Cloud (AWS, GCP, Azure), databases, custom plugins
Managed Services | AWS MWAA, Google Cloud Composer, Microsoft Fabric
Use Cases | ETL, data pipelines, ML workflows, infrastructure tasks

Airflow is widely adopted in the data engineering community for its flexibility, scalability, and strong ecosystem, making it a leading choice for workflow orchestration in production environments.

Key Ways Airflow Simplifies Workflow Automation

  • Workflows as Code (Python)
    Airflow workflows are defined as Python scripts, using Directed Acyclic Graphs (DAGs) to represent tasks and their dependencies. This “workflows as code” approach enables dynamic generation, parameterization, and easy version control of pipelines, making them highly maintainable and adaptable to changing requirements.

  • Dynamic and Extensible Pipelines
    By leveraging Python’s full capabilities, users can incorporate loops, conditionals, and custom logic directly into their workflow definitions. This allows for dynamic task creation and advanced parameterization that would be difficult or impossible in static configuration files or GUI-based tools.

  • Pythonic Task Management
    Airflow’s TaskFlow API (introduced in Airflow 2.0) makes defining tasks even more Pythonic: developers write plain Python functions, decorate them, and Airflow automatically handles task creation, dependency wiring, and data passing between tasks, resulting in cleaner and more maintainable code (a short sketch follows this list).

  • Custom Operators and Integrations
    Users can create custom Python operators, sensors, and hooks to interact with virtually any external system, API, or database. This extensibility enables seamless integration with the broader Python ecosystem and external services.

  • Native Python Ecosystem Integration
    Because workflows are written in Python, users can leverage the vast array of Python libraries (such as Pandas, NumPy, or machine learning frameworks) within their tasks, further enhancing automation capabilities.

  • Readable and Maintainable
    Python’s readability and popularity make Airflow accessible to a wide range of users, from data engineers to analysts. The code-based approach also supports standard software engineering practices like code reviews and version control.
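
As a concrete illustration of the TaskFlow API, here is a minimal sketch (assuming Airflow 2.4 or later; the DAG and function names are illustrative). Return values are passed between tasks automatically via XCom, and the dependency chain is inferred from the function calls:

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def taskflow_etl():

    @task
    def extract():
        # The return value is passed to downstream tasks via XCom
        return {'rows': [1, 2, 3]}

    @task
    def transform(payload: dict):
        return [row * 10 for row in payload['rows']]

    @task
    def load(rows: list):
        print(f"Loading {len(rows)} rows")

    # Calling the functions wires the dependencies: extract -> transform -> load
    load(transform(extract()))

taskflow_etl()

Compare this with the PythonOperator-based ETL example later in the article: the decorated functions replace explicit operator instantiation and manual XCom handling.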

Some Apache Airflow Benefits

Feature | Benefit for Workflow Automation
Python-based DAGs | Dynamic, flexible, and maintainable workflows
TaskFlow API | Cleaner, more Pythonic workflow definitions
Custom Operators/Sensors | Integrate with any system or API
Native Python integration | Use any Python library or tool
Robust UI | Monitor and manage workflows visually

Airflow’s deep integration with Python not only simplifies workflow automation but also empowers teams to build robust, scalable, and highly customized data pipelines efficiently.

Example: Simple ETL Workflow in Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extracting data")

def transform_data():
    print("Transforming data")

def load_data():
    print("Loading data")

# Arguments applied to every task in the DAG
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

# One DAG run per day
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

# Wrap each Python function in a PythonOperator task
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

# Run extract first, then transform, then load
extract_task >> transform_task >> load_task

This example defines each stage of the ETL pipeline as a Python function, wraps it in a PythonOperator, and chains the tasks into a DAG so the stages run in order: extract_task >> transform_task >> load_task

Another Apache Airflow DAG Example

Here is a basic example of coding a DAG (Directed Acyclic Graph) in Apache Airflow using Python. This example demonstrates how to define a workflow with two simple tasks using the BashOperator, which runs bash commands:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='My first Airflow DAG!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Print the current date
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Task 2: Say Hello
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello, Airflow!"'
    )

    # Define task dependencies
    print_date >> say_hello

How this works:

  • The DAG is defined using a context manager (with DAG(...) as dag:).
  • Two tasks are created: print_date and say_hello.
  • The dependency print_date >> say_hello ensures say_hello runs only after print_date completes.
  • Save this code as my_first_dag.py in your Airflow dags/ directory, and Airflow will automatically detect and schedule it.

You can expand on this template by adding PythonOperator tasks, branching, or more complex logic as your workflows grow.
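
For example, here is one hedged sketch of how the template might grow with a PythonOperator task and a simple branch (assuming Airflow 2.x; the dag_id, task names, and the choose_path logic are illustrative):

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime

def prepare():
    print("Preparing data")

def choose_path(**context):
    # Return the task_id (or list of task_ids) that should run next
    return 'weekday_task' if datetime.now().weekday() < 5 else 'weekend_task'

with DAG(
    dag_id='my_first_dag_extended',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:

    prepare_task = PythonOperator(task_id='prepare', python_callable=prepare)
    branch = BranchPythonOperator(task_id='branch', python_callable=choose_path)
    weekday_task = BashOperator(task_id='weekday_task', bash_command='echo "Weekday run"')
    weekend_task = BashOperator(task_id='weekend_task', bash_command='echo "Weekend run"')

    prepare_task >> branch >> [weekday_task, weekend_task]

The callable passed to BranchPythonOperator returns the task_id to follow, and the other branch is skipped for that run.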

Key Parameters When Creating a DAG Object in Python

When defining a DAG in Apache Airflow, several key parameters should be set to ensure proper scheduling, identification, and behavior of your workflow.

Essential Parameters:

  • dag_id
    The unique identifier for your DAG. Every DAG in your Airflow environment must have a distinct dag_id.

  • start_date
    The date and time when the DAG becomes eligible for execution. This determines when scheduling begins for the DAG.

  • schedule (or schedule_interval)
    Defines how often the DAG should run (e.g., "@daily", "0 12 * * *", or None for manual runs).

  • catchup
    A boolean that determines whether Airflow should backfill missed runs between the start_date and the current date when the DAG is first enabled. Airflow’s historical default is True (controlled by the catchup_by_default configuration option), so examples commonly set catchup=False explicitly.

Other Common Parameters:

  • default_args
    A dictionary of default arguments applied to all tasks within the DAG (e.g., owner, email, retries, retry_delay). This is optional but recommended for DRY (Don’t Repeat Yourself) code.

  • params
    A dictionary for runtime configuration parameters, allowing you to pass values to tasks at runtime and making your DAGs more flexible.

Example:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Define tasks here
    pass

Table of DAG parameters

Parameter | Description | Required
dag_id | Unique name for the DAG | Yes
start_date | When the DAG is eligible to start running | Yes
schedule | Frequency/timing for DAG runs | Yes
catchup | Whether to backfill missed runs | No
default_args | Default arguments for all tasks in the DAG | No
params | Runtime parameters for dynamic configuration | No

Setting these parameters ensures your DAG is uniquely identified, scheduled correctly, and behaves as intended in Airflow.

Including Custom Runtime Parameters Using the params Argument in Airflow

Apache Airflow’s params argument lets you inject custom runtime parameters into your DAGs and tasks, enabling dynamic and flexible workflow configurations. You configure default Params in your DAG code and can supply additional Params, or overwrite Param values, when triggering a DAG run.

This approach makes your Airflow workflows more dynamic and configurable, supporting a wide variety of automation scenarios.

How to Set Custom Runtime Parameters

  • Define params in the DAG:
    When creating a DAG object, include the params argument as a dictionary. Each key-value pair represents a parameter name and its default value.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Access the parameter via kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"My custom parameter is: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Custom runtime parameter
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        # In Airflow 2.x the task context (including params) is passed to **kwargs
        # automatically, so provide_context=True is no longer needed
    )
  • Override at Runtime:
    When triggering a DAG run manually (via the Airflow UI, CLI, or API), you can supply or override params values for that specific run.

Accessing Parameters in Tasks

  • In your task’s Python callable, access params using kwargs['params'].
  • For templated fields (like in BashOperator), use {{ params.my_param }} in the template string.
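
Here is a minimal sketch showing both access patterns side by side (assuming Airflow 2.4 or later; the dag_id, parameter name, and values are illustrative):

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

def report(**kwargs):
    # Python callables read params from the task context
    print(f"Running in {kwargs['params']['environment']} mode")

with DAG(
    dag_id='params_access_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,  # manual runs only
    params={'environment': 'dev'},
) as dag:

    python_task = PythonOperator(task_id='python_task', python_callable=report)

    # Templated fields resolve params with Jinja
    bash_task = BashOperator(
        task_id='bash_task',
        bash_command='echo "Environment: {{ params.environment }}"',
    )

    python_task >> bash_task

When triggering from the CLI, a command of the form airflow dags trigger params_access_example --conf '{"environment": "prod"}' overrides the default value for that run (assuming the default dag_run_conf_overrides_params setting), so the same DAG can run in different modes without code changes.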