Spark countByKey()

countByKey() counts the number of elements for each key in a key-value (pair) RDD and returns the counts to the driver as a local Python dictionary (a defaultdict(int) in PySpark). It is an action, not a transformation: calling it triggers computation immediately, and the result is a local dict rather than an RDD.
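To make that behavior concrete, here is a minimal pure-Python model of what the action computes: keys are counted inside each partition, and the partial counts are then merged into a single dictionary. The `partitions` list and the `count_by_key_model` helper are hypothetical names used only for illustration; this is a sketch of the idea, not Spark's actual implementation.

```python
from collections import defaultdict

def count_by_key_model(partitions):
    """Model countByKey(): per-partition key counts merged into one local dict."""
    # Step 1: each partition counts its own keys independently ("executor side").
    partials = []
    for partition in partitions:
        local = defaultdict(int)
        for key, _value in partition:
            local[key] += 1
        partials.append(local)

    # Step 2: the partial counts are merged into a single dict ("driver side").
    merged = defaultdict(int)
    for partial in partials:
        for key, n in partial.items():
            merged[key] += n
    return merged

# Hypothetical data, already split across two partitions.
partitions = [
    [("page_view", "/home"), ("click", "buy-button"), ("page_view", "/products")],
    [("purchase", "order-123"), ("click", "nav-menu"), ("page_view", "/about")],
]
model_counts = count_by_key_model(partitions)
print(dict(model_counts))
# {'page_view': 3, 'click': 2, 'purchase': 1}
```

The merged dictionary has one entry per distinct key, which is why the size of the result depends on key cardinality and not on the number of records.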


Basic Usage

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("countByKey").getOrCreate()
sc = spark.sparkContext
events = sc.parallelize([
    ("page_view", "/home"),
    ("click", "buy-button"),
    ("page_view", "/products"),
    ("purchase", "order-123"),
    ("click", "nav-menu"),
    ("page_view", "/about"),
])
counts = events.countByKey()
print(counts)
# defaultdict(<class 'int'>, {'page_view': 3, 'click': 2, 'purchase': 1})
# Access like a regular dict
print(counts["page_view"]) # 3
print(counts.get("checkout", 0)) # 0 — key not present
# Sort by count descending
sorted_counts = sorted(counts.items(), key=lambda x: x[1], reverse=True)
for event_type, count in sorted_counts:
    print(f"{event_type}: {count}")
# page_view: 3
# click: 2
# purchase: 1
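One detail worth knowing when treating the result like a regular dict: because it is a defaultdict, indexing a missing key behaves differently from .get(). The sketch below uses a hand-built defaultdict as a stand-in for the value returned by countByKey(), so it runs without a Spark session; `plain_counts` is an illustrative name.

```python
from collections import defaultdict

# Stand-in for the defaultdict returned by events.countByKey() in Basic Usage.
counts = defaultdict(int, {"page_view": 3, "click": 2, "purchase": 1})

# .get() reads a missing key without modifying the dictionary.
print(counts.get("checkout", 0))   # 0
print("checkout" in counts)        # False

# Indexing a missing key also returns 0, but it inserts the key as a side effect.
print(counts["checkout"])          # 0
print("checkout" in counts)        # True

# Rebuilding a plain dict from the real counts gives ordinary KeyError behavior.
plain_counts = {k: v for k, v in counts.items() if v > 0}
try:
    plain_counts["checkout"]
except KeyError:
    print("KeyError for missing key")
```

Preferring .get(), or converting with dict(counts) right after the action, avoids accidentally adding zero-count keys before the dictionary is serialized or iterated.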

countByKey vs countByValue

simple = sc.parallelize(["a", "b", "a", "c", "b", "a"])
# countByValue — counts occurrences of each distinct ELEMENT
simple.countByValue() # {"a": 3, "b": 2, "c": 1}
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
# countByKey — counts how many (key, value) pairs have each KEY
pairs.countByKey() # {"a": 2, "b": 1}
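The relationship between the two actions can be modeled with collections.Counter on a plain list: counting by key is the same as dropping the values and then counting elements, while counting by value on a pair collection treats each whole (key, value) tuple as the element. The list and variable names below are illustrative, and Counter is a local stand-in for the distributed actions.

```python
from collections import Counter

# Hypothetical pair data; note that ("a", 1) appears twice.
pairs = [("a", 1), ("b", 2), ("a", 3), ("a", 1)]

# Models pairs.countByKey(): only the key matters.
by_key = Counter(key for key, _value in pairs)

# Models pairs.countByValue(): the whole (key, value) tuple is the element.
by_value = Counter(pairs)

print(dict(by_key))    # {'a': 3, 'b': 1}
print(dict(by_value))  # {('a', 1): 2, ('b', 2): 1, ('a', 3): 1}
```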

Real-World Examples

Session Analysis

sessions = sc.parallelize([
    ("user_001", "login"), ("user_002", "search"),
    ("user_001", "checkout"), ("user_001", "purchase"),
    ("user_002", "login"), ("user_003", "login"),
])
# Count events per user
events_per_user = sessions.countByKey()
print(events_per_user)
# {"user_001": 3, "user_002": 2, "user_003": 1}
# Find users with the most activity
top_users = sorted(events_per_user.items(), key=lambda x: x[1], reverse=True)
print(top_users[:3]) # [("user_001", 3), ("user_002", 2), ("user_003", 1)]
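When only the top few users are needed, a full sort of the dictionary is not required. As a small sketch, the standard library offers heapq.nlargest and Counter.most_common for this; the dictionary below is a hand-written copy of the countByKey() result shown above, so the snippet runs without Spark.

```python
import heapq
from collections import Counter

# Copy of the result returned by sessions.countByKey() above.
events_per_user = {"user_001": 3, "user_002": 2, "user_003": 1}

# Option 1: heapq.nlargest keeps only the N best items while scanning.
top_two = heapq.nlargest(2, events_per_user.items(), key=lambda kv: kv[1])

# Option 2: wrap the dict in a Counter and use most_common.
also_top_two = Counter(events_per_user).most_common(2)

print(top_two)       # [('user_001', 3), ('user_002', 2)]
print(also_top_two)  # [('user_001', 3), ('user_002', 2)]
```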

Limitation: Result Must Fit in Driver Memory

countByKey() brings the entire result dictionary to the driver. This is safe when the number of distinct keys is small, but problematic when there are millions of keys:

# SAFE: 1000 product categories → small dict
product_events = sc.parallelize(...)
product_events.countByKey() # dict with ~1000 entries
# UNSAFE: 50 million user IDs → dict with 50M entries in driver memory
# (user_events is assumed to be an existing pair RDD keyed by user ID)
user_events.countByKey() # Risk: OutOfMemoryError on driver
# SAFER alternative for large cardinality: stay in distributed mode
user_counts = (
    user_events
    .mapValues(lambda v: 1)
    .reduceByKey(lambda a, b: a + b)  # Returns an RDD, not a dict
)
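To get a feel for why key cardinality matters, the following back-of-the-envelope sketch measures the approximate per-entry cost of a {str: int} dictionary in local Python and extrapolates to the two cases above. The key format (`user_000000001`), the sample size, and the `estimate_bytes_per_entry` helper are assumptions made for illustration; real numbers depend on key length, Python version, and serialization overhead during collection.

```python
import sys

def estimate_bytes_per_entry(sample_size=100_000):
    """Rough per-entry memory cost of a {str: int} dict, measured on a sample."""
    # Hypothetical key shape; longer keys cost more.
    sample = {f"user_{i:09d}": i + 1000 for i in range(sample_size)}
    container = sys.getsizeof(sample)                          # hash table itself
    keys = sum(sys.getsizeof(k) for k in sample)               # string objects
    values = sum(sys.getsizeof(v) for v in sample.values())    # int objects
    return (container + keys + values) / sample_size

per_entry = estimate_bytes_per_entry()
for n_keys in (1_000, 50_000_000):
    megabytes = per_entry * n_keys / 1e6
    print(f"{n_keys:>11,} keys ~ {megabytes:,.1f} MB")
```

On a typical CPython build this lands around a hundred bytes or more per entry, so a thousand keys are negligible while tens of millions of keys reach into gigabytes on the driver.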

DataFrame Equivalent

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [("page_view", "/home"), ("click", "buy"), ("page_view", "/about")],
    ["event_type", "target"]
)
# Equivalent to countByKey but distributed (returns DataFrame)
df.groupBy("event_type").count().orderBy(F.col("count").desc()).show()
# +----------+-----+
# |event_type|count|
# +----------+-----+
# | page_view|    2|
# |     click|    1|
# +----------+-----+