Livy: running wordcount with scalaClient.submit via the Scala API — everything looks normal, but the Spark job never actually runs

Date: 2019-01-17

Problem analysis: calling Livy through the Java API works, but the same job submitted through the Scala API does not. Comparing the logs, everything looks identical and no error is ever reported — the job simply never runs.
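The key point: scalaClient.submit only queues the job and returns a ScalaJobHandle, which is a Scala Future. If main returns (and the client is stopped) before that future completes, the queued job is discarded without any client-side error — which matches the "no error, no run" symptom. The sketch below shows the fix in isolation; the host name is a placeholder, and it assumes the application jar has already been uploaded as in the full listing further down:

import java.net.URI
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.livy.LivyClientBuilder
import org.apache.livy.scalaapi.LivyScalaClient

object MinimalRepro {
  def main(args: Array[String]): Unit = {
    val client = new LivyScalaClient(
      new LivyClientBuilder().setURI(new URI("http://livy-host:8998")).build())
    try {
      // submit only queues the job; the returned handle is a Future
      val handle = client.submit { ctx =>
        ctx.sc.parallelize(1 to 10).count().toString // trivial stand-in job
      }
      // Without this Await, the finally block stops the client while the job
      // is still queued -- nothing runs and no client-side error appears
      println("result = " + Await.result(handle, 100.seconds))
    } finally {
      client.stop(true)
    }
  }
}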

Scala code for reference:

import java.io.{File, FileNotFoundException}
import java.net.URI

import org.apache.livy.LivyClientBuilder
import org.apache.livy.scalaapi.{LivyScalaClient, ScalaJobHandle}

import scala.concurrent.Await
import scala.concurrent.duration._

 
object Test {

  var scalaClient: LivyScalaClient = _ // HTTP client handle for the Livy session


  // Build the HTTP client against the Livy server's REST endpoint
  def init(): Unit = {
    scalaClient = new LivyScalaClient(new LivyClientBuilder().setURI(new URI("http://XXXXXX:8998")).build())
  }


  // Upload the application jar plus the livy scala-api jar so the remote
  // session can deserialize and run the submitted closure
  @throws(classOf[FileNotFoundException])
  def uploadRelevantJarsForJobExecution(): Unit = {
    val scalaApiJarPath = getSourcePath(scalaClient)
    uploadJar("D:\\workspace\\ScalaDemo\\target\\original-ScalaDemo-1.0-SNAPSHOT.jar")
    uploadJar(scalaApiJarPath)
  }

  @throws(classOf[FileNotFoundException])
  private def getSourcePath(obj: Object): String = {
    val source = obj.getClass.getProtectionDomain.getCodeSource
    if (source != null && source.getLocation.getPath != "") {
      source.getLocation.getPath
    } else {
      throw new FileNotFoundException(s"Jar containing ${obj.getClass.getName} not found.")
    }
  }

  private def uploadJar(path: String): Unit = {
    val file = new File(path)
    val uploadJarFuture = scalaClient.uploadJar(file)
    // Block until the upload finishes; a failed upload surfaces as an
    // exception from Await.result
    Await.result(uploadJarFuture, 40.seconds)
    println("Successfully uploaded " + file.getName)
  }

  // submit returns a ScalaJobHandle[String] -- a Scala Future over the job's result
  def processScWordCount(): ScalaJobHandle[String] = {
    scalaClient.submit { fn =>
      val sc = fn.sc
      val res = sc.textFile("hdfs://xxxx:8020//xxx/livyInput/word.txt")
        .flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey(_ + _)
      res.saveAsTextFile("hdfs://xxxxx:8020/xxx/livyInput/out000")
      res.collect().apply(1).toString() // returned result; any string will do
    }
  }

  private def stopClient(): Unit = {
    if (scalaClient != null) {
      scalaClient.stop(true) // true also shuts down the remote Spark context
      scalaClient = null
    }
  }


  def main(args: Array[String]): Unit = {
    try {
      init()
      uploadRelevantJarsForJobExecution()
      println("Calling processScWordCount")
      val handle1 = processScWordCount()

      // ### REQUIRED: without this Await, main falls through to stopClient()
      // while the job is still queued, and the job never runs
      Await.result(handle1, 100.seconds)
    } finally {
      stopClient()
    }
  }
}
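If blocking the calling thread is undesirable, a non-blocking variant (a minimal sketch under the same placeholders, using only standard Future callbacks) keeps the same guarantee: register a completion callback and hold the JVM open with a latch, so stopClient() in the finally block still runs only after the job has finished. This would replace the Await.result line in main:

import java.util.concurrent.CountDownLatch
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

// inside main's try block, replacing the blocking Await:
val latch = new CountDownLatch(1)
val handle = processScWordCount()
handle.onComplete {
  case Success(s)  => println(s"Job finished: $s"); latch.countDown()
  case Failure(ex) => println(s"Job failed: $ex"); latch.countDown()
}
latch.await() // still required before stopClient(): exiting early discards the job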

pom.xml dependencies:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>xxxx</groupId>
  <artifactId>ScalaDemo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>ScalaDemo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.0.0</spark.version>
    <hadoop.version>2.6.0</hadoop.version>
    <scala.compat.version>2.11</scala.compat.version>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
      <!-- prefer the explicitly declared hadoop-client ${hadoop.version} below
           over Spark's transitive one -->
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.livy</groupId>
      <artifactId>livy-client-http</artifactId>
      <version>0.5.0-incubating</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.livy/livy-scala-api -->
    <dependency>
      <groupId>org.apache.livy</groupId>
      <artifactId>livy-scala-api_2.11</artifactId>
      <version>0.5.0-incubating</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>


  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.5.1</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>