Jason Baldridge committed a29f15b

Adopted word count from Cloud9 for easy access in Fogbow.


Files changed (1)

src/main/java/fogbow/example/CloudNineWordCount.java

+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package fogbow.example;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>
+ * Simple word count demo. This Hadoop Tool counts words in a flat text file and
+ * takes the following command-line arguments:
+ * </p>
+ * 
+ * <ul>
+ * <li>[input-path] input path</li>
+ * <li>[output-path] output path</li>
+ * <li>[num-reducers] number of reducers</li>
+ * </ul>
+ * 
+ * Taken and adapted from edu.umd.cloud9.demo.DemoWordCount 
+ * 
+ * @author Jimmy Lin
+ */
+public class CloudNineWordCount extends Configured implements Tool {
+
+  private static final Logger sLogger = Logger.getLogger(CloudNineWordCount.class);
+
+  // mapper: emits (token, 1) for every word occurrence
+  private static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
+
+    // reuse objects to save overhead of object creation
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    @Override
+    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+      String line = value.toString();
+      StringTokenizer itr = new StringTokenizer(line);
+      while (itr.hasMoreTokens()) {
+        word.set(itr.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  // reducer: sums up all the counts
+  private static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
+
+    // reuse objects
+    private final static IntWritable sumValue = new IntWritable();
+
+    @Override
+    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
+      throws IOException, InterruptedException {
+
+      // sum up values
+      Iterator<IntWritable> iter = values.iterator();
+      int sum = 0;
+      while (iter.hasNext()) {
+        sum += iter.next().get();
+      }
+      sumValue.set(sum);
+      context.write(key, sumValue);
+    }
+  }
+
+  /**
+   * Creates an instance of this tool.
+   */
+  public CloudNineWordCount() { }
+
+  private static int printUsage() {
+    System.out.println("usage: [input-path] [output-path] [num-reducers]");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  /**
+   * Runs this tool.
+   */
+  public int run(String[] args) throws Exception {
+    if (args.length != 3) {
+      return printUsage();
+    }
+
+    String inputPath = args[0];
+    String outputPath = args[1];
+    int reduceTasks = Integer.parseInt(args[2]);
+
+    sLogger.info("Tool: CloudNineWordCount");
+    sLogger.info(" - input path: " + inputPath);
+    sLogger.info(" - output path: " + outputPath);
+    sLogger.info(" - number of reducers: " + reduceTasks);
+
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "CloudNineWordCount");
+    job.setJarByClass(CloudNineWordCount.class);
+
+    job.setNumReduceTasks(reduceTasks);
+
+    FileInputFormat.setInputPaths(job, new Path(inputPath));
+    FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+
+    job.setMapperClass(MyMapper.class);
+    job.setCombinerClass(MyReducer.class);
+    job.setReducerClass(MyReducer.class);
+
+    // Delete the output directory if it exists already
+    Path outputDir = new Path(outputPath);
+    FileSystem.get(conf).delete(outputDir, true);
+
+    long startTime = System.currentTimeMillis();
+    boolean success = job.waitForCompletion(true);
+    sLogger.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0
+        + " seconds");
+
+    return success ? 0 : 1;
+  }
+
+  /**
+   * Dispatches command-line arguments to the tool via the
+   * <code>ToolRunner</code>.
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new CloudNineWordCount(), args);
+    System.exit(res);
+  }
+
+}
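
A rough usage sketch, assuming the class is packaged into a project jar (the jar name and paths below are hypothetical): the tool is launched through the standard hadoop launcher, and because it runs via ToolRunner, generic options such as -D properties may precede the three positional arguments.

    hadoop jar fogbow.jar fogbow.example.CloudNineWordCount /data/input /data/wordcount-out 4

This would run the job with 4 reducers and write one part file per reducer under /data/wordcount-out; run() deletes the output directory first if it already exists.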