Commits

Andy Luong committed 17d9af4

Moved Preprocessing Code into Twools package
-Significant restructuring of the Status.Java code, uses a singleton now
-Modifed Preprocess to output mentions (includes reply) users and modified the output syntax
-Adjustment on how bounding boxes are handled for coords

Comments (0)

Files changed (4)

twitter-pull/parse-tweets/main/src/java/fogbow/textgrounder/PreprocessTweetsJSON.java

-package fogbow.textgrounder;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Logger;
-
-/**
- * <p>
- * command-line arguments:
- * </p>
- * 
- * <ul>
- * <li>[input-path] input path</li>
- * <li>[output-path] output path</li>
- * <li>[num-reducers] number of reducers</li>
- * </ul>
- * 
- * 
- * Taken and adapted from edu.umd.cloud9.demo.CloudNineWordCount
- * 
- * @author Andy Luong
- */
-public class PreprocessTweetsJSON extends Configured implements Tool {
-	private static final Logger sLogger = Logger
-			.getLogger(GetTwitterUsers.class);
-
-	// static int count = 0;
-
-	private static class PreprocessTweetsJSONMapper extends
-			Mapper<LongWritable, Text, Text, Text> {
-
-		Text user = new Text();
-		Text data = new Text();
-
-		@Override
-		public void map(LongWritable key, Text value, Context context)
-				throws IOException, InterruptedException {
-
-			try {
-				Status status = Status.fromJson(value.toString());
-				if (status.hasGeo()) {
-					user.set(status.getId());
-					data.set(status.getUserId() + "\t" + status.getCreatedAt()
-							+ "\t" + status.getCoord() + "\t" + status.getLat()
-							+ "\t" + status.getLon() + "\t" + status.getText());
-
-					context.write(user, data);
-				}
-
-			} catch (Exception e) {
-				System.err.println("Bad JSON?");
-				System.err.println(value.toString());
-			}
-
-			/*
-			 * count++; if (count % 100000 == 0) System.out.println(count);
-			 */
-		}
-	}
-
-	private static class PreprocessTweetsJSONReducer extends
-			Reducer<Text, Text, Text, NullWritable> {
-		@Override
-		public void reduce(Text key, Iterable<Text> values, Context context)
-				throws IOException, InterruptedException {
-
-			for (Text val : values)
-				context.write(val, NullWritable.get());
-		}
-	}
-
-	/**
-	 * Creates an instance of this tool.
-	 */
-	public PreprocessTweetsJSON() {
-	}
-
-	private static int printUsage() {
-		System.out.println("usage: [input-path] [output-path] [num-reducers]");
-		ToolRunner.printGenericCommandUsage(System.out);
-		return -1;
-	}
-
-	/**
-	 * Runs this tool.
-	 */
-	public int run(String[] args) throws Exception {
-		if (args.length != 3) {
-			printUsage();
-			return -1;
-		}
-
-		String inputPath = args[0];
-		String outputPath = args[1];
-		int reduceTasks = Integer.parseInt(args[2]);
-
-		sLogger.info("Tool: PreprocessTweetsJSON");
-		sLogger.info(" - input path: " + inputPath);
-		sLogger.info(" - output path: " + outputPath);
-		sLogger.info(" - number of reducers: " + reduceTasks);
-
-		Configuration conf = new Configuration();
-		Job job = new Job(conf, "PreprocessTweetsJSON");
-		job.setJarByClass(PreprocessTweetsJSON.class);
-
-		job.setNumReduceTasks(reduceTasks);
-
-		FileInputFormat.setInputPaths(job, new Path(inputPath));
-		FileOutputFormat.setOutputPath(job, new Path(outputPath));
-
-		job.setOutputKeyClass(Text.class);
-		job.setOutputValueClass(NullWritable.class);
-		job.setMapOutputValueClass(Text.class);
-
-		job.setMapperClass(PreprocessTweetsJSONMapper.class);
-		job.setReducerClass(PreprocessTweetsJSONReducer.class);
-
-		// Delete the output directory if it exists already
-		Path outputDir = new Path(outputPath);
-		FileSystem.get(conf).delete(outputDir, true);
-
-		long startTime = System.currentTimeMillis();
-		job.waitForCompletion(true);
-		sLogger.info("Job Finished in "
-				+ (System.currentTimeMillis() - startTime) / 1000.0
-				+ " seconds");
-
-		return 0;
-	}
-
-	/**
-	 * Dispatches command-line arguments to the tool via the
-	 * <code>ToolRunner</code>.
-	 */
-	public static void main(String[] args) throws Exception {
-		int res = ToolRunner.run(new Configuration(),
-				new PreprocessTweetsJSON(), args);
-		System.exit(res);
-	}
-
-}

