WordCount Learned in CS451. Reference implementations here https://git.uwaterloo.ca/cs451/bespin/-/blob/master/src/main/java/io/bespin/java/mapreduce/wordcount/WordCount.java?ref_type=heads /** * Bespin: reference implementations of "big data" algorithms * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.bespin.java.mapreduce.wordcount; import io.bespin.java.util.Tokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; import org.kohsuke.args4j.CmdLineException; import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option; import org.kohsuke.args4j.ParserProperties; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; /** * Simple word count demo. */ public class WordCount extends Configured implements Tool { private static final Logger LOG = Logger.getLogger(WordCount.class); // Mapper: emits (token, 1) for every word occurrence. public static final class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // Reuse objects to save overhead of object creation. private static final IntWritable ONE = new IntWritable(1); private static final Text WORD = new Text(); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { for (String word : Tokenizer.tokenize(value.toString())) { WORD.set(word); context.write(WORD, ONE); } } } public static final class MyMapperIMC extends Mapper<LongWritable, Text, Text, IntWritable> { private Map<String, Integer> counts; @Override public void setup(Context context) throws IOException, InterruptedException { counts = new HashMap<>(); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { for (String word : Tokenizer.tokenize(value.toString())) { if (counts.containsKey(word)) { counts.put(word, counts.get(word)+1); } else { counts.put(word, 1); } } } @Override public void cleanup(Context context) throws IOException, InterruptedException { IntWritable cnt = new IntWritable(); Text token = new Text(); for (Map.Entry<String, Integer> entry : counts.entrySet()) { token.set(entry.getKey()); cnt.set(entry.getValue()); context.write(token, cnt); } } } // Reducer: sums up all the counts. public static final class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // Reuse objects. private static final IntWritable SUM = new IntWritable(); @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // Sum up values. Iterator<IntWritable> iter = values.iterator(); int sum = 0; while (iter.hasNext()) { sum += iter.next().get(); } SUM.set(sum); context.write(key, SUM); } } /** * Creates an instance of this tool. */ private WordCount() {} private static final class Args { @Option(name = "-input", metaVar = "[path]", required = true, usage = "input path") String input; @Option(name = "-output", metaVar = "[path]", required = true, usage = "output path") String output; @Option(name = "-reducers", metaVar = "[num]", usage = "number of reducers") int numReducers = 1; @Option(name = "-imc", usage = "use in-mapper combining") boolean imc = false; } /** * Runs this tool. */ @Override public int run(String[] argv) throws Exception { final Args args = new Args(); CmdLineParser parser = new CmdLineParser(args, ParserProperties.defaults().withUsageWidth(100)); try { parser.parseArgument(argv); } catch (CmdLineException e) { System.err.println(e.getMessage()); parser.printUsage(System.err); return -1; } LOG.info("Tool: " + WordCount.class.getSimpleName()); LOG.info(" - input path: " + args.input); LOG.info(" - output path: " + args.output); LOG.info(" - number of reducers: " + args.numReducers); LOG.info(" - use in-mapper combining: " + args.imc); Configuration conf = getConf(); Job job = Job.getInstance(conf); job.setJobName(WordCount.class.getSimpleName()); job.setJarByClass(WordCount.class); job.setNumReduceTasks(args.numReducers); FileInputFormat.setInputPaths(job, new Path(args.input)); FileOutputFormat.setOutputPath(job, new Path(args.output)); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapperClass(args.imc ? MyMapperIMC.class : MyMapper.class); job.setCombinerClass(MyReducer.class); job.setReducerClass(MyReducer.class); // Delete the output directory if it exists already. Path outputDir = new Path(args.output); FileSystem.get(conf).delete(outputDir, true); long startTime = System.currentTimeMillis(); job.waitForCompletion(true); LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); return 0; } /** * Dispatches command-line arguments to the tool via the {@code ToolRunner}. * * @param args command-line arguments * @throws Exception if tool encounters an exception */ public static void main(String[] args) throws Exception { ToolRunner.run(new WordCount(), args); } }