sparkRdd ,breeze

时间:2022-07-22
本文章向大家介绍sparkRdd ,breeze,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

常见RDD操作

textFile

在数据分析中最常见的时从外部获取数据集,这就需要textFile操作

val path = "/home/hadoop/Downloads/用户安装列表数据/*.gz"

//通过textfile文件读取数据最终获得的也是一个RDD,所以datas是一个RDD
val datas = sc.textFile(path)

textFile在读取数据时默认是按照换行符作为分割,所以一行为一个元素

datas.count()

统计行数,就是统计元素的个数 同时RDD计算具有惰性,只有涉及action操作才会执行,所以当出现count是,textFile 这些tranform操作,才会进行执行

textFile可以读取本地/HDFS目录下的文件

同时textFile可以读取多个文件(用逗号隔开),读取目录下文件,直接读取压缩文件

textFile("/input/*.gz")

map,filter

map的意思是对不同分片对每一个元素执行一个函数操作

val rdd1 = sc.parallelize(1 to 9 , 3)
val rdd2 = rdd1.map(x => x * 2)
val rdd3 = rdd2.collect()
println(rdd3.mkString(","))

map的第一个x代表是列表中一个每一个元素, => 表示的是call-by-name在需要计算的时候计算,在每一个元素都进行调用个计算

rdd2 为MapPartitionsRDD类型,经过collect转化为Int数组类型 rdd2.collect 对每一个分片进行收集变为int数组,并转换为字符串,输出

val rdd3 = rdd2.filter(x => x > 10)
val intlist2 = rdd3.collect

intlist2.foreach(println)

filter就是对于每一个元素进行过滤的操作

flatMap

flatMap是map的一对多的形式,输入一个可以对应输出多个

val rdd4 = rdd3.flatMap(x => x until 20)
println(rdd4.collect.mkString(","))

当然最常见的是对于字符串分片的操作

var rdd4 = rdd3.flatMap(x => x.split("t"))
val stringList = datardd1.collect()

mapPartition

mapPartition的输入函数是每一个分区的数据

arrayRDD.mapPartitions(datas=>{
      dbConnect = getDbConnect() //获取数据库连接
      datas.foreach(data=>{
        dbConnect.insert(data) //循环插入数据
      })
      dbConnect.commit() //提交数据库事务
      dbConnect.close() //关闭数据库连接
    })

分批将数据插入数据库

arrayRDD.mapPartitions(elements=>{
      var result = new ArrayBuffer[Int]()
      elements.foreach(element=>{
        result.+=(element)
      })
      result.iterator
    }).foreach(println)

分片求和

mapPartitionWihIndex和mapPartition是基本一样的,只是mapPartitionWihIndex是带有索引的二元组的数据

sample

对样本进行抽样,根据给定的随机种子,是否放回的抽样

 val sampleData = datas.sample(false,0.01,10)

 println(sampleData.count())

union,intersection,distinct

union,intersection分别是两个rdd的交集和并集

val rdd5 = rdd4.union(rdd3)

rdd5.collect.foreach(println)

使用distinct进行去重操作

groupByKey,reduceByKey

是针对key/value类型的数据进行分组操作,groupbykey是对数据进行分组

 val rdd0 = sc.parallelize(Array((1,1),(1,2),(1,3),(2,2),(2,3),(2,4)),3)
    val rddk = rdd0.groupByKey()

    val tuples = rddk.collect()

    for((k,v)<-tuples){
      println(k,v)
      for(s<-v){
        print(s)
      }
    }

group操作是只分组,reduce操作对分组的数据进行联合计算

 val rdd6 = rdd0.reduceByKey((x,y) => x + y)

    val arrayrdd = rdd6.collect

    for((k,v)<-arrayrdd){
      println(k,v)
    }

aggregateByKey

第一个参数为初始值,第二参数为一个函数负责将初始值合并到分组中,第三个参数是一个函数,负责将每一个分组进行合并。

def seqFunc(a,b):
    print "seqFunc:%s,%s" %(a,b)
    return max(a,b) #取最大值
def combFunc(a,b):
    print "combFunc:%s,%s" %(a ,b)
    return a + b #累加起来
'''
    aggregateByKey这个算子内部肯定有分组
'''
aggregateRDD = rdd.aggregateByKey(3, seqFunc, combFunc)
rest = aggregateRDD.collectAsMap()
for k,v in rest.items():
    print k,v

combineByKey

combineByKey是对RDD中的数据集按照key进行聚合的操作

    val data = Array((1,1.0),(1,3.0),(2,4.0),(2,5.0),(2,6.0))
    val rdd = sc.parallelize(data,2)
    
    val combine1 = rdd.combineByKey(createCombiner = (v:Double) =>(v:Double,1),
      mergeValue = (c:(Double,Int),v:Double) => (c_1 + v ,c_2 + 1),
      mergeCombiners = (c1:(Double,Int),c2:(Double,Int)) => (c1_1 + c2_1,c1_2+c2_2),
      numPartitions = 2)

