Commits

Kirill Simonov  committed 8b247a6

ETL: added support for scalar queries.

  • Participants
  • Parent commits 5ac6419

Comments (0)

Files changed (7)

File src/htsql/tweak/etl/cmd/delete.py

                     extract_node.node.table)
             meta = decorate(VoidBinding())
             data = None
-            for record in product.data:
+            if extract_node.is_list:
+                records = product.data
+            else:
+                records = [product.data]
+            for record in records:
                 if record is None:
                     continue
                 id_value, row = extract_node(record)

File src/htsql/tweak/etl/cmd/insert.py

 
 class ExtractNodePipe(object):
 
-    def __init__(self, node, arcs, id_convert, converts):
+    def __init__(self, node, arcs, id_convert, converts, is_list):
         assert isinstance(node, TableNode)
         assert isinstance(arcs, listof(Arc))
         assert isinstance(converts, list)
         self.arcs = arcs
         self.id_convert = id_convert
         self.converts = converts
+        self.is_list = is_list
 
     def __call__(self, row):
         if self.id_convert is not None:
 
     def __call__(self):
         domain = self.profile.domain
-        if not (isinstance(domain, ListDomain) and
-                isinstance(domain.item_domain, RecordDomain)):
+        is_list = (isinstance(domain, ListDomain))
+        if not ((isinstance(domain, ListDomain) and
+                 isinstance(domain.item_domain, RecordDomain)) or
+                isinstance(domain, RecordDomain)):
             feed_type = domain.family
             if isinstance(domain, ListDomain):
                 feed_type += " of " + domain.item_domain.family
             raise BadRequestError("unexpected feed type: expected"
                                   " a list of records; got %s" % feed_type)
-        fields = domain.item_domain.fields
+        if is_list:
+            fields = domain.item_domain.fields
+        else:
+            fields = domain.fields
         if self.profile.tag is None:
             raise BadRequestError("missing table name")
         signature = (normalize(self.profile.tag), None)
                                                  field.domain.family))
                     convert = (lambda v, i=idx, c=clarify: c(v[i]))
                     converts.append(convert)
-        return ExtractNodePipe(node, arcs, id_convert, converts)
+        return ExtractNodePipe(node, arcs, id_convert, converts, is_list)
 
 
 class ExtractTablePipe(object):
 
 class BuildResolveIdentity(Utility):
 
-    def __init__(self, table, columns):
+    def __init__(self, table, columns, is_list=True):
         assert isinstance(table, TableEntity)
         assert isinstance(columns, listof(ColumnEntity))
         self.table = table
         self.columns = columns
+        self.is_list = is_list
 
     def __call__(self):
         state = BindingState()
         profile = decorate(binding)
         binding = QueryBinding(state.scope, binding, profile, syntax)
         pipe = build_fetch(binding)
-        return ResolveIdentityPipe(pipe.profile, pipe)
+        profile = pipe.profile
+        if not self.is_list:
+            profile = profile.clone(domain=profile.domain.item_domain)
+        return ResolveIdentityPipe(profile, pipe)
 
 
 class ResolveChainPipe(object):
             execute_insert = BuildExecuteInsert.__invoke__(
                     extract_table.table, extract_table.columns)
             resolve_identity = BuildResolveIdentity.__invoke__(
-                    execute_insert.table, execute_insert.output_columns)
+                    execute_insert.table, execute_insert.output_columns,
+                    extract_node.is_list)
             meta = resolve_identity.profile
             data = []
-            for record in product.data:
+            if extract_node.is_list:
+                records = product.data
+            else:
+                records = [product.data]
+            for record in records:
                 if record is None:
                     continue
                 row = resolve_identity(
                             extract_table(
                                 extract_node(record))))
                 data.append(row)
+            if not extract_node.is_list:
+                assert len(data) <= 1
+                if data:
+                    data = data[0]
+                else:
+                    data = None
             return Product(meta, data)
 
 

File src/htsql/tweak/etl/cmd/merge.py

                     extract_table_for_update.table,
                     extract_table_for_update.columns)
             resolve_identity = BuildResolveIdentity.__invoke__(
-                    execute_insert.table, execute_insert.output_columns)
+                    execute_insert.table, execute_insert.output_columns,
+                    extract_node.is_list)
             meta = resolve_identity.profile
             data = []
-            for record in product.data:
+            if extract_node.is_list:
+                records = product.data
+            else:
+                records = [product.data]
+            for record in records:
                 if record is None:
                     continue
                 row = extract_node(record)
                     key = execute_insert(row)
                 row = resolve_identity(key)
                 data.append(row)
+            if not extract_node.is_list:
+                assert len(data) <= 1
+                if data:
+                    data = data[0]
+                else:
+                    data = None
             return Product(meta, data)
 
 

