Kirill Simonov avatar Kirill Simonov committed a444b35

build_pipe() -> tr/translate.py

Comments (0)

Files changed (5)

src/htsql/core/cmd/fetch.py

 
 
 from ..adapter import adapt, Utility
-from ..util import listof
-from ..context import context
-from ..domain import ListDomain, RecordDomain, Profile, Product
 from .command import FetchCmd, SkipCmd, SQLCmd
 from .act import (analyze, Act, ProduceAction, SafeProduceAction,
                   AnalyzeAction, RenderAction)
-from ..tr.bind import bind
-from ..tr.binding import Binding
-from ..tr.encode import encode
-from ..tr.flow import OrderedFlow
-from ..tr.rewrite import rewrite
-from ..tr.compile import compile
-from ..tr.assemble import assemble
-from ..tr.reduce import reduce
-from ..tr.dump import serialize
-from ..tr.plan import Plan, Statement
+from ..domain import Product
+from ..tr.translate import translate
 from ..tr.decorate import decorate_void
-from ..connect import transaction, scramble, unscramble
-from ..error import PermissionError
-
-
-class RowStream(object):
-
-    @classmethod
-    def open(cls, statement, cursor, input=None):
-        converts = [unscramble(domain)
-                    for domain in statement.domains]
-        sql = statement.sql.encode('utf-8')
-        parameters = None
-        if statement.placeholders:
-            assert input is not None
-            parameters = {}
-            for index in sorted(statement.placeholders):
-                domain = statement.placeholders[index]
-                convert = scramble(domain)
-                value = convert(input[index])
-                parameters[str(index+1)] = value
-        if parameters is None:
-            cursor.execute(sql)
-        else:
-            cursor.execute(sql, parameters)
-        rows = []
-        for row in cursor:
-            row = tuple(convert(item)
-                        for item, convert in zip(row, converts))
-            rows.append(row)
-        substreams = [cls.open(substatement, cursor)
-                      for substatement in statement.substatements]
-        return cls(rows, substreams)
-
-    def __init__(self, rows, substreams):
-        assert isinstance(rows, list)
-        assert isinstance(substreams, listof(RowStream))
-        self.rows = rows
-        self.substreams = substreams
-        self.top = 0
-        self.last_top = None
-        self.last_key = None
-
-    def __iter__(self):
-        self.top = 0
-        for row in self.rows:
-            yield row
-            self.top += 1
-
-    def get(self, stencil):
-        return tuple(self.rows[self.top][index]
-                     for index in stencil)
-
-    def slice(self, stencil, key):
-        if key != self.last_key:
-            self.last_top = self.top
-            self.last_key = key
-            if key != ():
-                while self.top < len(self.rows):
-                    row = self.rows[self.top]
-                    if key != tuple(row[index] for index in stencil):
-                        break
-                    yield row
-                    self.top += 1
-            else:
-                assert not stencil
-                while self.top < len(self.rows):
-                    yield self.rows[self.top]
-                    self.top += 1
-        else:
-            top = self.top
-            self.top = self.last_top
-            for idx in range(self.last_top, top):
-                self.top = idx
-                yield self.rows[idx]
-            self.top = top
-
-    def close(self):
-        assert self.top == len(self.rows)
-        for substream in self.substreams:
-            substream.close()
-
-
-class FetchPipe(object):
-
-    def __init__(self, plan):
-        assert isinstance(plan, Plan)
-        self.plan = plan
-        self.profile = plan.profile
-        self.statement = plan.statement
-        self.compose = plan.compose
-
-    def __call__(self, input=None):
-        meta = self.profile.clone(plan=self.plan)
-        data = None
-        if self.statement:
-            if not context.env.can_read:
-                raise PermissionError("No read permissions")
-            stream = None
-            with transaction() as connection:
-                cursor = connection.cursor()
-                stream = RowStream.open(self.statement, cursor, input)
-            data = self.compose(None, stream)
-            stream.close()
-        return Product(meta, data)
-
-
-class BuildFetch(Utility):
-
-    def __init__(self, syntax, environment=None, limit=None):
-        self.syntax = syntax
-        self.environment = environment
-        self.limit = limit
-
-    def __call__(self):
-        if not isinstance(self.syntax, Binding):
-            binding = bind(self.syntax, environment=self.environment)
-        else:
-            binding = self.syntax
-        expression = encode(binding)
-        if self.limit is not None:
-            expression = self.safe_patch(expression, self.limit)
-        expression = rewrite(expression)
-        term = compile(expression)
-        frame = assemble(term)
-        frame = reduce(frame)
-        plan = serialize(frame)
-        return FetchPipe(plan)
-
-    def safe_patch(self, expression, limit):
-        segment = expression.segment
-        if segment is None:
-            return expression
-        flow = segment.flow
-        while not flow.is_axis:
-            if (isinstance(flow, OrderedFlow) and flow.limit is not None
-                                              and flow.limit <= limit):
-                return expression
-            flow = flow.base
-        if flow.is_root:
-            return expression
-        if isinstance(segment.flow, OrderedFlow):
-            flow = segment.flow.clone(limit=limit)
-        else:
-            flow = OrderedFlow(segment.flow, [], limit, None, segment.binding)
-        segment = segment.clone(flow=flow)
-        expression = expression.clone(segment=segment)
-        return expression
 
 
 class ProduceFetch(Act):
         cut = None
         if isinstance(self.action, SafeProduceAction):
             cut = self.action.cut
