Commits

Mike Bayer committed 5317f6f

- [bug] Adjusted column default reflection code to
convert non-string values to string, to accommodate
old SQLite versions that don't deliver
default info as a string. [ticket:2265]
- factor sqlite column reflection to be like we did for postgresql,
in a separate method.

Comments (0)

Files changed (4)

     its name, as *part* of its name (i.e. """mytable""").
     [ticket:2568]
 
+  - [bug] Adjusted column default reflection code to
+    convert non-string values to string, to accommodate
+    old SQLite versions that don't deliver
+    default info as a string.  [ticket:2265]
+
 - mysql
   - [bug] Updated mysqlconnector interface to use
     updated "client flag" and "charset" APIs,

lib/sqlalchemy/dialects/sqlite/base.py

         c = _pragma_cursor(
                     connection.execute("%stable_info(%s)" %
                     (pragma, qtable)))
-        found_table = False
+
+        rows = c.fetchall()
         columns = []
-        while True:
-            row = c.fetchone()
-            if row is None:
-                break
-            (name, type_, nullable, default, has_default, primary_key) = \
+        for row in rows:
+            (name, type_, nullable, default, primary_key) = \
                 (row[1], row[2].upper(), not row[3],
-                row[4], row[4] is not None, row[5])
-            match = re.match(r'(\w+)(\(.*?\))?', type_)
-            if match:
-                coltype = match.group(1)
-                args = match.group(2)
-            else:
-                coltype = "VARCHAR"
-                args = ''
-            try:
-                coltype = self.ischema_names[coltype]
-                if args is not None:
-                    args = re.findall(r'(\d+)', args)
-                    coltype = coltype(*[int(a) for a in args])
-            except KeyError:
-                util.warn("Did not recognize type '%s' of column '%s'" %
-                          (coltype, name))
-                coltype = sqltypes.NullType()
+                row[4], row[5])
 
-            columns.append({
-                'name' : name,
-                'type' : coltype,
-                'nullable' : nullable,
-                'default' : default,
-                'autoincrement':default is None,
-                'primary_key': primary_key
-            })
+            columns.append(self._get_column_info(name, type_, nullable,
+                                    default, primary_key))
         return columns
 
+    def _get_column_info(self, name, type_, nullable,
+                                    default, primary_key):
+
+        match = re.match(r'(\w+)(\(.*?\))?', type_)
+        if match:
+            coltype = match.group(1)
+            args = match.group(2)
+        else:
+            coltype = "VARCHAR"
+            args = ''
+        try:
+            coltype = self.ischema_names[coltype]
+            if args is not None:
+                args = re.findall(r'(\d+)', args)
+                coltype = coltype(*[int(a) for a in args])
+        except KeyError:
+            util.warn("Did not recognize type '%s' of column '%s'" %
+                      (coltype, name))
+            coltype = sqltypes.NullType()
+
+        if default is not None:
+            default = unicode(default)
+
+        return {
+            'name': name,
+            'type': coltype,
+            'nullable': nullable,
+            'default': default,
+            'autoincrement': default is None,
+            'primary_key': primary_key
+        }
+
     @reflection.cache
     def get_primary_keys(self, connection, table_name, schema=None, **kw):
         cols = self.get_columns(connection, table_name, schema, **kw)
                 fk = fks[numerical_id]
             except KeyError:
                 fk = {
-                    'name' : None,
+                    'name': None,
                     'constrained_columns' : [],
                     'referred_schema' : None,
                     'referred_table' : rtbl,
 
     if cursor.closed:
         cursor.fetchone = lambda: None
+        cursor.fetchall = lambda: []
     return cursor

lib/sqlalchemy/engine/reflection.py

 
             coltype = col_d['type']
             col_kw = {
-                'nullable':col_d['nullable'],
+                'nullable': col_d['nullable'],
             }
             for k in ('autoincrement', 'quote', 'info', 'key'):
                 if k in col_d:

test/dialect/test_sqlite.py

 
     @testing.provide_metadata
     def test_boolean_default(self):
-        t= Table("t", self.metadata,
+        t = Table("t", self.metadata,
                 Column("x", Boolean, server_default=sql.false()))
         t.create(testing.db)
         testing.db.execute(t.insert())
             [(False,), (True,)]
         )
 
+    def test_old_style_default(self):
+        """test non-quoted integer value on older sqlite pragma"""
+
+        dialect = sqlite.dialect()
+        eq_(
+            dialect._get_column_info("foo", "INTEGER", False, 3, False),
+            {'primary_key': False, 'nullable': False,
+                'default': '3', 'autoincrement': False,
+                'type': INTEGER, 'name': 'foo'}
+        )
+
+
+
 
 class DialectTest(fixtures.TestBase, AssertsExecutionResults):