矩阵向量

使用breeze创建矩阵和向量

//创建0矩阵和0向量

    val m1 = DenseMatrix.zeros[Double](2,3)

    println(m1)

    val v1 = DenseVector.zeros[Double](3)

    println(v1)

//初始化自定义数值的向量,给向量填充指定数值

    val v2 = DenseVector.fill(3){10.0}
    //创建单位向量
    val v3 = DenseVector.ones[Double](3)

//创建等差列表向量

    val v4 = DenseVector.range(1,10,2)

    println(v4)

//创建单位矩阵

    val m2 = DenseMatrix.eye[Double](3)

    println(m2)

//创建对角矩阵

    val m3 = diag(DenseVector(1.0,2.0,3.0))

    println(m3)

//从函数创建矩阵和向量

    //i 代表的是索引下标
    val v9 = DenseVector.tabulate(7){i =>2*i}
    println(v9)
    
    //i,j都是坐标 
    val m4 = DenseMatrix.tabulate(3,2){case(i,j) => i+j}

//将数组直接转换为向量或者矩阵

val v10 = new DenseVector(Array(1,2,3,4))
println(v10)
val m5 = new DenseMatrix(2,3,Array(11,2,3,2,3,6))
println(m5)

//随机生成向量和矩阵

val v11 = DenseVector.rand(4)

    val m6 = DenseMatrix.rand(2,3)
    println(m6)
对向量矩阵的访问

//访问向量数组

    var a = DenseVector(1,2,3,4,5,6,7,8,9,10)
    println(a.valueAt(3))
    println(a(5 to 0 by -1))
    println(a(0))
    println(a(1 to -1))

//访问矩阵

 val m = DenseMatrix((1.0,2.0,3.0),(3.0,4.0,5.0))

    println(m)
    //访问单元素
    println(m(0,1))
    //获取一列元素
    println(m(::,1))
    //获取一行
    println(m(1,::))
Breeze元素操作

//矩阵重塑

val m1 = DenseMatrix((1.0,2.0,3.0),(3.0,4.0,5.0))
println(m1)
println(m1.reshape(3,2))

//矩阵的转置

    //从函数创建矩阵和向量
    //i 代表的是索引下标
    val v9 = DenseVector.tabulate(7){i =>2*i}
    println(v9)

//矩阵转换为向量

 println(m1.toDenseVector)
 DenseVector(1.0, 3.0, 2.0, 4.0, 3.0, 5.0)

主对角线/上三角/下三角

  val m2 = DenseMatrix((1.0,2.0,3.0),(4.0,5.0,6.0),(7.0,8.0,9.0))

    //下三角/上三角
    println(lowerTriangular(m2))
    println(upperTriangular(m2))
    println(m2.copy)

    //主对角向量
    println(diag(m2))

//重新赋值

 //给最后一列重新赋值
    m2(::,2):=5.0
    println(m2)

//矩阵进行垂直拼接和横向拼接

    val m3 = DenseMatrix((1,2,3),(4,5,6))
    val m4 = DenseMatrix((1,1,1),(2,2,2))

    var m5 = DenseMatrix.vertcat(m3,m4)
    println(m5)
    m5 = DenseMatrix.horzcat(m3,m4)
    println(m5)
    ```
向量的拼接同矩阵

##### 数值计算

//相同大小的矩阵相加
val m3 = DenseMatrix((1,2,3),(4,5,6))
val m4 = DenseMatrix((1,1,1),(2,2,2))
/数值计算
//矩阵相加
println(m3 + m4)
//对应每一项相乘
println(m3*:*m4)
//每一项相除
println(m3/:/m4)
//每一项小于
println(m3<:<m4)
println(m3:==m4)
println(m3:+=1)
println(m3:*=2)
  println(max(m3))

println(argmax(m3))

//向量的点乘 println(DenseVector(1,2,3,4) dot DenseVector(1,1,1,1)) }

//求和
//对行/列求和
println(sum(m4))
println(sum(m4,Axis._0))
println(sum(m4,Axis._1))
//求迹

println(trace(m2))

//累加

accumulate(DenseVector(1,2,3,4))

##### 布尔操作

m3&:&m4 相与 m3|:|m4 相或 !m3 非操作 any(m3) 存在不为0返回true all(m3) 所有不为0返回true 线性代数矩阵间操作

val c = DenseMatrix((1.0,2.0,3.0),(4.0,5.0,6.0),(7.0,8.0,9.0))

val d = DenseMatrix((1.0,1.0,1.0),(1.0,1.0,1.0),(1.0,1.0,1.0))

println(cd)
println(c.t)
println(det(c))
// 求逆
inv(c)
//求特征值和特征向量
eigSym(c)
//求范数
norm(c)
//求矩阵的质
rank(c)