Commits

Mike Bayer committed 1b386c3

- postgres
- Added Index reflection support to Postgres, using a
great patch we long neglected, submitted by
Ken Kuhlman. [ticket:714]

  • Participants
  • Parent commits 7af7343
  • Branches rel_0_4

Comments (0)

Files changed (4)

     - Connection.invalidate() checks for closed status 
       to avoid attribute errors. [ticket:1246]
 
+- postgres
+    - Added Index reflection support to Postgres, using a
+      great patch we long neglected, submitted by 
+      Ken Kuhlman. [ticket:714]
+    
 - mysql
     - Fixed bug in exception raise when FK columns not present
       during reflection. [ticket:1241]

File lib/sqlalchemy/databases/postgres.py

 
             table.append_constraint(schema.ForeignKeyConstraint(constrained_columns, refspec, conname))
 
+        # Indexes 
+        IDX_SQL = """
+          SELECT c.relname, i.indisunique, i.indexprs, i.indpred,
+            a.attname
+          FROM pg_index i, pg_class c, pg_attribute a
+          WHERE i.indrelid = :table AND i.indexrelid = c.oid
+            AND a.attrelid = i.indexrelid AND i.indisprimary = 'f'
+          ORDER BY c.relname, a.attnum
+        """
+        t = sql.text(IDX_SQL, typemap={'attname':sqltypes.Unicode})
+        c = connection.execute(t, table=table_oid)
+        indexes = {}
+        sv_idx_name = None
+        for row in c.fetchall():
+            idx_name, unique, expr, prd, col = row
+
+            if expr and not idx_name == sv_idx_name:
+                util.warn(
+                  "Skipped unsupported reflection of expression-based index %s"
+                  % idx_name)
+                sv_idx_name = idx_name
+                continue
+            if prd and not idx_name == sv_idx_name:
+                util.warn(
+                   "Predicate of partial index %s ignored during reflection"
+                   % idx_name)
+                sv_idx_name = idx_name
+
+            if not indexes.has_key(idx_name):
+                indexes[idx_name] = [unique, []]
+            indexes[idx_name][1].append(col)
+
+        for name, (unique, columns) in indexes.items():
+            schema.Index(name, *[table.columns[c] for c in columns], 
+                         **dict(unique=unique))
+ 
+
+
     def _load_domains(self, connection):
         ## Load data types for domains:
         SQL_DOMAINS = """

File test/dialect/postgres.py

         finally:
             testing.db.execute("drop table speedy_users", None)
 
+    @testing.emits_warning()
+    def test_index_reflection(self):
+        """ Reflecting partial & expression-based indexes should warn """
+        import warnings
+        def capture_warnings(*args, **kw):
+            capture_warnings._orig_showwarning(*args, **kw)
+            capture_warnings.warnings.append(args)
+        capture_warnings._orig_showwarning = warnings.warn
+        capture_warnings.warnings = []
+
+        m1 = MetaData(testing.db)
+        t1 = Table('party', m1,
+            Column('id', String(10), nullable=False),
+            Column('name', String(20), index=True)
+            )
+        m1.create_all()
+        testing.db.execute("""
+          create index idx1 on party ((id || name))
+        """, None) 
+        testing.db.execute("""
+          create unique index idx2 on party (id) where name = 'test'
+        """, None)
+        try:
+            m2 = MetaData(testing.db)
+
+            warnings.warn = capture_warnings
+            t2 = Table('party', m2, autoload=True)
+      
+            wrn = capture_warnings.warnings
+            assert str(wrn[0][0]) == (
+              "Skipped unsupported reflection of expression-based index idx1")
+            assert str(wrn[1][0]) == (
+              "Predicate of partial index idx2 ignored during reflection")
+            assert len(t2.indexes) == 2
+            # Make sure indexes are in the order we expect them in
+            tmp = [(idx.name, idx) for idx in t2.indexes]
+            tmp.sort()
+            r1, r2 = [idx[1] for idx in tmp]
+
+            assert r1.name == 'idx2'
+            assert r1.unique == True
+            assert r2.unique == False
+            assert [t2.c.id] == r1.columns
+            assert [t2.c.name] == r2.columns
+        finally:
+            warnings.warn = capture_warnings._orig_showwarning
+            m1.drop_all()
+
     def test_create_partial_index(self):
         tbl = Table('testtbl', MetaData(), Column('data',Integer))
         idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10))

File test/engine/reflection.py

     test_composite_fk = testing.exclude('mysql', '<', (4, 1, 1))(test_composite_fk)
 
 
-    def testreserved(self):
+    def test_reserved(self):
         # check a table that uses an SQL reserved name doesn't cause an error
         meta = MetaData(testing.db)
         table_a = Table('select', meta,
             table_c2 = Table('is', meta2, autoload=True)
         finally:
             meta.drop_all()
-    testreserved = testing.unsupported('oracle')(testreserved)
+    test_reserved = testing.unsupported('oracle')(test_reserved)
 
     def test_reflect_all(self):
         existing = testing.db.table_names()
             m9.reflect()
             self.assert_(not m9.tables)
 
+    @testing.fails_on_everything_except('postgres', 'mysql')
+    def test_index_reflection(self):
+        m1 = MetaData(testing.db)
+        t1 = Table('party', m1,
+            Column('id', Integer, nullable=False),
+            Column('name', String(20), index=True)
+            )
+        i1 = Index('idx1', t1.c.id, unique=True)
+        i2 = Index('idx2', t1.c.name, t1.c.id, unique=False)
+        m1.create_all()
+        try:
+            m2 = MetaData(testing.db)
+            t2 = Table('party', m2, autoload=True)
+
+            print len(t2.indexes), t2.indexes
+            assert len(t2.indexes) == 3
+            # Make sure indexes are in the order we expect them in
+            tmp = [(idx.name, idx) for idx in t2.indexes]
+            tmp.sort()
+            r1, r2, r3 = [idx[1] for idx in tmp]
+
+            assert r1.name == 'idx1'
+            assert r2.name == 'idx2'
+            assert r1.unique == True
+            assert r2.unique == False
+            assert r3.unique == False
+            assert [t2.c.id] == r1.columns
+            assert [t2.c.name, t2.c.id] == r2.columns
+            assert [t2.c.name] == r3.columns
+        finally:
+            m1.drop_all()
 
 class CreateDropTest(TestBase):
     def setUpAll(self):