Source

ocaml / otherlibs / win32unix / winworker.c

/***********************************************************************/
/*                                                                     */
/*                                OCaml                                */
/*                                                                     */
/*  Contributed by Sylvain Le Gall for Lexifi                          */
/*                                                                     */
/*  Copyright 2008 Institut National de Recherche en Informatique et   */
/*  en Automatique.  All rights reserved.  This file is distributed    */
/*  under the terms of the GNU Library General Public License, with    */
/*  the special exception on linking described in file ../../LICENSE.  */
/*                                                                     */
/***********************************************************************/

/* $Id$ */

#include <mlvalues.h>
#include <alloc.h>
#include <memory.h>
#include <signals.h>
#include "winworker.h"
#include "winlist.h"
#include "windbug.h"

typedef enum {
  WORKER_CMD_NONE = 0,
  WORKER_CMD_EXEC,
  WORKER_CMD_STOP
} WORKERCMD;

struct _WORKER {
  LIST       lst;           /* This structure is used as a list. */
  HANDLE     hJobStarted;   /* Event representing that the function has begun. */
  HANDLE     hJobStop;      /* Event that can be used to notify the function that it
                               should stop processing. */
  HANDLE     hJobDone;      /* Event representing that the function has finished. */
  void      *lpJobUserData; /* User data for the job. */
  WORKERFUNC hJobFunc;      /* Function to be called during APC */
  HANDLE     hWorkerReady;  /* Worker is ready. */
  HANDLE     hCommandReady; /* Worker should execute command. */
  WORKERCMD  ECommand;      /* Command to execute */
  HANDLE     hThread;       /* Thread handle of the worker. */
};

#define THREAD_WORKERS_MAX 16
#define THREAD_WORKERS_MEM 4000

LPWORKER lpWorkers       = NULL;
DWORD    nWorkersCurrent = 0;
DWORD    nWorkersMax     = 0;
HANDLE   hWorkersMutex   = INVALID_HANDLE_VALUE;

DWORD WINAPI worker_wait (LPVOID _data)
{
  BOOL     bExit;
  LPWORKER lpWorker;

  lpWorker = (LPWORKER )_data;
  bExit    = FALSE;

  DEBUG_PRINT("Worker %x starting", lpWorker);
  while (
      !bExit
      && SignalObjectAndWait(
        lpWorker->hWorkerReady,
        lpWorker->hCommandReady,
        INFINITE,
        TRUE) == WAIT_OBJECT_0)
  {
    DEBUG_PRINT("Worker %x running", lpWorker);
    switch (lpWorker->ECommand)
    {
      case WORKER_CMD_NONE:
        break;

      case WORKER_CMD_EXEC:
        if (lpWorker->hJobFunc != NULL)
        {
          SetEvent(lpWorker->hJobStarted);
          lpWorker->hJobFunc(lpWorker->hJobStop, lpWorker->lpJobUserData);
          SetEvent(lpWorker->hJobDone);
        };
        break;

      case WORKER_CMD_STOP:
        bExit = TRUE;
        break;
    }
  };
  DEBUG_PRINT("Worker %x exiting", lpWorker);

  return 0;
}

LPWORKER worker_new (void)
{
  LPWORKER lpWorker = NULL;

  lpWorker = (LPWORKER)caml_stat_alloc(sizeof(WORKER));
  list_init((LPLIST)lpWorker);
  lpWorker->hJobStarted  = CreateEvent(NULL, TRUE, FALSE, NULL);
  lpWorker->hJobStop     = CreateEvent(NULL, TRUE, FALSE, NULL);
  lpWorker->hJobDone     = CreateEvent(NULL, TRUE, FALSE, NULL);
  lpWorker->lpJobUserData = NULL;
  lpWorker->hWorkerReady       = CreateEvent(NULL, FALSE, FALSE, NULL);
  lpWorker->hCommandReady      = CreateEvent(NULL, FALSE, FALSE, NULL);
  lpWorker->ECommand           = WORKER_CMD_NONE;
  lpWorker->hThread = CreateThread(
    NULL,
    THREAD_WORKERS_MEM,
    worker_wait,
    (LPVOID)lpWorker,
    0,
    NULL);

  return lpWorker;
};

