Commits

Kaspar Schiess committed 7b21fff

Really transports queue name

Comments (0)

Files changed (6)

 
+= 0.3
+
+  + Handles corrupt messages by dropping them. 
+  + Statistics output on SIGINFO; most OSs have that on Ctrl+T. 
+
 = 0.2
   
   + Proper framing of transmissions; transmissions are now versioned and 

lib/event_shipper/filter/transmission.rb

   class Transmission
     include EventShipper::Protocol
 
-    def en hash
-      transmission(
-        event(
-          queue: 'queue', 
-          json: hash.to_json)).serialize_to_string
+    def en event
+      transmission(event).serialize_to_string
     end
-    def de string
+    def de string, attributes={}
       transmission = parse_transmission(string)
 
       if transmission.version != 1
       end
 
       event = transmission.events.first
-      JSON.parse(event.json)
-    
+      attributes[:queue] = event.queue
+
+      return JSON.parse(event.json), attributes
+
     rescue ProtocolBuffers::DecodeError
       return nil
     end

lib/event_shipper/protocol.rb

 require_relative 'protocol/v1.pb.rb'
 
 module EventShipper::Protocol
+
+module_function
   def transmission *events
     Transmission.new(
       version: 1, 

lib/event_shipper/udp.rb

       @filters.reverse.inject(obj) { |o, f| f.de(o) }
     end
 
-    def send hash
-      @socket.send encode(hash), 
+    # Sends messages via UDP to a central proxy. Using the queue argument, the
+    # caller can decide which redis queue events end up in. 
+    #
+    # @param hash [Hash] Attributes of the event to send. 
+    # @param queue [String] Queue to put events into on the other end
+    def send hash, queue='queue'
+      event = Protocol.event(
+        queue: queue, 
+        json: hash.to_json)
+
+      @socket.send encode(event), 
         0,    # flags...
         @host, @port
     end
     # Handles a single message; decoding it and passing it to the caller. 
     #
     def handle_message string
-      msg = decode(string)
-      queue = nil
+      msg, attributes = decode(string)
 
       if msg
         @stats.count_message
+
+        queue = attributes[:queue]
         yield queue, msg
       else
         @stats.count_failure

spec/lib/event_shipper/filter/transmission_spec.rb

+require 'spec_helper'
+
+require 'event_shipper'
+
+describe EventShipper::Filter::Transmission do
+  let(:transmission) { described_class.new }
+  
+  let(:event) { EventShipper::Protocol.event({
+    queue: 'queue',
+    json: { key: 'value' }.to_json }) }
+  let(:message) { transmission.en(event) }
+
+  describe "#de" do
+    it "returns hash and context" do
+      hash, attrs = transmission.de(message)
+      attrs[:queue].should == 'queue'
+      hash['key'].should == 'value'
+    end
+  end
+end

spec/lib/event_shipper/udp_spec.rb

 
 require 'event_shipper'
 require 'event_shipper/udp'
+require 'event_shipper/protocol'
 
 describe EventShipper::UDP do
-  include EventShipper::Protocol
-
   let(:udp) { described_class.new('127.0.0.1', 5050) }
+  let(:event) { EventShipper::Protocol.event({
+    queue: 'queue',
+    json: { key: 'value' }.to_json }) }
+
   describe "#send(hash)" do
     let(:socket) { UDPSocket.new }
     it "turns the hash into a bson string and sends it" do
       udp.send(a: 1)
       sleep 0.01 until ready
 
-      transmission = parse_transmission(msg)
+      transmission = EventShipper::Protocol.parse_transmission(msg)
       transmission.version.should == 1
 
       transmission.events.first.tap { |event| 
     end 
   end
   describe "#handle_message(string)" do
-    let(:msg) { udp.encode(key: 'value') }
+    let(:msg) { udp.encode(event) }
     
     it "yields a decoded message to the block" do
       button = flexmock('button').tap { |fm| fm.should_receive(:press).
         once }
         
       udp.handle_message(msg) do |queue, hash|
+        queue.should == 'queue'
         hash['key'].should == 'value'
         button.press
       end
       button = flexmock('button').tap { |fm| fm.should_receive(:press).
         never }
         
-      udp.handle_message('invalid message') do
+      udp.handle_message('invalid messages') do
         button.press
       end
     end
       udp.wrap a
       udp.wrap b
 
-      udp.encode({})
+      udp.encode(event)
 
       SeqFilter.call_seq.should == [:a, :b]
     end
       udp.wrap c
       udp.wrap d
 
-      udp.decode(udp.encode({}))
+      udp.decode(udp.encode(event))
 
       SeqFilter.call_seq.should == [:c, :d, :d, :c]
     end