Commits

Kirill Simonov  committed c6b3a74

Refactored the plan tree for the SQL translator.

  • Participants
  • Parent commits 6c65964

Comments (0)

Files changed (11)

File src/htsql/core/cmd/fetch.py

         if isinstance(self.action, SafeProduceAction):
             cut = self.action.cut
         pipe = translate(self.command.syntax, self.action.environment, cut)
-        return pipe()
+        output = pipe()(None)
+        return output
 
 
 class AnalyzeFetch(Act):
 
     def __call__(self):
         pipe = translate(self.command.syntax, self.action.environment)
-        return pipe.plan
+        return pipe
 
 
 class ProduceSkip(Act):
     adapt(SQLCmd, RenderAction)
 
     def __call__(self):
-        plan = analyze(self.command.feed)
+        pipe = analyze(self.command.feed)
         status = '200 OK'
         headers = [('Content-Type', 'text/plain; charset=UTF-8')]
         body = []
-        if plan.statement:
-            queue = [plan.statement]
-            while queue:
-                statement = queue.pop(0)
-                if body:
-                    body.append("\n")
-                body.append(statement.sql.encode('utf-8'))
-                queue.extend(statement.substatements)
+        if 'sql' in pipe.properties:
+            body.append(pipe.properties['sql'].encode('utf-8'))
         return (status, headers, body)
 
 

File src/htsql/core/fmt/text.py

         DecimalDomain, FloatDomain, TextDomain, EnumDomain, DateDomain,
         TimeDomain, DateTimeDomain, ListDomain, RecordDomain, UntypedDomain,
         VoidDomain, OpaqueDomain, Profile)
+from ..tr.pipe import SQLPipe
 import re
 import decimal
 import datetime
                 line.append(u" :\n")
             yield u"".join(line)
         yield u"\n"
-        if (addon.debug and self.meta.plan is not None and
-                self.meta.plan.statement is not None):
+        if addon.debug and (self.meta.syntax or hasattr(self.product, 'sql')):
             yield u" ----\n"
             if self.meta.syntax:
                 yield u" %s\n" % self.meta.syntax
-            queue = [(0, self.meta.plan.statement)]
-            while queue:
-                depth, statement = queue.pop(0)
+            if hasattr(self.product, 'sql'):
                 sql = re.sub(ur'[\0-\x09\x0b-\x1f\x7f]', u'\ufffd',
-                             statement.sql)
-                if depth:
-                    yield u"\n"
+                             self.product.sql)
                 for line in sql.splitlines():
-                    yield u" "*(depth*2+1) + u"%s\n" % line
-                for substatement in statement.substatements:
-                    queue.append((depth+1, substatement))
+                    if line:
+                        yield u" %s\n" % line
+                    else:
+                        yield u"\n"
 
 
 class ToText(Adapter):

File src/htsql/core/tr/assemble.py

 from .frame import (ScalarFrame, TableFrame, NestedFrame, SegmentFrame,
         QueryFrame, LiteralPhrase, TruePhrase, CastPhrase, ColumnPhrase,
         ReferencePhrase, EmbeddingPhrase, FormulaPhrase, Anchor, LeadingAnchor)
+from .pipe import (ValuePipe, ExtractPipe, RecordPipe, MixPipe, ComposePipe,
+        AnnihilatePipe, IteratePipe, SinglePipe)
 from .signature import (Signature, IsEqualSig, IsTotallyEqualSig, IsInSig,
         IsNullSig, NullIfSig, IfNullSig, CompareSig, AndSig, OrSig, NotSig,
         SortDirectionSig, ToPredicateSig, FromPredicateSig)
     def pop_segment(self):
         self.segment = self.segment_stack.pop()
 
+    def reset_shift(self, term):
+        self.shift_by_term[term] = 0
+
     def save_subterms(self, term, subterms):
         assert isinstance(term, maybe(SegmentTerm))
         assert isinstance(subterms, listof(SegmentTerm))
         assert isinstance(stencils, listof(listof(int))) and len(stencils) == 3
         self.stencils_by_term[term] = stencils
 
-    def get_code_stencil(self):
-        return self.stencils_by_term[self.segment][0]
+    def get_code_stencil(self, term=None):
+        if term is None:
+            term = self.segment
+        return self.stencils_by_term[term][0]
 
