Vai al contenuto principale
BlogCreare e utilizzare i DAG dinamici in Apache Airflow
Apache Airflow

Creare e utilizzare i DAG dinamici in Apache Airflow

6 Nov 20245 min lettura
SC

Stanislao Corvino

Dottore Commercialista · Data Evangelist

In Apache Airflow, i DAG dinamici offrono la flessibilità di creare flussi di lavoro che si adattano automaticamente in base a parametri o condizioni specifiche. I DAG dinamici permettono di generare task e dipendenze al volo, rendendo le pipeline più flessibili e manutenibili.

Perché usare DAG dinamici

I DAG dinamici sono utili quando:

  • Il numero di task non è noto a priori
  • La struttura del DAG dipende da configurazioni esterne
  • Si vogliono creare pipeline riutilizzabili per diversi dataset
  • Le dipendenze tra task cambiano in base a condizioni runtime

Approcci per creare DAG dinamici

1. Generazione con cicli Python

Il modo più semplice è usare cicli Python nel file di definizione del DAG:

tables = ['users', 'orders', 'products', 'reviews']

with DAG('etl_pipeline', ...) as dag:
    for table in tables:
        extract = PythonOperator(
            task_id=f'extract_{table}',
            python_callable=extract_data,
            op_kwargs={'table': table}
        )
        load = PythonOperator(
            task_id=f'load_{table}',
            python_callable=load_data,
            op_kwargs={'table': table}
        )
        extract >> load

2. DAG Factory Pattern

Per configurazioni più complesse, si può usare il pattern DAG Factory:

def create_dag(dag_id, config):
    dag = DAG(dag_id, default_args=config['defaults'])
    with dag:
        # Crea task basati sulla configurazione
        for step in config['steps']:
            task = PythonOperator(
                task_id=step['name'],
                python_callable=step['callable']
            )
    return dag

3. Configurazione da file esterni

È possibile leggere la configurazione dei DAG da file YAML o JSON:

import yaml

with open('/config/pipelines.yaml') as f:
    configs = yaml.safe_load(f)

for pipeline in configs['pipelines']:
    dag = create_dag(pipeline['name'], pipeline)

Task Mapping (Airflow 2.3+)

A partire da Airflow 2.3, è disponibile il Dynamic Task Mapping, che permette di creare task dinamici in modo più elegante:

@task
def get_files():
    return ['file1.csv', 'file2.csv', 'file3.csv']

@task
def process_file(filename):
    # elabora il file
    pass

files = get_files()
process_file.expand(filename=files)

Best Practice

  • Evita di creare troppi task dinamici (centinaia) per non sovraccaricare lo scheduler
  • Documenta la logica di generazione dinamica
  • Testa i DAG dinamici con diverse configurazioni
  • Usa il Task Mapping quando possibile (Airflow 2.3+)
  • Mantieni le configurazioni esterne versionali

I DAG dinamici sono uno strumento potente che, se usato correttamente, può ridurre drasticamente la duplicazione del codice e rendere le pipeline più manutenibili.

Tag:
Apache AirflowDAGPython

Articoli correlati