Source

protocol-example / ptypes.py

from inspect import currentframe
import struct


i64 = {'ctypes': 'q', 'java': 'long'}
i16 = {'ctypes': 'h', 'java': 'short'}
string = {'ctypes': 'x', 'java': 'String'}
def mktoken(name, types):
    """Create a token"""
    return Token(types['java'], name, fmt=types['ctypes'], 
                 simple=not types['java'] == "String")

def t(name, data_type):
    """Inserts name = (name, data_type) in locals()
    of calling scope"""
    currentframe().f_back.f_locals[name] = mktoken(name, data_type)

class Token(object):
    """Represents one or more tokens"""
    simple = True

    def __init__(self, javaname, name, fmt="x", simple=True):
        self.simple = simple
        self.fmt = fmt
        self.javaname = javaname
        self.name = name
        self.pattern_size = struct.calcsize(self.fmt)

    def parse(self, buf):
        if self.simple:
            values = struct.unpack("!%s" % (self.fmt), 
                                   buf.data[:self.pattern_size])
            buf.data = buf.data[self.pattern_size:]
            
            return {self.name: values[0]}
        
        else:
            
            strlen = struct.unpack("!h", buf.data[:2])[0]
            str_ = struct.unpack("!%ss" % (strlen,), buf.data[2:2+strlen])[0]
            buf.data = buf.data[2+strlen:]
            return {self.name: str_} 
    
    
    def __str__(self):
        type_ = "simple" if self.simple else "string"
        return "%s token with format %s" % (type_, self.fmt)
    
class Buffer(object):
    """Class to hold a message buffer. Each token will
    consume parts of the buffer as it parses the message"""
    def __init__(self, data):
        self.data = data  
        
    
class Message(object):
    """A message, knows how to parse data"""
    def __init__(self, msgname, msgdesc, msgtype, *tokens):
        self.tokens = tokens
        self.__doc__ = msgdesc
        self.parsers = self.tokens
        self.name = msgname
        self.type = hex(msgtype) #for java
 
        
    def parse(self, data):
        buf = Buffer(data[1:]) #remove type byte
        result = {'type': self.name}
        for res in [x.parse(buf) for x in self.parsers]:            
            result.update(res)
        assert len(buf.data) == 0
        return result
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.