前言: 在進行數據處理的時候,我們經常會用到 pandas 。但是 pandas 本身好像并沒有提供多進程的機制。本文將介紹如何來自己實現 pandas (apply 函數)的多進程執行。其中,我們主要借助 joblib 庫,這個庫為python 提供了一個非常簡潔方便的多進程實現方法。
所以,本文將按照下面的安排展開,前面可能比較啰嗦,若只是想知道怎么用可直接看第三部分:
- 首先簡單介紹 pandas 中的分組聚合操作 groupby。
- 然后簡單介紹 joblib 的使用方法。
- 最后,通過一個去停用詞的實驗詳細介紹如何實現 pandas 中 apply 函數多進程執行。
注意:本文說的都是多進程而不是多線程。
1. dataframe.groupby 分組聚合操作
1
2
3
|
# groupby 操作 df1 = pd.dataframe({ 'a' :[ 1 , 2 , 1 , 2 , 1 , 2 ], 'b' :[ 3 , 3 , 3 , 4 , 4 , 4 ], 'data' :[ 12 , 13 , 11 , 8 , 10 , 3 ]}) df1 |
按照某列分組
1
2
3
4
5
|
grouped = df1.groupby( 'b' ) # 按照 'b' 這列分組了,name 為 'b' 的 key 值,group 為對應的df_group for name, group in grouped: print name, '->' print group |
1
2
3
4
5
6
7
8
9
10
|
3 - > a b data 0 1 3 12 1 2 3 13 2 1 3 11 4 - > a b data 3 2 4 8 4 1 4 10 5 2 4 3 |
按照多列分組
1
2
3
4
5
|
grouped = df1.groupby([ 'a' , 'b' ]) # 按照 'b' 這列分組了,name 為 'b' 的 key 值,group 為對應的df_group for name, group in grouped: print name, '->' print group |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
( 1 , 3 ) - > a b data 0 1 3 12 2 1 3 11 ( 1 , 4 ) - > a b data 4 1 4 10 ( 2 , 3 ) - > a b data 1 2 3 13 ( 2 , 4 ) - > a b data 3 2 4 8 5 2 4 3 |
若 df.index 為[1,2,3…]這樣一個 list, 那么按照 df.index分組,其實就是每組就是一行,在后面去停用詞實驗中,我們就用這個方法把 df_all 處理成每行為一個元素的 list, 再用多進程處理這個 list。
1
2
3
4
5
6
|
grouped = df1.groupby(df1.index) # 按照 index 分組,其實每行就是一個組了 print len (grouped), type (grouped) for name, group in grouped: print name, '->' print group |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
6 < class 'pandas.core.groupby.dataframegroupby' > 0 - > a b data 0 1 3 12 1 - > a b data 1 2 3 13 2 - > a b data 2 1 3 11 3 - > a b data 3 2 4 8 4 - > a b data 4 1 4 10 5 - > a b data 5 2 4 3 |
2. joblib 用法
refer:
1
2
3
|
# 1. embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly: from joblib import parallel, delayed from math import sqrt |
處理小任務的時候,多進程并沒有體現出優勢。
1
2
|
% time result1 = parallel(n_jobs = 1 )(delayed(sqrt)(i * * 2 ) for i in range ( 10000 )) % time result2 = parallel(n_jobs = 8 )(delayed(sqrt)(i * * 2 ) for i in range ( 10000 )) |
1
2
3
4
|
cpu times: user 316 ms, sys: 0 ns, total: 316 ms wall time: 309 ms cpu times: user 692 ms, sys: 384 ms, total: 1.08 s wall time: 1.03 s |
當需要處理大量數據的時候,并行處理就體現出了它的優勢
1
|
% time result = parallel(n_jobs = 1 )(delayed(sqrt)(i * * 2 ) for i in range ( 1000000 )) |
1
2
|
cpu times: user 3min 43s , sys: 5.66 s, total: 3min 49s wall time: 3min 33s |
1
|
% time result = parallel(n_jobs = 8 )(delayed(sqrt)(i * * 2 ) for i in range ( 1000000 )) |
1
2
|
cpu times: user 50.9 s, sys: 12.6 s, total: 1min 3s wall time: 52 s |
3. apply 函數的多進程執行(去停用詞)
多進程的實現主要參考了 stack overflow 的解答: parallelize apply after pandas groupby
上圖中,我們要把 abstracttext 去停用詞, 處理成 abstracttext1 那樣。首先,導入停用詞表。
1
2
3
4
5
6
|
# 讀入所有停用詞 with open ( 'stopwords.txt' , 'rb' ) as inp: lines = inp.read() stopwords = re.findall( '"(.*?)"' , lines) print len (stopwords) print stopwords[: 10 ] |
1
2
|
692 [ 'a' , "a's" , 'able ', ' about ', ' above ', ' according ', ' accordingly ', ' across ', ' actually ', ' after'] |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# 對 abstracttext 去停用詞 # 方法一:暴力法,對每個詞進行判斷 def remove_stopwords1(text): words = text.split( ' ' ) new_words = list () for word in words: if word not in stopwords: new_words.append(word) return new_words # 方法二:先構建停用詞的映射 for word in stopwords: if word in words_count.index: words_count[word] = - 1 def remove_stopwords2(text): words = text.split( ' ' ) new_words = list () for word in words: if words_count[word] ! = - 1 : new_words.append(word) return new_words % time df_all[ 'abstracttext1' ] = df_all[ 'abstracttext' ]. apply (remove_stopwords1) % time df_all[ 'abstracttext2' ] = df_all[ 'abstracttext' ]. apply (remove_stopwords2) |
1
2
3
4
|
cpu times: user 8min 56s , sys: 2.72 s, total: 8min 59s wall time: 8min 48s cpu times: user 1min 2s , sys: 4.12 s, total: 1min 6s wall time: 1min 2s |
上面我嘗試了兩種不同的方法來去停用詞:
方法一中使用了比較粗暴的方法:首先用一個 list 存儲所有的 stopwords,然后對于每一個 text 中的每一個 word,我們判斷它是否出現在 stopwords 的list中(復雜度 o(n)o(n) ), 若為 stopword 則去掉。
方法二中我用 一個series(words_count) 對所有的詞進行映射,如果該詞為 stopword, 則把它的值修改為 -1。這樣,對于 text 中的每個詞 ww, 我們只需要判斷它的值是否為 -1 即可判定是否為 stopword (復雜度 o(1)o(1))。
所以,在這兩個方法中,我們都是采用單進程來執行,方法二的速度(1min 2s)明顯高于方法一(8min 48s)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from joblib import parallel, delayed import multiprocessing # 方法三:對方法一使用多進程 def tmp_func(df): df[ 'abstracttext3' ] = df[ 'abstracttext' ]. apply (remove_stopwords1) return df def apply_parallel(df_grouped, func): """利用 parallel 和 delayed 函數實現并行運算""" results = parallel(n_jobs = - 1 )(delayed(func)(group) for name, group in df_grouped) return pd.concat(results) if __name__ = = '__main__' : time0 = time.time() df_grouped = df_all.groupby(df_all.index) df_all = applyparallel(df_grouped, tmp_func) print 'time costed {0:.2f}' . format (time.time() - time0) |
1
|
time costed 150.81 |
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# 方法四:對方法二使用多進程 def tmp_func(df): df[ 'abstracttext3' ] = df[ 'abstracttext' ]. apply (remove_stopwords2) return df def apply_parallel(df_grouped, func): """利用 parallel 和 delayed 函數實現并行運算""" results = parallel(n_jobs = - 1 )(delayed(func)(group) for name, group in df_grouped) return pd.concat(results) if __name__ = = '__main__' : time0 = time.time() df_grouped = df_all.groupby(df_all.index) df_all = applyparallel(df_grouped, tmp_func) print 'time costed {0:.2f}' . format (time.time() - time0) |
1
|
time costed 123.80 |
上面方法三和方法四分別對應于前面方法一和方法二,但是都是用了多進程操作。結果是方法一使用多進程以后,速度一下子提高了好幾倍,但是方法二的多進程速度不升反降。這是不是有問題?的確,但是首先可以肯定,我們的代碼沒有問題。下圖顯示了我用 top 命令看到各個方法的進程執行情況??梢钥闯?,在方法三和方法四中,的的確確是 12 個cpu核都跑起來了。只是在方法四中,每個核占用的比例都是比較低的。
fig1. 單進程 cpu 使用情況
fig2. 方法三 cpu 使用情況
fig3. 方法四 cpu 使用情況
一個直觀的解釋就是,當我們開啟多進程的時候,進程開啟和最后結果合并,進程結束,這些操作都是要消耗時間的。如果我們執行的任務比較小,那么進程開啟等操作所消耗的時間可能就要比執行任務本身消耗的時間還多。這樣就會出現多進程的方法四比單進程的方法二耗時更多的情況了。
所以總結來說,在處理小任務的時候沒有必要開啟多進程。借助joblib (parallel, delayed 兩個函數) ,我們能夠很方便地實現 python 多進程。
以上這篇pandas apply 函數 實現多進程的示例講解就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/Jerr__y/article/details/71425298