Commits

Anonymous committed 0ad6282

- alex gaynor's latest batch of pypy test fixes

  • Participants
  • Parent commits e860c98

Comments (0)

Files changed (5)

 =======
 0.7.0b3
 =======
+- general
+  - Lots of fixes to unit tests when run under Pypy.
+
 - orm
   - Changed the underlying approach to query.count().
     query.count() is now in all cases exactly:

lib/sqlalchemy/util/langhelpers.py

 # This module is part of SQLAlchemy and is released under
 # the MIT License: http://www.opensource.org/licenses/mit-license.php
 
-"""Routines to help with the creation, loading and introspection of 
+"""Routines to help with the creation, loading and introspection of
 modules, classes, hierarchies, attributes, functions, and methods.
 
 """
     pass along unrecognized keywords to it's base classes, and the collection
     process is repeated recursively on each of the bases.
 
-    Uses a subset of inspect.getargspec() to cut down on method overhead. 
+    Uses a subset of inspect.getargspec() to cut down on method overhead.
     No anonymous tuple arguments please !
 
     """
     while stack:
         class_ = stack.pop()
         ctr = class_.__dict__.get('__init__', False)
-        if not ctr or not isinstance(ctr, types.FunctionType):
+        if (not ctr or
+            not isinstance(ctr, types.FunctionType) or
+            not isinstance(ctr.func_code, types.CodeType)):
             stack.update(class_.__bases__)
             continue
 
     return list(hier)
 
 def iterate_attributes(cls):
-    """iterate all the keys and attributes associated 
+    """iterate all the keys and attributes associated
        with a class, without using getattr().
 
-       Does not use getattr() so that class-sensitive 
+       Does not use getattr() so that class-sensitive
        descriptors (i.e. property.__get__()) are not called.
 
     """
     @memoized_property
     def module(self):
         if self._il_addtl:
-            m = __import__(self._il_path, globals(), locals(), 
+            m = __import__(self._il_path, globals(), locals(),
                                 [self._il_addtl])
             try:
                 return getattr(m, self._il_addtl)
             except AttributeError:
                 raise ImportError(
-                        "Module %s has no attribute '%s'" % 
+                        "Module %s has no attribute '%s'" %
                         (self._il_path, self._il_addtl)
                     )
         else:
             attr = getattr(self.module, key)
         except AttributeError:
             raise AttributeError(
-                        "Module %s has no attribute '%s'" % 
+                        "Module %s has no attribute '%s'" %
                         (self._il_path, key)
                     )
         self.__dict__[key] = attr
     return bool(obj)
 
 def bool_or_str(*text):
-    """Return a callable that will evaulate a string as 
+    """Return a callable that will evaulate a string as
     boolean, or one of a set of "alternate" string values.
 
     """
     else:
         if isinstance(argtype, tuple):
             raise exc.ArgumentError(
-                            "Argument '%s' is expected to be one of type %s, got '%s'" % 
+                            "Argument '%s' is expected to be one of type %s, got '%s'" %
                             (name, ' or '.join("'%s'" % a for a in argtype), type(arg)))
         else:
             raise exc.ArgumentError(
-                            "Argument '%s' is expected to be of type '%s', got '%s'" % 
+                            "Argument '%s' is expected to be of type '%s', got '%s'" %
                             (name, argtype, type(arg)))
 
 
     on classes rather than instances.
 
     The decorator is currently special when using the declarative
-    module, but note that the 
+    module, but note that the
     :class:`~.sqlalchemy.ext.declarative.declared_attr`
     decorator should be used for this purpose with declarative.
 
     is strictly so that Sphinx autoattr picks up the docstring we want
     (it doesn't appear to pick up the in-module docstring if the datamember
     is in a different module - autoattribute also blows up completely).
-    If Sphinx fixes/improves this then we would no longer need 
+    If Sphinx fixes/improves this then we would no longer need
     ``doc`` here.
-    
+
     """
     symbols = {}
     _lock = threading.Lock()
 def warn(msg, stacklevel=3):
     """Issue a warning.
 
-    If msg is a string, :class:`.exc.SAWarning` is used as 
+    If msg is a string, :class:`.exc.SAWarning` is used as
     the category.
 
     .. note:: This function is swapped out when the test suite
-       runs, with a compatible version that uses 
+       runs, with a compatible version that uses
        warnings.warn_explicit, so that the warnings registry can
        be controlled.
 

test/base/test_utils.py

 import copy, threading
 from sqlalchemy import util, sql, exc
 from test.lib import TestBase
-from test.lib.testing import eq_, is_, ne_
+from test.lib.testing import eq_, is_, ne_, fails_if
 from test.lib.util import gc_collect, picklers
 from sqlalchemy.util import classproperty
 
             'apply_kw': 'a=a, b=b', 'apply_pos': 'a, b' },
            grouped=False)
 
