Commits

Ben Wing committed acaf092

Move parse-tweets from under twitter-pull to top-level dir

  • Parent commits 17d9af4

Comments (0)

Files changed (12)

File parse-tweets/lib/gson-1.6.jar

Binary file added.

File parse-tweets/lib/guava-r08.jar

Binary file added.

File parse-tweets/main/src/java/fogbow/textgrounder/GetTwitterUsers.java

+package fogbow.textgrounder;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * <p>
+ * Extracts the distinct Twitter user IDs from a corpus of JSON-encoded tweets,
+ * outputting one ID per line, and takes the following command-line arguments:
+ * </p>
+ * 
+ * <ul>
+ * <li>[input-path] input path</li>
+ * <li>[output-path] output path</li>
+ * <li>[num-reducers] number of reducers</li>
+ * </ul>
+ * 
+ * 
+ * Taken and adapted from edu.umd.cloud9.demo.CloudNineWordCount
+ * 
+ * @author Andy Luong & Erik Skiles
+ */
+public class GetTwitterUsers extends Configured implements Tool {
+	private static final Logger sLogger = Logger
+			.getLogger(GetTwitterUsers.class);
+
+	private static class GetTwitterUsersMapper extends
+			Mapper<LongWritable, Text, Text, NullWritable> {
+
+		Text out = new Text();
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+				throws IOException, InterruptedException {
+
+			try {
+				JSONObject json = new JSONObject(value.toString());
+				JSONObject user = json.getJSONObject("user");
+				String id = user.getString("id_str");
+				out.set(id);
+				context.write(out, NullWritable.get());
+
+			} catch (JSONException e) {
+				// Skip input lines that are not valid tweet JSON
+				e.printStackTrace();
+			}
+		}
+	}
+
+	private static class GetTwitterUsersReducer extends
+			Reducer<Text, NullWritable, Text, NullWritable> {
+		@Override
+		public void reduce(Text key, Iterable<NullWritable> values,
+				Context context) throws IOException, InterruptedException {
+			context.write(key, NullWritable.get());
+		}
+	}
+
+	/**
+	 * Creates an instance of this tool.
+	 */
+	public GetTwitterUsers() {
+	}
+
+	private static int printUsage() {
+		System.out.println("usage: [input-path] [output-path] [num-reducers]");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+
+	/**
+	 * Runs this tool.
+	 */
+	public int run(String[] args) throws Exception {
+		if (args.length != 3) {
+			printUsage();
+			return -1;
+		}
+
+		String inputPath = args[0];
+		String outputPath = args[1];
+		int reduceTasks = Integer.parseInt(args[2]);
+
+		sLogger.info("Tool: GetTwitterUsers");
+		sLogger.info(" - input path: " + inputPath);
+		sLogger.info(" - output path: " + outputPath);
+		sLogger.info(" - number of reducers: " + reduceTasks);
+
+		Configuration conf = new Configuration();
+		Job job = new Job(conf, "GetTwitterUsers");
+		job.setJarByClass(GetTwitterUsers.class);
+
+		job.setNumReduceTasks(reduceTasks);
+
+		FileInputFormat.setInputPaths(job, new Path(inputPath));
+		FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+		job.setOutputKeyClass(Text.class);
+		job.setOutputValueClass(NullWritable.class);
+		job.setMapOutputValueClass(NullWritable.class);
+
+		job.setMapperClass(GetTwitterUsersMapper.class);
+		job.setCombinerClass(GetTwitterUsersReducer.class);
+		job.setReducerClass(GetTwitterUsersReducer.class);
+
+		// Delete the output directory if it exists already
+		Path outputDir = new Path(outputPath);
+		FileSystem.get(conf).delete(outputDir, true);
+
+		long startTime = System.currentTimeMillis();
+		job.waitForCompletion(true);
+		sLogger.info("Job Finished in "
+				+ (System.currentTimeMillis() - startTime) / 1000.0
+				+ " seconds");
+
+		return 0;
+	}
+
+	/**
+	 * Dispatches command-line arguments to the tool via the
+	 * <code>ToolRunner</code>.
+	 */
+	public static void main(String[] args) throws Exception {
+		int res = ToolRunner.run(new Configuration(), new GetTwitterUsers(),
+				args);
+		System.exit(res);
+	}
+
+}
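
For reference, a minimal sketch (not part of the commit) of the lookup the mapper performs on each input line; the tweet JSON here is invented, and org.json is the library the class already imports:

    import org.json.JSONObject;

    public class ExtractUserIdExample {
        public static void main(String[] args) throws Exception {
            // One hypothetical line of mapper input: a tweet with a "user" object
            String line = "{\"user\":{\"id_str\":\"12345\",\"screen_name\":\"example\"},\"text\":\"hi\"}";
            String id = new JSONObject(line).getJSONObject("user").getString("id_str");
            System.out.println(id); // prints 12345; the reducer then dedups the IDs
        }
    }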

File parse-tweets/main/src/java/twools/PreprocessTweetsJSON.java

+package twools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>
+ * Converts JSON-encoded tweets into per-user, tab-separated records
+ * (created-at, coordinates, text, mentions), keeping only geotagged tweets,
+ * and takes the following command-line arguments:
+ * </p>
+ * 
+ * <ul>
+ * <li>[input-path] input path</li>
+ * <li>[output-path] output path</li>
+ * <li>[twokenize] tokenize the tweet text with Twokenize (true or false)</li>
+ * <li>[diag-size] maximum bounding-box diagonal for place-level geotags</li>
+ * <li>[num-reducers] number of reducers</li>
+ * </ul>
+ * 
+ * 
+ * Taken and adapted from edu.umd.cloud9.demo.CloudNineWordCount
+ * 
+ * @author Andy Luong
+ */
+public class PreprocessTweetsJSON extends Configured implements Tool {
+	private static final Logger sLogger = Logger
+			.getLogger(PreprocessTweetsJSON.class);
+
+	private static class PreprocessTweetsJSONMapper extends
+			Mapper<LongWritable, Text, Text, Text> {
+
+		Text user = new Text();
+		Text data = new Text();
+
+		private static boolean twokenizeFlag;
+		private static float diagSize;
+
+		@Override
+		public void setup(Context context) {
+			twokenizeFlag = context.getConfiguration().getBoolean("twokenize",
+					false);
+			diagSize = context.getConfiguration().getFloat("diagSize",
+					(float) 0.5);
+		}
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+				throws IOException, InterruptedException {
+
+			try {
+				Status status = Status.fromJson(value.toString(), diagSize);
+				if (status.hasGeo()) {
+
+					String text = status.getText();
+
+					if (twokenizeFlag) {
+						text = Twokenize.tokenize(status.getText()).mkString(
+								" ");
+
+					}
+
+					user.set(status.getUserId());
+					data.set(status.getCreatedAt() + "\t" + status.getCoord()
+							+ "\t" + text + "\t" + status.getUserMentions());
+
+					context.write(user, data);
+				}
+
+			} catch (Exception e) {
+				System.err
+						.println("Bad JSON Input. Exception: " + e.toString());
+				System.err.println(value.toString());
+			}
+		}
+	}
+
+	// Identity reducer: passes each (user, record) pair through unchanged
+	private static class PreprocessTweetsJSONReducer extends
+			Reducer<Text, Text, Text, Text> {
+		@Override
+		public void reduce(Text key, Iterable<Text> values, Context context)
+				throws IOException, InterruptedException {
+
+			for (Text val : values)
+				context.write(key, val);
+		}
+	}
+
+	/**
+	 * Creates an instance of this tool.
+	 */
+	public PreprocessTweetsJSON() {
+	}
+
+	private static int printUsage() {
+		System.out
+				.println("usage: [input-path] [output-path] [twokenize text (true or false)] [Bounding-box diagonal length] [num-reducers]");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+
+	/**
+	 * Runs this tool.
+	 */
+	@Override
+	public int run(String[] args) throws Exception {
+		if (args.length != 5) {
+			printUsage();
+			return -1;
+		}
+
+		String inputPath = args[0];
+		String outputPath = args[1];
+		String twokenizeFlag = args[2];
+		float diagSize = Float.parseFloat(args[3]);
+		int reduceTasks = Integer.parseInt(args[4]);
+
+		Configuration conf = new Configuration();
+		conf.setBoolean("twokenize", Boolean.valueOf(twokenizeFlag));
+		conf.setFloat("diagSize", diagSize);
+
+		Job job = new Job(conf, "PreprocessTweetsJSON");
+		job.setJarByClass(PreprocessTweetsJSON.class);
+
+		job.setNumReduceTasks(reduceTasks);
+
+		FileInputFormat.setInputPaths(job, new Path(inputPath));
+		FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+		job.setOutputKeyClass(Text.class);
+		job.setOutputValueClass(Text.class);
+
+		job.setMapperClass(PreprocessTweetsJSONMapper.class);
+		job.setReducerClass(PreprocessTweetsJSONReducer.class);
+
+		// Delete the output directory if it exists already
+		Path outputDir = new Path(outputPath);
+		FileSystem.get(conf).delete(outputDir, true);
+
+		sLogger.info("Tool: PreprocessTweetsJSON");
+		sLogger.info(" - input path: " + inputPath);
+		sLogger.info(" - output path: " + outputPath);
+		sLogger.info(" - twokenize: " + twokenizeFlag);
+		sLogger.info(" - number of reducers: " + reduceTasks);
+
+		long startTime = System.currentTimeMillis();
+		job.waitForCompletion(true);
+		sLogger.info("Job Finished in "
+				+ (System.currentTimeMillis() - startTime) / 1000.0
+				+ " seconds");
+
+		return 0;
+	}
+
+	/**
+	 * Dispatches command-line arguments to the tool via the
+	 * <code>ToolRunner</code>.
+	 */
+	public static void main(String[] args) throws Exception {
+		int res = ToolRunner.run(new Configuration(),
+				new PreprocessTweetsJSON(), args);
+		System.exit(res);
+	}
+
+}
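
The tool hands the twokenize flag and the bounding-box diagonal to its mappers through the job Configuration. A small sketch of that round trip, using the same keys and defaults as the code above (the values set here are illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class ConfRoundTripExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // What run() sets before submitting the job
            conf.setBoolean("twokenize", true);
            conf.setFloat("diagSize", 0.5f);
            // What the mapper's setup() reads back, with the same defaults
            boolean twokenizeFlag = conf.getBoolean("twokenize", false);
            float diagSize = conf.getFloat("diagSize", 0.5f);
            System.out.println(twokenizeFlag + " " + diagSize);
        }
    }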

File parse-tweets/main/src/java/twools/Status.java

+package twools;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Mutable singleton representing a tweet (status); fromJson reuses a single
+ * instance, overwriting its fields for each input line.
+ */
+public class Status {
+	private static final JsonParser parser = new JsonParser();
+	private static final SimpleDateFormat twitterFormat = new SimpleDateFormat(
+			"EEE MMM dd HH:mm:ss +0000 yyyy");
+	private static final SimpleDateFormat isoFormat = new SimpleDateFormat(
+			"yyyy-MM-dd'T'HH:mm'Z'");
+
+	private static Status status = new Status();
+
+	private int httpStatusCode;
+	private String id;
+	private String userId;
+	private String screenname;
+	private String createdAt;
+	private String text;
+	private final ArrayList<String> mentionUsers = new ArrayList<String>();
+	private double lat;
+	private double lon;
+	private boolean geoBool;
+	private JsonObject obj;
+
+	protected Status() {
+
+	}
+
+	public boolean hasGeo() {
+		return status.geoBool;
+	}
+
+	public String getId() {
+		return status.id;
+	}
+
+	public String getUserId() {
+		return "USER_" + status.userId;
+	}
+
+	public String getText() {
+		return status.text;
+	}
+
+	public String getLat() {
+		return Double.toString(status.lat);
+	}
+
+	public String getLon() {
+		return Double.toString(status.lon);
+	}
+
+	public String getCoord() {
+		return status.lat + "," + status.lon;
+	}
+
+	public String getScreenname() {
+		return status.screenname;
+	}
+
+	public int getHttpStatusCode() {
+		return status.httpStatusCode;
+	}
+
+	public String getUserMentions() {
+		if (mentionUsers.isEmpty()) {
+			return "NONE";
+		} else {
+			String users = "";
+			for (String user : mentionUsers) {
+				users += "," + user;
+			}
+			return users.substring(1);
+		}
+	}
+
+	public String getCreatedAt() throws ParseException {
+		return isoFormat.format(twitterFormat.parse(createdAt));
+	}
+
+	public static Status fromJson(String json, float diagSize) throws Exception {
+		Preconditions.checkNotNull(json);
+
+		JsonObject obj = (JsonObject) parser.parse(json);
+		if (!obj.isJsonNull()) {
+			JsonObject user = obj.get("user").getAsJsonObject();
+			status.obj = obj;
+			status.text = obj.get("text").getAsString()
+					.replaceAll("\\r\\n|\\r|\\n", " ");// removing new lines
+			status.id = obj.get("id_str").getAsString();
+			status.userId = user.get("id_str").getAsString();
+			status.screenname = user.get("screen_name").getAsString();
+			status.createdAt = obj.get("created_at").getAsString();
+			status.getGeolocation(diagSize);
+			status.getMentions();
+
+			return status;
+		} else {
+			return null;
+		}
+	}
+
+	// Collects the IDs of users mentioned in this tweet (entities.user_mentions)
+	private void getMentions() {
+		mentionUsers.clear();
+		if (!obj.get("entities").isJsonNull()) {
+			if (obj.get("entities").getAsJsonObject().get("user_mentions") != null) {
+				JsonArray users = obj.get("entities").getAsJsonObject()
+						.get("user_mentions").getAsJsonArray();
+
+				for (JsonElement user : users) {
+					mentionUsers.add(user.getAsJsonObject().get("id_str")
+							.getAsString());
+				}
+			}
+
+		}
+
+	}
+
+	// Given two JsonArray points, get the distance between them
+	private double getDistance(JsonArray pt1, JsonArray pt2) {
+		double d = Math.sqrt(Math.pow(pt1.get(0).getAsDouble()
+				- pt2.get(0).getAsDouble(), 2)
+				+ Math.pow(pt1.get(1).getAsDouble() - pt2.get(1).getAsDouble(),
+						2));
+		return d;
+	}
+
+	// Validate that lat is between -90 and 90
+	// and long is between -180 and 180
+	private boolean validateCoords(double lat, double lon) {
+		if (Math.abs(lat) <= 90 && Math.abs(lon) <= 180)
+			return true;
+		return false;
+	}
+
+	// Get the geolocation of a tweet. For place-level geotags, diagSize is the
+	// largest bounding-box diagonal that is still treated as a usable point.
+	private void getGeolocation(float diagSize) {
+		// Reset first, so a tweet without usable coordinates does not inherit
+		// the previous tweet's geoBool/lat/lon (the singleton is reused).
+		status.geoBool = false;
+
+		double lat, lon;
+		lat = lon = 0;
+		JsonObject geoTag = null;
+		JsonArray coords = null;
+
+		// Geo Location
+		if (!obj.get("geo").isJsonNull()) {
+			geoTag = obj.get("geo").getAsJsonObject();
+			coords = geoTag.get("coordinates").getAsJsonArray();
+		} else if (!obj.get("place").isJsonNull()) {
+			geoTag = obj.get("place").getAsJsonObject().get("bounding_box")
+					.getAsJsonObject();
+			coords = geoTag.get("coordinates").getAsJsonArray();
+		} else
+			return;
+
+		// Point or Polygon
+		if (geoTag.get("type").getAsString().equals("Point")) {
+			lat = coords.get(0).getAsDouble();
+			lon = coords.get(1).getAsDouble();
+		} else if (geoTag.get("type").getAsString().equals("Polygon")) {
+			JsonArray coordPoints = coords.get(0).getAsJsonArray();
+
+			// Must be a quadrilateral; otherwise there are no usable coordinates
+			if (coordPoints.size() != 4)
+				return;
+
+			JsonArray pt1 = (JsonArray) coordPoints.get(0);
+			JsonArray pt3 = (JsonArray) coordPoints.get(2);
+
+			// Accept only boxes whose diagonal is below diagSize, and take the
+			// average of the corners as the box center
+			if (getDistance(pt1, pt3) < diagSize) {
+				for (int i = 0; i < coordPoints.size(); i++) {
+					JsonArray coord = (JsonArray) coordPoints.get(i);
+					lat += coord.get(1).getAsDouble();
+					lon += coord.get(0).getAsDouble();
+				}
+
+				lat = lat / coordPoints.size();
+				lon = lon / coordPoints.size();
+			} else {
+				// Box too large; geoBool stays false
+				return;
+			}
+
+		} else {
+			System.err.println(geoTag.get("type").getAsString()
+					+ " is not a Point or Polygon");
+			return;
+		}
+
+		if (validateCoords(lat, lon)) {
+			// Update the lat and lon
+			status.lat = lat;
+			status.lon = lon;
+			status.geoBool = true;
+		} else {
+			System.err.println("Lat " + lat + " and Long " + lon
+					+ " are not in the proper range.");
+			status.geoBool = false;
+		}
+	}
+}
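
A hedged usage sketch for the parser above, with a hand-made geotagged tweet; the field values are invented, but the field names are the ones fromJson reads, and the date parse assumes an English default locale:

    import twools.Status;

    public class StatusExample {
        public static void main(String[] args) throws Exception {
            String json = "{"
                + "\"id_str\":\"1\","
                + "\"text\":\"hello\\naustin\"," // embedded newline; fromJson flattens it to a space
                + "\"created_at\":\"Mon Jan 03 12:00:00 +0000 2011\","
                + "\"user\":{\"id_str\":\"42\",\"screen_name\":\"tester\"},"
                + "\"geo\":{\"type\":\"Point\",\"coordinates\":[30.27,-97.74]},"
                + "\"entities\":{\"user_mentions\":[]}"
                + "}";
            Status s = Status.fromJson(json, 0.5f);
            if (s != null && s.hasGeo()) {
                // Prints: USER_42  2011-01-03T12:00Z  30.27,-97.74
                System.out.println(s.getUserId() + "\t" + s.getCreatedAt() + "\t" + s.getCoord());
            }
        }
    }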

File parse-tweets/main/src/scala/fogbow/textgrounder/Preprocess.scala

+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fogbow.textgrounder
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import twools.Status
+
+class JsonTweetMapper extends Mapper[Object, Text, Text, NullWritable] {
+  val data = new Text
+
+  override
+  def map (key: Object, value: Text, context: Mapper[Object, Text, Text, NullWritable]#Context) {
+    try {
+      // Status lives in twools and needs a bounding-box diagonal; 0.5f here
+      // mirrors the Java tool's default diagSize.
+      val status = Status.fromJson(value.toString, 0.5f)
+      if (status != null && status.hasGeo()) {
+        data.set(List(
+          status.getUserId(),
+          status.getCreatedAt(),
+          status.getCoord(),
+          status.getLat(),
+          status.getLon(),
+          status.getText()
+        ) mkString "\t")
+
+        context.write(data, NullWritable.get())
+      }
+    } catch {
+      case e: Exception =>
+        System.err.println("Bad JSON input: " + e)
+        System.err.println(value.toString)
+    }
+  }
+}
+
+class IdentityReducer extends Reducer[Text,NullWritable,Text,NullWritable] {
+
+  override
+  def reduce (key: Text, values: java.lang.Iterable[NullWritable],
+              context: Reducer[Text,NullWritable,Text,NullWritable]#Context) {
+    context.write(key, NullWritable.get())
+  }
+}
+
+object PreprocessJsonTweets {
+  def main (args: Array[String]) {
+    if (args.length != 3) {
+      println("Usage: <inputPath> <outputPath> <numReduceTasks>")
+      System.exit(1)
+    }
+
+    val conf = new Configuration()
+    val job = new Job(conf, "PreprocessJsonTweets")
+   
+    val outputPath = new Path(args(1))
+    FileInputFormat.addInputPath(job, new Path(args(0)))
+    FileOutputFormat.setOutputPath(job, outputPath)
+    FileSystem.get(conf).delete(outputPath, true)
+    job.setNumReduceTasks(args(2).toInt)
+
+    job.setJarByClass(classOf[JsonTweetMapper])
+    job.setMapperClass(classOf[JsonTweetMapper])
+    job.setOutputKeyClass(classOf[Text])
+    job.setOutputValueClass(classOf[NullWritable])
+    //job.setReducerClass(classOf[IdentityReducer])
+
+    System.exit(if (job.waitForCompletion(true)) 0 else 1)
+  }
+}

File twitter-pull/parse-tweets/lib/gson-1.6.jar

Binary file removed.

File twitter-pull/parse-tweets/lib/guava-r08.jar

Binary file removed.

File twitter-pull/parse-tweets/main/src/java/fogbow/textgrounder/GetTwitterUsers.java

File removed. Its contents were a verbatim copy of parse-tweets/main/src/java/fogbow/textgrounder/GetTwitterUsers.java, added above.

File twitter-pull/parse-tweets/main/src/java/twools/PreprocessTweetsJSON.java

File removed. Its contents were a verbatim copy of parse-tweets/main/src/java/twools/PreprocessTweetsJSON.java, added above.

File twitter-pull/parse-tweets/main/src/java/twools/Status.java

File removed. Its contents were a verbatim copy of parse-tweets/main/src/java/twools/Status.java, added above.

File twitter-pull/parse-tweets/main/src/scala/fogbow/textgrounder/Preprocess.scala

File removed. Its contents were a verbatim copy of parse-tweets/main/src/scala/fogbow/textgrounder/Preprocess.scala, added above.