1. idank
  2. sqlalchemy

Commits

Mike Bayer  committed 4a63b2d

fix to the fix for [ticket:396] plus a unit test

  • Participants
  • Parent commits 36c0cdc
  • Branches default

Comments (0)

Files changed (3)

File lib/sqlalchemy/databases/mysql.py

View file
  • Ignore whitespace
     def preparer(self):
         return MySQLIdentifierPreparer(self)
 
-    def do_executemany(self, cursor, statement, parameters, **kwargs):
+    def do_executemany(self, cursor, statement, parameters, context=None, **kwargs):
         try:
-            cursor.executemany(statement, parameters)
+            rowcount = cursor.executemany(statement, parameters)
+            if context is not None:
+                context._rowcount = rowcount
         except mysql.OperationalError, o:
             if o.args[0] == 2006 or o.args[0] == 2014:
                 cursor.invalidate()
             
 
     def do_rollback(self, connection):
-        # some versions of MySQL just dont support rollback() at all....
+        # MySQL without InnoDB doesnt support rollback()
         try:
             connection.rollback()
         except:

File test/sql/alltests.py

View file
  • Ignore whitespace
         # assorted round-trip tests
         'sql.query',
         'sql.quote',
+        'sql.rowcount',
         
         # defaults, sequences (postgres/oracle)
         'sql.defaults',

File test/sql/rowcount.py

View file
  • Ignore whitespace
+from sqlalchemy import *
+import testbase
+
+class FoundRowsTest(testbase.AssertMixin):
+    """tests rowcount functionality"""
+    def setUpAll(self):
+        metadata = BoundMetaData(testbase.db)
+
+        global employees_table
+
+        employees_table = Table('employees', metadata,
+            Column('employee_id', Integer, primary_key=True),
+            Column('name', String(50)),
+            Column('department', String(1)),
+        )
+        employees_table.create()
+
+    def setUp(self):
+        global data
+        data = [ ('Angela', 'A'),
+                 ('Andrew', 'A'),
+                 ('Anand', 'A'),
+                 ('Bob', 'B'),
+                 ('Bobette', 'B'),
+                 ('Buffy', 'B'),
+                 ('Charlie', 'C'),
+                 ('Cynthia', 'C'),
+                 ('Chris', 'C') ]
+
+        i = employees_table.insert()
+        i.execute(*[{'name':n, 'department':d} for n, d in data])
+    def tearDown(self):
+        employees_table.delete().execute()
+        
+    def tearDownAll(self):
+        employees_table.drop()
+
+    def testbasic(self):
+        s = employees_table.select()
+        r = s.execute().fetchall()
+
+        assert len(r) == len(data)
+
+    def test_update_rowcount1(self):
+        # WHERE matches 3, 3 rows changed
+        department = employees_table.c.department
+        r = employees_table.update(department=='C').execute(department='Z')
+        assert r.rowcount == 3
+        
+    def test_update_rowcount2(self):
+        # WHERE matches 3, 0 rows changed
+        department = employees_table.c.department
+        r = employees_table.update(department=='C').execute(department='C')
+        assert r.rowcount == 3
+        
+    def test_delete_rowcount(self):
+        # WHERE matches 3, 3 rows deleted
+        department = employees_table.c.department
+        r = employees_table.delete(department=='C').execute()
+        assert r.rowcount == 3
+
+if __name__ == '__main__':
+    testbase.main()
+    
+
+
+