CherryPy / cherrypy / process / unix_threading / test.py

#! /usr/bin/env python
import select
import sys
import os
import threading
from _native_event import NativeEvent
import time

"""
def test(event_class):
    events = []
    def wait_thread():
        e = event_class()
        events.append(e)
        for i in range(100):
            e.wait()
    threads = []
    for _ in range(100):
        t = threading.Thread(target=wait_thread)
        t.start()
        threads.append(t)
    time.sleep(1)
    for i in range(100):
        for e in events:
            e.set()
            e.clear()
    for t in threads:
        t.join()

if __name__ == '__main__':
    if eval(sys.argv[1]) == 0:
        test(threading.Event)
    else:
        test(unix_threading.NativeEvent)
"""
# Import the `_pthread_cond` submodule so that its on-disk location can be
# reported below.
import unix_threading._pthread_cond
# Report which `_pthread_cond` file was actually loaded, followed by the PID
# of this process (presumably so a signal can be sent to it from another
# terminal while test() is blocked -- TODO confirm).
# The parenthesised single-argument form prints the same text under the
# Python 2 print statement and the Python 3 print function.
print(unix_threading._pthread_cond.__file__)
print(os.getpid())
def test():
    """Manually check how a blocked ``NativeEvent.wait()`` reacts to Ctrl-C.

    A helper thread sets the event after a 10 second delay while the calling
    thread blocks in ``wait()``.  If a ``KeyboardInterrupt`` is raised out of
    ``wait()`` during that window, ``'hi'`` is printed; otherwise ``wait()``
    simply returns once the helper thread has set the event.

    NOTE(review): the helper thread is a regular (non-daemon) thread and is
    never joined, so after an interrupt the interpreter still waits for its
    sleep to finish before exiting.
    """
    ev = NativeEvent()

    def proc():
        # Leave a window in which the operator can interrupt the waiter
        # before the event is finally set.
        time.sleep(10)
        ev.set()

    t = threading.Thread(target=proc)
    t.start()

    try:
        ev.wait()
    except KeyboardInterrupt:
        # Parenthesised single-argument form: identical output under the
        # Python 2 print statement and the Python 3 print function.
        print('hi')

# Run the manual check unconditionally: there is no __main__ guard, so this
# fires whenever the file is executed or imported.
test()
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.