Storm——分布式实时流式计算框架

时间:2022-07-22
本文章向大家介绍Storm——分布式实时流式计算框架,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Storm

第一章 是什么

一 介绍

Storm是Twitter开源的分布式实时大数据处理框架,最早开源于github. 2013年,Storm进入Apache社区进行孵化. 2014年9月,晋级成为了Apache顶级项目. 国内外各大网站使用,例如雅虎、阿里、度 官网 http://storm.apache.org/

特点

  • Storm是个实时的、分布式以及具备高容错的计算系统
  • Storm进程常驻内存
  • Storm数据不经过磁盘,在内存中处理
  • 高可靠性 异常处理 消息可靠性保障机制(ACK)
  • 可维护性 StormUI 图形化监控接口

注意:

  1. MapReduce无法做到实时处理, 制约因素是数据量级大, 分布式计算, IO操作(浪费时间)
  2. 分布式能够解决单点故障

二 拓扑流程

组件说明

  • spout : 相当于数据源
  • tuple : 相当于元数据
  • bolt : 数据处理的最小单位, 只负责处理一部分处理逻辑, bolt异步多线程处理, 最后再汇总

拓扑图

架构 详细说明见第四章第一节

  • Nimbus: 资源分配,任务调度, 上传jar ( 类比老板 )
  • Supervisor : 开启或进程 ( 类比包工头,根据ZK分配信息决定 )
  • Worker: 位于Supervisor节点上, 而且可以有多个, 每个Worker可以接一个或多个任务(Task),根据自身的能力和业务复杂度处理{ Task: 包括 bolt(逻辑单元处理) 和spout(推送数据) }

数据结构

  • ZMQ(twitter早期产品) ZeroMQ 开源的消息传递框架,并不是一个MessageQueue
  • Storm使用Netty进行传输, Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)

流式处理

  • 流式处理(异步 与 同步) 客户端提交数据进行结算,并不会等待数据计算结果
  • 逐条处理 例:ETL(数据清洗)extracted transform load
  • 统计分析 例:计算PV、UV、访问热点 以及某些数据的聚合、加和、平均等 客户端提交数据之后,计算完成结果存储到Redis、HBase、MySQL或者其他MQ当中, 客户端并不关心最终结果是多少。

有向无环图(DAG,directed acyclic graph): 起始点一定是spout, 终点一定是 bolt, 拓扑有方向, 如下图

实时处理

  • 实时请求应答服务(同步) 客户端提交数据请求之后,立刻取得计算结果并返回给客户端
  • Drpc: distributed remote procedure call, 分布式远程过程/服务调用.
  • 实时请求处理 例:图片特征提取

三 性能对比

Storm 与MapReduce的关系

  • Storm:进程、线程常驻内存运行,数据不进入磁盘,数据通过网络传递。
  • MapReduce:为TB、PB级别数据设计的批处理计算框架。

Storm 与 Spark Streaming 的关系

  • Storm:纯流式处理 专门为流式处理设计 数据传输模式更为简单,很多地方也更为高效 并不是不能做批处理,它也可以来做微批处理,来提高吞吐
  • Spark Streaming:微批处理 将RDD做的很小来用小的批处理来接近流式处理 基于内存和DAG可以把处理任务做的很快

四 计算模型

1.Topology(译为拓扑结构) – DAG有向无环图的实现

  • 对于Storm实时计算逻辑的封装. 即,由一系列通过数据流相互关联的Spout、Bolt所组成的拓扑结构
  • 生命周期:此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止 (区别于MapReduce当中的Job,MR当中的Job在计算执行完成就会终止)

2.Tuple – 元组

  • Stream中最小数据组成单元

3.Stream – 数据流

  • 从Spout中源源不断传递数据给Bolt、以及上一个Bolt传递数据给下一个Bolt,所形成的这些数据通道即叫做Stream
  • Stream声明时需给其指定一个Id(默认为Default)
  • 实际开发场景中,多使用单一数据流,此时不需要单独指定StreamId

4.Spout – 数据源

  • 拓扑中数据流的源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中
  • 一个Spout可以发送多个数据流(Stream) 可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去
  • Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit方法将数据生成元组(Tuple)发送给之后的Bolt计算

