jason kirtland avatar jason kirtland committed e1b0c82

Better quoting of identifiers when manipulating schemas
Merged from r2981

Comments (0)

Files changed (5)

 0.3.10
 - sql
+    - better quoting of identifiers when manipulating schemas
     - got connection-bound metadata to work with implicit execution
     - foreign key specs can have any chararcter in their identifiers
      [ticket:667]
       each other, improves ORM lazy load optimization [ticket:664]
 - orm
     - cleanup to connection-bound sessions, SessionTransaction
+- mysql
+    - fixed issue with tables in alternate schemas [ticket:662]
 - postgres
     - fixed max identifier length (63) [ticket:571]
 

lib/sqlalchemy/ansisql.py

     def visit_check_constraint(self, constraint):
         self.append(", \n\t")
         if constraint.name is not None:
-            self.append("CONSTRAINT %s " % constraint.name)
+            self.append("CONSTRAINT %s " %
+                        self.preparer.format_constraint(constraint))
         self.append(" CHECK (%s)" % constraint.sqltext)
 
     def visit_column_check_constraint(self, constraint):
-        self.append(" ")
         self.append(" CHECK (%s)" % constraint.sqltext)
 
     def visit_primary_key_constraint(self, constraint):
             return
         self.append(", \n\t")
         if constraint.name is not None:
-            self.append("CONSTRAINT %s " % constraint.name)
+            self.append("CONSTRAINT %s " % self.preparer.format_constraint(constraint))
         self.append("PRIMARY KEY ")
         self.append("(%s)" % (string.join([self.preparer.format_column(c) for c in constraint],', ')))
 
         self.execute()
 
     def define_foreign_key(self, constraint):
+        preparer = self.preparer
         if constraint.name is not None:
-            self.append("CONSTRAINT %s " % constraint.name)
+            self.append("CONSTRAINT %s " %
+                        preparer.format_constraint(constraint))
         self.append("FOREIGN KEY(%s) REFERENCES %s (%s)" % (
-            string.join([self.preparer.format_column(f.parent) for f in constraint.elements], ', '),
-            self.preparer.format_table(list(constraint.elements)[0].column.table),
-            string.join([self.preparer.format_column(f.column) for f in constraint.elements], ', ')
+            string.join([preparer.format_column(f.parent) for f in constraint.elements], ', '),
+            preparer.format_table(list(constraint.elements)[0].column.table),
+            string.join([preparer.format_column(f.column) for f in constraint.elements], ', ')
         ))
         if constraint.ondelete is not None:
             self.append(" ON DELETE %s" % constraint.ondelete)
     def visit_unique_constraint(self, constraint):
         self.append(", \n\t")
         if constraint.name is not None:
-            self.append("CONSTRAINT %s " % constraint.name)
-        self.append(" UNIQUE ")
-        self.append("(%s)" % (string.join([self.preparer.format_column(c) for c in constraint],', ')))
+            self.append("CONSTRAINT %s " %
+                        self.preparer.format_constraint(constraint))
+        self.append(" UNIQUE (%s)" % (string.join([self.preparer.format_column(c) for c in constraint],', ')))
 
     def visit_column(self, column):
         pass
 
     def visit_index(self, index):
+        preparer = self.preparer        
         self.append('CREATE ')
         if index.unique:
             self.append('UNIQUE ')
         self.append('INDEX %s ON %s (%s)' \
-                    % (index.name, self.preparer.format_table(index.table),
-                       string.join([self.preparer.format_column(c) for c in index.columns], ', ')))
+                    % (preparer.format_index(index),
+                       preparer.format_table(index.table),
+                       string.join([preparer.format_column(c) for c in index.columns], ', ')))
         self.execute()
 
 class ANSISchemaDropper(ANSISchemaBase):
             table.accept_visitor(self)
 
     def visit_index(self, index):
-        self.append("\nDROP INDEX " + index.name)
+        self.append("\nDROP INDEX " + self.preparer.format_index(index))
         self.execute()
 
     def drop_foreignkey(self, constraint):
-        self.append("ALTER TABLE %s DROP CONSTRAINT %s" % (self.preparer.format_table(constraint.table), constraint.name))
+        self.append("ALTER TABLE %s DROP CONSTRAINT %s" % (
+            self.preparer.format_table(constraint.table),
+            self.preparer.format_constraint(constraint)))
         self.execute()
 
     def visit_table(self, table):
 
         return value.replace('"', '""')
 
