dbt: Data Modeling for Analytics Engineering
dbt (data build tool) transforms raw data in your warehouse into clean, tested, documented models using SQL. It brings software engineering practices (version control, testing, documentation) to analytics. This notebook covers dbt's core concepts with runnable Python simulations.
1. Setup
Import standard libraries and attempt to import dbt-core. A HAS_DBT flag controls whether dbt-specific features are available.
import pandas as pd
import numpy as np
import textwrap
from pathlib import Path
import re
import datetime
# Try importing dbt-core
try:
    import dbt
    import dbt.version
    HAS_DBT = True
    print(f"dbt-core version: {dbt.version.__version__}")
except ImportError:
    HAS_DBT = False
    print("dbt-core not installed. Running in simulation mode.")
    print("Install with: pip install dbt-core dbt-duckdb")
print(f"\nPandas : {pd.__version__}")
print(f"NumPy : {np.__version__}")
print(f"HAS_DBT : {HAS_DBT}")
print(f"Date : {datetime.date.today()}")
2. dbt Project Structure
A dbt project is a directory of SQL files, YAML configs, and Jinja2 macros. The layout below is the conventional structure recommended by dbt Labs:
models/ – SQL files that define transformations (staging, intermediate, marts)
tests/ – SQL assertions and generic YAML-defined tests
macros/ – Reusable Jinja2 SQL functions
seeds/ – Static CSV files loaded into the warehouse as tables
analyses/ – Ad-hoc SQL that is compiled but not materialized
dbt_project.yml – Project-level configuration (name, version, materializations)
profiles.yml – Connection credentials (usually in ~/.dbt/profiles.yml)
project_structure = """
my_dbt_project/
├── dbt_project.yml          # Project config
├── profiles.yml             # Connection profiles
├── models/
│   ├── staging/             # Clean raw data (1:1 with source tables)
│   │   ├── stg_orders.sql
│   │   └── stg_customers.sql
│   ├── intermediate/        # Business logic, joins
│   │   └── int_order_items.sql
│   └── marts/               # Final analytics-ready tables
│       ├── fct_orders.sql       # Facts
│       └── dim_customers.sql    # Dimensions
├── tests/
│   ├── generic/             # Reusable test macros
│   └── singular/            # One-off SQL assertions
├── macros/                  # Reusable Jinja2 SQL functions
├── seeds/                   # Static CSV files loaded to warehouse
└── analyses/                # Ad-hoc SQL (not materialized)
"""
print(project_structure)
dbt_project.yml example
This file declares the project name, dbt version constraints, and default materializations per folder:
dbt_project_yml = """
# dbt_project.yml
name: my_dbt_project
version: '1.0.0'
config-version: 2

profile: my_dbt_project   # matches a key in profiles.yml

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
macro-paths: ["macros"]

models:
  my_dbt_project:
    staging:
      +materialized: view        # staging models are views
      +schema: staging
    intermediate:
      +materialized: ephemeral   # not persisted, inlined as CTE
    marts:
      +materialized: table       # marts are physical tables
      +schema: analytics
"""
print(dbt_project_yml)
3. The Three Model Types
dbt projects are conventionally organized into three layers:

| Layer | Purpose | Naming | Materialization |
|---|---|---|---|
| Staging | 1:1 with source tables; type casting, renaming, light cleaning | stg_* | view |
| Intermediate | Business logic, joins, aggregations not yet ready for end users | int_* | ephemeral / view |
| Marts | Final analytics-ready fact and dimension tables | fct_* / dim_* | table / incremental |

The key principle: each layer only references the layer directly below it using ref().
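The layering rule can be checked mechanically from each model's ref() list. A minimal sketch, where the model names, layers, and the deliberately broken `bad_model` are illustrative rather than taken from a real project:

```python
# Map each model to its layer and the models it references via ref().
LAYER_ORDER = {"staging": 0, "intermediate": 1, "mart": 2}

models = {
    "stg_orders":      {"layer": "staging",      "refs": []},
    "int_order_items": {"layer": "intermediate", "refs": ["stg_orders"]},
    "fct_orders":      {"layer": "mart",         "refs": ["int_order_items"]},
    "bad_model":       {"layer": "staging",      "refs": ["fct_orders"]},  # violates layering
}

def layering_violations(models):
    """Return (model, ref) pairs where a model references a higher layer."""
    violations = []
    for name, meta in models.items():
        for ref in meta["refs"]:
            if LAYER_ORDER[models[ref]["layer"]] > LAYER_ORDER[meta["layer"]]:
                violations.append((name, ref))
    return violations

print(layering_violations(models))  # -> [('bad_model', 'fct_orders')]
```

Same-layer references (e.g. fct_orders joining dim_customers) are allowed; only references that reach *up* the stack are flagged.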
# --- Staging model: stg_orders.sql ---
stg_orders_sql = """
-- models/staging/stg_orders.sql
-- One row per raw order; rename columns, cast types, no business logic.

with source as (
    select * from {{ source('raw', 'orders') }}
),

renamed as (
    select
        id as order_id,
        customer_id,
        cast(order_date as date) as order_date,
        cast(amount_cents as numeric) / 100.0 as order_amount,
        lower(trim(status)) as status,
        cast(created_at as timestamp) as created_at,
        current_timestamp as loaded_at
    from source
)

select * from renamed
"""
print("=== stg_orders.sql ===")
print(stg_orders_sql)
# --- Intermediate model: int_order_items.sql ---
int_order_items_sql = """
-- models/intermediate/int_order_items.sql
-- Join orders with order items to create a denormalized dataset.

with orders as (
    select * from {{ ref('stg_orders') }}
),

order_items as (
    select * from {{ ref('stg_order_items') }}
),

joined as (
    select
        o.order_id,
        o.customer_id,
        o.order_date,
        o.status,
        oi.product_id,
        oi.quantity,
        oi.unit_price,
        oi.quantity * oi.unit_price as line_total
    from orders o
    inner join order_items oi
        on o.order_id = oi.order_id
)

select * from joined
"""
print("=== int_order_items.sql ===")
print(int_order_items_sql)
# --- Mart model: fct_orders.sql ---
fct_orders_sql = """
-- models/marts/fct_orders.sql
-- Fact table: one row per order with pre-aggregated metrics.

with order_items as (
    select * from {{ ref('int_order_items') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

order_totals as (
    select
        order_id,
        customer_id,
        order_date,
        status,
        count(product_id) as num_items,
        sum(line_total) as order_total,
        min(unit_price) as min_item_price,
        max(unit_price) as max_item_price
    from order_items
    group by 1, 2, 3, 4
)

select
    ot.*,
    c.customer_name,
    c.customer_segment
from order_totals ot
left join customers c
    on ot.customer_id = c.customer_id
"""
print("=== fct_orders.sql ===")
print(fct_orders_sql)
4. Materializations, ref(), and source()
ref() and source()
{{ ref('model_name') }} – references another dbt model. dbt resolves this to the correct schema/database and builds the DAG automatically.
{{ source('source_name', 'table_name') }} – references a raw source table declared in a sources.yml file. Enables freshness checks and lineage tracking back to raw data.
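At compile time, ref() and source() are replaced by fully qualified relation names. A rough sketch of that resolution, with an illustrative database name and schema mapping (real dbt derives these from profiles.yml, +schema configs, and sources.yml):

```python
# Illustrative compile-time resolution of ref()/source() to relation names.
DATABASE = "analytics_db"                                         # hypothetical target database
MODEL_SCHEMAS = {"stg_orders": "staging", "fct_orders": "analytics"}  # from +schema configs
SOURCES = {("raw", "orders"): "raw_ingest.orders"}                # from a sources.yml file

def compile_ref(model_name):
    """Resolve a ref() call to database.schema.model."""
    schema = MODEL_SCHEMAS[model_name]
    return f"{DATABASE}.{schema}.{model_name}"

def compile_source(source_name, table_name):
    """Resolve a source() call via the declared source mapping."""
    return f"{DATABASE}.{SOURCES[(source_name, table_name)]}"

print(compile_ref("fct_orders"))        # analytics_db.analytics.fct_orders
print(compile_source("raw", "orders"))  # analytics_db.raw_ingest.orders
```

Because every model goes through this lookup, dbt knows exactly which relations depend on which, which is how the DAG in section 8 falls out for free.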
The Four Materializations

| Materialization | Description | Use Case |
|---|---|---|
| view | SQL view, no data stored | Staging models, lightweight transforms |
| table | Drops and recreates the full table on each run | Marts that are small enough to fully refresh |
| incremental | Only appends/updates new rows since last run | Large fact tables (events, transactions) |
| ephemeral | Compiled as a CTE, never persisted | Intermediate logic used only once |
Materializations are set in dbt_project.yml (per folder) or per-model using a config block.
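The precedence between the two is simple: per-model config() keys override folder-level defaults. A minimal sketch of that merge, with an illustrative dict layout (not dbt's internal representation):

```python
# Folder-level defaults (from dbt_project.yml) vs. per-model config() blocks.
folder_config = {"marts": {"materialized": "table", "schema": "analytics"}}
model_config = {"fct_events": {"materialized": "incremental", "unique_key": "event_id"}}

def resolve_config(folder, model_name):
    """Per-model config() keys win over folder-level defaults."""
    merged = dict(folder_config.get(folder, {}))
    merged.update(model_config.get(model_name, {}))
    return merged

print(resolve_config("marts", "fct_orders"))
# {'materialized': 'table', 'schema': 'analytics'}
print(resolve_config("marts", "fct_events"))
# {'materialized': 'incremental', 'schema': 'analytics', 'unique_key': 'event_id'}
```

So fct_events below keeps the marts schema but swaps its materialization to incremental.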
incremental_model_sql = """
-- models/marts/fct_events.sql
-- Incremental model: append only new events on each run.

{{ config(
    materialized = 'incremental',
    unique_key = 'event_id',
    on_schema_change = 'sync_all_columns'
) }}

with source as (
    select * from {{ source('raw', 'events') }}

    -- On incremental runs, only process rows newer than the last load.
    -- is_incremental() returns True only when the target table already exists.
    {% if is_incremental() %}
    where event_timestamp > (
        select max(event_timestamp) from {{ this }}
    )
    {% endif %}
),

transformed as (
    select
        event_id,
        user_id,
        event_type,
        cast(event_timestamp as timestamp) as event_timestamp,
        properties,
        current_timestamp as dbt_loaded_at
    from source
)

select * from transformed
"""
print("=== Incremental Model: fct_events.sql ===")
print(incremental_model_sql)
print("\nKey points about incremental models:")
print(" - is_incremental() is False on the first run (full load)")
print(" - is_incremental() is True on subsequent runs (append/merge only new rows)")
print(" - unique_key triggers a MERGE instead of INSERT (handles late-arriving data)")
print(" - Use --full-refresh flag to force a full reload")
5. dbt Tests
dbt has two kinds of tests:
Generic tests – Declared in YAML (schema.yml). Built-in: not_null, unique, accepted_values, relationships. Custom generic tests can be defined via macros.
Singular tests – One-off SQL files in tests/. The test passes if the query returns zero rows.
Run with dbt test. Results show pass/fail per column per model.
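The zero-rows convention maps naturally onto pandas: run the failure query, and pass only when it returns nothing. A minimal sketch with illustrative data:

```python
import pandas as pd

# Illustrative data standing in for the compiled fct_orders relation.
fct = pd.DataFrame({
    "order_id": ["ORD-1", "ORD-2", "ORD-3"],
    "order_total": [120.0, -5.0, 80.0],  # one deliberately negative total
})

def run_singular_test(df):
    """Mirror a singular test: select failing rows; pass iff the result is empty."""
    failing = df[df["order_total"] < 0]
    return len(failing) == 0, failing

passed, failures = run_singular_test(fct)
print("PASS" if passed else f"FAIL: {len(failures)} failing row(s)")
```

This is exactly the contract of the singular test shown later in this section: the SQL selects the bad rows, and dbt counts them.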
schema_yml = """
# models/marts/schema.yml
version: 2

models:
  - name: fct_orders
    description: "One row per order with pre-aggregated line item metrics."
    columns:
      - name: order_id
        description: "Primary key: unique identifier for each order."
        tests:
          - not_null
          - unique
      - name: customer_id
        description: "Foreign key to dim_customers."
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: status
        description: "Order lifecycle status."
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']
      - name: order_total
        description: "Sum of all line item totals for the order."
        tests:
          - not_null

  - name: dim_customers
    description: "One row per customer with current attributes."
    columns:
      - name: customer_id
        tests:
          - not_null
          - unique
      - name: customer_segment
        tests:
          - accepted_values:
              values: ['consumer', 'corporate', 'home_office']
"""
print("=== schema.yml (Generic Tests) ===")
print(schema_yml)
singular_test_sql = """
-- tests/singular/assert_order_total_positive.sql
-- Test passes when this query returns ZERO rows.
-- Fail condition: any order with a negative total.
select
order_id,
order_total
from {{ ref('fct_orders') }}
where order_total < 0
"""
print("=== Singular Test: assert_order_total_positive.sql ===")
print(singular_test_sql)
# Simulate generic dbt tests in Python using pandas
def run_generic_tests(df: pd.DataFrame, column: str, test_type: str, **kwargs) -> dict:
    """
    Simulate dbt generic tests on a pandas DataFrame column.

    Parameters
    ----------
    df : DataFrame containing the data to test
    column : Column name to test
    test_type : One of 'not_null', 'unique', 'accepted_values'
    **kwargs : Extra args (e.g., values=[...] for accepted_values)

    Returns
    -------
    dict with keys: test, column, passed, failures, message
    """
    series = df[column]

    if test_type == "not_null":
        null_count = series.isna().sum()
        passed = null_count == 0
        return {
            "test": "not_null",
            "column": column,
            "passed": passed,
            "failures": int(null_count),
            "message": "PASS" if passed else f"FAIL: {null_count} null value(s) found",
        }
    elif test_type == "unique":
        dup_count = series.duplicated().sum()
        passed = dup_count == 0
        return {
            "test": "unique",
            "column": column,
            "passed": passed,
            "failures": int(dup_count),
            "message": "PASS" if passed else f"FAIL: {dup_count} duplicate value(s) found",
        }
    elif test_type == "accepted_values":
        accepted = set(kwargs.get("values", []))
        invalid = series[~series.isin(accepted) & series.notna()]
        passed = len(invalid) == 0
        return {
            "test": "accepted_values",
            "column": column,
            "passed": passed,
            "failures": len(invalid),
            "message": "PASS" if passed else f"FAIL: unexpected values: {invalid.unique().tolist()}",
        }
    else:
        raise ValueError(f"Unknown test type: {test_type}")
# Create a sample fct_orders DataFrame
np.random.seed(42)
n = 10
fct_orders = pd.DataFrame({
    "order_id": [f"ORD-{i:04d}" for i in range(1, n + 1)],
    "customer_id": [f"CUST-{np.random.randint(1, 5):03d}" for _ in range(n)],
    "order_date": pd.date_range("2024-01-01", periods=n, freq="D"),
    "status": np.random.choice(["pending", "completed", "cancelled"], n),
    "order_total": np.round(np.random.uniform(20, 500, n), 2),
})

# Introduce intentional failures for demonstration
fct_orders_with_issues = fct_orders.copy()
fct_orders_with_issues.loc[0, "order_total"] = None     # null
fct_orders_with_issues.loc[1, "order_id"] = "ORD-0001"  # duplicate of row 0
fct_orders_with_issues.loc[2, "status"] = "shipped"     # invalid value
print("=== Running dbt-style generic tests ===")
print()

tests_to_run = [
    ("order_id", "not_null", {}),
    ("order_id", "unique", {}),
    ("order_total", "not_null", {}),
    ("status", "accepted_values", {"values": ["pending", "completed", "cancelled"]}),
]

print("--- Clean DataFrame ---")
for col, test, kw in tests_to_run:
    result = run_generic_tests(fct_orders, col, test, **kw)
    status = "✓" if result["passed"] else "✗"
    print(f" [{status}] {test}({col}): {result['message']}")
print()

print("--- DataFrame with intentional issues ---")
for col, test, kw in tests_to_run:
    result = run_generic_tests(fct_orders_with_issues, col, test, **kw)
    status = "✓" if result["passed"] else "✗"
    print(f" [{status}] {test}({col}): {result['message']}")
6. Jinja2 Macros
Macros are reusable SQL functions written in Jinja2. They live in the macros/ directory and are called with {{ macro_name(args) }} in any model.
Common use cases:
Converting cents to dollars
Generating surrogate keys
Abstracting cross-database SQL differences
Dynamically generating SQL (e.g., unpivoting columns)
macro_cents_to_dollars = """
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, scale=2) %}
round(cast({{ column_name }} as numeric) / 100.0, {{ scale }})
{% endmacro %}
"""
macro_surrogate_key = """
-- macros/generate_surrogate_key.sql
-- Creates a deterministic MD5 surrogate key from one or more columns.
{% macro generate_surrogate_key(field_list) %}
{% set fields = [] %}
{% for field in field_list %}
{% set _ = fields.append(
"coalesce(cast(" ~ field ~ " as varchar), '_dbt_utils_surrogate_key_null_')"
) %}
{% endfor %}
md5({{ fields | join(" || '-' || ") }})
{% endmacro %}
"""
macro_safe_divide = """
-- macros/safe_divide.sql
-- Returns null instead of raising a divide-by-zero error.
{% macro safe_divide(numerator, denominator) %}
case
when {{ denominator }} = 0 or {{ denominator }} is null
then null
else {{ numerator }} / {{ denominator }}
end
{% endmacro %}
"""
print("=== Macro: cents_to_dollars ===")
print(macro_cents_to_dollars)
print("=== Macro: generate_surrogate_key ===")
print(macro_surrogate_key)
print("=== Macro: safe_divide ===")
print(macro_safe_divide)
import hashlib

# Simulate macro execution in Python
def cents_to_dollars(value, scale=2):
    """Python equivalent of the cents_to_dollars dbt macro."""
    if value is None:
        return None
    return round(value / 100.0, scale)

def generate_surrogate_key(fields: list):
    """
    Python equivalent of generate_surrogate_key macro.
    Concatenates field values and returns an MD5 hash.
    """
    null_sentinel = "_dbt_utils_surrogate_key_null_"
    parts = [str(f) if f is not None else null_sentinel for f in fields]
    combined = "-".join(parts)
    return hashlib.md5(combined.encode()).hexdigest()

def safe_divide(numerator, denominator):
    """Python equivalent of the safe_divide dbt macro."""
    if denominator is None or denominator == 0:
        return None
    return numerator / denominator
# Demo
print("=== cents_to_dollars simulation ===")
for cents in [1099, 5000, None, 0]:
    print(f" {cents} cents -> ${cents_to_dollars(cents)}")
print()

print("=== generate_surrogate_key simulation ===")
rows = [
    ["ORD-001", "2024-01-15"],
    ["ORD-002", "2024-01-16"],
    ["ORD-001", None],  # null field
]
for row in rows:
    sk = generate_surrogate_key(row)
    print(f" {row} -> {sk}")
print()

print("=== safe_divide simulation ===")
pairs = [(100, 4), (100, 0), (50, None), (0, 5)]
for num, den in pairs:
    print(f" {num} / {den} -> {safe_divide(num, den)}")
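The Python stand-ins above mimic what the macros compute, but the compile step itself can also be reproduced directly with the jinja2 package (assumed available here; dbt itself depends on it). The template below defines and calls the cents_to_dollars macro in one string, which is essentially what dbt compile does:

```python
from jinja2 import Template

# Define the macro and call it in one template, mimicking dbt's compile step.
# The whitespace-control markers (-%} / {%-) keep the output on one line.
template = Template(
    "{% macro cents_to_dollars(column_name, scale=2) -%}"
    "round(cast({{ column_name }} as numeric) / 100.0, {{ scale }})"
    "{%- endmacro -%}"
    "select {{ cents_to_dollars('amount_cents') }} as order_amount from orders"
)
print(template.render())
# select round(cast(amount_cents as numeric) / 100.0, 2) as order_amount from orders
```

The macro definition itself renders to nothing; only the call site emits SQL, which is why macros can live in separate files without polluting model output.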
7. Incremental Models – Pandas Simulation
Incremental models only process new or changed rows since the last dbt run. This is critical for large tables (billions of rows) where a full refresh would be too expensive.
The pattern:
On the first run (is_incremental() = False): load everything.
On subsequent runs (is_incremental() = True): load only rows newer than max(event_timestamp) in the existing table.
Use unique_key to MERGE (upsert) instead of INSERT to handle late-arriving or updated records.
def simulate_incremental_load(
    source_df: pd.DataFrame,
    existing_df: pd.DataFrame | None,
    timestamp_col: str,
    unique_key: str,
) -> pd.DataFrame:
    """
    Simulate dbt incremental materialization.

    Parameters
    ----------
    source_df : Full source table (raw data)
    existing_df : Current state of the target table (None = first run)
    timestamp_col : Column used to identify new rows
    unique_key : Primary key for upsert (MERGE) behaviour

    Returns
    -------
    Updated target table DataFrame.
    """
    is_incremental = existing_df is not None and len(existing_df) > 0

    if not is_incremental:
        print("[is_incremental() = False] Full load: processing ALL rows.")
        new_rows = source_df.copy()
    else:
        max_ts = existing_df[timestamp_col].max()
        print(f"[is_incremental() = True] Incremental load: rows after {max_ts}")
        new_rows = source_df[source_df[timestamp_col] > max_ts].copy()

    print(f" Rows in source : {len(source_df):,}")
    print(f" New rows to process : {len(new_rows):,}")

    new_rows["dbt_loaded_at"] = datetime.datetime.now()

    if not is_incremental:
        return new_rows

    # Upsert: replace existing rows with same unique_key, append the rest
    existing_filtered = existing_df[~existing_df[unique_key].isin(new_rows[unique_key])]
    result = pd.concat([existing_filtered, new_rows], ignore_index=True)
    result = result.sort_values(timestamp_col).reset_index(drop=True)
    return result
# Build a fake raw events source
base_ts = datetime.datetime(2024, 1, 1)
raw_events = pd.DataFrame({
"event_id": [f"EVT-{i:05d}" for i in range(1, 21)],
"user_id": np.random.randint(100, 110, 20),
"event_type": np.random.choice(["page_view", "click", "purchase"], 20),
"event_timestamp": [base_ts + datetime.timedelta(hours=i * 6) for i in range(20)],
})
print("\n" + "=" * 55)
print("RUN 1: First dbt run (full load)")
print("=" * 55)
target_table = simulate_incremental_load(
source_df=raw_events.iloc[:10], # only first 10 rows exist
existing_df=None,
timestamp_col="event_timestamp",
unique_key="event_id"
)
print(f" Target table rows after run 1: {len(target_table)}")
print(f" Max timestamp: {target_table['event_timestamp'].max()}")
print()
print("=" * 55)
print("RUN 2: Incremental run (10 new events arrived)")
print("=" * 55)
target_table = simulate_incremental_load(
source_df=raw_events, # all 20 rows now in source
existing_df=target_table,
timestamp_col="event_timestamp",
unique_key="event_id"
)
print(f" Target table rows after run 2: {len(target_table)}")
print(f" Max timestamp: {target_table['event_timestamp'].max()}")
print()
print("Final target table (last 5 rows):")
print(target_table.tail(5).to_string(index=False))
8. DAG and Lineage
dbt automatically builds a Directed Acyclic Graph (DAG) from ref() and source() calls. This enables:
Parallel execution of independent models
Selective runs (dbt run --select +fct_orders runs fct_orders and all upstream models)
Documentation and lineage visualization (dbt docs serve)
The DAG flows from raw sources through staging → intermediate → marts.
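The execution order implied by ref() edges is just a topological sort. A minimal Kahn's-algorithm sketch over this section's example DAG, grouping models into levels that could run in parallel:

```python
from collections import defaultdict

# Edges from the example project: (upstream, downstream), derived from ref() calls.
edges = [
    ("stg_orders", "int_order_items"),
    ("stg_order_items", "int_order_items"),
    ("stg_customers", "dim_customers"),
    ("int_order_items", "fct_orders"),
    ("dim_customers", "fct_orders"),
]

def execution_levels(edges):
    """Kahn's algorithm, grouping nodes into levels that can run in parallel."""
    downstream = defaultdict(list)
    indegree = defaultdict(int)
    nodes = set()
    for up, down in edges:
        downstream[up].append(down)
        indegree[down] += 1
        nodes.update([up, down])
    level = sorted(n for n in nodes if indegree[n] == 0)
    levels = []
    while level:
        levels.append(level)
        next_level = set()
        for node in level:
            for child in downstream[node]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_level.add(child)
        level = sorted(next_level)
    return levels

for i, lvl in enumerate(execution_levels(edges), 1):
    print(f"Level {i}: {lvl}")
# Level 1: ['stg_customers', 'stg_order_items', 'stg_orders']
# Level 2: ['dim_customers', 'int_order_items']
# Level 3: ['fct_orders']
```

This is the same ordering dbt derives when it schedules models across its worker threads.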
try:
    import networkx as nx
    import matplotlib
    matplotlib.use("Agg")  # non-interactive backend
    import matplotlib.pyplot as plt

    # Build the DAG
    G = nx.DiGraph()
    nodes = [
        ("raw.orders", {"layer": "source"}),
        ("raw.customers", {"layer": "source"}),
        ("raw.order_items", {"layer": "source"}),
        ("stg_orders", {"layer": "staging"}),
        ("stg_customers", {"layer": "staging"}),
        ("stg_order_items", {"layer": "staging"}),
        ("int_order_items", {"layer": "intermediate"}),
        ("dim_customers", {"layer": "mart"}),
        ("fct_orders", {"layer": "mart"}),
    ]
    edges = [
        ("raw.orders", "stg_orders"),
        ("raw.customers", "stg_customers"),
        ("raw.order_items", "stg_order_items"),
        ("stg_orders", "int_order_items"),
        ("stg_order_items", "int_order_items"),
        ("stg_customers", "dim_customers"),
        ("int_order_items", "fct_orders"),
        ("dim_customers", "fct_orders"),
    ]
    G.add_nodes_from(nodes)
    G.add_edges_from(edges)

    layer_colors = {
        "source": "#f4a261",
        "staging": "#2a9d8f",
        "intermediate": "#e9c46a",
        "mart": "#264653",
    }
    pos = {
        "raw.orders": (0, 2),
        "raw.customers": (0, 1),
        "raw.order_items": (0, 0),
        "stg_orders": (2, 2),
        "stg_customers": (2, 1),
        "stg_order_items": (2, 0),
        "int_order_items": (4, 1),
        "dim_customers": (4, 2),
        "fct_orders": (6, 1.5),
    }
    node_colors = [layer_colors[G.nodes[n]["layer"]] for n in G.nodes]

    fig, ax = plt.subplots(figsize=(14, 5))
    nx.draw_networkx(
        G, pos=pos, ax=ax,
        node_color=node_colors,
        node_size=2200,
        font_size=8,
        font_color="white",
        arrows=True,
        arrowstyle="->",
        arrowsize=20,
        edge_color="#555",
        width=1.5,
    )

    # Legend
    from matplotlib.patches import Patch
    legend_elements = [Patch(facecolor=c, label=l) for l, c in layer_colors.items()]
    ax.legend(handles=legend_elements, loc="lower right", fontsize=9)
    ax.set_title("dbt Project DAG", fontsize=14, pad=15)
    ax.axis("off")
    plt.tight_layout()
    plt.savefig("/tmp/dbt_dag.png", dpi=120, bbox_inches="tight")
    plt.show()
    print("DAG saved to /tmp/dbt_dag.png")
except ImportError:
    print("networkx/matplotlib not available; falling back to ASCII DAG.\n")
    ascii_dag = """
SOURCE LAYER        STAGING LAYER          INTERMEDIATE            MART LAYER
------------        -------------          ------------            ----------
raw.orders      --> stg_orders --------+
                                       +--> int_order_items --+
raw.order_items --> stg_order_items ---+                      +--> fct_orders
                                                              |
raw.customers   --> stg_customers --------> dim_customers ----+

Legend:
  raw.*       = Source tables (declared in sources.yml)
  stg_*       = Staging models (view)
  int_*       = Intermediate models (ephemeral)
  fct_*/dim_* = Mart models (table)
"""
    print(ascii_dag)
# Always show the ASCII version for documentation purposes
ascii_dag = """
SOURCE LAYER        STAGING LAYER          INTERMEDIATE            MART LAYER
------------        -------------          ------------            ----------
raw.orders      --> stg_orders --------+
                                       +--> int_order_items --+
raw.order_items --> stg_order_items ---+                      +--> fct_orders
                                                              |
raw.customers   --> stg_customers --------> dim_customers ----+

Execution order (dbt resolves automatically from ref() calls):
  1. stg_orders, stg_customers, stg_order_items  (parallel)
  2. int_order_items, dim_customers              (parallel, after staging)
  3. fct_orders                                  (after both intermediate and dim)
"""
print(ascii_dag)
9. dbt Commands Cheat Sheet
dbt run # Run all models
dbt run --select staging.* # Run staging models only
dbt run --select +fct_orders # fct_orders and all upstream
dbt test # Run all tests
dbt test --select stg_orders # Test specific model
dbt docs generate # Generate documentation
dbt docs serve # Open docs in browser (localhost:8080)
dbt seed # Load CSV seeds to warehouse
dbt snapshot # Capture slowly changing dimensions
dbt compile # Compile Jinja → SQL without running
dbt debug # Check connection and config
dbt deps # Install dbt packages from packages.yml
Node Selection Syntax

| Selector | Meaning |
|---|---|
| fct_orders | Only the fct_orders model |
| staging.* | All models in the staging folder |
| +fct_orders | fct_orders and all upstream models |
| fct_orders+ | fct_orders and all downstream models |
| +fct_orders+ | fct_orders plus all upstream and downstream models |
| tag:nightly | All models tagged nightly |
| @fct_orders | The model, its descendants, and all ancestors of those descendants |
Useful Flags
dbt run --full-refresh # Force incremental models to do a full reload
dbt run --target prod # Use the prod profile
dbt run --threads 8 # Parallelism level
dbt run --vars '{run_date: 2024-01-15}' # Pass runtime variables
10. Exercises
Work through these exercises to practice dbt concepts covered in this notebook.
Exercise 1 – Staging Model with Type Casting
Write a staging model for a raw_events table that:
Casts event_id to varchar, event_timestamp to timestamp, and revenue to numeric
Renames all columns to snake_case (userId → user_id, eventType → event_type)
Adds a loaded_at column set to current_timestamp
Filters out test events where event_type = 'test'
Exercise 2 – Schema Tests for fct_orders
Add a complete schema.yml for fct_orders with the following tests:
order_id is not_null and unique
status is in ['pending', 'completed', 'cancelled']
customer_id references dim_customers.customer_id (relationships test)
order_total is not_null
Exercise 3 – Incremental Model with Date Filter
Write an incremental model that loads only orders created in the last 3 days, partitioned by order_date:
Use is_incremental() to filter on the last 3 days relative to current_date
Set unique_key = 'order_id' so late-arriving updates are merged
Add a dbt_updated_at audit column
Exercise 4 – safe_divide Macro
Create a macro safe_divide(numerator, denominator) that:
Returns null instead of raising a divide-by-zero error
Handles both 0 and null denominators
Use it in a model to compute avg_order_value = safe_divide(total_revenue, total_orders)
Exercise 5 – SCD Type 2 with dbt Snapshots
Simulate a Slowly Changing Dimension (SCD Type 2) for customers. dbt snapshots automatically track row history by adding dbt_valid_from and dbt_valid_to columns.
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}

{{ config(
    target_schema = 'snapshots',
    unique_key = 'customer_id',
    strategy = 'check',
    check_cols = ['email', 'customer_segment', 'address']
) }}

select * from {{ source('raw', 'customers') }}

{% endsnapshot %}
After running dbt snapshot, the snapshot table will contain:
| customer_id | segment | dbt_valid_from | dbt_valid_to |
|---|---|---|---|
| 1 | consumer | 2024-01-01 | 2024-06-01 |
| 1 | corporate | 2024-06-01 | null |
A null dbt_valid_to indicates the current record. Use this pattern to answer "what was the customer's segment at the time of the order?" by joining on order_date between dbt_valid_from and coalesce(dbt_valid_to, current_date).
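That point-in-time join can be sketched in pandas. The snapshot and order data below are illustrative, and the join uses a half-open [valid_from, valid_to) window, which is the usual convention for SCD Type 2 validity ranges:

```python
import pandas as pd

# Illustrative snapshot table with SCD Type 2 validity windows.
snap = pd.DataFrame({
    "customer_id": [1, 1],
    "customer_segment": ["consumer", "corporate"],
    "dbt_valid_from": pd.to_datetime(["2024-01-01", "2024-06-01"]),
    "dbt_valid_to": pd.to_datetime(["2024-06-01", pd.NaT]),  # NaT = current record
})

# Illustrative orders to enrich with the segment as of the order date.
orders = pd.DataFrame({
    "order_id": ["ORD-1", "ORD-2"],
    "customer_id": [1, 1],
    "order_date": pd.to_datetime(["2024-03-15", "2024-07-01"]),
})

# Join each order to the snapshot row that was valid at order_date;
# filling NaT with today mirrors coalesce(dbt_valid_to, current_date).
merged = orders.merge(snap, on="customer_id")
valid_to = merged["dbt_valid_to"].fillna(pd.Timestamp.today())
mask = (merged["order_date"] >= merged["dbt_valid_from"]) & (merged["order_date"] < valid_to)
result = merged[mask][["order_id", "order_date", "customer_segment"]]
print(result.to_string(index=False))
```

ORD-1 (March) picks up the consumer record, while ORD-2 (July) picks up the later corporate record, even though both belong to the same customer today.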