diana avatar diana committed ef2cb4c

just a pep8 pass of lib/sqlalchemy/orm/

Comments (0)

Files changed (4)

lib/sqlalchemy/orm/strategies.py

 from .session import _state_session
 import itertools
 
+
 def _register_attribute(strategy, mapper, useobject,
         compare_function=None,
         typecallable=None,
             for hook in listen_hooks:
                 hook(desc, prop)
 
+
 class UninstrumentedColumnLoader(LoaderStrategy):
     """Represent the a non-instrumented MapperProperty.
 
     def create_row_processor(self, context, path, mapper, row, adapter):
         return None, None, None
 
+
 class ColumnLoader(LoaderStrategy):
     """Provide loading behavior for a :class:`.ColumnProperty`."""
 
 
         _register_attribute(self, mapper, useobject=False,
             compare_function=coltype.compare_values,
-            active_history = active_history
+            active_history=active_history
        )
 
     def create_row_processor(self, context, path,
                 state._expire_attribute_pre_commit(dict_, key)
             return expire_for_non_present_col, None, None
 
+
 log.class_logger(ColumnLoader)
 
+
 class DeferredColumnLoader(LoaderStrategy):
     """Provide loading behavior for a deferred :class:`.ColumnProperty`."""
 
 
         elif not self.is_class_level:
             def set_deferred_for_local_state(state, dict_, row):
-                state._set_callable(dict_, key, LoadDeferredColumns(state, key))
+                state._set_callable(
+                    dict_, key, LoadDeferredColumns(state, key))
             return set_deferred_for_local_state, None, None
         else:
             def reset_col_for_deferred(state, dict_, row):
                     localparent.iterate_properties
                     if isinstance(p, StrategizedProperty) and
                       isinstance(p.strategy, DeferredColumnLoader) and
-                      p.group==self.group
+                      p.group == self.group
                     ]
         else:
             toload = [self.key]
 
         return attributes.ATTR_WAS_SET
 
+
 log.class_logger(DeferredColumnLoader)
 
+
 class LoadDeferredColumns(object):
     """serializable loader object used by DeferredColumnLoader"""
 
         strategy = prop._strategies[DeferredColumnLoader]
         return strategy._load_for_state(state, passive)
 
+
 class DeferredOption(StrategizedOption):
     propagate_to_loaders = True
 
         else:
             return ColumnLoader
 
+
 class UndeferGroupOption(MapperOption):
     propagate_to_loaders = True
 
     def process_query(self, query):
         query._attributes[("undefer", self.group)] = True
 
+
 class AbstractRelationshipLoader(LoaderStrategy):
     """LoaderStratgies which deal with related objects."""
 
         self.target = self.parent_property.target
         self.uselist = self.parent_property.uselist
 
+
 class NoLoader(AbstractRelationshipLoader):
     """Provide loading behavior for a :class:`.RelationshipProperty`
     with "lazy=None".
         _register_attribute(self, mapper,
             useobject=True,
             uselist=self.parent_property.uselist,
-            typecallable = self.parent_property.collection_class,
+            typecallable=self.parent_property.collection_class,
         )
 
     def create_row_processor(self, context, path, mapper, row, adapter):
             state._initialize(self.key)
         return invoke_no_load, None, None
 
+
 log.class_logger(NoLoader)
 
+
 class LazyLoader(AbstractRelationshipLoader):
     """Provide loading behavior for a :class:`.RelationshipProperty`
     with "lazy=True", that is loads when first accessed.
     def init_class_attribute(self, mapper):
         self.is_class_level = True
 
+        active_history = (
+            self.parent_property.active_history or
+            self.parent_property.direction is not interfaces.MANYTOONE or
+            not self.use_get
+        )
+
         # MANYTOONE currently only needs the
         # "old" value for delete-orphan
         # cascades.  the required _SingleParentValidator
                 mapper,
                 useobject=True,
                 callable_=self._load_for_state,
-                uselist = self.parent_property.uselist,
-                backref = self.parent_property.back_populates,
-                typecallable = self.parent_property.collection_class,
-                active_history = \
-                    self.parent_property.active_history or \
-                    self.parent_property.direction is not \
-                        interfaces.MANYTOONE or \
-                    not self.use_get,
-                )
+                uselist=self.parent_property.uselist,
+                backref=self.parent_property.back_populates,
+                typecallable=self.parent_property.collection_class,
+                active_history=active_history
+        )
 
     def lazy_clause(self, state, reverse_direction=False,
                                 alias_secondary=False,
                                     state, dict_,
                                     bind_to_col[bindparam._identifying_key])
 
-
         if self.parent_property.secondary is not None and alias_secondary:
             criterion = sql_util.ClauseAdapter(
                                 self.parent_property.secondary.alias()).\
             else:
                 return None
 
-
     def create_row_processor(self, context, path,
                                     mapper, row, adapter):
         key = self.key
 
 log.class_logger(LazyLoader)
 
+
 class LoadLazyAttribute(object):
     """serializable loader object used by LazyLoader"""
 
 
         return None, None, load_immediate
 
+
 class SubqueryLoader(AbstractRelationshipLoader):
     def __init__(self, parent):
         super(SubqueryLoader, self).__init__(parent)
         q = orig_query.session.query(effective_entity)
         q._attributes = {
             ("orig_query", SubqueryLoader): orig_query,
-            ('subquery_path', None) : subq_path
+            ('subquery_path', None): subq_path
         }
         q = q._enable_single_crit(False)
 
         left_alias = orm_util.AliasedClass(leftmost_mapper, embed_q)
         return left_alias
 
-
     def _prep_for_joins(self, left_alias, subq_path):
         subq_path = subq_path.path
 
         # figure out what's being joined.  a.k.a. the fun part
         to_join = [
-                    (subq_path[i], subq_path[i+1])
+                    (subq_path[i], subq_path[i + 1])
                     for i in xrange(0, len(subq_path), 2)
                 ]
 
                     (k, [v[0] for v in v])
                     for k, v in itertools.groupby(
                         subq,
-                        lambda x:x[1:]
+                        lambda x: x[1:]
                     ))
             path.set(context, 'collections', collections)
 
 
         return load_scalar_from_subq, None, None
 
+
 log.class_logger(SubqueryLoader)
 
+
 class JoinedLoader(AbstractRelationshipLoader):
     """Provide loading behavior for a :class:`.RelationshipProperty`
     using joined eager loading.
                                     )
                                 )
 
