Commits

Florian Schlachter  committed 214feb8

Initial commit.

  • Participants

Comments (0)

Files changed (10)

+/.project
+/.pydevproject
+*.pyc
+/.settings/org.eclipse.core.resources.prefs
+Copyright (c) 2012 Florian Schlachter <flori@n-schlachter.de>
+
+Permission is hereby granted, free of charge, to any person
+obtaining a copy of this software and associated documentation
+files (the "Software"), to deal in the Software without
+restriction, including without limitation the rights to use,
+copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the
+Software is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+OTHER DEALINGS IN THE SOFTWARE.

File README.md

Empty file added.

File client_example.py

+# -*- coding: utf-8 -*-
+
+import time
+
+from pyrpc import RPCClient
+
+COUNT = 10000
+
+def main():
+    c = RPCClient()
+    c.connect(ip='127.0.0.1', port=4711)
+    
+    print "5^2 + 10^2 =", c.pythagoras(5, 10)
+    
+    stime = time.time()
+    for _ in xrange(COUNT):
+        c.ping()
+    etime = time.time()
+    
+    print "%d calls took %.2fs (%d/s)" % (COUNT, etime-stime, COUNT/(etime-stime))
+
+if __name__ == "__main__":
+    main()

File pyrpc/__init__.py

+# -*- coding: utf-8 -*-
+
+from rpc import *

File pyrpc/client.py

+# -*- coding: utf-8 -*-
+
+import socket
+
+__all__ = ('Client', 'ClientException')
+
+class ClientException(Exception): pass
+class Client(object):
+    
+    def __init__(self):
+        self.socket = None
+        self.address = None
+    
+    def connect(self, ip, port):
+        self.disconnect()
+        self.address = (ip, port)
+        try:
+            self.socket = socket.create_connection(self.address, timeout=2)
+        except socket.error, e:
+            raise ClientException(e)
+    
+    def disconnect(self):
+        if self.socket:
+            self.socket.close()
+            self.socket = None
+
+    def recv(self):
+        try:
+            return self.socket.recv(8192)
+        except socket.error, e:
+            raise ClientException(e)
+
+    def send(self, buf):
+        try:
+            self.socket.sendall(buf)
+        except socket.error, e:
+            raise ClientException(e)

File pyrpc/message.py

+# -*- coding: utf-8 -*-
+
+import zlib
+import struct
+import cPickle
+import time
+import random
+
+random.seed(time.time())
+
+class MessageException(Exception): pass
+class MessageIncomplete(MessageException): pass
+class MessageInvalid(MessageException): pass
+class Message(object):
+    MAGIC = 0x0815
+    HEADER = "!HL"
+    HEADER_SIZE = struct.calcsize(HEADER)
+    
+    def __init__(self, data, job_ident=None):
+        self.data = data
+    
+    def __repr__(self):
+        return '<Message data=%s>' % \
+            (self.data, )
+
+    def serialize(self):
+        data = zlib.compress(cPickle.dumps(self.data))
+        header = struct.pack(self.HEADER,
+                             self.MAGIC,
+                             len(data))
+        return "".join([header, data])
+    
+    @staticmethod
+    def parse(buf):
+        if len(buf) < Message.HEADER_SIZE:
+            raise MessageIncomplete(u'Header not present.')
+        
+        try:
+            magic, datalen = struct.unpack(Message.HEADER, buf[:Message.HEADER_SIZE])
+        except struct.error:
+            raise MessageInvalid(u'Struct unpack failed.')
+        
+        buf = buf[Message.HEADER_SIZE:]
+        
+        if magic != Message.MAGIC:
+            raise MessageInvalid(u'Invalid magic.')
+        
+        if len(buf) < datalen:
+            raise MessageIncomplete(u'Data not complete.')
+        
+        try:
+            data = cPickle.loads(zlib.decompress(buf[:datalen]))
+        except ValueError:
+            raise MessageInvalid(u'Deserialization failed.')
+        except zlib.error:
+            raise MessageInvalid(u'Decompress failed.')
+    
+        # What remains from the buffer?
+        buf = buf[datalen:]
+        
+        return data, buf

