Commits

Mike Bayer committed ce1f5fa Draft Merge

merge default

Comments (0)

Files changed (16)

doc/build/changelog/changelog_07.rst

     :released:
 
     .. change::
+        :tags: mssql, bug
+        :tickets: 2638
+
+      Added a Py3K conditional around unnecessary .decode()
+      call in mssql information schema, fixes reflection
+      in Py3k.
+
+    .. change::
         :tags: orm, bug
         :tickets: 2650
 

doc/build/changelog/changelog_08.rst

     :version: 0.8.0
 
     .. change::
+        :tags: sql, feature
+        :tickets: 695
+
+      :class:`.Index` now supports arbitrary SQL expressions and/or
+      functions, in addition to straight columns.   Common modifiers
+      include using ``somecolumn.desc()`` for a descending index and
+      ``func.lower(somecolumn)`` for a case-insensitive index, depending on the
+      capabilities of the target backend.
+
+    .. change::
+        :tags: mssql, bug
+        :tickets: 2638
+
+      Added a py3K conditional around unnecessary .decode()
+      call in mssql information schema, fixes reflection
+      in Py3K. Also in 0.7.10.
+
+    .. change::
         :tags: orm, bug
         :tickets: 2650
 

doc/build/core/schema.rst

     {sql}i.create(engine)
     CREATE INDEX someindex ON mytable (col5){stop}
 
+Functional Indexes
+~~~~~~~~~~~~~~~~~~~
+
+:class:`.Index` supports SQL and function expressions, as supported by the
+target backend.  To create an index against a column using a descending
+value, the :meth:`.ColumnElement.desc` modifier may be used::
+
+    from sqlalchemy import Index
+
+    Index('someindex', mytable.c.somecol.desc())
+
+Or with a backend that supports functional indexes such as Postgresql,
+a "case insensitive" index can be created using the ``lower()`` function::
+
+    from sqlalchemy import func, Index
+
+    Index('someindex', func.lower(mytable.c.somecol))
+
+.. versionadded:: 0.8 :class:`.Index` supports SQL expressions and functions
+   as well as plain columns.
+
+Index API
+---------
+
 .. autoclass:: Index
     :show-inheritance:
     :members:

lib/sqlalchemy/dialects/mssql/base.py

     def visit_drop_index(self, drop):
         return "\nDROP INDEX %s.%s" % (
             self.preparer.quote_identifier(drop.element.table.name),
-            self.preparer.quote(
-                        self._index_identifier(drop.element.name),
-                        drop.element.quote)
+            self._prepared_index_name(drop.element,
+                                        include_schema=True)
             )
 
 

lib/sqlalchemy/dialects/mssql/information_schema.py

     impl = Unicode
 
     def process_bind_param(self, value, dialect):
+        # Py2K
         if isinstance(value, str):
             value = value.decode(dialect.encoding)
+        # end Py2K
         return value
 
 schemata = Table("SCHEMATA", ischema,

lib/sqlalchemy/dialects/mysql/base.py

 
     def visit_create_index(self, create):
         index = create.element
+        self._verify_index_table(index)
         preparer = self.preparer
         table = preparer.format_table(index.table)
-        columns = [preparer.quote(c.name, c.quote) for c in index.columns]
+        columns = [self.sql_compiler.process(expr, include_table=False)
+                for expr in index.expressions]
+
         name = preparer.quote(
-                    self._index_identifier(index.name),
+                    self._prepared_index_name(index),
                     index.quote)
 
         text = "CREATE "
     def visit_drop_index(self, drop):
         index = drop.element
 
-        return "\nDROP INDEX %s ON %s" % \
-                    (self.preparer.quote(
-                        self._index_identifier(index.name), index.quote
-                    ),
+        return "\nDROP INDEX %s ON %s" % (
+                    self._prepared_index_name(index,
+                                        include_schema=False),
                      self.preparer.format_table(index.table))
 
     def visit_drop_constraint(self, drop):

lib/sqlalchemy/dialects/postgresql/base.py

     def visit_create_index(self, create):
         preparer = self.preparer
         index = create.element
+        self._verify_index_table(index)
         text = "CREATE "
         if index.unique:
             text += "UNIQUE "
-        ops = index.kwargs.get('postgresql_ops', {})
         text += "INDEX %s ON %s " % (
-                    preparer.quote(
-                        self._index_identifier(index.name), index.quote),
+                        self._prepared_index_name(index,
+                                include_schema=False),
                     preparer.format_table(index.table)
                 )
 
             using = index.kwargs['postgresql_using']
             text += "USING %s " % preparer.quote(using, index.quote)
 
+        ops = index.kwargs.get('postgresql_ops', {})
         text += "(%s)" \
                 % (
                     ', '.join([
-                        preparer.format_column(c) +
+                        self.sql_compiler.process(expr, include_table=False) +
+
+
                         (c.key in ops and (' ' + ops[c.key]) or '')
-                        for c in index.columns])
+
+
+                        for expr, c in zip(index.expressions, index.columns)])
                     )
 
-        if "postgres_where" in index.kwargs:
-            whereclause = index.kwargs['postgres_where']
-            util.warn_deprecated(
-                    "The 'postgres_where' argument has been renamed "
-                    "to 'postgresql_where'.")
-        elif 'postgresql_where' in index.kwargs:
+        if 'postgresql_where' in index.kwargs:
             whereclause = index.kwargs['postgresql_where']
         else:
             whereclause = None

lib/sqlalchemy/dialects/sqlite/base.py

         return preparer.format_table(table, use_schema=False)
 
     def visit_create_index(self, create):