twitter-pull/parse-tweets/main/src/java/fogbow/textgrounder/Status.java

-package fogbow.textgrounder;
-
-import com.google.common.base.Preconditions;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import java.text.SimpleDateFormat;
-import java.text.ParseException;
-
-/**
- * Object representing a status.
- */
-public class Status {
-  private static final JsonParser parser = new JsonParser();
-  private static final SimpleDateFormat twitterFormat = new SimpleDateFormat("EEE MMM dd HH:mm:ss +0000 yyyy");
-  private static final SimpleDateFormat isoFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
-
-  private String id;
-  private String userId;
-  private String screenname;
-  private String createdAt;
-  private String lat;
-  private String lon;
-  private String text;
-  private int httpStatusCode;
-  private JsonObject jsonObject;
-  private String jsonString;
-  private boolean geoBool;
-
-  protected Status() {}
-
-  public boolean hasGeo() {return geoBool; }
-  public String getId() { return id; }
-  public String getUserId() { return "USER_" + userId; }
-  public String getText() { return text; }
-  public String getLat() { return lat; }
-  public String getLon() { return lon; }
-  public String getCoord() { return "??:" + lat + "," + lon; }
-  public String getScreenname() { return screenname; }
-  public int getHttpStatusCode() { return httpStatusCode; }
-  public JsonObject getJsonObject() { return jsonObject; }
-  public String getJsonString() { return jsonString; }
-  public String getCreatedAt() throws ParseException {
-    return isoFormat.format(twitterFormat.parse(createdAt));
-  }
-
-  public static Status fromJson(String json) throws Exception {
-	Preconditions.checkNotNull(json);
-
-	JsonObject obj = (JsonObject) parser.parse(json);
-	JsonObject user = obj.get("user").getAsJsonObject();
-
-	Status status = new Status();
-	status.text = obj.get("text").getAsString();
-	status.id = obj.get("id_str").getAsString();
-	status.userId = user.get("id_str").getAsString();
-	status.screenname = user.get("screen_name").getAsString();
-	status.createdAt = obj.get("created_at").getAsString();
-	status.geoBool = false;
-	
-	double lat, lon;
-	lat = lon = 0; 
-	
-	if (!obj.get("geo").isJsonNull()) {
-		JsonObject geo = obj.get("geo").getAsJsonObject();
-        JsonArray coords = geo.get("coordinates").getAsJsonArray();
-        
-		if(geo.get("type").getAsString().equals("Point")) {
-			lat = coords.get(0).getAsDouble();
-			lon = coords.get(1).getAsDouble();
-			status.geoBool = true;
-		}
-		else if(geo.get("type").getAsString().equals("Polygon")) {
-			
-			JsonArray coordP = coords.get(0).getAsJsonArray();
-			
-			//Must be a quadralateral
-			if (coordP.size() == 4) {
-				
-				JsonArray pt1 = (JsonArray) coordP.get(0);
-				//JsonArray pt2 = (JsonArray) coordP.get(1);
-				JsonArray pt3 = (JsonArray) coordP.get(2);
-				//JsonArray pt4 = (JsonArray) coordP.get(3);
-				
-				double diag = getDistance(pt1, pt3);
-				
-				//Small Area
-				if( diag < .5) {
-				
-					for(int i = 0; i < coordP.size(); i++) {
-						JsonArray coord = (JsonArray)coordP.get(i);
-						lat += coord.get(0).getAsDouble();
-						lon += coord.get(1).getAsDouble();
-					}
-					lat = lat / coordP.size();
-					lon = lon / coordP.size();
-					
-					status.geoBool = true;
-				}
-			}
-		}
-		else
-			System.out.println("Not Point or Polygon");
-		
-		status.lat = ""+lat;
-		status.lon = ""+lon;
-	}
-	else if(!obj.get("place").isJsonNull()) {
-		JsonObject boundingbox = obj.get("place").getAsJsonObject().get("bounding_box").getAsJsonObject();
-		JsonArray coords = boundingbox.get("coordinates").getAsJsonArray();
-		
-
-		
-		if(boundingbox.get("type").getAsString().equals("Point")) {
-			lat = coords.get(0).getAsDouble();
-			lon = coords.get(1).getAsDouble();
-			status.geoBool = true;
-		}
-		else if(boundingbox.get("type").getAsString().equals("Polygon")) {
-		
-			JsonArray coordP = coords.get(0).getAsJsonArray();
-			
-			//Must be a quadralateral
-			if (coordP.size() == 4) {
-				
-				JsonArray pt1 = (JsonArray) coordP.get(0);
-				//JsonArray pt2 = (JsonArray) coordP.get(1);
-				JsonArray pt3 = (JsonArray) coordP.get(2);
-				//JsonArray pt4 = (JsonArray) coordP.get(3);
-				
-				double diag = getDistance(pt1, pt3);
-				
-				//Small Area
-				if( diag < .5) {
-				
-					for(int i = 0; i < coordP.size(); i++) {
-						JsonArray coord = (JsonArray)coordP.get(i);
-						lat += coord.get(0).getAsDouble();
-						lon += coord.get(1).getAsDouble();
-					}
-					lat = lat / coordP.size();
-					lon = lon / coordP.size();
-					
-					status.geoBool = true;
-				}
-			}
-		}
-		else
-			System.out.println("Not Point or Polygon");
-		
-		status.lat = ""+lat;
-		status.lon = ""+lon;
-	}
-
-	// TODO: We need to parse out the other fields.
-	status.jsonObject = obj;
-	status.jsonString = json;
-
-	return status;
-  }
-  
-  public static double getDistance(JsonArray pt1, JsonArray pt2) {
-  	double d = Math.sqrt(
-  			Math.pow(pt1.get(0).getAsDouble() - pt2.get(0).getAsDouble(), 2) + 
-  			Math.pow(pt1.get(1).getAsDouble() - pt2.get(1).getAsDouble(), 2)
-  			);
-  			
-  	return d;
-  }
-  
-}

