# Extracted from: pyedpyper / rnd / initial / pyedpyper.py (initial commit).
# NOTE: the original extraction carried a scraped line-number gutter here,
# which was not part of the program and has been removed.
################################################################################
#   Copyright 2009, Dhananjay Nene, Pune, India
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
################################################################################

import csv
import Queue

"""
pyedpyper is intended to be a pipeline assembly library for building data 
processing pipelines. The goal is to build basic data pipelining constructs to 
support the following use cases :
a) Traditional EAI - files, databases, messaging
b) Straight through processing - Conditional logic triggering eg. Received 
Orders -> Internal Allocation -> Manufacturing Requisition -> Raw Materials 
Ordering
c) Upcoming Web 2.0 models - eg. Yahoo Pipes like atom feed processing, inter 
process / distributed and concurrent computation models eg. Actor models, nosql support
d) Assembly of business logic using functional programming constructs 

This code is more of a demonstrator / early release of initial experiments in
using coroutines and Python to create an extremely capable yet compact DSL.
While the DSL is rather rudimentary at this stage (simply functions being
assembled together as shown at the end), the intent is to have a more refined
DSL allowing for maximum expressivity.

THIS IS AN EARLY EXPERIMENTAL RELEASE
"""
 
def coroutine(func):
    """ 
    Coroutine decorator: creates the generator and advances ("primes") it
    to its first yield so callers can immediately .send() into it.
    Coroutines : http://en.wikipedia.org/wiki/Coroutines
    Source : http://www.dabeaz.com/coroutines/Coroutines.pdf
    This is an excellent presentation for understanding coroutines which
    are used extensively in this design
    """
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # next(cr) instead of cr.next(): identical on Python 2.6+ and
        # also valid on Python 3, where generators have no .next() method
        next(cr)
        return cr
    return start

class coroutine_to_iterator(object):
    """
    Adapts a coroutine to an iterator: data pushed into a queue by a
    coroutine becomes consumable through the iterator protocol.
    Note: next() blocks until a value is available on the queue.
    """
    def __init__(self, queue):
        # queue: any object with a blocking .get() (e.g. Queue.Queue)
        self.queue = queue
    def __iter__(self):
        return self
    def next(self):
        # blocks until the feeding coroutine has put a value
        return self.queue.get()
    # Python 3 spells the iterator protocol __next__; alias it so the
    # adapter works unchanged on both Python 2 and Python 3
    __next__ = next

@coroutine
def tracer(prefix = None, target = None):
    """
    Debugging segment: prints every item flowing through it (with an
    optional prefix) and passes the item on unchanged to target.
    """
    while True :
        line = (yield)
        if prefix :
            # Parenthesized single-argument print is behavior-identical
            # on Python 2 (print statement) and Python 3 (function call);
            # the bare 'print x' statement form is a Py3 syntax error
            print('%s=> %s' % (prefix, line))
        else :
            print(line)
        if target :
            target.send(line)

def url_opener(url, target, *args, **kwargs):
    """
    An endpoint (pipeline source) that opens a URL-style data stream and
    pushes each line into the target coroutine.

    Only 'file://<path>' URLs are currently handled; any extra positional
    and keyword arguments are forwarded to open(). Unrecognized URLs are
    silently ignored (original behavior preserved).
    """
    # 'scheme' instead of 'type' to avoid shadowing the builtin;
    # the dead 'args = args; kwargs = kwargs' assignments were removed
    scheme, path = url.strip().split(':', 1)
    strm = None
    if scheme == 'file' :
        if path[0:2] == '//' :
            strm = open(path[2:], *args, **kwargs)
    if strm :
        try:
            for line in strm :
                target.send(line)
        finally:
            # close the file even if a downstream coroutine raises
            strm.close()

