1. sboz
  2. Klubok

Wiki

Clone wiki

Klubok / Home

Overview

Klubok is a C++ library wrapping pthread functionality like thread, mutex, condition variable, into classes and additionally providing a thread pool implementation on top of these classes. To use the thread pool it is sufficient to provide a derived class containing only processing logic. The main focus of the library is on simplicity and ease of use.

Examples

The library consists of two parts. The first part wraps the pthread functionality, while the other provides thread pool implementation using these wrapper classes.

Wrapper classes

The base part of the library consists of classes Thread, Mutex, ConditionVariable, MutexedVariable, ConditionVariableWithMutex and ConditionVariableFull The first three classes correspond as closely as reasonable to their counterparts of the pthread library. The remaining three classes are simply combinations of a variable, whose type is specified by a template parameter, and of mutex and/or condition variable. Note that instances of Thread, Mutex, ConditionVariable and ConditionVariableWithMutex can not be copied or copy constructed. Instances of MutexedVariable and ConditionVariableFull on copying copy only the variable value.

To create a thread it is necessary to subclass the Thread class and to implement run method. An instance of the derived classes can be executed in a new thread by calling start method.

#include <iostream>
#include "Klubok/thread.h"
using namespace klubok;
using namespace std;

class MyThread : public Thread
{
	public:
		MyThread() {};
		~MyThread (){};
		void run()
		{
		   cout << "Doing job inside new thread"<<endl; 
		};
};

int main()
{
	MyThread thread;
	thread.start();
}

If it takes rather long for the new thread to finish. It is possible to wait for the thread by calling join method. It is also possible to cancel the thread without finishing by cancel method. This method sends the cancellation request and joins the thread. In default configuration cancellation type is asynchronous. If desired it can be changed to deferred, see Worker class for example.

thread.join() //will hang here until thread finishes
thread.start() //start a new one
thread.cancel() //cancel execution

Thread class provides top level try/catch block, so that the process is not terminated on an handled exception. On a caught exception the thread finishes. To check if any exceptions happened during execution use isSuccessful method. If thread failed, use getErrorMessage method to get the exception message. If in the thread any resource deallocation should be done after an exception cleanup method should be implemented in MyThread class.

if (!thread.isSuccessful())
	throw runtime_error(thread.getErrorMessage());

Mutex class provides two basic operations lock and unlock If two threads have to write to the same variable they both should receive the same mutex instance and before writing lock it and unlock on write completion. If the mutex is locked by another thread, the current thread waits for the mutex to be released.

class MyThread : public Thread
{
	public:
		MyThread(int &var, Mutex &mutex)
			: var(var), mutex(mutex)
		{};
		~MyThread (){};
		void run()
		{
			mutex.lock();
			var++;
			mutex.unlock();
		};
		int &var;
		Mutex &mutex;
};

MutexedVariable is a template class holding both mutex and the corresponding variable, so that less parameters should be passed around. In addition to mutex operations lock / unlock this class also provides direct access to the variable with getValue and copy and assigment operators. Note that in contrast to pthread primitives this class supports copying, which simply transfers the variable value.

MutexedVariable<int> mvar;
mvar.lock();
mvar = 10;
mvar.getValue()++;
mvar.unlock();

ConditionVariable is used to conditions on variables and signal their changes to the waiters. ConditionVariable should always be used with a mutex and a conditional expression. The methods of ConditionVariable are wait, signal and broadcast, they correspond directly to the pthread functions. It is worth noting, that the wait method is a cancellation point and a special care must be taken to unlock the mutex on cancellation, see e.g. ConditionVariableFull.

class WaitThread : public Thread
{
	public:
		WaitThread(int &var, Mutex &mutex,
				 ConditionVariable &cond)
			: var(var), mutex(mutex), cond(cond)
		{};
		~WaitThread (){};
		void run()
		{
			mutex.lock();
			//wait until var becomes above 10
			while (var <= 10)
				cond.wait(mut); //mutex is required!
			mutex.unlock();
		};
		int &var;
		Mutex &mutex;
		ConditionVariable &cond;
};

