Back

Creare task con dipendenze complesse in Apache Airflow

In Apache Airflow, i task con dipendenze complesse sono fondamentali per la gestione di flussi di lavoro avanzati. La possibilità di configurare dipendenze complesse tra i task permette di definire sequenze di esecuzione precise, coordinando task paralleli, task condizionali e dipendenze di task multipli. In questo articolo vedremo come creare task con dipendenze complesse e alcuni esempi pratici.

Configurare dipendenze tra task in Apache Airflow

In Apache Airflow, le dipendenze tra task sono configurate utilizzando operatori come >> e <<, che stabiliscono l’ordine di esecuzione tra i task. Per task con dipendenze complesse, è possibile utilizzare operatori condizionali, task paralleli e gestione di task multipli per creare flussi di lavoro sofisticati.

Creare task paralleli

Quando i task non dipendono l’uno dall’altro, possono essere eseguiti in parallelo. Questo può migliorare l’efficienza dei flussi di lavoro, riducendo il tempo totale di esecuzione. Ecco un esempio di configurazione di task paralleli:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 11, 1),
}

dag = DAG('parallel_task_example', default_args=default_args, schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)

task_1 = DummyOperator(task_id='task_1', dag=dag)
task_2 = DummyOperator(task_id='task_2', dag=dag)
task_3 = DummyOperator(task_id='task_3', dag=dag)

end = DummyOperator(task_id='end', dag=dag)

# Configura i task paralleli
start >> [task_1, task_2, task_3] >> end

In questo esempio, i task task_1, task_2 e task_3 vengono eseguiti in parallelo dopo il task start e prima del task end.

Creare task condizionali

Airflow consente di creare task condizionali che vengono eseguiti solo se una certa condizione è soddisfatta. Per fare questo, puoi utilizzare l’operatore BranchPythonOperator, che permette di biforcare il flusso di lavoro in base al risultato di una funzione Python.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 11, 1),
}

dag = DAG('conditional_task_example', default_args=default_args, schedule_interval='@daily')

def choose_branch(**kwargs):
    return 'task_true' if datetime.now().hour < 12 else 'task_false'

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,
    dag=dag
)

task_true = DummyOperator(task_id='task_true', dag=dag)
task_false = DummyOperator(task_id='task_false', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

branch_task >> [task_true, task_false] >> end

In questo esempio, il task branch_task sceglie tra due task (task_true e task_false) in base all’ora corrente. Se è prima di mezzogiorno, esegue task_true; altrimenti, esegue task_false.

Gestire task con dipendenze multiple

In alcuni casi, un task deve essere eseguito solo dopo il completamento di più task precedenti. Airflow consente di definire task con dipendenze multiple utilizzando una lista di task come input. Ecco un esempio di configurazione:

dag = DAG('multiple_dependencies_example', default_args=default_args, schedule_interval='@daily')

task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)

task_d = DummyOperator(task_id='task_d', dag=dag)

# task_d dipende da task_a, task_b e task_c
[task_a, task_b, task_c] >> task_d

In questo esempio, il task task_d viene eseguito solo dopo il completamento dei task task_a, task_b e task_c. Questo approccio semplifica la gestione delle dipendenze complesse all’interno del DAG.

Utilizzare SubDAG per organizzare task complessi

Per gestire task complessi, puoi utilizzare i SubDAG, che permettono di suddividere il DAG principale in sezioni organizzate. I SubDAG sono particolarmente utili per raggruppare task con dipendenze multiple e mantenere il codice ordinato.

from airflow.operators.subdag_operator import SubDagOperator

def subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(dag_id=f'{parent_dag_name}.{child_dag_name}', default_args=args, schedule_interval="@daily")
    with dag_subdag:
        task_1 = DummyOperator(task_id='task_1', dag=dag_subdag)
        task_2 = DummyOperator(task_id='task_2', dag=dag_subdag)
        task_1 >> task_2
    return dag_subdag

main_dag = DAG('main_dag', default_args=default_args, schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=main_dag)
subdag_task = SubDagOperator(
    task_id='section_1',
    subdag=subdag('main_dag', 'section_1', default_args),
    dag=main_dag,
)
end = DummyOperator(task_id='end', dag=main_dag)

start >> subdag_task >> end

In questo esempio, il SubDagOperator definisce un SubDAG all’interno del DAG principale, facilitando la gestione di task complessi con dipendenze specifiche.

Conclusione

In Apache Airflow, creare task con dipendenze complesse permette di orchestrare flussi di lavoro avanzati e su misura per le esigenze specifiche dei processi. Utilizzando task paralleli, condizionali, dipendenze multiple e SubDAG, puoi configurare workflow dinamici e flessibili. Ora che hai appreso come gestire le dipendenze complesse in Airflow, puoi costruire flussi di lavoro più robusti e personalizzati.

Fonte: Documentazione ufficiale di Apache Airflow

Per altri tutorial, visita la sezione: Tutti gli articoli