Commits

Mike Bayer committed ba83ff7

Fixed table reflection for Oracle when accessing a synonym that refers
to a DBLINK remote database; while the syntax has been present in the
Oracle dialect for some time, up until now it has never been tested.
The syntax has been tested against a sample database linking to itself,
however there's still some uncertainty as to what should be used for the
"owner" when querying the remote database for table information.
Currently, the value of "username" from user_db_links is used to
match the "owner". [ticket:2619]

Comments (0)

Files changed (6)

doc/build/changelog/changelog_08.rst

     :version: 0.8.0b2
 
     .. change::
+        :tags: oracle, bug
+        :tickets: 2619
+
+      Fixed table reflection for Oracle when accessing a synonym that refers
+      to a DBLINK remote database; while the syntax has been present in the
+      Oracle dialect for some time, up until now it has never been tested.
+      The syntax has been tested against a sample database linking to itself,
+      however there's still some uncertainty as to what should be used for the
+      "owner" when querying the remote database for table information.
+      Currently, the value of "username" from user_db_links is used to
+      match the "owner".
+
+    .. change::
         :tags: orm, feature
         :tickets: 2601
 

lib/sqlalchemy/dialects/oracle/base.py

 
         if resolve_synonyms:
             actual_name, owner, dblink, synonym = self._resolve_synonym(
-                                                         connection,
-                                                         desired_owner=self.denormalize_name(schema),
-                                                         desired_synonym=self.denormalize_name(table_name)
-                                                   )
+                        connection,
+                         desired_owner=self.denormalize_name(schema),
+                         desired_synonym=self.denormalize_name(table_name)
+                       )
         else:
             actual_name, owner, dblink, synonym = None, None, None, None
         if not actual_name:
             actual_name = self.denormalize_name(table_name)
-        if not dblink:
-            dblink = ''
-        if not owner:
+
+        if dblink:
+            # using user_db_links here since all_db_links appears
+            # to have more restricted permissions.
+            # http://docs.oracle.com/cd/B28359_01/server.111/b28310/ds_admin005.htm
+            # will need to hear from more users if we are doing
+            # the right thing here.  See [ticket:2619]
+            owner = connection.scalar(
+                            sql.text("SELECT username FROM user_db_links "
+                                    "WHERE db_link=:link"), link=dblink)
+            dblink = "@" + dblink
+        elif not owner:
             owner = self.denormalize_name(schema or self.default_schema_name)
-        return (actual_name, owner, dblink, synonym)
+
+        return (actual_name, owner, dblink or '', synonym)
 
     @reflection.cache
     def get_schema_names(self, connection, **kw):
         else:
             char_length_col = 'data_length'
 
-        c = connection.execute(sql.text(
-                "SELECT column_name, data_type, %(char_length_col)s, data_precision, data_scale, "
-                "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "
-                "WHERE table_name = :table_name AND owner = :owner "
-                "ORDER BY column_id" % {'dblink': dblink, 'char_length_col': char_length_col}),
-                               table_name=table_name, owner=schema)
+        params = {"table_name": table_name}
+        text = "SELECT column_name, data_type, %(char_length_col)s, "\
+                "data_precision, data_scale, "\
+                "nullable, data_default FROM ALL_TAB_COLUMNS%(dblink)s "\
+                "WHERE table_name = :table_name"
+        if schema is not None:
+            params['owner'] = schema
+            text += " AND owner = :owner "
+        text += " ORDER BY column_id"
+        text = text % {'dblink': dblink, 'char_length_col': char_length_col}
+
+        c = connection.execute(sql.text(text), **params)
 
         for row in c:
             (colname, orig_colname, coltype, length, precision, scale, nullable, default) = \
                                           resolve_synonyms, dblink,
                                           info_cache=info_cache)
         indexes = []
