Commits

Quinton Anderson committed 5d5bda1

Created the basic project structure and implemented the Twitter spout; the Trident logic will come next.

Comments (0)

Files changed (10)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>storm.cookbook</groupId>
	<artifactId>tfidf-topology</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>tfidf-topology</name>
	<url>https://bitbucket.org/qanderson/tfidf-topology</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<!-- NOTE(review): several repository URLs below use plain http; newer Maven
	     versions (3.8+) block http repositories by default - switch to https
	     where the host supports it. -->
	<repositories>
		<repository>
			<id>github-releases</id>
			<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
		</repository>
		<repository>
			<id>clojars.org</id>
			<url>http://clojars.org/repo</url>
		</repository>
		<repository>
			<id>twitter4j</id>
			<url>http://twitter4j.org/maven2</url>
		</repository>
		<repository>
			<id>cloudera</id>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
		</repository>
	</repositories>

	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.6.1</version>
		</dependency>
        <dependency>
            <groupId>org.jmock</groupId>
            <artifactId>jmock-legacy</artifactId>
            <version>2.5.1</version>
            <scope>test</scope>
        </dependency>		
        <dependency>
			<groupId>storm</groupId>
			<artifactId>storm</artifactId>
			<version>0.8.1</version>
			<!-- keep storm out of the jar-with-dependencies -->
			<scope>provided</scope>
			<exclusions>
				<exclusion>
					<artifactId>slf4j-api</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- NOTE(review): the open-ended range [3.0,) resolves to whatever the
		     newest published 3.x+ artifact is at build time, making builds
		     non-reproducible - consider pinning an explicit twitter4j version. -->
		<dependency>
			<groupId>org.twitter4j</groupId>
			<artifactId>twitter4j-core</artifactId>
			<version>[3.0,)</version>
		</dependency>
		<dependency>
			<groupId>org.twitter4j</groupId>
			<artifactId>twitter4j-stream</artifactId>
			<version>[3.0,)</version>
		</dependency>
		<dependency>
			<groupId>com.googlecode.json-simple</groupId>
			<artifactId>json-simple</artifactId>
			<version>1.1</version>
		</dependency>
        <dependency>
            <groupId>org.jmock</groupId>
            <artifactId>jmock-junit4</artifactId>
            <version>2.5.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>2.1.0</version>
			<scope>test</scope>
		</dependency>
		<dependency>
    		<groupId>org.apache.tika</groupId>
    		<artifactId>tika-parsers</artifactId>
    		<version>1.2</version>
  		</dependency>
    </dependencies>

	<build>
		<plugins>
		    <!-- 
		    bind the maven-assembly-plugin to the package phase
		    this will create a jar file without the storm dependencies
		    suitable for deployment to a cluster.
		     -->
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<!-- TODO(review): mainClass is empty - fill in the
							     topology entry point (e.g. TermTopology) so the
							     assembled jar is runnable via java -jar. -->
							<mainClass></mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>

			</plugin>

			<plugin>
				<groupId>com.theoryinpractise</groupId>
				<artifactId>clojure-maven-plugin</artifactId>
				<version>1.3.8</version>
				<extensions>true</extensions>
				<configuration>
					<sourceDirectories>
						<sourceDirectory>src/clj</sourceDirectory>
					</sourceDirectories>
				</configuration>
				<executions>
					<execution>
						<id>compile</id>
						<phase>compile</phase>
						<goals>
							<goal>compile</goal>
						</goals>
					</execution>
					<execution>
						<id>test</id>
						<phase>test</phase>
						<goals>
							<goal>test</goal>
						</goals>
					</execution>
				</executions>

			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

src/main/java/storm/cookbook/tfidf/Conf.java

+package storm.cookbook.tfidf;
+
+
/**
 * Shared configuration keys and defaults for the TF-IDF topology.
 *
 * <p>Pure constants holder: declared {@code final} with a private constructor
 * so it cannot be subclassed or instantiated.
 */
public final class Conf {

    /** Storm config key under which the Redis host name is passed. */
    public static final String REDIS_HOST_KEY = "redisHost";

    /** Storm config key under which the Redis port is passed. */
    public static final String REDIS_PORT_KEY = "redisPort";

    /** Default Redis port; kept as a String so callers parse it themselves. */
    public static final String DEFAULT_JEDIS_PORT = "6379";

