Implementing a thread pool based on C++11

Single task queue thread pool

It is quite simple to implement a thread pool with a single task queue using the modern C++ standard library (std::thread + mutex + condition variable). The basic idea is: launch a fixed number of worker threads when the thread pool is constructed and stop them when it is destructed. The only public interface that needs to be exposed is one for submitting tasks.

Interface design

Return type

explicit ThreadPool(size_t threads = std::thread::hardware_concurrency());  // Constructor

template<typename F, typename... Args>
auto enqueue(F &&f, Args &&...args); // Enqueue interface

The return type of the template function enqueue() is written as auto; in the C++11 implementation it is spelled out with a trailing return type, and what the caller actually receives is a std::future holding the result of the submitted task.
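
Spelled out with its trailing return type, this is the declaration used in the full implementation further below:

template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>;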

Input parameters

The input parameters are a callable object and its arguments. C++11 variadic templates are used here so that any number of arguments can be forwarded to the callable object.
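
Inside enqueue(), the callable and its forwarded arguments are packaged by std::bind into a zero-argument callable, and a std::packaged_task exposes the result through a future. A stripped-down, runnable sketch of that idea (without the variadic template wrapper) looks like this:

#include <functional>
#include <future>
#include <iostream>

int add(int a, int b) { return a + b; }

int main() {
    // std::bind fixes the arguments, producing a callable that takes none;
    // std::packaged_task wraps it and hands back a future for the result
    std::packaged_task<int()> task(std::bind(add, 1, 2));
    std::future<int> result = task.get_future();

    task();                              // in the pool, a worker thread would run this
    std::cout << result.get() << "\n";   // prints 3
    return 0;
}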

Basic implementation

class ThreadPool {
public:
    explicit ThreadPool(size_t threads = std::thread::hardware_concurrency());

    template<typename F, typename... Args>
    auto enqueue(F &&f, Args &&...args);

    ~ThreadPool();

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

Note: std::thread::hardware_concurrency() is a handy function added in C++11. It returns the number of concurrent threads supported by the implementation; on a multi-core system this is typically the number of CPU cores (or hardware threads). The value is only a hint, and the function returns 0 when that information cannot be determined, so callers should handle the zero case. Even so, it is a very useful default for deciding how many worker threads to start.
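
Because the return value can be 0, a small helper (hypothetical, not part of the implementation below) can supply a fallback default:

#include <thread>

// Hypothetical helper: use hardware_concurrency() when it is known,
// otherwise fall back to an arbitrary small number of threads.
inline size_t default_thread_count() {
    const unsigned int n = std::thread::hardware_concurrency();
    return n != 0 ? n : 2;  // 2 is just a conservative fallback choice
}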

This simple single-queue thread pool has just two core members: a group of worker threads and one task queue. A mutex makes access to the task queue thread-safe, and a condition variable, used together with the mutex, implements the notification mechanism between the queue and the workers: initially the pool has no tasks and all worker threads block waiting for work to arrive; when a task is enqueued, one waiting thread is notified and wakes up to process it.

A stop flag is also provided so that, on destruction, the worker threads finish the tasks still in the queue, exit their loops, and are joined. Out of laziness (or, to put it more flatteringly: this is an RAII-style thread pool whose lifetime is essentially that of the application), no separate stop() interface is exposed.

The following is the specific implementation:

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;
    
    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};
 
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif

 

Example of use:

// create thread pool with 4 worker threads
ThreadPool pool(4);

// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);

// get result from future
std::cout << result.get() << std::endl;
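
A slightly larger, self-contained sketch (assuming the header above is saved as ThreadPool.h) submits several tasks in a loop and collects their futures:

#include <iostream>
#include <vector>
#include <future>
#include "ThreadPool.h"  // assumed file name for the header above

int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;

    // submit several tasks and keep their futures
    for (int i = 0; i < 8; ++i)
        results.emplace_back(pool.enqueue([i] { return i * i; }));

    // get() blocks until the corresponding task has finished
    for (auto &r : results)
        std::cout << r.get() << ' ';
    std::cout << std::endl;
    return 0;
}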

 

  

References:

Implementing thread pool based on C++11

https://github.com/lzpong/threadpool

https://github.com/progschj/ThreadPool

  
