Commits

Mike Bayer  committed dfd30f0

- [bug] implement 'tablename' parameter on
drop_index() as this is needed by some
backends.

- [feature] Added execution_options parameter
to op.execute(), will call execution_options()
on the Connection before executing.

The immediate use case here is to allow
access to the new no_parameters option
in SQLAlchemy 0.7.6, which allows
some DBAPIs (psycopg2, MySQLdb) to allow
percent signs straight through without
escaping, thus providing cross-compatible
operation with DBAPI execution and
static script generation.

  • Participants
  • Parent commits 6d61c9d

Comments (0)

Files changed (6)

 =====
 - [feature] Informative error message when op.XYZ
   directives are invoked at module import time.
-  
+
+- [bug] implement 'tablename' parameter on 
+  drop_index() as this is needed by some 
+  backends.
+
+- [feature] Added execution_options parameter
+  to op.execute(), will call execution_options()
+  on the Connection before executing.
+
+  The immediate use case here is to allow
+  access to the new no_parameters option
+  in SQLAlchemy 0.7.6, which allows
+  some DBAPIs (psycopg2, MySQLdb) to allow
+  percent signs straight through without
+  escaping, thus providing cross-compatible
+  operation with DBAPI execution and 
+  static script generation.
+
 - [bug] setup.py won't install argparse if on
   Python 2.7/3.2
 

File alembic/ddl/impl.py

 from alembic.ddl import base
 from alembic import util
 from sqlalchemy import types as sqltypes
+from sqlalchemy import util as sqla_util
 
 class ImplMeta(type):
     def __init__(cls, classname, bases, dict_):
     def bind(self):
         return self.connection
 
-    def _exec(self, construct, *args, **kw):
+    def _exec(self, construct, execution_options=None, 
+                            multiparams=(), 
+                            params=sqla_util.immutabledict()):
         if isinstance(construct, basestring):
             construct = text(construct)
         if self.as_sql:
-            if args or kw:
+            if multiparams or params:
                 # TODO: coverage
                 raise Exception("Execution arguments not allowed with as_sql")
             self.static_output(unicode(
                     construct.compile(dialect=self.dialect)
                     ).replace("\t", "    ").strip() + ";")
         else:
-            self.connection.execute(construct, *args, **kw)
+            conn = self.connection
+            if execution_options:
+                conn = conn.execution_options(**execution_options)
+            conn.execute(construct, *multiparams, **params)
 
-    def execute(self, sql):
-        self._exec(sql)
+    def execute(self, sql, execution_options=None):
+        self._exec(sql, execution_options)
 
     def alter_column(self, table_name, column_name, 
                         nullable=None,

File alembic/environment.py

         with Operations.context(self._migration_context):
             self.get_context().run_migrations(**kw)
 
-    def execute(self, sql):
+    def execute(self, sql, execution_options=None):
         """Execute the given SQL using the current change context.
 
         The behavior of :meth:`.execute` is the same
         first been made available via :meth:`.configure`.
 
         """
-        self.get_context().execute(sql)
+        self.get_context().execute(sql, 
+                execution_options=execution_options)
 
     def static_output(self, text):
         """Emit text directly to the "offline" SQL stream.

File alembic/operations.py

         return schema.Column(name, type_, **kw)
 
     def _index(self, name, tablename, columns, **kw):
-        t = schema.Table(tablename, schema.MetaData(),
+        t = schema.Table(tablename or 'no_table', schema.MetaData(),
             *[schema.Column(n, NULLTYPE) for n in columns]
         )
         return schema.Index(name, *list(t.c), **kw)
             self._index(name, tablename, *columns, **kw)
         )
 
-    def drop_index(self, name):
+    def drop_index(self, name, tablename=None):
         """Issue a "drop index" instruction using the current 
         migration context.
 
         e.g.::
 
             drop_index("accounts")
+            
+        :param tablename: name of the owning table.  Some
+         backends such as Microsoft SQL Server require this.
 
         """
         # need a dummy column name here since SQLAlchemy
         # 0.7.6 and further raises on Index with no columns
-        self.impl.drop_index(self._index(name, 'foo', ['x']))
+        self.impl.drop_index(self._index(name, tablename, ['x']))
 
     def drop_constraint(self, name, tablename):
         """Drop a constraint of the given name"""
         """
         return impl._literal_bindparam(None, value, type_=type_)
 
-    def execute(self, sql):
+    def execute(self, sql, execution_options=None):
         """Execute the given SQL using the current migration context.
 
         In a SQL script context, the statement is emitted directly to the 
         * Pretty much anything that's "executable" as described
           in :ref:`sqlexpression_toplevel`.
 
-
+        :param execution_options: Optional dictionary of 
+         execution options, will be passed to 
+         :meth:`sqlalchemy.engine.base.Connection.execution_options`.
         """
-        self.migration_context.impl.execute(sql)
+        self.migration_context.impl.execute(sql, 
+                    execution_options=execution_options)
 
     def get_bind(self):
         """Return the current 'bind'.

File tests/test_mssql.py

             'ALTER TABLE t ALTER COLUMN c INTEGER'
         )
 
+    def test_drop_index(self):
+        context = op_fixture('mssql')
+        op.drop_index('my_idx', 'my_table')
+        # TODO: annoying that SQLA escapes unconditionally
+        context.assert_contains("DROP INDEX [my_table].my_idx")
+
     def test_drop_column_w_default(self):
         context = op_fixture('mssql')
         op.drop_column('t1', 'c1', mssql_drop_default=True)

File tests/test_postgresql.py

 def downgrade():
     op.drop_table("sometable")
     ENUM(name="pgenum").drop(op.get_bind(), checkfirst=False)
-    
+
 """ % self.rid)
 
     def test_offline_inline_enum_create(self):
         assert "DROP TABLE sometable" in buf.getvalue()
         assert "DROP TYPE pgenum" in buf.getvalue()
 
+from alembic.migration import MigrationContext
+from alembic.operations import Operations
+from sqlalchemy.sql import table, column
 
+class PostgresqlInlineLiteralTest(TestCase):
+    @classmethod
+    def setup_class(cls):
+        cls.bind = db_for_dialect("postgresql")
+        cls.bind.execute("""
+            create table tab (
+                col varchar(50)
+            )
+        """)
+        cls.bind.execute("""
+            insert into tab (col) values 
+                ('old data 1'),
+                ('old data 2.1'),
+                ('old data 3')
+        """)
+
+    @classmethod
+    def teardown_class(cls):
+        cls.bind.execute("drop table tab")
+
+    def setUp(self):
+        self.conn = self.bind.connect()
+        ctx = MigrationContext.configure(self.conn)
+        self.op = Operations(ctx)
+
+    def tearDown(self):
+        self.conn.close()
+
+    def test_inline_percent(self):
+        # TODO: here's the issue, you need to escape this.
+        tab = table('tab', column('col'))
+        self.op.execute(
+            tab.update().where(
+                tab.c.col.like(self.op.inline_literal('%.%'))
+            ).values(col=self.op.inline_literal('new data')),
+            execution_options={'no_parameters':True}
+        )
+        eq_(
+            self.conn.execute("select count(*) from tab where col='new data'").scalar(),
+            1,
+        )
 
 class PostgresqlDefaultCompareTest(TestCase):
     @classmethod