1. Jan-Philip Gehrcke
  2. gipc

Commits

Jan-Philip Gehrcke  committed 29902b0

Adjust child handle pre/post processing in parent/child to duplex handles. Add duplex IPC test.

  • Participants
  • Parent commits bf11d7a
  • Branches duplex

Comments (0)

Files changed (2)

File gipc/gipc.py

View file
  • Ignore whitespace
         ))
 
 
+def _filter_handles(l):
+    handles = []
+    for o in l:
+        if isinstance(o, _GIPCHandle):
+            handles.append(o)
+        elif isinstance(o, _GIPCDuplexHandle):
+            handles.append(o._writer)
+            handles.append(o._reader)
+    return handles
+
+
 def start_process(target, args=(), kwargs={}, daemon=None, name=None):
     """Start child process and execute function ``target(*args, **kwargs)``.
     Any existing instance of :class:`gipc._GIPCHandle` or
     if not isinstance(kwargs, dict):
         raise TypeError, '`kwargs` must be dictionary.'
     log.debug("Invoke target `%s` in child process." % target)
-    allargs = itertools.chain(args, kwargs.values())
-    childhandles = [a for a in allargs if isinstance(a, _GIPCHandle)]
+    childhandles = _filter_handles(itertools.chain(args, kwargs.values()))
     if WINDOWS:
         for h in childhandles:
             h._pre_createprocess_windows()
     p.start()
     p.start = lambda *a, **b: sys.stderr.write(
         "gipc WARNING: Redundant call to %s.start()\n" % p)
-    if WINDOWS:
-        for h in childhandles:
-            h._post_createprocess_windows()
     # Close dispensable file handles in parent.
     for h in childhandles:
         log.debug("Invalidate %s in parent." % h)
+        if WINDOWS:
+            h._post_createprocess_windows()
         h.close()
     return p
 
     state is reset before running the user-given function.
     """
     log.debug("_child start. target: `%s`" % target)
-    allargs = itertools.chain(args, kwargs.values())
-    childhandles = [a for a in allargs if isinstance(a, _GIPCHandle)]
+    childhandles = _filter_handles(itertools.chain(args, kwargs.values()))
     if not WINDOWS:
         # `gevent.reinit` calls `libev.ev_loop_fork()`, which reinitialises
         # the kernel state for backends that have one. Must be called in the
                 log.debug("Invalidate %s in child." % h)
                 h._set_legit_process()
                 # At duplication time the handle might have been locked.
+                # Unlock.
                 h._lock.counter = 1
                 h.close()
     else:

File test/test_gipc.py

View file
  • Ignore whitespace
         self._greenlets_to_be_killed = []
 
     def teardown(self):
-        # Make sure to not leak file descriptors.
+        # Make sure to not leak file descriptors during TestComm tests.
         if get_all_handles():
             for h in get_all_handles():
                 try:
         self.wh.close()
         self.rh.close()
 
+    def test_write_closewrite_read(self):
+        self.wh.put("a")
+        self.wh.close()
+        assert self.rh.get() == "a"
+        with raises(EOFError):
+            self.rh.get()
+        self.rh.close()
+
 
 class TestProcess():
     def test_is_alive_true(self):
             p.join()
 
 
-def duplchild_simple_echo(self, h):
+def duplchild_simple_echo(h):
     h.put(h.get())
 
 
     r = h.resolver
 
 
-def compl_child_test_getaddrinfo_mp():
+def complchild_test_getaddrinfo_mp():
     return