-        q = sql.text("""
-        SELECT a.index_name, a.column_name, b.uniqueness
-        FROM ALL_IND_COLUMNS%(dblink)s a,
-        ALL_INDEXES%(dblink)s b
-        WHERE
-            a.index_name = b.index_name
-            AND a.table_owner = b.table_owner
-            AND a.table_name = b.table_name
 
-        AND a.table_name = :table_name
-        AND a.table_owner = :schema
-        ORDER BY a.index_name, a.column_position""" % {'dblink': dblink})
-        rp = connection.execute(q, table_name=self.denormalize_name(table_name),
-                                schema=self.denormalize_name(schema))
+        params = {'table_name': table_name}
+        text = \
+            "SELECT a.index_name, a.column_name, b.uniqueness "\
+            "\nFROM ALL_IND_COLUMNS%(dblink)s a, "\
+            "\nALL_INDEXES%(dblink)s b "\
+            "\nWHERE "\
+            "\na.index_name = b.index_name "\
+            "\nAND a.table_owner = b.table_owner "\
+            "\nAND a.table_name = b.table_name "\
+            "\nAND a.table_name = :table_name "
+
+        if schema is not None:
+            params['schema'] = schema
+            text += "AND a.table_owner = :schema "
+
+        text += "ORDER BY a.index_name, a.column_position"
+
+        text = text % {'dblink': dblink}
+
+        q = sql.text(text)
+        rp = connection.execute(q, **params)
         indexes = []
         last_index_name = None
         pk_constraint = self.get_pk_constraint(
     def _get_constraint_data(self, connection, table_name, schema=None,
                             dblink='', **kw):
 
-        rp = connection.execute(
-            sql.text("""SELECT
-             ac.constraint_name,
-             ac.constraint_type,
-             loc.column_name AS local_column,
-             rem.table_name AS remote_table,
-             rem.column_name AS remote_column,
-             rem.owner AS remote_owner,
-             loc.position as loc_pos,
-             rem.position as rem_pos
-           FROM all_constraints%(dblink)s ac,
-             all_cons_columns%(dblink)s loc,
-             all_cons_columns%(dblink)s rem
-           WHERE ac.table_name = :table_name
-           AND ac.constraint_type IN ('R','P')
-           AND ac.owner = :owner
-           AND ac.owner = loc.owner
-           AND ac.constraint_name = loc.constraint_name
-           AND ac.r_owner = rem.owner(+)
-           AND ac.r_constraint_name = rem.constraint_name(+)
-           AND (rem.position IS NULL or loc.position=rem.position)
-           ORDER BY ac.constraint_name, loc.position""" % {'dblink': dblink}),
-            table_name=table_name, owner=schema)
+        params = {'table_name': table_name}
+
+        text = \
+            "SELECT"\
+            "\nac.constraint_name,"\
+            "\nac.constraint_type,"\
+            "\nloc.column_name AS local_column,"\
+            "\nrem.table_name AS remote_table,"\
+            "\nrem.column_name AS remote_column,"\
+            "\nrem.owner AS remote_owner,"\
+            "\nloc.position as loc_pos,"\
+            "\nrem.position as rem_pos"\
+            "\nFROM all_constraints%(dblink)s ac,"\
+            "\nall_cons_columns%(dblink)s loc,"\
+            "\nall_cons_columns%(dblink)s rem"\
+            "\nWHERE ac.table_name = :table_name"\
+            "\nAND ac.constraint_type IN ('R','P')"
+
+        if schema is not None:
+            params['owner'] = schema
+            text += "\nAND ac.owner = :owner"
+
+        text += \
+            "\nAND ac.owner = loc.owner"\
+            "\nAND ac.constraint_name = loc.constraint_name"\
+            "\nAND ac.r_owner = rem.owner(+)"\
+            "\nAND ac.r_constraint_name = rem.constraint_name(+)"\
+            "\nAND (rem.position IS NULL or loc.position=rem.position)"\
+            "\nORDER BY ac.constraint_name, loc.position"
+
+        text = text % {'dblink': dblink}
+        rp = connection.execute(sql.text(text), **params)
         constraint_data = rp.fetchall()
         return constraint_data
 
             self._prepare_reflection_args(connection, view_name, schema,
                                           resolve_synonyms, dblink,
                                           info_cache=info_cache)
-        s = sql.text("""
-        SELECT text FROM all_views
-        WHERE owner = :schema
-        AND view_name = :view_name
-        """)
-        rp = connection.execute(s,
-                                view_name=view_name, schema=schema).scalar()
+
+        params = {'view_name': view_name}
+        text = "SELECT text FROM all_views WHERE view_name=:view_name"
+
+        if schema is not None:
+            text += " AND owner = :schema"
+            params['schema'] = schema
+
+        rp = connection.execute(sql.text(text), **params).scalar()
         if rp:
             return rp.decode(self.encoding)
         else:

lib/sqlalchemy/testing/plugin/noseplugin.py

 def _post_setup_options(opt, file_config):
     from sqlalchemy.testing import config
     config.options = options
+    config.file_config = file_config
 
 
 @post
 
                 if not check.enabled:
                     raise SkipTest(
-                        "'%s' unsupported on DB implementation '%s'" % (
-                         cls.__name__, config.db.name)
+                        check.reason if check.reason
+                        else
+                        (
+                            "'%s' unsupported on DB implementation '%s'" % (
+                                cls.__name__, config.db.name
+                            )
                         )
+                    )
 
         if cls.__unsupported_on__:
             spec = exclusions.db_spec(*cls.__unsupported_on__)
 requirement_cls=test.requirements:DefaultRequirements
 profile_file=test/profiles.txt
 
+# name of a "loopback" link set up on the oracle database.
+# to create this, suppose your DB is scott/tiger@xe.  You'd create it
+# like:
+# create database link test_link connect to scott identified by tiger using 'xe';
+oracle_db_link = test_link
+
+
 [db]
 default=sqlite:///:memory:
 sqlite=sqlite:///:memory:

test/dialect/test_oracle.py

         eq_(result, u'’é')
 
 
+class DBLinkReflectionTest(fixtures.TestBase):
+    __requires__ = 'oracle_test_dblink',
+    __only_on__ = 'oracle'
+
+    @classmethod
+    def setup_class(cls):
+        from sqlalchemy.testing import config
+        cls.dblink = config.file_config.get('sqla_testing', 'oracle_db_link')
+
+        with testing.db.connect() as conn:
+            conn.execute(
+                "create table test_table "
+                "(id integer primary key, data varchar2(50))")
+            conn.execute("create synonym test_table_syn "
+                "for test_table@%s" % cls.dblink)
+
+    @classmethod
+    def teardown_class(cls):
+        with testing.db.connect() as conn:
+            conn.execute("drop synonym test_table_syn")
+            conn.execute("drop table test_table")
+
+    def test_hello_world(self):
+        """test that the synonym/dblink is functional."""
+        testing.db.execute("insert into test_table_syn (id, data) "
+                            "values (1, 'some data')")
+        eq_(
+            testing.db.execute("select * from test_table_syn").first(),
+            (1, 'some data')
+        )
+
+    def test_reflection(self):
+        """test the resolution of the synonym/dblink. """
+        m = MetaData()
+
+        t = Table('test_table_syn', m, autoload=True,
+                autoload_with=testing.db, oracle_resolve_synonyms=True)
+        eq_(t.c.keys(), ['id', 'data'])
+        eq_(list(t.primary_key), [t.c.id])
         return skip_if(lambda: not self._has_sqlite())
 
     @property
+    def oracle_test_dblink(self):
+        return skip_if(
+                    lambda: not self.config.file_config.has_option(
+                        'sqla_testing', 'oracle_db_link'),
+                    "oracle_db_link option not specified in config"
+                )
+
+    @property
     def ad_hoc_engines(self):
         """Test environment must allow ad-hoc engine/connection creation.