Commits

zhihu committed f256f4f

add ping method

Comments (0)

Files changed (3)

examples/sync/client.py

 
 if __name__ == '__main__':
     basic_service = client()
+    print basic_service.transport.ping()
     request = SimpleRequest()
     request.param1 = 'hello, '
     import time
 # -*- coding: utf-8 -*-
 import logging
 import time
-import json
 from threading import Thread
 import signal
 import os
 from protorpc import protobuf
 from protorpc import remote
 
-from .utils.debug import get_traceback_info
 from .rpc_pb import Request
 from .rpc_pb import Response
 
 logger = logging.getLogger(__file__)
 
+
 class Server(object):
 
     channel = None
 
     def register_service(self, service, name=None):
         '''
-        Class-instances which should be available for calling, are registered with this function.
+        Class-instances which should be available for calling, are registered
+        with this function.
 
-        No methods for this instance are stored because they could be reached with the function 
-        getattr for any given class.
+        No methods for this instance are stored because they could be reached
+        with the function getattr for any given class.
 
         :param instance: class-instance which should be registered
         :param name: with this name the instance is being registered
         '''
         if not name:
-            definition_name_function = getattr(service, 'definition_name', None)
+            definition_name_function = getattr(service, \
+                    'definition_name', None)
             if definition_name_function:
                 name = definition_name_function()
             else:
         """
         raise NotImplementedError()
 
+
 class RPCServer(Server):
 
-    def __init__(self, name, listen, worker_num=5, 
+    def __init__(self, name, listen, worker_num=5,
             protocol=protobuf):
 
-        super(RPCServer,self).__init__(protocol)
-        self.server_id = "{0}@{1}_{2}".format(name,os.uname()[1],os.getpid())
+        super(RPCServer, self).__init__(protocol)
+        self.server_id = "{0}@{1}_{2}".format(name, os.uname()[1], os.getpid())
         self.listen = listen
         self.worker_num = worker_num
         self.context = zmq.Context(1)
         self.__stop = True
 
     def register_signal_handlers(self):
-        signal.signal(signal.SIGTERM, lambda signum,frame:self.stop())
-        signal.signal(signal.SIGINT, lambda signum,frame:self.stop())
-        signal.signal(signal.SIGQUIT, lambda signum,frame:self.stop())
-        signal.signal(signal.SIGUSR1, lambda signum,frame:self.stop())
+        signal.signal(signal.SIGTERM, lambda signum, frame: self.stop())
+        signal.signal(signal.SIGINT, lambda signum, frame: self.stop())
+        signal.signal(signal.SIGQUIT, lambda signum, frame: self.stop())
+        signal.signal(signal.SIGUSR1, lambda signum, frame: self.stop())
 
     def start(self):
         # Socket to talk to workers
         worker_url = "inproc://{0}_workers".format(self.server_id)
         logger.debug('bind worker on: {0}'.format(worker_url))
-        workers = self.context.socket(zmq.XREP) 
+        workers = self.context.socket(zmq.XREP)
         workers.bind(worker_url)
 
         # Socket to talk to clients
         clients = self.context.socket(zmq.XREP)
         clients.bind(self.listen)
 
-        for i in range(0,self.worker_num):
+        for i in range(0, self.worker_num):
             execute_method_thread = ExecuteMethodThread(self, worker_url)
             self.threads.append(execute_method_thread)
             execute_method_thread.start()
                 break
 
             # handle worker activity on the backend
-            if socks.has_key(xrep_workers) and socks[xrep_workers] == zmq.POLLIN:
+            if xrep_workers in socks \
+                    and socks[xrep_workers] == zmq.POLLIN:
                 msg = xrep_workers.recv_multipart()
                 # add the worker address to the queue
                 worker_addr = msg[0]
 
             #  Dequeue and drop the next worker address
             if workers_list \
-               and socks.has_key(xrep_clients) \
+               and xrep_clients in socks \
                and socks[xrep_clients] == zmq.POLLIN:
-                   worker_addr = workers_list.pop()
-                   msg_in = xrep_clients.recv_multipart()
-                   msg_out = [worker_addr, '']+msg_in
-                   xrep_workers.send_multipart(msg_out)
+
+                worker_addr = workers_list.pop()
+                msg_in = xrep_clients.recv_multipart()
+                msg_out = [worker_addr, ''] + msg_in
+                xrep_workers.send_multipart(msg_out)
+
 
 class ExecuteMethodThread(Thread):
 
     def __init__(self, server, worker_url):
         Thread.__init__(self)
-        self.server       = server
-        self.context        = self.server.context
-        self.server_id      = self.server.server_id
-        self.protocol     = self.server.protocol
-        self.worker_url     = worker_url
+        self.server = server
+        self.context = self.server.context
+        self.server_id = self.server.server_id
+        self.protocol = self.server.protocol
+        self.worker_url = worker_url
         self.__stop = False
 
     def stop(self):
         self.__reply(self.protocol.encode_message(response))
 
     def __reply(self, msg):
-        self.socket.send_multipart(self.return_address+[msg])
+        self.socket.send_multipart(self.return_address + [msg])
 
     def __send_error(self,
                     response,
                     status_state,
                     error_message,
                     error_name=None):
+
         status = remote.RpcStatus(state=status_state,
-                                error_message= '%s - '%self.server.server_id +  error_message,
-                                error_name=error_name)
+                error_message='{0} - '
+                       .format(self.server.server_id + error_message),
+                error_name=error_name)
         response.status = status
         return response
 
         with self.server.app.request_context():
             resp = Response()
             try:
-                req = self.protocol.decode_message(Request,msg)
+                req = self.protocol.decode_message(Request, msg)
                 client_id = req.client_id
                 service_name = req.service_name
-                method_name = req.method_name 
+                method_name = req.method_name
+                if method_name == 'ping':
+                    resp.response = "I'm still alive"
+                    return resp
                 service = self.server.get_service(service_name)
                 method = self.server.get_method(service, method_name)
                 method_info = method.remote
                 #send reply back to client
                 self.reply(result_message)
 
-        except AssertionError, e:
-            logger.error('Could not assert right socket state, creating new socket.')
+        except AssertionError:
+            logger.error('Could not assert right socket state, creating new socket.', exc_info=1)
 
-        except zmq.ZMQError as e:
-            logger.error('Could not send or receive message, creating new socket.')
+        except zmq.ZMQError:
+            logger.error('Could not send or receive message, creating new socket.', exc_info=1)
         finally:
             self.poller.unregister(self.socket)
             self.socket.close()

starry/transport.py

         response_type = remote_info.response_type
         return self.call_remote(method_name, request, response_type)
 
+    def ping(self):
+        req = Request()
+        req.client_id = self.client_id
+        req.service_name = self.__service_name
+        req.method_name = 'ping'
+        req.request = ''
+        tcp_request = self.__request_type(
+                            transport=self,
+                            service_name=self.__service_name,
+                            encoded_request=self.protocol.encode_message(req),
+                            )
+
+        encoded_response, status = tcp_request.get_response()
+        return encoded_response
+
     def call_remote(self, method_name, request, response_type):
         """Start a remote procedure call.
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.