Commits

Andrew Godwin committed 27347e3

Some cleanup of the drop-primary-key code, and some test cleanup

Comments (0)

Files changed (3)

south/db/generic.py

     drop_index_string = 'DROP INDEX %(index_name)s'
     delete_column_string = 'ALTER TABLE %s DROP COLUMN %s CASCADE;'
     create_primary_key_string = "ALTER TABLE %(table)s ADD CONSTRAINT %(constraint)s PRIMARY KEY (%(columns)s)"
-    drop_primary_key_string = "ALTER TABLE %(table)s DROP CONSTRAINT %(constraint)s"
+    delete_primary_key_sql = "ALTER TABLE %(table)s DROP CONSTRAINT %(constraint)s"
     backend_name = None
 
     def __init__(self, db_alias):
     def _constraints_affecting_columns(self, table_name, columns, type="UNIQUE"):
         """
         Gets the names of the constraints affecting the given columns.
+        If columns is None, returns all constraints of the type on the table.
         """
 
         if self.dry_run:
             raise ValueError("Cannot get constraints for columns during a dry run.")
 
-        columns = set(columns)
+        if columns is not None:
+            columns = set(columns)
 
         if type == "CHECK":
             ifsc_table = "constraint_column_usage"
                 kc.table_name = %%s AND
                 c.constraint_type = %%s
         """ % ifsc_table, ['public', table_name, type])
+        
         # Load into a dict
         mapping = {}
         for constraint, column in rows:
             mapping.setdefault(constraint, set())
             mapping[constraint].add(column)
+        
         # Find ones affecting these columns
         for constraint, itscols in mapping.items():
-            if itscols == columns:
+            if itscols == columns or columns is None:
                 yield constraint
 
 
         raise NotImplementedError("rename_column has no generic SQL syntax")
 
 
-    def drop_primary_key(self, table_name):
+    def delete_primary_key(self, table_name):
         """
         Drops the old primary key.
         """
-        self.execute(self.drop_primary_key_string % {
-            "table": self.quote_name(table_name),
-            "constraint": self.quote_name(table_name+"_pkey"),
-        })
+        # Dry runs mean we can't do anything.
+        if self.dry_run:
+            return
 
-    delete_primary_key = alias('drop_primary_key')
+        constraints = list(self._constraints_affecting_columns(table_name, None, type="PRIMARY KEY"))
+        if not constraints:
+            raise ValueError("Cannot find a PRIMARY KEY constraint on table %s" % (table_name,))
+        
+        for constraint in constraints:
+            self.execute(self.delete_primary_key_sql % {
+                "table": self.quote_name(table_name), 
+                "constraint": self.quote_name(constraint),
+            })
+
+    drop_primary_key = alias('delete_primary_key')
 
 
     def create_primary_key(self, table_name, columns):

south/db/mysql.py

         params = (self.quote_name(old_table_name), self.quote_name(table_name))
         self.execute('RENAME TABLE %s TO %s;' % params)
     
+    #def drop_primary_key(self, table_name):
+    #    """
+    #    Drops the old primary key.
+    #    """
+    #    self.execute(self.drop_primary_key_string % {
+    #        "table": self.quote_name(table_name),
+    #    })
+    
     
     def _constraints_affecting_columns(self, table_name, columns, type="UNIQUE"):
         """

south/tests/db.py

     def setUp(self):
         db.debug = False
         db.clear_deferred_sql()
+        db.start_transaction()
+    
+    def tearDown(self):
+        db.rollback_transaction()
 
     def test_create(self):
         """
