Commits

March Liu committed 62aecb9

add euclid

Comments (0)

Files changed (4)

       url='http://bitbucket.org/March/socrates/',
       packages=['socrates',
                 'socrates.parser',
-                'socrates.parser.prdparser'],
+                'socrates.parser.prdparser',
+                'socrates.euclid'],
       package_dir={"":"src"},
       requires=['sqlalchemy(==0.5.6)'])
 

src/socrates/euclid/__init__.py

Empty file added.

src/socrates/euclid/stacklesseuclid.py

+#!/usr/local/bin/spython
+# -*- coding:utf-8 -*-
+
+"""
+内部数据引擎,建议不要直接访问,所有的访问都从内外通道获取。
+"""
+
+import stackless
+#import logging
+
+def taskly(foo):
+    """
+将函数封装为异步 tasklet 的修饰器,该修饰器不返回结果。
+如需返回结果使用下面的 taskeletlycb。
+    """
+    def re(*args, **argv):
+        stackless.tasklet(foo)(*args, **argv).run()
+    return re
+
+def tasklycb(foo):
+    """
+将函数封装为异步 tasklet 的修饰器,该修饰器修饰后的函数
+参数接受一个 stackless.channel (关键字参数 channel = channel),
+将foo的返回值传入其中。
+    """
+    def re(*args, **argv):
+        channel=argv["channel"]
+        _argv=dict((k,v) for k in argv if k != 'channel')
+        channel.send(foo(argv["channel"]*args, **_argv))
+    return re
+
+class channelit(object):
+    """
+    缓冲化操作类,用于封装常用操作,避免其灌入过于频繁导致内存不足。
+    """
+    def __init__(self, size):
+        self.size = size
+        self.channel = stackless.channel()
+    def __call__(self, foo):
+        def re(*args, **argv):
+            while self.channel.balance > self.size:
+                stackless.schedule(10)
+            else:
+                self.channel.send((args, argv))
+        stackless.tasklet(self.run)(foo).run()
+        return re
+    def run(self, foo):
+        while True:
+            args, argv = self.channel.receive()
+            foo(*args, **argv)
+            stackless.schedule()
+
+class channelitcb(object):
+    """
+    缓冲化操作类,带返回值。用于封装常用操作,避免其灌入过于频繁导致内存不足。
+    """
+    def __init__(self, size):
+        self.size = size
+        self.channel = stackless.channel()
+    def __call__(self, foo):
+        def re(*args, **argv):
+            while self.channel.balance > self.size:
+                stackless.schedule(10)
+            else:
+                self.channel.send((args, argv))
+        stackless.tasklet(self.run)(foo).run()
+        return re
+    def run(self, foo):
+        while True:
+            args, argv = self.channel.receive()
+            _argv=dict((k,v) for k, v in argv.iteritems() if k != "channel")
+            channel=argv['channel']
+            channel.send(foo(*args, **_argv))
+            stackless.schedule()
+
+class Euclid(object):
+    """
+   这是一个退化的socrates plato 模型,尽可能针对云查杀
+的需求优化,不追求通用。
+    """
+    def __init__(self, indexes=None):
+        """ DataPool([indexes={"primary1","primary2", ...}])
+创建数据池对象,设置其索引。 
+目前版本只支持唯一索引。即每个index key都只对应一个subject。
+        """
+        self.channel = stackless.channel()
+        self.__data__=[]
+        if indexes != None:
+            for index in indexes:
+                setattr(self, index, {})
+        self.__indexes__ = set(indexes)
+        self.tasklet=stackless.tasklet(self.run)()
+
+    def addIndexes(self, *indexes):
+        """
+添加新的索引谓词
+        """
+        for index in indexes:
+            setattr(self, index, {})
+        self.__indexes__.update(indexes)
+        for subj in self.__data__:
+            for index in indexes:
+                if index in subj:
+                    getattr(self, index)[subj[index]]=subj
+
+    #asyAddIndexes = taskly(Euclid.addIndexes)
+    @taskly
+    def asyAddIndexes(self, *indexes):
+        return self.addIndexes(*indexes)
+
+    def run(self):
+        """ 
+简单粗暴版本,直接把传进来的操作run到数据池上,阿门……
+        """
+        while True:
+            tasklet = self.channel.receive()
+            tasklet(self).run()
+            stackless.schedule()
+
+    def writeSubjects(self, *subjects):
+        """
+        用于写入条目,每个subject是一个字典,如果在数据池中有同key的
+     字典对象,就覆盖KV,否则保存新的subject
+        """
+        for subject in subjects:
+            data = self.matchSubject(subject)
+            if data is None:
+                self.__new_subject__(subject)
+            else:
+                data.update(subject)
+
+    @taskly
+    def writeSubjectsTaskly(self, *subjects):
+        sef.writeSubjects(*subjects)
+                
+    def __new_subject__(self, subject):
+        """
+        插入一个新的subject。
+        """
+        #logging.info("new subject %s", subject)
+
+        self.__data__.append(subject)
+        for predicate in self.__indexes__:
+            if predicate in subject:
+                obj = subject[predicate]
+                getattr(self, predicate)[obj]=subject
+
+    def matchSubject(self, subject):
+        """
+给出一个subject,查询是否已经储存了同义条目,即关键字一致的条目。
+目前,只要有一个key相等,就可以返回这个subject。如果匹配条目中
+没有key,则进入慢速的遍历查询。如果都没有匹配到,则返回None。
+        """
+        for predicate in self.__indexes__:
+            if predicate in subject:
+                obj = subject[predicate]
+                data = getattr(self, predicate, None)
+                if obj in data:
+                    return data[obj]
+        else:
+            for subj in self.__data__:
+                for key in subject:
+                    if key == subj[key]:
+                        return subj
+            else:
+                return None
+
+    def doFunc(self, predicate, subject, foo):
+        """
+将函数作用于匹配的子句,返回结果,要求foo能够接受 None
+值,返回默认值。
+        """
+        subject = self.matchSubject(subject)
+        if subject != None:
+            if predicate in subject:
+                subject[predicate]=foo(subject[predicate])
+            else:
+                subject[predicate] = foo(None)
+            return subject[predicate]
+        else:
+            return None
+
+    @channelitcb(100000)
+    def asyDoFunc(self, predicate, subject, foo):
+        return self.doFunc(predicate, subject, foo)

