Uno dei punti di forza di Apache Airflow è la capacità di eseguire task in parallelo e gestire eventuali fallimenti. L'esecuzione parallela permette di accelerare i flussi di lavoro, mentre una gestione robusta dei fallimenti garantisce affidabilità in produzione.
Esecuzione parallela
In Airflow, i task vengono eseguiti in parallelo quando non hanno dipendenze dirette tra loro. La configurazione del parallelismo avviene a diversi livelli:
Configurazione globale
[core]
parallelism = 32 # task paralleli totali
max_active_tasks_per_dag = 16 # task paralleli per DAG
max_active_runs_per_dag = 16 # esecuzioni parallele per DAG
Configurazione per DAG
dag = DAG(
'my_pipeline',
max_active_tasks=8,
max_active_runs=3,
concurrency=8,
)
Executor Types
La scelta dell'executor determina come vengono eseguiti i task paralleli:
- SequentialExecutor: un task alla volta (solo per sviluppo)
- LocalExecutor: più task in parallelo sulla stessa macchina
- CeleryExecutor: distribuzione su più worker via message broker
- KubernetesExecutor: un pod per ogni task
Gestione dei task falliti
Retry automatici
default_args = {
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=30),
}
Callback per i fallimenti
def on_failure(context):
task_id = context['task_instance'].task_id
dag_id = context['dag'].dag_id
# Invia notifica Slack, email, ecc.
send_alert(f"Task {task_id} in DAG {dag_id} fallito!")
default_args = {
'on_failure_callback': on_failure,
'on_retry_callback': on_retry,
'on_success_callback': on_success,
}
Pool per il controllo delle risorse
I Pool permettono di limitare il numero di task concorrenti che accedono a una risorsa condivisa:
# Crea un pool con 5 slot
# Admin > Pools > Aggiungi
task = PythonOperator(
task_id='query_db',
pool='database_pool', # max 5 query concorrenti
pool_slots=1,
...
)
Priority Weight
Quando i task competono per le risorse, il priority_weight determina l'ordine di esecuzione:
critical_task = PythonOperator(
task_id='critical',
priority_weight=10, # priorità alta
...
)
optional_task = PythonOperator(
task_id='optional',
priority_weight=1, # priorità bassa
...
)
Best Practice
- Configura sempre i retry con backoff esponenziale
- Usa i Pool per proteggere risorse condivise (database, API esterne)
- Implementa callback per notifiche in caso di fallimento
- Monitora il parallelismo per ottimizzare l'utilizzo delle risorse
- Testa i flussi di lavoro con task che falliscono intenzionalmente
Una gestione efficace del parallelismo e dei fallimenti è fondamentale per pipeline di dati affidabili e performanti.