Kirill Simonov avatar Kirill Simonov committed ccdaa72

Refactoring translator pipeline.

Comments (0)

Files changed (32)

src/htsql/core/tr/__init__.py

 
 
 from . import (assemble, binding, bind, coerce, compile, dump, encode, space,
-        fn, frame, lookup, plan, reduce, rewrite, signature, stitch, term,
+        fn, frame, lookup, pack, reduce, rewrite, signature, stitch, term,
         translate)
 
 

src/htsql/core/tr/assemble.py

 from ..adapter import Adapter, adapt, adapt_many
 from ..domain import BooleanDomain, UntypedDomain, Record, ID
 from .coerce import coerce
-from .binding import WeakSegmentBinding
-from .space import (Code, SegmentCode, LiteralCode, FormulaCode, CastCode,
-        RecordCode, IdentityCode, AnnihilatorCode, CorrelationCode, Unit,
-        ColumnUnit, CompoundUnit)
-from .term import (PreTerm, Term, UnaryTerm, BinaryTerm, TableTerm, ScalarTerm,
+from .space import (Code, SegmentExpr, LiteralCode, FormulaCode, CastCode,
+        CorrelationCode, Unit, ColumnUnit, CompoundUnit)
+from .term import (Term, UnaryTerm, BinaryTerm, TableTerm, ScalarTerm,
         FilterTerm, JoinTerm, CorrelationTerm, EmbeddingTerm, ProjectionTerm,
-        OrderTerm, SegmentTerm, QueryTerm)
+        OrderTerm, SegmentTerm)
 from .frame import (ScalarFrame, TableFrame, NestedFrame, SegmentFrame,
-        QueryFrame, LiteralPhrase, TruePhrase, CastPhrase, ColumnPhrase,
-        ReferencePhrase, EmbeddingPhrase, FormulaPhrase, Anchor, LeadingAnchor)
+        LiteralPhrase, TruePhrase, CastPhrase, ColumnPhrase, ReferencePhrase,
+        EmbeddingPhrase, FormulaPhrase, Anchor, LeadingAnchor, Phrase)
 from .pipe import (ValuePipe, ExtractPipe, RecordPipe, MixPipe, ComposePipe,
         AnnihilatePipe, IteratePipe, SinglePipe)
 from .signature import (Signature, IsEqualSig, IsTotallyEqualSig, IsInSig,
         A key of the mapping is the broker tag.  A value of the mapping
         is a list of :class:`Claim` objects with the same broker.
 
-    `phrases_by_claim` (a mapping `Claim -> [Phrase]`)
+    `phrase_by_claim` (a mapping `Claim -> Phrase`)
         Satisfied claims.
 
         A key of the mapping is a :class:`Claim` object.  A value of the
         # Unit claims grouped by the broker.
         self.claims_by_broker = None
         # Satisfied unit claims.
-        self.phrases_by_claim = None
-        # Stencils and other composition state.
-        self.subterms_by_segment = {}
-        self.stencils_by_term = {}
-        self.location_by_term = {}
-        self.shift_by_term = {}
-        self.name_stack = []
-        self.name = None
-        self.segment_stack = []
-        self.segment = None
+        self.phrase_by_claim = None
         self.correlations_stack = []
         self.correlations = {}
 
     def pop_correlations(self):
         self.correlations = self.correlations_stack.pop()
 
-    def push_name(self, name):
-        assert isinstance(name, maybe(unicode))
-        self.name_stack.append(self.name)
-        self.name = name
-
-    def pop_name(self):
-        self.name = self.name_stack.pop()
-
-    def push_segment(self, code):
-        assert isinstance(code, SegmentCode)
-        term = self.subterms_by_segment[self.segment][code]
-        self.segment_stack.append(self.segment)
-        self.segment = term
-        self.shift_by_term[term] = 0
-
-    def pop_segment(self):
-        self.segment = self.segment_stack.pop()
-
-    def reset_shift(self, term):
-        self.shift_by_term[term] = 0
-
-    def save_subterms(self, term, subterms):
-        assert isinstance(term, maybe(SegmentTerm))
-        assert isinstance(subterms, listof(SegmentTerm))
-        term_by_code = dict((subterm.code, subterm)
-                            for subterm in subterms)
-        self.subterms_by_segment[term] = term_by_code
-
-    def save_location(self, term, location):
-        assert isinstance(term, SegmentTerm)
-        assert isinstance(location, maybe(int))
-        self.location_by_term[term] = location
-
-    def get_location(self):
-        return self.location_by_term[self.segment]
-
-    def save_stencils(self, term, stencils):
-        assert isinstance(term, SegmentTerm)
-        assert isinstance(stencils, listof(listof(int))) and len(stencils) == 3
-        self.stencils_by_term[term] = stencils
-
-    def get_code_stencil(self, term=None):
-        if term is None:
-            term = self.segment
-        return self.stencils_by_term[term][0]
-
-    def get_superkey_stencil(self, term=None):
-        if term is None:
-            term = self.segment
-        return self.stencils_by_term[term][1]
-
-    def get_key_stencil(self, term=None):
-        if term is None:
-            term = self.segment
-        return self.stencils_by_term[term][2]
-
-    def get_next_index(self):
-        stencil = self.get_code_stencil()
-        index = stencil[self.shift_by_term[self.segment]]
-        self.shift_by_term[self.segment] += 1
-        return index
-
-    def compose(self, code):
-        return Compose.__invoke__(code, self)
-
     def set_tree(self, term):
         """
         Initializes the assembling state.
         # Initialize claim containers.
         self.claim_set = set()
         self.claims_by_broker = {}
-        self.phrases_by_claim = {}
+        self.phrase_by_claim = {}
         # Initialize `claims_by_broker` with an empty list for each node
         # in the term tree.
         self.claims_by_broker[term.tag] = []
         for offspring in term.offsprings:
             self.claims_by_broker[offspring] = []
 
-    def flush(self):
-        """
-        Clears the assembling state.
-        """
-        # Revert all attributes to their pristine state.  We assume that
-        # `gate_stack` is already empty.
-        self.gate = None
-        self.claim_set = None
-        self.claims_by_broker = None
-        self.phrases_by_claim = None
-        self.segment_indexes = None
-        self.correlations_stack = []
-        self.correlations = {}
-
     def push_gate(self, is_nullable=None, dispatcher=None, router=None):
         """
         Updates the current dispatching context.
             A term node.
         """
         # Realize and call the `Assemble` adapter.
-        return assemble(term, self)
+        return Assemble.__invoke__(term, self)
 
     def appoint(self, unit):
         """
         # Restore the original gate.
         self.pop_gate()
 
-    def evaluate_all(self, code, dispatcher=None, router=None):
+    def evaluate(self, code, dispatcher=None, router=None):
         """
         Evaluates the given code node.
 
         # Update the gate to use the given dispatcher and router.
         self.push_gate(dispatcher=dispatcher, router=router)
         # Realize and call the `Evaluate` adapter.
-        phrases = list(Evaluate.__invoke__(code, self))
+        phrase = Evaluate.__invoke__(code, self)
         # Restore the original gate.
         self.pop_gate()
         # Return the generated phrase.
-        return phrases
-
-    def evaluate(self, code, dispatcher=None, router=None):
-        phrases = self.evaluate_all(code, dispatcher, router)
-        assert len(phrases) == 1
-        [phrase] = phrases
         return phrase
 
     def demand(self, claim):
             # Assign it to the broker.
             self.claims_by_broker[claim.broker].append(claim)
 
-    def supply(self, claim, *phrases):
+    def supply(self, claim, phrase):
         """
         Satisfies the claim.
 
         # A few sanity checks.  Verify that the claim was actually requested.
         assert claim in self.claim_set
         # Verify that the claim was not satisfied already.
-        assert claim not in self.phrases_by_claim
+        assert claim not in self.phrase_by_claim
         # Save the phrase.
-        self.phrases_by_claim[claim] = list(phrases)
+        self.phrase_by_claim[claim] = phrase
 
     def to_predicate(self, phrase):
         return FormulaPhrase(ToPredicateSig(), phrase.domain,
         The current state of the assembling process.
     """
 
-    adapt(PreTerm)
+    adapt(Term)
 
     def __init__(self, term, state):
-        assert isinstance(term, PreTerm)
+        assert isinstance(term, Term)
         assert isinstance(state, AssemblingState)
         self.term = term
         self.state = state
+        self.claims = state.claims_by_broker[term.tag]
 
     def __call__(self):
         # Implement in subclasses.
                                   " for a %r node" % self.term)
 
 
-class AssembleTerm(Assemble):
-    """
-    Assembles a frame for a proper term node.
-
-    This adapt :class:`Assemble` for proper term nodes (i.e., not a
-    :class:`htsql.core.tr.term.QueryTerm`).
-
-    Attributes:
-
-    `claims` (a list of :class:`Claim`)
-        The claims that are dispatched to the term.
-    """
-
-    adapt(Term)
-
-    def __init__(self, term, state):
-        super(AssembleTerm, self).__init__(term, state)
-        # Extract claims dispatched to the given term.
-        self.claims = state.claims_by_broker[term.tag]
-
-
 class AssembleScalar(Assemble):
     """
     Assembles a (scalar) frame for a scalar term.
                 # Generate a forward claim (again).
                 next_claim = self.state.forward(claim)
                 # We expect the forward claim to be already satisfied.
-                assert next_claim in self.state.phrases_by_claim
+                assert next_claim in self.state.phrase_by_claim
                 # Get the export phrase that satisfied the claim.
-                phrases = self.state.phrases_by_claim[next_claim]
+                phrase = self.state.phrase_by_claim[next_claim]
             # Otherwise, the claim is targeted to us and we are responsible
             # for evaluating the unit of the claim.
             else:
                 # Since it is a branch term, the unit must be compound.
                 # So we evaluate the code expression of the unit.
-                phrases = self.state.evaluate_all(claim.unit.code)
+                phrase = self.state.evaluate(claim.unit.code)
 
-            export = []
-            for phrase in phrases:
-                # Check if the generated phrase is a duplicate.  Even though
-                # all claims are unique, different claims may still produce
-                # identical phrases.  Therefore an extra check here is necessary.
-                if phrase not in index_by_phrase:
-                    # So it is not a duplicate: add it to the `SELECT` list.
-                    index = len(select)
-                    select.append(phrase)
-                    index_by_phrase[phrase] = index
+            # Check if the generated phrase is a duplicate.  Even though
+            # all claims are unique, different claims may still produce
+            # identical phrases.  Therefore an extra check here is necessary.
+            if phrase not in index_by_phrase:
+                # So it is not a duplicate: add it to the `SELECT` list.
+                index = len(select)
+                select.append(phrase)
+                index_by_phrase[phrase] = index
 
-                # Generate an export reference to the phrase.
-                index = index_by_phrase[phrase]
-                domain = phrase.domain
-                # The reference is nullable if the referenced phrase itself is
-                # nullable or the assembled frame is to be attached using an
-                # `OUTER JOIN`.
-                is_nullable = (phrase.is_nullable or
-                               self.state.gate.is_nullable)
-                reference = ReferencePhrase(self.term.tag, index, domain,
-                                            is_nullable, claim.unit)
-                export.append(reference)
+            # Generate an export reference to the phrase.
+            index = index_by_phrase[phrase]
+            domain = phrase.domain
+            # The reference is nullable if the referenced phrase itself is
+            # nullable or the assembled frame is to be attached using an
+            # `OUTER JOIN`.
+            is_nullable = (phrase.is_nullable or
+                           self.state.gate.is_nullable)
+            reference = ReferencePhrase(self.term.tag, index, domain,
+                                        is_nullable, claim.unit)
             # Satisfy the claim with the generated reference.
-            self.state.supply(claim, *export)
+            self.state.supply(claim, reference)
 
         # It might happen (though with the way the compiler generates the term
         # tree, it is probably impossible) that the frame has no claims
         # This is a top-level frame, so it can't have claims.
         assert not self.claims
         # Assign all the units in the `SELECT` clause.
-        self.state.schedule(self.term.code.code)
+        for code in self.term.codes:
+            self.state.schedule(code)
         for code in self.term.superkeys:
             self.state.schedule(code)
-        if self.term.subtrees:
+        if self.term.dependents:
             for code in self.term.keys:
                 self.state.schedule(code)
 
     def assemble_select(self):
         # Assemble a `SELECT` clause.
         select = []
-        stencils = []
-        phrase_groups = []
-        phrase_groups.append(self.state.evaluate_all(self.term.code.code))
-        phrase_groups.append([self.state.evaluate(code)
-                              for code in self.term.superkeys])
-        if self.term.subtrees:
-            phrase_groups.append([self.state.evaluate(code)
-                                  for code in self.term.keys])
-        else:
-            phrase_groups.append([])
+        index_by_code = {}
+        codes = self.term.codes + self.term.superkeys
+        if self.term.dependents:
+            codes += self.term.keys
         index_by_phrase = {}
-        for phrases in phrase_groups:
-            stencil = []
-            for phrase in phrases:
-                if phrase not in index_by_phrase:
-                    index = len(select)
-                    select.append(phrase)
-                    index_by_phrase[phrase] = index
-                stencil.append(index_by_phrase[phrase])
-            stencils.append(stencil)
+        for code in codes:
+            if isinstance(code, LiteralCode) and \
+                    isinstance(code.domain, (UntypedDomain, BooleanDomain)):
+                continue
+            phrase = self.state.evaluate(code)
+            if phrase not in index_by_phrase:
+                index = len(select)
+                select.append(phrase)
+                index_by_phrase[phrase] = index
+            index_by_code[code] = index_by_phrase[phrase]
         if not select:
-            select = [TruePhrase(self.term.code)]
-        self.state.save_stencils(self.term, stencils)
-        return select
+            select = [TruePhrase(self.term.space)]
+        return select, index_by_code
 
-    def assemble_subtrees(self):
-        subterms = []
-        subtrees = []
-        locations = []
-        duplicates = set()
-        for code in self.term.code.code.segments:
-            if code in duplicates:
+    def assemble_dependents(self):
+        dependents = []
+        index_by_term = {}
+        for subterm in self.term.dependents:
+            if subterm in index_by_term:
                 continue
-            duplicates.add(code)
-            subterm = self.term.subtrees[code]
-            location = len(subtrees)
-            subterms.append(subterm)
             self.state.set_tree(subterm)
-            locations.append((subterm, location))
-            subframe = self.state.assemble(subterm)
-            subtrees.append(subframe)
-        for subterm, location in locations:
-            location -= len(locations)
-            self.state.save_location(subterm, location)
-        self.state.save_subterms(self.term, subterms)
-        return subtrees
+            dependent = self.state.assemble(subterm)
+            index_by_term[subterm] = len(dependents)
+            dependents.append(dependent)
+        for subterm in index_by_term.keys():
+            index_by_term[subterm] -= len(dependents)
+        return dependents, index_by_term
 
     def assemble_frame(self, include, embed, select,
                        where, group, having,
                        order, limit, offset):
-        subtrees = self.assemble_subtrees()
-        mix_pipe = None
-        if subtrees:
-            key_pipes = []
-            stencils = []
-            stencils.append(self.state.get_key_stencil(self.term))
-            for subframe in subtrees:
-                stencils.append(self.state.get_superkey_stencil(subframe.term))
-            for stencil in stencils:
-                if len(stencil) == 0:
-                    key_pipe = ValuePipe(True)
-                elif len(stencil) == 1:
-                    key_pipe =  ExtractPipe(stencil[0])
+        select, index_by_code = select
+        dependents, index_by_term = self.assemble_dependents()
+        code_pipes = []
+        for code in self.term.codes:
+            if isinstance(code, LiteralCode) and \
+                    isinstance(code.domain, (UntypedDomain, BooleanDomain)):
+                pipe = ValuePipe(code.value)
+            else:
+                index = index_by_code[code]
+                pipe = ExtractPipe(index)
+            code_pipes.append(pipe)
+        dependent_pipes = []
+        for subterm in self.term.dependents:
+            index = index_by_term[subterm]
+            pipe = ExtractPipe(index)
+            dependent_pipes.append(pipe)
+        superkey_pipes = []
+        for code in self.term.superkeys:
+            if isinstance(code, LiteralCode) and \
+                    isinstance(code.domain, (UntypedDomain, BooleanDomain)):
+                pipe = ValuePipe(code.value)
+            else:
+                index = index_by_code[code]
+                pipe = ExtractPipe(index)
+            superkey_pipes.append(pipe)
+        if len(superkey_pipes) == 0:
+            superkey_pipe = ValuePipe(True)
+        elif len(superkey_pipes) == 1:
+            [superkey_pipe] = superkey_pipes
+        else:
+            superkey_pipe = RecordPipe(superkey_pipes)
+        key_pipes = []
+        if self.term.dependents:
+            for code in self.term.keys:
+                if isinstance(code, LiteralCode) and \
+                        isinstance(code.domain, (UntypedDomain, BooleanDomain)):
+                    pipe = ValuePipe(code.value)
                 else:
-                    key_pipe = RecordPipe([ExtractPipe(index)
-                                           for index in stencil])
-                key_pipes.append(key_pipe)
-            mix_pipe = MixPipe(key_pipes)
+                    index = index_by_code[code]
+                    pipe = ExtractPipe(index)
+                key_pipes.append(pipe)
+        if len(key_pipes) == 0:
+            key_pipe = ValuePipe(True)
+        elif len(key_pipes) == 1:
+            [key_pipe] = key_pipes
+        else:
+            key_pipe = RecordPipe(key_pipes)
         return SegmentFrame(include, embed, select,
                            where, group, having,
                            order, limit, offset,
-                           mix_pipe, subtrees, self.term)
-
-
-class AssembleQuery(Assemble):
-    """
-    Assembles a top-level query frame.
-    """
-
-    adapt(QueryTerm)
-
-    def __call__(self):
-        # Compile the segment frame.
-        segment = None
-        value_pipe = None
-        if self.term.segment is not None:
-            # Initialize the state.
-            self.state.set_tree(self.term.segment)
-            self.state.save_location(self.term.segment, None)
-            self.state.save_subterms(None, [self.term.segment])
-            self.state.push_name(self.term.binding.profile.tag)
-            # Compile the segment.
-            segment = self.state.assemble(self.term.segment)
-            # Generate the compositor.
-            value_pipe = self.state.compose(self.term.segment.code)
-            # Clean up the state.
-            self.state.flush()
-        # Generate a frame node.
-        return QueryFrame(segment, value_pipe, self.term)
+                           code_pipes, dependent_pipes,
+                           superkey_pipe, key_pipe,
+                           dependents, self.term)
 
 
 class Evaluate(Adapter):
                                   " for a %r node" % self.code)
 
 
