Jason Baldridge / fogbow

Commits

Jason Baldridge committed 5584b44

Added bigram classes during 7/7 exercises.

  • Parent commits 5cbeea5
  • Branches default


Files changed (3)

File lib/log4j-1.2.16.jar

Binary file added.
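
This jar supplies the log4j Logger used by BigramRelativeFrequenciesBadong.java below.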

File src/main/java/fogbow/jason/BigramCount.java

+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package fogbow.jason;
+
+import java.io.IOException;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class BigramCount {
+
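+  // Mapper: emits (bigram, 1) for each pair of adjacent tokens in a line,
+  // treating the synthetic token <START> as the left neighbor of the first word.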
+  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text bigram = new Text();
+      
+    @Override
+    public void map(Object key, Text value, Context context)
+        throws IOException, InterruptedException {
+
+      StringTokenizer itr = new StringTokenizer(value.toString());
+      String prev = "<START>";
+      while (itr.hasMoreTokens()) {
+        String current = itr.nextToken();
+        bigram.set(prev + " " + current);
+        context.write(bigram, one);
+        prev = current;
+      }
+    }
+  }
+  
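+  // Reducer (also used as the combiner): sums the counts for each bigram.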
+  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
+
+    private IntWritable result = new IntWritable();
+
+    @Override
+    public void reduce(Text key, Iterable<IntWritable> values, Context context)
+        throws IOException, InterruptedException {
+
+      int sum = 0;
+      for (IntWritable val : values) {
+        sum += val.get();
+      }
+
+      result.set(sum);
+      context.write(key, result);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = new Job(conf, "bigram count");
+    job.setJarByClass(BigramCount.class);
+    job.setMapperClass(TokenizerMapper.class);
+    job.setCombinerClass(IntSumReducer.class);
+    job.setReducerClass(IntSumReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    FileInputFormat.addInputPath(job, new Path(args[0]));
+    FileOutputFormat.setOutputPath(job, new Path(args[1]));
+    System.exit(job.waitForCompletion(true) ? 0 : 1);
+  }
+}
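
As a quick sanity check (illustrative, not part of the commit), a single input line such as "the cat sat on the mat" would cause the mapper to emit:

  <START> the   1
  the cat       1
  cat sat       1
  sat on        1
  on the        1
  the mat       1

The reducer, which also serves as the combiner here, then sums these counts for each distinct bigram across all input lines.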

File src/main/java/fogbow/jason/BigramRelativeFrequenciesBadong.java

+/*
+ * Cloud9: A MapReduce Library for Hadoop
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package fogbow.jason;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import edu.umd.cloud9.io.pair.PairOfStrings;
+
+public class BigramRelativeFrequenciesBadong extends Configured implements Tool {
+    private static final Logger LOG = Logger.getLogger(BigramRelativeFrequenciesBadong.class);
+
+    // Mapper: emits (bigram, 1) for every pair of adjacent tokens and, once
+    // per map task, a special (word, "ZZZZZZZZZZZZZZZZZZZ") marker pair that
+    // the reducer uses as a signal to flush its buffered counts.
+    protected static class MyMapper extends Mapper<LongWritable, Text, PairOfStrings, FloatWritable> {
+        // Reuse objects to save the overhead of object creation.
+        private static final FloatWritable one = new FloatWritable(1);
+        private static final PairOfStrings bigram = new PairOfStrings();
+        private boolean haveOutputZzz = false;
+
+        @Override
+        public void map(LongWritable key, Text value, Context context)
+            throws IOException, InterruptedException {
+            String line = value.toString();
+
+            String prev = null;
+            StringTokenizer itr = new StringTokenizer(line);
+            while (itr.hasMoreTokens()) {
+                String cur = itr.nextToken();
+
+                // Emit only if we have an actual bigram.
+                if (prev != null) {
+                    // Simple way to truncate tokens that are too long.
+                    if (cur.length() > 100) {
+                        cur = cur.substring(0, 100);
+                    }
+                    if (prev.length() > 100) {
+                        prev = prev.substring(0, 100);
+                    }
+
+                    bigram.set(prev, cur);
+                    context.write(bigram, one);
+
+                    // Emit the flush marker once per map task.
+                    if (!haveOutputZzz) {
+                        bigram.set(prev, "ZZZZZZZZZZZZZZZZZZZ");
+                        context.write(bigram, one);
+                        haveOutputZzz = true;
+                    }
+                }
+                prev = cur;
+            }
+        }
+    }
+
+    // Combiner: sums up the partial counts. A separate class is needed because
+    // the reducer does not simply sum; it emits relative frequencies.
+    protected static class MyCombiner extends Reducer<PairOfStrings, FloatWritable, PairOfStrings, FloatWritable> {
+        private static final FloatWritable sumWritable = new FloatWritable();
+
+        @Override
+        public void reduce(PairOfStrings key, Iterable<FloatWritable> values, Context context)
+            throws IOException, InterruptedException {
+	    int sum = 0;
+	    Iterator<FloatWritable> iter = values.iterator();
+	    while (iter.hasNext()) {
+		sum += iter.next().get();
+	    }
+	    sumWritable.set(sum);
+	    context.write(key, sumWritable);
+	}
+    }
+
+    // Reducer: buffers the count for every bigram in memory; whenever the
+    // special ZZZ marker key arrives, it divides each buffered count by the
+    // total of all buffered counts, emits the results, and clears the buffer.
+    protected static class MyReducer extends Reducer<PairOfStrings, FloatWritable, PairOfStrings, FloatWritable> {
+        private static final FloatWritable value = new FloatWritable();
+        private Map<PairOfStrings, Integer> counter = new HashMap<PairOfStrings, Integer>();
+
+        @Override
+        public void reduce(PairOfStrings key, Iterable<FloatWritable> values, Context context)
+            throws IOException, InterruptedException {
+            int sum = 0;
+            Iterator<FloatWritable> iter = values.iterator();
+            while (iter.hasNext()) {
+                sum += iter.next().get();
+            }
+
+            // Copy the key before storing it: Hadoop reuses the same key
+            // instance across calls to reduce().
+            counter.put(new PairOfStrings(key.getLeftElement(), key.getRightElement()), sum);
+
+            if (key.getRightElement().equals("ZZZZZZZZZZZZZZZZZZZ")) {
+                int sumOfKey = 0;
+                for (PairOfStrings key2 : counter.keySet()) {
+                    sumOfKey += counter.get(key2);
+                }
+
+                for (PairOfStrings key2 : counter.keySet()) {
+                    // Cast to float so the division is not truncated to an int.
+                    value.set((float) counter.get(key2) / sumOfKey);
+                    context.write(key2, value);
+                }
+                counter = new HashMap<PairOfStrings, Integer>();
+            }
+        }
+    }
+
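+    // Partitioner: routes each pair by its left word only, so every bigram
+    // sharing a left word (including its ZZZ marker pair) reaches the same reducer.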
+    protected static class MyPartitioner extends Partitioner<PairOfStrings, FloatWritable> {
+        @Override
+        public int getPartition(PairOfStrings key, FloatWritable value, int numReduceTasks) {
+	    return (key.getLeftElement().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
+	}
+    }
+
+    private BigramRelativeFrequenciesBadong() {}
+
+    private static int printUsage() {
+	System.out.println("usage: [input-path] [output-path] [num-reducers]");
+	ToolRunner.printGenericCommandUsage(System.out);
+	return -1;
+    }
+
+    /**
+     * Runs this tool.
+     */
+    public int run(String[] args) throws Exception {
+	if (args.length != 3) {
+	    printUsage();
+	    return -1;
+	}
+
+	String inputPath = args[0];
+	String outputPath = args[1];
+	int reduceTasks = Integer.parseInt(args[2]);
+
+	LOG.info("Tool name: BigramRelativeFrequenciesBadong");
+	LOG.info(" - input path: " + inputPath);
+	LOG.info(" - output path: " + outputPath);
+	LOG.info(" - num reducers: " + reduceTasks);
+
+	Job job = new Job(getConf(), "BigramRelativeFrequenciesBadong");
+	job.setJarByClass(BigramRelativeFrequenciesBadong.class);
+
+	job.setNumReduceTasks(reduceTasks);
+
+	FileInputFormat.setInputPaths(job, new Path(inputPath));
+	FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+	job.setMapOutputKeyClass(PairOfStrings.class);
+	job.setMapOutputValueClass(FloatWritable.class);
+	job.setOutputKeyClass(PairOfStrings.class);
+	job.setOutputValueClass(FloatWritable.class);
+	//job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+	job.setMapperClass(MyMapper.class);
+	//job.setCombinerClass(MyCombiner.class);
+	job.setReducerClass(MyReducer.class);
+	job.setPartitionerClass(MyPartitioner.class);
+
+	// Delete the output directory if it exists already
+	Path outputDir = new Path(outputPath);
+	FileSystem.get(getConf()).delete(outputDir, true);
+
+	long startTime = System.currentTimeMillis();
+	job.waitForCompletion(true);
+	System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+
+	return 0;
+    }
+
+    /**
+     * Dispatches command-line arguments to the tool via the
+     * <code>ToolRunner</code>.
+     */
+    public static void main(String[] args) throws Exception {
+	int res = ToolRunner.run(new BigramRelativeFrequenciesBadong(), args);
+	System.exit(res);
+    }
+}
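
Assuming the project is packaged as a jar with the Cloud9 and log4j dependencies on the classpath (the jar name below is illustrative, not from the commit), the tool could be invoked as:

  hadoop jar fogbow.jar fogbow.jason.BigramRelativeFrequenciesBadong input output 4

Because the tool runs through ToolRunner, generic Hadoop options (such as -D properties) can be passed before the three positional arguments.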