Daniele Varrazzo avatar Daniele Varrazzo committed f190987

Added first bunch of files.

Comments (0)

Files changed (6)

+Some testing material to work with Psycopg2 with coroutine support.
+
+

eventlet/nonblock_eventlet.py

+import eventlet
+eventlet.monkey_patch()
+
+import psyco_eventlet
+psyco_eventlet.make_psycopg_green()
+
+import urllib2  # green
+
+import psycopg2
+
+import logging
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
+logger = logging.getLogger()
+logger.info("testing psycopg2 with eventlet")
+
+conn = psycopg2.connect("dbname=postgres")
+
+def download(num, secs):
+    url = "http://localhost:8000/%d/" % secs
+    for i in range(num):
+        logger.info("download %d start", i)
+        data = urllib2.urlopen(url).read()
+        logger.info("download %d end", i)
+
+def fetch(num, secs):
+    cur = conn.cursor()
+    for i in range(num):
+        logger.info("fetch %d start", i)
+        cur.execute("select pg_sleep(%s)", (secs,))
+        logger.info("fetch %d end", i)
+
+logger.info("making jobs")
+pool = eventlet.GreenPool()
+pool.spawn(download, 2, 3),
+pool.spawn(fetch, 3, 2),
+
+logger.info("join begin")
+pool.waitall()
+logger.info("join end")
+
+

eventlet/psyco_eventlet.py

+"""A wait callback to allow psycopg2 cooperation with eventlet.
+"""
+# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+
+import psycopg2
+from psycopg2 import extensions
+
+from eventlet.hubs import trampoline
+
+def make_psycopg_green():
+    if not hasattr(extensions, 'set_wait_callback'):
+        raise ImportError(
+            "Support for coroutines is available only from Psycopg 2.2.0")
+
+    extensions.set_wait_callback(eventlet_wait_callback)
+
+def eventlet_wait_callback(conn, timeout=-1):
+    while 1:
+        state = conn.poll()
+        if state == extensions.POLL_OK:
+            break
+        elif state == extensions.POLL_READ:
+            trampoline(conn.fileno(), read=True)
+        elif state == extensions.POLL_WRITE:
+            trampoline(conn.fileno(), write=True)
+        else:
+            raise psycopg2.OperationalError(
+                "Bad result from poll: %r" % state)

gevent/nonblock_gevent.py

+import gevent
+import gevent.monkey
+gevent.monkey.patch_all()
+
+import psyco_gevent
+psyco_gevent.make_psycopg_green()
+
+import urllib2  # green
+
+import psycopg2
+
+import logging
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
+logger = logging.getLogger()
+logger.info("testing psycopg2 with gevent")
+
+conn = psycopg2.connect("dbname=postgres")
+
+def download(num, secs):
+    url = "http://localhost:8000/%d/" % secs
+    for i in range(num):
+        logger.info("download %d start", i)
+        data = urllib2.urlopen(url).read()
+        logger.info("download %d end", i)
+
+def fetch(num, secs):
+    cur = conn.cursor()
+    for i in range(num):
+        logger.info("fetch %d start", i)
+        cur.execute("select pg_sleep(%s)", (secs,))
+        logger.info("fetch %d end", i)
+
+logger.info("making jobs")
+jobs = [
+    gevent.spawn(download, 2, 3),
+    gevent.spawn(fetch, 3, 2),
+    ]
+
+logger.info("join begin")
+gevent.joinall(jobs)
+logger.info("join end")
+

gevent/psyco_gevent.py

+"""A wait callback to allow psycopg2 cooperation with gevent.
+"""
+# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+
+import psycopg2
+from psycopg2 import extensions
+
+from gevent.hub import getcurrent, get_hub
+from gevent.core import read_event, write_event, EV_TIMEOUT
+from gevent.hub import sleep as gevent_sleep
+
+def make_psycopg_green():
+    if not hasattr(extensions, 'set_wait_callback'):
+        raise ImportError(
+            "Support for coroutines is available only from Psycopg 2.2.0")
+
+    extensions.set_wait_callback(gevent_wait_callback)
+
+class Timeout(Exception):
+    pass
+
+def gevent_wait_callback(conn, timeout=-1):
+    while 1:
+        state = conn.poll()
+        if state == extensions.POLL_OK:
+            break
+        elif state == extensions.POLL_READ:
+            wait_read(conn.fileno(), timeout=timeout)
+        elif state == extensions.POLL_WRITE:
+            wait_write(conn.fileno(), timeout=timeout)
+        else:
+            raise psycopg2.OperationalError(
+                "Bad result from poll: %r" % state)
+
+def _wait_helper(ev, evtype):
+    current = ev.arg
+    if evtype & EV_TIMEOUT:
+        current.throw(Timeout())
+    else:
+        current.switch(ev)
+
+def wait_read(fileno, timeout=-1):
+    evt = read_event(fileno, _wait_helper, timeout, getcurrent())
+    try:
+        switch_result = get_hub().switch()
+        assert evt is switch_result, 'Invalid switch into wait_read(): %r' % (switch_result, )
+    finally:
+        evt.cancel()
+
+
+def wait_write(fileno, timeout=-1):
+    evt = write_event(fileno, _wait_helper, timeout, getcurrent())
+    try:
+        switch_result = get_hub().switch()
+        assert evt is switch_result, 'Invalid switch into wait_write(): %r' % (switch_result, )
+    finally:
+        evt.cancel()

tools/wait_server.py

+import time
+from wsgiref.util import setup_testing_defaults
+from wsgiref.simple_server import make_server
+
+def wait_app(environ, start_response):
+    """An application serving blocking pages."""
+    status = '200 OK'
+    headers = [('Content-type', 'text/plain')]
+
+    start_response(status, headers)
+    try:
+        secs = int(environ['PATH_INFO'].replace('/', ''))
+    except:
+        secs = 0
+
+    time.sleep(secs)
+    return [ str(secs) ]
+
+httpd = make_server('', 8000, wait_app)
+print "Serving on port 8000..."
+httpd.serve_forever()
+
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.