-    def get_superkey_stencil(self):
-        return self.stencils_by_term[self.segment][1]
+    def get_superkey_stencil(self, term=None):
+        if term is None:
+            term = self.segment
+        return self.stencils_by_term[term][1]
 
-    def get_key_stencil(self):
-        return self.stencils_by_term[self.segment][2]
+    def get_key_stencil(self, term=None):
+        if term is None:
+            term = self.segment
+        return self.stencils_by_term[term][2]
 
     def get_next_index(self):
         stencil = self.get_code_stencil()
         self.shift_by_term[self.segment] += 1
         return index
 
-    def decompose(self, code):
-        return Decompose.__invoke__(code, self)
+    def compose(self, code):
+        return Compose.__invoke__(code, self)
 
     def set_tree(self, term):
         """
         return select
 
     def assemble_subtrees(self):
+        subterms = []
         subtrees = []
+        locations = []
         duplicates = set()
         for code in self.term.code.code.segments:
             if code in duplicates:
             duplicates.add(code)
             subterm = self.term.subtrees[code]
             location = len(subtrees)
+            subterms.append(subterm)
             self.state.set_tree(subterm)
-            self.state.save_location(subterm, location)
+            locations.append((subterm, location))
             subframe = self.state.assemble(subterm)
             subtrees.append(subframe)
-        self.state.save_subterms(self.term,
-                                 [frame.term for frame in subtrees])
+        for subterm, location in locations:
+            location -= len(locations)
+            self.state.save_location(subterm, location)
+        self.state.save_subterms(self.term, subterms)
         return subtrees
 
     def assemble_frame(self, include, embed, select,
                        where, group, having,
                        order, limit, offset):
         subtrees = self.assemble_subtrees()
+        mix_pipe = None
+        if subtrees:
+            key_pipes = []
+            stencils = []
+            stencils.append(self.state.get_key_stencil(self.term))
+            for subframe in subtrees:
+                stencils.append(self.state.get_superkey_stencil(subframe.term))
+            for stencil in stencils:
+                if len(stencil) == 0:
+                    key_pipe = ValuePipe(True)
+                elif len(stencil) == 1:
+                    key_pipe =  ExtractPipe(stencil[0])
+                else:
+                    key_pipe = RecordPipe([ExtractPipe(index)
+                                           for index in stencil])
+                key_pipes.append(key_pipe)
+            mix_pipe = MixPipe(key_pipes)
         return SegmentFrame(include, embed, select,
                            where, group, having,
                            order, limit, offset,
-                           subtrees, self.term)
+                           mix_pipe, subtrees, self.term)
 
 
 class AssembleQuery(Assemble):
     def __call__(self):
         # Compile the segment frame.
         segment = None
-        compose = None
+        value_pipe = None
         if self.term.segment is not None:
             # Initialize the state.
             self.state.set_tree(self.term.segment)
             self.state.save_location(self.term.segment, None)
             self.state.save_subterms(None, [self.term.segment])
+            self.state.push_name(self.term.binding.profile.tag)
             # Compile the segment.
             segment = self.state.assemble(self.term.segment)
             # Generate the compositor.
-            self.state.push_name(self.term.binding.profile.tag)
-            compose = self.state.decompose(self.term.segment.code)
+            value_pipe = self.state.compose(self.term.segment.code)
             # Clean up the state.
             self.state.flush()
         # Generate a frame node.
-        return QueryFrame(segment, compose, self.term)
+        return QueryFrame(segment, value_pipe, self.term)
 
 
 class Evaluate(Adapter):
         return self.state.phrases_by_claim[claim]
 
 
-class Decompose(Adapter):
+class Compose(Adapter):
 
     adapt(Code)
 
 
     def __call__(self):
         index = self.state.get_next_index()
-        def compose_value(row, stream, index=index):
-            return row[index]
-        return compose_value
+        return ExtractPipe(index)
 
 
-class DecomposeSegment(Decompose):
+class ComposeSegment(Compose):
 
     adapt(SegmentCode)
 
     def __call__(self):
-        if self.state.segment is None:
-            self.state.push_segment(self.code)
-            compose_code = self.state.decompose(self.code.code)
-            self.state.pop_segment()
-            is_single = (isinstance(self.code.binding, WeakSegmentBinding))
-            if not is_single:
-                def compose_root_segment(row, stream,
-                                         compose_code=compose_code):
-                    items = []
-                    for row in stream:
-                        items.append(compose_code(row, stream))
-                    return items
-                return compose_root_segment
-            else:
-                def compose_root_value(row, stream,
-                                       compose_code=compose_code):
-                    items = []
-                    for row in stream:
-                        items.append(compose_code(row, stream))
-                    assert len(items) <= 1
-                    if items:
-                        return items[0]
-                    else:
-                        return None
-                return compose_root_value
-        else:
-            key_stencil = self.state.get_key_stencil()
-            self.state.push_segment(self.code)
-            location = self.state.get_location()
-            superkey_stencil = self.state.get_superkey_stencil()
-            compose_code = self.state.decompose(self.code.code)
-            self.state.pop_segment()
-            def compose_nested_segment(row, stream, compose_code=compose_code,
-                                       location=location,
-                                       key_stencil=key_stencil,
-                                       superkey_stencil=superkey_stencil):
-                items = []
-                key = stream.get(key_stencil)
-                substream = stream.substreams[location]
-                for row in substream.slice(superkey_stencil, key):
-                    items.append(compose_code(row, substream))
-                return items
-            return compose_nested_segment
+        self.state.push_segment(self.code)
+        location = self.state.get_location()
+        pipe = self.state.compose(self.code.code)
+        pipe = IteratePipe(pipe)
+        self.state.pop_segment()
+        is_single = (isinstance(self.code.binding, WeakSegmentBinding))
+        if is_single:
+            pipe = ComposePipe(pipe, SinglePipe())
+        if location is not None:
+            pick_pipe = ExtractPipe(location)
+            pipe = ComposePipe(pick_pipe, pipe)
+        return pipe
 
 
-class DecomposeCompound(Decompose):
+class ComposeCompound(Compose):
 
     adapt(CompoundUnit)
 
     def __call__(self):
-        return self.state.decompose(self.code.code)
+        return self.state.compose(self.code.code)
 
 
-class DecomposeLiteral(Decompose):
+class ComposeLiteral(Compose):
 
     adapt(LiteralCode)
 
     def __call__(self):
         if not isinstance(self.code.domain, UntypedDomain):
-            return super(DecomposeLiteral, self).__call__()
-        def compose_untyped(row, stream, value=self.code.value):
-            return value
-        return compose_untyped
+            return super(ComposeLiteral, self).__call__()
+        return ValuePipe(self.code.value)
 
 
-class DecomposeRecord(Decompose):
+class ComposeRecord(Compose):
 
     adapt(RecordCode)
 
     def __call__(self):
-        compose_fields = []
+        field_pipes = []
         field_names = []
         for field, profile in zip(self.code.fields,
                                   self.code.domain.fields):
             field_names.append(profile.tag)
             self.state.push_name(profile.tag)
-            compose_field = self.state.decompose(field)
+            field_pipe = self.state.compose(field)
             self.state.pop_name()
-            compose_fields.append(compose_field)
+            field_pipes.append(field_pipe)
         record_class = Record.make(self.state.name, field_names)
-        def compose_record(row, stream, record_class=record_class,
-                           compose_fields=compose_fields):
-            return record_class(compose_field(row, stream)
-                                for compose_field in compose_fields)
-        return compose_record
+        return RecordPipe(field_pipes, record_class)
 
 
-class DecomposeIdentity(Decompose):
+class ComposeIdentity(Compose):
 
     adapt(IdentityCode)
 
     def __call__(self):
-        compose_fields = []
+        field_pipes = []
         for field in self.code.fields:
-            compose_field = self.state.decompose(field)
-            compose_fields.append(compose_field)
-        # FIXME: a reference leak?
+            field_pipe = self.state.compose(field)
+            field_pipes.append(field_pipe)
         id_class = ID.make(self.code.domain.dump)
-        def compose_identity(row, stream, compose_fields=compose_fields,
-                             id_class=id_class):
-            return id_class(compose_field(row, stream)
-                            for compose_field in compose_fields)
-        return compose_identity
+        return RecordPipe(field_pipes, id_class)
 
 
-class DecomposeAnnihilator(Decompose):
+class ComposeAnnihilator(Compose):
 
     adapt(AnnihilatorCode)
 
     def __call__(self):
-        compose_indicator = self.state.decompose(self.code.indicator)
-        compose = self.state.decompose(self.code.code)
-        def compose_nullable(row, stream,
-                             compose_indicator=compose_indicator,
-                             compose=compose):
-            value = compose(row, stream)
-            if compose_indicator(row, stream) is None:
-                return None
-            return value
-        return compose_nullable
+        test_pipe = self.state.compose(self.code.indicator)
+        pipe = self.state.compose(self.code.code)
+        return AnnihilatePipe(test_pipe, pipe)
 
 
 def assemble(term, state=None):

File src/htsql/core/tr/dump.py

                         IsInSig, IsNullSig, IfNullSig, NullIfSig, CompareSig,
                         AndSig, OrSig, NotSig, SortDirectionSig, RowNumberSig,
                         ToPredicateSig, FromPredicateSig, PlaceholderSig)
-from .plan import Plan, Statement
+from .pipe import SQLPipe, RecordPipe, ComposePipe, ProducePipe
+from ..connect import unscramble
 import StringIO
 import re
 import math
         # The active serializing hints and directives.
         self.hook = None
         self.placeholders = {}
+        self.sql = None
 
     def set_tree(self, frame):
         """
     def __call__(self):
         # When exists, serialize the query segment.
         profile = self.clause.binding.profile
