Ben Wing avatar Ben Wing committed 731679a Merge

Automatic merge

Comments (0)

Files changed (7)

Add a comment to this file

parse-tweets/lib/gson-1.6.jar

Binary file added.

Add a comment to this file

parse-tweets/lib/guava-r08.jar

Binary file added.

parse-tweets/main/src/java/fogbow/textgrounder/GetTwitterUsers.java

+package fogbow.textgrounder;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * <p>
+ * outputs a sorted list of all the words that have come after it in the
+ * training corpus, and takes the following command-line arguments:
+ * </p>
+ * 
+ * <ul>
+ * <li>[input-path] input path</li>
+ * <li>[output-path] output path</li>
+ * <li>[num-reducers] number of reducers</li>
+ * </ul>
+ * 
+ * 
+ * Taken and adapted from edu.umd.cloud9.demo.CloudNineWordCount
+ * 
+ * @author Andy Luong & Erik Skiles
+ */
+public class GetTwitterUsers extends Configured implements Tool {
+	private static final Logger sLogger = Logger
+			.getLogger(GetTwitterUsers.class);
+
+	private static class GetTwitterUsersMapper extends
+			Mapper<LongWritable, Text, Text, NullWritable> {
+
+		Text out = new Text();
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+				throws IOException, InterruptedException {
+
+			try {
+				JSONObject json = new JSONObject(value.toString());
+				JSONObject user = json.getJSONObject("user");
+				String id = user.getString("id_str");
+				out.set(id);
+				context.write(out, NullWritable.get());
+
+			} catch (JSONException e) {
+				// TODO Auto-generated catch block
+				e.printStackTrace();
+			}
+		}
+	}
+
+	private static class GetTwitterUsersReducer extends
+			Reducer<Text, NullWritable, Text, NullWritable> {
+		@Override
+		public void reduce(Text key, Iterable<NullWritable> values,
+				Context context) throws IOException, InterruptedException {
+			context.write(key, NullWritable.get());
+		}
+	}
+
+	/**
+	 * Creates an instance of this tool.
+	 */
+	public GetTwitterUsers() {
+	}
+
+	private static int printUsage() {
+		System.out.println("usage: [input-path] [output-path] [num-reducers]");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+
+	/**
+	 * Runs this tool.
+	 */
+	public int run(String[] args) throws Exception {
+		if (args.length != 3) {
+			printUsage();
+			return -1;
+		}
+
+		String inputPath = args[0];
+		String outputPath = args[1];
+		int reduceTasks = Integer.parseInt(args[2]);
+
+		sLogger.info("Tool: GetTwitterUsers");
+		sLogger.info(" - input path: " + inputPath);
+		sLogger.info(" - output path: " + outputPath);
+		sLogger.info(" - number of reducers: " + reduceTasks);
+
+		Configuration conf = new Configuration();
+		Job job = new Job(conf, "GetTwitterUsers");
+		job.setJarByClass(GetTwitterUsers.class);
+
+		job.setNumReduceTasks(reduceTasks);
+
+		FileInputFormat.setInputPaths(job, new Path(inputPath));
+		FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+		job.setOutputKeyClass(Text.class);
+		job.setOutputValueClass(NullWritable.class);
+		job.setMapOutputValueClass(NullWritable.class);
+
+		job.setMapperClass(GetTwitterUsersMapper.class);
+		job.setCombinerClass(GetTwitterUsersReducer.class);
+		job.setReducerClass(GetTwitterUsersReducer.class);
+
+		// Delete the output directory if it exists already
+		Path outputDir = new Path(outputPath);
+		FileSystem.get(conf).delete(outputDir, true);
+
+		long startTime = System.currentTimeMillis();
+		job.waitForCompletion(true);
+		sLogger.info("Job Finished in "
+				+ (System.currentTimeMillis() - startTime) / 1000.0
+				+ " seconds");
+
+		return 0;
+	}
+
+	/**
+	 * Dispatches command-line arguments to the tool via the
+	 * <code>ToolRunner</code>.
+	 */
+	public static void main(String[] args) throws Exception {
+		int res = ToolRunner.run(new Configuration(), new GetTwitterUsers(),
+				args);
+		System.exit(res);
+	}
+
+}

parse-tweets/main/src/java/twools/PreprocessTweetsJSON.java

+package twools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>
+ * command-line arguments:
+ * </p>
+ * 
+ * <ul>
+ * <li>[input-path] input path</li>
+ * <li>[output-path] output path</li>
+ * <li>[num-reducers] number of reducers</li>
+ * </ul>
+ * 
+ * 
+ * Taken and adapted from edu.umd.cloud9.demo.CloudNineWordCount
+ * 
+ * @author Andy Luong
+ */
+public class PreprocessTweetsJSON extends Configured implements Tool {
+	private static final Logger sLogger = Logger
+			.getLogger(PreprocessTweetsJSON.class);
+
+	private static class PreprocessTweetsJSONMapper extends
+			Mapper<LongWritable, Text, Text, Text> {
+
+		Text user = new Text();
+		Text data = new Text();
+
+		private static boolean twokenizeFlag;
+		private static float diagSize;
+
+		@Override
+		public void setup(Context context) {
+			twokenizeFlag = context.getConfiguration().getBoolean("twokenize",
+					false);
+			diagSize = context.getConfiguration().getFloat("diagSize",
+					(float) 0.5);
+		}
+
+		@Override
+		public void map(LongWritable key, Text value, Context context)
+				throws IOException, InterruptedException {
+
+			try {
+				Status status = Status.fromJson(value.toString(), diagSize);
+				if (status.hasGeo()) {
+
+					String text = status.getText();
+
+					if (twokenizeFlag) {
+						text = Twokenize.tokenize(status.getText()).mkString(
+								" ");
+
+					}
+
+					user.set(status.getUserId());
+					data.set(status.getCreatedAt() + "\t" + status.getCoord()
+							+ "\t" + text + "\t" + status.getUserMentions());
+
+					context.write(user, data);
+				}
+
+			} catch (Exception e) {
+				System.err
+						.println("Bad JSON Input. Exception: " + e.toString());
+				System.err.println(value.toString());
+			}
+		}
+	}
+
+	// Reducer will
+	private static class PreprocessTweetsJSONReducer extends
+			Reducer<Text, Text, Text, Text> {
+		@Override
+		public void reduce(Text key, Iterable<Text> values, Context context)
+				throws IOException, InterruptedException {
+
+			for (Text val : values)
+				context.write(key, val);
+		}
+	}
+
+	/**
+	 * Creates an instance of this tool.
+	 */
+	public PreprocessTweetsJSON() {
+	}
+
+	private static int printUsage() {
+		System.out
+				.println("usage: [input-path] [output-path] [twokenize text (true or false)] [Bounding-box diagonal length] [num-reducers]");
+		ToolRunner.printGenericCommandUsage(System.out);
+		return -1;
+	}
+
+	/**
+	 * Runs this tool.
+	 */
+	@Override
+	public int run(String[] args) throws Exception {
+		if (args.length != 5) {
+			printUsage();
+			return -1;
+		}
+
+		String inputPath = args[0];
+		String outputPath = args[1];
+		String twokenizeFlag = args[2];
+		float diagSize = Float.parseFloat(args[3]);
+		int reduceTasks = Integer.parseInt(args[4]);
+
+		Configuration conf = new Configuration();
+		conf.setBoolean("twokenize", Boolean.valueOf(twokenizeFlag));
+		conf.setFloat("diagSize", diagSize);
+
+		Job job = new Job(conf, "PreprocessTweetsJSON");
+		job.setJarByClass(PreprocessTweetsJSON.class);
+
+		job.setNumReduceTasks(reduceTasks);
+
+		FileInputFormat.setInputPaths(job, new Path(inputPath));
+		FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+		job.setOutputKeyClass(Text.class);
+		job.setOutputValueClass(Text.class);
+
+		job.setMapperClass(PreprocessTweetsJSONMapper.class);
+		job.setReducerClass(PreprocessTweetsJSONReducer.class);
+
+		// Delete the output directory if it exists already
+		Path outputDir = new Path(outputPath);
+		FileSystem.get(conf).delete(outputDir, true);
+
+		sLogger.info("Tool: PreprocessTweetsJSON");
+		sLogger.info(" - input path: " + inputPath);
+		sLogger.info(" - output path: " + outputPath);
+		sLogger.info(" - twokenize: " + twokenizeFlag);
+		sLogger.info(" - number of reducers: " + reduceTasks);
+
+		long startTime = System.currentTimeMillis();
+		job.waitForCompletion(true);
+		sLogger.info("Job Finished in "
+				+ (System.currentTimeMillis() - startTime) / 1000.0
+				+ " seconds");
+
+		return 0;
+	}
+
+	/**
+	 * Dispatches command-line arguments to the tool via the
+	 * <code>ToolRunner</code>.
+	 */
+	public static void main(String[] args) throws Exception {
+		int res = ToolRunner.run(new Configuration(),
+				new PreprocessTweetsJSON(), args);
+		System.exit(res);
+	}
+
+}

parse-tweets/main/src/java/twools/Status.java

+package twools;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Singleton Object representing a status.
+ */
+public class Status {
+	private static final JsonParser parser = new JsonParser();
+	private static final SimpleDateFormat twitterFormat = new SimpleDateFormat(
+			"EEE MMM dd HH:mm:ss +0000 yyyy");
+	private static final SimpleDateFormat isoFormat = new SimpleDateFormat(
+			"yyyy-MM-dd'T'HH:mm'Z'");
+
+	private static Status status = new Status();
+
+	private int httpStatusCode;
+	private String id;
+	private String userId;
+	private String screenname;
+	private String createdAt;
+	private String text;
+	private final ArrayList<String> mentionUsers = new ArrayList<String>();;
+	private double lat;
+	private double lon;
+	private boolean geoBool;
+	private JsonObject obj;
+
+	protected Status() {
+
+	}
+
+	public boolean hasGeo() {
+		return status.geoBool;
+	}
+
+	public String getId() {
+		return status.id;
+	}
+
+	public String getUserId() {
+		return "USER_" + status.userId;
+	}
+
+	public String getText() {
+		return status.text;
+	}
+
+	public String getLat() {
+		return Double.toString(status.lat);
+	}
+
+	public String getLon() {
+		return Double.toString(status.lon);
+	}
+
+	public String getCoord() {
+		return status.lat + "," + status.lon;
+	}
+
+	public String getScreenname() {
+		return status.screenname;
+	}
+
+	public int getHttpStatusCode() {
+		return status.httpStatusCode;
+	}
+
+	public String getUserMentions() {
+		if (mentionUsers.isEmpty()) {
+			return "NONE";
+		} else {
+			String users = "";
+			for (String user : mentionUsers) {
+				users += "," + user;
+			}
+			return users.substring(1);
+		}
+	}
+
+	public String getCreatedAt() throws ParseException {
+		return isoFormat.format(twitterFormat.parse(createdAt));
+	}
+
+	public static Status fromJson(String json, float diagSize) throws Exception {
+		Preconditions.checkNotNull(json);
+
+		JsonObject obj = (JsonObject) parser.parse(json);
+		if (!obj.isJsonNull()) {
+			JsonObject user = obj.get("user").getAsJsonObject();
+			status.obj = obj;
+			status.text = obj.get("text").getAsString()
+					.replaceAll("\\r\\n|\\r|\\n", " ");// removing new lines
+			status.id = obj.get("id_str").getAsString();
+			status.userId = user.get("id_str").getAsString();
+			status.screenname = user.get("screen_name").getAsString();
+			status.createdAt = obj.get("created_at").getAsString();
+			status.getGeolocation(diagSize);
+			status.getMentions();
+
+			return status;
+		} else {
+			return null;
+		}
+	}
+
+	// Gets the mentions from a user
+	private void getMentions() {
+		mentionUsers.clear();
+		if (!obj.get("entities").isJsonNull()) {
+			if (obj.get("entities").getAsJsonObject().get("user_mentions") != null) {
+				JsonArray users = obj.get("entities").getAsJsonObject()
+						.get("user_mentions").getAsJsonArray();
+
+				for (JsonElement user : users) {
+					mentionUsers.add(user.getAsJsonObject().get("id_str")
+							.getAsString());
+				}
+			}
+
+		}
+
+	}
+
+	// Given two JsonArray points, get the distance between them
+	private double getDistance(JsonArray pt1, JsonArray pt2) {
+		double d = Math.sqrt(Math.pow(pt1.get(0).getAsDouble()
+				- pt2.get(0).getAsDouble(), 2)
+				+ Math.pow(pt1.get(1).getAsDouble() - pt2.get(1).getAsDouble(),
+						2));
+		return d;
+	}
+
+	// Validate that lat is between -90 and 90
+	// and long is between -180 and 180
+	private boolean validateCoords(double lat, double lon) {
+		if (Math.abs(lat) <= 90 && Math.abs(lon) <= 180)
+			return true;
+		return false;
+	}
+
+	// Get the geolocation of a tweet
+	// diagSize corresponds to if the geo coordiantes are in a boundng box, how
+	// small the box is...
+	private void getGeolocation(float diagSize) {
+		double lat, lon;
+		lat = lon = 0;
+		JsonObject geoTag = null;
+		JsonArray coords = null;
+
+		// Geo Location
+		if (!obj.get("geo").isJsonNull()) {
+			geoTag = obj.get("geo").getAsJsonObject();
+			coords = geoTag.get("coordinates").getAsJsonArray();
+		} else if (!obj.get("place").isJsonNull()) {
+			geoTag = obj.get("place").getAsJsonObject().get("bounding_box")
+					.getAsJsonObject();
+			coords = geoTag.get("coordinates").getAsJsonArray();
+		} else
+			return;
+
+		// Point or Polygon
+		if (geoTag.get("type").getAsString().equals("Point")) {
+			lat = coords.get(0).getAsDouble();
+			lon = coords.get(1).getAsDouble();
+		} else if (geoTag.get("type").getAsString().equals("Polygon")) {
+			JsonArray coordPoints = coords.get(0).getAsJsonArray();
+
+			// Must be a quadralateral
+			if (coordPoints.size() == 4) {
+
+				JsonArray pt1 = (JsonArray) coordPoints.get(0);
+				JsonArray pt3 = (JsonArray) coordPoints.get(2);
+
+				// Small Area
+				if (getDistance(pt1, pt3) < diagSize) {
+
+					for (int i = 0; i < coordPoints.size(); i++) {
+						JsonArray coord = (JsonArray) coordPoints.get(i);
+						lat += coord.get(1).getAsDouble();
+						lon += coord.get(0).getAsDouble();
+					}
+
+					lat = lat / coordPoints.size();
+					lon = lon / coordPoints.size();
+
+				} else {
+					status.geoBool = false;
+					return;
+				}
+			}
+
+		} else {
+			System.err.println(geoTag.get("type").getAsString()
+					+ " is Not Point or Polygon");
+			return;
+		}
+
+		if (validateCoords(lat, lon)) {
+			// Update the lat and lon
+			status.lat = lat;
+			status.lon = lon;
+			status.geoBool = true;
+		} else {
+			System.err.println("Lat " + lat + " and Long " + lon
+					+ " are not in the proper range.");
+			status.geoBool = false;
+		}
+	}
+}

parse-tweets/main/src/scala/fogbow/textgrounder/Preprocess.scala

+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package fogbow.textgrounder;
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, NullWritable, Text}
+import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.util.GenericOptionsParser
+import scala.collection.JavaConversions._
+
/**
 * Mapper that keeps only geotagged tweets, emitting one tab-separated line
 * per tweet (user id, creation time, coordinate pair, lat, lon, text).
 * Records that fail to parse are reported to stderr and skipped.
 */
