Commits

Anonymous committed 6059662

reuse_addr default is true;fix on_close event;add RpcProxy.valid;

Comments (0)

Files changed (2)

 from .rpc_shell import *
 
 
-VERSION = (0, 3, 6)
+VERSION = (0, 3, 7)
 __version__ = VERSION
 from gevent.queue import Queue
 from gevent.event import AsyncResult
 
-from logging import root as logger
+from logging import root as logger, currentframe
 
 
 PICKLE_PROTOCOL = HIGHEST_PROTOCOL
 
 use_logging = False
 log_head = '[p%s]' % os.getpid()
-def printf(msg):
+def printf(msg, *args):
     if use_logging:
-        logger.warn(msg)
+        logger.warn(msg, *args)
     else:
-        print('%s%s' % (log_head, msg))
+        print('%s%s' % (log_head, msg % args))
 
 
 def log_except(clear=False):
         if ex.errno != ENOENT:
             raise
 
-def tcp_listener(address, backlog=50, reuse_addr=None, family=_socket.AF_INET):
+def tcp_listener(address, backlog=50, reuse_addr=True, family=_socket.AF_INET):
     """A shortcut to create a TCP socket, bind it and put it into listening state."""
     sock = socket(family=family)
     if reuse_addr is not None:
             for p in endpoints:
                 yield p
 
-    def bind(self, address):
+    def bind(self, address, reuse_addr=True):
         self.addr = address
         self.bind_addr = address
         if not isinstance(address, str):
-            self._socket = tcp_listener(address, reuse_addr=False)
+            self._socket = tcp_listener(address, reuse_addr=reuse_addr)
         else:
-            self._socket = bind_unix_listener(address)
+            self._socket = bind_unix_listener(address, reuse_addr=reuse_addr)
         _my_servers_[address] = self
 
     def connect(self, address):
         if self.stoped:
             return
         self.stoped = True
+        self.svr.svc_stop(self)
         self._stop_resps()
         self._stop_proxys()
         if not self.sock_error:
         if getattr(self, '_heart_task', None):
             self._heart_task.kill(block=False)
             self._heart_task = None
-        self.svr.svc_stop(self)
 
         try:
             pass #self._pool.join(timeout=60*3, raise_error=True)
         except GreenletExit:
             pass
         except Exception as err:
-            printf(err)
+            printf('[RpcService._recver]%s', err)
         finally:
             self.stop()
 
     def _on_socket_error(self, err):
         if self.stoped or self.reconnect_timeout <= 0:
+            self.sock_error = True
             self.stop()
             return
         def _reconnect():
 
         if self._reconnected is None:
             self._reconnected = AsyncResult()
-            printf('socket error:%s,  RpcService try reconnect' % err)
+            printf('socket error:%s,  RpcService try reconnect', err)
             self.send = self.send_wait
             if hasattr(self.svr, 'reconnect'):#RpcClient.reconnect
                 spawn(_reconnect)
             except KeyError:
                 proxy_cls = RpcProxy
                 proxy = proxy_cls(export_id, svc=self)
-                self._proxys[export_id] = proxy
+                self.reg_proxy(export_id, proxy)
                 return proxy
         else:
-            return proxy_cls(export_id, svc=self)
+            p = proxy_cls(export_id, svc=self)
+            self.reg_proxy(id(p), p)
+            return p
+
+    def reg_proxy(self, key, proxy):
+        self._proxys[key] = proxy
 
     def stop_shell(self, shell_id):
         shell = self.shells.pop(shell_id, None)
     def execute(self, func, args, kw, **others):
         return self.svr.execute(func, args, kw)
 
+    def valid_proxy(self, export_id):
+        return self.get_export(export_id) != None
+
 
 class RpcClient(RpcBase):
     def __init__(self, size=None, uid=None):
         self.svc._handle(parts)
 
     def before_stop(self):
+        if self.addr in _addr_clients:
+            _addr_clients.pop(self.addr, None)
         self.svc.stop()
 
     def svc_stop(self, service):
         self._accept_task = spawn(self._accept)
 
     def before_stop(self):
+        if self.addr in _my_servers_:
+            _my_servers_.pop(self.addr, None)
         for svc in self._services.values():
             svc.stop()
         self._accept_task.kill(block=True)
         self._id = export_id
         self._addr = addr
         self.timeout = CALL_TIMEORUT
-        self._closes = WeakKeyDictionary()
+        self._closes = {}#WeakKeyDictionary()
 
     def __reduce__(self):
         """ 支持pickle,
     def __getitem__(self, key):
         return self.__getattr__('__getitem__')(key)
 
+    def valid(self):
+        """ check this proxy is valid """
+        return self._svc.call(0, 'valid_proxy', (self._id,), None)
+
     def get_service(self):
         return self._svc
 
     def on_close(self):
         for func in self._closes.keys():
             try:
-                func(self)
+                spawn(func, self)
             except StandardError:
                 log_except()
 
     def sub_close(self, func):
-        if func not in self._closes:
-            self._closes[func] = None
+        self._closes[func] = None
 
     def unsub_close(self, func):
-        if func in self._closes:
-            self._closes.pop(func)
+        self._closes.pop(func, None)
 
     @classmethod
     def new_by_local(cls, export, addr):
     def get_owner(self):
         if self.is_local:
             return self.export
-        return RpcProxy(self._id, addr=self._addr, svc=self._svc)
+        p = RpcProxy(self._id, addr=self._addr, svc=self._svc)
+        self._svc.reg_proxy(id(p), p)
 
 
 def map_items(proxys, attr, *args, **kw):