Trigger e sensori in Apache Airflow
In Apache Airflow, i Trigger e i Sensori sono strumenti potenti per monitorare e reagire a eventi esterni o condizioni specifiche. I sensori attendono che una condizione venga soddisfatta prima di eseguire i task, mentre i trigger permettono di avviare task o DAG in risposta a determinati eventi. In questo articolo vedremo come configurare i sensori e i trigger, con esempi pratici di utilizzo.
Cosa sono i sensori in Apache Airflow?
I Sensori in Apache Airflow sono operatori speciali che controllano continuamente una condizione fino a quando non viene soddisfatta. Possono monitorare file, tabelle di database, API e molto altro. Ad esempio, un sensore può attendere che un file venga caricato su un sistema di file o che un task in un altro DAG venga completato.
Esempio di utilizzo del FileSensor
Il FileSensor è un sensore che attende l’esistenza di un file in una directory specifica. Ecco un esempio di come utilizzarlo:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 11, 1),
}
dag = DAG('file_sensor_example', default_args=default_args, schedule_interval='@daily')
start = DummyOperator(task_id='start', dag=dag)
# FileSensor attende l'esistenza di un file specifico
file_sensor = FileSensor(
task_id='check_for_file',
filepath='/path/to/file.csv', # Sostituisci con il percorso corretto
poke_interval=30, # Intervallo di controllo in secondi
timeout=600, # Timeout dopo 10 minuti
dag=dag
)
end = DummyOperator(task_id='end', dag=dag)
start >> file_sensor >> end
In questo esempio, il FileSensor attende l’esistenza di un file chiamato file.csv nel percorso specificato. L’intervallo di controllo è impostato a 30 secondi, e se il file non viene trovato entro 10 minuti, il sensore fallisce.
Cosa sono i trigger in Apache Airflow?
I Trigger in Apache Airflow permettono di avviare task o DAG in risposta a determinati eventi o condizioni. Possono essere utilizzati per eseguire azioni automatiche al verificarsi di un evento specifico, come l’arrivo di un file o il completamento di un processo in un altro sistema.
Esempio di utilizzo di un TriggerDagRunOperator
Il TriggerDagRunOperator è un operatore che avvia un altro DAG in risposta al completamento di un task o di un DAG. Questo può essere utile quando un flusso di lavoro è suddiviso in più DAG. Ecco un esempio di utilizzo:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 11, 1),
}
dag = DAG('trigger_dagrun_example', default_args=default_args, schedule_interval='@daily')
start = DummyOperator(task_id='start', dag=dag)
# TriggerDagRunOperator avvia un altro DAG
trigger = TriggerDagRunOperator(
task_id='trigger_another_dag',
trigger_dag_id='another_dag', # ID del DAG da avviare
dag=dag
)
end = DummyOperator(task_id='end', dag=dag)
start >> trigger >> end
In questo esempio, il TriggerDagRunOperator avvia un altro DAG chiamato another_dag dopo il completamento del task start. Questo meccanismo è utile per orchestrare flussi di lavoro che dipendono dall’esecuzione di più DAG separati.
Utilizzare il TimeSensor per monitorare l’orario
Il TimeSensor è un sensore che attende il raggiungimento di un’ora specifica. È utile quando si vuole eseguire un task solo dopo un certo orario del giorno. Ecco un esempio:
from airflow import DAG
from airflow.sensors.time_sensor import TimeSensor
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 11, 1),
}
dag = DAG('time_sensor_example', default_args=default_args, schedule_interval='@daily')
start = DummyOperator(task_id='start', dag=dag)
# TimeSensor attende fino alle 10:00 AM
time_sensor = TimeSensor(
task_id='wait_for_time',
target_time='10:00:00', # Ora specificata (10:00 AM)
dag=dag
)
end = DummyOperator(task_id='end', dag=dag)
start >> time_sensor >> end
In questo esempio, il TimeSensor attende fino alle 10:00 del mattino prima di permettere al DAG di continuare con l’esecuzione del task successivo.
Conclusione
In Apache Airflow, i Trigger e i Sensori permettono di monitorare eventi e condizioni esterne, avviando task o DAG solo quando necessario. Utilizzando strumenti come il FileSensor o il TriggerDagRunOperator, puoi orchestrare flussi di lavoro complessi che reagiscono dinamicamente ai cambiamenti. Ora che hai appreso come configurare sensori e trigger, sei pronto per implementare flussi di lavoro automatizzati e reattivi.
Fonte: Documentazione ufficiale di Apache Airflow
Per altri tutorial, visita la sezione: Tutti gli articoli
Tag:ETL