-
     def _create_eager_adapter(self, context, row, adapter, path):
         user_defined_adapter = path.get(context,
                                 "user_defined_eager_row_processor",
                 load_scalar_from_joined_existing_row, \
                 None, load_scalar_from_joined_exec
 
+
 log.class_logger(JoinedLoader)
 
+
 class EagerLazyOption(StrategizedOption):
     def __init__(self, key, lazy=True, chained=False,
                     propagate_to_loaders=True
         return self.strategy_cls
 
 _factory = {
-    False:JoinedLoader,
-    "joined":JoinedLoader,
-    None:NoLoader,
-    "noload":NoLoader,
-    "select":LazyLoader,
-    True:LazyLoader,
-    "subquery":SubqueryLoader,
-    "immediate":ImmediateLoader
+    False: JoinedLoader,
+    "joined": JoinedLoader,
+    None: NoLoader,
+    "noload": NoLoader,
+    "select": LazyLoader,
+    True: LazyLoader,
+    "subquery": SubqueryLoader,
+    "immediate": ImmediateLoader
 }
+
+
 def factory(identifier):
     return _factory.get(identifier, LazyLoader)
 
+
 class EagerJoinOption(PropertyOption):
 
     def __init__(self, key, innerjoin, chained=False):
         else:
             paths[-1].set(query, "eager_join_type", self.innerjoin)
 
+
 class LoadEagerFromAliasOption(PropertyOption):
 
     def __init__(self, key, alias=None, chained=False):
             paths[-1].set(query, "user_defined_eager_row_processor",
                                     adapter)
 
+
 def single_parent_validator(desc, prop):
     def _do_check(state, value, oldvalue, initiator):
         if value is not None and initiator.key == prop.key:
                             active_history=True)
     event.listen(desc, 'set', set_, raw=True, retval=True,
                             active_history=True)
-

lib/sqlalchemy/orm/sync.py

 
 from . import exc, util as orm_util, attributes
 
+
 def populate(source, source_mapper, dest, dest_mapper,
                         synchronize_pairs, uowcommit, flag_cascaded_pks):
     source_dict = source.dict
                     r.references(l):
             uowcommit.attributes[("pk_cascaded", dest, r)] = True
 
+
 def clear(dest, dest_mapper, synchronize_pairs):
     for l, r in synchronize_pairs:
         if r.primary_key:
             raise AssertionError(
-                                "Dependency rule tried to blank-out primary key "
-                                "column '%s' on instance '%s'" %
-                                (r, orm_util.state_str(dest))
-                            )
+                "Dependency rule tried to blank-out primary key "
+                "column '%s' on instance '%s'" %
+                (r, orm_util.state_str(dest))
+            )
         try:
             dest_mapper._set_state_attr_by_column(dest, dest.dict, r, None)
         except exc.UnmappedColumnError:
             _raise_col_to_prop(True, None, l, dest_mapper, r)
 
