// Source: dbsdev / temp / test_threadpool / main.cpp (full commit)
//////////////////////////////////////////////////////////////////////////
// Main.cpp
//
// Author: Keith Maggio
// Purpose: Main program entrance.
// Free for use and modification.
// Revision 1, July 4th, 2010
//////////////////////////////////////////////////////////////////////////
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <string>
#include "KMUtility.h"
#include "KMThreadPool.h"
using kmp::threading::KMThreadpool;
using kmp::threading::utility::IKMTaskData;

#include "amp/amp.h"

// Time keeping
#include "BFTimer.h"
#include "BFSleep.h"

// Number of task invocations main() runs serially and also queues on the
// thread pool; even-numbered tasks use Sample1, odd-numbered ones Sample2.
#define MAX_TASKS 2500 // Change this to modify the number of tasks

//////////////////////////////////////////////////////////////////////////
// Sample payload handed to the Sample1/Sample2 task functions.
// Derives from IKMTaskData so a pointer to it can be passed to a task
// function taking IKMTaskData*.
class SampleTaskData : public IKMTaskData
{
public:
    int tasknum;    // Task identifier supplied by the creator (0 by default).
    int counter;    // Scratch integer mutated by the task functions.
    double sinval;  // Most recent sine value stored by the task functions.

    // Creates a payload with every field zeroed.
    SampleTaskData()
    : tasknum(0), counter(0), sinval(0.0)
    {}

    // Creates a payload for task number `data`.
    // counter and sinval are zeroed here as well: the task functions
    // increment counter before ever assigning it, so leaving it
    // indeterminate would make every task read an uninitialized int.
    SampleTaskData(int data)
    : tasknum(data), counter(0), sinval(0.0)
    {}

    ~SampleTaskData() {}
};

// Heavier sample task: runs 2,000,000 iterations of a small integer/sine
// workload against the payload.
//
// data - must point to a SampleTaskData; it is downcast without any
//        runtime type check. Its counter and sinval fields are overwritten.
void Sample1(IKMTaskData* data)
{
    // Tasks receive the payload as a base-class pointer; recover the
    // concrete type we need.
    SampleTaskData* task = static_cast<SampleTaskData*>(data);
    // printf("S%d\n", task->tasknum); // When the task started
    for (int i = 0; i < 2000000; ++i)
    {
        task->counter++;
        task->counter /= 2;
        // Explicit double conversion selects the double overload of std::sin.
        task->sinval = std::sin(static_cast<double>(task->counter));
    }
    // printf("F%d\n", task->tasknum); // When the task finishes
}

// Lighter sample task: same workload as Sample1 but only 500,000 iterations.
//
// data - must point to a SampleTaskData; it is downcast without any
//        runtime type check. Its counter and sinval fields are overwritten.
void Sample2(IKMTaskData* data)
{
    // Tasks receive the payload as a base-class pointer; recover the
    // concrete type we need.
    SampleTaskData* task = static_cast<SampleTaskData*>(data);
    // printf("S%d\n", task->tasknum);
    for (int i = 0; i < 500000; ++i)
    {
        task->counter++;
        task->counter /= 2;
        // Explicit double conversion selects the double overload of std::sin.
        task->sinval = std::sin(static_cast<double>(task->counter));
    }
    // printf("F%d\n", task->tasknum);
}
//////////////////////////////////////////////////////////////////////////