-class EvaluateSegment(Evaluate):
-
-    adapt(SegmentCode)
-
-    def __call__(self):
-        # Nested segments are serialized into a separate frame tree.
-        return []
-
-
 class EvaluateLiteral(Evaluate):
     """
     Evaluates a literal code.
     adapt(LiteralCode)
 
     def __call__(self):
-        # Keep all attributes, but switch the class.
-        if not isinstance(self.code.domain, UntypedDomain):
-            yield LiteralPhrase(self.code.value, self.code.domain, self.code)
+        return LiteralPhrase(self.code.value, self.code.domain, self.code)
 
 
 class EvaluateCast(Evaluate):
         # Note that the for some source and target domains, the reducer will
         # retranslate a generic cast phrase to a more specific expression.
         base = self.state.evaluate(self.code.base)
-        yield CastPhrase(base, self.code.domain, base.is_nullable, self.code)
-
-
-class EvaluateRecord(Evaluate):
-
-    adapt_many(RecordCode,
-               IdentityCode)
-
-    def __call__(self):
-        for field in self.code.fields:
-            for phrase in self.state.evaluate_all(field):
-                yield phrase
-
-
-class EvaluateAnnihilator(Evaluate):
-
-    adapt(AnnihilatorCode)
-
-    def __call__(self):
-        yield self.state.evaluate(self.code.indicator)
-        for phrase in self.state.evaluate_all(self.code.code):
-            yield phrase
+        return CastPhrase(base, self.code.domain, base.is_nullable, self.code)
 
 
 class EvaluateCorrelation(Evaluate):
         assert self.code.code in self.state.correlations
         # FIXME: is this correct in case of nested correlated subqueries?
         # FIXME: implement decompose?
-        yield self.state.correlations[self.code.code]
+        return self.state.correlations[self.code.code]
 
 
 class EvaluateFormula(Evaluate):
         # should be overridden for nodes where it is not the case.
         is_nullable = any(cell.is_nullable for cell in arguments.cells())
         # Generate a new formula node.
-        yield FormulaPhrase(self.signature,
-                            self.domain,
-                            is_nullable,
-                            self.code,
-                            **arguments)
+        return FormulaPhrase(self.signature,
+                             self.domain,
+                             is_nullable,
+                             self.code,
+                             **arguments)
 
 
 class EvaluateIsEqualBase(EvaluateBySignature):
     adapt_many(IsEqualSig, IsInSig, CompareSig)
 
     def __call__(self):
-        for phrase in super(EvaluateIsEqualBase, self).__call__():
-            yield self.state.from_predicate(phrase)
+        phrase = super(EvaluateIsEqualBase, self).__call__()
+        return self.state.from_predicate(phrase)
 
 
 class EvaluateIsTotallyEqualBase(EvaluateBySignature):
         arguments = self.arguments.map(self.state.evaluate)
         phrase = FormulaPhrase(self.signature, self.domain,
                                False, self.code, **arguments)
-        yield self.state.from_predicate(phrase)
+        return self.state.from_predicate(phrase)
 
 
 class EvaluateNullIf(EvaluateBySignature):
         # Override the default implementation since the `null_if()`
         # operator is not null-regular, and, in fact, is always nullable.
         arguments = self.arguments.map(self.state.evaluate)
-        yield FormulaPhrase(self.signature, self.domain,
-                            True, self.code, **arguments)
+        return FormulaPhrase(self.signature, self.domain,
+                             True, self.code, **arguments)
 
 
 class EvaluateIfNull(EvaluateBySignature):
         # its arguments are nullable.
         arguments = self.arguments.map(self.state.evaluate)
         is_nullable = all(cell.is_nullable for cell in arguments.cells())
-        yield FormulaPhrase(self.signature, self.domain,
-                            is_nullable, self.code, **arguments)
+        return FormulaPhrase(self.signature, self.domain,
+                             is_nullable, self.code, **arguments)
 
 
 class EvaluateAndOrNot(EvaluateBySignature):
                                is_nullable,
                                self.code,
                                **arguments)
-        yield self.state.from_predicate(phrase)
+        return self.state.from_predicate(phrase)
 
 
 class EvaluateUnit(Evaluate):
         # Generate a claim for a unit (for the second time).
         claim = self.state.appoint(self.code)
         # We expect the claim to be already satisfied.
-        assert claim in self.state.phrases_by_claim
+        assert claim in self.state.phrase_by_claim
         # Return the export phrase corresponding to the claim.
-        return self.state.phrases_by_claim[claim]
+        return self.state.phrase_by_claim[claim]
 
 
 class Compose(Adapter):
 
 class ComposeSegment(Compose):
 
-    adapt(SegmentCode)
+    adapt(SegmentExpr)
 
     def __call__(self):
         self.state.push_segment(self.code)
         return ValuePipe(self.code.value)
 
 
-class ComposeRecord(Compose):
+#class ComposeRecord(Compose):
+#
+#    adapt(RecordCode)
+#
+#    def __call__(self):
+#        field_pipes = []
+#        field_names = []
+#        for field, profile in zip(self.code.fields,
+#                                  self.code.domain.fields):
+#            field_names.append(profile.tag)
+#            self.state.push_name(profile.tag)
+#            field_pipe = self.state.compose(field)
+#            self.state.pop_name()
+#            field_pipes.append(field_pipe)
+#        record_class = Record.make(self.state.name, field_names)
+#        return RecordPipe(field_pipes, record_class)
 
-    adapt(RecordCode)
 
-    def __call__(self):
-        field_pipes = []
-        field_names = []
-        for field, profile in zip(self.code.fields,
-                                  self.code.domain.fields):
-            field_names.append(profile.tag)
-            self.state.push_name(profile.tag)
-            field_pipe = self.state.compose(field)
-            self.state.pop_name()
-            field_pipes.append(field_pipe)
-        record_class = Record.make(self.state.name, field_names)
-        return RecordPipe(field_pipes, record_class)
+#class ComposeIdentity(Compose):
+#
+#    adapt(IdentityCode)
+#
+#    def __call__(self):
+#        field_pipes = []
+#        for field in self.code.fields:
+#            field_pipe = self.state.compose(field)
+#            field_pipes.append(field_pipe)
+#        id_class = ID.make(self.code.domain.dump)
+#        return RecordPipe(field_pipes, id_class)
 
 
-class ComposeIdentity(Compose):
+#class ComposeAnnihilator(Compose):
+#
+#    adapt(AnnihilatorCode)
+#
+#    def __call__(self):
+#        test_pipe = self.state.compose(self.code.indicator)
+#        pipe = self.state.compose(self.code.code)
+#        return AnnihilatePipe(test_pipe, pipe)
 
-    adapt(IdentityCode)
 
-    def __call__(self):
-        field_pipes = []
-        for field in self.code.fields:
-            field_pipe = self.state.compose(field)
-            field_pipes.append(field_pipe)
-        id_class = ID.make(self.code.domain.dump)
-        return RecordPipe(field_pipes, id_class)
+def assemble(segment):
+    state = AssemblingState()
+    state.set_tree(segment)
+    return state.assemble(segment)
 
 
-class ComposeAnnihilator(Compose):
-
-    adapt(AnnihilatorCode)
-
-    def __call__(self):
-        test_pipe = self.state.compose(self.code.indicator)
-        pipe = self.state.compose(self.code.code)
-        return AnnihilatePipe(test_pipe, pipe)
-
-
-def assemble(term, state=None):
-    """
-    Compiles a new frame node for the given term.
-
-    Returns a :class:`htsql.core.tr.frame.Frame` instance.
-
-    `term` (:class:`htsql.core.tr.term.Term`)
-        A term node.
-
-    `state` (:class:`AssemblingState` or ``None``)
-        The assembling state to use.  If not set, a new assembling
-        state is instantiated.
-    """
-    # Instantiate a new assembling state if not given one.
-    if state is None:
-        state = AssemblingState()
-    # Realize and apply the `Assemble` adapter; return the generated frame.
-    return Assemble.__invoke__(term, state)
-
-

src/htsql/core/tr/bind.py

         AssignSyntax, ComposeSyntax, LocateSyntax, IdentitySyntax, GroupSyntax,
         IdentifierSyntax, UnpackSyntax, ReferenceSyntax, LiftSyntax,
         StringSyntax, LabelSyntax, NumberSyntax, RecordSyntax, DirectSyntax)
-from .binding import (Binding, WrappingBinding, QueryBinding, SegmentBinding,
-        WeakSegmentBinding, RootBinding, HomeBinding, TableBinding,
-        ChainBinding, ColumnBinding, QuotientBinding, KernelBinding,
-        ComplementBinding, LocateBinding, SieveBinding, AttachBinding,
-        SortBinding, CastBinding, IdentityBinding, ImplicitCastBinding,
-        RescopingBinding, AssignmentBinding, DefineBinding,
-        DefineReferenceBinding, DefineLiftBinding, SelectionBinding,
-        WildSelectionBinding, DirectionBinding, TitleBinding, RerouteBinding,
-        ReferenceRerouteBinding, AliasBinding, LiteralBinding, FormulaBinding,
-        VoidBinding, Recipe, LiteralRecipe, SelectionRecipe, FreeTableRecipe,
-        AttachedTableRecipe, ColumnRecipe, KernelRecipe, ComplementRecipe,
-        IdentityRecipe, ChainRecipe, SubstitutionRecipe, BindingRecipe,
-        ClosedRecipe, PinnedRecipe, AmbiguousRecipe)
+from .binding import (Binding, WrappingBinding, CollectBinding, RootBinding,
+        HomeBinding, TableBinding, ChainBinding, ColumnBinding,
+        QuotientBinding, KernelBinding, ComplementBinding, LocateBinding,
+        SieveBinding, AttachBinding, SortBinding, CastBinding, IdentityBinding,
+        ImplicitCastBinding, RescopingBinding, AssignmentBinding,
+        DefineBinding, DefineReferenceBinding, DefineLiftBinding,
+        SelectionBinding, WildSelectionBinding, DirectionBinding, TitleBinding,
+        RerouteBinding, ReferenceRerouteBinding, AliasBinding, LiteralBinding,
+        FormulaBinding, VoidBinding, Recipe, LiteralRecipe, SelectionRecipe,
+        FreeTableRecipe, AttachedTableRecipe, ColumnRecipe, KernelRecipe,
+        ComplementRecipe, IdentityRecipe, ChainRecipe, SubstitutionRecipe,
+        BindingRecipe, ClosedRecipe, PinnedRecipe, AmbiguousRecipe)
 from .lookup import (lookup_attribute, lookup_reference, lookup_complement,
         lookup_attribute_set, lookup_reference_set, expand, direct, guess_tag,
         identify, unwrap)
 
 
 class BindingState(object):
-    """
-    Encapsulates the (mutable) state of the binding process.
 