5.Bolt – 数据流处理组件

  • 拓扑中数据处理均有Bolt完成。对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往需要多个Bolt分多个步骤完成
  • 一个Bolt可以发送多个数据流(Stream) 可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去
  • Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑

6.Stream Grouping – 数据流分组(即数据分发策略)

注意: 1,4,5,6 在Storm开发中经常用到


第二章 Storm编程案例

环境准备, 案例用到的jar在底部分享, 下载后在项目下创建一个lib目录, 然后右击bulild path全部即可

一 WordSum ( 数据累加 )

Spout

用于数据的推送 这里是将每个i 的值推送给 bolt 进行处理

package ah.szxy.storm.bolt;

import java.util.List;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * 手动继承 BaseRichSpout, 实现它的未实现方法
 * @author chy
 */
public class WsSpout extends BaseRichSpout{

	private Map map;
	private TopologyContext context;
	private SpoutOutputCollector collector;
	int i=0;//nextTuple方法会被循环调用,因此i应该是成员变量
	
	/**
	 * 1.配置初始化spout	
	 * 将局部变量赋值给成员变量, 目的是提升局部变量的作用域
	 */
	@Override
	public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
		this.map=map;
		this.context=context;
		this.collector=collector;
	}
	
	/**
	 * 2.采集并且向后推送数据
	 */
	@Override
	public void nextTuple() {
		
		/**
		 * 这里体现了面向接口的核心思想
		 * 如果声明直接使用Values, 接收数据的类型就会被限制死了
		 */
		List list = new Values(i++);
		this.collector.emit(list);
		System.err.println("num==========="+list);
		Utils.sleep(1000);//和线程休眠效果一样,storm包提供
	}

	/**
	 * 3.向接收的数据的逻辑处理单元声明发送数据的字段名称
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declare) {
		declare.declare(new Fields("num"));
	}

}

Bolt

用于对spout的数据进行逻辑处理 这里是对数据进行求和

package ah.szxy.storm.spout;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
 * 继承BaseRichBolt, 实现相关方法
 * @author chy
 *
 */
public class WsBolt extends BaseRichBolt{
	//成员变量
	private Map stormConf;
	private TopologyContext comtext;
	private OutputCollector collector;
	//求和
	int sum=0;
	
	/**
	 * 准备阶段(提供逻辑运算的环境)
	 * 将局部变量赋值给成员变量, 目的是提升局部变量的作用域
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.stormConf=stormConf;
		this.comtext=context;
		this.collector=collector;
	}
	
	/**
	 * 获取数据 ( 有必要的话, 向后继续发送数据 )
	 */
	@Override
	public void execute(Tuple input) {
//		input.getInteger(0);	
		int num=input.getIntegerByField("num");//接收的是spout类中declareOutputFields方法声明的字段名称
		sum+=num;
		System.err.println("sum========================="+sum);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}

}

Test

构建拓扑结构模型 测试程序是否正常运行

package ah.szxy.storm.test;

import ah.szxy.storm.bolt.WsSpout;
import ah.szxy.storm.spout.WsBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TestWs {
	
	/**
	 * 建立拓扑结构, 加入集群中运行
	 * @param args 命令行参数
	 */
	public static void main(String[] args) {
		
		//构建storm拓扑结构( Topology: 拓扑结构)
		TopologyBuilder tb=new TopologyBuilder();
		tb.setSpout("wsspout", new WsSpout());
		tb.setBolt("wsbolt", new WsBolt()).shuffleGrouping("wsspout");
		
		//创建本地storm集群
		LocalCluster lc=new LocalCluster();
		
		//将任务布置到集群中运行
		lc.submitTopology("wordsum", new Config(), tb.createTopology());
	}
}

注意:

  • 由结果可以看出, 执行一次spout就会执行一次bolt操作
  • 而且他们顺序有时候会颠倒, 原因是他们执行的是异步nio(多线程并行,谁快谁先执行)操作而不是串行操作, 但是最后的结果不会受到影响

二 WordCount

