# Source
#
# pyedpyper / rnd / initial / pyedpyper.py

################################################################################
#   Copyright 2009, Dhananjay Nene, Pune, India
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
################################################################################

import csv
import Queue

"""
pyedpyper is intended to be a pipeline assembly library for building data 
processing pipelines. The goal is to build basic data pipelining constructs to 
support the following use cases :
a) Traditional EAI - files, databases, messaging
b) Straight through processing - Conditional logic triggering eg. Received 
Orders -> Internal Allocation -> Manufacturing Requisition -> Raw Materials 
Ordering
c) Upcoming Web 2.0 models - eg. Yahoo Pipes like atom feed processing, inter 
process / distributed and concurrent computation models eg. Actor models, nosql support
d) Assembly of business logic using functional programming constructs 

This code is more of a demonstrator / early release  of initial experiments in 
using coroutines and python to create an extremely capable yet compact DSL. 
While the DSL is rather rudimentary at this stage, (simply functions being 
assembled together as shown at the end), the intent is to have a more refined 
DSL allowing for maximum expressivity. 

THIS IS AN EARLY EXPERIMENTAL RELEASE
"""
 
def coroutine(func):
    """
    Decorator that creates and "primes" a coroutine.

    Calling the decorated function instantiates the generator and
    advances it to its first yield, so the returned coroutine is
    immediately ready to receive values via send().

    Coroutines : http://en.wikipedia.org/wiki/Coroutines
    Source : http://www.dabeaz.com/coroutines/Coroutines.pdf
    This is an excellent presentation for understanding coroutines which
    are used extensively in this design
    """
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        # The next() builtin works on Python 2.6+ and 3.x, unlike the
        # generator's .next() method which is Python 2 only.
        next(cr)
        return cr
    return start

class coroutine_to_iterator(object):
    """
    Adapts a queue-like object (anything exposing a blocking get()) to
    the iterator protocol, so data pushed into a coroutine can be pulled
    through a standard iterator interface (e.g. by csv.reader).
    """
    def __init__(self, queue):
        # queue: any object with get(); typically a Queue.Queue
        self.queue = queue
    def __iter__(self):
        return self
    def next(self):
        # Blocks until an item is available; never raises StopIteration,
        # so the resulting iterator is unbounded by design.
        return self.queue.get()
    # Python 3 spells the iterator-step method __next__; aliasing it
    # keeps this adapter usable under both runtimes.
    __next__ = next

@coroutine
def tracer(prefix=None, target=None):
    """
    Debugging segment: prints every item flowing through it (with an
    optional prefix) and forwards the item unchanged to *target* when
    one is supplied.
    """
    while True:
        line = (yield)
        # Parenthesized single-argument print behaves identically on
        # Python 2 and is valid syntax on Python 3.
        if prefix:
            print('%s=> %s' % (prefix, line))
        else:
            print(line)
        if target:
            target.send(line)

def url_opener(url, target, *args, **kwargs):
    """
    An endpoint that opens the data stream named by *url* and pushes
    each line into *target* (a coroutine exposing send()).

    Only 'file://...' URLs are currently supported; anything else is
    silently ignored, matching the original best-effort behaviour.
    Extra positional/keyword arguments are forwarded to open().
    """
    # 'scheme' instead of 'type' -- avoid shadowing the builtin
    scheme, path = url.strip().split(':', 1)
    if scheme == 'file' and path[0:2] == '//':
        # Strip the '//' authority marker: file://foo.csv -> foo.csv.
        # 'with' guarantees the file handle is closed (the original
        # leaked it).
        with open(path[2:], *args, **kwargs) as strm:
            for line in strm:
                target.send(line)

@coroutine
def csv_parser(target=None):
    """
    Parses each incoming text line as CSV and forwards the resulting
    list of field values to *target*.

    A Queue plus the coroutine_to_iterator adapter feeds lines to
    csv.reader one at a time, letting the stateful csv machinery run
    inside a push-style pipeline.
    NOTE(review): assumes each send() delivers exactly one complete CSV
    record (no quoted embedded newlines) -- confirm against inputs.
    """
    queue = Queue.Queue()
    reader = csv.reader(coroutine_to_iterator(queue))
    while True:
        line = (yield)
        queue.put(line)
        # next() builtin instead of .next() keeps this Py2/Py3 portable
        lst = next(reader)
        if target:
            target.send(lst)

