PySpark Fundamentals: Distributed Data Processing for Large Datasets¶

When pandas runs out of RAM, Spark scales to terabytes. This notebook covers PySpark DataFrames, transformations, actions, SQL API, and the mental model shift from pandas to distributed computing.

Setup: PySpark Availability Check¶

PySpark requires Java + the Spark distribution. We detect availability and show all code patterns; each PySpark pattern is paired with a pandas equivalent so you see the conceptual mapping clearly.

try:
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import (
        StructType, StructField, StringType, IntegerType, DoubleType
    )
    from pyspark.sql.window import Window
    HAS_SPARK = True
    print('PySpark available')
except ImportError:
    HAS_SPARK = False
    print('PySpark not installed - showing all code patterns')
    print('Install: pip install pyspark  (also needs Java 8/11/17)')

import pandas as pd
import numpy as np
import sqlite3

np.random.seed(42)
print('pandas version:', pd.__version__)

1. The Spark Mental Model¶

Distributed Computing Architecture¶

┌─────────────────────────────────────────────────────────┐
│                    DRIVER NODE                          │
│  Your Python code │ SparkContext │ DAG Scheduler        │
└──────────┬──────────────────────────────────────────────┘
           │  distributes tasks
    ┌──────┴──────────────────┐
    ▼                         ▼
┌─────────┐             ┌─────────┐
│Executor │             │Executor │    ...N executors
│ Task1   │             │ Task1   │
│ Task2   │             │ Task2   │
│[Part 1] │             │[Part 2] │    each processes one partition
└─────────┘             └─────────┘

Key Concepts¶

Concept        | Description
---------------|------------------------------------------------------------------
Driver         | Your Python process. Coordinates work; does not process data itself.
Executor       | Worker process. Runs tasks in parallel across partitions.
Partition      | A chunk of the dataset. Default: roughly one per CPU core.
RDD            | Resilient Distributed Dataset. Low-level API (rarely used directly).
DataFrame      | High-level API. Columnar, schema-aware. Use this.
Dataset        | Typed DataFrame (Scala/Java only).
Transformation | Lazy operation that builds a plan. Does NOT compute.
Action         | Triggers computation. Returns data to the driver or writes to disk.
DAG            | Spark's execution plan, built from transformations and run on an action.
Stage          | A group of tasks with no shuffle; stages are separated by shuffle boundaries.
Shuffle        | Redistribution of data across executors. Expensive; avoid when possible.

# Lazy evaluation demonstration
lazy_eval_explanation = '''
TRANSFORMATIONS (lazy - just build a plan):
  df.select(...)       df.filter(...)       df.withColumn(...)
  df.groupBy(...)      df.join(...)         df.orderBy(...)
  df.union(...)        df.distinct()        df.repartition(...)

ACTIONS (eager - trigger computation):
  df.show()            df.count()           df.collect()
  df.first()           df.take(n)           df.toPandas()
  df.write.parquet()   df.write.csv()       df.write.saveAsTable()

Why lazy evaluation?
  1. Spark optimizes the ENTIRE plan before running any computation
  2. Catalyst optimizer rewrites your query for efficiency
  3. Operations that cancel each other out are eliminated
  4. Filter pushdown: WHERE clauses moved to data source level

Mental model:
  pandas: executes immediately, like Python statements
  Spark:  builds a recipe first, cooks only when you call an action
'''
print(lazy_eval_explanation)
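
A rough analogy in plain Python, no Spark required: a generator expression is a plan, and nothing runs until something consumes it, just as transformations wait for an action.

# Lazy "transformation": builds a recipe, evaluates nothing yet
plan = (x * 2 for x in range(5) if x % 2 == 0)
print(plan)        # a generator object, not data

# Eager "action": consuming the generator triggers the work
print(list(plan))  # [0, 4, 8]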

2. SparkSession Setup¶

spark_session_code = '''
from pyspark.sql import SparkSession

spark = SparkSession.builder \\
    .appName("PySpark Fundamentals") \\
    .master("local[*]") \\
    .config("spark.sql.shuffle.partitions", "8") \\
    .config("spark.driver.memory", "4g") \\
    .config("spark.sql.adaptive.enabled", "true") \\
    .getOrCreate()

# master options:
#   local       - 1 thread (debugging only)
#   local[4]    - 4 threads
#   local[*]    - all CPU cores
#   spark://... - cluster master URL
#   yarn        - YARN (Hadoop cluster)
#   k8s://...   - Kubernetes

spark.version  # e.g. '3.5.0'
sc = spark.sparkContext  # low-level context if needed
'''

if HAS_SPARK:
    spark = SparkSession.builder \
        .appName('PySpark Fundamentals') \
        .master('local[*]') \
        .config('spark.sql.shuffle.partitions', '4') \
        .config('spark.ui.showConsoleProgress', 'false') \
        .getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    print(f'Spark version: {spark.version}')
    print(f'Cores available: {spark.sparkContext.defaultParallelism}')
else:
    print('SparkSession code pattern:')
    print(spark_session_code)
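
With a session available, the laziness from section 1 can be observed directly. A small sketch, assuming the local session created above: building a filter returns immediately, while count() (an action) actually runs the plan, and spark.conf.get reads back a runtime setting.

if HAS_SPARK:
    import time
    demo = spark.range(1_000_000)                 # transformation: just a plan
    t0 = time.time()
    evens = demo.filter(F.col('id') % 2 == 0)     # still lazy, returns instantly
    build_ms = (time.time() - t0) * 1000
    t0 = time.time()
    n = evens.count()                             # action: triggers execution
    run_ms = (time.time() - t0) * 1000
    print(f'Plan built in {build_ms:.2f}ms; count() ran in {run_ms:.2f}ms ({n} rows)')
    print('shuffle partitions:', spark.conf.get('spark.sql.shuffle.partitions'))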

3. DataFrame Creation¶

# Generate sample data as pandas first (works regardless of Spark)
np.random.seed(42)
N = 1000

orders_pandas = pd.DataFrame({
    'order_id':   range(1, N + 1),
    'customer_id': np.random.randint(1, 201, N),
    'product':    np.random.choice(['Laptop','Phone','Tablet','Watch','Camera'], N),
    'category':   np.random.choice(['Electronics','Clothing','Books'], N),
    'amount':     np.round(np.random.uniform(10, 1000, N), 2),
    'quantity':   np.random.randint(1, 6, N),
    'status':     np.random.choice(['completed','returned','pending'], N, p=[0.7,0.1,0.2]),
    'order_date': pd.date_range('2023-01-01', periods=N, freq='8h').strftime('%Y-%m-%d')
})

print(f'Sample DataFrame: {orders_pandas.shape}')
orders_pandas.head()

dataframe_creation_code = '''
# Method 1: From pandas DataFrame
spark_df = spark.createDataFrame(orders_pandas)

# Method 2: From list of tuples with explicit schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField("order_id",   IntegerType(), nullable=False),
    StructField("customer_id",IntegerType(), nullable=True),
    StructField("product",    StringType(),  nullable=True),
    StructField("amount",     DoubleType(),  nullable=True),
])

data = [(1, 42, "Laptop", 999.99), (2, 17, "Phone", 599.00)]
spark_df = spark.createDataFrame(data, schema=schema)

# Method 3: From CSV/Parquet/JSON
spark_df = spark.read.csv("/data/orders.csv", header=True, inferSchema=True)
spark_df = spark.read.parquet("/data/orders.parquet")
spark_df = spark.read.json("/data/orders.json")

# Inspect
spark_df.printSchema()         # column names and types
spark_df.show(5)               # print first 5 rows
spark_df.show(5, truncate=False)  # show full column values
spark_df.dtypes                # list of (name, type) tuples
spark_df.count()               # ACTION: triggers computation
spark_df.describe().show()     # summary statistics
'''

if HAS_SPARK:
    spark_df = spark.createDataFrame(orders_pandas)
    print('Schema:')
    spark_df.printSchema()
    print(f'Partitions: {spark_df.rdd.getNumPartitions()}')
    print('First 5 rows:')
    spark_df.show(5)
else:
    print('DataFrame creation patterns:')
    print(dataframe_creation_code)

4. Transformations (Lazy)¶

Every transformation returns a new DataFrame; PySpark DataFrames are immutable. No computation happens until an action is called.
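
A quick way to see both properties, as a sketch assuming the spark_df created in section 3: withColumn returns a new DataFrame, the original keeps its columns, and neither has computed anything yet.

if HAS_SPARK:
    with_total = spark_df.withColumn('total_value', F.col('amount') * F.col('quantity'))
    print('original columns:', spark_df.columns)    # unchanged
    print('new DF columns:  ', with_total.columns)  # includes total_value
    # Both are still just plans - no data is processed until an action runs.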

transformations_code = '''
from pyspark.sql import functions as F

# select: choose/rename/compute columns
df.select("order_id", "amount", "status")
df.select(F.col("amount") * 1.1)              # 10% markup
df.select("*", F.col("amount").alias("price")) # add computed column

# filter (alias: where)
df.filter(F.col("status") == "completed")
df.filter((F.col("amount") > 100) & (F.col("category") == "Electronics"))
df.where("amount > 100 AND status = 'completed'")  # SQL string syntax

# withColumn: add/replace column
df.withColumn("total", F.col("amount") * F.col("quantity"))
df.withColumn("status_upper", F.upper(F.col("status")))
df.withColumn("order_year", F.year(F.to_date("order_date", "yyyy-MM-dd")))

# withColumnRenamed
df.withColumnRenamed("amount", "order_amount")

# drop columns
df.drop("order_date", "quantity")

# distinct / dropDuplicates
df.distinct()
df.dropDuplicates(["customer_id"])  # one row per customer_id (which row is kept is not guaranteed)

# groupBy + agg
df.groupBy("category").agg(
    F.count("*").alias("order_count"),
    F.sum("amount").alias("total_revenue"),
    F.avg("amount").alias("avg_order"),
    F.max("amount").alias("max_order")
)

# orderBy / sort
df.orderBy("amount", ascending=False)
df.orderBy(F.col("amount").desc(), F.col("order_id").asc())

# join
orders.join(customers, on="customer_id", how="inner")  # inner/left/right/outer/cross
orders.join(customers, orders.customer_id == customers.id, "left")
'''

if HAS_SPARK:
    from pyspark.sql import functions as F
    
    # Chained transformations (all lazy)
    result = (
        spark_df
        .filter(F.col('status') == 'completed')
        .withColumn('total_value', F.col('amount') * F.col('quantity'))
        .withColumn('order_year', F.year(F.to_date('order_date', 'yyyy-MM-dd')))
        .groupBy('category')
        .agg(
            F.count('*').alias('order_count'),
            F.round(F.sum('total_value'), 2).alias('total_revenue'),
            F.round(F.avg('amount'), 2).alias('avg_amount')
        )
        .orderBy('total_revenue', ascending=False)
    )
    print('Revenue by category (completed orders):')
    result.show()
else:
    print('Transformations reference:')
    print(transformations_code)
    
    # pandas equivalent
    result = (
        orders_pandas
        .query("status == 'completed'")
        .assign(total_value=lambda df: df['amount'] * df['quantity'])
        .groupby('category')
        .agg(order_count=('order_id','count'),
             total_revenue=('total_value','sum'),
             avg_amount=('amount','mean'))
        .round(2)
        .sort_values('total_revenue', ascending=False)
    )
    print('Pandas equivalent result:')
    display(result)

# Window functions in PySpark
window_functions_code = '''
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Define window
window_by_customer = Window.partitionBy("customer_id").orderBy("order_date")
window_category    = Window.partitionBy("category").orderBy(F.col("amount").desc())
window_running     = Window.orderBy("order_date").rowsBetween(
                         Window.unboundedPreceding, Window.currentRow)

df.withColumn("row_num",     F.row_number().over(window_by_customer)) \\
  .withColumn("rank_cat",    F.rank().over(window_category)) \\
  .withColumn("prev_amount", F.lag("amount", 1).over(window_by_customer)) \\
  .withColumn("running_sum", F.sum("amount").over(window_running))
'''

if HAS_SPARK:
    from pyspark.sql.window import Window
    
    w_cat = Window.partitionBy('category').orderBy(F.col('amount').desc())
    
    top_orders = (
        spark_df
        .withColumn('rank_in_category', F.rank().over(w_cat))
        .filter(F.col('rank_in_category') <= 2)
        .select('category', 'product', 'amount', 'rank_in_category')
        .orderBy('category', 'rank_in_category')
    )
    print('Top 2 orders per category by amount:')
    top_orders.show()
else:
    print('PySpark Window functions:')
    print(window_functions_code)
    
    # pandas equivalent
    # method='min' mirrors F.rank() (ties share a rank); use 'first' for row_number-like behavior
    orders_pandas['rank_in_cat'] = orders_pandas.groupby('category')['amount'].rank(
        method='min', ascending=False
    )
    top2 = (orders_pandas[orders_pandas['rank_in_cat'] <= 2]
            [['category','product','amount','rank_in_cat']]
            .sort_values(['category','rank_in_cat']))
    print('Pandas equivalent:')
    display(top2.head(10))

5. Actions (Trigger Computation)¶

actions_code = '''
# Actions that return data to the driver:
df.show(n=20, truncate=True)     # print to console
df.count()                        # number of rows (triggers full scan)
df.collect()                      # ALL rows as Python list - dangerous on large data!
df.first()                        # first Row object
df.head(n)                        # first n Row objects
df.take(n)                        # same as head(n)
df.toPandas()                     # convert to pandas - only for small data!

# Actions that write data:
df.write.parquet("/out/orders/", mode="overwrite")         # best for Spark
df.write.csv("/out/orders.csv", header=True, mode="append")
df.write.json("/out/orders/")
df.write.saveAsTable("default.orders")                     # Hive metastore
df.write.format("delta").save("/out/orders/")              # Delta Lake

# Write modes:
#   overwrite  - replace existing data
#   append     - add to existing data
#   error      - fail if exists (default)
#   ignore     - skip if exists

# Partitioned write (creates subdirectories by value)
df.write.partitionBy("order_year", "category") \\
        .parquet("/out/orders_partitioned/", mode="overwrite")
# Produces: /out/orders_partitioned/order_year=2023/category=Electronics/part-00000.parquet
'''

if HAS_SPARK:
    print('Actions demonstration:')
    print(f'  count():  {spark_df.count()}')
    print(f'  first():  {spark_df.first()["order_id"]}')
    print(f'  Partitions: {spark_df.rdd.getNumPartitions()}')
    
    # toPandas (safe for small results)
    small = spark_df.groupBy('status').count().toPandas()
    print('  Status counts (via toPandas):')
    display(small)
else:
    print('Actions reference:')
    print(actions_code)
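
Because collect() and toPandas() pull every row to the driver, it is usually safer to shrink the data first. A hedged sketch of common guards, assuming the spark_df above:

if HAS_SPARK:
    preview = spark_df.limit(100).toPandas()           # cap rows before converting
    sample = spark_df.sample(fraction=0.01, seed=42)   # or work on a random sample
    print('preview shape:', preview.shape, '| sampled rows:', sample.count())
    # For streaming rows to the driver one partition at a time:
    #   for row in spark_df.toLocalIterator(): ...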

6. Spark SQL: SQL API Against DataFrames¶

spark_sql_code = '''
# Register DataFrame as a temporary view (SQL-queryable)
df.createOrReplaceTempView("orders")

# Run SQL - same optimizer, same performance
result = spark.sql("""
    SELECT
        category,
        COUNT(*) AS order_count,
        ROUND(SUM(amount), 2) AS total_revenue,
        ROUND(AVG(amount), 2) AS avg_order,
        ROUND(SUM(amount) / SUM(SUM(amount)) OVER () * 100, 1) AS pct_of_total
    FROM orders
    WHERE status = 'completed'
    GROUP BY category
    ORDER BY total_revenue DESC
""")
result.show()

# Global temp views (survive across SparkSessions)
df.createOrReplaceGlobalTempView("orders_global")
spark.sql("SELECT * FROM global_temp.orders_global LIMIT 5")

# Mix SQL and DataFrame API freely:
top_categories = spark.sql("SELECT category FROM orders GROUP BY category HAVING COUNT(*) > 100")
filtered = df.join(top_categories, on="category")
'''

if HAS_SPARK:
    spark_df.createOrReplaceTempView('orders')
    
    sql_result = spark.sql("""
        SELECT
            category,
            COUNT(*) AS order_count,
            ROUND(SUM(amount), 2) AS total_revenue,
            ROUND(AVG(amount), 2) AS avg_order
        FROM orders
        WHERE status = 'completed'
        GROUP BY category
        ORDER BY total_revenue DESC
    """)
    
    print('Spark SQL result:')
    sql_result.show()
    
    print('Explain plan (Spark optimizer output):')
    sql_result.explain(mode='simple')
else:
    print('Spark SQL patterns:')
    print(spark_sql_code)

7. Performance: Partitioning, Caching, Broadcast Joins¶

performance_code = '''
# --- Caching ---
# Use when a DataFrame is reused multiple times
df.cache()              # MEMORY_AND_DISK (default)
df.persist()            # same as cache()
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY)  # faster, may cause OOM
df.unpersist()          # release cache

# When to cache: DataFrame used in 2+ actions, computation is expensive
# When NOT to cache: DataFrame used once, data doesn't fit in memory

# --- Broadcast Join ---
# When one table is small (< few hundred MB), broadcast it to all executors
# Avoids shuffle entirely - massive speedup
from pyspark.sql.functions import broadcast

# Auto-broadcast threshold (default: 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50MB

# Explicit broadcast hint:
orders.join(broadcast(lookup_table), on="product_id")

# --- Repartition vs Coalesce ---
df.repartition(100)          # increase OR decrease partitions (full shuffle)
df.coalesce(10)              # only decrease (no shuffle - merge local partitions)
df.repartition("category")   # repartition by column value (for partitioned writes)

# Rule of thumb: 128MB per partition
# Too few partitions → cores sit idle
# Too many partitions → task scheduling overhead

# --- Adaptive Query Execution (AQE) ---
spark.conf.set("spark.sql.adaptive.enabled", "true")  # Spark 3.0+ default
# AQE dynamically adjusts partitions and join strategies at runtime

# --- Skew handling ---
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# Splits skewed partitions automatically
'''

if HAS_SPARK:
    # Demonstrate caching
    import time
    
    completed = spark_df.filter(F.col('status') == 'completed').cache()
    
    # First action: materializes cache
    t0 = time.time()
    n1 = completed.count()
    t1 = time.time() - t0
    
    # Second action: reads from cache
    t0 = time.time()
    n2 = completed.groupBy('category').count().collect()
    t2 = time.time() - t0
    
    print(f'First action (fill cache):  {t1*1000:.1f}ms  ({n1} rows)')
    print(f'Second action (from cache): {t2*1000:.1f}ms')
    
    completed.unpersist()
    
    # Broadcast join demo
    print()
    category_meta = spark.createDataFrame([
        ('Electronics', 'High Value'), ('Books', 'Low Margin'), ('Clothing', 'Seasonal')
    ], ['category', 'tier'])
    
    joined = spark_df.join(F.broadcast(category_meta), on='category', how='left')
    print('Broadcast join explain:')
    joined.explain(mode='simple')
else:
    print('Performance patterns:')
    print(performance_code)
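
To make the repartition/coalesce trade-off visible, here is a small sketch (assuming the spark_df above) that counts rows per partition with mapPartitions; only the per-partition counts are collected, not the rows themselves.

if HAS_SPARK:
    def rows_per_partition(df):
        # one count per partition, computed where the data lives
        return df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

    print('current partitioning: ', rows_per_partition(spark_df))
    print('after repartition(4): ', rows_per_partition(spark_df.repartition(4)))
    print('after coalesce(2):    ', rows_per_partition(spark_df.repartition(4).coalesce(2)))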

8. pandas vs PySpark: When to Use Each¶

Factor              | pandas                                    | PySpark
--------------------|-------------------------------------------|----------------------------------
Data size           | < 10GB (fits in RAM)                      | > 10GB (or terabytes)
Execution           | Single machine, single core (vectorized)  | Multi-node cluster
Speed on small data | Faster (no serialization overhead)        | Slower (cluster startup cost)
Speed on large data | OOM or very slow                          | Linear scale-out
API maturity        | Very mature, rich ecosystem               | Growing fast (pandas-on-Spark)
ML integration      | scikit-learn, everything                  | MLlib, works with pandas UDFs
Debugging           | Easy (standard Python)                    | Harder (distributed errors)
Cost                | Free (laptop)                             | Cloud cluster costs
Learning curve      | Low                                       | Medium-High
Mutation            | DataFrames are mutable                    | DataFrames are immutable
Indexing            | Row index (0, 1, 2, ...)                  | No row index (partitioned)
Null handling       | NaN (float) + None                        | null (unified)
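
The pandas UDFs mentioned in the ML integration row are vectorized UDFs: Spark hands your function pandas Series batches inside each executor. A minimal pattern sketch (Spark 3.x, needs pyarrow installed); the add_tax name and the 19% rate are just illustrative.

pandas_udf_pattern = '''
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")
def add_tax(amount: pd.Series) -> pd.Series:
    # receives and returns pandas Series batches on the executors
    return amount * 1.19

df.withColumn("amount_with_tax", add_tax("amount"))
'''
print('pandas UDF pattern (vectorized UDF):')
print(pandas_udf_pattern)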

# pandas ↔ PySpark API translation table
comparison = {
    'pandas': [
        'df["col"]',
        'df[["a","b"]]',
        'df.rename(columns={"old":"new"})',
        'df[df.col > 100]',
        'df.assign(x=df.a+df.b)',
        'df.groupby("col").agg({"a":"sum","b":"mean"})',
        'df.sort_values("col", ascending=False)',
        'df.merge(df2, on="key", how="left")',
        'df.drop_duplicates()',
        'df.fillna(0)',
        'df.isna().sum()',
        'len(df)',
        'df.head(5)',
        'df.dtypes',
        'df.to_csv("out.csv")',
    ],
    'PySpark': [
        'df.select("col")  or  df.col',
        'df.select("a","b")',
        'df.withColumnRenamed("old","new")',
        'df.filter(F.col("col") > 100)',
        'df.withColumn("x", F.col("a")+F.col("b"))',
        'df.groupBy("col").agg(F.sum("a"), F.avg("b"))',
        'df.orderBy(F.col("col").desc())',
        'df.join(df2, on="key", how="left")',
        'df.distinct()',
        'df.fillna(0)',
        'df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])',
        'df.count()',
        'df.show(5)  or  df.head(5)',
        'df.dtypes  or  df.printSchema()',
        'df.write.csv("out/", header=True)',
    ]
}

comp_df = pd.DataFrame(comparison)
comp_df.index += 1
display(comp_df)

# pandas-on-Spark (formerly Koalas) - pandas API on Spark
pandas_on_spark = '''
# Spark 3.2+: pandas API on Spark (pyspark.pandas)
import pyspark.pandas as ps

# Almost identical to pandas, runs on Spark underneath
df = ps.read_csv("/data/orders.csv")
df[df["status"] == "completed"].groupby("category")["amount"].mean()

# Convert between APIs
spark_df = df.to_spark()         # pyspark.pandas → Spark DataFrame
ps_df    = spark_df.pandas_api() # Spark DataFrame → pyspark.pandas
pd_df    = df.to_pandas()        # pyspark.pandas → regular pandas (collects!)
'''

print('pandas-on-Spark (best of both worlds):')
print(pandas_on_spark)

if HAS_SPARK:
    spark.stop()
    print('SparkSession stopped.')

Cheat Sheet¶

# SparkSession
spark = SparkSession.builder.appName('app').master('local[*]').getOrCreate()

# Create DataFrame
df = spark.createDataFrame(pandas_df)
df = spark.read.parquet('/path/')
df = spark.read.csv('/path/', header=True, inferSchema=True)

# Inspect
df.printSchema()  df.show(5)  df.count()  df.describe().show()

# Transformations (lazy)
df.select('a','b')                          # columns
df.filter(F.col('x') > 0)                   # filter rows
df.withColumn('c', F.col('a') + F.col('b')) # new column
df.groupBy('cat').agg(F.sum('amt'))         # aggregate
df.orderBy(F.col('amt').desc())             # sort
df.join(other, on='key', how='left')        # join
df.repartition(100)  df.coalesce(10)        # partition control

# Actions (eager)
df.show()  df.count()  df.collect()  df.toPandas()  df.write.parquet(...)

# Performance
df.cache()                          # reuse in multiple actions
orders.join(broadcast(small), ...)  # avoid shuffle for small tables
spark.conf.set('spark.sql.adaptive.enabled', 'true')  # AQE

# SQL API
df.createOrReplaceTempView('t')
spark.sql('SELECT * FROM t WHERE x > 0').show()

9. Exercises¶

  1. DataFrame operations: Using the orders_pandas DataFrame (or create a Spark DataFrame from it), write a PySpark pipeline that: (a) filters to completed orders, (b) adds a total_value column (amount * quantity), (c) computes the top-5 products by total revenue, (d) shows the result.

  2. Window functions: Using PySpark window functions (or pandas equivalent), compute: for each product, the running 7-day total revenue ordered by date. Show the last 10 rows.

  3. Broadcast join: Create a small discounts DataFrame ({'category': ..., 'discount_pct': ...}). Join it to the orders DataFrame using broadcast(). Add a discounted_amount column. Verify with .explain() that a broadcast join was used.

  4. Repartitioning: Create a DataFrame with 4 partitions (df.repartition(4)). Use df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]) to count rows per partition. Then use repartition('category') and repeat - what changed?

  5. Spark SQL: Register the orders DataFrame as a temp view called orders. Write a Spark SQL query using a window function (ROW_NUMBER or RANK) to find the top-3 orders by amount for each category. Compare the query plan (explain()) with the equivalent DataFrame API code.