Commits

Quinton Anderson committed a02b3ed

Got the basic hello world topology working

  • Participants

Comments (0)

Files changed (4)

+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>storm.cookbook</groupId>
+	<artifactId>hello-world</artifactId>
+	<version>0.0.1-SNAPSHOT</version>
+	<packaging>jar</packaging>
+
+	<name>hello-world</name>
+	<url>https://bitbucket.org/qanderson/hello-world</url>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<repositories>
+		<repository>
+			<id>github-releases</id>
+			<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
+		</repository>
+		<repository>
+			<id>clojars.org</id>
+			<url>http://clojars.org/repo</url>
+		</repository>
+		<repository>
+			<id>twitter4j</id>
+			<url>http://twitter4j.org/maven2</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.1</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>storm</groupId>
+			<artifactId>storm</artifactId>
+			<version>0.8.1</version>
+			<!-- keep storm out of the jar-with-dependencies -->
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>1.1</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+		    <!-- 
+		    bind the maven-assembly-plugin to the package phase
+		    this will create a jar file without the storm dependencies
+		    suitable for deployment to a cluster.
+		     -->
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<configuration>
+					<descriptorRefs>
+						<descriptorRef>jar-with-dependencies</descriptorRef>
+					</descriptorRefs>
+					<archive>
+						<manifest>
+							<mainClass></mainClass>
+						</manifest>
+					</archive>
+				</configuration>
+				<executions>
+					<execution>
+						<id>make-assembly</id>
+						<phase>package</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+					</execution>
+				</executions>
+
+			</plugin>
+
+			<plugin>
+				<groupId>com.theoryinpractise</groupId>
+				<artifactId>clojure-maven-plugin</artifactId>
+				<version>1.3.8</version>
+				<extensions>true</extensions>
+				<configuration>
+					<sourceDirectories>
+						<sourceDirectory>src/clj</sourceDirectory>
+					</sourceDirectories>
+				</configuration>
+				<executions>
+					<execution>
+						<id>compile</id>
+						<phase>compile</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+					<execution>
+						<id>test</id>
+						<phase>test</phase>
+						<goals>
+							<goal>test</goal>
+						</goals>
+					</execution>
+				</executions>
+
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>

File src/main/java/storm/cookbook/HelloWorldBolt.java

+package storm.cookbook;
+
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class HelloWorldBolt extends BaseRichBolt {
+	
+	public static Logger LOG = Logger.getLogger(HelloWorldBolt.class);
+	
+	private static final long serialVersionUID = -841805977046116528L;
+	
+	private int myCount = 0;
+
+	@Override
+	public void prepare(Map stormConf, TopologyContext context,
+			OutputCollector collector) {
+	}
+
+	@Override
+	public void execute(Tuple input) {
+		String test = input.getStringByField("sentence");
+		if(test == "Hello World"){
+			myCount++;
+			System.out.println("Found a Hello World! My Count is now: " + Integer.toString(myCount));
+		}
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("myCount"));
+
+	}
+
+}

File src/main/java/storm/cookbook/HelloWorldSpout.java

+package storm.cookbook;
+
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.log4j.Logger;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+public class HelloWorldSpout extends BaseRichSpout {
+	
+	private static final long serialVersionUID = -4646687160233411001L;
+
+	public static Logger LOG = Logger.getLogger(HelloWorldSpout.class);
+
+	SpoutOutputCollector collector;
+	
+	private int referenceRandom;
+	
+	private static final int MAX_RANDOM = 10;
+	
+	public HelloWorldSpout(){
+		final Random rand = new Random();
+		referenceRandom = rand.nextInt(MAX_RANDOM);
+	}
+
+	@Override
+	public void open(Map conf, TopologyContext context,
+			SpoutOutputCollector collector) {
+		this.collector = collector;
+
+	}
+
+	@Override
+	public void nextTuple() {
+		Utils.sleep(100);
+		final Random rand = new Random();
+		int instanceRandom = rand.nextInt(MAX_RANDOM);
+		if(instanceRandom == referenceRandom){
+			collector.emit(new Values("Hello World"));
+		} else {
+			collector.emit(new Values("Other Random Word"));
+		}
+		
+
+	}
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("sentence"));
+	}
+
+}

File src/main/java/storm/cookbook/HelloWorldTopology.java

+package storm.cookbook;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+
+public class HelloWorldTopology {
+
+	/**
+	 * @param args
+	 * @throws Exception 
+	 * @throws  
+	 */
+	public static void main(String[] args) throws Exception {
+		TopologyBuilder builder = new TopologyBuilder();
+        
+        builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 10);        
+        builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 1)
+                .shuffleGrouping("randomHelloWorld");
+                
+        Config conf = new Config();
+        conf.setDebug(true);
+        
+        if(args!=null && args.length > 0) {
+            conf.setNumWorkers(3);
+            
+            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
+        } else {
+        
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", conf, builder.createTopology());
+            Utils.sleep(10000);
+            cluster.killTopology("test");
+            cluster.shutdown();    
+        }
+
+	}
+
+}