

Okapi / lib_concurrent

Describes lib-concurrent, its current implementations and the assumptions they make for safe threaded processing.

Introduction

Okapi is not generally thread-safe. The lib-concurrent module attempts to hide the thornier details of concurrency and implements the common use cases where concurrency is useful. Concurrency is only safe when certain strong assumptions hold. The implementations and their assumptions are described below.

Details

WorkQueuePipelineDriver

Assumptions

  • Steps in the pipeline must not expect to be visited by all batch items; each cloned pipeline sees only the items assigned to its work queue.
  • Batch items are not guaranteed to complete in the order in which they are defined.
  • Steps must be created with the default (zero-argument) constructor. setParameters is the only safe way to initialize their state.

WorkQueuePipelineDriver is a concurrent implementation of IPipelineDriver. Up to n batch items are processed concurrently, where n is the number of work queues.

WorkQueuePipelineDriver clones the specified number of pipelines from the template (main) pipeline. The steps of the template pipeline are also cloned and initialized. The state of each cloned step mirrors that of the corresponding template step, including its parameters, runtime parameters (using Okapi parameter annotations), and so on.
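The step cloning described above can be sketched roughly as follows. This is not Okapi's implementation: `Step`, `ParamStep`, `cloneSteps`, the `Map<String, String>` parameters and the `targetLocale` key are hypothetical stand-ins for IPipelineStep and its parameters. The sketch assumes clones are created reflectively through the zero-argument constructor and then initialized only through setParameters, which is why the assumptions above require both.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StepCloning {

    // Hypothetical stand-in for a pipeline step; NOT the Okapi IPipelineStep API.
    public interface Step {
        Map<String, String> getParameters();
        void setParameters(Map<String, String> params);
    }

    // Hypothetical step whose configurable state lives entirely in its parameters.
    public static class ParamStep implements Step {
        private Map<String, String> params = new HashMap<>();

        public ParamStep() { } // zero-argument constructor: the only one the cloner calls

        @Override
        public Map<String, String> getParameters() { return params; }

        @Override
        public void setParameters(Map<String, String> params) {
            // Defensive copy so clones never share a mutable parameter map.
            this.params = new HashMap<>(params);
        }
    }

    // Creates one clone per work queue from the template step.
    public static List<Step> cloneSteps(Step template, int n) throws ReflectiveOperationException {
        List<Step> clones = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            // Only the default constructor runs; constructor-supplied state would be lost here.
            Step copy = template.getClass().getDeclaredConstructor().newInstance();
            copy.setParameters(template.getParameters());
            clones.add(copy);
        }
        return clones;
    }

    public static void main(String[] args) throws Exception {
        ParamStep template = new ParamStep();
        template.setParameters(Map.of("targetLocale", "fr")); // hypothetical parameter
        List<Step> clones = cloneSteps(template, 4);
        System.out.println(clones.size() + " clones, targetLocale=" + clones.get(0).getParameters().get("targetLocale"));
    }
}
```

Any state a step received through a non-default constructor would silently be missing from its clones, while state set through setParameters is carried over.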

Once the n work queues are running, all of them must finish before another batch is processed. This means that long-running batch items (e.g., large files) can block the processing of other items. This is a weakness of the current implementation that could be resolved by a more intelligent load-balancing algorithm.
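The batch-at-a-time behavior and its blocking weakness can be illustrated with java.util.concurrent. This is a hypothetical sketch, not WorkQueuePipelineDriver's code: `RoundScheduler`, `processInRounds`, and the `Function<String, String>` standing in for a cloned pipeline are assumptions made for illustration. `ExecutorService.invokeAll` returns only after every task in a round is done, so the slowest item in a round delays the start of the next one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class RoundScheduler {

    // Hypothetical helper: processes items n at a time, one thread per "work queue".
    public static List<String> processInRounds(List<String> items, int n, Function<String, String> step)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        List<String> results = new ArrayList<>(items.size());
        try {
            for (int start = 0; start < items.size(); start += n) {
                List<Callable<String>> round = new ArrayList<>();
                for (String item : items.subList(start, Math.min(start + n, items.size()))) {
                    round.add(() -> step.apply(item));
                }
                // invokeAll blocks until every task in this round has finished, so the
                // slowest item decides when the next round may start. Tasks inside a round
                // may complete in any order; the futures are read here in submission order.
                for (Future<String> f : pool.invokeAll(round)) {
                    results.add(f.get());
                }
            }
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        // Mirrors the documented default of one work queue per CPU core.
        int n = Runtime.getRuntime().availableProcessors();
        System.out.println(processInRounds(List.of("a.html", "b.xlf", "c.docx"), n, String::toUpperCase));
    }
}
```

Submitting every item to the pool at once, instead of round by round, is one simple way the stall could be avoided.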

Example:

pipeline = new WorkQueuePipelineDriver(); // default: one work queue per CPU core

pipeline = new WorkQueuePipelineDriver(16); // 16 work queues (threads)

TextUnitWorkQueueStep

Assumptions

  • Only TextUnits are processed concurrently.
  • The wrapped step must be essentially stateless: TextUnit in, TextUnit out.
  • DocumentParts are passed through without any processing. If DocumentPart processing is needed, either create a new implementation or override the startConcurrentProcesing() method.

TextUnitWorkQueueStep is an implementation of IWorkQueueStep. IWorkQueueStep extends IPipelineStep and adds the methods needed for any current pipeline implementation (Pipeline, PipelineDriver, WorkQueuePipelineDriver) to run it concurrently.

TextUnitWorkQueueStep is designed as a wrapper step: it takes a template (main) step and the number of work queues (threads) to create. It uses SortableEvent to return all processed events in the same order in which they were received, packaged in a single MultiEvent. TextUnitWorkQueueStep buffers TextUnits and DocumentParts until it holds n TextUnits, where n is the number of defined work queues, and returns a NOOP event while buffering.
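The buffering and reordering described above can be sketched as follows. This is not the Okapi implementation: `TextUnitBuffer`, `Event`, `Kind`, the sequence number standing in for SortableEvent, the returned `List<Event>` standing in for a MultiEvent, and the empty list standing in for a NOOP event are all hypothetical simplifications. TextUnits are processed concurrently and collected in completion order, then sorted back into arrival order; DocumentParts pass through untouched.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

public class TextUnitBuffer {

    public enum Kind { TEXT_UNIT, DOCUMENT_PART }

    // Hypothetical event; seq plays the role of SortableEvent's ordering key.
    public static final class Event {
        private final long seq;
        private final Kind kind;
        private final String content;

        public Event(long seq, Kind kind, String content) {
            this.seq = seq; this.kind = kind; this.content = content;
        }

        public long seq() { return seq; }
        public Kind kind() { return kind; }
        public String content() { return content; }
    }

    private final int n;
    private final UnaryOperator<String> step; // wrapped stateless step: TextUnit in, TextUnit out
    private final ExecutorService pool;
    private final List<Event> buffer = new ArrayList<>();
    private int textUnitCount = 0;
    private long nextSeq = 0;

    public TextUnitBuffer(int n, UnaryOperator<String> step) {
        this.n = n;
        this.step = step;
        this.pool = Executors.newFixedThreadPool(n);
    }

    // Returns an empty list (standing in for NOOP) until n TextUnits are buffered,
    // then every buffered event in arrival order (standing in for a MultiEvent).
    public List<Event> handle(Kind kind, String content) throws InterruptedException, ExecutionException {
        buffer.add(new Event(nextSeq++, kind, content));
        if (kind == Kind.TEXT_UNIT && ++textUnitCount == n) {
            return flush();
        }
        return List.of();
    }

    // Processes buffered TextUnits concurrently; DocumentParts pass through unchanged.
    public List<Event> flush() throws InterruptedException, ExecutionException {
        CompletionService<Event> done = new ExecutorCompletionService<>(pool);
        List<Event> out = new ArrayList<>(buffer.size());
        int submitted = 0;
        for (Event e : buffer) {
            if (e.kind() == Kind.TEXT_UNIT) {
                done.submit(() -> new Event(e.seq(), e.kind(), step.apply(e.content())));
                submitted++;
            } else {
                out.add(e);
            }
        }
        // take() yields results in completion order, which may differ from arrival order.
        for (int i = 0; i < submitted; i++) {
            out.add(done.take().get());
        }
        out.sort(Comparator.comparingLong(Event::seq)); // restore the original order
        buffer.clear();
        textUnitCount = 0;
        return out;
    }

    public void shutdown() { pool.shutdown(); }

    public static void main(String[] args) throws Exception {
        TextUnitBuffer tb = new TextUnitBuffer(2, String::toUpperCase);
        System.out.println(tb.handle(Kind.TEXT_UNIT, "hello").isEmpty());   // still buffering
        System.out.println(tb.handle(Kind.DOCUMENT_PART, "<p>").isEmpty()); // still buffering
        for (Event e : tb.handle(Kind.TEXT_UNIT, "world")) {
            System.out.println(e.seq() + " " + e.kind() + " " + e.content());
        }
        tb.shutdown();
    }
}
```

In this sketch, TextUnits still buffered at the end of a document (fewer than n) would need an explicit flush(); the text above does not say how TextUnitWorkQueueStep handles that case.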

Example:

// Process 4 TextUnits concurrently with LongRunningStep; TextUnits are returned in their original order.

pipeline.addStep(new TextUnitWorkQueueStep(new LongRunningStep(), 4));
