I have taken the liberty of aggregating (copying) other people's blog posts here for my own convenience.
References (plagiarism)
C++ Reference
http://www.cplusplus.com/reference/thread/
C++ thread usage summary — sevenjoin's blog (CSDN)
https://blog.csdn.net/sevenjoin/article/details/82187127
C++ multithreading (1): the thread class — coolwriter's blog (CSDN)
https://blog.csdn.net/coolwriter/article/details/79883253
C++11 multithreading series: mutex — chenxun's blog (CSDN)
https://blog.csdn.net/chenxun_2010/article/details/49786263
C++11 condition_variable usage details — Xiaohaige's blog (cnblogs)
https://www.cnblogs.com/xiaohaigegede/p/14008121.html
Thread pool implementation based on C++11 (Zhihu)
https://zhuanlan.zhihu.com/p/367309864
C++11 Threads
Multithreading support in the C++11 standard spans five headers: <atomic>, <thread>, <mutex>, <condition_variable>, and <future>.
- <atomic>: This header mainly declares two classes, std::atomic and std::atomic_flag, as well as a set of C-style atomic types and functions for C-compatible atomic operations.
- <thread>: Thread header. It mainly declares the std::thread class; the std::this_thread namespace also lives in this header.
- <mutex>: Mutex header. It mainly declares mutex-related classes, including the std::mutex family, std::lock_guard, std::unique_lock, and other related types and functions.
- <condition_variable>: Condition variable header. It mainly declares the condition variable classes std::condition_variable and std::condition_variable_any.
- <future>: Asynchronous results. It mainly declares two provider classes, std::promise and std::packaged_task, and two future classes, std::future and std::shared_future, along with related types and functions; the std::async() function is also declared in this header (a minimal std::async sketch follows this list).
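The rest of this post only uses <future> indirectly (through std::packaged_task in the thread pool), so here is a minimal sketch of std::async returning a std::future. The function name compute_sum is purely illustrative, not part of any library.

// Minimal <future> sketch: run a task asynchronously and collect its result.
#include <future>
#include <iostream>
#include <numeric>
#include <vector>

int compute_sum(const std::vector<int>& v)
{
    return std::accumulate(v.begin(), v.end(), 0);
}

int main()
{
    std::vector<int> data{1, 2, 3, 4, 5};
    // std::async may run compute_sum on another thread; the future holds its result.
    std::future<int> result = std::async(std::launch::async, compute_sum, std::cref(data));
    std::cout << "sum = " << result.get() << '\n';  // get() blocks until the task finishes
    return 0;
}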
thread library
Member functions of the thread class:
- get_id: gets the thread ID; returns an object of type std::thread::id.
- joinable: checks whether the thread can be joined, i.e. whether the thread object identifies an active thread of execution. A default-constructed thread object, a thread object that has already been joined, and a thread object that has been detached are not joinable.
- join: calling this function blocks the current thread, i.e. the thread of the caller, until the thread identified by the joined std::thread object finishes execution.
- detach: separates the thread of execution from the thread object so that it runs independently. Once the detached thread finishes, its resources are released.
- native_handle: returns the implementation-specific thread handle of the std::thread. native_handle_type is the bridge between std::thread and the operating system's thread API. For example, with g++ (libstdc++) on Linux, native_handle_type is the pthread_t type from pthreads; when the std::thread interface is not enough (for example, to change a thread's priority), you can pass the value returned by native_handle() to the corresponding pthread function (a short sketch follows below).
- swap: swaps the underlying handles of two thread objects.
- operator=: move-assigns the thread object.
- hardware_concurrency: a static member function that returns the number of hardware threads the machine can run concurrently, which can roughly be taken as the number of processor cores.
The difference between join and detach: join() blocks the current thread until the thread identified by *this finishes, and lets the user manage the thread explicitly; detach() does not block the current thread, it separates the thread of execution from the thread object so that it runs on its own, and its resources are released automatically when it finishes.
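The native_handle and hardware_concurrency members mentioned above have no example later in this post, so here is a small sketch. It assumes Linux with g++/libstdc++, where native_handle_type is pthread_t, and only uses the standard pthreads function pthread_getschedparam; other platforms need a different native API.

// Sketch of get_id, hardware_concurrency and native_handle (Linux/libstdc++ assumed).
#include <iostream>
#include <thread>
#include <pthread.h>

void work() { /* ... */ }

int main()
{
    std::cout << "hardware threads: " << std::thread::hardware_concurrency() << '\n';

    std::thread t(work);
    std::cout << "thread id: " << t.get_id() << '\n';

    // Use the native handle to call a pthread API that std::thread does not expose,
    // e.g. querying the scheduling policy and priority.
    sched_param sch{};
    int policy = 0;
    pthread_getschedparam(t.native_handle(), &policy, &sch);
    std::cout << "priority: " << sch.sched_priority << '\n';

    t.join();
    return 0;
}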
Constructor example
// thread example
#include <iostream>       // std::cout
#include <thread>         // std::thread

void foo()
{
    // do stuff...
    for (int i = 0; i < 10; ++i)
        std::cout << "foo func run " << i << " times\n";
}

void bar(int x)
{
    // do stuff...
    for (int i = 0; i < 10; ++i)
        std::cout << "bar func run " << i << " times with param =" << x << "\n";
}

int main()
{
    std::thread first(foo);      // spawn new thread that calls foo()
    std::thread second(bar, 0);  // spawn new thread that calls bar(0)

    std::cout << "main, foo and bar now execute concurrently...\n";

    // synchronize threads:
    // the main thread must join or detach the child threads before it exits
    first.join();   // pauses until first finishes
    second.join();  // pauses until second finishes

    std::cout << "foo and bar completed.\n";
    return 0;
}
- Output (running it several times may produce a different interleaving):

main, foo and bar now execute concurrently...
foo func run 0 times
foo func run 1 times
foo func run 2 times
foo func run 3 times
foo func run 4 times
foo func run 5 times
foo func run 6 times
foo func run 7 times
foo func run 8 times
foo func run 9 times
bar func run 0 times with param =0
bar func run 1 times with param =0
bar func run 2 times with param =0
bar func run 3 times with param =0
bar func run 4 times with param =0
bar func run 5 times with param =0
bar func run 6 times with param =0
bar func run 7 times with param =0
bar func run 8 times with param =0
bar func run 9 times with param =0
foo and bar completed.
move assignment example
// move (1): move assignment. If the current object is not joinable, it takes over the
// thread identified by the rvalue reference rhs; if the current object is still joinable,
// std::terminate() is called.
thread& operator= (thread&& rhs) noexcept;
// copy [deleted] (2): copy assignment is deleted; threads cannot be copied.
thread& operator= (const thread&) = delete;
// example for thread::operator=
#include <iostream>       // std::cout
#include <thread>         // std::thread, std::this_thread::sleep_for
#include <chrono>         // std::chrono::seconds

void pause_thread(int n)
{
    std::this_thread::sleep_for(std::chrono::seconds(n));  // C++11 std::this_thread
    std::cout << "pause of " << n << " seconds ended\n";
}

int main()
{
    std::thread threads[5];  // default-constructed threads
    std::cout << "Spawning 5 threads...\n";
    for (int i = 0; i < 5; ++i)
        threads[i] = std::thread(pause_thread, i + 1);  // move-assign: the move assignment operator is called here
    std::cout << "Done spawning threads. Now waiting for them to join:\n";
    for (int i = 0; i < 5; ++i)
        threads[i].join();
    std::cout << "All threads joined!\n";
    return 0;
}
join
Calling this function blocks the current thread, i.e. the thread of the caller, until the thread identified by the joined std::thread object finishes execution.
In the examples below, the main thread waits for each child thread to finish before continuing.
#include <iostream>
#include <thread>
#include <array>
using namespace std;

void show()
{
    cout << "hello cplusplus!" << endl;
}

int main()
{
    array<thread, 3> threads = { thread(show), thread(show), thread(show) };
    for (int i = 0; i < 3; i++)
    {
        cout << threads[i].joinable() << endl;  // check whether the thread can be joined
        threads[i].join();                      // the main thread waits for this thread to finish before moving on
    }
    return 0;
}

Run results:
hello cplusplus!
hello cplusplus!
1
hello cplusplus!
1
1
// joinTest.cc
#include <iostream>
#include <thread>
#include <chrono>

class Obj
{
public:
    Obj()  { std::cout << "hello "; }
    ~Obj() { std::cout << "world\n"; }
};

void joinWorker()
{
    Obj obj;
    std::this_thread::sleep_for(std::chrono::seconds(2));
}

int main()
{
    std::thread j(joinWorker);
    j.join();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}

Output:
hello world
detach
detach separates the thread of execution from the thread object so that it runs independently. Once the detached thread finishes, its resources are released.
A detached thread is no longer bound to the main thread: if the main thread exits, the child thread does not report an error, and the child thread exits automatically once it has finished running.
After detach, the child thread becomes an "orphan" thread; the std::thread object no longer refers to it, so you can no longer join it or interact with it through that object.
#include <iostream>
#include <thread>
using namespace std;

void show()
{
    cout << "hello cplusplus!" << endl;
}

int main()
{
    thread th(show);
    //th.join();
    th.detach();  // detached from the main thread: if the main thread exits, the child thread
                  // does not error, and it exits automatically after it finishes running.
                  // After detach, the thread object no longer refers to the child thread.
    cout << th.joinable() << endl;
    return 0;
}

Run results:
hello cplusplus!
// detachTest.cc
#include <iostream>
#include <thread>
#include <chrono>

class Obj
{
public:
    Obj()  { std::cout << "hello "; }
    ~Obj() { std::cout << "world\n"; }
};

void detachWorker()
{
    Obj obj;
    std::this_thread::sleep_for(std::chrono::seconds(2));
}

int main()
{
    std::thread d(detachWorker);
    d.detach();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 0;
}

Output:
hello
Thread Exchange and Thread Move
- Thread Exchange
#include <iostream>
#include <thread>
using namespace std;

int main()
{
    thread t1([]() { cout << "thread1" << endl; });
    thread t2([]() { cout << "thread2" << endl; });
    cout << "thread1' id is " << t1.get_id() << endl;
    cout << "thread2' id is " << t2.get_id() << endl;

    cout << "swap after:" << endl;
    swap(t1, t2);  // swap the two thread objects
    cout << "thread1' id is " << t1.get_id() << endl;
    cout << "thread2' id is " << t2.get_id() << endl;

    t1.join();     // join both threads before main returns
    t2.join();
    return 0;
}

Run results:
thread1
thread2
thread1' id is 4836
thread2' id is 4724
swap after:
thread1' id is 4724
thread2' id is 4836
- Thread Move
#include <iostream>
#include <thread>
using namespace std;

int main()
{
    thread t1([]() { cout << "thread1" << endl; });
    cout << "thread1' id is " << t1.get_id() << endl;

    thread t2 = move(t1);  // t2 takes over the thread that t1 used to own
    cout << "thread2' id is " << t2.get_id() << endl;

    t2.join();             // t1 is no longer joinable; join t2 instead
    return 0;
}

Run results:
thread1
thread1' id is 5620
thread2' id is 5620
As the code above shows, t2 takes over all of t1's state via std::move; after the move, t1 no longer represents a thread of execution (it is left like a default-constructed, non-joinable thread object).
Thread Safety
Let's start with a quick look at some sample code:
#include <iostream>
#include <ctime>
#include <thread>
using namespace std;

const int N = 100000000;
int num = 0;

void run()
{
    for (int i = 0; i < N; i++)
    {
        num++;
    }
}

int main()
{
    clock_t start = clock();
    thread t1(run);
    thread t2(run);
    t1.join();
    t2.join();
    clock_t end = clock();
    cout << "num=" << num << ", time consumed " << end - start << " ms" << endl;
    return 0;
}

Run results:
num=143653419, time consumed 730 ms
This is not the expected 200000000: the two threads race on num, so increments are lost and the final value varies from run to run (any value up to 200000000 is possible). We can fix this with a mutex, an atomic variable, or by joining the threads one after the other, as shown below.
- join
#include <iostream>
#include <ctime>
#include <thread>
using namespace std;

const int N = 100000000;
int num = 0;

void run()
{
    for (int i = 0; i < N; i++)
    {
        num++;
    }
}

int main()
{
    clock_t start = clock();
    thread t1(run);
    t1.join();      // t2 is not started until t1 has finished, so the two runs never overlap
    thread t2(run);
    t2.join();
    clock_t end = clock();
    cout << "num=" << num << ", time consumed " << end - start << " ms" << endl;
    return 0;
}

Run results:
num=200000000, time consumed 626 ms
- Atomic Variables
#include <iostream>
#include <ctime>
#include <thread>
#include <atomic>
using namespace std;

const int N = 100000000;
atomic_int num{ 0 };  // atomic counter: no data race, thread safe

void run()
{
    for (int i = 0; i < N; i++)
    {
        num++;
    }
}

int main()
{
    clock_t start = clock();
    thread t1(run);
    thread t2(run);
    t1.join();
    t2.join();
    clock_t end = clock();
    cout << "num=" << num << ", time consumed " << end - start << " ms" << endl;
    return 0;
}

Run results:
num=200000000, time consumed 29732 ms
- mutex
#include <iostream>
#include <ctime>
#include <thread>
#include <mutex>
using namespace std;

const int N = 100000000;
int num(0);
mutex m;

void run()
{
    for (int i = 0; i < N; i++)
    {
        m.lock();
        num++;
        m.unlock();
    }
}

int main()
{
    clock_t start = clock();
    thread t1(run);
    thread t2(run);
    t1.join();
    t2.join();
    clock_t end = clock();
    cout << "num=" << num << ", time consumed " << end - start << " ms" << endl;
    return 0;
}

Run results:
num=200000000, time consumed 128323 ms
Mutex (std::mutex)
- Four mutex classes:
std::mutex, the most basic mutex class.
std::recursive_mutex, recursive mutex class.
std::timed_mutex, timed mutex class.
std::recursive_timed_mutex, recursive timed mutex class.
- Two lock classes:
std::lock_guard, a Mutex RAII wrapper that makes it convenient for a thread to hold a mutex for the duration of a scope.
std::unique_lock, also a Mutex RAII wrapper, but with finer-grained control over locking and unlocking.
- Other types:
std::once_flag
std::adopt_lock_t
std::defer_lock_t
std::try_to_lock_t
- Functions:
std::try_lock, tries to lock several mutexes at once without blocking.
std::lock, locks several mutexes at once while avoiding deadlock.
std::call_once, guarantees that a function is called exactly once even if several threads request the call concurrently (see the sketch after this list).
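std::call_once is mentioned above but never demonstrated later, so here is a minimal sketch. The names init_flag, init and do_work are purely illustrative.

// Minimal std::call_once sketch: init() runs exactly once even though
// several threads call do_work() concurrently.
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::once_flag init_flag;

void init() { std::cout << "initialized once\n"; }

void do_work()
{
    std::call_once(init_flag, init);  // only the first caller actually runs init()
    // ... normal work ...
}

int main()
{
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) threads.emplace_back(do_work);
    for (auto& t : threads) t.join();
    return 0;
}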
lock class
lock_guard
A lock_guard object manages a mutex object (Mutex RAII): for the lifetime of the lock_guard, the mutex it manages stays locked, and when the lock_guard's lifetime ends the mutex is unlocked. (This is analogous to how a smart pointer such as shared_ptr manages dynamically allocated memory.)
When a lock_guard object is constructed, the mutex passed in (the mutex it manages) is locked by the current thread; when the lock_guard is destructed, that mutex is automatically unlocked. Because the programmer never calls lock() and unlock() by hand, this is the simplest and safest way to lock and unlock a mutex; in particular, if the program throws an exception after the mutex has been locked, the mutex is still unlocked correctly, which greatly simplifies exception-handling code around mutexes.
- locking initialization: the lock_guard object manages the mutex object m and locks it at construction time (m.lock()).
- adopting initialization: the lock_guard object manages the mutex object m, which, unlike locking initialization, must already have been locked by the current thread (std::adopt_lock).
- Copy construction and move construction of lock_guard objects are disabled, so lock_guard objects cannot be copied or moved.
- Example: locking initialization (the default)
// lock_guard example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock_guard
#include <stdexcept>      // std::logic_error

std::mutex mtx;

void print_even (int x) {
    if (x%2==0) std::cout << x << " is even\n";
    else throw (std::logic_error("not even"));
}

void print_thread_id (int id) {
    try {
        // using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
        std::lock_guard<std::mutex> lck (mtx);
        print_even(id);
    }
    catch (std::logic_error&) {
        std::cout << "[exception caught]\n";
    }
}

int main ()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i=0; i<10; ++i)
        threads[i] = std::thread(print_thread_id,i+1);

    for (auto& th : threads) th.join();

    return 0;
}

Output:
[exception caught]
4 is even
[exception caught]
2 is even
[exception caught]
6 is even
[exception caught]
8 is even
10 is even
[exception caught]
- Example: adopting initialization. mtx is locked first (mtx.lock()), and then the already-locked mtx is used to construct the lock_guard object with std::adopt_lock.
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock_guard, std::adopt_lock

std::mutex mtx;           // mutex for critical section

void print_thread_id(int id) {
    mtx.lock();
    std::lock_guard<std::mutex> lck(mtx, std::adopt_lock);
    std::cout << "thread #" << id << '\n';
}

int main()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i = 0; i<10; ++i)
        threads[i] = std::thread(print_thread_id, i + 1);

    for (auto& th : threads) th.join();

    return 0;
}
unique_lock
lock_guard only guarantees that the mutex is unlocked at destruction; it does not itself provide lock() and unlock() interfaces.
A mutex ensures synchronization between threads, but it turns parallel execution into serial execution, which hurts performance; we therefore want to keep the locked region as small as possible, i.e. use fine-grained locking.
unique_lock solves this problem:
It provides lock() and unlock() interfaces and records whether it currently holds the lock; at destruction it decides whether to unlock based on that state (lock_guard always unlocks at destruction). A short sketch of this fine-grained style follows these notes.
A unique_lock object manages the locking and unlocking of a mutex object with exclusive ownership: no other unique_lock object may own the same mutex object at the same time.
On construction (or move assignment), the unique_lock object takes a mutex object as its parameter, and the newly created unique_lock becomes responsible for locking and unlocking that mutex.
A std::unique_lock object also guarantees that the mutex it manages is correctly unlocked when the unique_lock itself is destructed (even if unlock() was never called explicitly).
Compared with lock_guard, unique_lock is much more flexible, but slightly less efficient and slightly larger in memory.
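A small sketch of the fine-grained style described above: the lock is released while doing work that does not touch shared data, then re-acquired. The names worker, shared_data and expensive_local_work are illustrative only.

// Fine-grained locking with std::unique_lock: unlock early, re-lock later.
#include <mutex>
#include <thread>
#include <vector>

std::mutex mtx;
std::vector<int> shared_data;

void expensive_local_work() { /* no shared data touched here */ }

void worker(int value)
{
    std::unique_lock<std::mutex> lck(mtx);   // locked on construction
    shared_data.push_back(value);            // critical section
    lck.unlock();                            // release early: shrink the locked region

    expensive_local_work();                  // runs without holding the mutex

    lck.lock();                              // re-acquire before touching shared data again
    shared_data.push_back(value + 1);
}                                            // still locked here -> unlocked by the destructor

int main()
{
    std::thread a(worker, 1), b(worker, 10);
    a.join();
    b.join();
    return 0;
}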
unique_lock constructors:
- Default constructor: the newly created unique_lock object does not manage any mutex object.
- locking initialization: the new unique_lock manages the mutex object m and calls m.lock() to lock it; if the mutex is already locked elsewhere, the current thread blocks.
- try-locking initialization: the new unique_lock manages the mutex object m and calls m.try_lock() to lock it; if the lock cannot be acquired, the current thread does not block.
- deferred initialization (std::defer_lock): the new unique_lock manages the mutex object m but does not lock it at construction; m should be a mutex that is not currently locked by this thread.
- adopting initialization (std::adopt_lock): the new unique_lock manages the mutex object m, which must already be locked by the current thread; the new unique_lock takes over ownership of that lock.
- locking for a duration: the new unique_lock manages the mutex object m and calls m.try_lock_for(rel_time) to try to lock it within the duration rel_time.
- locking until a time point: the new unique_lock manages the mutex object m and calls m.try_lock_until(abs_time) to try to lock it before the time point abs_time.
- Copy construction [deleted]: unique_lock objects cannot be copied.
- Move construction: the new unique_lock takes over the mutex managed by x, including its current locked state; after the move, x no longer manages any mutex, as if it had been default-constructed.
- locking initialization example
// unique_lock example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::unique_lock

std::mutex mtx;           // mutex for critical section

void print_block (int n, char c) {
    // critical section (exclusive access to std::cout signaled by lifetime of lck):
    std::unique_lock<std::mutex> lck (mtx);
    for (int i=0; i<n; ++i) { std::cout << c; }
    std::cout << '\n';
}

int main ()
{
    std::thread th1 (print_block,50,'*');
    std::thread th2 (print_block,50,'$');

    th1.join();
    th2.join();

    return 0;
}

Output:
**************************************************
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
- adopt_lock and defer_lock example
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock, std::unique_lock,
                          // std::adopt_lock, std::defer_lock

std::mutex foo, bar;

void task_a()
{
    std::lock(foo, bar);  // simultaneous lock (prevents deadlock)
    std::unique_lock<std::mutex> lck1(foo, std::adopt_lock);
    std::unique_lock<std::mutex> lck2(bar, std::adopt_lock);
    std::cout << "task a\n";
    // (unlocked automatically on destruction of lck1 and lck2)
}

void task_b()
{
    // foo.lock(); bar.lock(); // replaced by:
    std::unique_lock<std::mutex> lck1, lck2;
    lck1 = std::unique_lock<std::mutex>(bar, std::defer_lock);
    lck2 = std::unique_lock<std::mutex>(foo, std::defer_lock);
    std::lock(lck1, lck2);  // simultaneous lock (prevents deadlock)
    std::cout << "task b\n";
    // (unlocked automatically on destruction of lck1 and lck2)
}

int main()
{
    std::thread th1(task_a);
    std::thread th2(task_b);

    th1.join();
    th2.join();

    return 0;
}
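The try-locking and timed-locking constructors listed above have no example, so here is a small sketch using std::timed_mutex. The names holder and trier are illustrative, and the printed values depend on timing.

// Sketch of the try_to_lock and "locking for a duration" unique_lock constructors.
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

std::timed_mutex tmtx;

void holder()   // holds the mutex for a while
{
    std::lock_guard<std::timed_mutex> g(tmtx);
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
}

void trier()
{
    // try_to_lock: do not block; owns_lock() tells us whether we got the mutex
    std::unique_lock<std::timed_mutex> quick(tmtx, std::try_to_lock);
    std::cout << "try_to_lock acquired: " << quick.owns_lock() << '\n';
    if (quick.owns_lock()) quick.unlock();

    // locking for a duration: block for at most 500 ms via try_lock_for
    std::unique_lock<std::timed_mutex> timed(tmtx, std::chrono::milliseconds(500));
    std::cout << "timed lock acquired: " << timed.owns_lock() << '\n';
}

int main()
{
    std::thread t1(holder), t2(trier);
    t1.join();
    t2.join();
    return 0;
}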
Atomic variables (std::atomic)
std::atomic wraps types such as int, char, and bool so that they are operated on atomically. In a multi-threaded environment, accesses to a std::atomic object do not cause data races, and std::atomic can be used to build lock-free data structures.
An atomic operation, in the sense of "an atom is the smallest indivisible unit", guarantees that while one thread is accessing a shared resource, no other thread accesses it at the same time; in other words, only one thread operates on that resource at any moment. This is similar to a mutex protecting a shared resource, but atomic operations are closer to the hardware and therefore more efficient.
#include <iostream>
#include <ctime>
#include <vector>
#include <thread>
#include <atomic>

std::atomic<size_t> count(0);

void threadFun()
{
    for (int i = 0; i < 10000; i++)
        count++;
}

int main(void)
{
    clock_t start_time = clock();

    // start multiple threads
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; i++)
        threads.push_back(std::thread(threadFun));
    for (auto& thad : threads)
        thad.join();

    // check whether count is correct: 10000 * 10 = 100000
    std::cout << "count number:" << count << std::endl;

    clock_t end_time = clock();
    std::cout << "Time consuming:" << end_time - start_time << "ms" << std::endl;

    return 0;
}

Output:
count number:100000
Time consuming: 15 ms
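The claim above about lock-free data structures rests on compare-and-swap. Here is a minimal sketch of a CAS retry loop (maintaining a running maximum) using std::atomic::compare_exchange_weak; the names maximum and observe are illustrative.

// Lock-free "update the maximum" using a compare-and-swap loop.
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>

std::atomic<int> maximum{0};

void observe(int value)
{
    int current = maximum.load();
    // Retry until either our value is no longer larger, or the CAS succeeds.
    // compare_exchange_weak reloads `current` with the latest value on failure.
    while (value > current && !maximum.compare_exchange_weak(current, value))
    {
    }
}

int main()
{
    std::vector<std::thread> threads;
    for (int i = 1; i <= 8; ++i)
        threads.emplace_back(observe, i * 10);
    for (auto& t : threads)
        t.join();
    std::cout << "maximum = " << maximum << '\n';  // prints 80
    return 0;
}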
- std::atomic_flag
std::atomic_flag is an atomic boolean type that supports two atomic operations:
- test_and_set: sets the flag and returns its previous value, i.e. true if the flag was already set and false if it was not.
- clear: clears the atomic_flag.
- std::atomic_flag can be used for synchronization between threads, somewhat like a semaphore in Linux; a simple spinlock (mutex) can be implemented with atomic_flag, as the following example shows.
#include <iostream>
#include <atomic>
#include <vector>
#include <thread>
#include <sstream>

std::atomic_flag lock = ATOMIC_FLAG_INIT;
std::stringstream stream;

void append_numer(int x)
{
    while (lock.test_and_set());   // spin until the flag was clear, then hold it
    stream << "thread#" << x << "\n";
    lock.clear();                  // release the "lock"
}

int main()
{
    std::vector<std::thread> ths;
    for (int i=0; i<10; i++)
        ths.push_back(std::thread(append_numer, i));
    for (int i=0; i<10; i++)
        ths[i].join();
    std::cout << stream.str();
    return 0;
}
Perfect Forwarding and Move Semantics
C++ advanced knowledge: an in-depth analysis of move constructors and their principles — avdancedu.com
http://avdancedu.com/a39d51f9/
A chat about perfect forwarding in C++ — Zhihu
https://zhuanlan.zhihu.com/p/161039484
std::move unconditionally casts its argument to an rvalue: after std::move, you get an rvalue whether you passed in an lvalue or an rvalue.
std::forward implements perfect forwarding: it preserves the value category of the original argument (lvalues stay lvalues, rvalues stay rvalues).
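A small sketch of these two claims; the names sink and relay are illustrative only.

// std::move always yields an rvalue; std::forward preserves the original value category.
#include <iostream>
#include <string>
#include <utility>

void sink(std::string& s)  { std::cout << "lvalue overload: " << s << '\n'; }
void sink(std::string&& s) { std::cout << "rvalue overload: " << s << '\n'; }

template <typename T>
void relay(T&& arg)
{
    sink(std::forward<T>(arg));   // forwards as lvalue or rvalue, matching the caller
}

int main()
{
    std::string s = "hello";
    sink(std::move(s));           // rvalue overload, even though s is an lvalue
    relay(s);                     // lvalue overload
    relay(std::string("temp"));   // rvalue overload
    return 0;
}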
Condition variables (std::condition_variable)
In C++11 we can use a condition variable (std::condition_variable) to synchronize threads: when a condition is not met, the threads involved block until some other thread makes the condition true and wakes them up.
Without condition variables, checking whether a condition holds often means spinning in a busy loop on a dedicated thread, which burns CPU. With a condition variable, the waiting thread can sleep and give up the CPU; when the condition changes, another thread notifies it so that it wakes up and re-checks the condition.
When one of the wait functions of a std::condition_variable object is called, the current thread holds a std::unique_lock (over a std::mutex) and is blocked until another thread calls a notification function on the same std::condition_variable object to wake it up.
A std::condition_variable object is normally waited on through a std::unique_lock<std::mutex>.
A thread that wants to modify the shared variable (the "condition") must:
(1) acquire the std::mutex;
(2) perform the modification while holding the lock;
(3) call notify_one or notify_all on the std::condition_variable (the lock does not need to be held while notifying).
1. wait functions:
(1) wait(unique_lock<mutex>& lck)
The current thread blocks until it is notified.
(2) wait(unique_lock<mutex>& lck, Predicate pred)
The current thread blocks only while pred() returns false; if pred() returns true it does not block (equivalent to while (!pred()) wait(lck);).
Internally, wait() performs three operations in turn: release the mutex, wait on the condition variable, and re-acquire the mutex once woken.
2. notify_one:
notify_one(): no parameters, no return value.
It unblocks one of the threads currently waiting on this condition variable. If no thread is waiting, it does nothing; if more than one thread is waiting, which one is woken is unspecified.
// condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void print_id (int id) {
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready) cv.wait(lck);
    // ...
    std::cout << "thread " << id << '\n';
}

void go() {
    std::unique_lock<std::mutex> lck(mtx);
    ready = true;
    cv.notify_all();
}

int main ()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i=0; i<10; ++i)
        threads[i] = std::thread(print_id,i);

    std::cout << "10 threads ready to race...\n";
    go();                     // go!

    for (auto& th : threads) th.join();

    return 0;
}
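The example above uses notify_all and the plain wait(lck) overload. Here is an additional producer/consumer sketch that uses the predicate overload of wait() and notify_one() described earlier; the names producer, consumer and done are illustrative.

// Producer/consumer sketch with wait(lck, pred) and notify_one().
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> q;
bool done = false;

void producer()
{
    for (int i = 0; i < 5; ++i) {
        { std::lock_guard<std::mutex> lk(mtx); q.push(i); }
        cv.notify_one();                    // wake one waiting consumer
    }
    { std::lock_guard<std::mutex> lk(mtx); done = true; }
    cv.notify_one();
}

void consumer()
{
    while (true) {
        std::unique_lock<std::mutex> lk(mtx);
        cv.wait(lk, [] { return !q.empty() || done; });  // same as: while (!pred()) wait(lk);
        while (!q.empty()) { std::cout << "consumed " << q.front() << '\n'; q.pop(); }
        if (done) break;
    }
}

int main()
{
    std::thread c(consumer), p(producer);
    p.join();
    c.join();
    return 0;
}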
Thread Pool
With the basics above in place, we can finally put together a thread pool.
Why do we need a thread pool?
- Lower resource consumption: reusing already-created threads reduces the cost of creating and destroying threads.
- Faster response: when a task arrives it can run immediately, without waiting for a thread to be created.
- Better manageability: threads are a scarce resource; creating them without limit consumes system resources and hurts stability. A thread pool allows uniform allocation, tuning, and monitoring.
Thread pool principles
Reference (plagiarism)
Thread pool implementation based on C++11
https://zhuanlan.zhihu.com/p/367309864
- Thread pool components
1. Thread pool manager: creates and manages the thread pool, including creating and destroying the pool and adding new tasks.
2. Worker threads: the threads in the pool; they wait when there are no tasks and loop over the task queue executing tasks when there are.
3. Task interface: the interface every task must implement so that worker threads can schedule it; it mainly defines the task's entry point, the cleanup after the task finishes, the task's execution state, and so on.
4. Task queue: stores unprocessed tasks and acts as a buffer.
- Task execution flow (for a pool with a core thread count, a bounded work queue, a maximum thread count, and a rejection policy):
1. If the number of core threads has not been reached, create a worker thread to run the task.
2. Otherwise, if the work queue is not full, put the submitted task into the work queue.
3. Otherwise, if the maximum number of threads has not been reached, create a new worker thread to run the task.
4. Otherwise, apply the rejection policy to the task.
Implementation difficulties
- How are threads reused? A C++ thread runs a fixed task function and ends when that function returns; how do we keep assigning new tasks to existing threads?
- How do we build a thread-safe task queue?
- How should the pool's submit function, the main entry point of the thread pool, be written so that it accepts any function with any parameters?
Thread-safe Task Queue
- Wrap std::queue with std::mutex
- unique_lock automatically locks m_mutex and unlocks it after the end of its life cycle
- move constructor reduces overhead
template <typename T>
class SafeQueue
{
private:
    std::queue<T> m_queue;   // the underlying queue

    std::mutex m_mutex;      // access mutex

public:
    SafeQueue() {}
    SafeQueue(SafeQueue &&other) {}
    ~SafeQueue() {}

    bool empty()  // is the queue empty?
    {
        std::unique_lock<std::mutex> lock(m_mutex);  // lock so that m_queue cannot be changed concurrently

        return m_queue.empty();
    }

    int size()
    {
        std::unique_lock<std::mutex> lock(m_mutex);  // lock so that m_queue cannot be changed concurrently

        return m_queue.size();
    }

    // add an element to the queue
    void enqueue(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_queue.emplace(t);
    }

    // take an element out of the queue
    bool dequeue(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);  // lock the queue

        if (m_queue.empty())
            return false;
        t = std::move(m_queue.front());  // take the front element, moving it into t

        m_queue.pop();  // pop the front element

        return true;
    }
};
Thread pool class
A lot of detail is omitted here; I will fill it in later.
Submit function
// Submit a function to be executed asynchronously by the pool
template <typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
{
    // Create a function with bound parameters, ready to execute.
    // std::bind joins the function and its arguments into a single callable;
    // std::forward preserves the value category of f and args.
    std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

    // Encapsulate it into a shared pointer so that it can be copied (packaged_task is move-only)
    auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

    // Wrap the packaged task into a void() function
    std::function<void()> wrapper_func = [task_ptr]() {
        (*task_ptr)();
    };

    // Push the wrapped function into the thread-safe queue
    m_queue.enqueue(wrapper_func);

    // Wake up one waiting thread
    m_conditional_lock.notify_one();

    // Return the future of the task registered above
    return task_ptr->get_future();
}
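A quick usage sketch of submit(): any callable plus its arguments, with the result retrieved through the returned std::future. It relies on the thread_pool.h shown in the Complete Code section below; the lambda and values here are just an illustration.

#include <iostream>
#include "thread_pool.h"

int main()
{
    ThreadPool pool(4);
    pool.init();

    // submit accepts any callable and its arguments; the future's type is deduced
    auto fut = pool.submit([](int a, int b) { return a + b; }, 2, 3);
    std::cout << "2 + 3 = " << fut.get() << '\n';   // get() blocks until the task has run

    pool.shutdown();
    return 0;
}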
Built-in worker thread class
class ThreadWorker  // built-in worker class
{
private:
    int m_id;             // worker id

    ThreadPool *m_pool;   // the pool this worker belongs to

public:
    // constructor
    ThreadWorker(ThreadPool *pool, const int id)
        : m_pool(pool), m_id(id)
    {
    }

    // overload operator()
    void operator()()
    {
        std::function<void()> func;  // the task function to run

        bool dequeued;               // whether an element was taken from the queue

        // while the pool is not shut down, keep taking tasks from the task queue
        while (!m_pool->m_shutdown)
        {
            {
                // lock the environment used for sleeping and waking worker threads
                std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);

                // block the current thread while the task queue is empty
                if (m_pool->m_queue.empty())
                {
                    m_pool->m_conditional_lock.wait(lock);  // wait until the condition variable is notified
                }

                // take an element from the task queue
                dequeued = m_pool->m_queue.dequeue(func);
            }

            // if a task was taken out, execute it
            if (dequeued)
                func();
        }
    }
};
Complete Code
// thread_pool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Thread safe implementation of a Queue using a std::queue
template <typename T>
class SafeQueue
{
private:
    std::queue<T> m_queue;   // the underlying queue

    std::mutex m_mutex;      // access mutex

public:
    SafeQueue() {}
    SafeQueue(SafeQueue &&other) {}
    ~SafeQueue() {}

    bool empty()  // is the queue empty?
    {
        std::unique_lock<std::mutex> lock(m_mutex);  // lock so that m_queue cannot be changed concurrently

        return m_queue.empty();
    }

    int size()
    {
        std::unique_lock<std::mutex> lock(m_mutex);

        return m_queue.size();
    }

    // add an element to the queue
    void enqueue(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_queue.emplace(t);
    }

    // take an element out of the queue
    bool dequeue(T &t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);  // lock the queue

        if (m_queue.empty())
            return false;
        t = std::move(m_queue.front());  // take the front element, moving it into t

        m_queue.pop();  // pop the front element

        return true;
    }
};

class ThreadPool
{
private:
    class ThreadWorker  // built-in worker class
    {
    private:
        int m_id;            // worker id

        ThreadPool *m_pool;  // the pool this worker belongs to

    public:
        // constructor
        ThreadWorker(ThreadPool *pool, const int id)
            : m_pool(pool), m_id(id)
        {
        }

        // overload operator()
        void operator()()
        {
            std::function<void()> func;  // the task function to run

            bool dequeued;               // whether an element was taken from the queue

            while (!m_pool->m_shutdown)
            {
                {
                    // lock the environment used for sleeping and waking worker threads
                    std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);

                    // block the current thread while the task queue is empty
                    if (m_pool->m_queue.empty())
                    {
                        m_pool->m_conditional_lock.wait(lock);  // wait until the condition variable is notified
                    }

                    // take an element from the task queue
                    dequeued = m_pool->m_queue.dequeue(func);
                }

                // if a task was taken out, execute it
                if (dequeued)
                    func();
            }
        }
    };

    bool m_shutdown;  // is the thread pool shut down?

    SafeQueue<std::function<void()>> m_queue;  // thread-safe task queue

    std::vector<std::thread> m_threads;  // worker threads

    std::mutex m_conditional_mutex;  // mutex used for putting worker threads to sleep

    std::condition_variable m_conditional_lock;  // condition variable that lets threads sleep or wake up

public:
    // thread pool constructor
    ThreadPool(const int n_threads = 4)
        : m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false)
    {
    }

    ThreadPool(const ThreadPool &) = delete;

    ThreadPool(ThreadPool &&) = delete;

    ThreadPool &operator=(const ThreadPool &) = delete;

    ThreadPool &operator=(ThreadPool &&) = delete;

    // Inits thread pool
    void init()
    {
        for (int i = 0; i < m_threads.size(); ++i)
        {
            m_threads.at(i) = std::thread(ThreadWorker(this, i));  // assign worker threads
        }
    }

    // Waits until threads finish their current task and shutdowns the pool
    void shutdown()
    {
        m_shutdown = true;
        m_conditional_lock.notify_all();  // notify and wake up all worker threads

        for (int i = 0; i < m_threads.size(); ++i)
        {
            if (m_threads.at(i).joinable())  // if the thread is joinable
            {
                m_threads.at(i).join();      // join it
            }
        }
    }

    // Submit a function to be executed asynchronously by the pool
    template <typename F, typename... Args>
    auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
    {
        // Create a function with bound parameters, ready to execute
        std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

        // Encapsulate it into a shared pointer so that it can be copied (packaged_task is move-only)
        auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);

        // Wrap the packaged task into a void() function
        std::function<void()> wrapper_func = [task_ptr]() {
            (*task_ptr)();
        };

        // Push the wrapped function into the thread-safe queue
        m_queue.enqueue(wrapper_func);

        // Wake up one waiting thread
        m_conditional_lock.notify_one();

        // Return the future of the task registered above
        return task_ptr->get_future();
    }
};

#endif
Test sample code
// test.cpp

#include <iostream>
#include <random>
#include <chrono>
#include "thread_pool.h"

std::random_device rd;                                  // true random number generator (seed source)

std::mt19937 mt(rd());                                  // pseudo-random engine seeded with rd

std::uniform_int_distribution<int> dist(-1000, 1000);   // uniform integer distribution between -1000 and 1000

auto rnd = std::bind(dist, mt);

// make the thread sleep for a while to simulate work
void simulate_hard_computation()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}

// a simple function that multiplies two numbers and prints the result
void multiply(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
}

// multiply and store the result in an output parameter
void multiply_output(int &out, const int a, const int b)
{
    simulate_hard_computation();
    out = a * b;
    std::cout << a << " * " << b << " = " << out << std::endl;
}

// multiply and return the result
int multiply_return(const int a, const int b)
{
    simulate_hard_computation();
    const int res = a * b;
    std::cout << a << " * " << b << " = " << res << std::endl;
    return res;
}

void example()
{
    // create a pool of 3 threads
    ThreadPool pool(3);

    // initialize the pool
    pool.init();

    // submit multiplication tasks, 30 in total
    for (int i = 1; i <= 3; ++i)
        for (int j = 1; j <= 10; ++j)
        {
            pool.submit(multiply, i, j);
        }

    // submit a function whose output parameter is passed by std::ref
    int output_ref;
    auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);

    // wait for that multiplication to finish
    future1.get();
    std::cout << "Last operation result is equals to " << output_ref << std::endl;

    // submit a function that returns its result
    auto future2 = pool.submit(multiply_return, 5, 3);

    // wait for that multiplication to finish
    int res = future2.get();
    std::cout << "Last operation result is equals to " << res << std::endl;

    // shut the pool down
    pool.shutdown();
}

int main()
{
    example();

    return 0;
}