event_shipper / lib / event_shipper / udp.rb

require 'socket'

require_relative 'filters'
require_relative 'stats'

module EventShipper
  # UDP transport for events. One instance can either send events to a remote
  # host/port (#send) or bind to host/port and receive them (#dispatch).
  #
  # Every payload passes through a chain of filters: each filter must respond
  # to `en(obj)` (used by #encode) and `de(obj, attributes)` (used by #decode).
  class UDP
    # Maximum number of bytes read from the socket per datagram in #dispatch.
    MAX_DATAGRAM_SIZE = 10 * 1024

    # @return [Stats] counters updated by #handle_message
    attr_reader :stats

    # @param host [String] host to send to (#send) or bind on (#dispatch)
    # @param port [Integer] port to send to (#send) or bind on (#dispatch)
    def initialize host, port
      @host = host
      @port = port
      @socket = UDPSocket.new

      # Filter::Transmission is installed first, so it is applied first when
      # encoding and last when decoding.
      # NOTE(review): the previous comment called this the "Bson-Filter" that
      # turns hash based messages into BSON for the wire - confirm that this
      # still describes Filter::Transmission.
      @filters = []
      wrap Filter::Transmission.new

      @stats = Stats.new
    end

    # Closes the underlying UDP socket.
    def close
      @socket.close
    end

    # Appends a filter to the chain. Filters added later are applied later by
    # #encode and earlier by #decode.
    #
    # @param filter [#en, #de] filter to add
    def wrap filter
      @filters << filter
    end

    # Runs obj through `en` of every filter, in the order the filters were
    # added.
    #
    # @param obj [Object] value handed to the first filter
    # @return [Object] result of the last filter's `en`
    def encode obj
      @filters.inject(obj) { |payload, filter| filter.en(payload) }
    end

    # Runs obj through `de` of every filter, in reverse order. The chain is
    # seeded with `[obj, {}]` and each filter's return value is splatted into
    # the next filter's `de` call.
    #
    # @param obj [Object] raw value as received
    # @return [Object] result of the last `de` call; #handle_message
    #   destructures it as `[message, attributes]`
    def decode obj
      @filters.reverse_each.inject([obj, {}]) { |state, filter| filter.de(*state) }
    end

    # Sends messages via UDP to a central proxy. Using the queue argument, the
    # caller can decide which redis queue events end up in.
    #
    # Note that this method replaces Object#send for instances of this class;
    # use `__send__` for dynamic method dispatch on them.
    #
    # @param hash [Hash] Attributes of the event to send.
    # @param queue [String] Queue to put events into on the other end
    def send hash, queue='queue'
      event = Protocol.event(queue: queue, json: hash.to_json)

      @socket.send encode(event),
        0,    # flags...
        @host, @port
    end

    # Handles a single message; decoding it and passing it to the caller.
    # A message that decodes to a truthy value is counted via
    # `stats.count_message` and yielded; anything else is counted via
    # `stats.count_failure` and dropped.
    #
    # @param string [String] raw datagram payload
    # @yield [queue, msg] the :queue entry of the decoded attributes and the
    #   decoded message
    def handle_message string
      msg, attributes = decode(string)

      if msg
        @stats.count_message
        yield attributes[:queue], msg
      else
        @stats.count_failure
      end
    end

    # Enters a loop, receiving messages, yielding them to the block.
    # Binds the socket to the configured host and port first.
    #
    # The block is captured explicitly: forwarding it with a block-less
    # `Proc.new` raises ArgumentError on Ruby 3.0 and later.
    #
    # @yield [queue, msg] see #handle_message
    # @raise [ArgumentError] if no block is given
    def dispatch &block
      raise ArgumentError, 'dispatch requires a block' unless block

      @socket.bind @host, @port
      loop do
        datagram, _source_info = @socket.recvfrom(MAX_DATAGRAM_SIZE)

        handle_message(datagram, &block)
      end
    end
  end
end
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.