simplify primary key reflection and support empty PKConstraint + arguments that gathers PK cols

Issue #2910 resolved
Mike Bayer repo owner created an issue
diff --git a/lib/sqlalchemy/engine/reflection.py b/lib/sqlalchemy/engine/reflection.py
index badec84..5e06ad4 100644
--- a/lib/sqlalchemy/engine/reflection.py
+++ b/lib/sqlalchemy/engine/reflection.py
@@ -461,6 +461,9 @@ class Inspector(object):
         found_table = False
         cols_by_orig_name = {}

+        existing_pk_cols = list(table.primary_key.columns)
+        table.primary_key.columns.clear()
+
         for col_d in self.get_columns(table_name, schema, **tblkw):
             found_table = True
             orig_name = col_d['name']
@@ -518,17 +521,17 @@ class Inspector(object):
                 for pk in pk_cons['constrained_columns']
                 if pk in cols_by_orig_name and pk not in exclude_columns
             ]
-            pk_cols += [
-                pk
-                for pk in table.primary_key
-                if pk.key in exclude_columns
-            ]
-            primary_key_constraint = sa_schema.PrimaryKeyConstraint(
-                name=pk_cons.get('name'),
-                *pk_cols
-            )
+            # update pk constraint name
+            table.primary_key.name = pk_cons.get('name')
+
+            # set the primary key flag on new columns
+            for col in pk_cols:
+                col.primary_key = True
+
+            # fire a new event; this will add all existing
+            # primary key columns
+            table.primary_key._set_parent_with_dispatch(table)

-            table.append_constraint(primary_key_constraint)

         # Foreign keys
         fkeys = self.get_foreign_keys(table_name, schema, **tblkw)
diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py
index 6ee9287..0eb4d9e 100644
--- a/lib/sqlalchemy/sql/schema.py
+++ b/lib/sqlalchemy/sql/schema.py
@@ -434,11 +434,6 @@ class Table(SchemaItem, TableClause):

     def _autoload(self, metadata, autoload_with, include_columns,
                   exclude_columns=()):
-        if self.primary_key.columns:
-            PrimaryKeyConstraint(*[
-                c for c in self.primary_key.columns
-                if c.key in exclude_columns
-            ])._set_parent_with_dispatch(self)

         if autoload_with:
             autoload_with.run_callable(
@@ -2528,10 +2523,22 @@ class PrimaryKeyConstraint(ColumnCollectionConstraint):
     def _set_parent(self, table):
         super(PrimaryKeyConstraint, self)._set_parent(table)

-        if table.primary_key in table.constraints:
-            table.constraints.remove(table.primary_key)
-        table.primary_key = self
-        table.constraints.add(self)
+        if table.primary_key is not self:
+            table.constraints.discard(table.primary_key)
+            table.primary_key = self
+            table.constraints.add(self)
+
+        table_pks = [c for c in table.c if c.primary_key]
+#        conflicts = set(c for c in self.columns if not c.primary_key)
+        if self.columns and table_pks and set(table_pks) != set(self.columns.values()):
+            raise exc.InvalidRequestError(
+                    "table specifies columns %s as primary_key=True, "
+                    "not matching locally specified columns %s" %
+                    (", ".join(c.name for c in table_pks),
+                    ", ".join(c.name for c in self.columns)
+                    )
+                )
+        self.columns.extend(table_pks)

         for c in self.columns:
             c.primary_key = True

test such as:

from sqlalchemy import *

t = Table('t', MetaData(), Column('x', Integer, primary_key=True),
                PrimaryKeyConstraint(mssql_clustered=True)
            )
print(t.primary_key)
print(t.primary_key.kwargs)

also this should raise an error:

from sqlalchemy import *

t = Table('t', MetaData(), Column('x', Integer, primary_key=True),
            Column('q', Integer),
                PrimaryKeyConstraint('q', mssql_clustered=True)
            )
print(t.primary_key)
print(t.primary_key.kwargs)

..annnnd:

from sqlalchemy import *
metadata = MetaData()

t = Table('t', metadata,
   Column('a', Integer, primary_key=True),
   Column('b', Integer, primary_key=True),
   Column('c', Integer, primary_key=True),
   PrimaryKeyConstraint('b', 'c', mssql_clustered=True)
)

Comments (2)

  1. Log in to comment