Commits

Quinton Anderson committed 587d311

Completed the topology and run bolt

  • Participants

Comments (0)

Files changed (8)

+# Polyglot Word Count Topology
+
+This example demonstrates the multi language capabilities of storm. It is a bit of a toy example, but it does illustrate things well.
+
+Essentially it is an implementation of the word count example from the storm starter project, however the topology is implemented in Clojure, some of the bolts are implemented in C++ and some in Ruby. 
+

multilang/resources/count.rb

+require "./storm"
+
+class CountBolt < Storm::Bolt
+  attr_accessor :counts
+  def initialize
+    @counts = Hash.new
+  end
+  
+  def process(tup)
+    word = String(tup.values[0])
+    counts[:word] = counts[:word].to_i + 1.to_i
+    emit([word, counts[:word].to_s])  
+  end
+end
+
+CountBolt.new.run

multilang/resources/splitsentence-cpp-bolt

Binary file added.

multilang/resources/storm.rb

+require "rubygems"
+require "json"
+
+module Storm
+  module Protocol
+    class << self
+      attr_accessor :mode, :pending_taskids, :pending_commands
+    end
+
+    self.pending_taskids = []
+    self.pending_commands = []
+
+    def read_message
+      msg = ""
+      loop do
+        line = STDIN.readline.chomp
+        break if line == "end"
+        msg << line
+        msg << "\n"
+      end
+      JSON.parse msg.chomp
+    end
+
+    def read_task_ids
+      Storm::Protocol.pending_taskids.shift ||
+        begin
+          msg = read_message
+          until msg.is_a? Array
+            Storm::Protocol.pending_commands.push(msg)
+            msg = read_message
+          end
+          msg
+        end
+    end
+
+    def read_command
+      Storm::Protocol.pending_commands.shift ||
+        begin
+          msg = read_message
+          while msg.is_a? Array
+            Storm::Protocol.pending_taskids.push(msg)
+            msg = read_message
+          end
+          msg
+        end
+    end
+
+    def send_msg_to_parent(msg)
+      puts msg.to_json
+      puts "end"
+      STDOUT.flush
+    end
+
+    def sync
+      send_msg_to_parent({'command' => 'sync'})
+    end
+
+    def send_pid(heartbeat_dir)
+      pid = Process.pid
+      send_msg_to_parent({'pid' => pid})
+      File.open("#{heartbeat_dir}/#{pid}", "w").close
+    end
+
+    def emit_bolt(tup, args = {})
+      stream = args[:stream]
+      anchors = args[:anchors] || args[:anchor] || []
+      anchors = [anchors] unless anchors.is_a? Enumerable
+      direct = args[:direct_task]
+      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    def emit_spout(tup, args = {})
+      stream = args[:stream]
+      id = args[:id]
+      direct = args[:direct_task]
+      m = {:command => :emit, :tuple => tup}
+      m[:id] = id if id
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    def emit(*args)
+      case Storm::Protocol.mode
+      when 'spout'
+        emit_spout(*args)
+      when 'bolt'
+        emit_bolt(*args)
+      end
+    end
+
+    def ack(tup)
+      send_msg_to_parent :command => :ack, :id => tup.id
+    end
+
+    def fail(tup)
+      send_msg_to_parent :command => :fail, :id => tup.id
+    end
+
+    def log(msg)
+      send_msg_to_parent :command => :log, :msg => msg.to_s
+    end
+
+    def handshake
+      setup_info = read_message
+      send_pid setup_info['pidDir']
+      [setup_info['conf'], setup_info['context']]
+    end
+  end
+
+  class Tuple
+    attr_accessor :id, :component, :stream, :task, :values
+
+    def initialize(id, component, stream, task, values)
+      @id = id
+      @component = component
+      @stream = stream
+      @task = task
+      @values = values
+    end
+
+    def self.from_hash(hash)
+      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
+    end
+  end
+
+  class Bolt
+    include Storm::Protocol
+
+    def prepare(conf, context); end
+
+    def process(tuple); end
+
+    def run
+      Storm::Protocol.mode = 'bolt'
+      prepare(*handshake)
+      begin
+        while true
+          process Tuple.from_hash(read_command)
+        end
+      rescue Exception => e
+        log 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
+      end
+    end
+  end
+
+  class Spout
+    include Storm::Protocol
+
+    def open(conf, context); end
+
+    def nextTuple; end
+
+    def ack(id); end
+
+    def fail(id); end
+
+    def run
+      Storm::Protocol.mode = 'spout'
+      open(*handshake)
+
+      begin
+        while true
+          msg = read_command
+          case msg['command']
+          when 'next'
+            nextTuple
+          when 'ack'
+            ack(msg['id'])
+          when 'fail'
+            fail(msg['id'])
+          end
+          sync
+        end
+      rescue Exception => e
+        log 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
+      end
+    end
+  end
+end
+(defproject polyglot-count-topology "0.0.1-SNAPSHOT"
+  :source-paths ["src/clj"]
+  :java-source-paths ["src/jvm" "test/jvm"]
+  :test-paths ["test/clj"]
+  :javac-options     ["-target" "1.6" "-source" "1.6"]
+  :resource-paths ["multilang"]
+  :main storm.cookbook.count-topology
+  :aot :all
+  :min-lein-version "2.0.0"
+  :dependencies [[org.slf4j/slf4j-log4j12 "1.6.1"]
+                 [org.clojure/clojure "1.4.0"]
+                   [commons-collections/commons-collections "3.2.1"]
+                   [storm-starter "0.0.1-SNAPSHOT"]]
+
+  :profiles {:dev {:dependencies [[storm "0.8.2"]
+                     [junit/junit "4.11"]
+                     [org.testng/testng "6.1.1"]]}}
+  
+  )
+

