Apache Airflow para MLOPS e ETL - Descrição, Benefícios e Exemplos

Bom framework para ETS/MLOPS com Python

Conteúdo da página

Apache Airflow é uma plataforma de código aberto projetada para autorizar, agendar e monitorar fluxos de trabalho de forma programática, totalmente em código Python, oferecendo uma alternativa flexível e poderosa aos ferramentas tradicionais, manuais ou baseadas em interface gráfica de fluxos de trabalho.

O Airflow foi originalmente desenvolvido na Airbnb em 2014 para gerenciar fluxos de trabalho cada vez mais complexos e tornou-se um projeto de nível superior da Apache Software Foundation em 2019.

O Airflow é mais benéfico quando você tem fluxos de trabalho complexos com múltiplas tarefas que precisam ser executadas em uma ordem específica, com dependências e tratamento de erros. É particularmente útil para organizações que executam muitos trabalhos de dados que exigem orquestração, monitoramento e mecanismos de repetição, em vez de scripts simples ou trabalhos cron.

Cadeia de eventos cibernéticos

Principais Funcionalidades e Casos de Uso Comuns do Apache Airflow

  • Definição de Fluxos de Trabalho Baseada em Python: Os fluxos de trabalho são definidos como código Python, permitindo a geração dinâmica de pipelines usando construções de programação padrão, como loops e lógica condicional.
  • Grafos Acíclicos Direcionados (DAGs): O Airflow usa DAGs para representar fluxos de trabalho, onde cada nó é uma tarefa e as arestas definem dependências. Essa estrutura garante que as tarefas sejam executadas em uma ordem especificada sem ciclos.
  • Interface de Usuário Robusta: O Airflow fornece uma interface web moderna para monitorar, agendar e gerenciar fluxos de trabalho, oferecendo visibilidade no status das tarefas e logs.
  • Integrações Extensas: Ele inclui muitos operadores e hooks embutidos para conectar-se a serviços de nuvem (AWS, GCP, Azure), bancos de dados e outras ferramentas, tornando-o altamente extensível.
  • Escalabilidade e Flexibilidade: O Airflow pode orquestrar fluxos de trabalho em grande escala, suportando implantações locais e em nuvem, e é adequado para uma ampla gama de casos de uso, desde ETL até machine learning.

Casos de Uso Comuns

  • Orquestração de pipelines ETL (Extração, Transformação, Carregamento)
  • Agendamento e monitoramento de fluxos de dados
  • Automatização do treinamento e implantação de modelos de machine learning
  • Gerenciamento de tarefas de infraestrutura

Serviços Gerenciados do Airflow

Vários provedores de nuvem oferecem serviços gerenciados do Airflow, reduzindo a carga operacional de configuração e manutenção:

  • Amazon Managed Workflows for Apache Airflow (MWAA): Um serviço totalmente gerenciado que lida com escalabilidade, segurança e disponibilidade, permitindo que os usuários se concentrem no desenvolvimento de fluxos de trabalho.
  • Google Cloud Composer: Airflow gerenciado no Google Cloud Platform.
  • Microsoft Fabric Data Factory: Oferece trabalhos do Airflow como uma solução de orquestração gerenciada dentro do ecossistema Azure.

Instalação e Começando

O Airflow pode ser instalado via gerenciador de pacotes do Python (pip install apache-airflow) ou ferramentas gerenciadas como Astro CLI, que simplificam a configuração e o gerenciamento de projetos. Após a instalação, os usuários definem DAGs em scripts Python e os gerenciam através da interface do Airflow.

Resumo

Funcionalidade Descrição
Definição de Fluxo Código Python, baseado em DAGs
Interface de Usuário Web-based, moderna e robusta
Integrações Nuvem (AWS, GCP, Azure), bancos de dados, plugins customizados
Serviços Gerenciados AWS MWAA, Google Cloud Composer, Microsoft Fabric
Casos de Uso ETL, pipelines de dados, fluxos de trabalho de ML, tarefas de infraestrutura

