1. Kyle Schaffrick
  2. amqpev

Commits

Kyle Schaffrick  committed 6e6737e

Reorganize distribution and add examples.

  • Participants
  • Parent commits 5754ed1
  • Branches default

Comments (0)

Files changed (34)

File __init__.py

  • Ignore whitespace
-import logging
-
-class NullHandler(logging.Handler):
-    def emit(self, record):
-        pass
-
-logging.getLogger("amqpev").addHandler(NullHandler())

File amqpev/__init__.py

View file
  • Ignore whitespace
+import logging
+
+class NullHandler(logging.Handler):
+    def emit(self, record):
+        pass
+
+logging.getLogger("amqpev").addHandler(NullHandler())

File amqpev/api.py

View file
  • Ignore whitespace
+from protocol import Connection
+from transport import GreenTCPTransport
+from spec.spec import spec_0_8 as spec
+
+import logging
+
+log = logging.getLogger("amqpev.api")
+
+class BrokerServiceFactory(object):
+    def __init__(self, host):
+        trans = GreenTCPTransport(host)
+        self.conn = Connection(trans)
+
+    def __call__(self):
+        return BrokerService(self.conn.channel())
+
+    def close(self):
+        self.conn.close()
+
+
+class BrokerService(object):
+    """
+    A BrokerService object along with it's helper classes for the various
+    objects in AMQP model provide a high-level API for interacting with the
+    AMQP broker services. A BrokerService is essentially an adapter around a
+    pre-configured channel.
+    """
+    def __init__(self, channel):
+        self._ch = channel
+        self._exchanges = {
+                '': Exchange(self._ch, "", 'direct', False, False, {},
+                    pre_declared=True),
+                'amq.direct': Exchange(self._ch, "amq.direct", 'direct', False,
+                    False, {}, pre_declared=True),
+                'amq.fanout': Exchange(self._ch, "amq.fanout", 'fanout', False,
+                    False, {}, pre_declared=True),
+                'amq.topic': Exchange(self._ch, "amq.topic", 'topic', False,
+                    False, {}, pre_declared=True) }
+        self._queues = {}
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, tb):
+        try:
+            self.close()
+        except:
+            if type is None:
+                log.exception("Exception while closing BrokerService.")
+                raise
+            else:
+                log.exception("Masking exception while shutting down"
+                        " BrokerService")
+
+    def close(self):
+        self._ch.close()
+
+    @property
+    def channel(self):
+        return self._ch
+
+    def exchange(self, name=None, type='direct', passive=False, durable=False,
+            arguments={}):
+        """
+        If we've seen this exchange name before, return the same Exchange
+        object from our identity map. If we haven't, declare it on the channel,
+        save, and return the new Exchange.
+        """
+        if name is None:
+            raise ValueError("Exchanges must be explicitly named")
+        elif name in self._exchanges:
+            return self._exchanges[name]
+        else:
+            ex = Exchange(self._ch, name, type, passive, durable, arguments)
+            self._exchanges[ex.name] = ex
+            return ex
+
+    def queue(self, name=None, passive=False, durable=False, exclusive=None,
+            auto_delete=None, arguments=None):
+        """
+        If we've seen this queue name before, return the same Queue object from
+        our identity map. If we haven't, declare it on the channel, save, and
+        return.
+
+        If no name is given, a new, exclusive, auto-delete queue will always be
+        declared and returned.
+        """
+        if exclusive is None:
+            exclusive = name is None
+        if auto_delete is None:
+            auto_delete = name is None
+        arguments = arguments or {}
+
+        if name is not None and name in self._queues:
+            return self._queues[name]
+        else:
+            q = Queue(self._ch, self, name, passive, durable, exclusive,
+                    auto_delete, arguments)
+            self._queues[q.name] = q
+            return q
+
+    def publish(self, message, routing_key, **kwargs):
+        """
+        Convenience for publishing on the default direct exchange.
+        """
+        return self.exchange('').publish(message, routing_key, **kwargs)
+
+    def consume_from(self, *queues, **kwargs):
+        """
+        Consume from some set of queues. Returns a context manager object,
+        specifically a ConsumerSet.
+        """
+        no_ack = kwargs.get('no_ack', False)
+        prefetch = kwargs.get('prefetch', None)
+        return ConsumerSet(self._ch, queues, no_ack, prefetch)
+
+    def ack(self, message, thru=False):
+        """
+        Acknowledge a message.
+        """
+        self._ch.send_method(spec.basic.ack,
+                delivery_tag=message.delivery_headers['delivery_tag'],
+                multiple=thru)
+
+
+class Exchange(object):
+    """
+    This is a helper object representing an AMQP exchange in the high-level
+    API.
+    """
+    def __init__(self, channel, name, type, passive, durable, arguments,
+            pre_declared=False):
+        self._ch = channel
+        self._name = name
+
+        if not pre_declared:
+            self._ch.sync_method(spec.exchange.declare,
+                    (spec.exchange.declare_ok,), ticket=0, exchange=name,
+                    type=type, passive=passive, durable=durable,
+                    auto_delete=False, internal=False, nowait=False,
+                    arguments=arguments)
+
+    @property
+    def name(self):
+        return self._name
+
+    def publish(self, message, routing_key="", mandatory=False, immediate=False):
+        self._ch.send_method(spec.basic.publish, ticket=0, exchange=self._name,
+                routing_key=routing_key, mandatory=mandatory,
+                immediate=immediate)
+        self._ch.send_content(spec.basic, message.properties, message.body,
+                len(message.body))
+
+
+class Queue(object):
+    """
+    This helper object represents an AMQP queue in the high-level API.
+    """
+    def __init__(self, channel, broker_svc, name, passive, durable, exclusive,
+            auto_delete, arguments):
+        self._ch = channel
+        self._bs = broker_svc
+        self._name = "" if name is None else name
+
+        rx_m = self._ch.sync_method(spec.queue.declare,
+                (spec.queue.declare_ok,), ticket=0, queue=self._name,
+                passive=passive, durable=durable, exclusive=exclusive,
+                auto_delete=auto_delete, nowait=False, arguments=arguments)
+
+        self._name = rx_m.args['queue']
+
+    @property
+    def name(self):
+        return self._name
+
+    def bind(self, exch, routing_key="", arguments=None):
+        arguments = arguments or {}
+        self._ch.sync_method(spec.queue.bind, (spec.queue.bind_ok,), ticket=0,
+                queue=self.name, exchange=exch.name, routing_key=routing_key,
+                nowait=False, arguments=arguments)
+        return self
+
+    def publish(self, message, **kwargs):
+        """
+        Convenience for publishing to this queue via the default direct
+        exchange.
+        """
+        return self._bs.publish(message, self.name, **kwargs)
+
+    def get(self, no_ack=True):
+        rx_m = self._ch.sync_method(spec.basic.get,
+                (spec.basic.get_ok, spec.basic.get_empty),
+                ticket=0, queue=self.name, no_ack=no_ack)
+
+        if rx_m.method == spec.basic.get_empty:
+            return None
+
+        (headers, body_len, body) = self._ch.recv_content()
+        msg = Message(body, headers)
+        msg.delivery_headers = rx_m.args.copy()
+        msg._consumed_from_queue = self
+        return msg
+
+    def __contains__(self, message):
+        """
+        This magic method allows neatly testing if a delivered message came
+        from a particular queue using the expression "msg in queue". Returns
+        true if the message was delivered from this queue.
+        """
+        return message._consumed_from_queue is self
+
+
+class Message(object):
+    """
+    An AMQP message, with payload and headers.
+    """
+    def __init__(self, body, properties=None):
+        self.body = body
+        self.properties = properties or {}
+        self.delivery_headers = {}
+
+    def encode(self):
+        return self.body
+
+    @classmethod
+    def decode(cls, encoded_body):
+        return encoded_body
+
+
+class ConsumerSet(object):
+    """
+    A set of consumers that can be used to asynchronously consume messages from
+    some set of queues.
+    """
+    def __init__(self, channel, queues, no_ack=False, prefetch=None):
+        self._ch = channel
+        self.queues = queues
+        self._c_tags = {}
+        self._state = 'stopped'
+        self._no_ack = no_ack
+        self._prefetch = prefetch
+
+    def __enter__(self):
+        self.start_consumers()
+        return self
+
+    def __exit__(self, type, value, tb):
+        try:
+            self.stop_consumers()
+            # This is to consume any pending messages from the receive queues.
+            # stop_consumers() does not do this because the application may
+            # call stop_consumers() and continue iterating to actually consume
+            # those messages intead of discarding them.
+            list(self)
+        except:
+            if type is None:
+                log.exception("Exception while shutting down ConsumerSet.")
+                raise
+            else:
+                log.exception("Masking exception while shutting down"
+                        " ConsumerSet.")
+
+    def start_consumers(self):
+        if self._state == 'running': return
+
+        if self._prefetch:
+            self._ch.sync_method(spec.basic.qos, (spec.basic.qos_ok,),
+                    prefetch_size=0, prefetch_count=self._prefetch,
+                    global_=False)
+
+        for queue in self.queues:
+            rx_m = self._ch.sync_method(spec.basic.consume,
+                    (spec.basic.consume_ok,), ticket=0, queue=queue.name,
+                    consumer_tag='', no_local=False, no_ack=self._no_ack,
+                    exclusive=False, nowait=False)
+            self._c_tags[ rx_m.args['consumer_tag'] ] = queue
+
+        self._state = 'running'
+
+    def stop_consumers(self):
+        if self._state != 'running': return 
+
+        for c_tag in self._c_tags.keys():
+            self._ch.send_method(spec.basic.cancel, consumer_tag=c_tag,
+                    nowait=False)
+
+        self._state = 'stop-wait'
+
+    def next_message(self):
+        if self._state == 'stopped':
+            return
+
+        rx_m = self._ch.recv_method()
+
+        if rx_m.method == spec.basic.deliver:
+            (headers, body_len, body) = self._ch.recv_content()
+            msg = Message(body, headers)
+            msg.delivery_headers = rx_m.args.copy()
+            msg._consumed_from_queue = self._c_tags[
+                    msg.delivery_headers['consumer_tag'] ]
+            return msg
+
+        elif rx_m.method == spec.basic.cancel_ok:
+            self._c_tags.pop(rx_m.args['consumer_tag'])
+
+            if not self._c_tags:
+                if self._prefetch:
+                    self._ch.sync_method(spec.basic.qos, (spec.basic.qos_ok,),
+                            prefetch_size=0, prefetch_count=0, global_=False)
+                self._state == 'stopped'
+                return
+
+    def __iter__(self):
+        while True:
+            msg = self.next_message()
+            if msg is None:
+                return
+            else:
+                yield msg

