Mapreduce 任务提交源码分析1
时间:2022-04-27
本文章向大家介绍Mapreduce 任务提交源码分析1,主要内容包括提交过程、总结、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
提交过程
一般我们mapreduce任务是通过如下命令进行提交的
$HADOOP_HOME/bin/hadoop jar $MR_JAR $MAIN_CLASS
hadoop脚本中有如下代码
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
//... 略
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
可以看到hadoop命令提交mapreduce其实就是执行了org.apache.hadoop.util.RunJar类的main方法,接下来我们来看下这个main方法,只关注最核心的逻辑,其他不重要的部分略去。
public static void main(String[] args) throws Throwable {
String usage = "RunJar jarFile [mainClass] args...";
// 第一个参数是jar文件路径,第二个参数是主类名(可选),后续跟其他参数
// ...
int firstArg = 0;
String fileName = args[firstArg++];
File file = new File(fileName);
// 构建jar文件对象
// ...
// --: 这部分逻辑是获取主类名
// 优先从jar文件的Manifest信息中获取主类名; (只有当打包jar时采用可运行的jar文件的方式才有这个信息,否则普通的jar文件中不包含该信息)
// 如果无法获取到,则采用第二参数值作为主类名;
// ---------------------------------------------------------------------------
String mainClassName = null;
JarFile jarFile;
try {
jarFile = new JarFile(fileName);
} catch(IOException io) {
throw new IOException("Error opening job jar: " + fileName)
.initCause(io);
}
Manifest manifest = jarFile.getManifest();
if (manifest != null) {
mainClassName = manifest.getMainAttributes().getValue("Main-Class");
}
jarFile.close();
if (mainClassName == null) {
if (args.length < 2) {
System.err.println(usage);
System.exit(-1);
}
mainClassName = args[firstArg++];
}
mainClassName = mainClassName.replaceAll("/", ".");
// ---------------------------------------------------------------------------
// 获取Hadoop临时目录,指的是本地操作系统的一个目录
// hadoop.tmp.dir配置可在core-site.xml配置文件中配置
File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
ensureDirectory(tmpDir);
// --:为这个任务在临时目录下面创建一个临时的工作目录,目录名的格式为:“hadoop-unjar + Long型随机数”
//------------------------------------------------------------------------------
final File workDir;
try {
workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
} catch (IOException ioe) {
System.err.println("Error creating temp dir in hadoop.tmp.dir "
+ tmpDir + " due to " + ioe.getMessage());
System.exit(-1);
return;
}
if (!workDir.delete()) {
System.err.println("Delete failed for " + workDir);
System.exit(-1);
}
ensureDirectory(workDir);
//------------------------------------------------------------------------------
// 为Java进程添加一个钩子程序,当程序shutdown时清楚临时工作目录
ShutdownHookManager.get().addShutdownHook(
new Runnable() {
@Override
public void run() {
FileUtil.fullyDelete(workDir);
}
}, SHUTDOWN_HOOK_PRIORITY);
// 将jar文件里面的内容解压到这个临时的工作目录
unJar(file, workDir);
// -- 将:
// workDir/, workDir/classes/, workDir/lib/${allfiles} 添加到classpath
//------------------------------------------------------------------------------
ArrayList<URL> classPath = new ArrayList<URL>();
classPath.add(new File(workDir+"/").toURI().toURL());
classPath.add(file.toURI().toURL());
classPath.add(new File(workDir, "classes/").toURI().toURL());
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
classPath.add(libs[i].toURI().toURL());
}
}
//------------------------------------------------------------------------------
// --: 以上面的classpath创建一个URLClassLoader,作为主线程的classLoader
ClassLoader loader =
new URLClassLoader(classPath.toArray(new URL[0]));
Thread.currentThread().setContextClassLoader(loader);
// --:通过反射的机制去调用主类的main方法,并将除jar文件和[mainClass]以外的所有参数传递给该main方法
Class<?> mainClass = Class.forName(mainClassName, true, loader);
Method main = mainClass.getMethod("main", new Class[] {
Array.newInstance(String.class, 0).getClass()
});
String[] newArgs = Arrays.asList(args)
.subList(firstArg, args.length).toArray(new String[0]);
try {
main.invoke(null, new Object[] { newArgs });
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
总结
hadoop命令做的事情就是将jar文件的内容解压到临时工作目录,并将解压后的workDir/, workDir/classes/, workDir/lib/${allfiles} 一系列路径加入到自定义的ClassLoader中,并通过反射的机制去执行jar文件中Manifest中的主类或是用户指定的主类。
- Linux下恶意文件大规模共性分析探讨
- 虚拟时钟
- 线上服务 CPU 100%?一键定位 so easy!
- 设置输出延迟
- 设置输入延时约束
- MySQL 死锁与日志二三事
- 一千个不用 Null 的理由
- TensorFlow强化学习入门(1.5)——上下文赌博机
- 以太坊·代币开发详解
- JSON Web Token - 在Web应用间安全地传递信息
- TensorFlow强化学习入门(2)——基于策略的Agents
- 用ABAP 生成二维码 QR Code
- CDS view注解解析 - @Environment.systemField
- Document flow API in SAP CRM and C4C
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- R语言预测人口死亡率:用李·卡特模型、非线性模型进行平滑估计
- Dockerfile 指令
- Docker 构建容器Tomcat+Nginx+MySQL
- 三种动态控制SAP CRM WebClient UI assignment block显示与否的方法
- TCGA数据库中癌症名称缩写
- CloudFlare自定义节点优化网站
- 什么是SSL?为什么要为WordPress网站使用SSL?
- R语言再保险合同定价案例研究
- SAP CRM附件的技术属性设计原理
- R语言对混合分布中的不可观测与可观测异质性因子分析
- R替换函数gsub
- R语言泊松回归对保险定价建模中的应用:风险敞口作为可能的解释变量
- asp dotnet core 提供大文件下载的测试
- R语言模拟人类生活预期寿命动态可视化动画图gif
- Python遍历字典