一:twisted中的adbapi
? 數據庫pymysql的commit()和execute()在提交數據時,都是同步提交至數據庫,由于scrapy框架數據的解析和異步多線程的,所以scrapy的數據解析速度,要遠高于數據的寫入數據庫的速度。如果數據寫入過慢,會造成數據庫寫入的阻塞,影響數據庫寫入的效率。
使用twisted異步IO框架,實現數據的異步寫入,通過多線程異步的形式對數據進行寫入,可以提高數據的寫入速度。
1.1 兩個主要方法
adbapi.ConnectionPool:
創建一個數據庫連接池對象,其中包括多個連接對象,每個連接對象在獨立的線程中工作。adbapi只是提供了異步訪問數據庫的編程框架,再其內部依然使MySQLdb這樣的庫訪問數據庫。
dbpool.runInteraction(do_insert,item):
異步調用do_insert函數,dbpool會選擇連接池中的一個連接對象在獨立線程中調用insert_db,其中參數item會被傳給do_insert的第二個參數,傳給do_insert的第一個參數是一個Transaction對象,其接口與Cursor對象類似,可以調用execute方法執行SQL語句,do_insert執行后,連接對象會自動調用commit方法
1.2 使用實例
1
|
from twisted.enterprise import adbapi |
1
2
3
4
|
# 初始化數據庫連接池(線程池) # 參數一:mysql的驅動 # 參數二:連接mysql的配置信息 dbpool = adbapi.ConnectionPool( 'pymysql' , * * params) |
1
2
3
|
# 參數1:在異步任務中要執行的函數insert_db; # 參數2:給該函數insert_db傳遞的參數 query = self .dbpool.runInteraction( self .do_insert, item) |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# 在execute()之后,不需要再進行commit(),連接池內部會進行提交的操作。 def do_insert( self , cursor, item): insert_sql = """ insert into qa_sample( need_id, need_question_uptime, need_title, need_title_describe, need_answer_uptime, need_answer) values (%s, %s, %s, %s, %s, %s) """ params = (item[ 'need_id' ], item[ 'need_question_uptime' ], item[ 'need_title' ], item[ 'need_title_describe' ], item[ 'need_answer_uptime' ], item[ 'need_answer' ]) cursor.execute(insert_sql, params) |
二:結合scrapy中的pipelines
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# -*- coding: utf-8 -*- from twisted.enterprise import adbapi import pymysql # Define your item pipelines here # # Don't forget to add your pipeline to the ITEM_PIPELINES setting # See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html class QaSpiderPipeline( object ): def process_item( self , item, spider): return item class MysqlTwistedPipeline( object ): def __init__( self , dbpool): self .dbpool = dbpool @classmethod def from_settings( cls , settings): dbparams = dict ( host = settings[ 'MYSQL_HOST' ], db = settings[ 'MYSQL_DBNAME' ], user = settings[ 'MYSQL_USER' ], passwd = settings[ 'MYSQL_PASSWORD' ], charset = 'utf8' , cursorclass = pymysql.cursors.DictCursor, use_unicode = True ) dbpool = adbapi.ConnectionPool( 'pymysql' , * * dbparams) return cls (dbpool) def process_item( self , item, spider): query = self .dbpool.runInteraction( self .do_insert, item) def do_insert( self , cursor, item): insert_sql = """ insert into qa_sample( need_id, need_question_uptime, need_title, need_title_describe, need_answer_uptime, need_answer) values (%s, %s, %s, %s, %s, %s) """ params = (item[ 'need_id' ], item[ 'need_question_uptime' ], item[ 'need_title' ], item[ 'need_title_describe' ], item[ 'need_answer_uptime' ], item[ 'need_answer' ]) cursor.execute(insert_sql, params) |
到此這篇關于Python中Scrapy+adbapi提高數據庫寫入效率實現的文章就介紹到這了,更多相關Scrapy+adbapi數據庫寫入內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://juejin.cn/post/7021024993500725256