spark Map,Filter,FlatMap

时间:2021-07-16
本文章向大家介绍spark Map,Filter,FlatMap,主要包括spark Map,Filter,FlatMap使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

map

package com.shujia.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo2Map {
  def main(args: Array[String]): Unit = {
    
    val conf: SparkConf = new SparkConf()
      .setAppName("map")
      .setMaster("local")

    //spark  上下文对象
    val sc = new SparkContext(conf)


    /**
      * map : 一行一行处理rdd中的数据
      */

    /**
      * 构建rdd的方法
      * 1、读取文件
      * 2、基于scala集合构建rdd
      *
      */

    //基于scala集合构建rdd
    val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 4)

    println("listRDD分区数据:" + listRDD.getNumPartitions)

    val mapRDD: RDD[Int] = listRDD.map(i => {
      i * 2
    })

    //打印rdd中的数据
    mapRDD.foreach(println)

    /**
      * mapValues ; 处理kv格式rdd的value
      */

    //转换成kv格式
    val kvRDD: RDD[(Int, Int)] = listRDD.map(i => (i, i))

    val mapValuesRDD: RDD[(Int, Int)] = kvRDD.mapValues(i => i * 2)

    mapValuesRDD.foreach(println)


    /**
      * mapPartitions: 一次处理一个分区的数据,返回值需要是一个迭代器
      * mapPartitionsWithIndex: 多了一个下标
      */
    val mapPartitionsRDD: RDD[Int] = listRDD.mapPartitions((iter: Iterator[Int]) => {
      val list: List[Int] = iter.toList
      list.map(i => i * 2).toIterator
    })

    mapPartitionsRDD.foreach(println)


    val mapPartitionsWithIndexRDD: RDD[Int] = listRDD.mapPartitionsWithIndex {
      case (index: Int, iter: Iterator[Int]) =>
        println("当前分区编号:" + index)
        iter
    }

    mapPartitionsWithIndexRDD.foreach(println)

  }

}

filter

package com.shujia.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo3Filter {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("map")
      .setMaster("local")

    //spark  上下文对象
    val sc = new SparkContext(conf)

    val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))

    /**
      * filter算子,函数返回true保留数据,函数返回false过滤数据
      *
      */


    /**
      * 转换算子:懒执行,需要action算子触发执行
      * 操作算子: 触发任务执行,每一个action算子都会触发一个任务
      *
      */

    println("filter之前")
    val filterRDD: RDD[Int] = listRDD.filter(i => {
      println("filter:" + i)
      i % 2 == 1
    })
    println("filter之后")

    //每一个action算子触发执行的时候都会将前面的代码执行一遍
    filterRDD.foreach(println)
    filterRDD.foreach(println)
  }
}

flatmap

package com.shujia.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo4FlatMap {
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()
      .setAppName("map")
      .setMaster("local")

    //spark  上下文对象
    val sc = new SparkContext(conf)

    val listADD: RDD[String] = sc.parallelize(List("java,spark,hadoop", "spark,java,hadoop"))

    /**
      * flatMap:将一行转换成多行
      *
      */
    val wordADD: RDD[String] =listADD.flatMap(line=>{
      println("flatMap:" + line)
      line.split(",")
    })

    wordADD.foreach(println)

  }
}

原文地址:https://www.cnblogs.com/lipinbigdata/p/15022018.html