Brief description
Threads in a process are like employees in a company: the headcount should match the workload. With too few threads, each one is overloaded; with too many, resources are wasted. Ideally, a process starts with an initial set of threads (a thread pool). When there are no tasks, these threads sleep; when a task arrives, one of them wakes up, executes it, and then goes back to waiting for the next one. The process should also be able to add or remove threads as its workload grows or shrinks. Once all tasks are complete, all threads should exit cleanly.
The following figure shows a thread pool in its initial state.
Figure 1 Schematic diagram of thread pool
Note the following points:
(1) The task queue starts out empty: it is a linked queue that contains only a head node.
(2) A mutex protects the queue.
(3) A condition variable represents changes in the number of tasks in the queue. When the main thread later puts a task into the queue, it uses the condition variable to wake up a sleeping thread.
(4) A shared flag, shutdown, tells the threads to exit so that the entire thread pool can be destroyed.
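The way points (2) to (4) work together can be sketched as follows. This is a minimal illustration under stated assumptions, not the pool's actual code: the names demo_queue, demo_worker, and demo_run are hypothetical, and a plain counter stands in for the linked task queue.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the pool: a counter replaces the task list. */
struct demo_queue {
    pthread_mutex_t lock;      /* protects every field below */
    pthread_cond_t  cond;      /* signalled when waiting or shutdown changes */
    unsigned        waiting;   /* tasks waiting in the queue */
    unsigned        done;      /* tasks the worker has taken and "executed" */
    bool            shutdown;  /* set to true to make the worker exit */
};

static void *demo_worker(void *arg)
{
    struct demo_queue *q = arg;
    assert(q != NULL);

    for (;;) {
        pthread_mutex_lock(&q->lock);
        /* Sleep while there is nothing to do. The loop re-checks the
           condition after every wakeup, which also covers spurious wakeups. */
        while (q->waiting == 0 && !q->shutdown)
            pthread_cond_wait(&q->cond, &q->lock);

        /* Exit only when the queue is empty, so pending tasks are drained. */
        if (q->waiting == 0 && q->shutdown) {
            pthread_mutex_unlock(&q->lock);
            return NULL;
        }
        q->waiting--;
        q->done++;
        pthread_mutex_unlock(&q->lock);
    }
}

/* Queue `tasks` units of work, shut down, and return how many were executed. */
unsigned demo_run(unsigned tasks)
{
    struct demo_queue q = { .waiting = 0, .done = 0, .shutdown = false };
    pthread_t tid;
    unsigned done = 0;

    pthread_mutex_init(&q.lock, NULL);
    pthread_cond_init(&q.cond, NULL);

    if (pthread_create(&tid, NULL, demo_worker, &q) == 0) {
        for (unsigned i = 0; i < tasks; i++) {
            pthread_mutex_lock(&q.lock);
            q.waiting++;
            pthread_mutex_unlock(&q.lock);
            pthread_cond_signal(&q.cond);   /* wake the sleeping worker */
        }

        /* Change the flag under the lock so the worker cannot miss it. */
        pthread_mutex_lock(&q.lock);
        q.shutdown = true;
        pthread_cond_broadcast(&q.cond);
        pthread_mutex_unlock(&q.lock);

        pthread_join(tid, NULL);
        done = q.done;
    }

    pthread_cond_destroy(&q.cond);
    pthread_mutex_destroy(&q.lock);
    return done;
}
```

Because the worker exits only when the queue is empty and shutdown is set, every queued unit of work is processed before demo_run() returns.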
Interface design
The structures used by the thread pool are described in the following tables.
Prototype | struct task |
---|---|
Function description | A task node holds the function to execute and its argument; the nodes are linked together to form the task queue |
Member list | void *(*task)(void *arg); |
| | void *arg; |
| | struct task *next; |
Remarks | The task nodes form a singly linked list |
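As an illustration of how such nodes form a queue behind a head node, the following sketch appends at the tail and removes from the front. It is a single-threaded sketch under stated assumptions: demo_task mirrors the struct above, while queue_push, queue_pop, and double_it are hypothetical names that do not appear in the pool's code, and no locking is shown.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Same layout as struct task in the table above (hypothetical copy). */
struct demo_task {
    void *(*task)(void *arg);
    void *arg;
    struct demo_task *next;
};

/* Append a node at the tail. `head` is the dummy head node, which never
   carries a task itself. Returns 0 on success, -1 if allocation fails. */
int queue_push(struct demo_task *head, void *(*fn)(void *), void *arg)
{
    assert(head != NULL);

    struct demo_task *node = malloc(sizeof *node);
    if (node == NULL)
        return -1;
    node->task = fn;
    node->arg  = arg;
    node->next = NULL;

    struct demo_task *tail = head;
    while (tail->next != NULL)
        tail = tail->next;
    tail->next = node;
    return 0;
}

/* Detach and return the first real node, or NULL if the queue is empty.
   The caller owns the returned node and must free() it. */
struct demo_task *queue_pop(struct demo_task *head)
{
    assert(head != NULL);

    struct demo_task *first = head->next;
    if (first != NULL)
        head->next = first->next;
    return first;
}

/* Example task: doubles the int that arg points to. */
void *double_it(void *arg)
{
    int *value = arg;
    *value *= 2;
    return arg;
}
```

Tasks come out in the order they went in, so the queue is first in, first out. Walking to the tail on every push is linear in the queue length; keeping a tail pointer would avoid that at the cost of one more field.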
Prototype | thread_pool |
---|---|
Function description | A thread pool instance; it holds all the state of one thread pool |
Member list | pthread_mutex_t lock; // Mutex protecting the task queue |
| | pthread_cond_t cond; // Condition variable used to synchronize the threads |
| | bool shutdown; // Thread pool destruction flag |
| | struct task *task_list; // Pointer to the task queue (linked list) |
| | pthread_t *tids; // Storage for the thread IDs |
| | unsigned int waiting_tasks; // Number of tasks waiting in the task queue |
| | unsigned int active_threads; // Number of currently active threads |
Remarks | The number of active threads can be changed, but at least one active thread always remains |
The interfaces of the thread pool are described below.
(1) Thread pool initialization: init_pool()
Prototype | bool init_pool(thread_pool *pool, unsigned int threads_number); |
---|---|
Function description | Initialize a thread pool with threads_number active threads |
Parameters | pool: pointer to the thread pool |
| | threads_number: initial number of active threads (at least 1) |
Return value | true on success, false on failure |
Header file | thread_pool.h |
Remarks | A thread pool always has at least 1 thread |
(2) Submit a task: add_task()
Prototype | bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg); |
---|---|
Function description | Put a task into the thread pool |
Parameters | pool: pointer to the thread pool |
| | do_task: routine to be executed by the thread pool |
| | arg: argument passed to do_task; set it to NULL if the routine needs no argument |
Return value | true on success, false on failure |
Header file | thread_pool.h |
Remarks | The task queue holds at most MAX_WAITING_TASKS tasks |
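Because arg is a plain void *, a small integer can be carried in the pointer value itself instead of pointing to separate storage. The sketch below shows one way to do that. It assumes the platform provides intptr_t, and the conversion is implementation-defined, although it round-trips on mainstream platforms; int_to_arg, arg_to_int, and square_task are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

/* The trick only works if a pointer is wide enough to hold an int. */
static_assert(sizeof(void *) >= sizeof(int), "pointer too small to carry an int");

/* Pack an int into a void * for use as a task argument. */
void *int_to_arg(int value)
{
    return (void *)(intptr_t)value;
}

/* Recover the int inside the task routine. */
int arg_to_int(void *arg)
{
    return (int)(intptr_t)arg;
}

/* A routine with the signature add_task() expects: squares its argument
   and returns the result packed the same way. */
void *square_task(void *arg)
{
    int n = arg_to_int(arg);
    return int_to_arg(n * n);
}
```

A call would then look like add_task(pool, square_task, int_to_arg(7)). Going through intptr_t avoids the compiler warning that a direct cast between int and void * produces when the two have different sizes. For anything larger than an integer, pass a pointer to data that stays valid until the task has run.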
(3) Add active threads: add_thread()
Prototype | int add_thread(thread_pool *pool, unsigned int additional_threads); |
---|---|
Function description | Increase the number of active threads in the thread pool |
Parameters | pool: pointer to the thread pool to which threads are added |
| | additional_threads: number of threads to add |
Return value | >=0: number of threads actually added (this can be fewer than requested, because the pool holds at most MAX_ACTIVE_THREADS threads) |
| | -1: failure |
Header file | thread_pool.h |
(4) Remove active threads: remove_thread()
Prototype | int remove_thread(thread_pool *pool, unsigned int removing_threads); |
---|---|
Function description | Reduce the number of active threads in the thread pool |
Parameters | pool: pointer to the thread pool from which threads are removed |
| | removing_threads: number of threads to remove; if it is 0, the function simply returns the current number of threads and leaves the pool unchanged |
Return value | >0: number of threads remaining in the thread pool |
| | -1: failure |
Header file | thread_pool.h |
Remarks | At least 1 active thread always remains in the thread pool |
| | If a thread being removed is executing a task, it is removed after it completes the task |
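The second remark relies on POSIX deferred cancellation: while a thread has cancellation disabled, a cancel request stays pending and takes effect only after cancellation is enabled again. The following sketch demonstrates this in isolation. The names cancel_demo, cancel_demo_worker, and cancel_waits_for_task are hypothetical, and the "task" is simply a wait for a flag.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical shared state for the demonstration. */
struct cancel_demo {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    bool cancel_sent;     /* set by the controller after pthread_cancel() */
    bool task_finished;   /* set by the worker when its "task" is done */
};

static void *cancel_demo_worker(void *arg)
{
    struct cancel_demo *d = arg;
    assert(d != NULL);

    /* From here on, a cancel request stays pending instead of taking effect. */
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

    /* The "task": wait until the controller has sent the cancel request.
       pthread_cond_wait() is a cancellation point, but cancellation is
       disabled, so the thread is not cancelled here. */
    pthread_mutex_lock(&d->lock);
    while (!d->cancel_sent)
        pthread_cond_wait(&d->cond, &d->lock);
    d->task_finished = true;
    pthread_mutex_unlock(&d->lock);

    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_testcancel();   /* the pending request takes effect here */
    return NULL;            /* not reached when a cancel request is pending */
}

/* Returns true if the worker completed its task and was cancelled afterwards. */
bool cancel_waits_for_task(void)
{
    struct cancel_demo d = { .cancel_sent = false, .task_finished = false };
    pthread_t tid;
    void *result = NULL;
    bool ok = false;

    pthread_mutex_init(&d.lock, NULL);
    pthread_cond_init(&d.cond, NULL);

    if (pthread_create(&tid, NULL, cancel_demo_worker, &d) == 0) {
        pthread_cancel(tid);            /* request cancellation first */

        pthread_mutex_lock(&d.lock);    /* then let the "task" complete */
        d.cancel_sent = true;
        pthread_cond_signal(&d.cond);
        pthread_mutex_unlock(&d.lock);

        pthread_join(tid, &result);
        ok = d.task_finished && result == PTHREAD_CANCELED;
    }

    pthread_cond_destroy(&d.cond);
    pthread_mutex_destroy(&d.lock);
    return ok;
}
```

The join result PTHREAD_CANCELED confirms that the thread ended through cancellation rather than by returning, and task_finished confirms that the task ran to completion first.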
(5) Destroy the thread pool: destroy_pool()
Prototype | bool destroy_pool(thread_pool *pool); |
---|---|
Function description | Block until all tasks have completed, then destroy the entire thread pool and release all of its resources and memory |
Parameters | pool: pointer to the thread pool to be destroyed |
Return value | true on success, false on failure |
Header file | thread_pool.h |
Source code implementation
Header file: thread_pool.h
    //
    // Description: definitions of the thread pool's basic structures and
    //              declarations of its operation functions
    //

    #ifndef THREAD_POOL_H
    #define THREAD_POOL_H

    #include <stdio.h>
    #include <stdbool.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <string.h>
    #include <strings.h>
    #include <errno.h>
    #include <pthread.h>

    #define MAX_WAITING_TASKS   1000    // Upper limit on queued tasks
    #define MAX_ACTIVE_THREADS  20      // Upper limit on threads in the pool

    struct task
    {
        void *(*task)(void *arg);       // Routine to execute
        void *arg;                      // Argument passed to the routine
        struct task *next;              // Next task in the queue
    };

    typedef struct thread_pool
    {
        pthread_mutex_t lock;           // Protects the task queue
        pthread_cond_t  cond;           // Signals changes to the task queue
        struct task *task_list;         // Head node of the task queue
        pthread_t *tids;                // Thread IDs
        unsigned waiting_tasks;         // Tasks waiting in the queue
        unsigned active_threads;        // Currently active threads
        bool shutdown;                  // Pool destruction flag
    } thread_pool;

    bool init_pool(thread_pool *pool, unsigned int threads_number);
    bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg);
    int  add_thread(thread_pool *pool, unsigned int additional_threads_number);
    int  remove_thread(thread_pool *pool, unsigned int removing_threads_number);
    bool destroy_pool(thread_pool *pool);
    void *routine(void *arg);

    #endif
Interface implementation: thread_pool.c
    //
    // Description: implementation of the thread pool operation functions
    //

    #include "thread_pool.h"

    // Cleanup handler: releases the queue lock if a thread is cancelled or
    // exits while holding it
    void handler(void *arg)
    {
        pthread_mutex_unlock((pthread_mutex_t *)arg);
    }

    // Worker thread: repeatedly takes a task from the queue and runs it
    void *routine(void *arg)
    {
        thread_pool *pool = (thread_pool *)arg;
        struct task *p;

        while(1)
        {
            // pthread_cond_wait() is a cancellation point, so register the
            // handler before taking the lock
            pthread_cleanup_push(handler, (void *)&pool->lock);
            pthread_mutex_lock(&pool->lock);

            // Sleep while there is no task and the pool is not shutting down
            while(pool->waiting_tasks == 0 && !pool->shutdown)
            {
                pthread_cond_wait(&pool->cond, &pool->lock);
            }

            // No task left and the pool is shutting down: exit. pthread_exit()
            // runs the cleanup handler, which releases the lock, so the lock
            // must not be released by hand here.
            if(pool->waiting_tasks == 0 && pool->shutdown == true)
            {
                pthread_exit(NULL);
            }

            // Take the first task off the queue
            p = pool->task_list->next;
            pool->task_list->next = p->next;
            pool->waiting_tasks--;

            pthread_mutex_unlock(&pool->lock);
            pthread_cleanup_pop(0);

            // A running task must not be interrupted by cancellation
            pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
            (p->task)(p->arg);
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

            free(p);

            // Act on a cancel request that arrived while the task was running
            pthread_testcancel();
        }

        pthread_exit(NULL);
    }

    bool init_pool(thread_pool *pool, unsigned int threads_number)
    {
        if(threads_number < 1 || threads_number > MAX_ACTIVE_THREADS)
        {
            fprintf(stderr, "invalid number of threads.\n");
            return false;
        }

        pthread_mutex_init(&pool->lock, NULL);
        pthread_cond_init(&pool->cond, NULL);

        pool->shutdown = false;
        pool->task_list = malloc(sizeof(struct task));   // Head node
        pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);

        if(pool->task_list == NULL || pool->tids == NULL)
        {
            perror("allocate memory error");
            free(pool->task_list);
            free(pool->tids);
            return false;
        }

        pool->task_list->next = NULL;
        pool->waiting_tasks = 0;
        pool->active_threads = 0;

        unsigned i;
        for(i=0; i<threads_number; i++)
        {
            // pthread_create() returns the error code instead of setting errno
            errno = pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool);
            if(errno != 0)
            {
                perror("create threads error");
                return false;
            }
            pool->active_threads++;
        }

        return true;
    }

    bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg)
    {
        struct task *new_task = malloc(sizeof(struct task));
        if(new_task == NULL)
        {
            perror("allocate memory error");
            return false;
        }
        new_task->task = task;
        new_task->arg = arg;
        new_task->next = NULL;

        pthread_mutex_lock(&pool->lock);
        if(pool->waiting_tasks >= MAX_WAITING_TASKS)
        {
            pthread_mutex_unlock(&pool->lock);
            fprintf(stderr, "too many tasks.\n");
            free(new_task);
            return false;
        }

        // Append the new task at the tail of the queue
        struct task *tmp = pool->task_list;
        while(tmp->next != NULL)
            tmp = tmp->next;
        tmp->next = new_task;
        pool->waiting_tasks++;
        pthread_mutex_unlock(&pool->lock);

        // Wake up one sleeping thread
        pthread_cond_signal(&pool->cond);
        return true;
    }

    int add_thread(thread_pool *pool, unsigned additional_threads)
    {
        if(additional_threads == 0)
            return 0;

        unsigned total_threads = pool->active_threads + additional_threads;
        unsigned i;
        int actual_increment = 0;

        for(i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
        {
            errno = pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool);
            if(errno != 0)
            {
                perror("add threads error");
                if(actual_increment == 0)
                    return -1;
                break;
            }
            actual_increment++;
        }

        pool->active_threads += actual_increment;
        return actual_increment;
    }

    int remove_thread(thread_pool *pool, unsigned int removing_threads)
    {
        if(removing_threads == 0)
            return pool->active_threads;

        // Always keep at least one thread
        int remain_threads = (removing_threads < pool->active_threads)
                           ? (int)(pool->active_threads - removing_threads) : 1;

        int i;
        for(i = (int)pool->active_threads-1; i > remain_threads-1; i--)
        {
            errno = pthread_cancel(pool->tids[i]);
            if(errno != 0)
                break;
            // Wait for the cancelled thread to end; if it is running a task,
            // this blocks until that task completes
            pthread_join(pool->tids[i], NULL);
        }

        if(i == (int)pool->active_threads-1)
            return -1;

        pool->active_threads = i+1;
        return i+1;
    }

    bool destroy_pool(thread_pool *pool)
    {
        // Set the flag and wake all threads while holding the lock, so that no
        // thread can miss the wakeup between checking the flag and sleeping
        pthread_mutex_lock(&pool->lock);
        pool->shutdown = true;
        pthread_cond_broadcast(&pool->cond);
        pthread_mutex_unlock(&pool->lock);

        unsigned i;
        for(i=0; i<pool->active_threads; i++)
        {
            errno = pthread_join(pool->tids[i], NULL);
            if(errno != 0)
                printf("join tids[%u] error: %s\n", i, strerror(errno));
            else
                printf("[%u] is joined\n", (unsigned)pool->tids[i]);
        }

        pthread_mutex_destroy(&pool->lock);
        pthread_cond_destroy(&pool->cond);
        free(pool->task_list);
        free(pool->tids);
        free(pool);     // The pool itself must have been allocated with malloc()

        return true;
    }
Test program
main.c
    //
    // Description: an example using the thread pool
    //

    #include <stdint.h>
    #include "thread_pool.h"

    void *mytask(void *arg)
    {
        // The number of seconds is carried in the pointer value itself
        int n = (int)(intptr_t)arg;

        printf("[%u][%s] ==> job will be done in %d sec...\n",
               (unsigned)pthread_self(), __func__, n);
        sleep(n);
        printf("[%u][%s] ==> job done!\n",
               (unsigned)pthread_self(), __func__);

        return NULL;
    }

    // Prints the elapsed time once per second
    void *count_time(void *arg)
    {
        int i = 0;
        while(1)
        {
            sleep(1);
            printf("sec: %d\n", ++i);
        }
    }

    int main(void)
    {
        pthread_t a;
        pthread_create(&a, NULL, count_time, NULL);

        // 1, initialize the pool
        thread_pool *pool = malloc(sizeof(thread_pool));
        if(pool == NULL || !init_pool(pool, 2))
        {
            fprintf(stderr, "failed to initialize the thread pool.\n");
            return 1;
        }

        // 2, throw tasks
        printf("throwing 3 tasks...\n");
        add_task(pool, mytask, (void *)(intptr_t)(rand()%10));
        add_task(pool, mytask, (void *)(intptr_t)(rand()%10));
        add_task(pool, mytask, (void *)(intptr_t)(rand()%10));

        // 3, check active threads number
        printf("current thread number: %d\n", remove_thread(pool, 0));
        sleep(9);

        // 4, throw tasks
        printf("throwing another 2 tasks...\n");
        add_task(pool, mytask, (void *)(intptr_t)(rand()%10));
        add_task(pool, mytask, (void *)(intptr_t)(rand()%10));

        // 5, add threads
        add_thread(pool, 2);
        sleep(5);

        // 6, remove threads
        printf("remove 3 threads from the pool, "
               "current thread number: %d\n", remove_thread(pool, 3));

        // 7, destroy the pool
        destroy_pool(pool);
        return 0;
    }
Running results
    zzc@zzc-virtual-machine:~/share/example/thread$ ./test
    throwing 3 tasks...
    current thread number: 2
    [3981727488][mytask] ==> job will be done in 3 sec...
    [3990120192][mytask] ==> job will be done in 6 sec...
    sec: 1
    sec: 2
    [3981727488][mytask] ==> job done!
    [3981727488][mytask] ==> job will be done in 7 sec...
    sec: 3
    sec: 4
    sec: 5
    [3990120192][mytask] ==> job done!
    sec: 6
    sec: 7
    sec: 8
    throwing another 2 tasks...
    [3990120192][mytask] ==> job will be done in 5 sec...
    [3892311808][mytask] ==> job will be done in 3 sec...
    sec: 9
    [3981727488][mytask] ==> job done!
    sec: 10
    sec: 11
    [3892311808][mytask] ==> job done!
    sec: 12
    sec: 13
    remove 3 threads from the pool, current thread number: 1
    [3990120192][mytask] ==> job done!
    [3990120192] is joined
Extension
In actual use, simply replace the mytask function in the code above with the function you need to run.
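As an illustration of such a replacement, the sketch below defines a routine that receives its input and stores its output through a caller-owned structure. The names sum_job and sum_task are hypothetical; any function with the signature void *(*)(void *) can be used in the same way.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical job description: input range and a slot for the result. */
struct sum_job {
    int  first;    /* first integer of the range (inclusive) */
    int  last;     /* last integer of the range (inclusive) */
    long result;   /* filled in by sum_task() */
};

/* Task routine: sums the integers from first to last into result. */
void *sum_task(void *arg)
{
    struct sum_job *job = arg;
    assert(job != NULL);

    job->result = 0;
    for (int i = job->first; i <= job->last; i++)
        job->result += i;

    return NULL;   /* the pool ignores the return value of a task */
}
```

It would be submitted with add_task(pool, sum_task, &job). The job structure must stay valid until the task has run, and result should be read only after the task is known to have finished, for example after destroy_pool() has returned.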
Summary
This article briefly introduces the concept and characteristics of a thread pool, designs its data structures and operation interfaces, and provides an implementation of those interfaces. Finally, an example program demonstrates the thread pool in operation.