src/clj/storm/cookbook/count_topology.clj

+(ns storm.cookbook.count-topology
+  (:import (backtype.storm StormSubmitter LocalCluster)
+           (storm.cookbook QtSplitSentence RubyCount))
+  (:use [backtype.storm clojure config])
+   )
+
+(defspout sentence-spout ["sentence"]
+  [conf context collector]
+  (let [sentences ["a little brown dog"
+                   "the man petted the dog"
+                   "four score and seven years ago"
+                   "an apple a day keeps the doctor away"]]
+    (spout
+     (nextTuple []
+       (Thread/sleep 100)
+       (emit-spout! collector [(rand-nth sentences)])         
+       )
+     (ack [id]
+        ))))
+
+(defn mk-topology []
+
+  (topology
+   {"1" (spout-spec sentence-spout)}
+   {"3" (bolt-spec {"1" :shuffle}
+                   (QtSplitSentence.)
+                   :p 1)
+    "4" (bolt-spec {"3" ["word"]}
+                   (RubyCount.)
+                   :p 1)}))
+
+(defn run-local! []
+  (let [cluster (LocalCluster.)]
+    (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
+    (Thread/sleep 10000)
+    (.shutdown cluster)
+    ))
+
+(defn submit-topology! [name]
+  (StormSubmitter/submitTopology
+   name
+   {TOPOLOGY-DEBUG true
+    TOPOLOGY-WORKERS 3}
+   (mk-topology)))
+
+(defn -main
+  ([]
+   (run-local!))
+  ([name]
+   (submit-topology! name)))
+

src/jvm/storm/cookbook/QtSplitSentence.java

+package storm.cookbook;
+
+import java.util.Map;
+
+import backtype.storm.task.ShellBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+public class QtSplitSentence extends ShellBolt implements IRichBolt {
+    
+	private static final long serialVersionUID = -2503812433333011106L;
+
+	public QtSplitSentence() {
+        super("splitsentence-cpp-bolt");
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}

src/jvm/storm/cookbook/RubyCount.java

+package storm.cookbook;
+
+import java.util.Map;
+
+import backtype.storm.task.ShellBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+public class RubyCount extends ShellBolt implements IRichBolt {
+
+	private static final long serialVersionUID = -5880076377355349028L;
+
+	public RubyCount() {
+        super("ruby","count.rb");
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("word", "count"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+}