@coroutine
def csv_parser(target = None) :
    """
    Parses incoming csv lines into lists of values.

    Each received line is queued and pulled back out through a
    coroutine_to_iterator so the stdlib csv.reader can consume it; the
    parsed list is then sent to target.
    """
    queue = Queue.Queue()
    reader = csv.reader(coroutine_to_iterator(queue))
    while True :
        line = (yield)
        queue.put(line)
        # next(reader) instead of reader.next(): works on Python 2.6+
        # and Python 3 alike
        lst = next(reader)
        if target :
            target.send(lst)

@coroutine
def fixed_width_splitter(widths, strip = True, target = None):
    """
    Slices each incoming fixed-width record into a list of fields, one
    field per entry in widths, optionally stripping surrounding
    whitespace, and sends the resulting list to target.
    """
    while True :
        record = (yield)
        fields = []
        offset = 0
        for width in widths :
            chunk = record[offset:offset + width]
            fields.append(chunk.strip() if strip else chunk)
            offset += width
        if target :
            target.send(fields)
            
@coroutine
def list_to_dict_converter(target = None, mapping = None):
    """
    Converts each incoming list of items into a dictionary and forwards it.

    Without a mapping the dict is keyed by list position; with a mapping
    of {index: key} each listed index is stored under its mapped key.
    """
    while True :
        line = (yield)
        vals = {}
        if not mapping :
            for i, val in enumerate(line) :
                vals[i] = val
        else :
            for key, val in mapping.items() :
                vals[val] = line[key]
        # Bug fix: the converted dict was previously built and then
        # silently discarded - it must be forwarded downstream
        if target :
            target.send(vals)
            
class Generic(object):
    """
    A generic attribute bag; converters stuff parsed values into the
    instance __dict__ of this class.
    """
    def __str__(self):
        attrs = (val for val in dir(self) if val[0] != '_')
        # 'attr in self.__dict__' replaces dict.has_key (removed in Py3);
        # the guard keeps methods/class attributes out of the output
        attrvals = ("%s: %s" % (attr, self.__dict__[attr]) for attr in attrs if attr in self.__dict__)
        return ",".join(attrvals)
    def __repr__(self):
        return self.__str__()
    def to_dict(self):
        """Return a dict of the instance's public (non-underscore) attributes."""
        attrs = (val for val in dir(self) if val[0] != '_')
        dct = {}
        for attr in attrs :
            # Bug fix: the original tested dct.has_key(attr) on the
            # still-empty result dict, so to_dict() always returned {};
            # the membership test must be against self.__dict__
            if attr in self.__dict__ :
                dct[attr] = self.__dict__[attr]
        return dct

@coroutine
def list_to_object_converter(mapping = None, target = None):
    """
    Turns each incoming list into a Generic object and forwards it.

    Attributes are keyed by list position when no mapping is given, or
    by the mapped names from a {index: name} mapping otherwise.
    """
    while True :
        record = (yield)
        obj = Generic()
        if mapping :
            for idx, name in mapping.items() :
                obj.__dict__[name] = record[idx]
        else :
            # positional keys may be ints, so assign via __dict__
            # rather than setattr
            for idx, item in enumerate(record) :
                obj.__dict__[idx] = item
        if target :
            target.send(obj)

@coroutine
def filter(predicate, next = None):
    """
    Passes along only the items for which the predicate evaluates to True;
    everything else is dropped. Note this is just a special case of the
    router where only the next_if branch (and not next_else) is wired up.

    TODO: Should this be retained ? Instinctive response is DROP. Only value is syn sugar
    """
    while True :
        item = (yield)
        # NOTE: the strict '== True' comparison is preserved from the
        # original - a truthy-but-not-True predicate result is dropped
        if predicate(item) == True and next :
            next.send(item)

@coroutine
def router(predicate, next_if = None, next_else = None):
    """
    Two-way branch: each item goes to next_if when the predicate returns
    True, otherwise to next_else. A missing branch silently drops items.
    """
    while True :
        item = (yield)
        # pick the branch first, then guard against an unwired one
        branch = next_if if predicate(item) == True else next_else
        if branch :
            branch.send(item)