-        pipe = build_fetch(self.command.syntax, self.action.environment, cut)
+        pipe = translate(self.command.syntax, self.action.environment, cut)
         return pipe()
 
 
     adapt(FetchCmd, AnalyzeAction)
 
     def __call__(self):
-        pipe = build_fetch(self.command.syntax, self.action.environment)
+        pipe = translate(self.command.syntax, self.action.environment)
         return pipe.plan
 
 
         return (status, headers, body)
 
 
-build_fetch = BuildFetch.__invoke__
-
-

src/htsql/core/tr/__init__.py

 
 
 from . import (assemble, binding, bind, coerce, compile, dump, encode, flow,
-        fn, frame, lookup, plan, reduce, rewrite, signature, stitch, term)
+        fn, frame, lookup, plan, reduce, rewrite, signature, stitch, term,
+        translate)
 
 

src/htsql/core/tr/translate.py

+#
+# Copyright (c) 2006-2013, Prometheus Research, LLC
+#
+
+
+from ..util import listof
+from ..context import context
+from ..domain import ListDomain, RecordDomain, Profile, Product
+from ..syn.syntax import Syntax
+from ..tr.bind import bind
+from ..tr.binding import Binding
+from ..tr.encode import encode
+from ..tr.flow import OrderedFlow
+from ..tr.rewrite import rewrite
+from ..tr.compile import compile
+from ..tr.assemble import assemble
+from ..tr.reduce import reduce
+from ..tr.dump import serialize
+from ..tr.plan import Plan, Statement
+from ..connect import transaction, scramble, unscramble
+from ..error import PermissionError
+
+
+class RowStream(object):
+
+    @classmethod
+    def open(cls, statement, cursor, input=None):
+        converts = [unscramble(domain)
+                    for domain in statement.domains]
+        sql = statement.sql.encode('utf-8')
+        parameters = None
+        if statement.placeholders:
+            assert input is not None
+            parameters = {}
+            for index in sorted(statement.placeholders):
+                domain = statement.placeholders[index]
+                convert = scramble(domain)
+                value = convert(input[index])
+                parameters[str(index+1)] = value
+        if parameters is None:
+            cursor.execute(sql)
+        else:
+            cursor.execute(sql, parameters)
+        rows = []
+        for row in cursor:
+            row = tuple(convert(item)
+                        for item, convert in zip(row, converts))
+            rows.append(row)
+        substreams = [cls.open(substatement, cursor)
+                      for substatement in statement.substatements]
+        return cls(rows, substreams)
+
+    def __init__(self, rows, substreams):
+        assert isinstance(rows, list)
+        assert isinstance(substreams, listof(RowStream))
+        self.rows = rows
+        self.substreams = substreams
+        self.top = 0
+        self.last_top = None
+        self.last_key = None
+
+    def __iter__(self):
+        self.top = 0
+        for row in self.rows:
+            yield row
+            self.top += 1
+
+    def get(self, stencil):
+        return tuple(self.rows[self.top][index]
+                     for index in stencil)
+
+    def slice(self, stencil, key):
+        if key != self.last_key:
+            self.last_top = self.top
+            self.last_key = key
+            if key != ():
+                while self.top < len(self.rows):
+                    row = self.rows[self.top]
+                    if key != tuple(row[index] for index in stencil):
+                        break
+                    yield row
+                    self.top += 1
+            else:
+                assert not stencil
+                while self.top < len(self.rows):
+                    yield self.rows[self.top]
+                    self.top += 1
+        else:
+            top = self.top
+            self.top = self.last_top
+            for idx in range(self.last_top, top):
+                self.top = idx
+                yield self.rows[idx]
+            self.top = top
+
+    def close(self):
+        assert self.top == len(self.rows)
+        for substream in self.substreams:
+            substream.close()
+
+
+class FetchPipe(object):
+
+    def __init__(self, plan):
+        assert isinstance(plan, Plan)
+        self.plan = plan
+        self.profile = plan.profile
+        self.statement = plan.statement
+        self.compose = plan.compose
+
+    def __call__(self, input=None):
+        meta = self.profile.clone(plan=self.plan)
+        data = None
+        if self.statement:
+            if not context.env.can_read:
+                raise PermissionError("No read permissions")
+            stream = None
+            with transaction() as connection:
+                cursor = connection.cursor()
+                stream = RowStream.open(self.statement, cursor, input)
+            data = self.compose(None, stream)
+            stream.close()
+        return Product(meta, data)
+
+
+def translate(syntax, environment=None, limit=None):
+    assert isinstance(syntax, (Syntax, Binding, unicode, str))
+    if isinstance(syntax, (str, unicode)):
+        syntax = parse(syntax)
+    if not isinstance(syntax, Binding):
+        binding = bind(syntax, environment=environment)
+    else:
+        binding = syntax
+    expression = encode(binding)
+    if limit is not None:
+        expression = safe_patch(expression, limit)
+    expression = rewrite(expression)
+    term = compile(expression)
+    frame = assemble(term)
+    frame = reduce(frame)
+    plan = serialize(frame)
+    return FetchPipe(plan)
+
+
+def safe_patch(expression, limit):
+    segment = expression.segment
+    if segment is None:
+        return expression
+    flow = segment.flow
+    while not flow.is_axis:
+        if (isinstance(flow, OrderedFlow) and flow.limit is not None
+                                          and flow.limit <= limit):
+            return expression
+        flow = flow.base
+    if flow.is_root:
+        return expression
+    if isinstance(segment.flow, OrderedFlow):
+        flow = segment.flow.clone(limit=limit)
+    else:
+        flow = OrderedFlow(segment.flow, [], limit, None, segment.binding)
+    segment = segment.clone(flow=flow)
+    expression = expression.clone(segment=segment)
+    return expression
+
+

