Source

htsql / src / htsql / tweak / etl / cmd / insert.py

Full commit
Kirill Simonov 7232976 
Kirill Simonov 201d19a 
Kirill Simonov 7232976 


Kirill Simonov 9df0080 

Kirill Simonov 8a108b3 

Kirill Simonov c9e494e 
Kirill Simonov 8bc7f91 
Kirill Simonov 54d133d 
Kirill Simonov 959e56b 

Kirill Simonov f4c93a3 
Kirill Simonov 9df0080 


Kirill Simonov 72247d3 
Kirill Simonov 54d133d 
Kirill Simonov f4c93a3 
Kirill Simonov 72247d3 
Kirill Simonov f4c93a3 
Kirill Simonov ae6fe3d 
Kirill Simonov bc35302 
Kirill Simonov f4c93a3 
Kirill Simonov 7232976 

Kirill Simonov f4c93a3 
Kirill Simonov 7232976 

Kirill Simonov bc35302 
Kirill Simonov 8bc7f91 

Kirill Simonov 7232976 

Kirill Simonov 675d765 









Kirill Simonov 8bc7f91 
Kirill Simonov 675d765 


Kirill Simonov 8bc7f91 













Kirill Simonov 54d133d 
Kirill Simonov 8bc7f91 


































Kirill Simonov de5bc6c 






Kirill Simonov 54d133d 
Kirill Simonov de5bc6c 

Kirill Simonov 54d133d 

Kirill Simonov de5bc6c 



Kirill Simonov 959e56b 






















Kirill Simonov 89bd8ee 
Kirill Simonov de5bc6c 

Kirill Simonov 9df0080 
Kirill Simonov 7232976 
Kirill Simonov 8b247a6 
Kirill Simonov 9df0080 




Kirill Simonov 45360e4 
Kirill Simonov 9df0080 
Kirill Simonov 8b247a6 
Kirill Simonov 9df0080 

Kirill Simonov 45360e4 




Kirill Simonov 9df0080 



Kirill Simonov 45360e4 
Kirill Simonov 9df0080 
Kirill Simonov 45360e4 

Kirill Simonov 7232976 

Kirill Simonov 9df0080 
Kirill Simonov 8b247a6 



Kirill Simonov 959e56b 

Kirill Simonov 8b247a6 



Kirill Simonov 9df0080 
Kirill Simonov 959e56b 
Kirill Simonov 9df0080 
Kirill Simonov f4c93a3 


Kirill Simonov 89bd8ee 
Kirill Simonov f4c93a3 

Kirill Simonov 89bd8ee 
Kirill Simonov 9df0080 
Kirill Simonov 45360e4 


Kirill Simonov 959e56b 
Kirill Simonov 45360e4 







Kirill Simonov 959e56b 
Kirill Simonov 45360e4 



Kirill Simonov 959e56b 



Kirill Simonov 45360e4 











Kirill Simonov 89bd8ee 
Kirill Simonov 45360e4 

Kirill Simonov 89bd8ee 
Kirill Simonov 45360e4 
Kirill Simonov 9df0080 

Kirill Simonov 45360e4 





Kirill Simonov b54c823 



Kirill Simonov 89bd8ee 
Kirill Simonov 45360e4 







Kirill Simonov 89bd8ee 
Kirill Simonov 45360e4 







Kirill Simonov 89bd8ee 

Kirill Simonov 45360e4 



Kirill Simonov 959e56b 





Kirill Simonov 45360e4 
Kirill Simonov 8b247a6 
Kirill Simonov 9df0080 
































Kirill Simonov 959e56b 

Kirill Simonov 9df0080 




Kirill Simonov 959e56b 
Kirill Simonov 9df0080 



Kirill Simonov 959e56b 

Kirill Simonov 9df0080 



Kirill Simonov 7ae8dd7 
Kirill Simonov 9df0080 
Kirill Simonov bc35302 
Kirill Simonov 9df0080 

Kirill Simonov 7232976 
Kirill Simonov 9df0080 



















Kirill Simonov 8a108b3 

Kirill Simonov 9df0080 




Kirill Simonov 959e56b 
Kirill Simonov 9df0080 













Kirill Simonov f4c93a3 











Kirill Simonov 959e56b 
Kirill Simonov 9df0080 

Kirill Simonov f4c93a3 
Kirill Simonov 9df0080 










Kirill Simonov 959e56b 
Kirill Simonov 9df0080 




