MLOPSおよびETL向けのApache Airflow - 説明、利点および例

Pythonを用いたETS/MLOPS向けの優れたフレームワーク

目次

Apache Airflow は、Python コードを使用してワークフローをプログラマティックに作成、スケジュール、監視するためのオープンソースプラットフォームです。伝統的な、手動の、またはUIベースのワークフローツールの代替として、柔軟で強力な選択肢を提供します。

Apache Airflow は、2014年にAirbnbで開発され、ますます複雑になるワークフローを管理するために作られ、2019年にApache Software Foundationのトップレベルプロジェクトとなりました。

Airflow は、複数のタスクがあり、特定の順序で実行され、依存関係とエラーハンドリングが必要な複雑なワークフローに最も有益です。特に、多くのデータジョブを実行し、オーケストレーション、監視、リトライメカニズムが必要な組織にとって非常に有用です。単純なスクリプトやcronジョブよりも適しています。

Chain oof cyber events

Apache Airflow の主な機能と典型的な使用例

  • Pythonベースのワークフロー定義: ワークフローはPythonコードとして定義され、ループや条件付き論理などの標準的なプログラミング構造を使用して動的なパイプライン生成が可能です。
  • 有向非巡回グラフ (DAGs): Airflow は、ワークフローを DAGs として表します。各ノードはタスクであり、エッジは依存関係を定義します。この構造により、サイクルがない指定された順序でタスクが実行されます。
  • 強力なUI: Airflow は、ワークフローの監視、スケジュール、管理に使用できる現代的なウェブインターフェースを提供し、タスクのステータスやログの可視化を可能にします。
  • 広範な統合: クラウドサービス(AWS、GCP、Azure)、データベース、その他のツールと接続するための多くの組み込みオペレータとフックが含まれており、非常に拡張可能です。
  • 拡張性と柔軟性: Airflow は、オンプレミスおよびクラウドの展開をサポートし、ETLから機械学習に至るまで、幅広い使用例に適しています。

典型的な使用例

  • ETL(抽出、変換、読み込み)パイプラインのオーケストレーション
  • データワークフローのスケジュールと監視
  • 機械学習モデルのトレーニングとデプロイの自動化
  • インフラタスクの管理

管理されたAirflowサービス

いくつかのクラウドプロバイダーが管理されたAirflowサービスを提供しており、セットアップとメンテナンスの運用負担を軽減しています:

  • Amazon Managed Workflows for Apache Airflow (MWAA): スケーリング、セキュリティ、可用性を自動的に処理し、ユーザーがワークフロー開発に集中できる完全に管理されたサービスです。
  • Google Cloud Composer: Google Cloud Platform 上での管理されたAirflowです。
  • Microsoft Fabric Data Factory: Azureエコシステム内でApache Airflowジョブを提供する管理されたオーケストレーションソリューションです。

インストールと開始

Airflow は、Pythonのパッケージマネージャー(pip install apache-airflow)またはAstro CLIなどの管理ツールを使用してインストールできます。これにより、セットアップとプロジェクト管理が簡略化されます。インストール後、ユーザーはPythonスクリプトでDAGを定義し、Airflow UIを通じてそれらを管理します。

概要

機能 説明
ワークフロー定義 Pythonコード、DAGベース
UI モダンで強力なウェブベース
統合 クラウド(AWS、GCP、Azure)、データベース、カスタムプラグイン
管理されたサービス AWS MWAA、Google Cloud Composer、Microsoft Fabric
使用例 ETL、データパイプライン、MLワークフロー、インフラタスク

Airflow は、柔軟性、拡張性、強力なエコシステムにより、データエンジニアリングコミュニティで広く採用されており、生産環境でのワークフローのオーケストレーションにおいて主要な選択肢となっています。

Airflow がワークフロー自動化を簡略化する主な方法

  • ワークフローをコードとして(Python)
    Airflow のワークフローは、Python スクリプトとして定義され、有向非巡回グラフ(DAGs)を使用してタスクとその依存関係を表します。この「ワークフローをコードとして」のアプローチにより、パイプラインの動的な生成、パラメータ化、バージョン管理が容易になり、変化する要件に柔軟に対応できる非常に保守性の高いワークフローが実現できます。

  • 動的で拡張可能なパイプライン
    Python のフル機能を活用することで、ユーザーはループ、条件分岐、カスタムロジックをワークフロー定義に直接組み込むことができます。これにより、静的な設定ファイルやGUIベースのツールでは困難または不可能な動的なタスクの作成や高度なパラメータ化が可能になります。

  • Python的タスク管理
    Airflow 2.0 で導入された TaskFlow API により、タスクの定義がさらにPython的になります。開発者は通常のPython関数を記述し、それらをデコレートし、Airflow が自動的にタスクの作成、依存関係の接続、タスク間のデータの受け渡しを処理します。これにより、コードがよりきれいで保守性が高まります。

  • カスタムオペレータと統合
    ユーザーは、ほぼすべての外部システム、API、データベースとやり取りするためのカスタムPythonオペレータ、センサー、フックを作成できます。この拡張性により、広範なPythonエコシステムと外部サービスとのシームレスな統合が可能になります。

  • ネイティブなPythonエコシステム統合
    ワークフローがPythonで書かれているため、ユーザーはPandasやNumPy、機械学習フレームワークなどの豊富なPythonライブラリをタスク内で活用でき、自動化の能力をさらに高めることができます。

  • 読みやすく保守性が高い
    Pythonの読みやすさと人気により、Airflowはデータエンジニアからアナリストに至る幅広いユーザーにアクセス可能になります。コードベースのアプローチは、コードレビューやバージョン管理などの標準的なソフトウェアエンジニアリングの実践もサポートします。

