Logstash: 应用实践 - 装载 CSV 文档到 Elasticsearch
【腾讯云 Elasticsearch Service】高可用,可伸缩,云端全托管。集成X-Pack高级特性,适用日志分析/企业搜索/BI分析等场景
在进行我们这个实践之前,相信大家已经安装好自己的 Logstash 环境。如果大家还没安装好Logstash,可以参照我之前的文章 “如何安装Elastic栈中的Logstash”。
Logstash 到底是做什么的?
我们先看一下如下的图:
简单地说,Logstash 就是位于 Data 和 Elasticsearch 之间的一个中间件。Logstash 是一个功能强大的工具,可与各种部署集成。 它提供了大量插件。 它从数据源实时地把数据进行采集,可帮助您解析,丰富,转换和缓冲来自各种来源的数据,并最终把数据传入到Elasticsearch之中。 如果您的数据需要 Beats 中没有的其他处理,则需要将 Logstash 添加到部署中。Logstash 部署于 ingest node 之中。
更具体的描述:
在Logstash之中,它也分为三个部分:
- Inputs
- Filters
- Ouput
Input 创建事件,Filter 修改输入事件,然后 Ouput 将它们发送到目标。 Input 和 Output 支持编解码器,使用编解码器,你可以在数据进入或退出管道时进行编码或解码,而不必使用单独的过滤器。 默认情况下,Logstash 在管道(pipeline)阶段之间使用内存中有界队列(输入到过滤器和过滤器到输出)来缓冲事件。 如果Logstash 不安全地终止,则存储在内存中的所有事件都将丢失。 为防止数据丢失,您可以使 Logstash 通过使用持久队列将正在进行的事件持久化到磁盘上。可以通过在 logstash.yml 文件中设置 queue.type: persistent 属性来启用持久队列,该文件位于LOGSTASH_HOME/config 文件夹下。 logstash.yml 是一个配置文件,其中包含与 Logstash 相关的设置。 默认情况下,文件存储在 LOGSTASH_HOME/data /queue 中。 你可以通过在 logstash.yml 中设置 path.queue 属性来覆盖它。
Logstash 配置由一系列输入,过滤器和输出插件及其相应的属性组成。 每个插件在解析,处理和最终以所需格式放置数据方面起着重要作用。 输入插件生成事件,过滤器修改它们,输出将它们发送到其他系统。 Logstash 提供超过 200 个插件,以及创建和贡献自己的灵活性:
在我们安装的 Logstash bin 目录下,我们可以使用如下的命令来获得所有的 plugins:
$ ./logstash-plugin list
下载 Data
为了能够使得我的练习能够进行,我们必须先得到数据。我们可以到网址 kaggle.com 进行下载。该网站含有大量的数据可以供我们进行下载。我们在该网页搜索 “cars”:
我们可以得到一叫做“Classified Ads for Cars”的 dataset。从上面我们可以看到有大概有 92M 的数据,不多,但是确实也不少啊。对于我们做这个练习来说,确实应该是可以的了。在网页上,我们可以看到这个 csv 文件的所有的数据描述。我们可以先注册一个账号,然后点击 “Download” 按钮,下载这个数据,并解压这个文件。在我们的电脑的 “Downloads” 文件目录下可以看到被解压的文件 “all_anonymized_2015_11_2017_03.csv”。我们可以在我们的 home 目录下创建一个叫做 data 的目录,并把这个文件拷贝过去,并重新命名为 cars.csv。
localhost:data liuxg$ tree.└── cars.csv 0 directories, 1 filelocalhost:data liuxg$ pwd/Users/liuxg/data
我们可以看到在data目录下只有叫做cars.csv的文件。我们可以打开这个文件,我们可以看到如下的内容:
这是一个非常大的文件。在这个文件里,我们可以看到 maker, model, mileage, manufacture 等等字段。
Index CSV 文件到 Elasticsearch
在上一节中,我们已经把我们的数据存入到我们的data目录中。在这节里我们来讲述如何把数据写入到 Elasticsearch 之中。首先,我们可以参阅链接 “Configuring Logstash”。我们需要创建一个属于我们自己的 config 文件。通常一个 config 文件由如下的三个部分组成:
# This is a comment. You should use comments to describe# parts of your configuration.input { ...} filter { ...} output { ...}
我们在之前已经创建好的目录data下创建一个叫做 longstash_cars.config 的文件。我们可以使用我们喜欢的编辑器来编辑这个文件。
logstash_cars.config 文件的内容如下:
input { file { path => "/Users/liuxg/data/cars.csv" start_position => "beginning" sincedb_path => "/dev/null" }} filter { csv { separator => "," columns => [ "maker", "model", "mileage", "manufacture_year", "engine_displacement", "engine_power", "body_type", "color_slug", "stk_year", "transmission", "door_count", "seat_count", "fuel_type", "date_created", "date_last_seen", "price_eur" ] } mutate { convert => ["mileage", "integer"] } mutate { convert => ["price_eur", "float"] } mutate { convert => ["engine_power", "integer"] } mutate { convert => ["door_count", "integer"] } mutate { convert => ["seat_count", "integer"] }} output { elasticsearch { hosts => "localhost:9200" index => "cars" document_type => "sold_cars" } stdout {}}
这里有几点需要说明的:
- 在 input 中,我们定义了一个文件,它的path指向我们的 csv 文件的位置。start_position 指向beginning。如果对于一个实时的数据源来说,它通常是 ending,这样表示它每次都是从最后拿到那个数据。sincedb_path 通常指向一个文件。这个文件保存上次操作的位置。针对我们的情况,我们设置为 /dev/null,表明,我们不存储这个数据
- 在 filter 中,CSV filter 是非常直接的,不太需要很多的解释。这里的 column 都来自于我们的 csv 表格。通常 Logstash 会把每个数据都当做是字符串。针对我们的情况,我们可看到 mileage 是一个整型数,price_eur 是一个浮点数。这些我们都需要进行调整。这个调整我们可以通过 mutate 来完成
- 在 output 之中,我们制定本地的 Elasticsearch 为我们的数据库,它的 index 是 cars,同时 document_type 为_doc。我们也同时使用 stdout,这样我们可以在terminal屏幕中看出数据在处理之中
装载数据到 Elasticsearch
我们首先进入到 Logstash 的安装目录,然后打入如下的命令:
sudo ./bin/logstash -f ~/data/logstash_cars.config
提示:在运行 Logstash 时使用 -r 标志可让您在更改和保存配置后自动重新加载配置。 在测试新配置时,这将很有用,因为你可以对其进行修改,这样就不必在每次更改配置时都手动启动Logstash。
然后,我们可以在屏幕上看到如下的输出:
同时如果我们这在 Kibana 上可以看到正在 index 的 cars index数量(count)是一直变化的。
因为这是一个很大的文件,所以建立索引需要一段时间,而且我的电脑也将会是非常的热。
经过一段时间的运行,我们可以看到屏幕上不再滚动了,表明Logstash已经完成了数据的传输。我们可以在 Kibana上打入如下的命令:
GET cars/_stats
我看可以看到我们的cars Index的统计数据:
我们可以看到共有 3,167,984 条数据,并且它的大小有 2.466G 这么大的数据。
我们可以打入如下的命令:
GET cars
我们可以看到 Elasticsearch 已经为我们创建好了 mapping,并且它们的数据类型也可以看到:
{ "cars" : { "aliases" : { }, "mappings" : { "properties" : { "@timestamp" : { "type" : "date" }, ... "color_slug" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "door_count" : { "type" : "long" }, "engine_power" : { "type" : "long" }, "fuel_type" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "host" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, ... }, "settings" : { "index" : { "creation_date" : "1567949013407", "number_of_shards" : "1", "number_of_replicas" : "1", "uuid" : "tYT-XWpFTGiqwn_rMX4S3A", "version" : { "created" : "7030099" }, "provided_name" : "cars" } } }}
我们可以看到 door_count 及 engine_power 都已经修改为 long 数据类型了。在默认的情况下已经为我们创建了一个 replica。
在 Kibana 上显示数据
我们打开我们的 Kibana。我们首先现在我们的 Kibana 进行如下的操作:
我们建立一个叫做 cars* 的 index pattern。
最后我的到如下的画面:
再接下来,我们选择 Discover(左边最上面的那个图标)来装载我们的数据:
我们可以选择最近一个小时的数据的数据。我们立马可以看到一个统计图,显示在每个时间有多少个数据进来。
我们也可以做一个报表通过点击 “add” 按钮选择 maker, fuel_type,price_eur 及
最终我们看到如下的一个列表:
创建 Visualization
我们选择创建一个 Visualization:
我们选择 “Pie” Chart:
我们可以按照如下的选择生产一个 top 10 的生产厂商
按照如下的方法可以生产 top 10 的汽车型号:
生成最终 Dashboard
最新活动
包含文章发布时段最新活动,前往ES产品介绍页,可查找ES当前活动统一入口
Elasticsearch Service自建迁移特惠政策>>
Elasticsearch Service 新用户特惠狂欢,最低4折首购优惠 >>
Elasticsearch Service 企业首购特惠,助力企业复工复产>>
关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站Get~
- Rafy 框架 - 流水号插件
- 产品前端重构(TypeScript、MVC框架设计)
- 寻找最优持仓期的开盘缺口盈利交易策略基于Matlab
- Android SlidingMenu 侧拉菜单的使用(详细配置)
- Rafy 框架 - 幽灵插件(假删除)
- 用粒子群优化算法求解旅行商问题
- 使用CNN(LSTM架构)进行序列预测基于TensorFlow
- 【独家】周志华教授gcForest(多粒度级联森林)算法预测股指期货涨跌
- 如何利用SOTER,1个版本内完成指纹支付开发?
- Rafy 框架 - 大批量导入实体
- Rafy 框架 - 执行SQL或存储过程
- 关于activitygroup过时,用frament替换操作
- Rafy 框架 - 为数据库生成注释
- CNN预测股票走势基于Tensorflow(思路+程序)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 回炉重造:计算图
- CenterNet的骨干网络之DLASeg
- 一份朴实无华的移动端盒子滤波算法优化笔记
- [译]基于以太坊和USDC搭建去中心化金融系统
- 使用Substrate开发区块链存证dApp
- Flutter Dojo设计之道——如何打造一个通用的Playground
- [译]构建去中心化智能合约编程货币
- 你不知道c语言写的程序要多块——以NGS fastq文件reads计数为例
- JVM系列之:从汇编角度分析Volatile
- JVM系列之:从汇编角度分析NullCheck
- 基于机器学习的文本分类!
- JVM系列之:再谈java中的safepoint
- 《图解算法》第2章 选择排序
- 《图解算法》第3章 递归
- 《图解算法》第4章 快速排序