+
 def update(source, source_mapper, dest, old_prefix, synchronize_pairs):
     for l, r in synchronize_pairs:
         try:
-            oldvalue = source_mapper._get_committed_attr_by_column(source.obj(), l)
-            value = source_mapper._get_state_attr_by_column(source, source.dict, l)
+            oldvalue = source_mapper._get_committed_attr_by_column(
+                source.obj(), l)
+            value = source_mapper._get_state_attr_by_column(
+                source, source.dict, l)
         except exc.UnmappedColumnError:
             _raise_col_to_prop(False, source_mapper, l, None, r)
         dest[r.key] = value
         dest[old_prefix + r.key] = oldvalue
 
+
 def populate_dict(source, source_mapper, dict_, synchronize_pairs):
     for l, r in synchronize_pairs:
         try:
-            value = source_mapper._get_state_attr_by_column(source, source.dict, l)
+            value = source_mapper._get_state_attr_by_column(
+                source, source.dict, l)
         except exc.UnmappedColumnError:
             _raise_col_to_prop(False, source_mapper, l, None, r)
 
         dict_[r.key] = value
 
+
 def source_modified(uowcommit, source, source_mapper, synchronize_pairs):
     """return true if the source object has changes from an old to a
     new value on the given synchronize pairs
     else:
         return False
 
+
 def _raise_col_to_prop(isdest, source_mapper, source_column,
                        dest_mapper, dest_column):
     if isdest:

lib/sqlalchemy/orm/unitofwork.py

 
 sessionlib = util.importlater("sqlalchemy.orm", "session")
 
+
 def track_cascade_events(descriptor, prop):
     """Establish event listeners on object attributes which handle
     cascade-on-set/append.
                                     postsort_actions):
                 rec.execute(self)
 
