Reading MySQL Data into a Spark DataFrame

Date: 2019-10-17

In day-to-day work we sometimes need to read MySQL data as a DataFrame source for downstream Spark processing. Spark ships with built-in support for this: when reading from MySQL we can rely on the table's own schema instead of defining every field ourselves.
Below is how I implemented it.

1. MySQL connection info:

I keep the MySQL connection info in an external configuration file, which makes later configuration changes easy and lets one job pull from multiple data sources; you could also hard-code the values instead.

Add the following to resource.properties:

mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://127.0.0.1:3306/test?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=root
mysql.password=aaa
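
To verify the file is picked up, here is a quick sanity check (a sketch, assuming resource.properties lives in src/main/resources so it lands on the classpath):

import java.util.Properties

// Sketch: load resource.properties from the classpath and check one key.
val in = getClass.getClassLoader.getResourceAsStream("resource.properties")
val props = new Properties()
props.load(in)
println(props.getProperty("mysql.url")) // should print the JDBC URL above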

2. Add the Maven dependencies. Since my company uses the Scala 2.11 build of Spark, the configuration looks like this:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.1.0</spark.version>
        <log4j.version>2.7</log4j.version>
        <mysql.version>5.1.38</mysql.version>
  </properties>

  <dependencies>
        <!-- Spark Core: the core Spark component. Required no matter which Spark feature you use (think Spring Core). -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <!-- Spark SQL: needed whenever you work with databases or structured data. Pure-computation Spark jobs are rare, so this is almost always required too. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- Required by Spark local mode -->
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.10</version>
        </dependency>
        <!-- Required by Spark local mode -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.4.0</version>
        </dependency>

      <!-- MySQL JDBC driver -->
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>${mysql.version}</version>
      </dependency>

  </dependencies>

I implemented this in two ways: one pulls its configuration through Java helper classes, the other is plain Scala:

Approach 1: Scala (configuration via the Java helper classes below)

import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkFromMysql {

  private val session: SparkSession = SparkSession.builder()
    .appName("My First Spark Application!")
    .master("local[2]")
    .getOrCreate()

  def main(args: Array[String]): Unit = {
    val frame: DataFrame = SparkFromMysql.readMysqlTable("test", "1=1")
    frame.show(10)
  }

  def readMysqlTable(table: String, filterCondition: String): DataFrame = {
    // Wrap the query in a subselect so MySQL applies the filter before Spark sees the rows.
    val tableName = "(select * from " + table + " where " + filterCondition + " ) as t1"
    session.read.format("jdbc")
      .option("url", Constant.MYSQL_URL)
      .option("driver", Constant.MYSQL_DRIVER)
      .option("user", Constant.MYSQL_USERNAME)
      .option("password", Constant.MYSQL_PASSWORD)
      .option("dbtable", tableName)
      .load()
  }

}
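
For a simple read like this, Spark also exposes a convenience method, DataFrameReader.jdbc, which takes the URL, the table (or subquery) and a java.util.Properties holding the credentials. A minimal sketch of the same read using it:

import java.util.Properties

// Sketch: the same read via session.read.jdbc; user/password/driver travel
// in a Properties object instead of individual .option() calls.
val props = new Properties()
props.setProperty("user", Constant.MYSQL_USERNAME)
props.setProperty("password", Constant.MYSQL_PASSWORD)
props.setProperty("driver", Constant.MYSQL_DRIVER)
val df = session.read.jdbc(Constant.MYSQL_URL, "(select * from test where 1=1) as t1", props)
df.show(10)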
SparkFromMysql pulls its connection settings from Constant, a Java helper class that exposes the values in resource.properties:

/**
 * @author ming
 * @date 2019/10/17 9:54
 */
public class Constant {

    private static final String PROPERTY_FILE = "resource.properties";

    private static final Configuration CONF = new Configuration(PROPERTY_FILE);

    public static final String MYSQL_DRIVER = CONF.getValue("mysql.driver");
    public static final String MYSQL_URL = CONF.getValue("mysql.url");
    public static final String MYSQL_USERNAME = CONF.getValue("mysql.username");
    public static final String MYSQL_PASSWORD = CONF.getValue("mysql.password");



}
Constant delegates to Configuration, a small Java wrapper around java.util.Properties that loads the file from the classpath:

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Configuration {

    private static final Logger log = LoggerFactory.getLogger(Configuration.class);

    private Properties propertie;
    private InputStream inputFile;
    private OutputStream outputFile;

    /**
     * Creates an empty Configuration.
     */
    public Configuration() {
        propertie = new Properties();
    }

    /**
     * Creates a Configuration backed by a properties file on the classpath.
     *
     * @param filePath path + name of the configuration file to read
     */
    public Configuration(String filePath) {
        propertie = new Properties();
        try {
            inputFile = Thread.currentThread().getContextClassLoader().getResourceAsStream(filePath);
            if (inputFile == null) {
                // getResourceAsStream returns null rather than throwing when the resource is missing
                log.error("Failed to read property file: wrong path or file does not exist. filePath=" + filePath);
                return;
            }
            propertie.load(inputFile);
            inputFile.close();
        } catch (IOException ex) {
            log.error("Failed to load property file", ex);
        }
    }

    public Configuration(InputStream inputFile) {
        this.inputFile = inputFile;
        propertie = new Properties();
        try {
            propertie.load(inputFile);
            inputFile.close();
        } catch (IOException ex) {
            log.error("Failed to load property file", ex);
        }
    }

    public static Properties getPropertiesFromFile(String filePath) {
        Properties properties = new Properties();
        if (filePath == null || filePath.trim().length() == 0) {
            return null;
        }
        try {
            InputStream propertyIS = new FileInputStream(filePath);
            properties.load(propertyIS);
            propertyIS.close();
        } catch (FileNotFoundException ex) {
            log.error("Failed to read property file: wrong path or file does not exist. filePath=" + filePath, ex);
        } catch (IOException ex) {
            log.error("Failed to load property file", ex);
        }
        return properties;
    }

    /**
     * Returns the value for the given key, or the empty string if absent.
     *
     * @param key the key whose value to fetch
     * @return the key's value
     */
    public String getValue(String key) {
        // getProperty with a default covers the missing-key case
        return propertie.getProperty(key, "");
    }

    /**
     * Reads the given properties file and returns the value for the key,
     * or the empty string on any failure.
     *
     * @param fileName path + name of the properties file
     * @param key      the key whose value to fetch
     * @return the key's value
     */
    public String getValue(String fileName, String key) {
        try {
            inputFile = new FileInputStream(fileName);
            propertie.load(inputFile);
            inputFile.close();
            return propertie.getProperty(key, "");
        } catch (FileNotFoundException e) {
            log.error("Property file not found: " + fileName, e);
            return "";
        } catch (IOException e) {
            log.error("Failed to load property file: " + fileName, e);
            return "";
        } catch (Exception ex) {
            log.error("Unexpected error reading property file: " + fileName, ex);
            return "";
        }
    }

    /**
     * Removes all keys and values from the in-memory properties.
     */
    public void clear() {
        propertie.clear();
    }

    /**
     * Sets or replaces the value for a key: if the key exists its value is
     * overwritten, otherwise the pair is added.
     *
     * @param key   the key to store
     * @param value the value to store
     */
    public void setValue(String key, String value) {
        propertie.setProperty(key, value);
    }

    /**
     * Writes the current properties to the given file, creating it if needed.
     *
     * @param fileName    file path + name
     * @param description a comment describing the file
     */
    public void saveFile(String fileName, String description) {
        try {
            outputFile = new FileOutputStream(fileName);
            propertie.store(outputFile, description);
            outputFile.close();
        } catch (FileNotFoundException e) {
            log.error("Cannot open file for writing: " + fileName, e);
        } catch (IOException ioe) {
            log.error("Failed to save property file: " + fileName, ioe);
        }
    }
}
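
With these helpers in place, a quick check from Scala (a sketch; Configuration resolves the file via the context class loader, so resource.properties must be on the classpath):

// Sketch: read one key through the Java helpers from Scala.
val conf = new Configuration("resource.properties")
println(conf.getValue("mysql.url"))
// Constant exposes the same values as static fields:
println(Constant.MYSQL_URL)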

Approach 2: plain Scala, loading the properties file directly

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkFromMysqlOne {

  val session: SparkSession = SparkSession.builder()
    .appName("My First Spark Application!")
    .master("local[2]")
    .getOrCreate()

  def main(args: Array[String]): Unit = {
    val proPath = "F:\\spark_test_1\\HelloSpark\\src\\main\\resources\\resource.properties"
    val tableName = "test"
    val filterCondition = "name = 'alan'"
    readMysqlTable(tableName,proPath).show()
    readMysqlTable(tableName,filterCondition,proPath).show()

  }

  /**
    * Reads a MySQL table without a filter condition.
    * @param tableName table name
    * @param proPath   path to the MySQL configuration file
    * @return
    */
  def readMysqlTable(tableName: String, proPath: String): DataFrame = {
    val properties = SparkFromMysqlOne.getProperties(proPath)
    session.read.format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", tableName)
      .load()
  }

  /**
    * Reads a MySQL table with a filter condition.
    * @param tableName       table name
    * @param filterCondition filter condition
    * @param proPath         path to the MySQL configuration file
    * @return
    */
  def readMysqlTable(tableName: String, filterCondition: String, proPath: String): DataFrame = {
    val properties = getProperties(proPath)
    // Subquery alias pushes the WHERE clause down to MySQL.
    val sql = "(select * from " + tableName + " where " + filterCondition + " ) as t1"
    session.read.format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", sql)
      .load()
  }


  def getProperties(proPath: String): Properties = {
    val properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }
}
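
One more knob worth knowing: for large tables the JDBC source can read in parallel if you give it a numeric partition column and its rough bounds. A sketch, assuming proPath as in main above and assuming the test table has a numeric id column (adjust the bounds to your data):

// Sketch: parallel JDBC read, splitting the id range across 4 partitions,
// i.e. 4 concurrent connections each fetching one slice of the table.
val properties = SparkFromMysqlOne.getProperties(proPath)
val parallel = SparkFromMysqlOne.session.read.format("jdbc")
  .option("url", properties.getProperty("mysql.url"))
  .option("driver", properties.getProperty("mysql.driver"))
  .option("user", properties.getProperty("mysql.username"))
  .option("password", properties.getProperty("mysql.password"))
  .option("dbtable", "test")
  .option("partitionColumn", "id") // assumed numeric column
  .option("lowerBound", "1")       // rough min of id
  .option("upperBound", "100000")  // rough max of id
  .option("numPartitions", "4")
  .load()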

Source: https://www.cnblogs.com/AlanWilliamWalker/p/11694888.html