Mike Bayer avatar Mike Bayer committed 7805137

merge -r4528:4546 of query_columns. query_columns is done

Comments (0)

Files changed (4)

lib/sqlalchemy/orm/query.py

 
 def _generative(*assertions):
     """mark a method as generative."""
-    
+
     def decorate(fn):
         argspec = util.format_argspec_plus(fn)
         run_assertions = assertions
         exec code in env
         return env[fn.__name__]
     return decorate
-    
+
 class Query(object):
     """Encapsulates the object-fetching operations provided by Mappers."""
 
     def __init__(self, entities, session=None, entity_name=None):
         self.session = session
-        
+
         self._with_options = []
         self._lockmode = None
         self._order_by = False
 
         for ent in util.to_list(entities):
             _QueryEntity(self, ent, entity_name=entity_name)
-                
+
         self.__setup_aliasizers(self._entities)
-        
+
     def __setup_aliasizers(self, entities):
         d = {}
         for ent in entities:
                         with_polymorphic = None
                     else:
                         with_polymorphic = adapter = None
-                
+
                     d[entity] = (mapper, adapter, selectable, is_aliased_class, with_polymorphic)
                 ent.setup_entity(entity, *d[entity])
-    
+
     def __mapper_loads_polymorphically_with(self, mapper, adapter):
         for m2 in mapper._with_polymorphic_mappers:
             for m in m2.iterate_to_root():
                 self._polymorphic_adapters[m.mapped_table] = self._polymorphic_adapters[m.local_table] = adapter
-        
+
     def __set_select_from(self, from_obj):
         if isinstance(from_obj, expression._SelectBaseMixin):
             # alias SELECTs and unions
 
     def _get_polymorphic_adapter(self, entity, selectable):
         self.__mapper_loads_polymorphically_with(entity.mapper, sql_util.ColumnAdapter(selectable, entity.mapper._equivalent_columns))
-    
+
     def _reset_polymorphic_adapter(self, mapper):
         for m2 in mapper._with_polymorphic_mappers:
             for m in m2.iterate_to_root():
                 self._polymorphic_adapters.pop(m.mapped_table, None)
                 self._polymorphic_adapters.pop(m.local_table, None)
-        
+
     def __reset_joinpoint(self):
         self._joinpoint = None
         self._filter_aliases = None
-        
+
     def __adapt_polymorphic_element(self, element):
         if isinstance(element, expression.FromClause):
             search = element
             search = element.table
         else:
             search = None
-            
+
         if search:
             alias = self._polymorphic_adapters.get(search, None)
             if alias:
         adapters = []    
         if as_filter and self._filter_aliases:
             adapters.append(self._filter_aliases.replace)
-        
+
         if self._polymorphic_adapters:
             adapters.append(self.__adapt_polymorphic_element)
 
         if self._from_obj_alias:
             adapters.append(self._from_obj_alias.replace)
-        
+
         if not adapters:
             return clause
             
 
     def _mapper_zero(self):
         return self._entity_zero().mapper
-    
+
     def _extension_zero(self):
         ent = self._entity_zero()
         return getattr(ent, 'extension', ent.mapper.extension)
-            
+
     def _mapper_entities(self):
         for ent in self._entities:
             if hasattr(ent, 'primary_entity'):
                 yield ent
     _mapper_entities = property(_mapper_entities)
-    
+
     def _joinpoint_zero(self):
         return self._joinpoint or self._entity_zero().entity
-        
+
     def _mapper_zero_or_none(self):
         if not getattr(self._entities[0], 'primary_entity', False):
             return None
         return self._entities[0].mapper
-        
+
     def _only_mapper_zero(self):
         if len(self._entities) > 1:
             raise sa_exc.InvalidRequestError("This operation requires a Query against a single mapper.")
         return self._mapper_zero()
-    
+
     def _only_entity_zero(self):
         if len(self._entities) > 1:
             raise sa_exc.InvalidRequestError("This operation requires a Query against a single mapper.")
         entity = self._entities[0]._clone()
         self._entities = [entity] + self._entities[1:]
         return entity
-        
+
     def __mapper_zero_from_obj(self):
         if self._from_obj:
             return self._from_obj
         for ent in self._mapper_entities:
             equivs.update(ent.mapper._equivalent_columns)
         return equivs