Kirill Simonov 8b247a6 
Kirill Simonov 9df0080 



Kirill Simonov 8b247a6 
Kirill Simonov 9df0080 

Kirill Simonov f4c93a3 



Kirill Simonov 9df0080 
Kirill Simonov f4c93a3 

Kirill Simonov 9df0080 
Kirill Simonov f4c93a3 

Kirill Simonov b115d97 
Kirill Simonov f4c93a3 




















Kirill Simonov 959e56b 
Kirill Simonov f4c93a3 
Kirill Simonov 9df0080 
Kirill Simonov f4c93a3 

Kirill Simonov 451f0ec 
Kirill Simonov f4c93a3 







Kirill Simonov 5f481bd 
Kirill Simonov 8b247a6 



Kirill Simonov f4c93a3 
Kirill Simonov 9df0080 


Kirill Simonov 959e56b 
Kirill Simonov 9df0080 
Kirill Simonov 959e56b 
Kirill Simonov 9df0080 

Kirill Simonov 959e56b 
Kirill Simonov 9df0080 




Kirill Simonov 959e56b 
Kirill Simonov 9df0080 






Kirill Simonov 959e56b 




Kirill Simonov 89bd8ee 
Kirill Simonov 9df0080 




Kirill Simonov 959e56b 


Kirill Simonov 9df0080 

Kirill Simonov 959e56b 

Kirill Simonov 9df0080 
Kirill Simonov bc35302 



Kirill Simonov 9df0080 
Kirill Simonov bc35302 

Kirill Simonov 89bd8ee 
Kirill Simonov bc35302 
Kirill Simonov 9df0080 
Kirill Simonov ae6fe3d 




Kirill Simonov bc35302 

Kirill Simonov 9df0080 
Kirill Simonov ae6fe3d 
Kirill Simonov bc35302 
Kirill Simonov ae6fe3d 



Kirill Simonov bc35302 


Kirill Simonov 9df0080 
Kirill Simonov bc35302 













Kirill Simonov 5f481bd 
Kirill Simonov 9df0080 

Kirill Simonov 959e56b 
Kirill Simonov f4c93a3 

Kirill Simonov 9df0080 





Kirill Simonov 72247d3 
Kirill Simonov 9df0080 





Kirill Simonov 8b247a6 

Kirill Simonov 9df0080 

Kirill Simonov 8b247a6 

Kirill Simonov 959e56b 
Kirill Simonov 8b247a6 

Kirill Simonov 959e56b 

Kirill Simonov 9df0080 

Kirill Simonov 959e56b 










Kirill Simonov 89bd8ee 
Kirill Simonov 959e56b 
Kirill Simonov 9df0080 
Kirill Simonov 8b247a6 





Kirill Simonov 9df0080 

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
#
# Copyright (c) 2006-2013, Prometheus Research, LLC
#


from ....core.util import listof
from ....core.adapter import Utility, Adapter, adapt, adapt_many
from ....core.error import Error, PermissionError
from ....core.context import context
from ....core.connect import transaction, scramble, unscramble
from ....core.domain import (Domain, ListDomain, RecordDomain, BooleanDomain,
        IntegerDomain, FloatDomain, DecimalDomain, TextDomain, DateDomain,
        TimeDomain, DateTimeDomain, IdentityDomain, UntypedDomain, Product,
        Value, ID)
from ....core.classify import normalize, classify, relabel
from ....core.model import (HomeNode, TableNode, Arc, TableArc, ColumnArc,
        ChainArc)
from ....core.entity import TableEntity, ColumnEntity
from ....core.cmd.act import Act, ProduceAction, act
from ....core.cmd.fetch import RowStream, build_fetch
from ....core.tr.bind import BindingState, Select
from ....core.syn.syntax import VoidSyntax, IdentifierSyntax
from ....core.tr.binding import (VoidBinding, RootBinding, FormulaBinding,
        LocateBinding, SelectionBinding, SieveBinding, AliasBinding,
        SegmentBinding, QueryBinding, FreeTableRecipe, ColumnRecipe)
from ....core.tr.signature import IsEqualSig, AndSig, PlaceholderSig
from ....core.tr.decorate import decorate
from ....core.tr.coerce import coerce
from ....core.tr.lookup import identify
from .command import InsertCmd
from ..tr.dump import serialize_insert
import itertools
import datetime
import decimal