-    State attributes:
-
-    `root` (:class:`htsql.core.tr.binding.RootBinding`)
-        The root naming scope.
-
-    `scope` (:class:`htsql.core.tr.binding.Binding`)
-        The current naming scope.
-    """
-
-    def __init__(self, environment=None):
+    def __init__(self, root, environment=None):
+        assert isinstance(root, RootBinding)
         # The root lookup scope.
-        self.root = None
+        self.root = root
         # The current lookup scope.
-        self.scope = None
+        self.scope = root
         # The stack of previous lookup scopes.
         self.scope_stack = []
         # References in the root scope.
         self.environment = environment
-
-    def set_root(self, root):
-        """
-        Sets the root lookup context.
-
-        This function initializes the lookup context stack and must be
-        called before any calls of :meth:`push_scope` and :meth:`pop_scope`.
-
-        `root` (:class:`htsql.core.tr.binding.RootBinding`)
-            The root lookup scope.
-        """
-        # Check that the lookup stack is not initialized.
-        assert self.root is None
-        assert self.scope is None
-        assert isinstance(root, RootBinding)
-        self.root = root
-        self.scope = root
-        # Add global references.
         if self.environment is not None:
             for name, recipe in self.environment:
                 name = normalize(name)
                 self.scope = DefineReferenceBinding(self.scope, name,
                                                     recipe, self.scope.syntax)
 
-    def flush(self):
-        """
-        Clears the lookup scopes.
-        """
-        # We expect the lookup scope stack to be empty and the current
-        # scope to coincide with the root scope.
-        assert self.root is not None
-        assert not self.scope_stack
-        #assert self.root is self.scope
-        self.root = None
-        self.scope = None
-
     def push_scope(self, scope):
         """
         Sets the new lookup scope.
             binding the syntax node.
         """
         with translate_guard(syntax):
-            return bind(syntax, self, scope)
+            if scope is not None:
+                self.push_scope(scope)
+            binding = Bind.__prepare__(syntax, self)()
+            if scope is not None:
+                self.pop_scope()
+            return binding
 
     def use(self, recipe, syntax, scope=None):
         """
             seed = self.state.scope
         seed = Select.__invoke__(seed, self.state)
         domain = ListDomain(seed.domain)
