Source

event_shipper / lib / event_shipper / udp.rb

require 'socket'

require_relative 'filters'
require_relative 'stats'

module EventShipper
  class UDP
    attr_reader :stats

    def initialize host, port
      @host, @port = host, port
      @socket = UDPSocket.new
      
      # The Bson-Filter acts as a terminator for hash based messages and
      # turns things into BSON for the wire. 
      @filters = []
      wrap Filter::Transmission.new
      
      @stats = Stats.new      
    end

    def wrap filter
      @filters << filter
    end

    def encode obj
      @filters.inject(obj) { |o, f| f.en(o) }
    end
    def decode obj
      @filters.reverse.inject(obj) { |o, f| f.de(o) }
    end

    # Sends messages via UDP to a central proxy. Using the queue argument, the
    # caller can decide which redis queue events end up in. 
    #
    # @param hash [Hash] Attributes of the event to send. 
    # @param queue [String] Queue to put events into on the other end
    def send hash, queue='queue'
      event = Protocol.event(
        queue: queue, 
        json: hash.to_json)

      @socket.send encode(event), 
        0,    # flags...
        @host, @port
    end

    # Handles a single message; decoding it and passing it to the caller. 
    #
    def handle_message string
      msg, attributes = decode(string)

      if msg
        @stats.count_message

        queue = attributes[:queue]
        yield queue, msg
      else
        @stats.count_failure
      end
    end

    # Enters a loop, receiving messages, yielding them to the block. 
    #
    def dispatch
      @socket.bind @host, @port
      loop do
        datagram, source_info = @socket.recvfrom(10 * 1024)

        handle_message(datagram)
      end
    end
  end
end