+    @fails_if(lambda: util.pypy, "object.__init__ is introspectable")
     def test_init_grouped(self):
         object_spec = {
             'args': '(self)', 'self_arg': 'self',
         self._test_init(None, object_spec, wrapper_spec, custom_spec)
         self._test_init(True, object_spec, wrapper_spec, custom_spec)
 
+    @fails_if(lambda: util.pypy,  "object.__init__ can be introspected")
     def test_init_bare(self):
         object_spec = {
             'args': 'self', 'self_arg': 'self',

test/orm/test_session.py

     def test_object_session_raises(self):
         assert_raises(
             orm_exc.UnmappedInstanceError,
-            object_session, 
+            object_session,
             object()
         )
 
         assert_raises(
             orm_exc.UnmappedInstanceError,
-            object_session, 
+            object_session,
             User()
         )
 
         sess.add(u1)
         assert u1 in sess.new
 
-        # test expired attributes 
+        # test expired attributes
         # get unexpired
         u1 = sess.query(User).first()
         sess.expire(u1)
 
     @testing.resolve_artifact_names
     def test_autoflush_expressions(self):
-        """test that an expression which is dependent on object state is 
+        """test that an expression which is dependent on object state is
         evaluated after the session autoflushes.   This is the lambda
         inside of strategies.py lazy_clause.
 
         assert sess.connection(mapper=Address, bind=e1).engine is e1
         assert sess.connection(mapper=Address).engine is e2
         assert sess.connection(clause=addresses.select()).engine is e2
-        assert sess.connection(mapper=User, 
+        assert sess.connection(mapper=User,
                                 clause=addresses.select()).engine is e1
-        assert sess.connection(mapper=User, 
-                                clause=addresses.select(), 
+        assert sess.connection(mapper=User,
+                                clause=addresses.select(),
                                 bind=e2).engine is e2
 
         sess.close()

test/orm/test_subquery_relations.py

     def test_basic(self):
         mapper(User, users, properties={
             'addresses':relationship(
-                            mapper(Address, addresses), 
+                            mapper(Address, addresses),
                             order_by=Address.id)
         })
         sess = create_session()
 
         self.assert_sql_count(testing.db, go, 2)
 
-        def go(): 
+        def go():
             eq_(
-                self.static.user_address_result, 
+                self.static.user_address_result,
                 q.order_by(User.id).all()
             )
         self.assert_sql_count(testing.db, go, 2)
 
         self.assert_sql_count(testing.db, go, 2)
 
-        def go(): 
+        def go():
             eq_(
-                self.static.user_address_result, 
+                self.static.user_address_result,
                 q.order_by(u.id).all()
             )
         self.assert_sql_count(testing.db, go, 2)
     def test_from_get(self):
         mapper(User, users, properties={
             'addresses':relationship(
-                            mapper(Address, addresses), 
+                            mapper(Address, addresses),
                             order_by=Address.id)
         })
         sess = create_session()
     def test_from_params(self):
         mapper(User, users, properties={
             'addresses':relationship(
-                            mapper(Address, addresses), 
+                            mapper(Address, addresses),
                             order_by=Address.id)
         })
         sess = create_session()
         )
 
     @testing.resolve_artifact_names
-    def test_many_to_many(self):
+    def test_many_to_many_plain(self):
         mapper(Keyword, keywords)
         mapper(Item, items, properties = dict(
                 keywords = relationship(Keyword, secondary=item_keywords,
             eq_(self.static.item_keyword_result, q.all())
         self.assert_sql_count(testing.db, go, 2)
 
+    @testing.resolve_artifact_names
+    def test_many_to_many_with_join(self):
+        mapper(Keyword, keywords)
+        mapper(Item, items, properties = dict(
+                keywords = relationship(Keyword, secondary=item_keywords,
+                                    lazy='subquery', order_by=keywords.c.id)))
+
+        q = create_session().query(Item).order_by(Item.id)
         def go():
             eq_(self.static.item_keyword_result[0:2],
                 q.join('keywords').filter(Keyword.name == 'red').all())
         self.assert_sql_count(testing.db, go, 2)
 
+    @testing.resolve_artifact_names
+    def test_many_to_many_with_join_alias(self):
+        mapper(Keyword, keywords)
+        mapper(Item, items, properties = dict(
+                keywords = relationship(Keyword, secondary=item_keywords,
+                                    lazy='subquery', order_by=keywords.c.id)))
+
+        q = create_session().query(Item).order_by(Item.id)
         def go():
             eq_(self.static.item_keyword_result[0:2],
                 (q.join('keywords', aliased=True).
     @testing.resolve_artifact_names
     def test_orderby(self):
         mapper(User, users, properties = {
-            'addresses':relationship(mapper(Address, addresses), 
+            'addresses':relationship(mapper(Address, addresses),
                         lazy='subquery', order_by=addresses.c.email_address),
         })
         q = create_session().query(User)
     @testing.resolve_artifact_names
     def test_orderby_multi(self):
         mapper(User, users, properties = {
-            'addresses':relationship(mapper(Address, addresses), 
-                            lazy='subquery', 
+            'addresses':relationship(mapper(Address, addresses),
+                            lazy='subquery',
                             order_by=[
                                     addresses.c.email_address,
                                     addresses.c.id]),
 
     @testing.resolve_artifact_names
     def test_orderby_related(self):
-        """A regular mapper select on a single table can 
+        """A regular mapper select on a single table can
             order by a relationship to a second table"""
 
         mapper(Address, addresses)
         mapper(User, users, properties = dict(
-            addresses = relationship(Address, 
+            addresses = relationship(Address,
                                         lazy='subquery',
                                         order_by=addresses.c.id),
         ))
             'orders':relationship(Order, order_by=orders.c.id), # o2m, m2o
         })
         mapper(Order, orders, properties={
-            'items':relationship(Item, 
+            'items':relationship(Item,
                         secondary=order_items, order_by=items.c.id),  #m2m
         })
         mapper(Item, items, properties={
-            'keywords':relationship(Keyword, 
+            'keywords':relationship(Keyword,
                                         secondary=item_keywords,
                                         order_by=keywords.c.id) #m2m
         })
         mapper(Keyword, keywords)
 
         callables = {
-                        'joinedload':joinedload, 
+                        'joinedload':joinedload,
                     'subqueryload':subqueryload
                 }
 
 
         for o, i, k, count in configs:
             mapper(User, users, properties={
-                'orders':relationship(Order, lazy=opts[o], order_by=orders.c.id), 
+                'orders':relationship(Order, lazy=opts[o], order_by=orders.c.id),
             })
             mapper(Order, orders, properties={
-                'items':relationship(Item, 
-                            secondary=order_items, lazy=opts[i], order_by=items.c.id), 
+                'items':relationship(Item,
+                            secondary=order_items, lazy=opts[i], order_by=items.c.id),
             })
             mapper(Item, items, properties={
-                'keywords':relationship(Keyword, 
+                'keywords':relationship(Keyword,
                                             lazy=opts[k],
                                             secondary=item_keywords,
                                             order_by=keywords.c.id)
 
     @testing.resolve_artifact_names
     def test_double(self):
-        """Eager loading with two relationships simultaneously, 
+        """Eager loading with two relationships simultaneously,
             from the same table, using aliases."""
 
         openorders = sa.alias(orders, 'openorders')
 
     @testing.resolve_artifact_names
     def test_double_same_mappers(self):
-        """Eager loading with two relationships simulatneously, 
+        """Eager loading with two relationships simulatneously,
         from the same table, using aliases."""
 
         mapper(Address, addresses)
                 order_by=items.c.id)
         })
         mapper(User, users, properties={
-            'addresses':relationship(mapper(Address, addresses), 
-                            lazy='subquery', 
+            'addresses':relationship(mapper(Address, addresses),
+                            lazy='subquery',
                             order_by=addresses.c.id),
             'orders':relationship(Order, lazy='select', order_by=orders.c.id)
         })
     @testing.resolve_artifact_names
     def test_one_to_many_scalar(self):
         mapper(User, users, properties = dict(
-            address = relationship(mapper(Address, addresses), 
+            address = relationship(mapper(Address, addresses),
                                     lazy='subquery', uselist=False)
         ))
         q = create_session().query(User)
                'orders':relationship(Order, backref='user', lazy='subquery',
                                             order_by=orders.c.id),
                'max_order':relationship(
-                                mapper(Order, max_orders, non_primary=True), 
+                                mapper(Order, max_orders, non_primary=True),
                                 lazy='subquery', uselist=False)
                })
 
             ], q.order_by(User.id).all())
         self.assert_sql_count(testing.db, go, 3)
 
-    @testing.resolve_artifact_names 
+    @testing.resolve_artifact_names
     def test_uselist_false_warning(self):
-        """test that multiple rows received by a 
+        """test that multiple rows received by a
         uselist=False raises a warning."""
 
         mapper(User, users, properties={
                 self.children.append(node)
 
         mapper(Node, nodes, properties={
-            'children':relationship(Node, 
-                                        lazy='subquery', 
+            'children':relationship(Node,
+                                        lazy='subquery',
                                         join_depth=3, order_by=nodes.c.id)
         })
         sess = create_session()
         sess.expunge_all()
 
         def go():
-            eq_( 
+            eq_(
                 Node(data='n1', children=[Node(data='n11'), Node(data='n12')]),
                 sess.query(Node).order_by(Node.id).first(),
                 )