src/htsql/tweak/etl/cmd/insert.py

         ChainArc)
 from ....core.entity import TableEntity, ColumnEntity
 from ....core.cmd.act import Act, ProduceAction, act
-from ....core.cmd.fetch import RowStream, build_fetch
+from ....core.tr.translate import translate
 from ....core.tr.bind import BindingState, Select
 from ....core.syn.syntax import VoidSyntax, IdentifierSyntax
 from ....core.tr.binding import (VoidBinding, RootBinding, FormulaBinding,
         binding = SegmentBinding(state.scope, binding, domain, syntax)
         profile = decorate(binding)
         binding = QueryBinding(state.scope, binding, profile, syntax)
-        pipe = build_fetch(binding)
+        pipe = translate(binding)
         profile = pipe.profile
         if not self.is_list:
             profile = profile.clone(domain=profile.domain.item_domain)
         binding = SegmentBinding(state.root, binding, domain, syntax)
         profile = decorate(binding)
         binding = QueryBinding(state.root, binding, profile, syntax)
-        pipe =  build_fetch(binding)
+        pipe =  translate(binding)
         columns = joins[0].origin_columns[:]
         domain = identity.domain
         return ResolveChainPipe(target_name, columns, domain, pipe)

src/htsql/tweak/etl/cmd/merge.py

 from ....core.classify import localize, relabel
 from ....core.connect import transaction, scramble, unscramble
 from ....core.domain import IdentityDomain, RecordDomain, ListDomain, Product
-from ....core.cmd.fetch import build_fetch
+from ....core.cmd.fetch import translate
 from ....core.cmd.act import Act, ProduceAction, act
 from ....core.tr.bind import BindingState, Select
 from ....core.syn.syntax import VoidSyntax
         binding = SegmentBinding(state.root, binding, domain, syntax)
         profile = decorate(binding)
         binding = QueryBinding(state.root, binding, profile, syntax)
-        pipe =  build_fetch(binding)
+        pipe =  translate(binding)
         domain = identity.domain
         return ResolveKeyPipe(name, columns, domain, pipe, self.with_error)
 
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.