一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|數據庫技術|

服務器之家 - 數據庫 - Sqlite - 在SQLite中插入10億條Python VS Rust

在SQLite中插入10億條Python VS Rust

2021-09-07 23:48蟲蟲搜奇 Sqlite

寫腳本來進行數據處理,比如說給數據庫導入導出數據,這種任務一般來說最方便的方法是用python腳本,但是如果數據量比較大時候(比如上億條)時候Python就會超級慢,看到無法忍受。

在實際生活中,市場有這樣的案例:寫腳本來進行數據處理,比如說給數據庫導入導出數據,這種任務一般來說最方便的方法是用python腳本,但是如果數據量比較大時候(比如上億條)時候Python就會超級慢,看到無法忍受。在這種案例時候該怎么做呢,有一個外國老哥分享了自己的實踐經歷,并且對比了Python和Rust語言給SQLite插入十一條數據的情況,最后用Rust實現了在一分鐘來完成任務。我們在此分享一下該實踐過程,希望能對大家有所啟迪,大家也可以嘗試自己最拿手方法來實現該例子,并對比一下具體性能。

在SQLite中插入10億條Python VS Rust

概述

案例中的任務是SQLite數據庫插入10億條的數據。表(user)數據結構和約束如下:

  1. create table IF NOT EXISTS user 
  2. id INTEGER not null primary key, 
  3. area CHAR(6), 
  4. age INTEGER not null, 
  5. active INTEGER not null 
  6. ); 

隨機生成數據。其中are列為六位數的區號(任何六位數字)。age將是5、10 或15中的一個數字。Active為0或1。

  • 實驗環境硬件配置為:MacBook Pro,2019(2.4 GHz 四核i5,8GB內存,256GB SSD硬盤,Big Sur 11.1)。
  • 任務前提:任務無需保持程序穩健性,如果進程崩潰并且所有數據都丟失了也沒關系。可以再次運行腳本。
  • 需要充分利用我的機器資源:100% CPU、8GB 內存和千兆字節的SSD空間。

無需使用真正的隨機方法,stdlib偽隨機方法即可。

Python

