################################################################################
#   Copyright 2009, Dhananjay Nene, Pune, India
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
################################################################################

"""
pyedpyper is intended to be a pipeline assembly library for building data 
processing pipelines. The goal is to build basic data pipelining constructs to 
support the following use cases:
a) Traditional EAI - files, databases, messaging
b) Straight-through processing - conditional logic triggering, e.g. Received 
Orders -> Internal Allocation -> Manufacturing Requisition -> Raw Materials 
Ordering
c) Upcoming Web 2.0 models - e.g. Yahoo Pipes style atom feed processing, 
inter-process / distributed and concurrent computation models such as actor 
models, and nosql support
d) Assembly of business logic using functional programming constructs 

This code is more of a demonstrator / early release of initial experiments in 
using coroutines and Python to create an extremely capable yet compact DSL. 
While the DSL is rather rudimentary at this stage (simply functions being 
assembled together, as shown at the end), the intent is to have a more refined 
DSL allowing for maximum expressivity. 

THIS IS AN EARLY EXPERIMENTAL RELEASE
"""

import csv
import Queue
import simplejson
 
def coroutine(func):
    """ 
    Coroutines decorator. 
    Coroutines : http://en.wikipedia.org/wiki/Coroutines
    Source : http://www.dabeaz.com/coroutines/Coroutines.pdf
    This is an excellent presentation for understanding coroutines which
    are used extensively in this design
    """
    def start(*args, **kwargs):
        cr = func(*args,**kwargs)
        cr.next()
        return cr
    return start
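
# Without the decorator a coroutine must be primed by hand before it can
# receive values; a minimal sketch of what the decorator saves callers from:
#
#   def plain():
#       while True :
#           print (yield)
#
#   p = plain()
#   p.send('hi')    # raises TypeError: can't send non-None value to a
#                   # just-started generator
#   p.next()        # manual priming, done automatically by @coroutine
#   p.send('hi')    # prints: hi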

class coroutine_to_iterator(object):
    """
    This class adapts a coroutine to an iterator. It allows data coming in
    to a couroutine to be accessible through an iterator interface
    """
    def __init__(self,queue):
        self.queue = queue
    def __iter__(self):
        return self
    def next(self):
        return self.queue.get()
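
# A minimal sketch of the adaptation: values pushed onto the queue become
# available through the standard iterator protocol:
#
#   q = Queue.Queue()
#   it = coroutine_to_iterator(q)
#   q.put('first')
#   print it.next()    # prints: first
#
# Note that next() blocks until a value is available, which is what lets
# csv_parser below feed csv.reader one line at a time.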

@coroutine        
def tracer(prefix = None,target = None):
    """
    Helpful debugging segment
    """
    while True :
        line = (yield)
        if prefix :
            print '%s=> %s' % (prefix,line)
        else :
            print line
        if target :
            target.send(line)
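
# A minimal usage sketch:
#
#   t = tracer('debug')
#   t.send('hello')    # prints: debug=> hello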

def url_opener(url,target,*args,**kwargs):
    """
    An endpoint to open data streams and push each line to the target.
    Only the file:// scheme is supported at this stage; any extra
    arguments are passed through to open()
    """
    scheme, path = url.strip().split(':',1)
    strm = None
    if scheme == 'file' :
        if path[0:2] == '//' :
            path = path[2:]
            strm = open(path, *args,**kwargs)
    if strm :
        for line in strm :
            target.send(line)
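
# A minimal usage sketch ('data.txt' is a hypothetical file); url_opener is
# a source endpoint, so it drives the pipeline rather than being sent to:
#
#   url_opener('file://data.txt', tracer('line'))
#   # prints each line of data.txt, prefixed with 'line=> '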

@coroutine
def csv_parser(target = None) :
    """
    Converts a set of csv lines into a list of values
    """
    
    queue = Queue.Queue()
    reader = csv.reader(coroutine_to_iterator(queue))
    while True :
        line = (yield)
        queue.put(line)
        lst = reader.next()
        if target :
            target.send(lst)
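
# A minimal usage sketch:
#
#   parser = csv_parser(tracer('row'))
#   parser.send('bob,pune,bob@example.com\r\n')
#   # prints: row=> ['bob', 'pune', 'bob@example.com']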

@coroutine
def fixed_width_splitter(widths, strip = True, target = None):
    """
    Converts fixed width formatted values into a list of values
    """
    while True :
        line = (yield)
        start = 0
        split = []
        for width in widths :
            val = line[start:start + width]
            if strip :
                val = val.strip()
            split.append(val)
            start = start + width
        if target :
            target.send(split)
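
# A minimal usage sketch with two fields of widths 3 and 5:
#
#   fw = fixed_width_splitter([3, 5], True, tracer('cols'))
#   fw.send('bob pune')
#   # prints: cols=> ['bob', 'pune']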
            
@coroutine
def list_to_dict_converter(target = None, mapping = None):
    """
    Converts a list of items into a dictionary. Without a mapping the
    list indices become the keys; with a mapping of {index: key}, each
    mapped index's value is stored under the corresponding key
    """
    while True :
        line = (yield)
        vals = {}
        if not mapping :
            for i,val in enumerate(line) :
                vals[i] = val
        else :
            for key,val in mapping.items() :
                vals[val] = line[key]
        if target :
            target.send(vals)
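
# A minimal usage sketch mapping list positions 0 and 2 to named keys:
#
#   conv = list_to_dict_converter(tracer('dict'), {0: 'name', 2: 'email'})
#   conv.send(['bob', 'pune', 'bob@example.com'])
#   # prints: dict=> {'name': 'bob', 'email': 'bob@example.com'}
#   # (key order may vary)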
            
class Generic(object):
    """
    A generic value object whose attributes are set dynamically
    """
    def __str__(self):
        attrs = (val for val in dir(self) if val[0] != '_' )
        attrvals = ("%s: %s" % (attr, self.__dict__[attr]) for attr in attrs if attr in self.__dict__ )
        return ",".join(attrvals)
    def __repr__(self):
        return self.__str__()
    def to_dict(self):
        attrs = (val for val in dir(self) if val[0] != '_' )
        dct = {}
        for attr in attrs :
            if attr in self.__dict__ :
                dct[attr] = self.__dict__[attr]
        return dct
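
# A minimal usage sketch:
#
#   g = Generic()
#   g.name = 'bob'
#   g.email = 'bob@example.com'
#   print g            # prints: email: bob@example.com,name: bob
#   print g.to_dict()  # prints the same attributes as a dictionary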

@coroutine
def list_to_object_converter(mapping = None,target = None):
    """
    Converts a list to object
    """
    while True :
        line = (yield)
        vals = Generic()
        if not mapping :
            for i,val in enumerate(line) :
                vals.__dict__[i] = val
        else :
            for key,val in mapping.items() :
                vals.__dict__[val] = line[key]
            
        if target :
            target.send(vals)
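
# A minimal usage sketch, mirroring the dict converter above:
#
#   conv = list_to_object_converter({0: 'name', 2: 'email'}, tracer('obj'))
#   conv.send(['bob', 'pune', 'bob@example.com'])
#   # prints: obj=> email: bob@example.com,name: bob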

@coroutine
def router(predicate, next_if = None, next_else = None):
    """
    routes to one of two branches based on the results of the predicate
    """
    while True :
        line = (yield)
        if predicate(line) :
            if next_if :
                next_if.send(line)
        else :
            if next_else :
                next_else.send(line)
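
# A minimal usage sketch routing on a simple predicate:
#
#   r = router(lambda x : x > 10, tracer('big'), tracer('small'))
#   r.send(42)    # prints: big=> 42
#   r.send(3)     # prints: small=> 3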

@coroutine
def splitter(targets = ()):
    """
    Broadcasts each value across all the target streams
    """
    while True :
        line = (yield)
        for target in targets :
            target.send(line)
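
# A minimal usage sketch broadcasting one value to two streams:
#
#   s = splitter([tracer('a'), tracer('b')])
#   s.send('x')    # prints: a=> x  then  b=> x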

@coroutine
def batcher(predicate,target = None):
    """
    Accumulates data element and releases it in batches,
    each batch being identified by the predicate
    """
    batch = []
    try:
        while True :
            batch.append((yield))
            if predicate(batch) :
                if target :
                    target.send(batch)
                    batch = []
    except GeneratorExit:
        if target and len(batch) > 0:
            target.send(batch)
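
# A minimal usage sketch batching in groups of three; close() flushes any
# partial batch left at the end of the stream:
#
#   b = batcher(lambda batch : len(batch) >= 3, tracer('batch'))
#   for i in range(7) :
#       b.send(i)      # prints: batch=> [0, 1, 2] then batch=> [3, 4, 5]
#   b.close()          # prints: batch=> [6]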

class GenericJsonEncoder(simplejson.JSONEncoder) :
    """
    Encoder to convert a Generic object into a json stream
    """
    def default(self,obj):
        if isinstance(obj,Generic) :
            return obj.to_dict()
        else :
            return simplejson.JSONEncoder.default(self,obj)
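
# A minimal usage sketch:
#
#   g = Generic()
#   g.name = 'bob'
#   print simplejson.dumps(g, cls=GenericJsonEncoder)
#   # prints: {"name": "bob"}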
                            

@coroutine
def json_writer(mapping = None, target = None):
    """
    Converts each value in the stream to its json representation.
    The mapping parameter is reserved for selective field
    serialisation and is not yet implemented
    """
    while True :
        line = (yield)
        json = None
        if not mapping :
            json = simplejson.dumps(line, cls=GenericJsonEncoder)
        if json and target :
            target.send(json)
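
# A minimal usage sketch:
#
#   jw = json_writer(target = tracer('json'))
#   jw.send({'name': 'bob'})
#   # prints: json=> {"name": "bob"}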

@coroutine
def mergepoint(handle,keyfunc,merge):
    """
    An intermediate point which acts as one of the multiple
    entry points leading into a merge. Handle identifies this
    mergepoint uniquely across multiple such points, whereas
    keyfunc computes the key for the data value
    """
    while True :
        line = (yield)
        key = keyfunc(line)    
        if merge :
            merge.send((handle,key,line))
        
@coroutine
def merge(handles,mergefunc,overwrite=True,next=None):
    """
    The actual merge point. This emits the merged data
    """
    vals = {}
    while True :
        handle,key,line = (yield)
        if handle in handles :
            # new key seen for the first time
            if key not in vals :
                vals[key] = {}
                vals[key][handle] = line
            # this is a repeat key and handle :
            # just replace / overwrite the value
            elif handle in vals[key] :
                if overwrite :
                    vals[key][handle] = line
            else :
                # key exists but handle doesn't
                vals[key][handle] = line
                if len(vals[key]) == len(handles) :
                    # values for all handles received
                    keydict = vals.pop(key)
                    val = mergefunc(keydict)
                    if next :
                        next.send(val)
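
# A minimal sketch of a two-way join; each stream registers through its own
# mergepoint, and the merged value is emitted once both sides have sent a
# value for the same key:
#
#   m = merge(['left', 'right'],
#             lambda d : (d['left'], d['right']),
#             True,
#             tracer('joined'))
#   left = mergepoint('left', lambda x : x[0], m)
#   right = mergepoint('right', lambda x : x[0], m)
#   left.send(('k1', 'L'))
#   right.send(('k1', 'R'))
#   # prints: joined=> (('k1', 'L'), ('k1', 'R'))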
                    
##############################################################################
# Sample Usage
# NOTE: The usage constructs / use case is an entirely arbitrary sequence of
# operations
##############################################################################


# Declare an eventual merge
mergepype = merge(
                ['contacts','fixedwidth'],
                lambda dct : ":".join([dct['contacts'].email,dct['fixedwidth'][2]]),
                True,
                tracer('PostJoin')
            )            
# Open file
url_opener(
    "file://contacts.csv",
    # pass to csv parser
    csv_parser(
        # convert to object with given attributes
        list_to_object_converter(
            {0: 'name', 2:'email'},
            # split the data into multiple streams
            splitter([
                # conditionally route the data depending upon predicate evaluation
                router(
                    lambda x : x.email.endswith('.com'),
                    # if email ends with .com
                    splitter([
                        # Write to json
                        json_writer(target = tracer('json')),
                        # Also trace 
                        tracer('Second')
                    ]),
                    # if email doesn't end with .com
                    batcher(
                        # create batches of two
                        lambda x : len(x)>= 2,
                        # further trace as batched
                        tracer('Batched')
                    )
                ),
                # Also send data to a mergepoint stream
                mergepoint(
                    # Declare this data stream of key contacts
                    'contacts',
                    # provide a function to extract the real key from the data
                    lambda x : x.email,
                    # specify the exact point where this will merge
                    mergepype
                )
            ])
        )
    )
)

# Open another fixed width file
url_opener(
    "file://userinfo.fxw",
    # split it into fields of 20, 20 and 40 characters
    fixed_width_splitter(
        [20,20,40],
        True,
        # send to mergepoint
        mergepoint(
            # this mergepoint handle is 'fixedwidth'
            'fixedwidth',
            # Compute the merge key based on the expression below
            lambda x : x[0] + '@' + x[1],
            # Merge it into the earlier stream (at mergepype)
            mergepype
        )
    )
)