-        Test creation and deletion of tables.
+        Test creation of tables.
         """
         cursor = connection.cursor()
         # It needs to take at least 2 args
         self.assertRaises(TypeError, db.create_table, "test1")
         # Empty tables (i.e. no columns) are not fine, so make at least 1
         db.create_table("test1", [('email_confirmed', models.BooleanField(default=False))])
-        db.start_transaction()
         # And should exist
         cursor.execute("SELECT * FROM test1")
         # Make sure we can't do the same query on an empty table
             self.fail("Non-existent table could be selected!")
         except:
             pass
-        # Clear the dirty transaction
-        db.rollback_transaction()
-        db.start_transaction()
-        # Remove the table
-        db.drop_table("test1")
+    
+    def test_delete(self):
+        """
+        Test deletion of tables.
+        """
+        db.create_table("test_deltable", [('email_confirmed', models.BooleanField(default=False))])
+        db.delete_table("test_deltable")
         # Make sure it went
         try:
             cursor.execute("SELECT * FROM test1")
             self.fail("Just-deleted table could be selected!")
         except:
             pass
-        # Clear the dirty transaction
-        db.rollback_transaction()
-        db.start_transaction()
-        # Try deleting a nonexistent one
+    
+    def test_nonexistent_delete(self):
+        """
+        Test deletion of nonexistent tables.
+        """
         try:
-            db.delete_table("nottheretest1")
+            db.delete_table("test_nonexistdeltable")
             self.fail("Non-existent table could be deleted!")
         except:
             pass
-        db.rollback_transaction()
     
     def test_foreign_keys(self):
         """
         Test = db.mock_model(model_name='Test', db_table='test5a',
                              db_tablespace='', pk_field_name='ID',
                              pk_field_type=models.AutoField, pk_field_args=[])
-        db.start_transaction()
         db.create_table("test5a", [('ID', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True))])
         db.create_table("test5b", [
             ('id', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True)),
             ('UNIQUE', models.ForeignKey(Test)),
         ])
         db.execute_deferred_sql()
-        db.rollback_transaction()
     
     def test_rename(self):
         """
         """
         cursor = connection.cursor()
         db.create_table("test_rn", [('spam', models.BooleanField(default=False))])
-        db.start_transaction()
         # Make sure we can select the column
         cursor.execute("SELECT spam FROM test_rn")
         # Rename it
             self.fail("Just-renamed column could be selected!")
         except:
             pass
-        db.rollback_transaction()
         db.delete_table("test_rn")
     
     def test_dry_rename(self):
         """
         cursor = connection.cursor()
         db.create_table("test_drn", [('spam', models.BooleanField(default=False))])
-        db.start_transaction()
         # Make sure we can select the column
         cursor.execute("SELECT spam FROM test_drn")
         # Rename it
             self.fail("Dry-renamed new column could be selected!")
         except:
             pass
-        db.rollback_transaction()
         db.delete_table("test_drn")
     
     def test_table_rename(self):
         """
         cursor = connection.cursor()
         db.create_table("testtr", [('spam', models.BooleanField(default=False))])
-        db.start_transaction()
         # Make sure we can select the column
         cursor.execute("SELECT spam FROM testtr")
         # Rename it
             self.fail("Just-renamed column could be selected!")
         except:
             pass
-        db.rollback_transaction()
         db.delete_table("testtr2")
     
     def test_index(self):
             ('eggs', models.IntegerField(unique=True)),
         ])
         db.execute_deferred_sql()
-        db.start_transaction()
         # Add an index on that column
         db.create_index("test3", ["SELECT"])
         # Add another index on two columns
         db.delete_index("test3", ["SELECT", "eggs"])
         # Delete the unique index/constraint
         db.delete_unique("test3", ["eggs"])
-        db.rollback_transaction()
         db.delete_table("test3")
     
     def test_primary_key(self):
             ('eggs', models.IntegerField(unique=True)),
         ])
         db.execute_deferred_sql()
-        db.start_transaction()
         # Remove the default primary key, and make eggs it
-        db.drop_primary_key("test_pk")
+        db.delete_primary_key("test_pk")
         db.create_primary_key("test_pk", "new_pkey")
         # Try inserting a now-valid row pair
         db.execute("INSERT INTO test_pk (id, new_pkey, eggs) VALUES (1, 2, 3), (1, 3, 4)")
-        db.rollback_transaction()
         db.delete_table("test_pk")
     
     def test_alter(self):
         """
         Test altering columns/tables
         """
