PySpark Fundamentals: Distributed Data Processing for Large Datasets
When pandas runs out of RAM, Spark scales to terabytes. This notebook covers PySpark DataFrames, transformations, actions, SQL API, and the mental model shift from pandas to distributed computing.
Setup: PySpark Availability Check
PySpark requires a Java runtime plus the Spark distribution (the pip package bundles Spark itself). We detect availability and show all code patterns; each PySpark pattern is paired with a pandas equivalent so you can see the conceptual mapping clearly.
try:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
StructType, StructField, StringType, IntegerType, DoubleType
)
from pyspark.sql.window import Window
HAS_SPARK = True
print('PySpark available')
except ImportError:
HAS_SPARK = False
    print('PySpark not installed - showing all code patterns')
print('Install: pip install pyspark (also needs Java 8/11/17)')
import pandas as pd
import numpy as np
import sqlite3
np.random.seed(42)
print('pandas version:', pd.__version__)
1. The Spark Mental Model
Distributed Computing Architecture
+---------------------------------------------------+
|                    DRIVER NODE                    |
| Your Python code -> SparkContext -> DAG Scheduler |
+-------------------------+-------------------------+
                          | distributes tasks
            +-------------+-------------+
            v                           v
      +-----------+               +-----------+
      | Executor  |               | Executor  |   ...N executors
      |  Task1    |               |  Task1    |
      |  Task2    |               |  Task2    |
      | [Part 1]  |               | [Part 2]  |   each processes one partition
      +-----------+               +-----------+
Key Concepts

| Concept | Description |
|---|---|
| Driver | Your Python process. Coordinates; doesn't process data. |
| Executor | Worker process. Runs tasks in parallel across partitions. |
| Partition | A chunk of the dataset. Default: one per CPU core. |
| RDD | Resilient Distributed Dataset. Low-level API (rarely used directly). |
| DataFrame | High-level API. Columnar, schema-aware. Use this. |
| Dataset | Typed DataFrame (Scala/Java only). |
| Transformation | Lazy operation that builds a plan. Does NOT compute. |
| Action | Triggers computation. Returns data to the driver or writes to disk. |
| DAG | Spark's execution plan, built from transformations and run on an action. |
| Stage | A group of tasks with no shuffle. Stages are separated by shuffles. |
| Shuffle | Redistribution of data across executors. Expensive; avoid when possible. |
# Lazy evaluation demonstration
lazy_eval_explanation = '''
TRANSFORMATIONS (lazy - just build a plan):
df.select(...) df.filter(...) df.withColumn(...)
df.groupBy(...) df.join(...) df.orderBy(...)
df.union(...) df.distinct() df.repartition(...)
ACTIONS (eager - trigger computation):
df.show() df.count() df.collect()
df.first() df.take(n) df.toPandas()
df.write.parquet() df.write.csv() df.write.saveAsTable()
Why lazy evaluation?
1. Spark optimizes the ENTIRE plan before running any computation
2. Catalyst optimizer rewrites your query for efficiency
3. Operations that cancel each other out are eliminated
4. Filter pushdown: WHERE clauses moved to data source level
Mental model:
pandas: executes immediately, like Python statements
Spark: builds a recipe first, cooks only when you call an action
'''
print(lazy_eval_explanation)
2. SparkSession Setup
spark_session_code = '''
from pyspark.sql import SparkSession
spark = SparkSession.builder \\
.appName("PySpark Fundamentals") \\
.master("local[*]") \\
.config("spark.sql.shuffle.partitions", "8") \\
.config("spark.driver.memory", "4g") \\
.config("spark.sql.adaptive.enabled", "true") \\
.getOrCreate()
# master options:
# local        -> 1 thread (debugging only)
# local[4]     -> 4 threads
# local[*]     -> all CPU cores
# spark://...  -> cluster master URL
# yarn         -> YARN (Hadoop cluster)
# k8s://...    -> Kubernetes
spark.version # e.g. '3.5.0'
sc = spark.sparkContext # low-level context if needed
'''
if HAS_SPARK:
spark = SparkSession.builder \
.appName('PySpark Fundamentals') \
.master('local[*]') \
.config('spark.sql.shuffle.partitions', '4') \
.config('spark.ui.showConsoleProgress', 'false') \
.getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
print(f'Spark version: {spark.version}')
print(f'Cores available: {spark.sparkContext.defaultParallelism}')
else:
print('SparkSession code pattern:')
print(spark_session_code)
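Once the session exists, the builder configs can be read back (and most spark.sql.* settings changed) at runtime. A minimal sketch, assuming the session created above; the printed values simply reflect whatever was configured:
if HAS_SPARK:
    # Inspect runtime SQL configs on the live session
    print('shuffle partitions:', spark.conf.get('spark.sql.shuffle.partitions'))
    print('AQE enabled:', spark.conf.get('spark.sql.adaptive.enabled'))
    # Most spark.sql.* settings can also be changed after startup, e.g.:
    # spark.conf.set('spark.sql.shuffle.partitions', '8')
else:
    print('Config inspection skipped - PySpark not available')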
3. DataFrame Creation
# Generate sample data as pandas first (works regardless of Spark)
np.random.seed(42)
N = 1000
orders_pandas = pd.DataFrame({
'order_id': range(1, N + 1),
'customer_id': np.random.randint(1, 201, N),
'product': np.random.choice(['Laptop','Phone','Tablet','Watch','Camera'], N),
'category': np.random.choice(['Electronics','Clothing','Books'], N),
'amount': np.round(np.random.uniform(10, 1000, N), 2),
'quantity': np.random.randint(1, 6, N),
'status': np.random.choice(['completed','returned','pending'], N, p=[0.7,0.1,0.2]),
'order_date': pd.date_range('2023-01-01', periods=N, freq='8h').strftime('%Y-%m-%d')
})
print(f'Sample DataFrame: {orders_pandas.shape}')
orders_pandas.head()
dataframe_creation_code = '''
# Method 1: From pandas DataFrame
spark_df = spark.createDataFrame(orders_pandas)
# Method 2: From list of tuples with explicit schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
schema = StructType([
StructField("order_id", IntegerType(), nullable=False),
StructField("customer_id",IntegerType(), nullable=True),
StructField("product", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
])
data = [(1, 42, "Laptop", 999.99), (2, 17, "Phone", 599.00)]
spark_df = spark.createDataFrame(data, schema=schema)
# Method 3: From CSV/Parquet/JSON
spark_df = spark.read.csv("/data/orders.csv", header=True, inferSchema=True)
spark_df = spark.read.parquet("/data/orders.parquet")
spark_df = spark.read.json("/data/orders.json")
# Inspect
spark_df.printSchema() # column names and types
spark_df.show(5) # print first 5 rows
spark_df.show(5, truncate=False) # show full column values
spark_df.dtypes # list of (name, type) tuples
spark_df.count() # ACTION: triggers computation
spark_df.describe().show() # summary statistics
'''
if HAS_SPARK:
spark_df = spark.createDataFrame(orders_pandas)
print('Schema:')
spark_df.printSchema()
print(f'Partitions: {spark_df.rdd.getNumPartitions()}')
print('First 5 rows:')
spark_df.show(5)
else:
print('DataFrame creation patterns:')
print(dataframe_creation_code)
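Method 2 from the reference above (explicit schema), in runnable form. A small sketch with two made-up rows, reusing the type classes imported in the setup cell:
if HAS_SPARK:
    tiny_schema = StructType([
        StructField('order_id', IntegerType(), nullable=False),
        StructField('customer_id', IntegerType(), nullable=True),
        StructField('product', StringType(), nullable=True),
        StructField('amount', DoubleType(), nullable=True),
    ])
    # Two illustrative rows only - not part of the orders dataset
    tiny_df = spark.createDataFrame(
        [(1, 42, 'Laptop', 999.99), (2, 17, 'Phone', 599.00)], schema=tiny_schema)
    tiny_df.printSchema()
    tiny_df.show()
else:
    print('Explicit-schema example skipped - PySpark not available')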
4. Transformations (Lazy)
Every transformation returns a new DataFrame; PySpark DataFrames are immutable. No computation happens until an action is called.
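Both claims can be verified directly. A minimal sketch (assuming the spark_df created in section 3): withColumn returns a new DataFrame almost instantly because nothing is computed, the original schema is untouched, and only the count() at the end triggers work:
if HAS_SPARK:
    import time
    t0 = time.time()
    with_total = spark_df.withColumn('total_value', F.col('amount') * F.col('quantity'))
    print(f'withColumn returned in {(time.time() - t0) * 1000:.1f}ms (lazy - no data touched)')
    print('original unchanged:', 'total_value' not in spark_df.columns)
    t0 = time.time()
    n = with_total.count()  # ACTION: the plan actually runs here
    print(f'count() = {n}, took {(time.time() - t0) * 1000:.1f}ms')
else:
    print('Laziness demo skipped - PySpark not available')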
transformations_code = '''
from pyspark.sql import functions as F
# select: choose/rename/compute columns
df.select("order_id", "amount", "status")
df.select(F.col("amount") * 1.1) # 10% markup
df.select("*", F.col("amount").alias("price")) # add computed column
# filter (alias: where)
df.filter(F.col("status") == "completed")
df.filter((F.col("amount") > 100) & (F.col("category") == "Electronics"))
df.where("amount > 100 AND status = 'completed'") # SQL string syntax
# withColumn: add/replace column
df.withColumn("total", F.col("amount") * F.col("quantity"))
df.withColumn("status_upper", F.upper(F.col("status")))
df.withColumn("order_year", F.year(F.to_date("order_date", "yyyy-MM-dd")))
# withColumnRenamed
df.withColumnRenamed("amount", "order_amount")
# drop columns
df.drop("order_date", "quantity")
# distinct / dropDuplicates
df.distinct()
df.dropDuplicates(["customer_id"]) # one row per customer (which row is kept is not guaranteed)
# groupBy + agg
df.groupBy("category").agg(
F.count("*").alias("order_count"),
F.sum("amount").alias("total_revenue"),
F.avg("amount").alias("avg_order"),
F.max("amount").alias("max_order")
)
# orderBy / sort
df.orderBy("amount", ascending=False)
df.orderBy(F.col("amount").desc(), F.col("order_id").asc())
# join
orders.join(customers, on="customer_id", how="inner") # inner/left/right/outer/cross
orders.join(customers, orders.customer_id == customers.id, "left")
'''
if HAS_SPARK:
from pyspark.sql import functions as F
# Chained transformations (all lazy)
result = (
spark_df
.filter(F.col('status') == 'completed')
.withColumn('total_value', F.col('amount') * F.col('quantity'))
.withColumn('order_year', F.year(F.to_date('order_date', 'yyyy-MM-dd')))
.groupBy('category')
.agg(
F.count('*').alias('order_count'),
F.round(F.sum('total_value'), 2).alias('total_revenue'),
F.round(F.avg('amount'), 2).alias('avg_amount')
)
.orderBy('total_revenue', ascending=False)
)
print('Revenue by category (completed orders):')
result.show()
else:
print('Transformations reference:')
print(transformations_code)
# pandas equivalent
result = (
orders_pandas
.query("status == 'completed'")
.assign(total_value=lambda df: df['amount'] * df['quantity'])
.groupby('category')
.agg(order_count=('order_id','count'),
total_revenue=('total_value','sum'),
avg_amount=('amount','mean'))
.round(2)
.sort_values('total_revenue', ascending=False)
)
print('Pandas equivalent result:')
display(result)
# Window functions in PySpark
window_functions_code = '''
from pyspark.sql.window import Window
from pyspark.sql import functions as F
# Define window
window_by_customer = Window.partitionBy("customer_id").orderBy("order_date")
window_category = Window.partitionBy("category").orderBy(F.col("amount").desc())
window_running = Window.orderBy("order_date").rowsBetween(
Window.unboundedPreceding, Window.currentRow)
df.withColumn("row_num", F.row_number().over(window_by_customer)) \\
.withColumn("rank_cat", F.rank().over(window_category)) \\
.withColumn("prev_amount", F.lag("amount", 1).over(window_by_customer)) \\
.withColumn("running_sum", F.sum("amount").over(window_running))
'''
if HAS_SPARK:
from pyspark.sql.window import Window
w_cat = Window.partitionBy('category').orderBy(F.col('amount').desc())
top_orders = (
spark_df
.withColumn('rank_in_category', F.rank().over(w_cat))
.filter(F.col('rank_in_category') <= 2)
.select('category', 'product', 'amount', 'rank_in_category')
.orderBy('category', 'rank_in_category')
)
print('Top 2 orders per category by amount:')
top_orders.show()
else:
print('PySpark Window functions:')
print(window_functions_code)
# pandas equivalent
orders_pandas['rank_in_cat'] = orders_pandas.groupby('category')['amount'].rank(
method='first', ascending=False
)
top2 = (orders_pandas[orders_pandas['rank_in_cat'] <= 2]
[['category','product','amount','rank_in_cat']]
.sort_values(['category','rank_in_cat']))
print('Pandas equivalent:')
display(top2.head(10))
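The reference string above also lists lag() and a running sum; here is a minimal sketch of both over the same orders data, partitioned by customer and ordered by order_date:
if HAS_SPARK:
    w_cust = Window.partitionBy('customer_id').orderBy('order_date')
    w_running = w_cust.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    lagged = (
        spark_df
        .withColumn('prev_amount', F.lag('amount', 1).over(w_cust))
        .withColumn('running_total', F.round(F.sum('amount').over(w_running), 2))
        .select('customer_id', 'order_date', 'amount', 'prev_amount', 'running_total')
        .orderBy('customer_id', 'order_date')
    )
    lagged.show(5)
else:
    print('lag / running-sum demo skipped - PySpark not available')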
5. Actions (Trigger Computation)
actions_code = '''
# Actions that return data to the driver:
df.show(n=20, truncate=True) # print to console
df.count() # number of rows (triggers full scan)
df.collect() # ALL rows as Python list - dangerous on large data!
df.first() # first Row object
df.head(n) # first n Row objects
df.take(n) # same as head(n)
df.toPandas() # convert to pandas - only for small data!
# Actions that write data:
df.write.parquet("/out/orders/", mode="overwrite") # best for Spark
df.write.csv("/out/orders.csv", header=True, mode="append")
df.write.json("/out/orders/")
df.write.saveAsTable("default.orders") # Hive metastore
df.write.format("delta").save("/out/orders/") # Delta Lake
# Write modes:
#   overwrite - replace existing data
#   append    - add to existing data
#   error     - fail if exists (default)
#   ignore    - skip if exists
# Partitioned write (creates subdirectories by value)
df.write.partitionBy("order_year", "category") \\
.parquet("/out/orders_partitioned/", mode="overwrite")
# Produces: /out/orders_partitioned/order_year=2023/category=Electronics/part-00000.parquet
'''
if HAS_SPARK:
print('Actions demonstration:')
print(f' count(): {spark_df.count()}')
print(f' first(): {spark_df.first()["order_id"]}')
print(f' Partitions: {spark_df.rdd.getNumPartitions()}')
# toPandas (safe for small results)
small = spark_df.groupBy('status').count().toPandas()
print(' Status counts (via toPandas):')
display(small)
else:
print('Actions reference:')
print(actions_code)
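To make the write patterns concrete without touching any real paths, a sketch that writes the orders to a temporary directory partitioned by status and reads them back (the directory is generated by tempfile, not an assumed data location):
if HAS_SPARK:
    import os, tempfile
    out_dir = os.path.join(tempfile.mkdtemp(), 'orders_parquet')
    # Partitioned write: one subdirectory per status value
    spark_df.write.partitionBy('status').parquet(out_dir, mode='overwrite')
    print('wrote:', sorted(p for p in os.listdir(out_dir) if p.startswith('status=')))
    # Reading back with a filter on the partition column allows partition pruning
    back = spark.read.parquet(out_dir).filter(F.col('status') == 'completed')
    print('completed rows read back:', back.count())
else:
    print('Write/read demo skipped - PySpark not available')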
6. Spark SQL: SQL API Against DataFrames
spark_sql_code = '''
# Register DataFrame as a temporary view (SQL-queryable)
df.createOrReplaceTempView("orders")
# Run SQL - same optimizer, same performance
result = spark.sql("""
SELECT
category,
COUNT(*) AS order_count,
ROUND(SUM(amount), 2) AS total_revenue,
ROUND(AVG(amount), 2) AS avg_order,
ROUND(SUM(amount) / SUM(SUM(amount)) OVER () * 100, 1) AS pct_of_total
FROM orders
WHERE status = 'completed'
GROUP BY category
ORDER BY total_revenue DESC
""")
result.show()
# Global temp views (survive across SparkSessions)
df.createOrReplaceGlobalTempView("orders_global")
spark.sql("SELECT * FROM global_temp.orders_global LIMIT 5")
# Mix SQL and DataFrame API freely:
top_categories = spark.sql("SELECT category FROM orders GROUP BY category HAVING COUNT(*) > 100")
filtered = df.join(top_categories, on="category")
'''
if HAS_SPARK:
spark_df.createOrReplaceTempView('orders')
sql_result = spark.sql("""
SELECT
category,
COUNT(*) AS order_count,
ROUND(SUM(amount), 2) AS total_revenue,
ROUND(AVG(amount), 2) AS avg_order
FROM orders
WHERE status = 'completed'
GROUP BY category
ORDER BY total_revenue DESC
""")
print('Spark SQL result:')
sql_result.show()
print('Explain plan (Spark optimizer output):')
sql_result.explain(mode='simple')
else:
print('Spark SQL patterns:')
print(spark_sql_code)
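The reference string mentions mixing SQL and the DataFrame API freely; a small sketch using the orders temp view registered above (only meaningful when Spark is available):
if HAS_SPARK:
    # SQL produces an ordinary DataFrame that the DataFrame API can keep transforming
    big_categories = spark.sql(
        'SELECT category FROM orders GROUP BY category HAVING COUNT(*) > 100')
    only_big = spark_df.join(big_categories, on='category', how='inner')
    print('rows in high-volume categories:', only_big.count())
else:
    print('SQL / DataFrame mixing demo skipped - PySpark not available')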
7. Performance: Partitioning, Caching, Broadcast Joins
performance_code = '''
# --- Caching ---
# Use when a DataFrame is reused multiple times
df.cache() # MEMORY_AND_DISK (default)
df.persist() # same as cache()
df.persist(StorageLevel.MEMORY_ONLY) # faster, may cause OOM
df.unpersist() # release cache
# When to cache: DataFrame used in 2+ actions, computation is expensive
# When NOT to cache: DataFrame used once, data doesn't fit in memory
# --- Broadcast Join ---
# When one table is small (< few hundred MB), broadcast it to all executors
# Avoids shuffle entirely - massive speedup
from pyspark.sql.functions import broadcast
# Auto-broadcast threshold (default: 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024) # 50MB
# Explicit broadcast hint:
orders.join(broadcast(lookup_table), on="product_id")
# --- Repartition vs Coalesce ---
df.repartition(100) # increase OR decrease partitions (full shuffle)
df.coalesce(10) # only decrease (no shuffle - merges local partitions)
df.repartition("category") # repartition by column value (for partitioned writes)
# Rule of thumb: 128MB per partition
# Too few partitions  -> cores sit idle
# Too many partitions -> task scheduling overhead
# --- Adaptive Query Execution (AQE) ---
spark.conf.set("spark.sql.adaptive.enabled", "true") # AQE (introduced in Spark 3.0, on by default since 3.2)
# AQE dynamically adjusts partitions and join strategies at runtime
# --- Skew handling ---
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Splits skewed partitions automatically
'''
if HAS_SPARK:
# Demonstrate caching
import time
completed = spark_df.filter(F.col('status') == 'completed').cache()
# First action: materializes cache
t0 = time.time()
n1 = completed.count()
t1 = time.time() - t0
# Second action: reads from cache
t0 = time.time()
n2 = completed.groupBy('category').count().collect()
t2 = time.time() - t0
print(f'First action (fill cache): {t1*1000:.1f}ms ({n1} rows)')
print(f'Second action (from cache): {t2*1000:.1f}ms')
completed.unpersist()
# Broadcast join demo
print()
category_meta = spark.createDataFrame([
('Electronics', 'High Value'), ('Books', 'Low Margin'), ('Clothing', 'Seasonal')
], ['category', 'tier'])
joined = spark_df.join(F.broadcast(category_meta), on='category', how='left')
print('Broadcast join explain:')
joined.explain(mode='simple')
else:
print('Performance patterns:')
print(performance_code)
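Repartition vs coalesce, observed directly through partition counts. A minimal sketch; the exact numbers depend on the core count of the machine running this notebook:
if HAS_SPARK:
    print('original partitions:    ', spark_df.rdd.getNumPartitions())
    print('repartition(8):         ', spark_df.repartition(8).rdd.getNumPartitions())
    print('coalesce(2):            ', spark_df.coalesce(2).rdd.getNumPartitions())
    # repartition by column: rows with the same category end up in the same partition;
    # the resulting partition count comes from spark.sql.shuffle.partitions
    print("repartition('category'):", spark_df.repartition('category').rdd.getNumPartitions())
else:
    print('Partitioning demo skipped - PySpark not available')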
8. pandas vs PySpark: When to Use Each

| Factor | pandas | PySpark |
|---|---|---|
| Data size | < 10 GB (fits in RAM) | > 10 GB (or terabytes) |
| Execution | Single machine, single core (vectorized) | Multi-node cluster |
| Speed on small data | Faster (no serialization overhead) | Slower (cluster startup cost) |
| Speed on large data | OOM or very slow | Linear scale-out |
| API maturity | Very mature, rich ecosystem | Growing fast (pandas-on-Spark) |
| ML integration | scikit-learn, everything | MLlib, works with pandas UDFs |
| Debugging | Easy (standard Python) | Harder (distributed errors) |
| Cost | Free (laptop) | Cloud cluster costs |
| Learning curve | Low | Medium-high |
| Mutation | DataFrames are mutable | DataFrames are immutable |
| Indexing | Row index (0, 1, 2, ...) | No row index (partitioned) |
| Null handling | NaN (float) + None | null (unified) |
# pandas β PySpark API translation table
comparison = {
'pandas': [
'df["col"]',
'df[["a","b"]]',
'df.rename(columns={"old":"new"})',
'df[df.col > 100]',
'df.assign(x=df.a+df.b)',
'df.groupby("col").agg({"a":"sum","b":"mean"})',
'df.sort_values("col", ascending=False)',
'df.merge(df2, on="key", how="left")',
'df.drop_duplicates()',
'df.fillna(0)',
'df.isna().sum()',
'len(df)',
'df.head(5)',
'df.dtypes',
'df.to_csv("out.csv")',
],
'PySpark': [
'df.select("col") or df.col',
'df.select("a","b")',
'df.withColumnRenamed("old","new")',
'df.filter(F.col("col") > 100)',
'df.withColumn("x", F.col("a")+F.col("b"))',
'df.groupBy("col").agg(F.sum("a"), F.avg("b"))',
'df.orderBy(F.col("col").desc())',
'df.join(df2, on="key", how="left")',
'df.distinct()',
'df.fillna(0)',
        'df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])',
'df.count()',
'df.show(5) or df.head(5)',
'df.dtypes or df.printSchema()',
'df.write.csv("out/", header=True)',
]
}
comp_df = pd.DataFrame(comparison)
comp_df.index += 1
display(comp_df)
# pandas-on-Spark (formerly Koalas) - pandas API on Spark
pandas_on_spark = '''
# Spark 3.2+: pandas API on Spark (pyspark.pandas)
import pyspark.pandas as ps
# Almost identical to pandas, runs on Spark underneath
df = ps.read_csv("/data/orders.csv")
df[df["status"] == "completed"].groupby("category")["amount"].mean()
# Convert between APIs
spark_df = df.to_spark()        # pyspark.pandas -> Spark DataFrame
ps_df = spark_df.pandas_api()   # Spark DataFrame -> pyspark.pandas
pd_df = df.to_pandas()          # pyspark.pandas -> regular pandas (collects to the driver!)
'''
print('pandas-on-Spark (best of both worlds):')
print(pandas_on_spark)
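If Spark is available, the pandas API on Spark can be tried on the sample data before the session is stopped below. A hedged sketch: pyspark.pandas ships with Spark 3.2+, but it may warn about (or require) pyarrow depending on the environment, so it is wrapped defensively:
if HAS_SPARK:
    try:
        import pyspark.pandas as ps
        psdf = ps.from_pandas(orders_pandas)  # distributed, but pandas-like handle
        completed_avg = (psdf[psdf['status'] == 'completed']
                         .groupby('category')['amount'].mean())
        print(completed_avg.to_pandas().round(2))  # small result - safe to collect
    except Exception as exc:  # e.g. missing pyarrow or version mismatch
        print('pandas-on-Spark demo skipped:', exc)
else:
    print('pandas-on-Spark demo skipped - PySpark not available')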
if HAS_SPARK:
spark.stop()
print('SparkSession stopped.')
Cheat Sheet
# SparkSession
spark = SparkSession.builder.appName('app').master('local[*]').getOrCreate()
# Create DataFrame
df = spark.createDataFrame(pandas_df)
df = spark.read.parquet('/path/')
df = spark.read.csv('/path/', header=True, inferSchema=True)
# Inspect
df.printSchema() df.show(5) df.count() df.describe().show()
# Transformations (lazy)
df.select('a','b') # columns
df.filter(F.col('x') > 0) # filter rows
df.withColumn('c', F.col('a') + F.col('b')) # new column
df.groupBy('cat').agg(F.sum('amt')) # aggregate
df.orderBy(F.col('amt').desc()) # sort
df.join(other, on='key', how='left') # join
df.repartition(100) df.coalesce(10) # partition control
# Actions (eager)
df.show() df.count() df.collect() df.toPandas() df.write.parquet(...)
# Performance
df.cache() # reuse in multiple actions
orders.join(broadcast(small), ...) # avoid shuffle for small tables
spark.conf.set('spark.sql.adaptive.enabled', 'true') # AQE
# SQL API
df.createOrReplaceTempView('t')
spark.sql('SELECT * FROM t WHERE x > 0').show()
9. Exercises
1. DataFrame operations: Using the orders_pandas DataFrame (or create a Spark DataFrame from it), write a PySpark pipeline that: (a) filters to completed orders, (b) adds a total_value column (amount * quantity), (c) computes the top-5 products by total revenue, (d) shows the result.
2. Window functions: Using PySpark window functions (or the pandas equivalent), compute, for each product, the running 7-day total revenue ordered by date. Show the last 10 rows.
3. Broadcast join: Create a small discounts DataFrame ({'category': ..., 'discount_pct': ...}). Join it to the orders DataFrame using broadcast(). Add a discounted_amount column. Verify with .explain() that a broadcast join was used.
4. Repartitioning: Create a DataFrame with 4 partitions (df.repartition(4)). Use df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]) to count rows per partition. Then use repartition('category') and repeat - what changed?
5. Spark SQL: Register the orders DataFrame as a temp view called orders. Write a Spark SQL query using a window function (ROW_NUMBER or RANK) to find the top-3 orders by amount for each category. Compare the query plan (explain()) with the equivalent DataFrame API code.