Spout

需要注意的是这里采取了随机的方式推送数据 因此下面在结果打印时, 打印的数据可能相同

/**
 * spout数据推送
 * @author chy
 *
 */
public class WcSpout extends BaseRichSpout{
	
	private Map conf;
	private TopologyContext context;
	private SpoutOutputCollector collector;
	//定义需要被统计字符串数据
	String[] text= {
			"I am a walker",
			"I like play computer and comic",
			"I like study and sing",
			"My nickname is TimePause",
			"TimePause is not simple history"
	};
	//定义一个随机数变量r
	Random r=new Random();
	
	
	/**
	 * 初始化方法
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.conf=conf;
		this.context=context;
		this.collector=collector;
	}
	
	/**
	 * 采集并向后推送数据
	 */
	@Override
	public void nextTuple() {
		//从数组中随机取出一行,放到list集合中
		List line=new Values(text[r.nextInt(text.length)]); 
		//推送数据
		this.collector.emit(line);
		System.err.println("spout emit line========"+line);
		Utils.sleep(1000);
	}

	/**
	 * 向接收的数据的逻辑处理单元声明发送数据的字段名称
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		declarer.declare(new Fields("line"));
	}

}

Bolt

/**
 * 第一个Bolt---进行分词
 * @author chy
 *
 */
public class WcSplitBolt extends BaseRichBolt{
	Map stormConf;
	TopologyContext context;
	OutputCollector collector;
	
	/**
	 * 准备阶段(提供逻辑运算的环境)
	 * 将局部变量赋值给成员变量, 目的是提升局部变量的作用域
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.stormConf=stormConf;
		this.context=context;
		this.collector=collector;
	}
	
	/**
	 * 获取tuple元祖中每一行数据并切割
	 * @param input
	 */
	@Override
	public void execute(Tuple input) {
		//input.getString(0);//通过偏移量获取
		String line=input.getStringByField("line");
		//切割
		String[] words = line.split(" ");
		
		for (String word : words) {
			List wordList=new Values(word);
			this.collector.emit(wordList);//发送数据
		}
	}
	
	/**
	 * 向接收的数据的逻辑处理单元声明发送数据的字段名称
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("wordList"));
	}

}
/**
 * 第二个Bolt---分词后的统计
 * @author chy
 *
 */
public class WcCountBolt extends BaseRichBolt{
	
	//用来存放,单词,以及单词出现的个数
	Map<String, Integer> map=new HashMap<String, Integer>();
	
	/**
	 * 准备阶段(提供逻辑运算的环境)
	 */
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		
	}
	
	/**
	 * 获取tuple元祖中每一个单词, 并且按照单词统计出出现的次数
	 * @param input
	 */
	@Override
	public void execute(Tuple input) {
		String word=input.getStringByField("wordList");//到这里获取的方式时一个一个的获取
		
		//存放单词数量,之所以不设置为全局是因为每次key的值都不一样
		int count=1;
		if (map.containsKey(word)) {//如果出现,则count+1
			count=(int)map.get(word)+1;//map.get(key)获取map的值
		}
		map.put(word, count);
		
		System.err.println("WcCountBolt emit===key:"+word+"==count:"+count);
	}
	
	/**
	 * 向接收的数据的逻辑处理单元声明发送数据的字段名称
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	
	}

}

Test

/**
 * 测试类
 * @author chy
 *
 */
public class TestWc {
	public static void main(String[] args) {
		//创建拓扑结构
		TopologyBuilder tb=new TopologyBuilder();
		tb.setSpout("WcSpout", new WcSpout());
		tb.setBolt("WcSplitBolt",new WcSplitBolt()).shuffleGrouping("WcSpout");
		//fieldsGrouping:根据单词属性名称进行分组
		tb.setBolt("WcCountBolt", new WcCountBolt(), 3).fieldsGrouping("WcSplitBolt", new Fields("wordList"));
		
		//创建本地集群
		LocalCluster lc=new LocalCluster();
		//发布任务到集群
		lc.submitTopology("WordCount", new Config(), tb.createTopology());
	}
}

结果展示

因为spout采取随机推送, 因此数据重复的可能性非常大


第三章 Storm Grouping

由上面两个案例的test方法中我们可以看到Storm Grouping的作用,下面我们来具体学习一下它吧~~~

1. Shuffle Grouping

随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。 轮询,平均分配

2. Fields Grouping

按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不同的"user-id"则可能会被分配到不同的task。

3. All Grouping

广播发送,对于每一个tuple,所有的bolts都会收到

4. Global Grouping

全局分组,把tuple分配给task id最低的task 。

5. None Grouping

不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。 有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未来Storm如果可能的话会这样设计)。

6. Direct Grouping

指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)

