# Source: event_shipper/lib/event_shipper/udp.rb

require 'socket'

require_relative 'filters'
require_relative 'stats'

module EventShipper
  # Sends and receives messages over a UDP socket. Every outgoing message is
  # passed through a chain of filters (#encode) and every incoming datagram
  # through the same chain in reverse order (#decode).
  #
  class UDP
    # Maximum number of bytes requested from the socket per datagram.
    MAX_DATAGRAM_SIZE = 10 * 1024

    # The Stats instance on which #handle_message counts decoded messages and
    # decode failures.
    attr_reader :stats

    # Creates the socket and the initial filter chain.
    #
    # host, port - used as the destination in #send and as the local address
    #              bound in #dispatch.
    #
    def initialize(host, port)
      @host = host
      @port = port
      @socket = UDPSocket.new

      # Filter::Transmission is always the first filter in the chain: applied
      # first when encoding and last when decoding.
      # NOTE(review): an earlier comment called this the "Bson-Filter" that
      # turns hash based messages into BSON for the wire - confirm against
      # filters.rb that this is what Filter::Transmission does.
      @filters = []
      wrap Filter::Transmission.new

      @stats = Stats.new
    end

    # Appends a filter to the end of the chain. The filter must respond to
    # #en (used by #encode) and #de (used by #decode).
    #
    def wrap(filter)
      @filters << filter
    end

    # Runs obj through #en of every filter, in the order the filters were
    # added, and returns the final result.
    #
    def encode(obj)
      @filters.inject(obj) { |o, f| f.en(o) }
    end

    # Runs obj through #de of every filter, in the reverse of the order the
    # filters were added, and returns the final result.
    #
    def decode(obj)
      @filters.reverse_each.inject(obj) { |o, f| f.de(o) }
    end

    # Encodes hash and sends it as one datagram to host/port.
    #
    # NOTE: the name shadows Object#send on instances of this class; it is
    # kept for compatibility with existing callers. Use #__send__ or
    # #public_send for dynamic dispatch on a UDP object.
    #
    def send(hash)
      @socket.send(
        encode(hash),
        0, # flags
        @host, @port
      )
    end

    # Handles a single message: decodes it and, when decoding produced a
    # truthy value, counts it and yields (queue, message) to the caller's
    # block. The queue argument is always nil here. A falsy decode result is
    # counted as a failure and nothing is yielded.
    #
    def handle_message(string)
      msg = decode(string)
      queue = nil

      if msg
        @stats.count_message
        yield queue, msg
      else
        @stats.count_failure
      end
    end

    # Binds the socket to host/port and enters a loop, receiving datagrams
    # and yielding each decoded message to the block as (queue, message).
    #
    # The block is forwarded explicitly to #handle_message; without that the
    # yield inside #handle_message has no block to call and raises
    # LocalJumpError on the first decodable datagram. Calling #dispatch
    # without a block still raises LocalJumpError at that point.
    #
    def dispatch(&block)
      @socket.bind(@host, @port)
      loop do
        # The sender address returned by recvfrom is not used.
        datagram, _source_info = @socket.recvfrom(MAX_DATAGRAM_SIZE)

        handle_message(datagram, &block)
      end
    end
  end
end