Source

annex / include / annex / thread_pool.hpp

#ifndef THREAD_POOL_HPP_INCLUDED
#define THREAD_POOL_HPP_INCLUDED

#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/optional/optional.hpp>
#include <boost/asio.hpp>

#include "annex/make_invoke.hpp"
#include "annex/unique_task.hpp"
#include "annex/visibility.hpp"
#include "annex/concepts.hpp"

namespace annex {

// Thread pool built around a boost::asio::io_service.
//
// Work is submitted either with post() (fire-and-forget) or with async()
// (returns a std::future for the result). As the member template definitions
// further down in this header show, every submission path ends in
// io_service().post().
//
// The constructor, destructor, run(), run_one() and io_service() are only
// declared here; their definitions are not part of this class body.
// NOTE(review): SHARED is presumably an export/visibility macro from
// annex/visibility.hpp — confirm.
class SHARED thread_pool {
public:
    // nthreads defaults to std::thread::hardware_concurrency().
    // NOTE(review): hardware_concurrency() returns an unsigned value and is
    // allowed to return 0 when the count cannot be determined; it is converted
    // to int here. How the out-of-line constructor treats 0 (or a negative
    // argument) is not visible from this declaration.
    // NOTE(review): presumably nthreads worker threads are started — confirm
    // in the implementation file.
    explicit
    thread_pool(int nthreads = std::thread::hardware_concurrency());

    // Defined out of line.
    // NOTE(review): presumably stops accepting work and joins `workers` —
    // confirm in the implementation file.
    ~thread_pool();

    // Not copyable.
    thread_pool(thread_pool const&) = delete;
    thread_pool& operator=(thread_pool const&) = delete;

    // NOTE(review): presumably lets the calling thread process queued handlers
    // until the service runs out of work (io_service::run) — confirm.
    void
    run();

    // NOTE(review): presumably lets the calling thread process at most one
    // queued handler (io_service::run_one) — confirm.
    void
    run_one();

    // Access to an asio io_service by reference; the post() overloads below
    // submit their handlers through it.
    // NOTE(review): presumably returns the io_service_ member — confirm.
    boost::asio::io_service&
    io_service();

    // post(), overload 1: nullary callable that is copy constructible.
    // The callable is forwarded directly to io_service().post().
    // Selected by EnableIf<is_copy_constructible<Functor>>; `= _` supplies the
    // default value for the unnamed constraint parameters.
    // NOTE(review): Requires / EnableIf / DisableIf / `_` are presumably
    // provided by annex/concepts.hpp — confirm.
    template<
        typename Functor
        , Requires<
            concepts::Callable<Functor, void()>
            , concepts::MoveConstructible<Functor>
        > = _
        , EnableIf<
            std::is_copy_constructible<Functor>
        > = _
    >
    void
    post(Functor&& t);

    // post(), overload 2: nullary callable that is NOT copy constructible
    // (move-only). The definition moves the callable into a std::shared_ptr
    // and posts a copyable wrapper that invokes it through the pointer.
    template<
        typename Functor
        , Requires<
            concepts::Callable<Functor, void()>
            , concepts::MoveConstructible<Functor>
        > = _
        , DisableIf<
            std::is_copy_constructible<Functor>
        > = _
    >
    void
    post(Functor&& t);

    // post(), overload 3: callable plus arguments. The definition packages
    // both with make_invoke() and hands the result to the single-argument
    // post().
    template<
        typename Functor
        , typename... Args
        // TODO: should change to new style
        // (this overload still uses a defaulted type parameter for its
        // constraint, unlike the `Requires<...> = _` form used elsewhere)
        , typename = Requires<
            concepts::Callable<Functor, void(Args...)>
        >
    >
    void
    post(Functor&& functor, Args&& ...args);

    // async(), nullary: wraps the callable in a unique_task, posts the task
    // and returns the future obtained from the task's get_future().
    template<
        typename Functor
        , Requires<
            concepts::Callable<Functor, void()>
        > = _
    >
    auto async(Functor&& functor)
    -> std::future<ResultOf<Functor()>>;

