BigData--MapReduce进阶(一)之框架原理
时间:2022-07-25
本文章向大家介绍BigData--MapReduce进阶(一)之框架原理,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
MapReduce进阶(一)–框架原理
1、InputFormat
MapReduce数据流
2、MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
- 1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
- 2)每一个Split切片分配一个MapTask并行实例处理
- 3)默认情况下,切片大小=BlockSize
- 4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
3、Job提交流程源码解析
4、FileInputFormat切片源码解析(input.getSplits(job))
1)源码解析
2)切片机制
- (1)简单地按照文件的内容长度进行切片
- (2)切片大小,默认等于Block大小
- (3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
3)切片大小的参数配置
5、小文件切片–CombineTextInputFormat切片机制
生成切片过程包括:虚拟存储过程和切片过程二部分。
(1)虚拟存储过程:
将输入目录下所有文件大小,依次和设置的
setMaxInputSplitSize
值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
(2)切片过程:
Code
(a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
(c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
最终会形成3个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
6、自定义InputFormat
1) WholeFileInputFormat 继承FileInputFormat
java
package cn.buildworld.mapreduce.inputformat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
* @author MiChong
* @date 2020-05-25 16:12
*/
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new WholeFileRecordReader();
}
}
2)自定义RecordReader–WholeFileRecordReader
java
package cn.buildworld.mapreduce.inputformat;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
/**
* @author MiChong
* @date 2020-05-25 16:15
* <p>
* 自定义RecordReader,处理一个文件,把这个文件直接读成 一个KV值
*/
public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
private boolean notRead = true;
private Text key = new Text();
private BytesWritable value = new BytesWritable();
private FSDataInputStream inputStream;
private Path path;
private FileSplit fs;
/**
* 初始化方法,框架会在开始的时候调用一次
*
* @param split
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
//转换切片类型到文件切片
fs = (FileSplit) split;
//通过切片获取路径
path = fs.getPath();
//通过路径获取文件系统
FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
//开流
inputStream = fileSystem.open(path);
}
/**
* 读取下一组KV值
*
* @return 如果读到了,返回true,读完了,返回False
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (notRead) {
// 具体读文件的过程
//读key
key.set(fs.getPath().toString());
//读value
byte[] buf = new byte[(int) fs.getLength()];
inputStream.read(buf);
value.set(buf, 0, buf.length);
notRead = false;
return true;
} else {
return false;
}
}
/**
* 获取到当前的key
*
* @return 当前的key
* @throws IOException
* @throws InterruptedException
*/
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
/**
* 获取当前读到的Value
*
* @return 当前Value
* @throws IOException
* @throws InterruptedException
*/
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
}
/**
* 当前数据读取的进度
*
* @return 当前进度
* @throws IOException
* @throws InterruptedException
*/
@Override
public float getProgress() throws IOException, InterruptedException {
return notRead ? 0 : 1;
}
/**
* 关闭资源
*
* @throws IOException
*/
@Override
public void close() throws IOException {
IOUtils.closeStream(inputStream);
}
}
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- iOS开发~UIView layer 之前的关系
- iOS进阶_KVC(&KVC赋值取值过程分析&KVC自定义&异常处理)
- python 学习笔记(8)——python绝对路径相对路径
- 设计模式-访问者模式
- 设计模式-空对象模式
- 设计模式-观察者模式
- Selenium自动化最佳实践技巧(上)
- 如何同时压测创建和删除接口
- 为什么测试覆盖率如此重要
- 5行Python就能爬取 3000+ 上市公司的信息?
- iOS 代码染色原理及技术实践
- 微信小程序【常用组件及自定义组件】
- 虚拟机字节码执行引擎,JVM的马达图,是爱情呀
- 微信小程序【浅提WXSS样式】
- Kubernetes Liveness and Readiness Probes