void worker_free (LPWORKER lpWorker)
{
  /* Wait for termination of the worker */
  DEBUG_PRINT("Shutting down worker %x", lpWorker);
  WaitForSingleObject(lpWorker->hWorkerReady, INFINITE);
  lpWorker->ECommand = WORKER_CMD_STOP;
  SetEvent(lpWorker->hCommandReady);
  WaitForSingleObject(lpWorker->hThread, INFINITE);

  /* Free resources */
  DEBUG_PRINT("Freeing resources of worker %x", lpWorker);
  if (lpWorker->hThread != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hThread);
    lpWorker->hThread = INVALID_HANDLE_VALUE;
  }

  if (lpWorker->hJobStarted != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hJobStarted);
    lpWorker->hJobStarted = INVALID_HANDLE_VALUE;
  }

  if (lpWorker->hJobStop != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hJobStop);
    lpWorker->hJobStop = INVALID_HANDLE_VALUE;
  }

  if (lpWorker->hJobDone != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hJobDone);
    lpWorker->hJobDone = INVALID_HANDLE_VALUE;
  }

  lpWorker->lpJobUserData = NULL;
  lpWorker->hJobFunc = NULL;

  if (lpWorker->hWorkerReady != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hWorkerReady);
    lpWorker->hWorkerReady = INVALID_HANDLE_VALUE;
  }

  if (lpWorker->hCommandReady != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hCommandReady);
    lpWorker->hCommandReady = INVALID_HANDLE_VALUE;
  }

  caml_stat_free(lpWorker);
};

LPWORKER worker_pop (void)
{
  LPWORKER lpWorkerFree = NULL;

  WaitForSingleObject(hWorkersMutex, INFINITE);
  /* Get the first worker of the list */
  if (lpWorkers != NULL)
  {
    lpWorkerFree = lpWorkers;
    lpWorkers = LIST_NEXT(LPWORKER, lpWorkers);
  }
  nWorkersCurrent++;
  nWorkersMax = (nWorkersCurrent > nWorkersMax ? nWorkersCurrent : nWorkersMax);
  DEBUG_PRINT("Workers running current/runnning max/waiting: %d/%d/%d",
      nWorkersCurrent,
      nWorkersMax,
      list_length((LPLIST)lpWorkers));
  ReleaseMutex(hWorkersMutex);

  if (lpWorkerFree == NULL)
  {
    /* We cannot find a free worker, create one. */
    lpWorkerFree = worker_new();
  }

  /* Ensure that we don't get dangling pointer to old data. */
  list_init((LPLIST)lpWorkerFree);
  lpWorkerFree->lpJobUserData = NULL;

  /* Reset events */
  ResetEvent(lpWorkerFree->hJobStarted);
  ResetEvent(lpWorkerFree->hJobStop);
  ResetEvent(lpWorkerFree->hJobDone);

  return lpWorkerFree;
}

void worker_push(LPWORKER lpWorker)
{
  BOOL bFreeWorker;

  bFreeWorker = TRUE;

  WaitForSingleObject(hWorkersMutex, INFINITE);
  DEBUG_PRINT("Testing if we are under the maximum number of running workers");
  if (list_length((LPLIST)lpWorkers) < THREAD_WORKERS_MAX)
  {
    DEBUG_PRINT("Saving this worker for future use");
    DEBUG_PRINT("Next: %x", ((LPLIST)lpWorker)->lpNext);
    lpWorkers = (LPWORKER)list_concat((LPLIST)lpWorker, (LPLIST)lpWorkers);
    bFreeWorker = FALSE;
  };
  nWorkersCurrent--;
  DEBUG_PRINT("Workers running current/runnning max/waiting: %d/%d/%d",
      nWorkersCurrent,
      nWorkersMax,
      list_length((LPLIST)lpWorkers));
  ReleaseMutex(hWorkersMutex);

  if (bFreeWorker)
  {
    DEBUG_PRINT("Freeing worker %x", lpWorker);
    worker_free(lpWorker);
  }
}