File pyrpc/rpc.py

+# -*- coding: utf-8 -*-
+
+from client import Client, ClientException
+from server import Server, Connection
+from message import Message, MessageIncomplete, MessageInvalid
+
+class RPCServerConnection(Connection):
+    def __init__(self, *args, **kwargs):
+        super(RPCServerConnection, self).__init__(*args, **kwargs)
+        self.bufs = []
+    
+    def handle_data(self, buf):
+        self.bufs.append(buf)
+        
+        while len(self.bufs) > 0:
+            try:
+                request, remaining_buf = Message.parse("".join(self.bufs))
+            except MessageIncomplete:
+                return True
+            except MessageInvalid, e:
+                raise ServerCommunicationError(e)
+            
+            self.bufs = [remaining_buf, ]
+            
+            fname = request['name']
+            args = request['args']
+            kwargs = request['kwargs']
+            
+            func_name = 'handle_%s' % fname
+    
+            if self.server.handler_kwargs:
+                callee = self.server.handler(**self.server.handler_kwargs)
+            else:
+                callee = self.server.handler()
+            callee.data = request
+            callee.address = self.address
+            callee.connection = self
+            
+            callee.prepare()
+    
+            if hasattr(callee, func_name):
+                try:
+                    result = getattr(callee, func_name)(*args, **kwargs)
+                except Exception, e:
+                    #raise # for debugging purposes
+                    response = Message({
+                        'status': 'exception',
+                        'text': unicode(e)
+                    })
+                else:
+                    response = Message({
+                        'status': 'ok',
+                        'result': result
+                    })
+            else:
+                response = Message({
+                    'status': 'notfound'
+                })
+            
+            self.send(response.serialize())
+
+class RPCServerHandler(object):
+    
+    def prepare(self):
+        return
+
+class RPCServerException(Exception): pass
+class ServerCommunicationError(RPCServerException): pass
+class RPCServer(Server):
+    CONNECTION_CLASS = RPCServerConnection
+    
+    def __init__(self, *args, **kwargs):
+        self.handler = kwargs.pop('handler')
+        if kwargs.has_key('handler_kwargs'):
+            self.handler_kwargs = kwargs.pop('handler_kwargs')
+        else:
+            self.handler_kwargs = None
+        
+        super(RPCServer, self).__init__(*args, **kwargs)
+
+class RPCClientException(Exception): pass
+class ClientCommunicationError(RPCClientException): pass
+class RemoteException(RPCClientException): pass
+class MethodNotFound(RPCClientException): pass
+class RPCClient(Client):
+    
+    def __init__(self, env=None):
+        super(RPCClient, self).__init__()
+        self.env = env or {}
+    
+    def __getattr__(self, name):
+        
+        def do_call(*args, **kwargs):
+            data = {
+                'name': name,
+                'args': args,
+                'kwargs': kwargs,
+            }
+            data.update(self.env)
+            request = Message(data)
+            
+            # Send request
+            self.send(request.serialize())
+            
+            # Receive response
+            bufs = []
+            while True:
+                try:
+                    buf = self.recv()
+                except ClientException, e:
+                    raise ClientCommunicationError(e)
+                
+                if not buf:
+                    raise ClientCommunicationError('Server hang up.')
+                
+                bufs.append(buf)
+    
+                try:
+                    response, _ = Message.parse("".join(bufs))
+                except MessageIncomplete:
+                    continue
+                except MessageInvalid, e:
+                    raise ClientCommunicationError(e)
+                else:
+                    # Message complete
+                    break
+            
+            if response.get('status') == 'exception':
+                raise RemoteException(response.get('text'))
+            elif response.get('status') == 'notfound':
+                raise MethodNotFound(u'Method "%s" not found.' % name)
+            elif response.get('status') == 'ok':
+                return response.get('result')
+            else:
+                raise NotImplementedError
+        
+        return do_call

