Spark RDD API

Bigram count with Spark:

object BigramCount extends Tokenizer {
  val log = Logger.getLogger(getClass().getName())

  def main(argv: Array[String]) {
    val args = new Conf(argv)

    log.info("Input: " + args.input())
    log.info("Output: " + args.output())
    log.info("Number of reducers: " + args.reducers())

    val conf = new SparkConf().setAppName("Bigram Count")
    val sc = new SparkContext(conf)

    // Delete the output directory if it already exists.
    val outputDir = new Path(args.output())
    FileSystem.get(sc.hadoopConfiguration).delete(outputDir, true)

    val textFile = sc.textFile(args.input())
    val counts = textFile
      // Turn each line into its adjacent token pairs; lines with fewer than two tokens
      // contribute nothing.
      .flatMap(line => {
        val tokens = tokenize(line)
        if (tokens.length > 1) tokens.sliding(2).map(p => p.mkString(" ")).toList else List()
      })
      .map(bigram => (bigram, 1))
      .reduceByKey(_ + _)

    counts.saveAsTextFile(args.output())
  }
}

With Scala Hadoop:

object BigramCount extends Configured with Tool with WritableConversions with Tokenizer {
  val log = Logger.getLogger(getClass().getName())

  class MyMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    override def map(key: LongWritable, value: Text,
                     context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = {
      val tokens = tokenize(value)
      if (tokens.length > 1)
        tokens.sliding(2).map(p => p.mkString(" ")).foreach(word => context.write(word, 1))
    }
  }

  class MyReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
    override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                        context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
      // Although it is possible to write the reducer in a functional style (e.g., with foldLeft),
      // an imperative implementation is clearer for two reasons:
      //
      //   (1) The MapReduce framework supplies an iterable over writable objects; since writables
      //       are container objects, it simply returns (a reference to) the same object each time,
      //       but with a different payload inside it.
      //   (2) The implicit writable conversions in WritableConversions make it easy to lose track
      //       of whether one is holding the (reused) container object or the value it wraps.
      //
      // The combination of the two means that a functional implementation may behave
      // unpredictably when these issues interact.
      var sum = 0
      for (value <- values.asScala) {
        sum += value
      }
      context.write(key, sum)
    }
  }
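The object-reuse behavior that the reducer comment warns about can be reproduced outside the framework. Below is a hypothetical, self-contained sketch (not part of the BigramCount code) that simulates how Hadoop hands back the same IntWritable instance for every value, and shows why materializing the values before pulling out their payloads gives the wrong sum:

// Hypothetical illustration only: simulate Hadoop's reuse of a single IntWritable.
import org.apache.hadoop.io.IntWritable

object WritableReuseDemo {
  def main(args: Array[String]): Unit = {
    val reused = new IntWritable()
    val payloads = Seq(1, 2, 3)

    // Mimic the framework's values iterable: mutate the shared object, return the same reference.
    def values: Iterator[IntWritable] = payloads.iterator.map { v => reused.set(v); reused }

    // Safe: the primitive is extracted while iterating.
    val correct = values.foldLeft(0)(_ + _.get)   // 6

    // Unsafe: materialize first, extract later -- every element is the same object,
    // which by now holds only the last payload.
    val wrong = values.toList.map(_.get).sum      // 9, i.e., 3 + 3 + 3

    println(s"correct = $correct, wrong = $wrong")
  }
}

The same trap applies to Text and other writables: extracting a primitive or immutable value inside the loop, as the imperative reducer above does via the implicit conversion, is what keeps the sum correct.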
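Both versions above build bigrams with sliding(2), which pairs adjacent tokens; the tokens.length > 1 guard matters because sliding also emits a final, shorter window when the sequence has fewer elements than the window size. A quick REPL-style check (assuming tokenize simply splits a line into tokens):

List("a", "b", "c").sliding(2).map(p => p.mkString(" ")).toList
// List("a b", "b c")

List("a").sliding(2).toList
// List(List("a")) -- without the guard, a one-token line would emit "a" as a bigram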