kafka-jstorm代码
package com.doctor.kafkajstrom;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;

import com.doctor.kafkajstrom.log.manager.LogManager;
import com.doctor.kafkajstrom.util.SpringUtil;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Runs a word-count topology on a local cluster.
 *
 * <p>Wiring, as built in {@link #main(String[])}:
 * <ul>
 *   <li>{@code word-reader} spout emits random 6-character strings in field {@code line};</li>
 *   <li>{@code word-normalizer} bolt splits each line on spaces and emits field {@code word};</li>
 *   <li>{@code word-counter} bolt counts words (fields-grouped on {@code word}) and also
 *       receives every tuple of the {@code signals} stream from {@code signals-spout};</li>
 *   <li>a {@code refreshCache} signal makes each counter task log and clear its counts.</li>
 * </ul>
 */
public class LocalJstormMain {

    // Component and stream identifiers are shared constants so that the producer,
    // the grouping declaration and the consumer cannot drift apart.
    private static final String WORD_READER_ID = "word-reader";
    private static final String SIGNALS_SPOUT_ID = "signals-spout";
    private static final String WORD_NORMALIZER_ID = "word-normalizer";
    private static final String WORD_COUNTER_ID = "word-counter";
    private static final String SIGNALS_STREAM = "signals";
    private static final String REFRESH_CACHE_ACTION = "refreshCache";
    private static final String TOPOLOGY_NAME = "Count-Word-Toplogy-With-Refresh-Cache";

    /**
     * Builds the topology and submits it to an in-process {@link LocalCluster}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(WORD_READER_ID, new WordReaderSpoutCh03());
        builder.setSpout(SIGNALS_SPOUT_ID, new SignalsSpoutCh03(), 6);
        builder.setBolt(WORD_NORMALIZER_ID, new WordTransformBoltCh03(), 6)
                .shuffleGrouping(WORD_READER_ID);
        builder.setBolt(WORD_COUNTER_ID, new WordCounterBoltCh03(), 2)
                .fieldsGrouping(WORD_NORMALIZER_ID, new Fields("word"))
                .allGrouping(SIGNALS_SPOUT_ID, SIGNALS_STREAM);

        // Configuration
        Config conf = new Config();
        conf.setNumWorkers(3);
        conf.setDebug(true);

        // Topology run
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());

        // The cluster is intentionally left running. To stop it after two minutes:
        // try {
        //     TimeUnit.MINUTES.sleep(2);
        // } catch (InterruptedException e) {
        //     e.printStackTrace();
        // }
        // cluster.killTopology("Count-Word-Toplogy-With-Refresh-Cache");
        // cluster.shutdown();
    }

    /**
     * Spout that emits one random 6-character string per {@link #nextTuple()} call,
     * drawn from the characters of {@link #WORDS}, in output field {@code line}.
     */
    public static class WordReaderSpoutCh03 extends BaseRichSpout {
        private static final Logger log = LoggerFactory.getLogger(WordReaderSpoutCh03.class);
        private static final long serialVersionUID = 1L;
        private static final String WORDS = "sjdkfjksdjfdkjaaa中触及看对方的罚款多级";

        private SpoutOutputCollector collector;

        @Override
        @SuppressWarnings("rawtypes") // raw Map is dictated by the Storm spout interface
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /** Logs the id of a tuple that was fully processed. */
        @Override
        public void ack(Object msgId) {
            log.info("{Ok:{}}", msgId);
        }

        /** Logs the id of a tuple that failed, distinctly from {@link #ack(Object)}. */
        @Override
        public void fail(Object msgId) {
            log.warn("{Fail:{}}", msgId);
        }

        @Override
        public void nextTuple() {
            // NOTE(review): no message id is passed to emit, so these tuples are presumably
            // not tracked and ack/fail above will not be called for them - confirm against
            // the Storm reliability documentation before relying on TOPOLOGY_MAX_SPOUT_PENDING.
            this.collector.emit(new Values(RandomStringUtils.random(6, WORDS)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }

    /**
     * Spout that emits a {@code refreshCache} action on the {@code signals} stream and then
     * pauses for 2 ms on every {@link #nextTuple()} call.
     */
    public static class SignalsSpoutCh03 extends BaseRichSpout {
        private static final long serialVersionUID = 1L;

        private SpoutOutputCollector spoutOutputCollector;

        @Override
        @SuppressWarnings("rawtypes") // raw Map is dictated by the Storm spout interface
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.spoutOutputCollector = collector;
        }

        @Override
        public void nextTuple() {
            this.spoutOutputCollector.emit(SIGNALS_STREAM, new Values(REFRESH_CACHE_ACTION));
            try {
                TimeUnit.MILLISECONDS.sleep(2);
            } catch (InterruptedException e) {
                // Preserve the interrupt so the executor thread can observe the shutdown request.
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream(SIGNALS_STREAM, new Fields("action"));
        }
    }

    /**
     * Bolt that splits the first field of each input tuple on single spaces and emits every
     * non-empty token, trimmed and lower-cased, in output field {@code word}.
     */
    public static class WordTransformBoltCh03 extends BaseRichBolt {
        private static final long serialVersionUID = 1L;

        private OutputCollector collector;

        @Override
        @SuppressWarnings("rawtypes") // raw Map is dictated by the Storm bolt interface
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            for (String token : input.getString(0).split(" ")) {
                String normalized = token.trim().toLowerCase();
                if (!normalized.isEmpty()) {
                    // Emit the normalized form; the original emitted the raw token, which
                    // made the trim/lower-case step dead code.
                    this.collector.emit(new Values(normalized));
                }
            }
            this.collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    /**
     * Bolt that keeps a per-task word count and writes repeated words to a {@link LogManager}.
     *
     * <p>Tuples arriving on the {@code signals} stream with action {@code refreshCache} cause
     * the current counts to be logged and then cleared. All other tuples are read through
     * their {@code word} field. Every input tuple is acked.
     */
    public static class WordCounterBoltCh03 extends BaseRichBolt {
        private static final Logger log = LoggerFactory.getLogger(WordCounterBoltCh03.class);
        private static final long serialVersionUID = 1L;

        private static final ApplicationContext applicationContext;
        private static final LogManager logManager;

        // The Spring context is created once per JVM, when this class is first loaded.
        static {
            applicationContext = SpringUtil.of("learningJstormConfig/spring-kafkabolt-context.xml");
            logManager = applicationContext.getBean(LogManager.class);
            log.info("--------------ApplicationContext initialized from learningJstormConfig/spring-kafkabolt-context.xml ");
        }

        private Map<String, Integer> countMap;
        private OutputCollector collector;

        @Override
        @SuppressWarnings("rawtypes") // raw Map is dictated by the Storm bolt interface
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.countMap = new HashMap<>();
            this.collector = collector;
            log.info("-----------------WordCounterBoltCh03 prepare");
        }

        @Override
        public void execute(Tuple input) {
            // Dispatch on the source stream instead of probing for the "word" field and
            // swallowing the resulting exception. The stream id is "signals" (no trailing
            // dot), matching what SignalsSpoutCh03 declares.
            if (SIGNALS_STREAM.equals(input.getSourceStreamId())) {
                if (REFRESH_CACHE_ACTION.equals(input.getStringByField("action"))) {
                    cleanup();
                    countMap.clear();
                }
            } else {
                countWord(input.getStringByField("word"));
            }
            this.collector.ack(input);
        }

        /** Increments the count for {@code word}; counts above 1 are written to the log manager. */
        private void countWord(String word) {
            if (word == null) {
                return;
            }
            int count = countMap.merge(word, 1, Integer::sum);
            // The first occurrence is only recorded, not written - same as the original logic.
            if (count > 1) {
                logManager.write(word + ":" + count);
            }
        }

        /** This bolt is a sink and declares no output fields. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        /** Logs every word and its current count; does not modify the counts. */
        @Override
        public void cleanup() {
            log.info("{cleanup................}");
            countMap.forEach((k, v) -> {
                log.info("{clean up.................}");
                log.info("k : {} , v : {}", k, v);
            });
        }
    }
}
原文地址:https://www.cnblogs.com/ymcs/p/15138605.html
- 写入Ring Buffer
- Enterprise Library 4 缓存快速入门
- Enterprise Library 4 缓存应用程序块的设计
- 让WordPress 在RSS 中Feed 输出支持“More”标签
- WordPress文章版权保护:复制文字自动添加版权信息
- 替换WordPress 自带默认的 jQuery库, jQuery库页脚加载
- Enterprise Library 4 数据访问应用程序块
- 替换Enterprise Library 4.0 缓存应用程序块的CacheManager
- Enterprise Library 4.0缓存应用程序块
- 通过.htaccess 让WordPress 的上传文件夹更安全
- asp.net 性能调较
- 零基础学习大数据,搭建Hadoop处理环境
- 为你的WordPress 博客开启两步验证功能(技术支持:谷歌)
- 为你的WordPress 博客开启两步验证功能(技术支持:谷歌)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 如何将自定义XML视图注入SAP Fiori Elements应用
- SAP UI5控件ID的生成逻辑原理解析
- 如何自定义SAP Spartacus店铺的界面颜色风格
- 如何自定义SAP Spartacus店铺的购物车图表css风格
- 数据库PostgreSQL-从源代码安装
- Django的中间件
- WPF 使用 Skia 绘制 WriteableBitmap 图片
- dotnet 在 UOS 国产系统上使用 MonoDevelop 创建 GTK 全平台带界面应用
- dotnet 在 UOS 国产系统上安装 MonoDevelop 开发工具
- 使用SAP Spartacus快速创建一个电商店铺网站
- 使用StackBlitz和SAP Spartacus快速创建电商店铺页面
- SAP CRM Interactive Report(交互式报表)里和服务订单相关的一些字段
- SAP S/4HANA Customer Management(CRM)模块的扩展性设计
- SAP S/4HANA Customer Management(CRM)模块的Partner模型设计
- 使用soapUI消费SAP Cloud for Customer的web service