-        return SegmentBinding(self.state.scope, seed, domain,
+        return CollectBinding(self.state.scope, seed, domain,
                               self.syntax)
 
 
             raise Error("Found ambiguous name", syntax)
 
 
-def bind(syntax, state=None, scope=None, environment=None):
-    if state is not None:
-        if scope is not None:
-            state.push_scope(scope)
-        binding = Bind.__invoke__(syntax, state)
-        if scope is not None:
-            state.pop_scope()
-        return binding
+def bind(syntax, environment=None):
+    recipes = []
+    if environment is not None:
+        for name in sorted(environment):
+            value = environment[name]
+            if isinstance(value.domain, ListDomain):
+                item_recipes = [LiteralRecipe(item,
+                                              value.domain.item_domain)
+                                for item in value.data]
+                recipe = SelectionRecipe(item_recipes)
+            elif isinstance(value.domain, RecordDomain):
+                item_recipes = [LiteralRecipe(item, profile.domain)
+                                for item, profile in
+                                    zip(value.data, value.domain.fields)]
+                recipe = SelectionRecipe(item_recipes)
+            elif isinstance(value.domain, IdentityDomain):
+                def convert(domain, data):
+                    items = []
+                    for element, item in zip(domain.labels, data):
+                        if isinstance(element, IdentityDomain):
+                            item = convert(element, item)
+                        else:
+                            item = LiteralRecipe(item, element)
+                        items.append(item)
+                    return IdentityRecipe(items)
+                recipe = convert(value.domain, value.data)
+            else:
+                recipe = LiteralRecipe(value.data, value.domain)
+            recipes.append((name, recipe))
+    root = RootBinding(syntax)
+    state = BindingState(root, recipes)
+    if isinstance(syntax, AssignSyntax):
+        specifier = syntax.larm
+        with translate_guard(specifier):
+            if specifier.identifier is None:
+                raise Error("Expected an identifier")
+        identifier = specifier.larms[0]
+        binding = state.bind(syntax.rarm)
+        binding = Select.__invoke__(binding, state)
+        binding = TitleBinding(binding, identifier, binding.syntax)
     else:
-        recipes = []
-        if environment is not None:
-            for name in sorted(environment):
-                value = environment[name]
-                if isinstance(value.domain, ListDomain):
-                    item_recipes = [LiteralRecipe(item,
-                                                  value.domain.item_domain)
-                                    for item in value.data]
-                    recipe = SelectionRecipe(item_recipes)
-                elif isinstance(value.domain, RecordDomain):
-                    item_recipes = [LiteralRecipe(item, profile.domain)
-                                    for item, profile in
-                                        zip(value.data, value.domain.fields)]
-                    recipe = SelectionRecipe(item_recipes)
-                elif isinstance(value.domain, IdentityDomain):
-                    def convert(domain, data):
-                        items = []
-                        for element, item in zip(domain.labels, data):
-                            if isinstance(element, IdentityDomain):
-                                item = convert(element, item)
-                            else:
-                                item = LiteralRecipe(item, element)
-                            items.append(item)
-                        return IdentityRecipe(items)
-                    recipe = convert(value.domain, value.data)
-                else:
-                    recipe = LiteralRecipe(value.data, value.domain)
-                recipes.append((name, recipe))
-        state = BindingState(recipes)
-        root = RootBinding(syntax)
-        state.set_root(root)
-        if isinstance(syntax, AssignSyntax):
-            specifier = syntax.larm
-            with translate_guard(specifier):
-                if specifier.identifier is None:
-                    raise Error("Expected an identifier")
-            identifier = specifier.larms[0]
-            segment = state.bind(syntax.rarm)
-            if not isinstance(segment, SegmentBinding):
-                segment = Select.__invoke__(segment, state)
-                segment = WeakSegmentBinding(root, segment,
-                                             segment.domain, segment.syntax)
-            segment = segment.clone(seed=TitleBinding(segment.seed, identifier,
-                                                      segment.seed.syntax))
-        else:
-            segment = state.bind(syntax)
-            if not isinstance(segment, SegmentBinding):
-                segment = Select.__invoke__(segment, state)
-                segment = WeakSegmentBinding(root, segment,
-                                             segment.domain, segment.syntax)
-        state.flush()
-        profile = decorate(segment)
-        return QueryBinding(root, segment, profile, syntax)
+        binding = state.bind(syntax)
+        binding = Select.__invoke__(binding, state)
+    return binding
 
 

src/htsql/core/tr/binding.py

         self.recipe = recipe
 
 
-class QueryBinding(Binding):
-    """
-    Represents the whole HTSQL query.
-
-    `segment` (:class:`SegmentBinding` or ``None``)
-        The top segment.
-    """
-
-    def __init__(self, base, segment, profile, syntax):
-        assert isinstance(base, RootBinding)
-        assert isinstance(segment, maybe(SegmentBinding))
-        assert isinstance(profile, Profile)
-        super(QueryBinding, self).__init__(base, VoidDomain(), syntax)
-        self.segment = segment
-        self.profile = profile
-
-
-class SegmentBinding(Binding):
+class CollectBinding(Binding):
     """
     Represents a segment of an HTSQL query.
 
     def __init__(self, base, seed, domain, syntax):
         assert isinstance(base, Binding)
         assert isinstance(seed, Binding)
-        super(SegmentBinding, self).__init__(base, domain, syntax)
+        super(CollectBinding, self).__init__(base, domain, syntax)
         self.seed = seed
 
 
-class WeakSegmentBinding(SegmentBinding):
-    pass
-
-
 class ScopingBinding(Binding):
     """
     Represents a binding node that introduces a new naming scope.

src/htsql/core/tr/compile.py

 from .coerce import coerce
 from .signature import (IsNullSig, IsEqualSig, AndSig, CompareSig,
         SortDirectionSig, RowNumberSig)
-from .space import (Expression, QueryExpr, SegmentCode, Code, LiteralCode,
+from .space import (Expression, SegmentExpr, Code, LiteralCode,
         FormulaCode, Space, RootSpace, ScalarSpace, TableSpace, QuotientSpace,
         ComplementSpace, MonikerSpace, LocatorSpace, ForkedSpace, AttachSpace,
         ClippedSpace, FilteredSpace, OrderedSpace, Unit, ScalarUnit,
         CorrelationCode)
 from .term import (Term, ScalarTerm, TableTerm, FilterTerm, JoinTerm,
         EmbeddingTerm, CorrelationTerm, ProjectionTerm, OrderTerm, WrapperTerm,
-        PermanentTerm, SegmentTerm, QueryTerm, Joint)
+        PermanentTerm, SegmentTerm, Joint)
 from .stitch import arrange, spread, sew, tie
 
 
         inflated.
     """
 
-    def __init__(self):
+    def __init__(self, root):
         # The next term tag to be produced by `tag`.
         self.next_tag = 1
         # The root scalar space.
-        self.root = None
+        self.root = root
         # The stack of previous baseline spaces.
         self.baseline_stack = []
         # The current baseline space.
-        self.baseline = None
+        self.baseline = root
         # Support for nested segments.
         self.superspace_stack = []
-        self.superspace = None
+        self.superspace = root
 
     def tag(self):
         """
         self.next_tag += 1
         return tag
 
-    def set_root(self, space):
-        """
-        Initializes the root, baseline and mask spaces.
-
-        This function must be called before state attributes `root`,
-        `baseline` and `mask` could be used.
-
-        `space` (:class:`htsql.core.tr.space.RootSpace`)
-            A root scalar space.
-        """
-        assert isinstance(space, RootSpace)
-        # Check that the state spaces are not yet initialized.
-        assert self.root is None
-        assert self.baseline is None
-        assert self.superspace is None
-        self.root = space
-        self.baseline = space
-        self.superspace = space
-
-    def flush(self):
-        """
-        Clears the state spaces.
-        """
-        # Check that the state spaces are initialized and the space stacks
-        # are exhausted.
-        assert self.root is not None
-        assert not self.baseline_stack
-        assert self.baseline is self.root
-        self.root = None
-        self.baseline = None
-        self.superspace = None
-
     def push_baseline(self, baseline):
         """
         Sets a new baseline space.
         # tag, therefore we'd have to replace the tags and route tables
         # of the cached term node.
         with translate_guard(expression):
-            return compile(expression, self, baseline=baseline)
+            # If passed, assign new baseline and mask spaces.
+            if baseline is not None:
+                self.push_baseline(baseline)
+            # Realize and apply the `Compile` adapter.
+            term = Compile.__invoke__(expression, self)
+            # Restore old baseline and mask spaces.
+            if baseline is not None:
+                self.pop_baseline()
+            # Return the compiled term.
+            return term
 
     def inject(self, term, expressions):
         """
 
 class CompileQuery(Compile):
 
-    adapt(QueryExpr)
+    #adapt(QueryExpr)
 
     def __call__(self):
         # Initialize the all state spaces with a root scalar space.
 
 class CompileSegment(Compile):
 
-    adapt(SegmentCode)
+    adapt(SegmentExpr)
 
     def __call__(self):
         if not self.state.superspace.spans(self.expression.root):
                 duplicates.add(code)
 
         # List of expressions we need the term to export.
-        codes = ([self.expression.code] +
+        codes = (self.expression.codes +
                  [code for code, direction in order])
         idx = 0
         while idx+1 < len(chain):
                             kid.space, kid.baseline, kid.routes.copy())
         # Compile nested segments.
         subtrees = {}
-        for segment in self.expression.code.segments:
+        dependents = []
+        for segment in self.expression.dependents:
             if segment in subtrees:
-                continue
+                dependents.append(subtrees[segment])
             self.state.push_superspace(self.expression.root)
             self.state.push_superspace(self.expression.space)
             term = self.state.compile(segment)
             self.state.pop_superspace()
             self.state.pop_superspace()
             subtrees[segment] = term
+            dependents.append(term)
         # Construct keys for segment merging.
         superkeys = [code for code, direction in arrange(self.state.superspace,
                                                          with_strong=False)]
         keys = [code for code, direction in arrange(self.expression.space,
                                                     with_strong=False)]
         # Construct a segment term.
-        return SegmentTerm(self.state.tag(), kid, self.expression,
-                           superkeys, keys, subtrees,
+        return SegmentTerm(self.state.tag(), kid, self.expression.codes,
+                           superkeys, keys, dependents,
                            kid.space, kid.baseline, kid.routes.copy())
 
 
         return term
 
 
-def compile(expression, state=None, baseline=None):
-    """
-    Compiles a new term node for the given expression.
+def compile(segment):
+    state = CompilingState(RootSpace(None, segment.flow))
+    return state.compile(segment)
 
-    Returns a :class:`htsql.core.tr.term.Term` instance.
 
-    `expression` (:class:`htsql.core.tr.space.Expression`)
-        An expression node.
-
-    `state` (:class:`CompilingState` or ``None``)
-        The compiling state to use.  If not set, a new compiling state
-        is instantiated.
-
-    `baseline` (:class:`htsql.core.tr.space.Space` or ``None``)
-        The baseline space.  Specifies an axis that the compiled
-        term must export.  If not set, the current baseline space of
-        the state is used.
-    """
-    # Instantiate a new compiling state if not given one.
-    if state is None:
-        state = CompilingState()
-    # If passed, assign new baseline and mask spaces.
-    if baseline is not None:
-        state.push_baseline(baseline)
-    # Realize and apply the `Compile` adapter.
-    term = Compile.__invoke__(expression, state)
-    # Restore old baseline and mask spaces.
-    if baseline is not None:
-        state.pop_baseline()
-    # Return the compiled term.
-    return term
-
-

src/htsql/core/tr/dump.py

 from ..error import Error, translate_guard
 from ..syn.syntax import IdentifierSyntax, ApplySyntax, LiteralSyntax
 from .frame import (Clause, Frame, TableFrame, BranchFrame, NestedFrame,
-                    SegmentFrame, QueryFrame,
-                    Phrase, NullPhrase, CastPhrase, LiteralPhrase,
-                    ColumnPhrase, ReferencePhrase, EmbeddingPhrase,
-                    FormulaPhrase, Anchor, LeadingAnchor)
+        SegmentFrame, Phrase, NullPhrase, CastPhrase, LiteralPhrase,
+        ColumnPhrase, ReferencePhrase, EmbeddingPhrase, FormulaPhrase, Anchor,
+        LeadingAnchor)
 from .signature import (Signature, isformula, IsEqualSig, IsTotallyEqualSig,
                         IsInSig, IsNullSig, IfNullSig, NullIfSig, CompareSig,
                         AndSig, OrSig, NotSig, SortDirectionSig, RowNumberSig,
                         ToPredicateSig, FromPredicateSig, PlaceholderSig)
-from .pipe import SQLPipe, RecordPipe, ComposePipe, ProducePipe
+from .pipe import SQLPipe, RecordPipe, ComposePipe, ProducePipe, MixPipe
 from ..connect import unscramble
 import StringIO
 import re
         """
         # Realize and call the `Serialize` adapter.
         with translate_guard(clause):
-            return serialize(clause, self)
+            return Serialize.__invoke__(clause, self)
 
     def dump(self, clause):
         """
                                   " for a %r node" % self.clause)
 
 