7. Local or shuffle grouping

本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致

8. customGrouping

自定义,相当于mapreduce 自己去实现一个partition一样。


第四章 安装

伪分布式搭建

单一节点安装, 但是具备分布式所具备的所有组件

## 单机模式
## 上传解压,资料分享至末尾
$ tar xf apache-storm-0.10.0.tar.gz 
$ cd apache-storm-0.10.0

$ storm安装目录下创建log:  mkdir logs
$ ./bin/storm --help

下面分别启动ZooKeeper、Nimbus、UI、supervisor、logviewer
##错误信息放到标准输入中, 
$ ./bin/storm dev-zookeeper >> ./logs/zk.out 2>&1 &
$ ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &
$ ./bin/storm ui >> ./logs/ui.out 2>&1 &
$ ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &
$ ./bin/storm logviewer >> ./logs/logviewer.out 2>&1 &

# 需要等一会儿
$ jps
6966 Jps
6684 logviewer
6680 dev_zookeeper
6681 nimbus
6682 core
6683 supervisor

# 访问图形化界面( 图1 )
http://nodex:8080

# 提交任务到Storm集群当中运行:
## 首先将WrodCount程序打包成 WrodCount.jar 放到/root/chy/software ,需要阅读下方的注意事项
## 在Strom根目录下运如下命令 ./bin/storm jar jar全路径 主类/启动类的全路径( 图2 )
./bin/storm jar /root/chy/software/WrodCount.jar ah.szxy.storm.tesTestWc wc

注意: 在将项目打包放到伪分布式环境中时, 修改了主类如下的代码, 使其能够依靠集群环境下运行

//提交任务,输入了额外的参数,在集群环境下运行;否则在项目自身的环境下运行
		Config config = new Config();
		if (args.length>0) {
			try {
				StormSubmitter.submitTopology(args[0], config, tb.createTopology());
			} catch (Exception e) {
			}
			
		}else {
			//创建本地集群
			LocalCluster lc=new LocalCluster();
			//发布任务到集群
			lc.submitTopology("WordCount", config, tb.createTopology());
			
		}

图1

图2

完全分布式搭建

环境要求

java -version
JDK 1.6+

python -V (系统内置)
Python 2.6.6+

ZooKeeper3.4.5+
storm 0.9.4+

各节点分配情况

Nimbus

Supervisor

Zookeeper

node2

*

*

node3

*

*

node4

*

*

具体步骤

思路: 首先在node2配置storm, 配置完成后分发给node3,node4

node1作为nimbus,
# 1. 开始配置storm.yaml
$ vim conf/storm.yaml
--------------------------------------
storm.zookeeper.servers:
  - "node2"
  - "node3"
  - "node4"

# 任务的存储目录
storm.local.dir: "/tmp/storm"
# 声明主节点在哪里
nimbus.host: "node2"

# 指定从节点的槽位,一个从节点对应四个槽位,一个槽位对应一个worker,一个worker对应一个端口
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
-------------------------------


# 2.在storm目录中创建logs目录
$ mkdir logs

# 3. (分发)集群其他服务器node3,node4
## 启动ZooKeeper集群(node2,3,4)
zkServer.sh start

# 4. node1上启动Nimbus 
##  2>&1的意思就是将标准错误重定向到标准输出, & 为后台输出
$ ./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &
$ tail -f logs/nimbus.log

$ ./bin/storm ui >> ./logs/ui.out 2>&1 &
$ tail -f logs/ui.log