class Clarify(Adapter):
    """
    Builds a converter from values of one domain to values of another.

    Adapts the pair ``(origin_domain, domain)``.  Calling an instance
    returns a one-argument function that converts a value, or ``None``
    when no conversion between the two domains is available.
    """

    adapt(Domain, Domain)

    def __init__(self, origin_domain, domain):
        # Domain of the incoming value.
        self.origin_domain = origin_domain
        # Domain the value has to be converted to.
        self.domain = domain

    def __call__(self):
        # Generic fallback: equal domains need no conversion; any other
        # combination is reported as unsupported by returning `None`.
        def passthrough(value):
            return value

        return passthrough if self.origin_domain == self.domain else None


class ClarifyFromUntyped(Clarify):
    """
    Converts an untyped value to any domain using the target domain's
    ``parse`` method.
    """

    adapt(UntypedDomain, Domain)

    def __call__(self):
        # Look up the bound `parse` method once, when the converter is built.
        target_parse = self.domain.parse

        def convert(value, parse=target_parse):
            return parse(value)

        return convert


class ClarifyFromSelf(Clarify):
    """
    Identity conversion between two scalar domains of the same class.

    Unlike the generic `Clarify`, the two domain instances are not
    compared; the value is always passed through unchanged.
    """

    adapt_many((BooleanDomain, BooleanDomain),
               (IntegerDomain, IntegerDomain),
               (FloatDomain, FloatDomain),
               (DecimalDomain, DecimalDomain),
               (TextDomain, TextDomain),
               (DateDomain, DateDomain),
               (TimeDomain, TimeDomain),
               (DateTimeDomain, DateTimeDomain))

    def __call__(self):
        def passthrough(value):
            return value

        return passthrough


class ClarifyDecimal(Clarify):
    """
    Converts integer and float values to `decimal.Decimal`; ``None`` is
    passed through.
    """

    adapt_many((IntegerDomain, DecimalDomain),
               (FloatDomain, DecimalDomain))

    def __call__(self):
        def to_decimal(value):
            if value is None:
                return None
            return decimal.Decimal(value)

        return to_decimal


class ClarifyFloat(Clarify):
    """
    Converts integer and decimal values to `float`; ``None`` is passed
    through.
    """

    adapt_many((IntegerDomain, FloatDomain),
               (DecimalDomain, FloatDomain))

    def __call__(self):
        def to_float(value):
            if value is None:
                return None
            return float(value)

        return to_float


class ClarifyDateTimeFromDate(Clarify):
    """
    Converts a date to a datetime by combining it with the default
    `datetime.time()` value; ``None`` is passed through.
    """

    adapt(DateDomain, DateTimeDomain)

    def __call__(self):
        def to_datetime(value):
            if value is None:
                return None
            return datetime.datetime.combine(value, datetime.time())

        return to_datetime


class ClarifyIdentity(Clarify):
    """
    Converts an identity value of one identity domain to another.

    Equal domains pass the value through.  Otherwise the domains must have
    the same ``width`` and every pair of corresponding ``labels`` must be
    convertible with `Clarify`; if not, ``None`` is returned instead of
    a converter.
    """

    adapt(IdentityDomain, IdentityDomain)

    def __call__(self):
        origin = self.origin_domain
        target = self.domain
        if origin == target:
            return (lambda value: value)
        if origin.width != target.width:
            return None
        # Find a converter for each pair of corresponding labels; give up
        # as soon as one pair has no conversion.
        label_converts = []
        for origin_label, target_label in zip(origin.labels, target.labels):
            label_convert = Clarify.__invoke__(origin_label, target_label)
            if label_convert is None:
                return None
            label_converts.append(label_convert)
        make_id = ID.make(target.dump)

        def convert(value):
            if value is None:
                return None
            # Rebuild the identity from element-wise converted items.
            return make_id(each(item)
                           for item, each in zip(value, label_converts))

        return convert


class ExtractValuePipe(object):
    """
    Extracts one item from a row and converts it.

    `name`: the name used in the error message.
    `from_domain`: domain of the raw item (used to quote the bad value).
    `to_domain`: the domain named in the error message as the target.
    `convert`: one-argument conversion function.
    `index`: position of the item in the row.
    """

    def __init__(self, name, from_domain, to_domain, convert, index):
        self.index = index
        self.convert = convert
        self.name = name
        self.from_domain = from_domain
        self.to_domain = to_domain

    def __call__(self, row):
        raw = row[self.index]
        try:
            converted = self.convert(raw)
        except ValueError:
            # Report which field failed and quote the offending value.
            message = "Failed to adapt value of %s to %s" \
                    % (self.name, self.to_domain)
            quote = unicode(Value(self.from_domain, raw))
            raise Error(message, quote)
        return converted