-class SerializeQuery(Serialize):
-    """
-    Serializes an HTSQL query to an execution plan.
-    """
-
-    adapt(QueryFrame)
-
-    def __call__(self):
-        # When exists, serialize the query segment.
-        profile = self.clause.binding.profile
-        sql_pipe = self.state.serialize(self.clause.segment)
-        value_pipe = self.clause.value_pipe
-        pipe = ComposePipe(sql_pipe, value_pipe)
-        pipe = ProducePipe(profile, pipe, sql=self.state.sql)
-        return pipe
-
-
 class SerializeSegment(Serialize):
     """
     Serializes an HTSQL segment to SQL.
         # Retrieve and return the generated SQL.
         placeholders = self.state.placeholders
         sql = self.state.flush()
-        accumulated_sql = sql.splitlines()
         input_domains = None
         if placeholders:
             input_domains = []
                 input_domains.append(placeholders[index])
         output_domains = [phrase.domain for phrase in self.clause.select]
         pipe = SQLPipe(sql, input_domains, output_domains)
-        if self.clause.subtrees:
-            subpipes = []
-            for subframe in self.clause.subtrees:
-                subpipe = self.state.serialize(subframe)
-                subpipes.append(subpipe)
-                accumulated_sql.append(u"")
-                for line in self.state.sql.splitlines():
-                    if line:
-                        accumulated_sql.append(u"  "+line)
-                    else:
-                        accumulated_sql.append(u"")
-            pipe = RecordPipe([pipe]+subpipes)
-            pipe = ComposePipe(pipe, self.clause.mix_pipe)
-        self.state.sql = u"\n".join(accumulated_sql)+"\n"
+        if self.clause.dependents:
+            feeds = [pipe]
+            keys = [self.clause.key_pipe]
+            for subframe in self.clause.dependents:
+                feed = self.state.serialize(subframe)
+                feeds.append(feed)
+                keys.append(subframe.superkey_pipe)
+            pipe = RecordPipe(feeds)
+            mix_pipe = MixPipe(keys)
+            pipe = ComposePipe(pipe, mix_pipe)
         return pipe
 
     def aliasing(self, frame=None,
                     index=unicode(self.signature.index+1))
 
 
-def serialize(clause, state=None):
-    """
-    Translates a clause node to SQL.
+def serialize(clause):
+    state = SerializingState()
+    return state.serialize(clause)
 
