Spark

Resilient Distributed Dataset (RDD)

RDDs allow us to write programs in terms of operations on distributed datasets.

Learned in CS451.

How is RDD different from HDFS?

HDFS is a distributed file system designed to store data across a cluster (so primarily for data storage).

RDD is a processing abstraction for performing distributed computation, optimized for in-memory operations.

  • HDFS provides the persistent data layer, while RDD offers the high-performance computation layer.

This is why you load data from HDFS into an RDD:

lines = sc.textFile("hdfs://...")
  • lines is an RDD

Properties:

  • Collections of objects spread across a cluster, stored in RAM or on disk (see the persist sketch just after this list)
  • Built through parallel transformations
  • Automatically rebuilt on failure
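
A minimal sketch of the "RAM or on disk" choice above, assuming a SparkContext named sc is already available; StorageLevel is PySpark's enum for picking where persisted partitions live:

from pyspark import StorageLevel

nums = sc.parallelize(range(1000))

# Keep partitions in memory, spilling to disk when they don't fit.
# Plain nums.cache() would be the memory-only equivalent.
nums.persist(StorageLevel.MEMORY_AND_DISK)
nums.count()   # the first action is what actually materializes and stores the data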

Operations

  • Transformations (e.g. map, filter, groupBy)
  • Actions (e.g. count, collect, save)
lines = sc.textFile("hdfs://...")
errors = lines.filter(lambda s: s.startswith("ERROR"))
messages = errors.map(lambda s: s.split("\t")[2])
messages.cache() # cache the branching point: messages is reused by both counts below
 
messages.filter(lambda s: "mysql" in s).count()
messages.filter(lambda s: "php" in s).count()

Always cache at the branching point, i.e. the RDD that multiple downstream actions reuse.
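
A small follow-up sketch (continuing the messages example above) for checking that the branching RDD really is cached and releasing it later; is_cached, getStorageLevel(), and unpersist() are standard RDD methods:

messages.is_cached                  # => True once cache()/persist() has been called
print(messages.getStorageLevel())   # describes where/how the RDD is stored
messages.unpersist()                # drop it from the cache once the branches are done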

Fault Recovery

RDDs track lineage information that can be used to efficiently recompute lost data.
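
You can inspect that lineage yourself; a quick sketch on the messages RDD built above, using the toDebugString() method:

messages.toDebugString()
# returns a description of the textFile -> filter -> map chain that Spark
# would replay to rebuild any lost partitions of messages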

Creating RDDs

# Turn a Python collection into an RDD
sc.parallelize([1, 2, 3])
# Load text file from local FS, HDFS, or S3
sc.textFile("file.txt")
sc.textFile("directory/*.txt")
sc.textFile("hdfs://namenode:9000/path/file")

Basic Transformations

nums = sc.parallelize([1, 2, 3])
 
# Pass each element through a function
squares = nums.map(lambda x: x*x)  # => {1, 4, 9}
 
# Keep elements passing a predicate
even = squares.filter(lambda x: x % 2 == 0)  # => {4}
 
# Map each element to zero or more others
nums.flatMap(lambda x: range(x))
# => {0, 0, 1, 0, 1, 2}

Basic Actions

nums = sc.parallelize([1, 2, 3])
# Retrieve RDD contents as a local list on the driver (avoid on large RDDs)
nums.collect() # => [1, 2, 3]
 
# Return first K elements
nums.take(2) # => [1, 2]
 
# Count number of elements
nums.count() # => 3
 
# Merge elements with an associative function
nums.reduce(lambda x, y: x + y) # => 6
 
# Write elements to a text file
nums.saveAsTextFile("hdfs://file.txt")

Counts

  • nums.count() → each worker counts the elements in its own partitions, and the driver just adds the partial counts up
  • len(nums.collect()) → every element is shipped to the driver, which then counts them locally

Working with key-value pairs:

pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])
 
pets.reduceByKey(lambda x, y: x + y) # => {(cat, 3), (dog, 1)}
pets.groupByKey() # => {(cat, [1, 2]), (dog, [1])}
pets.sortByKey() # => {(cat, 1), (cat, 2), (dog, 1)}

NOTE: In Scala you can write pets.reduceByKey(_ + _) instead of spelling out the lambda (pets.reduceByKey(lambda x, y: x + y) in Python).

This is how you can do word count in just a couple of lines of code, as opposed to the wordy MapReduce implementation…

lines = sc.textFile("hamlet.txt")
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda x, y: x + y)
counts.saveAsTextFile("results")
 
// The same job in Scala
val textFile = sc.textFile("hamlet.txt")
textFile.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey((x, y) => x + y)
    .saveAsTextFile("results")
 
// Less verbose
val textFile = sc.textFile("hamlet.txt")
textFile.flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
    .saveAsTextFile("results")

Other Key-Value Operations

visits = sc.parallelize([("index.html", "1.2.3.4"),
                         ("about.html", "3.4.5.6"),
                         ("index.html", "1.3.3.1")])
 
pageNames = sc.parallelize([ ("index.html", "Home"),("about.html", "About") ])
 
visits.join(pageNames)
# ("index.html", ("1.2.3.4", "Home"))
# ("index.html", ("1.3.3.1", "Home"))
# ("about.html", ("3.4.5.6", "About"))
 
visits.cogroup(pageNames)
# ("index.html", (["1.2.3.4", "1.3.3.1"], ["Home"]))
# ("about.html", (["3.4.5.6"], ["About"]))

Setting the Level of Parallelism

All the pair RDD operations take an optional second parameter for the number of tasks:

words.reduceByKey(lambda x, y: x + y, 5)
words.groupByKey(5)
visits.join(pageNames, 5)
 
sc.textFile("file.txt", 5)  # for textFile, this is a minimum number of partitions

There is a minimum, though: you can't get fewer partitions than there are input splits.

If a file is split into 16 blocks on HDFS, and you open it with textFile(PathString, 10), you’ll still get 16 partitions, not 10.
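
You can verify the partition count with getNumPartitions(); a sketch using a hypothetical 16-block file path:

big = sc.textFile("hdfs://namenode:9000/logs/big.txt", 10)   # hypothetical file stored as 16 blocks
big.getNumPartitions()     # => 16: one partition per block, since 10 is only a minimum

small = sc.parallelize(range(100), 4)
small.getNumPartitions()   # => 4: parallelize gives exactly the number you ask for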

What is the default level of parallelism?

The destination RDD gets the same number of partitions as its source RDD.

  • Ex: a reduceByKey on an RDD with 8 partitions will result in another RDD with 8 partitions.
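
A quick sketch of that example:

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], 8)   # 8 partitions
summed = pairs.reduceByKey(lambda x, y: x + y)
summed.getNumPartitions()   # => 8, inherited from the parent (assuming spark.default.parallelism isn't set)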

If you specify spark.default.parallelism, it will be used as the default instead! (This applies to shuffle operations; RDDs loaded with textFile still get their partition count from the number of input splits.)

Since Spark is lazily evaluated, transformations just build up a lineage graph (a DAG) under the hood; nothing runs until an action is called, which lets Spark optimize the whole pipeline at once.
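
A tiny sketch of that laziness: the two transformations below return immediately and only record lineage; the count() at the end is what triggers an actual job:

nums = sc.parallelize(range(1000000))

evens = nums.filter(lambda x: x % 2 == 0)   # no work yet, just builds the graph
doubled = evens.map(lambda x: 2 * x)        # still no work

doubled.count()   # the action runs the whole pipeline in one pass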