-        compose = self.clause.compose
-        statement = None
-        if self.clause.segment is not None:
-            statement = self.state.serialize(self.clause.segment)
-
-        # Produce an execution plan.
-        return Plan(profile, statement, compose)
+        sql_pipe = self.state.serialize(self.clause.segment)
+        value_pipe = self.clause.value_pipe
+        pipe = ComposePipe(sql_pipe, value_pipe)
+        pipe = ProducePipe(profile, pipe, sql=self.state.sql)
+        return pipe
 
 
 class SerializeSegment(Serialize):
         # Retrieve and return the generated SQL.
         placeholders = self.state.placeholders
         sql = self.state.flush()
-        domains = [phrase.domain for phrase in self.clause.select]
-        substatements = [self.state.serialize(subframe)
-                         for subframe in self.clause.subtrees]
-        return Statement(sql, domains, substatements, placeholders)
+        accumulated_sql = sql.splitlines()
+        input_domains = None
+        if placeholders:
+            input_domains = []
+            for index in sorted(placeholders):
+                input_domains.append(placeholders[index])
+        output_domains = [phrase.domain for phrase in self.clause.select]
+        pipe = SQLPipe(sql, input_domains, output_domains)
+        if self.clause.subtrees:
+            subpipes = []
+            for subframe in self.clause.subtrees:
+                subpipe = self.state.serialize(subframe)
+                subpipes.append(subpipe)
+                accumulated_sql.append(u"")
+                for line in self.state.sql.splitlines():
+                    if line:
+                        accumulated_sql.append(u"  "+line)
+                    else:
+                        accumulated_sql.append(u"")
+            pipe = RecordPipe([pipe]+subpipes)
+            pipe = ComposePipe(pipe, self.clause.mix_pipe)
+        self.state.sql = u"\n".join(accumulated_sql)+"\n"
+        return pipe
 
     def aliasing(self, frame=None,
                  taken_select_aliases=None,
     def __call__(self):
         super(DumpSegment, self).__call__()
         # FIXME: add a semicolon?
-        # Make sure the statement ends with a new line.
-        self.newline()
+        ## Make sure the statement ends with a new line.
+        #self.newline()
 
 
 class DumpLeadingAnchor(Dump):

File src/htsql/core/tr/frame.py

 from .coerce import coerce
 from .space import Expression
 from .term import Term, QueryTerm
+from .pipe import Pipe
 from .signature import Signature, Bag, Formula
 
 
 
     def __init__(self, include, embed, select, where,
                  group, having, order, limit, offset,
-                 subtrees, term):
+                 mix_pipe, subtrees, term):
+        assert isinstance(mix_pipe, maybe(Pipe))
         assert isinstance(subtrees, listof(SegmentFrame))
