Spark机器学习API之特征处理

时间:2022-05-06
本文章向大家介绍Spark机器学习API之特征处理,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

问题导读: 1.怎样利用Spark机器学习API进行特征提取?

2.怎样利用Spark机器学习API进行特征选择?

3.Spark机器学习API中的特征选择有哪几种方法?

Spark机器学习库中包含了两种实现方式,一种是spark.mllib,这种是基础的API,基于RDDs之上构建,另一种是spark.ml,这种是higher-level API,基于DataFrames之上构建,spark.ml使用起来比较方便和灵活。 Spark机器学习中关于特征处理的API主要包含三个方面:特征提取、特征转换与特征选择。 特征提取(Feature Extractors) 1. TF-IDF (HashingTF and IDF)TF-IDF是文本特征提取中一个常用的方法,用以评估一字词对于一个文件集或一个语料库中的其中一份文件的重要程度。 2. Word2VecWord2Vec是一个将单词转换成向量形式的工具。可以把对文本内容的处理简化为向量空间中的向量运算,计算出向量空间上的相似度,来表示文本语义上的相似度。 下面的例子将每个文档中的词语转换成长度为3的向量:

[Java] 纯文本查看 复制代码

?

package com.lxw1234.spark.features
  
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.Word2Vec
  
/**
 * auth: [url=http://lxw1234.com]http://lxw1234.com[/url]
 */
object TestWord2Vec {
  def main(args : Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("lxw1234.com")
    val sc = new SparkContext(conf)
     
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
     
    val documentDF = sqlContext.createDataFrame(Seq(
      "苹果 官网 苹果 宣布".split(" "),
      "苹果 梨 香蕉".split(" ")
    ).map(Tuple1.apply)).toDF("text")
     
    val word2Vec = new Word2Vec().setInputCol("text").setOutputCol("result").setVectorSize(3).setMinCount(1)
    val model = word2Vec.fit(documentDF)
     
    val result = model.transform(documentDF)
    result.collect().foreach(println)
     
  }
}

程序运行输出如下: [WrappedArray(苹果, 官网, 苹果, 宣布),[0.006021047011017799,-0.002911671996116638,0.05357655562693253]] [WrappedArray(苹果, 梨, 香蕉),[-0.10302492479483286,-0.059321289261182145,0.05107089380423228]] 3. CountVectorizer该方法用于将所有的文本词语进行编号,每个词语对应一个编号,并统计该词语在文档中的词频作为特征向量。

[Java] 纯文本查看 复制代码

?

package com.lxw1234.spark.features
  
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
  
/**
 * auth: [url=http://lxw1234.com]http://lxw1234.com[/url]
 */
object TestCountVectorizer {
  def main(args : Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("lxw1234.com")
    val sc = new SparkContext(conf)
     
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
     
    val df = sqlContext.createDataFrame(Seq(
      (0, Array("苹果","官网","苹果","宣布")),
      (1, Array("苹果","梨","香蕉"))
    )).toDF("id", "words")
     
    var cvModel: CountVectorizerModel = new CountVectorizer()
      .setInputCol("words")
      .setOutputCol("features")
      .setVocabSize(5)  //设置词语的总个数,词语编号后的数值均小于该值
      .setMinDF(1) //设置包含词语的最少的文档数
      .fit(df)
     
    println("output1:")
    cvModel.transform(df).select("id","words","features").collect().foreach(println)
     
    var cvModel2: CountVectorizerModel = new CountVectorizer()
      .setInputCol("words")
      .setOutputCol("features")
      .setVocabSize(3)  //设置词语的总个数,词语编号后的数值均小于该值
      .setMinDF(2) //设置包含词语的最少的文档数
      .fit(df)
     
    println("output2:")
    cvModel2.transform(df).select("id","words","features").collect().foreach(println)
     
  }
}

程序output1的输出如下:

[0,WrappedArray(苹果, 官网, 苹果, 宣布),(5,[0,3,4],[2.0,1.0,1.0])] [1,WrappedArray(苹果, 梨, 香蕉),(5,[0,1,2],[1.0,1.0,1.0])]

程序output2的输出如下:

[0,WrappedArray(苹果, 官网, 苹果, 宣布),(1,[0],[2.0])] [1,WrappedArray(苹果, 梨, 香蕉),(1,[0],[1.0])]

因为setMinDF(2)设置了词语最低出现的文档数为2,因此只保留了”苹果”一词。

特征选择(Feature Selectors) 1. VectorSlicer

VectorSlicer用于从原来的特征向量中切割一部分,形成新的特征向量,比如,原来的特征向量长度为10,我们希望切割其中的5~10作为新的特征向量,使用VectorSlicer可以快速实现。

