大数据算法设计模式(1) - topN spark实现
时间:2022-05-02
本文章向大家介绍大数据算法设计模式(1) - topN spark实现,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
topN算法,spark实现
package com.kangaroo.studio.algorithms.topn;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
import java.io.Serializable;
import java.util.*;
public class TopNSpark implements Serializable {
private JavaSparkContext jsc;
Broadcast<Integer> topNum;
private String inputPath;
/*
* 构造函数
* 1. 初始化JavaSparkContext
* 2. 初始化广播变量topN个数, 可以被所有partition共享
* 3. 初始化输入路径
* */
public TopNSpark(Integer Num, String path) {
jsc = new JavaSparkContext();
topNum = jsc.broadcast(Num);
inputPath = path;
}
/*
* 程序入口函数
* */
public void run() {
/*
* 读入inputPath中的数据
* */
JavaRDD<String> lines = jsc.textFile(inputPath, 1);
/*
* 将rdd规约到9个分区
* */
JavaRDD<String> rdd = lines.coalesce(9);
/*
* 将输入转化为kv格式
* key是规约的主键, value是排序参考的个数
* 注: 这里的key并不唯一, 即相同的key可能有多条记录, 所以下面我们规约key成唯一键
* 输入:line, 输出:kv
* */
JavaPairRDD<String, Integer> kv = rdd.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
String[] tokens = s.split(",");
return new Tuple2<String, Integer>(tokens[0], Integer.parseInt(tokens[1]));
}
});
/*
* 规约主键成为唯一键
* 输入:kv, 输出:kv
* */
JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
/*
* 计算各个分区的topN
* 这里通过广播变量拿到了topN具体个数, 每个分区都保留topN, 所有分区总个数: partitionNum * topN
* 输入:kv, 输出:SortMap, 长度topN
* */
JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
public Iterable<SortedMap<Integer, String>> call(Iterator<Tuple2<String, Integer>> iter) throws Exception {
final int N = topNum.getValue();
SortedMap<Integer, String> topN = new TreeMap<Integer, String>();
while (iter.hasNext()) {
Tuple2<String, Integer> tuple = iter.next();
topN.put(tuple._2, tuple._1);
if (topN.size() > N) {
topN.remove(topN.firstKey());
}
}
return Collections.singletonList(topN);
}
});
/*
* 规约所有分区的topN SortMap, 得到最终的SortMap, 长度topN
* reduce过后, 数据已经到了本地缓存, 这是最后结果
* 输入: SortMap, 长度topN, 当然有partitionNum个, 输出:SortMap, 长度topN
* */
SortedMap<Integer, String> finalTopN = partitions.reduce(new Function2<SortedMap<Integer, String>, SortedMap<Integer, String>, SortedMap<Integer, String>>() {
public SortedMap<Integer, String> call(SortedMap<Integer, String> m1, SortedMap<Integer, String> m2) throws Exception {
final int N = topNum.getValue();
SortedMap<Integer, String> topN = new TreeMap<Integer, String>();
for (Map.Entry<Integer, String> entry : m1.entrySet()) {
topN.put(entry.getKey(), entry.getValue());
if (topN.size() > N) {
topN.remove(topN.firstKey());
}
}
for (Map.Entry<Integer, String> entry : m2.entrySet()) {
topN.put(entry.getKey(), entry.getValue());
if (topN.size() > N) {
topN.remove(topN.firstKey());
}
}
return topN;
}
});
/*
* 将本地缓存的最终结果打印出来
* */
for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
System.out.println(entry.getKey() + " -- " + entry.getValue());
}
}
public static void main(String[] args) {
/*
* topN个数:topN
* 输入数据路径:inputPath
* */
Integer topN = Integer.parseInt(args[0]);
String inputPath = args[1];
TopNSpark topNSpark = new TopNSpark(topN, inputPath);
topNSpark.run();
}
}
- Angular企业级开发(7)-MVC之控制器
- Angular企业级开发(8)-控制器的作用域
- 使用jQuery Draggable和Droppable实现拖拽功能
- CSS魔法堂:重拾Border之——图片作边框
- Mobile Web中URL设计问题
- 使用root用户连接Ubuntu16.04时,提示SSH连接被拒绝
- CSS魔法堂:Box-Shadow没那么简单啦:)
- java操作redis: 将string、list、map、自定义的对象保存到redis中
- 运行第一个Docker容器-Docker for Web Developers(1)
- 手动实现jQuery Tools里面tab功能
- Angular企业级开发(9)-前后端分离之后添加验证码
- 基于thrift的微服务框架
- Sublime Text 快速格式化
- HTML中拖放介绍
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 最通俗易懂的一篇文章了解JVM、JRE、JDK的关系是什么?
- 六. CSS 样式补充之 font & background
- 七 .Html的表格
- rollup + typescript 构建 ts 包
- node 写爬虫,原来这么简单
- 计时器 hook
- 自定义eslint 配置包
- 【61期】MySQL行锁和表锁的含义及区别(MySQL面试第四弹)
- 关于死锁你了解多少,通过“让APP随手机壳改变颜色,程序员和产品经理大家”这一事,了解下死锁可好?
- 三阴性乳腺癌表达矩阵探索笔记之GSEA
- 关于Python异常处理,你需要了解的知识点
- 三阴性乳腺癌表达数据探索笔记之GSVA分析
- 无敌解决GitHub无法ping通也无法登录的问题无敌解决idea连接GitHub提示Invalid authentication data. Connection reset
- 文献笔记七十一:REDO根据vcf文件检测植物细胞器基因组RNA编辑位点
- 如如何基于Docker快速搭建Elasticsearch集群?