Commits

Ben Trofatter committed cf16567 Draft

Added sybase requirements to testing and improved view reflection.

Comments (0)

Files changed (2)

lib/sqlalchemy/dialects/sybase/base.py

           FROM sysobjects o JOIN sysusers u ON o.uid=u.uid
           WHERE u.name = :schema_name
               AND o.name = :table_name
-              AND o.type = 'U'
+              AND o.type in ('U', 'V')
         """)
 
         # Py2K
 
     @reflection.cache
     def get_foreign_keys(self, connection, table_name, schema=None, **kw):
+
         table_id = self.get_table_id(connection, table_name, schema,
                                      info_cache=kw.get("info_cache"))
     
         column_cache = {}
         foreign_keys = []
     
-        table_cache[table_id] = table_name
+        table_cache[table_id] = {"name": table_name, "schema": schema}
     
         COLUMN_SQL = text("""
           SELECT c.colid AS id, c.name AS name
                                                      table_id=table_id)
     
         REFTABLE_SQL = text("""
-          SELECT o.id AS id, o.name AS name, u.name AS 'schema'
+          SELECT o.name AS name, u.name AS 'schema'
           FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
           WHERE o.id = :table_id
         """)
                 c = connection.execute(REFTABLE_SQL, table_id=reftable_id)
                 reftable = c.fetchone()
                 c.close()
-                table_cache[reftable_id] = {"name": reftable["name"],
-                                            "schema": reftable["schema"]}
-    
+                table_info = {"name": reftable["name"], "schema": None}
+                if (schema is not None or
+                        reftable["schema"] != self.default_schema_name):
+                    table_info["schema"] = reftable["schema"]
+
+                table_cache[reftable_id] = table_info
                 results = connection.execute(COLUMN_SQL, table_id=reftable_id)
                 reftable_columns = {}
                 for col in results:
             for i in range(1, r["count"]+1):
                 constrained_columns.append(columns[r["fokey%i" % i]])
                 referred_columns.append(reftable_columns[r["refkey%i" % i]])
-    
+   
             fk_info = {
                     "constrained_columns": constrained_columns,
                     "referred_schema": reftable["schema"],
             AND o.id = :table_id
             AND (i.status & 2048) = 0
             AND i.indid BETWEEN 1 AND 254
-            AND o.type = 'U'
         """)
 
         results = connection.execute(INDEX_SQL, table_id=table_id)
             AND o.id = :table_id
             AND (i.status & 2048) = 2048
             AND i.indid BETWEEN 1 AND 254
-            AND o.type = 'U'
         """)
 
         results = connection.execute(PK_SQL, table_id=table_id)
         return [v["name"] for v in views]
 
     def has_table(self, connection, table_name, schema=None):
-        if schema is None:
-            schema = self.default_schema_name
-
-        HAS_TABLE_SQL = text("""
-          SELECT o.name
-          FROM sysobjects o JOIN sysusers u ON o.uid = u.uid
-          WHERE o.name = :table_name
-             AND u.name = :schema_name
-             AND o.type = 'U'
-        """)
-
-        # Py2K
-        if isinstance(schema, unicode):
-            schema = schema.encode("ascii")
-        if isinstance(table_name, unicode):
-            table_name = table_name.encode("ascii")
-        # end Py2K
-        result = connection.execute(HAS_TABLE_SQL, table_name=table_name,
-                                    schema_name=schema)
-        return result.scalar() is not None
-
-    #def reflecttable(self, connection, table, include_columns):
-    #    raise NotImplementedError()
-
+        try:
+            self.get_table_id(connection, table_name, schema)
+        except exc.NoSuchTableError:
+            return False
+        else:
+            return True

test/requirements.py

     def empty_strings_varchar(self):
         """target database can persist/return an empty string with a varchar."""
 
-        return fails_if("oracle", 'oracle converts empty '
-                                'strings to a blank space')
+        return fails_if(["oracle"],
+                        'oracle converts empty strings to a blank space')
 
     @property
     def empty_strings_text(self):
         """target database can persist/return an empty string with an
         unbounded text."""
 
-        return fails_if("oracle", 'oracle converts empty '
-                                'strings to a blank space')
+        return fails_if(["oracle"],
+                        'oracle converts empty strings to a blank space')
+
+    @property
+    def unicode_data(self):
+        return skip_if([
+            no_support("sybase", "no unicode driver support")
+            ])
 
     @property
     def unicode_connections(self):
                 lambda: not self._has_cextensions(), "C extensions not installed"
                 )
 
-
     @property
     def emulated_lastrowid(self):
         """"target dialect retrieves cursor.lastrowid or an equivalent
                                        'mssql+pyodbc', 'mssql+mxodbc')
 
     @property
+    def implements_get_lastrowid(self):
+        return skip_if([
+            no_support('sybase', 'not supported by database'),
+            ])
+
+    @property
     def dbapi_lastrowid(self):
         """"target backend includes a 'lastrowid' accessor on the DBAPI
         cursor object.
     def reflects_pk_names(self):
         """Target driver reflects the name of primary key constraints."""
 
-        return fails_on_everything_except('postgresql', 'oracle')
+        return fails_on_everything_except('postgresql', 'oracle', 'sybase')
 
     @property
     def python2(self):