// Program entry point: runs the same MAX_TASKS workload once serially and
// once through KMThreadpool, then prints both elapsed times.
//
// Optional arguments:
//   argv[1] - minimum worker count passed to KMThreadpool::Initialize
//   argv[2] - maximum worker count passed to KMThreadpool::Initialize
// An argument that does not parse to a positive integer is ignored and the
// detected core / hardware-thread count is used instead.
//
// Returns 0 on success, -1 if the amp platform description cannot be created.
int main(int argc, char *argv[])
{
// #ifdef __Debug__
    // Wait for a token on stdin before doing any work.
    std::string tmpstr;
    std::cout << "type anything to start\n";
    std::cin >> tmpstr;
// #endif

    BFTimer elapsed;

    amp_platform_t platform = AMP_PLATFORM_UNINITIALIZED;

    int error_code = AMP_SUCCESS;
    error_code = amp_platform_create(&platform,AMP_DEFAULT_ALLOCATOR);
    if (error_code != AMP_SUCCESS)
        return -1;

    // Extract numbers from platform description. The counts start at zero and
    // the query results are not checked, so a failed query leaves a zero.
    std::size_t core_count = 0;
    std::size_t active_core_count = 0;
    std::size_t hwthread_count = 0;
    std::size_t active_hwthread_count = 0;

    amp_platform_get_installed_core_count(platform, &core_count);
    amp_platform_get_active_core_count(platform, &active_core_count);
    amp_platform_get_installed_hwthread_count(platform, &hwthread_count);
    amp_platform_get_active_hwthread_count(platform, &active_hwthread_count);
    amp_platform_destroy(&platform,AMP_DEFAULT_ALLOCATOR);

    std::cout << "amp_platform_check\n";
    std::cout << "Core count    : " << active_core_count << "/" << core_count << " (active/installed)\n";
    std::cout << "Hwthread count: " << active_hwthread_count << "/" << hwthread_count << " (active/installed)\n";
    std::cout << "\n\n";


    // -- serial execution -- //
    std::printf("Serial execution....\n");
    // One payload is reused for every serial call; it lives on the stack so
    // nothing has to be freed afterwards.
    SampleTaskData sdata(0);
    elapsed.start();
    for(int i = 0; i < MAX_TASKS; ++i)
    {
        if(i%2==0)
            Sample1(&sdata);
        else
            Sample2(&sdata);
    }
    float serialTime = elapsed.getElapsedTime();
    elapsed.stop();

    // -- thread pool -- //
    // Defaults: one worker per installed core, up to one per hardware thread.
    unsigned int minWorkers = static_cast<unsigned int>(core_count);
    unsigned int maxWorkers = static_cast<unsigned int>(hwthread_count);

    // Command-line overrides. Non-numeric input makes atoi return 0, and a
    // negative value would wrap to a huge unsigned count, so only strictly
    // positive results are accepted.
    if (argc > 1)
    {
        const int requestedMin = std::atoi(argv[1]);
        if (requestedMin > 0)
            minWorkers = static_cast<unsigned int>(requestedMin);
    }
    if (argc > 2)
    {
        const int requestedMax = std::atoi(argv[2]);
        if (requestedMax > 0)
            maxWorkers = static_cast<unsigned int>(requestedMax);
    }

    // How KMThreadpool::Initialize treats a zero or inverted range is not
    // visible from this file, so normalize defensively: at least one worker,
    // and max never below min.
    if (minWorkers == 0)
        minWorkers = 1;
    if (maxWorkers < minWorkers)
        maxWorkers = minWorkers;

    KMThreadpool* pool = KMThreadpool::getInstance();
    // Initialize the threadpool.
    pool->Initialize(minWorkers,maxWorkers);
    // Cast so the argument type matches the %d conversion regardless of the
    // getter's declared integer type.
    std::printf("Thread pool Initialized with %d threads.\n", static_cast<int>(pool->getActiveThreads()));
    // Add tasks to our threadpool.
    std::printf("Thread pool execution....\n");
    for(int i = 0; i < MAX_TASKS; ++i)
    {
        // NOTE(review): nothing in this file deletes these payloads;
        // presumably the pool takes ownership - confirm against KMThreadpool.
        SampleTaskData* data = new SampleTaskData(i);
        if(i%2==0)
            pool->AddTask(Sample1, data);
        else
            pool->AddTask(Sample2, data);
    }

    // See how long it takes to execute
    elapsed.start();
    pool->BeginProcessing();

    // Poll until the pool reports no waiting tasks.
    // NOTE(review): if hasWaitingTasks() only reflects queued (not in-flight)
    // tasks, poolTime is sampled before the last tasks finish. An earlier
    // version of this loop also compared getActiveThreads() against
    // getNumDormantThreads(); confirm which condition is correct.
    while (pool->hasWaitingTasks())
    {
        // wait
        utils::sleep(1);   // sleep 1 ms
    }

    float poolTime = elapsed.getElapsedTime();
    pool->StopProcessing();
    // Fin
    pool->Shutdown();
    std::printf("Serial execution time:      %f seconds.\n", serialTime);
    std::printf("Thread Pool execution time: %f seconds.\n", poolTime);

    return 0;
}