Apache Airflow para MLOPS e ETL - Descrição, Benefícios e Exemplos
Ótimo framework para ETS/MLOPS com Python
Apache Airflow é uma plataforma de código aberto projetada para criar, agendar e monitorar fluxos de trabalho de forma programática, inteiramente em código Python, oferecendo uma alternativa flexível e poderosa às ferramentas de fluxo de trabalho tradicionais, manuais ou baseadas em interface gráfica.
O Apache Airflow foi originalmente desenvolvido no Airbnb em 2014 para gerenciar fluxos de trabalho cada vez mais complexos e tornou-se um projeto de nível superior da Apache Software Foundation em 2019.
O Airflow é mais benéfico quando você possui fluxos de trabalho complexos com múltiplas tarefas que precisam ser executadas em uma ordem específica, com dependências e tratamento de erros. É particularmente útil para organizações que executam muitos trabalhos de dados que requerem orquestração, monitoramento e mecanismos de repetição, em vez de scripts simples ou tarefas cron.

Principais Recursos e Casos de Uso Típicos do Apache Airflow
- Definição de Fluxos de Trabalho Baseada em Python: Os fluxos de trabalho são definidos como código Python, permitindo a geração dinâmica de pipelines usando construções de programação padrão, como loops e lógica condicional.
- Grafos Acíclicos Direcionados (DAGs): O Airflow utiliza DAGs para representar fluxos de trabalho, onde cada nó é uma tarefa e as arestas definem as dependências. Essa estrutura garante que as tarefas sejam executadas em uma ordem especificada sem ciclos.
- Interface Robusta (UI): O Airflow fornece uma interface web moderna para monitoramento, agendamento e gerenciamento de fluxos de trabalho, oferecendo visibilidade do status das tarefas e logs.
- Integrações Extensas: Inclui muitos operadores e hooks integrados para conectar-se a serviços em nuvem (AWS, GCP, Azure), bancos de dados e outras ferramentas, tornando-o altamente extensível.
- Escalabilidade e Flexibilidade: O Airflow pode orquestrar fluxos de trabalho em escala, suportando implantações locais e em nuvem, sendo adequado para uma ampla variedade de casos de uso, desde ETL até aprendizado de máquina.
Casos de Uso Típicos
- Orquestração de pipelines ETL (Extract, Transform, Load)
- Agendamento e monitoramento de fluxos de trabalho de dados
- Automação do treinamento e implantação de modelos de aprendizado de máquina
- Gerenciamento de tarefas de infraestrutura
Serviços Gerenciados de Airflow
Vários provedores de nuvem oferecem serviços gerenciados de Airflow, reduzindo o ônus operacional de configuração e manutenção:
- Amazon Managed Workflows for Apache Airflow (MWAA): Um serviço totalmente gerenciado que gerencia escalabilidade, segurança e disponibilidade, permitindo que os usuários se concentrem no desenvolvimento de fluxos de trabalho.
- Google Cloud Composer: Airflow gerenciado na Google Cloud Platform.
- Microsoft Fabric Data Factory: Oferece trabalhos do Apache Airflow como uma solução de orquestração gerenciada dentro do ecossistema Azure.
Instalação e Primeiros Passos
O Airflow pode ser instalado via gerenciador de pacotes do Python (pip install apache-airflow) ou ferramentas gerenciadas como o Astro CLI, que simplificam a configuração e o gerenciamento de projetos. Após a instalação, os usuários definem DAGs em scripts Python e os gerenciam através da interface do Airflow.
Resumo
| Recurso | Descrição |
|---|---|
| Definição de Fluxo de Trabalho | Código Python, baseado em DAG |
| Interface (UI) | Baseada na web, moderna e robusta |
| Integrações | Nuvem (AWS, GCP, Azure), bancos de dados, plugins personalizados |
| Serviços Gerenciados | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
| Casos de Uso | ETL, pipelines de dados, fluxos de ML, tarefas de infraestrutura |
O Airflow é amplamente adotado na comunidade de engenharia de dados por sua flexibilidade, escalabilidade e ecossistema robusto, tornando-o uma escolha líder para orquestração de fluxos de trabalho em ambientes de produção.
Principais Formas de Como o Airflow Simplifica a Automação de Fluxos de Trabalho
-
Fluxos de Trabalho como Código (Python)
Os fluxos de trabalho do Airflow são definidos como scripts Python, usando Grafos Acíclicos Direcionados (DAGs) para representar tarefas e suas dependências. Essa abordagem de “fluxos de trabalho como código” permite geração dinâmica, parametrização e controle de versão fácil de pipelines, tornando-os altamente mantíveis e adaptáveis a requisitos em mudança. -
Pipelines Dinâmicos e Extensíveis
Aproveitando a capacidade total do Python, os usuários podem incorporar loops, condicionais e lógica personalizada diretamente em suas definições de fluxo de trabalho. Isso permite a criação dinâmica de tarefas e parametrização avançada que seria difícil ou impossível em arquivos de configuração estáticos ou ferramentas baseadas em GUI. -
Gerenciamento de Tarefas Pythonico
A API TaskFlow do Airflow (introduzida no Airflow 2.0) torna a definição de tarefas ainda mais “Pythonica”. Os desenvolvedores escrevem funções Python puras, as decoram, e o Airflow gerencia automaticamente a criação de tarefas, o cabeamento de dependências e a passagem de dados entre tarefas, resultando em código mais limpo e mantível. -
Operadores e Integrações Personalizadas
Os usuários podem criar operadores, sensores e hooks Python personalizados para interagir com virtualmente qualquer sistema externo, API ou banco de dados. Essa extensibilidade permite integração perfeita com o ecossistema Python mais amplo e serviços externos. -
Integração Nativa com o Ecossistema Python
Como os fluxos de trabalho são escritos em Python, os usuários podem aproveitar a vasta gama de bibliotecas Python (como Pandas, NumPy ou frameworks de aprendizado de máquina) dentro de suas tarefas, aumentando ainda mais as capacidades de automação. -
Legível e Mantível
A legibilidade e popularidade do Python tornam o Airflow acessível a uma ampla gama de usuários, desde engenheiros de dados até analistas. A abordagem baseada em código também suporta práticas padrão de engenharia de software, como revisões de código e controle de versão.
Alguns Benefícios do Apache Airflow
| Recurso | Benefício para Automação de Fluxos de Trabalho |
|---|---|
| DAGs baseados em Python | Fluxos de trabalho dinâmicos, flexíveis e mantíveis |
| API TaskFlow | Definições de fluxo de trabalho mais limpas e Pythonicas |
| Operadores/Sensores Personalizados | Integração com qualquer sistema ou API |
| Integração Python nativa | Use qualquer biblioteca ou ferramenta Python |
| Interface Robusta | Monitore e gerencie fluxos de trabalho visualmente |
A integração profunda do Airflow com o Python não apenas simplifica a automação de fluxos de trabalho, mas também capacita equipes a construir pipelines de dados robustos, escaláveis e altamente personalizados de forma eficiente.
Exemplo: Fluxo de Trabalho ETL Simples em Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("Extracting data")
def transform_data():
print("Transforming data")
def load_data():
print("Loading data")
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
extract_task >> transform_task >> load_task
Este exemplo mostra como cada etapa de um pipeline ETL é definida como uma função Python e orquestrada pelo Airflow usando operadores Python e DAGs:
extract_task >> transform_task >> load_task
Outro exemplo de DAG do Apache Airflow
Aqui está um exemplo básico de codificação de um DAG (Grafo Acíclico Direcionado) no Apache Airflow usando Python. Este exemplo demonstra como definir um fluxo de trabalho com duas tarefas simples usando o BashOperator, que executa comandos bash:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='My first Airflow DAG!',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# Task 1: Print the current date
print_date = BashOperator(
task_id='print_date',
bash_command='date'
)
# Task 2: Say Hello
say_hello = BashOperator(
task_id='say_hello',
bash_command='echo "Hello, Airflow!"'
)
# Define task dependencies
print_date >> say_hello
Como isso funciona:
- O DAG é definido usando um gerenciador de contexto (
with DAG(...) as dag:). - Duas tarefas são criadas:
print_dateesay_hello. - A dependência
print_date >> say_hellogarante quesay_helloseja executado apenas apósprint_dateser concluído. - Salve este código como
my_first_dag.pyno diretóriodags/do seu Airflow e o Airflow detectará e agendará automaticamente.
Você pode expandir este modelo adicionando tarefas PythonOperator, ramificações ou lógica mais complexa conforme seus fluxos de trabalho crescem.
Parâmetros Chave ao Criar um Objeto DAG em Python
Ao definir um DAG no Apache Airflow, vários parâmetros chave devem ser definidos para garantir o agendamento adequado, identificação e comportamento do seu fluxo de trabalho.
Parâmetros Essenciais:
-
dag_id
O identificador exclusivo para o seu DAG. Cada DAG no seu ambiente Airflow deve ter umdag_iddistinto. -
start_date
A data e hora em que o DAG se torna elegível para execução. Isso determina quando o agendamento começa para o DAG. -
schedule (ou
schedule_interval)
Define com que frequência o DAG deve ser executado (ex:"@daily","0 12 * * *", ouNonepara execuções manuais). -
catchup
Um booleano que determina se o Airflow deve preencher execuções perdidas entre astart_datee a data atual quando o DAG é habilitado pela primeira vez. O padrão éFalse.
Outros Parâmetros Comuns:
-
default_args
Um dicionário de argumentos padrão aplicados a todas as tarefas dentro do DAG (ex:owner,email,retries,retry_delay). Isso é opcional, mas recomendado para código DRY (Don’t Repeat Yourself). -
params
Um dicionário para parâmetros de configuração em tempo de execução, permitindo que você passe valores para tarefas em tempo de execução e torne seus DAGs mais flexíveis.
Exemplo:
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'jdoe',
'email': ['jdoe@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='example_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
default_args=default_args,
params={'example_param': 'value'}
) as dag:
# Define tasks here
pass
Tabela de parâmetros do DAG
| Parâmetro | Descrição | Obrigatório |
|---|---|---|
| dag_id | Nome exclusivo para o DAG | Sim |
| start_date | Quando o DAG é elegível para começar a rodar | Sim |
| schedule | Frequência/tempo para execuções do DAG | Sim |
| catchup | Se deve preencher execuções perdidas | Não |
| default_args | Argumentos padrão para todas as tarefas no DAG | Não |
| params | Parâmetros de tempo de execução para configuração dinâmica | Não |
Definir esses parâmetros garante que seu DAG seja identificado de forma única, agendado corretamente e se comporte conforme o esperado no Airflow.
Incluindo Parâmetros de Tempo de Execução Personalizados Usando o Argumento params no Airflow
O argumento params do Apache Airflow permite que você injete parâmetros de tempo de execução personalizados em seus DAGs e tarefas, permitindo configurações de fluxo de trabalho dinâmicas e flexíveis. Params permitem que você forneça configuração de tempo de execução para tarefas. Você pode configurar Params padrão no código do seu DAG e fornecer Params adicionais, ou sobrescrever valores de Params ao acionar uma execução de DAG.
Essa abordagem torna seus fluxos de trabalho do Airflow mais dinâmicos e configuráveis, suportando uma ampla variedade de cenários de automação.
Como Definir Parâmetros de Tempo de Execução Personalizados
- Definir
paramsno DAG:
Ao criar um objeto DAG, inclua o argumentoparamscomo um dicionário. Cada par chave-valor representa um nome de parâmetro e seu valor padrão.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_param(**kwargs):
# Access the parameter via kwargs['params']
my_param = kwargs['params']['my_param']
print(f"My custom parameter is: {my_param}")
with DAG(
dag_id='custom_params_example',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
params={'my_param': 'default_value'} # Custom runtime parameter
) as dag:
task = PythonOperator(
task_id='print_param_task',
python_callable=print_param,
provide_context=True
)
- Sobrescrever em Tempo de Execução:
Ao acionar uma execução de DAG manualmente (via interface do Airflow, CLI ou API), você pode fornecer ou sobrescrever valores deparamspara essa execução específica.
Acessando Parâmetros em Tarefas
- No callable Python da sua tarefa, acesse os params usando
kwargs['params']. - Para campos modelados (como em BashOperator), use
{{ params.my_param }}na string do modelo.
Links Úteis
- https://airflow.apache.org
- https://github.com/apache/airflow
- https://en.wikipedia.org/wiki/Apache_Airflow
- uv - Novo Gerenciador de Pacotes, Projetos e Ambientes Python
- Python Cheatsheet
- AWS lambda performance: JavaScript vs Python vs Golang
- AWS SAM + AWS SQS + Python PowerTools
- venv Cheatsheet
- Gerando PDF em Python - Bibliotecas e exemplos