-    `clause` (:class:`htsql.core.tr.frame.Clause`)
-        The clause to serialize.
 
-    `state` (:class:`SerializingState` or ``None``)
-        The serializing state to use.  If not set, a new serializing
-        state is instantiated.
-    """
-    # Create a new serializing state if necessary.
-    if state is None:
-        state = SerializingState()
-    # Realize and apply the `Serialize` adapter.
-    return Serialize.__invoke__(clause, state)
-
-

src/htsql/core/tr/encode.py

         OpaqueDomain)
 from ..error import Error, translate_guard
 from .coerce import coerce
-from .binding import WeakSegmentBinding
-from .flow import (Flow, QueryFlow, SegmentFlow, SelectionFlow, HomeFlow,
+from .flow import (Flow, CollectFlow, SelectionFlow, HomeFlow,
         RootFlow, TableFlow, ChainFlow, ColumnFlow, QuotientFlow, KernelFlow,
         ComplementFlow, IdentityFlow, LocateFlow, CoverFlow, ForkFlow,
         AttachFlow, ClipFlow, SieveFlow, SortFlow, CastFlow, RescopingFlow,
 from .space import (RootSpace, ScalarSpace, DirectTableSpace, FiberTableSpace,
         QuotientSpace, ComplementSpace, MonikerSpace, LocatorSpace,
         ForkedSpace, AttachSpace, ClippedSpace, FilteredSpace, OrderedSpace,
-        QueryExpr, SegmentCode, LiteralCode, FormulaCode, CastCode, RecordCode,
-        AnnihilatorCode, IdentityCode, ColumnUnit, ScalarUnit, KernelUnit)
+        SegmentExpr, LiteralCode, FormulaCode, CastCode, ColumnUnit,
+        ScalarUnit, KernelUnit)
 from .signature import Signature, IsNullSig, NullIfSig, IsEqualSig, AndSig
 import decimal
 
 
 class EncodingState(object):
-    """
-    Encapsulates the (mutable) state of the encoding process.
-
-    Currently encoding is a stateless process, but we will likely add
-    extra state in the future.  The state is also used to store the
-    cache of flow to space and flow to code translations.
-    """
-
-    # Indicates whether results of `encode` or `relate` are cached.
-    # Caching means that two calls of `encode` (or `relate`) on the
-    # same `Flow` instance produce the same object.
-    #
-    # By default, caching is turned on; however the translator must
-    # never rely on that.  That is, the result generated by the
-    # translator must not depend on whether caching is enabled or
-    # disabled.  This parameter gives us an easy way to check this
-    # assumption.  Different results usually mean a bug in comparison
-    # by value for code objects.
-    with_cache = True
 
     def __init__(self):
-        # A mapping of cached results of `encode()`.
         self.flow_to_code = {}
-        # A mapping of cached results of `relate()`.
         self.flow_to_space = {}
-
-    def flush(self):
-        """
-        Clears the encoding state.
-        """
-        self.flow_to_code.clear()
-        self.flow_to_state.clear()
+        self.flow_to_bundle = {}
 
     def encode(self, flow):
-        """
-        Encodes the given flow node to a code expression node.
-
-        Returns a :class:`htsql.core.tr.space.Code` node (in some cases,
-        a :class:`htsql.core.tr.space.Expression` node).
-
-        `flow` (:class:`htsql.core.tr.flow.Flow`)
-            The flow node to encode.
-        """
         # When caching is enabled, we check if `flow` was
         # already encoded.  If not, we encode it and save the
         # result.
         with translate_guard(flow):
-            if self.with_cache:
-                if flow not in self.flow_to_code:
-                    #FIXME: reduce recursion depth
-                    #code = encode(flow, self)
-                    code = Encode.__prepare__(flow, self)()
-                    self.flow_to_code[flow] = code
-                return self.flow_to_code[flow]
-            # Caching is disabled; return a new instance every time.
-            return encode(flow, self)
+            if flow not in self.flow_to_code:
+                code = Encode.__prepare__(flow, self)()
+                self.flow_to_code[flow] = code
+            return self.flow_to_code[flow]
 
     def relate(self, flow):
-        """
-        Encodes the given flow node to a space expression node.
+        with translate_guard(flow):
+            if flow not in self.flow_to_space:
+                space = Relate.__prepare__(flow, self)()
+                self.flow_to_space[flow] = space
+            return self.flow_to_space[flow]
 
-        Returns a :class:`htsql.core.tr.space.Space` node.
+    def unpack(self, flow):
+        with translate_guard(flow):
+            if flow not in self.flow_to_bundle:
+                bundle = Unpack.__prepare__(flow, self)()
+                self.flow_to_bundle[flow] = bundle
+            return self.flow_to_bundle[flow]
 
-        `flow` (:class:`htsql.core.tr.flow.Flow`)
-            The flow node to encode.
-        """
-        # When caching is enabled, we check if `flow` was
-        # already encoded.  If not, we encode it and save the
-        # result.
-        with translate_guard(flow):
-            if self.with_cache:
-                if flow not in self.flow_to_space:
-                    #FIXME: reduce recursion depth
-                    #space = relate(flow, self)
-                    space = Relate.__prepare__(flow, self)()
-                    self.flow_to_space[flow] = space
-                return self.flow_to_space[flow]
-            # Caching is disabled; return a new instance every time.
-            return relate(flow, self)
+
+class Bundle(object):
+
+    def __init__(self, codes, segments):
+        self.codes = codes
+        self.segments = segments
 
 
 class EncodeBase(Adapter):
     def __call__(self):
         # The default implementation generates an error.
         # FIXME: a better error message?
-        raise Error("Expected a flow expression")
+        #raise Error("Expected a flow expression")
+        return self.state.relate(self.flow.base)
 
 
-class EncodeQuery(Encode):
-
-    adapt(QueryFlow)
+class Unpack(EncodeBase):
 
     def __call__(self):
-        # Encode the segment node if it is provided.
-        segment = None
-        if self.flow.segment is not None:
-            segment = self.state.encode(self.flow.segment)
-        # Construct the expression node.
-        return QueryExpr(segment, self.flow)
+        code = self.state.encode(self.flow)
+        return Bundle([code], [])
 
 
-class EncodeSegment(Encode):
+class UnpackCollect(Unpack):
 
-    adapt(SegmentFlow)
+    adapt(CollectFlow)
 
     def __call__(self):
         root = self.state.relate(self.flow.base)
-        code = self.state.encode(self.flow.seed)
-        # List of all unit expressions.
-        units = code.units
-        # No units means a root scalar space.
-        if not units:
-            space = RootSpace(None, self.flow)
-        # Otherwise, find a dominating unit space.
+        if coerce(self.flow.seed.domain) is not None:
+            bundle = None
+            code = self.state.encode(self.flow.seed)
+            units = code.units
+            space = None
+        elif isinstance(self.flow.seed.domain, RecordDomain):
+            bundle = self.state.unpack(self.flow.seed)
+            space = self.state.relate(self.flow.seed)
+            units = None
         else:
-            # List of dominating spaces.
-            spaces = []
-            for unit in units:
-                if any(space.dominates(unit.space) for space in spaces):
-                    continue
-                spaces = [space for space in spaces
-                              if not unit.space.dominates(space)]
-                spaces.append(unit.space)
-            # More than one dominating space means the output space
-            # cannot be inferred from the columns unambiguously.
-            if len(spaces) > 1:
-                raise Error("Cannot deduce an unambiguous segment flow")
-            # Otherwise, `spaces` contains a single maximal space node.
+            bundle = self.state.unpack(self.flow.seed)
+            units = [unit for code in bundle.codes
+                          for unit in code.units]
+            space = None
+        if space is None:
+            if not units:
+                space = RootSpace(None, self.flow)
             else:
-                [space] = spaces
+                spaces = []
+                for unit in units:
+                    if any(space.dominates(unit.space) for space in spaces):
+                        continue
+                    spaces = [space for space in spaces
+                                  if not unit.space.dominates(space)]
+                    spaces.append(unit.space)
+                if len(spaces) > 1:
+                    raise Error("Cannot deduce an unambiguous segment flow")
+                else:
+                    [space] = spaces
         if not space.spans(root):
             raise Error("Expected a descendant segment flow")
-        if (isinstance(code, LiteralCode) and
-                isinstance(code.domain, UntypedDomain)):
-            if code.value is None:
-                filter = LiteralCode(False, coerce(BooleanDomain()),
-                                     code.flow)
+        if bundle is None:
+            if (isinstance(code, LiteralCode) and
+                    isinstance(code.domain, UntypedDomain)):
+                if code.value is None:
+                    filter = LiteralCode(False, coerce(BooleanDomain()),
+                                         code.flow)
+                    space = FilteredSpace(space, filter, space.flow)
+            else:
+                filter = FormulaCode(IsNullSig(-1), coerce(BooleanDomain()),
+                                     code.flow, op=code)
                 space = FilteredSpace(space, filter, space.flow)
-        elif coerce(code.domain) is not None:
-            filter = FormulaCode(IsNullSig(-1), coerce(BooleanDomain()),
-                                 code.flow, op=code)
-            space = FilteredSpace(space, filter, space.flow)
-        if isinstance(self.flow.binding, WeakSegmentBinding):
-            if not root.spans(space):
-                raise Error("Expected a singular expression")
-        return SegmentCode(root, space, code, self.flow)
+        if bundle is not None:
+            codes = bundle.codes
+            dependents = bundle.segments
+        else:
+            codes = [code]
+            dependents = []
+        segment = SegmentExpr(root, space, codes, dependents, self.flow)
+        return Bundle([], [segment])
 
 
 class RelateRoot(Relate):
         return EncodeBySignature.__prepare__(self.flow, self.state)()
 
 
-class RelateFormula(Relate):
-
-    adapt(FormulaFlow)
-
-    def __call__(self):
-        # Delegate the translation to the `RelateBySignature` adapter.
-        return RelateBySignature.__prepare__(self.flow, self.state)()
-
-
 class EncodeBySignatureBase(Adapter):
     """
     Translates a formula node.
                            **arguments)
 
 