O Airflow é amplamente adotado na comunidade de engenharia de dados por sua flexibilidade, escalabilidade e ecossistema forte, tornando-o uma escolha líder para orquestração de fluxos de trabalho em ambientes de produção.

Principais Formas em que o Airflow Simplifica a Automatização de Fluxos de Trabalho

  • Fluxos de Trabalho como Código (Python)
    Os fluxos de trabalho do Airflow são definidos como scripts Python, usando Grafos Acíclicos Direcionados (DAGs) para representar tarefas e suas dependências. Essa abordagem de “fluxos de trabalho como código” permite a geração dinâmica, parametrização e controle de versão fácil de pipelines, tornando-os altamente mantíveis e adaptáveis a requisitos em mudança.

  • Pipelines Dinâmicos e Extensíveis
    Ao aproveitar plenamente as capacidades do Python, os usuários podem incorporar loops, condicionais e lógica personalizada diretamente nas definições de fluxo de trabalho. Isso permite a criação dinâmica de tarefas e parametrização avançada que seriam difíceis ou impossíveis em arquivos de configuração estáticos ou ferramentas baseadas em interface gráfica.

  • Gerenciamento de Tarefas Pythonico
    A API TaskFlow do Airflow (introduzida no Airflow 2.0) torna a definição de tarefas ainda mais Pythonica. Os desenvolvedores escrevem funções Python comuns, decoram-as e o Airflow automaticamente lida com a criação de tarefas, conexão de dependências e passagem de dados entre tarefas, resultando em código mais limpo e mantível.

  • Operadores e Integrações Personalizados
    Os usuários podem criar operadores, sensores e hooks personalizados em Python para interagir com praticamente qualquer sistema externo, API ou banco de dados. Essa extensibilidade permite uma integração sem problemas com o ecossistema Python mais amplo e serviços externos.

  • Integração Nativa com o Ecossistema Python
    Como os fluxos de trabalho são escritos em Python, os usuários podem aproveitar a vasta gama de bibliotecas Python (como Pandas, NumPy ou frameworks de machine learning) dentro das tarefas, aprimorando ainda mais as capacidades de automação.

  • Legibilidade e Manutenibilidade
    A legibilidade do Python e sua popularidade tornam o Airflow acessível a uma ampla gama de usuários, desde engenheiros de dados até analistas. A abordagem baseada em código também apoia práticas padrão de engenharia de software, como revisão de código e controle de versão.

Alguns Benefícios do Apache Airflow

Funcionalidade Benefício para a Automatização de Fluxos de Trabalho
DAGs baseados em Python Fluxos de trabalho dinâmicos, flexíveis e mantíveis
API TaskFlow Definições de fluxos de trabalho mais limpas e Pythonicas
Operadores/Sensores Personalizados Integração com qualquer sistema ou API
Integração nativa com Python Uso de qualquer biblioteca ou ferramenta Python
Interface de Usuário Robusta Monitoramento e gerenciamento visual de fluxos de trabalho

A profunda integração do Airflow com o Python não só simplifica a automação de fluxos de trabalho, mas também empodera equipes a construir pipelines de dados robustos, escaláveis e altamente personalizados de forma eficiente.

Exemplo: Fluxo de Trabalho ETL Simples em Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extraindo dados")

def transform_data():
    print("Transformando dados")

def load_data():
    print("Carregando dados")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

Este exemplo mostra como cada etapa de um pipeline ETL é definida como uma função Python e orquestrada pelo Airflow usando operadores Python e DAGs: extract_task >> transform_task >> load_task

Outro exemplo de DAG do Apache Airflow

Aqui está um exemplo básico de codificação de uma DAG (Grafo Acíclico Direcionado) no Apache Airflow usando Python. Este exemplo demonstra como definir um fluxo de trabalho com duas tarefas simples usando o BashOperator, que executa comandos bash:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='Meu primeiro DAG do Airflow!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Tarefa 1: Imprimir a data atual
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Tarefa 2: Dizer "Olá"
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Olá, Airflow!"'
    )

    # Definir dependências de tarefa
    print_date >> say_hello

