Commits

diana committed 489dcf8

deprecate inspector.get_primary_keys() in favor of inspector.get_pk_constraint()
- see #2422

  • Participants
  • Parent commits 641ebde

Comments (0)

Files changed (11)

lib/sqlalchemy/dialects/firebird/base.py

             return None
 
     @reflection.cache
-    def get_primary_keys(self, connection, table_name, schema=None, **kw):
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
         # Query to extract the PK/FK constrained fields of the given table
         keyqry = """
         SELECT se.rdb$field_name AS fname
         # get primary key fields
         c = connection.execute(keyqry, ["PRIMARY KEY", tablename])
         pkfields = [self.normalize_name(r['fname']) for r in c.fetchall()]
-        return pkfields
+        return {'constrained_columns':pkfields, 'name':None}
 
     @reflection.cache
     def get_column_sequence(self, connection, 
         ORDER BY r.rdb$field_position
         """
         # get the PK, used to determine the eventual associated sequence
-        pkey_cols = self.get_primary_keys(connection, table_name)
+        pk_constraint = self.get_pk_constraint(connection, table_name)
+        pkey_cols = pk_constraint['constrained_columns']
 
         tablename = self.denormalize_name(table_name)
         # get all of the fields for this table

lib/sqlalchemy/dialects/informix/base.py

                   and t3.tabid = t2.tabid and t3.colno = t1.colno
                 order by t1.colno""", table_name, schema)
 
-        primary_cols = self.get_primary_keys(connection, table_name, schema, **kw)
+        pk_constraint = self.get_pk_constraint(connection, table_name, schema, **kw)
+        primary_cols = pk_constraint['constrained_columns']
 
         columns = []
         rows = c.fetchall()
         return fkeys.values()
 
     @reflection.cache
-    def get_primary_keys(self, connection, table_name, schema=None, **kw):
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
         schema = schema or self.default_schema_name
 
         # Select the column positions from sysindexes for sysconstraints
             colpositions |= colpos
 
         if not len(colpositions):
-            return []
+            return {'constrained_columns':[], 'name':None}
 
         # Select the column names using the columnpositions
         # TODO: Maybe cache a bit of those col infos (eg select all colnames for one table)
             table_name, *colpositions
         ).fetchall()
 
-        return reduce(lambda x,y: list(x)+list(y), c, [])
+        cols = reduce(lambda x,y: list(x)+list(y), c, [])
+        return {'constrained_columns':cols, 'name':None}
 
     @reflection.cache
     def get_indexes(self, connection, table_name, schema, **kw):

lib/sqlalchemy/dialects/mssql/base.py

         return cols
 
     @reflection.cache
-    def get_primary_keys(self, connection, tablename, schema=None, **kw):
+    def get_pk_constraint(self, connection, tablename, schema=None, **kw):
         current_schema = schema or self.default_schema_name
         pkeys = []
         # information_schema.referential_constraints
         for row in c:
             if 'PRIMARY' in row[TC.c.constraint_type.name]:
                 pkeys.append(row[0])
-        return pkeys
+        return {'constrained_columns':pkeys, 'name':None}
 
     @reflection.cache
     def get_foreign_keys(self, connection, tablename, schema=None, **kw):

lib/sqlalchemy/dialects/mysql/base.py

         return parsed_state.columns
 
     @reflection.cache
-    def get_primary_keys(self, connection, table_name, schema=None, **kw):
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
         parsed_state = self._parsed_state_or_create(connection, table_name, schema, **kw)
         for key in parsed_state.keys:
             if key['type'] == 'PRIMARY':
                 # There can be only one.
-                ##raise Exception, str(key)
-                return [s[0] for s in key['columns']]
-        return []
+                cols = [s[0] for s in key['columns']]
+                return {'constrained_columns':cols, 'name':None}
+        return {'constrained_columns':[], 'name':None}
 
     @reflection.cache
     def get_foreign_keys(self, connection, table_name, schema=None, **kw):

lib/sqlalchemy/dialects/oracle/base.py

                                 schema=self.denormalize_name(schema))
         indexes = []
         last_index_name = None
-        pkeys = self.get_primary_keys(connection, table_name, schema,
-                                      resolve_synonyms=resolve_synonyms,
-                                      dblink=dblink,
-                                      info_cache=kw.get('info_cache'))
+        pk_constraint = self.get_pk_constraint(
+            connection, table_name, schema, resolve_synonyms=resolve_synonyms,
+            dblink=dblink, info_cache=kw.get('info_cache'))
+        pkeys = pk_constraint['constrained_columns']
         uniqueness = dict(NONUNIQUE=False, UNIQUE=True)
 
         oracle_sys_col = re.compile(r'SYS_NC\d+\$', re.IGNORECASE)
         constraint_data = rp.fetchall()
         return constraint_data
 
-    def get_primary_keys(self, connection, table_name, schema=None, **kw):
-        """
-
-        kw arguments can be:
-
-            oracle_resolve_synonyms
-
-            dblink
-
-        """
-        return self._get_primary_keys(connection, table_name, schema, **kw)[0]
-
     @reflection.cache
-    def _get_primary_keys(self, connection, table_name, schema=None, **kw):
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
         resolve_synonyms = kw.get('oracle_resolve_synonyms', False)
         dblink = kw.get('dblink', '')
         info_cache = kw.get('info_cache')
                                         info_cache=kw.get('info_cache'))
 
         for row in constraint_data:
-            #print "ROW:" , row
             (cons_name, cons_type, local_column, remote_table, remote_column, remote_owner) = \
                 row[0:2] + tuple([self.normalize_name(x) for x in row[2:6]])
             if cons_type == 'P':
                 if constraint_name is None:
                     constraint_name = self.normalize_name(cons_name)
                 pkeys.append(local_column)
-        return pkeys, constraint_name
-
-    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
-        cols, name = self._get_primary_keys(connection, table_name, schema=schema, **kw)
-
-        return {
-            'constrained_columns':cols,
-            'name':name
-        }
+        return {'constrained_columns':pkeys, 'name':constraint_name}
 
     @reflection.cache
     def get_foreign_keys(self, connection, table_name, schema=None, **kw):

lib/sqlalchemy/dialects/postgresql/base.py

         return columns
 
     @reflection.cache
-    def get_primary_keys(self, connection, table_name, schema=None, **kw):
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
         table_oid = self.get_table_oid(connection, table_name, schema,
                                        info_cache=kw.get('info_cache'))
 
         """
         t = sql.text(PK_SQL, typemap={'attname':sqltypes.Unicode})
         c = connection.execute(t, table_oid=table_oid)
