Source

Socrates / src / socrates / euclid / stacklesseuclid.py

#!/usr/local/bin/spython
# -*- coding:utf-8 -*-

"""
内部数据引擎,建议不要直接访问,所有的访问都从内外通道获取。
"""

import stackless
#import logging

def taskly(foo):
    """
将函数封装为异步 tasklet 的修饰器,该修饰器不返回结果。
如需返回结果使用下面的 taskeletlycb。
    """
    def re(*args, **argv):
        stackless.tasklet(foo)(*args, **argv).run()
    return re

def tasklycb(foo):
    """
将函数封装为异步 tasklet 的修饰器,该修饰器修饰后的函数
参数接受一个 stackless.channel (关键字参数 channel = channel),
将foo的返回值传入其中。
    """
    def re(*args, **argv):
        channel=argv["channel"]
        _argv=dict((k,v) for k in argv if k != 'channel')
        channel.send(foo(argv["channel"]*args, **_argv))
    return re

class channelit(object):
    """
    缓冲化操作类,用于封装常用操作,避免其灌入过于频繁导致内存不足。
    """
    def __init__(self, size):
        self.size = size
        self.channel = stackless.channel()
    def __call__(self, foo):
        def re(*args, **argv):
            while self.channel.balance > self.size:
                stackless.schedule(10)
            else:
                self.channel.send((args, argv))
        stackless.tasklet(self.run)(foo).run()
        return re
    def run(self, foo):
        while True:
            args, argv = self.channel.receive()
            foo(*args, **argv)
            stackless.schedule()

class channelitcb(object):
    """
    缓冲化操作类,带返回值。用于封装常用操作,避免其灌入过于频繁导致内存不足。
    """
    def __init__(self, size):
        self.size = size
        self.channel = stackless.channel()
    def __call__(self, foo):
        def re(*args, **argv):
            while self.channel.balance > self.size:
                stackless.schedule(10)
            else:
                self.channel.send((args, argv))
        stackless.tasklet(self.run)(foo).run()
        return re
    def run(self, foo):
        while True:
            args, argv = self.channel.receive()
            _argv=dict((k,v) for k, v in argv.iteritems() if k != "channel")
            channel=argv['channel']
            channel.send(foo(*args, **_argv))
            stackless.schedule()

class Euclid(object):
    """
   这是一个退化的socrates plato 模型,尽可能针对云查杀
的需求优化,不追求通用。
    """
    def __init__(self, indexes=None):
        """ DataPool([indexes={"primary1","primary2", ...}])
创建数据池对象,设置其索引。 
目前版本只支持唯一索引。即每个index key都只对应一个subject。
        """
        self.channel = stackless.channel()
        self.__data__=[]
        if indexes != None:
            for index in indexes:
                setattr(self, index, {})
        self.__indexes__ = set(indexes)
        self.tasklet=stackless.tasklet(self.run)()

    def addIndexes(self, *indexes):
        """
添加新的索引谓词
        """
        for index in indexes:
            setattr(self, index, {})
        self.__indexes__.update(indexes)
        for subj in self.__data__:
            for index in indexes:
                if index in subj:
                    getattr(self, index)[subj[index]]=subj

    #asyAddIndexes = taskly(Euclid.addIndexes)
    @taskly
    def asyAddIndexes(self, *indexes):
        return self.addIndexes(*indexes)

    def run(self):
        """ 
简单粗暴版本,直接把传进来的操作run到数据池上,阿门……
        """
        while True:
            tasklet = self.channel.receive()
            tasklet(self).run()
            stackless.schedule()

    def writeSubjects(self, *subjects):
        """
        用于写入条目,每个subject是一个字典,如果在数据池中有同key的
     字典对象,就覆盖KV,否则保存新的subject
        """
        for subject in subjects:
            data = self.matchSubject(subject)
            if data is None:
                self.__new_subject__(subject)
            else:
                data.update(subject)

    @taskly
    def writeSubjectsTaskly(self, *subjects):
        sef.writeSubjects(*subjects)
                
    def __new_subject__(self, subject):
        """
        插入一个新的subject。
        """
        #logging.info("new subject %s", subject)

        self.__data__.append(subject)
        for predicate in self.__indexes__:
            if predicate in subject:
                obj = subject[predicate]
                getattr(self, predicate)[obj]=subject

    def matchSubject(self, subject):
        """
给出一个subject,查询是否已经储存了同义条目,即关键字一致的条目。
目前,只要有一个key相等,就可以返回这个subject。如果匹配条目中
没有key,则进入慢速的遍历查询。如果都没有匹配到,则返回None。
        """
        for predicate in self.__indexes__:
            if predicate in subject:
                obj = subject[predicate]
                data = getattr(self, predicate, None)
                if obj in data:
                    return data[obj]
        else:
            for subj in self.__data__:
                for key in subject:
                    if key == subj[key]:
                        return subj
            else:
                return None

    def doFunc(self, predicate, subject, foo):
        """
将函数作用于匹配的子句,返回结果,要求foo能够接受 None
值,返回默认值。
        """
        subject = self.matchSubject(subject)
        if subject != None:
            if predicate in subject:
                subject[predicate]=foo(subject[predicate])
            else:
                subject[predicate] = foo(None)
            return subject[predicate]
        else:
            return None

    @channelitcb(100000)
    def asyDoFunc(self, predicate, subject, foo):
        return self.doFunc(predicate, subject, foo)