Commits

dizzyd  committed 308e791

Fix bz://877; keydir NIF should not hold a rwlock across invocations (even though it works on most platforms).

A new thread is now spun up on iteration start to hold the rwlock until iteration completes.

  • Participants
  • Parent commits 1600b36

Comments (0)

Files changed (1)

File c_src/bitcask_nifs.c

 typedef struct
 {
     bitcask_keydir* keydir;
+    ErlNifTid       il_thread;  /* Iterator lock thread */
+    ErlNifMutex*    il_signal_mutex;
+    ErlNifCond*     il_signal;
+    int             il_signal_flag;
 } bitcask_keydir_handle;
 
 typedef struct
 static ERL_NIF_TERM ATOM_FSTAT_ERROR;
 static ERL_NIF_TERM ATOM_FTRUNCATE_ERROR;
 static ERL_NIF_TERM ATOM_GETFL_ERROR;
+static ERL_NIF_TERM ATOM_ILT_CREATE_ERROR; /* Iteration lock thread creation error */
+static ERL_NIF_TERM ATOM_ITERATION_IN_PROCESS;
 static ERL_NIF_TERM ATOM_ITERATION_NOT_PERMITTED;
+static ERL_NIF_TERM ATOM_ITERATION_NOT_STARTED;
 static ERL_NIF_TERM ATOM_LOCK_NOT_WRITABLE;
 static ERL_NIF_TERM ATOM_NOT_FOUND;
 static ERL_NIF_TERM ATOM_NOT_READY;
     bitcask_keydir_handle* handle = enif_alloc_resource_compat(env,
                                                         bitcask_keydir_RESOURCE,
                                                         sizeof(bitcask_keydir_handle));
+    memset(handle, '\0', sizeof(bitcask_keydir_handle));
 
     // Now allocate the actual keydir instance. Because it's unnamed/shared, we'll
     // leave the name and lock portions null'd out
         bitcask_keydir_handle* handle = enif_alloc_resource_compat(env,
                                                             bitcask_keydir_RESOURCE,
                                                             sizeof(bitcask_keydir_handle));
+        memset(handle, '\0', sizeof(bitcask_keydir_handle));
         handle->keydir = keydir;
         ERL_NIF_TERM result = enif_make_resource(env, handle);
         enif_release_resource_compat(env, handle);
         bitcask_keydir_handle* new_handle = enif_alloc_resource_compat(env,
                                                                 bitcask_keydir_RESOURCE,
                                                                 sizeof(bitcask_keydir_handle));
+        memset(handle, '\0', sizeof(bitcask_keydir_handle));
 
         // Now allocate the actual keydir instance. Because it's unnamed/shared, we'll
         // leave the name and lock portions null'd out
     }
 }
 
