How to Create RDDs from Data Sources or Collections

SparkContext creates RDDs from in-memory collections or external storage (HDFS, S3, local FS, Hadoop formats).
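
All the snippets below assume an existing SparkSession named spark (notebooks and the PySpark shell usually provide one). If you are running a standalone script, a minimal setup sketch looks like this (the app name is just an illustrative placeholder):

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; the examples below use its SparkContext
spark = SparkSession.builder.appName("rdd-examples").getOrCreate()
sc = spark.sparkContext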

Examples

sc = spark.sparkContext

# From an in-memory collection
nums = sc.parallelize([1,2,3,4,5], numSlices=2)

# From text files (HDFS/S3/local)
rdd_hdfs  = sc.textFile("hdfs://namenode:8020/data/logs/*.log")
rdd_s3    = sc.textFile("s3a://my-bucket/2025/07/*.json")
rdd_local = sc.textFile("file:///var/tmp/sample.txt")

# Common transforms/actions
even_squares = nums.map(lambda x: x*x).filter(lambda x: x % 2 == 0)
print("Even squares:", even_squares.collect())

pairs   = sc.parallelize([("a",1), ("b",2), ("a",3)])
reduced = pairs.reduceByKey(lambda a,b: a+b)
print("Reduced:", reduced.collect())

Output:

Even squares: [4, 16]
Reduced: [('a', 4), ('b', 2)]

1. Create RDD from an In-Memory Collection

This example creates an RDD from an in-memory Python list.

sc = spark.sparkContext
nums = sc.parallelize([1, 2, 3, 4, 5], numSlices=2)

# Transform: square the numbers
squares = nums.map(lambda x: x * x)

# Action: collect the results
print("Squares:", squares.collect())

Output:

Squares: [1, 4, 9, 16, 25]

Data Source: In-memory Python list

Transform: map() to square numbers

Action: collect() to bring the data back to the driver
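
Note that collect() pulls the entire RDD into the driver's memory, so it can overwhelm the driver on large data. A safer sketch is to preview with take() and write full results to storage (the output path below is a hypothetical placeholder):

# Preview a few elements instead of collecting everything
print(squares.take(3))

# Write the full result to storage instead of the driver (hypothetical path)
squares.saveAsTextFile("hdfs://namenode:8020/out/squares")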

2. Create RDD from a Text File on HDFS

This creates an RDD from a text file stored on HDFS.

rdd_hdfs = sc.textFile("hdfs://namenode:8020/data/logs/*.log")

# Action: count the number of lines
line_count = rdd_hdfs.count()
print(f"Line count in HDFS file: {line_count}")

Data Source: HDFS

Transform: None (just reading)

Action: count() to count the number of lines

3. Create RDD from a Text File on S3

This creates an RDD from a file stored in S3.

rdd_s3 = sc.textFile("s3a://my-bucket/2025/07/*.json")

# Transform: map JSON lines to their lengths
json_line_lengths = rdd_s3.map(lambda line: len(line))

# Action: collect the lengths
print("JSON line lengths:", json_line_lengths.collect())

If file lines were:

{"id":1,"name":"Ashwani"}
{"id":2,"name":"John"}

Output:

Then the lengths would be [25, 22].

Data Source: S3 (using s3a protocol)

Transform: map() to calculate line lengths

Action: collect() to fetch the data

4. Create RDD from a Local File System

This example reads from a local file.

rdd_local = sc.textFile("file:///var/tmp/sample.txt")

# Action: print the first 5 lines
print(rdd_local.take(5))

Data Source: Local file system

Transform: None (just reading)

Action: take() to get the first 5 lines

5. Create RDD from a Python List of Pairs

You can create RDDs with key-value pairs.

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

# Transform: sum the values of the same key using reduceByKey
reduced = pairs.reduceByKey(lambda a, b: a + b)

# Action: collect the results
print("Reduced:", reduced.collect())

Data Source: In-memory list of pairs

Transform: reduceByKey() to sum values by key

Action: collect() to gather the results
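
For comparison, the same result can be obtained with groupByKey(), but reduceByKey() is usually preferred because it combines values within each partition before the shuffle:

# groupByKey ships every value across the network before summing
reduced_gbk = pairs.groupByKey().mapValues(lambda vals: sum(vals))
print(reduced_gbk.collect())  # [('a', 4), ('b', 2)]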

6. Create RDD from a Large Dataset in HDFS

This reads a large log file from HDFS and processes it.

rdd_logs = sc.textFile("hdfs://namenode:8020/data/logs/*.log")

# Transform: filter logs containing "error" and count them
error_logs = rdd_logs.filter(lambda line: "error" in line)

# Action: count the error logs
print(f"Number of error logs: {error_logs.count()}")

Data Source: HDFS (log files)

Transform: filter() to get only error logs

Action: count() to count the error logs
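
Keep in mind that the substring test above is case-sensitive. A variant that also matches "ERROR" or "Error" lowercases each line first:

# Case-insensitive match for "error"
error_logs_ci = rdd_logs.filter(lambda line: "error" in line.lower())
print(f"Number of error logs (case-insensitive): {error_logs_ci.count()}")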

7. Create RDD from Multiple Text Files on Local FS

This creates an RDD from multiple text files on the local file system.

rdd_multiple_files = sc.textFile("file:///tmp/*.txt")

# Transform: map each line to its length
line_lengths = rdd_multiple_files.map(lambda line: len(line))

# Action: collect the lengths
print("Line lengths from multiple files:", line_lengths.collect())

Data Source: Local file system (multiple files)

Transform: map() to calculate line lengths

Action: collect() to get the results

8. Create RDD from an RDD Generated by Sampling

Sampling an existing RDD is useful when you want to process a smaller subset of data.

rdd_sample = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# Transform: sample 50% of the data without replacement
sampled_rdd = rdd_sample.sample(False, 0.5)

# Action: collect the sample
print("Sampled RDD:", sampled_rdd.collect())

Data Source: In-memory collection

Transform: sample() to take a random sample of the data

Action: collect() to get the sampled data
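
Because sample() is random, collect() returns a different subset on each run. Passing a seed makes the sample reproducible (the seed value here is arbitrary):

# A fixed seed gives the same sample on every run
sampled_fixed = rdd_sample.sample(False, 0.5, seed=42)
print("Seeded sample:", sampled_fixed.collect())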

9. Create RDD from a CSV File on S3

This reads a CSV file stored on S3 and processes the data.

rdd_csv = sc.textFile("s3a://my-bucket/data.csv")

# Transform: split each line into a list
rdd_split = rdd_csv.map(lambda line: line.split(','))

# Action: print the first 5 rows
print(rdd_split.take(5))

If data.csv contains:

name,age,city
Ashwani,30,Delhi
John,25,Mumbai
Sarah,28,Bangalore
Mike,35,Chennai

Output:

[['name', 'age', 'city'],
 ['Ashwani', '30', 'Delhi'],
 ['John', '25', 'Mumbai'],
 ['Sarah', '28', 'Bangalore'],
 ['Mike', '35', 'Chennai']]

Data Source: S3 (CSV file)

Transform: map() to split each row by commas

Action: take() to print the first 5 rows
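
Note that the first row returned above is the CSV header. A common pattern, sketched here, drops the header before processing the data rows:

header = rdd_csv.first()

# Keep only data rows, then split each one by commas
rdd_rows = rdd_csv.filter(lambda line: line != header).map(lambda line: line.split(','))
print(rdd_rows.take(2))  # [['Ashwani', '30', 'Delhi'], ['John', '25', 'Mumbai']]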

10. Create RDD from a Python Dictionary

You can create an RDD from an in-memory dictionary by converting it into a list of pairs.

data_dict = {"apple": 3, "banana": 2, "cherry": 5}

# Create an RDD from the dictionary (key-value pairs)
rdd_dict = sc.parallelize(data_dict.items())

# Transform: filter keys that start with "a"
filtered_dict = rdd_dict.filter(lambda x: x[0].startswith('a'))

# Action: collect and print the result
print("Filtered dictionary:", filtered_dict.collect())

Data Source: In-memory dictionary

Transform: filter() to select keys that start with "a"

Action: collect() to fetch the results
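
If you want the result back as a Python dict on the driver, collectAsMap() does the conversion in one step (with the same driver-memory caveat as collect()):

# Convert the filtered pair RDD back into a dict
result_dict = filtered_dict.collectAsMap()
print(result_dict)  # {'apple': 3}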

Summary of RDD Creation and Transformations/Actions:

RDD from Collections: parallelize() creates RDDs from in-memory collections such as lists and tuples (for a dictionary, pass dict.items() to get key-value pairs).

RDD from Files: textFile() allows you to create RDDs from text files located on HDFS, S3, or local file systems.

Common Transformations: map(), filter(), reduceByKey(), and sample() allow you to manipulate RDD data.

Actions: collect(), count(), and take() trigger computation and return results to the driver.

These examples demonstrate how you can create and manipulate RDDs using different data sources, and perform common transformations and actions in Apache Spark.

The same functional style carries over to plain Python with the re module.

Replace vowels in a list (using re.sub)

import re

words = ["apple", "banana", "cherry", "grape"]

# Replace vowels with '*'
replaced_words = list(map(lambda w: re.sub(r'[aeiouAEIOU]', '*', w), words))
print(replaced_words)

Output:

['*ppl*', 'b*n*n*', 'ch*rry', 'gr*p*']

Find all vowels in a list (using re.findall)

import re

words = ["apple", "banana", "cherry", "grape"]

# Find all vowels in each word
vowel_lists = list(map(lambda w: re.findall(r'[aeiouAEIOU]', w), words))
print(vowel_lists)

Output:

[['a', 'e'], ['a', 'a', 'a'], ['e'], ['a', 'e']]

Replace vowels in a dictionary (using re.sub)

import re

data = {"apple": 3, "banana": 2, "cherry": 5}

# Replace vowels in keys
replaced_dict = dict(map(lambda kv: (re.sub(r'[aeiouAEIOU]', '*', kv[0]), kv[1]), data.items()))

print(replaced_dict)

Output:

{'*ppl*': 3, 'b*n*n*': 2, 'ch*rry': 5}

20 unique programming problems solved using reduceByKey in PySpark

1. Word Count

lines = sc.parallelize(["a a b", "b c"])
words = lines.flatMap(lambda x: x.split())
# ['a', 'a', 'b', 'b', 'c']
pairs = words.map(lambda x: (x, 1))
# [('a', 1), ('a', 1), ('b', 1), ('b', 1), ('c', 1)]
word_count = pairs.reduceByKey(lambda x, y: x + y)
print(word_count.collect())  # Output: [('a', 2), ('b', 2), ('c', 1)]
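
A common follow-up is ranking words by frequency; one sketch using sortBy():

# Most frequent words first (order among ties is not guaranteed)
top_words = word_count.sortBy(lambda kv: kv[1], ascending=False).take(2)
print(top_words)  # e.g. [('a', 2), ('b', 2)]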

2. Sum of Sales by Product
sales = sc.parallelize([("apple", 100), ("banana", 80), ("apple", 120)])
total_sales = sales.reduceByKey(lambda x, y: x + y)
print(total_sales.collect())  # Output: [('apple', 220), ('banana', 80)]

3. Maximum Temperature by City
temps = sc.parallelize([("NYC", 70), ("LA", 75), ("NYC", 72)])
max_temp = temps.reduceByKey(lambda x, y: max(x, y))
print(max_temp.collect())  # Output: [('NYC', 72), ('LA', 75)]

4. Minimum Price by Product
prices = sc.parallelize([("pen", 5), ("pen", 3), ("book", 12)])
min_price = prices.reduceByKey(lambda x, y: min(x, y))
print(min_price.collect())  # Output: [('pen', 3), ('book', 12)]

5. Concatenate Strings by Key
text = sc.parallelize([("topic1", "A"), ("topic1", "B")])
concat = text.reduceByKey(lambda x, y: x + y)
print(concat.collect())  # Output: [('topic1', 'AB')]

6. Count Frequency by Item
items = sc.parallelize(["cat", "dog", "cat", "bird"])
freq = items.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
print(freq.collect())  # Output: [('cat', 2), ('dog', 1), ('bird', 1)]

7. Average Score by Student
scores = sc.parallelize([("Amy", 80), ("Amy", 90), ("Bob", 60)])
total = scores.mapValues(lambda v: (v, 1)).reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
average = total.mapValues(lambda x: x[0]/x[1])
print(average.collect())  # Output: [('Amy', 85.0), ('Bob', 60.0)]
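
An alternative sketch uses aggregateByKey(), which builds the (sum, count) pairs without the separate mapValues step:

# (0, 0) is the (sum, count) zero value for each key
totals = scores.aggregateByKey((0, 0),
                               lambda acc, v: (acc[0] + v, acc[1] + 1),
                               lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(totals.mapValues(lambda t: t[0] / t[1]).collect())  # [('Amy', 85.0), ('Bob', 60.0)]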

8. Product of Numbers by Key
nums = sc.parallelize([("A", 2), ("A", 3), ("B", 4)])
product = nums.reduceByKey(lambda x, y: x * y)
print(product.collect())  # Output: [('A', 6), ('B', 4)]

9. Find Longest Word by Initial Letter
words = sc.parallelize(["apple", "apricot", "banana"])
pairs = words.map(lambda x: (x[0], x))
longest = pairs.reduceByKey(lambda a, b: a if len(a) > len(b) else b)
print(longest.collect())  # Output: [('a', 'apricot'), ('b', 'banana')]

10. Find Shortest Name by Department
names = sc.parallelize([("IT", "Sam"), ("IT", "Jo"), ("HR", "Ann")])
shortest = names.reduceByKey(lambda a, b: a if len(a) < len(b) else b)
print(shortest.collect())  # Output: [('IT', 'Jo'), ('HR', 'Ann')]

11. Sum Transactions by User
transactions = sc.parallelize([("Tom", 50), ("Tom", 40), ("Ava", 100)])
sum_trans = transactions.reduceByKey(lambda x, y: x + y)
print(sum_trans.collect())  # Output: [('Tom', 90), ('Ava', 100)]

12. Unique Elements across Lists by Category
lists = sc.parallelize([("A", [1, 2]), ("A", [2, 3])])
unique = lists.reduceByKey(lambda x, y: list(set(x + y)))
print(unique.collect())  # Output: [('A', [1, 2, 3])]

13. First Appearance by Key
pairs = sc.parallelize([("A", 5), ("A", 8), ("B", 3)])
first = pairs.reduceByKey(lambda x, y: x)
print(first.collect())  # Output: [('A', 5), ('B', 3)]

14. Last Appearance by Key
pairs = sc.parallelize([("A", 2), ("A", 4), ("B", 7)])
last = pairs.reduceByKey(lambda x, y: y)
print(last.collect())  # Output: [('A', 4), ('B', 7)]

(Strictly speaking, reduceByKey expects an associative and commutative function, so "first" and "last" here depend on partition ordering; with small examples like these the shown outputs are what you typically get.)

15. Merge Tuples by Key
data = sc.parallelize([("K", (1, 2)), ("K", (3, 4))])
merge = data.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
print(merge.collect())  # Output: [('K', (4, 6))]

16. Count Distinct by Key
entries = sc.parallelize([("G", 5), ("G", 5), ("G", 6)])
# Wrap each value in a set, union the sets with reduceByKey, then count them
distinct_count = entries.mapValues(lambda v: {v}).reduceByKey(lambda x, y: x | y).mapValues(len)
print(distinct_count.collect())  # Output: [('G', 2)]

17. Set Union by Key
sets = sc.parallelize([("A", {1,2}), ("A", {2,3})])
union = sets.reduceByKey(lambda x, y: x.union(y))
print(union.collect())  # Output: [('A', {1, 2, 3})]

18. Collect to List by Key
pairs = sc.parallelize([("T", "a"), ("T", "b")])
# Wrap each value in a list, then concatenate the lists with reduceByKey
collected = pairs.mapValues(lambda v: [v]).reduceByKey(lambda x, y: x + y)
print(collected.collect())  # Output: [('T', ['a', 'b'])]

19. Find Sum of Squared Values by Key
nums = sc.parallelize([("H", 2), ("H", 3)])
squared_sum = nums.mapValues(lambda x: x**2).reduceByKey(lambda x, y: x + y)
print(squared_sum.collect())  # Output: [('H', 13)]

20. Filter Even Values and Sum by Key
nums = sc.parallelize([("X", 2), ("X", 3), ("Y", 4)])
even_sum = nums.filter(lambda kv: kv[1] % 2 == 0).reduceByKey(lambda x, y: x + y)
print(even_sum.collect())  # Output: [('X', 2), ('Y', 4)]

SUMMARY

Techniques covered in this post:

Square every element of a list with map()

Count the character length of each line of a JSON file and collect the lengths into a list

Print the first 5 lines of a file with take()

Count error logs by filtering for lines that contain the word "error"

Split each CSV row by commas into a list of strings

Extract dictionary entries whose key starts with "a"

Find and replace vowels in a list (re.sub)

Find all vowels in a list (re.findall)

Replace vowels in dictionary keys (re.sub)

reduceByKey recipes: word count, sum of sales by product, maximum temperature by city, minimum price by product, count frequency by item, concatenate strings by key, and the other key-based aggregations above
