Commits

Kirill Simonov committed 0b4eb6f

top(): added generic implementation.

Comments (0)

Files changed (9)

src/htsql/core/tr/compile.py

 
 from ..util import maybe, listof
 from ..adapter import Adapter, adapt, adapt_many
-from ..domain import BooleanDomain
+from ..domain import BooleanDomain, IntegerDomain
 from .error import CompileError
 from .coerce import coerce
-from .signature import IsNullSig, AndSig
+from .signature import (IsNullSig, AndSig, CompareSig,
+                        SortDirectionSig, RowNumberSig)
 from .flow import (Expression, QueryExpr, SegmentCode, Code, LiteralCode,
                    FormulaCode, Flow, RootFlow, ScalarFlow, TableFlow,
-                   QuotientFlow, ComplementFlow, MonikerFlow,
-                   ForkedFlow, LinkedFlow, FilteredFlow, OrderedFlow,
+                   QuotientFlow, ComplementFlow, MonikerFlow, ForkedFlow,
+                   LinkedFlow, ClippedFlow, FilteredFlow, OrderedFlow,
                    Unit, ScalarUnit, ColumnUnit, AggregateUnit, CorrelatedUnit,
                    KernelUnit, CoveringUnit)
 from .term import (Term, ScalarTerm, TableTerm, FilterTerm, JoinTerm,
 class CompileCovering(CompileFlow):
 
     # The implementation is shared by these three covering flows.
-    adapt_many(MonikerFlow, ForkedFlow, LinkedFlow)
+    adapt_many(MonikerFlow,
+               ForkedFlow,
+               LinkedFlow,
+               ClippedFlow)
 
     def __call__(self):
         # Moniker, forked and linked flows are represented as a seed term
         # For the linked flow, it must export the linking expressions.
         if isinstance(self.flow, LinkedFlow):
             codes += [rop for lop, rop in self.flow.images]
+        # A clipped flow must order itself (but only up to the base).
+        if isinstance(self.flow, ClippedFlow):
+            order = []
+            for code, direction in arrange(self.flow.seed):
+                if all(self.flow.base.spans(unit.flow)
+                       for unit in code.units):
+                    continue
+                codes.append(code)
+                order.append((code, direction))
         # Any companion expressions must also be included.
         codes += self.flow.companions
         seed_term = self.state.inject(seed_term, codes)
             # Append regular joints.
             joints += tie(self.flow)
 
+        # Slice a clipped flow.
+        if isinstance(self.flow, ClippedFlow):
+            partition = []
+            if not is_regular:
+                partition += [joint.rop for joint in seed_joints]
+            partition += [joint.rop for joint in tie(self.flow.ground)]
+            if partition:
+                seed_term = self.clip(seed_term, order, partition)
+            else:
+                seed_term = self.clip_root(seed_term, order)
+
         # Populate units exported by the covering term.
         units = []
 
         return JoinTerm(self.state.tag(), lkid, rkid, joints,
                         is_left, is_right, self.flow, lkid.baseline, routes)
 
+    def clip(self, term, order, partition):
+        ops = []
+        for code, direction in order:
+            op = FormulaCode(SortDirectionSig(direction=direction),
+                             code.domain, code.binding, base=code)
+            ops.append(op)
+        row_number_code = FormulaCode(RowNumberSig(), coerce(IntegerDomain()),
+                                      self.flow.binding,
+                                      partition=partition, order=ops)
+        row_number_unit = ScalarUnit(row_number_code, term.flow.base,
+                                     term.flow.binding)
+        tag = self.state.tag()
+        routes = term.routes.copy()
+        routes[row_number_unit] = tag
+        term = PermanentTerm(tag, term, term.flow, term.baseline, routes)
+        left_bound = 1
+        if self.flow.offset is not None:
+            left_bound = self.flow.offset+1
+        right_bound = left_bound+1
+        if self.flow.limit is not None:
+            right_bound = left_bound+self.flow.limit
+        left_bound_code = LiteralCode(left_bound, coerce(IntegerDomain()),
+                                      term.flow.binding)
+        right_bound_code = LiteralCode(right_bound, coerce(IntegerDomain()),
+                                       term.flow.binding)
+        left_filter = FormulaCode(CompareSig('>='), coerce(BooleanDomain()),
+                                  term.flow.binding,
+                                  lop=row_number_unit, rop=left_bound_code)
+        right_filter = FormulaCode(CompareSig('<'), coerce(BooleanDomain()),
+                                   term.flow.binding,
+                                   lop=row_number_unit, rop=right_bound_code)
+        filter = FormulaCode(AndSig(), coerce(BooleanDomain()),
+                             term.flow.binding,
+                             ops=[left_filter, right_filter])
+        return FilterTerm(self.state.tag(), term, filter,
+                          term.flow, term.baseline, term.routes.copy())
+
+
+    def clip_root(self, term, order):
+        limit = self.flow.limit
+        if limit is None:
+            limit = 1
+        offset = self.flow.offset
+        return OrderTerm(self.state.tag(), term, order, limit, offset,
+                         term.flow, term.baseline, term.routes.copy())
+
 
 class CompileFiltered(CompileFlow):
 

src/htsql/core/tr/dump.py

                     FormulaPhrase, Anchor, LeadingAnchor)
 from .signature import (Signature, isformula, IsEqualSig, IsTotallyEqualSig,
                         IsInSig, IsNullSig, IfNullSig, NullIfSig, CompareSig,
-                        AndSig, OrSig, NotSig, SortDirectionSig, ToPredicateSig,
-                        FromPredicateSig)
+                        AndSig, OrSig, NotSig, SortDirectionSig, RowNumberSig,
+                        ToPredicateSig, FromPredicateSig)
 from .plan import Plan, Statement
 import StringIO
 import re
                     self.signature)
 
 
