本文介紹了詳解Python實(shí)現(xiàn)多進(jìn)程異步事件驅(qū)動(dòng)引擎,分享給大家,具體如下:
多進(jìn)程異步事件驅(qū)動(dòng)邏輯
邏輯
code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
|
# -*- coding: utf-8 -*- ''' author: Jimmy contact: [email protected] file: eventEngine.py time: 2017/8/25 上午10:06 description: 多進(jìn)程異步事件驅(qū)動(dòng)引擎 ''' __author__ = 'Jimmy' from multiprocessing import Process, Queue class EventEngine( object ): # 初始化事件事件驅(qū)動(dòng)引擎 def __init__( self ): #保存事件列表 self .__eventQueue = Queue() #引擎開關(guān) self .__active = False #事件處理字典{'event1': [handler1,handler2] , 'event2':[handler3, ...,handler4]} self .__handlers = {} #保存事件處理進(jìn)程池 self .__processPool = [] #事件引擎主進(jìn)程 self .__mainProcess = Process(target = self .__run) #執(zhí)行事件循環(huán) def __run( self ): while self .__active: #事件隊(duì)列非空 if not self .__eventQueue.empty(): #獲取隊(duì)列中的事件 超時(shí)1秒 event = self .__eventQueue.get(block = True ,timeout = 1 ) #執(zhí)行事件 self .__process(event) else : # print('無任何事件') pass #執(zhí)行事件 def __process( self , event): if event. type in self .__handlers: for handler in self .__handlers[event. type ]: #開一個(gè)進(jìn)程去異步處理 p = Process(target = handler, args = (event, )) #保存到進(jìn)程池 self .__processPool.append(p) p.start() #開啟事件引擎 def start( self ): self .__active = True self .__mainProcess.start() #暫停事件引擎 def stop( self ): """停止""" # 將事件管理器設(shè)為停止 self .__active = False # 等待事件處理進(jìn)程退出 for p in self .__processPool: p.join() self .__mainProcess.join() #終止事件引擎 def terminate( self ): self .__active = False #終止所有事件處理進(jìn)程 for p in self .__processPool: p.terminate() self .__mainProcess.join() #注冊(cè)事件 def register( self , type , handler): """注冊(cè)事件處理函數(shù)監(jiān)聽""" # 嘗試獲取該事件類型對(duì)應(yīng)的處理函數(shù)列表,若無則創(chuàng)建 try : handlerList = self .__handlers[ type ] except KeyError: handlerList = [] self .__handlers[ type ] = handlerList # 若要注冊(cè)的處理器不在該事件的處理器列表中,則注冊(cè)該事件 if handler not in handlerList: handlerList.append(handler) def unregister( self , type , handler): """注銷事件處理函數(shù)監(jiān)聽""" # 嘗試獲取該事件類型對(duì)應(yīng)的處理函數(shù)列表,若無則忽略該次注銷請(qǐng)求 try : handlerList = self .__handlers[ type ] # 如果該函數(shù)存在于列表中,則移除 if handler in handlerList: handlerList.remove(handler) # 如果函數(shù)列表為空,則從引擎中移除該事件類型 if not handlerList: del self .__handlers[ type ] except KeyError: pass def sendEvent( self , event): #發(fā)送事件 像隊(duì)列里存入事件 self .__eventQueue.put(event) class Event( object ): #事件對(duì)象 def __init__( self , type = None ): self . type = type self . dict = {} #測(cè)試 if __name__ = = '__main__' : import time EVENT_ARTICAL = "Event_Artical" # 事件源 公眾號(hào) class PublicAccounts: def __init__( self , eventManager): self .__eventManager = eventManager def writeNewArtical( self ): # 事件對(duì)象,寫了新文章 event = Event(EVENT_ARTICAL) event. dict [ "artical" ] = u '如何寫出更優(yōu)雅的代碼\n' # 發(fā)送事件 self .__eventManager.sendEvent(event) print (u '公眾號(hào)發(fā)送新文章\n' ) # 監(jiān)聽器 訂閱者 class ListenerTypeOne: def __init__( self , username): self .__username = username # 監(jiān)聽器的處理函數(shù) 讀文章 def ReadArtical( self , event): print (u '%s 收到新文章' % self .__username) print (u '%s 正在閱讀新文章內(nèi)容:%s' % ( self .__username, event. dict [ "artical" ])) class ListenerTypeTwo: def __init__( self , username): self .__username = username # 監(jiān)聽器的處理函數(shù) 讀文章 def ReadArtical( self , event): print (u '%s 收到新文章 睡3秒再看' % self .__username) time.sleep( 3 ) print (u '%s 正在閱讀新文章內(nèi)容:%s' % ( self .__username, event. dict [ "artical" ])) def test(): listner1 = ListenerTypeOne( "thinkroom" ) # 訂閱者1 listner2 = ListenerTypeTwo( "steve" ) # 訂閱者2 ee = EventEngine() # 綁定事件和監(jiān)聽器響應(yīng)函數(shù)(新文章) ee.register(EVENT_ARTICAL, listner1.ReadArtical) ee.register(EVENT_ARTICAL, listner2.ReadArtical) for i in range ( 0 , 20 ): listner3 = ListenerTypeOne( "Jimmy" ) # 訂閱者X ee.register(EVENT_ARTICAL, listner3.ReadArtical) ee.start() #發(fā)送事件 publicAcc = PublicAccounts(ee) publicAcc.writeNewArtical() test() |
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:http://www.jianshu.com/p/5e7786166157?utm_source=tuicool&utm_medium=referral