    private Conf() {
        // utility class - no instances
    }
}

src/main/java/storm/cookbook/tfidf/TermTopology.java

+package storm.cookbook.tfidf;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+public class TermTopology {
+
+	private TopologyBuilder builder = new TopologyBuilder();
+	private Config conf = new Config();
+	private LocalCluster cluster;
+
+	public TermTopology() {
+		
+	}
+
+	public TopologyBuilder getBuilder() {
+		return builder;
+	}
+
+	public LocalCluster getLocalCluster() {
+		return cluster;
+	}
+
+	public Config getConf() {
+		return conf;
+	}
+
+	public void runLocal(int runTime) {
+		conf.setDebug(true);
+		cluster = new LocalCluster();
+		cluster.submitTopology("test", conf, builder.createTopology());
+		if (runTime > 0) {
+			Utils.sleep(runTime);
+			shutDownLocal();
+		}
+	}
+
+	public void shutDownLocal() {
+		if (cluster != null) {
+			cluster.killTopology("test");
+			cluster.shutdown();
+		}
+	}
+
+	public void runCluster(String name, String redisHost, String cassandraHost)
+			throws AlreadyAliveException, InvalidTopologyException {
+		conf.setNumWorkers(20);
+		StormSubmitter.submitTopology(name, conf, builder.createTopology());
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		TermTopology topology = new TermTopology();
+
+		if (args != null && args.length > 1) {
+			topology.runCluster(args[0], args[1], args[2]);
+		} else {
+			if (args != null && args.length == 1)
+				System.out
+						.println("Running in local mode, redis ip missing for cluster run");
+			topology.runLocal(10000);
+		}
+
+	}
+
+}

src/main/java/storm/cookbook/tfidf/TwitterTrackSpout.java

+package storm.cookbook.tfidf;
+
+import backtype.storm.Config;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+
+public class TwitterTrackSpout extends BaseRichSpout {
+	Logger LOG = LoggerFactory.getLogger(TwitterTrackSpout.class); 
+    SpoutOutputCollector collector;
+    LinkedBlockingQueue<Status> queue = null;
+    TwitterStream twitterStream;
+    long maxQueueDepth;
+    String[] trackTerms;
+    
+    public TwitterTrackSpout(long maxQueueDepth, String[] trackTerms) {
+    	this.maxQueueDepth = maxQueueDepth;
+    	this.trackTerms = trackTerms;
+    }
+    
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        queue = new LinkedBlockingQueue<Status>(1000);
+        this.collector = collector;
+        StatusListener listener = new StatusListener() {
+        	
+            @Override
+            public void onStatus(Status status) {
+            	if(queue.size() < maxQueueDepth){
+            		queue.offer(status);
+            	} else {
+            		LOG.error("Queue is now full, the following message is dropped: "+status);
+            	}
+            }
+
+            @Override
+            public void onDeletionNotice(StatusDeletionNotice sdn) {
+            }
+
+            @Override
+            public void onTrackLimitationNotice(int i) {
+            }
+
+            @Override
+            public void onScrubGeo(long l, long l1) {
+            }
+
+            @Override
+            public void onException(Exception e) {
+            }
+
+			@Override
+			public synchronized void onStallWarning(StallWarning arg0) {
+				LOG.error("Stall warning received!");
+			}
+            
+        };
+        twitterStream = new TwitterStreamFactory().getInstance();
+        twitterStream.addListener(listener);
+        FilterQuery filter = new FilterQuery();
+        filter.count(0);
+        filter.track(trackTerms);
+        twitterStream.filter(filter);
+    }
+
+    @Override
+    public void nextTuple() {
+        Status ret = queue.poll();
+        if(ret==null) {
+            Utils.sleep(50);
+        } else {
+            collector.emit(new Values(ret));
+        }
+    }
+
+    @Override
+    public void close() {
+        twitterStream.shutdown();
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Config ret = new Config();
+        ret.setMaxTaskParallelism(1);
+        return ret;
+    }    
+
+    @Override
+    public void ack(Object id) {
+    }
+
+    @Override
+    public void fail(Object id) {
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("tweet"));
+    }
+    
+}

src/main/resources/log4j.properties