@coroutine
def splitter(targets = None):
    """
    Parallely broadcasts every incoming item to each target coroutine.

    The default was changed from the mutable [] literal (the classic
    shared-mutable-default pitfall) to None; behavior for callers is
    unchanged.
    """
    if targets is None :
        targets = []
    while True :
        line = (yield)
        for target in targets :
            target.send(line)

@coroutine
def batcher(predicate, target = None):
    """
    Accumulates items and emits them as a list whenever predicate(batch)
    reports the batch complete. On pipeline shutdown (GeneratorExit) any
    partial batch is flushed downstream.
    """
    batch = []
    try:
        while True :
            item = (yield)
            batch.append(item)
            if not predicate(batch) :
                continue
            if target :
                target.send(batch)
                batch = []
    except GeneratorExit:
        # flush whatever is left when the stream is closed
        if target and len(batch) > 0:
            target.send(batch)

import simplejson
class GenericJsonEncoder(simplejson.JSONEncoder) :
    """
    JSON encoder that knows how to serialize Generic attribute bags by
    falling back to their to_dict() representation.
    """
    def default(self, obj):
        if isinstance(obj, Generic) :
            return obj.to_dict()
        else :
            # Bug fix: default() is an instance method - the original
            # call omitted self, raising TypeError instead of the
            # intended "not JSON serializable" error
            return simplejson.JSONEncoder.default(self, obj)
                            

@coroutine
def json_writer(mapping = None, target = None):
    """
    Serializes each incoming item to a json string and sends that string
    downstream. When a mapping is supplied nothing is currently emitted
    (mapping support looks unfinished - preserved as-is).
    """
    while True :
        line = (yield)
        json = None
        if not mapping :
            json = simplejson.dumps(line, cls=GenericJsonEncoder)
        if json and target :
            # Bug fix: the serialized json was computed and then the raw
            # input line was sent instead; forward the json text
            target.send(json)

@coroutine
def mergepoint(handle, keyfunc, merge):
    """
    One named entry leg of a merge. 'handle' identifies this leg uniquely
    among the legs feeding the merge, and keyfunc computes the join key
    for each data value; the (handle, key, value) triple is forwarded to
    the merge coroutine.
    """
    while True :
        item = (yield)
        # keyfunc is evaluated unconditionally, as in the original
        tagged = (handle, keyfunc(item), item)
        if merge :
            merge.send(tagged)
        
@coroutine
def merge(handles, mergefunc, overwrite=True, next=None):
    """
    The actual merge point fed by one or more mergepoints.

    Values arrive as (handle, key, line) triples; once a key holds a
    value from every handle, the per-handle dict is passed to mergefunc
    and the result is sent to next. 'overwrite' controls whether a
    repeated (key, handle) pair replaces the previously stored value.
    Triples with an unknown handle are dropped.
    """
    vals = {}
    while True :
        handle, key, line = (yield)
        if handle not in handles :
            continue
        pending = vals.setdefault(key, {})
        if handle in pending and not overwrite :
            # repeat delivery with overwriting disabled - keep the first value
            continue
        pending[handle] = line
        # Bug fix: the completeness check only ran on the
        # "existing key, new handle" branch, so a merge configured with a
        # single handle never emitted anything. Check after every store.
        if len(pending) == len(handles) :
            # values for all handles received - release and emit
            keydict = vals.pop(key)
            val = mergefunc(keydict)
            if next :
                next.send(val)
 
                   