class ExtractNodePipe(object):
    """
    Converts an input record to a tuple of values, one per entry in `arcs`.

    `node`: the target table node.
    `arcs`: the arcs being assigned, in the same order as `converts`.
    `id_convert`: optional function extracting the identity from a row.
    `converts`: functions extracting individual field values from a row.
    `is_list`: whether the input is a list of records.
    """

    def __init__(self, node, arcs, id_convert, converts, is_list):
        assert isinstance(node, TableNode)
        assert isinstance(arcs, listof(Arc))
        assert isinstance(converts, list)
        self.node = node
        self.arcs = arcs
        self.is_list = is_list
        self.id_convert = id_convert
        self.converts = converts

    def __call__(self, row):
        if self.id_convert is None:
            return tuple([convert(row) for convert in self.converts])
        # With an identity extractor, the result is a pair of the identity
        # and the field values; the identity is extracted first.
        identity = self.id_convert(row)
        values = tuple([convert(row) for convert in self.converts])
        return (identity, values)


class BuildExtractNode(Utility):
    """
    Builds an `ExtractNodePipe` for the given input profile.

    `profile`: profile of the input; its domain must be a record or a list
    of records, and its tag names the target table.
    `with_id`: if set, the first field of the record is treated as the
    identity of the table and is converted separately.
    `with_fields`: if set, the tagged fields of the record are matched
    against the columns and links of the table.

    Raises `Error` when the input shape, the table name, a field name or
    a field type is not acceptable.
    """

    def __init__(self, profile, with_id=False, with_fields=True):
        self.profile = profile
        self.with_id = with_id
        self.with_fields = with_fields

    def __call__(self):
        domain = self.profile.domain
        is_list = (isinstance(domain, ListDomain))
        # Accept either a single record or a list of records.
        if not ((isinstance(domain, ListDomain) and
                 isinstance(domain.item_domain, RecordDomain)) or
                isinstance(domain, RecordDomain)):
            raise Error("Expected a record or a list of records;"
                        " got %s" % domain)
        if is_list:
            fields = domain.item_domain.fields
        else:
            fields = domain.fields
        # The tag of the input profile is looked up among the labels
        # of the home node; it must refer to a table.
        if self.profile.tag is None:
            raise Error("Missing table name")
        signature = (normalize(self.profile.tag), None)
        arc_by_signature = dict(((label.name, label.arity), label.arc)
                                for label in classify(HomeNode()))
        if signature not in arc_by_signature:
            raise Error("Got unknown table", self.profile.tag)
        arc = arc_by_signature[signature]
        if not isinstance(arc, TableArc):
            raise Error("Expected the name of a table", self.profile.tag)
        node = TableNode(arc.table)
        id_convert = None
        if self.with_id:
            # The first field carries the identity of the record; build
            # a converter from the field domain to the table identity domain.
            if not fields:
                raise Error("Expected the first field to be an identity")
            id_field = fields[0]
            state = BindingState()
            syntax = VoidSyntax()
            scope = RootBinding(syntax)
            state.set_root(scope)
            seed = state.use(FreeTableRecipe(node.table), syntax)
            recipe = identify(seed)
            if recipe is None:
                raise Error("Cannot determine identity of the table")
            identity = state.use(recipe, syntax, scope=seed)
            id_domain = identity.domain
            clarify = Clarify.__invoke__(id_field.domain, id_domain)
            if clarify is None:
                raise Error("Expected the first field to be"
                            " the table identity; got %s" % id_field.domain)
            id_convert = ExtractValuePipe(signature[0], id_field.domain,
                                          id_domain, clarify, 0)
        if self.with_fields:
            # Map each tagged input field to a column or a link of the table
            # and remember the position of the field in the record.
            labels = classify(node)
            arc_by_signature = dict(((label.name, label.arity), label.arc)
                                    for label in labels)
            index_by_arc = {}
            for idx, field in enumerate(fields):
                # The identity field (if any) was handled above.
                if self.with_id and idx == 0:
                    continue
                # Fields without a tag are skipped.
                if field.tag is None:
                    continue
                signature = (normalize(field.tag), None)
                if signature not in arc_by_signature:
                    raise Error("Expected a column or a link name", field.tag)
                arc = arc_by_signature[signature]
                if not isinstance(arc, (ColumnArc, ChainArc)):
                    raise Error("Expected a column or a link name", field.tag)
                index_by_arc[arc] = idx
        arcs = []
        converts = []
        if self.with_fields:
            # Walk the labels in the order produced by `classify(node)` so
            # that `arcs` and `converts` follow that order, not the order of
            # the input fields.
            for label in labels:
                if label.arc in index_by_arc:
                    arc = label.arc
                    idx = index_by_arc[arc]
                    field = fields[idx]
                    # A column that has an associated link and is given an
                    # identity value is assigned through the link instead.
                    if (isinstance(arc, ColumnArc) and arc.link is not None
                            and isinstance(field.domain, IdentityDomain)):
                        arc = arc.link
                    if arc in arcs:
                        raise Error("Got duplicate field", field.tag)
                    arcs.append(arc)
                    if isinstance(arc, ColumnArc):
                        arc_domain = arc.column.domain
                    elif isinstance(arc, ChainArc):
                        joins = arc.joins
                        # NOTE(review): as written, this rejects the link
                        # only when the first join is not direct AND all the
                        # remaining joins are reverse-contracting (`not`
                        # binds tighter than `and`).  Possibly the intent was
                        # `not (first.is_direct and all(...))` -- confirm
                        # before changing.
                        if not joins[0].is_direct and \
                                all(join.reverse().is_contracting
                                    for join in joins[1:]):
                            raise Error("Cannot assign to link", field.tag)
                        # The value of a link is the identity of the target
                        # table; determine the domain of that identity.
                        state = BindingState()
                        syntax = VoidSyntax()
                        scope = RootBinding(syntax)
                        state.set_root(scope)
                        seed = state.use(FreeTableRecipe(arc.target.table),
                                         syntax)
                        recipe = identify(seed)
                        if recipe is None:
                            raise Error("Cannot determine identity of a link",
                                        field.tag)
                        identity = state.use(recipe, syntax, scope=seed)
                        arc_domain = identity.domain
                    # Find a conversion from the input field domain to the
                    # domain expected by the column or the link.
                    clarify = Clarify.__invoke__(field.domain, arc_domain)
                    if clarify is None:
                        raise Error("Invalid type for column %s:"
                                    " expected %s; got %s"
                                    % (field.tag.encode('utf-8'),
                                       arc_domain, field.domain))
                    convert = ExtractValuePipe(label.name, field.domain,
                                               arc_domain, clarify, idx)
                    converts.append(convert)
        return ExtractNodePipe(node, arcs, id_convert, converts, is_list)