twitter-pull/parse-tweets/main/src/java/twools/PreprocessTweetsJSON.java

+package twools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>
+ * command-line arguments:
+ * </p>
+ * 
+ * <ul>
+ * <li>[input-path] input path</li>
+ * <li>[output-path] output path</li>
+ * <li>[num-reducers] number of reducers</li>
+ * </ul>
+ * 
+ * 
+ * Taken and adapted from edu.umd.cloud9.demo.CloudNineWordCount
+ * 
+ * @author Andy Luong
+ */
+public class PreprocessTweetsJSON extends Configured implements Tool {
+	private static final Logger sLogger = Logger
+			.getLogger(PreprocessTweetsJSON.class);
+
+	private static class PreprocessTweetsJSONMapper extends
+			Mapper<LongWritable, Text, Text, Text> {
+
+		Text user = new Text();
+		Text data = new Text();
+
+		private static boolean twokenizeFlag;
+		private static float diagSize;
+
+		@Override
+		public void setup(Context context) {
+			twokenizeFlag = context.getConfiguration().getBoolean("twokenize",
+					false);
+			diagSize = context.getConfiguration().getFloat("diagSize",
+					(float) 0.5);
+		}
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+				throws IOException, InterruptedException {
+
+			try {
+				Status status = Status.fromJson(value.toString(), diagSize);
+				if (status.hasGeo()) {
+
+					String text = status.getText();
+
+					if (twokenizeFlag) {
+						text = Twokenize.tokenize(status.getText()).mkString(
+								" ");
+
+					}
+
+					user.set(status.getUserId());
+					data.set(status.getCreatedAt() + "\t" + status.getCoord()
+							+ "\t" + text + "\t" + status.getUserMentions());
+
+					context.write(user, data);
+				}
+
+			} catch (Exception e) {
+				System.err
+						.println("Bad JSON Input. Exception: " + e.toString());
+				System.err.println(value.toString());
+			}
+		}
+	}
+
+	// Reducer will
+	private static class PreprocessTweetsJSONReducer extends
+			Reducer<Text, Text, Text, Text> {
+		@Override
+		public void reduce(Text key, Iterable<Text> values, Context context)
+				throws IOException, InterruptedException {
+
+			for (Text val : values)
+				context.write(key, val);
+		}
+	}
+
+	/**
+	 * Creates an instance of this tool.
+	 */
+	public PreprocessTweetsJSON() {
+	}
+
+	private static int printUsage() {
+		System.out
+				.println("usage: [input-path] [output-path] [twokenize text (true or false)] [Bounding-box diagonal length] [num-reducers]");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+
+	/**
+	 * Runs this tool.
+	 */
+	@Override
+	public int run(String[] args) throws Exception {
+		if (args.length != 5) {
+			printUsage();
+			return -1;
+		}
+
+		String inputPath = args[0];
+		String outputPath = args[1];
+		String twokenizeFlag = args[2];
+		float diagSize = Float.parseFloat(args[3]);
+		int reduceTasks = Integer.parseInt(args[4]);
+
+		Configuration conf = new Configuration();
+		conf.setBoolean("twokenize", Boolean.valueOf(twokenizeFlag));
+		conf.setFloat("diagSize", diagSize);
+
+		Job job = new Job(conf, "PreprocessTweetsJSON");
+		job.setJarByClass(PreprocessTweetsJSON.class);
+
+		job.setNumReduceTasks(reduceTasks);
+
+		FileInputFormat.setInputPaths(job, new Path(inputPath));
+		FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+		job.setOutputKeyClass(Text.class);
+		job.setOutputValueClass(Text.class);
+
+		job.setMapperClass(PreprocessTweetsJSONMapper.class);
+		job.setReducerClass(PreprocessTweetsJSONReducer.class);
+
+		// Delete the output directory if it exists already
+		Path outputDir = new Path(outputPath);
+		FileSystem.get(conf).delete(outputDir, true);
+
+		sLogger.info("Tool: PreprocessTweetsJSON");
+		sLogger.info(" - input path: " + inputPath);
+		sLogger.info(" - output path: " + outputPath);
+		sLogger.info(" - twokenize: " + twokenizeFlag);
+		sLogger.info(" - number of reducers: " + reduceTasks);
+
+		long startTime = System.currentTimeMillis();
+		job.waitForCompletion(true);
+		sLogger.info("Job Finished in "
+				+ (System.currentTimeMillis() - startTime) / 1000.0
+				+ " seconds");
+
+		return 0;
+	}
+
+	/**
+	 * Dispatches command-line arguments to the tool via the
+	 * <code>ToolRunner</code>.
+	 */
+	public static void main(String[] args) throws Exception {
+		int res = ToolRunner.run(new Configuration(),
+				new PreprocessTweetsJSON(), args);
+		System.exit(res);
+	}
+
+}

twitter-pull/parse-tweets/main/src/java/twools/Status.java

+package twools;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Singleton Object representing a status.
+ */
+public class Status {
+	private static final JsonParser parser = new JsonParser();
+	private static final SimpleDateFormat twitterFormat = new SimpleDateFormat(
+			"EEE MMM dd HH:mm:ss +0000 yyyy");
+	private static final SimpleDateFormat isoFormat = new SimpleDateFormat(
+			"yyyy-MM-dd'T'HH:mm'Z'");
+
+	private static Status status = new Status();
+
+	private int httpStatusCode;
+	private String id;
+	private String userId;
+	private String screenname;
+	private String createdAt;
+	private String text;
+	private final ArrayList<String> mentionUsers = new ArrayList<String>();;
+	private double lat;
+	private double lon;
+	private boolean geoBool;
+	private JsonObject obj;
+
+	protected Status() {
+
+	}
+
+	public boolean hasGeo() {
+		return status.geoBool;
+	}
+
+	public String getId() {
+		return status.id;
+	}
+
+	public String getUserId() {
+		return "USER_" + status.userId;
+	}
+
+	public String getText() {
+		return status.text;
+	}
+
+	public String getLat() {
+		return Double.toString(status.lat);
+	}
+
+	public String getLon() {
+		return Double.toString(status.lon);
+	}
+
+	public String getCoord() {
+		return status.lat + "," + status.lon;
+	}
+
+	public String getScreenname() {
+		return status.screenname;
+	}
+
+	public int getHttpStatusCode() {
+		return status.httpStatusCode;
+	}
+
+	public String getUserMentions() {
+		if (mentionUsers.isEmpty()) {
+			return "NONE";
+		} else {
+			String users = "";
+			for (String user : mentionUsers) {
+				users += "," + user;
+			}
+			return users.substring(1);
+		}
+	}
+
+	public String getCreatedAt() throws ParseException {
+		return isoFormat.format(twitterFormat.parse(createdAt));
+	}
+
+	public static Status fromJson(String json, float diagSize) throws Exception {
+		Preconditions.checkNotNull(json);
+
+		JsonObject obj = (JsonObject) parser.parse(json);
+		if (!obj.isJsonNull()) {
+			JsonObject user = obj.get("user").getAsJsonObject();
+			status.obj = obj;
+			status.text = obj.get("text").getAsString()
+					.replaceAll("\\r\\n|\\r|\\n", " ");// removing new lines
+			status.id = obj.get("id_str").getAsString();
+			status.userId = user.get("id_str").getAsString();
+			status.screenname = user.get("screen_name").getAsString();
+			status.createdAt = obj.get("created_at").getAsString();
+			status.getGeolocation(diagSize);
+			status.getMentions();
+
+			return status;
+		} else {
+			return null;
+		}
+	}
+
+	// Gets the mentions from a user
+	private void getMentions() {
+		mentionUsers.clear();
+		if (!obj.get("entities").isJsonNull()) {
+			if (obj.get("entities").getAsJsonObject().get("user_mentions") != null) {
+				JsonArray users = obj.get("entities").getAsJsonObject()
+						.get("user_mentions").getAsJsonArray();
+
+				for (JsonElement user : users) {
+					mentionUsers.add(user.getAsJsonObject().get("id_str")
+							.getAsString());
+				}
+			}
+
+		}
+
+	}
+
+	// Given two JsonArray points, get the distance between them
+	private double getDistance(JsonArray pt1, JsonArray pt2) {
+		double d = Math.sqrt(Math.pow(pt1.get(0).getAsDouble()
+				- pt2.get(0).getAsDouble(), 2)
+				+ Math.pow(pt1.get(1).getAsDouble() - pt2.get(1).getAsDouble(),
+						2));
+		return d;
+	}
+
+	// Validate that lat is between -90 and 90
+	// and long is between -180 and 180
+	private boolean validateCoords(double lat, double lon) {
+		if (Math.abs(lat) <= 90 && Math.abs(lon) <= 180)
+			return true;
+		return false;
+	}
+
+	// Get the geolocation of a tweet
+	// diagSize corresponds to if the geo coordiantes are in a boundng box, how
+	// small the box is...
+	private void getGeolocation(float diagSize) {
+		double lat, lon;
+		lat = lon = 0;
+		JsonObject geoTag = null;
+		JsonArray coords = null;
+
+		// Geo Location
+		if (!obj.get("geo").isJsonNull()) {
+			geoTag = obj.get("geo").getAsJsonObject();
+			coords = geoTag.get("coordinates").getAsJsonArray();
+		} else if (!obj.get("place").isJsonNull()) {
+			geoTag = obj.get("place").getAsJsonObject().get("bounding_box")
+					.getAsJsonObject();
+			coords = geoTag.get("coordinates").getAsJsonArray();
+		} else
+			return;
+
+		// Point or Polygon
+		if (geoTag.get("type").getAsString().equals("Point")) {
+			lat = coords.get(0).getAsDouble();
+			lon = coords.get(1).getAsDouble();
+		} else if (geoTag.get("type").getAsString().equals("Polygon")) {
+			JsonArray coordPoints = coords.get(0).getAsJsonArray();
+
+			// Must be a quadralateral
+			if (coordPoints.size() == 4) {
+
+				JsonArray pt1 = (JsonArray) coordPoints.get(0);
+				JsonArray pt3 = (JsonArray) coordPoints.get(2);
+
+				// Small Area
+				if (getDistance(pt1, pt3) < diagSize) {
+
+					for (int i = 0; i < coordPoints.size(); i++) {
+						JsonArray coord = (JsonArray) coordPoints.get(i);
+						lat += coord.get(1).getAsDouble();
+						lon += coord.get(0).getAsDouble();
+					}
+
+					lat = lat / coordPoints.size();
+					lon = lon / coordPoints.size();
+
+				} else {
+					status.geoBool = false;
+					return;
+				}
+			}
+
+		} else {
+			System.err.println(geoTag.get("type").getAsString()
+					+ " is Not Point or Polygon");
+			return;
+		}
+
+		if (validateCoords(lat, lon)) {
+			// Update the lat and lon
+			status.lat = lat;
+			status.lon = lon;
+			status.geoBool = true;
+		} else {
+			System.err.println("Lat " + lat + " and Long " + lon
+					+ " are not in the proper range.");
+			status.geoBool = false;
+		}
+	}
+}