Spark RDD API

Bigram Count with Spark

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}

object BigramCount extends Tokenizer {
  val log = Logger.getLogger(getClass().getName())

  def main(argv: Array[String]) {
    // Conf (command-line argument parser) and Tokenizer are defined alongside this object.
    val args = new Conf(argv)

    log.info("Input: " + args.input())
    log.info("Output: " + args.output())
    log.info("Number of reducers: " + args.reducers())

    val conf = new SparkConf().setAppName("Bigram Count")
    val sc = new SparkContext(conf)

    // Delete the output directory if it already exists.
    val outputDir = new Path(args.output())
    FileSystem.get(sc.hadoopConfiguration).delete(outputDir, true)

    val textFile = sc.textFile(args.input())
    val counts = textFile
      // Emit every pair of adjacent tokens on a line as a space-joined bigram.
      .flatMap(line => {
        val tokens = tokenize(line)
        if (tokens.length > 1) tokens.sliding(2).map(p => p.mkString(" ")).toList else List()
      })
      .map(bigram => (bigram, 1))
      .reduceByKey(_ + _)

    counts.saveAsTextFile(args.output())
  }
}
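The core of the job is the flatMap: for each line, sliding(2) walks over adjacent token pairs and mkString(" ") joins each pair into a single bigram string; map and reduceByKey then count the occurrences of each bigram across the whole collection. Below is a minimal sketch of that per-line logic in plain Scala (no Spark needed; the whitespace split is only a stand-in for the Tokenizer trait, not the course's tokenizer):

object BigramSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in tokenizer: split on whitespace (the real job uses the Tokenizer trait).
    val tokens = "a rose is a rose".split("\\s+").toList
    val bigrams =
      if (tokens.length > 1) tokens.sliding(2).map(p => p.mkString(" ")).toList else List()
    println(bigrams)  // List(a rose, rose is, is a, a rose)
    // After .map(bigram => (bigram, 1)) and .reduceByKey(_ + _), duplicates collapse into
    // counts: ("a rose", 2), ("rose is", 1), ("is a", 1).
  }
}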

Bigram Count with Hadoop MapReduce in Scala

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configured
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.{Mapper, Reducer}
import org.apache.hadoop.util.Tool
import org.apache.log4j.Logger

object BigramCount extends Configured with Tool with WritableConversions with Tokenizer {
  val log = Logger.getLogger(getClass().getName())

  // Emit (bigram, 1) for every pair of adjacent tokens on a line; the implicit conversions
  // from WritableConversions turn the String key and Int value into Text and IntWritable.
  class MyMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
    override def map(key: LongWritable, value: Text,
                     context: Mapper[LongWritable, Text, Text, IntWritable]#Context) = {
      val tokens = tokenize(value)
      if (tokens.length > 1)
        tokens.sliding(2).map(p => p.mkString(" ")).foreach(word => context.write(word, 1))
    }
  }
 
  class MyReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
    override def reduce(key: Text, values: java.lang.Iterable[IntWritable],
                        context: Reducer[Text, IntWritable, Text, IntWritable]#Context) = {
      // Although it is possible to write the reducer in a functional style (e.g., with foldLeft),
      // an imperative implementation is clearer for two reasons:
      //
      //   (1) The MapReduce framework supplies an iterable over writable objects; since writables
      //       are container objects, the iterator simply returns (a reference to) the same object
      //       each time, but with a different payload inside it.
      //   (2) WritableConversions supplies implicit conversions between writables and primitives.
      //
      // The combination means that a functional implementation may exhibit unpredictable behavior
      // when these two issues interact.
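      //
      // A minimal sketch (hypothetical, not code from this job) of the pitfall: buffering the
      // reused writables captures N references to one object, all holding the last payload.
      //
      //   val buffered = values.asScala.toList   // N aliases of the *same* IntWritable
      //   buffered.map(_.get).sum                // sums the last value N times, not the N values
      //
      // Copying the primitive out on each iteration (value.get, or the implicit IntWritable-to-Int
      // conversion used by the loop below) avoids the aliasing.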
      var sum = 0
      for (value <- values.asScala) {
        sum += value
      }
      context.write(key, sum)
    }
  }