周末看了nginx線程池部分的代碼,順手照抄了一遍,寫成了自己的版本。實現上某些地方還是有差異的,不過基本結構全部摘抄。
在這里分享一下。如果你看懂了我的版本,也就證明你看懂了nginx的線程池。
本文只列出了關鍵數據結構和API,重在理解nginx線程池設計思路。完整代碼在最后的鏈接里。
1.任務節點
1
2
3
4
5
6
7
8
9
|
typedef void (*CB_FUN)( void *); //任務結構體 typedef struct task { void *argv; //任務函數的參數(任務執行結束前,要保證參數地址有效) CB_FUN handler; //任務函數(返回值必須為0 非0值用作增加線程,和銷毀線程池) struct task *next; //任務鏈指針 }zoey_task_t; |
handler為函數指針,是實際的任務函數,argv為該函數的參數,next指向下一個任務。
2.任務隊列
1
2
3
4
5
6
7
|
typedef struct task_queue { zoey_task_t *head; //隊列頭 zoey_task_t **tail; //隊列尾 unsigned int maxtasknum; //最大任務限制 unsigned int curtasknum; //當前任務數 }zoey_task_queue_t; |
head為任務隊列頭指針,tail為任務隊列尾指針,maxtasknum為隊列最大任務數限制,curtasknum為隊列當前任務數。
3.線程池
1
2
3
4
5
6
7
8
9
10
|
typedef struct threadpool { pthread_mutex_t mutex; //互斥鎖 pthread_cond_t cond; //條件鎖 zoey_task_queue_t tasks; //任務隊列 unsigned int threadnum; //線程數 unsigned int thread_stack_size; //線程堆棧大小 }zoey_threadpool_t; |
mutex為互斥鎖 cond為條件鎖。mutex和cond共同保證線程池任務的互斥領取或者添加。
tasks指向任務隊列。
threadnum為線程池的線程數
thread_stack_size為線程堆棧大小
4.啟動配置
1
2
3
4
5
6
7
|
//配置參數 typedef struct threadpool_conf { unsigned int threadnum; //線程數 unsigned int thread_stack_size; //線程堆棧大小 unsigned int maxtasknum; //最大任務限制 }zoey_threadpool_conf_t; |
啟動配置結構體是初始化線程池時的一些參數。
5.初始化線程池
首先檢查參數是否合法,然后初始化mutex,cond,key(pthread_key_t)。key用來讀寫線程全局變量,此全局變量控制線程是否退出。
最后創建線程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
zoey_threadpool_t* zoey_threadpool_init(zoey_threadpool_conf_t *conf) { zoey_threadpool_t *pool = NULL; int error_flag_mutex = 0; int error_flag_cond = 0; pthread_attr_t attr; do { if (z_conf_check(conf) == -1){ //檢查參數是否合法 break ; } pool = (zoey_threadpool_t *) malloc ( sizeof (zoey_threadpool_t)); //申請線程池句柄 if (pool == NULL){ break ; } //初始化線程池基本參數 pool->threadnum = conf->threadnum; pool->thread_stack_size = conf->thread_stack_size; pool->tasks.maxtasknum = conf->maxtasknum; pool->tasks.curtasknum = 0; z_task_queue_init(&pool->tasks); if (z_thread_key_create() != 0){ //創建一個pthread_key_t,用以訪問線程全局變量。 free (pool); break ; } if (z_thread_mutex_create(&pool->mutex) != 0){ //初始化互斥鎖 z_thread_key_destroy(); free (pool); break ; } if (z_thread_cond_create(&pool->cond) != 0){ //初始化條件鎖 z_thread_key_destroy(); z_thread_mutex_destroy(&pool->mutex); free (pool); break ; } if (z_threadpool_create(pool) != 0){ //創建線程池 z_thread_key_destroy(); z_thread_mutex_destroy(&pool->mutex); z_thread_cond_destroy(&pool->cond); free (pool); break ; } return pool; } while (0); return NULL; } |
6.添加任務
首先申請一個任務節點,實例化后將節點加入任務隊列,并將當前任務隊列數++并通知其他進程有新任務。整個過程加鎖。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
int zoey_threadpool_add_task(zoey_threadpool_t *pool, CB_FUN handler, void * argv) { zoey_task_t *task = NULL; //申請一個任務節點并賦值 task = (zoey_task_t *) malloc ( sizeof (zoey_task_t)); if (task == NULL){ return -1; } task->handler = handler; task->argv = argv; task->next = NULL; if (pthread_mutex_lock(&pool->mutex) != 0){ //加鎖 free (task); return -1; } do { if (pool->tasks.curtasknum >= pool->tasks.maxtasknum){ //判斷工作隊列中的任務數是否達到限制 break ; } //將任務節點尾插到任務隊列 *(pool->tasks.tail) = task; pool->tasks.tail = &task->next; pool->tasks.curtasknum++; //通知阻塞的線程 if (pthread_cond_signal(&pool->cond) != 0){ break ; } //解鎖 pthread_mutex_unlock(&pool->mutex); return 0; } while (0); pthread_mutex_unlock(&pool->mutex); free (task); return -1; } |
7.銷毀線程池
銷毀線程池其實也是向任務隊列添加任務,只不過添加的任務是讓線程退出。z_threadpool_exit_cb函數會將lock置0后退出線程,lock為0表示此線程
已經退出,接著退出下一個線程。退出完線程釋放所有資源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
void zoey_threadpool_destroy(zoey_threadpool_t *pool) { unsigned int n = 0; volatile unsigned int lock; //z_threadpool_exit_cb函數會使對應線程退出 for (; n < pool->threadnum; n++){ lock = 1; if (zoey_threadpool_add_task(pool, z_threadpool_exit_cb, &lock) != 0){ return ; } while (lock){ usleep(1); } } z_thread_mutex_destroy(&pool->mutex); z_thread_cond_destroy(&pool->cond); z_thread_key_destroy(); free (pool); } |
8.增加一個線程
很簡單,再生成一個線程以及線程數++即可。加鎖。
1
2
3
4
5
6
7
8
9
10
|
int zoey_thread_add(zoey_threadpool_t *pool) { int ret = 0; if (pthread_mutex_lock(&pool->mutex) != 0){ return -1; } ret = z_thread_add(pool); pthread_mutex_unlock(&pool->mutex); return ret; } |
9.改變任務隊列最大任務限制
當num=0時設置線程數為無限大。
1
2
3
4
5
6
7
8
|
void zoey_set_max_tasknum(zoey_threadpool_t *pool,unsigned int num) { if (pthread_mutex_lock(&pool->mutex) != 0){ return -1; } z_change_maxtask_num(pool, num); //改變最大任務限制 pthread_mutex_unlock(&pool->mutex); } |
10.使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
int main() { int array[10000] = {0}; int i = 0; zoey_threadpool_conf_t conf = {5,0,5}; //實例化啟動參數 zoey_threadpool_t *pool = zoey_threadpool_init(&conf); //初始化線程池 if (pool == NULL){ return 0; } for (; i < 10000; i++){ array[i] = i; if (i == 80){ zoey_thread_add(pool); //增加線程 zoey_thread_add(pool); } if (i == 100){ zoey_set_max_tasknum(pool, 0); //改變最大任務數 0為不做上限 } while (1){ if (zoey_threadpool_add_task(pool, testfun, &array[i]) == 0){ break ; } printf ( "error in i = %d\n" ,i); } } zoey_threadpool_destroy(pool); while (1){ sleep(5); } return 0; } |
11.源碼
https://github.com/unlikewashface/zoey_threadpool.git
線程池可以發揮更多作用,比如可以把連接放到線程池里。nginx的異步加lua的協程是個非常好的組合,現在有了線程池后,線程池加協程將是另一個選擇。總而言之,如果在保證性能的情況下,讓nginx開發變得非常簡單,這是非常利好的消息。