[Java] 纯文本查看 复制代码

?

package com.lxw1234.spark.features.selectors
  
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
  
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
  
/**
 * By  [url=http://lxw1234.com]http://lxw1234.com[/url]
 */
object TestVectorSlicer extends App {
    val conf = new SparkConf().setMaster("local").setAppName("lxw1234.com")
    val sc = new SparkContext(conf)
     
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
     
     
    //构造特征数组
    val data = Array(Row(Vectors.dense(-2.0, 2.3, 0.0)))
     
    //为特征数组设置属性名(字段名),分别为f1 f2 f3
    val defaultAttr = NumericAttribute.defaultAttr
    val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
    val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
     
    //构造DataFrame
    val dataRDD = sc.parallelize(data)
    val dataset = sqlContext.createDataFrame(dataRDD, StructType(Array(attrGroup.toStructField())))
     
    print("原始特征:")
    dataset.take(1).foreach(println)
     
     
    //构造切割器
    var slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
     
    //根据索引号,截取原始特征向量的第1列和第3列
    slicer.setIndices(Array(0,2))
    print("output1: ") 
    slicer.transform(dataset).select("userFeatures", "features").first()
     
    //根据字段名,截取原始特征向量的f2和f3
    slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
    slicer.setNames(Array("f2","f3"))
    print("output2: ") 
    slicer.transform(dataset).select("userFeatures", "features").first()
     
    //索引号和字段名也可以组合使用,截取原始特征向量的第1列和f2
    slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
    slicer.setIndices(Array(0)).setNames(Array("f2"))
    print("output3: ") 
    slicer.transform(dataset).select("userFeatures", "features").first()
     
     
}

程序运行输出为:

[Plain Text] 纯文本查看 复制代码

?

原始特征:
[[-2.0,2.3,0.0]]
  
output1:
org.apache.spark.sql.Row = [[-2.0,2.3,0.0],[-2.0,0.0]]
  
output2:
org.apache.spark.sql.Row = [[-2.0,2.3,0.0],[2.3,0.0]]
  
output3:
org.apache.spark.sql.Row = [[-2.0,2.3,0.0],[-2.0,2.3]]

2. RFormula

RFormula用于将数据中的字段通过R语言的Model Formulae转换成特征值,输出结果为一个特征向量和Double类型的label。

[Java] 纯文本查看 复制代码

?

package com.lxw1234.spark.features.selectors
  
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
  
import org.apache.spark.ml.feature.RFormula
  
/**
 * By  [url=http://lxw1234.com]http://lxw1234.com[/url]
 */
object TestRFormula extends App {
   
    val conf = new SparkConf().setMaster("local").setAppName("lxw1234.com")
    val sc = new SparkContext(conf)
     
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
     
    //构造数据集
    val dataset = sqlContext.createDataFrame(Seq(
      (7, "US", 18, 1.0),
      (8, "CA", 12, 0.0),
      (9, "NZ", 15, 0.0)
    )).toDF("id", "country", "hour", "clicked")
    dataset.select("id", "country", "hour", "clicked").show()
     
    //当需要通过country和hour来预测clicked时候,
    //构造RFormula,指定Formula表达式为clicked ~ country + hour
    val formula = new RFormula().setFormula("clicked ~ country + hour").setFeaturesCol("features").setLabelCol("label")
    //生成特征向量及label
    val output = formula.fit(dataset).transform(dataset)
    output.select("id", "country", "hour", "clicked", "features", "label").show()
}

程序输出:

3. ChiSqSelector

ChiSqSelector用于使用卡方检验来选择特征(降维)。

[Java] 纯文本查看 复制代码

?

package com.lxw1234.spark.features.selectors
  
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
  
/**
 * By  [url=http://lxw1234.com]http://lxw1234.com[/url]
 */
object TestChiSqSelector extends App {
   
    val conf = new SparkConf().setMaster("local").setAppName("lxw1234.com")
    val sc = new SparkContext(conf)
     
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._
     
    //构造数据集
    val data = Seq(
      (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
      (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
      (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
    )
    val df = sc.parallelize(data).toDF("id", "features", "clicked")
    df.select("id", "features","clicked").show()
     
    //使用卡方检验,将原始特征向量(特征数为4)降维(特征数为3)
    val selector = new ChiSqSelector().setNumTopFeatures(3).setFeaturesCol("features").setLabelCol("clicked").setOutputCol("selectedFeatures")
     
    val result = selector.fit(df).transform(df)
    result.show()
  
}

程序输出为: