Source

ocaml / otherlibs / win32unix / winworker.c

Full commit
xleroy 48382bb 

doligez 6cd742a 
xleroy 48382bb 
xleroy 15a3916 
xleroy 48382bb 
xleroy 15a3916 
xleroy 48382bb 









protzenk c9886c8 

frisch 9cc8769 


xleroy 48382bb 
































doligez e7d6b93 
xleroy 48382bb 


xleroy dff8115 
xleroy 48382bb 
doligez e7d6b93 
xleroy 48382bb 
doligez e7d6b93 
xleroy 48382bb 
doligez e7d6b93 
xleroy 48382bb 

xleroy dff8115 
xleroy 48382bb 


















xleroy dff8115 
xleroy 48382bb 







xleroy dff8115 
xleroy 48382bb 








doligez e7d6b93 




xleroy 48382bb 







xleroy dff8115 
xleroy 48382bb 





xleroy dff8115 
xleroy 48382bb 






































xleroy dff8115 
xleroy 48382bb 














xleroy dff8115 
xleroy 48382bb 





























xleroy dff8115 
xleroy 48382bb 

xleroy dff8115 

xleroy 48382bb 



xleroy dff8115 
xleroy 48382bb 






xleroy dff8115 
xleroy 48382bb 










xleroy dff8115 
xleroy 48382bb 
















xleroy dff8115 
xleroy 48382bb 




xleroy dff8115 
xleroy 48382bb 



doligez e7d6b93 
xleroy 48382bb 









xleroy dff8115 
xleroy 48382bb 



xleroy dff8115 
xleroy 48382bb 




xleroy dff8115 
xleroy 48382bb 











xleroy dff8115 
xleroy 48382bb 
xleroy dff8115 
xleroy 48382bb 



xleroy dff8115 
xleroy 48382bb 




/***********************************************************************/
/*                                                                     */
/*                                OCaml                                */
/*                                                                     */
/*  Contributed by Sylvain Le Gall for Lexifi                          */
/*                                                                     */
/*  Copyright 2008 Institut National de Recherche en Informatique et   */
/*  en Automatique.  All rights reserved.  This file is distributed    */
/*  under the terms of the GNU Library General Public License, with    */
/*  the special exception on linking described in file ../../LICENSE.  */
/*                                                                     */
/***********************************************************************/

/* $Id$ */

#include <mlvalues.h>
#include <alloc.h>
#include <memory.h>
#include <signals.h>
#include "winworker.h"
#include "winlist.h"
#include "windbug.h"

/* Command posted to a worker thread through the ECommand field of its
   WORKER record; the worker reads it each time hCommandReady fires. */
typedef enum {
  WORKER_CMD_NONE = 0,  /* Nothing to do: the worker goes back to waiting. */
  WORKER_CMD_EXEC,      /* Run the job function stored in the WORKER record. */
  WORKER_CMD_STOP       /* Leave the command loop so the thread terminates. */
} WORKERCMD;

/* Per-worker record: one background thread plus the events used to hand it
   a job and to observe that job's progress.  The LIST member comes first so
   that a pointer to the record can be cast to LPLIST. */
struct _WORKER {
  LIST       lst;           /* This structure is used as a list. */
  HANDLE     hJobStarted;   /* Manual-reset event set by the worker thread just
                               before it calls the job function. */
  HANDLE     hJobStop;      /* Manual-reset event passed to the job function;
                               set by worker_job_stop to ask the job to stop
                               processing. */
  HANDLE     hJobDone;      /* Manual-reset event set by the worker thread once
                               the job function has returned. */
  void      *lpJobUserData; /* User data handed to the job function. */
  WORKERFUNC hJobFunc;      /* Job function, called directly from the worker
                               thread's command loop. */
  HANDLE     hWorkerReady;  /* Auto-reset event signaled by the worker thread
                               each time it starts waiting for a command. */
  HANDLE     hCommandReady; /* Auto-reset event set once ECommand has been
                               written and the worker should act on it. */
  WORKERCMD  ECommand;      /* Command to execute */
  HANDLE     hThread;       /* Thread handle of the worker. */
};

/* Maximum number of idle workers kept in the free list; worker_push frees
   any worker returned while the list is already that long. */
