Commits

Mike Bayer committed a7ada78

- [bug] bulk_insert() fixes:

1. The bulk_insert() operation had most likely not been
working since the 0.2 series
when used with an engine. #41
2. Repaired bulk_insert() to complete when
used against a lower-case-t table and executing
with only one set of parameters, working
around SQLAlchemy bug #2461 in this regard.
3. bulk_insert() uses "inline=True" so that phrases
like RETURNING and such don't get invoked for
single-row bulk inserts.
4. bulk_insert() will check that you're passing
a list of dictionaries in, and raises TypeError
if not.

Comments (0)

Files changed (5)

+0.3.1
+=====
+- [bug] bulk_insert() fixes:
+
+    1. bulk_insert() operation was
+       not working most likely since the 0.2 series
+       when used with an engine. #41
+    2. Repaired bulk_insert() to complete when
+       used against a lower-case-t table and executing
+       with only one set of parameters, working
+       around SQLAlchemy bug #2461 in this regard.
+    3. bulk_insert() uses "inline=True" so that phrases
+       like RETURNING and such don't get invoked for
+       single-row bulk inserts.
+    4. bulk_insert() will check that you're passing
+       a list of dictionaries in, raises TypeError
+       if not detected.
+
 0.3.0
 =====
 - [general] The focus of 0.3 is to clean up 

alembic/__init__.py

 from os import path
 
-__version__ = '0.3.0'
+__version__ = '0.3.1'
 
 package_dir = path.abspath(path.dirname(__file__))
 

alembic/ddl/impl.py

         self._exec(schema.DropIndex(index))
 
     def bulk_insert(self, table, rows):
+        if not isinstance(rows, list):
+            raise TypeError("List expected")
+        elif rows and not isinstance(rows[0], dict):
+            raise TypeError("List of dictionaries expected")
         if self.as_sql:
             for row in rows:
