Source

dbsdev / sources / utils / threadpool / KMThread.cpp

Full commit
#include "BFSleep.h"

#include "amp/amp_return_code.h"

#include "KMThread.h"
using namespace kmp::threading;

KMThread::KMThread()
{
	_thread = NULL;

	m_ptask = NULL;
	m_pthreadpool = NULL;

	m_brunning = false;
	m_uithreadID = 0;
	// create mutex
	amp_mutex_create(&_lock,AMP_DEFAULT_ALLOCATOR);
}

KMThread::~KMThread()
{
	if (m_brunning)
		End();
}

void KMThread::SetTask(KMTask* task)
{
	m_ptask = task;
}

void KMThread::Begin()
{
	// Set our thread pool
	m_pthreadpool = KMThreadpool::getInstance();
	printf("create thread...");
	int ret = amp_thread_create_and_launch(&_thread,
                                           AMP_DEFAULT_ALLOCATOR,
                                           (void*)this, 
                                           &cThreadProc);
	m_brunning = true;
	if (ret!=AMP_SUCCESS)
		m_brunning = false;
	printf(m_brunning ? "ok!\n":"failed\n");
// #if defined( _WIN32 )
// 	// Start the thread.
// 	m_hthread = (HANDLE)_beginthreadex( NULL,
// 		0,
// 		&cThreadProc,
// 		(void*)this,
// 		0,
// 		&m_uithreadID );

// 	m_brunning = true;
// 	if( m_hthread == NULL )
// 	{
// 		// You can add extra error-handling here.
// 		m_brunning = false;
// 	}
// #endif /* defined( _WIN32 ) */
}

void KMThread::End()
{
	m_brunning = false;
	amp_thread_join_and_destroy(&_thread, AMP_DEFAULT_ALLOCATOR);
// #if defined( _WIN32 )
// 	if( m_hthread != NULL )
// 	{
// 		m_brunning = false;
// 		WaitForSingleObject( m_hthread, INFINITE );
// 		DWORD ExitCode;
// 		GetExitCodeThread( m_hthread, &ExitCode );
// 		CloseHandle( m_hthread );
// 		m_hthread = NULL;
// 	}
// #endif /* defined( _WIN32 ) */
}

void KMThread::ThreadProc()
{
	m_ptask = NULL;

	// The main thread-loop. As long as this loop
	// is running, the thread stays alive.
	while(m_brunning)
	{

		//utils::sleep(1);	// wait 1 ms

		// The thread pauses when it finishes a task.
		// Adding a task resumes it.
		if(m_ptask != NULL)
		{
			//amp_mutex_lock(_lock);   <---- needed in case of shared task data (void* pointer)
			{
				KMTaskFunc task = m_ptask->GetTask();
				IKMTaskData* data = m_ptask->GetData();
				// Run the actual task
				if(task != NULL && data != NULL)
				{
					task(data);
				}
				// Task is complete.
				m_bpaused = true;
				delete m_ptask;
				m_ptask = NULL;
			}
			//amp_mutex_unlock(_lock);
		}
		
		// If we're finished with our task, grab a new one.
		if(/*m_bpaused == true && */m_ptask == NULL &&
			m_pthreadpool->IsProcessing() == true)
		{
			m_ptask = m_pthreadpool->m_qtaskList.pop();
		}
	}

}