# 5. 节点node2和node3启动supervisor,按照配置,每启动一个supervisor就有了4个slots
$ ./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &
$ tail -f logs/supervisor.log (当然node1也可以启动supervisor)


# 6.访问图形化界面(图1),至此安装完成
http://node2:8080/



# 集群测试
## 上传jar任务到Storm集群当中运行(可以从Supervisor节点提交,但是会汇总到nimbus的/tmp/storm目录下, 图2,图3):
$ ./bin/storm jar /root/chy/software/WrodCount2.jar ah.szxy.storm.test.TestWc wc


## 观察关闭一个supervisor后,nimbus的重新调度
## 再次启动一个新的supervisor后,观察,并rebalance, 可以通过图形化页面来操作

注意: 在打包前, 修改了主类的相关代码 , 设置了相关的进程和线程数, 以及worker的数目

public class TestWc {
	/**
	 * 建立拓扑结构, 加入集群中运行
	 * @param args 命令行参数
	 */
	public static void main(String[] args) {
		//创建拓扑结构
		TopologyBuilder tb=new TopologyBuilder();
		tb.setSpout("WcSpout", new WcSpout(),2);
		tb.setBolt("WcSplitBolt",new WcSplitBolt(),4).shuffleGrouping("WcSpout");
		//fieldsGrouping:根据单词属性名称进行分组
		tb.setBolt("WcCountBolt", new WcCountBolt(),2).setNumTasks(4).fieldsGrouping("WcSplitBolt", new Fields("wordList"));
		
		//提交任务,输入了额外的参数,在集群环境下运行;否则在项目自身的环境下运行
		Config config = new Config();
		config.setNumWorkers(2);
		if (args.length>0) {
			try {
				StormSubmitter.submitTopology(args[0], config, tb.createTopology());
			} catch (Exception e) {
			}
			
		}else {
			//创建本地集群
			LocalCluster lc=new LocalCluster();
			//发布任务到集群
			lc.submitTopology("WordCount", config, tb.createTopology());
			
		}
		
	}
}

图1

图2

图3


第五章 深入理解Storm

一 Strom架构

Storm架构组件介绍

  • Nimbus 资源调度 任务分配 接收jar包
  • Supervisor 接收nimbus分配的任务 启停自己管理的worker进程(当前supervisor上worker数量由配置文件设定)
  • Worker 运行具体处理运算组件的进程(每个Worker对应执行一个Topology的子集) worker任务类型,即spout任务、bolt任务两种 启动executor(executor即worker, JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务)
  • Zookeeper 健康检查( 心跳检测 ) 程序协调( 主备切换 )

Storm与Hadoop结构区别

Storm任务流程

Storm本地目录树

Storm Zookeeper目录树

二 Storm集群的并发机制

Worker – 进程

  • 一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)
  • 这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中多台服务器上的进程所组成

Executor – 线程

  • Executor是由Worker进程中生成的一个线程
  • 每个Worker进程中会运行拓扑当中的一个或多个Executor线程
  • 一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些Task任务都是对应着同一个组件(Spout、Bolt)。

Task

  • 实际执行数据处理的最小单元
  • 每个task即为一个Spout或者一个Bolt
  • Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整 (默认情况下,Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务)

代码实现

设置Worker进程数

Config.setNumWorkers(int workers)

设置Executor线程数

TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)
TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)

:其中, parallelism_hint即为executor线程数

设置Task数量

ComponentConfigurationDeclarer.setNumTasks(Number val)

例:

Config conf = new Config() ;
conf.setNumWorkers(2);

TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new MySpout(), 1);
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout);

复杂情况下的配置图与代码截图

该图5进程6任务的原因是: 有一个进程分配了两个任务(GreenBolt)

配置图

代码截图

因为有两个worker, 因此进程数是原来的两倍, 可知原来进程为5个

动态调整Worker进程数量、以及Executor线程数量

Rebalance – 再平衡 即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量

支持两种调整方式: 1、通过Storm UI 2、通过Storm CLI

通过Storm CLI动态调整:

例:

storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
可以通过 help rebalance
将mytopology拓扑worker进程数量调整为5个
“ blue-spout ” 所使用的线程数量调整为3个
“ yellow-bolt ”所使用的线程数量调整为10个

三 Storm通信机制

Worker进程间的数据通信

  • ZMQ ZeroMQ 开源的消息传递框架,并不是一个MessageQueue
  • Netty Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)

Worker内部的数据通信

  • Disruptor 实现了“队列”的功能。 可以理解为一种事件监听或者消息处理机制,即在队列当中一边由生产者放入消息数据,另一边消费者并行取出消息数据处理。

四 Storm容错机制

1、集群节点宕机

  • Nimbus服务器 单点故障?重启(极小概率出现, 因为自身基于Netty和队列机制)
  • 非Nimbus服务器 故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行

2、进程挂掉

  • Worker 挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上
  • Supervisor 无状态(所有的状态信息都存放在Zookeeper中来管理) 快速失败(每当遇到任何异常情况,都会自动毁灭)
  • Nimbus 无状态(所有的状态信息都存放在Zookeeper中来管理) 快速失败(每当遇到任何异常情况,都会自动毁灭)

3、消息的完整性

  • 从Spout中发出的Tuple,以及基于他所产生Tuple, 由这些消息就构成了一棵tuple树
  • 当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“完整处理”,即消息的完整性

Acker – 消息完整性的实现机制

  • Storm的拓扑当中特殊的一些任务
  • 负责跟踪每个Spout发出的Tuple的DAG(有向无环图)

五 Storm Drpc

  • DRPC (Distributed RPC) remote procedure call :分布式远程过程调用
  • DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
  • DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。 (其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)

DRPC设计目的:

为了充分利用Storm的计算能力实现高密度的并行实时计算。 (Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

Drpc 流程介绍

  • 客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。 实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流
  • DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

定义DRPC拓扑:

  • 方法1: 通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用) 该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现
  • 方法2: 直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑 需要手动设定好开始的DRPCSpout以及结束的ReturnResults

运行模式

1、本地模式

2、远程模式(集群模式)

# 1. 修改配置文件conf/storm.yaml(指定为当前主节点nimbus即可)

----------将该更改分发到集群的其他节点-----------------
drpc.servers:
    - "node2“
----------------------------------------------------

# 2. 启动DRPC Server
bin/storm drpc &

# 3. 通过StormSubmitter.submitTopology提交拓扑

六 Strom 事务机制

事务性拓扑(Transactional Topologies):保证消息(tuple)被且仅被处理一次 官网介绍

Design 1

强顺序流(强有序)

  • 引入事务(transaction)的概念,每个transaction(即每个tuple)关联一个transaction id。
  • Transaction id从1开始,每个tuple会按照顺序+1。
  • 在处理tuple时,将处理成功的tuple结果以及transaction id同时写入数据库中进行存储。

两种情况: 1、当前transaction id与数据库中的transaction id不一致( 表示新的事务, 往里面存) 2、两个transaction id相同( 覆盖或者让新的变量指向原来的数据库)

缺点: 一次只能处理一个tuple,无法实现分布式计算

Design 2

强顺序的Batch流

  • 事务(transaction)以batch为单位,即把一批tuple称为一个batch,每次处理一个batch。
  • 每个batch(一批tuple)关联一个transaction id
  • 每个batch内部可以并行计算

Design 3 Storm’s design

一个关键的认识是,并非所有处理批处理元组的工作都需要有序地进行。例如,在计算全局计数时,计算分为两个部分:

  • 计算批次的部分计数
  • 使用部分计数更新数据库中的全局计数

#2的计算需要在批之间进行严格排序,但是没有理由您不应该通过为多个批并行计算#1 来流水线化批的计算。因此,当批次1正在更新数据库时,批次2至10可以计算其部分计数。

Storm通过将批处理的计算分为两个阶段来实现这一区别:

  • 处理阶段:这是可以并行完成批处理的阶段
  • 提交阶段:批处理的提交阶段是有序的。因此,直到成功完成批次1的提交后,批次2的提交才完成。

