Storm作业转化为Flink作业流程分析
一、 Storm的topology作业可以转化为Flink Job放到Flink上运行,需要修改Storm作业的代码。以wordcount为例,代码修改成可以在Flink上运行的作业后,如下:
public class WordCountTopology {
static class WordCountSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
public void nextTuple() {
for(String word : WordCountData.WORDS) {
this.collector.emit(new Values(new String[]{word}));
Utils.sleep(100);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(new String[]{"word"}));
}
}
static class WordCountBolt extends BaseRichBolt {
private OutputCollector outputCollector;
private Map<String,Integer> wordCountMap = new ConcurrentHashMap<String, Integer>();
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
public void execute(Tuple tuple) {
String words = tuple.getStringByField("word");
String[] wordArray = words.toLowerCase().split("\W+");
for(String word : wordArray) {
if(!wordCountMap.containsKey(word)){
wordCountMap.put(word,1);
}
else{
int wordCount = wordCountMap.get(word);
wordCount = wordCount + 1 ;
wordCountMap.put(word,wordCount);
}
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { }
}
static class WordCountData {
public static final String[] WORDS = new String[] {
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffe"
};
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new WordCountSpout());
builder.setBolt("bolt", new WordCountBolt(), 1).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(false);
if (args != null && args.length > 0) {
conf.setNumWorkers(1);
// -- 代码修改前的使用StormSubmitter提交作业到远端Storm集群
//StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
// -- 代码修改后使用FlinkSubmitter提交作业到远端Flink集群
FlinkSubmitter.submitTopologyWithProgressBar(args[0], conf, FlinkTopology.createTopology(builder));
} else {
// --代码修改前使用LocalCluster提交作业到本地运行
//StormTopology topology = builder.createTopology();
//LocalCluster localCluster = new LocalCluster();
//localCluster.submitTopology("test",conf,topology);
// -- 代码修改后使用FlinkLocalCluster提交作业到本地运行
FlinkLocalCluster cluster = new FlinkLocalCluster();
cluster.submitTopology("test", conf, FlinkTopology.createTopology(builder));
Utils.sleep(40000);
cluster.killTopology("test");
cluster.shutdown();
}
}
}
Storm的作业中会定义Spout,Bolt这些组件对输入数据的处理逻辑:Spout对数据的处理在nextTuple中完成,Bolt对数据的处理在execute中;以及组件之间的grouping规则:包括shuffle_grouping(数据随机分发下游bolt组件),fields_grouping(按照字段取值不同分发到不同bolt组件),all_grouping(数据分发到下游的每一个bolt组件)等;当将其转化为Flink的Job时候,对应组件的数据处理逻辑转化为Flink Job内部的DataSource,Operator等算子的处理逻辑,对应的grouping规则转化为Flink 流式作业的上下游DataSource,Operator等之间数据的分发规则,这个转化操作是通过FlinkTopology.createTopology这一步完成的;
二、Storm topology解析成为Flink job主要是在FlinkTopology这个类中完成的,以上的wordcount作业中是通过createTopology方法进入的,如下:
private FlinkTopology(TopologyBuilder builder) {
this.builder = builder;
this.stormTopology = builder.createTopology();
// extract the spouts and bolts
this.spouts = getPrivateField("_spouts");
this.bolts = getPrivateField("_bolts");
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kick off the translation immediately
translateTopology();
}
/**
* Creates a Flink program that uses the specified spouts and bolts.
* @param stormBuilder The Storm topology builder to use for creating the Flink topology.
* @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
*/
public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
return new FlinkTopology(stormBuilder);
}
在以上代码中,首先调用createTopology方法,生成一个FlinkTopology对象,在FlinkTopology构造方法里面调用translateTopology进行作业的拓扑转换,在该方法中完成主要的转换工作。在FlinkToplogy中进行作业转化解析的主要流程如下:
1. 首先获取Flink流式作业的执行环境,以及Storm作业中定义的Spout,Bolt组件集合;这些都是在FlinkTopology的构造方法中完成,代码如下:
this.spouts = getPrivateField("_spouts");
this.bolts = getPrivateField("_bolts");
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
对于Storm中Spouts,Bolts组件的获取,具体实现方式是通过java反射获取私有变量的方式来完成的,略;
2. 然后完成对Spout组件的解析,挨个遍历Spout组件,使用SpoutWrapper封装起来:spout对象作为一个参数传入到SpoutWrapper,生成一个SpoutWrapper对象,然后再将SpoutWrapper转为Flink job的DatStreamSource对象;Spoutwrapper的代码实现如下:
public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction
...............
private final IRichSpout spout;
@Override
public final void run(final SourceContext<OUT> ctx) throws Exception {
final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
.getGlobalJobParameters();
StormConfig stormConfig = new StormConfig();
........
final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
(StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
this.stormTopology, stormConfig);
.............
//打开数据源
this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
this.spout.activate();
//读取和发送数据
if (numberOfInvocations == null) {
if (this.spout instanceof FiniteSpout) {
final FiniteSpout finiteSpout = (FiniteSpout) this.spout;
while (this.isRunning && !finiteSpout.reachedEnd()) {
finiteSpout.nextTuple();
}
} else {
while (this.isRunning) {
this.spout.nextTuple();
}
}
} else {
int counter = this.numberOfInvocations;
if (counter >= 0) {
while ((--counter >= 0) && this.isRunning) {
this.spout.nextTuple();
}
} else {
do {
collector.tupleEmitted = false;
this.spout.nextTuple();
} while (collector.tupleEmitted && this.isRunning);
}
}
}
SpoutWrapper类继承了Flink的RichParallelSourceFunction类,该类是实现了Flink的SourceFunction接口,用于连数据源;如上,在SpoutWrapper类中实现SourceFunction的run方法,在该方法中调用了Storm的组件方法进行数据源的连接和读取,发送操作:spout.open(...)进行数据源的连接,通过spout.nextTuple()数据的读取和发送,构造SpoutWrapper时候传入三个参数,构造样例如下:
new SpoutWrapper<String>(new WordCountSpout(),new String[]{Utils.DEFAULT_STREAM_ID})
第一个参数是要进行封装的Spout组件的对象,第二个参数是封装后该组件的数据流的ID;
然后通过env.addSource方法将传入的SpoutWrapper转为为Flink的DataStreamSource对象;
//调用env.addSource方法
DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
declarer.getOutputType(outputStreamId));
...............
//env.addSource方法的具体实现
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo)
...............
if (function instanceof StoppableFunction) {
sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
} else {
sourceOperator = new StreamSource<>(function);
}
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
addSource传入三个参数,第一个是实现了SourceFunction的SpoutWrapper,用于实现对数据的处理逻辑,第二个参数是spoutId,对应转化后的DataStreamSource的sourceName,第三个参数是该sourceFunction的数据输出类型;在addSource的具体实现中,直接将传入的SourceFunction参数,即SpoutWrapper对象封装成了一个数据源的操作的operator,然后将其封装到DataStreamSource中返回;
3. 根据以上操作就将Storm作业的spout组件转为成了Flink作业的DataStreamSource了,然后将转化出来的dataStream放入到availableInputs中;其中availableInputs的数据结构定义如下:
HashMap<String, HashMap<String, DataStream<Tuple>>>
第一个参数表示对应的组件Id,第二个参数表示组件对应处理的数据流的Id,第三个参数表示要处理的数据流;
4. 然后进行Bolt组件的解析,对Bolt组件的解析主要完成:1)对上游输入流的解析,转换上游输入与Bolt组件之间的数据分发规则,为Flink的数据分发规则;2)获取Bolt组件数据输出的schema,并构造Bolt组件的数据输出;主要在两个方法中完成呢个:一个是processInput,一个是createOutput;
5. processInput:主要实现获取Bolt组件的输出流的schema,以及对Bolt组件的输入流根据grouping规则进行处理:设置相应的数据流分发规则
private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt,
GlobalStreamId streamId, Grouping grouping,
Map<String, DataStream<Tuple>> producer) {
...............
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
declarers.put(boltId, declarer);
userBolt.declareOutputFields(declarer);
//获取Bolt组件的输出流schema
this.outputStreams.put(boltId, declarer.outputStreams);
..................
// if producer was processed already
// 根据grouping规则设置相应的数据分发逻辑
if (grouping.is_set_shuffle()) {
// Storm uses a round-robin shuffle strategy
inputStream = inputStream.rebalance();
} else if (grouping.is_set_fields()) {
// global grouping is emulated in Storm via an empty fields grouping list
final List<String> fields = grouping.get_fields();
if (fields.size() > 0) {
FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
inputStream = inputStream.keyBy(prodDeclarer
.getGroupingFieldIndexes(inputStreamId,
grouping.get_fields()));
} else {
inputStream = inputStream.global();
}
} else if (grouping.is_set_all()) {
inputStream = inputStream.broadcast();
} else if (!grouping.is_set_local_or_shuffle()) {
throw new UnsupportedOperationException(
"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
}
......
}
在该段代码中,通过FlinkOutputFieldsDeclarer获取了Bolt组件的输出的schema,在构造数据流输出的时候使用;然后进行了storm作业中的grouping规则与Flink中数据流的操作进行了相似转换:shuffle grouping对应于rebalance操作,将数据流进行随机分发;field grouping对应于keyby操作,按照字段取值进行数据分发;all grouping对应于boardcast操作,将数据分发到每一个下游的operator;以上就实现了输入流到Bolt组件分发规则的变换;
6. createOutput:在这个方法中,主要完成对待解析Bolt组件的三个处理操作,将上游的input的DataStream进行合并;然后构造Bolt组件的OutputStream,并与上游的DataStream连接;最后将Bolt组件解析出来OutputStream作为输入放入到availableInput中,作为下游Bolt组件的输入,并继续下一个Bolt组件的解析;
首先,获取上游的inputStreams,并挨个遍历:如果只有一个输入,则直接转换到singleInputStream中,如果有多个输入,则使用DataStream的Connect操作,将各个输入流进行合并连接,放到mergedInputStream中,代码逻辑实现如下:
//只有一个输入将放在singleInputStream中
DataStream<Tuple> singleInputStream = input1.getValue();
DataStream<StormTuple<Tuple>> mergedInputStream = null;
//得带上游输入
while (iterator.hasNext()) {
Entry<GlobalStreamId, DataStream<Tuple>> input2 = iterator.next();
GlobalStreamId streamId2 = input2.getKey();
DataStream<Tuple> inputStream2 = input2.getValue();
//如果有多于一个输入流,则进行输入流的合并
if (mergedInputStream == null) {
mergedInputStream = singleInputStream
.connect(inputStream2)
.flatMap(
new TwoFlinkStreamsMerger(streamId1, inputSchema1,
streamId2, this.outputStreams.get(
streamId2.get_componentId()).get(
streamId2.get_streamId())))
.returns(StormTuple.class);
} else {
mergedInputStream = mergedInputStream
.connect(inputStream2)
.flatMap(
new StormFlinkStreamMerger(streamId2, this.outputStreams.get(
streamId2.get_componentId()).get(streamId2.get_streamId())))
.returns(StormTuple.class);
}
}
进行两个输入流的connect操作后,然后在执行一个FlatMap操作,将多个输入流进行合并成一个,然后输出到下游一个flatmap的operator中,并返回对应的数据流,合并操作后的数据流如下所示:
然后,根据processInput中获取Bolt组件的输出信息schema判断其的输出的个数,如果是单个输出,则直接使用一个BoltWrapper<Tuple,Tuple>对Y进行封装,表示接收到一个Tuple类型的消息,也同样以Tuple类型转发出去;如果是多个输出,则使用BoltWrapper<Tuple,SplitStreamType<Tuple>>对Y进行封装,表示接受到一个Tuple类型的消息,则进行split到多个下游的Bolt组件的Operator上进行输出;其中BoltWrapper的代码定义如下
public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
OneInputStreamOperator<IN, OUT>
.....
@Override
public void processElement(final StreamRecord<IN> element) throws Exception {
this.flinkCollector.setTimestamp(element);
IN value = element.getValue();
if (this.stormTopology != null) {
Tuple tuple = (Tuple) value;
Integer producerTaskId = tuple.getField(tuple.getArity() - 1);
this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(producerTaskId),
producerTaskId, this.inputStreamIds.get(producerTaskId), this.inputComponentIds
.get(producerTaskId), MessageId.makeUnanchored()));
} else {
this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(null), -1, null, null,
MessageId.makeUnanchored()));
}
}
BoltWrapper继承Flink中的AbstractStreamOperator类,该类是实现了Flink的StreamOperator接口;这样传入的Bolt组件对象封装到BoltWrapper中,就转化为Flink的Operator对象了。在BoltWrapper中实现了StreamOperator的processElement方法,用于对输入元素进行处理:将输入的StreamRecord封装成可以供Bolt组件处理的StormTuple,并通过bolt将该数据分发出去;
将Bolt组件使用BoltWrapper封装后,根据上面构造的singleInputStream,或者mergedInputStream的输入合并,执行transform操作,传入对应的boltId,以及BoltWrapper参数,将其Bolt组件的处理逻辑应用到对应的输入流上,转换成对应的opeator,如下:
final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<>(
bolt, boltId, inputStreamId1, inputComponentId1, inputSchema1, null);
..................................
multiStream = singleInputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
这一步转化操作会将对应的BoltWrapper的参数转为对应的StreamTransformation,并放入到Flink执行环境的transmations变量中,transmations用于生成作业执行的streamGraph;
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo,
OneInputStreamOperator<T, R> operator) {
transformation.getOutputType();
//operator生成Transform
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation,
operatorName,operator,outTypeInfo,environment.getParallelism());
//传入的输入流operator进行转化;
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//执行环境的transmations中加入transmation
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
最后,将转化出来的outStream放入到availableInputs中,作为解析下一个组件的输入,继续进行下一个组件的解析,直到整个拓扑解析完成;等到整个作业解析完成,则Storm作业中组件将全部转化为Flink的Transmation,放入到执行环境的transmations中,提交作业运行的时候,transmations转化StreamGraph,再转为JobGraph,提交作业后在服务端转为ExecutationGraph执行,从而Storm的整个Topology就转化为了Flink的Job执行了;
- .NET4.0的可扩展缓存系统
- 让WordPress 在RSS 中Feed 截断文字输出
- [C#1] 11-接口
- jquery 标签中的属性操作
- 使用MongoDB存储访问者信息
- 解决WordPress 打开Feed页面“This page contains the following errors…”的问题
- jquery表单属性筛选元素
- [C#1] 10-事件
- Windows 7的VPC虚拟机自动不与主机时间同步的解决办法
- [C#1] 9-委托
- jquery基本选择器
- rainyday.js——超逼真全屏雨滴模拟插件
- [C#1] 8-数组
- ASP.NET MVC2 数据模型验证类库:MVC Foolproof Validation
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- JavaScript 字符串中的 pad 方法!
- python随机生成经纬度(用于爬虫参数伪造)
- [Go]GO语言实战-gorm获取insert的自增id 和 struct标签不映射到表字段
- 安装vcs
- Mercari数据集——机器学习&深度学习视角
- 查找重复姓名的sql语句
- nginx如何限制并发连接请求数?
- RTSP协议视频平台EasyNVR证书配置界面上传文件地址自动填写错误怎么处理?
- 开发RTSP/RTMP/GB28181/海康SDK/EHome视频融合平台EasyCVR,使用vue-cli3项目搭建多页面模式的方法
- 互联网视频直播&点播平台RTMP推流组件EasyRTMP在弱网环境下推流稳定吗?会不会有推流失败的问题?
- 安防融合视频云服务EasyCVR集成海康EHome协议实现设备录像回看返回会话ID为-1是什么情况?
- git .gitignore 忽略规则的匹配语法
- vue 初始化高德地图
- js -- 判断数组是否为空?
- 手机没网了,却还能支付,这是什么原理?