class ExtractTablePipe(object):
    """
    Converts a tuple of node-level values to a tuple of column values.

    `table`: the target table.
    `columns`: the columns that receive a value, in the order of `extracts`.
    `resolves`: per-item functions applied to the input row first.
    `extracts`: functions that pick a column value from the resolved row.
    """

    def __init__(self, table, columns, resolves, extracts):
        assert isinstance(table, TableEntity)
        assert isinstance(columns, listof(ColumnEntity))
        self.table = table
        self.columns = columns
        self.extracts = extracts
        self.resolves = resolves

    def __call__(self, row):
        # Stage 1: resolve every input item with its paired function.
        resolved = []
        for item, resolve in zip(row, self.resolves):
            resolved.append(resolve(item))
        # Stage 2: pick the value of each column from the resolved row.
        return tuple([extract(resolved) for extract in self.extracts])


class BuildExtractTable(Utility):
    """
    Builds an `ExtractTablePipe` that maps values assigned to arcs of
    a table node to values of individual table columns.

    `node`: the target table node.
    `arcs`: column and link arcs, in the order of the incoming values.

    Raises `Error` if a column would be assigned more than once.
    """

    def __init__(self, node, arcs):
        assert isinstance(node, TableNode)
        assert isinstance(arcs, listof(Arc))
        self.node = node
        self.arcs = arcs

    def __call__(self):
        table = self.node.table
        resolves = []
        extract_by_column = {}

        def assign(column, extract):
            # A column may take its value from a single source only.
            if column in extract_by_column:
                raise Error("Duplicate column assignment for %s"
                            % column.name.encode('utf-8'))
            extract_by_column[column] = extract

        for position, arc in enumerate(self.arcs):
            if isinstance(arc, ColumnArc):
                # A column value is used as is.
                assign(arc.column,
                       (lambda values, i=position: values[i]))
                resolves.append(lambda item: item)
            elif isinstance(arc, ChainArc):
                # A link value is resolved to a tuple with one entry per
                # column of the resolving pipe.
                chain_pipe = BuildResolveChain.__invoke__(arc)
                resolves.append(lambda item, pipe=chain_pipe: pipe(item))
                for offset, column in enumerate(chain_pipe.columns):
                    assign(column,
                           (lambda values, i=position, k=offset:
                                values[i][k]))
        # Emit the assigned columns in the order they appear in the table.
        columns = [column for column in table
                   if column in extract_by_column]
        extracts = [extract_by_column[column] for column in columns]
        return ExtractTablePipe(table, columns, resolves, extracts)


