1. youngking
  2. starry

Commits

zhihu  committed 51c1252

add an example and change logging format

  • Participants
  • Parent commits 91cfe47
  • Branches master

Comments (0)

Files changed (5)

File examples/sync/client.py

View file
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from protorpc import messages
+from protorpc import remote
+
+from interface import  (SimpleRequest,
+                         SimpleResponse,
+                         BasicService,
+                        )
+
+def client(name='client', address='tcp://0.0.0.0:5000', service_name='basic_service'):
+    from starry.transport  import TcpTransport
+    basic_service = BasicService.Stub(TcpTransport(name, address, service_name))
+    return basic_service
+
+if __name__ == '__main__':
+    basic_service = client()
+    request = SimpleRequest()
+    request.param1 = 'hello, '
+    import time
+    t1 = time.time()
+    print " --------------  sync test  ----------------"
+    for i in range(10):
+        print i
+        request.param2 = 'world %s'%i
+        print basic_service.test(request)
+    t2 = time.time()
+    print "tested: ", t2 -t1

File examples/sync/interface.py

View file
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from protorpc import messages
+from protorpc import remote
+
+class SimpleRequest(messages.Message):
+  """Simple request message type used for tests."""
+
+  param1 = messages.StringField(1)
+  param2 = messages.StringField(2)
+
+
+class SimpleResponse(messages.Message):
+  """Simple response message type used for tests."""
+  value = messages.StringField(1)
+
+
+class BasicService(remote.Service):
+  """A basic service with decorated remote method."""
+
+  def __init__(self):
+    self.request_ids = []
+
+  @remote.method(SimpleRequest, SimpleResponse)
+  def test(self, request):
+      raise NotImplementedError('Method test is not implemented')

File examples/sync/server.py

View file
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from protorpc import messages
+from protorpc import remote
+import time
+from starry.utils import options
+
+from interface import  (SimpleRequest,
+                         SimpleResponse,
+                         BasicService,
+                        )
+
+class BasicServiceImpl(BasicService):
+
+    def test(self, request):
+        time.sleep(0.1)
+        return SimpleResponse(value = request.param1+request.param2)
+
+
+def create_app(address='tcp://0.0.0.0:5000', service_name='basic_service'):
+    from starry.app import Application
+    from starry.server import RPCServer
+    rpcserver = RPCServer('rpcserver', address)
+    rpcserver.register_service(BasicServiceImpl(), service_name)
+    app = Application(None)
+    options.parse_command_line()
+    app.add_server(rpcserver)
+    return app
+
+
+if __name__ == '__main__':
+    app = create_app()
+    app.run()

File starry/server.py

View file
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 import logging
+import time
 import json
 from threading import Thread
 import signal
 from .rpc_pb import Request
 from .rpc_pb import Response
 
-logger = logging.getLogger('starry.server')
+logger = logging.getLogger(__file__)
 
 class Server(object):
 
                 method_info = method.remote
                 try:
                     request_msg = self.protocol.decode_message(method_info.request_type,req.request)
-                    logger.info("client: {client_id}, request: {request} ,serice: {service_name} ,method: {method_name}".format(client_id=client_id, 
-                                                                                                                                request = request_msg,
-                                                                                                                                service_name = service_name,
-                                                                                                                                method_name = method_name ))
                 except Exception as e:
                     raise remote.ServiceDefinitionError(e)
             except KeyError as e:
                                 'Invalid Service define: {0}'.format(err)
                                 )
             try:
+                start = time.time()
                 response = method(request_msg)
+                end = time.time()
+                finished = end - start
+                logger.info("<method {method_name}( {request} ) > from {client_id} to {service_name} ,finished in {finished}s".format(client_id=client_id, 
+                                                                                                                            request = request_msg,
+                                                                                                                            service_name = service_name,
+                                                                                                                            method_name = method_name,
+                                                                                                                            finished = finished,
+                                                                                                                            ))
                 resp.response = self.protocol.encode_message(response)
                 return resp
             except remote.ApplicationError, err:

File starry/utils/time_queue.py

View file
+# -*- coding: utf-8 -*-
+"""
+time_queue.py
+
+A priority queue based on time
+"""
+
+import heapq
+import time           
+
+class TimeQueue(object):
+    """a priority queue based on time"""
+    
+    def __init__(self):
+        """create a time queue"""
+        self._queue = list()
+        
+    def put(self, task, start_time=time.time()):
+        """put one task into the queue""" 
+        assert task is not None
+        heapq.heappush(self._queue, (start_time, task, ))
+        
+    def peek_time(self):
+        """return the time the next task is due"""
+        return self._queue[0][0]
+        
+    def pop(self):
+        """
+        return the next task
+        """
+        _, task = heapq.heappop(self._queue)
+        return task             
+        
+    def __len__(self):
+        """report the size of the queue"""
+        return len(self._queue)   
+        
+
+if __name__ == '__main__':
+    queue = TimeQueue()
+    queue.put(1)
+    queue.put(2, time.time()+10)
+    queue.put(3)
+    queue.put(1, time.time()+5)
+    print queue.pop()
+    print queue.pop()
+    print queue.pop()
+