这两个阶段一起称为“交易”。在给定的时刻,许多批次可以处于处理阶段,但是只有一个批次可以处于提交阶段如果批处理或提交阶段发生任何故障,则将重播整个事务(两个阶段)。

Design details(设计细节)

  • Manages state - 状态管理 Storm通过Zookeeper存储所有transaction相关信息(包含了:当前transaction id 以及batch的元数据信息)
  • Coordinates the transactions - 协调事务 Storm会管理决定transaction应该处理什么阶段(processing、committing)
  • Fault detection - 故障检测 Storm内部通过Acker机制保障消息被正常处理(用户不需要手动去维护)
  • First class batch processing API Storm提供batch bolt接口

三种事务: 三种分区介绍

  • 普通事务
  • Partitioned Transaction - 分区事务
  • Opaque Transaction - 不透明分区事务

第六章 Flume-Kafka-Storm整合案例实现

前提: 安装了Flume,Kafka,以及Storm Flume介绍以及安装 Kafka介绍以及安装

一 架构设计

二 过程描述

该过程实现了数据的清洗

  • 我们通过客户端(flume的api程序RpcClientDemo )向flume写入数据
  • Flume通过启动脚本整合kafka将输入写入到topic 中, 名为testflume
  • Storm集群通过kafkaSpout 程序接收 testflume 的数据, 通过 FilterBolt过滤指定格式的数据,然后通过 kafkaBolt 输出到Kafka集群中的 LogError主题中输出
  • 我们可以通过kafka的消费者端来查看 LogError主题中输出的指定格式的数据

三 具体步骤

1.启动zk集群,kafka集群,flume

启动zk
zkServer.sh start

启动kafka
kafka-server-start.sh /opt/kafka/config/server.properties

启动flume( flume-kafka.conf为flume的启动脚本,见本人Kafka博文介绍第三章 )
flume-ng agent -n a1 -c conf -f /opt/flume/conf/flume-kafka.conf -Dflume.root.logger=DEBUG,console

2.启动kafka的消费者端进程

监听testflume 数据流转
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic testflume

监听LogError数据流转
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic LogError

3.运行代码测试 a.运行RpcClientDemo , 查看testflume监听的数据流转情况(图1)

/**
 * Flume官网案例
 * http://flume.apache.org/FlumeDeveloperGuide.html 
 * @author root
 */
public class RpcClientDemo {
	
	public static void main(String[] args) {
		MyRpcClientFacade client = new MyRpcClientFacade();
		// Initialize client with the remote Flume agent's host and port
		client.init("node2", 41414);

		// Send 10 events to the remote Flume agent. That agent should be
		// configured to listen with an AvroSource.
		for (int i = 100; i < 150; i++) {
			String sampleData = "Hello Flume!ERROR" + i;
			client.sendDataToFlume(sampleData);
			System.out.println("发送数据:" + sampleData);
		}

		client.cleanUp();
	}
}

class MyRpcClientFacade {
	private RpcClient client;
	private String hostname;
	private int port;

	public void init(String hostname, int port) {
		// Setup the RPC connection
		this.hostname = hostname;
		this.port = port;
		this.client = RpcClientFactory.getDefaultInstance(hostname, port);
		// Use the following method to create a thrift client (instead of the
		// above line):
		// this.client = RpcClientFactory.getThriftInstance(hostname, port);
	}

	public void sendDataToFlume(String data) {
		// Create a Flume Event object that encapsulates the sample data
		Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

		// Send the event
		try {
			client.append(event);
		} catch (EventDeliveryException e) {
			// clean up and recreate the client
			client.close();
			client = null;
			client = RpcClientFactory.getDefaultInstance(hostname, port);
			// Use the following method to create a thrift client (instead of
			// the above line):
			// this.client = RpcClientFactory.getThriftInstance(hostname, port);
		}
	}

	public void cleanUp() {
		// Close the RPC connection
		client.close();
	}
}

b.运行LogFilterTopology ,过滤数据,并将数据发送给kafka集群中的 LogError主题,效果如图2

/**
 * This topology demonstrates Storm's stream groupings and multilang
 * capabilities.
 */
public class LogFilterTopology {