test/euclid/funtest.py

+#!/usr/local/bin/spython
+# -*- coding:utf-8 -*-
+
+import timeit
+import stackless
+import pickle
+import socrates.euclid.stacklesseuclid as engine
+import random
+import hashlib
+import uuid
+import logging
+import datetime
+
+"""
+欧几里得引擎功能测试
+目前版本仅支持stackless版本。
+"""
+
+filemd5s = []
+lession_channel = stackless.channel()
+__count__=0
+
+euclid = engine.Euclid("filemd5")
+#inc = lambda x:x+1
+
+def inc(obj):
+    if obj == None:
+        return 1
+    else:
+        return obj+1
+
+def testinc(md5, pool):
+    pool.asyDoFunc("width", {"filemd5":md5}, inc, channel=lession_channel)
+
+def lession():
+    global __count__
+    while True:
+        lession_channel.receive()
+        __count__+=1
+        stackless.schedule()
+
+def dotestinc(pool, channel):
+    while True:
+        md5 = channel.receive()
+        doinc = stackless.tasklet(testinc(md5))
+        data = pickle.dumps(doinc)
+        pool.channel.send(data)
+        stackless.schedule()
+
+def init(pool):
+    """
+  数据初始化
+    """
+    for simulate_md5 in filemd5s:
+        subject = dict(filemd5=simulate_md5,)
+        pool.writeSubjects(subject)
+
+def newSubject(pool, filemd5):
+    subject = dict(filemd5=filemd5)
+    pool.writeSubjectsTaskly(subject)
+
+# Load test driver.  Reads one MD5 per line from stdin, stores each as a
+# subject, then issues random increment requests for the number of seconds
+# given as the first command-line argument and prints the throughput.
+if __name__=="__main__":
+    import sys
+
+    # Duration of the timed phase in seconds; raises IndexError/ValueError
+    # when the argument is missing or not an integer.
+    howlong=int(sys.argv[1])
+    #stackless.tasklet(init)(engine.datapool).run()
+    #init(engine.datapool)
+
+    # NOTE(review): nothing in this script sends on `channel`, so the
+    # dotestinc tasklet started here stays blocked on its receive().
+    channel = stackless.channel()
+    stackless.tasklet(dotestinc)(euclid, channel).run()
+    # Start the consumer that counts results arriving on lession_channel.
+    stackless.tasklet(lession)().run()
+    # Load the data first.
+    for line in sys.stdin:
+        md5=line.strip()
+        #newSubject(engine.datapool, md5)
+        filemd5s.append(md5)
+        euclid.writeSubjects({"filemd5":md5})
+
+    # Timed run: count the accesses completed within the period.
+    start = datetime.datetime.now()
+    print "%s 计时开始……"%start.strftime("%H:%M:%S")
+    # timedelta.seconds is the seconds component only (it wraps after one
+    # day), which is adequate for short runs.
+    while (datetime.datetime.now() - start).seconds < howlong:
+        testinc(random.choice(filemd5s), euclid)
+    end = datetime.datetime.now()
+    print "%s 停止发送请求"%end.strftime("%H:%M:%S")
+    # Keep yielding to the scheduler while the result channel still
+    # reports a positive balance.
+    while lession_channel.balance > 0:
+        print "等待 %s 项队列任务完成……"%lession_channel.balance
+        stackless.schedule(100)
+    # Print the results.
+    print datetime.datetime.now().strftime("%H:%M:%S")
+    print "在 %s 条记录上进行了 %s 秒满负载访问模拟"%(len(filemd5s), howlong)
+    print "完成 %s 次统计访问操作并得到了结果"%__count__
+    print "约合每秒 %s 次请求"%((__count__)*1.0/howlong)