#define THREAD_WORKERS_MAX 16
/* Value passed as the stack-size argument of CreateThread for each worker. */
#define THREAD_WORKERS_MEM 4000

/* Head of the list of idle workers available for reuse. */
LPWORKER lpWorkers       = NULL;
/* Number of workers currently handed out (incremented by worker_pop,
   decremented by worker_push). */
DWORD    nWorkersCurrent = 0;
/* Highest value nWorkersCurrent has reached so far. */
DWORD    nWorkersMax     = 0;
/* Mutex taken around accesses to the three variables above;
   INVALID_HANDLE_VALUE means "not created yet". */
HANDLE   hWorkersMutex   = INVALID_HANDLE_VALUE;

/* Thread entry point of a worker.
 *
 * _data is the LPWORKER record owned by this thread.  The thread repeatedly
 * signals hWorkerReady and waits on hCommandReady in a single
 * SignalObjectAndWait call, then acts on ECommand:
 *   - WORKER_CMD_EXEC: if a job function is set, signal hJobStarted, run the
 *     function with (hJobStop, lpJobUserData), then signal hJobDone;
 *   - WORKER_CMD_STOP: leave the loop;
 *   - anything else:   go back to waiting.
 * The loop also ends as soon as the wait returns anything other than
 * WAIT_OBJECT_0.  Always returns 0 as the thread exit code.
 */
DWORD WINAPI worker_wait (LPVOID _data)
{
  LPWORKER  lpSelf = (LPWORKER)_data;
  WORKERCMD eCmd;

  DEBUG_PRINT("Worker %x starting", lpSelf);
  for (;;)
  {
    /* Announce readiness and sleep until a command is posted. */
    if (SignalObjectAndWait(
          lpSelf->hWorkerReady,
          lpSelf->hCommandReady,
          INFINITE,
          TRUE) != WAIT_OBJECT_0)
    {
      break;
    }

    DEBUG_PRINT("Worker %x running", lpSelf);
    eCmd = lpSelf->ECommand;

    if (eCmd == WORKER_CMD_STOP)
    {
      break;
    }

    if (eCmd == WORKER_CMD_EXEC && lpSelf->hJobFunc != NULL)
    {
      SetEvent(lpSelf->hJobStarted);
      lpSelf->hJobFunc(lpSelf->hJobStop, lpSelf->lpJobUserData);
      SetEvent(lpSelf->hJobDone);
    }
  }
  DEBUG_PRINT("Worker %x exiting", lpSelf);

  return 0;
}

/* Allocate a WORKER record, create its events and start its thread.
 *
 * The three job events (started/stop/done) are manual-reset; the two
 * handshake events (worker ready / command ready) are auto-reset.  All are
 * created non-signaled.  The new thread runs worker_wait on the record.
 *
 * Returns the new record.  The results of CreateEvent/CreateThread are stored
 * without being checked here.
 */
LPWORKER worker_new (void)
{
  LPWORKER lpWorker = NULL;

  lpWorker = (LPWORKER)caml_stat_alloc(sizeof(WORKER));
  list_init((LPLIST)lpWorker);
  lpWorker->hJobStarted   = CreateEvent(NULL, TRUE, FALSE, NULL);
  lpWorker->hJobStop      = CreateEvent(NULL, TRUE, FALSE, NULL);
  lpWorker->hJobDone      = CreateEvent(NULL, TRUE, FALSE, NULL);
  lpWorker->lpJobUserData = NULL;
  /* The freshly allocated block is not assumed to be zeroed, and worker_wait
     tests this field against NULL, so give it a defined value before the
     thread can see the record. */
  lpWorker->hJobFunc      = NULL;
  lpWorker->hWorkerReady  = CreateEvent(NULL, FALSE, FALSE, NULL);
  lpWorker->hCommandReady = CreateEvent(NULL, FALSE, FALSE, NULL);
  lpWorker->ECommand      = WORKER_CMD_NONE;
  /* Start the thread last: every field it reads is initialized above. */
  lpWorker->hThread = CreateThread(
    NULL,
    THREAD_WORKERS_MEM,
    worker_wait,
    (LPVOID)lpWorker,
    0,
    NULL);

  return lpWorker;
}

