1. Pypy
  2. Untitled project
  3. pypy

Commits

Armin Rigo  committed 9923393 Merge

Merge gil-improvement: improve the balance of threads getting the GIL,
notably when there is a mixture of CPU-intensive and I/O threads.

Previously, any CPU-intensive thread would release and
immediately re-acquire the GIL every 100 bytecodes; but most of
the time on multi-CPU machines this didn't actually give other
threads a chance to run. And so when an I/O thread did any
operation, chances were that it would lose control and not
regain it for quite a long time.

Now, the yield-after-N-bytecodes step is different from the
releasing and reacquiring the GIL around I/O: the former actually
forces another thread to run (if there is one), while the latter
does not (which, in the common case where the I/O is very short,
tends to let the same thread continue to run). Also, using
10000 instead of 100 bytecodes as sys.checkinterval makes more
sense in this version.

This makes the example in https://bugs.pypy.org/issue884 work
without any noticeable delay at all, at least on Linux.

The Windows version was tested too, but didn't give such good
improvements; see the comment in thread_nt.h. Will try to tweak a
bit more, so I'm not closing the gil-improvement branch right
now...

  • Participants
  • Parent commits 7eb1a79, bdeb78c
  • Branches default

Comments (0)

Files changed (9)

File pypy/interpreter/executioncontext.py

View file
         self._nonperiodic_actions = []
         self.has_bytecode_counter = False
         self.fired_actions = None
-        self.checkinterval_scaled = 100 * TICK_COUNTER_STEP
+        # the default value is not 100, unlike CPython 2.7, but a much
+        # larger value, because we use a technique that not only allows
+        # but actually *forces* another thread to run whenever the counter
+        # reaches zero.
+        self.checkinterval_scaled = 10000 * TICK_COUNTER_STEP
         self._rebuild_action_dispatcher()
 
     def fire(self, action):

File pypy/module/thread/gil.py

View file
 
 class GILThreadLocals(OSThreadLocals):
     """A version of OSThreadLocals that enforces a GIL."""
-    ll_GIL = thread.null_ll_lock
+    gil_ready = False
 
     def initialize(self, space):
         # add the GIL-releasing callback as an action on the space
 
     def setup_threads(self, space):
         """Enable threads in the object space, if they haven't already been."""
-        if not self.ll_GIL:
-            try:
-                self.ll_GIL = thread.allocate_ll_lock()
-            except thread.error:
+        if not self.gil_ready:
+            if not thread.gil_allocate():
                 raise wrap_thread_error(space, "can't allocate GIL")
-            thread.acquire_NOAUTO(self.ll_GIL, True)
+            self.gil_ready = True
             self.enter_thread(space)   # setup the main thread
             result = True
         else:
         # test_lock_again after the global state was cleared by
         # test_compile_lock.  As a workaround, we repatch these global
         # fields systematically.
-        spacestate.ll_GIL = self.ll_GIL
         invoke_around_extcall(before_external_call, after_external_call)
         return result
 
     def reinit_threads(self, space):
-        if self.ll_GIL:
-            self.ll_GIL = thread.allocate_ll_lock()
-            thread.acquire_NOAUTO(self.ll_GIL, True)
-            self.enter_thread(space)
+        if self.gil_ready:
+            self.gil_ready = False
+            self.setup_threads(space)
 
     def yield_thread(self):
-        thread.yield_thread()  # explicitly release the gil (used by test_gil)
-
+        do_yield_thread()
 
 class GILReleaseAction(PeriodicAsyncAction):
     """An action called every sys.checkinterval bytecodes.  It releases
     """
 
     def perform(self, executioncontext, frame):
-        # Other threads can run between the release() and the acquire()
-        # implicit in the following external function call (which has
-        # otherwise no effect).
-        thread.yield_thread()
+        do_yield_thread()
 
 
 class SpaceState:
 
     def _freeze_(self):
-        self.ll_GIL = thread.null_ll_lock
         self.action_after_thread_switch = None
         # ^^^ set by AsyncAction.fire_after_thread_switch()
         return False
     # this function must not raise, in such a way that the exception
     # transformer knows that it cannot raise!
     e = get_errno()
