This notebook was prepared by Donne Martin. Source and license info is on GitHub.

Spark

  • IPython Notebook Setup

  • Python Shell

  • DataFrames

  • RDDs

  • Pair RDDs

  • Running Spark on a Cluster

  • Viewing the Spark Application UI

  • Working with Partitions

  • Caching RDDs

  • Checkpointing RDDs

  • Writing and Running a Spark Application

  • Configuring Spark Applications

  • Streaming

  • Streaming with States

  • Broadcast Variables

  • Accumulators

IPython Notebook Setup

The dev-setup repo contains scripts to install Spark and to automate its integration with IPython Notebook through the pydata.sh script.

You can also follow the instructions provided here to configure IPython Notebook Support for PySpark with Python 2.

To run Python 3 with Spark 1.4+, check out the following posts on Stack Overflow or Reddit.

Python Shell

Starting the PySpark Shell

The pyspark command launches an interactive Python REPL pre-configured with a SparkContext (sc) and SparkSession (spark). This is the fastest way to prototype Spark jobs – you get immediate feedback on transformations and actions without writing a full application. The shell is especially useful for exploratory data analysis on large datasets, as you can iteratively build and test your data pipeline one transformation at a time.

!pyspark

Inspecting the SparkContext

The SparkContext (sc) is the main entry point to all Spark functionality – it represents the connection to the Spark cluster and is used to create RDDs, accumulators, and broadcast variables. Displaying sc in the shell shows the application name, master URL, and Spark version, which helps verify you are connected to the correct cluster and running the expected configuration.

sc

DataFrames

From the following reference:

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.

Loading JSON Data from S3 into a DataFrame

context.load() reads structured data from an external source (here, JSON files on Amazon S3) and returns a distributed DataFrame. Spark infers the schema automatically from the JSON structure, so you do not need to define column types manually. Loading directly from S3 means you can process datasets far larger than local memory, leveraging Spark’s distributed execution engine to parallelize reads across the cluster.

users = context.load("s3n://path/to/users.json", "json")

Filtering Rows in a DataFrame

The filter() method returns a new DataFrame containing only the rows that satisfy the given boolean condition. Spark uses lazy evaluation, meaning this filter is not executed immediately but rather recorded as part of the query plan. Spark’s Catalyst optimizer may reorder or combine filters with other operations for maximum efficiency when an action (like show() or count()) eventually triggers execution.

young = users.filter(users.age<21)

Pandas-Style Bracket Filtering

Spark DataFrames support a bracket syntax (df[condition]) that mirrors Pandas boolean indexing. This syntactic sugar makes Spark more accessible to data scientists already familiar with Pandas. Under the hood, the bracket syntax calls the same filter() method, so there is no performance difference – it is purely a readability convenience.

young = users[users.age<21]

Column Transformations with select()

The select() method projects specific columns from a DataFrame, optionally applying transformations. Arithmetic operations like young.age + 1 create new computed columns without mutating the original DataFrame (DataFrames are immutable). This pattern is essential for feature engineering in ML pipelines – you can derive new features from existing columns while preserving the original data.

young.select(young.name, young.age+1)

Grouping and Aggregation

groupBy() followed by count() groups rows by the specified column and computes the count within each group. This is the Spark equivalent of SQL’s GROUP BY ... COUNT(*). Spark distributes the aggregation across the cluster using a shuffle operation, making it efficient even for billions of rows where a single-machine solution would run out of memory.

young.groupBy("gender").count()

Joining DataFrames

The join() method combines two DataFrames based on a matching condition, similar to SQL joins. The third argument specifies the join type: “left_outer” retains all rows from the left DataFrame even if there is no match in the right. Spark’s Catalyst optimizer automatically selects the best join strategy (broadcast join for small tables, sort-merge join for large ones) based on table statistics.

young.join(logs, logs.userId == users.userId, "left_outer")

Running SQL Queries on DataFrames

registerTempTable() registers a DataFrame as a temporary SQL table, enabling you to use standard SQL syntax via context.sql(). This is powerful because it allows complex queries with subqueries, window functions, and CTEs that might be awkward to express using the DataFrame API alone. The SQL approach is often preferred by analysts who are already fluent in SQL, while both APIs produce the same optimized execution plan.

young.registerTempTable("young")
context.sql("SELECT count(*) FROM young")

Converting Spark DataFrame to Pandas

toPandas() collects all distributed data to the driver node and returns a local Pandas DataFrame. This is useful for visualization (Matplotlib, Seaborn) or using libraries that only work with in-memory data. Caution: calling toPandas() on a large DataFrame will attempt to load the entire dataset into driver memory, potentially causing an out-of-memory error. Always filter or aggregate to a manageable size first.

pandas_df = young.toPandas()

Creating a Spark DataFrame from Pandas

createDataFrame() converts a local Pandas DataFrame into a distributed Spark DataFrame. This enables a common workflow: load and clean a small dataset locally in Pandas, then distribute it across the cluster for joining with larger datasets or running parallel ML training. Spark infers the schema from the Pandas dtypes, though you can also supply an explicit schema for more control.

spark_df = context.createDataFrame(pandas_df)

Creating a SQLContext

The SQLContext is the entry point for working with structured data (DataFrames and SQL) in Spark 1.x. It wraps the SparkContext and adds DataFrame and SQL capabilities. In Spark 2.x and later, SQLContext is superseded by SparkSession, which unifies SQL, streaming, and ML contexts into a single entry point.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Loading JSON Files into a DataFrame via SQLContext

sqlContext.jsonFile() reads a JSON file and infers the schema automatically to create a DataFrame. Each line of the JSON file is treated as a separate record. This method is convenient for semi-structured data that does not follow a rigid tabular format, as Spark handles nested fields and arrays within the JSON structure.

df = sqlContext.jsonFile("file:/path/file.json")

Displaying DataFrame Contents

show() triggers execution of the lazy query plan and prints the first 20 rows (by default) in a formatted table. This is the standard way to inspect data during development. You can pass an integer argument like show(50) to control how many rows are displayed, or show(truncate=False) to prevent column values from being truncated.

df.show()

Inspecting the Schema

printSchema() displays the DataFrame’s column names, data types, and nullability in a tree format. Understanding the schema is critical before performing joins, aggregations, or ML feature extraction, because type mismatches (e.g., a numeric column inferred as string) cause runtime errors. This is especially important when loading semi-structured data like JSON where the schema is inferred rather than declared.

df.printSchema()

Selecting a Single Column

select() with a column name string returns a new DataFrame containing only that column. This is the Spark equivalent of SQL’s SELECT column_name FROM table. Selecting only the columns you need reduces the amount of data shuffled across the network during subsequent operations, which is a key optimization in distributed computing.

df.select("column_name")

Filtering with Column Expressions

filter() accepts column expressions using DataFrame column references (e.g., df.column_name > 10). The condition is pushed down into the query plan and, when reading from sources like Parquet or JDBC, Spark can apply predicate pushdown to avoid reading unnecessary data from disk. This makes filtering one of the cheapest operations in Spark.

df.filter(df.column_name>10)

Group By and Count

Combining groupBy() with count() computes the frequency of each distinct value in the specified column. This triggers a shuffle operation where data with the same key is sent to the same partition for aggregation. Shuffle is the most expensive operation in Spark (network I/O), so minimizing the data size before a groupBy (e.g., by filtering first) significantly improves performance.

df.groupBy("column_name").count()

Converting an RDD to a DataFrame

inferSchema() examines the data in an RDD to automatically determine column names and types, then wraps it in a DataFrame. This bridges the gap between Spark’s low-level RDD API and the higher-level DataFrame API, which benefits from Catalyst query optimization and Tungsten memory management. In modern Spark (2.x+), you would use spark.createDataFrame() instead.

df = sqlContext.inferSchema(my_data)

Registering a DataFrame as a SQL Table

registerTempTable() makes a DataFrame queryable via SQL by assigning it a table name in the catalog. The table exists only for the duration of the SparkSession (it is not persisted to any metastore). This is the gateway to mixing DataFrame operations with full SQL queries, allowing you to leverage SQL features like window functions, CASE expressions, and subqueries.

df.registerTempTable("dataframe_name")

Querying a Registered Table with SQL

sqlContext.sql() executes a SQL query against registered temporary tables and returns the result as a DataFrame. The result can be further processed with DataFrame operations, saved to disk, or converted to Pandas. Spark’s Catalyst optimizer parses the SQL string and generates the same optimized physical plan as equivalent DataFrame operations, so there is no performance penalty for choosing SQL over the programmatic API.

rdd_from_df = sqlContext.sql("SELECT * FROM dataframe_name")

RDDs

Note: RDDs are included for completeness. Spark 1.3 introduced DataFrames, which are recommended over RDDs. Check out the DataFrames announcement for more info.

Resilient Distributed Datasets (RDDs) are the fundamental unit of data in Spark. RDDs can be created from a file, from data in memory, or from another RDD. RDDs are immutable.

There are two types of RDD operations:

  • Actions: return values to the driver. Data in an RDD is not processed until an action is performed.

  • Transformations: define a new RDD based on the current one.
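The laziness of transformations can be illustrated outside Spark with plain Python generators – a local analogy, not Spark itself: nothing is computed until a terminal operation (the "action") consumes the pipeline.

```python
# A local Python analogy for lazy transformations (not Spark itself):
# generator expressions build a pipeline without touching the data.
log = []

def traced(x):
    log.append(x)      # record when an element is actually processed
    return x * 2

data = range(5)
doubled = (traced(x) for x in data)    # "transformation": nothing runs yet
big = (x for x in doubled if x > 4)    # chained "transformation": still nothing

assert log == []                       # no element processed so far

result = list(big)                     # "action": forces the whole pipeline
assert result == [6, 8]
assert log == [0, 1, 2, 3, 4]          # now every element was processed once
```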

Creating an RDD from Text Files

sc.textFile() reads text files from any supported filesystem (local, HDFS, S3) and returns an RDD where each element is one line of text. The wildcard * pattern reads all files in a directory, which is common when data is split across multiple part files. Spark automatically partitions the data across the cluster based on the number of HDFS blocks or file splits, enabling parallel processing from the very first step.

my_data = sc.textFile("file:/path/*")

Counting Elements in an RDD

count() is an action that triggers execution of the entire lineage of transformations and returns the total number of elements. Because actions materialize the result, count() forces Spark to read, transform, and aggregate the data across all partitions. It is commonly used to verify data loaded correctly or to check how many records survive a filter.

my_data.count()

Collecting All Elements with collect()

collect() is an action that returns all the elements of the dataset as an array. This is usually most useful after a filter or other operation that returns a sufficiently small subset of the data:

my_data.collect()

Previewing Data with take()

take(n) returns the first n elements of the RDD to the driver as a Python list. Unlike collect(), which pulls the entire dataset into memory, take() is safe for large datasets because it only retrieves a small sample. It is the go-to method for quick inspection during development, similar to Pandas’ head().

my_data.take(10)

Filtering an RDD with Lambda Functions

filter() is a transformation that returns a new RDD containing only the elements for which the lambda function returns True. Transformations are lazy – Spark records them in the lineage graph but does not execute anything until an action is called. Chaining multiple filters together does not incur multiple passes over the data; Spark’s optimizer fuses them into a single pass.

my_data.filter(lambda line: ".txt" in line)

Chaining Transformations and Actions

Spark’s fluent API lets you chain textFile(), filter(), and count() into a single expression. This declarative style describes what you want computed rather than how to compute it, and Spark’s DAG scheduler determines the optimal execution plan. Chaining is not just syntactic convenience – it allows Spark to see the entire pipeline at once and optimize across operations (e.g., combining map and filter into a single stage).

sc.textFile("file:/path/file.txt") \
    .filter(lambda line: ".txt" in line) \
    .count()

Mapping and Extracting Fields

map() applies a function to each element of the RDD and returns a new RDD of the results. Here, each line is split into words and only the first word is kept. This is a fundamental narrow transformation – each input partition maps independently to one output partition with no data shuffling. Map operations are the building blocks for data cleaning, parsing, and feature extraction in Spark pipelines.

first_words = my_data.map(lambda line: line.split()[0])

Iterating Over RDD Results

After collecting a subset of data to the driver with take(), you can iterate over it using a standard Python for loop. This pattern is useful for printing, debugging, or applying non-distributed logic to a small sample. Remember that take() returns a regular Python list, so all Python string and collection operations are available on the result.

for word in first_words.take(10):
    print word

Saving RDD Output to Disk

saveAsTextFile() writes each partition of the RDD to a separate file in the specified directory. Spark creates one output file per partition (e.g., part-00000, part-00001), which enables parallel writes for maximum throughput. The output directory must not already exist, or Spark will raise an error to prevent accidental data overwrites.

first_words.saveAsTextFile("file:/path/file")

Pair RDDs

Pair RDDs contain elements that are key-value pairs. Keys and values can be any type.

Given a log file with the following space-delimited format: [date_time, user_id, ip_address, action], map each request to (user_id, 1), then reduce by key to count the requests per user:

DATE_TIME = 0
USER_ID = 1
IP_ADDRESS = 2
ACTION = 3

log_data = sc.textFile("file:/path/*")

user_actions = log_data \
    .map(lambda line: line.split()) \
    .map(lambda words: (words[USER_ID], 1))  \
    .reduceByKey(lambda count1, count2: count1 + count2)
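Outside Spark, the same per-user count can be checked with plain Python on a few sample lines (hypothetical log data; the field layout matches the constants above):

```python
# Plain-Python check of the (user_id, 1) -> reduceByKey pipeline above,
# using hypothetical log lines in the same space-delimited layout.
USER_ID = 1

sample_logs = [
    "2015-05-01T10:00:00 u001 10.0.0.1 login",
    "2015-05-01T10:00:05 u002 10.0.0.2 view",
    "2015-05-01T10:00:09 u001 10.0.0.1 logout",
]

counts = {}
for line in sample_logs:
    user = line.split()[USER_ID]            # map: extract the key
    counts[user] = counts.get(user, 0) + 1  # reduceByKey: sum the 1s per key

assert counts == {"u001": 2, "u002": 1}
```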

Sorting Pair RDD Results

sortByKey() reorders the Pair RDD by key. Passing False sorts in descending order, which is useful for ranking (e.g., finding the most active users). This is a wide transformation that requires a full shuffle across the cluster, as data must be redistributed so that all keys end up in sorted order across partitions.

user_actions.map(lambda pair: (pair[1], pair[0])).sortByKey(False).take(5)

Grouping Values by Key

groupByKey() collects all values with the same key into a single iterable. While convenient, it is less efficient than reduceByKey() because it shuffles all values across the network before grouping, rather than performing a local pre-aggregation. Use groupByKey() when you need the full list of values per key (e.g., collecting all IP addresses for each user), but prefer reduceByKey() or aggregateByKey() when performing aggregations.

user_ips = log_data \
    .map(lambda line: line.split()) \
    .map(lambda words: (words[IP_ADDRESS],words[USER_ID])) \
    .groupByKey()
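The efficiency difference can be sketched in plain Python (a simulation, not Spark): reduceByKey pre-aggregates within each partition before anything crosses the network, so fewer records are shuffled.

```python
# Why reduceByKey shuffles less than groupByKey: local pre-aggregation
# (a "combiner") runs inside each partition before the shuffle.
# Hypothetical data: two partitions of (user, 1) pairs.
partitions = [
    [("u1", 1), ("u1", 1), ("u2", 1)],   # partition 0
    [("u1", 1), ("u2", 1), ("u2", 1)],   # partition 1
]

# groupByKey-style: every pair is shuffled as-is
shuffled_group = [pair for part in partitions for pair in part]
assert len(shuffled_group) == 6

# reduceByKey-style: combine locally first, then shuffle the partial sums
def combine(part):
    local = {}
    for k, v in part:
        local[k] = local.get(k, 0) + v
    return list(local.items())

shuffled_reduce = [pair for part in partitions for pair in combine(part)]
assert len(shuffled_reduce) == 4          # only per-partition partials move

# Both approaches agree on the final counts
final = {}
for k, v in shuffled_reduce:
    final[k] = final.get(k, 0) + v
assert final == {"u1": 3, "u2": 3}
```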

Given a user table with the following csv format: [user_id, user_info0, user_info1, …], map each line to (user_id, [user_info…]):

user_data = sc.textFile("file:/path/*")

user_profile = user_data \
    .map(lambda line: line.split(',')) \
    .map(lambda words: (words[0], words[1:]))

Joining Two Pair RDDs

join() on Pair RDDs performs an inner join on the keys, producing a new Pair RDD where each key maps to a tuple of values from both RDDs. This is the distributed equivalent of a SQL INNER JOIN. Spark shuffles both RDDs by key so that matching records land on the same partition, then combines them. For optimal performance, ensure that both RDDs are partitioned by the join key using partitionBy().

user_actions_with_profile = user_actions.join(user_profile)

Displaying Joined Results

After a join, iterating over the result with take() and a for loop lets you inspect the combined records. The destructuring syntax (user_id, (user_info, count)) unpacks the nested tuple structure that Spark produces from a join – the outer tuple is (key, value) and the inner value is a tuple of the two joined values.

for (user_id, (user_info, count)) in user_actions_with_profile.take(10):
    print user_id, count, user_info

Running Spark on a Cluster

Starting Cluster Daemons

The spark-master and spark-worker services are the core components of Spark’s standalone cluster manager. The master coordinates resource allocation and job scheduling, while workers execute tasks on their respective nodes. Starting these services is the first step to running Spark in cluster mode rather than the local mode used for development.

!sudo service spark-master start
!sudo service spark-worker start

Stopping Cluster Daemons

Stopping the master and worker services gracefully shuts down the Spark standalone cluster. Running jobs will be terminated, and allocated resources will be released. In production, you would typically use a cluster manager like YARN or Kubernetes rather than manually starting and stopping standalone daemons.

!sudo service spark-master stop
!sudo service spark-worker stop

Restarting Cluster Daemons

A stop-then-start cycle is useful when you have changed Spark configuration files (e.g., memory settings, number of cores) and need the changes to take effect. Configuration changes only apply when the daemons are restarted.

!sudo service spark-master stop
!sudo service spark-worker stop
!sudo service spark-master start
!sudo service spark-worker start

Spark Standalone Cluster UI

The standalone cluster master hosts a web UI on port 18080 that displays worker node status, running applications, completed applications, and resource utilization (memory and cores). This dashboard is essential for monitoring cluster health and diagnosing resource contention issues when multiple applications compete for the same workers.

http://localhost:18080/

Connecting PySpark Shell to a Cluster

Setting the MASTER environment variable to a Spark master URL (e.g., spark://localhost:7077) directs the PySpark shell to submit tasks to that cluster instead of running locally. This is how you transition from local development to distributed execution – the same code runs unchanged, but Spark distributes the work across all available workers.

!MASTER=spark://localhost:7077 pyspark

Verifying the Master Connection

sc.master returns the URL of the Spark master that this SparkContext is connected to. Checking this value confirms whether you are running in local mode (local[*]) or connected to a standalone cluster (spark://host:port), YARN (yarn), or another cluster manager. This is a quick sanity check to ensure your jobs will execute where you expect.

sc.master

Viewing the Spark Application UI

From the following reference:

Every SparkContext launches a web UI, by default on port 4040, that displays useful information about the application. This includes:

  • A list of scheduler stages and tasks

  • A summary of RDD sizes and memory usage

  • Environmental information

  • Information about the running executors

You can access this interface by simply opening http://<driver-node>:4040 in a web browser. If multiple SparkContexts are running on the same host, they will bind to successive ports beginning with 4040 (4041, 4042, etc).

Note that this information is only available for the duration of the application by default. To view the web UI after the fact, set spark.eventLog.enabled to true before starting the application. This configures Spark to log Spark events that encode the information displayed in the UI to persisted storage.
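The event log settings can go in the properties file used with spark-submit – a sketch; the log directory here is a placeholder you would replace with a real path:

```
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs:///path/to/event-logs
```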

http://localhost:4040/

Working with Partitions

From the following reference:

The Spark map() and flatMap() methods only operate on one input at a time, and provide no means to execute code before or after transforming a batch of values. It looks possible to simply put the setup and cleanup code before and after a call to map() in Spark:

val dbConnection = ...
lines.map(... dbConnection.createStatement(...) ...)
dbConnection.close() // Wrong!

However, this fails for several reasons:

  • It puts the object dbConnection into the map function’s closure, which requires that it be serializable (for example, by implementing java.io.Serializable). An object like a database connection is generally not serializable.

  • map() is a transformation, rather than an action, and is lazily evaluated. The connection can’t be closed immediately here.

  • Even so, it would only close the connection on the driver, not necessarily freeing resources allocated by serialized copies.

In fact, neither map() nor flatMap() is the closest counterpart to a Mapper in Spark — it’s the important mapPartitions() method. This method does not map just one value to one other value, but rather maps an Iterator of values to an Iterator of other values. It’s like a “bulk map” method. This means that the mapPartitions() function can allocate resources locally at its start, and release them when done mapping many values.

def count_txt(part_iter):
    txt_count = 0
    for line in part_iter:
        if ".txt" in line:
            txt_count += 1
    yield txt_count

my_data = sc.textFile("file:/path/*")
txt_counts = my_data.mapPartitions(count_txt).collect()

# Show the partitioning
print "Data partitions: ", my_data.toDebugString()
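The setup/cleanup pattern the reference describes can be sketched in plain Python – a simulation, not Spark: a hypothetical FakeConnection stands in for a real database connection, and lists stand in for partitions.

```python
# Sketch of the per-partition setup/cleanup pattern (plain Python, not Spark).
# FakeConnection is a hypothetical stand-in for a database connection.
class FakeConnection(object):
    def __init__(self):
        self.closed = False
    def lookup(self, x):
        return x * 10
    def close(self):
        self.closed = True

opened = []

def enrich_partition(part_iter):
    conn = FakeConnection()       # setup: once per partition, not per element
    opened.append(conn)
    for x in part_iter:
        yield conn.lookup(x)
    conn.close()                  # cleanup: after the whole partition is mapped

partitions = [[1, 2], [3, 4, 5]]
results = [list(enrich_partition(iter(p))) for p in partitions]

assert results == [[10, 20], [30, 40, 50]]
assert len(opened) == 2                      # one connection per partition
assert all(c.closed for c in opened)
```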

Caching RDDs

Caching an RDD saves the data in memory. Caching is a suggestion to Spark: because it depends on available memory, some partitions may not be cached and will be recomputed from the lineage as needed.

By default, every RDD operation executes the entire lineage. Caching can boost performance for datasets that are likely to be used by saving this expensive recomputation and is ideal for iterative algorithms or machine learning.

  • cache() stores data in memory

  • persist() stores data at a chosen storage level: MEMORY_ONLY, MEMORY_AND_DISK (spill to disk), or DISK_ONLY

Data spilled to disk is stored locally on each node, not on HDFS.

Replication is possible by using MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. If a cached partition becomes unavailable, Spark recomputes the partition through the lineage.

Serialization is possible with MEMORY_ONLY_SER and MEMORY_AND_DISK_SER. This is more space efficient but less time efficient, as it uses Java serialization by default.

from pyspark import StorageLevel

# Cache RDD to memory
my_data.cache()

# Persist RDD to both memory and disk (spilling to disk if memory is not
# enough), with a replication factor of 2
my_data.persist(StorageLevel.MEMORY_AND_DISK_2)

# Unpersist RDD, removing it from memory and disk
my_data.unpersist()

# Set a new persistence level after unpersisting
my_data.persist(StorageLevel.MEMORY_AND_DISK)

Checkpointing RDDs

Caching maintains RDD lineage, providing resilience. If the lineage is very long, it is possible to get a stack overflow.

Checkpointing saves the data to HDFS, which provides fault-tolerant storage across nodes. HDFS is slower than local storage for both reading and writing. Checkpointing is good for long lineages and for very large data sets that might not fit on local storage. Checkpointing removes lineage.

Materializing a Checkpoint

Calling checkpoint() marks an RDD for checkpointing, but the data is not actually written until an action (like count()) forces evaluation. The checkpoint is saved to the directory set by sc.setCheckpointDir(). After checkpointing, Spark replaces the RDD’s lineage with a reference to the checkpoint files, which prevents stack overflow errors from extremely long transformation chains and provides fault-tolerant recovery from HDFS.

# Enable checkpointing by setting the checkpoint directory, 
# which will contain all checkpoints for the given data:
sc.setCheckpointDir("checkpoints")

my_data = sc.parallelize([1,2,3,4,5])

# Long loop that may cause a stack overflow
for i in range(1000):
    my_data = my_data.map(lambda myInt: myInt + 1)

    if i % 10 == 0: 
        my_data.checkpoint()
        my_data.count()

my_data.collect()
     
# Display the lineage
for rddstring in my_data.toDebugString().split('\n'): 
    print rddstring.strip()

Writing and Running a Spark Application

Writing a Standalone Spark Application

A Spark application is a self-contained Python script that creates its own SparkContext, runs transformations and actions, and exits. Unlike the interactive shell, applications are submitted to the cluster via spark-submit, which handles dependency distribution and cluster negotiation. Structuring your code as a standalone application is necessary for production deployment, scheduling with cron or Airflow, and running on managed services like EMR or Databricks.

import sys
from pyspark import SparkContext

def count_text_files():
    sc = SparkContext()
    logfile = sys.argv[1]
    text_files = sc.textFile(logfile) \
        .filter(lambda line: '.txt' in line)
    text_files.cache()
    print "Number of text files: ", text_files.count()

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print >> sys.stderr, "Usage: App Name <file>"
        exit(-1)
    count_text_files()

Submitting a Spark Application

spark-submit is the unified script for launching Spark applications on any cluster manager (standalone, YARN, Mesos, Kubernetes). The --properties-file flag points to a configuration file that specifies master URL, memory allocation, number of executors, and other settings. This separation of configuration from code means the same application can run on different clusters without code changes.

!spark-submit --properties-file dir/myspark.conf script.py data/*

Configuring Spark Applications

Command-Line Configuration

Spark configuration can be set directly on the spark-submit command line using flags like --master, --name, --executor-memory, and --num-executors. Command-line settings take the highest precedence, overriding both the properties file and programmatic settings. This makes it easy to adjust resource allocation per job without modifying code or config files.

!spark-submit --master spark://localhost:7077 --name 'App Name' script.py data/*

Properties File Configuration

The spark.conf file uses a simple key-value format to configure application name, master URL, UI port, memory, and hundreds of other settings. Using a properties file is cleaner than passing many command-line flags and makes configurations version-controllable and shareable across team members. Spark reads this file when --properties-file is passed to spark-submit.

spark.app.name  App Name
spark.ui.port   4141
spark.master    spark://localhost:7077

Submitting with a Properties File

Passing --properties-file spark.conf to spark-submit loads all configuration from the specified file. This is the recommended approach for production deployments where you want consistent, reproducible configuration. The properties file can be stored in version control alongside the application code.

!spark-submit --properties-file spark.conf script.py data/*

Programmatic Configuration with SparkConf

SparkConf() lets you set configuration options directly in Python code using a builder pattern. This approach is useful for dynamic configurations that depend on runtime conditions (e.g., setting the app name based on a timestamp). However, programmatic settings have the lowest precedence – they are overridden by both properties files and command-line flags.

sconf = SparkConf() \
    .setAppName("Word Count") \
    .set("spark.ui.port","4141")
sc = SparkContext(conf=sconf)

Configuring Log Levels

Spark uses Log4j for logging, and the verbosity can be controlled by editing the log4j.properties file. By default, Spark logs at the INFO level, which can produce overwhelming output. Setting the level to WARN or ERROR reduces noise and makes it easier to spot genuine issues. To override the defaults, copy the template below to $SPARK_HOME/conf/log4j.properties and edit the log levels.

$SPARK_HOME/conf/log4j.properties.template

Streaming

Starting the Spark Shell for Streaming

Streaming requires at least two threads: one to receive data from the source and one to process it. The --master local[2] flag ensures two threads are available in local mode. Using local[1] would cause the streaming job to hang because the single thread would be occupied by the receiver, leaving no thread for processing.

!spark-shell --master local[2]

Creating a StreamingContext

The StreamingContext is the entry point for Spark Streaming applications. The Seconds(1) parameter sets the batch interval – Spark will collect incoming data for 1 second, then process that micro-batch as an RDD. Shorter intervals provide lower latency but higher overhead; longer intervals improve throughput at the cost of latency. The socketTextStream() method creates a DStream that reads text data from a TCP socket, useful for development and testing.

val ssc = new StreamingContext(new SparkConf(), Seconds(1)) 
val my_stream = ssc.socketTextStream(hostname, port)

Creating a DStream from a Socket Source

A DStream (Discretized Stream) is a continuous sequence of RDDs representing a stream of data. socketTextStream() connects to a TCP socket and creates a DStream where each element is a line of text received from the socket. In production, you would use more robust sources like Kafka, Flume, or Kinesis rather than raw sockets.

val logs = ssc.socketTextStream(hostname, port)

DStreams support regular transformations such as map, flatMap, and filter, and pair transformations such as reduceByKey, groupByKey, and join.

Apply a DStream operation to each batch of RDDs (count up requests by user id, reduce by key to get the count):

val requests = my_stream
    .map(line => (line.split(" ")(2), 1))
    .reduceByKey((x, y) => x + y)

The transform(function) method creates a new DStream by executing the input function on the RDDs.

val sorted_requests = requests
    .map(pair => pair.swap)
    .transform(rdd => rdd.sortByKey(false))

foreachRDD(function) performs a function on each RDD in the DStream, giving you direct access to the underlying RDD of each batch (whereas map operates on the elements without exposing the RDD):

sorted_requests.foreachRDD((rdd, time) => {
    println("Top users @ " + time)
    rdd.take(5).foreach(
        pair => printf("User: %s (%s)\n", pair._2, pair._1))
})

Saving DStream Output to Files

saveAsTextFiles() writes each micro-batch RDD to a separate directory with a timestamp suffix (e.g., /dir/requests-1620000000000/). This creates a time-partitioned directory structure that is easy to process with subsequent batch jobs. Each directory contains part files from the parallel writers, following the same convention as regular RDD output.

requests.saveAsTextFiles("/dir/requests")

Starting Stream Processing

ssc.start() begins the actual ingestion and processing of streaming data. Until this method is called, the DStream transformations are just a logical plan. After starting, Spark continuously polls the input sources, creates micro-batch RDDs at the configured interval, and applies the registered transformations and output operations.

ssc.start()

Waiting for Stream Termination

awaitTermination() blocks the main thread until the streaming computation is stopped (either programmatically via ssc.stop() or by an error). Without this call, the main thread would exit immediately and the JVM would shut down, terminating the streaming application. This is required in standalone Spark Streaming applications but not in interactive notebooks.

ssc.awaitTermination()

Streaming with States

Checkpointing for Stateful Streaming

Stateful streaming operations (like updateStateByKey()) accumulate state across batches, which means the lineage grows indefinitely. Checkpointing periodically writes the state to a reliable filesystem (HDFS or S3), truncating the lineage and enabling recovery from failures. Checkpointing is mandatory for any streaming application that uses stateful transformations or window operations.

ssc.checkpoint("dir")

Stateful Streaming with updateStateByKey()

updateStateByKey() maintains a running state for each key across micro-batches. The update function receives the new values from the current batch and the previous state, then returns the updated state. This enables cumulative computations like running totals, session tracking, and online aggregations. The state is automatically checkpointed to prevent lineage explosion.

def updateCount = (newCounts: Seq[Int], state: Option[Int]) => {
    val newCount = newCounts.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(newCount + previousCount)
}

val totalUserreqs = userreqs.updateStateByKey(updateCount)
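The semantics of updateStateByKey() can be sketched in plain Python by folding hand-made micro-batches into a state dictionary; the update function mirrors the Scala updateCount above, and the sample batch data is made up:

```python
# Fold the new per-batch counts into the previous state for a key,
# mirroring the Scala updateCount function above.
def update_count(new_counts, state):
    return sum(new_counts) + (state or 0)

# Each micro-batch is a list of (user, count) pairs (hypothetical data).
batches = [
    [("alice", 3), ("bob", 1)],
    [("alice", 2)],
]

state = {}  # key -> running total, as Spark maintains across batches
for batch in batches:
    per_key = {}
    for user, count in batch:
        per_key.setdefault(user, []).append(count)
    for user, counts in per_key.items():
        state[user] = update_count(counts, state.get(user))

print(state)  # {'alice': 5, 'bob': 1}
```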

Sliding Window Operations

reduceByKeyAndWindow() computes aggregations over a sliding window of data. The window duration (5 minutes) defines how much historical data to include, while the slide interval (30 seconds) defines how often to recompute. This is ideal for real-time dashboards that need rolling metrics like “requests per user in the last 5 minutes, updated every 30 seconds.” If you use the variant of reduceByKeyAndWindow() that also takes an inverse reduce function (which requires checkpointing), Spark computes each window incrementally by adding new data and subtracting expired data rather than recomputing from scratch.

val reqcountsByWindow = logs.map(line => (line.split(' ')(2), 1))
    .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Minutes(5), Seconds(30))
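The incremental add/subtract strategy behind windowed aggregation can be sketched in plain Python; the per-batch counts and the window size here are made-up stand-ins for the Minutes(5)/Seconds(30) configuration above:

```python
from collections import deque

window_size = 3            # window covers the last 3 batches (stand-in)
batches = [4, 7, 2, 5, 1]  # per-batch request counts for one key (made up)

window = deque()
total = 0
totals = []
for count in batches:
    window.append(count)
    total += count                 # add the newest batch's data
    if len(window) > window_size:
        total -= window.popleft()  # subtract data that slid out of the window
    totals.append(total)

print(totals)  # one rolling total per slide interval
```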

Monitoring with StreamingListener

The StreamingListener API provides callbacks for streaming events such as batch completion, receiver start/stop, and processing delays. By extending StreamingListener and overriding methods like onReceiverStopped(), you can implement custom monitoring, alerting, or graceful shutdown logic. This is essential for production streaming applications where you need to detect and respond to failures automatically.

// define listener
class MyListener extends StreamingListener {
  override def onReceiverStopped(
      receiverStopped: StreamingListenerReceiverStopped): Unit = {
    streamingContext.stop()
  }
}

// attach listener
streamingContext.addStreamingListener(new MyListener())

Broadcast Variables

Loading Data for Broadcasting

Before broadcasting, the data must be loaded into the driver’s memory as a regular Python object (here, a list read from a local file). The strip() call removes leading and trailing whitespace, including the newline, from each line. This local data will then be shipped to every worker node via Spark’s broadcast mechanism, avoiding the need for each task to independently read the file.

broadcast_file = "broadcast.txt"
broadcast_list = list(map(lambda l: l.strip(), open(broadcast_file)))

Creating a Broadcast Variable

sc.broadcast() distributes a read-only variable to all worker nodes, where it is cached in memory. Without broadcasting, Spark would serialize and ship the variable with every task, resulting in redundant network transfers. Broadcast variables are ideal for lookup tables, configuration dictionaries, and filter lists that are used by every partition but are too large to embed in a closure efficiently.

broadcast_list_sc = sc.broadcast(broadcast_list)

Filtering with a Broadcast Variable

Access the broadcast data via .value inside transformations. Here, each line is checked against the broadcast list using Python’s any() function. Because the broadcast variable is cached on each worker, the filter runs entirely in memory without any network communication per record. This pattern is common for blacklist/whitelist filtering, geolocation lookups, and feature enrichment in ETL pipelines.

log_file = "hdfs://localhost/user/logs/*"
filtered_data = sc.textFile(log_file)\
    .filter(lambda line: any(item in line for item in broadcast_list_sc.value))

filtered_data.take(10)
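The filter logic itself is plain Python and can be tried locally; the lookup list and log lines below are hypothetical stand-ins for broadcast_list_sc.value and the HDFS data:

```python
# Stand-in for broadcast_list_sc.value on a worker
broadcast_list = ["ERROR", "WARN"]

# Hypothetical log lines standing in for the HDFS input
log_lines = [
    "2021-01-01 INFO startup complete",
    "2021-01-01 ERROR disk full",
    "2021-01-02 WARN low memory",
]

# Keep a line if any item from the shared lookup list appears in it,
# exactly as in the filter() transformation above.
filtered = [line for line in log_lines
            if any(item in line for item in broadcast_list)]
print(filtered)  # only the ERROR and WARN lines survive
```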

Accumulators

Creating an Accumulator

sc.accumulator(0) creates a shared variable that workers can only add to (not read). Accumulators provide a safe way to implement distributed counters and sums without race conditions. The driver program can read the accumulator’s final value after all tasks complete. They are commonly used for counting error records, tracking data quality metrics, or implementing custom progress indicators.

txt_count = sc.accumulator(0)

Counting with an Accumulator Inside foreach()

foreach() applies a function to each element of the RDD as a side effect (no return value). Inside the function, txt_count.add(1) increments the accumulator on the worker. Spark guarantees that each task’s accumulator updates are applied exactly once (for actions), making this a reliable way to compute global counts alongside other processing. Note that accumulator updates inside transformations may be replayed if tasks are re-executed, leading to overcounting.

my_data = sc.textFile(filePath)
my_data.foreach(lambda line: txt_count.add(1) if '.txt' in line else None)
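The driver-side result is simply the sum of per-record increments, which can be sketched in plain Python; the file names below are made up:

```python
# Hypothetical records standing in for the lines of the input file
lines = ["report.txt", "logo.jpg", "notes.txt", "index.html"]

txt_count = 0
for line in lines:      # the foreach() side effect, applied per record
    if ".txt" in line:
        txt_count += 1  # the txt_count.add(1) accumulator update
print(txt_count)  # 2
```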

Multiple Accumulators for Parallel Counting

You can create multiple accumulators to track different metrics simultaneously in a single pass over the data. This avoids reading the data multiple times (once per metric) and is far more efficient than running separate filter().count() operations. The function dispatches to different accumulators based on the data content, and all counts are available on the driver after the action completes.

jpg_count = sc.accumulator(0)
html_count = sc.accumulator(0)
css_count = sc.accumulator(0)

def countFileType(s):
    if '.jpg' in s: jpg_count.add(1)
    elif '.html' in s: html_count.add(1)
    elif '.css' in s: css_count.add(1)

filename = "hdfs://logs/*"

logs = sc.textFile(filename)
logs.foreach(countFileType)

print 'File Type Totals:'
print '.css files: ', css_count.value
print '.html files: ', html_count.value
print '.jpg files: ', jpg_count.value