Apache Airflow for MLOps and ETL - Description, Benefits and Examples
A nice framework for ETL/MLOps with Python
Apache Airflow is an open-source platform designed to programmatically author, schedule, and monitor workflows, entirely in Python code, offering a flexible and powerful alternative to traditional manual or UI-based workflow tools.
Apache Airflow was originally developed at Airbnb in 2014 to manage increasingly complex workflows and became an Apache Software Foundation top-level project in 2019.
Airflow is most beneficial when you have complex workflows with multiple tasks that need to be executed in a specific order, with dependencies and error handling. It is particularly useful for organizations running many data jobs that require orchestration, monitoring, and retry mechanisms, rather than simple scripts or cron jobs.
Apache Airflow Key Features and Typical Use Cases
- Python-Based Workflow Definition: Workflows are defined as Python code, allowing for dynamic pipeline generation using standard programming constructs such as loops and conditional logic.
- Directed Acyclic Graphs (DAGs): Airflow uses DAGs to represent workflows, where each node is a task and edges define dependencies. This structure ensures tasks run in a specified order without cycles.
- Robust UI: Airflow provides a modern web interface for monitoring, scheduling, and managing workflows, offering visibility into task status and logs.
- Extensive Integrations: It includes many built-in operators and hooks to connect with cloud services (AWS, GCP, Azure), databases, and other tools, making it highly extensible.
- Scalability and Flexibility: Airflow can orchestrate workflows at scale, supporting both on-premises and cloud deployments, and is suitable for a wide range of use cases from ETL to machine learning.
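For example, the Python-based workflow definition above lets you generate tasks in a loop. Below is a minimal, illustrative sketch (the DAG id, table names, and helper function are made up, and it assumes Airflow 2.4+ where the schedule argument is available):
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_table(table_name):
    # Placeholder for real extraction/processing logic
    print(f"Processing {table_name}")

with DAG(
    dag_id='dynamic_tasks_example',   # illustrative name
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    # One task per table, generated with an ordinary Python loop
    for table in ['users', 'orders', 'payments']:
        PythonOperator(
            task_id=f'process_{table}',
            python_callable=process_table,
            op_kwargs={'table_name': table},
        )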
Typical Use Cases
- Orchestrating ETL (Extract, Transform, Load) pipelines
- Scheduling and monitoring data workflows
- Automating machine learning model training and deployment
- Managing infrastructure tasks
Managed Airflow Services
Several cloud providers offer managed Airflow services, reducing the operational burden of setup and maintenance:
- Amazon Managed Workflows for Apache Airflow (MWAA): A fully managed service that handles scaling, security, and availability, allowing users to focus on workflow development.
- Google Cloud Composer: Managed Airflow on Google Cloud Platform.
- Microsoft Fabric Data Factory: Offers Apache Airflow jobs as a managed orchestration solution within the Azure ecosystem.
Installation and Getting Started
Airflow can be installed via Python’s package manager (`pip install apache-airflow`) or with managed tooling such as the Astro CLI, which simplifies setup and project management. After installation, users define DAGs in Python scripts and manage them through the Airflow UI.
Summary
| Feature | Description |
|---|---|
| Workflow Definition | Python code, DAG-based |
| UI | Web-based, modern, robust |
| Integrations | Cloud (AWS, GCP, Azure), databases, custom plugins |
| Managed Services | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
| Use Cases | ETL, data pipelines, ML workflows, infrastructure tasks |
Airflow is widely adopted in the data engineering community for its flexibility, scalability, and strong ecosystem, making it a leading choice for workflow orchestration in production environments.
Key Ways Airflow Simplifies Workflow Automation
- Workflows as Code (Python): Airflow workflows are defined as Python scripts, using Directed Acyclic Graphs (DAGs) to represent tasks and their dependencies. This “workflows as code” approach enables dynamic generation, parameterization, and easy version control of pipelines, making them highly maintainable and adaptable to changing requirements.
- Dynamic and Extensible Pipelines: By leveraging Python’s full capabilities, users can incorporate loops, conditionals, and custom logic directly into their workflow definitions. This allows for dynamic task creation and advanced parameterization that would be difficult or impossible in static configuration files or GUI-based tools.
- Pythonic Task Management: Airflow’s TaskFlow API (introduced in Airflow 2.0) makes defining tasks even more Pythonic. Developers write plain Python functions, decorate them, and Airflow automatically handles task creation, dependency wiring, and data passing between tasks, resulting in cleaner and more maintainable code.
- Custom Operators and Integrations: Users can create custom Python operators, sensors, and hooks to interact with virtually any external system, API, or database. This extensibility enables seamless integration with the broader Python ecosystem and external services.
- Native Python Ecosystem Integration: Because workflows are written in Python, users can leverage the vast array of Python libraries (such as Pandas, NumPy, or machine learning frameworks) within their tasks, further enhancing automation capabilities.
- Readable and Maintainable: Python’s readability and popularity make Airflow accessible to a wide range of users, from data engineers to analysts. The code-based approach also supports standard software engineering practices like code reviews and version control.
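As a sketch of the TaskFlow API and native ecosystem points above, the pipeline below uses plain decorated functions, and Airflow wires the dependencies from the data passed between them. The DAG id and values are illustrative, and it assumes Airflow 2.4+ (use schedule_interval on older 2.x releases):
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='@daily', start_date=datetime(2024, 1, 1), catchup=False)
def taskflow_etl():
    @task
    def extract():
        # Return values are passed to downstream tasks via XCom
        return {'records': [1, 2, 3]}

    @task
    def transform(payload):
        return [r * 10 for r in payload['records']]

    @task
    def load(rows):
        print(f"Loading {len(rows)} rows")

    # Calling the functions builds the dependency graph: extract -> transform -> load
    load(transform(extract()))

taskflow_etl()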
Some Apache Airflow Benefits
| Feature | Benefit for Workflow Automation |
|---|---|
| Python-based DAGs | Dynamic, flexible, and maintainable workflows |
| TaskFlow API | Cleaner, more Pythonic workflow definitions |
| Custom Operators/Sensors | Integrate with any system or API |
| Native Python integration | Use any Python library or tool |
| Robust UI | Monitor and manage workflows visually |
Airflow’s deep integration with Python not only simplifies workflow automation but also empowers teams to build robust, scalable, and highly customized data pipelines efficiently.
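As a sketch of the “Custom Operators/Sensors” point above, a custom operator is simply a Python class that subclasses BaseOperator and implements execute(); the operator name and greeting logic here are purely illustrative:
from airflow.models.baseoperator import BaseOperator

class GreetOperator(BaseOperator):
    """Illustrative operator; a real one would call an external API, database, or service."""

    def __init__(self, name, **kwargs):
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        # execute() is invoked by the worker when the task runs
        print(f"Hello, {self.name}!")
        return self.name  # the return value is pushed to XCom
Inside a DAG it is then used like any built-in operator, for example GreetOperator(task_id='greet', name='Airflow').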
Example: Simple ETL Workflow in Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extracting data")

def transform_data():
    print("Transforming data")

def load_data():
    print("Loading data")

# Arguments applied to every task in the DAG
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

# A daily ETL pipeline
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

# One PythonOperator per ETL stage
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

# Run the stages in order: extract -> transform -> load
extract_task >> transform_task >> load_task
This example shows how each stage of an ETL pipeline is defined as a Python function and orchestrated by Airflow with PythonOperator tasks in a DAG. The dependency chain `extract_task >> transform_task >> load_task` ensures the stages run in order.
Another Example of an Apache Airflow DAG
Here is a basic example of coding a DAG (Directed Acyclic Graph) in Apache Airflow using Python. This example demonstrates how to define a workflow with two simple tasks using the BashOperator, which runs bash commands:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='My first Airflow DAG!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    # Task 1: Print the current date
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Task 2: Say Hello
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello, Airflow!"'
    )

    # Define task dependencies
    print_date >> say_hello
How this works:
- The DAG is defined using a context manager (`with DAG(...) as dag:`).
- Two tasks are created: `print_date` and `say_hello`.
- The dependency `print_date >> say_hello` ensures `say_hello` runs only after `print_date` completes.
- Save this code as `my_first_dag.py` in your Airflow `dags/` directory, and Airflow will automatically detect and schedule it.
You can expand on this template by adding PythonOperator tasks, branching, or more complex logic as your workflows grow.
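For instance, branching can be added with BranchPythonOperator, which returns the task_id of the path to follow. The sketch below is illustrative (the task names and weekday check are made up) and assumes Airflow 2.2+, where logical_date is available in the task context:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime

def choose_branch(**kwargs):
    # Return the task_id of the branch that should run
    weekday = kwargs['logical_date'].weekday()
    return 'weekday_task' if weekday < 5 else 'weekend_task'

with DAG(
    dag_id='branching_example',
    start_date=datetime(2023, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    branch = BranchPythonOperator(task_id='choose_branch', python_callable=choose_branch)
    weekday_task = BashOperator(task_id='weekday_task', bash_command='echo "Weekday run"')
    weekend_task = BashOperator(task_id='weekend_task', bash_command='echo "Weekend run"')

    # The unselected branch is skipped automatically
    branch >> [weekday_task, weekend_task]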
Key Parameters When Creating a DAG Object in Python
When defining a DAG in Apache Airflow, several key parameters should be set to ensure proper scheduling, identification, and behavior of your workflow.
Essential Parameters:
- `dag_id`: The unique identifier for your DAG. Every DAG in your Airflow environment must have a distinct `dag_id`.
- `start_date`: The date and time when the DAG becomes eligible for execution. This determines when scheduling begins for the DAG.
- `schedule` (or `schedule_interval` in older versions): Defines how often the DAG should run (e.g., `"@daily"`, `"0 12 * * *"`, or `None` for manual runs).
- `catchup`: A boolean that determines whether Airflow should backfill missed runs between the `start_date` and the current date when the DAG is first enabled. The default depends on your Airflow version and the `catchup_by_default` configuration, so it is good practice to set it explicitly.
Other Common Parameters:
- `default_args`: A dictionary of default arguments applied to all tasks within the DAG (e.g., `owner`, `email`, `retries`, `retry_delay`). This is optional but recommended for DRY (Don’t Repeat Yourself) code.
- `params`: A dictionary of runtime configuration parameters, allowing you to pass values to tasks at runtime and making your DAGs more flexible.
Example:
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Define tasks here
    pass
Table of DAG parameters
| Parameter | Description | Required |
|---|---|---|
| dag_id | Unique name for the DAG | Yes |
| start_date | When the DAG is eligible to start running | Yes |
| schedule | Frequency/timing for DAG runs | Yes |
| catchup | Whether to backfill missed runs | No |
| default_args | Default arguments for all tasks in the DAG | No |
| params | Runtime parameters for dynamic configuration | No |
Setting these parameters ensures your DAG is uniquely identified, scheduled correctly, and behaves as intended in Airflow.
Including Custom Runtime Parameters Using the `params` Argument in Airflow
Apache Airflow’s `params` argument lets you inject custom runtime parameters into your DAGs and tasks, enabling dynamic and flexible workflow configuration. You define default params in your DAG code and can supply additional values, or override the defaults, when triggering a DAG run.
This approach makes your Airflow workflows more dynamic and configurable, supporting a wide variety of automation scenarios.
How to Set Custom Runtime Parameters
- Define `params` in the DAG: When creating a DAG object, include the `params` argument as a dictionary. Each key-value pair represents a parameter name and its default value.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Access the parameter via kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"My custom parameter is: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Custom runtime parameter
) as dag:
    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        # provide_context is no longer needed: Airflow 2+ passes the context automatically
    )
- Override at Runtime: When triggering a DAG run manually (via the Airflow UI, CLI, or API), you can supply or override `params` values for that specific run.
Accessing Parameters in Tasks
- In your task’s Python callable, access params using `kwargs['params']`.
- For templated fields (like in BashOperator), use `{{ params.my_param }}` in the template string.
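As a small illustrative sketch of both access patterns (the DAG id and parameter name are made up), the Bash task below uses the templated form while the Python task reads the same value from the context:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

def show_param(**kwargs):
    # Python callables read params from the task context
    print(kwargs['params']['environment'])

with DAG(
    dag_id='params_access_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,                      # manual runs only
    params={'environment': 'dev'},      # default value, can be overridden per run
) as dag:
    # Templated fields resolve {{ params.environment }} at render time
    bash_task = BashOperator(
        task_id='echo_param',
        bash_command='echo "Deploying to {{ params.environment }}"',
    )
    python_task = PythonOperator(task_id='print_param', python_callable=show_param)
When triggering manually, the default 'dev' can be replaced for that run, for example via the trigger-with-config form in the UI or the --conf flag of airflow dags trigger.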
Useful links
- https://airflow.apache.org
- https://github.com/apache/airflow
- https://en.wikipedia.org/wiki/Apache_Airflow
- uv - New Python Package, Project, and Environment Manager
- Python Cheatsheet
- AWS lambda performance: JavaScript vs Python vs Golang
- AWS SAM + AWS SQS + Python PowerTools
- venv Cheatsheet
- Generating PDF in Python - Libraries and examples