In Apache Airflow, i DAG dinamici offrono la flessibilità di creare flussi di lavoro che si adattano automaticamente in base a parametri o condizioni specifiche. I DAG dinamici permettono di generare task e dipendenze al volo, rendendo le pipeline più flessibili e manutenibili.
Perché usare DAG dinamici
I DAG dinamici sono utili quando:
- Il numero di task non è noto a priori
- La struttura del DAG dipende da configurazioni esterne
- Si vogliono creare pipeline riutilizzabili per diversi dataset
- Le dipendenze tra task cambiano in base a condizioni runtime
Approcci per creare DAG dinamici
1. Generazione con cicli Python
Il modo più semplice è usare cicli Python nel file di definizione del DAG:
tables = ['users', 'orders', 'products', 'reviews']
with DAG('etl_pipeline', ...) as dag:
for table in tables:
extract = PythonOperator(
task_id=f'extract_{table}',
python_callable=extract_data,
op_kwargs={'table': table}
)
load = PythonOperator(
task_id=f'load_{table}',
python_callable=load_data,
op_kwargs={'table': table}
)
extract >> load
2. DAG Factory Pattern
Per configurazioni più complesse, si può usare il pattern DAG Factory:
def create_dag(dag_id, config):
dag = DAG(dag_id, default_args=config['defaults'])
with dag:
# Crea task basati sulla configurazione
for step in config['steps']:
task = PythonOperator(
task_id=step['name'],
python_callable=step['callable']
)
return dag
3. Configurazione da file esterni
È possibile leggere la configurazione dei DAG da file YAML o JSON:
import yaml
with open('/config/pipelines.yaml') as f:
configs = yaml.safe_load(f)
for pipeline in configs['pipelines']:
dag = create_dag(pipeline['name'], pipeline)
Task Mapping (Airflow 2.3+)
A partire da Airflow 2.3, è disponibile il Dynamic Task Mapping, che permette di creare task dinamici in modo più elegante:
@task
def get_files():
return ['file1.csv', 'file2.csv', 'file3.csv']
@task
def process_file(filename):
# elabora il file
pass
files = get_files()
process_file.expand(filename=files)
Best Practice
- Evita di creare troppi task dinamici (centinaia) per non sovraccaricare lo scheduler
- Documenta la logica di generazione dinamica
- Testa i DAG dinamici con diverse configurazioni
- Usa il Task Mapping quando possibile (Airflow 2.3+)
- Mantieni le configurazioni esterne versionali
I DAG dinamici sono uno strumento potente che, se usato correttamente, può ridurre drasticamente la duplicazione del codice e rendere le pipeline più manutenibili.