File amqpev/binops.py

View file
  • Ignore whitespace
+import struct
+from re import finditer
+import datetime
+
+
+def pack_splice(spec, fields):
+    """
+    Enhanced version of struct.pack which supports splicing arbitrary-length
+    binary data, as well as packing contiguous strings of bits.
+
+    Adds the following format characters:
+
+      ':' - Indicates a splice operation. The corresponding datum in the
+            argument list should be a string, and will be spliced wholly into
+            the output at this location.
+      '.' - Indicates a packed bit. Each occurrence will consume one datum from
+            the argument list, and strings of contiguous bits will be packed
+            into bytes, working from LSB to MSB order within each byte.
+    """
+    bit_vectors = [ (m.start(), m.end())
+            for m in finditer(r'\.{1,8}', spec) ]
+
+    bit_packed_fields = list(fields)
+    spec_list = list(spec)
+
+    for start, end in reversed(bit_vectors):
+        bits = fields[start:end]
+        byte = 0
+
+        for i, bit in enumerate(bits):
+            if bit: byte |= 1 << i
+
+        bit_packed_fields[start:end] = [byte]
+        spec_list[start:end] = ['B']
+
+    bit_packed_spec = ''.join(spec_list)
+
+    chunks = bit_packed_spec.split(':')
+    packed = ''
+    bit_packed_fields.insert(0, '')
+
+    for chunk in chunks:
+        packed += bit_packed_fields.pop(0)
+        nr_elems = len(chunk)
+
+        if nr_elems > 0:
+            packed += struct.pack('!' + chunk, *bit_packed_fields[:nr_elems])
+            bit_packed_fields = bit_packed_fields[nr_elems:]
+
+    return packed
+
+
+def unpack_splice(spec, buf, return_consumed_bytes=False):
+    """
+    Basically the inverse of pack_splice(), with one extra restriction: Each
+    occurrence of a splice (':') should be preceded by a format character that
+    produces an integer when unpacked. The value of this integer will indicate
+    the number of bytes to slice from the packed buffer, starting at this
+    point. Unpacking continues starting with the next byte following.
+
+    This means that binary formats that include splices are required to be
+    preceded by their length in the packed data. Note however that for
+    compatibility with pack_splice(), the length will appear in the output
+    list.
+    """
+    bit_vectors = [ (m.start(), m.end())
+            for m in finditer(r'\.{1,8}', spec) ]
+
+    spec_list = list(spec)
+    bit_unpack_map = {}
+
+    for start, end in reversed(bit_vectors):
+        spec_list[start:end] = ['B']
+        bit_unpack_map[start] = end - start
+
+    bit_packed_spec = ''.join(spec_list)
+
+    bit_packed_fields = []
+    chunks = bit_packed_spec.split(':')
+    total_consumed_bytes = 0
+
+    for chunk in chunks:
+
+        if bit_packed_fields:
+            unsplice_len = bit_packed_fields[-1]
+            bit_packed_fields.append(buf[:unsplice_len])
+            buf = buf[unsplice_len:]
+            total_consumed_bytes += unsplice_len
+
+        consumed_bytes = struct.calcsize('!' + chunk)
+        bit_packed_fields.extend(struct.unpack('!' + chunk, buf[:consumed_bytes]))
+        buf = buf[consumed_bytes:]
+        total_consumed_bytes += consumed_bytes
+
+    fields = list(bit_packed_fields)
+
+    for packed_field_idx in sorted(bit_unpack_map.keys()):
+        nr_bits = bit_unpack_map[packed_field_idx]
+        field = fields[packed_field_idx]
+        bit_vector = []
+
+        for i in range(nr_bits):
+            bit_vector.append(bool(field & (1 << i)))
+
+        fields[packed_field_idx:packed_field_idx+1] = bit_vector
+
+    if return_consumed_bytes:
+        return (fields, total_consumed_bytes)
+    else:
+        return fields
+
+
+def pack_str(val):
+    return (len(val), val)
+
+def unpack_str(size, data):
+    return data
+
+def pack_decimal(val):
+    # TODO
+    return (0, int(val))
+
+def unpack_decimal(scale, value):
+    # TODO
+    return value
+
+def pack_fieldarray(val):
+    return (0, '')
+
+def unpack_fieldarray(size, data):
+    # TODO
+    return data
+
+def pack_table(table_data):
+    coding = ''
+    packing_list = []
+
+    for field_name, val in table_data.iteritems():
+        coding += 'B:'
+        packing_list += pack_str(field_name)
+
+        # FIXME This works around a bug in rabbit_binary_parser:parse_table
+        # where datatype "s" is interpreted as a int16 integer, and not a
+        # string with a uint8 size as specified. So to get along with Rabbit,
+        # we need to use long strings always.
+        #if isinstance(val, str) and len(val) < 255:
+        #    coding += 'cB:'
+        #    packing_list += ['s'] + list(pack_str(val))
+        if isinstance(val, str):
+            coding += 'cL:'
+            packing_list += ['S'] + list(pack_str(val))
+        elif isinstance(val, (int, long)):
+            amqp_code, pack_code = _fit_integer(val)
+            coding += 'c' + pack_code
+            packing_list += [amqp_code, val]
+        elif isinstance(val, float):
+            coding += 'cd'
+            packing_list += ['d', val]
+        elif isinstance(val, datetime.datetime):
+            coding += 'cQ'
+            packing_list += ['T', datetime.time.mktime(val.timetuple())]
+        elif isinstance(val, dict):
+            coding += 'cL:'
+            packing_list += ['F'] + list(pack_table(val))
+        else:
+            pass
+
+    buf = pack_splice(coding, packing_list)
+    return (len(buf), buf)
+
+def unpack_table(size, buf):
+    table_data = {}
+
+    while buf:
+        (fields, consumed_bytes) = unpack_splice('B:', buf,
+                return_consumed_bytes=True)
+        field_name = unpack_str(*fields)
+        buf = buf[consumed_bytes:]
+
+        table_datatype = buf[0]
+        coding = 'x' + TABLE_DATATYPE_CODING[table_datatype][0]
+        unpacker = TABLE_DATATYPE_CODING[table_datatype][1]
+
+        (fields, consumed_bytes) = unpack_splice(coding, buf,
+                return_consumed_bytes=True)
+        buf = buf[consumed_bytes:]
+        val = unpacker(*fields)
+
+        table_data[field_name] = val
+
+    return table_data
+
+
+TABLE_DATATYPE_CODING = {
+    't': ('B', bool),
+    'b': ('b', int),
+    'B': ('B', int),
+    'U': ('h', int),
+    'u': ('H', int),
+    'I': ('l', int),
+    'i': ('L', int),
+    'L': ('q', int),
+    'l': ('Q', int),
+    'f': ('f', float),
+    'd': ('d', float),
+    'D': ('BL', unpack_decimal),
+    's': ('B:', unpack_str),
+    'S': ('L:', unpack_str),
+    'A': ('L:', unpack_fieldarray),
+    'T': ('Q', datetime.datetime.fromtimestamp),
+    'F': ('Q:', unpack_table),
+    'V': ('', lambda: None) }
+
+INT_RANGES = [
+        (              -0x80,               0x7F, 'b', 'b'),
+        (               0x00,               0xFF, 'B', 'B'),
+        (            -0x8000,             0x7FFF, 'U', 'h'),
+        (             0x0000,             0xFFFF, 'u', 'H'),
+        (        -0x80000000,         0x7FFFFFFF, 'I', 'l'),
+        (         0x00000000,         0xFFFFFFFF, 'i', 'L'),
+        ( 0x8000000000000000, 0x7FFFFFFFFFFFFFFF, 'L', 'q'),
+        ( 0x0000000000000000, 0xFFFFFFFFFFFFFFFF, 'l', 'Q') ]
+
+def _fit_integer(val):
+    for lo, hi, amqp_code, pack_code in INT_RANGES:
+        if val >= lo and val <= hi:
+            return (amqp_code, pack_code)
+    raise OverflowError("Can't encode an integer that large.")

