Embulk--异构数据库和文件系统处理神器

时间:2022-06-05
本文章向大家介绍Embulk--异构数据库和文件系统处理神器,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Embulk是一款开源的批处理框架,它主要用于异构数据库,文件存储以及云服务之间的数据传输工具。特色:

  1. 支持并行和分布式处理大数据集
  2. 大部分插件支持事务处理
  3. 支持重跑(需要自身幂等性支持)

Embulk使用Yaml进行配置,主要包括下面几个section:

in:从数据源读取数据数据(基于文件(ftp等)和基于记录(数据库等))
    parser:如果数据源是文件,parser解析文件格式(基于文件)
    decoder:用来解压缩和加解密数据(基于文件)
out:输出数据到目标数据源
     formatter:将数据输出成相应文件格式(基于文件)
     encoder:压缩数据或加解密数据(基于数据)
filters:输出后的数据过滤(可选)
exec:执行引擎(可选)

官方网站模板如下:

in:
  type: file
  path_prefix: ./mydata/csv/
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    skip_header_lines: 1
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
filters:
  - type: speedometer
    speed_limit: 250000
out:
  type: stdout

每个插件的具体的模板可以参考: http://www.embulk.org/plugins/

运行流程

安装:

curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

下载elasticsearch的输出插件,file的插件在安装的时候自带。

embulk gem install embulk-output-elasticsearch

创建一个简单的yml文件:seed.yml

in:
  type: file
  path_prefix: ./mydata/csv/
out:
  type: elasticsearch
  index: embulk
  index_type: embulk
  nodes:
    - host: localhost

随后embulk会自动猜测你需要的Yaml文件:

embulk guess ./mydata/seed.yml -o config.yml

如下:

in:
  type: file
  path_prefix: ./mydata/csv/
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    null_string: 'NULL'
    skip_header_lines: 1
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out:
  type: elasticsearch
  index: embulk
  index_type: embulk
  nodes:
  - {host: localhost}

你看Embulk帮你补全了大部分,并在默认目录下生成了config.yml 如果你的文件中牵扯到时区的话,可以加上:

parser:
  default_timezone: 'Asia/Tokyo'

之后就可以执行yml文件:

embulk run config.yml -c diff.yml

diff.yml记录了上次最新跑过的数据,下一次会自动更新,并不会再运行记录在diff.yml文件里的文件

in: {last_path: mydata/csv/sample_01.csv.gz}
out: {}
Embulk事务的支持

当数据中途因为各种原因断了的时候,Embulk支持重跑,只需要运行时加上resume-state.yml的生成路径

embulk run config.yml -r resume-state.yml

事务失败后,再次运行:

embulk run config.yml -r resume-state.yml

Embulk会根据resume-state.yml记录的状态重跑数据 如果不再需要事务支持,只需要删除即可。

embulk cleanup config.yml -r resume-state.yml