# amqpev/amqpev/frame.py

import struct
import errors
import binops
from spec.spec import spec_0_8 as spec


def flatten_once(l):
    """Return a new list with one level of list/tuple nesting removed.

    Elements of ``l`` that are lists or tuples are spliced into the result
    in place; every other element is copied through unchanged. Nested
    sequences deeper than one level are left as they are.
    """
    flat = []
    for item in l:
        flat += item if isinstance(item, (list, tuple)) else [item]
    return flat

        
def mirror_dict(d):
    """Return a new dict holding every ``k -> v`` pair of ``d`` and its
    reverse ``v -> k``.

    The values of ``d`` are used as keys in the result, so they must be
    hashable. ``d`` itself is not modified.
    """
    d2 = {}
    # items() instead of iteritems(): same result on Python 2, and it also
    # exists on Python 3.
    for k, v in d.items():
        d2[k] = v
        d2[v] = k
    return d2


# Two-way lookup between numeric frame type ids and their names. The names
# below are listed in id order, starting at 1.
FRAME_TYPE = mirror_dict(dict(zip(range(1, 9), (
    'method',
    'header',
    'body',
    'oob-method',
    'oob-header',
    'oob-body',
    'trace',
    'heartbeat',
))))


class Frame(object):
    """Generic frame: a type name, a channel number and a raw payload.

    On the wire a frame is a 7-byte header (type octet, 16-bit channel,
    32-bit payload length, network byte order), the payload, and a single
    frame-end octet (0xCE).
    """

    class IncompleteFrameError(errors.AMQPError): pass

    class InvalidFrameError(errors.AMQPError): pass

    # Octet that terminates every frame on the wire.
    FRAME_END = '\xCE'

    def marshall_amqp(self):
        """Return the wire encoding of this frame.

        Reads ``self.type`` (a frame type name), ``self.channel`` and
        ``self.payload``.
        """
        header = struct.pack('!BHL', FRAME_TYPE[self.type], self.channel,
                len(self.payload))
        return header + self.payload + self.FRAME_END

    @classmethod
    def unmarshall_amqp(cls, buf):
        """Decode one frame from the start of ``buf``.

        Returns a ``(frame, consumed_bytes)`` tuple. When the frame type
        has an entry in FRAME_UNMARSHALL_DISPATCH the generic frame is
        handed to that class's ``unmarshall_amqp`` and its result is
        returned instead.

        Raises IncompleteFrameError when ``buf`` does not yet hold a whole
        frame, InvalidFrameError when the frame is not terminated by the
        frame-end octet, and KeyError when the type octet is not a key of
        FRAME_TYPE.
        """
        HEADER_LEN = 7

        if len(buf) < HEADER_LEN:
            raise cls.IncompleteFrameError("Incomplete frame header")

        (header, rest) = (buf[:HEADER_LEN], buf[HEADER_LEN:])
        (type_id, channel, payload_len) = struct.unpack('!BHL', header)

        # The payload is followed by one frame-end octet, hence the + 1.
        if len(rest) < payload_len + 1:
            raise cls.IncompleteFrameError("Incomplete payload (need %i, got"
                    " %i)" % (payload_len + 1, len(rest)))

        payload = rest[:payload_len]
        # Slice rather than index so the comparison below is between two
        # sequences of the same type.
        frame_end = rest[payload_len:payload_len + 1]
        if frame_end != cls.FRAME_END:
            raise cls.InvalidFrameError("Invalid frame-end octet %r"
                    " (expected %r)" % (frame_end, cls.FRAME_END))
        consumed_bytes = HEADER_LEN + payload_len + 1

        frame = cls()
        frame.type = FRAME_TYPE[type_id]
        frame.channel = channel
        frame.payload = payload

        frame_class = FRAME_UNMARSHALL_DISPATCH.get(frame.type)
        if frame_class is not None:
            return (frame_class.unmarshall_amqp(frame), consumed_bytes)
        else:
            return (frame, consumed_bytes)

    def marshall_dict(self):
        """Return the frame as a dict with 'channel', 'type' and 'payload'."""
        return {
            'channel': self.channel,
            'type': self.type,
            'payload': self.payload }