-        index = create.element
-        preparer = self.preparer
-        text = "CREATE "
-        if index.unique:
-            text += "UNIQUE "
-        text += "INDEX %s ON %s (%s)" \
-                    % (preparer.format_index(index,
-                       name=self._index_identifier(index.name)),
-                       preparer.format_table(index.table, use_schema=False),
-                       ', '.join(preparer.quote(c.name, c.quote)
-                                 for c in index.columns))
-        return text
+        return super(SQLiteDDLCompiler, self).\
+                    visit_create_index(create, include_table_schema=False)
 
 
 class SQLiteTypeCompiler(compiler.GenericTypeCompiler):

lib/sqlalchemy/dialects/sybase/base.py

         index = drop.element
         return "\nDROP INDEX %s.%s" % (
             self.preparer.quote_identifier(index.table.name),
-            self.preparer.quote(
-                    self._index_identifier(index.name), index.quote)
+            self._prepared_index_name(drop.element,
+                                        include_schema=False)
             )
 
 

lib/sqlalchemy/ext/mutable.py

         dict.__setitem__(self, key, value)
         self.changed()
 
-    def __delitem__(self, key, value):
+    def __delitem__(self, key):
         """Detect dictionary del events and emit change events."""
-        dict.__delitem__(self, key, value)
+        dict.__delitem__(self, key)
         self.changed()
 
     @classmethod

lib/sqlalchemy/schema.py

 
     :ref:`schema_indexes` - General information on :class:`.Index`.
 
-    :ref:`postgresql_indexes` - PostgreSQL-specific options available for the :class:`.Index` construct.
-
-    :ref:`mysql_indexes` - MySQL-specific options available for the :class:`.Index` construct.
+    :ref:`postgresql_indexes` - PostgreSQL-specific options available for the
+    :class:`.Index` construct.
+
+    :ref:`mysql_indexes` - MySQL-specific options available for the
+    :class:`.Index` construct.
+
     """
 
     __visit_name__ = 'index'
 
-    def __init__(self, name, *columns, **kw):
+    def __init__(self, name, *expressions, **kw):
         """Construct an index object.
 
         :param name:
           The name of the index
 
