Gli Operatori di Apache Airflow sono il cuore della piattaforma e permettono di eseguire vari tipi di task all’interno dei DAG. Esploreremo i tre operatori più utilizzati: BashOperator, PythonOperator e SqlOperator.
BashOperator
Esegue comandi Bash. Utile per script shell e operazioni su file:
from airflow.operators.bash import BashOperator
run_etl = BashOperator(
task_id='run_etl_script',
bash_command='/scripts/etl.sh {{ ds }}',
)PythonOperator
Esegue una funzione Python. L’operatore più flessibile:
from airflow.operators.python import PythonOperator
def process_data(ds, **kwargs):
print(f"Elaborazione dati per {ds}")
return {'records_processed': 1000}
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data,
)TaskFlow API (Airflow 2.0+)
from airflow.decorators import task
@task
def extract():
return {'data': [1, 2, 3]}
@task
def transform(data):
return [x * 2 for x in data['data']]SqlOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
create_table = PostgresOperator(
task_id='create_table',
postgres_conn_id='my_postgres',
sql="""CREATE TABLE IF NOT EXISTS daily_report (
date DATE, total_orders INT, revenue DECIMAL(10,2)
);""",
)Best Practice
- Usa la TaskFlow API per codice più leggibile
- Mantieni i task atomici
- Preferisci operatori specifici a quelli generici
- Usa i template Jinja per parametrizzare i task
Padroneggiare gli operatori è il primo passo per pipeline efficaci e manutenibili.