Spark countByKey()
countByKey() counts the number of elements for each key in a key-value (pair) RDD and returns the result to the driver as a local dictionary (in PySpark, a collections.defaultdict of int). It is an action, not a transformation: calling it triggers computation immediately, and the result is a local dict rather than an RDD.
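One way to picture what the action does is a two-step count: tally keys inside each partition, then merge the per-partition tallies into one dictionary. The sketch below models that in plain Python with no Spark involved. The function names and the list-of-lists `partitions` layout are illustrative assumptions, not PySpark's actual source.

```python
from collections import defaultdict


def count_partition(pairs):
    """Count keys within one partition (in Spark this would run on an executor)."""
    counts = defaultdict(int)
    for key, _value in pairs:
        counts[key] += 1
    return counts


def merge_counts(left, right):
    """Fold one partition's counts into another (the merge step)."""
    for key, n in right.items():
        left[key] += n
    return left


def count_by_key_model(partitions):
    """Plain-Python stand-in for countByKey(); `partitions` is a list of lists of pairs."""
    total = defaultdict(int)
    for part in partitions:
        merge_counts(total, count_partition(part))
    return total


# Hypothetical data split across two "partitions".
partitions = [
    [("page_view", "/home"), ("click", "buy-button")],
    [("page_view", "/about"), ("purchase", "order-123")],
]
result = count_by_key_model(partitions)
print(dict(result))  # {'page_view': 2, 'click': 1, 'purchase': 1}
```

The values are never inspected, only the keys. The merged dictionary has one entry per distinct key, so its size grows with the number of keys rather than the number of records.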
Basic Usage
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("countByKey").getOrCreate()
sc = spark.sparkContext

events = sc.parallelize([
    ("page_view", "/home"),
    ("click", "buy-button"),
    ("page_view", "/products"),
    ("purchase", "order-123"),
    ("click", "nav-menu"),
    ("page_view", "/about"),
])

counts = events.countByKey()
print(counts)
# defaultdict(<class 'int'>, {'page_view': 3, 'click': 2, 'purchase': 1})

# Access like a regular dict
print(counts["page_view"])        # 3
# .get() avoids inserting the missing key into the defaultdict
print(counts.get("checkout", 0))  # 0 (key not present)

# Sort by count descending
sorted_counts = sorted(counts.items(), key=lambda x: x[1], reverse=True)
for event_type, count in sorted_counts:
    print(f"{event_type}: {count}")
# page_view: 3
# click: 2
# purchase: 1

countByKey vs countByValue
simple = sc.parallelize(["a", "b", "a", "c", "b", "a"])

# countByValue: counts occurrences of each distinct ELEMENT
simple.countByValue()  # {"a": 3, "b": 2, "c": 1}

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

# countByKey: counts how many (key, value) pairs have each KEY
pairs.countByKey()  # {"a": 2, "b": 1}

Real-World Examples
Session Analysis
sessions = sc.parallelize([
    ("user_001", "login"),
    ("user_002", "search"),
    ("user_001", "checkout"),
    ("user_001", "purchase"),
    ("user_002", "login"),
    ("user_003", "login"),
])

# Count events per user
events_per_user = sessions.countByKey()
print(events_per_user)
# defaultdict(<class 'int'>, {'user_001': 3, 'user_002': 2, 'user_003': 1})

# Find users with the most activity
top_users = sorted(events_per_user.items(), key=lambda x: x[1], reverse=True)
print(top_users[:3])  # [('user_001', 3), ('user_002', 2), ('user_003', 1)]

Limitation: Result Must Fit in Driver Memory
countByKey() brings the entire result dictionary to the driver. This is safe when the number of distinct keys is small, but problematic when there are millions of keys:
# SAFE: 1000 product categories → small dict
product_events = sc.parallelize(...)
product_events.countByKey()  # dict with ~1000 entries

# UNSAFE: 50 million user IDs → dict with 50M entries in driver memory
user_events.countByKey()  # Risk: OutOfMemoryError on driver

# SAFER alternative for large cardinality: stay in distributed mode
user_events \
    .mapValues(lambda v: 1) \
    .reduceByKey(lambda a, b: a + b)  # Returns an RDD, not a dict

DataFrame Equivalent
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [("page_view", "/home"), ("click", "buy"), ("page_view", "/about")],
    ["event_type", "target"],
)

# Equivalent to countByKey but distributed (returns DataFrame)
df.groupBy("event_type").count().orderBy(F.col("count").desc()).show()
# +----------+-----+
# |event_type|count|
# +----------+-----+
# | page_view|    2|
# |     click|    1|
# +----------+-----+
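If a local dictionary like the one countByKey() returns is still wanted from the DataFrame version, one option is to collect only the top few groups. The helper below is a minimal sketch of that idea. `top_counts_as_dict` and its parameters are hypothetical names, and `df` is assumed to be a pyspark.sql.DataFrame; the function only relies on groupBy, count, orderBy, limit and collect.

```python
def top_counts_as_dict(df, key_col, n=100):
    """Return the n most frequent values of key_col as a plain dict.

    `df` is assumed to be a pyspark.sql.DataFrame. Only n rows are
    collected, so the driver never holds the full set of groups.
    """
    rows = (
        df.groupBy(key_col)
        .count()
        .orderBy("count", ascending=False)
        .limit(n)
        .collect()
    )
    # Index by name: row.count would resolve to the tuple method, not the column.
    return {row[key_col]: row["count"] for row in rows}
```

With the `df` defined above, `top_counts_as_dict(df, "event_type", n=10)` should give a dict mapping page_view to 2 and click to 1. The `limit(n)` call keeps the collected result bounded regardless of how many distinct keys exist, which addresses the driver-memory concern at the cost of dropping the long tail.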