linux c-线程池

什么是线程池

什么是线程池?简单点说,线程池就是有一堆已经创建好了的线程,初始它们都处于空闲等待状态,当有新的任务需要处理的时候,就从这个池子里面取一个空闲等待的线程来处理该任务,当处理完成了就再次把该线程放回池中,以供后面的任务使用。当池子里的线程全都处理忙碌状态时,线程池中没有可用的空闲等待线程,此时,根据需要选择创建一个新的线程并置入池中,或者通知任务线程池忙,稍后再试。

为什么要使用线程池

我们说,线程的创建和销毁比之进程的创建和销毁是轻量级的,但是当我们的任务需要大量进行大量线程的创建和销毁操作时,这个消耗就会变成的相当大。比如,当你设计一个压力性能测试框架的时候,需要连续产生大量的并发操作,这个是时候,线程池就可以很好的帮上你的忙。线程池的好处就在于线程复用,一个任务处理完成后,当前线程可以直接处理下一个任务,而不是销毁后再创建,非常适用于连续产生大量并发任务的场合。

线程池的原理

线程池的任务就在于负责这些线程的创建,销毁和任务处理参数传递、唤醒和等待。
- 创建若干线程,置入线程池 - 任务达到时,从线程池取空闲线程 - 取得了空闲线程,立即进行任务处理 - 否则新建一个线程,并置入线程池,执行3 - 如果创建失败或者线程池已满,根据设计策略选择返回错误或将任务置入处理队列,等待处理 - 销毁线程池

线程池的设计

###数据结构的设计 任务结构体

typedef struct tp_work_desc_s TpWorkDesc;  
typedef void (*process_job)(TpWorkDesc*job);  
struct tp_work_desc_s {  
         void *ret; //call in, that is arguments  
	     void *arg; //call out, that is return value  
};

其中,TpWorkDesc是任务参数描述,arg是传递给任务的参数,ret则是任务处理完成后的返回值;

process_job函数是任务处理函数原型,每个任务处理函数都应该这样定义,然后将它作为参数传给线程池处理,线程池将会选择一个空闲线程通过调用该函数来进行任务处理;

线程设计

typedef struct tp_thread_info_s TpThreadInfo;  
struct tp_thread_info_s {  
         pthread_t thread_id; //thread id num  
         TPBOOL is_busy; //thread status:true-busy;flase-idle  
         pthread_cond_t thread_cond;   //thread condition sem  
         pthread_mutex_t thread_lock;  
         process_job proc_fun;  
         TpWorkDesc* th_job;  
         TpThreadPool* tp_pool;  
};

TpThreadInfo是对一个线程的描述。

thread_id是该线程的ID;

is_busy用于标识该线程是否正处理忙碌状态;

thread_cond用于任务处理时的唤醒和等待;

thread_lock,用于任务加锁,用于条件变量等待加锁;

proc_fun是当前任务的回调函数地址;

th_job是任务的参数信息;

tp_pool是所在线程池的指针;

线程池的设计

struct tp_thread_pool_s {
	unsigned min_th_num; //min thread number in the pool
	unsigned cur_th_num; //current thread number in the pool
	unsigned max_th_num; //max thread number in the pool
	pthread_mutex_t tp_lock;
	pthread_cond_t tp_cond;
	pthread_mutex_t loop_lock;
	pthread_cond_t loop_cond;
	
	TpThreadInfo *thread_info;
	TSQueue *idle_q; //idle queue
	BOOL stop_flag; //whether stop the threading pool
	
	pthread_t manage_thread_id; //manage thread id num
	float busy_threshold; //
	unsigned manage_interval; //
};

TpThreadPool是对线程池的描述。

min_th_num是线程池中至少存在的线程数,线程池初始化的过程中会创建min_th_num数量的线程;

cur_th_num是线程池当前存在的线程数量;

max_th_num则是线程池最多可以存在的线程数量;

tp_lock用于线程池管理时的互斥;

manage_thread_id是线程池的管理线程ID;

thread_info则是指向线程池数据,这里使用一个数组来存储线程池中线程的信息,该数组的大小为max_th_num;

idle_q是存储线程池空闲线程指针的队列,用于从线程池快速取得空闲线程;

stop_flag用于线程池的销毁,当stop_flag为FALSE时,表明当前线程池需要销毁,所有忙碌线程在处理完当前任务后会退出;

算法设计

线程创建

/**
 * user interface. creat thread pool.
 * para:
 * 	num: min thread number to be created in the pool
 * return:
 * 	thread pool struct instance be created successfully
 */
