用java提交一个Spark应用程序

时间:2022-04-25
本文章向大家介绍用java提交一个Spark应用程序,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

第一种方式

经常有人在公众号留言或者在群里问浪尖,如何使用java代码提交一个Spark应用程序。在这里浪尖介绍一种简单的方法来实现这个功能。

首先用vim打开一个文件,MyLauncher.java

编写代码如下:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import java.util.HashMap;
public class MyLauncher {
 public static void main(String[] args) throws Exception {
HashMap<String,String> map = newHashMap<String, String>();
       map.put("HADOOP_CONF_DIR","/home/hadoop/conf");
       map.put("YARN_CONF_DIR","/home/hadoop/conf");
       map.put("SPARK_CONF_DIR","/home/hadoop/spark/conf");
     new SparkLauncher(map)
        .setAppResource("/data/newStreaming/uesc-analyzer.jar")
        .setMainClass("ucloud.UESBash.testSchema")
        .setMaster("yarn-cluster")
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
       .setVerbose(true).startApplication();
       Thread.sleep(100000);
      // Use handle API to monitor / control application.
}
}

接着,进行编译

javac -cp /home/hadoop/spark/lib/spark-assembly-1.6.0-hadoop2.6.0-cdh5.4.9.jarMyLauncher.java

然后提交执行

java -cp/home/hadoop/spark/lib/spark-assembly-1.6.0-hadoop2.6.0-cdh5.4.9.jar:.MyLauncher

这样就可以在yarn的界面上看到运行情况了。

注释:可能你比较奇怪我为啥在那加了个停顿,因为正常情况下我们要进行一些其它操作监控Spark应用而不是直接退出,直接退出可能会导致提交还没完成退出,以至于应用程序不能被提交了。

当然,还有另外一种方式,但是不建议使用,上面采用的这种方式呢可以更好地监控我们Spark程序的状态。

第二种方式

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import java.util.HashMap;
public class MyLauncher {
 public static void main(String[] args) throws Exception {
HashMap<String,String> map = newHashMap<String, String>();
       map.put("HADOOP_CONF_DIR","/home/hadoop/conf");
       map.put("YARN_CONF_DIR","/home/hadoop/conf");
       map.put("SPARK_CONF_DIR","/home/hadoop/spark/conf");
    Process spark  = new SparkLauncher(map)
        .setAppResource("/data/newStreaming/uesc-analyzer.jar")
        .setMainClass("ucloud.UESBash.testSchema")
        .setMaster("yarn-cluster")
        .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
       .setVerbose(true).launch();
      // Use handle API to monitor / control application.
       spark.waitFor();    
}
}

第二种方式是启动了一个子进程来加载提交应用程序。

至于SparkLauncher其它标准函数操作,比如如何传参,如何指定内存,cpu,如何指定钩子函数这些浪尖就不在这里啰嗦了,有兴趣可以去翻看这个类的源码。也可以在公众号里输入:submit,来获取这个类和钩子函数使用的复杂代码。