Spark你一定学得会(一)No.7
我是小蕉。
上一篇大家说没有干货,妈蛋回南天哪来的干货你告诉我!!!还好这几天天气还不错,干货来了。
首先祭上今天关键代码,要做的事情就是从Hive表中取得年龄数据,然后去重,统计每个年龄的人数。如果你能看到这里,我当你知道RDD,HDFS,还有scala是什么东东,不知道的看我上一篇或者上某搜索引擎去,我不管。
case class PERSON(
val name:String,
val age:String
);
object Some{
def main(args: Array[String]): Unit = {
val conf:SparkConf = new SparkConf().setAppName("HelloWorld")
val sc:SparkContext = new SparkContext(conf);
val hc:HiveContext = new HiveContext(sc);
val datas:DataFrame = hc.sql("SELECT NAME,AGE FROM PERSONS");
//记录可能重复,去个重先
val dataDistincted = datas.distinct();
//将行记录转换为对象方便操作
val persons:RDD[PERSON] = dataDistincted.map{case Row(name:String,age:String) => PERSON(name,age)};
//过滤年龄小于10的用户
val filtered10Person = persons.filter(x => x.age.toInt >= 10);
//根据年龄对用户进行分组
val groupedByEdge = filtered10Person.groupBy(p => p.age)
//打印出最终结果
groupedByEdge.collect().foreach(println);
groupedByEdge.saveAsHadoopFile("/myHadoopPath")
} }
大家跟我一起来,关于Spark集群的安装我就不介绍了大家自己上某搜索引擎去搜跟着做就可以了,今天主要介绍如何开始玩Spark。
一般我们的Spark程序会配合ozzie等定时调度工具来进行调度,从Hive库中读取数据然后通过数据处理来达到离线计算的功能。咱一行一行来。
case class PERSON( val name:String, val age:String );
这个没什么特殊的,case class就是定义了一个序列化的POJO类。
val conf:SparkConf = new SparkConf().setAppName("HelloWorld")
这个是Spark的一个配置类,用于配置所有Spark相关的初始化配置项。至于详细的大家上官网去看吧,配置蛮多的,都可以在这里配。上面是指定应用名为HelloWord。
val sc:SparkContext = new SparkContext(conf);
实例化一个SparkContext,这个是Spark的上下文,所有跟Spark交互的玩意都要跟它交互,其他什么其他的Context都是基于这个来进行的,而且一个应用里边只能有一个上下文,多了会报错,不信你试试。
val hc:HiveContext = new HiveContext(sc);
Spark实现了访问Hive库的API,这个是封装了大部分操作的Context,其实最有用也就一个,下面会说到,关于Hive大家别问我啊,就是基于HDFS的关系型数据库,关注的盆友要是多的话我后面专门开一次讲一讲这个东东。
val datas:DataFrame = hc.sql("SELECT NAME,AGE FROM PERSONS");
关键代码来了,敲黑板,这个是从Hive库中进行操作HQL并且把它们当成DataFrame来用,你问我DataFrame是什么,我来告诉李,就是自带Schema,能做各种类数据库操作的RDD,其他的跟RDD没什么区别。这里我们从PERSONS表中取得NAME,AGE两个字段。
val dataDistincted = datas.distinct();
好了,介绍今天第一个action算子,distinct,这个算子会比较整个数据集,然后进行去重,去重的方式就是看所有的字段是不是都一样一样的。
val persons:RDD[PERSON] = dataDistincted.map{case Row(name:String,age:String) => PERSON(name,age)};
这里是通过RDD的map转换操作,这个会并行便利RDD中每一个记录,然后转换成我们想要的类型,这里是将DataFrame中的Row数据,转换成我们定义的POJO以方面后面操作。不特殊,跟java里边的for遍历差不多,但是这个是并行的。
val filtered10Person = persons.filter(x => x.age.toInt >= 10);
好了,第二个转换操作filter,顾名思义就是过滤嘛,但是这个跟其他的过滤比较特殊,这个过滤是将filter里边的函数,条件为true的留下来,false的剔除。所以上边的操作就是将十岁及十岁以上的人留下来。
val groupedByEdge = filtered10Person.groupBy(p => p.age)
这个就比较特殊了,大家应该没见过,这个groupBy操作,也就是将整个数据集按照某种值进行分组。例子中按每个PERSON的age值进行分组,那么结果我们将会得到根据年龄分组的数据,也就是我们想要的分组功能了。至于说为什么不能分段统计,当然可以了,这个留给你们自己玩,你先做个转换呗。
groupedByEdge.collect().foreach(println);
打印出来,完事。啊哈?为什么要collect,因为RDD分布在集群中,而日志只能出现在Driver,你不collect没法打印啊。
groupedByEdge.saveAsHadoopFile("/myHadoopPath")
存到HDFS上,完事。
然后我不管你用什么方式打包一个名字叫bigjiao.jar的包出来,不懂得上某搜索引擎去。
在spark集群上提交命令:
spark-submit --master local[*] --class Some bigjiao.jar
- [WCF-Discovery] 实例演示:如何利用服务发现机制实现服务的“动态”调用?
- [WCF-Discovery]服务如何能被”发现”
- 我的数据访问函数库的源代码(一)—— 共用部分
- 《WCF服务编程》关于“队列服务”一个值得商榷的地方
- 我的数据访问函数库的源代码(二)—— SQL语句部分
- 来源于WCF的设计模式:可扩展对象模式[上篇]
- 我的数据访问函数库的源代码(三)——返回结构数组
- 我的数据访问函数库的源代码(四)—— 存储过程部分,包括存储过程的参数的封装
- [WCF 4.0新特性] 路由服务[实例篇]
- [WCF 4.0新特性] 默认终结点
- 三层架构之我见 —— 不同于您见过的三层架构。
- 来源于WCF的设计模式:可扩展对象模式[下篇]
- [WCF 4.0新特性] 标准终结点与无(.SVC)文件服务激活
- 我的数据访问类(第二版)—— for .net2.0 (二)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- [蓝桥杯][2013年第四届真题]幸运数
- 04 . Filebeat简介原理及配置文件和一些案例
- 05 . ELK Stack+Redis日志收集平台
- python开发【第一篇】
- 内置函数--bin() oct() int() hex()
- 08 . Prometheus+Grafana监控haproxy+rabbitmq
- 内置函数值 -- chr() ord() -- 字符和ascii的转换
- python内置函数-compile()
- 02 . Shell变量和逻辑判断及循环使用
- Python内置函数(21)——filter
- 内置函数 -- filter 和 map
- 内置函数--global() 和 local()
- python file文件操作--内置对象open
- 07 . Prometheus监控Memcached并配置Grafana
- 内置函数 -- bytes -- 字节码与字符串相互转换