
Commits

Mike Bayer committed e4c31b4

- [feature] The Core operator system now includes
the `getitem` operator, i.e. the bracket
operator in Python. This is used at first
to provide index and slice behavior to the
Postgresql ARRAY type, and also provides a hook
for end-user definition of custom __getitem__
schemes which can be applied at the type
level as well as within ORM-level custom
operator schemes.

Note that this change has the effect that
descriptor-based __getitem__ schemes used by
the ORM in conjunction with synonym() or other
"descriptor-wrapped" schemes will need
to start using a custom comparator in order
to maintain this behavior.

- [feature] postgresql.ARRAY now supports
indexing and slicing. The Python [] operator
is available on all SQL expressions that are
of type ARRAY; integer or simple slices can be
passed. The slices can also be used on the
assignment side in the SET clause of an UPDATE
statement by passing them into Update.values();
see the docs for examples.

- [feature] Added new "array literal" construct
postgresql.array(). Basically a "tuple" that
renders as ARRAY[1,2,3].
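
Taken together, a minimal usage sketch of these features (assuming SQLAlchemy 0.8
with the psycopg2 dialect; the "mytable" / "data" names below are hypothetical,
not part of this commit):

    from sqlalchemy import MetaData, Table, Column, Integer, select
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.dialects.postgresql import ARRAY, array

    metadata = MetaData()
    mytable = Table("mytable", metadata,
            Column("data", ARRAY(Integer))
        )

    # index and slice access via the Python [] operator
    stmt = select([mytable.c.data[5], mytable.c.data[2:7]])

    # index/slice assignment in the SET clause of an UPDATE
    upd = mytable.update().values({mytable.c.data[5]: 7,
                    mytable.c.data[2:7]: [1, 2, 3]})

    # the new array() literal, rendering as ARRAY[...] || ARRAY[...]
    lit = select([array([1, 2]) + array([3, 4, 5])])

    print stmt.compile(dialect=postgresql.dialect())
    print upd.compile(dialect=postgresql.dialect())
    print lit.compile(dialect=postgresql.dialect())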

  • Parent commits 5e85a7e
  • Branches default


Files changed (15)

File CHANGES

     decryption, usage of Postgis functions, etc.
     [ticket:1534]
 
+  - [feature] The Core operator system now includes
+    the `getitem` operator, i.e. the bracket
+    operator in Python.  This is used at first
+    to provide index and slice behavior to the
+    Postgresql ARRAY type, and also provides a hook
+    for end-user definition of custom __getitem__
+    schemes which can be applied at the type
+    level as well as within ORM-level custom
+    operator schemes.
+
+    Note that this change has the effect that
+    descriptor-based __getitem__ schemes used by
+    the ORM in conjunction with synonym() or other
+    "descriptor-wrapped" schemes will need
+    to start using a custom comparator in order
+    to maintain this behavior.
+
   - [feature] Revised the rules used to determine
     the operator precedence for the user-defined
     operator, i.e. that granted using the ``op()``
     performance of bind/result processing.
     [ticket:2441]
 
+  - [feature] postgresql.ARRAY now supports
+    indexing and slicing.  The Python [] operator
+    is available on all SQL expressions that are
+    of type ARRAY; integer or simple slices can be
+    passed.  The slices can also be used on the
+    assignment side in the SET clause of an UPDATE
+    statement by passing them into Update.values();
+    see the docs for examples.
+
+  - [feature] Added new "array literal" construct
+    postgresql.array().  Basically a "tuple" that
+    renders as ARRAY[1,2,3].
+
   - [feature] Added support for the Postgresql ONLY
     keyword, which can appear corresponding to a
     table in a SELECT, UPDATE, or DELETE statement.

File doc/build/dialects/postgresql.rst

         MACADDR, NUMERIC, REAL, SMALLINT, TEXT, TIME, TIMESTAMP, \
         UUID, VARCHAR
 
-Types which are specific to PostgreSQL, or have PostgreSQL-specific 
+Types which are specific to PostgreSQL, or have PostgreSQL-specific
 construction arguments, are as follows:
 
 .. currentmodule:: sqlalchemy.dialects.postgresql
 
+.. autoclass:: array
+
 .. autoclass:: ARRAY
     :members: __init__
     :show-inheritance:

File lib/sqlalchemy/dialects/postgresql/__init__.py

 from .base import \
     INTEGER, BIGINT, SMALLINT, VARCHAR, CHAR, TEXT, NUMERIC, FLOAT, REAL, INET, \
     CIDR, UUID, BIT, MACADDR, DOUBLE_PRECISION, TIMESTAMP, TIME,\
-    DATE, BYTEA, BOOLEAN, INTERVAL, ARRAY, ENUM, dialect
+    DATE, BYTEA, BOOLEAN, INTERVAL, ARRAY, ENUM, dialect, array
 
 __all__ = (
 'INTEGER', 'BIGINT', 'SMALLINT', 'VARCHAR', 'CHAR', 'TEXT', 'NUMERIC', 'FLOAT', 'REAL', 'INET',
 'CIDR', 'UUID', 'BIT', 'MACADDR', 'DOUBLE_PRECISION', 'TIMESTAMP', 'TIME',
-'DATE', 'BYTEA', 'BOOLEAN', 'INTERVAL', 'ARRAY', 'ENUM', 'dialect'
+'DATE', 'BYTEA', 'BOOLEAN', 'INTERVAL', 'ARRAY', 'ENUM', 'dialect', 'array'
 )

File lib/sqlalchemy/dialects/postgresql/base.py

 
 from ... import sql, schema, exc, util
 from ...engine import default, reflection
-from ...sql import compiler, expression, util as sql_util
+from ...sql import compiler, expression, util as sql_util, operators
 from ... import types as sqltypes
 
 try:
 
 PGUuid = UUID
 
+class _Slice(expression.ColumnElement):
+    __visit_name__ = 'slice'
+    type = sqltypes.NULLTYPE
+    def __init__(self, slice_, source_comparator):
+        self.start = source_comparator._check_literal(
+                            source_comparator.expr,
+                            operators.getitem, slice_.start)
+        self.stop = source_comparator._check_literal(
+                            source_comparator.expr,
+                            operators.getitem, slice_.stop)
+
+class array(expression.Tuple):
+    """A Postgresql ARRAY literal.
+
+    This is used to produce ARRAY literals in SQL expressions, e.g.::
+
+        from sqlalchemy.dialects.postgresql import array
+        from sqlalchemy.dialects import postgresql
+        from sqlalchemy import select, func
+
+        stmt = select([
+                        array([1,2]) + array([3,4,5])
+                    ])
+
+        print stmt.compile(dialect=postgresql.dialect())
+
+    Produces the SQL::
+
+        SELECT ARRAY[%(param_1)s, %(param_2)s] ||
+            ARRAY[%(param_3)s, %(param_4)s, %(param_5)s] AS anon_1
+
+    An instance of :class:`.array` will always have the datatype
+    :class:`.ARRAY`.  The "inner" type of the array is inferred from
+    the values present, unless the "type_" keyword argument is passed::
+
+        array(['foo', 'bar'], type_=CHAR)
+
+    .. versionadded:: 0.8 Added the :class:`~.postgresql.array` literal type.
+
+    See also:
+
+    :class:`.postgresql.ARRAY`
+
+    """
+    __visit_name__ = 'array'
+
+    def __init__(self, clauses, **kw):
+        super(array, self).__init__(*clauses, **kw)
+        self.type = ARRAY(self.type)
+
+    def _bind_param(self, operator, obj):
+        return array([
+            expression.BindParameter(None, o, _compared_to_operator=operator,
+                             _compared_to_type=self.type, unique=True)
+            for o in obj
+        ])
+
+    def self_group(self, against):
+        return self
+
 class ARRAY(sqltypes.Concatenable, sqltypes.TypeEngine):
     """Postgresql ARRAY type.
 
     Represents values as Python lists.
 
-    The ARRAY type may not be supported on all DBAPIs.
+    An :class:`.ARRAY` type is constructed given the "type"
+    of element::
+
+        mytable = Table("mytable", metadata,
+                Column("data", ARRAY(Integer))
+            )
+
+    The above type represents an N-dimensional array,
+    meaning Postgresql will interpret values with any number
+    of dimensions automatically.   To produce an INSERT
+    construct that passes in a 1-dimensional array of integers::
+
+        connection.execute(
+                mytable.insert(),
+                data=[1,2,3]
+        )
+
+    The :class:`.ARRAY` type can be constructed given a fixed number
+    of dimensions::
+
+        mytable = Table("mytable", metadata,
+                Column("data", ARRAY(Integer, dimensions=2))
+            )
+
+    This has the effect of the :class:`.ARRAY` type
+    specifying that number of bracketed blocks when a :class:`.Table`
+    is used in a CREATE TABLE statement, or when the type is used
+    within a :func:`.expression.cast` construct; it also causes
+    the bind parameter and result set processing of the type
+    to optimize itself to expect exactly that number of dimensions.
+    Note that Postgresql itself still allows N dimensions with such a type.
+
+    SQL expressions of type :class:`.ARRAY` have support for "index" and "slice"
+    behavior.  The Python ``[]`` operator works normally here, given
+    integer indexes or slices.  Note that Postgresql arrays default
+    to 1-based indexing.  The operator produces binary expression
+    constructs which will produce the appropriate SQL, both for
+    SELECT statements::
+
+        select([mytable.c.data[5], mytable.c.data[2:7]])
+
+    as well as UPDATE statements when the :meth:`.Update.values` method
+    is used::
+
+        mytable.update().values({mytable.c.data[5]:7,
+                        mytable.c.data[2:7]:[1,2,3]})
+
+    .. versionadded:: 0.8 Added support for index and slice operations
+       to the :class:`.ARRAY` type, including support for UPDATE
+       statements.
+
+    The :class:`.ARRAY` type may not be supported on all DBAPIs.
     It is known to work on psycopg2 and not pg8000.
 
+    See also:
+
+    :class:`.postgresql.array` - produce a literal array value.
 
     """
     __visit_name__ = 'ARRAY'
 
+    class Comparator(sqltypes.Concatenable.Comparator):
+        def __getitem__(self, index):
+            if isinstance(index, slice):
+                index = _Slice(index, self)
+                return_type = self.type
+            else:
+                return_type = self.type.item_type
+            return self._binary_operate(self.expr, operators.getitem, index,
+                            result_type=return_type)
+
+    comparator_factory = Comparator
+
     def __init__(self, item_type, as_tuple=False, dimensions=None):
         """Construct an ARRAY.
 
         :param item_type: The data type of items of this array. Note that
           dimensionality is irrelevant here, so multi-dimensional arrays like
           ``INTEGER[][]``, are constructed as ``ARRAY(Integer)``, not as
-          ``ARRAY(ARRAY(Integer))`` or such. The type mapping figures out on
-          the fly
+          ``ARRAY(ARRAY(Integer))`` or such.
 
         :param as_tuple=False: Specify whether return results
           should be converted to tuples from lists. DBAPIs such
 
 class PGCompiler(compiler.SQLCompiler):
 
+    def visit_array(self, element, **kw):
+        return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
+
+    def visit_slice(self, element, **kw):
+        return "%s:%s" % (
+                    self.process(element.start, **kw),
+                    self.process(element.stop, **kw),
+                )
+
+    def visit_getitem_binary(self, binary, operator, **kw):
+        return "%s[%s]" % (
+                self.process(binary.left, **kw),
+                self.process(binary.right, **kw)
+            )
+
     def visit_match_op_binary(self, binary, operator, **kw):
         return "%s @@ to_tsquery(%s)" % (
-                        self.process(binary.left),
-                        self.process(binary.right))
+                        self.process(binary.left, **kw),
+                        self.process(binary.right, **kw))
 
     def visit_ilike_op_binary(self, binary, operator, **kw):
         escape = binary.modifiers.get("escape", None)
         return '%s ILIKE %s' % \
-                (self.process(binary.left), self.process(binary.right)) \
+                (self.process(binary.left, **kw),
+                    self.process(binary.right, **kw)) \
                 + (escape and
                         (' ESCAPE ' + self.render_literal_value(escape, None))
                         or '')
     def visit_notilike_op_binary(self, binary, operator, **kw):
         escape = binary.modifiers.get("escape", None)
         return '%s NOT ILIKE %s' % \
-                (self.process(binary.left), self.process(binary.right)) \
+                (self.process(binary.left, **kw),
+                    self.process(binary.right, **kw)) \
                 + (escape and
                         (' ESCAPE ' + self.render_literal_value(escape, None))
                         or '')
     def limit_clause(self, select):
         text = ""
         if select._limit is not None:
-            text +=  " \n LIMIT " + self.process(sql.literal(select._limit))
+            text += " \n LIMIT " + self.process(sql.literal(select._limit))
         if select._offset is not None:
             if select._limit is None:
                 text += " \n LIMIT ALL"

File lib/sqlalchemy/sql/compiler.py

                                     within_columns_clause=False,
                                     **kw)
 
-    def visit_column(self, column, add_to_result_map=None, **kwargs):
+    def visit_column(self, column, add_to_result_map=None,
+                                    include_table=True, **kwargs):
         name = orig_name = column.name
         if name is None:
             raise exc.CompileError("Cannot compile Column object until "
             name = self.preparer.quote(name, column.quote)
 
         table = column.table
-        if table is None or not table.named_with_column:
+        if table is None or not include_table or not table.named_with_column:
             return name
         else:
             if table.schema:
         text += table_text
 
         text += ' SET '
-        if extra_froms and self.render_table_with_column_in_update_from:
-            text += ', '.join(
-                            self.visit_column(c[0]) +
-                            '=' + c[1] for c in colparams
-                            )
-        else:
-            text += ', '.join(
-                        self.preparer.quote(c[0].name, c[0].quote) +
+        include_table = extra_froms and \
+                        self.render_table_with_column_in_update_from
+        text += ', '.join(
+                        c[0]._compiler_dispatch(self,
+                            include_table=include_table) +
                         '=' + c[1] for c in colparams
-                            )
+                        )
 
         if update_stmt._returning:
             self.returning = update_stmt._returning
                               if not stmt.parameters or
                               key not in stmt.parameters)
 
+        # create a list of column assignment clauses as tuples
+        values = []
+
         if stmt.parameters is not None:
             for k, v in stmt.parameters.iteritems():
-                parameters.setdefault(sql._column_as_key(k), v)
+                colkey = sql._column_as_key(k)
+                if colkey is not None:
+                    parameters.setdefault(colkey, v)
+                else:
+                    # a non-Column expression on the left side;
+                    # add it to values() in an "as-is" state,
+                    # coercing right side to bound param
+                    if sql._is_literal(v):
+                        v = self.process(sql.bindparam(None, v, type_=k.type))
+                    else:
+                        v = self.process(v.self_group())
 
-        # create a list of column assignment clauses as tuples
-        values = []
+                    values.append((k, v))
+
 
         need_pks = self.isinsert and \
                         not self.inline and \

File lib/sqlalchemy/sql/expression.py

         return element
     if hasattr(element, '__clause_element__'):
         element = element.__clause_element__()
-    return element.key
+    try:
+        return element.key
+    except AttributeError:
+        return None
 
 def _clause_element_as_expr(element):
     if hasattr(element, '__clause_element__'):
                             type_=sqltypes.BOOLEANTYPE,
                             negate=negate, modifiers=kwargs)
 
-    def _binary_operate(self, expr, op, obj, reverse=False):
+    def _binary_operate(self, expr, op, obj, reverse=False, result_type=None):
         obj = self._check_literal(expr, op, obj)
 
         if reverse:
         else:
             left, right = expr, obj
 
-        op, result_type = left.comparator._adapt_expression(op, right.comparator)
+        if result_type is None:
+            op, result_type = left.comparator._adapt_expression(
+                                                op, right.comparator)
 
         return BinaryExpression(left, right, op, type_=result_type)
 
         return self._boolean_compare(expr, op,
                               ClauseList(*args).self_group(against=op),
                               negate=negate_op)
+
+    def _unsupported_impl(self, expr, op, *arg, **kw):
+        raise NotImplementedError("Operator '%s' is not supported on "
+                            "this expression" % op.__name__)
+
     def _neg_impl(self, expr, op, **kw):
         """See :meth:`.ColumnOperators.__neg__`."""
         return UnaryExpression(expr, operator=operators.neg)
         "startswith_op": (_startswith_impl,),
         "endswith_op": (_endswith_impl,),
         "neg": (_neg_impl,),
+        "getitem": (_unsupported_impl,),
     }
 
 
 
     def __init__(self, *clauses, **kw):
         clauses = [_literal_as_binds(c) for c in clauses]
+        self.type = kw.pop('type_', None)
+        if self.type is None:
+            self.type = _type_from_args(clauses)
         super(Tuple, self).__init__(*clauses, **kw)
-        self.type = _type_from_args(clauses)
 
     @property
     def _select_iterable(self):

File lib/sqlalchemy/sql/operators.py

 """Defines operators used in SQL expressions."""
 
 from operator import (
-    and_, or_, inv, add, mul, sub, mod, truediv, lt, le, ne, gt, ge, eq, neg
+    and_, or_, inv, add, mul, sub, mod, truediv, lt, le, ne, gt, ge, eq, neg,
+    getitem
     )
 
 # Py2K
         """
         return self.operate(neg)
 
+    def __getitem__(self, index):
+        """Implement the [] operator.
+
+        This can be used by some database-specific types
+        such as Postgresql ARRAY and HSTORE.
+
+        """
+        return self.operate(getitem, index)
+
     def concat(self, other):
         """Implement the 'concat' operator.
 
 
 _associative = _commutative.union([concat_op, and_, or_])
 
+_natural_self_precedent = _associative.union([getitem])
+"""Operators where if we have (a op b) op c, we don't want to
+parenthesize (a op b).
+
+"""
 
 _smallest = symbol('_smallest', canonical=-100)
 _largest = symbol('_largest', canonical=100)
 
 _PRECEDENCE = {
     from_: 15,
+    getitem: 15,
     mul: 7,
     truediv: 7,
     # Py2K
 
 
 def is_precedent(operator, against):
-    if operator is against and operator in _associative:
+    if operator is against and operator in _natural_self_precedent:
         return False
     else:
         return (_PRECEDENCE.get(operator,

File lib/sqlalchemy/types.py

         """Base class for custom comparison operations defined at the
         type level.  See :attr:`.TypeEngine.comparator_factory`.
 
+        The public base class for :class:`.TypeEngine.Comparator`
+        is :class:`.ColumnOperators`.
+
         """
 
         def __init__(self, expr):

File lib/sqlalchemy/util/langhelpers.py

         dunders = [m for m in dir(from_cls)
                    if (m.startswith('__') and m.endswith('__') and
                        not hasattr(into_cls, m) and m not in skip)]
+
     for method in dunders:
         try:
             fn = getattr(from_cls, method)

File test/dialect/test_postgresql.py

 # coding: utf-8
-from test.lib.testing import eq_, assert_raises, assert_raises_message
+from test.lib.testing import eq_, assert_raises, assert_raises_message, is_
 from test.lib import  engines
 import datetime
 from sqlalchemy import *
         self.assert_compile(x,
             '''SELECT pg_table.col1, pg_table."variadic" FROM pg_table''')
 
+    def test_array(self):
+        c = Column('x', postgresql.ARRAY(Integer))
+
+        self.assert_compile(
+            cast(c, postgresql.ARRAY(Integer)),
+            "CAST(x AS INTEGER[])"
+        )
+        self.assert_compile(
+                c[5],
+                "x[%(x_1)s]",
+                checkparams={'x_1': 5}
+        )
+
+        self.assert_compile(
+                c[5:7],
+                "x[%(x_1)s:%(x_2)s]",
+                checkparams={'x_2': 7, 'x_1': 5}
+        )
+        self.assert_compile(
+                c[5:7][2:3],
+                "x[%(x_1)s:%(x_2)s][%(param_1)s:%(param_2)s]",
+                checkparams={'x_2': 7, 'x_1': 5, 'param_1':2, 'param_2':3}
+        )
+        self.assert_compile(
+                c[5:7][3],
+                "x[%(x_1)s:%(x_2)s][%(param_1)s]",
+                checkparams={'x_2': 7, 'x_1': 5, 'param_1':3}
+        )
+
+    def test_array_literal_type(self):
+        is_(postgresql.array([1, 2]).type._type_affinity, postgresql.ARRAY)
+        is_(postgresql.array([1, 2]).type.item_type._type_affinity, Integer)
+
+        is_(postgresql.array([1, 2], type_=String).
+                    type.item_type._type_affinity, String)
+
+    def test_array_literal(self):
+        self.assert_compile(
+            func.array_dims(postgresql.array([1, 2]) +
+                        postgresql.array([3, 4, 5])),
+            "array_dims(ARRAY[%(param_1)s, %(param_2)s] || "
+                    "ARRAY[%(param_3)s, %(param_4)s, %(param_5)s])",
+            checkparams={'param_5': 5, 'param_4': 4, 'param_1': 1,
+                'param_3': 3, 'param_2': 2}
+        )
+
+    def test_update_array_element(self):
+        m = MetaData()
+        t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
+        self.assert_compile(
+            t.update().values({t.c.data[5]: 1}),
+            "UPDATE t SET data[%(data_1)s]=%(param_1)s",
+            checkparams={'data_1': 5, 'param_1': 1}
+        )
+
+    def test_update_array_slice(self):
+        m = MetaData()
+        t = Table('t', m, Column('data', postgresql.ARRAY(Integer)))
+        self.assert_compile(
+            t.update().values({t.c.data[2:5]: 2}),
+            "UPDATE t SET data[%(data_1)s:%(data_2)s]=%(param_1)s",
+            checkparams={'param_1': 2, 'data_2': 5, 'data_1': 2}
+
+        )
+
     def test_from_only(self):
         m = MetaData()
         tbl1 = Table('testtbl1', m, Column('id', Integer))
         eq_(results[0]['intarr'], [1, 2, 3])
 
     def test_array_concat(self):
-        arrtable.insert().execute(intarr=[1, 2, 3], strarr=[u'abc',
-                                  u'def'])
+        arrtable.insert().execute(intarr=[1, 2, 3],
+                    strarr=[u'abc', u'def'])
         results = select([arrtable.c.intarr + [4, 5,
                          6]]).execute().fetchall()
         eq_(len(results), 1)
         eq_(results[0]['strarr'], [u'm\xe4\xe4', u'm\xf6\xf6'])
         eq_(results[1]['strarr'], [[u'm\xe4\xe4'], [u'm\xf6\xf6']])
 
+    def test_array_literal(self):
+        eq_(
+            testing.db.scalar(
+                select([
+                    postgresql.array([1, 2]) + postgresql.array([3, 4, 5])
+                ])
+                ), [1,2,3,4,5]
+        )
+
+    def test_array_getitem_single_type(self):
+        is_(arrtable.c.intarr[1].type._type_affinity, Integer)
+        is_(arrtable.c.strarr[1].type._type_affinity, String)
+
+    def test_array_getitem_slice_type(self):
+        is_(arrtable.c.intarr[1:3].type._type_affinity, postgresql.ARRAY)
+        is_(arrtable.c.strarr[1:3].type._type_affinity, postgresql.ARRAY)
+
+    def test_array_getitem_single_exec(self):
+        with testing.db.connect() as conn:
+            conn.execute(
+                arrtable.insert(),
+                intarr=[4, 5, 6],
+                strarr=[u'abc', u'def']
+            )
+            eq_(
+                conn.scalar(select([arrtable.c.intarr[2]])),
+                5
+            )
+            conn.execute(
+                arrtable.update().values({arrtable.c.intarr[2]: 7})
+            )
+            eq_(
+                conn.scalar(select([arrtable.c.intarr[2]])),
+                7
+            )
+
+    def test_array_getitem_slice_exec(self):
+        with testing.db.connect() as conn:
+            conn.execute(
+                arrtable.insert(),
+                intarr=[4, 5, 6],
+                strarr=[u'abc', u'def']
+            )
+            eq_(
+                conn.scalar(select([arrtable.c.intarr[2:3]])),
+                [5, 6]
+            )
+            conn.execute(
+                arrtable.update().values({arrtable.c.intarr[2:3]: [7, 8]})
+            )
+            eq_(
+                conn.scalar(select([arrtable.c.intarr[2:3]])),
+                [7, 8]
+            )
+
     @testing.provide_metadata
     def test_tuple_flag(self):
         metadata = self.metadata

File test/lib/profiles.txt

 
 # TEST: test.aaa_profiling.test_compiler.CompileTest.test_update
 
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.5_sqlite_pysqlite_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.6_sqlite_pysqlite_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_mysql_mysqldb_cextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_mysql_mysqldb_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_postgresql_psycopg2_cextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_postgresql_psycopg2_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_sqlite_pysqlite_cextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_sqlite_pysqlite_nocextensions 57
-test.aaa_profiling.test_compiler.CompileTest.test_update 3.2_postgresql_psycopg2_nocextensions 60
-test.aaa_profiling.test_compiler.CompileTest.test_update 3.2_sqlite_pysqlite_nocextensions 60
+test.aaa_profiling.test_compiler.CompileTest.test_update 2.7_sqlite_pysqlite_cextensions 65
 
 # TEST: test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause
 
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.5_sqlite_pysqlite_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.6_sqlite_pysqlite_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_mysql_mysqldb_cextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_mysql_mysqldb_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_postgresql_psycopg2_cextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_postgresql_psycopg2_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_sqlite_pysqlite_cextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_sqlite_pysqlite_nocextensions 117
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 3.2_postgresql_psycopg2_nocextensions 122
-test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 3.2_sqlite_pysqlite_nocextensions 122
+test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause 2.7_sqlite_pysqlite_cextensions 130
 
 # TEST: test.aaa_profiling.test_orm.LoadManyToOneFromIdentityTest.test_many_to_one_load_identity
 

File test/orm/test_descriptor.py

             def method1(self):
                 return "method1"
 
-            def __getitem__(self, key):
-                return 'value'
-
         prop = myprop(lambda self:None)
         Foo.foo = prop
 
         assert Foo.foo is not prop
         assert Foo.foo.attr == 'bar'
         assert Foo.foo.method1() == 'method1'
-        assert Foo.foo['bar'] == 'value'
 
     def test_comparator(self):
         class Comparator(PropComparator):
             def method2(self, other):
                 return "method2"
 
-            # TODO ?
-            #def __getitem__(self, key):
-            #    return 'value'
+            def __getitem__(self, key):
+                return 'value'
 
             def __eq__(self, other):
                 return column('foo') == func.upper(other)
         eq_(Foo.foo.method1(), "method1")
         eq_(Foo.foo.method2('x'), "method2")
         assert Foo.foo.attr == 'bar'
-        # TODO ?
-        #assert Foo.foo['bar'] == 'value'
+        assert Foo.foo['bar'] == 'value'
         eq_(
             (Foo.foo == 'bar').__str__(),
             "foo = upper(:upper_1)"

File test/orm/test_mapper.py

         assert_col = []
         class extendedproperty(property):
             attribute = 123
-            def __getitem__(self, key):
-                return 'value'
 
         class User(object):
             def _get_name(self):
         assert u in sess.dirty
 
         eq_(User.uname.attribute, 123)
-        eq_(User.uname['key'], 'value')
 
     def test_synonym_of_synonym(self):
         users,  User = (self.tables.users,
             def method1(self):
                 return "method1"
 
-            def __getitem__(self, key):
-                return 'value'
-
         from sqlalchemy.orm.properties import ColumnProperty
         class UCComparator(ColumnProperty.Comparator):
             __hash__ = None
             assert u2 is u3
 
             eq_(User.uc_name.attribute, 123)
-            eq_(User.uc_name['key'], 'value')
             sess.rollback()
 
     def test_comparable_column(self):

File test/sql/test_compiler.py

             checkparams={'date_1':datetime.date(2006,6,1),
                             'date_2':datetime.date(2006,6,5)})
 
-    def test_operator_precedence(self):
-        table = Table('op', metadata,
-            Column('field', Integer))
-        self.assert_compile(table.select((table.c.field == 5) == None),
-            "SELECT op.field FROM op WHERE (op.field = :field_1) IS NULL")
-        self.assert_compile(table.select((table.c.field + 5) == table.c.field),
-            "SELECT op.field FROM op WHERE op.field + :field_1 = op.field")
-        self.assert_compile(table.select((table.c.field + 5) * 6),
-            "SELECT op.field FROM op WHERE (op.field + :field_1) * :param_1")
-        self.assert_compile(table.select((table.c.field * 5) + 6),
-            "SELECT op.field FROM op WHERE op.field * :field_1 + :param_1")
-        self.assert_compile(table.select(5 + table.c.field.in_([5,6])),
-            "SELECT op.field FROM op WHERE :param_1 + (op.field IN (:field_1, :field_2))")
-        self.assert_compile(table.select((5 + table.c.field).in_([5,6])),
-            "SELECT op.field FROM op WHERE :field_1 + op.field IN (:param_1, :param_2)")
-        self.assert_compile(table.select(not_(and_(table.c.field == 5, table.c.field == 7))),
-            "SELECT op.field FROM op WHERE NOT (op.field = :field_1 AND op.field = :field_2)")
-        self.assert_compile(table.select(not_(table.c.field == 5)),
-            "SELECT op.field FROM op WHERE op.field != :field_1")
-        self.assert_compile(table.select(not_(table.c.field.between(5, 6))),
-            "SELECT op.field FROM op WHERE NOT (op.field BETWEEN :field_1 AND :field_2)")
-        self.assert_compile(table.select(not_(table.c.field) == 5),
-            "SELECT op.field FROM op WHERE (NOT op.field) = :param_1")
-        self.assert_compile(table.select((table.c.field == table.c.field).between(False, True)),
-            "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
-        self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)),
-            "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2")
-
-    def test_associativity(self):
-        f = column('f')
-        self.assert_compile( f - f, "f - f" )
-        self.assert_compile( f - f - f, "(f - f) - f" )
-
-        self.assert_compile( (f - f) - f, "(f - f) - f" )
-        self.assert_compile( (f - f).label('foo') - f, "(f - f) - f" )
-
-        self.assert_compile( f - (f - f), "f - (f - f)" )
-        self.assert_compile( f - (f - f).label('foo'), "f - (f - f)" )
-
-        # because - less precedent than /
-        self.assert_compile( f / (f - f), "f / (f - f)" )
-        self.assert_compile( f / (f - f).label('foo'), "f / (f - f)" )
-
-        self.assert_compile( f / f - f, "f / f - f" )
-        self.assert_compile( (f / f) - f, "f / f - f" )
-        self.assert_compile( (f / f).label('foo') - f, "f / f - f" )
-
-        # because / more precedent than -
-        self.assert_compile( f - (f / f), "f - f / f" )
-        self.assert_compile( f - (f / f).label('foo'), "f - f / f" )
-        self.assert_compile( f - f / f, "f - f / f" )
-        self.assert_compile( (f - f) / f, "(f - f) / f" )
-
-        self.assert_compile( ((f - f) / f) - f, "(f - f) / f - f")
-        self.assert_compile( (f - f) / (f - f), "(f - f) / (f - f)")
-
-        # higher precedence
-        self.assert_compile( (f / f) - (f / f), "f / f - f / f")
-
-        self.assert_compile( (f / f) - (f - f), "f / f - (f - f)")
-        self.assert_compile( (f / f) / (f - f), "(f / f) / (f - f)")
-        self.assert_compile( f / (f / (f - f)), "f / (f / (f - f))")
-
 
     def test_delayed_col_naming(self):
         my_str = Column(String)
                 "mytable WHERE mytable.myid = :myid_1",
                 params = {table1.c.name:'fred'})
 
+    def test_update_to_expression(self):
+        """test update from an expression.
+
+        this logic is triggered currently by a left side that doesn't
+        have a key.  The current supported use case is updating the index
+        of a Postgresql ARRAY type.
+
+        """
+        expr = func.foo(table1.c.myid)
+        assert not hasattr(expr, "key")
+        self.assert_compile(
+            table1.update().values({expr: 'bar'}),
+            "UPDATE mytable SET foo(myid)=:param_1"
+        )
+
     def test_correlated_update(self):
         # test against a straight text subquery
         u = update(table1, values = {

File test/sql/test_operators.py

     def test_plus(self):
         self._do_operate_test(operators.add)
 
+    def test_no_getitem(self):
+        assert_raises_message(
+            NotImplementedError,
+            "Operator 'getitem' is not supported on this expression",
+            self._do_operate_test, operators.getitem
+        )
+        assert_raises_message(
+            NotImplementedError,
+            "Operator 'getitem' is not supported on this expression",
+            lambda: column('left')[3]
+        )
+
     def test_in(self):
         left = column('left')
         assert left.comparator.operate(operators.in_op, [1, 2, 3]).compare(
     def _assert_not_add_override(self, expr):
         assert not hasattr(expr, "foob")
 
+from sqlalchemy import and_, not_, between
+
+class OperatorPrecedenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+    __dialect__ = 'default'
+
+    def test_operator_precedence(self):
+        # TODO: clean up /break up
+        metadata = MetaData()
+        table = Table('op', metadata,
+            Column('field', Integer))
+        self.assert_compile(table.select((table.c.field == 5) == None),
+            "SELECT op.field FROM op WHERE (op.field = :field_1) IS NULL")
+        self.assert_compile(table.select((table.c.field + 5) == table.c.field),
+            "SELECT op.field FROM op WHERE op.field + :field_1 = op.field")
+        self.assert_compile(table.select((table.c.field + 5) * 6),
+            "SELECT op.field FROM op WHERE (op.field + :field_1) * :param_1")
+        self.assert_compile(table.select((table.c.field * 5) + 6),
+            "SELECT op.field FROM op WHERE op.field * :field_1 + :param_1")
+        self.assert_compile(table.select(5 + table.c.field.in_([5, 6])),
+            "SELECT op.field FROM op WHERE :param_1 + "
+                        "(op.field IN (:field_1, :field_2))")
+        self.assert_compile(table.select((5 + table.c.field).in_([5, 6])),
+            "SELECT op.field FROM op WHERE :field_1 + op.field "
+                    "IN (:param_1, :param_2)")
+        self.assert_compile(table.select(not_(and_(table.c.field == 5,
+                        table.c.field == 7))),
+            "SELECT op.field FROM op WHERE NOT "
+                "(op.field = :field_1 AND op.field = :field_2)")
+        self.assert_compile(table.select(not_(table.c.field == 5)),
+            "SELECT op.field FROM op WHERE op.field != :field_1")
+        self.assert_compile(table.select(not_(table.c.field.between(5, 6))),
+            "SELECT op.field FROM op WHERE NOT "
+                    "(op.field BETWEEN :field_1 AND :field_2)")
+        self.assert_compile(table.select(not_(table.c.field) == 5),
+            "SELECT op.field FROM op WHERE (NOT op.field) = :param_1")
+        self.assert_compile(table.select((table.c.field == table.c.field).\
+                            between(False, True)),
+            "SELECT op.field FROM op WHERE (op.field = op.field) "
+                            "BETWEEN :param_1 AND :param_2")
+        self.assert_compile(table.select(
+                        between((table.c.field == table.c.field), False, True)),
+            "SELECT op.field FROM op WHERE (op.field = op.field) "
+                    "BETWEEN :param_1 AND :param_2")
+
+class OperatorAssociativityTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+    __dialect__ = 'default'
+
+    def test_associativity(self):
+        # TODO: clean up /break up
+        f = column('f')
+        self.assert_compile(f - f, "f - f")
+        self.assert_compile(f - f - f, "(f - f) - f")
+
+        self.assert_compile((f - f) - f, "(f - f) - f")
+        self.assert_compile((f - f).label('foo') - f, "(f - f) - f")
+
+        self.assert_compile(f - (f - f), "f - (f - f)")
+        self.assert_compile(f - (f - f).label('foo'), "f - (f - f)")
+
+        # because - less precedent than /
+        self.assert_compile(f / (f - f), "f / (f - f)")
+        self.assert_compile(f / (f - f).label('foo'), "f / (f - f)")
+
+        self.assert_compile(f / f - f, "f / f - f")
+        self.assert_compile((f / f) - f, "f / f - f")
+        self.assert_compile((f / f).label('foo') - f, "f / f - f")
+
+        # because / more precedent than -
+        self.assert_compile(f - (f / f), "f - f / f")
+        self.assert_compile(f - (f / f).label('foo'), "f - f / f")
+        self.assert_compile(f - f / f, "f - f / f")
+        self.assert_compile((f - f) / f, "(f - f) / f")
+
+        self.assert_compile(((f - f) / f) - f, "(f - f) / f - f")
+        self.assert_compile((f - f) / (f - f), "(f - f) / (f - f)")
+
+        # higher precedence
+        self.assert_compile((f / f) - (f / f), "f / f - f / f")
+
+        self.assert_compile((f / f) - (f - f), "f / f - (f - f)")
+        self.assert_compile((f / f) / (f - f), "(f / f) / (f - f)")
+        self.assert_compile(f / (f / (f - f)), "f / (f / (f - f))")
+
+