Commits

Mike Bayer committed fc18aef

- The exists() construct won't "export" its contained list
of elements as FROM clauses, allowing them to be used more
effectively in the columns clause of a SELECT.

- and_() and or_() now generate a ColumnElement, allowing
boolean expressions as result columns, i.e.
select([and_(1, 0)]). [ticket:798]

Comments (0)

Files changed (5)

       [ticket:1068].  This feature is on hold pending further
       development.
 
+    - The exists() construct won't "export" its contained list 
+      of elements as FROM clauses, allowing them to be used more
+      effectively in the columns clause of a SELECT.
+      
+    - and_() and or_() now generate a ColumnElement, allowing
+      boolean expressions as result columns, i.e.
+      select([and_(1, 0)]).  [ticket:798]
+    
     - Added func.min(), func.max(), func.sum() as "generic functions",
       which basically allows for their return type to be determined
       automatically.  Helps with dates on SQLite, decimal types, 

lib/sqlalchemy/sql/expression.py

     """
     if len(clauses) == 1:
         return clauses[0]
-    return ClauseList(operator=operators.and_, *clauses)
+    return BooleanClauseList(operator=operators.and_, *clauses)
 
 def or_(*clauses):
     """Join a list of clauses together using the ``OR`` operator.
 
     if len(clauses) == 1:
         return clauses[0]
-    return ClauseList(operator=operators.or_, *clauses)
+    return BooleanClauseList(operator=operators.or_, *clauses)
 
 def not_(clause):
     """Return a negation of the given clause, i.e. ``NOT(clause)``.
     """Describe a list of clauses, separated by an operator.
 
     By default, is comma-separated, such as a column listing.
+
     """
     __visit_name__ = 'clauselist'
 
         else:
             return False
 
+class BooleanClauseList(ClauseList, ColumnElement):
+    __visit_name__ = 'clauselist'
+    
+    def __init__(self, *clauses, **kwargs):
+        super(BooleanClauseList, self).__init__(*clauses, **kwargs)
+        self.type = sqltypes.to_instance(kwargs.get('type_', sqltypes.Boolean))
+
+    def self_group(self, against=None):
+        return _Grouping(self)
+    
 class _CalculatedClause(ColumnElement):
     """Describe a calculated SQL expression that has a type, like ``CASE``.
 
         e.element = self.element.correlate(fromclause).self_group()
         return e
 
+    def _get_from_objects(self, **modifiers):
+        return []
+
     def where(self, clause):
         """return a new exists() construct with the given expression added to its WHERE clause, joined
         to the existing clause via AND, if any."""

test/orm/query.py

         x = func.lala(users.c.id).label('foo')
         self.assert_compile(sess.query(x).filter(x==5).statement, 
             "SELECT lala(users.id) AS foo FROM users WHERE lala(users.id) = :param_1", dialect=default.DefaultDialect())
-        
+
 class CompileTest(QueryTest):
         
     def test_deferred(self):

test/sql/query.py

         self.assert_(not (rp != equal))
         self.assert_(not (equal != equal))
 
+    def test_or_and_as_columns(self):
+        if testing.against('sqlite'):
+            true, false = 1, 0
+        else:
+            true, false = literal_column('true'), literal_column('false')
+        
+        self.assertEquals(testing.db.execute(select([and_(true, false)])).scalar(), False)
+        self.assertEquals(testing.db.execute(select([and_(true, true)])).scalar(), True)
+        self.assertEquals(testing.db.execute(select([or_(true, false)])).scalar(), True)
+        self.assertEquals(testing.db.execute(select([or_(false, false)])).scalar(), False)
+        self.assertEquals(testing.db.execute(select([not_(or_(false, false))])).scalar(), True)
+
+        row = testing.db.execute(select([or_(false, false).label("x"), and_(true, false).label("y")])).fetchone()
+        assert row.x == False
+        assert row.y == False
+
+        row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).fetchone()
+        assert row.x == True
+        assert row.y == False
+        
     def test_fetchmany(self):
         users.insert().execute(user_id = 7, user_name = 'jack')
         users.insert().execute(user_id = 8, user_name = 'ed')

test/sql/select.py

         self.assert_compile(select([table1, exists([1], from_obj=table2).label('foo')]), "SELECT mytable.myid, mytable.name, mytable.description, EXISTS (SELECT 1 FROM myothertable) AS foo FROM mytable", params={})
 
         self.assert_compile(
-          table1.select(exists([1], table2.c.otherid == table1.c.myid).correlate(table1)),
-          "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)"
+          table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)),
+          "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = mytable.myid)"
         )
 
         self.assert_compile(
-          table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)),
-          "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable WHERE myothertable.otherid = mytable.myid)"
+          table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)),
+          "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = mytable.myid)"
         )
 
         self.assert_compile(
-          table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)).replace_selectable(table2, table2.alias()),
-          "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT 1 FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)"
+          table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)).replace_selectable(table2, table2.alias()),
+          "SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE EXISTS (SELECT * FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)"
         )
 
         self.assert_compile(
-          table1.select(exists([1]).where(table2.c.otherid == table1.c.myid).correlate(table1)).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).replace_selectable(table2, table2.alias()),
-          "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable AS myothertable_1 ON mytable.myid = myothertable_1.otherid WHERE EXISTS (SELECT 1 FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)"
+          table1.select(exists().where(table2.c.otherid == table1.c.myid).correlate(table1)).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).replace_selectable(table2, table2.alias()),
+          "SELECT mytable.myid, mytable.name, mytable.description FROM mytable JOIN myothertable AS myothertable_1 ON mytable.myid = myothertable_1.otherid WHERE EXISTS (SELECT * FROM myothertable AS myothertable_1 WHERE myothertable_1.otherid = mytable.myid)"
         )
+        
+        self.assert_compile(
+            select([
+                or_(
+                    exists().where(table2.c.otherid=='foo'),
+                    exists().where(table2.c.otherid=='bar')
+                )
+            ]),
+            "SELECT ((EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = :otherid_1)) "\
+            "OR (EXISTS (SELECT * FROM myothertable WHERE myothertable.otherid = :otherid_2))) AS anon_1"
+        )
+        
 
     def test_where_subquery(self):
         s = select([addresses.c.street], addresses.c.user_id==users.c.user_id, correlate=True).alias('s')
     
 
     def test_conjunctions(self):
+        a, b, c = 'a', 'b', 'c'
+        x = and_(a, b, c)
+        assert isinstance(x.type,  Boolean)
+        assert str(x) == 'a AND b AND c'
+        self.assert_compile(
+            select([x.label('foo')]),
+            'SELECT (a AND b AND c) AS foo'
+        )
+        
         self.assert_compile(
             and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()"),
             "mytable.myid = :myid_1 AND mytable.name = :name_1 "\