Spark RDD API
Bigram Count with Spark
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}

// Conf (command-line argument parsing) and Tokenizer are application-specific
// and assumed to be defined elsewhere in the project.
object BigramCount extends Tokenizer {
  val log = Logger.getLogger(getClass().getName())

  def main(argv: Array[String]) {
    val args = new Conf(argv)

    log.info("Input: " + args.input())
    log.info("Output: " + args.output())
    log.info("Number of reducers: " + args.reducers())

    val conf = new SparkConf().setAppName("Bigram Count")
    val sc = new SparkContext(conf)

    // Delete the output directory if it already exists.
    val outputDir = new Path(args.output())
    FileSystem.get(sc.hadoopConfiguration).delete(outputDir, true)

    val textFile = sc.textFile(args.input())
    val counts = textFile
      .flatMap(line => {
        val tokens = tokenize(line)
        if (tokens.length > 1) tokens.sliding(2).map(p => p.mkString(" ")).toList else List()
      })
      .map(bigram => (bigram, 1))
      .reduceByKey(_ + _)

    counts.saveAsTextFile(args.output())
  }
}
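The listing above mixes in a Tokenizer trait and uses a Conf class for argument parsing, neither of which is shown. A minimal sketch of what the trait might look like, assuming tokenization means lowercasing and splitting on non-alphanumeric characters (the exact rules are an assumption, not part of the listing):

// Hypothetical sketch only: the real Tokenizer trait may apply different rules.
trait Tokenizer {
  def tokenize(s: String): List[String] =
    s.toLowerCase
      .replaceAll("[^a-z0-9]+", " ")   // collapse punctuation and whitespace
      .trim
      .split(" ")
      .filter(_.nonEmpty)
      .toList
}

With a tokenizer like this, tokens.sliding(2).map(_.mkString(" ")) turns List("a", "b", "c") into the bigrams "a b" and "b c", which is exactly what the flatMap above emits.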
Bigram Count with Hadoop in Scala
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.{Mapper, Reducer}
import org.apache.hadoop.util.Tool
import org.apache.log4j.Logger

import scala.collection.JavaConverters._

// WritableConversions (implicit conversions between Writables and Scala types)
// and Tokenizer are application-specific and assumed to be defined elsewhere.
object BigramCount extends Configured with Tool with WritableConversions with Tokenizer {
  val log = Logger.getLogger(getClass().getName())

  class MyMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    override def map(key: LongWritable, value: Text,
                     context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = {
      val tokens = tokenize(value)
      if (tokens.length > 1)
        tokens.sliding(2).map(p => p.mkString(" ")).foreach(word => context.write(word, 1))
    }
  }

  class MyReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
    override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                        context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
      // Although it is possible to write the reducer in a functional style (e.g., with foldLeft),
      // an imperative implementation is clearer for two reasons:
      //
      // (1) The MapReduce framework supplies an iterable over Writable objects; since Writables
      //     are container objects, the iterator returns (a reference to) the same object each
      //     time, just with a different payload inside it.
      // (2) The implicit Writable conversions provided by WritableConversions.
      //
      // The combination of the two means that a functional implementation may behave
      // unpredictably when these issues interact (see the sketch after this listing).
      var sum = 0
      for (value <- values.asScala) {
        sum += value
      }
      context.write(key, sum)
    }
  }
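To see why the comment in the reducer warns against a functional implementation, consider what happens when the values are materialized before summing. The snippet below is a hypothetical, self-contained simulation of the framework's object reuse (the names WritableReuseDemo and reusedValues are made up for illustration): it mimics how Hadoop hands the reducer an iterator that reuses a single IntWritable, so collecting the values first leaves every reference pointing at the last payload.

import org.apache.hadoop.io.IntWritable

// Hypothetical simulation of the iterable that Hadoop passes to reduce():
// a single IntWritable instance is reused, and its payload is overwritten
// each time the iterator advances.
object WritableReuseDemo {
  def reusedValues(payloads: Seq[Int]): Iterator[IntWritable] = {
    val shared = new IntWritable()
    payloads.iterator.map { p => shared.set(p); shared }  // lazy: set(p) runs on next()
  }

  def main(args: Array[String]): Unit = {
    // Materializing first keeps three references to the same object, so the
    // sum sees only the last payload: 3 + 3 + 3 = 9 instead of 6.
    println(reusedValues(Seq(1, 2, 3)).toList.map(_.get).sum)

    // Unboxing each value while the iterator advances gives the correct 6,
    // which is what the imperative loop in the reducer does.
    var sum = 0
    for (v <- reusedValues(Seq(1, 2, 3))) sum += v.get
    println(sum)
  }
}

In the actual reducer, the implicit IntWritable-to-Int conversion from WritableConversions unboxes each value while the iterator advances, which is why the imperative loop is safe.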