-        :param \*columns:
-          Columns to include in the index. All columns must belong to the same
-          table.
+        :param \*expressions:
+          Column expressions to include in the index.   The expressions
+          are normally instances of :class:`.Column`, but may also
+          be arbitrary SQL expressions which ultmately refer to a
+          :class:`.Column`.
+
+          .. versionadded:: 0.8 :class:`.Index` supports SQL expressions as
+             well as plain columns.
 
         :param unique:
             Defaults to False: create a unique index.
 
         """
         self.table = None
+
+        columns = []
+        for expr in expressions:
+            if not isinstance(expr, expression.ClauseElement):
+                columns.append(expr)
+            else:
+                cols = []
+                visitors.traverse(expr, {}, {'column': cols.append})
+                if cols:
+                    columns.append(cols[0])
+                else:
+                    columns.append(expr)
+
+        self.expressions = expressions
+
         # will call _set_parent() if table-bound column
         # objects are present
         ColumnCollectionMixin.__init__(self, *columns)
+
         self.name = name
         self.unique = kw.pop('unique', False)
         self.kwargs = kw
                 )
         table.indexes.add(self)
 
+        self.expressions = [
+            expr if isinstance(expr, expression.ClauseElement)
+            else colexpr
+            for expr, colexpr in zip(self.expressions, self.columns)
+        ]
+
     @property
     def bind(self):
         """Return the connectable associated with this Index."""

lib/sqlalchemy/sql/compiler.py

     def visit_drop_view(self, drop):
         return "\nDROP VIEW " + self.preparer.format_table(drop.element)
 
-    def _index_identifier(self, ident):
-        if isinstance(ident, sql._truncated_label):
-            max = self.dialect.max_index_name_length or \
-                        self.dialect.max_identifier_length
-            if len(ident) > max:
-                ident = ident[0:max - 8] + \
-                                "_" + util.md5_hex(ident)[-4:]
-        else:
-            self.dialect.validate_identifier(ident)
-
-        return ident
-
-    def visit_create_index(self, create, include_schema=False):
+
+    def _verify_index_table(self, index):
+        if index.table is None:
+            raise exc.CompileError("Index '%s' is not associated "
+                            "with any table." % index.name)
+
+
+    def visit_create_index(self, create, include_schema=False,
+                                include_table_schema=True):
         index = create.element
+        self._verify_index_table(index)
         preparer = self.preparer
         text = "CREATE "
         if index.unique:
                     % (
                         self._prepared_index_name(index,
                                 include_schema=include_schema),
-                       preparer.format_table(index.table),
-                       ', '.join(preparer.quote(c.name, c.quote)
-                                 for c in index.columns))
+                       preparer.format_table(index.table,
+                                    use_schema=include_table_schema),
+                       ', '.join(
+                            self.sql_compiler.process(expr,
+                                include_table=False) for
+                                expr in index.expressions)
+                        )
         return text
 
     def visit_drop_index(self, drop):
         else:
             schema_name = None
 
+        ident = index.name
+        if isinstance(ident, sql._truncated_label):
+            max_ = self.dialect.max_index_name_length or \
+                        self.dialect.max_identifier_length
+            if len(ident) > max_:
+                ident = ident[0:max_ - 8] + \
+                                "_" + util.md5_hex(ident)[-4:]
+        else:
+            self.dialect.validate_identifier(ident)
+
         index_name = self.preparer.quote(
-                            self._index_identifier(index.name),
+                                    ident,
                                     index.quote)
 
         if schema_name:

lib/sqlalchemy/testing/assertsql.py

 
 class CompiledSQL(SQLMatchRule):
 
-    def __init__(self, statement, params):
+    def __init__(self, statement, params=None):
         SQLMatchRule.__init__(self)
         self.statement = statement
         self.params = params
                                executemany):
         if not context:
             return
+        from sqlalchemy.schema import _DDLCompiles
         _received_parameters = list(context.compiled_parameters)
 
         # recompile from the context, using the default dialect
 
-        compiled = \
-            context.compiled.statement.compile(dialect=DefaultDialect(),
+        if isinstance(context.compiled.statement, _DDLCompiles):
+            compiled = \
+                context.compiled.statement.compile(dialect=DefaultDialect())
+        else:
+            compiled = \
+                context.compiled.statement.compile(dialect=DefaultDialect(),
                 column_keys=context.compiled.column_keys)
-        _received_statement = re.sub(r'\n', '', str(compiled))
+        _received_statement = re.sub(r'[\n\t]', '', str(compiled))
         equivalent = self.statement == _received_statement
         if self.params:
             if util.callable(self.params):

test/dialect/test_mssql.py

         fp.close()
         return stream
 
+class InfoCoerceUnicodeTest(fixtures.TestBase):
+    def test_info_unicode_coercion(self):
+        from sqlalchemy.dialects.mssql.information_schema import CoerceUnicode
+
+        dialect = mssql.dialect()
+        value = CoerceUnicode().bind_processor(dialect)('a string')
+        assert isinstance(value, unicode)
 
 class ReflectHugeViewTest(fixtures.TestBase):
     __only_on__ = 'mssql'

test/dialect/test_postgresql.py

                             'USING hash (data)',
                             dialect=postgresql.dialect())
 
-    @testing.uses_deprecated(r".*'postgres_where' argument has been "
-                             "renamed.*")
-    def test_old_create_partial_index(self):
-        tbl = Table('testtbl', MetaData(), Column('data', Integer))
-        idx = Index('test_idx1', tbl.c.data,
-                    postgres_where=and_(tbl.c.data > 5, tbl.c.data
-                    < 10))
-        self.assert_compile(schema.CreateIndex(idx),
-                            'CREATE INDEX test_idx1 ON testtbl (data) '
-                            'WHERE data > 5 AND data < 10',
-                            dialect=postgresql.dialect())
 
     def test_extract(self):
         t = table('t', column('col1', DateTime), column('col2', Date),

test/sql/test_constraints.py

 from sqlalchemy.testing import assert_raises, assert_raises_message
 from sqlalchemy import *
 from sqlalchemy import exc, schema
-from sqlalchemy.testing import fixtures, AssertsExecutionResults, AssertsCompiledSQL
+from sqlalchemy.testing import fixtures, AssertsExecutionResults, \
+                    AssertsCompiledSQL
 from sqlalchemy import testing
-from sqlalchemy.testing import config, engines
-from sqlalchemy.engine import ddl
+from sqlalchemy.testing import engines
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing.assertsql import AllOf, RegexSQL, ExactSQL, CompiledSQL
-from sqlalchemy.dialects.postgresql import base as postgresql
 
-class ConstraintTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
+class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
     __dialect__ = 'default'
 
-    def setup(self):
-        global metadata
-        metadata = MetaData(testing.db)
+    @testing.provide_metadata
+    def test_pk_fk_constraint_create(self):
+        metadata = self.metadata
 
-    def teardown(self):
-        metadata.drop_all()
-
-    def test_constraint(self):
-        employees = Table('employees', metadata,
+        Table('employees', metadata,
             Column('id', Integer),
             Column('soc', String(40)),
             Column('name', String(30)),
             PrimaryKeyConstraint('id', 'soc')
             )
-        elements = Table('elements', metadata,
+        Table('elements', metadata,
             Column('id', Integer),
             Column('stuff', String(30)),
             Column('emp_id', Integer),
             Column('emp_soc', String(40)),
             PrimaryKeyConstraint('id', name='elements_primkey'),
-            ForeignKeyConstraint(['emp_id', 'emp_soc'], ['employees.id', 'employees.soc'])
+            ForeignKeyConstraint(['emp_id', 'emp_soc'],
+                                ['employees.id', 'employees.soc'])
             )
-        metadata.create_all()
+        self.assert_sql_execution(
+            testing.db,
+            lambda: metadata.create_all(checkfirst=False),
+            CompiledSQL('CREATE TABLE employees ('
+                    'id INTEGER, '
+                    'soc VARCHAR(40), '
+                    'name VARCHAR(30), '
+                    'PRIMARY KEY (id, soc)'
+                    ')'
+            ),
+            CompiledSQL('CREATE TABLE elements ('
+                    'id INTEGER, '
+                    'stuff VARCHAR(30), '
+                    'emp_id INTEGER, '
+                    'emp_soc VARCHAR(40), '
+                    'CONSTRAINT elements_primkey PRIMARY KEY (id), '
+                    'FOREIGN KEY(emp_id, emp_soc) '
+                            'REFERENCES employees (id, soc)'
+                ')'
+            )
+        )
 
-    def test_double_fk_usage_raises(self):
-        f = ForeignKey('b.id')
 
-        Column('x', Integer, f)
-        assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
+    @testing.provide_metadata
+    def test_cyclic_fk_table_constraint_create(self):
+        metadata = self.metadata
 
-    def test_circular_constraint(self):
-        a = Table("a", metadata,
+        Table("a", metadata,
             Column('id', Integer, primary_key=True),
             Column('bid', Integer),
-            ForeignKeyConstraint(["bid"], ["b.id"], name="afk")
+            ForeignKeyConstraint(["bid"], ["b.id"])
             )
-        b = Table("b", metadata,
+        Table("b", metadata,
             Column('id', Integer, primary_key=True),
             Column("aid", Integer),
             ForeignKeyConstraint(["aid"], ["a.id"], use_alter=True, name="bfk")
             )
-        metadata.create_all()
+        self._assert_cyclic_constraint(metadata)
 
-    def test_circular_constraint_2(self):
-        a = Table("a", metadata,
+    @testing.provide_metadata
+    def test_cyclic_fk_column_constraint_create(self):
+        metadata = self.metadata
+
+        Table("a", metadata,
             Column('id', Integer, primary_key=True),
             Column('bid', Integer, ForeignKey("b.id")),
             )
-        b = Table("b", metadata,
+        Table("b", metadata,
             Column('id', Integer, primary_key=True),
-            Column("aid", Integer, ForeignKey("a.id", use_alter=True, name="bfk")),
+            Column("aid", Integer,
+                ForeignKey("a.id", use_alter=True, name="bfk")
+                ),
             )
-        metadata.create_all()
+        self._assert_cyclic_constraint(metadata)
 
-    @testing.fails_on('mysql', 'FIXME: unknown')
-    def test_check_constraint(self):
-        foo = Table('foo', metadata,
+    def _assert_cyclic_constraint(self, metadata):
+        assertions = [
+            CompiledSQL('CREATE TABLE b ('
+                    'id INTEGER NOT NULL, '
+                    'aid INTEGER, '
+                    'PRIMARY KEY (id)'
+                    ')'
+                    ),
+            CompiledSQL('CREATE TABLE a ('
+                    'id INTEGER NOT NULL, '
+                    'bid INTEGER, '
+                    'PRIMARY KEY (id), '
+                    'FOREIGN KEY(bid) REFERENCES b (id)'
+                    ')'
+                ),
+        ]
+        if testing.db.dialect.supports_alter:
+            assertions.append(
+                CompiledSQL('ALTER TABLE b ADD CONSTRAINT bfk '
+                        'FOREIGN KEY(aid) REFERENCES a (id)')
+            )
+        self.assert_sql_execution(
+            testing.db,
+            lambda: metadata.create_all(checkfirst=False),
+            *assertions
+        )
+
+        assertions = []
+        if testing.db.dialect.supports_alter:
+            assertions.append(CompiledSQL('ALTER TABLE b DROP CONSTRAINT bfk'))
+        assertions.extend([
+            CompiledSQL("DROP TABLE a"),
+            CompiledSQL("DROP TABLE b"),
+            ])
+        self.assert_sql_execution(
+            testing.db,
+            lambda: metadata.drop_all(checkfirst=False),
+            *assertions
+        )
+
+    @testing.provide_metadata
+    def test_check_constraint_create(self):
+        metadata = self.metadata
+
+        Table('foo', metadata,
             Column('id', Integer, primary_key=True),
             Column('x', Integer),
             Column('y', Integer),
             CheckConstraint('x>y'))
-        bar = Table('bar', metadata,
+        Table('bar', metadata,
             Column('id', Integer, primary_key=True),
             Column('x', Integer, CheckConstraint('x>7')),
             Column('z', Integer)
             )
 
-        metadata.create_all()
-        foo.insert().execute(id=1,x=9,y=5)
-        assert_raises(exc.DBAPIError, foo.insert().execute, id=2,x=5,y=9)
-        bar.insert().execute(id=1,x=10)
-        assert_raises(exc.DBAPIError, bar.insert().execute, id=2,x=5)
+        self.assert_sql_execution(
+            testing.db,
+            lambda: metadata.create_all(checkfirst=False),
+            AllOf(
+                CompiledSQL('CREATE TABLE foo ('
+                        'id INTEGER NOT NULL, '
+                        'x INTEGER, '
+                        'y INTEGER, '
+                        'PRIMARY KEY (id), '
+                        'CHECK (x>y)'
+                        ')'
+                        ),
+                CompiledSQL('CREATE TABLE bar ('
+                        'id INTEGER NOT NULL, '
+                        'x INTEGER CHECK (x>7), '
+                        'z INTEGER, '
+                        'PRIMARY KEY (id)'
+                        ')'
+                )
+            )
+        )
 
-    def test_unique_constraint(self):
-        foo = Table('foo', metadata,
+    @testing.provide_metadata
+    def test_unique_constraint_create(self):
+        metadata = self.metadata
+
+        Table('foo', metadata,
             Column('id', Integer, primary_key=True),
             Column('value', String(30), unique=True))
-        bar = Table('bar', metadata,
+        Table('bar', metadata,
             Column('id', Integer, primary_key=True),
             Column('value', String(30)),
             Column('value2', String(30)),
             UniqueConstraint('value', 'value2', name='uix1')
             )
-        metadata.create_all()
-        foo.insert().execute(id=1, value='value1')
-        foo.insert().execute(id=2, value='value2')
-        bar.insert().execute(id=1, value='a', value2='a')
-        bar.insert().execute(id=2, value='a', value2='b')
-        assert_raises(exc.DBAPIError, foo.insert().execute, id=3, value='value1')
-        assert_raises(exc.DBAPIError, bar.insert().execute, id=3, value='a', value2='b')
 
+        self.assert_sql_execution(
+            testing.db,
+            lambda: metadata.create_all(checkfirst=False),
+            AllOf(
+                CompiledSQL('CREATE TABLE foo ('
+                        'id INTEGER NOT NULL, '
+                        'value VARCHAR(30), '
+                        'PRIMARY KEY (id), '
+                        'UNIQUE (value)'
+                    ')'),
+                CompiledSQL('CREATE TABLE bar ('
+                        'id INTEGER NOT NULL, '
+                        'value VARCHAR(30), '
+                        'value2 VARCHAR(30), '
+                        'PRIMARY KEY (id), '
+                        'CONSTRAINT uix1 UNIQUE (value, value2)'
+                        ')')
+            )
+        )
+
+    @testing.provide_metadata
     def test_index_create(self):
+        metadata = self.metadata
+
         employees = Table('employees', metadata,
                           Column('id', Integer, primary_key=True),
                           Column('first_name', String(30)),
                           Column('last_name', String(30)),
                           Column('email_address', String(30)))
-        employees.create()
 
         i = Index('employee_name_index',
                   employees.c.last_name, employees.c.first_name)
-        i.create()
         assert i in employees.indexes
 
         i2 = Index('employee_email_index',
                    employees.c.email_address, unique=True)
-        i2.create()
         assert i2 in employees.indexes
 
+        self.assert_sql_execution(
+            testing.db,
+            lambda: metadata.create_all(checkfirst=False),
+            RegexSQL("^CREATE TABLE"),
+            AllOf(
+                CompiledSQL('CREATE INDEX employee_name_index ON '
+                        'employees (last_name, first_name)', []),
+                CompiledSQL('CREATE UNIQUE INDEX employee_email_index ON '
+                        'employees (email_address)', [])
+            )
+        )
+
+    @testing.provide_metadata
     def test_index_create_camelcase(self):
         """test that mixed-case index identifiers are legal"""
 
+        metadata = self.metadata
+
         employees = Table('companyEmployees', metadata,
                           Column('id', Integer, primary_key=True),
                           Column('firstName', String(30)),
                           Column('lastName', String(30)),
                           Column('emailAddress', String(30)))
 
-        employees.create()
 
-        i = Index('employeeNameIndex',
+
+        Index('employeeNameIndex',
                   employees.c.lastName, employees.c.firstName)
-        i.create()
 
-        i = Index('employeeEmailIndex',
+        Index('employeeEmailIndex',
                   employees.c.emailAddress, unique=True)
-        i.create()
 
-        # Check that the table is useable. This is mostly for pg,
-        # which can be somewhat sticky with mixed-case identifiers
-        employees.insert().execute(firstName='Joe', lastName='Smith', id=0)
-        ss = employees.select().execute().fetchall()
-        assert ss[0].firstName == 'Joe'
-        assert ss[0].lastName == 'Smith'
+        self.assert_sql_execution(
+            testing.db,
+            lambda: metadata.create_all(checkfirst=False),
+            RegexSQL("^CREATE TABLE"),
+            AllOf(
+                CompiledSQL('CREATE INDEX "employeeNameIndex" ON '
+                        '"companyEmployees" ("lastName", "firstName")', []),
+                CompiledSQL('CREATE UNIQUE INDEX "employeeEmailIndex" ON '
+                        '"companyEmployees" ("emailAddress")', [])
+            )
+        )
 
+    @testing.provide_metadata
     def test_index_create_inline(self):
-        """Test indexes defined with tables"""
+        # test an index create using index=True, unique=True
+
+        metadata = self.metadata
 
         events = Table('events', metadata,
                        Column('id', Integer, primary_key=True),
                        Column('announcer', String(30)),
                        Column('winner', String(30)))
 
-        Index('sport_announcer', events.c.sport, events.c.announcer, unique=True)
+        Index('sport_announcer', events.c.sport, events.c.announcer,
+                                    unique=True)
         Index('idx_winners', events.c.winner)
 
         eq_(
-            set([ ix.name for ix in events.indexes ]),
-            set(['ix_events_name', 'ix_events_location', 'sport_announcer', 'idx_winners'])
+            set(ix.name for ix in events.indexes),
+            set(['ix_events_name', 'ix_events_location',
+                        'sport_announcer', 'idx_winners'])
         )
 
         self.assert_sql_execution(
             lambda: events.create(testing.db),
             RegexSQL("^CREATE TABLE events"),
             AllOf(
-                ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events (name)'),
-                ExactSQL('CREATE INDEX ix_events_location ON events (location)'),
-                ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events (sport, announcer)'),
+                ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events '
+                                        '(name)'),
+                ExactSQL('CREATE INDEX ix_events_location ON events '
+                                        '(location)'),
+                ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events '
+                                        '(sport, announcer)'),
                 ExactSQL('CREATE INDEX idx_winners ON events (winner)')
             )
         )
 
-        # verify that the table is functional
-        events.insert().execute(id=1, name='hockey finals', location='rink',
-                                sport='hockey', announcer='some canadian',
-                                winner='sweden')
-        ss = events.select().execute().fetchall()
+    @testing.provide_metadata
+    def test_index_functional_create(self):
+        metadata = self.metadata
+
+        t = Table('sometable', metadata,
+                Column('id', Integer, primary_key=True),
+                Column('data', String(50))
+            )
+        Index('myindex', t.c.data.desc())
+        self.assert_sql_execution(
+            testing.db,
+            lambda: t.create(testing.db),
+            CompiledSQL('CREATE TABLE sometable (id INTEGER NOT NULL, '
+                            'data VARCHAR(50), PRIMARY KEY (id))'),
+            ExactSQL('CREATE INDEX myindex ON sometable (data DESC)')
+        )
+
+class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL):
+    __dialect__ = 'default'
+
+    def test_create_plain(self):
+        t = Table('t', MetaData(), Column('x', Integer))
+        i = Index("xyz", t.c.x)
+        self.assert_compile(
+            schema.CreateIndex(i),
+            "CREATE INDEX xyz ON t (x)"
+        )
+
+    def test_drop_plain_unattached(self):
+        self.assert_compile(
+            schema.DropIndex(Index(name="xyz")),
+            "DROP INDEX xyz"
+        )
+
+    def test_drop_plain(self):
+        self.assert_compile(
+            schema.DropIndex(Index(name="xyz")),
+            "DROP INDEX xyz"
+        )
+
+    def test_create_schema(self):
+        t = Table('t', MetaData(), Column('x', Integer), schema="foo")
+        i = Index("xyz", t.c.x)
+        self.assert_compile(
+            schema.CreateIndex(i),
+            "CREATE INDEX xyz ON foo.t (x)"
+        )
+
+    def test_drop_schema(self):
+        t = Table('t', MetaData(), Column('x', Integer), schema="foo")
+        i = Index("xyz", t.c.x)
+        self.assert_compile(
+            schema.DropIndex(i),
+            "DROP INDEX foo.xyz"
+        )
+
 
     def test_too_long_idx_name(self):
         dialect = testing.db.dialect.__class__()
             dialect=dialect
         )
 
-    def test_index_declartion_inline(self):
+    def test_functional_index(self):
+        metadata = MetaData()
+        x = Table('x', metadata,
+                Column('q', String(50))
+            )
+        idx = Index('y', func.lower(x.c.q))
+
+        self.assert_compile(
+            schema.CreateIndex(idx),
+            "CREATE INDEX y ON x (lower(q))"
+        )
+
+        self.assert_compile(
+            schema.CreateIndex(idx),
+            "CREATE INDEX y ON x (lower(q))",
+            dialect=testing.db.dialect
+        )
+
+    def test_index_declaration_inline(self):
+        metadata = MetaData()
+
         t1 = Table('t1', metadata,
             Column('x', Integer),
             Column('y', Integer),
             "CREATE INDEX foo ON t1 (x, y)"
         )
 
-    def test_index_asserts_cols_standalone(self):
-        t1 = Table('t1', metadata,
-            Column('x', Integer)
-        )
-        t2 = Table('t2', metadata,
-            Column('y', Integer)
-        )
-        assert_raises_message(
-            exc.ArgumentError,
-            "Column 't2.y' is not part of table 't1'.",
-            Index,
-            "bar", t1.c.x, t2.c.y
-        )
-
-    def test_index_asserts_cols_inline(self):
-        t1 = Table('t1', metadata,
-            Column('x', Integer)
-        )
-        assert_raises_message(
-            exc.ArgumentError,
-            "Index 'bar' is against table 't1', and "
-            "cannot be associated with table 't2'.",
-            Table, 't2', metadata,
-                Column('y', Integer),
-                Index('bar', t1.c.x)
-        )
-
-    def test_raise_index_nonexistent_name(self):
-        m = MetaData()
-        # the KeyError isn't ideal here, a nicer message
-        # perhaps
-        assert_raises(
-            KeyError,
-            Table, 't', m, Column('x', Integer), Index("foo", "q")
-        )
-
-    def test_raise_not_a_column(self):
-        assert_raises(
-            exc.ArgumentError,
-            Index, "foo", 5
-        )
-
-    def test_no_warning_w_no_columns(self):
-        Index(name="foo")
-
-    def test_raise_clauseelement_not_a_column(self):
-        m = MetaData()
-        t2 = Table('t2', m, Column('x', Integer))
-        class SomeClass(object):
-            def __clause_element__(self):
-                return t2
-        assert_raises(
-            exc.ArgumentError,
-            Index, "foo", SomeClass()
-        )
-
-    def test_create_plain(self):
-        t = Table('t', MetaData(), Column('x', Integer))
-        i = Index("xyz", t.c.x)
-        self.assert_compile(
-            schema.CreateIndex(i),
-            "CREATE INDEX xyz ON t (x)"
-        )
-
-    def test_drop_plain_unattached(self):
-        self.assert_compile(
-            schema.DropIndex(Index(name="xyz")),
-            "DROP INDEX xyz"
-        )
-
-    def test_drop_plain(self):
-        t = Table('t', MetaData(), Column('x', Integer))
-        i = Index("xyz", t.c.x)
-        self.assert_compile(
-            schema.DropIndex(Index(name="xyz")),
-            "DROP INDEX xyz"
-        )
-
-    def test_create_schema(self):
-        t = Table('t', MetaData(), Column('x', Integer), schema="foo")
-        i = Index("xyz", t.c.x)
-        self.assert_compile(
-            schema.CreateIndex(i),
-            "CREATE INDEX xyz ON foo.t (x)"
-        )
-
-    def test_drop_schema(self):
-        t = Table('t', MetaData(), Column('x', Integer), schema="foo")
-        i = Index("xyz", t.c.x)
-        self.assert_compile(
-            schema.DropIndex(i),
-            "DROP INDEX foo.xyz"
-        )
-
-class ConstraintCompilationTest(fixtures.TestBase, AssertsCompiledSQL):
-    __dialect__ = 'default'
-
     def _test_deferrable(self, constraint_factory):
         t = Table('tbl', MetaData(),
                   Column('a', Integer),
 
     def test_column_level_ck_name(self):
         t = Table('tbl', MetaData(),
-            Column('a', Integer, CheckConstraint("a > 5", name="ck_a_greater_five"))
+            Column('a', Integer, CheckConstraint("a > 5",
+                                name="ck_a_greater_five"))
         )
         self.assert_compile(
             schema.CreateTable(t),
 
     def test_multiple(self):
         m = MetaData()
-        foo = Table("foo", m,
+        Table("foo", m,
             Column('id', Integer, primary_key=True),
             Column('bar', Integer, primary_key=True)
         )
 
         self.assert_compile(
             schema.CreateTable(t),
-            "CREATE TABLE tbl (a INTEGER, b INTEGER CHECK (a < b) DEFERRABLE INITIALLY DEFERRED)"
+            "CREATE TABLE tbl (a INTEGER, b INTEGER CHECK (a < b) "
+                "DEFERRABLE INITIALLY DEFERRED)"
         )
 
     def test_use_alter(self):
         m = MetaData()
-        t = Table('t', m,
+        Table('t', m,
                   Column('a', Integer),
         )
 
-        t2 = Table('t2', m,
-                Column('a', Integer, ForeignKey('t.a', use_alter=True, name='fk_ta')),
-                Column('b', Integer, ForeignKey('t.a', name='fk_tb')), # to ensure create ordering ...
+        Table('t2', m,
+                Column('a', Integer, ForeignKey('t.a', use_alter=True,
+                                                        name='fk_ta')),
+                Column('b', Integer, ForeignKey('t.a', name='fk_tb'))
         )
 
         e = engines.mock_engine(dialect_name='postgresql')
 
         e.assert_sql([
             'CREATE TABLE t (a INTEGER)',
-            'CREATE TABLE t2 (a INTEGER, b INTEGER, CONSTRAINT fk_tb FOREIGN KEY(b) REFERENCES t (a))',
-            'ALTER TABLE t2 ADD CONSTRAINT fk_ta FOREIGN KEY(a) REFERENCES t (a)',
+            'CREATE TABLE t2 (a INTEGER, b INTEGER, CONSTRAINT fk_tb '
+                            'FOREIGN KEY(b) REFERENCES t (a))',
+            'ALTER TABLE t2 '
+                    'ADD CONSTRAINT fk_ta FOREIGN KEY(a) REFERENCES t (a)',
             'ALTER TABLE t2 DROP CONSTRAINT fk_ta',
             'DROP TABLE t2',
             'DROP TABLE t'
         ])
 
-
-    def test_add_drop_constraint(self):
+    def _constraint_create_fixture(self):
         m = MetaData()
 
         t = Table('tbl', m,
                 Column('b', Integer)
         )
 
-        constraint = CheckConstraint('a < b',name="my_test_constraint",
-                                        deferrable=True,initially='DEFERRED',
+        return t, t2
+
+    def test_render_ck_constraint_inline(self):
+        t, t2 = self._constraint_create_fixture()
+
+        CheckConstraint('a < b', name="my_test_constraint",
+                                        deferrable=True, initially='DEFERRED',
                                         table=t)
 
-
         # before we create an AddConstraint,
         # the CONSTRAINT comes out inline
         self.assert_compile(
             "CREATE TABLE tbl ("
             "a INTEGER, "
             "b INTEGER, "
-            "CONSTRAINT my_test_constraint CHECK (a < b) DEFERRABLE INITIALLY DEFERRED"
+            "CONSTRAINT my_test_constraint CHECK (a < b) "
+                        "DEFERRABLE INITIALLY DEFERRED"
             ")"
         )
 
+    def test_render_ck_constraint_external(self):
+        t, t2 = self._constraint_create_fixture()
+
+        constraint = CheckConstraint('a < b', name="my_test_constraint",
+                                        deferrable=True, initially='DEFERRED',
+                                        table=t)
+
         self.assert_compile(
             schema.AddConstraint(constraint),
             "ALTER TABLE tbl ADD CONSTRAINT my_test_constraint "
                     "CHECK (a < b) DEFERRABLE INITIALLY DEFERRED"
         )
 
+    def test_external_ck_constraint_cancels_internal(self):
+        t, t2 = self._constraint_create_fixture()
+
+        constraint = CheckConstraint('a < b', name="my_test_constraint",
+                                        deferrable=True, initially='DEFERRED',
+                                        table=t)
+
+        schema.AddConstraint(constraint)
+
         # once we make an AddConstraint,
         # inline compilation of the CONSTRAINT
         # is disabled
             ")"
         )
 
+    def test_render_drop_constraint(self):
+        t, t2 = self._constraint_create_fixture()
+
+        constraint = CheckConstraint('a < b', name="my_test_constraint",
+                                        deferrable=True, initially='DEFERRED',
+                                        table=t)
+
         self.assert_compile(
             schema.DropConstraint(constraint),
             "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint"
         )
 
+    def test_render_drop_constraint_cascade(self):
+        t, t2 = self._constraint_create_fixture()
+
+        constraint = CheckConstraint('a < b', name="my_test_constraint",
+                                        deferrable=True, initially='DEFERRED',
+                                        table=t)
+
         self.assert_compile(
             schema.DropConstraint(constraint, cascade=True),
             "ALTER TABLE tbl DROP CONSTRAINT my_test_constraint CASCADE"
         )
 
+    def test_render_add_fk_constraint_stringcol(self):
+        t, t2 = self._constraint_create_fixture()
+
         constraint = ForeignKeyConstraint(["b"], ["t2.a"])
         t.append_constraint(constraint)
         self.assert_compile(
             "ALTER TABLE tbl ADD FOREIGN KEY(b) REFERENCES t2 (a)"
         )
 
+    def test_render_add_fk_constraint_realcol(self):
+        t, t2 = self._constraint_create_fixture()
+
         constraint = ForeignKeyConstraint([t.c.a], [t2.c.b])
         t.append_constraint(constraint)
         self.assert_compile(
             "ALTER TABLE tbl ADD FOREIGN KEY(a) REFERENCES t2 (b)"
         )
 
+    def test_render_add_uq_constraint_stringcol(self):
+        t, t2 = self._constraint_create_fixture()
+
         constraint = UniqueConstraint("a", "b", name="uq_cst")
         t2.append_constraint(constraint)
         self.assert_compile(
             "ALTER TABLE t2 ADD CONSTRAINT uq_cst UNIQUE (a, b)"
         )
 
+    def test_render_add_uq_constraint_realcol(self):
+        t, t2 = self._constraint_create_fixture()
+
         constraint = UniqueConstraint(t2.c.a, t2.c.b, name="uq_cs2")
         self.assert_compile(
             schema.AddConstraint(constraint),
             "ALTER TABLE t2 ADD CONSTRAINT uq_cs2 UNIQUE (a, b)"
         )
 
+    def test_render_add_pk_constraint(self):
+        t, t2 = self._constraint_create_fixture()
+
         assert t.c.a.primary_key is False
         constraint = PrimaryKeyConstraint(t.c.a)
         assert t.c.a.primary_key is True
             "ALTER TABLE tbl ADD PRIMARY KEY (a)"
         )
 
+class ConstraintAPITest(fixtures.TestBase):
+    def test_double_fk_usage_raises(self):
+        f = ForeignKey('b.id')
+
+        Column('x', Integer, f)
+        assert_raises(exc.InvalidRequestError, Column, "y", Integer, f)
+
     def test_auto_append_constraint(self):
         m = MetaData()
 
                 Column('b', Integer)
         )
 
-        uq = UniqueConstraint(t.c.a)
-        ck = CheckConstraint(t.c.a > 5)
-        fk = ForeignKeyConstraint([t.c.a], [t2.c.a])
-        pk = PrimaryKeyConstraint(t.c.a)
+        UniqueConstraint(t.c.a)
+        CheckConstraint(t.c.a > 5)
+        ForeignKeyConstraint([t.c.a], [t2.c.a])
+        PrimaryKeyConstraint(t.c.a)
 
         m2 = MetaData()
 
         c = CheckConstraint(t.c.a > t2.c.b)
         assert c not in t.constraints
         assert c not in t2.constraints
+
+    def test_index_asserts_cols_standalone(self):
+        metadata = MetaData()
+
+        t1 = Table('t1', metadata,
+            Column('x', Integer)
+        )
+        t2 = Table('t2', metadata,
+            Column('y', Integer)
+        )
+        assert_raises_message(
+            exc.ArgumentError,
+            "Column 't2.y' is not part of table 't1'.",
+            Index,
+            "bar", t1.c.x, t2.c.y
+        )
+
+    def test_index_asserts_cols_inline(self):
+        metadata = MetaData()
+
+        t1 = Table('t1', metadata,
+            Column('x', Integer)
+        )
+        assert_raises_message(
+            exc.ArgumentError,
+            "Index 'bar' is against table 't1', and "
+            "cannot be associated with table 't2'.",
+            Table, 't2', metadata,
+                Column('y', Integer),
+                Index('bar', t1.c.x)
+        )
+
+    def test_raise_index_nonexistent_name(self):
+        m = MetaData()
+        # the KeyError isn't ideal here, a nicer message
+        # perhaps
+        assert_raises(
+            KeyError,
+            Table, 't', m, Column('x', Integer), Index("foo", "q")
+        )
+
+    def test_raise_not_a_column(self):
+        assert_raises(
+            exc.ArgumentError,
+            Index, "foo", 5
+        )
+
+    def test_raise_expr_no_column(self):
+        idx = Index('foo', func.lower(5))
+
+        assert_raises_message(
+            exc.CompileError,
+            "Index 'foo' is not associated with any table.",
+            schema.CreateIndex(idx).compile, dialect=testing.db.dialect
+        )
+        assert_raises_message(
+            exc.CompileError,
+            "Index 'foo' is not associated with any table.",
+            schema.CreateIndex(idx).compile
+        )
+
+
+    def test_no_warning_w_no_columns(self):
+        # I think the test here is, there is no warning.
+        # people want to create empty indexes for the purpose of
+        # a drop.
+        idx = Index(name="foo")
+
+        assert_raises_message(
+            exc.CompileError,
+            "Index 'foo' is not associated with any table.",
+            schema.CreateIndex(idx).compile, dialect=testing.db.dialect
+        )
+        assert_raises_message(
+            exc.CompileError,
+            "Index 'foo' is not associated with any table.",
+            schema.CreateIndex(idx).compile
+        )
+
+    def test_raise_clauseelement_not_a_column(self):
+        m = MetaData()
+        t2 = Table('t2', m, Column('x', Integer))
+        class SomeClass(object):
+            def __clause_element__(self):
+                return t2
+        assert_raises(
+            exc.ArgumentError,
+            Index, "foo", SomeClass()
+        )
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.