class JsonTweetMapper extends Mapper[Object, Text, Text, NullWritable] {
  // Reused output buffer to avoid allocating a Text per record.
  val data = new Text

  override def map(key: Object, value: Text,
                   context: Mapper[Object, Text, Text, NullWritable]#Context) {
    try {
      val status = Status.fromJson(value.toString)
      if (status.hasGeo()) {
        val fields = Seq(
          status.getUserId(),
          status.getCreatedAt(),
          status.getCoord(),
          status.getLat(),
          status.getLon(),
          status.getText())
        data.set(fields.mkString("\t"))
        context.write(data, NullWritable.get())
      }
    } catch {
      case e: Exception =>
        // Best-effort: dump the offending line and keep going.
        System.err.println("Bad JSON?")
        System.err.println(value.toString)
    }
  }
}
+
/** Identity reducer: writes each distinct key once, discarding the values. */
class IdentityReducer extends Reducer[Text, NullWritable, Text, NullWritable] {

  override def reduce(key: Text, values: java.lang.Iterable[NullWritable],
                      context: Reducer[Text, NullWritable, Text, NullWritable]#Context) {
    context.write(key, NullWritable.get())
  }
}
+
/**
 * Driver: configures and runs the tweet-preprocessing job.
 * Arguments: input path, output path, number of reduce tasks.
 * Exits 0 on job success, 1 on failure or bad usage.
 */
