Vai al contenuto principale
BlogEsecuzione parallela e gestione dei task falliti in Apache Airflow
Apache AirflowApache per i Big Data

Esecuzione parallela e gestione dei task falliti in Apache Airflow

4 Nov 20245 min lettura
SC

Stanislao Corvino

Dottore Commercialista · Data Evangelist

Uno dei punti di forza di Apache Airflow è la capacità di eseguire task in parallelo e gestire eventuali fallimenti. L'esecuzione parallela permette di accelerare i flussi di lavoro, mentre una gestione robusta dei fallimenti garantisce affidabilità in produzione.

Esecuzione parallela

In Airflow, i task vengono eseguiti in parallelo quando non hanno dipendenze dirette tra loro. La configurazione del parallelismo avviene a diversi livelli:

Configurazione globale

[core]
parallelism = 32          # task paralleli totali
max_active_tasks_per_dag = 16  # task paralleli per DAG
max_active_runs_per_dag = 16   # esecuzioni parallele per DAG

Configurazione per DAG

dag = DAG(
    'my_pipeline',
    max_active_tasks=8,
    max_active_runs=3,
    concurrency=8,
)

Executor Types

La scelta dell'executor determina come vengono eseguiti i task paralleli:

  • SequentialExecutor: un task alla volta (solo per sviluppo)
  • LocalExecutor: più task in parallelo sulla stessa macchina
  • CeleryExecutor: distribuzione su più worker via message broker
  • KubernetesExecutor: un pod per ogni task

Gestione dei task falliti

Retry automatici

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

Callback per i fallimenti

def on_failure(context):
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    # Invia notifica Slack, email, ecc.
    send_alert(f"Task {task_id} in DAG {dag_id} fallito!")

default_args = {
    'on_failure_callback': on_failure,
    'on_retry_callback': on_retry,
    'on_success_callback': on_success,
}

Pool per il controllo delle risorse

I Pool permettono di limitare il numero di task concorrenti che accedono a una risorsa condivisa:

# Crea un pool con 5 slot
# Admin > Pools > Aggiungi

task = PythonOperator(
    task_id='query_db',
    pool='database_pool',  # max 5 query concorrenti
    pool_slots=1,
    ...
)

Priority Weight

Quando i task competono per le risorse, il priority_weight determina l'ordine di esecuzione:

critical_task = PythonOperator(
    task_id='critical',
    priority_weight=10,  # priorità alta
    ...
)

optional_task = PythonOperator(
    task_id='optional',
    priority_weight=1,   # priorità bassa
    ...
)

Best Practice

  • Configura sempre i retry con backoff esponenziale
  • Usa i Pool per proteggere risorse condivise (database, API esterne)
  • Implementa callback per notifiche in caso di fallimento
  • Monitora il parallelismo per ottimizzare l'utilizzo delle risorse
  • Testa i flussi di lavoro con task che falliscono intenzionalmente

Una gestione efficace del parallelismo e dei fallimenti è fondamentale per pipeline di dati affidabili e performanti.

Tag:
Apache AirflowParallel ExecutionError Handling

Articoli correlati