Pypy 5.7: eventlet + psycopg2cffi + celery RuntimeError: Second simultaneous read

Issue #2525 new
André Cimander
created an issue

Hey,

first of all, thanks for working on this wonderful project!

We are considering switching our worker infrastructure to PyPy, while doing initial work I stumbled over this socket problem. We are test-running a celery worker with 60 threads (since we are I/O bound by external apis).

It's working fine with CPython + eventlet + psycopg2/psycopg2cffi.

The database connection is made to a local pgbouncer instance(3500 concurrent connection limit, 60 connections to the DB with enough reserve pool connections) on a unix-domain socket, maybe that's a problem with PyPy?

If it's indeed a bug and you need more information, we are eager to help. If it's a stupid mistake from our side: sorry for wasting your time.

OS:

  • Ubuntu 16.04 | Linux worker4 4.4.0-66-generic #87-Ubuntu SMP Fri Mar 3 15:29:05 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
  • PyPy 5.7.0 with GCC 6.2.0 20160901 (we used the Linux x86-64 binary (64bit, tar.bz2 built on Ubuntu 12.04 - 14.04) download, maybe this is the culprit?)

Relevant Packages:

  • celery==3.1.25
  • eventlet==0.20.1
  • psycopg2cffi==2.7.5
  • django==1.9.12

Stacktrace:

RuntimeError: Second simultaneous read on fileno 5 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<bound method greenlet.switch of <greenlet.greenlet object at 0x000000000bcddb80>>; THAT THREAD=FdListener('read', 5, <bound method greenlet.switch of <greenlet.greenlet object at 0x0000000006cc6410>>, <bound method greenlet.throw of <greenlet.greenlet object at 0x0000000006cc6410>>)
  File "celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "influencerdb/utils/tasks.py", line 16, in run
    self.action(*args, **kwargs)
  File "influencerdb_insights_instagram/tasks.py", line 44, in action
    channel = get_instance_or_false(InfluencerChannelInstagram, influencer_channel_id)
  File "globalcore/utils/model.py", line 170, in get_instance_or_false
    obj = qset.get(**{model_id_key: model_id_or_object})
  File "django/db/models/query.py", line 381, in get
    num = len(clone)
  File "django/db/models/query.py", line 240, in __len__
    self._fetch_all()
  File "django/db/models/query.py", line 1074, in _fetch_all
    self._result_cache = list(self.iterator())
  File "django/db/models/query.py", line 52, in __iter__
    results = compiler.execute_sql()
  File "django/db/models/sql/compiler.py", line 846, in execute_sql
    cursor = self.connection.cursor()
  File "django/db/backends/base/base.py", line 233, in cursor
    cursor = self.make_cursor(self._cursor())
  File "django/db/backends/base/base.py", line 204, in _cursor
    self.ensure_connection()
  File "django/db/backends/base/base.py", line 199, in ensure_connection
    self.connect()
  File "django/db/backends/base/base.py", line 171, in connect
    self.connection = self.get_new_connection(conn_params)
  File "django/db/backends/postgresql/base.py", line 186, in get_new_connection
    self.isolation_level = connection.isolation_level
  File "psycopg2cffi/_impl/connection.py", line 43, in check_closed_
    return func(self, *args, **kwargs)
  File "psycopg2cffi/_impl/connection.py", line 253, in isolation_level
    name = self._get_guc('default_transaction_isolation')
  File "psycopg2cffi/_impl/connection.py", line 217, in _get_guc
    pgres = self._execute_green(query)
  File "psycopg2cffi/_impl/connection.py", line 689, in _execute_green
    _green_callback(self)
  File "eventlet/support/psycopg2_patcher.py", line 50, in eventlet_wait_callback
    trampoline(conn.fileno(), read=True)
  File "eventlet/hubs/__init__.py", line 158, in trampoline
    listener = hub.add(hub.READ, fileno, current.switch, current.throw, mark_as_closed)
  File "eventlet/hubs/epolls.py", line 49, in add
    listener = BaseHub.add(self, evtype, fileno, cb, tb, mac)
  File "eventlet/hubs/hub.py", line 177, in add
    evtype, fileno, evtype, cb, bucket[fileno]))

Comments (1)

  1. Omer Katz

    For anyone to debug this we'd need a testcase. If you can, please provide a minimal testcase that reproduces the problem.

    The problem may lay with psycopg2cffi trying to read from the same socket twice, thus corrupting data but it is unlikely. I think that your real problem is that you are reusing the same connection instead of using a connection pool. See https://github.com/jneight/django-db-geventpool for accessing a database with multiple greenlets.

    I do wonder why this does not reproduce easily with psycopg2. Can you try to use the original psycopg2 module on PyPy and see if this can be reproduced?

  2. Log in to comment