+static void* iterator_lock_thread(void* arg)
+{
+    bitcask_keydir_handle* handle = (bitcask_keydir_handle*)arg;
+
+    // First, grab the read lock for iteration
+    R_LOCK(handle->keydir);
+
+    // Set the flag that we are ready to begin iteration
+    handle->il_signal_flag = 1;
+
+    // Signal the invoking thread to indicate that the read lock is acquired
+    // and then wait for a signal back to release the read lock.
+    enif_mutex_lock(handle->il_signal_mutex);
+    enif_cond_signal(handle->il_signal);
+    while (handle->il_signal_flag)
+    {
+        enif_cond_wait(handle->il_signal, handle->il_signal_mutex);
+    }
+
+    // Iterating thread is all done with the read lock; release it and exit
+    R_UNLOCK(handle->keydir);
+    enif_mutex_unlock(handle->il_signal_mutex);
+    return 0;
+}
+
 ERL_NIF_TERM bitcask_nifs_keydir_itr(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
 {
     bitcask_keydir_handle* handle;
     {
         bitcask_keydir* keydir = handle->keydir;
 
-        // Grab the lock and initialize the iterator
-        R_LOCK(keydir);
+        // If a iterator thread is already active for this keydir, bail
+        if (handle->il_thread)
+        {
+            return enif_make_tuple2(env, ATOM_ERROR, ATOM_ITERATION_IN_PROCESS);
+        }
+
+        // Create the mutex and signal
+        handle->il_signal_flag = 0;
+        handle->il_signal_mutex = enif_mutex_create("bitcask_itr_signal_mutex");
+        handle->il_signal = enif_cond_create("bitcask_itr_signal");
+
+        // Grab the mutex BEFORE spawning the thread; otherwise we might miss the signal
+        enif_mutex_lock(handle->il_signal_mutex);
+
+        // Spawn the lock thread
+        int rc = enif_thread_create("bitcask_itr_lock_thread", &(handle->il_thread),
+                                    iterator_lock_thread, handle, 0);
+        if (rc != 0)
+        {
+            // Failed to create the lock holder -- release the lock and cleanup
+            enif_mutex_unlock(handle->il_signal_mutex);
+            enif_cond_destroy(handle->il_signal);
+            enif_mutex_destroy(handle->il_signal_mutex);
+            handle->il_thread = 0;
+            return enif_make_tuple2(env, ATOM_ERROR,
+                                    enif_make_tuple2(env, ATOM_ILT_CREATE_ERROR, rc));
+        }
+
+        // Wait for the signal from the lock thread that iteration may commence
+        while (!handle->il_signal_flag)
+        {
+            enif_cond_wait(handle->il_signal, handle->il_signal_mutex);
+        }
+
+        // Ready to go; initialize the iterator and unlock the signal mutex
         keydir->iterator = kh_begin(keydir->entries);
+        enif_mutex_unlock(handle->il_signal_mutex);
+
         return ATOM_OK;
     }
     else
     {
         bitcask_keydir* keydir = handle->keydir;
 
+        if (handle->il_signal_flag != 1)
+        {
+            // Iteration not started!
+            return enif_make_tuple2(env, ATOM_ERROR, ATOM_ITERATION_NOT_STARTED);
+        }
+
         while (keydir->iterator != kh_end(keydir->entries))
         {
             if (kh_exist(keydir->entries, keydir->iterator))
 
     if (enif_get_resource(env, argv[0], bitcask_keydir_RESOURCE, (void**)&handle))
     {
-        bitcask_keydir* keydir = handle->keydir;
+        if (handle->il_signal_flag != 1)
+        {
+            // Iteration not started!
+            return enif_make_tuple2(env, ATOM_ERROR, ATOM_ITERATION_NOT_STARTED);
+        }
 
-        // Unlock the keydir
-        R_UNLOCK(keydir);
+        // Lock the signal mutex, clear the il_signal_flag and tell the lock thread
+        // to drop the read lock
+        enif_mutex_lock(handle->il_signal_mutex);
+        handle->il_signal_flag = 0;
+        enif_cond_signal(handle->il_signal);
+        enif_mutex_unlock(handle->il_signal_mutex);
+        enif_thread_join(handle->il_thread, 0);
+
+        // Cleanup
+        enif_cond_destroy(handle->il_signal);
+        enif_mutex_destroy(handle->il_signal_mutex);
+        handle->il_thread = 0;
+
         return ATOM_OK;
     }
     else
     ATOM_FSTAT_ERROR = enif_make_atom(env, "fstat_error");
     ATOM_FTRUNCATE_ERROR = enif_make_atom(env, "ftruncate_error");
     ATOM_GETFL_ERROR = enif_make_atom(env, "getfl_error");
+    ATOM_ILT_CREATE_ERROR = enif_make_atom(env, "ilt_create_error");
+    ATOM_ITERATION_IN_PROCESS = enif_make_atom(env, "iteration_in_process");
     ATOM_ITERATION_NOT_PERMITTED = enif_make_atom(env, "iteration_not_permitted");
+    ATOM_ITERATION_NOT_STARTED = enif_make_atom(env, "iteration_not_started");
     ATOM_LOCK_NOT_WRITABLE = enif_make_atom(env, "lock_not_writable");
     ATOM_NOT_FOUND = enif_make_atom(env, "not_found");
     ATOM_NOT_READY = enif_make_atom(env, "not_ready");