class ExecuteInsertPipe(object):
    """
    Executes a prepared ``INSERT`` statement for one row.

    `table`: the target table.
    `input_columns`: columns that receive the values of the row.
    `output_columns`: columns returned by the statement.
    `sql`: the statement text (a `unicode` string).

    Calling the pipe with a row returns the single row fetched back from
    the statement.  Raises `PermissionError` when writing is not permitted
    and `Error` when the statement does not return exactly one row.
    """

    def __init__(self, table, input_columns, output_columns, sql):
        assert isinstance(table, TableEntity)
        assert isinstance(input_columns, listof(ColumnEntity))
        assert isinstance(output_columns, listof(ColumnEntity))
        assert isinstance(sql, unicode)
        self.table = table
        self.input_columns = input_columns
        self.output_columns = output_columns
        self.sql = sql
        self.input_converts = []
        for column in input_columns:
            self.input_converts.append(scramble(column.domain))
        # Built here, but not applied by `__call__` in this class.
        self.output_converts = []
        for column in output_columns:
            self.output_converts.append(unscramble(column.domain))

    def __call__(self, row):
        # Convert each value with the converter of its column.
        parameters = tuple([convert(item)
                            for item, convert
                                in zip(row, self.input_converts)])
        if not context.env.can_write:
            raise PermissionError("No write permissions")
        with transaction() as connection:
            cursor = connection.cursor()
            cursor.execute(self.sql.encode('utf-8'), parameters)
            fetched = cursor.fetchall()
            if len(fetched) != 1:
                raise Error("Failed to insert a record")
        return fetched[0]


class BuildExecuteInsert(Utility):
    """
    Builds an `ExecuteInsertPipe` for a table and a list of columns.

    The statement returns the columns of the primary key or, when the table
    has none, the columns of the first non-partial unique key with no
    nullable columns.  Raises `Error` if no such key exists.
    """

    def __init__(self, table, columns):
        assert isinstance(table, TableEntity)
        assert isinstance(columns, listof(ColumnEntity))
        self.table = table
        self.columns = columns

    def __call__(self):
        table = self.table
        if table.primary_key is not None:
            returning_columns = table.primary_key.origin_columns
        else:
            returning_columns = []
            for key in table.unique_keys:
                # Only a complete key over non-nullable columns can stand
                # in for the primary key.
                if (not key.is_partial and
                        not any(column.is_nullable
                                for column in key.origin_columns)):
                    returning_columns = key.origin_columns
                    break
        if not returning_columns:
            raise Error("Table does not have a primary key")
        sql = serialize_insert(table, self.columns, returning_columns)
        return ExecuteInsertPipe(table, self.columns, returning_columns, sql)


class ResolveIdentityPipe(object):
    """
    Looks up the identity of a record by its key values.

    `profile`: profile of the output.
    `pipe`: a callable that takes a row and returns a product whose
    ``data`` holds the matching entries.
    """

    def __init__(self, profile, pipe):
        self.pipe = pipe
        self.profile = profile

    def __call__(self, row):
        found = self.pipe(row).data
        # Exactly one entry must match the key.
        if len(found) == 1:
            return found[0]
        raise Error("Unable to locate the inserted record")


