線程池有兩個核心的概念,一個是任務隊列,一個是工作線程隊列。任務隊列負責存放主線程需要處理的任務,工作線程隊列其實是一個死循環,負責從任務隊列中取出和運行任務,可以看成是一個生產者和多個消費l者的模型。在一些高并發的網絡應用中,線程池也是常用的技術。陳碩大神推薦的C++多線程服務端編程模式為:one loop per thread + thread pool,通常會有單獨的線程負責接受來自客戶端的請求,對請求稍作解析后將數據處理的任務提交到專門的計算線程池。
ThreadPool 線程池同步事件: 線程池內的線程函數同樣支持互斥鎖
,信號控制
,內核事件控制
,臨界區控制
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
#include <Windows.h> #include <iostream> #include <stdlib.h> unsigned long g_count = 0; // -------------------------------------------------------------- // 線程池同步-互斥量同步 void NTAPI TaskHandlerMutex(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work) { // 鎖定資源 WaitForSingleObject(*( HANDLE *)Context, INFINITE); for ( int x = 0; x < 100; x++) { printf ( "線程ID: %d ---> 子線程: %d \n" , GetCurrentThreadId(), x); g_count = g_count + 1; } // 解鎖資源 ReleaseMutexWhenCallbackReturns(Instance, *( HANDLE *)Context); } void TestMutex() { // 創建互斥量 HANDLE hMutex = CreateMutex(NULL, FALSE, NULL); PTP_WORK pool = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerMutex, &hMutex, NULL); for ( int i = 0; i < 1000; i++) { SubmitThreadpoolWork(pool); } WaitForThreadpoolWorkCallbacks(pool, FALSE); CloseThreadpoolWork(pool); CloseHandle(hMutex); printf ( "相加后 ---> %d \n" , g_count); } // -------------------------------------------------------------- // 線程池同步-事件內核對象 void NTAPI TaskHandlerKern(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work) { // 鎖定資源 WaitForSingleObject(*( HANDLE *)Context, INFINITE); for ( int x = 0; x < 100; x++) { printf ( "線程ID: %d ---> 子線程: %d \n" , GetCurrentThreadId(), x); g_count = g_count + 1; } // 解鎖資源 SetEventWhenCallbackReturns(Instance, *( HANDLE *)Context); } void TestKern() { HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); SetEvent(hEvent); PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerKern, &hEvent, NULL); for ( int i = 0; i < 1000; i++) { SubmitThreadpoolWork(pwk); } WaitForThreadpoolWorkCallbacks(pwk, FALSE); CloseThreadpoolWork(pwk); printf ( "相加后 ---> %d \n" , g_count); } // -------------------------------------------------------------- // 線程池同步-信號量同步 void NTAPI TaskHandlerSemaphore(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work) { // 鎖定資源 WaitForSingleObject(*( HANDLE *)Context, INFINITE); for ( int x = 0; x < 100; x++) { printf ( "線程ID: %d ---> 子線程: %d \n" , GetCurrentThreadId(), x); g_count = g_count + 1; } // 解鎖資源 ReleaseSemaphoreWhenCallbackReturns(Instance, *( HANDLE *)Context, 1); } void TestSemaphore() { // 創建信號量為100 HANDLE hSemaphore = CreateSemaphore(NULL, 0, 100, NULL); ReleaseSemaphore(hSemaphore, 10, NULL); PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerSemaphore, &hSemaphore, NULL); for ( int i = 0; i < 1000; i++) { SubmitThreadpoolWork(pwk); } WaitForThreadpoolWorkCallbacks(pwk, FALSE); CloseThreadpoolWork(pwk); CloseHandle(hSemaphore); printf ( "相加后 ---> %d \n" , g_count); } // -------------------------------------------------------------- // 線程池同步-臨界區 void NTAPI TaskHandlerLeave(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work) { // 鎖定資源 EnterCriticalSection((CRITICAL_SECTION*)Context); for ( int x = 0; x < 100; x++) { printf ( "線程ID: %d ---> 子線程: %d \n" , GetCurrentThreadId(), x); g_count = g_count + 1; } // 解鎖資源 LeaveCriticalSectionWhenCallbackReturns(Instance, (CRITICAL_SECTION*)Context); } void TestLeave() { CRITICAL_SECTION cs; InitializeCriticalSection(&cs); PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerLeave, &cs, NULL); for ( int i = 0; i < 1000; i++) { SubmitThreadpoolWork(pwk); } WaitForThreadpoolWorkCallbacks(pwk, FALSE); DeleteCriticalSection(&cs); CloseThreadpoolWork(pwk); printf ( "相加后 ---> %d \n" , g_count); } int main( int argc, char *argv) { //TestMutex(); //TestKern(); //TestSemaphore(); TestLeave(); system ( "pause" ); return 0; } |
簡單的IO讀寫:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
#include <Windows.h> #include <iostream> #include <stdlib.h> // 簡單的異步文本讀寫 int ReadWriteIO() { char enContent[] = "hello lyshark" ; char deContent[255] = { 0 }; // 異步寫文件 HANDLE hFileWrite = CreateFile(L "d://test.txt" , GENERIC_WRITE, 0, NULL, OPEN_ALWAYS, FILE_FLAG_SEQUENTIAL_SCAN, NULL); if (INVALID_HANDLE_VALUE == hFileWrite) { return 0; } WriteFile(hFileWrite, enContent, strlen (enContent), NULL, NULL); FlushFileBuffers(hFileWrite); CancelSynchronousIo(hFileWrite); CloseHandle(hFileWrite); // 異步讀文件 HANDLE hFileRead = CreateFile(L "d://test.txt" , GENERIC_READ, 0, NULL, OPEN_ALWAYS, NULL, NULL); if (INVALID_HANDLE_VALUE == hFileRead) { return 0; } ReadFile(hFileRead, deContent, 255, NULL, NULL); CloseHandle(hFileRead); std::cout << "讀出內容: " << deContent << std::endl; return 1; } // 通過IO獲取文件大小 int GetFileSize() { HANDLE hFile = CreateFile(L "d://test.txt" , 0, 0, NULL, OPEN_EXISTING, NULL, NULL); if (INVALID_HANDLE_VALUE == hFile) { return 0; } ULARGE_INTEGER ulFileSize; ulFileSize.LowPart = GetFileSize(hFile, &ulFileSize.HighPart); LARGE_INTEGER lFileSize; BOOL ret = GetFileSizeEx(hFile, &lFileSize); std::cout << "文件大小A: " << ulFileSize.QuadPart << " bytes" << std::endl; std::cout << "文件大小B: " << lFileSize.QuadPart << " bytes" << std::endl; CloseHandle(hFile); return 1; } // 通過IO設置文件指針和文件尾 int SetFilePointer() { char deContent[255] = { 0 }; DWORD readCount = 0; HANDLE hFile = CreateFile(L "d://test.txt" , GENERIC_WRITE, 0, NULL, OPEN_ALWAYS, NULL, NULL); if (INVALID_HANDLE_VALUE == hFile) { return 0; } LARGE_INTEGER liMove; // 設置移動位置 liMove.QuadPart = 2; SetFilePointerEx(hFile, liMove, NULL, FILE_BEGIN); // 移動到文件末尾 SetEndOfFile(hFile); ReadFile(hFile, deContent, 255, &readCount, NULL); std::cout << "移動指針后讀取: " << deContent << " 讀入長度: " << readCount << std::endl; CloseHandle(hFile); // 設置編碼格式 _wsetlocale(LC_ALL, L "chs" ); setlocale (LC_ALL, "chs" ); wprintf(L "%s" , deContent); } int main( int argc, char *argv) { // 讀寫IO ReadWriteIO(); // 取文件長度 GetFileSize(); // 設置文件指針 SetFilePointer(); return 0; } |
到此這篇關于C/C++ 原生API實現線程池的文章就介紹到這了,更多相關C++實現線程池內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://www.cnblogs.com/LyShark/p/15493202.html