首先是原始版本的Python方法。Python標準庫提供了一個SQLite模塊,首先使用它編寫了第一個版本。代碼如下:

  1. import sqlite3 
  2. from commons import get_random_age, get_random_active, get_random_bool, get_random_area_code, create_table 
  3. DB_NAME = "naive.db" 
  4. def faker(con: sqlite3.Connection, count=100_000): 
  5. for _ in range(count): 
  6. age = get_random_age() 
  7. active = get_random_active() 
  8. # switch for area code 
  9. if get_random_bool(): 
  10. # random 6 digit number 
  11. area = get_random_area_code() 
  12. con.execute('INSERT INTO user VALUES (NULL,?,?,?)', (area, age, active)) 
  13. else: 
  14. con.execute('INSERT INTO user VALUES (NULL,NULL,?,?)', (age, active)) 
  15. con.commit() 
  16. def main(): 
  17. con = sqlite3.connect(DB_NAME, isolation_level=None
  18. con.execute('PRAGMA journal_mode = WAL;') 
  19. create_table(con) 
  20. faker(con, count=10_000_000
  21. if __name__ == '__main__': 
  22. main() 

在SQLite中插入10億條Python VS Rust

在該腳本中,通for循環中一一插入1000萬條數據。執行花了將近15分鐘。基于此進行優化迭代,提高性能。

SQLite中,每次插入都是原子性的并且為一個事務。每個事務都需要保證寫入磁盤(涉及IO操作),因此可能會很慢。為了優化,可以嘗試通過不同大小的批量插入,對比發現,100000是最佳選擇。通過這個簡單的更改,運行時間減少到了10分鐘,優化了3分之一,但是仍然非常耗時。優化后,批量插入版本源碼:

在SQLite中插入10億條Python VS Rust

SQLite庫優化

除了在代碼層優化外,如果對于單純的數據寫入,對數據庫本身搞的優化也是非常重要的。對于SQLite優化,可以做如下配置:

  1. PRAGMA journal_mode = OFF
  2. PRAGMA synchronous = 0
  3. PRAGMA cache_size = 1000000
  4. PRAGMA locking_mode = EXCLUSIVE
  5. PRAGMA temp_store = MEMORY

具體解釋:

首先,journal_mode設置為OFF,將會關閉回滾日志,禁用 SQLite 的原子提交和回滾功能,這樣在事務失敗情況下,無法恢復,基于例子實例穩健性要求可以設置,但是嚴禁在生產環境中使用。

其次,關閉synchronous,SQLite可以不再校驗磁盤寫入的數據可靠性。寫入SQLite可能并不意味著它已刷新到磁盤。同樣,嚴禁在生產環境中啟用。

cache_size用戶指定SQLite允許在內存中保留多少內存頁。不要在生產中分配太高的的數值。

使用在EXCLUSIVE鎖定模式,SQLite連接持有的鎖永遠不會被釋放。

設置temp_store到MEMOR將使其表現得像一個內存數據庫。

優化性能

對上面的兩個腳本,添加 SQLite優化參數,然后重新運行:

  1. def main():     
  2. con = sqlite3.connect(DB_NAME, isolation_level=None)     
  3. con.execute('PRAGMA journal_mode = OFF;')     
  4. con.execute('PRAGMA synchronous = 0;')     
  5. con.execute('PRAGMA cache_size = 1000000;') # give it a GB     
  6. con.execute('PRAGMA locking_mode = EXCLUSIVE;')     
  7. con.execute('PRAGMA temp_store = MEMORY;')     
  8. create_table(con)     

faker(con, count=100_000_000)

優化后版本,原始版本,插入1億行數據,大概花了10分鐘;對比批量插入版本大概花了8.5分鐘。

pypy版本

對比CPython PyPy在數據處理中可以提高性能,據說可以提高4倍以上的性能。本實驗中也嘗試編譯PyPy解釋器,運行腳本(代碼無需修改)。

使用pypy解釋器,批處理版本,插入1億行數據只需2.5分鐘。性能大概是Cpython的3.5倍,可見傳說的4倍性能提高確實是真的,誠不我欺也!。同時,為了測試在純循環插入中消耗的時間,在腳本中刪除SQL指令并運行:

在SQLite中插入10億條Python VS Rust

以上腳本在CPython中耗時5.5分鐘 。PyPy執行耗時1.5分鐘(同樣提高了3.5倍)。

Rust

在完成Python各種優化折騰。又嘗試了Rust版本的插入,對比也有個原始版本和批量插入版本。原始版本,也是每行插入:

  1. use rusqlite::{params, Connection}; 
  2. mod common; 
  3. fn faker(mut conn: Connection, count: i64) { 
  4. let tx = conn.transaction().unwrap(); 
  5. for _ in 0..count { 
  6. let with_area = common::get_random_bool(); 
  7. let age = common::get_random_age(); 
  8. let is_active = common::get_random_active(); 
  9. if with_area { 
  10. let area_code = common::get_random_area_code(); 
  11. tx.execute( 
  12. "INSERT INTO user VALUES (NULL, ?, ?, ?)", 
  13. params![area_code, age, is_active], 
  14. .unwrap(); 
  15. } else { 
  16. tx.execute( 
  17. "INSERT INTO user VALUES (NULL, NULL, ?, ?)", 
  18. params![age, is_active], 
  19. .unwrap(); 
  20. tx.commit().unwrap(); 
  21. fn main() { 
  22. let conn = Connection::open("basic.db").unwrap(); 
  23. conn.execute_batch( 
  24. "PRAGMA journal_mode = OFF
  25. PRAGMA synchronous = 0
  26. PRAGMA cache_size = 1000000
  27. PRAGMA locking_mode = EXCLUSIVE
  28. PRAGMA temp_store = MEMORY;", 
  29. .expect("PRAGMA"); 
  30. conn.execute( 
  31. "CREATE TABLE IF NOT EXISTS user ( 
  32. id INTEGER not null primary key, 
  33. area CHAR(6), 
  34. age INTEGER not null, 
  35. active INTEGER not null)", 
  36. [], 
  37. .unwrap(); 
  38. faker(conn, 100_000_000) 

該版執行,大概用時3分鐘。然后我做了進一步的實驗:

將rusqlite,換成sqlx異步運行。

  1. use std::str::FromStr;     
  2.  
  3. use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqliteSynchronous};     
  4. use sqlx::{ConnectOptions, Connection, Executor, SqliteConnection, Statement};     
  5.  
  6. mod common;     
  7.  
  8. async fn faker(mut conn: SqliteConnection, count: i64) -> Result<(), sqlx::Error> {     
  9. let mut tx = conn.begin().await?;     
  10. let stmt_with_area = tx     
  11. .prepare("INSERT INTO user VALUES (NULL, ?, ?, ?)")     
  12. .await?;     
  13. let stmt = tx     
  14. .prepare("INSERT INTO user VALUES (NULL, NULL, ?, ?)")     
  15. .await?;     
  16. for _ in 0..count {     
  17. let with_area = common::get_random_bool();     
  18. let age = common::get_random_age();     
  19. let is_active = common::get_random_active();     
  20. if with_area {     
  21. let area_code = common::get_random_area_code();     
  22. stmt_with_area     
  23. .query()     
  24. .bind(area_code)     
  25. .bind(age)     
  26. .bind(is_active)     
  27. .execute(&mut tx)     
  28. .await?;     
  29. } else {     
  30. stmt.query()     
  31. .bind(age)     
  32. .bind(is_active)     
  33. .execute(&mut tx)     
  34. .await?;     
  35. }     
  36. }     
  37. tx.commit().await?;     
  38. Ok(())     
  39. }     
  40.  
  41. #[tokio::main]     
  42. async fn main() -> Result<(), sqlx::Error> {     
  43. let mut conn = SqliteConnectOptions::from_str("basic_async.db")     
  44. .unwrap()     
  45. .create_if_missing(true)     
  46. .journal_mode(SqliteJournalMode::Off)     
  47. .synchronous(SqliteSynchronous::Off)     
  48. .connect()     
  49. .await?;     
  50. conn.execute("PRAGMA cache_size = 1000000;").await?;     
  51. conn.execute("PRAGMA locking_mode = EXCLUSIVE;").await?;     
  52. conn.execute("PRAGMA temp_store = MEMORY;").await?;     
  53. conn.execute(     
  54. "CREATE TABLE IF NOT EXISTS user (     
  55. id INTEGER not null primary key,     
  56. area CHAR(6),     
  57. age INTEGER not null,     
  58. active INTEGER not null);",     
  59. )     
  60. .await?;     
  61. faker(conn, 100_000_000).await?;     
  62. Ok(())     

這個版本花了大約14分鐘。性能反而下降下降了。比Python版本還要差(原因值得深析)。

對執行的原始SQL語句,切換到準備好的語句并在循環中插入行,但重用了準備好的語句。該版本只用了大約一分鐘。

使用準備好的語句并將它們插入到50行的批次中,插入10億條,耗時34.3 秒。

  1. use rusqlite::{Connection, ToSql, Transaction}; 
  2. mod common; 
  3. fn faker_wrapper(mut conn: Connection, count: i64) { 
  4. let tx = conn.transaction().unwrap(); 
  5. faker(&tx, count); 
  6. tx.commit().unwrap(); 
  7. fn faker(tx: &Transaction, count: i64) { 
  8. // that is, we will batch 50 inserts of rows at once 
  9. let min_batch_size: i64 = 50
  10. if count < min_batch_size { 
  11. panic!("count cant be less than min batch size"); 
  12. // jeez, refactor this! 
  13. let mut with_area_params = " (NULL, ?, ?, ?),".repeat(min_batch_size as usize); 
  14. with_area_params.pop(); 
  15. let with_area_paramswith_area_params = with_area_params.as_str(); 
  16. let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(min_batch_size as usize); 
  17. without_area_params.pop(); 
  18. let without_area_paramswithout_area_params = without_area_params.as_str(); 
  19. let st1 = format!("INSERT INTO user VALUES {}", with_area_params); 
  20. let st2 = format!("INSERT INTO user VALUES {}", without_area_params); 
  21. let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap(); 
  22. let mut stmt = tx.prepare_cached(st2.as_str()).unwrap(); 
  23. for _ in 0..(count / min_batch_size) { 
  24. let with_area = common::get_random_bool(); 
  25. let age = common::get_random_age(); 
  26. let is_active = common::get_random_active(); 
  27. let mut param_values: Vec<_> = Vec::new(); 
  28. if with_area { 
  29. // lets prepare the batch 
  30. let mut vector = Vec::<(String, i8, i8)>::new(); 
  31. for _ in 0..min_batch_size { 
  32. let area_code = common::get_random_area_code(); 
  33. vector.push((area_code, age, is_active)); 
  34. for batch in vector.iter() { 
  35. param_values.push(&batch.0 as &dyn ToSql); 
  36. param_values.push(&batch.1 as &dyn ToSql); 
  37. param_values.push(&batch.2 as &dyn ToSql); 
  38. stmt_with_area.execute(&*param_values).unwrap(); 
  39. } else { 
  40. // lets prepare the batch 
  41. let mut vector = Vec::<(i8, i8)>::new(); 
  42. for _ in 0..min_batch_size { 
  43. vector.push((age, is_active)); 
  44. for batch in vector.iter() { 
  45. param_values.push(&batch.0 as &dyn ToSql); 
  46. param_values.push(&batch.1 as &dyn ToSql); 
  47. stmt.execute(&*param_values).unwrap(); 
  48. fn main() { 
  49. let conn = Connection::open("basic_batched.db").unwrap(); 
  50. conn.execute_batch( 
  51. "PRAGMA journal_mode = OFF
  52. PRAGMA synchronous = 0
  53. PRAGMA cache_size = 1000000
  54. PRAGMA locking_mode = EXCLUSIVE
  55. PRAGMA temp_store = MEMORY;", 
  56. .expect("PRAGMA"); 
  57. conn.execute( 
  58. "CREATE TABLE IF NOT EXISTS user ( 
  59. id INTEGER not null primary key, 
  60. area CHAR(6), 
  61. age INTEGER not null, 
  62. active INTEGER not null)", 
  63. [], 
  64. .unwrap(); 
  65. faker_wrapper(conn, 100_000_000) 
  66. 創建了一個線程版本,其中有一個從通道接收數據的寫入線程和四個將數據推送到管道其他線程。 
  67. use rusqlite::{Connection, ToSql}; 
  68. use std::sync::mpsc; 
  69. use std::sync::mpsc::{Receiver, Sender}; 
  70. use std::thread; 
  71. mod common; 
  72. static MIN_BATCH_SIZE: i64 = 50
  73. enum ParamValues { 
  74. WithArea(Vec<(String, i8, i8)>), 
  75. WithoutArea(Vec<(i8, i8)>), 
  76. fn consumer(rx: Receiver<ParamValues>) { 
  77. let mut conn = Connection::open("threaded_batched.db").unwrap(); 
  78. conn.execute_batch( 
  79. "PRAGMA journal_mode = OFF
  80. PRAGMA synchronous = 0
  81. PRAGMA cache_size = 1000000
  82. PRAGMA locking_mode = EXCLUSIVE
  83. PRAGMA temp_store = MEMORY;", 
  84. .expect("PRAGMA"); 
  85. conn.execute( 
  86. "CREATE TABLE IF NOT EXISTS user ( 
  87. id INTEGER not null primary key, 
  88. area CHAR(6), 
  89. age INTEGER not null, 
  90. active INTEGER not null)", 
  91. [], 
  92. .unwrap(); 
  93. let tx = conn.transaction().unwrap(); 
  94. // jeez, refactor this! 
  95. let mut with_area_params = " (NULL, ?, ?, ?),".repeat(MIN_BATCH_SIZE as usize); 
  96. with_area_params.pop(); 
  97. let with_area_paramswith_area_params = with_area_params.as_str(); 
  98. let mut without_area_params = " (NULL, NULL, ?, ?),".repeat(MIN_BATCH_SIZE as usize); 
  99. without_area_params.pop(); 
  100. let without_area_paramswithout_area_params = without_area_params.as_str(); 
  101. let st1 = format!("INSERT INTO user VALUES {}", with_area_params); 
  102. let st2 = format!("INSERT INTO user VALUES {}", without_area_params); 
  103. let mut stmt_with_area = tx.prepare_cached(st1.as_str()).unwrap(); 
  104. let mut stmt_without_area = tx.prepare_cached(st2.as_str()).unwrap(); 
  105. for param_values in rx { 
  106. let mut row_values: Vec<&dyn ToSql> = Vec::new(); 
  107. match param_values { 
  108. ParamValues::WithArea(values) => { 
  109. for batch in values.iter() { 
  110. row_values.push(&batch.0 as &dyn ToSql); 
  111. row_values.push(&batch.1 as &dyn ToSql); 
  112. row_values.push(&batch.2 as &dyn ToSql); 
  113. stmt_with_area.execute(&*row_values).unwrap(); 
  114. ParamValues::WithoutArea(values) => { 
  115. for batch in values.iter() { 
  116. row_values.push(&batch.0 as &dyn ToSql); 
  117. row_values.push(&batch.1 as &dyn ToSql); 
  118. stmt_without_area.execute(&*row_values).unwrap(); 
  119. tx.commit().unwrap(); 
  120. fn producer(tx: Sender<ParamValues>, count: i64) { 
  121. if count < MIN_BATCH_SIZE { 
  122. panic!("count cant be less than min batch size"); 
  123. for _ in 0..(count / MIN_BATCH_SIZE) { 
  124. let with_area = common::get_random_bool(); 
  125. let age = common::get_random_age(); 
  126. let is_active = common::get_random_active(); 
  127. let mut param_values: Vec<_> = Vec::new(); 
  128. if with_area { 
  129. // lets prepare the batch 
  130. let mut vector = Vec::<(String, i8, i8)>::new(); 
  131. for _ in 0..MIN_BATCH_SIZE { 
  132. let area_code = common::get_random_area_code(); 
  133. vector.push((area_code, age, is_active)); 
  134. for batch in vector.iter() { 
  135. param_values.push(&batch.0 as &dyn ToSql); 
  136. param_values.push(&batch.1 as &dyn ToSql); 
  137. param_values.push(&batch.2 as &dyn ToSql); 
  138. // send the values 
  139. tx.send(ParamValues::WithArea(vector)).unwrap(); 
  140. } else { 
  141. // lets prepare the batch 
  142. let mut vector = Vec::<(i8, i8)>::new(); 
  143. for _ in 0..MIN_BATCH_SIZE { 
  144. vector.push((age, is_active)); 
  145. for batch in vector.iter() { 
  146. param_values.push(&batch.0 as &dyn ToSql); 
  147. param_values.push(&batch.1 as &dyn ToSql); 
  148. // send the values 
  149. tx.send(ParamValues::WithoutArea(vector)).unwrap(); 
  150. fn main() { 
  151. // setup the DB and tables 
  152. let (tx, rx): (Sender<ParamValues>, Receiver<ParamValues>) = mpsc::channel(); 
  153. // lets launch the consumer 
  154. let consumer_handle = thread::spawn(|| consumer(rx)); 
  155. let cpu_count = num_cpus::get(); 
  156. let total_rows = 100_000_000
  157. let each_producer_count = (total_rows / cpu_count) as i64; 
  158. let mut handles = Vec::with_capacity(cpu_count); 
  159. for _ in 0..cpu_count { 
  160. let thread_tx = tx.clone(); 
  161. handles.push(thread::spawn(move || { 
  162. producer(thread_tx, each_producer_count.clone()) 
  163. })) 
  164. for t in handles { 
  165. t.join().unwrap(); 
  166. drop(tx); 
  167. // wait till consumer is exited 
  168. consumer_handle.join().unwrap(); 

這是性能最好的版本,耗時約32.37秒。

基準測試對比:

在SQLite中插入10億條Python VS Rust

總結

通過案例不同任務實驗,總體上可以得到:

  • 通過SQLite PRAGMA語句優化設置可以提高插入性能。
  • 使用準備好的語句可以提高性能
  • 進行批量插入可以提高性能。
  • PyPy 實際上比CPython快4倍
  • 線程/異步不一定能提高性能。

原文地址:https://mp.weixin.qq.com/s?__biz=MzU0MTY5MzEwMA==&mid=2247488055&idx=1&sn=2848892932ca18c1bf6835870a795c15&chksm=fb2757f4cc50dee233e8b51e6d4fddabec2e78ae7fc63da1b70181329e3f730ebaa67cf01e96&mpshare=1&s

延伸 · 閱讀

精彩推薦
  • SqliteSQLite 入門教程三 好多約束 Constraints

    SQLite 入門教程三 好多約束 Constraints

    在上一篇隨筆的結尾,我提到了SQLite的約束, 但是在那里我把它翻譯成了限定符,不太準確,這里先更正一下,應該翻譯成約束更貼切一點。 那么什么是...

    SQLite入門教程4572020-06-05
  • SqliteSQLite速度評測代碼

    SQLite速度評測代碼

    SQLite 作為一個輕量級嵌入式數據庫,還是非常好用的。雨痕極力推薦~~~~~~ ...

    SQLite教程網5832020-06-01
  • SqliteSQLite 內存數據庫學習手冊

    SQLite 內存數據庫學習手冊

    這篇文章主要介紹SQLite 內存數據庫的使用方法, 需要的朋友可以參考下 ...

    SQLite教程網3292020-06-06
  • Sqlite基于sqlite特殊字符轉義的實現方法

    基于sqlite特殊字符轉義的實現方法

    本篇文章是對sqlite特殊字符轉義的實現方法進行了詳細的分析介紹,需要的朋友參考下 ...

    sqlite數據庫教程網4132020-06-04
  • SqliteSQLite中重置自動編號列的方法

    SQLite中重置自動編號列的方法

    這篇文章主要介紹了SQLite中重置自動編號列的方法,本文講解了3種情況和其對應解決方法,需要的朋友可以參考下 ...

    dodo84492020-06-08
  • SqliteSQLite中的WAL機制詳細介紹

    SQLite中的WAL機制詳細介紹

    這篇文章主要介紹了SQLite中的WAL機制詳細介紹,本文講解了什么是WAL、WAL如何工作、WAL的優點與缺點、WAL引入的兼容性問題、WAL引入的性能問題等內容,需要...

    dodo83402020-06-08
  • Sqlite詳解SQLite中的查詢規劃器

    詳解SQLite中的查詢規劃器

    這篇文章主要介紹了詳解SQLite中的查詢規劃器,SQLite是一個開源的嵌入式數據庫,需要的朋友可以參考下...

    SQLite教程網8892021-10-25
  • SqliteSQLite 錯誤碼整理

    SQLite 錯誤碼整理

    這篇文章主要介紹了SQLite 錯誤碼,方便大家在開發過程中快速解決問題 ...

    SQLite教程網5532020-06-06
主站蜘蛛池模板: 久草草在线视视频 | 色啪啪888.com| 国产亚洲精品综合在线网址 | 日韩免费观看 | 亚洲国产精品无码中文字满 | 洗濯屋动漫在线观看 | 91资源在线视频 | ova催眠性指导5最新在线 | 8天堂资源在线官网 | 97色吧| 国产一级真人毛爱做毛片 | 国产麻豆精品原创 | 亚洲成人伦理 | 99视频在线免费 | 91九色jiuse006| 国产精品亚洲va在线观看 | 亚洲狠狠婷婷综合久久久久网站 | 盲井在线| 女教师被学生糟蹋三天 | 三级伦理影院 | 80s在线| 国产精品va在线观看无 | 教师系列 大桥未久在线 | 男女车车好快的车车免费网站 | 2019午夜福合集高清完整版 | 2022国产麻豆剧果冻传媒入口 | 欧美久久久久久 | 免费网站直接进入 | 国产精品一区二区三区免费 | 任我淫 | 小早川怜子亚洲综合中文字幕 | 免费国产午夜高清在线视频 | 日本精品久久久久中文字幕 1 | 视频一区二区 村上凉子 | 日韩免费在线观看 | 91赵邦贺| 国产亚洲一欧美一区二区三区 | 日韩免费高清完整版 | 拍拍叫痛的无挡视频免费 | 热剧库| 美国xxnx|