class MethodFrame(Frame):
    """Frame carrying a method call and its arguments.

    ``method`` is a spec mapping; this class reads its 'class_id', 'id',
    'qname' and 'fields' entries. Each field is a mapping with 'name',
    'code', 'pack' and 'unpack' entries. Arguments are kept in
    ``self.args``, keyed by field name.
    """
    type = 'method'

    def __init__(self, method, **kwargs):
        self.method = method
        self.args = kwargs

    def marshall_amqp(self):
        """Encode the method into ``self.payload`` and return the wire frame.

        Raises errors.AMQPMethodParameterError if an argument for any of
        the method's fields is missing from ``self.args``.
        """
        method = self.method
        values = [method['class_id'], method['id']]
        codes = ["HH"]

        for field in method['fields']:
            name = field['name']
            if name not in self.args:
                raise errors.AMQPMethodParameterError("Parameter %s.%s is"
                        " required but was not provided."
                        % (method['qname'], name))

            value = self.args[name]
            packer = field['pack']
            if packer is not None:
                # 'pack' names a binops function that prepares the value.
                value = getattr(binops, packer)(value)
            values.append(value)
            codes.append(field['code'])

        self.payload = binops.pack_splice(''.join(codes),
                flatten_once(values))
        return Frame.marshall_amqp(self)

    @classmethod
    def unmarshall_amqp(cls, frame):
        """Build a MethodFrame from the payload of a generic ``frame``."""
        HEADER_LEN = 4
        raw = frame.payload
        (class_id, method_id) = struct.unpack("!HH", raw[:HEADER_LEN])

        method = spec[class_id][method_id]
        fields = method['fields']
        codes = ''.join( field['code'] for field in fields )
        items = binops.unpack_splice(codes, raw[HEADER_LEN:])

        args = {}
        offset = 0
        for field in fields:
            # A field owns as many unpacked items as its code has characters.
            width = len(field['code'])
            value = items[offset:offset + width]
            offset += width

            unpacker = field['unpack']
            if unpacker is not None:
                value = getattr(binops, unpacker)(*value)
            elif width == 1:
                value = value[0]
            args[ field['name'] ] = value

        result = cls(method, **args)
        result.channel = frame.channel
        return result

    def marshall_dict(self):
        """Return the dict form; the payload becomes ``(qname, args)``.

        Overwrites ``self.payload`` as a side effect.
        """
        self.payload = (self.method['qname'], self.args)
        return Frame.marshall_dict(self)


