linux system thread pool

Brief description

Threads in a process are like employees in a company. The number of employees should be determined according to the company's business. Too few threads are too busy, but too many threads also waste resources. Ideally, the process should have some initial number of threads (thread pool). When there is no task, these threads will automatically go to sleep. With a task, they will immediately execute the task and keep cycling. The process should also increase or decrease the number of threads according to whether its task is heavy or not. When all tasks are completed, all threads can end properly.

The following figure shows a thread pool in its initial state.

Figure 1 Schematic diagram of thread pool

The following points need to be noted:

(1) There are no tasks in the task queue at the beginning. It is an empty chain queue with a header node

(2) Use mutexes to protect this queue

(3) The condition variable is used to represent the change in the number of tasks in the task queue. In the future, if the main thread puts tasks in the queue, the condition variable can be used to wake up the sleeping thread

(4) A public switch, shutdown, is used to control the thread exit and destroy the entire thread pool

interface design

The related structure of thread pool is shown in the following table.

prototypestruct task
Function descriptionThe task node contains the functions to be executed and their parameters, and is connected to a task queue through a linked list
Member listvoid * (* task)(void *arg);
void *arg;
struct task *next;
remarksTask instances eventually form a one-way linked list
prototypethread_pool
Function descriptionThread pool instance, which contains all the information of a thread pool
Member listpthread_mutex_t lock; // Mutex to protect task queue
pthread_cond_t cond; // Condition variables, synchronizing all threads
bool shutdown; // Thread pool destroy flag
struct task *task_list; // Task chain queue pointer
pthread_t *tids; // Thread ID storage location
unsigned int waiting tasks; // Number of tasks waiting in the task chain queue
unsigned int active_threads; // Number of currently active threads
remarksThe number of active threads can be modified, but there is at least one active thread

The following is the interface description of the thread pool

(1) Thread pool initialization: init_pool()

prototypebool init_pool(thread_pool *pool, unsigned int threads_number);
Function descriptionCreate a new thread pool containing threads_number active threads
parameterPool: thread pool pointer
threads_number: number of initially active threads (greater than or equal to 1)
Return valueReturns true for success and false for failure
Header filethread_pool.h
remarksThe minimum number of threads in the thread pool is 1

(2) Release task: add_task()

prototypebool add_task(thread_pool *pool, void * (*do_task)(void *arg), void *arg);
Function descriptionDropping tasks to the thread pool
parameterPool: thread pool pointer
do_task: execution routine put into the thread pool
arg: execute routine do_ The parameter of task. If the execution routine does not need a parameter, it can be set to NULL
Return valueReturns true for success and false for failure
Header filethread_pool.h
remarksThe maximum number of tasks in the task queue is MAX_WAITING_TASKS

(3) Add active thread: add_thread()

prototypeint add_thread(thread_pool *pool, unsigned int additional_threads);
Function descriptionIncrease the number of active threads in the thread pool
parameterPool: the thread pool pointer of the thread needs to be increased
additional_threads: number of new threads
Return value>0: actual number of newly added threads
-1: Failed
Header filethread_pool.h

(4) Delete active thread: remove_thread()

prototypeint remove_thread(thread_pool *pool, unsigned int removing threads);
Function descriptionDelete the number of active threads in the thread pool
parameterPool: the thread pool pointer of the thread to be deleted
removing_threads: number of threads to delete; When this parameter is set to 0, it directly returns the total number of threads in the current thread pool without any other impact on the thread pool
Return value>0: number of threads remaining in the current thread pool
-1: Failed
Header filethread_pool.h
remarksAt least 1 active thread will exist in the thread pool
If the deleted thread is executing a task, it will be deleted after it completes the task

(5) Destroy thread pool: destroy_pool()

prototypebool destroy_pool(thread_pool *pool);
Function descriptionBlocking waits for all tasks to complete, and then immediately destroys the entire thread pool to free all resources and memory
parameterPool: thread pool to be destroyed
Return valueReturns true for success and false for failure
Header filethread_pool.h

Source code implementation

Header file thread_ Pool h

//
//  
//  Description: this file contains the definition of the basic structure of the thread pool and various operation functions
//               Declaration of number
//
//

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS	1000
#define MAX_ACTIVE_THREADS	20