class BuildResolveIdentity(Utility):
    """
    Builds a `ResolveIdentityPipe` that, given values of `columns`, fetches
    the identity of the matching row of `table`.

    `table`: the table to look in.
    `columns`: the key columns; the i-th placeholder of the query is
    compared against the i-th column.
    `is_list`: when false, the profile of the resulting pipe describes
    a single item instead of a list.

    Raises `Error` if the identity of the table cannot be determined.
    """

    def __init__(self, table, columns, is_list=True):
        assert isinstance(table, TableEntity)
        assert isinstance(columns, listof(ColumnEntity))
        self.table = table
        self.columns = columns
        self.is_list = is_list

    def __call__(self):
        # Start from the root scope and descend into the table.
        state = BindingState()
        syntax = VoidSyntax()
        scope = RootBinding(syntax)
        state.set_root(scope)
        scope = state.use(FreeTableRecipe(self.table), syntax)
        state.push_scope(scope)
        # Build one `column = placeholder` condition per key column.
        conditions = []
        for idx, column in enumerate(self.columns):
            column_binding = state.use(ColumnRecipe(column), syntax)
            placeholder_binding = FormulaBinding(scope,
                                                 PlaceholderSig(idx),
                                                 column_binding.domain,
                                                 syntax)
            condition = FormulaBinding(scope,
                                       IsEqualSig(+1),
                                       coerce(BooleanDomain()),
                                       syntax,
                                       lop=column_binding,
                                       rop=placeholder_binding)
            conditions.append(condition)
        # A single condition is used directly; several are joined with AND.
        if len(conditions) == 1:
            [condition] = conditions
        else:
            condition = FormulaBinding(scope,
                                       AndSig(),
                                       coerce(BooleanDomain()),
                                       syntax,
                                       ops=conditions)
        # Filter the table by the condition and bind the identity of the
        # filtered scope.
        scope = SieveBinding(scope, condition, syntax)
        state.push_scope(scope)
        recipe = identify(scope)
        if recipe is None:
            raise Error("Cannot determine table identity")
        binding = state.use(recipe, syntax)
        # If the table has a label, alias the output with the first one.
        labels = relabel(TableArc(self.table))
        if labels:
            label = labels[0]
            identifier = IdentifierSyntax(label.name)
            binding = AliasBinding(binding, identifier)
        # Undo both `push_scope()` calls made above.
        state.pop_scope()
        state.pop_scope()
        # Wrap the identity into a segment and a query, then build
        # the pipe that executes it.
        binding = Select.__invoke__(binding, state)
        domain = ListDomain(binding.domain)
        binding = SegmentBinding(state.scope, binding, domain, syntax)
        profile = decorate(binding)
        binding = QueryBinding(state.scope, binding, profile, syntax)
        pipe = build_fetch(binding)
        profile = pipe.profile
        # For a single-record input, expose the item domain rather than
        # the list domain.
        if not self.is_list:
            profile = profile.clone(domain=profile.domain.item_domain)
        return ResolveIdentityPipe(profile, pipe)


class ResolveChainPipe(object):
    """
    Resolves the value of a link to a tuple of column values.

    `name`: name used when quoting an unresolved value (may be empty).
    `columns`: the columns that receive the resolved values.
    `domain`: domain of the link value; provides ``leaves`` (index paths
    into the value) and ``dump``.
    `pipe`: a callable that takes the list of leaf values and returns
    a product whose ``data`` holds the matching rows.
    """

    def __init__(self, name, columns, domain, pipe):
        assert isinstance(columns, listof(ColumnEntity))
        self.name = name
        self.columns = columns
        self.domain = domain
        self.pipe = pipe

    def __call__(self, value):
        # An absent link sets every column to `None`.
        if value is None:
            return (None,) * len(self.columns)

        def pick(path):
            # Follow a path of indexes into the (possibly nested) value.
            item = value
            for step in path:
                item = item[step]
            return item

        raw_values = [pick(leaf) for leaf in self.domain.leaves]
        data = self.pipe(raw_values).data
        if len(data) == 1:
            return data[0]
        # Zero or several matches: report the value that failed to resolve.
        dumped = self.domain.dump(value)
        if self.name:
            quote = u"%s[%s]" % (self.name, dumped)
        else:
            quote = u"[%s]" % dumped
        raise Error("Unable to resolve a link", quote)


