Back

XCom e passaggio di dati tra i task in Apache Airflow

Uno degli strumenti più utili in Apache Airflow è XCom, che permette di passare dati tra i task all’interno di un DAG. XCom (acronimo di “Cross-Communication”) consente ai task di condividere informazioni senza la necessità di passare dati tramite variabili globali o file esterni. In questo articolo vedremo come funziona XCom e come utilizzarlo per condividere informazioni tra task all’interno di un flusso di lavoro.

Cosa sono gli XCom in Apache Airflow?

Gli XCom sono piccoli frammenti di dati che possono essere scambiati tra i task all’interno di un DAG. Ogni task può “inviare” un XCom e uno o più task possono “prelevarlo” in seguito. Gli XCom sono memorizzati nel database di Airflow e possono contenere qualsiasi tipo di dato serializzabile in JSON, come stringhe, numeri o oggetti Python semplici.

Come inviare dati con XCom

Per inviare dati utilizzando XCom, un task deve restituire un valore che verrà automaticamente memorizzato come XCom. Ecco un esempio di come un task può inviare un XCom:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Funzione che invia un XCom
def push_xcom(**kwargs):
    return "Messaggio inviato tramite XCom!"

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 11, 1),
}

dag = DAG('xcom_push_example', default_args=default_args, schedule_interval='@daily')

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_xcom,
    provide_context=True,  # Necessario per usare XCom
    dag=dag
)

In questo esempio, il task push_task restituisce una stringa che viene automaticamente memorizzata come XCom. Il valore “Messaggio inviato tramite XCom!” può essere recuperato da altri task all’interno del DAG.

Come prelevare dati con XCom

Per prelevare un XCom inviato da un altro task, è possibile utilizzare la funzione xcom_pull. Questo consente a un task di recuperare i dati inviati da un altro task. Ecco un esempio di come prelevare un XCom:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

# Funzione che preleva un XCom
def pull_xcom(**kwargs):
    ti = kwargs['ti']
    message = ti.xcom_pull(task_ids='push_task')
    print(f"Messaggio ricevuto: {message}")

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 11, 1),
}

dag = DAG('xcom_pull_example', default_args=default_args, schedule_interval='@daily')

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_xcom,
    provide_context=True,  # Necessario per usare XCom
    dag=dag
)

In questo esempio, il task pull_task recupera il messaggio inviato dal task push_task utilizzando xcom_pull. Il messaggio viene quindi stampato nel log.

Utilizzare XCom con operatori personalizzati

Gli XCom possono essere utilizzati con qualsiasi operatore in Airflow, inclusi gli operatori personalizzati. Ad esempio, puoi utilizzare XCom per passare dati tra task che eseguono query SQL o script Python complessi. Qui sotto trovi un esempio di utilizzo di XCom con un PythonOperator e un task che calcola un valore:

def calculate_value(**kwargs):
    return 42  # Invia il risultato come XCom

def print_value(**kwargs):
    ti = kwargs['ti']
    value = ti.xcom_pull(task_ids='calculate_task')
    print(f"Il valore calcolato è: {value}")

dag = DAG('xcom_custom_example', default_args=default_args, schedule_interval='@daily')

calculate_task = PythonOperator(
    task_id='calculate_task',
    python_callable=calculate_value,
    provide_context=True,
    dag=dag
)

print_task = PythonOperator(
    task_id='print_task',
    python_callable=print_value,
    provide_context=True,
    dag=dag
)

calculate_task >> print_task

In questo esempio, il task calculate_task restituisce un valore numerico (42) come XCom, che viene poi recuperato e stampato dal task print_task.

Limitazioni e best practice per XCom

Anche se XCom è uno strumento potente, è importante utilizzarlo con attenzione. Gli XCom sono memorizzati nel database di Airflow e, se usati impropriamente, possono causare sovraccarico del sistema. Alcune best practice includono:

  • Limitare la dimensione dei dati inviati tramite XCom (piccoli frammenti di dati come stringhe o numeri).
  • Non utilizzare XCom per trasferire file o grandi quantità di dati; è meglio utilizzare sistemi di storage esterni per questo scopo.
  • Ricorda che gli XCom sono visibili a tutti i task nel DAG, quindi usa con attenzione dati sensibili.
Esempio di utilizzo di XCom in Apache Airflow

Conclusione

Gli XCom in Apache Airflow sono strumenti potenti per il passaggio di dati tra task. Utilizzando XCom, puoi rendere i tuoi flussi di lavoro più dinamici e automatizzati, consentendo ai task di condividere informazioni senza bisogno di dipendenze esterne. Ora che hai appreso come inviare e prelevare dati con XCom, sei pronto a costruire DAG più complessi e interattivi.

Fonte: Documentazione ufficiale di Apache Airflow

Per altri tutorial, visita la sezione: Tutti gli articoli