struct task
{
	void *(*task)(void *arg);
	void *arg;

	struct task *next;
};

typedef struct thread_pool
{
	pthread_mutex_t lock;
	pthread_cond_t  cond;
	struct task *task_list;

	pthread_t *tids;

	unsigned waiting_tasks;
	unsigned active_threads;

	bool shutdown;
}thread_pool;


bool
init_pool(thread_pool *pool,
          unsigned int threads_number);

bool
add_task(thread_pool *pool,
         void *(*task)(void *arg),
         void *arg);

int 
add_thread(thread_pool *pool,
           unsigned int additional_threads_number);

int 
remove_thread(thread_pool *pool,
              unsigned int removing_threads_number);

bool destroy_pool(thread_pool *pool);
void *routine(void *arg);

#endif

Interface implementation: thread_ Pool c

//
//  Description: this file contains the definition of thread pool operation function
//

#include "thread_pool.h"

void handler(void *arg)
{

	pthread_mutex_unlock((pthread_mutex_t *)arg);
}

void *routine(void *arg)
{
	thread_pool *pool = (thread_pool *)arg;
	struct task *p;

	while(1)
	{

		pthread_cleanup_push(handler, (void *)&pool->lock);
		pthread_mutex_lock(&pool->lock);


		while(pool->waiting_tasks == 0 && !pool->shutdown)
		{
			pthread_cond_wait(&pool->cond, &pool->lock);
		}


		if(pool->waiting_tasks == 0 && pool->shutdown == true)
		{
			pthread_mutex_unlock(&pool->lock);
			pthread_exit(NULL);
		}


		p = pool->task_list->next;
		pool->task_list->next = p->next;
		pool->waiting_tasks--;


		pthread_mutex_unlock(&pool->lock);
		pthread_cleanup_pop(0);


		pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
		(p->task)(p->arg);
		pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

		free(p);
	}

	pthread_exit(NULL);
}

bool init_pool(thread_pool *pool, unsigned int threads_number)
{
	pthread_mutex_init(&pool->lock, NULL);
	pthread_cond_init(&pool->cond, NULL);

	pool->shutdown = false;
	pool->task_list = malloc(sizeof(struct task));
	pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);

	if(pool->task_list == NULL || pool->tids == NULL)
	{
		perror("allocate memory error");
		return false;
	}

	pool->task_list->next = NULL;

	pool->waiting_tasks = 0;
	pool->active_threads = threads_number;

	int i;
	for(i=0; i<pool->active_threads; i++)
	{
		if(pthread_create(&((pool->tids)[i]), NULL,
					routine, (void *)pool) != 0)
		{
			perror("create threads error");
			return false;
		}
	}

	return true;
}

bool add_task(thread_pool *pool,
			void *(*task)(void *arg), void *arg)
{
	struct task *new_task = malloc(sizeof(struct task));
	if(new_task == NULL)
	{
		perror("allocate memory error");
		return false;
	}
	new_task->task = task;
	new_task->arg = arg;
	new_task->next = NULL;


	pthread_mutex_lock(&pool->lock);
	if(pool->waiting_tasks >= MAX_WAITING_TASKS)
	{
		pthread_mutex_unlock(&pool->lock);

		fprintf(stderr, "too many tasks.\n");
		free(new_task);

		return false;
	}
	
	struct task *tmp = pool->task_list;
	while(tmp->next != NULL)
		tmp = tmp->next;

	tmp->next = new_task;
	pool->waiting_tasks++;


	pthread_mutex_unlock(&pool->lock);
	pthread_cond_signal(&pool->cond);

	return true;
}

int add_thread(thread_pool *pool, unsigned additional_threads)
{
	if(additional_threads == 0)
		return 0;

	unsigned total_threads =
		     pool->active_threads + additional_threads;

	int i, actual_increment = 0;
	for(i = pool->active_threads;
	    i < total_threads && i < MAX_ACTIVE_THREADS;
	    i++)
	{
		if(pthread_create(&((pool->tids)[i]),
				NULL, routine, (void *)pool) != 0)
		{
			perror("add threads error");

			if(actual_increment == 0)
				return -1;

			break;
		}
		actual_increment++; 
	}

	pool->active_threads += actual_increment;
	return actual_increment;
}

