1. select模塊
針對select,要先理解其他幾個概念:
文件描述符:
文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核為每一個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者創建一個新文件時,內核向進程返回一個文件描述符。
內核空間:
Linux簡化了分段機制,使得虛擬地址與線性地址總是一致,因此,Linux的虛擬地址空間也為0~4G。Linux內核將這4G字節的空間分為兩部分。將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱為“內核空間”。而將較低的3G字節(從虛擬地址 0x00000000到0xBFFFFFFF),供各個進程使用,稱為“用戶空間)。因為每個進程可以通過系統調用進入內核,因此,Linux內核由系統內的所有進程共享。于是,從具體進程的角度來看,每個進程可以擁有4G字節的虛擬空間。
內核空間中存放的是內核代碼和數據,而進程的用戶空間中存放的是用戶程序的代碼和數據。不管是內核空間還是用戶空間,它們都處于虛擬空間中。
內核空間和用戶空間一般通過系統調用進行通信。
select就是針對許多文件描述符(簡稱fd)進行監控,它有三個參數:
- rlist -- wait until ready for reading
- wlist -- wait until ready for writing
- xlist -- wait for an "exceptional condition"
第一個參數監控 進來的 數據的fd列表,select監控這個列表,等待這些fd發送過來數據,一旦數據發送過來了(可以讀取了),就返回一個可讀的fd列表
第二個參數監控 出去的 數據的fd列表,select監控這個列表,等待這些fd發送出去數據,一旦fd準備好發送了(可以寫入了),就返回一個可寫的fd列表
第三個參數監控fd列表,返回出異常的fd列表
服務端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
import select import socket import sys import queue # 生成socket對象 server = socket.socket() # 設置非阻塞模式 server.setblocking( False ) # 綁定地址,設置監聽 server.bind(( 'localhost' , 9999 )) server.listen( 5 ) # 將自己也放進待監測列表里 inputs = [server, ] outputs = [] message_queues = {} while True : ''' 關于socket可讀可寫的判斷,可以參考博客:https://blog.csdn.net/majianfei1023/article/details/45788591 ''' rlist, wlist, elist = select.select(inputs,outputs,inputs) #如果沒有任何fd就緒,那程序就會一直阻塞在這里 for r in rlist: # 遍歷已經可以準備讀取數據的 fd if r is server: # 如果這個 fd 是server,即 server 有數據待接收讀取,說明有新的客戶端連接過來了 conn, client_addr = r.accept() print ( "new connection from" ,client_addr) conn.setblocking( False ) inputs.append(conn) # 將這個新的客戶端連接添加到檢測的列表中 message_queues[conn] = queue.Queue() # 用隊列存儲客戶端發送來的數據,等待服務器統一返回數據 else : # 這個可讀的 r 不是服務器,那就是某個客戶端。就是說客戶端發送數據過來了,這些數據處于待讀取狀態 try : # 異常處理,這是為了防止客戶端異常斷開報錯(比如手動關掉客戶端黑窗口,服務器也會跟著報錯退出) data = r.recv( 1024 ) if data: # 根據判斷data是否為空,判斷客戶端是否斷開 print ( "收到來自[%s]的數據:" % r.getpeername()[ 0 ], data) message_queues[r].put(data) # 收到的數據先放到queue里,一會返回給客戶端 if r not in outputs: outputs.append(r) # 放進可寫的fd列表中,表明這些 fd 已經準備好去發送數據了。 else : # 如果數據為空,表明客戶端斷開了 print ( '客戶端斷開了' ) if r in outputs: outputs.remove(r) # 清理已斷開的連接 inputs.remove(r) # 清理已斷開的連接 del message_queues[r] # 清理已斷開的連接 except ConnectionResetError: # 如果報錯,說明客戶端斷開了 print ( "客戶端異常斷開了" , r) if r in outputs: outputs.remove(r) # 清理已斷開的連接 inputs.remove(r) # 清理已斷開的連接 del message_queues[r] # 清理已斷開的連接 for w in wlist: # 遍歷可寫的 fd 列表,即準備好發送數據的那些fd # 判斷隊列是否為空 try : next_msg = message_queues[w].get_nowait() except queue.Empty: # print("client [%s]" % w.getpeername()[0], "queue is empty..") outputs.remove(w) # 隊列不為空,就把隊列中的數據改成大寫,原樣發回去 else : # print("sending msg to [%s]"% w.getpeername()[0], next_msg) w.send(next_msg.upper()) for e in elist: # 處理報錯的 fd e.close() print ( "Error occured in " ,e.getpeername()) inputs.remove(e) if e in outputs: outputs.remove(e) del message_queues[e] |
客戶端:
1
2
3
4
5
6
7
8
9
10
11
12
|
import socket import sys sock = socket.socket() sock.connect(( 'localhost' , 9999 )) while True : c = input ( '>>>:' ).strip() sock.send(c.encode()) data = sock.recv( 1024 ) print (data.decode()) sock.close() |
2. selectors模塊
官方文檔:https://docs.python.org/3/library/selectors.html
服務端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import selectors import socket # 根據平臺自動選擇最佳的IO多路機制,比如linux就會選擇epoll,windows會選擇select sel = selectors.DefaultSelector() def accept(sock, mask): # 建立客戶端連接 conn, addr = sock.accept() print ( 'accepted' , conn, 'from' , addr) # 設置非阻塞模式 conn.setblocking( False ) # 再次注冊一個連接,將其加入監測列表中, sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): try : # 拋出客戶端強制關閉的異常(如手動關閉客戶端黑窗口) data = conn.recv( 1000 ) # Should be ready if data: print ( 'echoing' , repr (data), 'to' , conn) conn.send(data) # Hope it won't block else : print ( 'Client closed.' , conn) # 將conn從監測列表刪除 sel.unregister(conn) conn.close() except ConnectionResetError: print ( 'Client forcibly closed.' , conn) # 將conn從監測列表刪除 sel.unregister(conn) conn.close() # 創建socket對象 sock = socket.socket() # 綁定端口,設置監聽 sock.bind(( 'localhost' , 1234 )) sock.listen( 100 ) # 設置為非阻塞模式 sock.setblocking( False ) # 注冊一個文件對象,監測它的IO事件,data是和文件對象相關的數據(此處放置了一個 accept 函數的內存地址) # register(fileobj, events, data=None) sel.register(sock, selectors.EVENT_READ, accept) while True : ''' sel.select() 看似是select方法,實際上會根據平臺自動選擇使用select還是epoll 它返回一個(key, events)元組, key是一個namedtuple類型的元組,可以使用 key.name 獲取元組的數據 key 的內容(fileobj,fd,events,data): fileobj 已經注冊的文件對象 fd 也就是第一個參數的那個文件對象的更底層的文件描述符 events 等待的IO事件 data 可選項。可以存一些和fileobj有關的數據,如 sessioin 的 id ''' events = sel.select() # 監測有無活動對象,沒有就阻塞在這里等待 for key, mask in events: # 有活動對象了 callback = key.data # key.data 是注冊時傳遞的 accept 函數 callback(key.fileobj, mask) # key.fileobj 就是傳遞的 socket 對象 |
客戶端:
1
2
3
4
5
6
7
8
|
import socket tin = socket.socket() tin.connect(( 'localhost' , 1234 )) while True : inp = input ( '>>>>' ) tin.send(inp.encode( 'utf8' )) data = tin.recv( 1024 ) print (data.decode( 'utf8' )) |
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://www.cnblogs.com/wztshine/p/12091062.html