Apache Airflow für MLOPS und ETL – Beschreibung, Vorteile und Beispiele
Gutes Framework für ETS/MLOPS mit Python
Apache Airflow ist eine Open-Source-Plattform, die entwickelt wurde, um Workflows programmatisch zu erstellen, zu planen und zu überwachen – vollständig in Python-Code, wodurch eine flexible und leistungsstarke Alternative zu traditionellen, manuellen oder UI-basierten Workflow-Tools geboten wird.
Apache Airflow wurde ursprünglich 2014 bei Airbnb entwickelt, um zunehmend komplexe Workflows zu verwalten, und wurde 2019 zu einem Top-Level-Projekt der Apache Software Foundation.
Airflow ist am vorteilhaftesten, wenn Sie komplexe Workflows mit mehreren Aufgaben haben, die in einer bestimmten Reihenfolge ausgeführt werden müssen, mit Abhängigkeiten und Fehlerbehandlung. Es ist besonders nützlich für Organisationen, die viele Datenjobs ausführen, die Orchestrierung, Überwachung und Wiederholungsmechanismen erfordern, anstatt einfache Skripte oder cron-Jobs zu verwenden.
Schlüsselmerkmale und typische Anwendungsfälle von Apache Airflow
- Workflow-Definition basierend auf Python: Workflows werden als Python-Code definiert, wodurch mithilfe von Standard-Programmierkonstrukten wie Schleifen und bedingten Logiken dynamische Pipeline-Generierung ermöglicht wird.
- Gerichtete azyklische Graphen (DAGs): Airflow verwendet DAGs, um Workflows darzustellen, wobei jeder Knoten eine Aufgabe ist und Kanten Abhängigkeiten definieren. Diese Struktur stellt sicher, dass Aufgaben in einer festgelegten Reihenfolge ohne Zyklen ausgeführt werden.
- Robuste Benutzeroberfläche: Airflow bietet eine moderne Web-Oberfläche zur Überwachung, Planung und Verwaltung von Workflows und bietet Einblick in den Status und die Protokolle der Aufgaben.
- Umfassende Integrationen: Es enthält viele integrierte Operatoren und Hooks, um mit Cloud-Diensten (AWS, GCP, Azure), Datenbanken und anderen Tools zu kommunizieren, was eine hohe Erweiterbarkeit ermöglicht.
- Skalierbarkeit und Flexibilität: Airflow kann Workflows auf Skalenebene orchestrieren, unterstützt sowohl lokale als auch Cloud-Bereitstellungen und ist für eine Vielzahl von Anwendungsfällen von ETL bis hin zu maschinellem Lernen geeignet.
Typische Anwendungsfälle
- Orchestrierung von ETL-Pipelines (Extrahieren, Transformieren, Laden)
- Planung und Überwachung von Datenworkflows
- Automatisierung des Trainings und der Bereitstellung von maschinellen Lernmodellen
- Verwaltung von Infrastrukturaufgaben
Verwaltete Airflow-Dienste
Mehrere Cloud-Anbieter bieten verwaltete Airflow-Dienste an, die den operativen Aufwand für die Einrichtung und Wartung reduzieren:
- Amazon Managed Workflows for Apache Airflow (MWAA): Ein vollständig verwalteter Dienst, der Skalierung, Sicherheit und Verfügbarkeit verwaltet und Nutzern ermöglicht, sich auf die Workflow-Entwicklung zu konzentrieren.
- Google Cloud Composer: Verwalteter Airflow auf der Google Cloud Platform.
- Microsoft Fabric Data Factory: Bietet Apache Airflow-Jobs als verwaltete Orchestrierungslösung innerhalb des Azure-Ökosystems an.
Installation und Einstieg
Airflow kann über Pythons Paketmanager (pip install apache-airflow
) oder verwaltete Tools wie den Astro CLI installiert werden, was die Einrichtung und Projektverwaltung vereinfacht. Nach der Installation definieren Nutzer DAGs in Python-Scripts und verwalten sie über die Airflow-Oberfläche.
Zusammenfassung
Merkmal | Beschreibung |
---|---|
Workflow-Definition | Python-Code, DAG-basiert |
UI | Web-basiert, modern, robust |
Integrationen | Cloud (AWS, GCP, Azure), Datenbanken, benutzerdefinierte Plugins |
Verwaltete Dienste | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
Anwendungsfälle | ETL, Datenpipelines, ML-Workflows, Infrastrukturaufgaben |
Airflow wird in der Daten-Engineering-Community weit verbreitet genutzt, aufgrund seiner Flexibilität, Skalierbarkeit und starken Ökosystem, was es zu einer führenden Wahl für Workflow-Orchestrierung in Produktionsumgebungen macht.
Wichtige Wege, wie Airflow die Workflow-Automatisierung vereinfacht
-
Workflows als Code (Python)
Airflow-Workflows werden als Python-Scripts definiert, wobei gerichtete azyklische Graphen (DAGs) verwendet werden, um Aufgaben und deren Abhängigkeiten darzustellen. Dieser „Workflows als Code“-Ansatz ermöglicht die dynamische Generierung, Parametrisierung und einfache Versionskontrolle von Pipelines, was sie sehr wartbar und anpassbar an sich ändernde Anforderungen macht. -
Dynamische und erweiterbare Pipelines
Durch die Nutzung der vollständigen Fähigkeiten von Python können Nutzer Schleifen, Bedingungen und benutzerdefinierte Logik direkt in ihre Workflow-Definitionen einbauen. Dies ermöglicht die dynamische Erstellung von Aufgaben und erweiterte Parametrisierung, die in statischen Konfigurationsdateien oder GUI-basierten Tools schwer oder unmöglich wäre. -
Python-ähnliche Aufgabenverwaltung
Die TaskFlow-API (eingeführt in Airflow 2.0) macht die Definition von Aufgaben noch pythonischer. Entwickler schreiben einfache Python-Funktionen, dekorieren sie und Airflow verwaltet automatisch die Aufgaben-erstellung, Abhängigkeitsverknüpfung und Datenübertragung zwischen Aufgaben, was zu saubererem und wartbarerem Code führt. -
Benutzerdefinierte Operatoren und Integrationen
Nutzer können benutzerdefinierte Python-Operatoren, Sensoren und Hooks erstellen, um mit fast jedem externen System, API oder Datenbank zu interagieren. Diese Erweiterbarkeit ermöglicht eine nahtlose Integration mit dem breiteren Python-Ökosystem und externen Diensten. -
Nativ Integration mit dem Python-Ökosystem
Da Workflows in Python geschrieben werden, können Nutzer die umfangreiche Vielzahl von Python-Bibliotheken (wie Pandas, NumPy oder maschinelle Lernframeworks) innerhalb ihrer Aufgaben nutzen, was die Automatisierungsfähigkeiten weiter verbessert. -
Lesbar und wartbar
Die Lesbarkeit und Beliebtheit von Python machen Airflow für eine breite Nutzergruppe zugänglich, von Daten-Engineern bis hin zu Analysten. Der codebasierte Ansatz unterstützt auch Standard-Softwareentwicklungspraktiken wie Code-Reviews und Versionskontrolle.
Einige Vorteile von Apache Airflow
Merkmal | Vorteil für Workflow-Automatisierung |
---|---|
Python-basierte DAGs | Dynamische, flexible und wartbare Workflows |
TaskFlow API | Reinere, pythonischere Workflow-Definitionen |
Benutzerdefinierte Operatoren/Sensoren | Integrieren Sie mit jedem System oder API |
Native Python-Integration | Nutzen Sie jede Python-Bibliothek oder Tool |
Robuste UI | Überwachen und verwalten Sie Workflows visuell |
Die tiefgreifende Integration von Airflow mit Python vereinfacht nicht nur die Workflow-Automatisierung, sondern ermöglicht es Teams auch, robuste, skalierbare und hochanpassbare Datenpipelines effizient zu erstellen.
Beispiel: Einfache ETL-Pipeline in Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("Daten extrahieren")
def transform_data():
print("Daten transformieren")
def load_data():
print("Daten laden")
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
extract_task >> transform_task >> load_task
Dieses Beispiel zeigt, wie jeder Schritt einer ETL-Pipeline als Python-Funktion definiert wird und von Airflow mithilfe von Python-Operatoren und DAGs orchestriert wird:
extract_task >> transform_task >> load_task
Ein weiteres Beispiel für eine Apache Airflow DAG
Hier ist ein einfaches Beispiel, wie man eine DAG (gerichteter azyklischer Graph) in Apache Airflow mit Python codiert. Dieses Beispiel veranschaulicht, wie man einen Workflow mit zwei einfachen Aufgaben mithilfe des BashOperators erstellt, der Befehle ausführt:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='Meine erste Airflow DAG!',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# Aufgabe 1: Aktuellen Datum ausgeben
print_date = BashOperator(
task_id='print_date',
bash_command='date'
)
# Aufgabe 2: Hallo sagen
say_hello = BashOperator(
task_id='say_hello',
bash_command='echo "Hallo, Airflow!"'
)
# Aufgabenabhängigkeiten definieren
print_date >> say_hello
So funktioniert das:
- Die DAG wird mithilfe eines Kontextmanagers (
with DAG(...) as dag:
) definiert. - Zwei Aufgaben werden erstellt:
print_date
undsay_hello
. - Die Abhängigkeit
print_date >> say_hello
stellt sicher, dasssay_hello
nur nach Abschluss vonprint_date
ausgeführt wird. - Speichern Sie diesen Code als
my_first_dag.py
in Ihrem Airflowdags/
-Verzeichnis, und Airflow erkennt und planen ihn automatisch.
Sie können diesen Vorlagen-Code erweitern, indem Sie PythonOperator-Aufgaben, Verzweigungen oder komplexere Logik hinzufügen, je nachdem, wie sich Ihre Workflows entwickeln.
Wichtige Parameter beim Erstellen eines DAG-Objekts in Python
Beim Definieren einer DAG in Apache Airflow sollten mehrere Schlüsselparameter gesetzt werden, um eine ordnungsgemäße Planung, Identifizierung und Verhaltensweise Ihres Workflows sicherzustellen.
Wichtige Parameter:
-
dag_id
Die eindeutige Kennung für Ihre DAG. Jede DAG in Ihrem Airflow-Umgebung muss eine anderedag_id
haben. -
start_date
Das Datum und die Uhrzeit, ab der die DAG für die Ausführung berechtigt ist. Dies bestimmt, wann die Planung für die DAG beginnt. -
schedule (oder
schedule_interval
)
Definiert, wie oft die DAG ausgeführt werden soll (z. B."@daily"
,"0 12 * * *"
, oderNone
für manuelle Ausführungen). -
catchup
Ein boolescher Wert, der bestimmt, ob Airflow fehlende Ausführungen zwischen demstart_date
und dem aktuellen Datum zurückverfolgen soll, wenn die DAG erstmals aktiviert wird. Standardmäßig ist diesFalse
.
Weitere häufige Parameter:
-
default_args
Ein Wörterbuch mit Standardargumenten, die auf alle Aufgaben innerhalb der DAG angewendet werden (z. B.owner
,email
,retries
,retry_delay
). Dies ist optional, aber empfohlen, um DRY (Don’t Repeat Yourself) Code zu schreiben. -
params
Ein Wörterbuch für Laufzeitkonfigurationsparameter, das es ermöglicht, Werte zu Aufgaben zur Laufzeit zu übergeben und Ihre DAGs flexibler zu machen.
Beispiel:
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'jdoe',
'email': ['jdoe@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='example_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
default_args=default_args,
params={'example_param': 'value'}
) as dag:
# Definieren Sie hier Aufgaben
pass
Tabelle der DAG-Parameter
Parameter | Beschreibung | Erforderlich |
---|---|---|
dag_id | Eindeutiger Name für die DAG | Ja |
start_date | Wann die DAG berechtigt ist, um zu laufen | Ja |
schedule | Häufigkeit/Zeitpunkt für DAG-Ausführungen | Ja |
catchup | Ob fehlende Ausführungen zurückverfolgt werden sollen | Nein |
default_args | Standardargumente für alle Aufgaben in der DAG | Nein |
params | Laufzeitparameter für dynamische Konfiguration | Nein |
Die Festlegung dieser Parameter stellt sicher, dass Ihre DAG eindeutig identifiziert, korrekt geplant und wie beabsichtigt in Airflow verhält.
Einbeziehung benutzerdefinierter Laufzeitparameter mithilfe des params
-Arguments in Airflow
Das params
-Argument von Apache Airflow ermöglicht es Ihnen, benutzerdefinierte Laufzeitparameter in Ihre DAGs und Aufgaben einzubinden, wodurch dynamische und flexible Workflow-Konfigurationen ermöglicht werden. Params ermöglichen es Ihnen, Laufzeitkonfigurationen für Aufgaben bereitzustellen. Sie können Standard-Params in Ihrem DAG-Code definieren und zusätzliche Params bereitstellen oder Paramwerte überschreiben, wenn Sie eine DAG-Ausführung auslösen.
Dieser Ansatz macht Ihre Airflow-Workflows dynamischer und konfigurierbarer und unterstützt eine Vielzahl von Automatisierungsszenarien.
Wie man benutzerdefinierte Laufzeitparameter festlegt
- Definieren Sie
params
in der DAG:
Wenn Sie ein DAG-Objekt erstellen, geben Sie denparams
-Argument als Wörterbuch an. Jedes Schlüssel-Wert-Paar stellt einen Parametername und dessen Standardwert dar.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_param(**kwargs):
# Zugriff auf den Parameter über kwargs['params']
my_param = kwargs['params']['my_param']
print(f"Mein benutzerdefinierter Parameter lautet: {my_param}")
with DAG(
dag_id='custom_params_example',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
params={'my_param': 'default_value'} # Benutzerdefinierter Laufzeitparameter
) as dag:
task = PythonOperator(
task_id='print_param_task',
python_callable=print_param,
provide_context=True
)
- Überschreiben Sie zur Laufzeit:
Wenn Sie eine DAG-Ausführung manuell auslösen (über die Airflow-Oberfläche, CLI oder API), können Sieparams
-Werte für diese Ausführung bereitstellen oder überschreiben.
Zugriff auf Parameter in Aufgaben
- In der Python-Funktion Ihrer Aufgabe können Sie Parameter über
kwargs['params']
zugreifen. - Für vorgeschaltete Felder (z. B. in BashOperator) verwenden Sie
{{ params.my_param }}
in der Vorlage.
Nützliche Links
- https://airflow.apache.org
- https://github.com/apache/airflow
- https://en.wikipedia.org/wiki/Apache_Airflow
- uv - Neues Python-Paket, Projekt- und Umgebungsverwaltungs-Tool
- Python Cheatsheet
- AWS Lambda Leistung: JavaScript vs Python vs Golang
- AWS SAM + AWS SQS + Python PowerTools
- venv Cheatsheet
- PDF in Python generieren - Bibliotheken und Beispiele"