int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
	if(removing_threads == 0)
		return pool->active_threads;

	int remain_threads = pool->active_threads - removing_threads;
	remain_threads = remain_threads>0 ? remain_threads:1;

	int i;
	for(i=pool->active_threads-1; i>remain_threads-1; i--)
	{
		errno = pthread_cancel(pool->tids[i]);
		if(errno != 0)
			break;
	}

	if(i == pool->active_threads-1)
		return -1;
	else
	{
		pool->active_threads = i+1;
		return i+1;
	}
}

bool destroy_pool(thread_pool *pool)
{

	pool->shutdown = true;
	pthread_cond_broadcast(&pool->cond);

	int i;
	for(i=0; i<pool->active_threads; i++)
	{
		errno = pthread_join(pool->tids[i], NULL);
		if(errno != 0)
		{
			printf("join tids[%d] error: %s\n",
					i, strerror(errno));
		}
		else
			printf("[%u] is joined\n", (unsigned)pool->tids[i]);
		
	}

	free(pool->task_list);
	free(pool->tids);
	free(pool);

	return true;
}

Test procedure

main.c

//

//  Description: an example using thread pool

//

#include "thread_pool.h"

void *mytask(void *arg)
{
	int n = (int)arg;

	printf("[%u][%s] ==> job will be done in %d sec...\n",
		(unsigned)pthread_self(), __FUNCTION__, n);

	sleep(n);

	printf("[%u][%s] ==> job done!\n",
		(unsigned)pthread_self(), __FUNCTION__);

	return NULL;
}

void *count_time(void *arg)
{
	int i = 0;
	while(1)
	{
		sleep(1);
		printf("sec: %d\n", ++i);
	}
}

int main(void)
{
	pthread_t a;
	pthread_create(&a, NULL, count_time, NULL);

	// 1, initialize the pool
	thread_pool *pool = malloc(sizeof(thread_pool));
	init_pool(pool, 2);

	// 2, throw tasks
	printf("throwing 3 tasks...\n");
	add_task(pool, mytask, (void *)(rand()%10));
	add_task(pool, mytask, (void *)(rand()%10));
	add_task(pool, mytask, (void *)(rand()%10));

	// 3, check active threads number
	printf("current thread number: %d\n",
			remove_thread(pool, 0));
	sleep(9);

	// 4, throw tasks
	printf("throwing another 2 tasks...\n");
	add_task(pool, mytask, (void *)(rand()%10));
	add_task(pool, mytask, (void *)(rand()%10));

	// 5, add threads
	add_thread(pool, 2);

	sleep(5);

	// 6, remove threads
	printf("remove 3 threads from the pool, "
	       "current thread number: %d\n",
			remove_thread(pool, 3));

	// 7, destroy the pool
	destroy_pool(pool);
	return 0;
}

Running results

zzc@zzc-virtual-machine:~/share/example/thread$ ./test 
throwing 3 tasks...
current thread number: 2
[3981727488][mytask] ==> job will be done in 3 sec...
[3990120192][mytask] ==> job will be done in 6 sec...
sec: 1
sec: 2
[3981727488][mytask] ==> job done!
[3981727488][mytask] ==> job will be done in 7 sec...
sec: 3
sec: 4
sec: 5
[3990120192][mytask] ==> job done!
sec: 6
sec: 7
sec: 8
throwing another 2 tasks...
[3990120192][mytask] ==> job will be done in 5 sec...
[3892311808][mytask] ==> job will be done in 3 sec...
sec: 9
[3981727488][mytask] ==> job done!
sec: 10
sec: 11
[3892311808][mytask] ==> job done!
sec: 12
sec: 13
remove 3 threads from the pool, current thread number: 1
[3990120192][mytask] ==> job done!
[3990120192] is joined

extend

In actual use, just change the mytask function in the above code to the function function we need to implement.

summary

This paper briefly introduces the concept and characteristics of thread pool, designs the structure and related operation interfaces of thread pool, and provides the specific implementation of the interface. Finally, the running process of thread pool is demonstrated by an example program.

Tags: Linux data structure linked list

Posted by gte806e on Wed, 01 Jun 2022 18:31:44 +0530