-
     def finalize_flush_changes(self):
         """mark processed objects as clean / deleted after a successful
         flush().
         self.session._remove_newly_deleted(isdel)
         self.session._register_newly_persistent(other)
 
+
 class IterateMappersMixin(object):
     def _mappers(self, uow):
         if self.fromparent:
         else:
             return self.dependency_processor.mapper.self_and_descendants
 
+
 class Preprocess(IterateMappersMixin):
     def __init__(self, dependency_processor, fromparent):
         self.dependency_processor = dependency_processor
         else:
             return False
 
+
 class PostSortRec(object):
     disabled = False
 
             ",".join(str(x) for x in self.__dict__.values())
         )
 
+
 class ProcessAll(IterateMappersMixin, PostSortRec):
     def __init__(self, uow, dependency_processor, delete, fromparent):
         self.dependency_processor = dependency_processor
                 if isdelete == self.delete and not listonly:
                     yield state
 
+
 class IssuePostUpdate(PostSortRec):
     def __init__(self, uow, mapper, isdelete):
         self.mapper = mapper
 
         persistence.post_update(self.mapper, states, uow, cols)
 
+
 class SaveUpdateAll(PostSortRec):
     def __init__(self, uow, mapper):
         self.mapper = mapper
             uow
         )
 
-
     def per_state_flush_actions(self, uow):
         states = list(uow.states_for_mapper_hierarchy(
                                     self.mapper, False, False))
             states_for_prop = uow.filter_states_for_dep(dep, states)
             dep.per_state_flush_actions(uow, states_for_prop, False)
 
+
 class DeleteAll(PostSortRec):
     def __init__(self, uow, mapper):
         self.mapper = mapper
             states_for_prop = uow.filter_states_for_dep(dep, states)
             dep.per_state_flush_actions(uow, states_for_prop, True)
 
+
 class ProcessState(PostSortRec):
     def __init__(self, uow, dependency_processor, delete, state):
         self.dependency_processor = dependency_processor
             self.delete
         )
 
+
 class SaveUpdateState(PostSortRec):
     def __init__(self, uow, state, mapper):
         self.state = state
             orm_util.state_str(self.state)
         )
 
+
 class DeleteState(PostSortRec):
     def __init__(self, uow, state, mapper):
         self.state = state
             self.__class__.__name__,
             orm_util.state_str(self.state)
         )
-

lib/sqlalchemy/orm/util.py

 
 _none_set = frozenset([None])
 
+
 class CascadeOptions(frozenset):
     """Keeps track of the options sent to relationship().cascade"""
 
             ",".join([x for x in sorted(self)])
         )
 
+
 def _validator_events(desc, key, validator, include_removes):
     """Runs a validation method on an attribute value to be set or appended."""
 
     if include_removes:
         event.listen(desc, "remove", remove, raw=True, retval=True)
 
+
 def polymorphic_union(table_map, typecolname,
                         aliasname='p_union', cast_nulls=True):
     """Create a ``UNION`` statement used by a polymorphic mapper.
                                      from_obj=[table]))
     return sql.union_all(*result).alias(aliasname)
 
+
 def identity_key(*args, **kwargs):
     """Get an identity key.
 
     mapper = object_mapper(instance)
     return mapper.identity_key_from_instance(instance)
 
+
 class ORMAdapter(sql_util.ColumnAdapter):
     """Extends ColumnAdapter to accept ORM entities.
 
         else:
             return None
 
+
 class PathRegistry(object):
     """Represent query load paths and registry functions.
 
     def __repr__(self):
         return "%s(%r)" % (self.__class__.__name__, self.path, )
 
+
 class RootRegistry(PathRegistry):
     """Root registry, defers to mappers so that
     paths are maintained per-root-mapper.
         return mapper._sa_path_registry
 PathRegistry.root = RootRegistry()
 
+
 class KeyRegistry(PathRegistry):
     def __init__(self, parent, key):
         self.key = key
                 self, entity
             )
 
+
 class EntityRegistry(PathRegistry, dict):
     is_aliased_class = False
 
         return '<AliasedClass at 0x%x; %s>' % (
             id(self), self.__target.__name__)
 
+
 AliasedInsp = util.namedtuple("AliasedInsp", [
         "entity",
         "mapper",
         "polymorphic_on"
     ])
 