# Root logger: DEBUG level routed to a single daily-rolling file appender.
# NOTE(review): DEBUG at the root is very verbose - consider INFO outside dev.
log4j.rootLogger=DEBUG,DRFA


log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender

# Log file path, relative to the working directory of the JVM
log4j.appender.DRFA.File=logs/tfidf.log

# Rollover at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd

# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n

src/main/resources/twitter4j.properties

# SECURITY(review): these appear to be live OAuth credentials committed to
# source control. Revoke/rotate them and load credentials from an untracked
# local file or environment variables instead of versioning them.
debug=true
oauth.consumerKey=LjmiPJjcleLMDMlH7qqp9g
oauth.consumerSecret=jePpbKZvIAUGFqCsYZJTgDVQaCk98HYcz3fBiKfidQ
oauth.accessToken=220736755-BjuLx3CnWgn9obhcOfmTqeLweYyTJDZjPRhvMBw5
oauth.accessTokenSecret=54fTeNxU8GFa7o0ENFgWAfqf1uim335nrYXPApEDd50

src/test/java/storm/cookbook/tfidf/IntegrationTestTopology.java

+package storm.cookbook.tfidf;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.json.simple.JSONArray;
+import org.json.simple.JSONValue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import backtype.storm.utils.Utils;
+
/**
 * The integration test basically injects on the input queue, and then
 * introduces a test bolt which simply persists the tuple into a JSON object
 * onto an output queue. Note that test is parameter driven, but the cluster is
 * only shutdown once all tests have run
 * */
public class IntegrationTestTopology {

	// TODO(review): skeleton only - the behaviour described in the class
	// javadoc above is not implemented yet; no test methods exist.

}

src/test/java/storm/cookbook/tfidf/StormTestCase.java

+package storm.cookbook.tfidf;
+
+import java.io.IOException;
+
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import junit.framework.TestCase;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.api.Action;
+import org.jmock.api.Expectation;
+import org.jmock.internal.ExpectationBuilder;
+import org.jmock.internal.ExpectationCollector;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+public class StormTestCase {
+    
+	protected Mockery context = new Mockery() {{
+        setImposteriser(ClassImposteriser.INSTANCE);
+    }}; 
+
+    protected Tuple getTuple(){
+        final Tuple tuple = context.mock(Tuple.class);
+        return tuple;
+    }
+
+
+}

src/test/java/storm/cookbook/tfidf/TestBolt.java

+package storm.cookbook.tfidf;
+
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import redis.clients.jedis.Jedis;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+
+public final class TestBolt extends BaseRichBolt {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final transient Logger LOG = Logger.getLogger(TestBolt.class);
+	
+	private static Jedis jedis;
+	
+	private String channel;
+	
+	public TestBolt(String channel){
+		this.channel = channel;
+	}
+	
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    	jedis = new Jedis("localhost", Integer.parseInt(Conf.DEFAULT_JEDIS_PORT));
+    	jedis.connect();
+    }
+    
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+	@Override
+	public void execute(Tuple input) {
+		jedis.rpush(channel, input.getString(1));
+		
+	}
+    
+}

src/test/java/storm/cookbook/tfidf/TestTwitterApi.java

+package storm.cookbook.tfidf;
+
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterException;
+import twitter4j.TwitterStream;
+import twitter4j.TwitterStreamFactory;
+
+public class TestTwitterApi {
+	public static void main(String[] args) throws TwitterException {
+        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
+        FilterQuery filter = new FilterQuery();
+        filter.count(0);
+        filter.track(new String[]{"australia"});
+        StatusListener listener = new StatusListener() {
+            @Override
+            public void onStatus(Status status) {
+                System.out.println("@" + status.getUser().getScreenName() + " - " + status.getText());
+            }
+
+            @Override
+            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+                System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
+            }
+
+            @Override
+            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+                System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
+            }
+
+            @Override
+            public void onScrubGeo(long userId, long upToStatusId) {
+                System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
+            }
+
+            @Override
+            public void onStallWarning(StallWarning warning) {
+                System.out.println("Got stall warning:" + warning);
+            }
+
+            @Override
+            public void onException(Exception ex) {
+                ex.printStackTrace();
+            }
+        };
+        twitterStream.addListener(listener);
+        twitterStream.filter(filter);
+    }
+
+}