Data Pipelines with Airflow: DAGs, Operators, and Production Patterns¶

Ad-hoc scripts fail silently; Airflow DAGs fail loudly, retry, and alert. This notebook teaches the Airflow mental model — DAGs, tasks, operators, dependencies — with patterns for retry logic, alerting, and the common mistakes that cause production incidents.

Setup: Airflow Availability Check¶

Airflow requires a running scheduler, metadata database, and webserver. We detect whether it's available and show all code as working patterns regardless — the ETL logic runs locally to make every concept concrete.

try:
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.bash import BashOperator
    from airflow.sensors.filesystem import FileSensor
    import airflow
    HAS_AIRFLOW = True
    print('Airflow is available:', airflow.__version__)
except ImportError:
    HAS_AIRFLOW = False
    print('Airflow not installed — showing all code patterns (pip install apache-airflow to run)')

import sqlite3
import pandas as pd
import json
import os
import time
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

print('\nAll ETL logic below runs locally to demonstrate concepts.')

1. Airflow Core Concepts¶

| Concept | What it is | Analogy |
| --- | --- | --- |
| DAG | Directed Acyclic Graph — the workflow definition | Recipe |
| Task | A single unit of work inside a DAG | One step in the recipe |
| Operator | Template for a task type (Python, Bash, SQL, HTTP…) | Cooking technique |
| DagRun | One execution instance of a DAG | One time you cook the recipe |
| TaskInstance | One execution of one task in a DagRun | One step of one cooking session |
| XCom | Cross-communication: key-value store for passing data between tasks | Shared notepad |
| Scheduler | Process that triggers DAGs based on schedule | Kitchen timer |
| Executor | How tasks actually run (LocalExecutor, CeleryExecutor, K8s) | Kitchen size |

DAG Rules:

  • Directed: dependencies flow one way (task A → task B)

  • Acyclic: no cycles (Airflow rejects A → B → A at parse time)

  • Graph: multiple branches and merges allowed

extract_data → transform_data → load_to_db → send_report
                             ↗
fetch_config ───────────────
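
The diagram above is just a graph. A minimal local sketch with the stdlib graphlib shows what "directed acyclic" buys you — a valid execution order exists, and cycles are rejected up front (we assume fetch_config feeds transform_data, as the diagram suggests):

```python
from graphlib import TopologicalSorter, CycleError

# task -> set of upstream tasks it depends on
deps = {
    'transform_data': {'extract_data', 'fetch_config'},
    'load_to_db':     {'transform_data'},
    'send_report':    {'load_to_db'},
}
order = list(TopologicalSorter(deps).static_order())
print('One valid execution order:', order)

# A cycle is rejected immediately, just as the Airflow parser rejects it
try:
    list(TopologicalSorter({'a': {'b'}, 'b': {'a'}}).static_order())
except CycleError:
    print('Cycle detected: a <-> b is not a valid DAG')
```

This is roughly the ordering computation the scheduler performs before dispatching tasks.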
# Airflow DAG definition pattern (shown as string when Airflow not available)
dag_definition = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Default arguments applied to every task in the DAG
default_args = {
    'owner':            'data_team',
    'depends_on_past':  False,          # Don't wait for previous day's run
    'email_on_failure': True,
    'email_on_retry':   False,
    'retries':          3,
    'retry_delay':      timedelta(minutes=5),
    'retry_exponential_backoff': True,  # 5m, 10m, 20m...
}

with DAG(
    dag_id='ecommerce_etl',
    default_args=default_args,
    description='Daily e-commerce ETL pipeline',
    schedule_interval='0 6 * * *',      # 6am UTC daily (cron syntax)
    start_date=datetime(2024, 1, 1),
    catchup=False,                      # Don't backfill missed runs
    tags=['ecommerce', 'etl'],
) as dag:

    extract = PythonOperator(
        task_id='extract_orders',
        python_callable=extract_orders,
    )

    transform = PythonOperator(
        task_id='transform_orders',
        python_callable=transform_orders,
    )

    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse,
    )

    # Set dependencies: a >> b makes b downstream of a (a runs first)
    extract >> transform >> load
'''

print('DAG definition pattern:')
print(dag_definition)

2. A Complete ETL DAG: Extract → Transform → Load¶

We implement the full ETL logic as plain Python functions. In Airflow, these same functions are wrapped in PythonOperator. The business logic is identical — Airflow adds scheduling, retries, and monitoring.

# Setup: local SQLite "warehouse" and temp file for staging
STAGING_FILE = os.path.join(tempfile.gettempdir(), 'airflow_demo_orders.json')
conn = sqlite3.connect(':memory:')

# Simulate an external API / source database
SOURCE_ORDERS = [
    {'order_id': i, 'customer_id': (i % 50) + 1,
     'amount': round(10 + (i * 7.3) % 490, 2),
     'status': ['completed','returned','pending'][i % 3],
     'order_date': f'2023-{(i%12)+1:02d}-{(i%28)+1:02d}',
     'raw_region': ['NORTH','SOUTH ','east','West '][i%4]  # dirty data
    }
    for i in range(1, 101)
]
print(f'Source system has {len(SOURCE_ORDERS)} orders (simulated)')
# ---- TASK 1: EXTRACT ----
def extract_orders(**context):
    """
    In production: call an API, read from S3, query a source DB.
    Here: read from our simulated source and write to staging file.
    
    Airflow context: provides execution_date, dag_id, etc.
    Returns data via XCom (or writes to intermediate storage).
    """
    execution_date = context.get('execution_date', datetime.now())
    print(f'[extract] Running for {execution_date}')
    
    # Simulate incremental extract (only new orders)
    # In real Airflow: use context['execution_date'] for window
    orders = SOURCE_ORDERS  # would filter by date in production
    
    # Write to staging (intermediate storage — not XCom for large data)
    with open(STAGING_FILE, 'w') as f:
        json.dump(orders, f)
    
    print(f'[extract] Wrote {len(orders)} orders to {STAGING_FILE}')
    return len(orders)  # XCom return value (small metadata only)

# Run it
n = extract_orders(execution_date=datetime(2023, 12, 1))
print(f'Extracted: {n} rows')
# ---- TASK 2: TRANSFORM ----
def transform_orders(**context):
    """
    Reads raw staging file, applies data quality rules, writes clean version.
    Transformations:
      - Normalize region (strip whitespace, lowercase, capitalize)
      - Filter out invalid statuses
      - Add derived columns (year, month, is_completed)
      - Validate order amounts (no negatives)
    """
    with open(STAGING_FILE) as f:
        raw = json.load(f)
    
    df = pd.DataFrame(raw)
    print(f'[transform] Input: {len(df)} rows')
    
    # Data quality
    df['region']       = df['raw_region'].str.strip().str.title()
    df['is_completed'] = df['status'] == 'completed'
    df['order_year']   = pd.to_datetime(df['order_date']).dt.year
    df['order_month']  = pd.to_datetime(df['order_date']).dt.month
    
    # Validation: reject negative amounts
    invalid = df[df['amount'] <= 0]
    if len(invalid) > 0:
        print(f'[transform] WARNING: {len(invalid)} invalid rows dropped')
    df = df[df['amount'] > 0]
    
    # Drop raw column
    df = df.drop(columns=['raw_region'])
    
    clean_file = STAGING_FILE.replace('.json', '_clean.json')
    df.to_json(clean_file, orient='records')
    
    print(f'[transform] Output: {len(df)} clean rows → {clean_file}')
    return len(df)

transform_orders()

# Preview
clean_df = pd.read_json(STAGING_FILE.replace('.json', '_clean.json'))
print()
clean_df.head()
# ---- TASK 3: LOAD ----
def load_to_warehouse(**context):
    """
    Idempotent load: DELETE matching rows then INSERT fresh data.
    This ensures re-running the pipeline doesn't create duplicates.
    """
    clean_file = STAGING_FILE.replace('.json', '_clean.json')
    df = pd.read_json(clean_file)
    
    # Create table if not exists
    conn.execute('''
        CREATE TABLE IF NOT EXISTS orders_warehouse (
            order_id    INTEGER PRIMARY KEY,
            customer_id INTEGER,
            amount      REAL,
            status      TEXT,
            order_date  TEXT,
            region      TEXT,
            is_completed INTEGER,
            order_year  INTEGER,
            order_month INTEGER,
            loaded_at   TEXT
        )
    ''')
    
    df['loaded_at'] = datetime.now().isoformat()
    df['is_completed'] = df['is_completed'].astype(int)
    
    # Idempotent: delete-then-insert — keeps the table schema created above
    # and guarantees no duplicates when the task is retried
    conn.execute('DELETE FROM orders_warehouse')
    df.to_sql('orders_warehouse', conn, if_exists='append', index=False)
    conn.commit()
    
    count = conn.execute('SELECT COUNT(*) FROM orders_warehouse').fetchone()[0]
    print(f'[load] {count} rows in warehouse')
    return count

load_to_warehouse()

# Verify
pd.read_sql('SELECT region, COUNT(*) as n, ROUND(AVG(amount),2) as avg_amount FROM orders_warehouse GROUP BY region', conn)

3. Sensor Operators¶

Sensors are special operators that wait for a condition before allowing downstream tasks to proceed. They poll at a configurable interval.

| Sensor | What it waits for |
| --- | --- |
| FileSensor | File to appear at a path |
| HttpSensor | HTTP endpoint to return 200 |
| SqlSensor | SQL query to return a non-empty result |
| ExternalTaskSensor | Another DAG's task to complete |
| S3KeySensor | S3 object to exist |

# Sensor pattern shown as code
sensor_patterns = '''
# FileSensor: wait for upstream file drop
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_daily_export',
    filepath='/data/exports/orders_{{ ds }}.csv',  # ds = execution date YYYY-MM-DD
    poke_interval=60,    # check every 60 seconds
    timeout=3600,        # fail after 1 hour
    mode='reschedule',   # release worker slot while waiting (efficient)
    soft_fail=True,      # mark as skipped instead of failed on timeout
)

# HttpSensor: wait for API to be available (Airflow 2.x provider import path)
from airflow.providers.http.sensors.http import HttpSensor

api_ready = HttpSensor(
    task_id='wait_for_api',
    http_conn_id='orders_api',
    endpoint='/health',
    poke_interval=30,
    timeout=600,
    mode='reschedule',
)

# Dependency: sensor must pass before extract runs
wait_for_file >> extract >> transform >> load
api_ready     >> extract
'''

print('Sensor patterns:')
print(sensor_patterns)

# Local equivalent: polling loop
def wait_for_file_local(filepath, timeout_sec=30, poll_interval=1):
    """Local equivalent of FileSensor."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if Path(filepath).exists():
            print(f'File found: {filepath}')
            return True
        print(f'Waiting for {filepath}...')
        time.sleep(poll_interval)
    raise TimeoutError(f'File not found within {timeout_sec}s: {filepath}')

# Demo: create file, then sense it
test_file = os.path.join(tempfile.gettempdir(), 'airflow_demo_ready.flag')
Path(test_file).write_text('ready')
wait_for_file_local(test_file)
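
The same polling idea extends to SqlSensor: wait until a query returns at least one row. A local sketch against a throwaway SQLite database (the signals table is invented for the demo):

```python
import sqlite3
import time

def wait_for_rows(db, sql, timeout_sec=5, poll_interval=0.2):
    """Local equivalent of SqlSensor: poke until the query returns a row."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if db.execute(sql).fetchone() is not None:
            print(f'Condition met: {sql}')
            return True
        time.sleep(poll_interval)
    raise TimeoutError(f'No rows within {timeout_sec}s: {sql}')

sensor_db = sqlite3.connect(':memory:')
sensor_db.execute('CREATE TABLE signals (name TEXT)')
sensor_db.execute("INSERT INTO signals VALUES ('ready')")
wait_for_rows(sensor_db, "SELECT 1 FROM signals WHERE name = 'ready'")
```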

4. Dynamic DAGs: Generating DAGs from Config¶

Instead of one DAG per pipeline, generate DAGs programmatically from a config dictionary. Airflow scans all Python files in the DAGs folder and picks up any DAG objects at module level.

dynamic_dag_pattern = '''
# dynamic_dags.py — generates one DAG per table in config
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

PIPELINE_CONFIG = [
    {'table': 'orders',    'schedule': '0 6 * * *',  'source': 's3://bucket/orders/'},
    {'table': 'customers', 'schedule': '0 7 * * *',  'source': 's3://bucket/customers/'},
    {'table': 'products',  'schedule': '0 1 * * 1',  'source': 's3://bucket/products/'},
]

def make_dag(config):  # config is a parameter, so each DAG captures its own copy
    dag = DAG(
        dag_id=f"sync_{config['table']}",
        schedule_interval=config['schedule'],
        start_date=datetime(2024, 1, 1),
        catchup=False,
    )
    with dag:
        extract = PythonOperator(
            task_id=f"extract_{config['table']}",
            python_callable=lambda: extract_from_s3(config['source']),
        )
        load = PythonOperator(
            task_id=f"load_{config['table']}",
            python_callable=lambda: load_table(config['table']),
        )
        extract >> load
    return dag

# Register all DAGs in module namespace (Airflow picks these up)
for config in PIPELINE_CONFIG:
    globals()[f"dag_{config['table']}"] = make_dag(config)
'''

print('Dynamic DAG generation pattern:')
print(dynamic_dag_pattern)

# Demonstrate the Python logic locally
PIPELINE_CONFIG = [
    {'table': 'orders',    'schedule': '0 6 * * *',  'source': 's3://bucket/orders/'},
    {'table': 'customers', 'schedule': '0 7 * * *',  'source': 's3://bucket/customers/'},
    {'table': 'products',  'schedule': '0 1 * * 1',  'source': 's3://bucket/products/'},
]

print('DAGs that would be generated:')
for cfg in PIPELINE_CONFIG:
    print(f"  dag_id=sync_{cfg['table']:12s}  schedule={cfg['schedule']}")

5. XCom: Passing Data Between Tasks¶

xcom_pattern = '''
# XCom = Cross-Communication. Tasks push small values to Airflow metadata DB.
# Rule: XCom is for METADATA (row counts, file paths, status codes)
#       NOT for large data (use S3/GCS/staging tables for that)

def extract(**context):
    orders = fetch_from_api()
    # Push: explicit push
    context["ti"].xcom_push(key="row_count", value=len(orders))
    context["ti"].xcom_push(key="staging_path", value="/tmp/orders_20240101.json")
    return len(orders)  # implicit push via return (key="return_value")

def transform(**context):
    # Pull: get value from extract task
    staging_path = context["ti"].xcom_pull(
        task_ids="extract",
        key="staging_path"
    )
    row_count = context["ti"].xcom_pull(
        task_ids="extract",
        key="return_value"  # implicit return
    )
    print(f"Processing {row_count} rows from {staging_path}")

# Jinja template access in operators:
BashOperator(
    task_id="notify",
    bash_command="echo 'Loaded {{ ti.xcom_pull(task_ids=\"load\") }} rows'"
)
'''

print('XCom pattern:')
print(xcom_pattern)

# Simulate XCom with a dict
xcom_store = {}

def push_xcom(task_id, key, value):
    xcom_store[(task_id, key)] = value

def pull_xcom(task_id, key):
    return xcom_store.get((task_id, key))

push_xcom('extract', 'row_count', 100)
push_xcom('extract', 'staging_path', '/tmp/orders.json')
push_xcom('transform', 'clean_count', 97)

print('Local XCom store simulation:')
for k, v in xcom_store.items():
    print(f'  [{k[0]}][{k[1]}] = {v}')
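
The dict simulation can be pushed one step closer to the real API: a toy TaskInstance whose method names match Airflow's ti.xcom_push / ti.xcom_pull. The class itself is invented for illustration — real XComs live in the metadata database:

```python
class ToyTaskInstance:
    """Mimics the ti.xcom_push / ti.xcom_pull interface with a plain dict."""
    _store = {}

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        ToyTaskInstance._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key='return_value'):
        return ToyTaskInstance._store.get((task_ids, key))

ti = ToyTaskInstance('extract')
ti.xcom_push('staging_path', '/tmp/orders.json')
ti.xcom_push('return_value', 100)

# Downstream task pulls by upstream task_id; key defaults to the return value
print(ToyTaskInstance('transform').xcom_pull(task_ids='extract'))
print(ToyTaskInstance('transform').xcom_pull(task_ids='extract', key='staging_path'))
```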

6. Error Handling: Retries, SLA, Alerts¶

error_handling_pattern = '''
from airflow.utils.email import send_email

def on_failure_callback(context):
    """Called when a task fails after all retries."""
    dag_id  = context["dag"].dag_id
    task_id = context["task_instance"].task_id
    exc     = context.get("exception", "Unknown")
    
    # Option 1: send email
    send_email(
        to=["oncall@company.com"],
        subject=f"[AIRFLOW FAILURE] {dag_id}.{task_id}",
        html_content=f"Task {task_id} failed: {exc}",
    )
    
    # Option 2: post to Slack via requests
    import requests
    requests.post(SLACK_WEBHOOK, json={"text": f":red_circle: {dag_id}.{task_id} failed"})

def on_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Called when a task exceeds its SLA."""
    print(f"SLA missed for tasks: {task_list}")

default_args = {
    'retries':                   3,
    'retry_delay':               timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay':           timedelta(minutes=60),
    'email_on_failure':          True,
    'email_on_retry':            False,
    'on_failure_callback':       on_failure_callback,
}

with DAG(
    dag_id='robust_etl',
    default_args=default_args,
    sla_miss_callback=on_sla_miss_callback,
    ...
) as dag:
    
    critical_task = PythonOperator(
        task_id='critical_transform',
        python_callable=transform_orders,
        sla=timedelta(hours=2),   # alert if not done in 2h
    )
'''

print('Error handling patterns:')
print(error_handling_pattern)
# Demonstrate retry logic locally
import random as rand
rand.seed(99)

def flaky_api_call(attempt):
    """Simulates an unreliable API: fails 60% of the time."""
    if rand.random() < 0.6:
        raise ConnectionError(f'API timeout on attempt {attempt}')
    return {'data': 'success', 'attempt': attempt}

def run_with_retries(func, max_retries=3, base_delay=0.1):
    """Retry helper with exponential backoff (local equivalent of Airflow retries)."""
    for attempt in range(1, max_retries + 1):
        try:
            result = func(attempt)
            print(f'  Attempt {attempt}: SUCCESS → {result}')
            return result
        except Exception as e:
            delay = base_delay * (2 ** (attempt - 1))
            print(f'  Attempt {attempt}: FAILED ({e}) — retry in {delay:.1f}s')
            if attempt < max_retries:
                time.sleep(delay)
    raise RuntimeError(f'All {max_retries} attempts failed')

print('Retry with exponential backoff (simulated):')
try:
    result = run_with_retries(flaky_api_call, max_retries=5, base_delay=0.05)
except RuntimeError as e:
    print(f'Final failure: {e}')

7. Production Patterns¶

Idempotency¶

A task is idempotent if running it multiple times produces the same result. This is critical because Airflow retries tasks on failure β€” non-idempotent tasks create duplicates.
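
A quick local check of that property: run a delete-then-insert load three times and confirm the row count does not grow (the facts table and its columns are invented for the demo):

```python
import sqlite3

def load_partition(db, rows, partition):
    """Delete-then-insert: safe to re-run for the same partition."""
    db.execute('DELETE FROM facts WHERE part = ?', (partition,))
    db.executemany('INSERT INTO facts (part, amount) VALUES (?, ?)',
                   [(partition, a) for a in rows])

idem_db = sqlite3.connect(':memory:')
idem_db.execute('CREATE TABLE facts (part TEXT, amount REAL)')

for _ in range(3):   # simulate Airflow retrying the load task
    load_partition(idem_db, [10.0, 20.0, 30.0], '2024-01-01')

n_rows = idem_db.execute('SELECT COUNT(*) FROM facts').fetchone()[0]
print('Rows after 3 identical runs:', n_rows)   # 3, not 9
```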

Backfill and catchup¶

  • catchup=True (default): Airflow runs all missed DAG runs since start_date. Useful for initial data load.

  • catchup=False: Only run the latest. Use for pipelines that don't need history.

  • airflow dags backfill -s 2024-01-01 -e 2024-01-31 my_dag: Manual backfill.
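
For a daily schedule, the backfill command above creates one DagRun per logical date in the window — easy to enumerate locally:

```python
from datetime import date, timedelta

def backfill_dates(start, end):
    """Logical dates a daily backfill over [start, end] would cover (inclusive)."""
    out, d = [], start
    while d <= end:
        out.append(d)
        d += timedelta(days=1)
    return out

runs = backfill_dates(date(2024, 1, 1), date(2024, 1, 5))
print([d.isoformat() for d in runs])
```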

# Idempotency patterns
idempotency_patterns = '''
# Pattern 1: DELETE then INSERT (for small tables)
def load_idempotent(conn, df, table, partition_col, partition_value):
    conn.execute(f"DELETE FROM {table} WHERE {partition_col} = ?", (partition_value,))
    df.to_sql(table, conn, if_exists='append', index=False)

# Pattern 2: MERGE / UPSERT (INSERT OR REPLACE in SQLite)
sql = """
INSERT OR REPLACE INTO orders_warehouse
    (order_id, customer_id, amount, status, order_date)
VALUES (?, ?, ?, ?, ?)
"""

# Pattern 3: Staging table swap (atomic, zero-downtime)
def atomic_swap(conn, staging_table, target_table):
    conn.execute(f"DROP TABLE IF EXISTS {target_table}_old")
    conn.execute(f"ALTER TABLE {target_table} RENAME TO {target_table}_old")
    conn.execute(f"ALTER TABLE {staging_table} RENAME TO {target_table}")
    conn.commit()
    # Rollback: rename _old back if needed

# Pattern 4: Write with execution_date partition
def load_partitioned(conn, df, execution_date):
    partition = execution_date.strftime('%Y-%m-%d')
    conn.execute("DELETE FROM fact_orders WHERE partition_date = ?", (partition,))
    df['partition_date'] = partition
    df.to_sql('fact_orders', conn, if_exists='append', index=False)
'''

print('Idempotency patterns:')
print(idempotency_patterns)
# Task Groups pattern (Airflow 2.0+)
task_groups_pattern = '''
from airflow.utils.task_group import TaskGroup

with DAG('grouped_etl', ...) as dag:

    with TaskGroup('ingestion') as ingest_group:
        extract_orders   = PythonOperator(task_id='extract_orders', ...)
        extract_customers = PythonOperator(task_id='extract_customers', ...)
        extract_products  = PythonOperator(task_id='extract_products', ...)

    with TaskGroup('transformation') as transform_group:
        clean_orders    = PythonOperator(task_id='clean_orders', ...)
        build_fact      = PythonOperator(task_id='build_fact_table', ...)
        clean_orders >> build_fact

    load = PythonOperator(task_id='load_to_warehouse', ...)

    # All ingestion tasks must complete before transformation
    ingest_group >> transform_group >> load
'''
print('Task Groups pattern (cleaner DAG visualization):')
print(task_groups_pattern)

8. Alternative: Prefect¶

Prefect takes a different philosophy: workflows are plain Python functions with decorators — no DAG objects or operator classes — giving a simpler mental model.

| Feature | Airflow | Prefect |
| --- | --- | --- |
| Workflow definition | DAG (Python) | @flow decorator |
| Task definition | Operator | @task decorator |
| Scheduling | Cron in DAG | Schedule + deployments |
| Dynamic workflows | Complex | Native (tasks return futures) |
| Local testing | Requires setup | flow() runs locally |
| Learning curve | High | Lower |
| Ecosystem | Very mature | Growing rapidly |

try:
    from prefect import flow, task
    HAS_PREFECT = True
    print('Prefect available')
except ImportError:
    HAS_PREFECT = False
    print('Prefect not installed (pip install prefect)')

prefect_equivalent = '''
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(retries=3, retry_delay_seconds=60,
      cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1))
def extract_orders(date: str) -> list:
    return fetch_from_api(date)

@task
def transform_orders(orders: list) -> pd.DataFrame:
    return clean_and_enrich(orders)

@task
def load_to_warehouse(df: pd.DataFrame) -> int:
    return write_to_db(df)

@flow(name='ecommerce-etl')
def etl_pipeline(date: str = None):
    # Tasks are called like regular Python functions
    raw     = extract_orders(date)
    clean   = transform_orders(raw)
    n_rows  = load_to_warehouse(clean)
    return n_rows

# Run locally:
etl_pipeline(date="2024-01-01")

# Deploy with schedule:
# prefect deployment build etl.py:etl_pipeline -n daily --cron "0 6 * * *"
'''

print('Prefect equivalent of the Airflow ETL DAG:')
print(prefect_equivalent)

Cheat Sheet¶

# DAG skeleton
with DAG(
    dag_id='my_pipeline',
    schedule_interval='0 6 * * *',   # cron or '@daily', '@hourly'
    start_date=datetime(2024,1,1),
    catchup=False,                    # ALWAYS set False unless backfill needed
    default_args={'retries': 3, 'retry_delay': timedelta(minutes=5)},
) as dag: ...

# Dependencies
a >> b >> c          # a → b → c
[a, b] >> c          # a,b → c (fan-in)
a >> [b, c]          # a → b,c (fan-out)
a.set_downstream(b)  # same as a >> b

# XCom
ti.xcom_push(key='path', value='/tmp/file.csv')
ti.xcom_pull(task_ids='extract', key='path')

# Idempotency: DELETE-then-INSERT or INSERT OR REPLACE
# Sensors: use mode='reschedule' to avoid holding worker slot
# Production: catchup=False, set retries + retry_delay, on_failure_callback

# CLI commands
airflow dags list
airflow dags trigger my_dag
airflow dags backfill -s 2024-01-01 -e 2024-01-31 my_dag
airflow tasks test my_dag task_id 2024-01-01

9. Exercises¶

  1. ETL pipeline: Extend the local ETL pipeline to add a fourth task: generate_report that reads from the warehouse, computes revenue by region, and writes a JSON summary file. Chain all four tasks and run them sequentially.

  2. Idempotency test: Modify the load_to_warehouse function to use INSERT OR REPLACE (upsert). Run it three times on the same data. Verify the row count stays constant.

  3. Retry simulation: Write a run_with_retries wrapper that uses exponential backoff with jitter (random additional delay 0-1s). Test it with a function that fails the first 2 attempts and succeeds on the 3rd.

  4. Dynamic pipeline: Given a list of 5 table configs ({'table': name, 'source_file': path}), write a function that generates and runs an ETL pipeline for each table. This mirrors how dynamic DAGs work in production.

  5. Prefect comparison: If you have Prefect installed (pip install prefect), rewrite the ETL pipeline using @flow and @task decorators. Compare the code length and complexity to the Airflow version.