+
 class AliasedInsp(_InspectionAttr, AliasedInsp):
     """Provide an inspection interface for an
     :class:`.AliasedClass` object.
         :class:`.AliasedInsp`."""
         return self.mapper.class_
 
+
 inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
 
+
 def aliased(element, alias=None, name=None, adapt_on_names=False):
     """Produce an alias of the given element, usually an :class:`.AliasedClass`
     instance.
         return AliasedClass(element, alias=alias,
                     name=name, adapt_on_names=adapt_on_names)
 
+
 def with_polymorphic(base, classes, selectable=False,
                         polymorphic_on=None, aliased=False,
                         innerjoin=False):
     """
     return sql_util._deep_annotate(element, {'_orm_adapt': True}, exclude)
 
+
 def _orm_deannotate(element):
     """Remove annotations that link a column to a particular mapping.
 
                 values=("_orm_adapt", "parententity")
             )
 
+
 def _orm_full_deannotate(element):
     return sql_util._deep_deannotate(element)
 
+
 class _ORMJoin(expression.Join):
     """Extend Join to support ORM constructs as input."""
 
     def outerjoin(self, right, onclause=None, join_to_left=True):
         return _ORMJoin(self, right, onclause, True, join_to_left)
 
+
 def join(left, right, onclause=None, isouter=False, join_to_left=True):
     """Produce an inner join between left and right clauses.
 
     """
     return _ORMJoin(left, right, onclause, isouter, join_to_left)
 
+
 def outerjoin(left, right, onclause=None, join_to_left=True):
     """Produce a left outer join between left and right clauses.
 
     """
     return _ORMJoin(left, right, onclause, True, join_to_left)
 
+
 def with_parent(instance, prop):
     """Create filtering criterion that relates this query's primary entity
     to the given related instance, using established :func:`.relationship()`
     else:
         return expression._column_as_key(attr)
 
+
 _state_mapper = util.dottedgetter('manager.mapper')
 
+
 @inspection._inspects(object)
 def _inspect_mapped_object(instance):
     try:
     except exc.NO_STATE:
         return None
 
+
 @inspection._inspects(type)
 def _inspect_mapped_class(class_, configure=False):
     try:
     """
     return object_state(instance).mapper
 
+
 def object_state(instance):
     """Given an object, return the :class:`.InstanceState`
     associated with the object.
     else:
         return state
 
+
 def class_mapper(class_, configure=True):
     """Given a class, return the primary :class:`.Mapper` associated
     with the key.
     else:
         return mapper
 
+
 def _class_to_mapper(class_or_mapper):
     insp = inspection.inspect(class_or_mapper, False)
     if insp is not None:
     else:
         raise exc.UnmappedClassError(class_or_mapper)
 
+
 def _mapper_or_none(entity):
     """Return the :class:`.Mapper` for the given class or None if the
     class is not mapped."""
     else:
         return None
 
+
 def _is_mapped_class(entity):
     """Return True if the given object is a mapped class,
     :class:`.Mapper`, or :class:`.AliasedClass`."""
                     (description, key)
                 )
 
+
 def _orm_columns(entity):
     insp = inspection.inspect(entity, False)
     if hasattr(insp, 'selectable'):
     else:
         return [entity]
 
+
 def has_identity(object):
     state = attributes.instance_state(object)
     return state.has_identity
 
+
 def instance_str(instance):
     """Return a string describing an instance."""
 
     return state_str(attributes.instance_state(instance))
 
+
 def state_str(state):
     """Return a string describing an instance via its InstanceState."""
 
     else:
         return '<%s at 0x%x>' % (state.class_.__name__, id(state.obj()))
 
+
 def state_class_str(state):
     """Return a string describing an instance's class via its InstanceState."""
 
     else:
         return '<%s>' % (state.class_.__name__, )
 
+
 def attribute_str(instance, attribute):
     return instance_str(instance) + "." + attribute
 
+
 def state_attribute_str(state, attribute):
     return state_str(state) + "." + attribute
-
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.