File src/htsql/tweak/etl/cmd/update.py

                     extract_table.table,
                     extract_table.columns)
             resolve_identity = BuildResolveIdentity.__invoke__(
-                    execute_update.table, execute_update.output_columns)
+                    execute_update.table, execute_update.output_columns,
+                    extract_node.is_list)
             meta = resolve_identity.profile
             data = []
-            for record in product.data:
+            if extract_node.is_list:
+                records = product.data
+            else:
+                records = [product.data]
+            for record in records:
                 if record is None:
                     continue
                 key_id, row = extract_node(record)
                 key = execute_update(key, row)
                 row = resolve_identity(key)
                 data.append(row)
+            if not extract_node.is_list:
+                assert len(data) <= 1
+                if data:
+                    data = data[0]
+                else:
+                    data = None
             return Product(meta, data)
 
 

File src/htsql/tweak/etl/tr/bind.py

 from ....core.tr.lookup import lookup_command
 from ....core.tr.decorate import decorate
 from ....core.tr.binding import QueryBinding, SegmentBinding, CommandBinding
-from ....core.tr.syntax import IdentifierSyntax
+from ....core.tr.syntax import IdentifierSyntax, WeakSegmentSyntax
 from ....core.tr.signature import ConnectiveSig
 from ....core.tr.fn.bind import BindCommand
 from ....core.cmd.command import DefaultCmd
     cmd = None
 
     def expand(self, op):
+        op = WeakSegmentSyntax(op, op.mark)
         op = self.state.bind(op)
         feed = lookup_command(op)
         if feed is None:
     def expand(self, ops):
         commands = []
         for op in ops:
+            op = WeakSegmentSyntax(op, op.mark)
             op = self.state.bind(op)
             command = lookup_command(op)
             if command is None:

File test/input/etl.yaml

                       manufacturer:=[0992],
                       title:='Lenovo ThinkPad T430'}/:insert,
            /product[A0000003])
+- uri: /insert(manufacturer:={code:='SNY', name:='Sony'})/:json
+- uri: /merge({'A0000004' :as sku,
+               [SNY] :as manufacturer,
+               'Sony Vaio' :as title} :as product)/:json
+- uri: /merge({'A0000004' :as sku,
+               'Sony Vaio SVE1711X1EB' :as title} :as product)/:json
+- uri: /update(product[A0000004]{id(), list_price:=1279})/:json
 - uri: /product
 

File test/output/pgsql.yaml

            FROM "product"
            WHERE ("product"."sku" = 'A0000003')
            ORDER BY 1 ASC
+      - uri: /insert(manufacturer:={code:='SNY', name:='Sony'})/:json
+        status: 200 OK
+        headers:
+        - [Content-Type, application/javascript]
+        - [Content-Disposition, inline; filename="manufacturer.js"]
+        body: |
+          {
+            "manufacturer": "SNY"
+          }
+      - uri: /merge({'A0000004' :as sku, [SNY] :as manufacturer, 'Sony Vaio' :as title}
+          :as product)/:json
+        status: 200 OK
+        headers:
+        - [Content-Type, application/javascript]
+        - [Content-Disposition, inline; filename="product.js"]
+        body: |
+          {
+            "product": "A0000004"
+          }
+      - uri: /merge({'A0000004' :as sku, 'Sony Vaio SVE1711X1EB' :as title} :as product)/:json
+        status: 200 OK
+        headers:
+        - [Content-Type, application/javascript]
+        - [Content-Disposition, inline; filename="product.js"]
+        body: |
+          {
+            "product": "A0000004"
+          }
+      - uri: /update(product[A0000004]{id(), list_price:=1279})/:json
+        status: 200 OK
+        headers:
+        - [Content-Type, application/javascript]
+        - [Content-Disposition, inline; filename="product.js"]
+        body: |
+          {
+            "product": "A0000004"
+          }
       - uri: /product
         status: 200 OK
         headers:
           \                            :              : you go                           :
           \           :\n | A0000003 | 0992              |                   | Lenovo
           ThinkPad T430        | true         |                                  |
-          \           |\n\n ----\n /product\n SELECT \"product\".\"sku\",\n        \"product\".\"manufacturer_code\",\n
+          \           |\n | A0000004 | SNY               |                   | Sony
+          Vaio SVE1711X1EB       | true         |                                  |
+          \   1279.00 |\n\n ----\n /product\n SELECT \"product\".\"sku\",\n        \"product\".\"manufacturer_code\",\n
           \       \"product\".\"product_line_code\",\n        \"product\".\"title\",\n
           \       \"product\".\"is_available\",\n        \"product\".\"description\",\n
           \       \"product\".\"list_price\"\n FROM \"product\"\n ORDER BY 1 ASC\n"