在linux 沒有實現epoll事件驅動機制之前,我們一般選擇用select或者poll等io多路復用的方法來實現并發服務程序。在linux新的內核中,有了一種替換它的機制,就是epoll。
select()和poll() io多路復用模型
select的缺點:
1.單個進程能夠監視的文件描述符的數量存在最大限制,通常是1024,當然可以更改數量,但由于select采用輪詢的方式掃描文件描述符,文件描述符數量越多,性能越差;(在linux內核頭文件中,有這樣的定義:#define __fd_setsize 1024)
2.內核 / 用戶空間內存拷貝問題,select需要復制大量的句柄數據結構,產生巨大的開銷;
3.select返回的是含有整個句柄的數組,應用程序需要遍歷整個數組才能發現哪些句柄發生了事件;
4.select的觸發方式是水平觸發,應用程序如果沒有完成對一個已經就緒的文件描述符進行io操作,那么之后每次select調用還是會將這些文件描述符通知進程。
相比select模型,poll使用鏈表保存文件描述符,因此沒有了監視文件數量的限制,但其他三個缺點依然存在。
假設我們的服務器需要支持100萬的并發連接,則在__fd_setsize 為1024的情況下,則我們至少需要開辟1k個進程才能實現100萬的并發連接。除了進程間上下文切換的時間消耗外,從內核/用戶空間大量的無腦內存拷貝、數組輪詢等,是系統難以承受的。因此,基于select模型的服務器程序,要達到10萬級別的并發訪問,是一個很難完成的任務。
epoll io多路復用模型實現機制
由于epoll的實現機制與select/poll機制完全不同,上面所說的 select的缺點在epoll上不復存在。
設想一下如下場景:有100萬個客戶端同時與一個服務器進程保持著tcp連接。而每一時刻,通常只有幾百上千個tcp連接是活躍的(事實上大部分場景都是這種情況)。如何實現這樣的高并發?
在select/poll時代,服務器進程每次都把這100萬個連接告訴操作系統(從用戶態復制句柄數據結構到內核態),讓操作系統內核去查詢這些套接字上是否有事件發生,輪詢完后,再將句柄數據復制到用戶態,讓服務器應用程序輪詢處理已發生的網絡事件,這一過程資源消耗較大,因此,select/poll一般只能處理幾千的并發連接。
epoll的設計和實現與select完全不同。epoll通過在linux內核中申請一個簡易的文件系統(文件系統一般用什么數據結構實現?b+樹)。把原先的select/poll調用分成了3個部分:
1)調用epoll_create()建立一個epoll對象(在epoll文件系統中為這個句柄對象分配資源)
2)調用epoll_ctl向epoll對象中添加這100萬個連接的套接字
3)調用epoll_wait收集發生的事件的連接
如此一來,要實現上面說是的場景,只需要在進程啟動時建立一個epoll對象,然后在需要的時候向這個epoll對象中添加或者刪除連接。同時,epoll_wait的效率也非常高,因為調用epoll_wait時,并沒有一股腦的向操作系統復制這100萬個連接的句柄數據,內核也不需要去遍歷全部的連接。
epoll實現機制
當某一進程調用epoll_create方法時,linux內核會創建一個eventpoll結構體,這個結構體中有兩個成員與epoll的使用方式密切相關。eventpoll結構體如下所示:
1
2
3
4
5
6
7
8
|
struct eventpoll{ .... /*紅黑樹的根節點,這顆樹中存儲著所有添加到epoll中的需要監控的事件*/ struct rb_root rbr; /*雙鏈表中則存放著將要通過epoll_wait返回給用戶的滿足條件的事件*/ struct list_head rdlist; .... }; |
每一個epoll對象都有一個獨立的eventpoll結構體,用于存放通過epoll_ctl方法向epoll對象中添加進來的事件。這些事件都會掛載在紅黑樹中,如此,重復添加的事件就可以通過紅黑樹而高效的識別出來(紅黑樹的插入時間效率是lgn,其中n為樹的高度)。
而所有添加到epoll中的事件都會與設備(網卡)驅動程序建立回調關系,也就是說,當相應的事件發生時會調用這個回調方法。這個回調方法在內核中叫ep_poll_callback,它會將發生的事件添加到rdlist雙鏈表中。
在epoll中,對于每一個事件,都會建立一個epitem結構體,如下所示:
1
2
3
4
5
6
7
|
struct epitem{ struct rb_node rbn; //紅黑樹節點 struct list_head rdllink; //雙向鏈表節點 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所屬的eventpoll對象 struct epoll_event event; //期待發生的事件類型 } |
當調用epoll_wait檢查是否有事件發生時,只需要檢查eventpoll對象中的rdlist雙鏈表中是否有epitem元素即可。如果rdlist不為空,則把發生的事件復制到用戶態,同時將事件數量返回給用戶。
通過紅黑樹和雙鏈表數據結構,并結合回調機制,造就了epoll的高效。
epoll的接口
1.epoll_create
創建epoll句柄
函數聲明:int epoll_create(int size)
參數:size用來告訴內核這個監聽的數目一共有多大。
返回值:返回創建了的epoll句柄。
當創建好epoll句柄后,它就是會占用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll后,必須調用close()關閉,否則可能導致fd被耗盡。
2.epoll_ctl
將被監聽的描述符添加到epoll句柄或從epool句柄中刪除或者對監聽事件進行修改。
函數申明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event*event);
參數:
epfd: epoll_create()的返回值
op:表示要進行的操作,其值分別為:
epoll_ctl_add: 注冊新的fd到epfd中;
epoll_ctl_mod: 修改已經注冊的fd的監聽事件;
epoll_ctl_del: 從epfd中刪除一個fd;
fd:需要操作/監聽的文件句柄
event:是告訴內核需要監聽什么事件,struct epoll_event如下:
1
2
3
4
5
6
7
8
9
10
11
|
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; struct epoll_event { __uint32_t events; /* epoll events */ epoll_data_t data; /* user data variable */ }; |
events可以是以下幾個宏的集合:
epollin:觸發該事件,表示對應的文件描述符上有可讀數據。(包括對端socket正常關閉);
epollout:觸發該事件,表示對應的文件描述符上可以寫數據;
epollpri:表示對應的文件描述符有緊急的數據可讀(這里應該表示有帶外數據到來);
epollerr:表示對應的文件描述符發生錯誤;
epollhup: 表示對應的文件描述符被掛斷;
epollet:將epoll設為邊緣觸發(edgetriggered)模式,這是相對于水平觸發(level triggered)來說的。
epolloneshot: 只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到epoll隊列里。
示例:
1
2
3
4
5
6
7
|
struct epoll_event ev; //設置與要處理的事件相關的文件描述符 ev.data.fd=listenfd; //設置要處理的事件類型 ev.events=epollin|epollet; //注冊epoll事件 epoll_ctl(epfd,epoll_ctl_add,listenfd,&ev); |
1.epoll_wait
等侍注冊在epfd上的socket fd的事件的發生,如果發生則將發生的sokct fd和事件類型放入到events數組中。
函數原型:int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
參數:
epfd:由epoll_create 生成的epoll文件描述符
events:用于回傳代處理事件的數組
maxevents:每次能處理的最大事件數
timeout:等待i/o事件發生的超時毫秒數,-1相當于阻塞,0相當于非阻塞。一般用-1即可
epoll的工作模式
et(edgetriggered):高速工作模式,只支持no_block(非阻塞模式)。在此模式下,當描述符從未就緒變為就緒時,內核通過epoll告知。然后它會假設用戶知道文件描述符已經就緒,并且不會再為那個文件描述符發送更多的就緒通知,直到某些操作導致那個文件描述符不再為就緒狀態了。(觸發模式只在數據就緒時通知一次,若數據沒有讀完,下一次不會通知,直到有新的就緒數據)
lt(leveltriggered):缺省工作方式,支持blocksocket和no_blocksocket。在lt模式下內核會告知一個文件描述符是否就緒了,然后可以對這個就緒的fd進行io操作。如果不作任何操作,內核還是會繼續通知!若數據沒有讀完,內核也會繼續通知,直至設備數據為空為止!
示例說明:
1.我們已經把一個用來從管道中讀取數據的文件句柄(rfd)添加到epoll描述符
2. 這個時候從管道的另一端被寫入了2kb的數據
3. 調用epoll_wait(2),并且它會返回rfd,說明它已經準備好讀取操作
4. 然后我們讀取了1kb的數據
5. 調用epoll_wait(2)……
et工作模式:
如果我們在第1步將rfd添加到epoll描述符的時候使用了epollet標志,在第2步執行了一個寫操作,第三步epoll_wait會返回同時通知的事件會銷毀。因為第4步的讀取操作沒有讀空文件輸入緩沖區內的數據,因此我們在第5步調用epoll_wait(2)完成后,是否掛起是不確定的。epoll工作在et模式的時候,必須使用非阻塞套接口,以避免由于一個文件句柄的阻塞讀/阻塞寫操作把處理多個文件描述符的任務餓死。
只有當read(2)或者write(2)返回eagain時(認為讀完)才需要掛起,等待。但這并不是說每次read()時都需要循環讀,直到讀到產生一個eagain才認為此次事件處理完成,當read()返回的讀到的數據長度小于請求的數據長度時(即小于sizeof(buf)),就可以確定此時緩沖中已沒有數據了,也就可以認為此事讀事件已處理完成。
lt工作模式:
lt方式調用epoll接口的時候,它就相當于一個速度比較快的poll(2),并且無論后面的數據是否被使用,因此他們具有同樣的職能。
示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
|
/* * file epolltest.c */ #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <errno.h> #include <sys/socket.h> #include <netdb.h> #include <fcntl.h> #include <sys/epoll.h> #include <string.h> #define maxevents 64 //函數: //功能:創建和綁定一個tcp socket //參數:端口 //返回值:創建的socket static int create_and_bind ( char *port) { struct addrinfo hints; struct addrinfo *result, *rp; int s, sfd; memset (&hints, 0, sizeof ( struct addrinfo)); hints.ai_family = af_unspec; /* return ipv4 and ipv6 choices */ hints.ai_socktype = sock_stream; /* we want a tcp socket */ hints.ai_flags = ai_passive; /* all interfaces */ s = getaddrinfo (null, port, &hints, &result); if (s != 0) { fprintf (stderr, "getaddrinfo: %s\n" , gai_strerror (s)); return -1; } for (rp = result; rp != null; rp = rp->ai_next) { sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (sfd == -1) continue ; s = bind (sfd, rp->ai_addr, rp->ai_addrlen); if (s == 0) { /* we managed to bind successfully! */ break ; } close (sfd); } if (rp == null) { fprintf (stderr, "could not bind\n" ); return -1; } freeaddrinfo (result); return sfd; } //函數 //功能:設置socket為非阻塞的 static int make_socket_non_blocking ( int sfd) { int flags, s; //得到文件狀態標志 flags = fcntl (sfd, f_getfl, 0); if (flags == -1) { perror ( "fcntl" ); return -1; } //設置文件狀態標志 flags |= o_nonblock; s = fcntl (sfd, f_setfl, flags); if (s == -1) { perror ( "fcntl" ); return -1; } return 0; } //端口由參數argv[1]指定 int main ( int argc, char *argv[]) { int sfd, s; int efd; struct epoll_event event; struct epoll_event *events; if (argc != 2) { fprintf (stderr, "usage: %s [port]\n" , argv[0]); exit (exit_failure); } sfd = create_and_bind (argv[1]); if (sfd == -1) abort (); s = make_socket_non_blocking (sfd); if (s == -1) abort (); s = listen (sfd, somaxconn); if (s == -1) { perror ( "listen" ); abort (); } //除了參數size被忽略外,此函數和epoll_create完全相同 efd = epoll_create1 (0); if (efd == -1) { perror ( "epoll_create" ); abort (); } event.data.fd = sfd; event.events = epollin | epollet; //讀入,邊緣觸發方式 s = epoll_ctl (efd, epoll_ctl_add, sfd, &event); if (s == -1) { perror ( "epoll_ctl" ); abort (); } /* buffer where events are returned */ events = calloc (maxevents, sizeof event); /* the event loop */ while (1) { int n, i; n = epoll_wait (efd, events, maxevents, -1); for (i = 0; i < n; i++) { if ((events[i].events & epollerr) || (events[i].events & epollhup) || (!(events[i].events & epollin))) { /* an error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */ fprintf (stderr, "epoll error\n" ); close (events[i].data.fd); continue ; } else if (sfd == events[i].data.fd) { /* we have a notification on the listening socket, which means one or more incoming connections. */ while (1) { struct sockaddr in_addr; socklen_t in_len; int infd; char hbuf[ni_maxhost], sbuf[ni_maxserv]; in_len = sizeof in_addr; infd = accept (sfd, &in_addr, &in_len); if (infd == -1) { if (( errno == eagain) || ( errno == ewouldblock)) { /* we have processed all incoming connections. */ break ; } else { perror ( "accept" ); break ; } } //將地址轉化為主機名或者服務名 s = getnameinfo (&in_addr, in_len, hbuf, sizeof hbuf, sbuf, sizeof sbuf, ni_numerichost | ni_numericserv); //flag參數:以數字名返回 //主機地址和服務地址 if (s == 0) { printf ( "accepted connection on descriptor %d " "(host=%s, port=%s)\n" , infd, hbuf, sbuf); } /* make the incoming socket non-blocking and add it to the list of fds to monitor. */ s = make_socket_non_blocking (infd); if (s == -1) abort (); event.data.fd = infd; event.events = epollin | epollet; s = epoll_ctl (efd, epoll_ctl_add, infd, &event); if (s == -1) { perror ( "epoll_ctl" ); abort (); } } continue ; } else { /* we have data on the fd waiting to be read. read and display it. we must read whatever data is available completely, as we are running in edge-triggered mode and won't get a notification again for the same data. */ int done = 0; while (1) { ssize_t count; char buf[512]; count = read (events[i].data.fd, buf, sizeof (buf)); if (count == -1) { /* if errno == eagain, that means we have read all data. so go back to the main loop. */ if ( errno != eagain) { perror ( "read" ); done = 1; } break ; } else if (count == 0) { /* end of file. the remote has closed the connection. */ done = 1; break ; } /* write the buffer to standard output */ s = write (1, buf, count); if (s == -1) { perror ( "write" ); abort (); } } if (done) { printf ( "closed connection on descriptor %d\n" , events[i].data.fd); /* closing the descriptor will make epoll remove it from the set of descriptors which are monitored. */ close (events[i].data.fd); } } } } free (events); close (sfd); return exit_success; } |
代碼編譯后,./epolltest 8888 ,在另外一個終端中執行
telnet 192.168.1.161 8888 ,192.168.1.161
為執行測試程序的ip。在telnet終端敲入任何字符敲入enter后,會在測試終端顯示敲入的字符。
總結
以上就是本文關于linux epoll機制詳解的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
原文鏈接:http://blog.csdn.net/u010657219/article/details/44061629