windows 安装 storm 及 eclipse 调试 TopN 实例
一:安装JDK
配置Java环境变量 JAVA_HOME、Path、CLASSPATH三个值分别为(按照自己安装状况设置,此处供参考):
D:javajdk1.8
%JAVA_HOME%/bin;%JAVA_HOME%/jre/bin
.;%JAVA_HOME%/lib/dt.jar;%JAVA_HOME%/lib/tools.jar (要加.表示当前路径)
二:安装 Python
这是为了测试安装效果,我们将部署 storm-starter project案例中word coun程序,用的是python写的multi-lang bolt,使用python 2.7.11,安装路径在:
C:Python27
三:安装并运行ZooKeeper
Download Apache Zookeeper 3.4.8 ,解压配置:
> cd zookeeper-3.4.8 > copy confzoo_sample.cfg confzoo.cfg > .binzkServer.cmd
四:安装Storm
Storm的windows官方版还没有释放,here.下载,源码here下载。
注意1:
源码一定要用这个版本,否则启动会报各种错误,而这些错误和 jdk、python、zookeeper、eclipse 版本都无关。
http://dl.dropboxusercontent.com/s/iglqz73chkul1tu/storm-0.9.1-incubating-SNAPSHOT-12182013.zip
配置Storm环境变量
- Storm需要STORM_HOME和JAVA_HOME,比如STORM_HOME为:
C:storm-0.9.1-incubating-SNAPSHOT-12182013
- 在PATH中加入:
%STORM_HOME%bin;C:Python27Libsite-packages;C:Python27Scripts
此处与参考文章略有不同,下图是参考文章给出的配置
JAVA_HOME已经在安装JDK时手动配置了环境变量,而Python好像是默认自动就会配置好环境变量的,
我的Python目录下没有Scripts文件夹,暂时这样配置就可以了,不影响下面的使用。
五:启动Nimbus, Supervisor, and Storm UI Daemons
- Nimbus
注意2:
一定要在 STORM_HOME 目录下执行后续命令,否则会报错。
ERROR backtype.storm.event - Error when processing event java.lang.RuntimeException: java.io.InvalidClassException: clojure.lang.APersistentMap; local class incompatible: stream classdesc serialVersionUID = 8648225932767613808, local class serialVersionUID = 270281984708184947 at backtype.storm.utils.Utils.deserialize(Utils.java:86) ~[storm-core-0.9.1-incubating-SNAPSHOT-12182013.jar:na]
> cd %STORM_HOME%
> storm nimbus
- Supervisor
> cd %STORM_HOME%
> storm supervisor
- Storm UI # 可选,也可以用 storm list 查看所有 storm 任务
> cd %STORM_HOME%
> storm ui
浏览器打开http://localhost:8080/ 可看到Storm运行。
六:部署 Word count
部署这个jar在本地:
> storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology WordCount -c nimbus.host=localhost
如果你刷新 Storm UI页面,会看到 “WordCount” topology显示列出,点按链接确认它处理数据。
七:eclipse 调试 TopN 实例
storm 求 csdn 密码库中密码出现的 topN,并直接在 eclipse 中调试运行:
package com.bj.test.top10;
/**
* @Author:tester
* @DateTime:2016年6月21日 下午7:58:45
* @Description: Spout作为数据源,它实现了IRichSpout接口,功能是读取一个文本文件并把它的每一行内容发送给bolt。
* @Version:1.0
*/
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class PasswdSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
public void ack(Object msgId) {
System.out.println("==============OK:" + msgId);
}
public void close() {
}
public void fail(Object msgId) {
System.out.println("++++++++++++++FAIL:" + msgId);
}
/**
* 这是Spout最主要的方法,在这里我们读取文本文件,并把它的每一行发射出去(给bolt)
* 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下
* **/
public void nextTuple() {
/**
* The nextuple it is called forever, so if we have been readed the file
* we will wait and then return
*/
if (completed) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Do nothing
}
return;
}
String line;
// Open the reader
BufferedReader reader = new BufferedReader(fileReader);
try {
// Read all lines
while ((line = reader.readLine()) != null) {
String[] words = line.split("#");
String passwd = words[1].trim();
// Emit the word
collector.emit(new Values(passwd));
/*for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word = word.toLowerCase();
// Emit the word
collector.emit(new Values(word));
}
}*/
}
} catch (Exception e) {
throw new RuntimeException("Error reading tuple", e);
} finally {
completed = true;
}
}
/**
* 这是第一个方法,里面接收了三个参数,第一个是创建Topology时的配置,
* 第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt
* **/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
//获取创建Topology时指定的要读取的文件路径
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file [" + conf.get("wordFile") + "]");
}
//初始化发射器
this.collector = collector;
}
/**
* Declare the output field "word"
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
/////////////////////////////////////////////////////////////////////////////////////////////
package com.bj.test.top10;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import static com.bj.test.top10.SortMapByValue.*;
public class Top10Bolt extends BaseBasicBolt {
Integer id;
String name;
NavigableMap<String, Integer> counters;
/**
* Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里
* 因为这只是个Demo,我们用它来打印我们的计数器
* */
@Override
public void cleanup() {
System.out.println(">>>>>>>>>>>> Word Counter ["+name+"-"+id+"] <<<<<<<<<<<");
/*for(Map.Entry<String, Integer> entry : counters.entrySet()){
System.out.println(entry.getKey()+": "+entry.getValue());
}*/
printMap(list2Map(sortMapByValuesTopN(counters, 10)));
}
/**
* On create
*/
@Override
public void prepare(Map stormConf, TopologyContext context) {
this.counters = new TreeMap<String, Integer>().descendingMap();
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
// Bolt中最重要的是execute方法,每当一个tuple传过来时它便会被调用
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String word = input.getString(0);
/**
* If the word dosn't exist in the map we will create
* this, if not We will add 1
*/
if(!counters.containsKey(word)){
counters.put(word, 1);
}else{
Integer count = counters.get(word) + 1;
counters.put(word, count);
}
}
}
/////////////////////////////////////////////////////////////////////////////////////////////
package com.bj.test.top10;
/**
* @Author:tester
* @DateTime:2016年6月21日 下午7:52:32
* @Description:
* @Version:1.0
*/
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
public class TopologyMain {
public static void main(String[] args) throws InterruptedException {
// 定义一个Topology
TopologyBuilder builder = new TopologyBuilder();
// executor的数目, set parallelism hint to 4
builder.setSpout("PasswdSpout", new PasswdSpout(), 1);
// set tasks number to 4
builder.setBolt("Top10Bolt", new Top10Bolt(), 1).setNumTasks(1).fieldsGrouping("PasswdSpout",
new Fields("word"));
// 配置
Config conf = new Config();
conf.put("wordsFile", "H:\mysql\csdn_database\www.csdn.net.100.sql");
// conf.put("wordsFile", "H:\mysql\csdn_database\www.csdn.net.sql");
conf.setDebug(false);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
// use two worker processes
// conf.setNumWorkers(4);
// 创建一个本地模式cluster
LocalCluster cluster = new LocalCluster();
// 提交Topology
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(1000);
cluster.shutdown();
}
}
Refer:
[1] windows安装storm
http://blog.csdn.net/jiutianhe/article/details/41211403
[2] storm异常集锦
http://ganliang13.iteye.com/blog/2117722
http://bimoziyan.iteye.com/blog/1981116
[2] storm教程二、安装部署
http://www.cnblogs.com/jinhong-lu/p/4634912.html
[3] Storm实战之WordCount
http://m635674608.iteye.com/blog/2221179
[4] Storm的并行度、Grouping策略以及消息可靠处理机制简介
http://m635674608.iteye.com/blog/2232221
[5] Storm的滑动窗口
http://zqhxuyuan.github.io/2015/09/10/2015-09-10-Storm-Window/
[6] [Storm中文文档]Trident教程
http://blog.csdn.net/lujinhong2/article/details/47132313
[7] Storm Trident API 实践
http://blog.csdn.net/suifeng3051/article/details/41118721
[8] flume+kafka+storm运行实例
http://my.oschina.net/u/2000675/blog/613747
[9] Kafka+Storm+HDFS整合实践
http://shiyanjun.cn/archives/934.html
- SDN开发笔记(七):L2switch源码分析(上)
- spark使用zipWithIndex和zipWithUniqueId为rdd中每条数据添加索引数据
- Spring Boot Server容器配置
- Spring Boot读取配置的几种方式
- 如何用TensorFlow构建RNN?这里有一份极简的教程
- (1024程序员节快乐)阿里祭出大器,Java代码检查插件
- Java多线程神器:join使用及原理
- SpringCloud配置中心内容加密
- Spring Boot日志集成
- MongoDB系列7:MongoDB存储引擎
- MongoDB系列6:MongoDB索引的介绍
- 文本数据处理的终极指南-[NLP入门]
- 神经网络思想建立LR模型(DL公开课第二周答案)
- 如何用卷积神经网络从歌曲中提取纯人声?这里有教程+代码
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- Kafka消费过程关键源码解析
- leetcode链表之两个链表的第一个公共节点
- 测试开发基础 mvn test | 利用 Maven Surefire Plugin 做测试用例基础执行管理
- 腾讯云Elasticsearch集群规划及性能优化实践
- 【赵渝强老师】在MongoDB中使用MapReduce方式计算聚合
- 2020-09-13:判断一个正整数是a的b次方,a和b是整数,并且大于等于2,如何求解?
- ASP.NET Core 性能优化最佳实践
- 如何在Vue中使用云开发的云函数,实现邮件发送
- 乐观锁与悲观锁
- 为什么配置文件加密了数据库配置信息,Spring Boot仍能成功连接数据库
- SpringBoot开发微信公众号
- 猿实战10——动态表单之实现类目属性绑定
- 猿实战11——类目属性绑定之el-tree的使用
- 猿实战12——类目属性之动态绑定
- 一个maskrcnn的目标检测和实例分割的小例子