Commits

Mike Bayer committed 2732bb7

illustrate orm2010.py using gevent

Comments (0)

Files changed (4)

+syntax:regexp
+^bin/
+^lib/
+^include/
+.*\.pyc$
+\.Python

green_sqla/__init__.py

Empty file added.

green_sqla/psyco_gevent.py

+"""A wait callback to allow psycopg2 cooperation with gevent.
+
+Use `make_psycopg_green()` to enable gevent support in Psycopg.
+"""
+
+# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+# and licensed under the MIT license:
+# 
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+# 
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+# 
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+import psycopg2
+from psycopg2 import extensions
+
+from gevent.socket import wait_read, wait_write
+
+def make_psycopg_green():
+    """Configure Psycopg to be used with gevent in non-blocking way."""
+    if not hasattr(extensions, 'set_wait_callback'):
+        raise ImportError(
+            "support for coroutines not available in this Psycopg version (%s)"
+            % psycopg2.__version__)
+
+    extensions.set_wait_callback(gevent_wait_callback)
+
+def gevent_wait_callback(conn, timeout=None):
+    """A wait callback useful to allow gevent to work with Psycopg."""
+    while 1:
+        state = conn.poll()
+        if state == extensions.POLL_OK:
+            break
+        elif state == extensions.POLL_READ:
+            wait_read(conn.fileno(), timeout=timeout)
+        elif state == extensions.POLL_WRITE:
+            wait_write(conn.fileno(), timeout=timeout)
+        else:
+            raise psycopg2.OperationalError(
+                "Bad result from poll: %r" % state)

green_sqla/stress_sqla.py

+from sqlalchemy import Column, Integer, create_engine, ForeignKey, \
+    String, Numeric
+
+from sqlalchemy.orm import Session, relationship
+
+from sqlalchemy.ext.declarative import declarative_base
+import random
+from decimal import Decimal
+
+Base = declarative_base()
+
+class Employee(Base):
+    __tablename__ = 'employee'
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String(100), nullable=False)
+    type = Column(String(50), nullable=False)
+
+    __mapper_args__ = {'polymorphic_on':type}
+
+class Boss(Employee):
+    __tablename__ = 'boss'
+
+    id = Column(Integer, ForeignKey('employee.id'), primary_key=True)
+    golf_average = Column(Numeric)
+
+    __mapper_args__ = {'polymorphic_identity':'boss'}
+
+class Grunt(Employee):
+    __tablename__ = 'grunt'
+
+    id = Column(Integer, ForeignKey('employee.id'), primary_key=True)
+    savings = Column(Numeric)
+
+    employer_id = Column(Integer, ForeignKey('boss.id'))
+
+    employer = relationship("Boss", backref="employees", 
+                                primaryjoin=Boss.id==employer_id)
+
+    __mapper_args__ = {'polymorphic_identity':'grunt'}
+
+def runit(engine):
+    sess = Session(engine)
+    # create 1000 Boss objects.
+    bosses = [
+        Boss(
+            name="Boss %d" % i, 
+            golf_average=Decimal(random.randint(40, 150))
+        )
+        for i in xrange(1000)
+    ]
+
+    sess.add_all(bosses)
+
+    # create 10000 Grunt objects.
+    grunts = [
+        Grunt(
+            name="Grunt %d" % i,
+            savings=Decimal(random.randint(5000000, 15000000) / 100)
+        )
+        for i in xrange(10000)
+    ]
+
+    # Assign each Grunt a Boss.  Look them up in the DB
+    # to simulate a little bit of two-way activity with the 
+    # DB while we populate.  Autoflush occurs on each query.
+    while grunts:
+        boss = sess.query(Boss).\
+                    filter_by(name="Boss %d" % (101 - len(grunts) / 100)).\
+                    first()
+        for grunt in grunts[0:100]:
+            grunt.employer = boss
+
+        grunts = grunts[100:]
+
+    sess.flush()
+
+    report = []
+
+    # load all the Grunts, print a report with their name, stats,
+    # and their bosses' stats.
+    for grunt in sess.query(Grunt):
+        report.append((
+                        grunt.name, 
+                        grunt.savings, 
+                        grunt.employer.name, 
+                        grunt.employer.golf_average
+                    ))
+
+def timeit(fn):
+    def time_fn(*arg, **kw):
+        import time
+        now = time.time()
+        fn(*arg, **kw)
+        total = time.time() - now
+        print "total time for %s: %d" % (fn.__name__, total)
+    return time_fn
+
+@timeit
+def run_with_gevent():
+    from psyco_gevent import make_psycopg_green
+    make_psycopg_green()
+
+    from sqlalchemy.pool import NullPool
+    engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/test', 
+                                echo=True, 
+                                poolclass=NullPool
+                                )
+    Base.metadata.drop_all(engine)
+    Base.metadata.create_all(engine)
+
+    import gevent
+    threads = [gevent.spawn(runit, engine) for i in xrange(5)]
+    for t in threads:
+        t.join()
+
+@timeit
+def run_with_threads():
+    from sqlalchemy.pool import NullPool
+    engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/test', 
+                                echo=True, 
+                                poolclass=NullPool
+                                )
+    Base.metadata.drop_all(engine)
+    Base.metadata.create_all(engine)
+
+    import threading
+    threads = [threading.Thread(target=runit, args=(engine, )) for i in xrange(5)]
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()
+
+run_with_gevent()
+#run_with_threads()