Commits

Mike Bayer committed 392bca4

More adjustment to this SQLite related issue which was released in
0.7.9, to intercept legacy SQLite quoting characters when reflecting
foreign keys. In addition to intercepting double quotes, other
quoting characters such as brackets, backticks, and single quotes
are now also intercepted. [ticket:2568]

  • Participants
  • Parent commits f410b04
  • Branches rel_0_7

Comments (0)

Files changed (3)

File doc/build/changelog/changelog_07.rst

       logic's failure to take .key into account.
 
     .. change::
+        :tags: sqlite, bug
+        :tickets: 2568
+
+      More adjustment to this SQLite related issue which was released in
+      0.7.9, to intercept legacy SQLite quoting characters when reflecting
+      foreign keys.  In addition to intercepting double quotes, other
+      quoting characters such as brackets, backticks, and single quotes
+      are now also intercepted.
+
+    .. change::
         :tags: sql, bug
         :tickets: 2631
 

File lib/sqlalchemy/dialects/sqlite/base.py

     supports_cast = True
     supports_default_values = True
 
+    _broken_fk_pragma_quotes = False
+
     def __init__(self, isolation_level=None, native_datetime=False, **kwargs):
         default.DefaultDialect.__init__(self, **kwargs)
         self.isolation_level = isolation_level
             self.supports_cast = \
                                 self.dbapi.sqlite_version_info >= (3, 2, 3)
 
+            # see http://www.sqlalchemy.org/trac/ticket/2568
+            # as well as http://www.sqlite.org/src/info/600482d161
+            self._broken_fk_pragma_quotes = \
+                                self.dbapi.sqlite_version_info < (3, 6, 14)
+
+
     _isolation_lookup = {
         'READ UNCOMMITTED':1,
         'SERIALIZABLE':0
         else:
             pragma = "PRAGMA "
         qtable = quote(table_name)
-        c = _pragma_cursor(connection.execute("%sforeign_key_list(%s)" % (pragma, qtable)))
+        statement = "%sforeign_key_list(%s)" % (pragma, qtable)
+        c = _pragma_cursor(connection.execute(statement))
         fkeys = []
         fks = {}
         while True:
             if row is None:
                 break
             (numerical_id, rtbl, lcol, rcol) = (row[0], row[2], row[3], row[4])
-            # sqlite won't return rcol if the table
-            # was created with REFERENCES <tablename>, no col
-            if rcol is None:
-                rcol = lcol
 
-            # see http://www.sqlalchemy.org/trac/ticket/2568
-            # as well as http://www.sqlite.org/src/info/600482d161
-            if self.dbapi.sqlite_version_info < (3, 6, 14):
-                rtbl = re.sub(r'^\"|\"$', '', rtbl)
+            self._parse_fk(fks, fkeys, numerical_id, rtbl, lcol, rcol)
+        return fkeys
 
-            try:
-                fk = fks[numerical_id]
-            except KeyError:
-                fk = {
-                    'name': None,
-                    'constrained_columns' : [],
-                    'referred_schema' : None,
-                    'referred_table' : rtbl,
-                    'referred_columns' : []
-                }
-                fkeys.append(fk)
-                fks[numerical_id] = fk
+    def _parse_fk(self, fks, fkeys, numerical_id, rtbl, lcol, rcol):
+        # sqlite won't return rcol if the table
+        # was created with REFERENCES <tablename>, no col
+        if rcol is None:
+            rcol = lcol
 
-            # look up the table based on the given table's engine, not 'self',
-            # since it could be a ProxyEngine
-            if lcol not in fk['constrained_columns']:
-                fk['constrained_columns'].append(lcol)
-            if rcol not in fk['referred_columns']:
-                fk['referred_columns'].append(rcol)
-        return fkeys
+        if self._broken_fk_pragma_quotes:
+            rtbl = re.sub(r'^[\"\[`\']|[\"\]`\']$', '', rtbl)
+
+        try:
+            fk = fks[numerical_id]
+        except KeyError:
+            fk = {
+                'name': None,
+                'constrained_columns': [],
+                'referred_schema': None,
+                'referred_table': rtbl,
+                'referred_columns': []
+            }
+            fkeys.append(fk)
+            fks[numerical_id] = fk
+
+        if lcol not in fk['constrained_columns']:
+            fk['constrained_columns'].append(lcol)
+        if rcol not in fk['referred_columns']:
+            fk['referred_columns'].append(rcol)
+        return fk
 
     @reflection.cache
     def get_indexes(self, connection, table_name, schema=None, **kw):

File test/dialect/test_sqlite.py

             meta.drop_all()
 
     @testing.provide_metadata
-    def test_quoted_identifiers_one(self):
+    def test_quoted_identifiers_functional_one(self):
         """Tests autoload of tables created with quoted column names."""
 
         metadata = self.metadata
                 == table2.c.id)
 
     @testing.provide_metadata
-    def test_quoted_identifiers_two(self):
+    def test_quoted_identifiers_functional_two(self):
         """"test the edgiest of edge cases, quoted table/col names
         that start and end with quotes.
 
         #assert j.onclause.compare(table1.c['"id"']
         #        == table2.c['"aid"'])
 
+    def test_legacy_quoted_identifiers_unit(self):
+        dialect = sqlite.dialect()
+        dialect._broken_fk_pragma_quotes = True
+
+
+        for row in [
+            (0, 'target', 'tid', 'id'),
+            (0, '"target"', 'tid', 'id'),
+            (0, '[target]', 'tid', 'id'),
+            (0, "'target'", 'tid', 'id'),
+            (0, '`target`', 'tid', 'id'),
+        ]:
+            fks = {}
+            fkeys = []
+            dialect._parse_fk(fks, fkeys, *row)
+            eq_(fkeys, [{
+                    'referred_table': 'target',
+                    'referred_columns': ['id'],
+                    'referred_schema': None,
+                    'name': None,
+                    'constrained_columns': ['tid']
+                }])
+
+
     def test_attached_as_schema(self):
         cx = testing.db.connect()
         try: