在這篇文章中,我們將分析一個網絡爬蟲。
網絡爬蟲是一個掃描網絡內容并記錄其有用信息的工具。它能打開一大堆網頁,分析每個頁面的內容以便尋找所有感興趣的數據,并將這些數據存儲在一個數據庫中,然后對其他網頁進行同樣的操作。
如果爬蟲正在分析的網頁中有一些鏈接,那么爬蟲將會根據這些鏈接分析更多的頁面。
搜索引擎就是基于這樣的原理實現的。
這篇文章中,我特別選了一個穩定的、”年輕”的開源項目pyspider,它是由 binux 編碼實現的。
注:據認為pyspider持續監控網絡,它假定網頁在一段時間后會發生變化,因此一段時間后它將會重新訪問相同的網頁。
概述
爬蟲pyspider主要由四個組件組成。包括調度程序(scheduler),抓取程序(fetcher),內容處理程序(processor)以及一個監控組件。
調度程序接受任務并決定該做什么。這里有幾種可能性,它可以丟棄一個任務(可能這個特定的網頁剛剛被抓取過了),或者給任務分配不同的優先級。
當各個任務的優先級確定之后,它們被傳入抓取程序。它重新抓取網頁。這個過程很復雜,但邏輯上比較簡單。
當網絡上的資源被抓取下來,內容處理程序就負責抽取有用的信息。它運行一個用戶編寫的Python腳本,這個腳本并不像沙盒一樣被隔離。它的職責還包括捕獲異常或日志,并適當地管理它們。
最后,爬蟲pyspider中有一個監控組件。
爬蟲pyspider提供一個異常強大的網頁界面(web ui),它允許你編輯和調試你的腳本,管理整個抓取過程,監控正在進行的任務,并最終輸出結果。
項目和任務
在pyspider中,我們有項目和任務的概念。
一個任務指的是一個需要從網站檢索并進行分析的單獨頁面。
一個項目指的是一個更大的實體,它包括爬蟲涉及到的所有頁面,分析網頁所需要的python腳本,以及用于存儲數據的數據庫等等。
在pyspider中我們可以同時運行多個項目。
代碼結構分析
根目錄
在根目錄中可以找到的文件夾有:
- data,空文件夾,它是存放由爬蟲所生成的數據的地方。
- docs,包含該項目文檔,里邊有一些markdown代碼。
- pyspider,包含項目實際的代碼。
- test,包含相當多的測試代碼。
- 這里我將重點介紹一些重要的文件:
- .travis.yml,一個很棒的、連續性測試的整合。你如何確定你的項目確實有效?畢竟僅在你自己的帶有固定版本的庫的機器上進行測試是不夠的。
- Dockerfile,同樣很棒的工具!如果我想在我的機器上嘗試一個項目,我只需要運行Docker,我不需要手動安裝任何東西,這是一個使開發者參與到你的項目中的很好的方式。
- LICENSE,對于任何開源項目都是必需的,(如果你自己有開源項目的話)不要忘記自己項目中的該文件。
- requirements.txt,在Python世界中,該文件用于指明為了運行該軟件,需要在你的系統中安裝什么Python包,在任何的Python項目中該文件都是必須的。
- run.py,該軟件的主入口點。
- setup.py,該文件是一個Python腳本,用于在你的系統中安裝pyspider項目。
已經分析完項目的根目錄了,僅根目錄就能說明該項目是以一種非常專業的方式進行開發的。如果你正在開發任何的開源程序,希望你能達到這樣的水準。
文件夾pyspider
讓我們更深入一點兒,一起來分析實際的代碼。
在這個文件夾中還能找到其他的文件夾,整個軟件背后的邏輯已經被分割,以便更容易的進行管理和擴展。
這些文件夾是:database、fetcher、libs、processor、result、scheduler、webui。
在這個文件夾中我們也能找到整個項目的主入口點,run.py。
文件run.py
這個文件首先完成所有必需的雜事,以保證爬蟲成功地運行。最終它產生所有必需的計算單元。向下滾動我們可以看到整個項目的入口點,cli()。
函數cli()
這個函數好像很復雜,但與我相隨,你會發現它并沒有你想象中復雜。函數cli()的主要目的是創建數據庫和消息系統的所有連接。它主要解析命令行參數,并利用所有我們需要的東西創建一個大字典。最后,我們通過調用函數all()開始真正的工作。
函數all()
一個網絡爬蟲會進行大量的IO操作,因此一個好的想法是產生不同的線程或子進程來管理所有的這些工作。通過這種方式,你可以在等待網絡獲取你當前html頁面的同時,提取前一個頁面的有用信息。
函數all()決定是否運行子進程或者線程,然后調用不同的線程或子進程里的所有的必要函數。這時pyspider將產生包括webui在內的,爬蟲的所有邏輯模塊所需要的,足夠數量的線程。當我們完成項目并關閉webui時,我們將干凈漂亮地關閉每一個進程。
現在我們的爬蟲就開始運行了,讓我們進行更深入一點兒的探索。
調度程序
調度程序從兩個不同的隊列中獲取任務(newtask_queue和status_queue),并把任務加入到另外一個隊列(out_queue),這個隊列稍后會被抓取程序讀取。
調度程序做的第一件事情是從數據庫中加載所需要完成的所有的任務。之后,它開始一個無限循環。在這個循環中會調用幾個方法:
1._update_projects():嘗試更新的各種設置,例如,我們想在爬蟲工作的時候調整爬取速度。
2._check_task_done():分析已完成的任務并將其保存到數據庫,它從status_queue中獲取任務。
3._check_request():如果內容處理程序要求分析更多的頁面,把這些頁面放在隊列newtask_queue中,該函數會從該隊列中獲得新的任務。
4._check_select():把新的網頁加入到抓取程序的隊列中。
5._check_delete():刪除已被用戶標記的任務和項目。
6._try_dump_cnt():記錄一個文件中已完成任務的數量。對于防止程序異常所導致的數據丟失,這是有必要的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
def run( self ): while not self ._quit: try : time.sleep( self .LOOP_INTERVAL) self ._update_projects() self ._check_task_done() self ._check_request() while self ._check_cronjob(): pass self ._check_select() self ._check_delete() self ._try_dump_cnt() self ._exceptions = 0 except KeyboardInterrupt: break except Exception as e: logger.exception(e) self ._exceptions + = 1 if self ._exceptions > self .EXCEPTION_LIMIT: break continue |
循環也會檢查運行過程中的異常,或者我們是否要求python停止處理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
finally : # exit components run in subprocess for each in threads: if not each.is_alive(): continue if hasattr (each, 'terminate' ): each.terminate() each.join() |
抓取程序
抓取程序的目的是檢索網絡資源。
pyspider能夠處理普通HTML文本頁面和基于AJAX的頁面。只有抓取程序能意識到這種差異,了解這一點非常重要。我們將僅專注于普通的html文本抓取,然而大部分的想法可以很容易地移植到Ajax抓取器。
這里的想法在某種形式上類似于調度程序,我們有分別用于輸入和輸出的兩個隊列,以及一個大的循環。對于輸入隊列中的所有元素,抓取程序生成一個請求,并將結果放入輸出隊列中。
它聽起來簡單但有一個大問題。網絡通常是極其緩慢的,如果因為等待一個網頁而阻止了所有的計算,那么整個過程將會運行的極其緩慢。解決方法非常的簡單,即不要在等待網絡的時候阻塞所有的計算。這個想法即在網絡上發送大量消息,并且相當一部分消息是同時發送的,然后異步等待響應的返回。一旦我們收回一個響應,我們將會調用另外的回調函數,回調函數將會以最適合的方式管理這樣的響應。
爬蟲pyspider中的所有的復雜的異步調度都是由另一個優秀的開源項目
1
|
http: / / www.tornadoweb.org / en / stable / |
完成。
現在我們的腦海里已經有了極好的想法了,讓我們更深入地探索這是如何實現的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
def run( self ): def queue_loop(): if not self .outqueue or not self .inqueue: return while not self ._quit: try : if self .outqueue.full(): break task = self .inqueue.get_nowait() task = utils.decode_unicode_obj(task) self .fetch(task) except queue.Empty: break tornado.ioloop.PeriodicCallback(queue_loop, 100 , io_loop = self .ioloop).start() self ._running = True self .ioloop.start() <strong> |
函數run()</strong>
函數run()是抓取程序fetcher中的一個大的循環程序。
函數run()中定義了另外一個函數queue_loop(),該函數接收輸入隊列中的所有任務,并抓取它們。同時該函數也監聽中斷信號。函數queue_loop()作為參數傳遞給tornado的類PeriodicCallback,如你所猜,PeriodicCallback會每隔一段具體的時間調用一次queue_loop()函數。函數queue_loop()也會調用另一個能使我們更接近于實際檢索Web資源操作的函數:fetch()。
函數fetch(self, task, callback=None)
網絡上的資源必須使用函數phantomjs_fetch()或簡單的http_fetch()函數檢索,函數fetch()只決定檢索該資源的正確方法是什么。接下來我們看一下函數http_fetch()。
函數http_fetch(self, url, task, callback)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
def http_fetch( self , url, task, callback): '''HTTP fetcher''' fetch = copy.deepcopy( self .default_options) fetch[ 'url' ] = url fetch[ 'headers' ][ 'User-Agent' ] = self .user_agent def handle_response(response): ... return task, result try : request = tornado.httpclient.HTTPRequest(header_callback = header_callback, * * fetch) if self .async: self .http_client.fetch(request, handle_response) else : return handle_response( self .http_client.fetch(request)) |
終于,這里才是完成真正工作的地方。這個函數的代碼有點長,但有清晰的結構,容易閱讀。
在函數的開始部分,它設置了抓取請求的header,比如User-Agent、超時timeout等等。然后定義一個處理響應response的函數:handle_response(),后邊我們會分析這個函數。最后我們得到一個tornado的請求對象request,并發送這個請求對象。請注意在異步和非異步的情況下,是如何使用相同的函數來處理響應response的。
讓我們往回看一下,分析一下函數handle_response()做了什么。
函數handle_response(response)
1
2
3
4
5
6
|
def handle_response(response): result = {} result[ 'orig_url' ] = url result[ 'content' ] = response.body or '' callback( 'http' , task, result) return task, result |
這個函數以字典的形式保存一個response的所有相關信息,例如url,狀態碼和實際響應等,然后調用回調函數。這里的回調函數是一個小方法:send_result()。
函數send_result(self, type, task, result)
1
2
3
|
def send_result( self , type , task, result): if self .outqueue: self .outqueue.put((task, result)) |
這個最后的函數將結果放入到輸出隊列中,等待內容處理程序processor的讀取。
內容處理程序processor
內容處理程序的目的是分析已經抓取回來的頁面。它的過程同樣也是一個大循環,但輸出中有三個隊列(status_queue, newtask_queue 以及result_queue)而輸入中只有一個隊列(inqueue)。
讓我們稍微深入地分析一下函數run()中的循環過程。
函數run(self)
1
2
3
4
5
6
7
8
9
10
11
12
|
def run( self ): try : task, response = self .inqueue.get(timeout = 1 ) self .on_task(task, response) self ._exceptions = 0 except KeyboardInterrupt: break except Exception as e: self ._exceptions + = 1 if self ._exceptions > self .EXCEPTION_LIMIT: break continue |
這個函數的代碼比較少,易于理解,它簡單地從隊列中得到需要被分析的下一個任務,并利用on_task(task, response)函數對其進行分析。這個循環監聽中斷信號,只要我們給Python發送這樣的信號,這個循環就會終止。最后這個循環統計它引發的異常的數量,異常數量過多會終止這個循環。
函數on_task(self, task, response)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
def on_task( self , task, response): response = rebuild_response(response) project = task[ 'project' ] project_data = self .project_manager.get(project, updatetime) ret = project_data[ 'instance' ].run( status_pack = { 'taskid' : task[ 'taskid' ], 'project' : task[ 'project' ], 'url' : task.get( 'url' ), ... } self .status_queue.put(utils.unicode_obj(status_pack)) if ret.follows: self .newtask_queue.put( [utils.unicode_obj(newtask) for newtask in ret.follows]) for project, msg, url in ret.messages: self .inqueue.put(({...},{...})) return True |
函數on_task()是真正干活的方法。
它嘗試利用輸入的任務找到任務所屬的項目。然后它運行項目中的定制腳本。最后它分析定制腳本返回的響應response。如果一切順利,將會創建一個包含所有我們從網頁上得到的信息的字典。最后將字典放到隊列status_queue中,稍后它會被調度程序重新使用。
如果在分析的頁面中有一些新的鏈接需要處理,新鏈接會被放入到隊列newtask_queue中,并在稍后被調度程序使用。
現在,如果有需要的話,pyspider會將結果發送給其他項目。
最后如果發生了一些錯誤,像頁面返回錯誤,錯誤信息會被添加到日志中。
結束!