void worker_free (LPWORKER lpWorker)
{
  /* Wait for termination of the worker */
  DEBUG_PRINT("Shutting down worker %x", lpWorker);
  WaitForSingleObject(lpWorker->hWorkerReady, INFINITE);
  lpWorker->ECommand = WORKER_CMD_STOP;
  SetEvent(lpWorker->hCommandReady);
  WaitForSingleObject(lpWorker->hThread, INFINITE);

  /* Free resources */
  DEBUG_PRINT("Freeing resources of worker %x", lpWorker);
  if (lpWorker->hThread != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hThread);
    lpWorker->hThread = INVALID_HANDLE_VALUE;
  }

  if (lpWorker->hJobStarted != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hJobStarted);
    lpWorker->hJobStarted = INVALID_HANDLE_VALUE;
  }

  if (lpWorker->hJobStop != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hJobStop);
    lpWorker->hJobStop = INVALID_HANDLE_VALUE;
  }

  if (lpWorker->hJobDone != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hJobDone);
    lpWorker->hJobDone = INVALID_HANDLE_VALUE;
  }

  lpWorker->lpJobUserData = NULL;
  lpWorker->hJobFunc = NULL;

  if (lpWorker->hWorkerReady != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hWorkerReady);
    lpWorker->hWorkerReady = INVALID_HANDLE_VALUE;
  }

  if (lpWorker->hCommandReady != INVALID_HANDLE_VALUE)
  {
    CloseHandle(lpWorker->hCommandReady);
    lpWorker->hCommandReady = INVALID_HANDLE_VALUE;
  }

  caml_stat_free(lpWorker);
};

LPWORKER worker_pop (void)
{
  LPWORKER lpWorkerFree = NULL;

  WaitForSingleObject(hWorkersMutex, INFINITE);
  /* Get the first worker of the list */
  if (lpWorkers != NULL)
  {
    lpWorkerFree = lpWorkers;
    lpWorkers = LIST_NEXT(LPWORKER, lpWorkers);
  }
  nWorkersCurrent++;
  nWorkersMax = (nWorkersCurrent > nWorkersMax ? nWorkersCurrent : nWorkersMax);
  DEBUG_PRINT("Workers running current/runnning max/waiting: %d/%d/%d",
      nWorkersCurrent,
      nWorkersMax,
      list_length((LPLIST)lpWorkers));
  ReleaseMutex(hWorkersMutex);

  if (lpWorkerFree == NULL)
  {
    /* We cannot find a free worker, create one. */
    lpWorkerFree = worker_new();
  }

  /* Ensure that we don't get dangling pointer to old data. */
  list_init((LPLIST)lpWorkerFree);
  lpWorkerFree->lpJobUserData = NULL;

  /* Reset events */
  ResetEvent(lpWorkerFree->hJobStarted);
  ResetEvent(lpWorkerFree->hJobStop);
  ResetEvent(lpWorkerFree->hJobDone);

  return lpWorkerFree;
}

/* Give a worker back once its job is over.
 *
 * Under hWorkersMutex: if the idle list holds fewer than THREAD_WORKERS_MAX
 * entries, the worker is linked into it through list_concat; the running
 * counter is decremented either way.  A worker that was not kept is
 * destroyed with worker_free after the mutex has been released.
 */
void worker_push(LPWORKER lpWorker)
{
  BOOL bKept = FALSE;

  WaitForSingleObject(hWorkersMutex, INFINITE);
  DEBUG_PRINT("Testing if we are under the maximum number of running workers");
  if (list_length((LPLIST)lpWorkers) < THREAD_WORKERS_MAX)
  {
    DEBUG_PRINT("Saving this worker for future use");
    DEBUG_PRINT("Next: %x", ((LPLIST)lpWorker)->lpNext);
    lpWorkers = (LPWORKER)list_concat((LPLIST)lpWorker, (LPLIST)lpWorkers);
    bKept = TRUE;
  }
  nWorkersCurrent--;
  DEBUG_PRINT("Workers running current/runnning max/waiting: %d/%d/%d",
      nWorkersCurrent,
      nWorkersMax,
      list_length((LPLIST)lpWorkers));
  ReleaseMutex(hWorkersMutex);

  if (!bKept)
  {
    /* Idle list is full: this worker is surplus. */
    DEBUG_PRINT("Freeing worker %x", lpWorker);
    worker_free(lpWorker);
  }
}