Apache Airflow の一部の利点

機能 ワークフロー自動化への利点
PythonベースのDAGs 動的で柔軟で保守性の高いワークフロー
TaskFlow API よりPython的なワークフロー定義
カスタムオペレータ/センサー 任意のシステムやAPIとの統合
ネイティブなPython統合 任意のPythonライブラリやツールの使用
強力なUI ワークフローを視覚的に監視・管理

Airflow は、Pythonとの深い統合により、ワークフロー自動化を簡略化するだけでなく、チームが効率的に強力で拡張性のあるカスタマイズ可能なデータパイプラインを構築できるようにもします。

例: Python での単純なETLワークフロー

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extracting data")

def transform_data():
    print("Transforming data")

def load_data():
    print("Loading data")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

この例では、ETLパイプラインの各段階がPython関数として定義され、AirflowがPythonオペレータとDAGを使用してオーケストレーションされています: extract_task >> transform_task >> load_task

Apache Airflow DAG のもう一つの例

Apache Airflow でPythonを使用してDAG(有向非巡回グラフ)をコーディングする基本的な例を以下に示します。この例では、BashOperatorを使用して2つの単純なタスクを持つワークフローを定義する方法を示しています。BashOperatorはbashコマンドを実行します:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='My first Airflow DAG!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # タスク1: 現在の日付を印刷
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # タスク2: こんにちはを印刷
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello, Airflow!"'
    )

    # タスクの依存関係を定義
    print_date >> say_hello

このように動作します:

  • DAG はコンテキストマネージャー (with DAG(...) as dag:) を使用して定義されます。
  • 2つのタスク print_datesay_hello が作成されます。
  • 依存関係 print_date >> say_hello により、say_helloprint_date が完了した後にのみ実行されます。
  • このコードを my_first_dag.py として Airflow の dags/ ディレクトリに保存すると、Airflow が自動的に検出しスケジュールします。

ワークフローが成長するにつれて、PythonOperatorタスク、分岐、またはより複雑なロジックをこのテンプレートに追加して拡張できます。

Python で DAG オブジェクトを作成する際の重要なパラメータ

Apache Airflow で DAG を定義する際には、ワークフローの適切なスケジューリング、識別、動作を保証するためにいくつかの重要なパラメータを設定する必要があります。

必須パラメータ:

  • dag_id
    DAG の一意の識別子。Airflow 環境内のすべての DAG には、一意の dag_id が必要です。

  • start_date
    DAG が実行可能になる日時。この日時が DAG のスケジュール開始日を決定します。

  • schedule (または schedule_interval)
    DAG がどの頻度で実行されるかを定義します(例: "@daily""0 12 * * *"、または None で手動実行)。

  • catchup
    DAG が初めて有効化されたときに、start_date から現在の日付までの間の実行をバックフィルするかどうかを指定します。デフォルトは False です。

その他の一般的なパラメータ:

  • default_args
    DAG 内のすべてのタスクに適用されるデフォルトの引数の辞書(例: owneremailretriesretry_delay)。これはオプションですが、DRY(Don’t Repeat Yourself)なコードを実現するために推奨されます。

  • params
    ランタイムでの構成パラメータを指定する辞書。これにより、タスクにランタイムで値を渡すことができ、DAG がより柔軟になります。

例:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # タスクをここに定義
    pass

DAG パラメータの表

パラメータ 説明 必須
dag_id DAG の一意の名前 はい
start_date DAG が実行可能になる日時 はい
schedule DAG の実行頻度/タイミング はい
catchup 過去の実行をバックフィルするかどうか いいえ
default_args DAG 内のすべてのタスクに適用されるデフォルトの引数 いいえ
params ダイナミックな構成に使用されるランタイムパラメータ いいえ

これらのパラメータを設定することで、DAG が一意に識別され、正しくスケジュールされ、Airflow で意図した通りに動作します。

Airflow で params 引数を使用してカスタムランタイムパラメータを含める方法

Apache Airflow の params 引数は、DAG およびタスクにカスタムランタイムパラメータを注入し、動的で柔軟なワークフロー構成を可能にします。params はタスクにランタイム構成を提供します。DAG コード内でデフォルトの params を定義し、追加の params を指定したり、DAG 実行時に params の値を上書きしたりできます。

このアプローチにより、Airflow のワークフローがより動的で構成可能になり、幅広い自動化シナリオをサポートします。

カスタムランタイムパラメータを設定する方法

  • DAG 内で params を定義:
    DAG オブジェクトを作成する際、params 引数を辞書として含めます。各キー-値ペアはパラメータ名とそのデフォルト値を表します。
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # kwargs['params'] を使用してパラメータにアクセス
    my_param = kwargs['params']['my_param']
    print(f"私のカスタムパラメータは: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # カスタムランタイムパラメータ
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • 実行時に上書き:
    DAG 実行を手動でトリガーする際(Airflow UI、CLI、またはAPI経由で)、特定の実行に対して params の値を上書きまたは指定できます。

タスク内でパラメータにアクセスする方法

  • タスクのPython可変関数内で、kwargs['params'] を使用してパラメータにアクセスします。
  • テンプレートフィールド(例: BashOperator)では、テンプレート文字列内で {{ params.my_param }} を使用します。

有用なリンク