Commits

Mike Bayer committed d63b817

Added a new method :meth:`.Engine.execution_options`
to :class:`.Engine`. This method works similarly to
:meth:`.Connection.execution_options` in that it creates
a copy of the parent object which will refer to the new
set of options. The method can be used to build
sharding schemes where each engine shares the same
underlying pool of connections. The method
has been tested against the horizontal shard
recipe in the ORM as well.
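
A minimal usage sketch (editorial illustration only, not part of the commit; the
SQLite URL and the ``shard_id`` key are arbitrary examples)::

    from sqlalchemy import create_engine

    # the parent engine owns the connection pool
    primary = create_engine("sqlite://")

    # each copy shares primary's pool but carries its own execution options
    shard1 = primary.execution_options(shard_id="shard1")
    shard2 = primary.execution_options(shard_id="shard2")

    assert shard1.pool is primary.pool

    conn = shard1.connect()
    assert conn._execution_options["shard_id"] == "shard1"
    conn.close()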

Files changed (4)

doc/build/changelog/changelog_08.rst

     :released:
 
     .. change::
+        :tags: sql, feature
+
+      Added a new method :meth:`.Engine.execution_options`
+      to :class:`.Engine`.  This method works similarly to
+      :meth:`.Connection.execution_options` in that it creates
+      a copy of the parent object which will refer to the new
+      set of options.   The method can be used to build
+      sharding schemes where each engine shares the same
+      underlying pool of connections.   The method
+      has been tested against the horizontal shard
+      recipe in the ORM as well.
+
+      .. seealso::
+
+          :meth:`.Engine.execution_options`
+
+    .. change::
         :tags: sql, orm, bug
         :tickets: 2595
 

lib/sqlalchemy/engine/base.py

         the same underlying DBAPI connection, but also defines the given
         execution options which will take effect for a call to
         :meth:`execute`. As the new :class:`.Connection` references the same
-        underlying resource, it is probably best to ensure that the copies
+        underlying resource, it's usually a good idea to ensure that the copies
         would be discarded immediately, which is implicit if used as in::
 
             result = connection.execution_options(stream_results=True).\\
                                 execute(stmt)
 
-        :meth:`.Connection.execution_options` accepts all options as those
-        accepted by :meth:`.Executable.execution_options`.  Additionally,
-        it includes options that are applicable only to
-        :class:`.Connection`.
+        Note that any key/value can be passed to :meth:`.Connection.execution_options`,
+        and it will be stored in the ``_execution_options`` dictionary of
+        the :class:`.Connection`.   It is suitable for usage by end-user
+        schemes to communicate with event listeners, for example.
+
+        The keywords that are currently recognized by SQLAlchemy itself
+        include all those listed under :meth:`.Executable.execution_options`,
+        as well as others that are specific to :class:`.Connection`.
 
         :param autocommit: Available on: Connection, statement.
           When True, a COMMIT will be invoked after execution
                 "Cannot start a two phase transaction when a transaction "
                 "is already in progress.")
         if xid is None:
-            xid = self.engine.dialect.create_xid();
+            xid = self.engine.dialect.create_xid()
         self.__transaction = TwoPhaseTransaction(self, xid)
         return self.__transaction
 
         if proxy:
             interfaces.ConnectionProxy._adapt_listener(self, proxy)
         if execution_options:
-            if 'isolation_level' in execution_options:
-                raise exc.ArgumentError(
-                    "'isolation_level' execution option may "
-                    "only be specified on Connection.execution_options(). "
-                    "To set engine-wide isolation level, "
-                    "use the isolation_level argument to create_engine()."
-                )
             self.update_execution_options(**execution_options)
 
     def update_execution_options(self, **opt):
         can be sent via the ``execution_options`` parameter
         to :func:`.create_engine`.
 
-        See :meth:`.Connection.execution_options` for more
-        details on execution options.
+        .. seealso::
+
+            :meth:`.Connection.execution_options`
+
+            :meth:`.Engine.execution_options`
 
         """
+        if 'isolation_level' in opt:
+            raise exc.ArgumentError(
+                "'isolation_level' execution option may "
+                "only be specified on Connection.execution_options(). "
+                "To set engine-wide isolation level, "
+                "use the isolation_level argument to create_engine()."
+            )
         self._execution_options = \
                 self._execution_options.union(opt)
 
+    def execution_options(self, **opt):
+        """Return a new :class:`.Engine` that will provide
+        :class:`.Connection` objects with the given execution options.
+
+        The returned :class:`.Engine` remains related to the original
+        :class:`.Engine` in that it shares the same connection pool and
+        other state:
+
+        * The :class:`.Pool` used by the new :class:`.Engine` is the
+          same instance.  The :meth:`.Engine.dispose` method will replace
+          the connection pool instance for the parent engine as well
+          as this one.
+        * Event listeners are "cascaded" - meaning, the new :class:`.Engine`
+          inherits the events of the parent, and new events can be associated
+          with the new :class:`.Engine` individually.
+        * The logging configuration and logging_name are copied from the parent
+          :class:`.Engine`.
+
+        The intent of the :meth:`.Engine.execution_options` method is
+        to implement "sharding" schemes where multiple :class:`.Engine`
+        objects refer to the same connection pool, but are differentiated
+        by options that would be consumed by a custom event::
+
+            primary_engine = create_engine("mysql://")
+            shard1 = primary_engine.execution_options(shard_id="shard1")
+            shard2 = primary_engine.execution_options(shard_id="shard2")
+
+        Above, the ``shard1`` engine serves as a factory for :class:`.Connection`
+        objects that will contain the execution option ``shard_id=shard1``,
+        and ``shard2`` will produce :class:`.Connection` objects that contain
+        the execution option ``shard_id=shard2``.
+
+        An event handler can consume the above execution option to perform
+        a schema switch or other operation, given a connection.  Below
+        we emit a MySQL ``use`` statement to switch databases, at the same
+        time keeping track of which database we've established using the
+        :attr:`.Connection.info` dictionary, which gives us a persistent
+        storage space that follows the DBAPI connection::
+
+            from sqlalchemy import event
+            from sqlalchemy.engine import Engine
+
+            shards = {"default": "base", "shard1": "db1", "shard2": "db2"}
+
+            @event.listens_for(Engine, "before_cursor_execute")
+            def _switch_shard(conn, cursor, stmt, params, context, executemany):
+                shard_id = conn._execution_options.get('shard_id', "default")
+                current_shard = conn.info.get("current_shard", None)
+
+                if current_shard != shard_id:
+                    cursor.execute("use %s" % shards[shard_id])
+                    conn.info["current_shard"] = shard_id
+
+        .. seealso::
+
+            :meth:`.Connection.execution_options` - update execution options
+             on a :class:`.Connection` object.
+
+            :meth:`.Engine.update_execution_options` - update the execution
+             options for a given :class:`.Engine` in place.
+
+        """
+        return OptionEngine(self, opt)
+
     @property
     def name(self):
         """String name of the :class:`~sqlalchemy.engine.Dialect` in use by
         else:
             conn = connection
         if not schema:
-            schema =  self.dialect.default_schema_name
+            schema = self.dialect.default_schema_name
         try:
             return self.dialect.get_table_names(conn, schema)
         finally:
 
         return self.pool.unique_connection()
 
+class OptionEngine(Engine):
+    def __init__(self, proxied, execution_options):
+        self._proxied = proxied
+        self.url = proxied.url
+        self.dialect = proxied.dialect
+        self.logging_name = proxied.logging_name
+        self.echo = proxied.echo
+        log.instance_logger(self, echoflag=self.echo)
+        self.dispatch = self.dispatch._join(proxied.dispatch)
+        self._execution_options = proxied._execution_options
+        self.update_execution_options(**execution_options)
 
+    def _get_pool(self):
+        return self._proxied.pool
+
+    def _set_pool(self, pool):
+        self._proxied.pool = pool
+
+    pool = property(_get_pool, _set_pool)
+
+    def _get_has_events(self):
+        return self._proxied._has_events or \
+            self.__dict__.get('_has_events', False)
+
+    def _set_has_events(self, value):
+        self.__dict__['_has_events'] = value
+
+    _has_events = property(_get_has_events, _set_has_events)
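
The listener pattern described in the new docstring above, as a self-contained
sketch (the option name, the ``conn.info`` bookkeeping and the SQLite URL are
illustrative and not part of this diff)::

    from sqlalchemy import create_engine, event

    primary = create_engine("sqlite://")
    shard1 = primary.execution_options(shard_id="shard1")
    shard2 = primary.execution_options(shard_id="shard2")

    # listeners on the parent engine cascade to the option-bound copies
    @event.listens_for(primary, "before_cursor_execute")
    def _track_shard(conn, cursor, statement, parameters, context, executemany):
        shard_id = conn._execution_options.get("shard_id", "default")
        if conn.info.get("current_shard") != shard_id:
            # a real scheme would switch schemas or databases here
            conn.info["current_shard"] = shard_id

    shard1.execute("select 1")  # listener sees shard_id="shard1"
    shard2.execute("select 1")  # listener sees shard_id="shard2"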

test/engine/test_execute.py

 
     @testing.requires.ad_hoc_engines
     def test_engine_level_options(self):
-        eng = engines.testing_engine(options={'execution_options'
-                : {'foo': 'bar'}})
+        eng = engines.testing_engine(options={'execution_options':
+                                            {'foo': 'bar'}})
         conn = eng.contextual_connect()
         eq_(conn._execution_options['foo'], 'bar')
         eq_(conn.execution_options(bat='hoho')._execution_options['foo'
         conn = eng.contextual_connect()
         eq_(conn._execution_options['foo'], 'hoho')
 
+    @testing.requires.ad_hoc_engines
+    def test_generative_engine_execution_options(self):
+        eng = engines.testing_engine(options={'execution_options':
+                                            {'base': 'x1'}})
+
+        eng1 = eng.execution_options(foo="b1")
+        eng2 = eng.execution_options(foo="b2")
+        eng1a = eng1.execution_options(bar="a1")
+        eng2a = eng2.execution_options(foo="b3", bar="a2")
+
+        eq_(eng._execution_options,
+                {'base': 'x1'})
+        eq_(eng1._execution_options,
+                {'base': 'x1', 'foo': 'b1'})
+        eq_(eng2._execution_options,
+                {'base': 'x1', 'foo': 'b2'})
+        eq_(eng1a._execution_options,
+                {'base': 'x1', 'foo': 'b1', 'bar': 'a1'})
+        eq_(eng2a._execution_options,
+                {'base': 'x1', 'foo': 'b3', 'bar': 'a2'})
+        is_(eng1a.pool, eng.pool)
+
+        # test pool is shared
+        eng2.dispose()
+        is_(eng1a.pool, eng2.pool)
+        is_(eng.pool, eng2.pool)
+
+    @testing.requires.ad_hoc_engines
+    def test_generative_engine_event_dispatch(self):
+        canary = []
+        def l1(*arg, **kw):
+            canary.append("l1")
+        def l2(*arg, **kw):
+            canary.append("l2")
+        def l3(*arg, **kw):
+            canary.append("l3")
+
+        eng = engines.testing_engine(options={'execution_options':
+                                            {'base': 'x1'}})
+        event.listen(eng, "before_execute", l1)
+
+        eng1 = eng.execution_options(foo="b1")
+        event.listen(eng, "before_execute", l2)
+        event.listen(eng1, "before_execute", l3)
+
+        eng.execute(select([1]))
+        eng1.execute(select([1]))
+
+        eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
+
+    @testing.requires.ad_hoc_engines
+    def test_generative_engine_event_dispatch_hasevents(self):
+        def l1(*arg, **kw):
+            pass
+        eng = create_engine(testing.db.url)
+        assert not eng._has_events
+        event.listen(eng, "before_execute", l1)
+        eng2 = eng.execution_options(foo='bar')
+        assert eng2._has_events
+
     def test_unicode_test_fails_warning(self):
         class MockCursor(engines.DBAPIProxyCursor):
             def execute(self, stmt, params=None, **kw):
 
     def tearDown(self):
         Engine.dispatch._clear()
+        Engine._has_events = False
 
     def _assert_stmts(self, expected, received):
         orig = list(received)

test/ext/test_horizontal_shard.py

 from sqlalchemy.orm import *
 from sqlalchemy.ext.horizontal_shard import ShardedSession
 from sqlalchemy.sql import operators
+from sqlalchemy import pool
 from sqlalchemy.testing import fixtures
 from sqlalchemy import testing
 from sqlalchemy.testing.engines import testing_engine
 
 # TODO: ShardTest can be turned into a base for further subclasses
 
-class ShardTest(fixtures.TestBase):
+
+
+
+class ShardTest(object):
     __skip_if__ = (lambda: util.win32,)
+    __requires__ = 'sqlite',
+
+    schema = None
 
     def setUp(self):
         global db1, db2, db3, db4, weather_locations, weather_reports
 
-        try:
-            db1 = testing_engine('sqlite:///shard1.db', options=dict(pool_threadlocal=True))
-        except ImportError:
-            raise SkipTest('Requires sqlite')
-        db2 = testing_engine('sqlite:///shard2.db')
-        db3 = testing_engine('sqlite:///shard3.db')
-        db4 = testing_engine('sqlite:///shard4.db')
+        db1, db2, db3, db4 = self._init_dbs()
 
         meta = MetaData()
         ids = Table('ids', meta,
 
             c = db1.contextual_connect()
             nextid = c.execute(ids.select(for_update=True)).scalar()
-            c.execute(ids.update(values={ids.c.nextid : ids.c.nextid + 1}))
+            c.execute(ids.update(values={ids.c.nextid: ids.c.nextid + 1}))
             return nextid
 
         weather_locations = Table("weather_locations", meta,
                 Column('id', Integer, primary_key=True, default=id_generator),
                 Column('continent', String(30), nullable=False),
-                Column('city', String(50), nullable=False)
+                Column('city', String(50), nullable=False),
+                schema=self.schema
             )
 
         weather_reports = Table(
             meta,
             Column('id', Integer, primary_key=True),
             Column('location_id', Integer,
-                   ForeignKey('weather_locations.id')),
+                   ForeignKey(weather_locations.c.id)),
             Column('temperature', Float),
             Column('report_time', DateTime,
                    default=datetime.datetime.now),
+            schema=self.schema
             )
 
         for db in (db1, db2, db3, db4):
         self.setup_session()
         self.setup_mappers()
 
-    def tearDown(self):
-        clear_mappers()
-
-        for db in (db1, db2, db3, db4):
-            db.connect().invalidate()
-        for i in range(1,5):
-            os.remove("shard%d.db" % i)
 
     @classmethod
     def setup_session(cls):
                 self.temperature = temperature
 
         mapper(WeatherLocation, weather_locations, properties={
-            'reports':relationship(Report, backref='location'),
+            'reports': relationship(Report, backref='location'),
             'city': deferred(weather_locations.c.city),
         })
 
         mapper(Report, weather_reports)
+
     def _fixture_data(self):
         tokyo = WeatherLocation('Asia', 'Tokyo')
         newyork = WeatherLocation('North America', 'New York')
         event.listen(WeatherLocation, "load", load)
         sess = self._fixture_data()
 
-        tokyo = sess.query(WeatherLocation).filter_by(city="Tokyo").set_shard("asia").one()
+        tokyo = sess.query(WeatherLocation).\
+                    filter_by(city="Tokyo").set_shard("asia").one()
 
         sess.query(WeatherLocation).all()
         eq_(
             ['asia', 'north_america', 'north_america',
             'europe', 'europe', 'south_america',
             'south_america']
-        )
+        )
+
+class DistinctEngineShardTest(ShardTest, fixtures.TestBase):
+
+    def _init_dbs(self):
+        db1 = testing_engine('sqlite:///shard1.db',
+                            options=dict(pool_threadlocal=True))
+        db2 = testing_engine('sqlite:///shard2.db')
+        db3 = testing_engine('sqlite:///shard3.db')
+        db4 = testing_engine('sqlite:///shard4.db')
+
+        return db1, db2, db3, db4
+
+    def tearDown(self):
+        clear_mappers()
+
+        for db in (db1, db2, db3, db4):
+            db.connect().invalidate()
+        for i in range(1, 5):
+            os.remove("shard%d.db" % i)
+
+class AttachedFileShardTest(ShardTest, fixtures.TestBase):
+    schema = "changeme"
+
+    def _init_dbs(self):
+        db1 = testing_engine('sqlite://', options={"execution_options":
+                                            {"shard_id": "shard1"}})
+        assert db1._has_events
+
+        db2 = db1.execution_options(shard_id="shard2")
+        db3 = db1.execution_options(shard_id="shard3")
+        db4 = db1.execution_options(shard_id="shard4")
+
+        import re
+        @event.listens_for(db1, "before_cursor_execute", retval=True)
+        def _switch_shard(conn, cursor, stmt, params, context, executemany):
+            shard_id = conn._execution_options['shard_id']
+            # because SQLite can't just give us a "use" statement, we have
+            # to use the schema hack to locate table names
+            if shard_id:
+                stmt = re.sub(r"\"?changeme\"?\.", shard_id + "_", stmt)
+
+            return stmt, params
+
+        return db1, db2, db3, db4
+
+