Source

wxPython / sandbox / test_delayedResult.py


import wx
import delayedresult as dr


def testStruct():
    ss=dr.Struct(a='a', b='b')
    assert ss.a == 'a'
    assert ss.b == 'b'


def testHandler():
    def handler(b, d, a=None, c=None):
        assert a=='a'
        assert b=='b'
        assert c=='c'
        assert d==1
    hh=dr.Handler(handler, 1, a='a')
    hh('b', c='c')
    
    def handler2(*args, **kwargs):
        assert args[0] == 3
        assert args[1] == 1
        assert kwargs['a'] == 'a'
        assert kwargs['b'] == 'b'
    hh2 = dr.Handler(handler2, 1, a='a')
    args = ()
    hh3 = dr.Handler(hh2, b='b', *args)
    hh3(3)

    
def testSender():
    triplet = (1,'a',2.34)
    b = dr.Struct(called=False, which=1)
    assert not b.called
    def consumer(result, a, b=None):
        assert result.get() == 789 + b.which
        assert result.getJobID() == 456 
        assert a == 'a'
        b.called = True
    handler = dr.Handler(consumer, 'a', **dict(b=b))
    ss = dr.SenderNoWx( handler, jobID=456 )
    ss.sendResult(789+1)
    assert b.called 


def testSendExcept():
    def consumer(result):
        try:
            result.get()
            raise RuntimeError('should have raised!')
        except AssertionError:
            pass
    ss = dr.SenderNoWx( dr.Handler(consumer) )
    ss.sendException( AssertionError('test') )
    
    
def testThread():
    expect = dr.Struct(value=1)
    def consumer(result):
        assert result.getJobID() is None
        assert result.get() == expect.value
        expect.value += 2
    ss = dr.SenderNoWx( dr.Handler(consumer) )
    import time
    def worker(sender=None):
        sender.sendResult(1)
        time.sleep(0.1)
        sender.sendResult(3)
        time.sleep(0.1)
        sender.sendResult(5)
        time.sleep(0.1)
        return 7
    tt = dr.Producer(ss, worker, senderArg='sender')
    tt.start()
    while expect.value < 7:
        time.sleep(0.1)
        print '.'


def testStartWorker():
    print 'Doing worker thread with call-after'
    import time
    def handleButtonClick():
        produce = [123, 456, 789, 012, 345, 678, 901]
        expect = dr.Struct(idx=0)
        def worker(a, b=None, sender=None):
            assert a == 2
            assert b == 'b'
            for val in produce:
                time.sleep(0.5)
                sender.sendResult(val)
        def consumer(result, b, a=None):
            assert b == 1
            assert a=='a'
            result = result.get()
            print 'got result', result
            if expect.idx < len(produce):
                assert result == produce[ expect.idx ]#, 'Expected %s, got %s' % (
            else:
                assert result is None
                app.ExitMainLoop()
            expect.idx += 1
        
        dr.startWorker(consumer, worker, cargs=(1,), ckwargs={'a':'a'}, 
            wargs=(2,), wkwargs={'b':'b'}, senderArg='sender')

    app = wx.PySimpleApp()
    frame = wx.Frame(None) # need this otherwise MainLoop() returns immediately
    # pretend user has clicked: 
    import thread
    thread.start_new_thread( wx.CallAfter, (handleButtonClick,))
    app.MainLoop()


def testStartWorkerEvent():
    print 'Doing same with events'
    import time
    produce = [123, 456, 789, 012, 345, 678, 901]
    expect = dr.Struct(idx=0)
    def worker(a, b=None, sender=None):
        assert a == 2
        assert b == 'b'
        for val in produce:
            time.sleep(0.5)
            sender.sendResult(val)
    def consumer(event):
        assert event.a=='a'
        result = event.result.get()
        print 'got result', result
        if expect.idx < len(produce):
            assert result == produce[ expect.idx ]#, 'Expected %s, got %s' % (
        else:
            assert result is None
            app.ExitMainLoop()
        expect.idx += 1
    def handleButtonClick():
        dr.startWorker(frame, worker, 
            cargs=(eventClass,), ckwargs={'a':'a','resultAttr':'result'}, 
            wargs=(2,), wkwargs={'b':'b'}, senderArg='sender')

    app = wx.PySimpleApp()
    frame = wx.Frame(None) # need this otherwise MainLoop() returns immediately
    from wx.lib.newevent import NewEvent as wxNewEvent
    eventClass, eventBinder = wxNewEvent()
    frame.Bind(eventBinder, consumer)
    # pretend user has clicked: 
    import thread
    thread.start_new_thread( wx.CallAfter, (handleButtonClick,))
    app.MainLoop()


def testAbort():
    import threading
    abort = dr.AbortEvent()
    
    # create a wx app and a function that will cause 
    # app to close when abort occurs
    app = wx.PySimpleApp()
    frame = wx.Frame(None) # need this otherwise MainLoop() returns immediately
    def exiter():
        abort.wait()
        # make sure any events have time to be processed before exit
        wx.FutureCall(2000, app.ExitMainLoop) 
    threading.Thread(target=exiter).start()

    # now do the delayed result computation:
    def worker():
        count = 0
        while not abort(1):
            print 'Result computation not done, not aborted'
        return 'Result computed'
    def consumer(dr): # never gets called but as example
        print 'Got dr=', dr.get()
        app.ExitMainLoop()
    dr.startWorker(consumer, worker)
    
    # pretend user doing other stuff
    import time
    time.sleep(5)
    # pretend user aborts now:
    print 'Setting abort event'
    abort.set()
    app.MainLoop()


def testPreProcChain():
    # test when no chain
    def handler(dr):
        assert dr.getJobID() == 123
        assert dr.get() == 321
    pp=dr.PreProcessChain( handler )
    pp( dr.DelayedResult(321, jobID=123) )
    
    # test with chaining
    def handlerPP(chainTrav, n, a=None):
        print 'In handlerPP'
        assert n==1
        assert a=='a'
        assert chainTrav.getJobID() == 321
        res = chainTrav.get()
        assert res == 135
        print 'Done handlerPP'
        
    def subStart1(handler):
        pp=dr.PreProcessChain(handler)
        pp.addSub(subEnd1, 1, b='b')
        subStart2(pp.clone())
    def subEnd1(chainTrav, aa, b=None):
        print 'In subEnd1'
        assert aa==1
        assert b=='b'
        assert chainTrav.getJobID() == 321
        res = chainTrav.get()
        assert res == 246, 'res=%s' % res
        print 'Returning from subEnd1'
        return res - 111

    def subStart2(preProc):
        preProc.addSub(subEnd2, 3, c='c')
        ss = dr.SenderNoWx(preProc, jobID=321)
        ss.sendResult(123)
    def subEnd2(chainTrav, a, c=None):
        print 'In subEnd2'
        assert a==3
        assert c=='c'
        assert chainTrav.getJobID() == 321
        res = chainTrav.get()
        assert res == 123
        print 'Returning from subEnd2'
        return 123*2

    subStart1( dr.Handler(handlerPP, 1, a='a') )

testStruct()
testHandler()
testSender()
testSendExcept()
testThread()
testStartWorker()
testStartWorkerEvent()
testAbort()
testPreProcChain()