Quinton Anderson / tfidf-topology

Commits

Quinton Anderson committed a579d0a

planning of topology complete, building out functions started

  • Parent commit: 5d5bda1
  • Branch: master

Files changed (5)

File pom.xml

     		<artifactId>tika-parsers</artifactId>
     		<version>1.2</version>
   		</dependency>
+  		<dependency>
+			<groupId>net.htmlparser</groupId>
+			<artifactId>jericho-html</artifactId>
+			<version>2.3</version>
+		</dependency>
+
     </dependencies>
 
 	<build>

File src/main/java/storm/cookbook/tfidf/DocumentFetchFunction.java

+package storm.cookbook.tfidf;
+
+import java.net.URL;
+
+import au.id.jericho.lib.html.Source;
+import backtype.storm.tuple.Values;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.tuple.TridentTuple;
+import twitter4j.URLEntity;
+
+/**
+ * Trident function that expands a tweet into documents: currently the tweet
+ * text itself, and eventually the rendered text of each linked page.
+ */
+public class DocumentFetchFunction extends BaseFunction {
+
+	@Override
+	public void execute(TridentTuple tuple, TridentCollector collector) {
+		URLEntity[] urls = (URLEntity[]) tuple.getValueByField(TfidfTopologyFields.TWEET_URLS);
+		for(int i = 0; i < urls.length; i++){
+			//TODO: use another lib here
+			/*Source source = new Source(new URL(urls[i].getExpandedURL()));
+			String renderedText = source.getRenderer().toString();
+			collector.emit(new Values(renderedText));*/
+		}
+		//The tweet text is always a document itself
+		collector.emit(new Values(tuple.getStringByField(TfidfTopologyFields.TWEET_TEXT)));
+
+	}
+
+}
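
The commented-out block above hints at how the jericho-html dependency added in pom.xml would turn each linked page into a document. A minimal completion of that loop, using only the Source/Renderer calls already shown in the comment and guarding the network fetch so one bad link does not fail the whole tuple (a sketch, not the commit's code):

    for (URLEntity url : urls) {
        try {
            Source source = new Source(new URL(url.getExpandedURL()));
            collector.emit(new Values(source.getRenderer().toString()));
        } catch (Exception e) {
            //skip unreachable or unparseable pages; the tweet text is still emitted below
        }
    }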

File src/main/java/storm/cookbook/tfidf/TermTopology.java

 package storm.cookbook.tfidf;
 
+import storm.trident.TridentState;
+import storm.trident.TridentTopology;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.operation.builtin.Count;
+import storm.trident.operation.builtin.FilterNull;
+import storm.trident.operation.builtin.MapGet;
+import storm.trident.operation.builtin.Sum;
+import storm.trident.testing.FixedBatchSpout;
+import storm.trident.testing.MemoryMapState;
+import storm.trident.tuple.TridentTuple;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
+import backtype.storm.LocalDRPC;
 import backtype.storm.StormSubmitter;
 import backtype.storm.generated.AlreadyAliveException;
 import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 
 public class TermTopology {
 
-	private TopologyBuilder builder = new TopologyBuilder();
-	private Config conf = new Config();
-	private LocalCluster cluster;
-
-	public TermTopology() {
-		
-	}
-
-	public TopologyBuilder getBuilder() {
-		return builder;
-	}
-
-	public LocalCluster getLocalCluster() {
-		return cluster;
-	}
-
-	public Config getConf() {
-		return conf;
-	}
-
-	public void runLocal(int runTime) {
-		conf.setDebug(true);
-		cluster = new LocalCluster();
-		cluster.submitTopology("test", conf, builder.createTopology());
-		if (runTime > 0) {
-			Utils.sleep(runTime);
-			shutDownLocal();
-		}
-	}
-
-	public void shutDownLocal() {
-		if (cluster != null) {
-			cluster.killTopology("test");
-			cluster.shutdown();
-		}
-	}
-
-	public void runCluster(String name, String redisHost, String cassandraHost)
-			throws AlreadyAliveException, InvalidTopologyException {
-		conf.setNumWorkers(20);
-		StormSubmitter.submitTopology(name, conf, builder.createTopology());
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		TermTopology topology = new TermTopology();
-
-		if (args != null && args.length > 1) {
-			topology.runCluster(args[0], args[1], args[2]);
-		} else {
-			if (args != null && args.length == 1)
-				System.out
-						.println("Running in local mode, redis ip missing for cluster run");
-			topology.runLocal(10000);
-		}
-
-	}
+	public static class Split extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            String sentence = tuple.getString(0);
+            for(String word: sentence.split(" ")) {
+                collector.emit(new Values(word));                
+            }
+        }
+    }
+    
+    public static StormTopology buildTopology(LocalDRPC drpc) {
+        //TODO: lookup the track terms for the spout or get them from the command line
+        TwitterTrackSpout twitterSpout = new TwitterTrackSpout(10000, new String[]{"google"}, 100);
+        TridentTopology topology = new TridentTopology();    
+        /*TridentState termCounts =
+                topology.newStream("tweetSpout", twitterSpout)
+                  .parallelismHint(16)
+                  .each(new Fields("status","links"), new FetchDocuments(), new Fields("document"))
+                  .each(new Fields("document"), new Tokenize(), new Fields("word"))
+                  .each(new Fields("word"), new StopFilter(), new Fields("term"))
+                  .each(new Fields("term"), new StemFunction(), new Fields("stem"))
+                  .aggregate(new Fields("stem","document"), new TFAggregate(), new Fields("tf"))
+                  .aggregate(new Fields("stem","document"), new DFAggregate(), new Fields("df"))
+                  .aggregate(new Fields("document"), new DAggregate(), new Fields("d"))
+                  .aggregate(new Fields("d","df"), new IDFAggregate(), new Fields("idf"))
+                  .persistentAggregate(new MemoryMapState.Factory(), new Fields("tf","idf"),
+                        new TFIDFAggregate(), new Fields("tfidf"))
+                  .parallelismHint(16);*/
+                
+        /*topology.newDRPCStream("words", drpc)
+                .each(new Fields("args"), new Split(), new Fields("word"))
+                .groupBy(new Fields("word"))
+                .stateQuery(termCounts, new Fields("word"), new MapGet(), new Fields("count"))
+                .each(new Fields("count"), new FilterNull())
+                .aggregate(new Fields("count"), new Sum(), new Fields("sum"))
+                ;*/
+        return topology.build();
+    }
+    
+    public static void main(String[] args) throws Exception {
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
+        if(args.length==0) {
+            LocalDRPC drpc = new LocalDRPC();
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
+            for(int i=0; i<100; i++) {
+                //NOTE: this query only returns once the "words" DRPC stream above is uncommented
+                System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
+                Thread.sleep(1000);
+            }
+        } else {
+            conf.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));        
+        }
+    }
 
 }
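
For reference, the aggregate chain planned in buildTopology composes the standard TF-IDF score: tf counts a term's occurrences in a document, df counts the documents containing the term, and d counts all documents seen. A sketch of the arithmetic the final TFIDFAggregate stage would reduce to (the helper method is hypothetical):

    //tfidf(term, document) = tf * idf, where idf = log(d / df)
    public static double tfidf(double tf, double d, double df) {
        return tf * Math.log(d / df);
    }

    //e.g. a term occurring 3 times in one of 100 documents, appearing in 4 of them:
    //tfidf = 3 * log(100 / 4) ≈ 9.66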

File src/main/java/storm/cookbook/tfidf/TfidfTopologyFields.java

+package storm.cookbook.tfidf;
+
+public class TfidfTopologyFields {
+
+	public static final String TWEET_TEXT = "text";
+	public static final String TWEET_ID = "id";
+	public static final String TWEET_URLS = "urls";
+	public static final String TWEET_HAS_TAGS = "hasTags";
+	
+}

File src/main/java/storm/cookbook/tfidf/TwitterTrackSpout.java

 package storm.cookbook.tfidf;
 
-import backtype.storm.Config;
-import twitter4j.TwitterStream;
-import twitter4j.TwitterStreamFactory;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.IBatchSpout;
 import twitter4j.FilterQuery;
 import twitter4j.StallWarning;
 import twitter4j.Status;
 import twitter4j.StatusDeletionNotice;
 import twitter4j.StatusListener;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import twitter4j.URLEntity;
+import backtype.storm.Config;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
 
-public class TwitterTrackSpout extends BaseRichSpout {
+public class TwitterTrackSpout implements IBatchSpout {
 	Logger LOG = LoggerFactory.getLogger(TwitterTrackSpout.class); 
-    SpoutOutputCollector collector;
     LinkedBlockingQueue<Status> queue = null;
     TwitterStream twitterStream;
     long maxQueueDepth;
     String[] trackTerms;
+    long batchSize;
     
-    public TwitterTrackSpout(long maxQueueDepth, String[] trackTerms) {
+    public TwitterTrackSpout(long maxQueueDepth, String[] trackTerms, long batchSize) {
     	this.maxQueueDepth = maxQueueDepth;
     	this.trackTerms = trackTerms;
+    	this.batchSize = batchSize;
     }
     
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    public void open(Map conf, TopologyContext context) {
         queue = new LinkedBlockingQueue<Status>(1000);
-        this.collector = collector;
         StatusListener listener = new StatusListener() {
         	
             @Override
         filter.track(trackTerms);
         twitterStream.filter(filter);
     }
-
-    @Override
-    public void nextTuple() {
-        Status ret = queue.poll();
+    
+    public void emitBatch(long batchId, TridentCollector collector) {
+    	//TODO: do this better
+    	Status ret = queue.poll();
         if(ret==null) {
             Utils.sleep(50);
         } else {
-            collector.emit(new Values(ret));
+            collector.emit(new Values(ret.getId(), ret.getText(), ret.getURLEntities(), ret.getHashtagEntities()));
         }
     }
 
         return ret;
     }    
 
-    @Override
-    public void ack(Object id) {
+    public void fail(long batchId) {
     }
 
-    @Override
-    public void fail(Object id) {
-    }
+	@Override
+	public void ack(long batchId) {
+		//TODO: consider only removing from queue once processed, maybe use kafka instead?
+		
+	}
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("tweet"));
-    }
+	@Override
+	public Fields getOutputFields() {
+		return new Fields(TfidfTopologyFields.TWEET_ID,
+				TfidfTopologyFields.TWEET_TEXT,
+				TfidfTopologyFields.TWEET_URLS,
+				TfidfTopologyFields.TWEET_HAS_TAGS);
+	}
     
 }
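
The TODOs in emitBatch and ack flag the same IBatchSpout concern: Trident may request the same batchId again after a failure, but this spout has already discarded the polled statuses. A common pattern is to buffer each batch until it is acked and re-emit it verbatim on replay; a sketch under that assumption (the pending map is illustrative, not part of this commit, and needs java.util.ArrayList/HashMap/List imports):

    private final Map<Long, List<Values>> pending = new HashMap<Long, List<Values>>();

    public void emitBatch(long batchId, TridentCollector collector) {
        List<Values> batch = pending.get(batchId);
        if (batch == null) {
            //first attempt: drain up to batchSize statuses from the queue
            batch = new ArrayList<Values>();
            for (int i = 0; i < batchSize; i++) {
                Status status = queue.poll();
                if (status == null) break;
                batch.add(new Values(status.getId(), status.getText(),
                        status.getURLEntities(), status.getHashtagEntities()));
            }
            pending.put(batchId, batch);
        }
        //a replayed batchId re-emits exactly the same tuples
        for (Values tuple : batch) {
            collector.emit(tuple);
        }
    }

    public void ack(long batchId) {
        pending.remove(batchId); //fully processed, safe to forget
    }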