-class RelateBySignature(EncodeBySignatureBase):
-    """
-    Translates a formula flow to a space node.
-
-    This is an auxiliary adapter used to relate
-    class:`htsql.core.tr.flow.FormulaFlow` nodes.  The adapter is
-    polymorphic on the formula signature.
-
-    Unless overridden, the adapter generates an error.
-    """
-
-    def __call__(self):
-        # Override in subclasses for formulas that generate space nodes.
-        raise Error("a flow expression is expected")
-
-
-class EncodeSelection(Encode):
+class UnpackSelection(Unpack):
 
     adapt(SelectionFlow)
 
     def __call__(self):
+        codes = []
+        segments = []
         space = self.state.relate(self.flow)
-        fields = [self.state.encode(element)
-                  for element in self.flow.elements]
-        code = RecordCode(fields, self.flow.domain, self.flow)
-        unit = ScalarUnit(code, space, self.flow)
         indicator = LiteralCode(True, coerce(BooleanDomain()), self.flow)
         indicator = ScalarUnit(indicator, space, self.flow)
-        return AnnihilatorCode(unit, indicator, self.flow)
+        codes.append(indicator)
+        for element in self.flow.elements:
+            bundle = self.state.unpack(element)
+            codes.extend(bundle.codes)
+            segments.extend(bundle.segments)
+        return Bundle(codes, segments)
 
 
 class RelateSelection(Relate):
 
-    adapt(SelectionFlow)
+    adapt_many(SelectionFlow,
+               IdentityFlow)
 
     def __call__(self):
         return self.state.relate(self.flow.base)
 
 
-class EncodeIdentity(Encode):
+class UnpackIdentity(Unpack):
 
     adapt(IdentityFlow)
 
     def __call__(self):
-        space = self.state.relate(self.flow.base)
-        fields = [self.state.encode(element)
-                  for element in self.flow.elements]
-        code = IdentityCode(fields, self.flow)
-        unit = ScalarUnit(code, space, self.flow)
+        codes = []
+        segments = []
+        space = self.state.relate(self.flow)
         indicator = LiteralCode(True, coerce(BooleanDomain()), self.flow)
         indicator = ScalarUnit(indicator, space, self.flow)
-        return AnnihilatorCode(unit, indicator, self.flow)
+        codes.append(indicator)
+        for element in self.flow.elements:
+            bundle = self.state.unpack(element)
+            codes.extend(bundle.codes)
+            segments.extend(bundle.segments)
+        return Bundle(codes, segments)
 
 
 def encode(flow, state=None):
     return Relate.__invoke__(flow, state)
 
 
+def encode(flow):
+    state = EncodingState()
+    bundle = state.unpack(flow)
+    if len(bundle.codes) == 0 and len(bundle.segments) == 1:
+        [segment] = bundle.segments
+    else:
+        root = RootSpace(None, flow)
+        space = state.relate(flow)
+        if not root.spans(space) or \
+            any(not space.spans(unit.space) for code in bundle.codes
+                                            for unit in code.units):
+            with translate_guard(flow):
+                raise Error("Expected a singular expression")
+        segment = SegmentExpr(root, space, bundle.codes, bundle.segments, flow)
+    return segment
+
+

src/htsql/core/tr/flow.py

         self.offset = offset
 
 
-class QueryFlow(Flow):
-
-    def __init__(self, base, segment, profile, binding):
-        assert isinstance(base, RootFlow)
-        assert isinstance(segment, maybe(SegmentFlow))
-        assert isinstance(profile, Profile)
-        super(QueryFlow, self).__init__(base, VoidDomain(), binding)
-        self.segment = segment
-        self.profile = profile
-
-
-class SegmentFlow(Flow):
+class CollectFlow(Flow):
 
     def __init__(self, base, seed, domain, binding):
         assert isinstance(base, Flow)
         assert isinstance(seed, Flow)
-        super(SegmentFlow, self).__init__(base, domain, binding)
+        super(CollectFlow, self).__init__(base, domain, binding)
         self.seed = seed
 
 

src/htsql/core/tr/fn/assemble.py

                                **arguments)
         if self.is_predicate:
             phrase = self.state.from_predicate(phrase)
-        yield phrase
+        return phrase
 
 
 class EvaluateWrapExists(EvaluateFunction):
                                self.code, predicates=predicates,
                                consequents=consequents,
                                alternative=alternative)
-        yield phrase
+        return phrase
 
 
 class EvaluateSwitch(EvaluateFunction):
             is_nullable = True
         phrase = FormulaPhrase(self.signature, self.domain, is_nullable,
                                self.code, **arguments)
-        yield phrase
+        return phrase
 
 

src/htsql/core/tr/fn/bind.py

         FormulaBinding, CastBinding, ImplicitCastBinding, WrappingBinding,
         TitleBinding, DirectionBinding, QuotientBinding, AssignmentBinding,
         DefineBinding, DefineReferenceBinding, SelectionBinding, HomeBinding,
-        RescopingBinding, CoverBinding, ForkBinding, ClipBinding,
-        SegmentBinding, QueryBinding, AliasBinding, Binding, BindingRecipe,
-        ComplementRecipe, KernelRecipe, SubstitutionRecipe, ClosedRecipe)
+        RescopingBinding, CoverBinding, ForkBinding, ClipBinding, AliasBinding,
+        Binding, BindingRecipe, ComplementRecipe, KernelRecipe,
+        SubstitutionRecipe, ClosedRecipe)
 from ..bind import BindByName, BindingState
 from ...error import Error, translate_guard
 from ..coerce import coerce

src/htsql/core/tr/frame.py

 from ..error import point
 from .coerce import coerce
 from .space import Expression
-from .term import Term, QueryTerm
+from .term import Term
 from .pipe import Pipe
 from .signature import Signature, Bag, Formula
 
 
     def __init__(self, include, embed, select, where,
                  group, having, order, limit, offset,
-                 mix_pipe, subtrees, term):
-        assert isinstance(mix_pipe, maybe(Pipe))
-        assert isinstance(subtrees, listof(SegmentFrame))
-        assert bool(mix_pipe) == bool(subtrees)
+                 code_pipes, dependent_pipes, superkey_pipe, key_pipe,
+                 dependents, term):
+        assert isinstance(code_pipes, listof(Pipe))
+        assert isinstance(dependent_pipes, listof(Pipe))
+        assert isinstance(superkey_pipe, Pipe)
+        assert isinstance(key_pipe, Pipe)
+        assert isinstance(dependents, listof(SegmentFrame))
         super(SegmentFrame, self).__init__(include, embed, select, where,
                                            group, having, order, limit, offset,
                                            term)
