
Issue #8 invalid

Pipe buffer size and broken pipes

Ivan Smirnov
created an issue

It looks like on most Unix kernels 64K is the limit, so when exceeding 65536 bytes we get

    return _write(fd, buf)
OSError: [Errno 32] Broken pipe
<Greenlet at 0x7f6652e0deb0: <function <lambda> at 0x7f664e9570c8>> failed with OSError

Also, it looks like writes are only guaranteed to be atomic/thread-safe up to 4K, though I'm not sure about that in the general case.

64K (or is it 4K?) is pretty limiting. I wonder if it's possible to manually split the bindata into chunks of configurable size (defaulting to 64K on Unix and maybe some other value on Windows) and pass them through in a smart way using gevent's cooperative blocking, while keeping both read and write operations atomic. OS pipes are still fast enough to allow for this without a huge performance hit, imo.
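
To illustrate the idea, here is a rough sketch of such manual chunking on top of the existing put() call; the chunk size constant and helper name are made up for illustration and are not part of gipc's API:

    CHUNK_SIZE = 64 * 1024  # illustrative default; could be made configurable

    def put_chunked(writer, bindata):
        # Split a large payload into pipe-capacity-sized pieces so that no
        # single message exceeds the assumed 64K limit; the reading side
        # would need a matching scheme to reassemble the pieces.
        for start in xrange(0, len(bindata), CHUNK_SIZE):
            writer.put(bindata[start:start + CHUNK_SIZE])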

Comments (12)

  1. Ivan Smirnov reporter

    Sure, here you go. I was initially writing this to benchmark the speed of raw byte transfer mode, but with the 65K limit, benchmarking doesn't really make sense.

    import gevent
    import time
    import gipc
    
    
    def bench(n, value):
    
        with gipc.pipe() as (r, w):
            try:
                t0 = time.time()
                p = gipc.start_process(target=reader, args=(r,))
                writers = gevent.spawn(lambda: [gevent.spawn(lambda:
                    w.put(value)) for _ in xrange(n)])
                writers.join()
                return (time.time() - t0) * 1000.
            finally:
                p.terminate()
                p.join()
    
    def reader(handle):
        with handle:
            while True:
                handle.get()
    
    def main():
        n = 1
        k = 65500  # try 65600
        t = bench(n, 'a' * k)
        print 'Transferring %.2fKB in %d threads...' % (k / 1024., n)
    
    if __name__ == "__main__":
        main()
    

    Output for k = 65500 (it's not exactly 65536 because of the message length field and the pickling overhead):

    ╰─>$ python gipc-pipe-buffer.py
    Transferring 63.96KB in 1 threads...
    

    Output for k = 65600:

    ╰─>$ python gipc-pipe-buffer.py
    Traceback (most recent call last):
      File "/home/smirnov/anaconda/lib/python2.7/site-packages/gevent-1.0dev-py2.7-linux-x86_64.egg/gevent/greenlet.py", line 327, in run
        result = self._run(*self.args, **self.kwargs)
      File "gipc-pipe-buffer.py", line 13, in <lambda>
        w.put(value)) for _ in xrange(n)])
      File "/home/smirnov/anaconda/lib/python2.7/site-packages/gipc/gipc.py", line 679, in put
        self._write(struct.pack("!i", len(bindata)) + bindata)
      File "/home/smirnov/anaconda/lib/python2.7/site-packages/gipc/gipc.py", line 659, in _write
        diff = len(bindata) - _WRITE_NB(self._fd, bindata)
      File "/home/smirnov/anaconda/lib/python2.7/site-packages/gevent-1.0dev-py2.7-linux-x86_64.egg/gevent/os.py", line 69, in nb_write
        return _write(fd, buf)
    OSError: [Errno 32] Broken pipe
    <Greenlet at 0x7fd9f95e3730: <function <lambda> at 0x7e9aa0>> failed with OSError
    
    Transferring 64.00KB in 1 threads...
    
  2. Jan-Philip Gehrcke repo owner

    I have several doubts about your code.

    First of all, if you want to make sure that your data does not end up corrupted, you should never write from multiple instances (threads, greenlets, whatever) to the same write end of a pipe without a further synchronization/locking mechanism. I would not trust the '4K atomicity' concept that you mentioned above. There is no "atomicity" guarantee for single reads or writes; the POSIX standard does not define this. There is a guarantee that all data written to the write end arrives at the read end in the order in which it was written, nothing more, nothing less.
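
    If, at some point, you really do need several greenlets to share one write end, the straightforward approach is to serialize the writes yourself. Here is a minimal sketch with a gevent semaphore -- my own illustration, not something gipc does for you:

    from gevent.lock import Semaphore

    write_lock = Semaphore(1)

    def locked_put(writer, value):
        # Only one greenlet at a time may write to the shared write end,
        # so individual messages can never interleave.
        with write_lock:
            writer.put(value)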

    So, for the purpose of this benchmark, please forget about writing to the same write end from multiple greenlets. For simplicity, we are interested in having one writer greenlet in the parent and reading from the pipe in the child. Also, when measuring something like throughput, you certainly want to send more than a few kilobytes through the system. Accordingly, I changed your code and removed n (the number of write greenlets; we keep that equal to 1 for the purpose of this benchmark) and introduced N, the number of messages to transmit.

    I don't get why you needed the nested call to gevent.spawn at all. It can produce bogus behavior, so I removed it.

    Also, you should have some synchronization mechanism between parent and child. There is no need to brutally kick the child with SIGTERM. I have modified your code so that the child stops reading upon a certain message. The child process then terminates by itself, and a simple join() in the parent is enough.

    You said you wanted to "benchmark the speed of raw byte transfer mode", so you better get some meaningful number in the end (something like bytes per second) :-)

    Your code with these modifications:

    import gevent
    import time
    import gipc
    import sys
    
    def bench(N, value):
        with gipc.pipe() as (r, w):
            t0 = time.time()
            p = gipc.start_process(target=reader, args=(r,))
            writer = gevent.spawn(lambda w: [w.put(value) for _ in xrange(N)], w)
            writer.join()
            delta = time.time() - t0
            w.put("stop")
            p.join()
            return delta
    
    
    def reader(handle):
        with handle:
            while handle.get() != "stop":
                pass
    
    
    def main():
        N = 20000
        k = int(sys.argv[1])
        delta = bench(N, 'a' * k)
        mbytes_per_sec = N*k/10**6/delta
        print "MBytes/s: %s" % mbytes_per_sec
    
    
    if __name__ == "__main__":
        main()
    

    Executing it with different message sizes makes it obvious that there is a sweet spot for the message size:

    $ python test.py 64000
    MBytes/s: 896.7830354
    $ python test.py 65000
    MBytes/s: 1367.4782361
    $ python test.py 66000
    MBytes/s: 896.822579174
    
  3. Ivan Smirnov reporter

    Thanks Jan-Philip. I had problems with pipes closing early before, but that was a different kind of error; also, the magical 64K number doesn't just pop out of nowhere, does it?

    Anyway, I'm curious what the actual problem here is. I will test it out a bit later on a Mac OS machine.

  4. Jan-Philip Gehrcke repo owner

    As a learning exercise, it might be good for you to deeply debug what happened in your code. For writing solid code, however, it is essential to know how pipes behave and to write within the confines of what POSIX guarantees. From that point of view, it's not really worth the effort to dig into the details of what happens in your code.

    Just remember that for this kind of communication you need to establish some kind of protocol and ordering. The moment you have established this (like the 'stop' message above and the corresponding knowledge that the child will be gone shortly after it), you usually find yourself writing solid code already.

  5. Ivan Smirnov reporter

    Thanks a lot, Jan-Philip, lesson learned, you can include this example in your "bad practices" section of the docs :)

    Actually, one thing I still don't fully understand at this point is why it doesn't work when n=1. One greenlet spawns one greenlet that does one write, and the pipe breaks down at 64K?

  6. Jan-Philip Gehrcke repo owner

    Since you insisted on getting a detailed explanation, here it is :-)

    Generally speaking, your code contains a race condition. In one test (I'll call it the 65K test from here on), this race condition does not have an obvious effect; in the other test (66K), it does.

    For your 65K test, the write greenlets finish writing to the pipe before the read end of the pipe is closed (in the child).

    For the 66K test, the child process is terminated (and the pipe's read end closed) before the write greenlets finish writing the data to the pipe. A write syscall applied to a pipe whose read end is already closed causes a SIGPIPE signal to be generated for the calling process. If the calling process is ignoring this signal, then write fails with the error EPIPE (http://man7.org/linux/man-pages/man7/pipe.7.html). Python's OSError: [Errno 32] Broken pipe is exactly this error, as you can see via import errno; print errno.EPIPE.
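
    You can reproduce exactly this EPIPE behavior with a plain os.pipe(), independent of gevent and gipc; a minimal sketch:

    import errno
    import os

    r, w = os.pipe()
    os.close(r)  # close the read end before anything is written
    try:
        os.write(w, "x")  # write to a pipe that no longer has a reader
    except OSError as e:
        # Python ignores SIGPIPE by default, so we get EPIPE instead of
        # the process being killed by the signal.
        print e.errno == errno.EPIPE  # True: [Errno 32] Broken pipe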

    Regarding your code's race condition: racer A is the set of greenlets you spawn for writing to the pipe. Racer B is the sequence of a "no-op" join() (I'll explain this later), a subsequent SIGTERM transmission to the child, the actual termination of the child, and -- with that -- the closing of the child's pipe end (the read end).

    On modern Linuxes the pipe capacity is 65536 bytes. In any case, sending N bytes of data requires at least ceil(N/65536) calls to the write syscall (This is what happens in the gipc/gevent internals). In your example, this means that the 65K case requires only one single call to write. The 66K case already needs two write calls. This explains why racer A is slower in the 66K case than it is in the 65K case. Racer B is equally fast in both cases.
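
    A quick check of that arithmetic for your two payload sizes (ignoring the few bytes of length prefix and pickle overhead):

    import math

    for n in (65500, 66000):
        # write syscalls needed for an n-byte payload with a 65536-byte pipe capacity
        print n, int(math.ceil(n / 65536.0))  # 65500 -> 1, 66000 -> 2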

    I guess the main problem in your code is that you thought writers.join() waits until all the writing is done. However, you overlooked that you've spawned greenlets within greenlets, and the outermost greenlet returns immediately if you do not explicitly wait for the inner greenlets in the outer one. That's the same as with processes. Repro:

    from gevent import sleep
    from gevent import spawn
    import time
    
    t0 = time.time()
    glets = spawn(lambda: spawn(lambda: sleep(1)))
    glets.join()
    d = time.time() - t0
    print "Time passed: %.3f" % d
    

    Although the innermost greenlet would not return earlier than after a second, the outer greenlet's join() returns practically immediately. Let's look at the code again:

        with gipc.pipe() as (r, w):
            try:
                t0 = time.time()
                p = gipc.start_process(target=reader, args=(r,))
                writers = gevent.spawn(lambda: [gevent.spawn(lambda:
                    w.put(value)) for _ in xrange(n)])
                writers.join()
                return (time.time() - t0) * 1000.
            finally:
                p.terminate()
                p.join()
    

    writers.join() returns earlier than you thought. Hence, return is called earlier than expected. return triggers the finally clause, which sends SIGTERM to the child process. Abnormal termination of the process immediately closes all related file descriptors.

    I guess that due to the overhead in gevent and gipc, the timing difference between the 65K and 66K cases really is significant. Still, it is kind of 'luck' that the timing of the join()-SIGTERM-termination sequence seems to fall right in between.

    In order to prove that this race condition is the issue, you can add gevent.sleep(1) to the finally clause (which guarantees that those writes are done before SIGTERM is sent) or, as a more realistic solution:

    writers = [gevent.spawn(lambda: w.put(value)) for _ in xrange(n)]
    gevent.joinall(writers)
    

    This really waits until all write greenlets are finished.
