Elasticsearch是一個分布式、Restful的搜索及分析服務器,Apache Solr一樣,它也是基于Lucence的索引服務器,但我認為Elasticsearch對比Solr的優點在于:
- 輕量級:安裝啟動方便,下載文件之后一條命令就可以啟動;
- Schema free:可以向服務器提交任意結構的JSON對象,Solr中使用schema.xml指定了索引結構;
- 多索引文件支持:使用不同的index參數就能創建另一個索引文件,Solr中需要另行配置;
- 分布式:Solr Cloud的配置比較復雜。
環境搭建
啟動Elasticsearch,訪問端口在9200,通過瀏覽器可以查看到返回的JSON數據,Elasticsearch提交和返回的數據格式都是JSON.
1
|
>> bin / elasticsearch - f |
安裝官方提供的Python API,在OS X上安裝后出現一些Python運行錯誤,是因為setuptools版本太舊引起的,刪除重裝后恢復正常。
1
|
>> pip install elasticsearch |
索引操作
對于單條索引,可以調用create或index方法。
1
2
3
4
5
|
from datetime import datetime from elasticsearch import Elasticsearch es = Elasticsearch() #create a localhost server connection, or Elasticsearch("ip") es.create(index = "test-index" , doc_type = "test-type" , id = 1 , body = { "any" : "data" , "timestamp" : datetime.now()}) |
Elasticsearch批量索引的命令是bulk,目前Python API的文檔示例較少,花了不少時間閱讀源代碼才弄清楚批量索引的提交格式。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers es = Elasticsearch( "10.18.13.3" ) j = 0 count = int (df[ 0 ].count()) actions = [] while (j < count): action = { "_index" : "tickets-index" , "_type" : "tickets" , "_id" : j + 1 , "_source" : { "crawaldate" :df[ 0 ][j], "flight" :df[ 1 ][j], "price" : float (df[ 2 ][j]), "discount" : float (df[ 3 ][j]), "date" :df[ 4 ][j], "takeoff" :df[ 5 ][j], "land" :df[ 6 ][j], "source" :df[ 7 ][j], "timestamp" : datetime.now()} } actions.append(action) j + = 1 if ( len (actions) = = 500000 ): helpers.bulk(es, actions) del actions[ 0 : len (actions)] if ( len (actions) > 0 ): helpers.bulk(es, actions) del actions[ 0 : len (actions)] |
在這里發現Python API序列化JSON時對數據類型支撐比較有限,原始數據使用的NumPy.Int32必須轉換為int才能索引。此外,現在的bulk操作默認是每次提交500條數據,我修改為5000甚至50000進行測試,會有索引不成功的情況。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
#helpers.py source code def streaming_bulk(client, actions, chunk_size = 500 , raise_on_error = False , expand_action_callback = expand_action, * * kwargs): actions = map (expand_action_callback, actions) # if raise on error is set, we need to collect errors per chunk before raising them errors = [] while True : chunk = islice(actions, chunk_size) bulk_actions = [] for action, data in chunk: bulk_actions.append(action) if data is not None : bulk_actions.append(data) if not bulk_actions: return def bulk(client, actions, stats_only = False , * * kwargs): success, failed = 0 , 0 # list of errors to be collected is not stats_only errors = [] for ok, item in streaming_bulk(client, actions, * * kwargs): # go through request-reponse pairs and detect failures if not ok: if not stats_only: errors.append(item) failed + = 1 else : success + = 1 return success, failed if stats_only else errors |
對于索引的批量刪除和更新操作,對應的文檔格式如下,更新文檔中的doc節點是必須的。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
{ '_op_type' : 'delete' , '_index' : 'index-name' , '_type' : 'document' , '_id' : 42 , } { '_op_type' : 'update' , '_index' : 'index-name' , '_type' : 'document' , '_id' : 42 , 'doc' : { 'question' : 'The life, universe and everything.' } } |
常見錯誤
- SerializationError:JSON數據序列化出錯,通常是因為不支持某個節點值的數據類型
- RequestError:提交數據格式不正確
- ConflictError:索引ID沖突
- TransportError:連接無法建立
性能
上面是使用MongoDB和Elasticsearch存儲相同數據的對比,雖然服務器和操作方式都不完全相同,但可以看出數據庫對批量寫入還是比索引服務器更具備優勢。
Elasticsearch的索引文件是自動分塊,達到千萬級數據對寫入速度也沒有影響。但在達到磁盤空間上限時,Elasticsearch出現了文件合并錯誤,并且大量丟失數據(共丟了100多萬條),停止客戶端寫入后,服務器也無法自動恢復,必須手動停止。在生產環境中這點比較致命,尤其是使用非Java客戶端,似乎無法在客戶端獲取到服務端的Java異常,這使得程序員必須很小心地處理服務端的返回信息。