-        db.create_table("test4", [
+        db.create_table("test_alterc", [
             ('spam', models.BooleanField(default=False)),
             ('eggs', models.IntegerField()),
         ])
-        db.start_transaction()
         # Add a column
-        db.add_column("test4", "add1", models.IntegerField(default=3), keep_default=False)
+        db.add_column("test_alterc", "add1", models.IntegerField(default=3), keep_default=False)
         # Add a FK with keep_default=False (#69)
         User = db.mock_model(model_name='User', db_table='auth_user', db_tablespace='', pk_field_name='id', pk_field_type=models.AutoField, pk_field_args=[], pk_field_kwargs={})
-        db.add_column("test4", "user", models.ForeignKey(User, null=True), keep_default=False)
-        db.delete_column("test4", "add1")
-        
-        db.rollback_transaction()
-        db.delete_table("test4")
+        db.add_column("test_alterc", "user", models.ForeignKey(User, null=True), keep_default=False)
+        db.delete_column("test_alterc", "add1")
+        db.delete_table("test_alterc")
         
     def test_alter_column_postgres_multiword(self):
         """
 
         # test if 'with timezone' is preserved
         if db.backend_name == "postgres":
-            db.start_transaction()
             db.execute("INSERT INTO test_multiword (col_datetime) VALUES ('2009-04-24 14:20:55+02')")
             db.alter_column('test_multiword', 'col_datetime', models.DateTimeField(auto_now=True))
             assert db.execute("SELECT col_datetime = '2009-04-24 14:20:55+02' FROM test_multiword")[0][0]
-            db.rollback_transaction()
 
-        
         db.delete_table("test_multiword")
     
     def test_alter_constraints(self):
         # Add in some test values
         db.execute("INSERT INTO test_alterc (num) VALUES (1), (2)")
         # Ensure that adding a negative number is bad
-        db.start_transaction()
         try:
             db.execute("INSERT INTO test_alterc (num) VALUES (-3)")
         except:
-            db.rollback_transaction()
+            pass
         else:
             self.fail("Could insert a negative integer into a PositiveIntegerField.")
         # Alter it to a normal IntegerField
         db.dry_run = False
         db.delete_unique("test_unique", ["spam"])
         db.create_unique("test_unique", ["spam"])
-        db.start_transaction()
         # Test it works
         db.execute("INSERT INTO test_unique2 (id) VALUES (1), (2)")
         db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 0, 1), (false, 1, 2)")
         try:
             db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 2, 1)")
         except:
-            db.rollback_transaction()
+            pass
         else:
             self.fail("Could insert non-unique item.")
         # Drop that, add one only on eggs
         db.delete_unique("test_unique", ["spam"])
         db.execute("DELETE FROM test_unique")
         db.create_unique("test_unique", ["eggs"])
-        db.start_transaction()
         # Test similarly
         db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 0, 1), (false, 1, 2)")
         try:
             db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 1, 1)")
         except:
-            db.rollback_transaction()
+            pass
         else:
             self.fail("Could insert non-unique item.")
         # Drop those, test combined constraints
         db.delete_unique("test_unique", ["eggs"])
         db.execute("DELETE FROM test_unique")
         db.create_unique("test_unique", ["spam", "eggs", "ham_id"])
-        db.start_transaction()
         # Test similarly
         db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 0, 1), (false, 1, 1)")
         try:
             db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (true, 0, 1)")
         except:
-            db.rollback_transaction()
+            pass
         else:
             self.fail("Could insert non-unique pair.")
         db.delete_unique("test_unique", ["spam", "eggs", "ham_id"])
         """
         Under PostgreSQL at least, capitalised constrains must be quoted.
         """
-        db.start_transaction()
-        try:
-            db.create_table("test_capconst", [
-                ('SOMECOL', models.PositiveIntegerField(primary_key=True)),
-            ])
-            # Alter it so it's not got the check constraint
-            db.alter_column("test_capconst", "SOMECOL", models.IntegerField())
-        finally:
-            db.rollback_transaction()
+        db.create_table("test_capconst", [
+            ('SOMECOL', models.PositiveIntegerField(primary_key=True)),
+        ])
+        # Alter it so it's not got the check constraint
+        db.alter_column("test_capconst", "SOMECOL", models.IntegerField())
     
     def test_text_default(self):
         """
         MySQL cannot have blank defaults on TEXT columns.
         """
-        db.start_transaction()
-        try:
-            db.create_table("test_textdef", [
-                ('textcol', models.TextField(blank=True)),
-            ])
-        finally:
-            db.rollback_transaction()
+        db.create_table("test_textdef", [
+            ('textcol', models.TextField(blank=True)),
+        ])
     
     def test_add_unique_fk(self):
         """
         db.create_table("test_add_unique_fk", [
             ('spam', models.BooleanField(default=False))
         ])
-        db.start_transaction()
         
         db.add_column("test_add_unique_fk", "mock1", models.ForeignKey(db.mock_model('Mock', 'mock'), null=True, unique=True))
         db.add_column("test_add_unique_fk", "mock2", models.OneToOneField(db.mock_model('Mock', 'mock'), null=True))
         
-        db.rollback_transaction()
         db.delete_table("test_add_unique_fk")