-    thread.release_NOAUTO(spacestate.ll_GIL)
+    thread.gil_release()
     set_errno(e)
 before_external_call._gctransformer_hint_cannot_collect_ = True
 before_external_call._dont_reach_me_in_del_ = True
 
 def after_external_call():
     e = get_errno()
-    thread.acquire_NOAUTO(spacestate.ll_GIL, True)
+    thread.gil_acquire()
     thread.gc_thread_run()
     spacestate.after_thread_switch()
     set_errno(e)
 # pointers in the shadow stack.  This is necessary because the GIL is
 # not held after the call to before_external_call() or before the call
 # to after_external_call().
+
+def do_yield_thread():
+    # explicitly release the gil, in a way that tries to give more
+    # priority to other threads (as opposed to continuing to run in
+    # the same thread).
+    if thread.gil_yield_thread():
+        thread.gc_thread_run()
+        spacestate.after_thread_switch()
+do_yield_thread._gctransformer_hint_close_stack_ = True
+do_yield_thread._dont_reach_me_in_del_ = True
+do_yield_thread._dont_inline_ = True
+
+# do_yield_thread() needs a different hint: _gctransformer_hint_close_stack_.
+# The *_external_call() functions are themselves called only from the rffi
+# module from a helper function that also has this hint.

File pypy/module/thread/ll_thread.py

View file
     include_dirs = [str(py.path.local(autopath.pypydir).join('translator', 'c'))],
     export_symbols = ['RPyThreadGetIdent', 'RPyThreadLockInit',
                       'RPyThreadAcquireLock', 'RPyThreadReleaseLock',
-                      'RPyThreadYield',
+                      'RPyGilAllocate', 'RPyGilYieldThread',
+                      'RPyGilRelease', 'RPyGilAcquire',
                       'RPyThreadGetStackSize', 'RPyThreadSetStackSize',
                       'RPyOpaqueDealloc_ThreadLock',
                       'RPyThreadAfterFork']
                                          [TLOCKP], lltype.Void,
                                          _nowrapper=True)
 
-# this function does nothing apart from releasing the GIL temporarily.
-yield_thread = llexternal('RPyThreadYield', [], lltype.Void, threadsafe=True)
+# these functions manipulate directly the GIL, whose definition does not
+# escape the C code itself
+gil_allocate     = llexternal('RPyGilAllocate', [], lltype.Signed,
+                              _nowrapper=True)
+gil_yield_thread = llexternal('RPyGilYieldThread', [], lltype.Signed,
+                              _nowrapper=True)
+gil_release      = llexternal('RPyGilRelease', [], lltype.Void,
+                              _nowrapper=True)
+gil_acquire      = llexternal('RPyGilAcquire', [], lltype.Void,
+                              _nowrapper=True)
 
 def allocate_lock():
     return Lock(allocate_ll_lock())

File pypy/module/thread/test/test_gil.py

View file
     use_threads = True
     bigtest = False
 
-    def test_one_thread(self):
+    def test_one_thread(self, skew=+1):
+        from pypy.rlib.debug import debug_print
         if self.bigtest:
-            N = 1000000
+            N = 100000
+            skew *= 25000
         else:
             N = 100
+            skew *= 25
         space = FakeSpace()
         class State:
             pass
         state = State()
-        def runme():
-            for i in range(N):
+        def runme(main=False):
+            j = 0
+            for i in range(N + [-skew, skew][main]):
+                state.datalen1 += 1   # try to crash if the GIL is not
+                state.datalen2 += 1   # correctly acquired
                 state.data.append((thread.get_ident(), i))
+                state.datalen3 += 1
+                state.datalen4 += 1
+                assert state.datalen1 == len(state.data)
+                assert state.datalen2 == len(state.data)
+                assert state.datalen3 == len(state.data)
+                assert state.datalen4 == len(state.data)
+                debug_print(main, i, state.datalen4)
                 state.threadlocals.yield_thread()
+                assert i == j
+                j += 1
         def bootstrap():
             try:
                 runme()
                 thread.gc_thread_die()
         def f():
             state.data = []
+            state.datalen1 = 0
+            state.datalen2 = 0
+            state.datalen3 = 0
+            state.datalen4 = 0
             state.threadlocals = gil.GILThreadLocals()
             state.threadlocals.setup_threads(space)
             thread.gc_thread_prepare()
             subident = thread.start_new_thread(bootstrap, ())
             mainident = thread.get_ident()