class SignalThread : public Thread
{
	public:
		SignalThread(int &var, Mutex &mutex,
				 ConditionVariable &cond)
			: var(var), mutex(mutex), cond(cond)
		{};
		~SignalThread (){};
		void run()
		{
			for (int i = 0; i < 15; i++)
			{
				mutex.lock();
				var++;
				cond.signal(); //wakes up one waiting thread
				mutex.unlock();
			}
			mutex.lock();
			cond.broadcast(); //wakes up all waiting threds
			mutex.unlock();
		};
		int &var;
		Mutex &mutex;
		ConditionVariable &cond;
};

ConditionVariableWithMutex combines both a condition variable and the corresponding mutex. It provides both the mutex and condition variable interfaces. The use precautions are the same as for ConditionVariable.

ConditionVariableWithMutex cond;
cond.lock();
cond.wait(); //no explicit mutex, the internal one is used
cond.unlock();
...
cond.signal();
cond.broadcast();

Template class ConditionVariableFull combines all three parts, i.e. variable, mutex and condition variable, in one place. This class provides both mutex and condition variable interfaces, and, in addition, also direct access to the variable with getValue, and copy and assigment operators. Note that in contrast to pthread primitives this class supports copying, which simply transfers the variable value. Besides, ConditionVariableFull implements conveniences methods for setting the value and signalling and waiting until a simple boolean condition, i.e. until equal, until larger, etc. These methods take care of proper locking/unlocking of mutexes and cancellation handling. The waitUntil methods employ pthread cleanup stack to install mutex unlocking on cancellation.

ConditionVariableFull<int> cond;
cond.lock();
cond.getValue()++;
cond.unlock();
...
cond.setAndSignal(10); //does locking itself
cond.setAndBroadcast(20); //does locking itself
...
cond.waitUntilEqual(20); //does locking and cleanup stack

Thread pool

ThreadPool creates a specified number of worker threads and manages a job queue to submit the jobs to the workers Worker. A job must be derived from AbstractJob class and implement execute and optionally cleanup methods. Jobs can be added to the pool by addJob methods taking either a single job or a vector of jobs. After completion jobs are not tracked anymore and are not deallocated.

#include <iostream>
#include <vector>
#include "Klubok/pool.h"
using namespace klubok;
using namespace std;

class MyJob : public AbstractJob
{
	public:
		MyJob() {};
		~MyJob (){};
		void execute()
		{
		   cout << "I'm the John's job!"<<endl; 
		};
};

int main()
{
	vector<AbstractJob*> jobs; 
	for (int i = 0; i<10; i++)
		jobs.push_back(new MyJob);
	ThreadPool pool(3); //3 worker threads
	poo.addJob(jobs);
	sleep(10); //not the proper way, see how to do this below
	for (int i = 0; i<10; i++)
		delete jobs[i];
}

ThreadPool starts worker threads execution on creation. Afterwards the workers run asynchronously. If it is required to synchronize the thread running the pool and the workers one of wait, waitUntilEmpty, stop and stopWhenEmpty can be used. The wait method simply joins all the workers to the pool thread, i.e. it will sit at this point until either all workers are stopped by an exception or the program is terminated. The waitUntilEmpty waits until no jobs are left in the queue and then returns, the last job from the queue is waited for. The stop method sends cancellation requests to all threads and waits until request completion. This method is used in ThreadPool destructor, so that if remaining jobs can be neglected it is enough to let the pool to go out of scope. The stopWhenEmpty method waits until the job queue becomes empty and only then sends the stop request. The latter method is particularly convenient if only a fixed number of jobs has to be accomplished, like in the above example.

//instead of sleeping
pool.waitUntilEmpty();

Note that Worker instances used deferred cancellation policy and set cancellation points before starting the job. If the job is very time consuming it is the user's responsibility to setup additional cancellation points and mutex cleanups. For small jobs it is not necessary however.

On any exception in the job the worker thread executing this job is stopped and is not restarted. The error message is passed to the pull. After pool completion, e.g. by stopWhenEmpty it is possible to check whether any exceptions happened by calling isSuccessful method. If there was an exception the details can be retrieved by getErrorMessage. It is possible to restart the pool by method restart.

if (!pool.isSuccessful())
	throw runtime_error(pool.getErrorMessage());

Build

Installation is standard for cmake projects.

>>> mkdir build && cd build
>>> cmake ..
>>> make
>>> make install

To build tests Boost.Tests is required.

>>> make tests
>>> tests/tests --log_level=test_suite

API

Doxygen generated

Updated