-        primary_keys = [r[0] for r in c.fetchall()]
-        return primary_keys
-
-    @reflection.cache
-    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
-        cols = self.get_primary_keys(connection, table_name, 
-                                            schema=schema, **kw)
-
-        table_oid = self.get_table_oid(connection, table_name, schema,
-                                       info_cache=kw.get('info_cache'))
+        cols = [r[0] for r in c.fetchall()]
 
         PK_CONS_SQL = """
         SELECT conname
         t = sql.text(PK_CONS_SQL, typemap={'conname':sqltypes.Unicode})
         c = connection.execute(t, table_oid=table_oid)
         name = c.scalar()
-        return {
-            'constrained_columns':cols,
-            'name':name
-        }
+
+        return {'constrained_columns':cols, 'name':name}
 
     @reflection.cache
     def get_foreign_keys(self, connection, table_name, schema=None, **kw):

lib/sqlalchemy/dialects/sqlite/base.py

         return columns
 
     @reflection.cache
-    def get_primary_keys(self, connection, table_name, schema=None, **kw):
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
         cols = self.get_columns(connection, table_name, schema, **kw)
         pkeys = []
         for col in cols:
             if col['primary_key']:
                 pkeys.append(col['name'])
-        return pkeys
+        return {'constrained_columns':pkeys, 'name':None}
 
     @reflection.cache
     def get_foreign_keys(self, connection, table_name, schema=None, **kw):

lib/sqlalchemy/engine/base.py

 
         raise NotImplementedError()
 
-    def get_primary_keys(self, connection, table_name, schema=None, **kw):
-        """Return information about primary keys in `table_name`.
-
-        Given a :class:`.Connection`, a string
-        `table_name`, and an optional string `schema`, return primary
-        key information as a list of column names.
-
-        """
-        raise NotImplementedError()
-
-    def get_pk_constraint(self, table_name, schema=None, **kw):
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
         """Return information about the primary key constraint on
         table_name`.
 
-        Given a string `table_name`, and an optional string `schema`, return
-        primary key information as a dictionary with these keys:
+        Given a :class:`.Connection`, a string
+        `table_name`, and an optional string `schema`, return primary 
+        key information as a dictionary with these keys:
 
         constrained_columns
           a list of column names that make up the primary key

lib/sqlalchemy/engine/default.py

         insp = reflection.Inspector.from_engine(connection)
         return insp.reflecttable(table, include_columns, exclude_columns)
 
-    def get_pk_constraint(self, conn, table_name, schema=None, **kw):
-        """Compatibility method, adapts the result of get_primary_keys()
-        for those dialects which don't implement get_pk_constraint().
-
-        """
-        return {
-            'constrained_columns':
-                        self.get_primary_keys(conn, table_name, 
-                                                schema=schema, **kw)
-        }
-
     def validate_identifier(self, ident):
         if len(ident) > self.max_identifier_length:
             raise exc.IdentifierError(

lib/sqlalchemy/engine/reflection.py

 
 import sqlalchemy
 from sqlalchemy import exc, sql
+from sqlalchemy import schema as sa_schema
 from sqlalchemy import util
+from sqlalchemy.types import TypeEngine
+from sqlalchemy.util import deprecated
 from sqlalchemy.util import topological
-from sqlalchemy.types import TypeEngine
-from sqlalchemy import schema as sa_schema
 
 
 @util.decorator
                 col_def['type'] = coltype()
         return col_defs
 
+    @deprecated('0.7', 'Call to deprecated method get_primary_keys.'
+                '  Use get_pk_constraint instead.')
     def get_primary_keys(self, table_name, schema=None, **kw):
         """Return information about primary keys in `table_name`.
 
         primary key information as a list of column names.
         """
 
-        pkeys = self.dialect.get_primary_keys(self.bind, table_name, schema,
-                                              info_cache=self.info_cache,
-                                              **kw)
-
+        pkeys = self.dialect.get_pk_constraint(self.bind, table_name, schema,
+                                               info_cache=self.info_cache,
+                                               **kw)['constrained_columns']
         return pkeys
 
     def get_pk_constraint(self, table_name, schema=None, **kw):

test/engine/test_reflection.py

-from test.lib.testing import eq_, assert_raises, assert_raises_message
-import StringIO, unicodedata
+import unicodedata
+import sqlalchemy as sa
+from sqlalchemy import exc as sa_exc
 from sqlalchemy import types as sql_types
 from sqlalchemy import schema, events, event
+from sqlalchemy import MetaData, Integer, String
 from sqlalchemy.engine.reflection import Inspector
-from sqlalchemy import MetaData, Integer, String
+from test.lib import ComparesTables, \
+                            testing, engines, AssertsCompiledSQL, fixtures
 from test.lib.schema import Table, Column
-import sqlalchemy as sa
-from test.lib import ComparesTables, \
-                            testing, engines, AssertsCompiledSQL
-from test.lib import fixtures
+from test.lib.testing import eq_, assert_raises, assert_raises_message
 
 create_inspector = Inspector.from_engine
 
         self._test_get_columns(schema='test_schema', table_type='view')
 
     @testing.provide_metadata
-    def _test_get_primary_keys(self, schema=None):
+    def _test_get_pk_constraint(self, schema=None):
         meta = self.metadata
-        users, addresses, dingalings = createTables(meta, schema)
+        users, addresses, _ = createTables(meta, schema)
         meta.create_all()
         insp = Inspector(meta.bind)
-        users_pkeys = insp.get_primary_keys(users.name,
-                                            schema=schema)
+
+        users_cons = insp.get_pk_constraint(users.name, schema=schema)
+        users_pkeys = users_cons['constrained_columns']
         eq_(users_pkeys,  ['user_id'])
-        addr_cons = insp.get_pk_constraint(addresses.name,
-                                            schema=schema)
 
+        addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
         addr_pkeys = addr_cons['constrained_columns']
         eq_(addr_pkeys,  ['address_id'])
 
             eq_(addr_cons['name'], 'email_ad_pk')
         go()
 
-    def test_get_primary_keys(self):
-        self._test_get_primary_keys()
+    def test_get_pk_constraint(self):
+        self._test_get_pk_constraint()
 
     @testing.fails_on('sqlite', 'no schemas')
-    def test_get_primary_keys_with_schema(self):
-        self._test_get_primary_keys(schema='test_schema')
+    def test_get_pk_constraint_with_schema(self):
+        self._test_get_pk_constraint(schema='test_schema')
+
+    @testing.provide_metadata
+    def test_deprecated_get_primary_keys(self):
+        meta = self.metadata
+        users, _, _ = createTables(meta, schema=None)
+        meta.create_all()
+        insp = Inspector(meta.bind)
+        assert_raises_message(
+            sa_exc.SADeprecationWarning,
+            "Call to deprecated method get_primary_keys."
+            "  Use get_pk_constraint instead.",
+            insp.get_primary_keys, users.name
+        )
 
     @testing.provide_metadata
     def _test_get_foreign_keys(self, schema=None):