Commits

Quinton Anderson committed 0d9d239

initial commit

Comments (0)

Files changed (2)

+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>storm.cookbook</groupId>
+	<artifactId>log-topology</artifactId>
+	<version>0.0.1-SNAPSHOT</version>
+	<packaging>jar</packaging>
+
+	<name>log-topology</name>
+	<url>https://bitbucket.org/qanderson/log-topology</url>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<repositories>
+		<repository>
+			<id>github-releases</id>
+			<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
+		</repository>
+		<repository>
+			<id>clojars.org</id>
+			<url>http://clojars.org/repo</url>
+		</repository>
+		<repository>
+			<id>twitter4j</id>
+			<url>http://twitter4j.org/maven2</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.11</version>
+			<scope>test</scope>
+		</dependency>
+        <dependency>
+            <groupId>org.jmock</groupId>
+            <artifactId>jmock-junit4</artifactId>
+            <version>2.5.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jmock</groupId>
+            <artifactId>jmock-legacy</artifactId>
+            <version>2.5.1</version>
+            <scope>test</scope>
+        </dependency>		<dependency>
+			<groupId>storm</groupId>
+			<artifactId>storm</artifactId>
+			<version>0.8.1</version>
+			<!-- keep storm out of the jar-with-dependencies -->
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>1.1</version>
+		</dependency>
+		<dependency>
+			<groupId>redis.clients</groupId>
+			<artifactId>jedis</artifactId>
+			<version>2.1.0</version>
+		</dependency>
+        <dependency>
+            <groupId>commons-httpclient</groupId>
+            <artifactId>commons-httpclient</artifactId>
+            <version>3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jmock</groupId>
+            <artifactId>jmock-junit3</artifactId>
+            <version>2.5.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jmock</groupId>
+            <artifactId>jmock-junit3</artifactId>
+            <version>2.5.1</version>
+        </dependency>
+    </dependencies>
+
+	<build>
+		<plugins>
+		    <!-- 
+		    bind the maven-assembly-plugin to the package phase
+		    this will create a jar file without the storm dependencies
+		    suitable for deployment to a cluster.
+		     -->
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+					<archive>
+						<manifest>
+							<mainClass></mainClass>
+						</manifest>
+					</archive>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+
+			</plugin>
+
+			<plugin>
+				<groupId>com.theoryinpractise</groupId>
+				<artifactId>clojure-maven-plugin</artifactId>
+				<version>1.3.8</version>
+				<extensions>true</extensions>
+				<configuration>
+					<sourceDirectories>
+						<sourceDirectory>src/clj</sourceDirectory>
+					</sourceDirectories>
+				</configuration>
+				<executions>
+					<execution>
+						<id>compile</id>
+						<phase>compile</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+					<execution>
+						<id>test</id>
+						<phase>test</phase>
+						<goals>
+							<goal>test</goal>
+						</goals>
+					</execution>
+				</executions>
+
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>

src/main/java/storm/cookbook/LogTopology.java

+package storm.cookbook;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.*;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+public class LogTopology {
+
+    private TopologyBuilder builder = new TopologyBuilder();
+    private Config conf = new Config();
+    private LocalCluster cluster;
+
+    public static final String DEFAULT_JEDIS_PORT = "6379";
+
+    public LogTopology(){
+        
+    }
+
+    public TopologyBuilder getBuilder() {
+        return builder;
+    }
+    
+    public LocalCluster getLocalCluster(){
+    	return cluster;
+    }
+
+    public Config getConf() {
+        return conf;
+    }
+
+    public void runLocal(int runTime){
+        conf.setDebug(true);
+        //conf.put(Conf.REDIS_HOST_KEY, "localhost");
+        cluster = new LocalCluster();
+        cluster.submitTopology("test", conf, builder.createTopology());
+        if(runTime > 0){
+            Utils.sleep(runTime);
+            shutDownLocal();
+        }
+    }
+
+    public void shutDownLocal(){
+        if(cluster != null){
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+
+    public void runCluster(String name, String redisHost) throws AlreadyAliveException, InvalidTopologyException {
+        conf.setNumWorkers(20);
+        //conf.put(Conf.REDIS_HOST_KEY, redisHost);
+        StormSubmitter.submitTopology(name, conf, builder.createTopology());
+    }
+
+
+
+	public static void main(String[] args) throws Exception {
+
+        LogTopology topology = new LogTopology();
+        
+
+        if(args!=null && args.length > 1) {
+            topology.runCluster(args[0], args[1]);
+        } else {
+            if(args!=null && args.length == 1)
+                System.out.println("Running in local mode, redis ip missing for cluster run");
+            topology.runLocal(10000);
+        }
+
+	}
+
+}