-            runme()
+            runme(True)
             still_waiting = 3000
             while len(state.data) < 2*N:
+                debug_print(len(state.data))
                 if not still_waiting:
                     raise ValueError("time out")
                 still_waiting -= 1
                 if not we_are_translated(): gil.before_external_call()
                 time.sleep(0.01)
                 if not we_are_translated(): gil.after_external_call()
+            debug_print("leaving!")
             i1 = i2 = 0
             for tid, i in state.data:
                 if tid == mainident:
                     assert i == i2; i2 += 1
                 else:
                     assert 0
-            assert i1 == N
-            assert i2 == N
+            assert i1 == N + skew
+            assert i2 == N - skew
             return len(state.data)
 
         fn = self.getcompiled(f, [])
         res = fn()
         assert res == 2*N
 
+    def test_one_thread_rev(self):
+        self.test_one_thread(skew=-1)
+
 
 class TestRunDirectly(GILTests):
     def getcompiled(self, f, argtypes):

File pypy/module/thread/test/test_thread.py

View file
 
         def busy_wait():
             for x in range(1000):
-                time.sleep(0.01)
+                print 'tick...', x  # <-force the GIL to be released, as
+                time.sleep(0.01)    #   time.sleep doesn't do non-translated
 
         # This is normally called by app_main.py
         signal.signal(signal.SIGINT, signal.default_int_handler)

File pypy/translator/c/gcc/trackgcroot.py

View file
         'paddq', 'pinsr',
         # zero-extending moves should not produce GC pointers
         'movz', 
+        # locked operations should not move GC pointers, at least so far
+        'lock',
         ])
 
     # a partial list is hopefully good enough for now; it's all to support

File pypy/translator/c/src/thread.h

View file
 
 #endif
 
-/* common helper: this does nothing, but is called with the GIL released.
-   This gives other threads a chance to grab the GIL and run. */
-void RPyThreadYield(void);
-
-#ifndef PYPY_NOT_MAIN_FILE
-void RPyThreadYield(void)
-{
-}
-#endif
+long RPyGilAllocate(void);
+long RPyGilYieldThread(void);
+void RPyGilRelease(void);
+void RPyGilAcquire(void);
 
 #endif

File pypy/translator/c/src/thread_nt.h

View file
 #define RPyThreadTLS_Set(key, value)	TlsSetValue(key, value)
 
 
+/************************************************************/
+/* GIL code                                                 */
+/************************************************************/
+
+static volatile LONG pending_acquires = -1;
+static CRITICAL_SECTION mutex_gil;
+static HANDLE cond_gil;
+
+long RPyGilAllocate(void)
+{
+    pending_acquires = 0;
+    InitializeCriticalSection(&mutex_gil);
+    EnterCriticalSection(&mutex_gil);
+    cond_gil = CreateEvent (NULL, FALSE, FALSE, NULL);
+    return 1;
+}
+
+long RPyGilYieldThread(void)
+{
+    /* can be called even before RPyGilAllocate(), but in this case,
+       pending_acquires will be -1 */
+    if (pending_acquires <= 0)
+        return 0;
+    InterlockedIncrement(&pending_acquires);
+    PulseEvent(&cond_gil);
+
+    /* hack: the three following lines do a pthread_cond_wait(), and
+       normally specifying a timeout of INFINITE would be fine.  But the
+       first and second operations are not done atomically, so there is a
+       (small) risk that PulseEvent misses the WaitForSingleObject().
+       In this case the process will just sleep a few milliseconds. */
+    LeaveCriticalSection(&mutex_gil);
+    WaitForSingleObject(&cond_gil, 15);
+    EnterCriticalSection(&mutex_gil);
+
+    InterlockedDecrement(&pending_acquires);
+    return 1;
+}
+
+void RPyGilRelease(void)
+{
+    LeaveCriticalSection(&mutex_gil);
+    PulseEvent(&cond_gil);
+}
+
+void RPyGilAcquire(void)
+{
+    InterlockedIncrement(&pending_acquires);
+    EnterCriticalSection(&mutex_gil);
+    InterlockedDecrement(&pending_acquires);
+}
+
+
 #endif /* PYPY_NOT_MAIN_FILE */