File amqpev/channel_fsm.py

View file
  • Ignore whitespace
+import sys
+from eventlet.channel import channel
+from spec.spec import spec_0_8 as spec
+import errors
+
+
+def connection_init(ch, auth="guest\x00guest", locale="en_US", vhost="/",
+        insist=False):
+    """
+    Connection initialization logic. Assumes that the transport has just been
+    opened, and has not yet received a start method from the server. Will start
+    and secure the channel and open the given vhost.
+    """
+    ch.wait_method(spec.connection.start)
+    # TODO: Do something with connection params
+    ch.send_method(spec.connection.start_ok,
+            client_properties={
+                'platform': "Python " + sys.version.split(' ')[0],
+                'product': "AMQPev Client",
+                'version': "0.0.9",
+                'copyright': "Copyright (c) 2009 Kyle Schaffrick",
+                'information': "Non-blocking AMQP client for Eventlet" },
+            mechanism="PLAIN",
+            response=auth,
+            locale=locale)
+
+    while True:
+        rx_m = ch.wait_method(spec.connection.tune,
+                spec.connection.secure)
+
+        if rx_m.method == spec.connection.secure:
+            # TODO: Handle challenge-response
+            ch.send_method(spec.connection.secure_ok, response="")
+            rx_m = ch.wait_method(spec.connection.tune)
+
+        elif rx_m.method == spec.connection.tune:
+            tune_params = rx_m.args.copy()
+            ch.send_method(spec.connection.tune_ok, **tune_params)
+            ch.send_method(spec.connection.open,
+                    virtual_host=vhost,
+                    capabilities="",
+                    insist=insist)
+            break
+
+    rx_m = ch.wait_method(spec.connection.open_ok, spec.connection.redirect)
+
+    if rx_m.method == spec.connection.open_ok:
+        return ('open', tune_params)
+    elif rx_m.method == spec.connection.redirect:
+        redirected_to = rx_m.args['host']
+        ch.send_method(spec.connection.close)
+        ch.wait_method(spec.connection.close_ok)
+        return ('redirected', redirected_to)

File amqpev/errors.py

View file
  • Ignore whitespace
+class AMQPError(RuntimeError):
+    """
+    Any error specific to AMQP itself or AMQPev's client implementation.
+    """
+    pass
+
+class ChannelClosedError(AMQPError):
+    """ The channel has been closed """
+    pass
+
+class BrokerClosedChannel(ChannelClosedError):
+    """ The broker closed the connection """
+    pass
+
+class ClientClosedChannel(ChannelClosedError):
+    """ The client closed the connection """
+    pass
+
+class AMQPChannelException(BrokerClosedChannel):
+    """
+    The broker we are connected to issued a protocol exception on our
+    channel.
+    """
+    pass
+
+class ConnectionClosedError(ChannelClosedError):
+    """ The connection has been closed """
+    pass
+
+class BrokerClosedConnection(ConnectionClosedError):
+    """ The broker closed the connection """
+    pass
+
+class ClientClosedConnection(ConnectionClosedError):
+    """ The client closed the connection """
+    pass
+
+class AMQPConnectionException(BrokerClosedConnection):
+    """
+    The broker we are connected to issued a protocol exception on our
+    connection.
+    """
+    pass
+
+class FrameTypeUnexpectedError(AMQPError):
+    """
+    The broker sent a frame type that was not expected, given AMQPev's current
+    knowledge of the state of the channel.
+    """
+    pass
+
+class UnknownChannelReceivedError(AMQPError):
+    """ The broker sent a frame that is on a channel we do not know about. """
+    pass
+
+class ConnectionInitError(AMQPError):
+    """ Unable to initialize the connection """
+    pass
+
+class ChannelCreateError(AMQPError):
+    """ Unable to create a channel """
+    pass
+
+class AMQPMethodParameterError(AMQPError):
+    """ Attempted to marshall a method without all required parameters. """
+    pass

File amqpev/frame.py

View file
  • Ignore whitespace