Como isso funciona:

  • O DAG é definido usando um gerenciador de contexto (with DAG(...) as dag:).
  • Duas tarefas são criadas: print_date e say_hello.
  • A dependência print_date >> say_hello garante que say_hello execute apenas após print_date ser concluída.
  • Salve este código como my_first_dag.py no diretório dags/ do seu Airflow, e o Airflow detectará e agendará automaticamente.

Você pode expandir este modelo adicionando tarefas com PythonOperator, ramificação ou lógica mais complexa conforme seus fluxos de trabalho crescem.

Parâmetros Principais ao Criar um Objeto DAG em Python

Ao definir um DAG no Apache Airflow, vários parâmetros importantes devem ser configurados para garantir o agendamento adequado, identificação e comportamento do seu fluxo de trabalho.

Parâmetros Essenciais:

  • dag_id
    O identificador único para o seu DAG. Cada DAG no seu ambiente do Airflow deve ter um dag_id distinto.

  • start_date
    A data e hora em que o DAG se torna elegível para execução. Isso determina quando o agendamento começa para o DAG.

  • schedule (ou schedule_interval)
    Define com que frequência o DAG deve ser executado (ex: "@daily", "0 12 * * *", ou None para execuções manuais).

  • catchup
    Um valor booleano que determina se o Airflow deve preencher execuções perdidas entre a start_date e a data atual quando o DAG for ativado pela primeira vez. O padrão é False.

Outros Parâmetros Comuns:

  • default_args
    Um dicionário de argumentos padrão aplicados a todas as tarefas dentro do DAG (ex: owner, email, retries, retry_delay). É opcional, mas recomendado para código DRY (Don’t Repeat Yourself).

  • params
    Um dicionário para parâmetros de configuração em tempo de execução, permitindo que você passe valores para tarefas em tempo de execução e tornando seus DAGs mais flexíveis.

Exemplo:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Definir tarefas aqui
    pass

Tabela de Parâmetros de DAG

Parâmetro Descrição Obrigatório
dag_id Nome único para o DAG Sim
start_date Quando o DAG se torna elegível para execução Sim
schedule Frequência/timing para execuções do DAG Sim
catchup Se deve preencher execuções perdidas Não
default_args Argumentos padrão para todas as tarefas no DAG Não
params Parâmetros de configuração em tempo de execução Não

Definir esses parâmetros garante que seu DAG seja identificado de forma única, agendado corretamente e comportar-se conforme o esperado no Airflow.

Incluindo Parâmetros de Tempo de Execução Personalizados Usando o Argumento params no Airflow

O argumento params do Apache Airflow permite injetar parâmetros de tempo de execução personalizados em seus DAGs e tarefas, permitindo configurações de fluxo de trabalho dinâmicas e flexíveis. Os parâmetros permitem que você forneça configurações de tempo de execução para tarefas. Você pode configurar parâmetros padrão em seu código de DAG e fornecer parâmetros adicionais ou substituir valores de parâmetro ao disparar uma execução de DAG.

Essa abordagem torna seus fluxos de trabalho do Airflow mais dinâmicos e configuráveis, suportando uma ampla variedade de cenários de automação.

Como Definir Parâmetros de Tempo de Execução Personalizados

  • Definir params no DAG:
    Ao criar um objeto DAG, inclua o argumento params como um dicionário. Cada par de chave-valor representa o nome de um parâmetro e seu valor padrão.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Acessar o parâmetro via kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"Meu parâmetro personalizado é: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Parâmetro de tempo de execução personalizado
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • Substituir em Tempo de Execução:
    Ao disparar uma execução de DAG manualmente (via interface do Airflow, CLI ou API), você pode fornecer ou substituir valores de params para essa execução específica.

Acessando Parâmetros em Tarefas

  • Em sua função Python de tarefa, acesse os parâmetros usando kwargs['params'].
  • Para campos de modelo (como no BashOperator), use {{ params.my_param }} na string do modelo.