Issues

Issue #11 resolved

Creating a pipe and passing to more than one process fails

Dustin Oprea
created an issue

Code:

import gipc

def _worker(r):
    pass

if __name__ == '__main__':
    with gipc.pipe() as (r, w):
        p1 = gipc.start_process(_worker, args=(r,))
        p2 = gipc.start_process(_worker, args=(r,))

        print("Joining.")

        p1.join()
        p2.join()

Error:

Traceback (most recent call last):
  File "./multi.py", line 11, in <module>
    p2 = gipc.start_process(_worker, args=(r,))
  File "/Users/dustin/development/python/test/gipc_multiple/lib/python2.7/site-packages/gipc/gipc.py", line 260, in start_process
    h.close()
  File "/Users/dustin/development/python/test/gipc_multiple/lib/python2.7/site-packages/gipc/gipc.py", line 513, in close
    self._validate()
  File "/Users/dustin/development/python/test/gipc_multiple/lib/python2.7/site-packages/gipc/gipc.py", line 542, in _validate
    "GIPCHandle has been closed before.")
gipc.gipc.GIPCClosed: GIPCHandle has been closed before.

There is no error if I don't pass the r pipe to the second process.

Comments (10)

  1. Jan-Philip Gehrcke repo owner

    That is expected behavior: right after the 'handle' is passed to the first child process, it becomes closed in the parent. Obviously, any subsequent operation on this handle in the parent fails with GIPCClosed.

    This 'auto close' concept makes transferring handles between processes almost foolproof. Other libraries do something similar; you will most likely find it in multiprocessing's Connection or Pipe objects under the hood.

    What are you intending to achieve by providing the same file descriptor to multiple processes? I appreciate that this can sometimes be useful. However, please be aware that such an architecture requires an additional mechanism for synchronizing reads/writes on the same file descriptor across processes. gipc does not provide such synchronization out of the box, so if it did not throw GIPCClosed in your test case, your code would become non-deterministic, i.e. subject to race conditions.
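
    If the intention is just to distribute work to several children, one pattern that fits gipc's auto-close behavior is to create one pipe per child, so that each read handle is passed exactly once. A minimal sketch along the lines of your snippet (the messages are only placeholders):

    import gipc

    def _worker(r):
        # each child reads from its own dedicated pipe
        print(r.get())

    if __name__ == '__main__':
        with gipc.pipe() as (r1, w1), gipc.pipe() as (r2, w2):
            p1 = gipc.start_process(_worker, args=(r1,))
            p2 = gipc.start_process(_worker, args=(r2,))
            w1.put('for p1')
            w2.put('for p2')
            p1.join()
            p2.join()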

  2. Dustin Oprea reporter

    I guess that I just didn't understand why it wasn't working. I did the same thing with multiprocessing (also adding a semaphore), and produced a reliable, "poor man's" queue. I'm assuming the race-condition that you're referring to is the one that would happen without that semaphore above? I don't think that would be a problem, since your docs mention that you're locking on the pipe: "Any read/write operation on a pipe is gevent.lock.Semaphore-protected and therefore greenlet-/threadsafe and atomic."

    The docs don't say anything about it auto-closing. You might want to add a flag, or at least document it since it diverges from MP (and gevent, possibly).

  3. Jan-Philip Gehrcke repo owner

    I'm assuming the race-condition that you're referring to is the one that would happen without that semaphore above?

    Right.

    I don't think that would be a problem, since your docs mention that you're locking on the pipe: "Any read/write operation on a pipe is gevent.lock.Semaphore-protected and therefore greenlet-/threadsafe and atomic."

    The gevent.lock.Semaphore is stored on the heap of the current process. It can only provide synchronization for greenlets and threads within that process, so it is not capable of synchronizing multiple processes. For that, one needs an OS-level semaphore (a POSIX semaphore, for example) or, more generally, memory shared among processes.
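
    For illustration only (plain multiprocessing rather than gipc, and the names are made up for this sketch): a multiprocessing.Lock, which is backed by an OS-level semaphore, survives the fork and can serialize reads on a single shared pipe end:

    import multiprocessing

    def _consumer(lock, conn):
        # the OS-level lock is visible to all processes and serializes reads
        with lock:
            if conn.poll(5):
                print(conn.recv())

    if __name__ == '__main__':
        lock = multiprocessing.Lock()
        r, w = multiprocessing.Pipe(duplex=False)
        p1 = multiprocessing.Process(target=_consumer, args=(lock, r))
        p2 = multiprocessing.Process(target=_consumer, args=(lock, r))
        p1.start()
        p2.start()
        w.send('first message')
        w.send('second message')
        p1.join()
        p2.join()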

    The docs don't say anything about it auto-closing.

    You are right about that; I should clarify this in the docs. Currently, the text is not explicit enough: "… gipc takes care of closing dispensable pipe handles (file descriptors) in the parent as well as in the child after forking".

    You might want to add a flag, or at least document it since it diverges from MP (and gevent, possibly).

    I agree, multiprocessing.Pipe is not auto-closing. multiprocessing.Pipe has a somewhat complex implementation. gipc's pipe, in contrast, is simpler: it is a tight wrapper around the pipe() system call and implements, so to say, the canonical pipe example, in which a pipe is created before fork(), i.e. fork() duplicates both file descriptors. In most cases, two of those duplicates are dispensable and should be closed so that the program does not leak file descriptors. Developers using multiprocessing.Pipe, on the other hand, should be advised to explicitly close() handles after providing them to a child process if they do not need them anymore in the parent (I bet many do not do this).
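
    For reference, the canonical pipe example in bare POSIX terms, sketched with os.pipe() and os.fork(); each process closes the duplicated descriptor it does not need, which is exactly the bookkeeping gipc automates:

    import os

    r, w = os.pipe()   # created before fork(); the fork duplicates both fds
    pid = os.fork()
    if pid == 0:
        # child only reads: its copy of the write end is dispensable
        os.close(w)
        print(os.read(r, 1024))
        os.close(r)
        os._exit(0)
    else:
        # parent only writes: its copy of the read end is dispensable
        os.close(r)
        os.write(w, b'hello from the parent')
        os.close(w)
        os.waitpid(pid, 0)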

    What I want to say is that I like the simple and safe approach gipc is taking here. I will document it more clearly. In view of the feature you are missing here, I will also think about ways to enable users of gipc to achieve what you want to do. Is it correct that you basically wanted to implement a queue? Maybe I will just add a queue implementation to gipc.

  4. Dustin Oprea reporter

    I had written one, but, since I couldn't pass the pipe to more than one process, I just had to go with multiprocessing pipes and semaphores.

    Do you know why gevent.lock.Semaphore is hosted at the process level? The fundamental difference between a semaphore and a lock is that one is at the process level and one is at the thread level. Why didn't they introduce coroutine semantics to the "lock" instead of the "semaphore"?

    Dustin
