Elasticsearch 通过Scroll遍历索引,构造pandas dataframe 【Python多进程实现】
首先,python 多线程不能充分利用多核CPU的计算资源(只能共用一个CPU),所以得用多进程。笔者从3.7亿数据的索引,取200多万的数据,从取数据到构造pandas dataframe总共大概用时14秒左右。每个分片用一个进程查询数据,最后拼接出完整的结果。
由于返回的json数据量较大,每次100多万到200多万,如何快速根据json构造pandas 的dataframe是个问题 — 笔者测试过read_json()、json_normalize()、DataFrame(eval(pandas_json))及DataFrame.from_dict(),from_dict()速度最快
转载请注明出处:https://www.cnblogs.com/NaughtyCat/p/how-to-get-all-results-from-es-by-scroll-python-version.html
- Elasticsearch scroll取数据— python版
源码如下:
def es_scroll(index, min_timestamp, max_timestamp, slice_no):
es = Elasticsearch('http://localhost:9200', timeout = 30, max_retries=10, retry_on_timeout=True)
page = es.search(
index = index,
doc_type = "tls_book",
scroll = '1m',
body={
"slice": {
"id": slice_no,
"max": SLICES
},
"_source": [
"SrcIP"
],
"sort": [
"_doc"
],
"query": {
"range" : {
"@timestamp" : {
"gte" : min_timestamp,
"lte" : max_timestamp,
"boost" : 2.0
}
}
}
},
version = False,
size = 10000)
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
df = pd.DataFrame()
appended_data = []
while (scroll_size > 0):
frame = pd.DataFrame.from_dict([document['_source'] for document in page["hits"]["hits"]])
appended_data.append(frame)
page = es.scroll(scroll_id = sid, scroll = '1m', request_timeout = 30)
# Update the scroll ID
sid = page['_scroll_id']
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
if len(appended_data) > 0:
df = pd.concat(appended_data, ignore_index=True, sort = False)
del appended_data
gc.collect()
es.clear_scroll(body={'scroll_id': sid})
return df
注:
(1)通过 "_source" 关键字,指定要取的字段,可减少不必要的字段,提高查询速度
(2)官方文档指出,通过 "sort": [ "_doc"] —即按照_doc排序,可提高查询效率
(3)根据自己的环境,测试合理的 size ,效率会有数倍的差距。笔者环境(128G, 32核)一次取10000性能最好,网上大多测试,size取2000或者1000似乎较佳
(4)clear_scroll及时清理用完的scroll_id
(5)如果数据量较大,设置超时和重试次数(默认是10秒,否则超时会取不到数据),具体如下
timeout = 30, max_retries=10, retry_on_timeout=True
(6)Sliced scroll
如果返回的数据量特别大,可通过slice让多个分片独自来处理请求,如下(id从0开始):
"slice": {
"id": slice_no,
"max": SLICES
},
参考: https://www.elastic.co/guide/en/elasticsearch/reference/5.1/search-request-scroll.html#sliced-scroll
- python 多进程如何个函数传多个参数
python多进程或者多线程要向调用的函数传递多个参数,需要构造参数元组集合,代码如下(本示例每个进程不同的只有es的slice_id):
def build_parameters(index, min_timestamp, max_timestamp):
parmeters =[]
for num in range(0, SLICES):
tuple_paremeter = (index, min_timestamp, max_timestamp, num)
parmeters.append(tuple_paremeter)
return parmeters
- python多进程实例
示例使用进程池,及starmap 传递调用的函数及参数 (with相当于try, excepion, finallly的集合,会自动做资源的释放或关闭等)
with multiprocessing.Pool(processes = SLICES) as pool:
result = pool.starmap(es_scroll, parameters)
然后,拼接返回的dataframe 集合即可构造一个完整的dataframe,如下:
frame = pd.concat(result, ignore_index=True, sort = False)
*******************************************************************************************
精力有限,想法太多,专注做好一件事就行
- 我只是一个程序猿。5年内把代码写好,技术博客字字推敲,坚持零拷贝和原创
- 写博客的意义在于打磨文笔,训练逻辑条理性,加深对知识的系统性理解;如果恰好又对别人有点帮助,那真是一件令人开心的事
*******************************************************************************************
- 微博爬虫
- 电话域名受欢迎,微语言融资3000万
- 前端魔法堂——异常不仅仅是try/catch
- (cljs/run-at (JSVM. :all) "一起实现柯里化")
- (cljs/run-at (JSVM. :browser) "简单类型可不简单啊~")
- 前端魔法堂:解秘FOUC
- JS魔法堂:深究JS异步编程模型
- 前端魔法堂:屏蔽Backspace导致页面回退
- “表情包”火爆全球,域名emojis.com小六位易主
- 前端魔法堂:onsubmit和submit事件处理函数怎么不生效呢?
- (cljs/run-at (JSVM. :all) "Metadata就这样哦")
- (cljs/run-at (JSVM. :all) "细说函数")
- 动手写个数字输入框2:起手式——拦截非法字符
- Linux安装DNSmasq搭建自己的公共DNS
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法