object PreprocessJsonTweets {
  def main (args: Array[String]) {
    if (args.length != 3) {
      println("Usage: <inputPath> <outputPath> <numReduceTasks>")
      System.exit(1)
    }

    val conf = new Configuration()
    // Name the job so it is identifiable in the job tracker (was "").
    val job = new Job(conf, "PreprocessJsonTweets")

    val outputPath = new Path(args(1))
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, outputPath)
    // Clobber any previous output so reruns do not fail.
    FileSystem.get(conf).delete(outputPath, true)
    job.setNumReduceTasks(args(2).toInt)

    job.setJarByClass(classOf[JsonTweetMapper])
    job.setMapperClass(classOf[JsonTweetMapper])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[NullWritable])
    // Left disabled: with reduce tasks > 0 the framework's default reducer
    // is already an identity pass-through.
    //job.setReducerClass(classOf[IdentityReducer])

    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}

twitter-graphs/twitterRelationGraphs.py

         return False
 
     else:
+	api.SetCache(None)
         print "You have been successfully authenticated."
         return True
 
         tweet_s = (idString + "\t" +
                    str(tweet.GetCreatedAtInSeconds()) + "\t" +
                    geo + "\t" +
-                   tweet.GetText().lower())
+                   tweet.GetText().lower().replace("\r","").replace("\n",""))
 
         outUserTweets.write(tweet_s + "\n")
 
                 tweet_s = (otherID + "\t" +
                            str(tweet.GetCreatedAtInSeconds()) + "\t" +
                            geo + "\t" +
-                           tweet.GetText().lower())
+                           tweet.GetText().lower().replace("\r","").replace("\n",""))
 
                 outTweet.write(tweet_s + "\n")
 
 
         if not user in processedUsers and not len(user) == 0:
             checkApiLimit()
+            #Delay between calls
+            time.sleep(apiCallDelaySeconds)
             processUser(user)
 
     fin.close()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.