@coroutine
def fixed_width_splitter(widths, strip = True, target = None):
    """
    Cuts each incoming line into consecutive fixed-width fields.

    *widths* lists the field widths in order; when *strip* is true each
    field is whitespace-stripped. The resulting list of fields is
    forwarded to *target*.
    """
    while True:
        record = (yield)
        fields = []
        offset = 0
        for width in widths:
            chunk = record[offset:offset + width]
            if strip:
                chunk = chunk.strip()
            fields.append(chunk)
            offset += width
        if target:
            target.send(fields)
            
@coroutine
def list_to_dict_converter(target = None, mapping = None):
    """
    Converts each incoming list of values into a dict and forwards it
    to *target*.

    Without a mapping the dict is keyed by positional index; with a
    mapping of {index: name} it is keyed by the mapped names.
    """
    while True:
        line = (yield)
        vals = {}
        if not mapping:
            for i,val in enumerate(line) :
                vals[i] = val
        else :
            for key,val in mapping.items() :
                vals[val] = line[key]
        # BUG FIX: the converted dict was built but never forwarded,
        # making this stage a dead end; mirrors list_to_object_converter.
        if target:
            target.send(vals)
            
class Generic(object):
    """
    A generic property bag: attributes are assigned dynamically and can
    be rendered as a string or exported as a plain dict.
    """
    def __str__(self):
        # public attribute names (skip _private and dunder names)
        attrs = (val for val in dir(self) if val[0] != '_')
        # 'in' instead of has_key(): works on Python 2 and 3
        attrvals = ("%s: %s" % (attr, self.__dict__[attr])
                    for attr in attrs if attr in self.__dict__)
        return ",".join(attrvals)
    def __repr__(self):
        return self.__str__()
    def to_dict(self):
        """Return a dict of this instance's public attributes."""
        attrs = (val for val in dir(self) if val[0] != '_')
        dct = {}
        for attr in attrs:
            # BUG FIX: the original tested the (empty) result dict with
            # dct.has_key(attr), so to_dict() always returned {}.
            if attr in self.__dict__:
                dct[attr] = self.__dict__[attr]
        return dct

@coroutine
def list_to_object_converter(mapping = None,target = None):
    """
    Turns each incoming list of values into a Generic object and
    forwards it to *target*.

    Without a mapping, attributes are keyed by positional index; with a
    mapping of {index: name}, by the mapped attribute names.
    """
    while True:
        record = (yield)
        obj = Generic()
        if mapping:
            for idx, name in mapping.items():
                obj.__dict__[name] = record[idx]
        else:
            for idx, field in enumerate(record):
                obj.__dict__[idx] = field
        if target:
            target.send(obj)

@coroutine
def router(predicate, next_if = None, next_else = None):
    """
    Routes each incoming item to one of two downstream coroutines:
    a truthy predicate result sends it to *next_if*, a falsy one to
    *next_else*. Either branch may be None, in which case the item is
    simply dropped.
    """
    while True:
        line = (yield)
        # Idiom fix: test truthiness rather than '== True', which
        # silently mis-routed truthy non-bool predicate results
        # (e.g. a non-empty string) down the else branch.
        if predicate(line):
            if next_if:
                next_if.send(line)
        else:
            if next_else:
                next_else.send(line)

@coroutine
def splitter(targets = None):
    """
    Broadcasts every incoming item to each coroutine in *targets*.
    """
    # Avoid the shared-mutable-default-argument pitfall; None default
    # is backward compatible with the original [] default.
    if targets is None:
        targets = []
    while True:
        line = (yield)
        for target in targets:
            target.send(line)

@coroutine
def batcher(predicate,target = None):
    """
    Accumulates incoming items into a batch; once predicate(batch) is
    true the whole batch is forwarded to *target* and a fresh batch is
    started. Any partial batch left when the coroutine is closed is
    flushed downstream.
    """
    pending = []
    try:
        while True:
            pending.append((yield))
            if predicate(pending):
                if target:
                    target.send(pending)
                    pending = []
    except GeneratorExit:
        # close() was called: flush whatever remains
        if target and pending:
            target.send(pending)

# simplejson exposes the same API as the stdlib json module; fall back
# to the stdlib so the encoder still works where simplejson is absent
try:
    import simplejson
except ImportError:
    import json as simplejson

class GenericJsonEncoder(simplejson.JSONEncoder):
    """JSON encoder that knows how to serialize Generic property bags."""
    def default(self, obj):
        if isinstance(obj, Generic):
            return obj.to_dict()
        # BUG FIX: the base-class call was missing 'self', which raised
        # TypeError instead of the intended "not serializable" error
        return simplejson.JSONEncoder.default(self, obj)
                            