+import struct
+import errors
+import binops
+from spec.spec import spec_0_8 as spec
+
+
+def flatten_once(l):
+    result = []
+    for elem in l:
+        if isinstance(elem, (list, tuple)):
+            result.extend(elem)
+        else:
+            result.append(elem)
+    return result
+
+        
+def mirror_dict(d):
+    d2 = {}
+    for k, v in d.iteritems():
+        d2[k] = v
+        d2[v] = k
+    return d2
+
+
+FRAME_TYPE = mirror_dict({
+    1: 'method',
+    2: 'header',
+    3: 'body',
+    4: 'oob-method',
+    5: 'oob-header',
+    6: 'oob-body',
+    7: 'trace',
+    8: 'heartbeat'})
+
+
+class Frame(object):
+    class IncompleteFrameError(errors.AMQPError): pass
+
+    def marshall_amqp(self):
+        out = struct.pack('!BHL', FRAME_TYPE[self.type], self.channel,
+                len(self.payload))
+        out += self.payload
+        out += '\xCE'
+        return out
+
+    @classmethod
+    def unmarshall_amqp(cls, buf):
+        HEADER_LEN = 7
+
+        if len(buf) < HEADER_LEN:
+            raise cls.IncompleteFrameError("Incomplete frame header")
+
+        (header, rest) = (buf[:HEADER_LEN], buf[HEADER_LEN:])
+        (type, channel, payload_len) = struct.unpack('!BHL', header)
+
+        if len(rest) < payload_len + 1:
+            raise cls.IncompleteFrameError("Incomplete payload (need %i, got"
+                    " %i)" % (payload_len, len(rest)))
+
+        (payload, framing_byte) = (rest[:payload_len], rest[payload_len])
+        consumed_bytes = HEADER_LEN + payload_len + 1
+
+        frame = cls()
+        frame.type = FRAME_TYPE[type]
+        frame.channel = channel
+        frame.payload = payload
+
+        frame_class = FRAME_UNMARSHALL_DISPATCH.get(frame.type)
+        if frame_class is not None:
+            return (frame_class.unmarshall_amqp(frame), consumed_bytes)
+        else:
+            return (frame, consumed_bytes)
+
+    def marshall_dict(self):
+        return {
+            'channel': self.channel,
+            'type': self.type,
+            'payload': self.payload }
+
+
+class MethodFrame(Frame):
+    type = 'method'
+
+    def __init__(self, method, **kwargs):
+        self.method = method
+        self.args = kwargs
+
+    def marshall_amqp(self):
+        field_desc = self.method['fields']
+        packing_list = [self.method['class_id'], self.method['id']]
+        coding = "HH"
+        
+        for field in field_desc:
+            try:
+                val = self.args[ field['name'] ]
+            except KeyError:
+                raise errors.AMQPMethodParameterError("Parameter %s.%s is"
+                        " required but was not provided."
+                        % (self.method['qname'], field['name']))
+
+            if field['pack'] is not None:
+                val = getattr(binops, field['pack'])(val)
+            packing_list.append(val)
+            coding += field['code']
+
+        packing_list = flatten_once(packing_list)
+        self.payload = binops.pack_splice(coding, packing_list)
+
+        return Frame.marshall_amqp(self)
+
+    @classmethod
+    def unmarshall_amqp(cls, frame):
+        HEADER_LEN = 4
+        buf = frame.payload
+        (header, rest) = (buf[:HEADER_LEN], buf[HEADER_LEN:])
+        (class_id, method_id) = struct.unpack("!HH", header)
+
+        method = spec[class_id][method_id]
+        coding = ''.join( field['code'] for field in method['fields'] )
+        payload_contents = binops.unpack_splice(coding, rest)
+        args = {}
+
+        for field in method['fields']:
+            nr_items = len(field['code'])
+            val = payload_contents[:nr_items]
+            payload_contents = payload_contents[nr_items:]
+            if field['unpack'] is not None:
+                val = getattr(binops, field['unpack'])(*val)
+            elif nr_items == 1:
+                val = val[0]
+            args[ field['name'] ] = val
+
+        new_frame = cls(method, **args)
+        new_frame.channel = frame.channel
+        return new_frame
+
+    def marshall_dict(self):
+        self.payload = (self.method['qname'], self.args)
+        return Frame.marshall_dict(self)
+
+
+class HeaderFrame(Frame):
+    type = 'header'
+
+    def __init__(self, class_, body_len, headers):
+        self.class_ = class_
+        self.headers = headers
+        self.body_len = body_len
+
+    def marshall_amqp(self):
+        prop_desc = self.class_.properties
+        packing_list = [self.class_.id, 0, self.body_len]
+        coding = "HHQ"
+        flags = 0
+
+        prop_packing_list = []
+        prop_coding = ""
+        for prop in prop_desc:
+            name = prop['name']
+            if name not in self.headers:
+                continue
+
+            val = self.headers[name]
+            # Bit properties do not go in the property list, only in the
+            # flags.
+            if prop['code'] == '.':
+                if val: flags |= prop['bitmask']
+                continue
+
+            flags |= prop['bitmask']
+
+            if prop['pack'] is not None:
+                val = getattr(binops, prop['pack'])(val)
+            prop_packing_list.append(val)
+            prop_coding += prop['code']
+
+        # Set LSB on all flag words except the last one
+        nr_words = self.class_.property_flag_words
+        for i in range(1, nr_words):
+            flags |= 1 << (i * 16)
+
+        # Shift out each word. Push them on the front of the packing list
+        # because the flags are stored big-endian.
+        for i in range(nr_words):
+            prop_packing_list.insert(0, 0xFFFF & flags)
+            flags >>= 8
+        prop_coding = 'H' * nr_words + prop_coding
+
+        packing_list.extend(prop_packing_list)
+        coding += prop_coding
+
+        packing_list = flatten_once(packing_list)
+        self.payload = binops.pack_splice(coding, packing_list)
+
+        return Frame.marshall_amqp(self)
+
+    @classmethod
+    def unmarshall_amqp(cls, frame):
+        HEADER_LEN = 12
+        buf = frame.payload
+        (header, rest) = (buf[:HEADER_LEN], buf[HEADER_LEN:])
+        (class_id, body_len) = struct.unpack("!HxxQ", header)
+
+        class_ = spec[class_id]
+        nr_words = class_.property_flag_words
+        flag_coding = '!' + 'H' * nr_words
+        (flag_data, rest) = (rest[:nr_words*2], rest[nr_words*2:])
+        flag_words = struct.unpack(flag_coding, flag_data)
+
+        flags = reduce(lambda t, w: t << 16 | w, flag_words, 0)
+
+        props_present = [ prop for prop in class_.properties
+                if prop['bitmask'] & flags ]
+        coding = ''.join( prop['code'] for prop in props_present )
+        payload_contents = binops.unpack_splice(coding, rest)
+        headers = {}
+
+        for prop in props_present:
+            name = prop['name']
+
+            if prop['code'] == '.':
+                headers[name] = True
+                continue
+
+            nr_items = len(prop['code'])
+            val = payload_contents[:nr_items]
+            payload_contents = payload_contents[nr_items:]
+            if prop['unpack'] is not None:
+                val = getattr(binops, prop['unpack'])(*val)
+            elif nr_items == 1:
+                val = val[0]
+
+            headers[name] = val
+
+        new_frame = cls(class_, body_len, headers)
+        new_frame.channel = frame.channel
+        return new_frame
+
+    def marshall_dict(self):
+        self.payload = (self.class_.name, self.body_len, self.headers)
+        return Frame.marshall_dict(self)
+
+
+class BodyFrame(Frame):
+    type = 'body'
+
+    def __init__(self, payload):
+        self.payload = payload
+
+    @classmethod
+    def unmarshall_amqp(cls, frame):
+        new_frame = cls(frame.payload)
+        new_frame.channel = frame.channel
+        return new_frame
+
+
+FRAME_UNMARSHALL_DISPATCH = {
+    'method': MethodFrame,
+    'header': HeaderFrame,
+    'body': BodyFrame }

File amqpev/protocol.py

View file
  • Ignore whitespace