##############################################################################
# Cryptographic operations
##############################################################################
@coroutine
def digest(dtype='sha256', next = None):
    """
    Hashes each incoming item with the selected algorithm ('sha256' or
    'md5') and sends the hex digest downstream. Items are dropped when
    the digest type is unrecognized.
    """
    from Crypto.Hash import MD5
    from Crypto.Hash import SHA256
    while True :
        line = (yield)
        m = None
        if dtype == 'sha256' :
            m = SHA256.new()
        elif dtype == 'md5' :
            m = MD5.new()
        # Bug fix: 'val' was referenced outside the 'if m' guard, so an
        # unknown dtype raised NameError on the first item (or re-sent a
        # stale digest on later ones); send only when a digest was made
        if m :
            m.update(line)
            val = m.hexdigest()
            if next :
                next.send(val)

@coroutine
def encrypt(key, type = 'aes', next = None):
    """
    Encrypts each incoming item and sends the ciphertext downstream.
    Input is padded with 'X' characters to a 16-byte multiple.

    NOTE(review): AES in ECB mode with ad-hoc 'X' padding is not
    semantically secure; retained for behavioral compatibility.
    """
    from Crypto.Cipher import AES
    padding = 'XXXXXXXXXXXXXXXX'
    while True :
        line = (yield)
        c = None
        if type == 'aes' :
            c = AES.new(key, AES.MODE_ECB)
        if c :
            pad = len(line) % 16
            if pad > 0 :
                pad = 16 - pad
            line = line + padding[0:pad]
            val = c.encrypt(line)
            # Bug fix: 'next' defaults to None but was dereferenced
            # unconditionally, crashing any pipeline without a successor
            if next :
                next.send(val)
##############################################################################
# Sample Usage
# NOTE: The usage constructs / use case is an entirely arbitrary sequence of
# operations
##############################################################################


# Declare the eventual merge point up front so that both pipelines below
# can feed it. Handles 'contacts' and 'fixedwidth' must each deliver a
# value for a key before the merged result is released downstream.
mergepype = merge(
                ['contacts','fixedwidth'],
                # merge function: join the contact's email with the third
                # fixed-width field
                lambda dct : ":".join([dct['contacts'].email,dct['fixedwidth'][2]]),
                True,
                # fan the merged value out to a digest, an encryptor and a tracer
                splitter([
                    digest(
                         next = tracer('Digest')
                    ),
                    encrypt(
                        'thisisasecretkey',
                        next = tracer('Encrypted')
                    ),
                    tracer('PostJoin')
                ])
            )            
# Pipeline 1: open the contacts csv file and stream it line by line
url_opener(
    "file://contacts.csv",
    # pass each line to the csv parser
    csv_parser(
        # convert the parsed list to an object with the given attributes
        list_to_object_converter(
            {0: 'name', 2:'email'},
            # split the data into multiple streams
            splitter([
                # conditionally route the data depending upon predicate evaluation
                router(
                    lambda x : x.email[-3:] == 'com',
                    # if email ends with .com
                    splitter([
                        # Write to json
                        json_writer(target = tracer('json')),
                        # Also trace 
                        tracer('Second')
                    ]),
                    # if email doesn't end with .com
                    batcher(
                        # create batches of two
                        lambda x : len(x)>= 2,
                        # further trace as batched
                        tracer('Batched')
                    )
                ),
                # Also send data to a mergepoint stream
                mergepoint(
                    # Declare this data stream under the handle 'contacts'
                    'contacts',
                    # provide a function to extract the join key from the data
                    lambda x : x.email,
                    # specify the exact point where this stream will merge
                    mergepype
                )
            ])
        )
    )
)

# Pipeline 2: open another, fixed-width, file and feed the same merge
url_opener(
    "file://userinfo.fxw",
    # split each record into fields of 20, 20 and 40 characters
    fixed_width_splitter(
        [20,20,40],
        True,
        # send to mergepoint
        mergepoint(
            # this mergepoint handle is 'fixedwidth'
            'fixedwidth',
            # join key built from the first two fields (matches the
            # contacts email format)
            lambda x : x[0] + '@' + x[1],
            # Merge it into the earlier stream (at mergepype)
            mergepype
        )
    )
)