手写线程池与性能分析
推荐一个零声学院免费公开课程,个人觉得老师讲得不错,分享给大家:Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,立即学习
手写线程池与性能分析
作为一款服务器而言,很多服务器的源码,最底层最底层这些东西,是很通用的
比如我们之前讲过的网络,以及池式结构,最底层的这些组件是提供给我们应用层写业务逻辑的
在工作中接触的并不是很多,但是在面试的时候会问很多,工作中是以写业务为主,比如说工作中要你写个线程池这种可能性不大,比如说要你写一个连接池的可能性也不大,大量的时间可能是在写业务为主,那但是这个东西了,他在面试的时候,又是非常非常管用的东西。
简历里面肯定写一些通用的组件为主。
4个池式结构
先跟大家解释一下就是这些东西到底是用来做什么的?
对于一款服务器而言,然后n多的客户端连接上这个,然后再发每一个数据的时候,客户端给服务器发送的请求,发送的每一帧请求。
服务端这边接收完了之后进行处理,在recv()完了之后,那这里面我们正在进行解析,解析完数据之后,比如说我们写一些日志,那在这种同步的过程中间,写日志的时候
大概是这么一个流程,那对于parser这个过程只是把这个协议解析出来,解析出来之后,我们还有一系列的动作,比如说我们要去数据库查询,或者说我们要记录一些日志,或者说对数据库进行增删改查操作,
还有在这个过程中进行这些crud的操作的过程中,那会发现当这个日志记录的时候,它的时间周期会比较长,
就是在这一步落盘的时候,要写磁盘速度比较慢,对数据库操作的时候也会比较慢,所以很多的时候在这里大家就会引入一个东西,消息队列
抛到一个消息队列里面,然后再由消息队列进行去处理,或者说我通知给另外一个进程,由另外一个进程或者另外一个线程来对它进行操作,对这个日志进行罗盘来进行crud操作
那这个过程中间还有一种做法就是我们直接引入一个线程池,把这些日志或者这些crud当做一个一个的任务,把它抛给线程池,由线程池来处理这些任务,线程池就是解决这么一个问题,线程池的使用场景还是非常多的,
比如说写日志
我们对服务器的一些业务需要计算,需要一些计算结果,计算结果计算的时间周期比较长的时候,我们也可以引入另外的线程,反正说我们在开线程的过程中间都可以想到线程池,第三个就是crud,不光是在服务器上面
线程池到底是什么东西?
我们在这里跟大家分析一下,它的使用场景先讲了,那我们再来分析一下它到底怎么做?
首先第一个它是由一个一个的任务所组成,
比如说我们写日志是一个任务,我们做计算也好,把一个计算任务把它抛进去,对数据库操作,我们也可以把它当做一次任务。
这里肯定就是有一个这个东西:任务队列
任务队列是提供给外界去用的时候,我们一个个的任务把它抛给任务队列
这时我们外面写的应用程序调用的接口,调用接口时候就直接可以写成一个push_tasks,把这任务给抛出去。
那任务在什么时候执行呢?
这边当然还有一个执行队列,就是每1个节点,这里6个框,每1个框你就可以理解为是1个线程。
然后这一系列的线程就组成了一个集合,这个集合,就变成了一个执行队列,那这个执行队列执行什么东西呢,就从任务队列里面取任务,并且进行执行
第一个问题,
多个线程同时在一个任务队列里面,取任务,需要加锁,这是第一个问题。那这个任务队列就是这里几个线程的临界资源,取任务需要对这个任务队列进行加锁
第二个问题
举例的话就可能好理解,惊群那个案例一样,在中午的时候,你去一个食堂吃饭的时候,这个打饭的这个窗口上面,这6个阿姨相当于是去执行一个一个任务。
在这个过程中,每一次取任务的时候,他们肯定形成了某一种契约,就是每一次谁拿一个走,但是这里还有一个问题是什么?就是如果在下午三四点钟食堂没有人了,那这6个阿姨都在休息是吧?那这个状态在线程里面如何去表示这个休息的状态呢?
就是当任务不足的时候,如何去进行休息,线程如何操作?
条件变量 这是第二个。
在执行条件变量的时候是什么意思呢?这个状态进入挂起,进入一个条件等待,等待一个什么条件呢?等待这个任务队列不为空的一个条件满足,然后中间的线程取消休息,他这时候条件满足,条件等待等待任务队列不为空
这个线程池已经慢慢的整个原理性的问题,讲的差不多,可以看到线程池现在能理解透这两个问题。
那再来一个问题,线程池的工作原理?
把刚刚这个模式
一个任务队列加上一系列的线程,从这个任务队列取任务并且执行,
放在这里
就是我们每读出一个数据之后,然后把这个数据我们可以连同解析一起,把它当做一个任务,再由线程池来进行执行,就这么一个动作
那我们再来回过头来捋一下,回过头来再来思考一下。
网络这一层,整个一个服务器从他接收数据,网络数据的处理分为三个部分
就是从网卡里面接收数据到处理数据,这个过程总共有三个阶段。
第一是检测io里面是否有数据,是否可读,是否可写,io事件是否就绪,这是第一个。
检测完了之后,
第二对它进行读写操作,
第三步读写完之后对数据进行解析与操作。
那这三个步骤里第一个步骤,网络数据最前面这层通过epoll
第二个是我们通过recv/send
第三步parser我们进行解析,那这个解析的流程就会比较多,
这三个过程,这是对每一帧网络数据都要经过这三个流程
那这个过程中间就引起了一个现象,写一个比如说多线程,然后多线程网络框架那这个东西怎么去理解,哪个地方用的多线程,
这个流程可以通过这么一个数据的流向,
这个线程池,在这个里面,我们怎么跟数据流程去结合?
举个例子,先假设我们已经有了线程池,现在不用去管线程池的实现,
是一个线程,我们要引入线程池,引入线程池的话,各位那我们可以怎么做?有这么两种做法,两种做法都是可以的。
第一种我们直接把每一个任务把它抛到线程池里面,这是第一种做法。现在也就是说我们读、写、以及解析,都在这个线程
这是第一种做法。
第二种做法我们也可以是这样,
我们把recv放过来,我们再接着剩下的buffer,放线程池
讨论一下,
第一种第二种第三种都是可以实现的。
第一种做法就是一个单线程,第二种做法我们是把读写io以及解析数据这两个任务抛到线程池
第三种我们只把解析完的数据抛到线程池里面
但是每一种都有每一种优点和缺点,各位都有优点的缺点,
但是我们现在来分析一下这三种做法它的优点和缺点分别是什么?
第一种大家可以看到它是一个单线程,
如果对于任务比较轻的纯内存操作,这个是ok的,这里面不涉及到比如说写日志,它只是业务比较复杂而已,因为它在读写的时候不耗时的,可以考虑第三种
那第二种这种做法把fd抛给另外一个线程,他解决了什么问题?
如果这种情况是针对于哪一种?
针对于io的操作时间比较长,比如说这个fd是一个阻塞的,就可以采用第二种方式,
这三种做法里面速度最快的,不影响主循环第二种是最快的,但是
第二种有一个很大的问题,有一个大的问题,多个线程共用一个fd现象,
这个现象是怎么来的?
我不是一个任务,我把它抛给线程池吗?那怎么出现多个线程池共用一个fd的现象?
epoll检测完之后把这个 fd直接抛给了线程池,现在由线程池再进行处理,现在两个任务分别发出去,但是时间间隔不是很长,那也就是说这个 fd检测完之后,这个fd很有可能分给另外一线程
它的缺点就会出现一个现象,就是有可能因为这两个线程你都把它抛给了线程池来处理,所有的数据操作我们是不管的,那大家就会出现一个现象。
就是可能a线程对他进行读,对他进行写的时候,线程b很有可能把数据给closer关闭了。
就是线程a在准备数据的时候或者数据就绪的时候,
那结果线程b进行close掉了
因为这个fd大家可以看到我们这个代码上面我们是不进行任何的处理,而对应的只是把所有的fd都给有线程处理
这几个现象里面,既然这一个它的效率不影响主循环的情况下的效率是最高的,但是这个 fd共享就是比较难的。
我们该如何去解决这个问题,或者说有没有解的角度,fd不应该是在多线程共享
多线程网络框架,它的其实对于底层来说,就是把这三个组件,你哪些东西把它丢给线程池里面,
把检测io放到多个线的里面,ok的。
把读写lo放到多个线程里面也是ok的,
把协议解析以及操作放到多线程里面也是ok的
始终保持这三点
线程池的核心有俩部分组成一个任务队列,第二个是我们讲到的执行队列,第三个部分我们需要管理组件,有秩序的工作。
第一个,加锁,互斥锁
第二个,条件变量
就是通过这俩方面使它有秩序的管理
我们来分析这个任务队列由那些东西组成?
有个前提,任务队列它是任务的一个集合,首先我们先把任务描述清楚,然后再来描述这个集合,两层关系
那任务里面包含那些东西呢?
我们怎么去把它封装成一个一个任务?
第一个首先每一个人任务都不一样,存款或者有些取款,或者有些可能去打印流水等等
这里需要一个回调函数callback();这个回调函数是由任务自己本身去实现的,把它抽象出来,
第二个还有一个参数,如果你想要去办存款,你得带上你的现金,如果你去办取款,你得带上你的银行卡,所以每一个任务执行,每一个回调函数都需要带不同的参数
这个任务我们封装完之后,我们怎么把它变成一个队列,为它加上前后指针,我们就能够把它串起来。
一个回调函数,以及一个回调函数对应的参数在加上前驱后继,我们只要记录一个头指针?
如果当我们任务很少时,我们一次性开了很多线程我们怎么去退出?
中间就用一些进行休息,这里有一个状态terminate,
关于线程的退出,pthread_exit(thid);退出线程id这个是OK的,
返回的时候,如果我们有线程加入pthread_join(thid)等待它返回的话,父线程针对子线程返回的时候它会等待这个结果,pthread_detach()分离后俩个就失去了亲缘关系,pthread_join(thid)就等待不出结果,
terminate这里线程退出我们是比较友好一点,pthread_cancel()退出比较粗暴,exit()函数我们压根就不知道这是什么状态,调用就直接关闭了,我们在执行阶段就能看到他的委婉,terminate可以走完整个流程完整的退出,会更加的优雅
这个管理组件里面大概包含这4个项目
关于队列我们添加和删除一个节点需要使用宏定义,为什么选用宏定义去添加删除节点呢?
可以注意到NJOB和NWORKER是俩个不同的类型,不同的类型我们对她进行同样的操作,在C++里面使用模板来做是OK的但是在C代码里面没有模板这个概念,也就是模板的底层应该是宏定义,
这个do /while(0)是什么意思?
宏定义的这个过程它跟内联是一样的,就是把代码copy过去,但是一个是在预编译的时候,一个是在编译的时候
由于do/while(0)这个语法就决定了它需要携带一个分号,这个地方采用头插法
删除也是类似的
所以后面不管是对NJOB的操作或者是NWORKER的操作都可以借助这俩个宏,
那线程池它需要具备用哪些接口?需要哪些API线程池才能够去使用?
首先第一个肯定有的一个是初始化,给出两个参数,第一个参数是线程池的对象,第二个是说我们创建多少个线程池
初始化的时候我们只需要创建一个东西,就是这个线程池的数量有多少?
线程创建我们只依赖于线程数量。
第二个函数,我们要做的就是往线程池里面抛一个任务,就是把一个task抛到对应的任务队列里面
第三个接口,既然有创建当然就有销毁,
三个接口是对外的使用,理解这三个接口就可以了。
一个线程池它由那些东西组成?
第一个一个任务队列,第二个一个执行队列,第三个一个管理这俩个队列正常运行的一个组件
那这个在创建的时候呢,我们需要把执行队列初始化完,与之对应的互斥量条件变量创建完,
那在创建这个任务队列的时候,创建这个执行队列的时候,每一个任务每一个执行每一个worker是需要跟这个线程一一对应上的,然后把这个work直接把它抛到线程里面,每一个线程拿这个worker去工作,初始化可以说是4个部分,
1.参数是否合法
2.mutex
3.cond
4.worker
创建完之后线程池就可以开始工作了。
线程的入口函数和任务的回调函数之间是什么关系?
线程回调函数是不是就等同于任务回调函数?
这里面这个绿色的就相当于线程的入口函数,它是不断的去任务队列里面取任务执行,不断就是循环,取完任务之后,执行的时候就是执行回调函数
这个移除就相当于是把这个头节点给移除掉。
我们开的线程比较多的时候在那个地方进行放缩?怎么释放一些线程
接下来丢一个任务进去他能做那些事情?
现在多个线程等待这个条件,通知的时候会有惊群现象吗?
nginx线程池源码
ngx_thread_pool_cycle
防止线程被其它的东西屏蔽这些终止,就是防止线程pthread_sigmask被其它东西影响做了一个屏蔽
这里的cycle就是线程的入口函数,然后对应这里面的任务就是一个永真的循环,然后判断任务队列是不是为空,
条件等待往下面走是封装的接口,如果不为空的话从任务队列里面取出一个任务出来,出来之后解锁,之后就是handler,就是每一个任务的回调函数,逻辑都是一样的
看源码就可以这样对比,先把基础主体把握住,
第二个就是往线程池里面post抛一个任务进去
方法逻辑都是把任务加进去然后再去通知,
这个东西是什么意思?last它有俩个指针
这是往队列里面添加一个节点然后waiting++
它主要是统计任务的数量方便我们去放缩任务,但任务量很多我们可以增加线程,很少我们可以减少线程,
那这个二级指针到底做了什么?
这个二级指针就是这个
就是拿着它的第一个字段指向的下一个内存,为什么这么做好处在哪?
线程池本身没有性能优化,要前后对比,不加入线程池与加入线程池的区别,大概有6倍
多个线程共用一个fd怎么解,参考协程
做线程放缩怎么做?添加一个变量waiting,每添加任务++,判断任务数量和线程数量比例,线程数量退出,任务数量和线程数量统计,比如线程放缩策略4:6
#include #include #include #include #define LL_ADD(item, list) do { \item->prev = NULL;\item->next = list;\list = item;\} while(0)#define LL_REMOVE(item, list) do {\if (item->prev != NULL) item->prev->next = item->next;\if (item->next != NULL) item->next->prev = item->prev;\if (list == item) list = item->next;\item->prev = item->next = NULL;\} while(0)typedef struct NWORKER {pthread_t thread;int terminate;struct NWORKQUEUE *workqueue;struct NWORKER *prev;struct NWORKER *next;} nWorker;typedef struct NJOB {void (*job_function)(struct NJOB *job);void *user_data;struct NJOB *prev;struct NJOB *next;} nJob;typedef struct NWORKQUEUE {struct NWORKER *workers;struct NJOB *waiting_jobs;pthread_mutex_t jobs_mtx;pthread_cond_t jobs_cond;} nWorkQueue;typedef nWorkQueue nThreadPool;static void *ntyWorkerThread(void *ptr) {nWorker *worker = (nWorker*)ptr;while (1) {pthread_mutex_lock(&worker->workqueue->jobs_mtx);while (worker->workqueue->waiting_jobs == NULL) {if (worker->terminate) break;pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mtx);}if (worker->terminate) {pthread_mutex_unlock(&worker->workqueue->jobs_mtx);break;}nJob *job = worker->workqueue->waiting_jobs;if (job != NULL) {LL_REMOVE(job, worker->workqueue->waiting_jobs);}pthread_mutex_unlock(&worker->workqueue->jobs_mtx);if (job == NULL) continue;job->job_function(job);}free(worker);pthread_exit(NULL);}int ntyThreadPoolCreate(nThreadPool *workqueue, int numWorkers) {if (numWorkers < 1) numWorkers = 1;memset(workqueue, 0, sizeof(nThreadPool));pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;memcpy(&workqueue->jobs_mtx, &blank_mutex, sizeof(workqueue->jobs_mtx));int i = 0;for (i = 0;i < numWorkers;i ++) {nWorker *worker = (nWorker*)malloc(sizeof(nWorker));if (worker == NULL) {perror("malloc");return 1;}memset(worker, 0, sizeof(nWorker));worker->workqueue = workqueue;int ret = pthread_create(&worker->thread, NULL, ntyWorkerThread, (void *)worker);if (ret) {perror("pthread_create");free(worker);return 1;}LL_ADD(worker, worker->workqueue->workers);}return 0;}void ntyThreadPoolShutdown(nThreadPool *workqueue) {nWorker *worker = NULL;for (worker = workqueue->workers;worker != NULL;worker = worker->next) {worker->terminate = 1;}pthread_mutex_lock(&workqueue->jobs_mtx);workqueue->workers = NULL;workqueue->waiting_jobs = NULL;pthread_cond_broadcast(&workqueue->jobs_cond);pthread_mutex_unlock(&workqueue->jobs_mtx);}void ntyThreadPoolQueue(nThreadPool *workqueue, nJob *job) {pthread_mutex_lock(&workqueue->jobs_mtx);LL_ADD(job, workqueue->waiting_jobs);pthread_cond_signal(&workqueue->jobs_cond);pthread_mutex_unlock(&workqueue->jobs_mtx);}/ debug thread pool ///sdk --> software develop kit// 提供SDK给其他开发者使用#if 1#define KING_MAX_THREAD80#define KING_COUNTER_SIZE1000void king_counter(nJob *job) {int index = *(int*)job->user_data;printf("index : %d, selfid : %lu\n", index, pthread_self());free(job->user_data);free(job);}int main(int argc, char *argv[]) {nThreadPool pool;ntyThreadPoolCreate(&pool, KING_MAX_THREAD);int i = 0;for (i = 0;i < KING_COUNTER_SIZE;i ++) {nJob *job = (nJob*)malloc(sizeof(nJob));if (job == NULL) {perror("malloc");exit(1);}job->job_function = king_counter;job->user_data = malloc(sizeof(int));*(int*)job->user_data = i;ntyThreadPoolQueue(&pool, job);}getchar();printf("\n");}#endif
/* * Copyright (C) Nginx, Inc. * Copyright (C) Valentin V. Bartenev */#ifndef _NGX_THREAD_POOL_H_INCLUDED_#define _NGX_THREAD_POOL_H_INCLUDED_#include #include #include struct ngx_thread_task_s { ngx_thread_task_t *next; ngx_uint_t id; void *ctx; void (*handler)(void *data, ngx_log_t *log); ngx_event_t event;};typedef struct ngx_thread_pool_s ngx_thread_pool_t;ngx_thread_pool_t *ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name);ngx_thread_pool_t *ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name);ngx_thread_task_t *ngx_thread_task_alloc(ngx_pool_t *pool, size_t size);ngx_int_t ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task);#endif /* _NGX_THREAD_POOL_H_INCLUDED_ */
/* * Copyright (C) Nginx, Inc. * Copyright (C) Valentin V. Bartenev * Copyright (C) Ruslan Ermilov */#include #include #include typedef struct { ngx_array_t pools;} ngx_thread_pool_conf_t;typedef struct { ngx_thread_task_t *first; ngx_thread_task_t**last;} ngx_thread_pool_queue_t;#define ngx_thread_pool_queue_init(q) \ (q)->first = NULL;\ (q)->last = &(q)->firststruct ngx_thread_pool_s { ngx_thread_mutex_t mtx; ngx_thread_pool_queue_t queue; ngx_int_t waiting; ngx_thread_cond_t cond; ngx_log_t *log; ngx_str_t name; ngx_uint_t threads; ngx_int_t max_queue; u_char *file; ngx_uint_t line;};static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool);static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp);static void ngx_thread_pool_exit_handler(void *data, ngx_log_t *log);static void *ngx_thread_pool_cycle(void *data);static void ngx_thread_pool_handler(ngx_event_t *ev);static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle);static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf);static ngx_int_t ngx_thread_pool_init_worker(ngx_cycle_t *cycle);static void ngx_thread_pool_exit_worker(ngx_cycle_t *cycle);static ngx_command_t ngx_thread_pool_commands[] = { { ngx_string("thread_pool"), NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE23, ngx_thread_pool, 0, 0, NULL }, ngx_null_command};static ngx_core_module_t ngx_thread_pool_module_ctx = { ngx_string("thread_pool"), ngx_thread_pool_create_conf, ngx_thread_pool_init_conf};ngx_module_t ngx_thread_pool_module = { NGX_MODULE_V1, &ngx_thread_pool_module_ctx, /* module context */ ngx_thread_pool_commands,/* module directives */ NGX_CORE_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ ngx_thread_pool_init_worker, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ ngx_thread_pool_exit_worker, /* exit process */ NULL, /* exit master */ NGX_MODULE_V1_PADDING};static ngx_str_t ngx_thread_pool_default = ngx_string("default");static ngx_uint_t ngx_thread_pool_task_id;static ngx_atomic_t ngx_thread_pool_done_lock;static ngx_thread_pool_queue_t ngx_thread_pool_done;static ngx_int_tngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool){ int err; pthread_ttid; ngx_uint_t n; pthread_attr_t attr; if (ngx_notify == NULL) { ngx_log_error(NGX_LOG_ALERT, log, 0, "the configured event method cannot be used with thread pools"); return NGX_ERROR; } ngx_thread_pool_queue_init(&tp->queue); if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) { return NGX_ERROR; } if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) { (void) ngx_thread_mutex_destroy(&tp->mtx, log); return NGX_ERROR; } tp->log = log; err = pthread_attr_init(&attr); if (err) { ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_attr_init() failed"); return NGX_ERROR; } err = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); if (err) { ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_attr_setdetachstate() failed"); return NGX_ERROR; }#if 0 err = pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); if (err) { ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_attr_setstacksize() failed"); return NGX_ERROR; }#endif for (n = 0; n < tp->threads; n++) { err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp); if (err) { ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_create() failed"); return NGX_ERROR; } } (void) pthread_attr_destroy(&attr); return NGX_OK;}static voidngx_thread_pool_destroy(ngx_thread_pool_t *tp){ ngx_uint_t n; ngx_thread_task_t task; volatile ngx_uint_t lock; ngx_memzero(&task, sizeof(ngx_thread_task_t)); task.handler = ngx_thread_pool_exit_handler; task.ctx = (void *) &lock; for (n = 0; n < tp->threads; n++) { lock = 1; if (ngx_thread_task_post(tp, &task) != NGX_OK) { return; } while (lock) { ngx_sched_yield(); } task.event.active = 0; } (void) ngx_thread_cond_destroy(&tp->cond, tp->log); (void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);}static voidngx_thread_pool_exit_handler(void *data, ngx_log_t *log){ ngx_uint_t *lock = data; *lock = 0; pthread_exit(0);}ngx_thread_task_t *ngx_thread_task_alloc(ngx_pool_t *pool, size_t size){ ngx_thread_task_t *task; task = ngx_pcalloc(pool, sizeof(ngx_thread_task_t) + size); if (task == NULL) { return NULL; } task->ctx = task + 1; return task;}ngx_int_tngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task){ if (task->event.active) { ngx_log_error(NGX_LOG_ALERT, tp->log, 0, "task #%ui already active", task->id); return NGX_ERROR; } if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { return NGX_ERROR; } if (tp->waiting >= tp->max_queue) { (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); ngx_log_error(NGX_LOG_ERR, tp->log, 0, "thread pool \"%V\" queue overflow: %i tasks waiting", &tp->name, tp->waiting); return NGX_ERROR; } task->event.active = 1; task->id = ngx_thread_pool_task_id++; task->next = NULL; if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) { (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); return NGX_ERROR; } *tp->queue.last = task; tp->queue.last = &task->next; tp->waiting++; (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, "task #%ui added to thread pool \"%V\"", task->id, &tp->name); return NGX_OK;}static void *ngx_thread_pool_cycle(void *data){ ngx_thread_pool_t *tp = data; int err; sigset_t set; ngx_thread_task_t *task;#if 0 ngx_time_update();#endif ngx_log_debug1(NGX_LOG_DEBUG_CORE, tp->log, 0, "thread in pool \"%V\" started", &tp->name); sigfillset(&set); sigdelset(&set, SIGILL); sigdelset(&set, SIGFPE); sigdelset(&set, SIGSEGV); sigdelset(&set, SIGBUS); err = pthread_sigmask(SIG_BLOCK, &set, NULL); if (err) { ngx_log_error(NGX_LOG_ALERT, tp->log, err, "pthread_sigmask() failed"); return NULL; } for ( ;; ) { if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) { return NULL; } /* the number may become negative */ tp->waiting--; while (tp->queue.first == NULL) { if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log) != NGX_OK) { (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log); return NULL; } } task = tp->queue.first; tp->queue.first = task->next; if (tp->queue.first == NULL) { tp->queue.last = &tp->queue.first; } if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) { return NULL; }#if 0 ngx_time_update();#endif ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, "run task #%ui in thread pool \"%V\"", task->id, &tp->name); task->handler(task->ctx, tp->log); ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0, "complete task #%ui in thread pool \"%V\"", task->id, &tp->name); task->next = NULL; ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); *ngx_thread_pool_done.last = task; ngx_thread_pool_done.last = &task->next; ngx_memory_barrier(); ngx_unlock(&ngx_thread_pool_done_lock); (void) ngx_notify(ngx_thread_pool_handler); }}static voidngx_thread_pool_handler(ngx_event_t *ev){ ngx_event_t *event; ngx_thread_task_t *task; ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "thread pool handler"); ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); task = ngx_thread_pool_done.first; ngx_thread_pool_done.first = NULL; ngx_thread_pool_done.last = &ngx_thread_pool_done.first; ngx_memory_barrier(); ngx_unlock(&ngx_thread_pool_done_lock); while (task) { ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "run completion handler for task #%ui", task->id); event = &task->event; task = task->next; event->complete = 1; event->active = 0; event->handler(event); }}static void *ngx_thread_pool_create_conf(ngx_cycle_t *cycle){ ngx_thread_pool_conf_t *tcf; tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t)); if (tcf == NULL) { return NULL; } if (ngx_array_init(&tcf->pools, cycle->pool, 4, sizeof(ngx_thread_pool_t *)) != NGX_OK) { return NULL; } return tcf;}static char *ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf){ ngx_thread_pool_conf_t *tcf = conf; ngx_uint_t i; ngx_thread_pool_t **tpp; tpp = tcf->pools.elts; for (i = 0; i < tcf->pools.nelts; i++) { if (tpp[i]->threads) { continue; } if (tpp[i]->name.len == ngx_thread_pool_default.len && ngx_strncmp(tpp[i]->name.data, ngx_thread_pool_default.data, ngx_thread_pool_default.len) == 0) { tpp[i]->threads = 32; tpp[i]->max_queue = 65536; continue; } ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, "unknown thread pool \"%V\" in %s:%ui", &tpp[i]->name, tpp[i]->file, tpp[i]->line); return NGX_CONF_ERROR; } return NGX_CONF_OK;}static char *ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf){ ngx_str_t *value; ngx_uint_t i; ngx_thread_pool_t *tp; value = cf->args->elts; tp = ngx_thread_pool_add(cf, &value[1]); if (tp == NULL) { return NGX_CONF_ERROR; } if (tp->threads) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "duplicate thread pool \"%V\"", &tp->name); return NGX_CONF_ERROR; } tp->max_queue = 65536; for (i = 2; i < cf->args->nelts; i++) { if (ngx_strncmp(value[i].data, "threads=", 8) == 0) { tp->threads = ngx_atoi(value[i].data + 8, value[i].len - 8); if (tp->threads == (ngx_uint_t) NGX_ERROR || tp->threads == 0) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,"invalid threads value \"%V\"", &value[i]); return NGX_CONF_ERROR; } continue; } if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) { tp->max_queue = ngx_atoi(value[i].data + 10, value[i].len - 10); if (tp->max_queue == NGX_ERROR) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,"invalid max_queue value \"%V\"", &value[i]); return NGX_CONF_ERROR; } continue; } } if (tp->threads == 0) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "\"%V\" must have \"threads\" parameter", &cmd->name); return NGX_CONF_ERROR; } return NGX_CONF_OK;}ngx_thread_pool_t *ngx_thread_pool_add(ngx_conf_t *cf, ngx_str_t *name){ ngx_thread_pool_t*tp, **tpp; ngx_thread_pool_conf_t *tcf; if (name == NULL) { name = &ngx_thread_pool_default; } tp = ngx_thread_pool_get(cf->cycle, name); if (tp) { return tp; } tp = ngx_pcalloc(cf->pool, sizeof(ngx_thread_pool_t)); if (tp == NULL) { return NULL; } tp->name = *name; tp->file = cf->conf_file->file.name.data; tp->line = cf->conf_file->line; tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_thread_pool_module); tpp = ngx_array_push(&tcf->pools); if (tpp == NULL) { return NULL; } *tpp = tp; return tp;}ngx_thread_pool_t *ngx_thread_pool_get(ngx_cycle_t *cycle, ngx_str_t *name){ ngx_uint_t i; ngx_thread_pool_t**tpp; ngx_thread_pool_conf_t *tcf; tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_thread_pool_module); tpp = tcf->pools.elts; for (i = 0; i < tcf->pools.nelts; i++) { if (tpp[i]->name.len == name->len && ngx_strncmp(tpp[i]->name.data, name->data, name->len) == 0) { return tpp[i]; } } return NULL;}static ngx_int_tngx_thread_pool_init_worker(ngx_cycle_t *cycle){ ngx_uint_t i; ngx_thread_pool_t**tpp; ngx_thread_pool_conf_t *tcf; if (ngx_process != NGX_PROCESS_WORKER && ngx_process != NGX_PROCESS_SINGLE) { return NGX_OK; } tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_thread_pool_module); if (tcf == NULL) { return NGX_OK; } ngx_thread_pool_queue_init(&ngx_thread_pool_done); tpp = tcf->pools.elts; for (i = 0; i < tcf->pools.nelts; i++) { if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) { return NGX_ERROR; } } return NGX_OK;}static voidngx_thread_pool_exit_worker(ngx_cycle_t *cycle){ ngx_uint_t i; ngx_thread_pool_t**tpp; ngx_thread_pool_conf_t *tcf; if (ngx_process != NGX_PROCESS_WORKER && ngx_process != NGX_PROCESS_SINGLE) { return; } tcf = (ngx_thread_pool_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_thread_pool_module); if (tcf == NULL) { return; } tpp = tcf->pools.elts; for (i = 0; i < tcf->pools.nelts; i++) { ngx_thread_pool_destroy(tpp[i]); }}