TpThreadPool *tp_create(unsigned min_num, unsigned max_num) {
	TpThreadPool *pTp;
	pTp = (TpThreadPool*) malloc(sizeof(TpThreadPool));

	memset(pTp, 0, sizeof(TpThreadPool));

	//init member var
	pTp->min_th_num = min_num;
	pTp->cur_th_num = min_num;
	pTp->max_th_num = max_num;
    /*初始化线程池里的一个mutex_lock,用于线程池管理的互斥*/
	pthread_mutex_init(&pTp->tp_lock, NULL);
	pthread_cond_init(&pTp->tp_cond, NULL);
    /*这个loop_lock用于*
	pthread_mutex_init(&pTp->loop_lock, NULL);
	pthread_cond_init(&pTp->loop_cond, NULL);

	//malloc mem for num thread info struct
	if (NULL != pTp->thread_info)
		free(pTp->thread_info);
	pTp->thread_info = (TpThreadInfo*) malloc(sizeof(TpThreadInfo) * pTp->max_th_num);
	memset(pTp->thread_info, 0, sizeof(TpThreadInfo) * pTp->max_th_num);

	return pTp;
}

创建伊始,线程池线程容量大小上限为max_th_num,初始容量为min_th_num; 线程的初始化

/**
 * member function reality. thread pool init function.
 * para:
 * 	pTp: thread pool struct instance ponter
 * return:
 * 	true: successful; false: failed
 */
int tp_init(TpThreadPool *pTp) {
	int i;
	int err;
	TpThreadInfo *pThi;

	//init_queue(&pTp->idle_q, NULL);
    /*创建一个队列,用于存放thread_info*/
	pTp->idle_q = ts_queue_create();
	pTp->stop_flag = FALSE;
	pTp->busy_threshold = BUSY_THRESHOLD;
	pTp->manage_interval = MANAGE_INTERVAL;

	//create work thread and init work thread info
	for (i = 0; i < pTp->min_th_num; i++) {
		pThi = pTp->thread_info + i; //一个线程对应于一个thread_info
		pThi->tp_pool = pTp;         //记录指针
		pThi->is_busy = FALSE;
		pthread_cond_init(&pThi->thread_cond, NULL);
		pthread_mutex_init(&pThi->thread_lock, NULL);
		pThi->proc_fun = NULL;
		pThi->arg = NULL;
		ts_queue_enq_data(pTp->idle_q, pThi);
        /*new一个线程,调用的函数为tp_work_thread,参数为pThi(thread_info)*/
		err = pthread_create(&pThi->thread_id, NULL, tp_work_thread, pThi);
		if (0 != err) {
			perror("tp_init: create work thread failed.");
			ts_queue_destroy(pTp->idle_q);
			return -1;
		}
	}

	//create manage thread
	err = pthread_create(&pTp->manage_thread_id, NULL, tp_manage_thread, pTp);
	if (0 != err) {//clear_queue(&pTp->idle_q);
		ts_queue_destroy(pTp->idle_q);
		fprintf(stderr, "tp_init: creat manage thread failed\n");
		return 0;
	}
	
	//wait for all threads are ready
	while(i++ < pTp->cur_th_num){
		pthread_mutex_lock(&pTp->tp_lock);
		pthread_cond_wait(&pTp->tp_cond, &pTp->tp_lock);
		pthread_mutex_unlock(&pTp->tp_lock);
	}
	DEBUG("All threads are ready now\n");
	return 0;
}

初始线程池中线程数量为min_th_num,对这些线程一一进行初始化; 将这些初始化的空闲线程一一置入空闲队列;

创建管理线程,用于监控线程池的状态,并适当回收多余的线程资源;

线程池的关闭和销毁

/**
 * member function reality. thread pool entirely close function.
 * para:
 * 	pTp: thread pool struct instance ponter
 * return:
 */
void tp_close(TpThreadPool *pTp, BOOL wait) {
	unsigned i;

	pTp->stop_flag = TRUE;
	if (wait) {
		DEBUG("current number of threads: %u", pTp->cur_th_num);
		for (i = 0; i < pTp->cur_th_num; i++) {
			pthread_cond_signal(&pTp->thread_info[i].thread_cond);
		}
		for (i = 0; i < pTp->cur_th_num; i++) {
			if(0 != pthread_join(pTp->thread_info[i].thread_id, NULL)){
				perror("pthread_join");
			}
			//DEBUG("join a thread success.\n");
			pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);
			pthread_cond_destroy(&pTp->thread_info[i].thread_cond);
		}
	} else {
		//close work thread
		for (i = 0; i < pTp->cur_th_num; i++) {
			kill((pid_t)pTp->thread_info[i].thread_id, SIGKILL);
			pthread_mutex_destroy(&pTp->thread_info[i].thread_lock);
			pthread_cond_destroy(&pTp->thread_info[i].thread_cond);
		}
	}
	//close manage thread
	kill((pid_t)pTp->manage_thread_id, SIGKILL);
	pthread_mutex_destroy(&pTp->tp_lock);
	pthread_cond_destroy(&pTp->tp_cond);
	pthread_mutex_destroy(&pTp->loop_lock);
	pthread_cond_destroy(&pTp->loop_cond);

	//clear_queue(&pTp->idle_q);
	ts_queue_destroy(pTp->idle_q);
	//free thread struct
	free(pTp->thread_info);
	pTp->thread_info = NULL;
}

