Data Pipelines with Airflow: DAGs, Operators, and Production Patterns
Ad-hoc scripts break. Airflow DAGs don't. This notebook teaches the Airflow mental model — DAGs, tasks, operators, dependencies — with patterns for retry logic, alerting, and the common mistakes that cause production incidents.
Setup: Airflow Availability Check
Airflow requires a running scheduler, metadata database, and webserver. We detect whether it's available and show all code as working patterns regardless — the ETL logic runs locally to make every concept concrete.
try:
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.bash import BashOperator
    from airflow.sensors.filesystem import FileSensor
    import airflow
    HAS_AIRFLOW = True
    print('Airflow is available:', airflow.__version__)
except ImportError:
    HAS_AIRFLOW = False
    print('Airflow not installed — showing all code patterns (pip install apache-airflow to run)')
import sqlite3
import pandas as pd
import json
import os
import time
import tempfile
from datetime import datetime, timedelta
from pathlib import Path
print('\nAll ETL logic below runs locally to demonstrate concepts.')
1. Airflow Core Concepts
| Concept | What it is | Analogy |
|---|---|---|
| DAG | Directed Acyclic Graph — the workflow definition | Recipe |
| Task | A single unit of work inside a DAG | One step in the recipe |
| Operator | Template for a task type (Python, Bash, SQL, HTTP…) | Cooking technique |
| DagRun | One execution instance of a DAG | One time you cook the recipe |
| TaskInstance | One execution of one task in a DagRun | One step of one cooking session |
| XCom | Cross-communication: key-value store for passing data between tasks | Shared notepad |
| Scheduler | Process that triggers DAGs based on schedule | Kitchen timer |
| Executor | How tasks actually run (LocalExecutor, CeleryExecutor, K8s) | Kitchen size |
DAG Rules:
- Directed: dependencies flow one way (task A → task B)
- Acyclic: no cycles (A → B → A has no valid run order)
- Graph: multiple branches and merges allowed

extract_data → transform_data → load_to_db → send_report
                     ↑
fetch_config ────────┘
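These rules can be made concrete in plain Python (no Airflow needed). The sketch below represents the pipeline above as an adjacency dict — assuming `fetch_config` feeds `transform_data` — and derives a valid run order with Kahn's algorithm, which fails exactly when the graph has a cycle:

```python
from collections import deque

# Each key lists its downstream tasks (the arrows in the diagram above)
deps = {
    'fetch_config':   ['transform_data'],
    'extract_data':   ['transform_data'],
    'transform_data': ['load_to_db'],
    'load_to_db':     ['send_report'],
    'send_report':    [],
}

def topological_order(graph):
    """Return a valid run order, or raise if the graph has a cycle."""
    indegree = {node: 0 for node in graph}
    for downstream in graph.values():
        for node in downstream:
            indegree[node] += 1
    ready = deque(n for n, d in indegree.items() if d == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in graph[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(graph):
        raise ValueError('Cycle detected: not a valid DAG')
    return order

print(topological_order(deps))
```

This is essentially what the scheduler does when deciding which tasks are ready to run: a task becomes runnable once all of its upstream dependencies have completed.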
# Airflow DAG definition pattern (shown as a string when Airflow is not available)
dag_definition = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,           # don't wait for the previous day's run
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,  # 5m, 10m, 20m...
}

with DAG(
    dag_id='ecommerce_etl',
    default_args=default_args,
    description='Daily e-commerce ETL pipeline',
    schedule_interval='0 6 * * *',      # 6am UTC daily (cron syntax)
    start_date=datetime(2024, 1, 1),
    catchup=False,                      # don't backfill missed runs
    tags=['ecommerce', 'etl'],
) as dag:

    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders,
    )
    transform = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_orders,
    )
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse,
    )

    # a >> b makes b downstream of a (b runs after a); << is the reverse
    extract >> transform >> load
'''
print('DAG definition pattern:')
print(dag_definition)
2. A Complete ETL DAG: Extract → Transform → Load
We implement the full ETL logic as plain Python functions. In Airflow, these same functions are wrapped in PythonOperator. The business logic is identical — Airflow adds scheduling, retries, and monitoring.
# Setup: local SQLite "warehouse" and temp file for staging
STAGING_FILE = '/tmp/airflow_demo_orders.json'
conn = sqlite3.connect(':memory:')

# Simulate an external API / source database
SOURCE_ORDERS = [
    {'order_id': i,
     'customer_id': (i % 50) + 1,
     'amount': round(10 + (i * 7.3) % 490, 2),
     'status': ['completed', 'returned', 'pending'][i % 3],
     'order_date': f'2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}',
     'raw_region': ['NORTH', 'SOUTH ', 'east', 'West '][i % 4]}  # dirty data
    for i in range(1, 101)
]
print(f'Source system has {len(SOURCE_ORDERS)} orders (simulated)')
# ---- TASK 1: EXTRACT ----
def extract_orders(**context):
    """
    In production: call an API, read from S3, query a source DB.
    Here: read from our simulated source and write to a staging file.

    Airflow context: provides execution_date, dag_id, etc.
    Returns metadata via XCom (the data itself goes to intermediate storage).
    """
    execution_date = context.get('execution_date', datetime.now())
    print(f'[extract] Running for {execution_date}')

    # Simulate an incremental extract (only new orders).
    # In real Airflow: use context['execution_date'] to bound the window.
    orders = SOURCE_ORDERS  # would filter by date in production

    # Write to staging (intermediate storage — not XCom — for large data)
    with open(STAGING_FILE, 'w') as f:
        json.dump(orders, f)
    print(f'[extract] Wrote {len(orders)} orders to {STAGING_FILE}')
    return len(orders)  # XCom return value (small metadata only)

# Run it
n = extract_orders(execution_date=datetime(2023, 12, 1))
print(f'Extracted: {n} rows')
# ---- TASK 2: TRANSFORM ----
def transform_orders(**context):
    """
    Reads the raw staging file, applies data quality rules, writes a clean version.

    Transformations:
    - Normalize region (strip whitespace, title-case)
    - Add derived columns (year, month, is_completed)
    - Validate order amounts (no negatives)
    """
    with open(STAGING_FILE) as f:
        raw = json.load(f)
    df = pd.DataFrame(raw)
    print(f'[transform] Input: {len(df)} rows')

    # Data quality
    df['region'] = df['raw_region'].str.strip().str.title()
    df['is_completed'] = df['status'] == 'completed'
    df['order_year'] = pd.to_datetime(df['order_date']).dt.year
    df['order_month'] = pd.to_datetime(df['order_date']).dt.month

    # Validation: reject non-positive amounts
    invalid = df[df['amount'] <= 0]
    if len(invalid) > 0:
        print(f'[transform] WARNING: {len(invalid)} invalid rows dropped')
        df = df[df['amount'] > 0]

    # Drop the raw column
    df = df.drop(columns=['raw_region'])

    clean_file = STAGING_FILE.replace('.json', '_clean.json')
    df.to_json(clean_file, orient='records')
    print(f'[transform] Output: {len(df)} clean rows → {clean_file}')
    return len(df)

transform_orders()
# Preview
clean_df = pd.read_json(STAGING_FILE.replace('.json', '_clean.json'))
print()
clean_df.head()
# ---- TASK 3: LOAD ----
def load_to_warehouse(**context):
    """
    Idempotent load: DELETE existing rows, then INSERT the fresh data.
    This ensures re-running the pipeline doesn't create duplicates.
    """
    clean_file = STAGING_FILE.replace('.json', '_clean.json')
    df = pd.read_json(clean_file)

    # Create the table if it doesn't exist
    conn.execute('''
        CREATE TABLE IF NOT EXISTS orders_warehouse (
            order_id INTEGER PRIMARY KEY,
            customer_id INTEGER,
            amount REAL,
            status TEXT,
            order_date TEXT,
            region TEXT,
            is_completed INTEGER,
            order_year INTEGER,
            order_month INTEGER,
            loaded_at TEXT
        )
    ''')
    df['loaded_at'] = datetime.now().isoformat()
    df['is_completed'] = df['is_completed'].astype(int)

    # Idempotent full refresh: delete everything, then append fresh rows.
    # (A production load would delete only the partition being reloaded.)
    conn.execute('DELETE FROM orders_warehouse')
    df.to_sql('orders_warehouse', conn, if_exists='append', index=False)
    conn.commit()

    count = conn.execute('SELECT COUNT(*) FROM orders_warehouse').fetchone()[0]
    print(f'[load] {count} rows in warehouse')
    return count

load_to_warehouse()
# Verify
pd.read_sql('SELECT region, COUNT(*) as n, ROUND(AVG(amount),2) as avg_amount FROM orders_warehouse GROUP BY region', conn)
3. Sensor Operators
Sensors are special operators that wait for a condition before allowing downstream tasks to proceed. They poll at a configurable interval.
| Sensor | What it waits for |
|---|---|
| FileSensor | File to appear at a path |
| HttpSensor | HTTP endpoint to return 200 |
| SqlSensor | SQL query to return a non-empty result |
| ExternalTaskSensor | Another DAG's task to complete |
| S3KeySensor | S3 object to exist |
# Sensor patterns shown as code
sensor_patterns = '''
# FileSensor: wait for an upstream file drop
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_daily_export',
    filepath='/data/exports/orders_{{ ds }}.csv',  # ds = execution date YYYY-MM-DD
    poke_interval=60,    # check every 60 seconds
    timeout=3600,        # fail after 1 hour
    mode='reschedule',   # release the worker slot while waiting (efficient)
    soft_fail=True,      # mark as skipped instead of failed on timeout
)

# HttpSensor: wait for an API to be available
from airflow.providers.http.sensors.http import HttpSensor

api_ready = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='orders_api',
    endpoint='/health',
    poke_interval=30,
    timeout=600,
    mode='reschedule',
)

# Dependencies: sensors must pass before extract runs
wait_for_file >> extract >> transform >> load
api_ready >> extract
'''
print('Sensor patterns:')
print(sensor_patterns)
# Local equivalent: a polling loop
def wait_for_file_local(filepath, timeout_sec=30, poll_interval=1):
    """Local equivalent of FileSensor."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if Path(filepath).exists():
            print(f'File found: {filepath}')
            return True
        print(f'Waiting for {filepath}...')
        time.sleep(poll_interval)
    raise TimeoutError(f'File not found within {timeout_sec}s: {filepath}')

# Demo: create the file, then sense it
test_file = '/tmp/airflow_demo_ready.flag'
Path(test_file).write_text('ready')
wait_for_file_local(test_file)
4. Dynamic DAGs: Generating DAGs from Config
Instead of one DAG per pipeline, generate DAGs programmatically from a config dictionary. Airflow scans all Python files in the DAGs folder and picks up any DAG objects at module level.
dynamic_dag_pattern = '''
# dynamic_dags.py — generates one DAG per table in the config
PIPELINE_CONFIG = [
    {'table': 'orders',    'schedule': '0 6 * * *', 'source': 's3://bucket/orders/'},
    {'table': 'customers', 'schedule': '0 7 * * *', 'source': 's3://bucket/customers/'},
    {'table': 'products',  'schedule': '0 1 * * 1', 'source': 's3://bucket/products/'},
]

def make_dag(config):
    dag = DAG(
        dag_id=f"sync_{config['table']}",
        schedule_interval=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
    )
    with dag:
        extract = PythonOperator(
            task_id=f"extract_{config['table']}",
            python_callable=lambda: extract_from_s3(config['source']),
        )
        load = PythonOperator(
            task_id=f"load_{config['table']}",
            python_callable=lambda: load_table(config['table']),
        )
        extract >> load
    return dag

# Register all DAGs in the module namespace (Airflow picks these up)
for config in PIPELINE_CONFIG:
    globals()[f"dag_{config['table']}"] = make_dag(config)
'''
print('Dynamic DAG generation pattern:')
print(dynamic_dag_pattern)
# Demonstrate the Python logic locally
PIPELINE_CONFIG = [
    {'table': 'orders',    'schedule': '0 6 * * *', 'source': 's3://bucket/orders/'},
    {'table': 'customers', 'schedule': '0 7 * * *', 'source': 's3://bucket/customers/'},
    {'table': 'products',  'schedule': '0 1 * * 1', 'source': 's3://bucket/products/'},
]
print('DAGs that would be generated:')
for cfg in PIPELINE_CONFIG:
    print(f"  dag_id=sync_{cfg['table']:12s} schedule={cfg['schedule']}")
5. XCom: Passing Data Between Tasks
xcom_pattern = '''
# XCom = Cross-Communication. Tasks push small values to the Airflow metadata DB.
# Rule: XCom is for METADATA (row counts, file paths, status codes),
#       NOT for large data (use S3/GCS/staging tables for that).

def extract(**context):
    orders = fetch_from_api()
    # Explicit push
    context["ti"].xcom_push(key="row_count", value=len(orders))
    context["ti"].xcom_push(key="staging_path", value="/tmp/orders_20240101.json")
    return len(orders)  # implicit push via return (key="return_value")

def transform(**context):
    # Pull values from the extract task
    staging_path = context["ti"].xcom_pull(
        task_ids="extract",
        key="staging_path",
    )
    row_count = context["ti"].xcom_pull(
        task_ids="extract",
        key="return_value",  # the implicit return
    )
    print(f"Processing {row_count} rows from {staging_path}")

# Jinja template access in operators:
BashOperator(
    task_id="notify",
    bash_command="echo 'Loaded {{ ti.xcom_pull(task_ids=\"load\") }} rows'",
)
'''
print('XCom pattern:')
print(xcom_pattern)
# Simulate XCom with a dict
xcom_store = {}

def push_xcom(task_id, key, value):
    xcom_store[(task_id, key)] = value

def pull_xcom(task_id, key):
    return xcom_store.get((task_id, key))

push_xcom('extract', 'row_count', 100)
push_xcom('extract', 'staging_path', '/tmp/orders.json')
push_xcom('transform', 'clean_count', 97)

print('Local XCom store simulation:')
for (task_id, key), v in xcom_store.items():
    print(f'  [{task_id}][{key}] = {v}')
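The "metadata, not data" rule can also be enforced with a size guard before pushing. A minimal sketch — the 48 KB cap here is purely illustrative (real XCom size limits depend on the metadata database backend), and `safe_xcom_value` is a hypothetical helper, not an Airflow API:

```python
import json

def safe_xcom_value(value, max_bytes=48_000):
    """Reject values too large for an XCom-style metadata store."""
    payload = json.dumps(value).encode('utf-8')
    if len(payload) > max_bytes:
        raise ValueError(
            f'XCom payload too large ({len(payload)} bytes): '
            'write it to staging storage and push the file path instead'
        )
    return value

# Small metadata passes through unchanged
print(safe_xcom_value({'row_count': 100, 'staging_path': '/tmp/orders.json'}))

# A large payload is rejected
try:
    safe_xcom_value({'orders': ['x' * 100] * 1000})
except ValueError as e:
    print('Rejected:', e)
```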
6. Error Handling: Retries, SLA, Alerts
error_handling_pattern = '''
from airflow.utils.email import send_email

def on_failure_callback(context):
    """Called when a task fails after all retries."""
    dag_id = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    exc = context.get("exception", "Unknown")

    # Option 1: send an email
    send_email(
        to=["oncall@company.com"],
        subject=f"[AIRFLOW FAILURE] {dag_id}.{task_id}",
        html_content=f"Task {task_id} failed: {exc}",
    )

    # Option 2: post to Slack via requests (SLACK_WEBHOOK defined elsewhere)
    import requests
    requests.post(SLACK_WEBHOOK, json={"text": f":red_circle: {dag_id}.{task_id} failed"})

def on_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called when a task exceeds its SLA."""
    print(f"SLA missed for tasks: {task_list}")

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=60),
    'email_on_failure': True,
    'email_on_retry': False,
    'on_failure_callback': on_failure_callback,
}

with DAG(
    dag_id='robust_etl',
    default_args=default_args,
    sla_miss_callback=on_sla_miss_callback,
    ...
) as dag:
    critical_task = PythonOperator(
        task_id='critical_transform',
        python_callable=transform_orders,
        sla=timedelta(hours=2),  # alert if not done within 2h
    )
'''
print('Error handling patterns:')
print(error_handling_pattern)
# Demonstrate retry logic locally
import random as rand
rand.seed(99)

def flaky_api_call(attempt):
    """Simulates an unreliable API: fails 60% of the time."""
    if rand.random() < 0.6:
        raise ConnectionError(f'API timeout on attempt {attempt}')
    return {'data': 'success', 'attempt': attempt}

def run_with_retries(func, max_retries=3, base_delay=0.1):
    """Retry wrapper with exponential backoff (local equivalent of Airflow retries)."""
    for attempt in range(1, max_retries + 1):
        try:
            result = func(attempt)
            print(f'  Attempt {attempt}: SUCCESS → {result}')
            return result
        except Exception as e:
            delay = base_delay * (2 ** (attempt - 1))
            print(f'  Attempt {attempt}: FAILED ({e}) → retry in {delay:.1f}s')
            if attempt < max_retries:
                time.sleep(delay)
    raise RuntimeError(f'All {max_retries} attempts failed')

print('Retry with exponential backoff (simulated):')
try:
    result = run_with_retries(flaky_api_call, max_retries=5, base_delay=0.05)
except RuntimeError as e:
    print(f'Final failure: {e}')
7. Production Patterns
Idempotency
A task is idempotent if running it multiple times produces the same result. This is critical because Airflow retries tasks on failure — non-idempotent tasks create duplicates.
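A small self-contained demo of the DELETE-then-INSERT pattern makes this concrete: loading the same partition repeatedly leaves the row count unchanged.

```python
import sqlite3

demo = sqlite3.connect(':memory:')
demo.execute('CREATE TABLE fact (order_id INTEGER, partition_date TEXT)')

def load_partition(conn, rows, partition):
    # Delete any rows from a previous run of this partition, then insert fresh
    conn.execute('DELETE FROM fact WHERE partition_date = ?', (partition,))
    conn.executemany('INSERT INTO fact VALUES (?, ?)',
                     [(r, partition) for r in rows])
    conn.commit()

for attempt in range(3):  # re-run the "task" three times
    load_partition(demo, [1, 2, 3], '2024-01-01')
    n = demo.execute('SELECT COUNT(*) FROM fact').fetchone()[0]
    print(f'After run {attempt + 1}: {n} rows')  # stays at 3
```

A naive `INSERT`-only task would end with 9 rows after three runs; the delete step is what makes retries safe.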
Backfill and catchup
- `catchup=True` (the default): Airflow runs all missed DAG runs since `start_date`. Useful for an initial data load.
- `catchup=False`: only run the latest interval. Use for pipelines that don't need history.
- `airflow dags backfill -s 2024-01-01 -e 2024-01-31 my_dag`: manual backfill.
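To see what a backfill actually schedules, this sketch enumerates the daily logical dates a backfill between two dates would cover (dates only — Airflow's real scheduling works with data intervals, which this deliberately simplifies):

```python
from datetime import date, timedelta

def backfill_dates(start, end):
    """All daily logical dates from start to end, inclusive."""
    days = (end - start).days
    return [start + timedelta(days=i) for i in range(days + 1)]

runs = backfill_dates(date(2024, 1, 1), date(2024, 1, 31))
print(f'{len(runs)} DAG runs, first={runs[0]}, last={runs[-1]}')
```

With `catchup=True` and a `start_date` a year in the past, this is why a newly deployed daily DAG can suddenly queue hundreds of runs.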
# Idempotency patterns
idempotency_patterns = '''
# Pattern 1: DELETE then INSERT (for small tables)
def load_idempotent(conn, df, table, partition_col, partition_value):
    conn.execute(f"DELETE FROM {table} WHERE {partition_col} = ?", (partition_value,))
    df.to_sql(table, conn, if_exists='append', index=False)

# Pattern 2: MERGE / UPSERT (INSERT OR REPLACE in SQLite)
sql = """
INSERT OR REPLACE INTO orders_warehouse
    (order_id, customer_id, amount, status, order_date)
VALUES (?, ?, ?, ?, ?)
"""

# Pattern 3: Staging table swap (atomic, zero-downtime)
def atomic_swap(conn, staging_table, target_table):
    conn.execute(f"DROP TABLE IF EXISTS {target_table}_old")
    conn.execute(f"ALTER TABLE {target_table} RENAME TO {target_table}_old")
    conn.execute(f"ALTER TABLE {staging_table} RENAME TO {target_table}")
    conn.commit()
    # Rollback: rename _old back if needed

# Pattern 4: Write with an execution_date partition
def load_partitioned(conn, df, execution_date):
    partition = execution_date.strftime('%Y-%m-%d')
    conn.execute("DELETE FROM fact_orders WHERE partition_date = ?", (partition,))
    df['partition_date'] = partition
    df.to_sql('fact_orders', conn, if_exists='append', index=False)
'''
print('Idempotency patterns:')
print(idempotency_patterns)
# Task Groups pattern (Airflow 2.0+)
task_groups_pattern = '''
from airflow.utils.task_group import TaskGroup

with DAG('grouped_etl', ...) as dag:

    with TaskGroup('ingestion') as ingest_group:
        extract_orders = PythonOperator(task_id='extract_orders', ...)
        extract_customers = PythonOperator(task_id='extract_customers', ...)
        extract_products = PythonOperator(task_id='extract_products', ...)

    with TaskGroup('transformation') as transform_group:
        clean_orders = PythonOperator(task_id='clean_orders', ...)
        build_fact = PythonOperator(task_id='build_fact_table', ...)
        clean_orders >> build_fact

    load = PythonOperator(task_id='load_to_warehouse', ...)

    # All ingestion tasks must complete before transformation starts
    ingest_group >> transform_group >> load
'''
print('Task Groups pattern (cleaner DAG visualization):')
print(task_groups_pattern)
8. Alternative: Prefect (Modern Airflow Alternative)
Prefect takes a different philosophy: workflows are plain Python functions with decorators — no separate DAG object or operator classes — with a simpler mental model.
| Feature | Airflow | Prefect |
|---|---|---|
| Workflow definition | DAG (Python) | `@flow` decorator |
| Task definition | Operator | `@task` decorator |
| Scheduling | Cron in DAG | Schedule + deployments |
| Dynamic workflows | Complex | Native (tasks return futures) |
| Local testing | Requires setup | Call the flow like a function |
| Learning curve | High | Lower |
| Ecosystem | Very mature | Growing rapidly |
try:
    from prefect import flow, task
    HAS_PREFECT = True
    print('Prefect available')
except ImportError:
    HAS_PREFECT = False
    print('Prefect not installed (pip install prefect)')
prefect_equivalent = '''
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(retries=3, retry_delay_seconds=60,
      cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def extract_orders(date: str) -> list:
    return fetch_from_api(date)

@task
def transform_orders(orders: list) -> pd.DataFrame:
    return clean_and_enrich(orders)

@task
def load_to_warehouse(df: pd.DataFrame) -> int:
    return write_to_db(df)

@flow(name='ecommerce-etl')
def etl_pipeline(date: str = None):
    # Tasks are called like regular Python functions
    raw = extract_orders(date)
    clean = transform_orders(raw)
    n_rows = load_to_warehouse(clean)
    return n_rows

# Run locally:
etl_pipeline(date="2024-01-01")

# Deploy with a schedule:
# prefect deployment build etl.py:etl_pipeline -n daily --cron "0 6 * * *"
'''
print('Prefect equivalent of the Airflow ETL DAG:')
print(prefect_equivalent)
Cheat Sheet
# DAG skeleton
with DAG(
    dag_id='my_pipeline',
    schedule_interval='0 6 * * *',   # cron, or '@daily', '@hourly'
    start_date=datetime(2024, 1, 1),
    catchup=False,                   # ALWAYS set False unless a backfill is needed
    default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
) as dag: ...

# Dependencies
a >> b >> c           # a → b → c
[a, b] >> c           # a, b → c (fan-in)
a >> [b, c]           # a → b, c (fan-out)
a.set_downstream(b)   # same as a >> b

# XCom
ti.xcom_push(key='path', value='/tmp/file.csv')
ti.xcom_pull(task_ids='extract', key='path')

# Idempotency: DELETE-then-INSERT or INSERT OR REPLACE
# Sensors: use mode='reschedule' to avoid holding a worker slot
# Production: catchup=False, set retries + retry_delay, on_failure_callback

# CLI commands
airflow dags list
airflow dags trigger my_dag
airflow dags backfill -s 2024-01-01 -e 2024-01-31 my_dag
airflow tasks test my_dag task_id 2024-01-01
9. Exercises
1. ETL pipeline: Extend the local ETL pipeline with a fourth task, `generate_report`, that reads from the warehouse, computes revenue by region, and writes a JSON summary file. Chain all four tasks and run them sequentially.
2. Idempotency test: Modify the `load_to_warehouse` function to use `INSERT OR REPLACE` (upsert). Run it three times on the same data and verify the row count stays constant.
3. Retry simulation: Write a `run_with_retries` wrapper that uses exponential backoff with jitter (a random additional delay of 0-1s). Test it with a function that fails the first 2 attempts and succeeds on the 3rd.
4. Dynamic pipeline: Given a list of 5 table configs (`{'table': name, 'source_file': path}`), write a function that generates and runs an ETL pipeline for each table. This mirrors how dynamic DAGs work in production.
5. Prefect comparison: If you have Prefect installed (`pip install prefect`), rewrite the ETL pipeline using the `@flow` and `@task` decorators. Compare the code length and complexity to the Airflow version.