+import itertools
+import logging
+from eventlet.coros import semaphore, queue, event, api as ev_api
+from spec.spec import spec_0_8 as spec
+
+from frame import MethodFrame, HeaderFrame, BodyFrame
+import channel_fsm
+import errors
+
+log = logging.getLogger("amqpev.protocol")
+
+
+class BaseConnection(object):
+    def __init__(self, transport, vhost="/", insist=False):
+        self._transport = transport
+        self._channel_queues = {}
+        self._mux = Multiplexer(self._channel_queues)
+        self._chan_zero = self._create_channel(0)
+        self.runner_exception = None
+
+        self._transport.open()
+        ev_api.spawn(self._demux_runner)
+        ev_api.spawn(self._mux_runner)
+
+        self._state = 'open'
+        (result, params) = channel_fsm.connection_init(self._chan_zero,
+                vhost=vhost, insist=insist)
+
+        self.tune_params = params
+
+        if result == 'open':
+            self._state = 'open'
+        elif result == 'redirected':
+            self._state = 'broker-closed'
+
+    def close(self):
+        self._chan_zero.sync_method(spec.connection.close,
+                (spec.connection.close_ok,), reply_code=0,
+                reply_text="Closing", class_id=0, method_id=0)
+        self._transport.close()
+        self._state = 'client-closed'
+        self.runner_exception = errors.ClientClosedConnection()
+
+    def _demux_runner(self):
+        while self._state == 'open':
+            try:
+                frame = self._transport.recv_frame()
+                self._mux.demux_frame(frame)
+            except errors.ConnectionClosedError, exc:
+                self.runner_exception = exc
+                log.exception("Connection down in demux runner, exiting.")
+                self._state = 'closed'
+            except Exception, exc:
+                self.runner_exception = exc
+                log.exception("Caught exception in demux runner.")
+
+            if self.runner_exception is not None:
+                self._mux.all_channel_exception(self.runner_exception)
+                self.runner_exception = None
+
+    def _mux_runner(self):
+        while self._state == 'open':
+            try:
+                frame = self._mux.get_next_muxed_frame()
+                self._transport.send_frame(frame)
+            except errors.ConnectionClosedError, exc:
+                self.runner_exception = exc
+                log.exception("Connection down in mux runner, exiting.")
+                self._state = 'closed'
+            except Exception, exc:
+                self.runner_exception = exc
+                log.exception("Caught exception in mux runner.")
+
+            if self.runner_exception is not None:
+                self._mux.all_channel_exception(self.runner_exception)
+                self.runner_exception = None
+
+    def _create_channel(self, id=None):
+        if id is None:
+            free_chan_nrs = (set(range(max(self._channel_queues) + 2)) -
+                    set(self._channel_queues))
+            id = list(free_chan_nrs)[0]
+
+        elif id in self._channel_queues:
+            raise ChannelCreateError("Channel already exists")
+
+        cfq = ChannelFrameQueue(self._mux)
+        self._channel_queues[id] = cfq
+        return Channel(id, self, cfq)
+
+    def _drop_channel(self, id):
+        self._channel_queues.pop(id)
+
+
+class Connection(BaseConnection):
+    def channel(self):
+        new_chan = self._create_channel()
+        try:
+            new_chan.sync_method(spec.channel.open, (spec.channel.open_ok,),
+                    out_of_band="")
+            return new_chan
+        except:
+            self._drop_channel(new_chan.id)
+            raise
+
+    def close_channel(self, ch):
+        ch.sync_method(spec.channel.close, (spec.channel.close_ok,),
+                reply_code=0, reply_text="Closing", class_id=0, method_id=0)
+        self._drop_channel(ch.id)
+
+
+class BaseChannel(object):
+    def __init__(self, id, connection, channel_queue):
+        self._id = id
+        self._frame_queue = channel_queue
+        self._open = True
+        self._connection = connection
+
+    def _recv_frame(self):
+        if not self._open:
+            raise errors.ChannelClosedError()
+
+        frame = self._frame_queue.recv_frame()
+        assert frame.channel == self._id, ("Frame delivered to"
+                " incorrect ChannelFrameQueue")
+
+        return frame
+
+    def _peek_frames(self):
+        if not self._open:
+            raise errors.ChannelClosedError()
+        return self._frame_queue.peek_frames()
+
+    def _drop_frame(self, index):
+        if not self._open:
+            raise errors.ChannelClosedError()
+        return self._frame_queue.drop_frame(index)
+
+    def _send_frame(self, frame):
+        if not self._open:
+            raise errors.ChannelClosedError()
+
+        frame.channel = self._id
+        self._frame_queue.send_frame(frame)
+
+    def send_method(self, method, **kwargs):
+        self._send_frame(MethodFrame(method, **kwargs))
+
+    def recv_method(self, discard=False):
+        frame = self._recv_frame()
+        while True:
+            if frame.type == 'method':
+                return frame
+            elif discard:
+                continue
+            else:
+                raise errors.FrameTypeUnexpectedError()
+
+    def wait_method(self, *methods):
+        for index, frame in self._peek_frames():
+            if frame.type == 'method' and frame.method in methods:
+                self._drop_frame(index)
+                return frame
+
+    def sync_method(self, method, reply_methods, **kwargs):
+        self.send_method(method, **kwargs)
+        return self.wait_method(*reply_methods)
+
+    def send_content(self, class_, headers, body, body_len=None):
+        if isinstance(body, str):
+            body = [body]
+            body_len = len(body[0])
+        elif body_len is None:
+            body = [''.join(body)]
+            body_len = len(body[0])
+
+        self._send_frame(HeaderFrame(class_, body_len, headers))
+
+        mtu = self._connection.tune_params['frame_max']
+        segment_buf = ""
+
+        for hunk in body:
+            segment_buf += hunk
+
+            while len(segment_buf) >= mtu:
+                (segment, rest) = (segment_buf[:mtu], segment_buf[mtu:])
+                self._send_frame(BodyFrame(segment))
+                segment_buf = rest
+
+        if len(segment_buf):
+            self._send_frame(BodyFrame(segment_buf))
+
+    def recv_content(self):
+        def next_frame_is(f_type):
+            (_, next_frame) = self._peek_frames().next()
+            return next_frame.type == f_type
+
+        if not next_frame_is('header'):
+            raise errors.FrameTypeUnexpectedError("recv_content expecting a"
+                    " header frame, got %s instead." % next_frame.type)
+
+        header_frame = self._recv_frame()
+        body = ""
+
+        while len(body) < header_frame.body_len and next_frame_is('body'):
+            body += self._recv_frame().payload
+
+        return (header_frame.headers, header_frame.body_len, body)
+
+    @property
+    def id(self):
+        return self._id
+
+
+class Channel(BaseChannel):
+    def close(self):
+        self._connection.close_channel(self)
+        self._open = False
+
+
+class Multiplexer(object):
+    """
+    Implements channel frame multiplexing for a connection.
+    """
+    def __init__(self, channel_queues):
+        self._channel_queues = channel_queues
+        self._last_channel_idx = 0
+
+        self.tx_sem = semaphore()
+        self.tx_enqueue_notify = self.tx_sem.release
+        self.tx_wait_if_empty = self.tx_sem.acquire
+
+    def get_next_muxed_frame(self):
+        """
+        Get the next frame to transmit in the multiplex sequence. Blocks if
+        there are no frames queued for sending.
+        """
+        self.tx_wait_if_empty()
+        chan_nrs = sorted(self._channel_queues.keys())
+        assert len(chan_nrs) > 0, "We should have at least one channel by now."
+        result = None
+        chan_idx = self._last_channel_idx
+
+        while result is None:
+            chan_idx = (chan_idx + 1) % len(chan_nrs)
+            chan_nr = chan_nrs[chan_idx]
+            queue = self._channel_queues[chan_nr]
+
+            result = queue.get_next_tx_frame()
+            if result is not None:
+                result.channel = chan_nr
+
+        self._last_channel_idx = chan_idx
+        return result
+
+    def demux_frame(self, frame):
+        """
+        Demultiplex a received frame.
+        """
+        try:
+            self._channel_queues[frame.channel].put_next_rx_frame(frame)
+        except KeyError:
+            raise errors.UnknownChannelReceivedError("Recieved on unknown"
+                    " channel %i" % frame.channel)
+
+    def all_channel_exception(self, exc):
+        for cq in self._channel_queues.itervalues():
+            cq.raise_(exc)
+
+
+class ChannelFrameQueue(object):
+    """
+    Implements queueing behavior for a single channel. Contains the send and
+    receive frame queues for a single channel.
+    """
+    def __init__(self, multiplexer, send_queue_max=10):
+        self.mux = multiplexer
+        self.send_queue_sem = semaphore(limit=send_queue_max)
+        self._send_frame_queue = []
+        self._recv_frame_queue = queue()
+        self._recv_pushback = []
+
+        self._send_queue_wait = self.send_queue_sem.release
+        self._send_dequeue_notify = self.send_queue_sem.acquire
+
+    def send_frame(self, frame):
+        """
+        Queues a frame to be sent on this channel. This call will wait if the
+        send queue length exceeds `send_queue_max` frames.
+        """
+        # We append the frame to our send queue, and then notify the mux.
+        self._send_queue_wait()
+        self._send_frame_queue.append(frame)
+        self.mux.tx_enqueue_notify()
+
+    def get_next_tx_frame(self):
+        """
+        Gets the next frame in the transmit queue, or None if it is empty.
+        """
+        if len(self._send_frame_queue):
+            self._send_dequeue_notify()
+            return self._send_frame_queue.pop(0)
+
+    def put_next_rx_frame(self, frame):
+        """
+        Puts the next frame in the receive queue.
+        """
+        self._recv_frame_queue.send(frame)
+
+    def raise_(self, exc):
+        """
+        Raise an exception on the channel.
+        """
+        self._recv_frame_queue.send(exc=exc)
+
+    def recv_frame(self):
+        """
+        Receive a frame from the channel. This dequeues the frame, which will
+        wait if the queue is empty.
+        """
+        # If there are any frames in our pushback buffer, return those first.
+        if self._recv_pushback:
+            return self._recv_pushback.pop(0)
+        else:
+            return self._recv_frame_queue.wait()
+
+    def peek_frames(self):
+        """
+        Return an iterator over the frames in our queue so that higher level
+        APIs may selectively accept frames. The iterator yields tuples of
+        (index, frame) such that drop_frame(index) will delete frame from the
+        receive queue.
+
+        Calls to drop_frame() or recv_frame() will invalidate the indices
+        returned by this method.
+        """
+        while self._recv_frame_queue.ready():
+            self._recv_pushback.append(self._recv_frame_queue.wait())
+
+        index = 0
+        for frame in self._recv_pushback:
+            yield index, frame
+            index += 1
+
+        while True:
+            frame = self._recv_frame_queue.wait()
+            self._recv_pushback.append(frame)
+            yield index, frame
+            index += 1
+
+    def drop_frame(self, index):
+        """
+        Drop a frame from the receive queue. The index is obtained from the
+        peek_frames method. Calling it with an invalidated index is undefined
+        behavior (see peek_frames doc).
+        """
+        self._recv_pushback.pop(index)

File amqpev/spec/__init__.py

  • Ignore whitespace
Empty file added.

File amqpev/spec/amqp_0-8.xml

View file
  • Ignore whitespace