线程池关闭的过程中,可以选择是否对正在处理的任务进行等待,如果是,则会唤醒所有任务,然后等待所有任务执行完成,然后返回;如果不是,则将立即杀死所有线程,然后返回,注意:这可能会导致任务的处理中断而产生错误!

/**
 * member function reality. main interface opened.
 * after getting own worker and job, user may use the function to process the task.
 * para:
 * 	pTp: thread pool struct instance ponter
 *	worker: user task reality.
 *	job: user task para
 * return:
 */
int tp_process_job(TpThreadPool *pTp, process_job proc_fun, void *arg) {
	TpThreadInfo *pThi ;
	//fill pTp->thread_info's relative work key
    /*从队列中取出一个Thread_info*/
	pThi = (TpThreadInfo *) ts_queue_deq_data(pTp->idle_q);
	if(pThi){
		DEBUG("Fetch a thread from pool.\n");
		pThi->is_busy = TRUE;
		pThi->proc_fun = proc_fun;
		pThi->arg = arg;
		//let the thread to deal with this job
		DEBUG("wake up thread %u\n", pThi->thread_id);
		pthread_cond_signal(&pThi->thread_cond)}
	else{
		//if all current thread are busy, new thread is created here
		if(!(pThi = tp_add_thread(pTp, proc_fun, arg))){
			DEBUG("The thread pool is full, no more thread available.\n");
			return -1;
		}
		/* should I wait? */
		//pthread_mutex_lock(&pTp->tp_lock);
		//pthread_cond_wait(&pTp->tp_cond, &pTp->tp_lock);
		//pthread_mutex_unlock(&pTp->tp_lock);
		
		DEBUG("No more idle thread, a new thread is created.\n");
	}
	return 0;
}

当一个新任务到达是,线程池首先会检查是否有可用的空闲线程,如果是,则采用才空闲线程进行任务处理并返回TRUE,如果不是,则尝试新建一个线程,并使用该线程对任务进行处理,如果失败则返回FALSE,说明线程池忙碌或者出错。

/**
 * internal interface. real work thread.
 * @params:
 * 	arg: args for this method
 * @return:
 *	none
 */
static void *tp_work_thread(void *arg) {
	TpThreadInfo *pTinfo = (TpThreadInfo *) arg;
	TpThreadPool *pTp = pTinfo->tp_pool;

	//wake up waiting thread, notify it I am ready
	pthread_cond_signal(&pTp->tp_cond);
	while (!(pTp->stop_flag)) {
		//process
		if(pTinfo->proc_fun){
			DEBUG("thread %u is running\n", pTinfo->thread_id);
			pTinfo->proc_fun(pTinfo->arg);
			//thread state shoulde be set idle after work
			pTinfo->is_busy = FALSE;
			pTinfo->proc_fun = NULL;
			//I am idle now
			ts_queue_enq_data(pTp->idle_q, pTinfo);
		}
			
		//wait cond for processing real job.

		DEBUG("thread %u is waiting for a job\n", pTinfo->thread_id);
		pthread_mutex_lock(&pTinfo->thread_lock);
        /*等待process_job中发送一个send_signed来唤醒*/
		pthread_cond_wait(&pTinfo->thread_cond, &pTinfo->thread_lock);
		pthread_mutex_unlock(&pTinfo->thread_lock);
		DEBUG("thread %u end waiting for a job\n", pTinfo->thread_id);

		if(pTinfo->tp_pool->stop_flag){
			DEBUG("thread %u stop\n", pTinfo->thread_id);
			break;
		}
	}
	DEBUG("Job done, thread %u is idle now.\n", pTinfo->thread_id);
}

上面这个函数是任务处理函数,该函数将始终处理等待唤醒状态,直到新任务到达或者线程销毁时被唤醒,然后调用任务处理回调函数对任务进行处理;当任务处理完成时,则将自己置入空闲队列中,以供下一个任务处理。

/**
 * member function reality. add new thread into the pool and run immediately.
 * para:
 * 	pTp: thread pool struct instance ponter
 * 	proc_fun:
 * 	job:
 * return:
 * 	pointer of TpThreadInfo
 */