void worker_init (void)
{
  int i = 0;

  /* Init a shared variable. The only way to ensure that no other
     worker will be at the same point is to use a critical section.
     */
  DEBUG_PRINT("Allocating mutex for workers");
  if (hWorkersMutex == INVALID_HANDLE_VALUE)
  {
    hWorkersMutex = CreateMutex(NULL, FALSE, NULL);
  }
}

void worker_cleanup(void)
{
  LPWORKER lpWorker = NULL;

  /* WARNING: we can have a race condition here, if while this code
     is executed another worker is waiting to access hWorkersMutex,
     he will never be able to get it...
     */
  if (hWorkersMutex != INVALID_HANDLE_VALUE)
  {
    WaitForSingleObject(hWorkersMutex, INFINITE);
    DEBUG_PRINT("Freeing global resource of workers");
    /* Empty the queue of worker worker */
    while (lpWorkers != NULL)
    {
      ReleaseMutex(hWorkersMutex);
      lpWorker = worker_pop();
      DEBUG_PRINT("Freeing worker %x", lpWorker);
      WaitForSingleObject(hWorkersMutex, INFINITE);
      worker_free(lpWorker);
    };
    ReleaseMutex(hWorkersMutex);

    /* Destroy associated mutex */
    CloseHandle(hWorkersMutex);
    hWorkersMutex = INVALID_HANDLE_VALUE;
  };
}

LPWORKER worker_job_submit (WORKERFUNC f, void *user_data)
{
  LPWORKER lpWorker = worker_pop();

  DEBUG_PRINT("Waiting for worker to be ready");
  enter_blocking_section();
  WaitForSingleObject(lpWorker->hWorkerReady, INFINITE);
  ResetEvent(lpWorker->hWorkerReady);
  leave_blocking_section();
  DEBUG_PRINT("Worker is ready");

  lpWorker->hJobFunc      = f;
  lpWorker->lpJobUserData = user_data;
  lpWorker->ECommand      = WORKER_CMD_EXEC;

  DEBUG_PRINT("Call worker (func: %x, worker: %x)", f, lpWorker);
  SetEvent(lpWorker->hCommandReady);

  return (LPWORKER)lpWorker;
}

HANDLE worker_job_event_done (LPWORKER lpWorker)
{
  return lpWorker->hJobDone;
}

void worker_job_stop (LPWORKER lpWorker)
{
  DEBUG_PRINT("Sending stop signal to worker %x", lpWorker);
  SetEvent(lpWorker->hJobStop);
  DEBUG_PRINT("Signal sent to worker %x", lpWorker);
}

void worker_job_finish (LPWORKER lpWorker)
{
  DEBUG_PRINT("Finishing call of worker %x", lpWorker);
  enter_blocking_section();
  WaitForSingleObject(lpWorker->hJobDone, INFINITE);
  leave_blocking_section();

  worker_push(lpWorker);
}
Tip: Filter by directory path e.g. /media app.js to search for public/media/app.js.
Tip: Use camelCasing e.g. ProjME to search for ProjectModifiedEvent.java.
Tip: Filter by extension type e.g. /repo .js to search for all .js files in the /repo directory.
Tip: Separate your search with spaces e.g. /ssh pom.xml to search for src/ssh/pom.xml.
Tip: Use ↑ and ↓ arrow keys to navigate and return to view the file.
Tip: You can also navigate files with Ctrl+j (next) and Ctrl+k (previous) and view the file with Ctrl+o.
Tip: You can also navigate files with Alt+j (next) and Alt+k (previous) and view the file with Alt+o.