1. David Powers
  2. omq




  • more simple than dns (name channels)
  • fast
  • message based
  • built-in error handling
  • built-in timeouts
  • built-in heartbeats
  • versioning of messages?
  • sync and async
  • real message queues
  • serialization to disk

Queue types

We implement two types of queues that are designed to deal with different types of workloads:

broadcast queues send all messages to all subscribers round-robin queues send each message to only one subscriber

module Delivery : sig
  type t =
    | Broadcast
    | Round_robin

Additionally, each queue may be persistent, or transient. Persistent queues are serialized to disk (possibly before or after the message is sent, and later subscribers may replay the queue from the beginning (in the case of broadcast queues).

module Sync : sig
  type t =
    (* written before sending to clients, and flush/fsync called *)
    | Strong
    (* written before sending to clients, flushed every few seconds *)
    | Weak
    (* may be written before or after sending to clients, no flushing *)
    | Async

module Persistence : sig
  type t =
    | Persistent of Sync.t
    | Transient


First, the type as exposed in OCaml:

module Version : sig
  type t

  val one : t

module Client : sig
  module Replay : sig
    type t =
      | From_beginning
      | From_message of Message_id.t
      | From_current

  type t =
    | Connect of Version.t * Id.t
    | Subscribe of string * Replay.t
    | Unsubscribe of string
    | Ack of Time.t

module Topic : sig
  module Id : sig
    type t

  type t = {
    id: Id.t;
    topic: string;

module Message : sig
  module Id : sig
    type t

  type t = {
    id: Id.t;
    topic : Topic.Id.t;
    body: string;

module Server : sig
  type t =
    | Error of string
    | Ping of Time.t
    | Message of Message.t
    | Subscribed of Topic.t
    | Unsubscribed of Topic.t

The wire protocol is designed with the following in mind:

  • We may change our mind, so some form of version on connection is required
  • Loose authentication is a nice to have
  • Watching the traffic with standard network monitoring tools should be possible
  • Messages should have minimal overhead


All numbers are expressed in big-endian format


The first message sent by a client on connection is the version number, expressed as a signed 64bit integer. The server must respond with either the same number, indicating that all future communication until the end of this connection will use this version of the protocol, or with -1, indicating that this version of the protocol is unsupported. If the protocol is unsupported the server and client are both expected to disconnect without further exchanges.

After this point we are discussing the specifics of protocol one.

Message format

Here message refers to a higher level in the protocol than the Message.t in the OCaml type spec.

Messages from the client are all sent as sexps conforming to the normal sexp converters followed by a newline.

Messages from the server start with a 32bit unsigned integer indicating the length of the next message in bytes.

This is followed by a single byte indicating the type of the message (included in the length): 0 -> Error 1 -> Ping 2 -> Message 3 -> Subscribed 4 -> Unsubscribed

The payload following the byte is dependent on the type as follows:

Error - all remaining bytes are a string describing the error Ping - the epoch representation of the current time as a 32 bit number Subscribed - the full sexp of the OCaml type representation Unsubscribed - the full sexp of the OCaml type representation

Message - A 64bit number with the message id A 32bit number with the topic id The remaining bytes are the message. The format of a message is unspecified.

Pings and Acks

Regardless of any other communication the server is expected to ping the client at least once every 50 seconds, and the

The wire protocol is designed to be very low overhead for messages and generally human-readable for control messages. No attempt has been made to conform to any existing message middleware standard. It