dbt: Data Modeling for Analytics Engineering

dbt (data build tool) transforms raw data in your warehouse into clean, tested, documented models using SQL. It brings software engineering practices (version control, testing, documentation) to analytics. This notebook covers dbt’s core concepts with runnable Python simulations.

1. Setup

Import standard libraries and attempt to import dbt-core. A HAS_DBT flag controls whether dbt-specific features are available.

import pandas as pd
import numpy as np
import textwrap
from pathlib import Path
import re
import datetime

# Try importing dbt-core
try:
    import dbt
    import dbt.version
    HAS_DBT = True
    print(f"dbt-core version: {dbt.version.__version__}")
except ImportError:
    HAS_DBT = False
    print("dbt-core not installed. Running in simulation mode.")
    print("Install with: pip install dbt-core dbt-duckdb")

print(f"\nPandas  : {pd.__version__}")
print(f"NumPy   : {np.__version__}")
print(f"HAS_DBT : {HAS_DBT}")
print(f"Date    : {datetime.date.today()}")

2. dbt Project Structure

A dbt project is a directory of SQL files, YAML configs, and Jinja2 macros. The layout below is the conventional structure recommended by dbt Labs:

  • models/ — SQL files that define transformations (staging, intermediate, marts)

  • tests/ — SQL assertions and generic YAML-defined tests

  • macros/ — Reusable Jinja2 SQL functions

  • seeds/ — Static CSV files loaded into the warehouse as tables

  • analyses/ — Ad-hoc SQL that is compiled but not materialized

  • dbt_project.yml — Project-level configuration (name, version, materializations)

  • profiles.yml — Connection credentials (usually in ~/.dbt/profiles.yml)

project_structure = """
my_dbt_project/
├── dbt_project.yml          # Project config
├── profiles.yml             # Connection profiles
├── models/
│   ├── staging/             # Clean raw data (1:1 with source tables)
│   │   ├── stg_orders.sql
│   │   └── stg_customers.sql
│   ├── intermediate/        # Business logic, joins
│   │   └── int_order_items.sql
│   └── marts/               # Final analytics-ready tables
│       ├── fct_orders.sql   # Facts
│       └── dim_customers.sql # Dimensions
├── tests/
│   ├── generic/             # Reusable test macros
│   └── singular/            # One-off SQL assertions
├── macros/                  # Reusable Jinja2 SQL functions
├── seeds/                   # Static CSV files loaded to warehouse
└── analyses/                # Ad-hoc SQL (not materialized)
"""

print(project_structure)

dbt_project.yml example

This file declares the project name, dbt version constraints, and default materializations per folder:

dbt_project_yml = """
# dbt_project.yml
name: my_dbt_project
version: '1.0.0'
config-version: 2

profile: my_dbt_project   # matches a key in profiles.yml

model-paths: ["models"]
seed-paths:  ["seeds"]
test-paths:  ["tests"]
macro-paths: ["macros"]

models:
  my_dbt_project:
    staging:
      +materialized: view       # staging models are views
      +schema: staging
    intermediate:
      +materialized: ephemeral  # not persisted, inlined as CTE
    marts:
      +materialized: table      # marts are physical tables
      +schema: analytics
"""

print(dbt_project_yml)
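The folder-level cascade above can be sketched in plain Python. This is a simplified model of dbt's behaviour (real dbt also merges per-model config() blocks and command-line overrides); the dict and function names here are illustrative, not dbt internals:

```python
# Simplified simulation of dbt's folder-based config resolution.
# Assumption: the most specific folder config wins; project defaults apply otherwise.
PROJECT_CONFIG = {
    ("my_dbt_project",): {"materialized": "view"},                # project default
    ("my_dbt_project", "staging"): {"materialized": "view", "schema": "staging"},
    ("my_dbt_project", "intermediate"): {"materialized": "ephemeral"},
    ("my_dbt_project", "marts"): {"materialized": "table", "schema": "analytics"},
}

def resolve_config(model_path: str) -> dict:
    """Merge configs from least to most specific folder along the model's path."""
    parts = tuple(model_path.split("/")[:-1])  # drop the filename
    merged = {}
    for depth in range(1, len(parts) + 1):
        merged.update(PROJECT_CONFIG.get(parts[:depth], {}))
    return merged

print(resolve_config("my_dbt_project/staging/stg_orders.sql"))
print(resolve_config("my_dbt_project/marts/fct_orders.sql"))
```

A model in models/staging/ thus inherits both the project default and the staging overrides, with the deeper folder taking precedence.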

3. The Three Model Types

dbt projects are conventionally organized into three layers:

Layer        | Purpose                                                         | Naming                     | Materialization
-------------|-----------------------------------------------------------------|----------------------------|--------------------
Staging      | 1:1 with source tables; type casting, renaming, light cleaning  | stg_<source>__<table>      | view
Intermediate | Business logic, joins, aggregations not yet ready for end users | int_<entity>_<verb>        | ephemeral / view
Marts        | Final analytics-ready fact and dimension tables                 | fct_<entity>, dim_<entity> | table / incremental

The key principle: models never hard-code table names. They reference upstream models with ref() (and raw tables with source()), which is how dbt infers the dependency graph.

# --- Staging model: stg_orders.sql ---
stg_orders_sql = """
-- models/staging/stg_orders.sql
-- One row per raw order; rename columns, cast types, no business logic.

with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        id                                          as order_id,
        customer_id,
        cast(order_date as date)                    as order_date,
        cast(amount_cents as numeric) / 100.0       as order_amount,
        lower(trim(status))                         as status,
        cast(created_at as timestamp)               as created_at,
        current_timestamp                           as loaded_at
    from source
)

select * from renamed
"""

print("=== stg_orders.sql ===")
print(stg_orders_sql)

# --- Intermediate model: int_order_items.sql ---
int_order_items_sql = """
-- models/intermediate/int_order_items.sql
-- Join orders with order items to create a denormalized dataset.

with orders as (
    select * from {{ ref('stg_orders') }}
),

order_items as (
    select * from {{ ref('stg_order_items') }}
),

joined as (
    select
        o.order_id,
        o.customer_id,
        o.order_date,
        o.status,
        oi.product_id,
        oi.quantity,
        oi.unit_price,
        oi.quantity * oi.unit_price  as line_total
    from orders o
    inner join order_items oi
        on o.order_id = oi.order_id
)

select * from joined
"""

print("=== int_order_items.sql ===")
print(int_order_items_sql)

# --- Mart model: fct_orders.sql ---
fct_orders_sql = """
-- models/marts/fct_orders.sql
-- Fact table: one row per order with pre-aggregated metrics.

with order_items as (
    select * from {{ ref('int_order_items') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

order_totals as (
    select
        order_id,
        customer_id,
        order_date,
        status,
        count(product_id)         as num_items,
        sum(line_total)           as order_total,
        min(unit_price)           as min_item_price,
        max(unit_price)           as max_item_price
    from order_items
    group by 1, 2, 3, 4
)

select
    ot.*,
    c.customer_name,
    c.customer_segment
from order_totals ot
left join customers c
    on ot.customer_id = c.customer_id
"""

print("=== fct_orders.sql ===")
print(fct_orders_sql)

4. Materializations, ref(), and source()

ref() and source()

  • {{ ref('model_name') }} — references another dbt model. dbt resolves this to the correct schema/database and builds the DAG automatically.

  • {{ source('source_name', 'table_name') }} — references a raw source table declared in a sources.yml file. Enables freshness checks and lineage tracking back to raw data.
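
A toy simulation of the compilation step: ref() and source() calls are replaced with fully qualified names. The schema and source mappings below are hypothetical stand-ins for what dbt actually derives from project config and sources.yml:

```python
import re

# Hypothetical lookup tables standing in for dbt's resolved project state.
MODEL_SCHEMAS = {"stg_orders": "staging", "dim_customers": "analytics"}
SOURCES = {("raw", "orders"): "raw_db.public.orders"}

def compile_refs(sql: str) -> str:
    """Replace {{ ref(...) }} and {{ source(...) }} with qualified names."""
    sql = re.sub(
        r"\{\{\s*ref\('([^']+)'\)\s*\}\}",
        lambda m: f"{MODEL_SCHEMAS[m.group(1)]}.{m.group(1)}",
        sql,
    )
    sql = re.sub(
        r"\{\{\s*source\('([^']+)',\s*'([^']+)'\)\s*\}\}",
        lambda m: SOURCES[(m.group(1), m.group(2))],
        sql,
    )
    return sql

print(compile_refs("select * from {{ ref('stg_orders') }}"))
# prints: select * from staging.stg_orders
print(compile_refs("select * from {{ source('raw', 'orders') }}"))
```

Real dbt renders these with Jinja2 rather than regexes, but the effect on the compiled SQL is the same: every logical reference becomes a concrete table name.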

The Four Materializations

Materialization | Description                                        | Use Case
----------------|----------------------------------------------------|----------------------------------------------
view            | SQL view, no data stored                           | Staging models, lightweight transforms
table           | Drops and recreates the full table on each dbt run | Marts that are small enough to fully refresh
incremental     | Only appends/updates new rows since last run       | Large fact tables (events, transactions)
ephemeral       | Compiled as a CTE, never persisted                 | Intermediate logic used only once

Materializations are set in dbt_project.yml (per folder) or per-model using a config block.

incremental_model_sql = """
-- models/marts/fct_events.sql
-- Incremental model: append only new events on each run.

{{ config(
    materialized = 'incremental',
    unique_key   = 'event_id',
    on_schema_change = 'sync_all_columns'
) }}

with source as (
    select * from {{ source('raw', 'events') }}

    -- On incremental runs, only process rows newer than the last load.
    -- is_incremental() returns True only when the target table already exists.
    {% if is_incremental() %}
        where event_timestamp > (
            select max(event_timestamp) from {{ this }}
        )
    {% endif %}
),

transformed as (
    select
        event_id,
        user_id,
        event_type,
        cast(event_timestamp as timestamp) as event_timestamp,
        properties,
        current_timestamp                  as dbt_loaded_at
    from source
)

select * from transformed
"""

print("=== Incremental Model: fct_events.sql ===")
print(incremental_model_sql)

print("\nKey points about incremental models:")
print("  - is_incremental() is False on the first run (full load)")
print("  - is_incremental() is True on subsequent runs (append/merge only new rows)")
print("  - unique_key triggers a MERGE instead of INSERT (handles late-arriving data)")
print("  - Use --full-refresh flag to force a full reload")

5. dbt Tests

dbt has two kinds of tests:

  1. Generic tests — Declared in YAML (schema.yml). Built-in: not_null, unique, accepted_values, relationships. Custom generics via macros.

  2. Singular tests — One-off SQL files in tests/. The test passes if the query returns zero rows.

Run with dbt test. Results show pass/fail per column per model.

schema_yml = """
# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: "One row per order with pre-aggregated line item metrics."
    columns:
      - name: order_id
        description: "Primary key — unique identifier for each order."
        tests:
          - not_null
          - unique

      - name: customer_id
        description: "Foreign key to dim_customers."
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

      - name: status
        description: "Order lifecycle status."
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']

      - name: order_total
        description: "Sum of all line item totals for the order."
        tests:
          - not_null

  - name: dim_customers
    description: "One row per customer with current attributes."
    columns:
      - name: customer_id
        tests:
          - not_null
          - unique
      - name: customer_segment
        tests:
          - accepted_values:
              values: ['consumer', 'corporate', 'home_office']
"""

print("=== schema.yml (Generic Tests) ===")
print(schema_yml)

singular_test_sql = """
-- tests/singular/assert_order_total_positive.sql
-- Test passes when this query returns ZERO rows.
-- Fail condition: any order with a negative total.

select
    order_id,
    order_total
from {{ ref('fct_orders') }}
where order_total < 0
"""

print("=== Singular Test: assert_order_total_positive.sql ===")
print(singular_test_sql)
# Simulate generic dbt tests in Python using pandas

def run_generic_tests(df: pd.DataFrame, column: str, test_type: str, **kwargs) -> dict:
    """
    Simulate dbt generic tests on a pandas DataFrame column.

    Parameters
    ----------
    df        : DataFrame containing the data to test
    column    : Column name to test
    test_type : One of 'not_null', 'unique', 'accepted_values'
    **kwargs  : Extra args (e.g., values=[...] for accepted_values)

    Returns
    -------
    dict with keys: test, column, passed, failures, message
    """
    series = df[column]

    if test_type == "not_null":
        null_count = series.isna().sum()
        passed = null_count == 0
        return {
            "test": "not_null",
            "column": column,
            "passed": passed,
            "failures": int(null_count),
            "message": "PASS" if passed else f"FAIL — {null_count} null value(s) found"
        }

    elif test_type == "unique":
        dup_count = series.duplicated().sum()
        passed = dup_count == 0
        return {
            "test": "unique",
            "column": column,
            "passed": passed,
            "failures": int(dup_count),
            "message": "PASS" if passed else f"FAIL — {dup_count} duplicate value(s) found"
        }

    elif test_type == "accepted_values":
        accepted = set(kwargs.get("values", []))
        invalid = series[~series.isin(accepted) & series.notna()]
        passed = len(invalid) == 0
        return {
            "test": "accepted_values",
            "column": column,
            "passed": passed,
            "failures": len(invalid),
            "message": "PASS" if passed else f"FAIL — unexpected values: {invalid.unique().tolist()}"
        }

    else:
        raise ValueError(f"Unknown test type: {test_type}")


# Create a sample fct_orders DataFrame
np.random.seed(42)
n = 10

fct_orders = pd.DataFrame({
    "order_id":    [f"ORD-{i:04d}" for i in range(1, n + 1)],
    "customer_id": [f"CUST-{np.random.randint(1,5):03d}" for _ in range(n)],
    "order_date":  pd.date_range("2024-01-01", periods=n, freq="D"),
    "status":      np.random.choice(["pending", "completed", "cancelled"], n),
    "order_total": np.round(np.random.uniform(20, 500, n), 2),
})

# Introduce intentional failures for demonstration
fct_orders_with_issues = fct_orders.copy()
fct_orders_with_issues.loc[0, "order_total"] = None          # null
fct_orders_with_issues.loc[1, "order_id"] = "ORD-0001"       # duplicate of row 0
fct_orders_with_issues.loc[2, "status"] = "shipped"          # invalid value

print("=== Running dbt-style generic tests ===")
print()

tests_to_run = [
    ("order_id",    "not_null",       {}),
    ("order_id",    "unique",         {}),
    ("order_total", "not_null",       {}),
    ("status",      "accepted_values", {"values": ["pending", "completed", "cancelled"]}),
]

print("--- Clean DataFrame ---")
for col, test, kw in tests_to_run:
    result = run_generic_tests(fct_orders, col, test, **kw)
    status = "✓" if result["passed"] else "✗"
    print(f"  [{status}] {test}({col}): {result['message']}")

print()
print("--- DataFrame with intentional issues ---")
for col, test, kw in tests_to_run:
    result = run_generic_tests(fct_orders_with_issues, col, test, **kw)
    status = "✓" if result["passed"] else "✗"
    print(f"  [{status}] {test}({col}): {result['message']}")
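
The built-in relationships test is not simulated above: it checks that every non-null foreign key value in a model exists in the referenced model's column. A pandas sketch (the function name and return shape mirror run_generic_tests but are our own):

```python
import pandas as pd

def relationships_test(child: pd.DataFrame, child_col: str,
                       parent: pd.DataFrame, parent_col: str) -> dict:
    """Simulate dbt's relationships test: every non-null value in the
    child column must exist in the parent column (nulls are ignored,
    as in dbt). Passes when there are zero orphaned rows."""
    orphans = child[child[child_col].notna()
                    & ~child[child_col].isin(parent[parent_col])]
    return {"test": "relationships", "passed": orphans.empty,
            "failures": len(orphans)}

dim_customers = pd.DataFrame({"customer_id": ["CUST-001", "CUST-002"]})
orders_ok  = pd.DataFrame({"customer_id": ["CUST-001", "CUST-002", None]})
orders_bad = pd.DataFrame({"customer_id": ["CUST-001", "CUST-999"]})  # orphan FK

print(relationships_test(orders_ok,  "customer_id", dim_customers, "customer_id"))
print(relationships_test(orders_bad, "customer_id", dim_customers, "customer_id"))
```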

6. Jinja2 Macros

Macros are reusable SQL functions written in Jinja2. They live in the macros/ directory and are called with {{ macro_name(args) }} in any model.

Common use cases:

  • Converting cents to dollars

  • Generating surrogate keys

  • Abstracting cross-database SQL differences

  • Dynamically generating SQL (e.g., unpivoting columns)

macro_cents_to_dollars = """
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, scale=2) %}
    round(cast({{ column_name }} as numeric) / 100.0, {{ scale }})
{% endmacro %}
"""

macro_surrogate_key = """
-- macros/generate_surrogate_key.sql
-- Creates a deterministic MD5 surrogate key from one or more columns.
{% macro generate_surrogate_key(field_list) %}
    {% set fields = [] %}
    {% for field in field_list %}
        {% set _ = fields.append(
            "coalesce(cast(" ~ field ~ " as varchar), '_dbt_utils_surrogate_key_null_')"
        ) %}
    {% endfor %}
    md5({{ fields | join(" || '-' || ") }})
{% endmacro %}
"""

macro_safe_divide = """
-- macros/safe_divide.sql
-- Returns null instead of raising a divide-by-zero error.
{% macro safe_divide(numerator, denominator) %}
    case
        when {{ denominator }} = 0 or {{ denominator }} is null
            then null
        else {{ numerator }} / {{ denominator }}
    end
{% endmacro %}
"""

print("=== Macro: cents_to_dollars ===")
print(macro_cents_to_dollars)

print("=== Macro: generate_surrogate_key ===")
print(macro_surrogate_key)

print("=== Macro: safe_divide ===")
print(macro_safe_divide)
import hashlib

# Simulate macro execution in Python

def cents_to_dollars(value, scale=2):
    """Python equivalent of the cents_to_dollars dbt macro."""
    if value is None:
        return None
    return round(value / 100.0, scale)


def generate_surrogate_key(fields: list):
    """
    Python equivalent of generate_surrogate_key macro.
    Concatenates field values and returns an MD5 hash.
    """
    null_sentinel = "_dbt_utils_surrogate_key_null_"
    parts = [str(f) if f is not None else null_sentinel for f in fields]
    combined = "-".join(parts)
    return hashlib.md5(combined.encode()).hexdigest()


def safe_divide(numerator, denominator):
    """Python equivalent of the safe_divide dbt macro."""
    if denominator is None or denominator == 0:
        return None
    return numerator / denominator


# Demo
print("=== cents_to_dollars simulation ===")
for cents in [1099, 5000, None, 0]:
    print(f"  {cents} cents -> ${cents_to_dollars(cents)}")

print()
print("=== generate_surrogate_key simulation ===")
rows = [
    ["ORD-001", "2024-01-15"],
    ["ORD-002", "2024-01-16"],
    ["ORD-001", None],          # null field
]
for row in rows:
    sk = generate_surrogate_key(row)
    print(f"  {row} -> {sk}")

print()
print("=== safe_divide simulation ===")
pairs = [(100, 4), (100, 0), (50, None), (0, 5)]
for num, den in pairs:
    print(f"  {num} / {den} -> {safe_divide(num, den)}")

7. Incremental Models — Pandas Simulation

Incremental models only process new or changed rows since the last dbt run. This is critical for large tables (billions of rows) where a full refresh would be too expensive.

The pattern:

  1. On first run (is_incremental() = False): load everything.

  2. On subsequent runs (is_incremental() = True): load only rows newer than max(event_timestamp) in the existing table.

  3. Use unique_key to MERGE (upsert) instead of INSERT to handle late-arriving or updated records.

def simulate_incremental_load(
    source_df: pd.DataFrame,
    existing_df: pd.DataFrame | None,
    timestamp_col: str,
    unique_key: str
) -> pd.DataFrame:
    """
    Simulate dbt incremental materialization.

    Parameters
    ----------
    source_df     : Full source table (raw data)
    existing_df   : Current state of the target table (None = first run)
    timestamp_col : Column used to identify new rows
    unique_key    : Primary key for upsert (MERGE) behaviour

    Returns
    -------
    Updated target table DataFrame.
    """
    is_incremental = existing_df is not None and len(existing_df) > 0

    if not is_incremental:
        print("[is_incremental() = False] Full load — processing ALL rows.")
        new_rows = source_df.copy()
    else:
        max_ts = existing_df[timestamp_col].max()
        print(f"[is_incremental() = True] Incremental load — rows after {max_ts}")
        new_rows = source_df[source_df[timestamp_col] > max_ts].copy()
        print(f"  Rows in source      : {len(source_df):,}")
        print(f"  New rows to process : {len(new_rows):,}")

    new_rows["dbt_loaded_at"] = datetime.datetime.now()

    if not is_incremental:
        return new_rows

    # Upsert: replace existing rows with same unique_key, append the rest
    existing_filtered = existing_df[~existing_df[unique_key].isin(new_rows[unique_key])]
    result = pd.concat([existing_filtered, new_rows], ignore_index=True)
    result = result.sort_values(timestamp_col).reset_index(drop=True)
    return result


# Build a fake raw events source
base_ts = datetime.datetime(2024, 1, 1)
raw_events = pd.DataFrame({
    "event_id":        [f"EVT-{i:05d}" for i in range(1, 21)],
    "user_id":         np.random.randint(100, 110, 20),
    "event_type":      np.random.choice(["page_view", "click", "purchase"], 20),
    "event_timestamp": [base_ts + datetime.timedelta(hours=i * 6) for i in range(20)],
})

print("\n" + "=" * 55)
print("RUN 1: First dbt run (full load)")
print("=" * 55)
target_table = simulate_incremental_load(
    source_df=raw_events.iloc[:10],  # only first 10 rows exist
    existing_df=None,
    timestamp_col="event_timestamp",
    unique_key="event_id"
)
print(f"  Target table rows after run 1: {len(target_table)}")
print(f"  Max timestamp: {target_table['event_timestamp'].max()}")

print()
print("=" * 55)
print("RUN 2: Incremental run (10 new events arrived)")
print("=" * 55)
target_table = simulate_incremental_load(
    source_df=raw_events,            # all 20 rows now in source
    existing_df=target_table,
    timestamp_col="event_timestamp",
    unique_key="event_id"
)
print(f"  Target table rows after run 2: {len(target_table)}")
print(f"  Max timestamp: {target_table['event_timestamp'].max()}")

print()
print("Final target table (last 5 rows):")
print(target_table.tail(5).to_string(index=False))

8. DAG and Lineage

dbt automatically builds a Directed Acyclic Graph (DAG) from ref() and source() calls. This enables:

  • Parallel execution of independent models

  • Selective runs (dbt run --select +fct_orders runs fct_orders and all upstream models)

  • Documentation and lineage visualization (dbt docs serve)

The DAG flows from raw sources through staging → intermediate → marts.

try:
    import networkx as nx
    import matplotlib
    matplotlib.use("Agg")          # non-interactive backend
    import matplotlib.pyplot as plt

    # Build the DAG
    G = nx.DiGraph()

    nodes = [
        ("raw.orders",       {"layer": "source"}),
        ("raw.customers",    {"layer": "source"}),
        ("raw.order_items",  {"layer": "source"}),
        ("stg_orders",       {"layer": "staging"}),
        ("stg_customers",    {"layer": "staging"}),
        ("stg_order_items",  {"layer": "staging"}),
        ("int_order_items",  {"layer": "intermediate"}),
        ("dim_customers",    {"layer": "mart"}),
        ("fct_orders",       {"layer": "mart"}),
    ]

    edges = [
        ("raw.orders",      "stg_orders"),
        ("raw.customers",   "stg_customers"),
        ("raw.order_items", "stg_order_items"),
        ("stg_orders",      "int_order_items"),
        ("stg_order_items", "int_order_items"),
        ("stg_customers",   "dim_customers"),
        ("int_order_items", "fct_orders"),
        ("dim_customers",   "fct_orders"),
    ]

    G.add_nodes_from(nodes)
    G.add_edges_from(edges)

    layer_colors = {
        "source":       "#f4a261",
        "staging":      "#2a9d8f",
        "intermediate": "#e9c46a",
        "mart":         "#264653",
    }

    pos = {
        "raw.orders":      (0, 2),
        "raw.customers":   (0, 1),
        "raw.order_items": (0, 0),
        "stg_orders":      (2, 2),
        "stg_customers":   (2, 1),
        "stg_order_items": (2, 0),
        "int_order_items": (4, 1),
        "dim_customers":   (4, 2),
        "fct_orders":      (6, 1.5),
    }

    node_colors = [layer_colors[G.nodes[n]["layer"]] for n in G.nodes]

    fig, ax = plt.subplots(figsize=(14, 5))
    nx.draw_networkx(
        G, pos=pos, ax=ax,
        node_color=node_colors,
        node_size=2200,
        font_size=8,
        font_color="white",
        arrows=True,
        arrowstyle="->",
        arrowsize=20,
        edge_color="#555",
        width=1.5,
    )

    # Legend
    from matplotlib.patches import Patch
    legend_elements = [Patch(facecolor=c, label=l) for l, c in layer_colors.items()]
    ax.legend(handles=legend_elements, loc="lower right", fontsize=9)
    ax.set_title("dbt Project DAG", fontsize=14, pad=15)
    ax.axis("off")
    plt.tight_layout()
    plt.savefig("/tmp/dbt_dag.png", dpi=120, bbox_inches="tight")
    plt.show()
    print("DAG saved to /tmp/dbt_dag.png")

except ImportError:
    print("networkx/matplotlib not available — falling back to ASCII DAG.\n")

    ascii_dag = """
  SOURCE LAYER         STAGING LAYER      INTERMEDIATE       MART LAYER
  ─────────────        ─────────────      ────────────       ──────────

  raw.orders      -->  stg_orders     ─┐
                                       ├──>  int_order_items ──┐
  raw.order_items -->  stg_order_items ┘                       ├──> fct_orders
                                                               │
  raw.customers   -->  stg_customers  ──>   dim_customers   ───┘

  Legend:
    raw.*           = Source tables (declared in sources.yml)
    stg_*           = Staging models (view)
    int_*           = Intermediate models (ephemeral)
    fct_*/dim_*     = Mart models (table)
    """
    print(ascii_dag)
# Always show ASCII version for documentation purposes
ascii_dag = """
  SOURCE LAYER         STAGING LAYER        INTERMEDIATE         MART LAYER
  ─────────────        ─────────────        ────────────         ──────────

  raw.orders      -->  stg_orders      ──┐
                                         ├──>  int_order_items ──┐
  raw.order_items -->  stg_order_items ──┘                       ├──>  fct_orders
                                                                 │
  raw.customers   -->  stg_customers   ────>  dim_customers  ────┘

  Execution order (dbt resolves automatically from ref() calls):
    1. stg_orders, stg_customers, stg_order_items  (parallel)
    2. int_order_items, dim_customers              (parallel, after staging)
    3. fct_orders                                  (after both intermediate and dim)
"""

print(ascii_dag)
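
The execution order listed above can be derived programmatically with Python's standard-library graphlib, using the same dependency edges dbt would infer from ref() and source() calls. This is a sketch of the scheduling idea only; real dbt distributes each ready batch across --threads workers:

```python
from graphlib import TopologicalSorter

# Each model maps to the set of nodes it depends on (its ref()/source() calls).
# Sources (raw.*) appear only as dependencies, so they form the first batch.
deps = {
    "stg_orders":      {"raw.orders"},
    "stg_customers":   {"raw.customers"},
    "stg_order_items": {"raw.order_items"},
    "int_order_items": {"stg_orders", "stg_order_items"},
    "dim_customers":   {"stg_customers"},
    "fct_orders":      {"int_order_items", "dim_customers"},
}

ts = TopologicalSorter(deps)
ts.prepare()
batch_num = 0
while ts.is_active():
    ready = ts.get_ready()          # everything runnable in parallel right now
    batch_num += 1
    print(f"Batch {batch_num}: {sorted(ready)}")
    ts.done(*ready)
```

Batch 1 is the three sources, batch 2 the staging models, batch 3 int_order_items and dim_customers, and the final batch is fct_orders alone, matching the hand-written order above.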

9. dbt Commands Cheat Sheet

dbt run                          # Run all models
dbt run --select staging.*       # Run staging models only
dbt run --select +fct_orders     # fct_orders and all upstream
dbt test                         # Run all tests
dbt test --select stg_orders     # Test specific model
dbt docs generate                # Generate documentation
dbt docs serve                   # Open docs in browser (localhost:8080)
dbt seed                         # Load CSV seeds to warehouse
dbt snapshot                     # Capture slowly changing dimensions
dbt compile                      # Compile Jinja → SQL without running
dbt debug                        # Check connection and config
dbt deps                         # Install dbt packages from packages.yml

Node Selection Syntax

Selector          | Meaning
------------------|-----------------------------------------------
stg_orders        | Only the stg_orders model
staging.*         | All models in the staging folder
+fct_orders       | fct_orders and all upstream models
fct_orders+       | fct_orders and all downstream models
+fct_orders+      | fct_orders plus full ancestry and descendants
tag:nightly       | All models tagged nightly
source:raw.orders | The raw.orders source node
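
A rough sketch of how the +model and model+ graph operators could be resolved over the project DAG. The parent map and helper functions are our own illustration, not dbt's actual implementation:

```python
# Each model maps to its direct parents (the models it ref()s).
# Sources are omitted for brevity in this sketch.
PARENTS = {
    "stg_orders": set(), "stg_order_items": set(), "stg_customers": set(),
    "int_order_items": {"stg_orders", "stg_order_items"},
    "dim_customers": {"stg_customers"},
    "fct_orders": {"int_order_items", "dim_customers"},
}
# Invert the parent map to get direct children.
CHILDREN = {m: {c for c, ps in PARENTS.items() if m in ps} for m in PARENTS}

def closure(start: str, edges: dict) -> set:
    """All nodes reachable from `start` by repeatedly following `edges`."""
    seen, stack = set(), [start]
    while stack:
        node = stack.pop()
        for nxt in edges[node] - seen:
            seen.add(nxt)
            stack.append(nxt)
    return seen

def select(selector: str) -> set:
    """Resolve a dbt-style selector: '+m' adds ancestors, 'm+' adds descendants."""
    model = selector.strip("+")
    picked = {model}
    if selector.startswith("+"):
        picked |= closure(model, PARENTS)    # upstream
    if selector.endswith("+"):
        picked |= closure(model, CHILDREN)   # downstream
    return picked

print(sorted(select("+fct_orders")))   # fct_orders plus all ancestors
print(sorted(select("stg_orders+")))   # stg_orders plus all descendants
```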

Useful Flags

dbt run --full-refresh            # Force incremental models to do a full reload
dbt run --target prod             # Use the prod profile
dbt run --threads 8               # Parallelism level
dbt run --vars '{run_date: 2024-01-15}'  # Pass runtime variables
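
Values passed via --vars are read inside models with {{ var('run_date') }}. A dependency-free sketch of that substitution (real dbt renders the template with Jinja2; the regex stand-in here is illustrative only):

```python
import re

# Simulated CLI input, as if from: dbt run --vars '{run_date: 2024-01-15}'
cli_vars = {"run_date": "2024-01-15"}

sql = "select * from orders where order_date = '{{ var('run_date') }}'"
compiled = re.sub(
    r"\{\{\s*var\('([^']+)'\)\s*\}\}",
    lambda m: str(cli_vars[m.group(1)]),
    sql,
)
print(compiled)
# prints: select * from orders where order_date = '2024-01-15'
```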

10. Exercises

Work through these exercises to practice dbt concepts covered in this notebook.

Exercise 1 — Staging Model with Type Casting

Write a staging model for a raw_events table that:

  • Casts event_id to varchar, event_timestamp to timestamp, and revenue to numeric

  • Renames all columns to snake_case (userId → user_id, eventType → event_type)

  • Adds a loaded_at column set to current_timestamp

  • Filters out test events where event_type = 'test'

Exercise 2 — Schema Tests for fct_orders

Add a complete schema.yml for fct_orders with the following tests:

  • order_id is not_null and unique

  • status is in ['pending', 'completed', 'cancelled']

  • customer_id references dim_customers.customer_id (relationships test)

  • order_total is not_null

Exercise 3 — Incremental Model with Date Filter

Write an incremental model that loads only orders created in the last 3 days, partitioned by order_date:

  • Use is_incremental() to filter on the last 3 days relative to current_date

  • Set unique_key = 'order_id' so late-arriving updates are merged

  • Add a dbt_updated_at audit column

Exercise 4 — safe_divide Macro

Create a macro safe_divide(numerator, denominator) that:

  • Returns null instead of raising a divide-by-zero error

  • Handles both 0 and null denominators

  • Use it in a model to compute avg_order_value = safe_divide(total_revenue, total_orders)

Exercise 5 — SCD Type 2 with dbt Snapshots

Simulate a Slowly Changing Dimension (SCD Type 2) for customers. dbt snapshots automatically track row history by adding dbt_valid_from and dbt_valid_to columns.

-- snapshots/snap_customers.sql
{% snapshot snap_customers %}
{{ config(
    target_schema = 'snapshots',
    unique_key    = 'customer_id',
    strategy      = 'check',
    check_cols    = ['email', 'customer_segment', 'address']
) }}
select * from {{ source('raw', 'customers') }}
{% endsnapshot %}

After running dbt snapshot, the snapshot table will contain:

customer_id | email         | segment   | dbt_valid_from | dbt_valid_to
------------|---------------|-----------|----------------|-------------
1           | old@email.com | consumer  | 2024-01-01     | 2024-06-01
1           | new@email.com | corporate | 2024-06-01     | null

A null dbt_valid_to indicates the current record. Use this pattern to answer “what was the customer’s segment at the time of the order?” by joining on order_date between dbt_valid_from and coalesce(dbt_valid_to, current_date).
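
That point-in-time join can be simulated in pandas with illustrative data. A half-open interval [valid_from, valid_to) is used here so a boundary date matches exactly one snapshot row:

```python
import pandas as pd

# Illustrative SCD Type 2 snapshot: customer 1 changed segment on 2024-06-01.
snap = pd.DataFrame({
    "customer_id":    [1, 1],
    "segment":        ["consumer", "corporate"],
    "dbt_valid_from": pd.to_datetime(["2024-01-01", "2024-06-01"]),
    "dbt_valid_to":   pd.to_datetime(["2024-06-01", None]),  # null = current row
})
orders = pd.DataFrame({
    "order_id":    ["ORD-1", "ORD-2"],
    "customer_id": [1, 1],
    "order_date":  pd.to_datetime(["2024-03-15", "2024-07-01"]),
})

# Join each order to every snapshot row for its customer, then keep the
# row whose validity window contains the order date.
joined = orders.merge(snap, on="customer_id")
valid_to = joined["dbt_valid_to"].fillna(pd.Timestamp.max)   # open-ended = current
mask = (joined["order_date"] >= joined["dbt_valid_from"]) & (joined["order_date"] < valid_to)
result = joined[mask][["order_id", "order_date", "segment"]]
print(result.to_string(index=False))
```

ORD-1 (placed in March) resolves to the historical "consumer" row, while ORD-2 (placed in July) resolves to the current "corporate" row.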