Single task queue thread pool
A thread pool with a single task queue is simple to implement with the modern C++ standard library (thread + lock + condition variable). The basic idea: the pool starts a fixed number of threads when it is constructed and stops them when it is destructed. The only interface it needs to expose is one for submitting tasks.
Interface design
Return type
// Constructor
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency());

// Enqueue interface
template<typename F, typename... Args>
auto enqueue(F &&f, Args &&...args);
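The enqueue() template declares its return type with the auto keyword. What the caller actually receives is a std::future that will hold the result of invoking f with args; the implementation further down spells this out in a trailing return type. Note that std::result_of, which that implementation uses, is deprecated in C++17 and removed in C++20, where std::invoke_result replaces it.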
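To make the return type concrete, here is a minimal sketch. The helper run_now() and the function demo_return_type() are hypothetical names, not part of the pool: run_now() builds its return type the way an enqueue-style function can, but runs the task immediately instead of queueing it. It uses decltype rather than std::result_of so that it compiles under newer standards as well.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for enqueue(): it runs the task immediately instead
// of queueing it, but the return type is built in the same spirit.
template <typename F, typename... Args>
auto run_now(F&& f, Args&&... args)
    -> std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))>
{
    using return_type = decltype(std::forward<F>(f)(std::forward<Args>(args)...));

    // Bind the arguments so the packaged task takes no parameters.
    std::packaged_task<return_type()> task(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> res = task.get_future();
    task();  // a real pool would push the task onto its queue here
    return res;
}

// Hypothetical demo: the caller only writes `auto`.
inline int demo_return_type()
{
    auto fut = run_now([](int a, int b) { return a + b; }, 40, 2);
    static_assert(std::is_same<decltype(fut), std::future<int>>::value,
                  "the deduced type is std::future<int>");
    assert(fut.valid());
    return fut.get();
}
```

The static_assert confirms that the type hidden behind auto is std::future<int> for a callable returning int.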
Input parameters
The input parameters are a callable object followed by the arguments to call it with. A C++11 variadic template is used so that any number of arguments can be passed along to the callable.
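One consequence of packing the arguments with std::bind, as the implementation further down does, is that the arguments are stored as copies. The sketch below illustrates this with a hypothetical helper make_task() and a hypothetical function append_to(); passing std::ref is the usual way to hand over a reference instead.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical helper: packs a callable and its arguments into a
// zero-argument std::function, similar to how enqueue() prepares a task.
template <typename F, typename... Args>
std::function<void()> make_task(F&& f, Args&&... args)
{
    return std::bind(std::forward<F>(f), std::forward<Args>(args)...);
}

// Hypothetical work function: appends `text` to `out` `times` times.
inline void append_to(std::string& out, const std::string& text, int times)
{
    for (int i = 0; i < times; ++i) out += text;
}

inline std::string demo_arguments()
{
    std::string by_copy;
    std::string by_ref;

    // std::bind stores a copy of by_copy, so the original stays untouched.
    make_task(append_to, by_copy, "ab", 2)();

    // std::ref stores a reference wrapper, so by_ref itself is modified.
    make_task(append_to, std::ref(by_ref), "ab", 2)();

    assert(by_copy.empty());
    return by_ref;
}
```

When a reference is passed this way, the referenced object has to outlive the task, which matters once the task runs later on another thread.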
Basic implementation
class ThreadPool {
public:
    explicit ThreadPool(size_t threads = std::thread::hardware_concurrency());

    template<typename F, typename... Args>
    auto enqueue(F &&f, Args &&...args);

    ~ThreadPool();

private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};
Note: std::thread::hardware_concurrency() has been part of the C++ standard library since C++11. It returns the number of threads that can truly run concurrently in a program; on a multi-core system, for example, this can be the number of CPU cores. The value is only a hint, and the function returns 0 when the system information cannot be obtained. Even so, it is a helpful guide for deciding how many threads to start.
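Because a return value of 0 would give the pool no workers at all, so that submitted tasks would never run, it can be worth guarding the default. A minimal sketch follows; default_thread_count() and its fallback parameter are hypothetical and not part of the class above.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical helper: choose a worker count, using `fallback` when
// hardware_concurrency() cannot determine a value and returns 0.
inline std::size_t default_thread_count(std::size_t fallback = 1)
{
    assert(fallback > 0);  // a pool needs at least one worker
    const unsigned int hint = std::thread::hardware_concurrency();
    return hint != 0 ? static_cast<std::size_t>(hint) : fallback;
}
```

The result could then be passed to the constructor, for example ThreadPool pool(default_thread_count());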
This simple thread pool has just one group of worker threads and one task queue. A mutex guards the task queue to keep it thread-safe, and a condition variable, used together with that mutex, implements the thread notification mechanism: the pool starts out with no tasks, so all threads wait; when a task is submitted, one waiting thread is notified and processes it.
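The notification mechanism can be seen in isolation in the following sketch, which uses one waiting consumer thread and one producer. The function demo_notify() and its local names are made up for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical demo: a consumer waits for an item, a producer supplies it.
inline int demo_notify()
{
    std::queue<int> items;
    std::mutex m;
    std::condition_variable cv;
    int received = 0;

    std::thread consumer([&] {
        std::unique_lock<std::mutex> lock(m);
        // Blocks until the predicate is true. The predicate is checked
        // before sleeping, so a notification sent earlier is not lost,
        // and spurious wakeups are handled as well.
        cv.wait(lock, [&] { return !items.empty(); });
        received = items.front();
        items.pop();
    });

    {
        std::lock_guard<std::mutex> lock(m);  // modify the queue under the lock
        items.push(42);
    }
    cv.notify_one();  // wake one waiting thread

    consumer.join();
    assert(items.empty());
    return received;
}
```

The pool's worker loop follows the same pattern, with the extra stop condition added to the predicate.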
A stop flag is also provided so that the destructor can stop the threads and clean up the remaining tasks. No separate stop interface is provided. The honest reason is laziness; the more diplomatic one is that this is an RAII-style thread pool whose life cycle is basically the same as that of the application.
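The effect of the stop flag is easiest to see with a single worker. The sketch below uses a hypothetical ScopedWorker class, which is a reduced illustration and not the pool itself: its destructor sets the flag, wakes the worker, and joins it, and the worker only exits once the queue has been drained.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical single-worker class, reduced to show the stop flag.
class ScopedWorker {
public:
    ScopedWorker() : stop_(false), thread_([this] { run(); }) {}

    ~ScopedWorker()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;  // set under the lock so the worker cannot miss it
        }
        cv_.notify_all();
        thread_.join();
    }

    void post(std::function<void()> task)
    {
        assert(task);  // an empty std::function would throw when called
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run()
    {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;  // exit only once drained
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run the task outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> tasks_;
    bool stop_;
    std::thread thread_;  // declared last so it starts after the other members
};

inline int demo_scoped_stop()
{
    int counter = 0;
    {
        ScopedWorker worker;
        for (int i = 0; i < 5; ++i) worker.post([&counter] { ++counter; });
    }  // destructor: set stop, wake the worker, join it
    return counter;
}
```

Because the worker checks both the flag and the queue before returning, all tasks posted before destruction still run.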
The following is the specific implementation:
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;

    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    : stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif
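One detail of enqueue() that is easy to overlook is why the packaged_task is created with std::make_shared. A std::packaged_task is move-only, while std::function requires a copyable callable, so the queue stores a small lambda that copies a shared_ptr instead. The sketch below shows that wrapping step on its own, without any threads, and also shows that an exception thrown by the task is delivered through the future. The function demo_wrapped_task() is a hypothetical name used only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <string>
#include <type_traits>

// Hypothetical demo of the wrapping step used inside enqueue().
inline std::string demo_wrapped_task()
{
    // packaged_task cannot be copied, so std::function cannot hold it directly.
    static_assert(!std::is_copy_constructible<std::packaged_task<int()>>::value,
                  "packaged_task is move-only");

    auto task = std::make_shared<std::packaged_task<int()>>(
        []() -> int { throw std::runtime_error("task failed"); });
    std::future<int> res = task->get_future();

    // The lambda only copies the shared_ptr, so it is copyable.
    std::function<void()> erased = [task] { (*task)(); };
    assert(erased);

    // In the pool a worker thread makes this call. The exception is stored
    // in the shared state rather than escaping from the call.
    erased();

    try {
        res.get();  // rethrows the stored exception
    } catch (const std::runtime_error& e) {
        return e.what();
    }
    return "no exception";
}
```

In the pool this means a throwing task does not bring down its worker thread; the caller sees the exception when calling get() on the returned future.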
Example of use:
// create thread pool with 4 worker threads
ThreadPool pool(4);

// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);

// get result from future
std::cout << result.get() << std::endl;
References:
Implementing thread pool based on C++11
https://github.com/lzpong/threadpool
https://github.com/progschj/ThreadPool