Back

Trigger e sensori in Apache Airflow

In Apache Airflow, i Trigger e i Sensori sono strumenti potenti per monitorare e reagire a eventi esterni o condizioni specifiche. I sensori attendono che una condizione venga soddisfatta prima di eseguire i task, mentre i trigger permettono di avviare task o DAG in risposta a determinati eventi. In questo articolo vedremo come configurare i sensori e i trigger, con esempi pratici di utilizzo.

Cosa sono i sensori in Apache Airflow?

I Sensori in Apache Airflow sono operatori speciali che controllano continuamente una condizione fino a quando non viene soddisfatta. Possono monitorare file, tabelle di database, API e molto altro. Ad esempio, un sensore può attendere che un file venga caricato su un sistema di file o che un task in un altro DAG venga completato.

Esempio di utilizzo del FileSensor

Il FileSensor è un sensore che attende l’esistenza di un file in una directory specifica. Ecco un esempio di come utilizzarlo:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 11, 1),
}

dag = DAG('file_sensor_example', default_args=default_args, schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)

# FileSensor attende l'esistenza di un file specifico
file_sensor = FileSensor(
    task_id='check_for_file',
    filepath='/path/to/file.csv',  # Sostituisci con il percorso corretto
    poke_interval=30,  # Intervallo di controllo in secondi
    timeout=600,  # Timeout dopo 10 minuti
    dag=dag
)

end = DummyOperator(task_id='end', dag=dag)

start >> file_sensor >> end

In questo esempio, il FileSensor attende l’esistenza di un file chiamato file.csv nel percorso specificato. L’intervallo di controllo è impostato a 30 secondi, e se il file non viene trovato entro 10 minuti, il sensore fallisce.

Cosa sono i trigger in Apache Airflow?

I Trigger in Apache Airflow permettono di avviare task o DAG in risposta a determinati eventi o condizioni. Possono essere utilizzati per eseguire azioni automatiche al verificarsi di un evento specifico, come l’arrivo di un file o il completamento di un processo in un altro sistema.

Esempio di utilizzo di un TriggerDagRunOperator

Il TriggerDagRunOperator è un operatore che avvia un altro DAG in risposta al completamento di un task o di un DAG. Questo può essere utile quando un flusso di lavoro è suddiviso in più DAG. Ecco un esempio di utilizzo:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 11, 1),
}

dag = DAG('trigger_dagrun_example', default_args=default_args, schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)

# TriggerDagRunOperator avvia un altro DAG
trigger = TriggerDagRunOperator(
    task_id='trigger_another_dag',
    trigger_dag_id='another_dag',  # ID del DAG da avviare
    dag=dag
)

end = DummyOperator(task_id='end', dag=dag)

start >> trigger >> end

In questo esempio, il TriggerDagRunOperator avvia un altro DAG chiamato another_dag dopo il completamento del task start. Questo meccanismo è utile per orchestrare flussi di lavoro che dipendono dall’esecuzione di più DAG separati.

Utilizzare il TimeSensor per monitorare l’orario

Il TimeSensor è un sensore che attende il raggiungimento di un’ora specifica. È utile quando si vuole eseguire un task solo dopo un certo orario del giorno. Ecco un esempio:

from airflow import DAG
from airflow.sensors.time_sensor import TimeSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 11, 1),
}

dag = DAG('time_sensor_example', default_args=default_args, schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)

# TimeSensor attende fino alle 10:00 AM
time_sensor = TimeSensor(
    task_id='wait_for_time',
    target_time='10:00:00',  # Ora specificata (10:00 AM)
    dag=dag
)

end = DummyOperator(task_id='end', dag=dag)

start >> time_sensor >> end

In questo esempio, il TimeSensor attende fino alle 10:00 del mattino prima di permettere al DAG di continuare con l’esecuzione del task successivo.

Esempio di sensori in Apache Airflow

Conclusione

In Apache Airflow, i Trigger e i Sensori permettono di monitorare eventi e condizioni esterne, avviando task o DAG solo quando necessario. Utilizzando strumenti come il FileSensor o il TriggerDagRunOperator, puoi orchestrare flussi di lavoro complessi che reagiscono dinamicamente ai cambiamenti. Ora che hai appreso come configurare sensori e trigger, sei pronto per implementare flussi di lavoro automatizzati e reattivi.

Fonte: Documentazione ufficiale di Apache Airflow

Per altri tutorial, visita la sezione: Tutti gli articoli