Commits

Kaspar Schiess committed 6b026b1

Better handling of invalid messages

Comments (0)

Files changed (4)

lib/event_shipper/filter/transmission.rb

 
       event = transmission.events.first
       JSON.parse(event.json)
+    
+    rescue ProtocolBuffers::DecodeError
+      return nil
     end
   end
 end

lib/event_shipper/stats.rb

+
+module EventShipper
+  class Stats
+    def initialize
+      reset
+    end
+
+    def count_message
+      @messages += 1
+    end
+    def count_failure
+      @failures += 1
+    end
+
+    def reset
+      @period_start = Time.now
+      @messages = @failures = 0
+    end
+
+    def to_s
+      seconds = Time.now - @period_start 
+
+      mps = @messages / Float(seconds)
+      fps = @failures / Float(seconds)
+
+      "#{mps} messages/s, #{fps} failures/s"
+    end
+  end
+end

lib/event_shipper/udp.rb

 require 'socket'
 
 require_relative 'filters'
+require_relative 'stats'
 
 module EventShipper
   class UDP
       @filters = []
       wrap Filter::Transmission.new
       
-      @messages_per_period = 0
-      @period_start = Time.now
-      Thread.start do
-        loop do
-          puts "total #{(@messages_per_period / (Time.now - @period_start)).round(3)} msgs/s"
-          
-          @messages_per_period = 0
-          @period_start = Time.now
-
-          sleep 10
-        end
-      end
+      @stats = Stats.new      
     end
 
     def wrap filter
     end
 
     def send hash
-      @messages_per_period += 1
       @socket.send encode(hash), 
         0,    # flags...
         @host, @port
     end
 
+    # Handles a single message; decoding it and passing it to the caller. 
+    #
+    def handle_message string
+      msg = decode(string)
+      queue = nil
+
+      if msg
+        @stats.count_message
+        yield queue, msg
+      else
+        @stats.count_failure
+      end
+    end
+
     # Enters a loop, receiving messages, yielding them to the block. 
     #
     def dispatch
       @socket.bind @host, @port
       loop do
         datagram, source_info = @socket.recvfrom(10 * 1024)
-        @messages_per_period += 1
-
-        hash = decode(datagram)
 
-        yield nil, hash
+        handle_message(datagram)
       end
     end
   end

spec/lib/event_shipper/udp_spec.rb

         JSON.parse(event.json).should == {'a' => 1} }
     end 
   end
+  describe "#handle_message(string)" do
+    let(:msg) { udp.encode(key: 'value') }
+    
+    it "yields a decoded message to the block" do
+      button = flexmock('button').tap { |fm| fm.should_receive(:press).
+        once }
+        
+      udp.handle_message(msg) do |queue, hash|
+        hash['key'].should == 'value'
+        button.press
+      end
+    end
+    it "ignores messages that cannot be decoded" do
+      button = flexmock('button').tap { |fm| fm.should_receive(:press).
+        never }
+        
+      udp.handle_message('invalid message') do
+        button.press
+      end
+    end
+  end
 
   class SeqFilter
     @call_seq = []
 
       SeqFilter.call_seq.should == [:c, :d, :d, :c]
     end
+
+    it 'returns nil if message is invalid' do
+      udp.decode('invalid_message').should be_nil
+    end
   end
 end