@coroutine
def json_writer(mapping = None, target = None):
    """
    Serializes each incoming item to a JSON string and forwards that
    string to *target*.

    NOTE(review): the 'mapping' parameter is accepted but not yet
    implemented -- items are silently dropped whenever a mapping is
    supplied, preserving the original behaviour.
    """
    while True:
        line = (yield)
        serialized = None
        if not mapping:
            serialized = simplejson.dumps(line, cls=GenericJsonEncoder)
        if serialized and target:
            # BUG FIX: forward the JSON text; the original sent the raw
            # input item, discarding the serialization it just produced
            target.send(serialized)

@coroutine
def joinpoint(handle,keyfunc,join):
    """
    Tags every incoming item with this stream's handle and a key
    extracted by *keyfunc*, then forwards the (handle, key, item)
    triple to the given join coroutine.
    """
    while True:
        item = (yield)
        # key is always computed, even when join is absent, to match
        # the original evaluation order
        key = keyfunc(item)
        if join:
            join.send((handle, key, item))
        
@coroutine
def join(handles,joinfunc,overwrite=True,next=None):
    """
    Correlates items arriving from multiple joinpoint streams.

    Each incoming message is a (handle, key, item) triple. Items are
    buffered per key until one has been seen for every handle in
    *handles*; the per-handle dict is then passed to joinfunc and the
    result forwarded to *next*. When *overwrite* is true, a repeated
    (key, handle) pair replaces the buffered item before completion,
    otherwise the first value wins. Messages with unknown handles are
    ignored.
    """
    vals = {}
    while True:
        handle, key, line = (yield)
        if handle not in handles:
            continue
        bucket = vals.setdefault(key, {})
        if handle in bucket:
            # repeat (key, handle): optionally replace the buffered value
            if overwrite:
                bucket[handle] = line
        else:
            bucket[handle] = line
            # BUG FIX: completeness was only checked in the "existing
            # key, new handle" branch, so a join over a single handle
            # could never fire; check after every first-time insert.
            if len(bucket) == len(handles):
                # values for all handles received -> emit joined value
                val = joinfunc(vals.pop(key))
                if next:
                    next.send(val)
                    
##############################################################################
# Sample Usage
# NOTE: The usage constructs / use case is an entirely arbitrary sequence of
# operations
# NOTE(review): this demo executes at module import time and expects the
# files contacts.csv and userinfo.fxw to exist in the working directory.
##############################################################################


# Declare an eventual join that correlates the 'contacts' and 'fixedwidth'
# streams on a shared key and traces the joined result
joinpype = join(
                ['contacts','fixedwidth'],
                # joined value: "<contacts email>:<third fixed-width field>"
                lambda dct : ":".join([dct['contacts'].email,dct['fixedwidth'][2]]),
                # overwrite repeated (key, handle) values
                True,
                tracer('PostJoin')
            )            
# Open file
url_opener(
    "file://contacts.csv",
    # pass to csv parser
    csv_parser(
        # convert to object with given attributes
        # (column 0 -> .name, column 2 -> .email)
        list_to_object_converter(
            {0: 'name', 2:'email'},
            # split the data into multiple streams
            splitter([
                # conditionally route the data depending upon predicate evaluation
                router(
                    lambda x : x.email[-3:] == 'com',
                    # if email ends with .com
                    splitter([
                        # Write to json
                        json_writer(target = tracer('json')),
                        # Also trace 
                        tracer('Second')
                    ]),
                    # if email doesn't end with .com
                    batcher(
                        # create batches of two
                        lambda x : len(x)>= 2,
                        # further trace as batched
                        tracer('Batched')
                    )
                ),
                # Also send data to a joinpoint stream
                joinpoint(
                    # Declare this data stream of key contacts
                    'contacts',
                    # provide function extract the real key from the data
                    lambda x : x.email,
                    # specify the exact point where this will join
                    joinpype
                )
            ])
        )
    )
)

# Open another fixed width file
url_opener(
    "file://userinfo.fxw",
    # split it into fields of 20, 20 and 40 characters
    fixed_width_splitter(
        [20,20,40],
        True,
        # send to joinpoint
        joinpoint(
            # this joinpoint handle is 'fixedwidth'
            'fixedwidth',
            # Create a new key based on the expression below
            lambda x : x[0] + '@' + x[1],
            # Merge it into the earlier stream (at joinpype)
            joinpype
        )
    )
)