class HeaderFrame(Frame):
    """Content header frame: class id, body length and a property list.

    ``class_`` is a spec class object; this class reads its ``id``,
    ``name``, ``properties`` and ``property_flag_words`` attributes. Each
    property is a mapping with 'name', 'code', 'bitmask', 'pack' and
    'unpack' entries. ``headers`` maps property names to values.
    """
    type = 'header'

    def __init__(self, class_, body_len, headers):
        self.class_ = class_
        self.headers = headers
        self.body_len = body_len

    def marshall_amqp(self):
        """Encode the header into ``self.payload`` and return the wire frame.

        Properties missing from ``self.headers`` are omitted and their flag
        bit is left clear.
        """
        prop_desc = self.class_.properties
        # The second 16-bit field is always written as 0.
        packing_list = [self.class_.id, 0, self.body_len]
        coding = "HHQ"
        flags = 0

        prop_packing_list = []
        prop_coding = ""
        for prop in prop_desc:
            name = prop['name']
            if name not in self.headers:
                continue

            val = self.headers[name]
            # Bit properties do not go in the property list, only in the
            # flags.
            if prop['code'] == '.':
                if val: flags |= prop['bitmask']
                continue

            flags |= prop['bitmask']

            if prop['pack'] is not None:
                val = getattr(binops, prop['pack'])(val)
            prop_packing_list.append(val)
            prop_coding += prop['code']

        # Set LSB on all flag words except the last one
        nr_words = self.class_.property_flag_words
        for i in range(1, nr_words):
            flags |= 1 << (i * 16)

        # Shift out each 16-bit word. Push them on the front of the packing
        # list because the flags are stored big-endian.
        for i in range(nr_words):
            prop_packing_list.insert(0, 0xFFFF & flags)
            # Each flag word is 16 bits wide, so advance by a whole word.
            flags >>= 16
        prop_coding = 'H' * nr_words + prop_coding

        packing_list.extend(prop_packing_list)
        coding += prop_coding

        packing_list = flatten_once(packing_list)
        self.payload = binops.pack_splice(coding, packing_list)

        return Frame.marshall_amqp(self)

    @classmethod
    def unmarshall_amqp(cls, frame):
        """Build a HeaderFrame from the payload of a generic ``frame``.

        Bit properties whose flag is set are stored as True; properties
        whose flag is clear do not appear in the resulting headers.
        """
        HEADER_LEN = 12
        buf = frame.payload
        (header, rest) = (buf[:HEADER_LEN], buf[HEADER_LEN:])
        # The two pad bytes skip the 16-bit field after the class id.
        (class_id, body_len) = struct.unpack("!HxxQ", header)

        class_ = spec[class_id]
        nr_words = class_.property_flag_words
        flag_coding = '!' + 'H' * nr_words
        (flag_data, rest) = (rest[:nr_words*2], rest[nr_words*2:])
        flag_words = struct.unpack(flag_coding, flag_data)

        # Recombine the big-endian 16-bit flag words into one integer.
        flags = 0
        for word in flag_words:
            flags = flags << 16 | word

        props_present = [ prop for prop in class_.properties
                if prop['bitmask'] & flags ]
        # NOTE(review): bit properties (code '.') are part of this coding
        # string, whereas marshall_amqp leaves them out of the coding it
        # hands to pack_splice -- confirm unpack_splice yields no item
        # for '.'.
        coding = ''.join( prop['code'] for prop in props_present )
        payload_contents = binops.unpack_splice(coding, rest)
        headers = {}

        for prop in props_present:
            name = prop['name']

            if prop['code'] == '.':
                headers[name] = True
                continue

            # A property owns as many unpacked items as its code has
            # characters.
            nr_items = len(prop['code'])
            val = payload_contents[:nr_items]
            payload_contents = payload_contents[nr_items:]
            if prop['unpack'] is not None:
                val = getattr(binops, prop['unpack'])(*val)
            elif nr_items == 1:
                val = val[0]

            headers[name] = val

        new_frame = cls(class_, body_len, headers)
        new_frame.channel = frame.channel
        return new_frame

    def marshall_dict(self):
        """Return the dict form; the payload becomes
        ``(class name, body_len, headers)``.

        Overwrites ``self.payload`` as a side effect.
        """
        self.payload = (self.class_.name, self.body_len, self.headers)
        return Frame.marshall_dict(self)


class BodyFrame(Frame):
    """Frame of type 'body'; its payload is stored and passed on untouched."""
    type = 'body'

    def __init__(self, payload):
        self.payload = payload

    @classmethod
    def unmarshall_amqp(cls, frame):
        """Build a BodyFrame with the payload and channel of ``frame``."""
        body = cls(frame.payload)
        body.channel = frame.channel
        return body


# Frame type name -> class whose unmarshall_amqp() decodes that frame type.
# Keys come from each class's own ``type`` attribute.
FRAME_UNMARSHALL_DISPATCH = dict(
    (frame_cls.type, frame_cls)
    for frame_cls in (MethodFrame, HeaderFrame, BodyFrame))
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.