-        self.mix_pipe = mix_pipe
-        self.subtrees = subtrees
+        self.code_pipes = code_pipes
+        self.dependent_pipes = dependent_pipes
+        self.superkey_pipe = superkey_pipe
+        self.key_pipe = key_pipe
+        self.dependents = dependents
 
 
 class Anchor(Clause):
                                             is_left, is_right)
 
 
-class QueryFrame(Clause):
-    """
-    Represents the whole HTSQL query.
-
-    `segment` (:class:`SegmentFrame` or ``None``)
-        The query segment.
-    """
-
-    def __init__(self, segment, value_pipe, term):
-        assert isinstance(segment, maybe(SegmentFrame))
-        assert isinstance(value_pipe, maybe(Pipe))
-        assert isinstance(term, QueryTerm)
-        super(QueryFrame, self).__init__(term.expression)
-        self.segment = segment
-        self.value_pipe = value_pipe
-        self.term = term
-
-
 class Phrase(Clause):
     """
     Represents a SQL expression.

src/htsql/core/tr/lookup.py

 from ..syn.syntax import IdentifierSyntax
 from ..error import point
 from .binding import (Binding, ScopeBinding, ChainingBinding, WrappingBinding,
-        DecorateBinding, SegmentBinding, HomeBinding, RootBinding,
+        DecorateBinding, CollectBinding, HomeBinding, RootBinding,
         TableBinding, ChainBinding, ColumnBinding, QuotientBinding,
         ComplementBinding, CoverBinding, ForkBinding, AttachBinding,
         ClipBinding, LocateBinding, RescopingBinding, DefineBinding,
         return lookup(self.binding.base, self.probe)
 
 
-class GuessFromSegment(Lookup):
+class GuessFromCollect(Lookup):
 
-    adapt_many((SegmentBinding, GuessTagProbe),
-               (SegmentBinding, GuessHeaderProbe),
-               (SegmentBinding, GuessPathProbe))
+    adapt_many((CollectBinding, GuessTagProbe),
+               (CollectBinding, GuessHeaderProbe),
+               (CollectBinding, GuessPathProbe))
 
     def __call__(self):
         return lookup(self.binding.seed, self.probe)
 
 
-class UnwrapSegment(Lookup):
+class UnwrapCollect(Lookup):
 
-    adapt(SegmentBinding, UnwrapProbe)
+    adapt(CollectBinding, UnwrapProbe)
 
     def __call__(self):
         if isinstance(self.binding, self.probe.binding_class):

src/htsql/core/tr/pack.py

+#
+# Copyright (c) 2006-2013, Prometheus Research, LLC
+#
+
+
+from ..adapter import Adapter, adapt
+from ..domain import Record, ID
+from .flow import Flow, CollectFlow, SelectionFlow, IdentityFlow
+from .pipe import (RecordPipe, AnnihilatePipe, IteratePipe, ComposePipe,
+        ValuePipe, SinglePipe)
+
+
+class PackingState(object):
+
+    def __init__(self, segment, name):
+        self.segment_stack = []
+        self.code_pipes_stack = []
+        self.dependent_pipes_stack = []
+        self.segment = segment
+        self.code_pipes = segment.code_pipes[:]
+        self.dependent_pipes = segment.dependent_pipes[:]
+        self.name = name
+        self.name_stack = []
+        self.is_top = True
+
+    def push_name(self, name):
+        self.name_stack.append(self.name)
+        self.name = name
+
+    def pop_name(self):
+        self.name = self.name_stack.pop()
+
+    def descend(self, index):
+        self.segment_stack.append(self.segment)
+        self.code_pipes_stack.append(self.code_pipes)
+        self.dependent_pipes_stack.append(self.dependent_pipes)
+        self.segment = self.segment.dependents[index]
+        self.code_pipes = self.segment.code_pipes[:]
+        self.dependent_pipes = self.segment.dependent_pipes[:]
+
+    def ascend(self):
+        self.segment = self.segment_stack.pop()
+        self.code_pipes = self.code_pipes_stack.pop()
+        self.dependent_pipes = self.dependent_pipes_stack.pop()
+
+    def pull_code(self):
+        return self.code_pipes.pop(0)
+
+    def pull_dependent(self):
+        return self.dependent_pipes.pop(0)
+
+    def pack(self, flow):
+        if not isinstance(flow, CollectFlow):
+            self.is_top = False
+        return Pack.__invoke__(flow, self)
+
+
+class Pack(Adapter):
+
+    adapt(Flow)
+
+    def __init__(self, flow, state):
+        self.flow = flow
+        self.state = state
+
+    def __call__(self):
+        return self.state.pull_code()
+
+
+class PackCollect(Pack):
+
+    adapt(CollectFlow)
+
+    def __call__(self):
+        if not self.state.is_top:
+            dependent_pipe = self.state.pull_dependent()
+            self.state.descend(dependent_pipe.index)
+            pipe = self.state.pack(self.flow.seed)
+            pipe = IteratePipe(pipe)
+            pipe = ComposePipe(dependent_pipe, pipe)
+            self.state.ascend()
+        else:
+            self.state.is_top = False
+            pipe = self.state.pack(self.flow.seed)
+            pipe = IteratePipe(pipe)
+        return pipe
+
+
+class PackSelection(Pack):
+
+    adapt(SelectionFlow)
+
+    def __call__(self):
+        test = self.state.pull_code()
+        field_pipes = []
+        field_names = []
+        for field, profile in zip(self.flow.elements,
+                                  self.flow.domain.fields):
+            field_names.append(profile.tag)
+            self.state.push_name(profile.tag)
+            field_pipe = self.state.pack(field)
+            self.state.pop_name()
+            field_pipes.append(field_pipe)
+        record_class = Record.make(self.state.name, field_names)
+        pipe = RecordPipe(field_pipes, record_class)
+        if isinstance(test, ValuePipe) and test.data is True:
+            return pipe
+        return AnnihilatePipe(test, pipe)
+
+
+class PackIdentity(Pack):
+
+    adapt(IdentityFlow)
+
+    def __call__(self):
+        test = self.state.pull_code()
+        field_pipes = []
+        for field in self.flow.elements:
+            field_pipe = self.state.pack(field)
+            field_pipes.append(field_pipe)
+        id_class = ID.make(self.flow.domain.dump)
+        pipe = RecordPipe(field_pipes, id_class)
+        if isinstance(test, ValuePipe) and test.data is True:
+            return pipe
+        return AnnihilatePipe(test, pipe)
+
+
+def pack(flow, segment, name):
+    state = PackingState(segment, name)
+    pipe = state.pack(flow)
+    if not isinstance(flow, CollectFlow):
+        pipe = IteratePipe(pipe)
+        pipe = ComposePipe(pipe, SinglePipe())
+    return pipe
+
+

src/htsql/core/tr/plan.py

-#
-# Copyright (c) 2006-2013, Prometheus Research, LLC
-#
-
-
-"""
-:mod:`htsql.core.tr.plan`
-=========================
-
-This module declares a SQL execution plan.
-"""
-
-
-from ..util import maybe, listof, Printable
-from ..domain import Profile, Domain
-
-
-class Statement(Printable):
-
-    def __init__(self, sql, domains, substatements,
-                 placeholders=None, is_single=False):
-        assert isinstance(sql, unicode)
-        assert isinstance(domains, listof(Domain))
-        assert isinstance(substatements, listof(Statement))
-        self.sql = sql
-        self.domains = domains
-        self.substatements = substatements
-        self.placeholders = placeholders
-        self.is_single = is_single
-
-
-class Plan(Printable):
-
-    def __init__(self, profile, statement, compose):
-        assert isinstance(profile, Profile)
-        assert isinstance(statement, maybe(Statement))
-        self.profile = profile
-        self.statement = statement
-        self.compose = compose
-
-

src/htsql/core/tr/reduce.py

 #
 
 
-"""
-:mod:`htsql.core.tr.reduce`
-===========================
-
-This module implements the reducing process.
-"""
-
-
 from ..adapter import Adapter, adapt
 from ..domain import BooleanDomain, TextDomain, IntegerDomain
 from .coerce import coerce
 from .stitch import arrange
 from .term import PermanentTerm
 from .frame import (Clause, Frame, ScalarFrame, BranchFrame, NestedFrame,
-                    QueryFrame, SegmentFrame, Phrase, LiteralPhrase, NullPhrase,
-                    TruePhrase, FalsePhrase, CastPhrase, FormulaPhrase,
-                    ExportPhrase, ReferencePhrase, Anchor, LeadingAnchor)
+        SegmentFrame, Phrase, LiteralPhrase, NullPhrase, TruePhrase,
+        FalsePhrase, CastPhrase, FormulaPhrase, ExportPhrase, ReferencePhrase,
+        Anchor, LeadingAnchor)
 from .signature import (Signature, isformula, IsEqualSig, IsTotallyEqualSig,
-                        IsInSig, IsNullSig, IfNullSig, NullIfSig,
-                        AndSig, OrSig, NotSig, SortDirectionSig,
-                        FromPredicateSig, ToPredicateSig)
+        IsInSig, IsNullSig, IfNullSig, NullIfSig, AndSig, OrSig, NotSig,
+        SortDirectionSig, FromPredicateSig, ToPredicateSig)
 
 
 class ReducingState(object):
         # `SELECT` clauses of collapsed frames by the frame tag.