class BuildResolveChain(Utility):
    """
    Builds a `ResolveChainPipe` for a link arc.

    The pipe takes an identity of the link target (the table at the end of
    the join chain) and fetches the values of `joins[0].target_columns`;
    these are the values for `joins[0].origin_columns` of the table being
    populated.

    `arc`: the link arc; must provide ``joins`` and ``target``.

    Raises `Error` if the identity of the target cannot be determined.
    """

    def __init__(self, arc):
        self.arc = arc
        self.joins = arc.joins

    def __call__(self):
        target_labels = relabel(TableArc(self.arc.target.table))
        target_name = target_labels[0].name if target_labels else None
        joins = self.joins
        state = BindingState()
        syntax = VoidSyntax()
        scope = RootBinding(syntax)
        state.set_root(scope)
        # Bind the table at the far end of the chain and its identity.
        seed = state.use(FreeTableRecipe(joins[-1].target), syntax)
        recipe = identify(seed)
        if recipe is None:
            raise Error("Cannot determine identity of a link", target_name)
        identity = state.use(recipe, syntax, scope=seed)
        count = itertools.count()
        # Placeholders are created in the root scope; capture it explicitly
        # since `scope` is rebound below.
        root_scope = scope
        def make_images(identity):
            # Pair every leaf of the (possibly nested) identity with
            # a consecutively numbered placeholder.
            images = []
            for field in identity.elements:
                if isinstance(field.domain, IdentityDomain):
                    images.extend(make_images(field))
                else:
                    item = FormulaBinding(root_scope,
                                          PlaceholderSig(next(count)),
                                          field.domain,
                                          syntax)
                    images.append((item, field))
            return images
        images = make_images(identity)
        scope = LocateBinding(scope, seed, images, None, syntax)
        state.push_scope(scope)
        if len(joins) > 1:
            # `AttachedTableRecipe` is not among the module-level imports,
            # so the original code raised `NameError` here for any link
            # made of more than one join.
            from ....core.tr.binding import AttachedTableRecipe
            # Walk back from the far end of the chain to the target of
            # the first join.
            recipe = AttachedTableRecipe([join.reverse()
                                          for join in joins[:0:-1]])
            scope = state.use(recipe, syntax)
            state.push_scope(scope)
        # Select the columns matched by the first join.
        elements = []
        for column in joins[0].target_columns:
            binding = state.use(ColumnRecipe(column), syntax)
            elements.append(binding)
        fields = [decorate(element) for element in elements]
        domain = RecordDomain(fields)
        scope = SelectionBinding(scope, elements, domain, syntax)
        binding = Select.__invoke__(scope, state)
        domain = ListDomain(binding.domain)
        binding = SegmentBinding(state.root, binding, domain, syntax)
        profile = decorate(binding)
        binding = QueryBinding(state.root, binding, profile, syntax)
        pipe = build_fetch(binding)
        # The fetched values are assigned to the origin columns of
        # the first join.
        columns = joins[0].origin_columns[:]
        domain = identity.domain
        return ResolveChainPipe(target_name, columns, domain, pipe)


class ProduceInsert(Act):
    """
    Executes the insert command: evaluates the feed, inserts every record
    into the target table and returns the identities of the new rows.

    For a list input the product data is a list of identities; for a single
    record it is one identity, or ``None`` when the record itself is
    ``None``.  An `Error` raised while processing a record is wrapped with
    a message naming the record and re-raised.
    """

    adapt(InsertCmd, ProduceAction)

    def __call__(self):
        with transaction():
            product = act(self.command.feed, self.action)
            # Assemble the stages: record -> node values -> column values
            # -> inserted key -> identity.
            extract_node = BuildExtractNode.__invoke__(product.meta)
            extract_table = BuildExtractTable.__invoke__(
                    extract_node.node, extract_node.arcs)
            execute_insert = BuildExecuteInsert.__invoke__(
                    extract_table.table, extract_table.columns)
            resolve_identity = BuildResolveIdentity.__invoke__(
                    execute_insert.table, execute_insert.output_columns,
                    extract_node.is_list)
            meta = resolve_identity.profile
            is_list = extract_node.is_list
            if is_list:
                record_domain = product.meta.domain.item_domain
                records = product.data
            else:
                record_domain = product.meta.domain
                records = [product.data]
            data = []
            for number, record in enumerate(records, 1):
                # Missing records are skipped.
                if record is None:
                    continue
                try:
                    node_values = extract_node(record)
                    column_values = extract_table(node_values)
                    key = execute_insert(column_values)
                    row = resolve_identity(key)
                except Error as exc:
                    if extract_node.is_list:
                        message = "While inserting record #%s" % number
                    else:
                        message = "While inserting a record"
                    quote = record_domain.dump(record)
                    exc.wrap(message, quote)
                    raise
                data.append(row)
            if not is_list:
                # A single record yields at most one identity.
                assert len(data) <= 1
                data = data[0] if data else None
            return Product(meta, data)