    // async(), with arguments: wraps the callable in a unique_task, posts
    // make_invoke(task, args...) and returns the future obtained from the
    // task's get_future().
    template<
        typename Functor
        , typename ...Args
        , Requires<
            concepts::Callable<Functor, void(Args...)>
        > = _
    >
    auto async(Functor&& functor, Args&& ...args)
    -> std::future<ResultOf<Functor(Args...)>>;

private:
    // Members are destroyed in reverse declaration order: `work` first, then
    // `io_service_`, then `workers`. A std::thread that is still joinable when
    // destroyed calls std::terminate, so the threads must have been joined (or
    // detached) before the implicit member destruction runs.

    // Worker threads.
    // NOTE(review): presumably each one runs the io_service — confirm.
	std::vector<std::thread> workers;
    // The asio service that queues and dispatches posted handlers.
    boost::asio::io_service io_service_;
    // Optional io_service::work object.
    // NOTE(review): presumably engaged so the service does not run out of work
    // while the queue is empty, and reset to let the workers finish — confirm.
    boost::optional<boost::asio::io_service::work> work;
};

// post() for nullary callables that are copy constructible.
//
// The callable is handed to the asio service unchanged (perfectly forwarded),
// so an rvalue is moved and an lvalue is copied by the service as it sees fit.
template<
    typename Functor
    , Requires<concepts::Callable<Functor, void()>, concepts::MoveConstructible<Functor>>
    , EnableIf<std::is_copy_constructible<Functor>>
>
void
thread_pool::post(Functor&& functor)
{
    auto& service = io_service();
    service.post(std::forward<Functor>(functor));
}

// post() for nullary callables that are NOT copy constructible (move-only).
//
// functor: the callable to run; it is forwarded into a heap-allocated
//          Decay<Functor> owned by a std::shared_ptr.
//
// The handler actually given to the asio service is a small lambda that holds
// the shared_ptr by value and invokes the stored callable through it.
template<
    typename Functor
    , Requires<
        concepts::Callable<Functor, void()>
        , concepts::MoveConstructible<Functor>
    >
    , DisableIf<
        std::is_copy_constructible<Functor>
    >
>
void
thread_pool::post(Functor&& functor)
{
    // Workaround as Asio requires copy constructible functors: the move-only
    // callable lives behind a shared_ptr, and only the pointer is copied.
    auto shared = std::make_shared<Decay<Functor>>(std::forward<Functor>(functor));

    // A copy-capturing lambda replaces the former std::bind wrapper. It is
    // copy constructible and, unlike a bind object whose target takes the
    // bound pointer by non-const reference, it can be invoked through both
    // const and non-const access paths.
    io_service().post([shared]() { (*shared)(); });
}

template<
    typename Functor
    , typename ...Args
    , typename
    /*, Requires<
        concepts::Callable<Functor, void(Args...)>
    >*/
>
void
thread_pool::post(Functor&& functor, Args&& ...args)
{
    post( make_invoke(std::forward<Functor>(functor), std::forward<Args>(args)...) );
}

// async() for nullary callables.
//
// Wraps the callable in a unique_task, takes the task's future, submits the
// task via post() and returns the future to the caller.
template<
    typename Functor
    , Requires<concepts::Callable<Functor, void()>>
>
auto thread_pool::async(Functor&& functor)
-> std::future<ResultOf<Functor()>>
{
    typedef ResultOf<Functor()> value_type;
    typedef unique_task<value_type()> packaged_type;

    packaged_type packaged(std::forward<Functor>(functor));

    // The future has to be taken before the task is moved into the pool.
    auto pending = packaged.get_future();
    post(std::move(packaged));

    return pending;
}

// async() for a callable together with the arguments it should be called with.
//
// Wraps the callable in a unique_task with the matching signature, takes the
// task's future, submits make_invoke(task, args...) via post() and returns
// the future to the caller.
template<
    typename Functor
    , typename... Args
    , Requires<concepts::Callable<Functor, void(Args...)>>
>
auto thread_pool::async(Functor&& functor, Args&&... args)
-> std::future<ResultOf<Functor(Args...)>>
{
    typedef ResultOf<Functor(Args...)> value_type;
    typedef unique_task<value_type(Args...)> packaged_type;

    packaged_type packaged(std::forward<Functor>(functor));

    // The future has to be taken before the task is moved into the invoker.
    auto pending = packaged.get_future();
    post(make_invoke(std::move(packaged), std::forward<Args>(args)...));

    return pending;
}

} // annex

#endif // THREAD_POOL_HPP_INCLUDED