Uno degli strumenti più utili in Apache Airflow è XCom, che permette di passare dati tra i task all'interno di un DAG. XCom (acronimo di "Cross-Communication") consente ai task di condividere informazioni in modo efficiente e controllato.
Cos'è XCom
XCom è un meccanismo di scambio dati integrato in Airflow. Ogni XCom è caratterizzato da:
- key: un identificatore univoco per il dato
- value: il valore da condividere (deve essere serializzabile)
- task_id: il task che ha generato il dato
- dag_id: il DAG di appartenenza
- execution_date: la data di esecuzione
Push e Pull
I due metodi principali per interagire con XCom sono:
xcom_push — Inviare dati
def push_data(**context):
context['ti'].xcom_push(key='result', value={'count': 42})
xcom_pull — Ricevere dati
def pull_data(**context):
result = context['ti'].xcom_pull(
task_ids='push_task',
key='result'
)
print(f"Ricevuto: {result}") # {'count': 42}
Return value automatico
Quando un PythonOperator ritorna un valore, questo viene automaticamente salvato come XCom con la key return_value:
@task
def calculate():
return 42 # salvato automaticamente in XCom
@task
def use_result(value):
print(f"Il risultato è: {value}")
result = calculate()
use_result(result) # TaskFlow API gestisce XCom automaticamente
Limitazioni e considerazioni
- XCom è progettato per piccoli volumi di dati (metadati, percorsi file, conteggi)
- Non usare XCom per trasferire dataset completi — usa piuttosto file su storage condiviso
- Il limite di default per un singolo XCom è di 48 KB (configurabile)
- I valori XCom devono essere serializzabili in JSON (o pickle, se configurato)
Custom XCom Backend
Per gestire dati più grandi, è possibile implementare un Custom XCom Backend che salva i dati su un sistema di storage esterno (S3, GCS, Redis) invece che nel database di Airflow.
Best Practice
- Usa XCom solo per piccoli dati di coordinamento
- Preferisci la TaskFlow API per gestione automatica
- Implementa un Custom XCom Backend per dati più grandi
- Pulisci periodicamente gli XCom vecchi dal database
XCom è uno strumento essenziale per costruire pipeline di dati interconnesse e flessibili in Apache Airflow.