-    
+
     def __no_criterion_condition(self, meth):
         if self._criterion or self._statement or self._from_obj:
             util.warn(
         self._statement = self._criterion = self._from_obj = None
         self._order_by = self._group_by = self._distinct = False
         self.__joined_tables = {}
-    
+
     def __no_from_condition(self, meth):
         if self._from_obj:
             raise sa_exc.InvalidRequestError("Query.%s() being called on a Query which already has a FROM clause established.  This usage is deprecated.")
             raise sa_exc.InvalidRequestError(
                 ("Query.%s() being called on a Query with an existing full "
                  "statement - can't apply criterion.") % meth)
-    
+
     def __no_limit_offset(self, meth):
         if self._limit or self._offset:
             util.warn("Query.%s() being called on a Query which already has LIMIT or OFFSET applied. "
             "or to filter/join to the row-limited results of the query, call from_self() first."
             "In release 0.5, from_self() will be called automatically in this scenario."
             )
-            
+
     def __no_criterion(self):
         """generate a Query with no criterion, warn if criterion was present"""
     __no_criterion = _generative(__no_criterion_condition)(__no_criterion)
 
     def _with_current_path(self, path):
         """indicate that this query applies to objects loaded within a certain path.
-        
-        Used by deferred loaders (see strategies.py) which transfer query 
+
+        Used by deferred loaders (see strategies.py) which transfer query
         options from an originating query to a newly generated query intended
         for the deferred load.
-        
+
         """
         self._current_path = path
     _with_current_path = _generative()(_with_current_path)
-    
+
     def with_polymorphic(self, cls_or_mappers, selectable=None):
         """Load columns for descendant mappers of this Query's mapper.
-        
+
         Using this method will ensure that each descendant mapper's
-        tables are included in the FROM clause, and will allow filter() 
-        criterion to be used against those tables.  The resulting 
+        tables are included in the FROM clause, and will allow filter()
+        criterion to be used against those tables.  The resulting
         instances will also have those columns already loaded so that
         no "post fetch" of those columns will be required.
-        
+
         ``cls_or_mappers`` is a single class or mapper, or list of class/mappers,
         which inherit from this Query's mapper.  Alternatively, it
-        may also be the string ``'*'``, in which case all descending 
+        may also be the string ``'*'``, in which case all descending
         mappers will be added to the FROM clause.
-        
-        ``selectable`` is a table or select() statement that will 
+
+        ``selectable`` is a table or select() statement that will
         be used in place of the generated FROM clause.  This argument
-        is required if any of the desired mappers use concrete table 
-        inheritance, since SQLAlchemy currently cannot generate UNIONs 
-        among tables automatically.  If used, the ``selectable`` 
-        argument must represent the full set of tables and columns mapped 
+        is required if any of the desired mappers use concrete table
+        inheritance, since SQLAlchemy currently cannot generate UNIONs
+        among tables automatically.  If used, the ``selectable``
+        argument must represent the full set of tables and columns mapped
         by every desired mapper.  Otherwise, the unaccounted mapped columns
-        will result in their table being appended directly to the FROM 
+        will result in their table being appended directly to the FROM
         clause which will usually lead to incorrect results.
 
         """
         entity = self._generate_mapper_zero()
         entity.set_with_polymorphic(self, cls_or_mappers, selectable=selectable)
     with_polymorphic = _generative(__no_from_condition, __no_criterion_condition)(with_polymorphic)
-        
+
     def yield_per(self, count):
         """Yield only ``count`` rows at a time.
 
         """
         self._yield_per = count
     yield_per = _generative()(yield_per)
-    
+
     def get(self, ident, **kwargs):
         """Return an instance of the object based on the given identifier, or None if not found.
 
         in the order of the table def's primary key columns.
 
         """
-        
+
         ret = self._extension_zero().get(self, ident, **kwargs)
         if ret is not mapper.EXT_CONTINUE:
             return ret
          \**kwargs
            all extra keyword arguments are propagated to the constructor of
            Query.
-           
-       deprecated.  use sqlalchemy.orm.with_parent in conjunction with 
+
+       deprecated.  use sqlalchemy.orm.with_parent in conjunction with
        filter().
 
         """
         criterion = prop.compare(operators.eq, instance, value_is_parent=True)
         return Query(target, **kwargs).filter(criterion)
     query_from_parent = classmethod(util.deprecated(None, False)(query_from_parent))
-    
+
     def correlate(self, *args):
         self._correlate = self._correlate.union([_orm_selectable(s) for s in args])
     correlate = _generative()(correlate)
-    
+
     def autoflush(self, setting):
         """Return a Query with a specific 'autoflush' setting.
 
         Note that a Session with autoflush=False will
-        not autoflush, even if this flag is set to True at the 
+        not autoflush, even if this flag is set to True at the
         Query level.  Therefore this flag is usually used only
         to disable autoflush for a specific Query.
-        
+
         """
         self._autoflush = setting
     autoflush = _generative()(autoflush)
-    
+
     def populate_existing(self):
         """Return a Query that will refresh all instances loaded.
 
 
         An alternative to populate_existing() is to expire the Session
         fully using session.expire_all().
-        
+
         """
         self._populate_existing = True
     populate_existing = _generative()(populate_existing)
-    
+
     def with_parent(self, instance, property=None):
         """add a join criterion corresponding to a relationship to the given parent instance.
 
 
         if alias:
             entity = aliased(entity, alias)
-            
+
         self._entities = list(self._entities)
         m = _MapperEntity(self, entity)
         self.__setup_aliasizers([m])
     add_entity = _generative()(add_entity)
-    
+
     def from_self(self, *entities):
         """return a Query that selects from this Query's SELECT statement.
-        
-        \*entities - optional list of entities which will replace 
-        those being selected.  
+
+        \*entities - optional list of entities which will replace
+        those being selected.
         """
-        
+
         fromclause = self.compile().correlate(None)
         self._statement = self._criterion = None
         self._order_by = self._group_by = self._distinct = False
             for ent in entities:
                 _QueryEntity(self, ent)
             self.__setup_aliasizers(self._entities)
-            
+
     from_self = _generative()(from_self)
     _from_self = from_self
 
     def values(self, *columns):
         """Return an iterator yielding result tuples corresponding to the given list of columns"""
-        
+
         if not columns:
             return iter(())
         q = self._clone()
             q._yield_per = 10
         return iter(q)
     _values = values
-    
+
     def add_column(self, column):
         """Add a SQL ColumnElement to the list of result columns to be returned."""
-        
+
         self._entities = list(self._entities)
         c = _ColumnEntity(self, column)
         self.__setup_aliasizers([c])
     add_column = _generative()(add_column)
-    
+
     def options(self, *args):
         """Return a new Query object, applying the given list of
         MapperOptions.
             for opt in opts:
                 opt.process_query(self)
     __options = _generative()(__options)
-    
+
     def with_lockmode(self, mode):
         """Return a new Query object with the specified locking mode."""
-        
+
         self._lockmode = mode
     with_lockmode = _generative()(with_lockmode)
-    
+
     def params(self, *args, **kwargs):
         """add values for bind parameters which may have been specified in filter().
 
         self._params = self._params.copy()
         self._params.update(kwargs)
     params = _generative()(params)
-    
+
     def filter(self, criterion):
         """apply the given filtering criterion to the query and return the newly resulting ``Query``
 
 
         if criterion is not None and not isinstance(criterion, sql.ClauseElement):
             raise sa_exc.ArgumentError("filter() argument must be of type sqlalchemy.sql.ClauseElement or string")
-            
+
         criterion = self._adapt_clause(criterion, True, True)
-        
+
         if self._criterion is not None:
             self._criterion = self._criterion & criterion
         else:
             self._criterion = criterion
     filter = _generative(__no_statement_condition, __no_limit_offset)(filter)
-    
+
     def filter_by(self, **kwargs):
         """apply the given filtering criterion to the query and return the newly resulting ``Query``."""
 
 
     def order_by(self, *criterion):
         """apply one or more ORDER BY criterion to the query and return the newly resulting ``Query``"""
-        
+
         criterion = [self._adapt_clause(expression._literal_as_text(o), True, True) for o in criterion]
 
         if self._order_by is False:
             self._order_by = self._order_by + criterion
     order_by = util.array_as_starargs_decorator(order_by)
     order_by = _generative(__no_statement_condition)(order_by)
-    
+
     def group_by(self, *criterion):
         """apply one or more GROUP BY criterion to the query and return the newly resulting ``Query``"""
 
         criterion = list(chain(*[_orm_columns(c) for c in criterion]))
-        
+
         if self._group_by is False:
             self._group_by = criterion
         else:
             self._group_by = self._group_by + criterion
     group_by = util.array_as_starargs_decorator(group_by)
     group_by = _generative(__no_statement_condition)(group_by)
-    
+
     def having(self, criterion):
         """apply a HAVING criterion to the query and return the newly resulting ``Query``."""
 
         else:
             self._having = criterion
     having = _generative(__no_statement_condition)(having)
-    
+
     def join(self, *props, **kwargs):
         """Create a join against this ``Query`` object's criterion
         and apply generatively, retunring the newly resulting ``Query``.
           * a string property name, i.e. "rooms".  This will join along
           the relation of the same name from this Query's "primary"
           mapper, if one is present.
-          
-          * a class-mapped attribute, i.e. Houses.rooms.  This will create a 
+
+          * a class-mapped attribute, i.e. Houses.rooms.  This will create a
           join from "Houses" table to that of the "rooms" relation.
-          
+
           * a 2-tuple containing one of the above, combined with a selectable
             which derives from the destination table.   This will cause the join
-            to link to the given selectable instead of the relation's 
-            usual target table.  This argument can be used to join to table 
+            to link to the given selectable instead of the relation's
+            usual target table.  This argument can be used to join to table
             or class aliases, or "polymorphic" selectables.
 
         e.g.::
 
             session.query(Company).join('employees')
             session.query(Company).join('employees', 'tasks')
-            
+
             PAlias = aliased(Person)
             session.query(Person).join((Person.friends, Palias))
-            
+
             session.query(Houses).join(Colonials.rooms, Room.closets)
             session.query(Company).join(('employees', people.join(engineers)), Engineer.computers)
 
         \**kwargs include:
-        
+
             aliased - when joining, create anonymous aliases of each table.  This is
             used for self-referential joins or multiple joins to the same table.
             Consider usage of the aliased(SomeClass) construct as a more explicit
             approach to this.
-            
+
             from_joinpoint - when joins are specified using string property names,
             locate the property from the mapper found in the most recent join() call,
             instead of from the root entity.
             raise TypeError("unknown arguments: %s" % ','.join(kwargs.keys()))
         return self.__join(props, outerjoin=False, create_aliases=aliased, from_joinpoint=from_joinpoint)
     join = util.array_as_starargs_decorator(join)
-    
+
     def outerjoin(self, *props, **kwargs):
         """Create a left outer join against this ``Query`` object's criterion
         and apply generatively, retunring the newly resulting ``Query``.
           * a string property name, i.e. "rooms".  This will join along
           the relation of the same name from this Query's "primary"
           mapper, if one is present.
-          
-          * a class-mapped attribute, i.e. Houses.rooms.  This will create a 
+
+          * a class-mapped attribute, i.e. Houses.rooms.  This will create a
           join from "Houses" table to that of the "rooms" relation.
-          
+
           * a 2-tuple containing one of the above, combined with a selectable
             which derives from the destination table.   This will cause the join
-            to link to the given selectable instead of the relation's 
-            usual target table.  This argument can be used to join to table 
+            to link to the given selectable instead of the relation's
+            usual target table.  This argument can be used to join to table
             or class aliases, or "polymorphic" selectables.
 
         e.g.::
 
             session.query(Company).outerjoin('employees')
             session.query(Company).outerjoin('employees', 'tasks')
-            
+
             PAlias = aliased(Person)
             session.query(Person).outerjoin((Person.friends, Palias))
-            
+
             session.query(Houses).outerjoin(Colonials.rooms, Room.closets)
             session.query(Company).outerjoin(('employees', people.outerjoin(engineers)), Engineer.computers)
 
             raise TypeError("unknown arguments: %s" % ','.join(kwargs.keys()))
         return self.__join(props, outerjoin=True, create_aliases=aliased, from_joinpoint=from_joinpoint)
     outerjoin = util.array_as_starargs_decorator(outerjoin)
-    
+
     def __join(self, keys, outerjoin, create_aliases, from_joinpoint):
         self.__currenttables = util.Set(self.__currenttables)
         self._polymorphic_adapters = self._polymorphic_adapters.copy()
-        
+
         if not from_joinpoint:
             self.__reset_joinpoint()
-        
+
         clause = self._from_obj
         target = None
-        
+
         for key in util.to_list(keys):
             use_selectable = None   # pre-chosen selectable to join to, either user-specified or mapper.with_polymorphic
             alias_criterion = False  # indicate to adapt future filter(), order_by(), etc. criterion to this selectable
             aliased_entity = False
-            
+
             if isinstance(key, tuple):
                 key, use_selectable = key
 
                 of_type = getattr(descriptor, '_of_type', None)
                 if of_type and not use_selectable:
                     use_selectable = of_type #.mapped_table
-                
+
                 if not clause:
                     entity = descriptor.parententity
                     for ent in self._mapper_entities:
                     else:
                         if not use_selectable.is_derived_from(prop.mapper.mapped_table):
                             raise sa_exc.InvalidRequestError("Selectable '%s' is not derived from '%s'" % (use_selectable.description, prop.mapper.mapped_table.description))
-            
+
                         if not isinstance(use_selectable, expression.Alias):
                             use_selectable = use_selectable.alias()
-                    
+
                         target = aliased(prop.mapper, use_selectable)
                         alias_criterion = True
             else:
                     if prop.secondary:
                         self.__currenttables.add(prop.secondary)
                     self.__currenttables.add(prop.table)
-                
+
                 if prop.mapper.with_polymorphic:
                     aliased_entity = True
 
         """
         self.__reset_joinpoint()
     reset_joinpoint = _generative(__no_statement_condition)(reset_joinpoint)
-    
+
     def select_from(self, from_obj):
         """Set the `from_obj` parameter of the query and return the newly
         resulting ``Query``.  This replaces the table which this Query selects
         
         self.__set_select_from(from_obj)
     select_from = _generative(__no_from_condition, __no_criterion_condition)(select_from)
-    
+
     def __getitem__(self, item):
         if isinstance(item, slice):
             start = item.start
         """
         self._distinct = True
     distinct = _generative(__no_statement_condition)(distinct)
-    
+
     def all(self):
         """Return the results represented by this ``Query`` as a list.
 
             statement = sql.text(statement)
         self._statement = statement
     from_statement = _generative(__no_criterion_condition)(from_statement)
-    
+
     def first(self):
         """Return the first result of this ``Query`` or None if the result doesn't contain any row.
 
 
         filtered = bool(list(self._mapper_entities))
         single_entity = filtered and len(self._entities) == 1
-            
+
         if filtered:
             if single_entity:
                 filter = util.OrderedIdentitySet
                 filter = util.OrderedSet
         else:
             filter = None
-        
+
         custom_rows = single_entity and 'append_result' in self._entities[0].extension.methods
         
         process = [query_entity.row_processor(self, context, custom_rows) for query_entity in self._entities]
                 context.progress.remove(context.refresh_instance)
 
             session._finalize_loaded(context.progress)
-                
+
             for ii, attrs in context.partials.items():
                 ii.commit(attrs)
-                
+
             for row in rows:
                 yield row
 
             q = self.__no_criterion()
         else:
             q = self._clone()
-        
+
         if ident is not None:
             mapper = q._mapper_zero()
             params = {}
     def _select_args(self):
         return {'limit':self._limit, 'offset':self._offset, 'distinct':self._distinct, 'group_by':self._group_by or None, 'having':self._having or None}
     _select_args = property(_select_args)
-    
+
     def _should_nest_selectable(self):
         kwargs = self._select_args
         return (kwargs.get('limit') is not None or kwargs.get('offset') is not None or kwargs.get('distinct', False))
 
     def _col_aggregate(self, col, func, nested_cols=None):
         whereclause = self._criterion
-        
+
         context = QueryContext(self)
         from_obj = self.__mapper_zero_from_obj()
 
             s = sql.select([func(s.corresponding_column(col) or col)]).select_from(s)
         else:
             s = sql.select([func(col)], whereclause, from_obj=from_obj, **self._select_args)
-            
+
         if self._autoflush and not self._populate_existing:
             self.session._autoflush()
         return self.session.scalar(s, params=self._params, mapper=self._mapper_zero())
                 raise sa_exc.ArgumentError("Unknown lockmode '%s'" % self._lockmode)
         else:
             for_update = False
-            
+
         for entity in self._entities:
             entity.setup_context(self, context)
 
             froms = [context.from_clause]  # "load from a single FROM" mode, i.e. when select_from() or join() is used
         else:
             froms = context.froms   # "load from discrete FROMs" mode, i.e. when each _MappedEntity has its own FROM
-         
+
         if eager_joins and self._should_nest_selectable:
             # for eager joins present and LIMIT/OFFSET/DISTINCT, wrap the query inside a select,
             # then append eager joins onto that
-            
+
             if context.order_by:
                 order_by_col_expr = list(chain(*[sql_util.find_columns(o) for o in context.order_by]))
             else:
                 context.order_by = None
                 order_by_col_expr = []
-            
+
             inner = sql.select(context.primary_columns + order_by_col_expr, context.whereclause, from_obj=froms, use_labels=True, correlate=False, order_by=context.order_by, **self._select_args)
-            
+
             if self._correlate:
                 inner = inner.correlate(*self._correlate)
-                
+
             inner = inner.alias()
-            
+
             equivs = self.__all_equivs()
 
             context.adapter = sql_util.ColumnAdapter(inner, equivs)
         else:
             if not context.order_by:
                 context.order_by = None
-            
+
             if self._distinct and context.order_by:
                 order_by_col_expr = list(chain(*[sql_util.find_columns(o) for o in context.order_by]))
                 context.primary_columns += order_by_col_expr
 
             froms += context.eager_joins.values()
-            
+
             statement = sql.select(context.primary_columns + context.secondary_columns, context.whereclause, from_obj=froms, use_labels=True, for_update=for_update, correlate=False, order_by=context.order_by, **self._select_args)
             if self._correlate:
                 statement = statement.correlate(*self._correlate)
-                
+
             if context.eager_order_by:
                 statement.append_order_by(*context.eager_order_by)
-            
+
         context.statement = statement._annotate({'_Query__no_adapt': True})
 
         return context
 
 class _QueryEntity(object):
     """represent an entity column returned within a Query result."""
-    
+
     def __new__(cls, *args, **kwargs):
         if cls is _QueryEntity:
             entity = args[1]
             else:
                 cls = _ColumnEntity
         return object.__new__(cls)
-            
+
     def _clone(self):
         q = self.__class__.__new__(self.__class__)
         q.__dict__ = self.__dict__.copy()
 
 class _MapperEntity(_QueryEntity):
     """mapper/class/AliasedClass entity"""
-    
+
     def __init__(self, query, entity, entity_name=None):
         self.primary_entity = not query._entities
         query._entities.append(self)
         if cls_or_mappers is None:
             query._reset_polymorphic_adapter(self.mapper)
             return
-            
+
         mappers, from_obj = self.mapper._with_polymorphic_args(cls_or_mappers, selectable)
         self._with_polymorphic = mappers
-        
+
         # TODO: do the wrapped thing here too so that with_polymorphic() can be
         # applied to aliases
         if not self.is_aliased_class:
             return entity is self.path_entity
         else:
             return entity.base_mapper is self.path_entity
-        
+
     def _get_entity_clauses(self, query, context):
 
         adapter = None
 
         if not adapter and self.adapter:
             adapter = self.adapter
-                
+
         if adapter:
             if query._from_obj_alias:
                 ret = adapter.wrap(query._from_obj_alias)
                 ret = adapter
         else:
             ret = query._from_obj_alias
-        
+
         return ret
-    
+
     def row_processor(self, query, context, custom_rows):
         adapter = self._get_entity_clauses(query, context)
-        
+
         if context.adapter and adapter:
             adapter = adapter.wrap(context.adapter)
         elif not adapter:
             adapter = context.adapter
-                
+
         # polymorphic mappers which have concrete tables in their hierarchy usually
-        # require row aliasing unconditionally.  
+        # require row aliasing unconditionally.
         if not adapter and self.mapper._requires_row_aliasing:
             adapter = sql_util.ColumnAdapter(self.selectable, self.mapper._equivalent_columns)
-        
+
         if self.primary_entity:
             _instance = self.mapper._instance_processor(context, (self.path_entity,), adapter, 
                 extension=self.extension, only_load_props=query._only_load_props, refresh_instance=context.refresh_instance
                 return _instance(row, None)
         
         return main
-            
+
     def setup_context(self, query, context):
         # if single-table inheritance mapper, add "typecol IN (polymorphic)" criterion so
         # that we only load the appropriate types
         if self.mapper.single and self.mapper.inherits is not None and self.mapper.polymorphic_on is not None and self.mapper.polymorphic_identity is not None:
             context.whereclause = sql.and_(context.whereclause, self.mapper.polymorphic_on.in_([m.polymorphic_identity for m in self.mapper.polymorphic_iterator()]))
-        
+
         context.froms.append(self.selectable)
 
         adapter = self._get_entity_clauses(query, context)
                     context.order_by = self.selectable.default_order_by()
             if context.order_by and adapter:
                 context.order_by = adapter.adapt_list(util.to_list(context.order_by))
-        
+
         for value in self.mapper._iterate_polymorphic_properties(self._with_polymorphic):
             if query._only_load_props and value.key not in query._only_load_props:
                 continue
             value.setup(context, self, (self.path_entity,), adapter, only_load_props=query._only_load_props, column_collection=context.primary_columns)
-        
+
     def __str__(self):
         return str(self.mapper)
 
-        
+
 class _ColumnEntity(_QueryEntity):
     """Column/expression based entity."""
 
 
         if not hasattr(column, '_label'):
             column = column.label(None)
-        
+
         self.column = column
         self.entity_name = None
         self.froms = util.Set()
         self.entities = util.Set([elem._annotations['parententity'] for elem in visitors.iterate(column, {}) if 'parententity' in elem._annotations])
-            
+
     def setup_entity(self, entity, mapper, adapter, from_obj, is_aliased_class, with_polymorphic):
         self.froms.add(from_obj)
 
     def __resolve_expr_against_query_aliases(self, query, expr, context):
         return query._adapt_clause(expr, False, True)
-        
+
     def row_processor(self, query, context, custom_rows):
         column = self.__resolve_expr_against_query_aliases(query, self.column, context)
-            
+
         if context.adapter:
             column = context.adapter.columns[column]
-            
+
         def proc(context, row):
             return row[column]
         return proc
-    
+
     def setup_context(self, query, context):
         column = self.__resolve_expr_against_query_aliases(query, self.column, context)
         context.froms += list(self.froms)
         context.primary_columns.append(column)
-    
+
     def __str__(self):
         return str(self.column)
 
         self.primary_columns = []
         self.secondary_columns = []
         self.eager_order_by = []
-        
+
         self.eager_joins = {}
         self.froms = []
         self.adapter = None
-        
+
         self.options = query._with_options
         self.attributes = query._attributes.copy()
 

lib/sqlalchemy/orm/util.py

 
     def _create_do(self, method):
         """Return a closure that loops over impls of the named method."""
+
         def _do(*args, **kwargs):
             for ext in self._extensions:
                 ret = getattr(ext, method)(*args, **kwargs)
             self.aliased_class = None
         sql_util.ColumnAdapter.__init__(self, selectable, equivalents, chain_to)
 
-
 class AliasedClass(object):
     def __init__(self, cls, alias=None):
         self.__mapper = _class_to_mapper(cls)
         self.__adapter = sql_util.ClauseAdapter(alias, equivalents=self.__mapper._equivalent_columns)
         self.__alias = alias
         self.__name__ = 'AliasedClass_' + str(self.__target)
-    
+
     def __adapt_prop(self, prop):
         existing = getattr(self.__target, prop.key)
         comparator = AliasedComparator(self, self.__adapter, existing.comparator)
             existing.impl, parententity=self, comparator=comparator)
         setattr(self, prop.key, queryattr)
         return queryattr
-        
+
     def __getattr__(self, key):
         prop = self.__mapper._get_property(key, raiseerr=False)
         if prop:
             return self.__adapt_prop(prop)
-            
+
         for base in self.__target.__mro__:
             try:
                 attr = object.__getattribute__(base, key)
 
     def __clause_element__(self):
         return self.__clause_element
-    
+
     def operate(self, op, *other, **kwargs):
         return self.adapter.traverse(self.comparator.operate(op, *other, **kwargs))
 
 class _ORMJoin(expression.Join):
 
     __visit_name__ = expression.Join.__visit_name__
-    
+
     def __init__(self, left, right, onclause=None, isouter=False):
         if hasattr(left, '_orm_mappers'):
             left_mapper = left._orm_mappers[1]
             adapt_from = left.right
-        
+
         else:
             left_mapper, left, left_is_aliased = _entity_info(left)
             if left_is_aliased or not left_mapper:
             adapt_to = right
         else:
             adapt_to = None
-            
+
         if left_mapper or right_mapper:
             self._orm_mappers = (left_mapper, right_mapper)
-            
+
             if isinstance(onclause, basestring):
                 prop = left_mapper.get_property(onclause)
             elif isinstance(onclause, attributes.QueryableAttribute):
                 else:
                     onclause = pj
                 self._target_adapter = target_adapter
-                
+
         expression.Join.__init__(self, left, right, onclause, isouter)
 
     def join(self, right, onclause=None, isouter=False):
 
 def with_parent(instance, prop):
     """Return criterion which selects instances with a given parent.
-    
+
     instance
       a parent instance, which should be persistent or detached.
 
      property
        a class-attached descriptor, MapperProperty or string property name
-       attached to the parent instance.  
+       attached to the parent instance.
 
      \**kwargs
        all extra keyword arguments are propagated to the constructor of
     else:
         desc = entity.class_manager[key]
         return desc, desc.property
-    
+
 def _orm_columns(entity):
     mapper, selectable, is_aliased_class = _entity_info(entity)
     if isinstance(selectable, expression.Selectable):
 def _orm_selectable(entity):
     mapper, selectable, is_aliased_class = _entity_info(entity)
     return selectable
-    
+
 def _is_aliased_class(entity):
     return isinstance(entity, AliasedClass)
-    
+
 def _state_mapper(state, entity_name=None):
     if state.entity_name is not attributes.NO_ENTITY_NAME:
         # Override the given entity name if the object is not transient.

test/orm/alltests.py

         'orm.manytomany',
         'orm.onetoone',
         'orm.dynamic',
+
+        'orm.deprecations',
         )
     alltests = unittest.TestSuite()
     for name in modules_to_test:

test/orm/deprecations.py

+"""The collection of modern alternatives to deprecated & removed functionality.
+
+Collects specimens of old ORM code and explicitly covers the recommended
+modern (i.e. not deprecated) alternative to them.  The tests snippets here can
+be migrated directly to the wiki, docs, etc.
+
+"""
+import testenv; testenv.configure_for_tests()
+from sqlalchemy import *
+from sqlalchemy.orm import *
+from testlib import *
+
+
+users, addresses = None, None
+session = None
+
+class Base(object):
+    def __init__(self, **kw):
+        for k, v in kw.iteritems():
+            setattr(self, k, v)
+
+class User(Base): pass
+class Address(Base): pass
+
+
+class QueryAlternativesTest(ORMTest):
+    '''Collects modern idioms for Queries
+
+    The docstring for each test case serves as miniature documentation about
+    the deprecated use case, and the test body illustrates (and covers) the
+    intended replacement code to accomplish the same task.
+
+    Documenting the "old way" including the argument signature helps these
+    cases remain useful to readers even after the deprecated method has been
+    removed from the modern codebase.
+
+    Format:
+
+    def test_deprecated_thing(self):
+        """Query.methodname(old, arg, **signature)
+
+        output = session.query(User).deprecatedmethod(inputs)
+
+        """
+        # 0.4+
+        output = session.query(User).newway(inputs)
+        assert output is correct
+
+        # 0.5+
+        output = session.query(User).evennewerway(inputs)
+        assert output is correct
+
+    '''
+    keep_mappers = True
+    keep_data = True
+
+    def define_tables(self, metadata):
+        global users_table, addresses_table
+        users_table = Table(
+            'users', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('name', String(64)))
+
+        addresses_table = Table(
+            'addresses', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('user_id', Integer, ForeignKey('users.id')),
+            Column('email_address', String(128)),
+            Column('purpose', String(16)),
+            Column('bounces', Integer, default=0))
+
+    def setup_mappers(self):
+        mapper(User, users_table, properties=dict(
+            addresses=relation(Address, backref='user'),
+            ))
+        mapper(Address, addresses_table)
+
+    def insert_data(self):
+        user_cols = ('id', 'name')
+        user_rows = ((1, 'jack'), (2, 'ed'), (3, 'fred'), (4, 'chuck'))
+        users_table.insert().execute(
+            [dict(zip(user_cols, row)) for row in user_rows])
+
+        add_cols = ('id', 'user_id', 'email_address', 'purpose', 'bounces')
+        add_rows = (
+            (1, 1, 'jack@jack.home', 'Personal', 0),
+            (2, 1, 'jack@jack.bizz', 'Work', 1),
+            (3, 2, 'ed@foo.bar', 'Personal', 0),
+            (4, 3, 'fred@the.fred', 'Personal', 10))
+
+        addresses_table.insert().execute(
+            [dict(zip(add_cols, row)) for row in add_rows])
+
+    def setUp(self):
+        super(QueryAlternativesTest, self).setUp()
+        global session
+        if session is None:
+            session = create_session()
+
+    def tearDown(self):
+        super(QueryAlternativesTest, self).tearDown()
+        session.clear()
+
+    ######################################################################
+
+    def test_apply_max(self):
+        """Query.apply_max(col)
+
+        max = session.query(Address).apply_max(Address.bounces)
+
+        """
+        # 0.5.0
+        maxes = list(session.query(Address).values(func.max(Address.bounces)))
+        max = maxes[0][0]
+        assert max == 10
+
+        max = session.query(func.max(Address.bounces)).one()[0]
+        assert max == 10
+
+    def test_apply_min(self):
+        """Query.apply_min(col)
+
+        min = session.query(Address).apply_min(Address.bounces)
+
+        """
+        # 0.5.0
+        mins = list(session.query(Address).values(func.min(Address.bounces)))
+        min = mins[0][0]
+        assert min == 0
+
+        min = session.query(func.min(Address.bounces)).one()[0]
+        assert min == 0
+
+    def test_apply_avg(self):
+        """Query.apply_avg(col)
+
+        avg = session.query(Address).apply_avg(Address.bounces)
+
+        """
+        avgs = list(session.query(Address).values(func.avg(Address.bounces)))
+        avg = avgs[0][0]
+        assert avg > 0 and avg < 10
+
+        avg = session.query(func.avg(Address.bounces)).one()[0]
+        assert avg > 0 and avg < 10
+
+    def test_apply_sum(self):
+        """Query.apply_sum(col)
+
+        avg = session.query(Address).apply_avg(Address.bounces)
+
+        """
+        avgs = list(session.query(Address).values(func.avg(Address.bounces)))
+        avg = avgs[0][0]
+        assert avg > 0 and avg < 10
+
+        avg = session.query(func.avg(Address.bounces)).one()[0]
+        assert avg > 0 and avg < 10
+
+    def test_count_by(self):
+        """Query.count_by(*args, **params)
+
+        num = session.query(Address).count_by(purpose='Personal')
+
+        # old-style implicit *_by join
+        num = session.query(User).count_by(purpose='Personal')
+
+        """
+        num = session.query(Address).filter_by(purpose='Personal').count()
+        assert num == 3, num
+
+        num = (session.query(User).join('addresses').
+               filter(Address.purpose=='Personal')).count()
+        assert num == 3, num
+
+    def test_count_whereclause(self):
+        """Query.count(whereclause=None, params=None, **kwargs)
+
+        num = session.query(Address).count(address_table.c.bounces > 1)
+
+        """
+        num = session.query(Address).filter(Address.bounces > 1).count()
+        assert num == 1, num
+
+    def test_execute(self):
+        """Query.execute(clauseelement, params=None, *args, **kwargs)
+
+        users = session.query(User).execute(users_table.select())
+
+        """
+        users = session.query(User).from_statement(users_table.select()).all()
+        assert len(users) == 4
+
+    def test_get_by(self):
+        """Query.get_by(*args, **params)
+
+        user = session.query(User).get_by(name='ed')
+
+        # 0.3-style implicit *_by join
+        user = session.query(User).get_by(email_addresss='fred@the.fred')
+
+        """
+        user = session.query(User).filter_by(name='ed').first()
+        assert user.name == 'ed'
+
+        user = (session.query(User).join('addresses').
+                filter(Address.email_address=='fred@the.fred')).first()
+        assert user.name == 'fred'
+
+        user = session.query(User).filter(
+            User.addresses.any(Address.email_address=='fred@the.fred')).first()
+        assert user.name == 'fred'
+
+    def test_instances_entities(self):
+        """Query.instances(cursor, *mappers_or_columns, **kwargs)
+
+        sel = users_table.join(addresses_table).select(use_labels=True)
+        res = session.query(User).instances(sel.execute(), Address)
+
+        """
+        sel = users_table.join(addresses_table).select(use_labels=True)
+        res = session.query(User, Address).instances(sel.execute())
+
+        assert len(res) == 4
+        cola, colb = res[0]
+        assert isinstance(cola, User) and isinstance(colb, Address)
+
+
+    def test_join_by(self):
+        """Query.join_by(*args, **params)
+
+        TODO
+        """
+
+    def test_join_to(self):
+        """Query.join_to(key)
+
+        TODO
+        """
+
+    def test_join_via(self):
+        """Query.join_via(keys)
+
+        TODO
+        """
+
+    def test_list(self):
+        """Query.list()
+
+        users = session.query(User).list()
+
+        """
+        users = session.query(User).all()
+        assert len(users) == 4
+
+    def test_scalar(self):
+        """Query.scalar()
+
+        user = session.query(User).filter(User.id==1).scalar()
+
+        """
+        user = session.query(User).filter(User.id==1).first()
+        assert user.id==1
+
+    def test_select(self):
+        """Query.select(arg=None, **kwargs)
+
+        users = session.query(User).select(users_table.c.name != None)
+
+        """
+        users = session.query(User).filter(User.name != None).all()
+        assert len(users) == 4
+
+    def test_select_by(self):
+        """Query.select_by(*args, **params)
+
+        users = session.query(User).select_by(name='fred')
+
+        # 0.3 magic join on *_by methods
+        users = session.query(User).select_by(email_address='fred@the.fred')
+
+        """
+        users = session.query(User).filter_by(name='fred').all()
+        assert len(users) == 1
+
+        users = session.query(User).filter(User.name=='fred').all()
+        assert len(users) == 1
+
+        users = (session.query(User).join('addresses').
+                 filter_by(email_address='fred@the.fred')).all()
+        assert len(users) == 1
+
+        users = session.query(User).filter(User.addresses.any(
+            Address.email_address == 'fred@the.fred')).all()
+        assert len(users) == 1
+
+    def test_selectfirst(self):
+        """Query.selectfirst(arg=None, **kwargs)
+
+        bounced = session.query(Address).selectfirst(
+          addresses_table.c.bounces > 0)
+
+        """
+        bounced = session.query(Address).filter(Address.bounces > 0).first()
+        assert bounced.bounces > 0
+
+    def test_selectfirst_by(self):
+        """Query.selectfirst_by(*args, **params)
+
+        onebounce = session.query(Address).selectfirst_by(bounces=1)
+
+        # 0.3 magic join on *_by methods
+        onebounce_user = session.query(User).selectfirst_by(bounces=1)
+
+        """
+        onebounce = session.query(Address).filter_by(bounces=1).first()
+        assert onebounce.bounces == 1
+
+        onebounce_user = (session.query(User).join('addresses').
+                          filter_by(bounces=1)).first()
+        assert onebounce_user.name == 'jack'
+
+        onebounce_user = (session.query(User).join('addresses').
+                          filter(Address.bounces == 1)).first()
+        assert onebounce_user.name == 'jack'
+
+        onebounce_user = session.query(User).filter(User.addresses.any(
+            Address.bounces == 1)).first()
+        assert onebounce_user.name == 'jack'
+
+
+    def test_selectone(self):
+        """Query.selectone(arg=None, **kwargs)
+
+        ed = session.query(User).selectone(users_table.c.name == 'ed')
+
+        """
+        ed = session.query(User).filter(User.name == 'jack').one()
+
+    def test_selectone_by(self):
+        """Query.selectone_by
+
+        ed = session.query(User).selectone_by(name='ed')
+
+        # 0.3 magic join on *_by methods
+        ed = session.query(User).selectone_by(email_address='ed@foo.bar')
+
+        """
+        ed = session.query(User).filter_by(name='jack').one()
+
+        ed = session.query(User).filter(User.name == 'jack').one()
+
+        ed = session.query(User).join('addresses').filter(
+            Address.email_address == 'ed@foo.bar').one()
+
+        ed = session.query(User).filter(User.addresses.any(
+            Address.email_address == 'ed@foo.bar')).one()
+
+    def test_select_statement(self):
+        """Query.select_statement(statement, **params)
+
+        users = session.query(User).select_statement(users_table.select())
+
+        """
+        users = session.query(User).from_statement(users_table.select()).all()
+        assert len(users) == 4
+
+    def test_select_text(self):
+        """Query.select_text(text, **params)
+
+        users = session.query(User).select_text('SELECT * FROM users')
+
+        """
+        users = session.query(User).from_statement('SELECT * FROM users').all()
+        assert len(users) == 4
+
+    def test_select_whereclause(self):
+        """Query.select_whereclause(whereclause=None, params=None, **kwargs)
+
+
+        users = session,query(User).select_whereclause(users.c.name=='ed')
+        users = session.query(User).select_whereclause("name='ed'")
+
+        """
+        users = session.query(User).filter(User.name=='ed').all()
+        assert len(users) == 1 and users[0].name == 'ed'
+
+        users = session.query(User).filter("name='ed'").all()
+        assert len(users) == 1 and users[0].name == 'ed'
+
+
+
+if __name__ == '__main__':
+    testenv.main()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.