+        assert bool(mix_pipe) == bool(subtrees)
         super(SegmentFrame, self).__init__(include, embed, select, where,
                                            group, having, order, limit, offset,
                                            term)
+        self.mix_pipe = mix_pipe
         self.subtrees = subtrees
 
 
         The query segment.
     """
 
-    def __init__(self, segment, compose, term):
+    def __init__(self, segment, value_pipe, term):
         assert isinstance(segment, maybe(SegmentFrame))
+        assert isinstance(value_pipe, maybe(Pipe))
         assert isinstance(term, QueryTerm)
         super(QueryFrame, self).__init__(term.expression)
         self.segment = segment
-        self.compose = compose
+        self.value_pipe = value_pipe
         self.term = term
 
 

File src/htsql/core/tr/pipe.py

+#
+# Copyright (c) 2006-2013, Prometheus Research, LLC
+#
+
+
+from ..util import Clonable, YAMLable
+from ..context import context
+from ..domain import Product
+from ..connect import transaction, scramble, unscramble
+from ..error import PermissionError
+import operator
+
+
+class Pipe(Clonable, YAMLable):
+    pass
+
+
+class ComposePipe(Pipe):
+
+    def __init__(self, left_pipe, right_pipe):
+        self.left_pipe = left_pipe
+        self.right_pipe = right_pipe
+
+    def __call__(self):
+        def compose(input, left=self.left_pipe(),
+                           right=self.right_pipe()):
+            return right(left(input))
+        return compose
+
+    def __yaml__(self):
+        yield ('left', self.left_pipe)
+        yield ('right', self.right_pipe)
+
+
+class SQLPipe(Pipe):
+
+    def __init__(self, sql, input_domains, output_domains):
+        self.sql = sql
+        self.input_domains = input_domains
+        self.output_domains = output_domains
+
+    def __call__(self):
+        def run_sql(input, sql=self.sql.encode('utf-8'),
+                           input_domains=self.input_domains,
+                           output_domains=self.output_domains):
+            if not context.env.can_read:
+                raise PermissionError("No read permissions")
+            scrambles = None
+            if input_domains is not None:
+                scrambles = [scramble(domain) for domain in input_domains]
+            unscrambles = [unscramble(domain) for domain in output_domains]
+            with transaction() as connection:
+                cursor = connection.cursor()
+                if scrambles is None:
+                    assert input is None
+                    cursor.execute(sql)
+                else:
+                    assert isinstance(input, (tuple, list))
+                    assert len(input) == len(scrambles)
+                    parameters = dict((str(index+1), scramble(item))
+                            for index, (item, scramble)
+                                    in enumerate(zip(input, scrambles)))
+                    cursor.execute(sql, parameters)
+                output = []
+                for row in cursor:
+                    assert len(row) == len(unscrambles)
+                    output.append(tuple(unscramble(item)
+                                  for item, unscramble in zip(row, unscrambles)))
+            return output
+        return run_sql
+
+    def __yaml__(self):
+        yield ('sql', self.sql+'\n')
+        if self.input_domains:
+            yield ('input', [unicode(domain)
+                             for domain in self.input_domains])
+        if self.output_domains:
+            yield ('output', [unicode(domain)
+                              for domain in self.output_domains])
+
+
+class ProducePipe(Pipe):
+
+    def __init__(self, meta, data_pipe, **properties):
+        self.meta = meta
+        self.data_pipe = data_pipe
+        self.properties = properties
+
+    def __call__(self):
+        def produce(input, make_data=self.data_pipe(),
+                           meta=self.meta,
+                           pipe=self,
+                           properties=self.properties):
+            return Product(meta, make_data(input), pipe=pipe, **properties)
+        return produce
+
+    def __yaml__(self):
+        yield ('meta', str(self.meta))
+        yield ('data', self.data_pipe)
+
+
+class ValuePipe(Pipe):
+
+    def __init__(self, data):
+        self.data = data
+
+    def __call__(self):
+        def make_value(input, data=self.data):
+            return data
+        return make_value
+
+    def __yaml__(self):
+        yield ('data', self.data)
+
+
+class RecordPipe(Pipe):
+
+    def __init__(self, field_pipes, record_class=tuple):
+        self.field_pipes = field_pipes
+        self.record_class = record_class
+
+    def __call__(self):
+        make_fields = [field_pipe() for field_pipe in self.field_pipes]
+        def make_record(input, make_fields=make_fields,
+                               record_class=self.record_class):
+            return record_class(make_field(input)
+                                for make_field in make_fields)
+        return make_record
+
+    def __yaml__(self):
+        yield ('fields', self.field_pipes)
+
+
+class ExtractPipe(Pipe):
+
+    def __init__(self, index):
+        self.index = index
+
+    def __call__(self):
+        return operator.itemgetter(self.index)
+
+    def __yaml__(self):
+        yield ('index', self.index)
+
+
+class SinglePipe(Pipe):
+
+    def __init__(self):
+        pass
+
+    def __call__(self):
+        def make_single(input):
+            assert len(input) <= 1
+            if input:
+                return input[0]
+        return make_single
+
+
+class IteratePipe(Pipe):
+
+    def __init__(self, value_pipe):
+        self.value_pipe = value_pipe
+
+    def __call__(self):
+        def iterate(input, make_value=self.value_pipe()):
+            return map(make_value, input)
+        return iterate
+
+    def __yaml__(self):
+        yield ('value', self.value_pipe)
+
+
+class AnnihilatePipe(Pipe):
+
+    def __init__(self, test_pipe, value_pipe):
+        self.test_pipe = test_pipe
+        self.value_pipe = value_pipe
+
+    def __call__(self):
+        if (isinstance(self.test_pipe, ValuePipe) and
+                self.test_pipe.data is True):
+            return self.value_pipe()
+        def annihilate(input, test=self.test_pipe(),
+                              make_value=self.value_pipe()):
+            if test(input) is True:
+                return make_value(input)
+        return annihilate
+
+    def __yaml__(self):
+        yield ('test', self.test_pipe)
+        yield ('value', self.value_pipe)
+
+
+class MixPipe(Pipe):
+
+    def __init__(self, key_pipes):
+        self.key_pipes = key_pipes
+
+    def __call__(self):
+        make_keys = [key_pipe() for key_pipe in self.key_pipes]
+        def mix(input, make_parent_key=make_keys[0],
+                       make_kid_keys=make_keys[1:]):
+            parent = input[0]
+            kids = input[1:]
+            kids_range = range(len(kids))
+            tops = [0]*len(kids)
+            output = []
+            for parent_row in parent:
+                row = list(parent_row)
+                parent_key = make_parent_key(parent_row)
+                for idx in kids_range:
+                    kid = kids[idx]
+                    top = tops[idx]
+                    make_kid_key = make_kid_keys[idx]
+                    kid_rows = []
+                    while (top < len(kid) and
+                           make_kid_key(kid[top]) == parent_key):
+                        kid_rows.append(kid[top])
+                        top += 1
+                    tops[idx] = top
+                    row.append(kid_rows)
+                output.append(tuple(row))
+            for idx in kids_range:
+                assert tops[idx] == len(kids[idx])
+            return output
+        return mix
+
+    def __yaml__(self):
+        yield ('keys', self.key_pipes)
+
+

File src/htsql/core/tr/translate.py

 #
 
 
-from ..util import listof
-from ..context import context
-from ..domain import ListDomain, RecordDomain, Profile, Product
 from ..syn.syntax import Syntax
 from ..tr.bind import bind
 from ..tr.binding import Binding
 from ..tr.assemble import assemble
 from ..tr.reduce import reduce
 from ..tr.dump import serialize
-from ..tr.plan import Plan, Statement
-from ..connect import transaction, scramble, unscramble
-from ..error import PermissionError
-
-
-class RowStream(object):
-
-    @classmethod
-    def open(cls, statement, cursor, input=None):
-        converts = [unscramble(domain)
-                    for domain in statement.domains]
-        sql = statement.sql.encode('utf-8')
-        parameters = None
-        if statement.placeholders:
-            assert input is not None
-            parameters = {}
-            for index in sorted(statement.placeholders):
-                domain = statement.placeholders[index]
-                convert = scramble(domain)
-                value = convert(input[index])
-                parameters[str(index+1)] = value
-        if parameters is None:
-            cursor.execute(sql)
-        else:
-            cursor.execute(sql, parameters)
-        rows = []
-        for row in cursor:
-            row = tuple(convert(item)
-                        for item, convert in zip(row, converts))
-            rows.append(row)
-        substreams = [cls.open(substatement, cursor)
-                      for substatement in statement.substatements]
-        return cls(rows, substreams)
-
-    def __init__(self, rows, substreams):
-        assert isinstance(rows, list)
-        assert isinstance(substreams, listof(RowStream))
-        self.rows = rows
-        self.substreams = substreams
-        self.top = 0
-        self.last_top = None
-        self.last_key = None
-
-    def __iter__(self):
-        self.top = 0
-        for row in self.rows:
-            yield row
-            self.top += 1
-
-    def get(self, stencil):
-        return tuple(self.rows[self.top][index]
-                     for index in stencil)
-
-    def slice(self, stencil, key):
-        if key != self.last_key:
-            self.last_top = self.top
-            self.last_key = key
-            if key != ():
-                while self.top < len(self.rows):
-                    row = self.rows[self.top]
-                    if key != tuple(row[index] for index in stencil):
-                        break
-                    yield row
-                    self.top += 1
-            else:
-                assert not stencil
-                while self.top < len(self.rows):
-                    yield self.rows[self.top]
-                    self.top += 1
-        else:
-            top = self.top
-            self.top = self.last_top
-            for idx in range(self.last_top, top):
-                self.top = idx
-                yield self.rows[idx]
-            self.top = top
-
-    def close(self):
-        assert self.top == len(self.rows)
-        for substream in self.substreams:
-            substream.close()
-
-
-class FetchPipe(object):
-
-    def __init__(self, plan):
-        assert isinstance(plan, Plan)
-        self.plan = plan
-        self.profile = plan.profile
-        self.statement = plan.statement
-        self.compose = plan.compose
-
-    def __call__(self, input=None):
-        meta = self.profile.clone(plan=self.plan)
-        data = None
-        if self.statement:
-            if not context.env.can_read:
-                raise PermissionError("No read permissions")
-            stream = None
-            with transaction() as connection:
-                cursor = connection.cursor()
-                stream = RowStream.open(self.statement, cursor, input)
-            data = self.compose(None, stream)
-            stream.close()
-        return Product(meta, data)
 
 
 def translate(syntax, environment=None, limit=None):
     term = compile(expression)
     frame = assemble(term)
     frame = reduce(frame)
-    plan = serialize(frame)
-    return FetchPipe(plan)
+    pipe = serialize(frame)
+    return pipe
 
 
 def safe_patch(expression, limit):

File src/htsql/tweak/etl/cmd/insert.py

         self.pipe = pipe
 
     def __call__(self, row):
-        product = self.pipe(row)
+        product = self.pipe()(row)
         data = product.data
         if len(data) != 1:
             raise Error("Unable to locate the inserted record")
         profile = decorate(binding)
         binding = QueryBinding(state.scope, binding, profile, syntax)
         pipe = translate(binding)
-        profile = pipe.profile
+        profile = pipe.meta
         if not self.is_list:
             profile = profile.clone(domain=profile.domain.item_domain)
         return ResolveIdentityPipe(profile, pipe)
             for idx in leaf:
                 raw_value = raw_value[idx]
             raw_values.append(raw_value)
-        product = self.pipe(raw_values)
+        product = self.pipe()(raw_values)
         data = product.data
         if len(data) != 1:
             quote = None

File src/htsql/tweak/etl/cmd/merge.py

             for idx in leaf:
                 raw_value = raw_value[idx]
             raw_values.append(raw_value)
-        product = self.pipe(raw_values)
+        product = self.pipe()(raw_values)
         data = product.data
         assert len(data) <= 1
         if data:

File src/htsql/tweak/shell/command.py

         yield JS_END
 
     def render_sql(self, plan):
-        if plan.statement is not None:
-            sql = []
-            queue = [plan.statement]
-            while queue:
-                statement = queue.pop(0)
-                sql.append(statement.sql)
-                queue.extend(statement.substatements)
-            sql = u"\n".join(sql)
+        if 'sql' in plan.properties:
+            sql = plan.properties['sql']
         else:
             sql = u""
         yield JS_MAP

File src/htsql_oracle/core/connect.py

 from htsql.core.connect import Connect, Scramble, Unscramble, UnscrambleError
 from htsql.core.adapter import adapt
 from htsql.core.context import context
+from htsql.core.error import Error
 from htsql.core.domain import (BooleanDomain, DecimalDomain, TextDomain,
         DateDomain, TimeDomain)
 import datetime
                 value = value.read()
             except cx_Oracle.Error, exc:
                 message = str(exc)
-                raise OracleError(message, exc)
+                raise Error(message, exc)
         if isinstance(value, str):
             value = value.decode('utf-8')
         return value