Andy Luong avatar Andy Luong committed 6510abd

Added Java and Scala code to parse twitter-pull tweets

Comments (0)

Files changed (6)

Add a comment to this file

twitter-pull/parse-tweets/lib/gson-1.6.jar

Binary file added.

Add a comment to this file

twitter-pull/parse-tweets/lib/guava-r08.jar

Binary file added.

twitter-pull/parse-tweets/main/src/java/fogbow/textgrounder/GetTwitterUsers.java

+package fogbow.textgrounder;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * <p>
+ * outputs a sorted list of all the words that have come after it in the
+ * training corpus, and takes the following command-line arguments:
+ * </p>
+ * 
+ * <ul>
+ * <li>[input-path] input path</li>
+ * <li>[output-path] output path</li>
+ * <li>[num-reducers] number of reducers</li>
+ * </ul>
+ * 
+ * 
+ * Taken and adapted from edu.umd.cloud9.demo.CloudNineWordCount
+ * 
+ * @author Andy Luong & Erik Skiles
+ */
+public class GetTwitterUsers extends Configured implements Tool {
+	private static final Logger sLogger = Logger
+			.getLogger(GetTwitterUsers.class);
+
+	private static class GetTwitterUsersMapper extends
+			Mapper<LongWritable, Text, Text, NullWritable> {
+
+		Text out = new Text();
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+				throws IOException, InterruptedException {
+
+			try {
+				JSONObject json = new JSONObject(value.toString());
+				JSONObject user = json.getJSONObject("user");
+				String id = user.getString("id_str");
+				out.set(id);
+				context.write(out, NullWritable.get());
+
+			} catch (JSONException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+		}
+	}
+
+	private static class GetTwitterUsersReducer extends
+			Reducer<Text, NullWritable, Text, NullWritable> {
+		@Override
+		public void reduce(Text key, Iterable<NullWritable> values,
+				Context context) throws IOException, InterruptedException {
+			context.write(key, NullWritable.get());
+		}
+	}
+
+	/**
+	 * Creates an instance of this tool.
+	 */
+	public GetTwitterUsers() {
+	}
+
+	private static int printUsage() {
+		System.out.println("usage: [input-path] [output-path] [num-reducers]");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+
+	/**
+	 * Runs this tool.
+	 */
+	public int run(String[] args) throws Exception {
+		if (args.length != 3) {
+			printUsage();
+			return -1;
+		}
+
+		String inputPath = args[0];
+		String outputPath = args[1];
+		int reduceTasks = Integer.parseInt(args[2]);
+
+		sLogger.info("Tool: GetTwitterUsers");
+		sLogger.info(" - input path: " + inputPath);
+		sLogger.info(" - output path: " + outputPath);
+		sLogger.info(" - number of reducers: " + reduceTasks);
+
+		Configuration conf = new Configuration();
+		Job job = new Job(conf, "GetTwitterUsers");
+		job.setJarByClass(GetTwitterUsers.class);
+
+		job.setNumReduceTasks(reduceTasks);
+
+		FileInputFormat.setInputPaths(job, new Path(inputPath));
+		FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+		job.setOutputKeyClass(Text.class);
+		job.setOutputValueClass(NullWritable.class);
+		job.setMapOutputValueClass(NullWritable.class);
+
+		job.setMapperClass(GetTwitterUsersMapper.class);
+		job.setCombinerClass(GetTwitterUsersReducer.class);
+		job.setReducerClass(GetTwitterUsersReducer.class);
+
+		// Delete the output directory if it exists already
+		Path outputDir = new Path(outputPath);
+		FileSystem.get(conf).delete(outputDir, true);
+
+		long startTime = System.currentTimeMillis();
+		job.waitForCompletion(true);
+		sLogger.info("Job Finished in "
+				+ (System.currentTimeMillis() - startTime) / 1000.0
+				+ " seconds");
+
+		return 0;
+	}
+
+	/**
+	 * Dispatches command-line arguments to the tool via the
+	 * <code>ToolRunner</code>.
+	 */
+	public static void main(String[] args) throws Exception {
+		int res = ToolRunner.run(new Configuration(), new GetTwitterUsers(),
+				args);
+		System.exit(res);
+	}
+
+}

twitter-pull/parse-tweets/main/src/java/fogbow/textgrounder/PreprocessTweetsJSON.java

+package fogbow.textgrounder;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>
+ * command-line arguments:
+ * </p>
+ * 
+ * <ul>
+ * <li>[input-path] input path</li>
+ * <li>[output-path] output path</li>
+ * <li>[num-reducers] number of reducers</li>
+ * </ul>
+ * 
+ * 
+ * Taken and adapted from edu.umd.cloud9.demo.CloudNineWordCount
+ * 
+ * @author Andy Luong
+ */
+public class PreprocessTweetsJSON extends Configured implements Tool {
+	private static final Logger sLogger = Logger
+			.getLogger(GetTwitterUsers.class);
+
+	// static int count = 0;
+
+	private static class PreprocessTweetsJSONMapper extends
+			Mapper<LongWritable, Text, Text, Text> {
+
+		Text user = new Text();
+		Text data = new Text();
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+				throws IOException, InterruptedException {
+
+			try {
+				Status status = Status.fromJson(value.toString());
+				if (status.hasGeo()) {
+					user.set(status.getId());
+					data.set(status.getUserId() + "\t" + status.getCreatedAt()
+							+ "\t" + status.getCoord() + "\t" + status.getLat()
+							+ "\t" + status.getLon() + "\t" + status.getText());
+
+					context.write(user, data);
+				}
+
+			} catch (Exception e) {
+				System.err.println("Bad JSON?");
+				System.err.println(value.toString());
+			}
+
+			/*
+			 * count++; if (count % 100000 == 0) System.out.println(count);
+			 */
+		}
+	}
+
+	private static class PreprocessTweetsJSONReducer extends
+			Reducer<Text, Text, Text, NullWritable> {
+		@Override
+		public void reduce(Text key, Iterable<Text> values, Context context)
+				throws IOException, InterruptedException {
+
+			for (Text val : values)
+				context.write(val, NullWritable.get());
+		}
+	}
+
+	/**
+	 * Creates an instance of this tool.
+	 */
+	public PreprocessTweetsJSON() {
+	}
+
+	private static int printUsage() {
+		System.out.println("usage: [input-path] [output-path] [num-reducers]");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+
+	/**
+	 * Runs this tool.
+	 */
+	public int run(String[] args) throws Exception {
+		if (args.length != 3) {
+			printUsage();
+			return -1;
+		}
+
+		String inputPath = args[0];
+		String outputPath = args[1];
+		int reduceTasks = Integer.parseInt(args[2]);
+
+		sLogger.info("Tool: PreprocessTweetsJSON");
+		sLogger.info(" - input path: " + inputPath);
+		sLogger.info(" - output path: " + outputPath);
+		sLogger.info(" - number of reducers: " + reduceTasks);
+
+		Configuration conf = new Configuration();
+		Job job = new Job(conf, "PreprocessTweetsJSON");
+		job.setJarByClass(PreprocessTweetsJSON.class);
+
+		job.setNumReduceTasks(reduceTasks);
+
+		FileInputFormat.setInputPaths(job, new Path(inputPath));
+		FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+		job.setOutputKeyClass(Text.class);
+		job.setOutputValueClass(NullWritable.class);
+		job.setMapOutputValueClass(Text.class);
+
+		job.setMapperClass(PreprocessTweetsJSONMapper.class);
+		job.setReducerClass(PreprocessTweetsJSONReducer.class);
+
+		// Delete the output directory if it exists already
+		Path outputDir = new Path(outputPath);
+		FileSystem.get(conf).delete(outputDir, true);
+
+		long startTime = System.currentTimeMillis();
+		job.waitForCompletion(true);
+		sLogger.info("Job Finished in "
+				+ (System.currentTimeMillis() - startTime) / 1000.0
+				+ " seconds");
+
+		return 0;
+	}
+
+	/**
+	 * Dispatches command-line arguments to the tool via the
+	 * <code>ToolRunner</code>.
+	 */
+	public static void main(String[] args) throws Exception {
+		int res = ToolRunner.run(new Configuration(),
+				new PreprocessTweetsJSON(), args);
+		System.exit(res);
+	}
+
+}

twitter-pull/parse-tweets/main/src/java/fogbow/textgrounder/Status.java

+package fogbow.textgrounder;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import java.text.SimpleDateFormat;
+import java.text.ParseException;
+
+/**
+ * Object representing a status.
+ */
+public class Status {
+  private static final JsonParser parser = new JsonParser();
+  private static final SimpleDateFormat twitterFormat = new SimpleDateFormat("EEE MMM dd HH:mm:ss +0000 yyyy");
+  private static final SimpleDateFormat isoFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
+
+  private String id;
+  private String userId;
+  private String screenname;
+  private String createdAt;
+  private String lat;
+  private String lon;
+  private String text;
+  private int httpStatusCode;
+  private JsonObject jsonObject;
+  private String jsonString;
+  private boolean geoBool;
+
+  protected Status() {}
+
+  public boolean hasGeo() {return geoBool; }
+  public String getId() { return id; }
+  public String getUserId() { return "USER_" + userId; }
+  public String getText() { return text; }
+  public String getLat() { return lat; }
+  public String getLon() { return lon; }
+  public String getCoord() { return "??:" + lat + "," + lon; }
+  public String getScreenname() { return screenname; }
+  public int getHttpStatusCode() { return httpStatusCode; }
+  public JsonObject getJsonObject() { return jsonObject; }
+  public String getJsonString() { return jsonString; }
+  public String getCreatedAt() throws ParseException {
+    return isoFormat.format(twitterFormat.parse(createdAt));
+  }
+
+  public static Status fromJson(String json) throws Exception {
+	Preconditions.checkNotNull(json);
+
+	JsonObject obj = (JsonObject) parser.parse(json);
+	JsonObject user = obj.get("user").getAsJsonObject();
+
+	Status status = new Status();
+	status.text = obj.get("text").getAsString();
+	status.id = obj.get("id_str").getAsString();
+	status.userId = user.get("id_str").getAsString();
+	status.screenname = user.get("screen_name").getAsString();
+	status.createdAt = obj.get("created_at").getAsString();
+	status.geoBool = false;
+	
+	double lat, lon;
+	lat = lon = 0; 
+	
+	if (!obj.get("geo").isJsonNull()) {
+		JsonObject geo = obj.get("geo").getAsJsonObject();
+        JsonArray coords = geo.get("coordinates").getAsJsonArray();
+        
+		if(geo.get("type").getAsString().equals("Point")) {
+			lat = coords.get(0).getAsDouble();
+			lon = coords.get(1).getAsDouble();
+			status.geoBool = true;
+		}
+		else if(geo.get("type").getAsString().equals("Polygon")) {
+			
+			JsonArray coordP = coords.get(0).getAsJsonArray();
+			
+			//Must be a quadralateral
+			if (coordP.size() == 4) {
+				
+				JsonArray pt1 = (JsonArray) coordP.get(0);
+				//JsonArray pt2 = (JsonArray) coordP.get(1);
+				JsonArray pt3 = (JsonArray) coordP.get(2);
+				//JsonArray pt4 = (JsonArray) coordP.get(3);
+				
+				double diag = getDistance(pt1, pt3);
+				
+				//Small Area
+				if( diag < .5) {
+				
+					for(int i = 0; i < coordP.size(); i++) {
+						JsonArray coord = (JsonArray)coordP.get(i);
+						lat += coord.get(0).getAsDouble();
+						lon += coord.get(1).getAsDouble();
+					}
+					lat = lat / coordP.size();
+					lon = lon / coordP.size();
+					
+					status.geoBool = true;
+				}
+			}
+		}
+		else
+			System.out.println("Not Point or Polygon");
+		
+		status.lat = ""+lat;
+		status.lon = ""+lon;
+	}
+	else if(!obj.get("place").isJsonNull()) {
+		JsonObject boundingbox = obj.get("place").getAsJsonObject().get("bounding_box").getAsJsonObject();
+		JsonArray coords = boundingbox.get("coordinates").getAsJsonArray();
+		
+
+		
+		if(boundingbox.get("type").getAsString().equals("Point")) {
+			lat = coords.get(0).getAsDouble();
+			lon = coords.get(1).getAsDouble();
+			status.geoBool = true;
+		}
+		else if(boundingbox.get("type").getAsString().equals("Polygon")) {
+		
+			JsonArray coordP = coords.get(0).getAsJsonArray();
+			
+			//Must be a quadralateral
+			if (coordP.size() == 4) {
+				
+				JsonArray pt1 = (JsonArray) coordP.get(0);
+				//JsonArray pt2 = (JsonArray) coordP.get(1);
+				JsonArray pt3 = (JsonArray) coordP.get(2);
+				//JsonArray pt4 = (JsonArray) coordP.get(3);
+				
+				double diag = getDistance(pt1, pt3);
+				
+				//Small Area
+				if( diag < .5) {
+				
+					for(int i = 0; i < coordP.size(); i++) {
+						JsonArray coord = (JsonArray)coordP.get(i);
+						lat += coord.get(0).getAsDouble();
+						lon += coord.get(1).getAsDouble();
+					}
+					lat = lat / coordP.size();
+					lon = lon / coordP.size();
+					
+					status.geoBool = true;
+				}
+			}
+		}
+		else
+			System.out.println("Not Point or Polygon");
+		
+		status.lat = ""+lat;
+		status.lon = ""+lon;
+	}
+
+	// TODO: We need to parse out the other fields.
+	status.jsonObject = obj;
+	status.jsonString = json;
+
+	return status;
+  }
+  
+  public static double getDistance(JsonArray pt1, JsonArray pt2) {
+  	double d = Math.sqrt(
+  			Math.pow(pt1.get(0).getAsDouble() - pt2.get(0).getAsDouble(), 2) + 
+  			Math.pow(pt1.get(1).getAsDouble() - pt2.get(1).getAsDouble(), 2)
+  			);
+  			
+  	return d;
+  }
+  
+}

twitter-pull/parse-tweets/main/src/scala/fogbow/textgrounder/Preprocess.scala

+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fogbow.textgrounder;
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, NullWritable, Text}
+import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.util.GenericOptionsParser
+import scala.collection.JavaConversions._
+
/**
 * Mapper that parses one JSON tweet per line and, for geolocated tweets,
 * emits a tab-separated record of (user id, created-at, coord tag, lat,
 * lon, text). Unparseable lines are reported on stderr and skipped.
 */
class JsonTweetMapper extends Mapper[Object, Text, Text, NullWritable] {
  private val record = new Text

  override def map(key: Object, value: Text,
      context: Mapper[Object, Text, Text, NullWritable]#Context) {
    try {
      val status = Status.fromJson(value.toString)
      if (status.hasGeo()) {
        val fields = List(
          status.getUserId(),
          status.getCreatedAt(),
          status.getCoord(),
          status.getLat(),
          status.getLon(),
          status.getText())
        record.set(fields.mkString("\t"))
        context.write(record, NullWritable.get())
      }
    } catch {
      case e: Exception =>
        System.err.println("Bad JSON?")
        System.err.println(value.toString)
    }
  }
}
+
/** Reducer that passes each distinct key straight through, dropping values. */
class IdentityReducer extends Reducer[Text, NullWritable, Text, NullWritable] {

  override def reduce(key: Text, values: java.lang.Iterable[NullWritable],
      context: Reducer[Text, NullWritable, Text, NullWritable]#Context) {
    context.write(key, NullWritable.get())
  }
}
+
/**
 * Driver: runs JsonTweetMapper over <inputPath>, writing tab-separated
 * geolocated tweet records to <outputPath> (which is deleted first).
 */
object PreprocessJsonTweets {
  def main (args: Array[String]) {
    if (args.length != 3) {
      println("Usage: <inputPath> <outputPath> <numReduceTasks>")
      System.exit(1)
    }

    val conf = new Configuration()
    // Give the job a real name so it is identifiable in the job tracker
    // (it was previously submitted with an empty name).
    val job = new Job(conf, "PreprocessJsonTweets")

    val outputPath = new Path(args(1))
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, outputPath)
    // Delete any stale output so the job can rerun cleanly.
    FileSystem.get(conf).delete(outputPath, true)
    job.setNumReduceTasks(args(2).toInt)

    job.setJarByClass(classOf[JsonTweetMapper])
    job.setMapperClass(classOf[JsonTweetMapper])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[NullWritable])
    // The default (identity) reducer already dedupes keys; an explicit
    // IdentityReducer is unnecessary.
    //job.setReducerClass(classOf[IdentityReducer])

    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.