+<?xml version="1.0"?>
+
+<!--
+Copyright Notice
+================
+© Copyright JPMorgan Chase Bank, Cisco Systems, Inc., Envoy Technologies Inc.,
+iMatix Corporation, IONA� Technologies, Red Hat, Inc., TWIST Process
+Innovations, and 29West Inc. 2006. All rights reserved.
+
+License
+=======
+JPMorgan Chase Bank, Cisco Systems, Inc., Envoy Technologies Inc., iMatix
+Corporation, IONA� Technologies, Red Hat, Inc., TWIST Process Innovations, and
+29West Inc. (collectively, the "Authors") each hereby grants to you a
+worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to
+(i) copy, display, and implement the Advanced Messaging Queue Protocol ("AMQP")
+Specification and (ii) the Licensed Claims that are held by the Authors, all
+for the purpose of implementing the Advanced Messaging Queue Protocol
+Specification. Your license and any rights under this Agreement will terminate
+immediately without notice from any Author if you bring any claim, suit,
+demand, or action related to the Advanced Messaging Queue Protocol
+Specification against any Author.  Upon termination, you shall destroy all
+copies of the Advanced Messaging Queue Protocol Specification in your
+possession or control.
+
+As used hereunder, "Licensed Claims" means those claims of a patent or patent
+application, throughout the world, excluding design patents and design
+registrations, owned or controlled, or that can be sublicensed without fee and
+in compliance with the requirements of this Agreement, by an Author or its
+affiliates now or at any future time and which would necessarily be infringed
+by implementation of the Advanced Messaging Queue Protocol Specification. A
+claim is necessarily infringed hereunder only when it is not possible to avoid
+infringing it because there is no plausible non-infringing alternative for
+implementing the required portions of the Advanced Messaging Queue Protocol
+Specification. Notwithstanding the foregoing, Licensed Claims shall not include
+any claims other than as set forth above even if contained in the same patent
+as Licensed Claims; or that read solely on any implementations of any portion
+of the Advanced Messaging Queue Protocol Specification that are not required by
+the Advanced Messaging Queue Protocol Specification, or that, if licensed,
+would require a payment of royalties by the licensor to unaffiliated third
+parties.  Moreover, Licensed Claims shall not include (i) any enabling
+technologies that may be necessary to make or use any Licensed Product but are
+not themselves expressly set forth in the Advanced Messaging Queue Protocol
+Specification (e.g., semiconductor manufacturing technology, compiler
+technology, object oriented technology, networking technology, operating system
+technology, and the like); or (ii) the implementation of other published
+standards developed elsewhere and merely referred to in the body of the
+Advanced Messaging Queue Protocol Specification, or (iii) any Licensed Product
+and any combinations thereof the purpose or function of which is not required
+for compliance with the Advanced Messaging Queue Protocol Specification. For
+purposes of this definition, the Advanced Messaging Queue Protocol
+Specification shall be deemed to include both architectural and interconnection
+requirements essential for interoperability and may also include supporting
+source code artifacts where such architectural, interconnection requirements
+and source code artifacts are expressly identified as being required or
+documentation to achieve compliance with the Advanced Messaging Queue Protocol
+Specification.
+
+As used hereunder, "Licensed Products" means only those specific portions of
+products (hardware, software or combinations thereof) that implement and are
+compliant with all relevant portions of the Advanced Messaging Queue Protocol
+Specification.
+
+The following disclaimers, which you hereby also acknowledge as to any use you
+may make of the Advanced Messaging Queue Protocol Specification:
+
+THE ADVANCED MESSAGING QUEUE PROTOCOL SPECIFICATION IS PROVIDED "AS IS," AND
+THE AUTHORS MAKE NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED,
+INCLUDING, BUT NOT LIMITED TO, WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE, NON-INFRINGEMENT, OR TITLE; THAT THE CONTENTS OF THE
+ADVANCED MESSAGING QUEUE PROTOCOL SPECIFICATION ARE SUITABLE FOR ANY PURPOSE;
+NOR THAT THE IMPLEMENTATION OF THE ADVANCED MESSAGING QUEUE PROTOCOL
+SPECIFICATION WILL NOT INFRINGE ANY THIRD PARTY PATENTS, COPYRIGHTS, TRADEMARKS
+OR OTHER RIGHTS.
+
+THE AUTHORS WILL NOT BE LIABLE FOR ANY DIRECT, INDIRECT, SPECIAL, INCIDENTAL OR
+CONSEQUENTIAL DAMAGES ARISING OUT OF OR RELATING TO ANY USE, IMPLEMENTATION OR
+DISTRIBUTION OF THE ADVANCED MESSAGING QUEUE PROTOCOL SPECIFICATION.
+
+The name and trademarks of the Authors may NOT be used in any manner, including
+advertising or publicity pertaining to the Advanced Messaging Queue Protocol
+Specification or its contents without specific, written prior permission. Title
+to copyright in the Advanced Messaging Queue Protocol Specification will at all
+times remain with the Authors.
+
+No other rights are granted by implication, estoppel or otherwise.
+
+Upon termination of your license or rights under this Agreement, you shall
+destroy all copies of the Advanced Messaging Queue Protocol Specification in
+your possession or control.
+
+Trademarks
+==========
+"JPMorgan", "JPMorgan Chase", "Chase", the JPMorgan Chase logo and the Octagon
+Symbol are trademarks of JPMorgan Chase & Co.
+
+IMATIX and the iMatix logo are trademarks of iMatix Corporation sprl.
+
+IONA, IONA Technologies, and the IONA logos are trademarks of IONA Technologies
+PLC and/or its subsidiaries.
+
+LINUX is a trademark of Linus Torvalds. RED HAT and JBOSS are registered
+trademarks of Red Hat, Inc. in the US and other countries.
+
+Java, all Java-based trademarks and OpenOffice.org are trademarks of Sun
+Microsystems, Inc. in the United States, other countries, or both.
+
+Other company, product, or service names may be trademarks or service marks of
+others.
+
+Links to full AMQP specification:
+=================================
+http://www.envoytech.org/spec/amq/
+http://www.iona.com/opensource/amqp/
+http://www.redhat.com/solutions/specifications/amqp/
+http://www.twiststandards.org/tiki-index.php?page=AMQ
+http://www.imatix.com/amqp
+
+-->
+
+<amqp major="8" minor="0" port="5672" comment="AMQ protocol 0.80">
+  AMQ Protocol 0.80
+  <!--
+  ======================================================
+  ==       CONSTANTS
+  ======================================================
+  -->
+  <constant name="frame method" value="1"/>
+  <constant name="frame header" value="2"/>
+  <constant name="frame body" value="3"/>
+  <constant name="frame oob method" value="4"/>
+  <constant name="frame oob header" value="5"/>
+  <constant name="frame oob body" value="6"/>
+  <constant name="frame trace" value="7"/>
+  <constant name="frame heartbeat" value="8"/>
+  <constant name="frame min size" value="4096"/>
+  <constant name="frame end" value="206"/>
+  <constant name="reply success" value="200">
+    Indicates that the method completed successfully. This reply code is
+    reserved for future use - the current protocol design does not use positive
+    confirmation and reply codes are sent only in case of an error.
+  </constant>
+  <constant name="not delivered" value="310" class="soft error">
+    The client asked for a specific message that is no longer available.  The
+    message was delivered to another client, or was purged from the queue for
+    some other reason.
+  </constant>
+  <constant name="content too large" value="311" class="soft error">
+    The client attempted to transfer content larger than the server could
+    accept at the present time.  The client may retry at a later time.
+  </constant>
+  <constant name="connection forced" value="320" class="hard error">
+    An operator intervened to close the connection for some reason.  The client
+    may retry at some later date.
+  </constant>
+  <constant name="invalid path" value="402" class="hard error">
+    The client tried to work with an unknown virtual host or cluster.
+  </constant>
+  <constant name="access refused" value="403" class="soft error">
+    The client attempted to work with a server entity to which it has no  due
+    to security settings.
+  </constant>
+  <constant name="not found" value="404" class="soft error">
+    The client attempted to work with a server entity that does not exist.
+  </constant>
+  <constant name="resource locked" value="405" class="soft error">
+    The client attempted to work with a server entity to which it has no access
+    because another client is working with it.
+  </constant>
+  <constant name="frame error" value="501" class="hard error">
+    The client sent a malformed frame that the server could not decode.  This
+    strongly implies a programming error in the client.
+  </constant>
+  <constant name="syntax error" value="502" class="hard error">
+    The client sent a frame that contained illegal values for one or more
+    fields.  This strongly implies a programming error in the client.
+  </constant>
+  <constant name="command invalid" value="503" class="hard error">
+    The client sent an invalid sequence of frames, attempting to perform an
+    operation that was considered invalid by the server. This usually implies a
+    programming error in the client.
+  </constant>
+  <constant name="channel error" value="504" class="hard error">
+    The client attempted to work with a channel that had not been correctly
+    opened.  This most likely indicates a fault in the client layer.
+  </constant>
+  <constant name="resource error" value="506" class="hard error">
+    The server could not complete the method because it lacked sufficient
+    resources. This may be due to the client creating too many of some type of
+    entity.
+  </constant>
+  <constant name="not allowed" value="530" class="hard error">
+    The client tried to work with some entity in a manner that is prohibited by
+    the server, due to security settings or by some other criteria.
+  </constant>
+  <constant name="not implemented" value="540" class="hard error">
+    The client tried to use functionality that is not implemented in the
+    server.
+  </constant>
+  <constant name="internal error" value="541" class="hard error">
+    The server could not complete the method because of an internal error.  The
+    server may require intervention by an operator in order to resume normal
+    operations.
+  </constant>
+  <!--
+  ======================================================
+  ==       DOMAIN TYPES
+  ======================================================
+  -->
+  <domain name="access ticket" type="short">
+    access ticket granted by server
+    <doc>
+      An access ticket granted by the server for a certain set of access rights
+      within a specific realm. Access tickets are valid within the channel
+      where they were created, and expire when the channel closes.
+    </doc>
+    <assert check="ne" value="0"/>
+  </domain>
+  <domain name="class id" type="short"/>
+  <domain name="consumer tag" type="shortstr">
+    consumer tag
+    <doc>
+      Identifier for the consumer, valid within the current connection.
+    </doc>
+    <rule implement="MUST">
+      The consumer tag is valid only within the channel from which the consumer
+      was created. I.e. a client MUST NOT create a consumer in one channel and
+      then use it in another.
+    </rule>
+  </domain>
+  <domain name="delivery tag" type="longlong">
+    server-assigned delivery tag
+    <doc>
+      The server-assigned and channel-specific delivery tag
+    </doc>
+    <rule implement="MUST">
+      The delivery tag is valid only within the channel from which the message
+      was received.  I.e. a client MUST NOT receive a message on one channel
+      and then acknowledge it on another.
+    </rule>
+    <rule implement="MUST">
+      The server MUST NOT use a zero value for delivery tags.  Zero is reserved
+      for client use, meaning "all messages so far received".
+    </rule>
+  </domain>
+  <domain name="exchange name" type="shortstr">
+    exchange name
+    <doc>
+      The exchange name is a client-selected string that identifies the
+      exchange for publish methods.  Exchange names may consist of any mixture
+      of digits, letters, and underscores.  Exchange names are scoped by the
+      virtual host.
+    </doc>
+    <assert check="length" value="127"/>
+  </domain>
+  <domain name="known hosts" type="shortstr">
+    list of known hosts
+    <doc>
+      Specifies the list of equivalent or alternative hosts that the server
+      knows about, which will normally include the current server itself.
+      Clients can cache this information and use it when reconnecting to a
+      server after a failure.
+    </doc>
+    <rule implement="MAY">
+      The server MAY leave this field empty if it knows of no other hosts than
+      itself.
+    </rule>
+  </domain>
+  <domain name="method id" type="short"/>
+  <domain name="no ack" type="bit">
+    no acknowledgement needed
+    <doc>
+      If this field is set the server does not expect acknowledgments for
+      messages.  That is, when a message is delivered to the client the server
+      automatically and silently acknowledges it on behalf of the client.  This
+      functionality increases performance but at the cost of reliability.
+      Messages can get lost if a client dies before it can deliver them to the
+      application.
+    </doc>
+  </domain>
+  <domain name="no local" type="bit">
+    do not deliver own messages
+    <doc>
+      If the no-local field is set the server will not send messages to the
+      client that published them.
+    </doc>
+  </domain>
+  <domain name="path" type="shortstr">
+    <doc>
+      Must start with a slash "/" and continue with path names separated by
+      slashes. A path name consists of any combination of at least one of
+      [A-Za-z0-9] plus zero or more of [.-_+!=:].
+    </doc>
+    <assert check="notnull"/>
+    <assert check="syntax" rule="path"/>
+    <assert check="length" value="127"/>
+  </domain>
+  <domain name="peer properties" type="table">
+    <doc>
+      This string provides a set of peer properties, used for identification,
+      debugging, and general information.
+    </doc>
+    <rule implement="SHOULD">
+      The properties SHOULD contain these fields: "product", giving the name of
+      the peer product, "version", giving the name of the peer version,
+      "platform", giving the name of the operating system, "copyright", if
+      appropriate, and "information", giving other general information.
+    </rule>
+  </domain>
+  <domain name="queue name" type="shortstr">
+    queue name
+    <doc>
+      The queue name identifies the queue within the vhost.  Queue names may
+      consist of any mixture of digits, letters, and underscores.
+    </doc>
+    <assert check="length" value="127"/>
+  </domain>
+  <domain name="redelivered" type="bit">
+    message is being redelivered
+    <doc>
+      This indicates that the message has been previously delivered to this or
+      another client.
+    </doc>
+    <rule implement="SHOULD">
+      The server SHOULD try to signal redelivered messages when it can.  When
+      redelivering a message that was not successfully acknowledged, the server
+      SHOULD deliver it to the original client if possible.
+    </rule>
+    <rule implement="MUST">
+      The client MUST NOT rely on the redelivered field but MUST take it as a
+      hint that the message may already have been processed.  A fully robust
+      client must be able to track duplicate received messages on
+      non-transacted, and locally-transacted channels.
+    </rule>
+  </domain>
+  <domain name="reply code" type="short">
+    reply code from server
+    <doc>
+      The reply code. The AMQ reply codes are defined in AMQ RFC 011.
+    </doc>
+    <assert check="notnull"/>
+  </domain>
+  <domain name="reply text" type="shortstr">
+    localised reply text
+    <doc>
+      The localised reply text.  This text can be logged as an aid to resolving
+      issues.
+    </doc>
+    <assert check="notnull"/>
+  </domain>
+  <class name="connection" handler="connection" index="10">
+    <!--
+    ======================================================
+    ==       CONNECTION
+    ======================================================
+    -->
+    work with socket connections
+    <doc>
+      The connection class provides methods for a client to establish a network
+      connection to a server, and for both peers to operate the connection
+      thereafter.
+    </doc>
+    <doc name="grammar">
+      connection          = open-connection *use-connection close-connection
+      open-connection     = C:protocol-header
+                            S:START C:START-OK
+                            *challenge
+                            S:TUNE C:TUNE-OK
+                            C:OPEN S:OPEN-OK | S:REDIRECT
+      challenge           = S:SECURE C:SECURE-OK
+      use-connection      = *channel
+      close-connection    = C:CLOSE S:CLOSE-OK
+      / S:CLOSE C:CLOSE-OK
+    </doc>
+    <chassis name="server" implement="MUST"/>
+    <chassis name="client" implement="MUST"/>
+    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+    <method name="start" synchronous="1" index="10">
+      start connection negotiation
+      <doc>
+        This method starts the connection negotiation process by telling the
+        client the protocol version that the server proposes, along with a list
+        of security mechanisms which the client can use for authentication.
+      </doc>
+      <rule implement="MUST">
+        If the client cannot handle the protocol version suggested by the
+        server it MUST close the socket connection.
+      </rule>
+      <rule implement="MUST">
+        The server MUST provide a protocol version that is lower than or equal
+        to that requested by the client in the protocol header. If the server
+        cannot support the specified protocol it MUST NOT send this method, but
+        MUST close the socket connection.
+      </rule>
+      <chassis name="client" implement="MUST"/>
+      <response name="start-ok"/>
+      <field name="version major" type="octet">
+        protocol major version
+        <doc>
+          The protocol major version that the server agrees to use, which
+          cannot be higher than the client's major version.
+        </doc>
+      </field>
+      <field name="version minor" type="octet">
+        protocol major version
+        <doc>
+          The protocol minor version that the server agrees to use, which
+          cannot be higher than the client's minor version.
+        </doc>
+      </field>
+      <field name="server properties" domain="peer properties">
+        server properties
+      </field>
+      <field name="mechanisms" type="longstr">
+        available security mechanisms
+        <doc>
+          A list of the security mechanisms that the server supports, delimited
+          by spaces.  Currently ASL supports these mechanisms: PLAIN.
+        </doc>
+        <see name="security mechanisms"/>
+        <assert check="notnull"/>
+      </field>
+      <field name="locales" type="longstr">
+        available message locales
+        <doc>
+          A list of the message locales that the server supports, delimited by
+          spaces.  The locale defines the language in which the server will
+          send reply texts.
+        </doc>
+        <rule implement="MUST">
+          All servers MUST support at least the en_US locale.
+        </rule>
+        <assert check="notnull"/>
+      </field>
+    </method>
+    <method name="start-ok" synchronous="1" index="11">
+      select security mechanism and locale
+      <doc>
+        This method selects a SASL security mechanism. ASL uses SASL (RFC2222)
+        to negotiate authentication and encryption.
+      </doc>
+      <chassis name="server" implement="MUST"/>
+      <field name="client properties" domain="peer properties">
+        client properties
+      </field>
+      <field name="mechanism" type="shortstr">
+        selected security mechanism
+        <doc>
+          A single security mechanisms selected by the client, which must be
+          one of those specified by the server.
+        </doc>
+        <rule implement="SHOULD">
+          The client SHOULD authenticate using the highest-level security
+          profile it can handle from the list provided by the server.
+        </rule>
+        <rule implement="MUST">
+          The mechanism field MUST contain one of the security mechanisms
+          proposed by the server in the Start method. If it doesn't, the server
+          MUST close the socket.
+        </rule>
+        <assert check="notnull"/>
+      </field>
+      <field name="response" type="longstr">
+        security response data
+        <doc>
+          A block of opaque data passed to the security mechanism. The contents
+          of this data are defined by the SASL security mechanism.  For the
+          PLAIN security mechanism this is defined as a field table holding two
+          fields, LOGIN and PASSWORD.
+        </doc>
+        <assert check="notnull"/>
+      </field>
+      <field name="locale" type="shortstr">
+        selected message locale
+        <doc>
+          A single message local selected by the client, which must be one of
+          those specified by the server.
+        </doc>
+        <assert check="notnull"/>
+      </field>
+    </method>
+    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+    <method name="secure" synchronous="1" index="20">
+      security mechanism challenge
+      <doc>
+        The SASL protocol works by exchanging challenges and responses until
+        both peers have received sufficient information to authenticate each
+        other.  This method challenges the client to provide more information.
+      </doc>
+      <chassis name="client" implement="MUST"/>
+      <response name="secure-ok"/>
+      <field name="challenge" type="longstr">
+        security challenge data
+        <doc>
+          Challenge information, a block of opaque binary data passed to the
+          security mechanism.
+        </doc>
+        <see name="security mechanisms"/>
+      </field>
+    </method>
+    <method name="secure-ok" synchronous="1" index="21">
+      security mechanism response
+      <doc>
+        This method attempts to authenticate, passing a block of SASL data for
+        the security mechanism at the server side.
+      </doc>
+      <chassis name="server" implement="MUST"/>
+      <field name="response" type="longstr">
+        security response data
+        <doc>
+          A block of opaque data passed to the security mechanism.  The
+          contents of this data are defined by the SASL security mechanism.
+        </doc>
+        <assert check="notnull"/>
+      </field>
+    </method>
+    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+    <method name="tune" synchronous="1" index="30">
+      propose connection tuning parameters
+      <doc>
+        This method proposes a set of connection configuration values to the
+        client.  The client can accept and/or adjust these.
+      </doc>
+      <chassis name="client" implement="MUST"/>
+      <response name="tune-ok"/>
+      <field name="channel max" type="short">
+        proposed maximum channels
+        <doc>
+          The maximum total number of channels that the server allows per
+          connection. Zero means that the server does not impose a fixed limit,
+          but the number of allowed channels may be limited by available server
+          resources.
+        </doc>
+      </field>
+      <field name="frame max" type="long">
+        proposed maximum frame size
+        <doc>
+          The largest frame size that the server proposes for the connection.
+          The client can negotiate a lower value.  Zero means that the server
+          does not impose any specific limit but may reject very large frames
+          if it cannot allocate resources for them.
+        </doc>
+        <rule implement="MUST">
+          Until the frame-max has been negotiated, both peers MUST accept
+          frames of up to 4096 octets large. The minimum non-zero value for the
+          frame-max field is 4096.
+        </rule>
+      </field>
+      <field name="heartbeat" type="short">
+        desired heartbeat delay
+        <doc>
+          The delay, in seconds, of the connection heartbeat that the server
+          wants.  Zero means the server does not want a heartbeat.
+        </doc>
+      </field>
+    </method>
+    <method name="tune-ok" synchronous="1" index="31">
+      negotiate connection tuning parameters
+      <doc>
+        This method sends the client's connection tuning parameters to the
+        server. Certain fields are negotiated, others provide capability
+        information.
+      </doc>
+      <chassis name="server" implement="MUST"/>
+      <field name="channel max" type="short">
+        negotiated maximum channels
+        <doc>
+          The maximum total number of channels that the client will use per
+          connection.  May not be higher than the value specified by the
+          server.
+        </doc>
+        <rule implement="MAY">
+          The server MAY ignore the channel-max value or MAY use it for tuning
+          its resource allocation.
+        </rule>
+        <assert check="notnull"/>
+        <assert check="le" method="tune" field="channel max"/>
+      </field>
+      <field name="frame max" type="long">
+        negotiated maximum frame size
+        <doc>
+          The largest frame size that the client and server will use for the
+          connection.  Zero means that the client does not impose any specific
+          limit but may reject very large frames if it cannot allocate
+          resources for them.  Note that the frame-max limit applies
+          principally to content frames, where large contents can be broken
+          into frames of arbitrary size.
+        </doc>
+        <rule implement="MUST">
+          Until the frame-max has been negotiated, both peers must accept
+          frames of up to 4096 octets large. The minimum non-zero value for the
+          frame-max field is 4096.
+        </rule>
+      </field>
+      <field name="heartbeat" type="short">
+        desired heartbeat delay
+        <doc>
+          The delay, in seconds, of the connection heartbeat that the client
+          wants. Zero means the client does not want a heartbeat.
+        </doc>
+      </field>
+    </method>
+    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+    <method name="open" synchronous="1" index="40">
+      open connection to virtual host
+      <doc>
+        This method opens a connection to a virtual host, which is a collection
+        of resources, and acts to separate multiple application domains within
+        a server.
+      </doc>
+      <rule implement="MUST">
+        The client MUST open the context before doing any work on the
+        connection.
+      </rule>
+      <chassis name="server" implement="MUST"/>
+      <response name="open-ok"/>
+      <response name="redirect"/>
+      <field name="virtual host" domain="path">
+        virtual host name
+        <assert check="regexp" value="^[a-zA-Z0-9/-_]+$"/>
+        <doc>
+          The name of the virtual host to work with.
+        </doc>
+        <rule implement="MUST">
+          If the server supports multiple virtual hosts, it MUST enforce a full
+          separation of exchanges, queues, and all associated entities per
+          virtual host. An application, connected to a specific virtual host,
+          MUST NOT be able to access resources of another virtual host.
+        </rule>
+        <rule implement="SHOULD">
+          The server SHOULD verify that the client has permission to access the
+          specified virtual host.
+        </rule>
+        <rule implement="MAY">
+          The server MAY configure arbitrary limits per virtual host, such as
+          the number of each type of entity that may be used, per connection
+          and/or in total.
+        </rule>
+      </field>
+      <field name="capabilities" type="shortstr">
+        required capabilities
+        <doc>
+          The client may specify a number of capability names, delimited by
+          spaces.  The server can use this string to how to process the
+          client's connection request.
+        </doc>
+      </field>
+      <field name="insist" type="bit">
+        insist on connecting to server
+        <doc>
+          In a configuration with multiple load-sharing servers, the server may
+          respond to a Connection.Open method with a Connection.Redirect.  The
+          insist option tells the server that the client is insisting on a
+          connection to the specified server.
+        </doc>
+        <rule implement="SHOULD">
+          When the client uses the insist option, the server SHOULD accept the
+          client connection unless it is technically unable to do so.
+        </rule>
+      </field>
+    </method>
+    <method name="open-ok" synchronous="1" index="41">
+      signal that the connection is ready
+      <doc>
+        This method signals to the client that the connection is ready for use.
+      </doc>
+      <chassis name="client" implement="MUST"/>
+      <field name="known hosts" domain="known hosts"/>
+    </method>
+    <method name="redirect" synchronous="1" index="50">
+      asks the client to use a different server
+      <doc>
+        This method redirects the client to another server, based on the
+        requested virtual host and/or capabilities.
+      </doc>
+      <rule implement="SHOULD">
+        When getting the Connection.Redirect method, the client SHOULD
+        reconnect to the host specified, and if that host is not present, to
+        any of the hosts specified in the known-hosts list.
+      </rule>
+      <chassis name="client" implement="MAY"/>
+      <field name="host" type="shortstr">
+        server to connect to
+        <doc>
+          Specifies the server to connect to.  This is an IP address or a DNS
+          name, optionally followed by a colon and a port number. If no port
+          number is specified, the client should use the default port number
+          for the protocol.
+        </doc>
+        <assert check="notnull"/>
+      </field>
+      <field name="known hosts" domain="known hosts"/>
+    </method>
+    <!-- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -->
+    <method name="close" synchronous="1" index="60">
+      request a connection close
+      <doc>
+        This method indicates that the sender wants to close the connection.
+        This may be due to internal conditions (e.g. a forced shut-down) or due
+        to an error handling a specific method, i.e. an exception.  When a
+        close is due to an exception, the sender provides the class and method
+        id of the method which caused the exception.
+      </doc>
+      <rule implement="MUST">
+        After sending this method any received method except the Close-OK
+        method MUST be discarded.
+      </rule>
+      <rule implement="MAY">
+        The peer sending this method MAY use a counter or timeout to detect
+        failure of the other peer to respond correctly with the Close-OK
+        method.
+      </rule>
+      <rule implement="MUST">
+        When a server receives the Close method from a client it MUST delete
+        all server-side resources associated with the client's context.  A
+        client CANNOT reconnect to a context after sending or receiving a Close
+        method.
+      </rule>
+      <chassis name="client" implement="MUST"/>
+      <chassis name="server" implement="MUST"/>
+      <response name="close-ok"/>
+      <field name="reply code" domain="reply code"/>
+      <field name="reply text" domain="reply text"/>
+      <field name="class id" domain="class id">
+        failing method class
+        <doc>
+          When the close is provoked by a method exception, this is the class
+          of the method.
+        </doc>
+      </field>
+      <field name="method id" domain="class id">
+        failing method ID
+        <doc>
+          When the close is provoked by a method exception, this is the ID of
+          the method.
+        </doc>
+      </field>
+    </method>
+    <method name="close-ok" synchronous="1" index="61">
+      confirm a connection close
+      <doc>
+        This method confirms a Connection.Close method and tells the recipient
+        that it is safe to release resources for the connection and close the
+        socket.
+      </doc>
+      <rule implement="SHOULD">
+        A peer that detects a socket closure without having received a Close-Ok
+        handshake method SHOULD log the error.
+      </rule>
+      <chassis name="client" implement="MUST"/>
+      <chassis name="server" implement="MUST"/>
+    </method>
+  </class>
+  <class name="channel" handler="channel" index="20">
+    <!--
+    ======================================================
+    ==       CHANNEL
+    ======================================================
+    -->
+    work with channels
+    <doc>
+      The channel class provides methods for a client to establish a virtual
+      connection - a channel - to a server and for both peers to operate the
+      virtual connection thereafter.
+    </doc>
+    <doc name="grammar">
+      channel             = open-channel *use-channel close-channel
+      open-channel        = C:OPEN S:OPEN-OK
+      use-channel         = C:FLOW S:FLOW-OK