-    def _quote_identifier(self, value):
+    def quote_identifier(self, value):
         """Quote an identifier.
 
         Subclasses should override this to provide database-dependent
 
     def __generic_obj_format(self, obj, ident):
         if getattr(obj, 'quote', False):
-            return self._quote_identifier(ident)
+            return self.quote_identifier(ident)
         if self.dialect.cache_identifiers:
             case_sens = getattr(obj, 'case_sensitive', None)
             try:
                 return self.__strings[(ident, case_sens)]
             except KeyError:
                 if self._requires_quotes(ident, getattr(obj, 'case_sensitive', ident == ident.lower())):
-                    self.__strings[(ident, case_sens)] = self._quote_identifier(ident)
+                    self.__strings[(ident, case_sens)] = self.quote_identifier(ident)
                 else:
                     self.__strings[(ident, case_sens)] = ident
                 return self.__strings[(ident, case_sens)]
         else:
             if self._requires_quotes(ident, getattr(obj, 'case_sensitive', ident == ident.lower())):
-                return self._quote_identifier(ident)
+                return self.quote_identifier(ident)
             else:
                 return ident
 
     def format_alias(self, alias):
         return self.__generic_obj_format(alias, alias.name)
 
+    def format_constraint(self, constraint):
+        return self.__generic_obj_format(constraint, constraint.name)
+
+    def format_index(self, index):
+        return self.__generic_obj_format(index, index.name)
+
     def format_table(self, table, use_schema=True, name=None):
         """Prepare a quoted table and schema name."""
 

lib/sqlalchemy/databases/mysql.py

     def is_disconnect(self, e):
         return isinstance(e, self.dbapi.OperationalError) and e.args[0] in (2006, 2013, 2014, 2045, 2055)
 
-    def get_default_schema_name(self):
-        if not hasattr(self, '_default_schema_name'):
-            self._default_schema_name = sql.text("select database()", self).scalar()
-        return self._default_schema_name
+    def get_default_schema_name(self, connection):
+        try:
+            return self._default_schema_name
+        except AttributeError:
+            name = self._default_schema_name = \
+              connection.execute('SELECT DATABASE()').scalar()
+            return name
 
     def has_table(self, connection, table_name, schema=None):
         # SHOW TABLE STATUS LIKE and SHOW TABLES LIKE do not function properly
         else:
             st = "DESCRIBE `%s`" % table_name
         try:
-            return connection.execute(st).rowcount > 0
+            rs = connection.execute(st)
+            have = rs.rowcount > 0
+            rs.close()
+            return have
         except exceptions.SQLError, e:
             if e.orig.args[0] == 1146:
                 return False
 
 class MySQLSchemaDropper(ansisql.ANSISchemaDropper):
     def visit_index(self, index):
-        self.append("\nDROP INDEX " + index.name + " ON " + index.table.name)
+        self.append("\nDROP INDEX %s ON %s" %
+                    (self.preparer.format_index(index),
+                     self.preparer.format_table(index.table)))
         self.execute()
 
     def drop_foreignkey(self, constraint):
-        self.append("ALTER TABLE %s DROP FOREIGN KEY %s" % (self.preparer.format_table(constraint.table), constraint.name))
+        self.append("ALTER TABLE %s DROP FOREIGN KEY %s" %
+                    (self.preparer.format_table(constraint.table),
+                     self.preparer.format_constraint(constraint)))
         self.execute()
 
 class MySQLIdentifierPreparer(ansisql.ANSIIdentifierPreparer):
         return RESERVED_WORDS
 
     def _escape_identifier(self, value):
-        #TODO: determine MySQL's escaping rules
-        return value
+        return value.replace('`', '``')
 
     def _fold_identifier_case(self, value):
         #TODO: determine MySQL's case folding rules

lib/sqlalchemy/databases/sqlite.py

         return "oid"
 
     def has_table(self, connection, table_name, schema=None):
-        cursor = connection.execute("PRAGMA table_info(" + table_name + ")", {})
+        cursor = connection.execute("PRAGMA table_info(%s)" %
+           self.identifier_preparer.quote_identifier(table_name), {})
         row = cursor.fetchone()
 
         # consume remaining rows, to work around: http://www.sqlite.org/cvstrac/tktview?tn=1884

test/engine/reflection.py

     def testreserved(self):
         # check a table that uses an SQL reserved name doesn't cause an error
         meta = MetaData(testbase.db)
+        table_a = Table('select', meta, 
+                       Column('not', Integer, primary_key=True),
+                       Column('from', String(12), nullable=False),
+                       UniqueConstraint('from', name='when'))
+        Index('where', table_a.c['from'])
+
+        quoter = meta.bind.dialect.identifier_preparer.quote_identifier
+
+        table_b = Table('false', meta,
+                        Column('create', Integer, primary_key=True),
+                        Column('true', Integer, ForeignKey('select.not')),
+                        CheckConstraint('%s <> 1' % quoter('true'), name='limit'))
+
+        table_c = Table('is', meta,
+                        Column('or', Integer, nullable=False, primary_key=True),
+                        Column('join', Integer, nullable=False, primary_key=True),
+                        PrimaryKeyConstraint('or', 'join', name='to'))
+
+        index_c = Index('else', table_c.c.join)
+
+        #meta.bind.echo = True
+        meta.create_all()
+
+        index_c.drop()
+        
+        meta2 = MetaData(testbase.db)
+        try:
+            table_a2 = Table('select', meta2, autoload=True)
+            table_b2 = Table('false', meta2, autoload=True)
+            table_c2 = Table('is', meta2, autoload=True)
+        finally:
+            meta.drop_all()
+
+
+        meta = MetaData(testbase.db)
         table = Table(
             'select', meta, 
             Column('col1', Integer, primary_key=True)
 
     @testbase.unsupported('sqlite')
     def testcreate(self):
-        schema = testbase.db.url.database
+        engine = testbase.db
+        schema = engine.dialect.get_default_schema_name(engine)
+
         metadata = MetaData(testbase.db)
         table1 = Table('table1', metadata, 
             Column('col1', Integer, primary_key=True),
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.