File pyrpc/server.py

+# -*- coding: utf-8 -*-
+
+import socket
+import select
+
+__all__ = ('Server', 'ServerException')
+
+class Connection(object):
+
+    def __init__(self, server, sock, address):
+        self.server = server
+        self.socket = sock
+        self.address = address
+        self.buffer_send = []
+        self.buffer_read = []
+
+    def disconnect(self):
+        if self.socket:
+            self.socket.close()
+            self.socket = None
+
+    def _check_healthy(self):
+        if not self.socket:
+            return False
+        return True
+
+    def handle_read(self):
+        if not self._check_healthy():
+            # Connection not healthy, return!
+            return False
+
+        try:
+            buf = self.socket.recv(8192)
+        except socket.error:
+            # Connection error
+            return False
+        
+        if not buf:
+            # No data available, connection probably closed
+            self.disconnect()
+            return False
+        
+        self.handle_data(buf)
+        
+        return True
+    
+    def handle_write(self):
+        if not self._check_healthy():
+            # Connection not healthy, return!
+            return False
+        
+        if not self.buffer_send:
+            # Connection (presumedly) alive, but buffer empty! 
+            return True
+        
+        buf = self.buffer_send[0]
+
+        try:
+            bytes_sent = self.socket.send(buf)
+        except socket.error:
+            # Connection error
+            return False
+
+        if bytes_sent >= len(buf):
+            self.buffer_send = self.buffer_send[1:]
+        else:
+            self.buffer_send[0] = buf[bytes_sent:]
+        
+        return True
+    
+    def flush(self):
+        for item in self.buffer_send:
+            self.socket.sendall(item)
+        self.buffer_send = [] 
+
+    def send(self, buf):
+        self.buffer_send.append(buf)
+
+    def has_write(self):
+        return len(self.buffer_send) > 0
+
+class ServerException(Exception): pass
+class Server(object):
+    CONNECTION_CLASS = Connection
+    
+    def __init__(self, port, ip=''):
+        self.ip = ip
+        self.port = port
+        self.socket = None
+        
+        self.clients = {} # fileno -> obj
+    
+    def start(self):
+        self.stop()
+        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.socket.bind((self.ip, self.port))
+        self.socket.listen(3)
+    
+    def stop(self):
+        if self.socket:
+            self.socket.close()
+            self.socket = None
+    
+    def _dispatch(self):
+        removals = []
+        for fno, cliobj in self.clients.iteritems():
+            if not cliobj.socket:
+                removals.append(fno)
+
+        for removal in removals:
+            del self.clients[removal]
+                
+        rl, wl, _ = select.select([self.socket.fileno(), ] + self.clients.keys(),
+                                   map(lambda f: f.socket.fileno(), filter(lambda f: f.has_write(), self.clients.values())),
+                                   [])
+
+        for s in rl:
+            if s == self.socket.fileno():
+                # New client
+                socket, address = self.socket.accept()
+                self.clients[socket.fileno()] = self.CONNECTION_CLASS(server=self,
+                                                                      sock=socket,
+                                                                      address=address)
+            else:
+                if not self.clients[s].handle_read():
+                    del self.clients[s]
+        
+        for s in wl:
+            if self.clients.has_key(s) and not self.clients[s].handle_write():
+                del self.clients[s]
+    
+    def run(self):
+        self.start()
+        try:
+            while True:
+                self._dispatch()
+        finally:
+            self.stop()

File server_example.py

+# -*- coding: utf-8 -*-
+
+from pyrpc import RPCServer, RPCServerHandler
+
+class CalcServerHandler(RPCServerHandler):
+    
+    def handle_pythagoras(self, a, b):
+        return a**2 + b**2
+
+    def handle_ping(self):
+        return True
+
+def main():
+    cs = RPCServer(port=4711, handler=CalcServerHandler)
+    try:
+        cs.run()
+    except KeyboardInterrupt:
+        pass
+
+if __name__ == "__main__":
+    main()