+class DumpRowNumber(DumpBySignature):
+
+    adapt(RowNumberSig)
+
+    def __call__(self):
+        self.write(u"ROW_NUMBER() OVER (")
+        if self.phrase.partition:
+            self.format("PARTITION BY {partition:union{, }}", self.arguments)
+            if self.phrase.order:
+                self.write(u" ")
+        if self.phrase.order:
+            self.format("ORDER BY {order:union{, }}", self.arguments)
+        self.write(u")")
+
+
 class DumpToPredicate(DumpBySignature):
 
     adapt(ToPredicateSig)

src/htsql/core/tr/flow.py

         assert isinstance(flow, (ComplementFlow,
                                  MonikerFlow,
                                  ForkedFlow,
-                                 LinkedFlow))
+                                 LinkedFlow,
+                                 ClippedFlow))
         super(CoveringUnit, self).__init__(
                     code=code,
                     flow=flow,

src/htsql/core/tr/signature.py

         return (self.direction,)
 
 
+class RowNumberSig(Signature):
+
+    slots = [
+            Slot('partition', is_mandatory=False, is_singular=False),
+            Slot('order', is_mandatory=False, is_singular=False),
+    ]
+
+
 class ToPredicateSig(UnarySig):
     pass
 

src/htsql/core/tr/stitch.py

 from .syntax import IdentifierSyntax
 from .flow import (Flow, ScalarFlow, TableFlow, FiberTableFlow, QuotientFlow,
                    ComplementFlow, MonikerFlow, ForkedFlow, LinkedFlow,
-                   OrderedFlow, ColumnUnit, KernelUnit, CoveringUnit)
+                   ClippedFlow, OrderedFlow, ColumnUnit, KernelUnit,
+                   CoveringUnit)
 from .term import Joint
 
 
 class ArrangeCovering(Arrange):
 
     # The implementation is shared by all covering flows.
-    adapt_many(ComplementFlow, MonikerFlow, ForkedFlow, LinkedFlow)
+    adapt_many(ComplementFlow,
+               MonikerFlow,
+               ForkedFlow,
+               LinkedFlow,
+               ClippedFlow)
 
     def __call__(self):
         # Start with the parent ordering.
 class SpreadCovering(Spread):
 
     # The implementation is shared by all covering flows.
-    adapt_many(ComplementFlow, MonikerFlow, ForkedFlow, LinkedFlow)
+    adapt_many(ComplementFlow,
+               MonikerFlow,
+               ForkedFlow,
+               LinkedFlow,
+               ClippedFlow)
 
     def __call__(self):
         # Native units of the complement are inherited from the seed flow.
 class SewCovering(Sew):
 
     # The implementation is shared by all covering flows.
-    adapt_many(ComplementFlow, MonikerFlow, LinkedFlow, ForkedFlow)
+    adapt_many(ComplementFlow,
+               MonikerFlow,
+               LinkedFlow,
+               ForkedFlow,
+               ClippedFlow)
 
     def __call__(self):
         # To sew two terms representing a covering flow, we sew all axial flows
             yield joint.clone(rop=rop)
 
 
+class TieClipped(Tie):
+
+    adapt(ClippedFlow)
+
+    def __call__(self):
+        flow = self.flow.inflate()
+        joints = tie(flow.ground)
+        for joint in joints:
+            rop = CoveringUnit(joint.rop, flow, joint.rop.binding)
+            yield joint.clone(rop=rop)
+
+
 class TieForked(Tie):
 
     adapt(ForkedFlow)

src/htsql_mssql/core/tr/__init__.py

 #
 
 
-from . import compile, dump, encode, reduce, signature
+from . import compile, dump, encode, reduce
 
 

src/htsql_mssql/core/tr/compile.py

 from htsql.core.tr.term import PermanentTerm, FilterTerm
 from htsql.core.tr.flow import LiteralCode, FormulaCode, ScalarUnit
 from htsql.core.tr.coerce import coerce
-from htsql.core.tr.signature import CompareSig, AndSig, SortDirectionSig
-from .signature import RowNumberSig
+from htsql.core.tr.signature import (CompareSig, AndSig, SortDirectionSig,
+                                     RowNumberSig)
 from htsql.core.tr.compile import CompileOrdered
 from htsql.core.tr.stitch import arrange, spread
 
                              code.domain, code.binding, base=code)
             ops.append(op)
         row_number_code = FormulaCode(RowNumberSig(), coerce(IntegerDomain()),
-                                      self.flow.binding, ops=ops)
+                                      self.flow.binding,
+                                      partition=[], order=ops)
         row_number_unit = ScalarUnit(row_number_code, self.flow.base,
                                      self.flow.binding)
         tag = self.state.tag()

src/htsql_mssql/core/tr/dump.py

                                    DumpDateIncrement, DumpDateDecrement,
                                    DumpDateTimeIncrement,
                                    DumpDateTimeDecrement, DumpDateDifference)
-from .signature import RowNumberSig
 import math
 
 
         self.format("({op} <> 0)", self.arguments)
 
 
-class MSSQLDumpRowNumber(DumpBySignature):
-
-    adapt(RowNumberSig)
-
-    def __call__(self):
-        self.format("ROW_NUMBER() OVER (ORDER BY {ops:union{, }})",
-                    self.arguments)
-
-
 class MSSQLDumpBoolean(DumpBoolean):
 
     def __call__(self):

src/htsql_mssql/core/tr/signature.py

-#
-# Copyright (c) 2006-2012, Prometheus Research, LLC
-#
-
-
-from htsql.core.tr.signature import ConnectiveSig
-
-
-class RowNumberSig(ConnectiveSig):
-    pass
-
-