Commits

Quinton Anderson committed 4edfe37

Finally got all the integration tests passing properly!

Comments (0)

Files changed (9)

   :java-source-paths ["src/jvm" "test/jvm"]
   :test-paths ["test/clj"]
   :javac-options {:debug "true" :fork "true"}
-  :resources-path "multilang/resources"
+  :resources-path "multilang"
   :aot :all
   :repositories {
                    "twitter4j" "http://twitter4j.org/maven2"
                      [org.jmock/jmock-legacy "2.5.1"]
                      [org.mockito/mockito-all "1.8.4"]
                      [org.easytesting/fest-assert-core "2.0M8"]
+                     [net.sf.opencsv/opencsv "2.3"]
                      [org.testng/testng "6.1.1"]]}}
   
   )

src/jvm/storm/cookbook/tfidf/TermTopology.java

 import storm.cookbook.tfidf.functions.DocumentFetchFunction;
 import storm.cookbook.tfidf.functions.DocumentTokenizer;
 import storm.cookbook.tfidf.functions.OuterJoinReducer;
+import storm.cookbook.tfidf.functions.SplitAndProjectToFields;
 import storm.cookbook.tfidf.functions.TermFilter;
 import storm.cookbook.tfidf.functions.TfidfExpression;
 import storm.cookbook.tfidf.spout.TwitterSpout;
+import storm.cookbook.tfidf.state.TfIdfUpdater;
 import storm.trident.Stream;
 import storm.trident.TridentState;
 import storm.trident.TridentTopology;
 import storm.trident.operation.BaseFunction;
 import storm.trident.operation.TridentCollector;
 import storm.trident.operation.builtin.Count;
+import storm.trident.operation.builtin.FilterNull;
 import storm.trident.operation.builtin.MapGet;
 import storm.trident.spout.ITridentSpout;
 import storm.trident.state.StateFactory;
         }
     }
 	
+	public static class StatisSourceFunction extends BaseFunction {
+        @Override
+        public void execute(TridentTuple tuple, TridentCollector collector) {
+            collector.emit(new Values("twitter"));
+        }
+    }
+	
 	public static Stream getUrlStream(TridentTopology topology, ITridentSpout spout){
 		Stream urlStream = null;
     	if(spout == null){
     		FixedBatchSpout testSpout = new FixedBatchSpout(new Fields("url"), 1, 
-    				new Values("http://t.co/hP5PM6fm"), 
-    				new Values("http://t.co/xSFteG23"));
-    		testSpout.setCycle(true);
+    				new Values("doc01"), 
+    				new Values("doc02"), 
+    				new Values("doc03"), 
+    				new Values("doc04"), 
+    				new Values("doc05"));
+    		testSpout.setCycle(false);
     		urlStream = topology.newStream("spout1", testSpout);
     	} else {
     		urlStream = topology.newStream("spout1", spout);
 	
 	public static void addDQueryStream(TridentState state, TridentTopology topology, LocalDRPC drpc){
 		topology.newDRPCStream("dQuery",drpc)
-		.each(new Fields("args"), new Split(), new Fields("source"))
-		.stateQuery(state, new Fields("source"), new MapGet(),
-				new Fields("d_term", "currentD"));
+					.each(new Fields("args"), new Split(), new Fields("source"))
+					.stateQuery(state, new Fields("source"), new MapGet(),new Fields("d"))
+					.each(new Fields("d"), new FilterNull())
+					.project(new Fields("source","d"));
 	}
 	
 	private static void addDFQueryStream(TridentState dfState,
 		topology.newDRPCStream("dfQuery",drpc)
 				.each(new Fields("args"), new Split(), new Fields("term"))
 				.stateQuery(dfState, new Fields("term"),
-						new MapGet(), new Fields("currentDf"));
+						new MapGet(), new Fields("df"))
+				.each(new Fields("df"), new FilterNull())
+				.project(new Fields("term","df"));
+	}
+	
+	private static void addTFIDFQueryStream(TridentState tfState,
+			TridentState dfState,
+			TridentState dState,
+			TridentTopology topology, LocalDRPC drpc) {
+		topology.newDRPCStream("tfidfQuery",drpc)
+				.each(new Fields("args"), new SplitAndProjectToFields(), new Fields("documentId", "term"))
+				.each(new Fields(), new StatisSourceFunction(), new Fields("source"))
+				.stateQuery(tfState, new Fields("documentId", "term"), new MapGet(), new Fields("tf"))
+				.stateQuery(dfState,new Fields("term"), new MapGet(), new Fields("df"))
+				.stateQuery(dState,new Fields("source"), new MapGet(), new Fields("d"))
+				.each(new Fields("term","documentId","tf","d","df"), new TfidfExpression(), new Fields("tfidf"))
+				.each(new Fields("tfidf"), new FilterNull())
+				.project(new Fields("documentId","term","tfidf"));
 	}
 
-	@SuppressWarnings("unchecked")
-	public static StormTopology buildTopology(ITridentSpout spout, LocalDRPC drpc) {
+	@SuppressWarnings("rawtypes")
+	public static TridentTopology buildTopology(ITridentSpout spout, LocalDRPC drpc) {
 		TridentTopology topology = new TridentTopology();
 
 		Stream documentStream = getUrlStream(topology, spout)
 				.persistentAggregate(getStateFactory("df"), new Count(),
 						new Fields("df"));
 		
-		
-		
 		addDFQueryStream(dfState, topology, drpc);
-		Stream dfStream = dfState.newValuesStream();
+		
 		
 		TridentState dState = documentStream.groupBy(new Fields("source"))
 				.persistentAggregate(getStateFactory("d"), new Count(), new Fields("d"));
 		
 		addDQueryStream(dState, topology, drpc);
-		Stream dStream = dState.newValuesStream();
-		
-		Stream idf = topology.multiReduce(dStream, dfStream, new OuterJoinReducer(), new Fields("source", "d", "term", "df"));
-		
-		
-		Stream tf = termStream.groupBy(new Fields("documentId", "term"))
-					.aggregate(new Count(), new Fields("tf"));
-		
-		Stream expressionStream = topology.join(tf, new Fields("term"), idf, new Fields("term"), 
-				new Fields("term","documentId","tf","source","d","df"));
 		
-		//now do a partition persist.
+		//make this a persistent aggregate
+		TridentState tfState = termStream.groupBy(new Fields("documentId", "term"))
+					.persistentAggregate(getStateFactory("tf"), new Count(), new Fields("tf"));
 		
-		/*Stream tfidfStream = termStream.groupBy(new Fields("documentId", "term"))
-				.aggregate(new Count(), new Fields("tf"))
-				.each(new Fields("term","documentId", "tf"),
-						new TfidfExpression(), new Fields("tfidf"));*/
+		addTFIDFQueryStream(tfState, dfState, dState, topology, drpc);
 		
-		return topology.build();
+		return topology;
 	}
 
 	public static void main(String[] args) throws Exception {
 		if (args.length == 0) {
 			LocalDRPC drpc = new LocalDRPC();
 			LocalCluster cluster = new LocalCluster();
-			StormTopology temp = buildTopology(null, drpc);
-			cluster.submitTopology("tfidf", conf, buildTopology(null, drpc));
+			conf.setDebug(true);
+			cluster.submitTopology("tfidf", conf, buildTopology(null, drpc).build());
+			for(int i=0; i<100; i++) {
+                System.out.println("DRPC RESULT: " + drpc.execute("tfidfQuery", "doc01 area"));
+                Thread.sleep(1000);
+            }
 		} else {
 			conf.setNumWorkers(6);
 			StormSubmitter.submitTopology("twitter", conf, getTwitterTopology());
 			//TODO: Create the twitter spout and pass it in here...
-			StormSubmitter.submitTopology(args[0], conf, buildTopology(null, null));
+			StormSubmitter.submitTopology(args[0], conf, buildTopology(null, null).build());
 		}
 	}
 

src/jvm/storm/cookbook/tfidf/functions/DocumentFetchFunction.java

 package storm.cookbook.tfidf.functions;
 
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.UnsupportedEncodingException;
 import java.net.URL;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.tika.metadata.Metadata;
 import org.apache.tika.parser.AutoDetectParser;
 
 import storm.trident.operation.BaseFunction;
 import storm.trident.operation.TridentCollector;
+import storm.trident.operation.TridentOperationContext;
 import storm.trident.tuple.TridentTuple;
 import twitter4j.URLEntity;
+import au.com.bytecode.opencsv.CSVReader;
+import backtype.storm.Config;
 import backtype.storm.tuple.Values;
 
 public class DocumentFetchFunction extends BaseFunction {
 
 	private static final long serialVersionUID = 1L;
 	private List<String> mimeTypes;
+	private Map<String,String> testData = new HashMap<String,String>();
+	private boolean testMode = false;
 
 	public DocumentFetchFunction(String[] supportedMimeTypes) {
 		mimeTypes = Arrays.asList(supportedMimeTypes);
 	}
 
+	public void prepare(Map conf, TridentOperationContext context) {
+		if (conf.get(backtype.storm.Config.TOPOLOGY_DEBUG).equals(true)) {
+			testMode = true;
+			CSVReader reader;
+			try {
+				reader = new CSVReader(new BufferedReader(
+						new InputStreamReader(
+								DocumentFetchFunction.class
+										.getResourceAsStream("docs.csv"))));
+				List<String[]> myEntries = reader.readAll();
+				for(String[] row : myEntries){
+					testData.put(row[0], row[1]);
+				}
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+	
+
 	@Override
 	public void execute(TridentTuple tuple, TridentCollector collector) {
 		String url = tuple.getStringByField("url");
-		try {
-			Parser parser = new AutoDetectParser();
-			Metadata metadata = new Metadata();
-			ParseContext parseContext = new ParseContext();
-			URL urlObject = new URL(url);
-			ContentHandler handler = new BodyContentHandler(10 * 1024 * 1024);
-			parser.parse((InputStream) urlObject.getContent(), handler,
-					metadata, parseContext);
-			String[] mimeDetails = metadata.get("Content-Type").split(";");
-			if ((mimeDetails.length > 0)
-					&& (mimeTypes.contains(mimeDetails[0]))) {
-				collector.emit(new Values(handler.toString(), url.trim(), "twitter"));
+		if (testMode) {
+			String contents = testData.get(url);
+			if(contents != null)
+				collector.emit(new Values(contents, url.trim(),"twitter"));
+		} else {
+			try {
+				Parser parser = new AutoDetectParser();
+				Metadata metadata = new Metadata();
+				ParseContext parseContext = new ParseContext();
+				URL urlObject = new URL(url);
+				ContentHandler handler = new BodyContentHandler(10 * 1024 * 1024);
+				parser.parse((InputStream) urlObject.getContent(), handler,
+						metadata, parseContext);
+				String[] mimeDetails = metadata.get("Content-Type").split(";");
+				if ((mimeDetails.length > 0)
+						&& (mimeTypes.contains(mimeDetails[0]))) {
+					collector.emit(new Values(handler.toString(), url.trim(),
+							"twitter"));
+				}
+			} catch (Exception e) {
 			}
-		} catch (Exception e) {
 		}
 
 	}

src/jvm/storm/cookbook/tfidf/functions/OuterJoinState.java

 	
 	public List<Values> join(){
 		List<Values> ret = new ArrayList<Values>();
-		int lhsId = getLHS();
-		int rhsId = getRhs(lhsId);
-		for(Object[] lhs : bothSides.get(lhsId)){
-			for(Object[] rhs : bothSides.get(rhsId)){
-				Values vals = new Values(lhs);
-				vals.addAll(Arrays.asList(rhs));
-				ret.add(vals);
+		try{
+			int lhsId = getLHS();
+			int rhsId = getRhs(lhsId);
+			for(Object[] lhs : bothSides.get(lhsId)){
+				for(Object[] rhs : bothSides.get(rhsId)){
+					Values vals = new Values(lhs);
+					vals.addAll(Arrays.asList(rhs));
+					ret.add(vals);
+				}
 			}
-		}
+		}catch(Exception e){}
 		return ret;
 	}
 

src/jvm/storm/cookbook/tfidf/functions/SplitAndProjectToFields.java

+package storm.cookbook.tfidf.functions;
+
+import backtype.storm.tuple.Values;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.tuple.TridentTuple;
+
+@SuppressWarnings("serial")
+public class SplitAndProjectToFields extends BaseFunction {
+
+	@Override
+	public void execute(TridentTuple tuple, TridentCollector collector) {
+		Values vals = new Values();
+		for(String word: tuple.getString(0).split(" ")) {
+            if(word.length() > 0) {
+            	vals.add(word);
+            }
+        }
+		collector.emit(vals);
+		
+	}
+
+	
+
+}
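
A note on how this function is used: addTFIDFQueryStream above splits the DRPC argument string (for example "doc01 area", as passed from main) into the documentId and term fields. The following standalone sketch mirrors that split outside of Storm; the class name is illustrative and not part of this commit.

    public class SplitSketch {
        public static void main(String[] args) {
            String drpcArgs = "doc01 area"; // the argument string main() passes to the tfidfQuery DRPC call
            java.util.List<String> values = new java.util.ArrayList<String>();
            for (String word : drpcArgs.split(" ")) {
                if (word.length() > 0) {      // skip empty tokens, as SplitAndProjectToFields does
                    values.add(word);
                }
            }
            // values is now [doc01, area]; addTFIDFQueryStream binds these positions to
            // new Fields("documentId", "term") before the three state queries run.
            System.out.println(values);
        }
    }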

src/jvm/storm/cookbook/tfidf/functions/TfidfExpression.java

 
 	Logger LOG = LoggerFactory.getLogger(TfidfExpression.class);
 	private static final long serialVersionUID = 1L;
-	private DRPCClient client = null;
-	
-	public void prepare(Map conf, TridentOperationContext context){
-		//todo: add this to the config mappings
-		if("true".equals(conf.get("LOCAL")))
-				client = new DRPCClient("localhost", 3772);
-	}
-	
-	private String execute(String function, String args) throws TException, DRPCExecutionException{
-		if(client != null)
-			return client.execute(function, args);
-		//the test values are just there for testing purposes
-		if(function.equals("d"))
-			return "100";
-		return "50";
-	}
 
 	@Override
 	public void execute(TridentTuple tuple, TridentCollector collector) {
 		try {
-			String result = execute("dQuery", "twitter");
-			double d = Double.parseDouble(result);
-			result = execute("dfQuery", tuple.getStringByField("term"));
-			double df = Double.parseDouble(result);
+			double d = (double)tuple.getLongByField("d");
+			double df = (double)tuple.getLongByField("df");
 			double tf = (double) tuple.getLongByField("tf");
-			double tfidf = tf * Math.log(d / (1.0 + df));
+			LOG.debug("d=" + d + " df=" + df + " tf=" + tf);
+			double tfidf = tf * Math.log(d / (1 + df));
 			LOG.debug("Emitting new TFIDF(term,Document): ("
 					+ tuple.getStringByField("term") + ","
 					+ tuple.getStringByField("documentId") + ") = " + tfidf);
 			collector.emit(new Values(tfidf));
-		} catch (Exception e) {
-			LOG.error(e.getStackTrace().toString());
-		} 
+		} catch (Exception e) {} 
 		
 	}
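
With the state queries now feeding d, df and tf straight into this expression, the number asserted in the updated Clojure test further down follows directly: the test feeds five documents (d = 5), "area" appears in three of them (df = 3) and, by implication of the asserted value, twice in doc01 (tf = 2). A minimal sketch that reproduces that value; the class name is illustrative only.

    public class TfidfCheck {
        public static void main(String[] args) {
            double d = 5;   // documents fed by the test spout (dQuery "twitter" returns 5)
            double df = 3;  // documents containing "area" (dfQuery "area" returns 3)
            double tf = 2;  // occurrences of "area" in doc01, implied by the asserted result
            double tfidf = tf * Math.log(d / (1 + df));
            System.out.println(tfidf); // 0.44628710262841953, the value expected by test-tfidf
        }
    }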
 

src/jvm/storm/cookbook/tfidf/functions/docs.csv

+doc_id,text

src/jvm/storm/cookbook/tfidf/state/TfIdfUpdater.java

+package storm.cookbook.tfidf.state;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.state.map.SnapshottableMap;
+import storm.trident.tuple.TridentTuple;
+import backtype.storm.tuple.Values;
+
+@SuppressWarnings("rawtypes")
+public class TfIdfUpdater extends BaseStateUpdater<SnapshottableMap> {
+
+	private static final long serialVersionUID = 1L;
+
+	@SuppressWarnings({ "unchecked" })
+	@Override
+	public void updateState(SnapshottableMap state, List<TridentTuple> tuples,
+			TridentCollector collector) {
+		List<List<Object>> keys = new LinkedList<List<Object>>();
+		List<List<Object>> values = new LinkedList<List<Object>>();
+		
+		for(TridentTuple tuple: tuples){
+			List<Object> k = new ArrayList<Object>();
+			List<Object> v = new ArrayList<Object>();
+			k.add(tuple.getValueByField("documentId"));
+			k.add(tuple.getValueByField("term"));
+			v.add(tuple.getValueByField("tfidf"));
+			keys.add(k);
+			values.add(v);
+		}
+		state.multiPut(keys, values);
+		
+		for(TridentTuple tuple: tuples){
+			collector.emit(new Values(tuple.getValueByField("documentId"),
+					tuple.getValueByField("term"),
+					tuple.getValueByField("tfidf")));
+		}
+		
+	}
+
+	
+
+}
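
The visible hunks import TfIdfUpdater in TermTopology but do not show where it is attached, so the following is only a hedged sketch of the usual wiring for a Trident BaseStateUpdater; the class name, method name, stream and state factory arguments are assumptions for illustration, not code from this commit.

    import backtype.storm.tuple.Fields;
    import storm.cookbook.tfidf.state.TfIdfUpdater;
    import storm.trident.Stream;
    import storm.trident.TridentState;
    import storm.trident.state.StateFactory;

    public class TfIdfPersistSketch {
        // partitionPersist hands each batch of (documentId, term, tfidf) tuples to the updater,
        // which multiPuts them under the [documentId, term] key and re-emits them downstream.
        public static TridentState persistTfidf(Stream tfidfStream, StateFactory factory) {
            return tfidfStream.partitionPersist(
                    factory,
                    new Fields("documentId", "term", "tfidf"),
                    new TfIdfUpdater(),
                    new Fields("documentId", "term", "tfidf"));
        }
    }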

test/clj/TermTopology.clj

 (ns storm.cookbook.tfif.TermTopology
   (:use [clojure.test])
   (:require [backtype.storm [testing :as t]])
+  (:use clojure.java.shell)
   (:import [storm.cookbook.tfidf TermTopology])
+  (:import [backtype.storm.generated KillOptions])
   (:use [storm.trident testing])
-  (:use [backtype.storm util])
+  (:use [backtype.storm util config])
   )
 
 (bootstrap-imports)
 
+(defn bootstrap-db []
+  (sh "/Users/admin/Downloads/Development/dsc-cassandra-1.1.6/bin/cassandra-cli" "-f" "dropAndCreateSchema.txt")
+  )
+
+(defn with-topology-debug* [cluster topo body-fn]
+  (t/submit-local-topology (:nimbus cluster) "tester" {TOPOLOGY-DEBUG true} (.build topo))
+  (body-fn)
+  (.killTopologyWithOpts (:nimbus cluster) "tester" (doto (KillOptions.) (.set_wait_secs 0)))
+  )
+
+(defmacro with-topology-debug [[cluster topo] & body]
+  `(with-topology-debug* ~cluster ~topo (fn [] ~@body)))
 
 (deftest test-tfidf
+    (bootstrap-db)
     (t/with-local-cluster [cluster]
       (with-drpc [drpc]
         (letlocals
           (bind feeder (feeder-spout ["url"]))
           (bind topo (TermTopology/buildTopology feeder drpc)) 
-          (with-topology [cluster topo]
-              (feed feeder [["http://t.co/hP5PM6fm"] ["http://t.co/xSFteG23"]])
-              (is (= [[2]] (exec-drpc drpc "words" "the")))
-              (is (= [[1]] (exec-drpc drpc "words" "hello")))
+          (with-topology-debug [cluster topo]
+              (feed feeder [["doc01"] ["doc02"] ["doc03"] ["doc04"] ["doc05"]])
+              (is (= [["twitter" 5]] (exec-drpc drpc "dQuery" "twitter")))
+              (is (= [["area" 3]] (exec-drpc drpc "dfQuery" "area")))
+              (is (= [["doc01" "area" 0.44628710262841953]] (exec-drpc drpc "tfidfQuery" "doc01 area")))
             )
           ) 
         )