File pypy/translator/c/src/thread_pthread.h

View file
 #include <signal.h>
 #include <stdio.h>
 #include <errno.h>
+#include <assert.h>
 
 /* The following is hopefully equivalent to what CPython does
    (which is trying to compile a snippet of code using it) */
 #define RPyThreadTLS_Set(key, value)	pthread_setspecific(key, value)
 
 
+/************************************************************/
+/* GIL code                                                 */
+/************************************************************/
+
+#ifdef __llvm__
+#  define HAS_ATOMIC_ADD
+#endif
+
+#ifdef __GNUC__
+#  if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1)
+#    define HAS_ATOMIC_ADD
+#  endif
+#endif
+
+#ifdef HAS_ATOMIC_ADD
+#  define atomic_add __sync_fetch_and_add
+#else
+#  if defined(__amd64__)
+#    define atomic_add(ptr, value)  asm volatile ("lock addq %0, %1"        \
+                                 : : "ri"(value), "m"(*(ptr)) : "memory")
+#  elif defined(__i386__)
+#    define atomic_add(ptr, value)  asm volatile ("lock addl %0, %1"        \
+                                 : : "ri"(value), "m"(*(ptr)) : "memory")
+#  else
+#    error "Please use gcc >= 4.1 or write a custom 'asm' for your CPU."
+#  endif
+#endif
+
+#define ASSERT_STATUS(call)                             \
+    if (call != 0) {                                    \
+        fprintf(stderr, "Fatal error: " #call "\n");    \
+        abort();                                        \
+    }
+
+static void _debug_print(const char *msg)
+{
+#if 0
+    int col = (int)pthread_self();
+    col = 31 + ((col / 8) % 8);
+    fprintf(stderr, "\033[%dm%s\033[0m", col, msg);
+#endif
+}
+
+static volatile long pending_acquires = -1;
+static pthread_mutex_t mutex_gil = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond_gil = PTHREAD_COND_INITIALIZER;
+
+static void assert_has_the_gil(void)
+{
+#ifdef RPY_ASSERT
+    assert(pthread_mutex_trylock(&mutex_gil) != 0);
+    assert(pending_acquires >= 0);
+#endif
+}
+
+long RPyGilAllocate(void)
+{
+    _debug_print("RPyGilAllocate\n");
+    pending_acquires = 0;
+    pthread_mutex_trylock(&mutex_gil);
+    assert_has_the_gil();
+    return 1;
+}
+
+long RPyGilYieldThread(void)
+{
+    /* can be called even before RPyGilAllocate(), but in this case,
+       pending_acquires will be -1 */
+#ifdef RPY_ASSERT
+    if (pending_acquires >= 0)
+        assert_has_the_gil();
+#endif
+    if (pending_acquires <= 0)
+        return 0;
+    atomic_add(&pending_acquires, 1L);
+    _debug_print("{");
+    ASSERT_STATUS(pthread_cond_signal(&cond_gil));
+    ASSERT_STATUS(pthread_cond_wait(&cond_gil, &mutex_gil));
+    _debug_print("}");
+    atomic_add(&pending_acquires, -1L);
+    assert_has_the_gil();
+    return 1;
+}
+
+void RPyGilRelease(void)
+{
+    _debug_print("RPyGilRelease\n");
+#ifdef RPY_ASSERT
+    assert(pending_acquires >= 0);
+#endif
+    assert_has_the_gil();
+    ASSERT_STATUS(pthread_mutex_unlock(&mutex_gil));
+    ASSERT_STATUS(pthread_cond_signal(&cond_gil));
+}
+
+void RPyGilAcquire(void)
+{
+    _debug_print("about to RPyGilAcquire...\n");
+#ifdef RPY_ASSERT
+    assert(pending_acquires >= 0);
+#endif
+    atomic_add(&pending_acquires, 1L);
+    ASSERT_STATUS(pthread_mutex_lock(&mutex_gil));
+    atomic_add(&pending_acquires, -1L);
+    assert_has_the_gil();
+    _debug_print("RPyGilAcquire\n");
+}
+
+
 #endif /* PYPY_NOT_MAIN_FILE */