-                self._exec(table.insert().values(**dict(
+                self._exec(table.insert(inline=True).values(**dict(
                     (k, _literal_bindparam(k, v, type_=table.c[k].type))
                     for k, v in row.items()
                 )))
         else:
-            self._exec(table.insert(), *rows)
+            # work around http://www.sqlalchemy.org/trac/ticket/2461
+            if not hasattr(table, '_autoincrement_column'):
+                table._autoincrement_column = None
+            self._exec(table.insert(inline=True), multiparams=rows)
 
     def compare_type(self, inspector_column, metadata_column):
 

tests/__init__.py

         except KeyError:
             dialect_mod = getattr(__import__('sqlalchemy.dialects.%s' % name).dialects, name)
             _dialects[name] = d = dialect_mod.dialect()
+            if name == 'postgresql':
+                d.implicit_returning = True
             return d
 
 def assert_compiled(element, assert_string, dialect=None):

tests/test_bulk_insert.py

-from tests import op_fixture
+from tests import op_fixture, _sqlite_testing_config, eq_, assert_raises_message
 from alembic import op
 from sqlalchemy import Integer, \
             UniqueConstraint, String
 from sqlalchemy.sql import table, column
+from unittest import TestCase
+from sqlalchemy import Table, Column, MetaData
 
-def _test_bulk_insert(dialect, as_sql):
+def _table_fixture(dialect, as_sql):
     context = op_fixture(dialect, as_sql)
     t1 = table("ins_table",
                 column('id', Integer),
                 column('v1', String()),
                 column('v2', String()),
     )
+    return context, t1
+
+def _big_t_table_fixture(dialect, as_sql):
+    context = op_fixture(dialect, as_sql)
+    t1 = Table("ins_table", MetaData(),
+                Column('id', Integer, primary_key=True),
+                Column('v1', String()),
+                Column('v2', String()),
+    )
+    return context, t1
+
+def _test_bulk_insert(dialect, as_sql):
+    context, t1 = _table_fixture(dialect, as_sql)
+
     op.bulk_insert(t1, [
         {'id':1, 'v1':'row v1', 'v2':'row v5'},
         {'id':2, 'v1':'row v2', 'v2':'row v6'},
     ])
     return context
 
+def _test_bulk_insert_single(dialect, as_sql):
+    context, t1 = _table_fixture(dialect, as_sql)
+
+    op.bulk_insert(t1, [
+        {'id':1, 'v1':'row v1', 'v2':'row v5'},
+    ])
+    return context
+
+def _test_bulk_insert_single_bigt(dialect, as_sql):
+    context, t1 = _big_t_table_fixture(dialect, as_sql)
+
+    op.bulk_insert(t1, [
+        {'id':1, 'v1':'row v1', 'v2':'row v5'},
+    ])
+    return context
+
 def test_bulk_insert():
     context = _test_bulk_insert('default', False)
     context.assert_(
     op.bulk_insert(t1, [
         {'v1':'row v1', },
     ])
-    # TODO: this is wrong because the test fixture isn't actually 
-    # doing what the real context would do.   Sending this to 
-    # PG is going to produce a RETURNING clause.  fixture would
-    # need to be beefed up
     context.assert_(
         'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
     )
         'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
     )
 
+def test_bulk_insert_pg_single():
+    context = _test_bulk_insert_single('postgresql', False)
+    context.assert_(
+        'INSERT INTO ins_table (id, v1, v2) VALUES (%(id)s, %(v1)s, %(v2)s)'
+    )
+
+def test_bulk_insert_pg_single_as_sql():
+    context = _test_bulk_insert_single('postgresql', True)
+    context.assert_(
+        "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')"
+    )
+
+def test_bulk_insert_pg_single_big_t_as_sql():
+    context = _test_bulk_insert_single_bigt('postgresql', True)
+    context.assert_(
+        "INSERT INTO ins_table (id, v1, v2) VALUES (1, 'row v1', 'row v5')"
+    )
+
 def test_bulk_insert_mssql():
     context = _test_bulk_insert('mssql', False)
     context.assert_(
         "INSERT INTO ins_table (id, v1, v2) VALUES (4, 'row v4', 'row v8')", 
         'SET IDENTITY_INSERT ins_table OFF'
     )
+
+def test_invalid_format():
+    context, t1 = _table_fixture("sqlite", False)
+    assert_raises_message(
+        TypeError,
+        "List expected",
+        op.bulk_insert, t1, {"id":5}
+    )
+
+    assert_raises_message(
+        TypeError,
+        "List of dictionaries expected",
+        op.bulk_insert, t1, [(5, )]
+    )
+
+class RoundTripTest(TestCase):
+    def setUp(self):
+        from sqlalchemy import create_engine
+        from alembic.migration import MigrationContext
+        self.conn = create_engine("sqlite://").connect()
+        self.conn.execute("""
+            create table foo(
+                id integer primary key,
+                data varchar(50),
+                x integer
+            )
+        """)
+        context = MigrationContext.configure(self.conn)
+        self.op = op.Operations(context)
+        self.t1 = table('foo',
+                column('id'),
+                column('data'),
+                column('x')
+        )
+    def tearDown(self):
+        self.conn.close()
+
+    def test_single_insert_round_trip(self):
+        self.op.bulk_insert(self.t1, 
+            [{'data':"d1", "x":"x1"}]
+        )
+
+        eq_(
+            self.conn.execute("select id, data, x from foo").fetchall(),
+            [
+                (1, "d1", "x1"),
+            ]
+        )
+
+    def test_bulk_insert_round_trip(self):
+        self.op.bulk_insert(self.t1, [
+            {'data':"d1", "x":"x1"},
+            {'data':"d2", "x":"x2"},
+            {'data':"d3", "x":"x3"},
+        ])
+
+        eq_(
+            self.conn.execute("select id, data, x from foo").fetchall(),
+            [
+                (1, "d1", "x1"),
+                (2, "d2", "x2"),
+                (3, "d3", "x3")
+            ]
+        )
+