spark Map,Filter,FlatMap
时间:2021-07-16
本文章向大家介绍spark Map,Filter,FlatMap,主要包括spark Map,Filter,FlatMap使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
map
package com.shujia.spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo2Map { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setAppName("map") .setMaster("local") //spark 上下文对象 val sc = new SparkContext(conf) /** * map : 一行一行处理rdd中的数据 */ /** * 构建rdd的方法 * 1、读取文件 * 2、基于scala集合构建rdd * */ //基于scala集合构建rdd val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 4) println("listRDD分区数据:" + listRDD.getNumPartitions) val mapRDD: RDD[Int] = listRDD.map(i => { i * 2 }) //打印rdd中的数据 mapRDD.foreach(println) /** * mapValues ; 处理kv格式rdd的value */ //转换成kv格式 val kvRDD: RDD[(Int, Int)] = listRDD.map(i => (i, i)) val mapValuesRDD: RDD[(Int, Int)] = kvRDD.mapValues(i => i * 2) mapValuesRDD.foreach(println) /** * mapPartitions: 一次处理一个分区的数据,返回值需要是一个迭代器 * mapPartitionsWithIndex: 多了一个下标 */ val mapPartitionsRDD: RDD[Int] = listRDD.mapPartitions((iter: Iterator[Int]) => { val list: List[Int] = iter.toList list.map(i => i * 2).toIterator }) mapPartitionsRDD.foreach(println) val mapPartitionsWithIndexRDD: RDD[Int] = listRDD.mapPartitionsWithIndex { case (index: Int, iter: Iterator[Int]) => println("当前分区编号:" + index) iter } mapPartitionsWithIndexRDD.foreach(println) } }
filter
package com.shujia.spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo3Filter { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setAppName("map") .setMaster("local") //spark 上下文对象 val sc = new SparkContext(conf) val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8)) /** * filter算子,函数返回true保留数据,函数返回false过滤数据 * */ /** * 转换算子:懒执行,需要action算子触发执行 * 操作算子: 触发任务执行,每一个action算子都会触发一个任务 * */ println("filter之前") val filterRDD: RDD[Int] = listRDD.filter(i => { println("filter:" + i) i % 2 == 1 }) println("filter之后") //每一个action算子触发执行的时候都会将前面的代码执行一遍 filterRDD.foreach(println) filterRDD.foreach(println) } }
flatmap
package com.shujia.spark.core import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo4FlatMap { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setAppName("map") .setMaster("local") //spark 上下文对象 val sc = new SparkContext(conf) val listADD: RDD[String] = sc.parallelize(List("java,spark,hadoop", "spark,java,hadoop")) /** * flatMap:将一行转换成多行 * */ val wordADD: RDD[String] =listADD.flatMap(line=>{ println("flatMap:" + line) line.split(",") }) wordADD.foreach(println) } }
原文地址:https://www.cnblogs.com/lipinbigdata/p/15022018.html
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- python实现在线翻译
- Python字符串格式化常用手段及注意事项
- PHP count_chars()函数讲解
- 浅谈keras使用中val_acc和acc值不同步的思考
- PHP安装BCMath扩展的方法
- keras实现多GPU或指定GPU的使用介绍
- 浅谈keras中的keras.utils.to_categorical用法
- python 使用多线程创建一个Buffer缓存器的实现思路
- keras.utils.to_categorical和one hot格式解析
- Python OpenCV读取中文路径图像的方法
- Java如何基于wsimport调用wcf接口
- Python装饰器结合递归原理解析
- Windows上php5.6操作mongodb数据库示例【配置、连接、获取实例】
- 浅谈keras通过model.fit_generator训练模型(节省内存)
- PHP实现通过文本文件统计页面访问量功能示例