Hadoop

MapReduce

So this exists before CUDA times.

Solves Embarrassingly Parallel problems.

Hemal Shah is telling me that this was one of the revolutionary ideas that enabled us to train neural networks at scale. And the stuff that the Google Brain team was doing was super duper cool.

From CS451

MapReduce is based around key-value pairs. This is a common way to break things down!

If the input is a text file:

  • Key – Position of a line
  • Value – Text of a line.

The programmer defines two functions:

  1. map: (k1, v1) → list of (k2, v2)
  2. reduce: (k2, list of v2) → list of (k3, v3)

A simple word count example

def map(pos : Long, text: String):
    for word in tokenize(text):
        emit(word, 1)
        
def reduce(key: String, values: Collection of Ints):
    sum = 0
    for v in values:
        sum += v
    emit(key, sum)
    
def partition(key : String, reducer_count: Nat):
    return hashcode(key) % reducer_count
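
To make the dataflow concrete, here is a hedged, single-process Python sketch of how these pieces compose: the framework runs map over every input record, groups the emitted pairs by key (the shuffle), then runs reduce once per key. The names map_fn/reduce_fn and the whitespace tokenization are my choices for illustration, not the lecture's; this is plain Python, not Hadoop.

from collections import defaultdict

def map_fn(pos, text):
    for word in text.split():          # stand-in for tokenize()
        yield (word, 1)

def reduce_fn(key, values):
    yield (key, sum(values))

def word_count(lines):
    # map phase: one call per input record (key = line position, value = line text)
    intermediate = []
    for pos, text in enumerate(lines):
        intermediate.extend(map_fn(pos, text))

    # shuffle phase: group values by key (what Hadoop does between map and reduce)
    grouped = defaultdict(list)
    for key, value in intermediate:
        grouped[key].append(value)

    # reduce phase: one call per distinct key
    output = []
    for key in sorted(grouped):
        output.extend(reduce_fn(key, grouped[key]))
    return output

print(word_count(["the quick brown fox", "the lazy dog the"]))
# [('brown', 1), ('dog', 1), ('fox', 1), ('lazy', 1), ('quick', 1), ('the', 3)]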

Bigram implementations https://git.uwaterloo.ca/cs451/bespin/-/blob/master/src/main/java/io/bespin/java/mapreduce/bigram/BigramCount.java

In-Mapper Combiner (IMC)

https://stackoverflow.com/questions/28198873/difference-between-combiner-and-in-mapper-combiner-in-mapreduce
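
A minimal sketch of what in-mapper combining looks like for word count, assuming the usual Hadoop mapper lifecycle (setup, then map per record, then cleanup); whitespace splitting stands in for tokenize(). Instead of emitting (word, 1) for every token and relying on a separate combiner, the mapper accumulates counts in local state and emits partial sums once at the end of its split.

class WordCountMapper:
    # Mirrors the Hadoop mapper lifecycle: setup, then map() per record, then cleanup.
    def setup(self):
        self.counts = {}             # per-mapper state, kept across map() calls

    def map(self, pos, text):
        # accumulate locally instead of emitting (word, 1) for every token
        for word in text.split():    # stand-in for tokenize()
            self.counts[word] = self.counts.get(word, 0) + 1

    def cleanup(self, emit):
        # emit one partial sum per distinct word, once, at the end of the split
        for word, count in self.counts.items():
            emit(word, count)

# Tiny local driver just to show what comes out:
mapper = WordCountMapper()
mapper.setup()
for pos, line in enumerate(["the quick brown fox", "the lazy dog the"]):
    mapper.map(pos, line)
output = []
mapper.cleanup(lambda word, count: output.append((word, count)))
print(output)   # partial sums like ('the', 3), not three separate ('the', 1) pairs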

Counting co-occurrence (Pairs)

Term: Co-Occurrence

  • N(a, b): the number of times word a and word b co-occur in some context
  • E.g. how many times a is followed immediately by b in a sentence. The co-occurrence matrix is |V| x |V|, where V is the vocabulary.

How do you deal with keys that are pairs?

Two options:

  1. Pair – We’ll be computing individual Cells
  2. Stripe – We’ll be computing individual Rows

First option: combine the keys into a pair.

Input: a sentence. Output: ((a, b), 1) for all pairs of words a, b in the sentence.

def map(key : Long, value: String):
    for u in tokenize(value):
        for each v that co-occurs with u in value:
            emit((u, v), 1)

def reduce(key: (String, String), values: List[Int]):
    sum = 0
    for value in values:
        sum += value
    emit(key, sum)

Disadvantage

That’s a lot of pairs, to be fair (order matters, so (a, b) and (b, a) are separate keys). The combiner also doesn’t do much:

  • The combiner won’t do much because there are N x N potential keys. Most keys will have few entries, so there will be few cases where the combiner reduces the number of pairs.

Second Option: Use Stripes

Input: a sentence. Output: (a, {b1: c1, b2: c2, ...}), where:

  • a is a word from the input
  • b1, b2, ... are all the words that co-occur with a, and each ci is the number of times a and bi co-occur
  • { ... } means a map (aka a dictionary, associative array, etc.)

def map(key: Long, value: String):
    for u in tokenize(value):
        counts = {}   # stripe for u: co-occurring word -> count
        for each v that co-occurs with u in value:
            counts[v] = counts.get(v, 0) + 1
        emit(u, counts)

def reduce(key: String, values: List[Map[String -> Int]]):
    sum = {}
    for value in values:
        sum += value   # element-wise sum of the maps: merge the stripes
    emit(key, sum)

  • Fewer key-value pairs to send
  • Combiners will do more work
  • A map is a heavier object than a single integer
  • More computationally intensive for the mappers and combiners.

Intuition

Better CPU utilization when using stripes.

Why doesn’t the work grow quadratically? Like, don’t you have O(n^2) different pairs?

In a practical MapReduce setup, each mapper processes smaller subsets of the dataset (for example, one document, basket, or chunk of data at a time).

  • Suppose there are n words and k sentences
  • Each worker processes m = n/k words
  • Each mapper does O(m^2) work.

In most real-world datasets, m (the number of items per basket/document) is relatively constant or small compared to k, the total number of baskets/documents.

Thus, the total work done by all mappers across the dataset is roughly proportional to k · m² = n · m.

So the trick is that the quadratic blow-up is confined to each sentence: the total work stays linear in the corpus size, with m² acting as a (small) constant factor.
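
For a rough sense of scale, a back-of-the-envelope sketch (the corpus size and sentence length are made-up numbers, not from the lecture):

# Hypothetical numbers comparing per-sentence pair generation with a naive
# all-pairs count over the whole corpus at once.
n = 1_000_000_000        # total words in the corpus (assumed)
m = 20                   # words per sentence (assumed roughly constant)
k = n // m               # number of sentences

pairs_emitted = k * m * m    # each map call does O(m^2) work => n * m total
naive_pairs = n * n          # all pairs over the whole corpus at once

print(pairs_emitted)    # 20,000,000,000 (linear in n)
print(naive_pairs)      # 1,000,000,000,000,000,000 (quadratic in n)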

Should you always use stripes?

It’s a tradeoff. “Easier to understand and implement” is NOT bad. For English words and normal sentence lengths, the stripe fits in memory easily. It won’t always work out that way.

Another Problem: Relative Frequencies

f(b | a) = N(a, b) / Σ_b' N(a, b')

where:

  • N(a, b) is the number of co-occurrences of a and b, and
  • Σ_b' N(a, b') is the sum of N(a, b') over all b' (this is the freq(a) used below)

Why do we want to do this? How do we make it fit into MapReduce?

#todo CHECK about the two passes thing

We want to do this, but we don’t have access to freq(a) :( The marginal is only known once a reducer has seen all of a’s counts.

DOESN’T WORK

def reduce(key: Pair[String], values: List[Int]):
    let (a, b) = key
    sum = 0
    for v in values:
        sum += v
    emit((b, a), sum / freq(a))   // freq(a) isn't available here

We need to do something elaborate.

def map(key: Long, value: String):
    for u in tokenize(value):
        for v in cooccurrence(u):
            emit((u, v), 1)
            // emit((u, "*"), 1)   // improved using the line below
        emit((u, "*"), len(cooccurrence(u)))

def partition(key: Pair, value: Int, N: Int):
    // partition on the left word only, so all of u's pairs go to the same reducer
    return hash(key.left) % N

marginal = 0   // state kept across reduce calls within one reducer
def reduce(key: Pair, values: List[Int]):
    let (a, b) = key
    sum = 0
    for v in values:
        sum += v
    if b == "*":
        marginal = sum
    else:
        emit((b, a), sum / marginal)

  • The reducer works! That’s because Hadoop sorts the keys. Pairs sort lexicographically by the first element and then the second, so (a, "*") comes before any (a, b) because the ASCII code of * is 42, before any letter.
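
To convince myself this works, a hedged single-process simulation of the whole pipeline in plain Python (toy sentences, co-occurrence taken as "appears in the same sentence", and keys kept as (a, b) rather than the flipped (b, a) for readability):

from collections import defaultdict

def relative_frequencies(sentences):
    # map phase: emit ((u, v), 1) for every co-occurrence plus the (u, "*") marginal
    pairs = []
    for sentence in sentences:
        words = sentence.split()
        for i, u in enumerate(words):
            others = [v for j, v in enumerate(words) if j != i]
            for v in others:
                pairs.append(((u, v), 1))
            pairs.append(((u, "*"), len(others)))

    # shuffle phase: group by key; sorting makes (u, "*") arrive before any (u, v)
    # because "*" (ASCII 42) sorts before every letter
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)

    # reduce phase: remember the marginal, then divide each pair count by it
    result = {}
    marginal = 0
    for (a, b) in sorted(grouped):
        total = sum(grouped[(a, b)])
        if b == "*":
            marginal = total
        else:
            result[(a, b)] = total / marginal
    return result

print(relative_frequencies(["a b c", "a b a"]))
# the values for a fixed left word sum to 1.0, e.g. f(a|a) + f(b|a) + f(c|a) = 1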