	public static class FilterBolt extends BaseBasicBolt {
		@Override
		public void execute(Tuple tuple, BasicOutputCollector collector) {
			String line = tuple.getString(0);
			System.err.println("Accept:  " + line);
			// 包含ERROR的行留下
			if (line.contains("ERROR")) {
				System.err.println("Filter:  " + line);
				collector.emit(new Values(line));
			}
		}

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) {
			// 定义message提供给后面FieldNameBasedTupleToKafkaMapper使用
			declarer.declare(new Fields("message"));
		}
	}

	public static void main(String[] args) throws Exception {
		TopologyBuilder builder = new TopologyBuilder();

		// https://github.com/apache/storm/tree/master/external/storm-kafka
		// config kafka spout,话题
		String topic = "testflume";
		ZkHosts zkHosts = new ZkHosts("node2:2181,node3:2181,node4:2181");
		// /MyKafka,偏移量offset的根目录,记录队列取到了哪里
		SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "/MyKafka", "MyTrack");// 对应一个应用
		List<String> zkServers = new ArrayList<String>();
		System.out.println(zkHosts.brokerZkStr);
		for (String host : zkHosts.brokerZkStr.split(",")) {
			zkServers.add(host.split(":")[0]);
		}

		spoutConfig.zkServers = zkServers;
		spoutConfig.zkPort = 2181;
		// 是否从头开始消费
		spoutConfig.forceFromStart = true;
		spoutConfig.socketTimeoutMs = 60 * 1000;
		// StringScheme将字节流转解码成某种编码的字符串
		spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

		KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

		// set kafka spout
		builder.setSpout("kafka_spout", kafkaSpout, 3);

		// set bolt
		builder.setBolt("filter", new FilterBolt(), 8).shuffleGrouping("kafka_spout");

		// 数据写出
		// set kafka bolt
		// withTopicSelector使用缺省的选择器指定写入的topic: LogError
		// withTupleToKafkaMapper tuple==>kafka的key和message
		KafkaBolt kafka_bolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector("LogError"))
				.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

		builder.setBolt("kafka_bolt", kafka_bolt, 2).shuffleGrouping("filter");

		Config conf = new Config();
		// set producer properties.
		Properties props = new Properties();
		props.put("metadata.broker.list", "node2:9092,node3:9092,node4:9092");
		/**
		 * Kafka生产者ACK机制 0 : 生产者不等待Kafka broker完成确认,继续发送下一条数据 1 :
		 * 生产者等待消息在leader接收成功确认之后,继续发送下一条数据 -1 :
		 * 生产者等待消息在follower副本接收到数据确认之后,继续发送下一条数据
		 */
		props.put("request.required.acks", "1");
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		conf.put("kafka.broker.properties", props);

		conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(new String[] { "node2", "node3", "node4" }));

		// 本地方式运行
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology("mytopology", conf, builder.createTopology());

	}
}

c.修改 RpcClientDemo 中的循环语句,验证 FilterBolt是否起到了过滤的作用 查看testflume, 图3; 查看LogError, 图4 可以看到数据流转到了testflume主题, 而没有流转到LogError,由此可以看出 FilterBolt起到了过滤的作用

		// Send 10 events to the remote Flume agent. That agent should be
		// configured to listen with an AvroSource.
		for (int i = 200; i < 250; i++) {
			String sampleData = "Hello Flume!" + i;
			client.sendDataToFlume(sampleData);
			System.out.println("发送数据:" + sampleData);
		}

图1

图2

图3

四 项目应用架构

  • 采集层:实现日志收集,使用负载均衡策略
  • 消息队列:作用是解耦及不同速度系统缓冲
  • 实时处理单元:用Storm来进行数据处理,最终数据流入DB中
  • 展示单元:数据可视化,使用WEB框架展示
  • 美团Flume架构
  • Flume的负载均衡

如果自己想应聘大公司, 一定要去别人技术分享网站看一看,就像美团技术团队官网


链接:https://pan.baidu.com/s/1wu9qYQZPxkqOdiY5QGR2cg 点赞私聊获取资料~~~ 提取码:m8kh