static TpThreadInfo *tp_add_thread(TpThreadPool *pTp, process_job proc_fun, void *arg) {
	int err;
	TpThreadInfo *new_thread;

	pthread_mutex_lock(&pTp->tp_lock);
	if (pTp->max_th_num <= pTp->cur_th_num){
		pthread_mutex_unlock(&pTp->tp_lock);
		return NULL;
	}

	//malloc new thread info struct
	new_thread = pTp->thread_info + pTp->cur_th_num; 
	pTp->cur_th_num++;
	pthread_mutex_unlock(&pTp->tp_lock);

	new_thread->tp_pool = pTp;
	//init new thread's cond & mutex
	pthread_cond_init(&new_thread->thread_cond, NULL);
	pthread_mutex_init(&new_thread->thread_lock, NULL);

	//init status is busy, only new process job will call this function
	new_thread->is_busy = TRUE;
	new_thread->proc_fun = proc_fun;
	new_thread->arg = arg;

	err = pthread_create(&new_thread->thread_id, NULL, tp_work_thread, new_thread);
	if (0 != err) {
		perror("tp_add_thread: pthread_create");
		free(new_thread);
		return NULL;
	}
	return new_thread;
}

上面这个函数用于向线程池中添加新的线程,该函数将会在当线程池没有空闲线程可用时被调用。 函数将会新建一个线程,并设置自己的状态为busy(立即就要被用于执行任务)

线程池的管理

线程池的管理主要是监控线程池的整体忙碌状态,当线程池大部分线程处于空闲状态时,管理线程将适当的销毁一定数量的空闲线程,以便减少线程池对系统资源的消耗。 这里设计认为,当空闲线程的数量超过线程池线程数量的1/2时,线程池总体处理空闲状态,可以适当销毁部分线程池的线程,以减少线程池对系统资源的开销。

线程池状态计算

/**
 * member function reality. get current thread pool status:idle, normal, busy, .etc.
 * para:
 * 	pTp: thread pool struct instance ponter
 * return:
 * 	0: idle; 1: normal or busy(don't process)
 */
int tp_get_tp_status(TpThreadPool *pTp) {
	float busy_num = 0.0;
	int i;

	//get busy thread number
	busy_num = pTp->cur_th_num - ts_queue_count(pTp->idle_q);

	DEBUG("Current thread pool status, current num: %u, busy num: %u, idle num: %u\n", pTp->cur_th_num, (unsigned)busy_num, ts_queue_count(pTp->idle_q));
	if(busy_num / (pTp->cur_th_num) < pTp->busy_threshold)
		return 0;//idle status
	else
		return 1;//busy or normal status	
}

线程的销毁算法 - 从空闲队列中dequeue一个空闲线程指针,该指针指向线程信息数组的某项,例如这里是p; - 销毁该线程 - 把线程信息数组的最后一项拷贝至位置p - 线程池数量减少一,即cur_th_num–

/**
 * member function reality. delete idle thread in the pool.
 * only delete last idle thread in the pool.
 * para:
 * 	pTp: thread pool struct instance ponter
 * return:
 * 	true: successful; false: failed
 */
int tp_delete_thread(TpThreadPool *pTp) {
	unsigned idx;
	TpThreadInfo *pThi;
	TpThreadInfo tT;

	//current thread num can't < min thread num
	if (pTp->cur_th_num <= pTp->min_th_num)
		return -1;
	//all threads are busy
	pThi = (TpThreadInfo *) ts_queue_deq_data(pTp->idle_q);
	if(!pThi)
		return -1;
	
	//after deleting idle thread, current thread num -1
	pthread_mutex_lock(&pTp->tp_lock);
	pTp->cur_th_num--;
	/** swap this thread to the end, and free it! **/
	memcpy(&tT, pThi, sizeof(TpThreadInfo));
	memcpy(pThi, pTp->thread_info + pTp->cur_th_num, sizeof(TpThreadInfo));
	memcpy(pTp->thread_info + pTp->cur_th_num, &tT, sizeof(TpThreadInfo));
	pthread_mutex_unlock(&pTp->tp_lock);

	//kill the idle thread and free info struct
	kill((pid_t)tT.thread_id, SIGKILL);
	pthread_mutex_destroy(&tT.thread_lock);
	pthread_cond_destroy(&tT.thread_cond);

	return 0;
}

线程池的管理

线程池通过一个管理线程来进行监控,管理线程将会每隔一段时间对线程池的状态进行计算,根据线程池的状态适当的销毁部分线程,减少对系统资源的消耗。

/**
 * internal interface. manage thread pool to delete idle thread.
 * para:
 * 	pthread: thread pool struct ponter
 * return:
 */
static void *tp_manage_thread(void *arg) {
	TpThreadPool *pTp = (TpThreadPool*) arg;//main thread pool struct instance

	//1?
	sleep(pTp->manage_interval);

	do {
		if (tp_get_tp_status(pTp) == 0) {
			do {
				if (!tp_delete_thread(pTp))
					break;
			} while (TRUE);
		}//end for if

		//1?
		sleep(pTp->manage_interval);
	} while (!pTp->stop_flag);
	return NULL;
}