/* Create the mutex that protects the worker pool, unless it already exists.
 *
 * hWorkersMutex uses INVALID_HANDLE_VALUE as its "not created" marker, so
 * calling this function again while the mutex exists does nothing.  The
 * result of CreateMutex is stored without being checked.
 */
void worker_init (void)
{
  /* The pool state (idle list and counters) is shared between threads;
     every access to it is serialized through this mutex. */
  DEBUG_PRINT("Allocating mutex for workers");
  if (hWorkersMutex == INVALID_HANDLE_VALUE)
  {
    hWorkersMutex = CreateMutex(NULL, FALSE, NULL);
  }
}

void worker_cleanup(void)
{
  LPWORKER lpWorker = NULL;

  /* WARNING: we can have a race condition here, if while this code
     is executed another worker is waiting to access hWorkersMutex,
     he will never be able to get it...
     */
  if (hWorkersMutex != INVALID_HANDLE_VALUE)
  {
    WaitForSingleObject(hWorkersMutex, INFINITE);
    DEBUG_PRINT("Freeing global resource of workers");
    /* Empty the queue of worker worker */
    while (lpWorkers != NULL)
    {
      ReleaseMutex(hWorkersMutex);
      lpWorker = worker_pop();
      DEBUG_PRINT("Freeing worker %x", lpWorker);
      WaitForSingleObject(hWorkersMutex, INFINITE);
      worker_free(lpWorker);
    };
    ReleaseMutex(hWorkersMutex);

    /* Destroy associated mutex */
    CloseHandle(hWorkersMutex);
    hWorkersMutex = INVALID_HANDLE_VALUE;
  };
}

/* Start job f(user_data) on a worker thread.
 *
 * Obtains a worker with worker_pop, waits (inside a blocking section) until
 * that worker has signaled hWorkerReady, stores the job description in the
 * record and wakes the worker with hCommandReady.
 *
 * Returns the worker running the job; the caller is expected to hand it to
 * worker_job_finish when done.
 */
LPWORKER worker_job_submit (WORKERFUNC f, void *user_data)
{
  LPWORKER lpAssigned;

  lpAssigned = worker_pop();

  DEBUG_PRINT("Waiting for worker to be ready");
  enter_blocking_section();
  WaitForSingleObject(lpAssigned->hWorkerReady, INFINITE);
  ResetEvent(lpAssigned->hWorkerReady);
  leave_blocking_section();
  DEBUG_PRINT("Worker is ready");

  /* The worker does not look at these fields until hCommandReady is set
     below, so they can be filled in any order. */
  lpAssigned->ECommand      = WORKER_CMD_EXEC;
  lpAssigned->lpJobUserData = user_data;
  lpAssigned->hJobFunc      = f;

  DEBUG_PRINT("Call worker (func: %x, worker: %x)", f, lpAssigned);
  SetEvent(lpAssigned->hCommandReady);

  return lpAssigned;
}

/* Return the event that the worker thread sets once its job function has
   returned.  The handle stays owned by the worker record. */
HANDLE worker_job_event_done (LPWORKER lpWorker)
{
  HANDLE hDone = lpWorker->hJobDone;

  return hDone;
}

/* Ask the job running on lpWorker to stop by setting the hJobStop event that
   was passed to the job function.  Does not wait for the job to react. */
void worker_job_stop (LPWORKER lpWorker)
{
  HANDLE hStop = lpWorker->hJobStop;

  DEBUG_PRINT("Sending stop signal to worker %x", lpWorker);
  SetEvent(hStop);
  DEBUG_PRINT("Signal sent to worker %x", lpWorker);
}

/* Wait (inside a blocking section) until the job on lpWorker has signaled
   hJobDone, then return the worker with worker_push.  lpWorker must not be
   used by the caller afterwards, since worker_push may free it. */
void worker_job_finish (LPWORKER lpWorker)
{
  HANDLE hDone = lpWorker->hJobDone;

  DEBUG_PRINT("Finishing call of worker %x", lpWorker);

  enter_blocking_section();
  WaitForSingleObject(hDone, INFINITE);
  leave_blocking_section();

  worker_push(lpWorker);
}