Hadoop知识总结

时间:2021-07-12
本文章向大家介绍Hadoop知识总结,主要包括Hadoop知识总结使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

------------恢复内容开始------------

Hadoop知识点

什么是Hadoop

Hadoop是一个能够对大量数据进行分布式处理的软件框架。以一种可靠、高效、可伸缩的方式进行数据处理。主要包括三部分内容:Hdfs,MapReduce,Yarn


从2.7.3版本开始block size的默认大小为128M,之前版本的默认值是64M。对于新文件切分的大小,单位byte。每一个节点都要指定,包括客户端。

原理:文件块越大,寻址时间越短,但磁盘传输时间越长;文件块越小,寻址时间越长,但磁盘传输时间越短。


Hadoop和Spark差异


Hadoop常见版本,有哪些特点,一般是如何进行选择

Hadoop1.0由分布式存储系统HDFS和分布式计算框架MapReduce组成,其中HDFS由一个NameNode和多个DateNode组成,MapReduce由一个JobTracker和多个TaskTracker组成。在Hadoop1.0中容易导致单点故障,拓展性差,性能低,支持编程模型单一的问题。


Hadoop2.0即为克服Hadoop1.0中的不足,提出了以下关键特性:

  • Yarn:它是Hadoop2.0引入的一个全新的通用资源管理系统,完全代替了Hadoop1.0中的JobTracker。在MRv1 中的 JobTracker 资源管理和作业跟踪的功能被抽象为 ResourceManager 和 AppMaster 两个组件。Yarn 还支持多种应用程序和框架,提供统一的资源调度和管理功能

  • NameNode 单点故障得以解决:Hadoop2.2.0 同时解决了 NameNode 单点故障问题和内存受限问题,并提供 NFS,QJM 和 Zookeeper 三种可选的共享存储系统

  • HDFS 快照:指 HDFS(或子系统)在某一时刻的只读镜像,该只读镜像对于防止数据误删、丢失等是非常重要的。例如,管理员可定时为重要文件或目录做快照,当发生了数据误删或者丢失的现象时,管理员可以将这个数据快照作为恢复数据的依据

  • 支持Windows 操作系统:Hadoop 2.2.0 版本的一个重大改进就是开始支持 Windows 操作系统

  • Append:新版本的 Hadoop 引入了对文件的追加操作

同时,新版本的Hadoop对于HDFS做了两个非常重要的「增强」,分别是支持异构的存储层次和通过数据节点为存储在HDFS中的数据提供内存缓冲功能


相比于Hadoop2.0,Hadoop3.0 是直接基于 JDK1.8 发布的一个新版本,同时,Hadoop3.0引入了一些重要的功能和特性

  • HDFS可擦除编码:这项技术使HDFS在不降低可靠性的前提下节省了很大一部分存储空间

  • 多NameNode支持:在Hadoop3.0中,新增了对多NameNode的支持。当然,处于Active状态的NameNode实例必须只有一个。也就是说,从Hadoop3.0开始,在同一个集群中,支持一个 ActiveNameNode 和 多个 StandbyNameNode 的部署方式。

  • MR Native Task优化

  • Yarn基于cgroup 的内存和磁盘 I/O 隔离

  • Yarn container resizing


Hadoop常用端口号

dfs.namenode.http-address:50070
dfs.datanode.http-address:50075
SecondaryNameNode:50090
dfs.datanode.address:50010
fs.defaultFS:8020 或者9000
yarn.resourcemanager.webapp.address:8088
历史服务器web访问端口:19888
端口名称Hadoop2.xHadoop3.x
NameNode 内部通信端口 8020 / 9000 8020 / 9000/9820
NameNode HTTP UI 50070 9870
MapReduce 查看执行任务端口 8088 8088
jobhistoryserver历史服务器通信端口 19888 19888
datanode HTTP UI 50075  
SecondaryNameNode web端 50090  

搭建Hadoop集群的流程

1)准备一台虚拟机样机(配置好IP地址,主机名,映射,关闭防火墙,jdk安装。jdk安装需要配置环境变量,JAVA_HOME/bin目录下所有命令全局生效)

2)克隆两台虚拟机。(设置ssh免密登录,配置映射,修改主机名,和IP地址)

3)安装Hadoop解压到指定目录文件下

4)Hadoop文件配置

a)core-site.xml配置HDFS中namenode的地址指定Hadoop运行时产生文件的存储目录

b)hdfs-site.xml配置hdfs副本数量

c)yarn-site.xml配置reducer获取数据的方式指定yarn的resourceManager的地址

d)mapred-env.sh配置MR运行在yarn上,配置历史服务器端地址历史服务器web端地址

e)slaves配置其他虚拟机的主机名

f)hadoop-env.sh : Hadoop 环境配置文件 export JAVA_HOME=/usr/local/soft/jdk1.8.0_171

5)分发Hadoop的安装包(scp -r hadoop node1:pwd

6)格式化hdfs(hdfs namenode -format

7)启动集群(start-all.sh,因为我们是把resourcemanager和2nn放在master一台上进行启动)

Hadoop中需要哪些配置文件,其作用是什么?

1)core-site.xml

(1)fs.defaultFS:hdfs://cluster1(域名),这里的值指的是默认的HDFS路径 。

(2)hadoop.tmp.dir:/export/data/hadoop_tmp,这里的路径默认是NameNode、DataNode、secondaryNamenode等存放数据的公共目录。用户也可以自己单独指定这三类节点的目录。

(3)ha.zookeeper.quorum:hadoop101:2181,hadoop102:2181,hadoop103:2181,这里是ZooKeeper集群的地址和端口。注意,数量一定是奇数,且不少于三个节点 。

2)hadoop-env.sh: 只需设置jdk的安装路径,如:export JAVA_HOME=/usr/local/jdk。

3)hdfs-site.xml

(1) dfs.replication:他决定着系统里面的文件块的数据备份个数,默认为3个。

(2) dfs.data.dir:datanode节点存储在文件系统的目录 。

(3) dfs.name.dir:是namenode节点存储hadoop文件系统信息的本地系统路径 。

4)mapred-site.xml

mapreduce.framework.name: yarn指定mr运行在yarn上

HDFS读写流程

1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。

2)NameNode返回是否可以上传。

3)客户端请求第一个 Block上传到哪几个DataNode服务器上。

4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。

5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。

6)dn1、dn2、dn3逐级应答客户端。

7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。

8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。

1)客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。

2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。

3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。

4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

MapReduce的Shuffle过程,Hadoop优化方案

MapReduce数据读取并写入HDFS流程实际上是有10步

Shuffle阶段

1、Map方法之后Reduce方法之前这段处理过程叫「Shuffle」

2、Map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区;环形缓冲区默认大小100m,环形缓冲区达到80%时,进行溢写;溢写前对数据进行排序,排序按照对key的索引进行字典顺序排序,排序的手段「快排」;溢写产生大量溢写文件,需要对溢写文件进行「归并排序」;对溢写的文件也可以进行Combiner操作,前提是汇总操作,求平均值不行。最后将文件按照分区存储到磁盘,等待Reduce端拉取。

3、每个Reduce拉取Map端对应分区的数据。拉取数据后先存储到内存中,内存不够了,再存储到磁盘。拉取完所有数据后,采用归并排序将内存和磁盘中的数据都进行排序。在进入Reduce方法前,可以对数据进行分组操作。

Shuffle优化

1)Map阶段

(1)增大环形缓冲区大小。由100m扩大到200m

(2)增大环形缓冲区溢写的比例。由80%扩大到90%

(3)减少对溢写文件的merge次数。(10个文件,一次20个merge)

(4)不影响实际业务的前提下,采用Combiner提前合并,减少 I/O

2)Reduce阶段

(1)合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致 Map、Reduce任务间竞争资源,造成处理超时等错误。

(2)设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。

(3)规避使用Reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

(4)增加每个Reduce去Map中拿数据的并行数

(5)集群性能可以的前提下,增大Reduce端存储数据内存的大小。

3)IO传输

采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器。

压缩:

(1)map输入端主要考虑数据量大小和切片,支持切片的有Bzip2、LZO。注意:LZO要想支持切片必须创建索引;

(2)map输出端主要考虑速度,速度快的snappy、LZO;

(3)reduce输出端主要看具体需求,例如作为下一个mr输入需要考虑切片,永久保存考虑压缩率比较大的gzip。

4)整体

(1)NodeManager默认内存8G,需要根据服务器实际配置灵活调整,例如128G内存,配置为100G内存左右,yarn.nodemanager.resource.memory-mb。

(2)单任务默认内存8G,需要根据该任务的数据量灵活调整,例如128m数据,配置1G内存,yarn.scheduler.maximum-allocation-mb。

(3)mapreduce.map.memory.mb :控制分配给MapTask内存上限,如果超过会kill掉进程(报:Container is running beyond physical memory limits. Current usage:565MB of512MB physical memory used;Killing Container)。默认内存大小为1G,如果数据量是128m,正常不需要调整内存;如果数据量大于128m,可以增加MapTask内存,最大可以增加到4-5g。

(4)mapreduce.reduce.memory.mb:控制分配给ReduceTask内存上限。默认内存大小为1G,如果数据量是128m,正常不需要调整内存;如果数据量大于128m,可以增加ReduceTask内存大小为4-5g。

(5)mapreduce.map.java.opts:控制MapTask堆内存大小。(如果内存不够,报:java.lang.OutOfMemoryError)

(6)mapreduce.reduce.java.opts:控制ReduceTask堆内存大小。(如果内存不够,报:java.lang.OutOfMemoryError)

(7)可以增加MapTask的CPU核数,增加ReduceTask的CPU核数

(8)增加每个Container的CPU核数和内存大小

(9)在hdfs-site.xml文件中配置多目录

(10)NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为10台时,此参数设置为60。

基于MapReduce做Hadoop的优化

1)HDFS小文件影响

  • 影响NameNode的寿命,因为文件元数据存储在NameNode的内存中

  • 影响计算引擎的任务数量,比如每个小的文件都会生成一个Map任务

2)数据输入小文件处理

  • 合并小文件:对小文件进行归档(Har)、自定义Inputformat将小文件存储成SequenceFile文件。

  • 采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景

  • 对于大量小文件Job,可以开启JVM重用会减少45%运行时间

3)Map阶段

  • 增大环形缓冲区大小。由100m扩大到200m

  • 增大环形缓冲区溢写的比例。由80%扩大到90%

  • 减少对溢写文件的merge次数。(10个文件,一次20个merge)

  • 不影响实际业务的前提下,采用Combiner提前合并,减少 I/O

4)Reduce阶段

  • 合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致 Map、Reduce任务间竞争资源,造成处理超时等错误。

  • 设置Map、Reduce共存:调整 slowstart.completedmaps 参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间

  • 规避使用Reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

  • 增加每个Reduce去Map中拿数据的并行数

  • 集群性能可以的前提下,增大Reduce端存储数据内存的大小

5)IO传输

  • 采用数据压缩的方式,减少网络IO的的时间

  • 使用SequenceFile二进制文件

6)整体

  • MapTask默认内存大小为1G,可以增加MapTask内存大小为4

  • ReduceTask默认内存大小为1G,可以增加ReduceTask内存大小为4-5g

  • 可以增加MapTask的cpu核数,增加ReduceTask的CPU核数

  • 增加每个Container的CPU核数和内存大小

  • 调整每个Map Task和Reduce Task最大重试次数

7)压缩

详细查看Hadoop3.x的mapreduce压缩P58

Yarn的job提交流程

job提交过程详解

(1)作业提交

1)Clinet调用job.waitForCompletion方法,向整个集群提交MapReduce作业**

2)Clinet向RM申请一个作业id**

3)RM给Clinet返回该job资源的提交路径和作业id**

4)Client 提交 jar 包、切片信息和配置文件到指定的资源提交路径。**

5)Client 提交完资源后,向 RM 申请运行 MrAppMaster。

(2)作业初始化

6)当 RM 收到 Client 的请求后,将该 job 添加到容量调度器

7)某一个空闲的 NM 领取到该 Job。

8)该 NM 创建 Container,并产生 MRAppmaster。

9)下载 Client 提交的资源到本地。

(3)任务分配

10)MrAppMaster 向 RM 申请运行多个 MapTask 任务资源。

11)RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager

分别领取任务并创建容器。

(4)任务运行

12)MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个NodeManager 分别启动 MapTask,MapTask 对数据分区排序。

13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。

14)ReduceTask 向 MapTask 获取相应分区的数据

15)程序运行完毕后,MR 会向 RM 申请注销自己。

(5)进度和状态更新

YARN 中的任务将其进度和状态(包括 counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval 设置)向应用管理器请求进度更新, 展示给用户。

(6)作业完成

除了向应用管理器请求作业进度外, 客户端每 5 秒都会通过调用 waitForCompletion()来检查作业是否完成。时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后, 应用管理器和 Container 会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

其中简略版对应的步骤分别如下:
1、client向RM提交应用程序,其中包括启动该应用的ApplicationMaster的必须信息,例如ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等
2、ResourceManager启动一个container用于运行ApplicationMaster
3、启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳
4、ApplicationMaster向ResourceManager发送请求,申请相应数目的container
5、申请成功的container,由ApplicationMaster进行初始化。container的启动信息初始化后,AM与对应的NodeManager通信,要求NM启动container
6、NM启动container
7、container运行期间,ApplicationMaster对container进行监控。container通过RPC协议向对应的AM汇报自己的进度和状态等信息
8、应用运行结束后,ApplicationMaster向ResourceManager注销自己,并允许属于它的container被收回

Yarn默认的调度器,分类,以及它们之间的区别

1)Hadoop调度器主要分为三类:

  • FIFO Scheduler:先进先出调度器:优先提交的,优先执行,后面提交的等待【生产环境不会使用】

  • Capacity Scheduler:容量调度器:允许看创建多个任务对列,多个任务对列可以同时执行。但是一个队列内部还是先进先出。【Hadoop2.7.2默认的调度器】

  • Fair Scheduler:公平调度器:第一个程序在启动时可以占用其他队列的资源(100%占用),当其他队列有任务提交时,占用资源的队列需要将资源还给该任务。还资源的时候,效率比较慢。【CDH版本的yarn调度器默认】

Hadoop的参数优化

  • 在hdfs-site.xml文件中配置多目录,最好提前配置好,否则更改目录需要重新启动集群

  • NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作

  • 编辑日志存储路径dfs.namenode.edits.dir设置与镜像文件存储路径dfs.namenode.name.dir尽量分开,达到最低写入延迟

  • 服务器节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量

  • 单个任务可申请的最多物理内存量,默认是8192(MB

Hadoop宕机

如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)。

如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。

Hadoop数据倾斜问题

1)提前在map进行combine,减少传输的数据量

在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。

如果导致数据倾斜的key 大量分布在不同的mapper的时候,这种方法就不是很有效了

2)数据倾斜的key 大量分布在不同的mapper

在这种情况,大致有如下几种方法:

  • 「局部聚合加全局聚合」

第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer 中进行局部聚合,数量就会大大降低。

第二次mapreduce,去掉key的随机前缀,进行全局聚合。

「思想」:二次mr,第一次将key随机散列到不同 reducer 进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。

这个方法进行两次mapreduce,性能稍差

「增加Reducer,提升并行度」

JobConf.setNumReduceTasks(int)

  • 「实现自定义分区」

根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer

NameNode和secondarynameNode工作机制

思考:NameNode中的元数据是存储在哪里的?

首先,我们做个假设,如果存储在NameNode节点的磁盘中,因为经常需要进行随机访问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的FsImage。

这样又会带来新的问题,当在内存中的元数据更新时,如果同时更新FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。因此,引入Edits文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。这样,一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。

但是,如果长时间添加数据到Edits中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。因此,需要定期进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并。

  1. 第一阶段:NameNode启动

(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。

(2)客户端对元数据进行增删改的请求。

(3)NameNode记录操作日志,更新滚动日志。

(4)NameNode在内存中对数据进行增删改。

  1. 第二阶段:Secondary NameNode工作

(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。

(2)Secondary NameNode请求执行CheckPoint。

(3)NameNode滚动正在写的Edits日志。

(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。

(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。

(6)生成新的镜像文件fsimage.chkpoint。

(7)拷贝fsimage.chkpoint到NameNode。

(8)NameNode将fsimage.chkpoint重新命名成fsimage。

------------恢复内容结束------------

**------------恢复内容开始------------**

------------恢复内容开始------------

Hadoop知识点

什么是Hadoop

Hadoop是一个能够对大量数据进行分布式处理的软件框架。以一种可靠、高效、可伸缩的方式进行数据处理。主要包括三部分内容:Hdfs,MapReduce,Yarn


从2.7.3版本开始block size的默认大小为128M,之前版本的默认值是64M。对于新文件切分的大小,单位byte。每一个节点都要指定,包括客户端。

原理:文件块越大,寻址时间越短,但磁盘传输时间越长;文件块越小,寻址时间越长,但磁盘传输时间越短。


Hadoop和Spark差异


Hadoop常见版本,有哪些特点,一般是如何进行选择

Hadoop1.0由分布式存储系统HDFS和分布式计算框架MapReduce组成,其中HDFS由一个NameNode和多个DateNode组成,MapReduce由一个JobTracker和多个TaskTracker组成。在Hadoop1.0中容易导致单点故障,拓展性差,性能低,支持编程模型单一的问题。


Hadoop2.0即为克服Hadoop1.0中的不足,提出了以下关键特性:

  • Yarn:它是Hadoop2.0引入的一个全新的通用资源管理系统,完全代替了Hadoop1.0中的JobTracker。在MRv1 中的 JobTracker 资源管理和作业跟踪的功能被抽象为 ResourceManager 和 AppMaster 两个组件。Yarn 还支持多种应用程序和框架,提供统一的资源调度和管理功能

  • NameNode 单点故障得以解决:Hadoop2.2.0 同时解决了 NameNode 单点故障问题和内存受限问题,并提供 NFS,QJM 和 Zookeeper 三种可选的共享存储系统

  • HDFS 快照:指 HDFS(或子系统)在某一时刻的只读镜像,该只读镜像对于防止数据误删、丢失等是非常重要的。例如,管理员可定时为重要文件或目录做快照,当发生了数据误删或者丢失的现象时,管理员可以将这个数据快照作为恢复数据的依据

  • 支持Windows 操作系统:Hadoop 2.2.0 版本的一个重大改进就是开始支持 Windows 操作系统

  • Append:新版本的 Hadoop 引入了对文件的追加操作

同时,新版本的Hadoop对于HDFS做了两个非常重要的「增强」,分别是支持异构的存储层次和通过数据节点为存储在HDFS中的数据提供内存缓冲功能


相比于Hadoop2.0,Hadoop3.0 是直接基于 JDK1.8 发布的一个新版本,同时,Hadoop3.0引入了一些重要的功能和特性

  • HDFS可擦除编码:这项技术使HDFS在不降低可靠性的前提下节省了很大一部分存储空间

  • 多NameNode支持:在Hadoop3.0中,新增了对多NameNode的支持。当然,处于Active状态的NameNode实例必须只有一个。也就是说,从Hadoop3.0开始,在同一个集群中,支持一个 ActiveNameNode 和 多个 StandbyNameNode 的部署方式。

  • MR Native Task优化

  • Yarn基于cgroup 的内存和磁盘 I/O 隔离

  • Yarn container resizing


Hadoop常用端口号

dfs.namenode.http-address:50070
dfs.datanode.http-address:50075
SecondaryNameNode:50090
dfs.datanode.address:50010
fs.defaultFS:8020 或者9000
yarn.resourcemanager.webapp.address:8088
历史服务器web访问端口:19888
端口名称Hadoop2.xHadoop3.x
NameNode 内部通信端口 8020 / 9000 8020 / 9000/9820
NameNode HTTP UI 50070 9870
MapReduce 查看执行任务端口 8088 8088
jobhistoryserver历史服务器通信端口 19888 19888
datanode HTTP UI 50075  
SecondaryNameNode web端 50090  

搭建Hadoop集群的流程

1)准备一台虚拟机样机(配置好IP地址,主机名,映射,关闭防火墙,jdk安装。jdk安装需要配置环境变量,JAVA_HOME/bin目录下所有命令全局生效)

2)克隆两台虚拟机。(设置ssh免密登录,配置映射,修改主机名,和IP地址)

3)安装Hadoop解压到指定目录文件下

4)Hadoop文件配置

a)core-site.xml配置HDFS中namenode的地址指定Hadoop运行时产生文件的存储目录

b)hdfs-site.xml配置hdfs副本数量

c)yarn-site.xml配置reducer获取数据的方式指定yarn的resourceManager的地址

d)mapred-env.sh配置MR运行在yarn上,配置历史服务器端地址历史服务器web端地址

e)slaves配置其他虚拟机的主机名

f)hadoop-env.sh : Hadoop 环境配置文件 export JAVA_HOME=/usr/local/soft/jdk1.8.0_171

5)分发Hadoop的安装包(scp -r hadoop node1:pwd

6)格式化hdfs(hdfs namenode -format

7)启动集群(start-all.sh,因为我们是把resourcemanager和2nn放在master一台上进行启动)

Hadoop中需要哪些配置文件,其作用是什么?

1)core-site.xml

(1)fs.defaultFS:hdfs://cluster1(域名),这里的值指的是默认的HDFS路径 。

(2)hadoop.tmp.dir:/export/data/hadoop_tmp,这里的路径默认是NameNode、DataNode、secondaryNamenode等存放数据的公共目录。用户也可以自己单独指定这三类节点的目录。

(3)ha.zookeeper.quorum:hadoop101:2181,hadoop102:2181,hadoop103:2181,这里是ZooKeeper集群的地址和端口。注意,数量一定是奇数,且不少于三个节点 。

2)hadoop-env.sh: 只需设置jdk的安装路径,如:export JAVA_HOME=/usr/local/jdk。

3)hdfs-site.xml

(1) dfs.replication:他决定着系统里面的文件块的数据备份个数,默认为3个。

(2) dfs.data.dir:datanode节点存储在文件系统的目录 。

(3) dfs.name.dir:是namenode节点存储hadoop文件系统信息的本地系统路径 。

4)mapred-site.xml

mapreduce.framework.name: yarn指定mr运行在yarn上

HDFS读写流程

1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在。

2)NameNode返回是否可以上传。

3)客户端请求第一个 Block上传到哪几个DataNode服务器上。

4)NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。

5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。

6)dn1、dn2、dn3逐级应答客户端。

7)客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位,dn1收到一个Packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。

8)当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。

1)客户端通过Distributed FileSystem向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。

2)挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。

3)DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。

4)客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

MapReduce的Shuffle过程,Hadoop优化方案

MapReduce数据读取并写入HDFS流程实际上是有10步

Shuffle阶段

1、Map方法之后Reduce方法之前这段处理过程叫「Shuffle」

2、Map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区;环形缓冲区默认大小100m,环形缓冲区达到80%时,进行溢写;溢写前对数据进行排序,排序按照对key的索引进行字典顺序排序,排序的手段「快排」;溢写产生大量溢写文件,需要对溢写文件进行「归并排序」;对溢写的文件也可以进行Combiner操作,前提是汇总操作,求平均值不行。最后将文件按照分区存储到磁盘,等待Reduce端拉取。

3、每个Reduce拉取Map端对应分区的数据。拉取数据后先存储到内存中,内存不够了,再存储到磁盘。拉取完所有数据后,采用归并排序将内存和磁盘中的数据都进行排序。在进入Reduce方法前,可以对数据进行分组操作。

Shuffle优化

1)Map阶段

(1)增大环形缓冲区大小。由100m扩大到200m

(2)增大环形缓冲区溢写的比例。由80%扩大到90%

(3)减少对溢写文件的merge次数。(10个文件,一次20个merge)

(4)不影响实际业务的前提下,采用Combiner提前合并,减少 I/O

2)Reduce阶段

(1)合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致 Map、Reduce任务间竞争资源,造成处理超时等错误。

(2)设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。

(3)规避使用Reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

(4)增加每个Reduce去Map中拿数据的并行数

(5)集群性能可以的前提下,增大Reduce端存储数据内存的大小。

3)IO传输

采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZOP压缩编码器。

压缩:

(1)map输入端主要考虑数据量大小和切片,支持切片的有Bzip2、LZO。注意:LZO要想支持切片必须创建索引;

(2)map输出端主要考虑速度,速度快的snappy、LZO;

(3)reduce输出端主要看具体需求,例如作为下一个mr输入需要考虑切片,永久保存考虑压缩率比较大的gzip。

4)整体

(1)NodeManager默认内存8G,需要根据服务器实际配置灵活调整,例如128G内存,配置为100G内存左右,yarn.nodemanager.resource.memory-mb。

(2)单任务默认内存8G,需要根据该任务的数据量灵活调整,例如128m数据,配置1G内存,yarn.scheduler.maximum-allocation-mb。

(3)mapreduce.map.memory.mb :控制分配给MapTask内存上限,如果超过会kill掉进程(报:Container is running beyond physical memory limits. Current usage:565MB of512MB physical memory used;Killing Container)。默认内存大小为1G,如果数据量是128m,正常不需要调整内存;如果数据量大于128m,可以增加MapTask内存,最大可以增加到4-5g。

(4)mapreduce.reduce.memory.mb:控制分配给ReduceTask内存上限。默认内存大小为1G,如果数据量是128m,正常不需要调整内存;如果数据量大于128m,可以增加ReduceTask内存大小为4-5g。

(5)mapreduce.map.java.opts:控制MapTask堆内存大小。(如果内存不够,报:java.lang.OutOfMemoryError)

(6)mapreduce.reduce.java.opts:控制ReduceTask堆内存大小。(如果内存不够,报:java.lang.OutOfMemoryError)

(7)可以增加MapTask的CPU核数,增加ReduceTask的CPU核数

(8)增加每个Container的CPU核数和内存大小

(9)在hdfs-site.xml文件中配置多目录

(10)NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为10台时,此参数设置为60。

基于MapReduce做Hadoop的优化

1)HDFS小文件影响

  • 影响NameNode的寿命,因为文件元数据存储在NameNode的内存中

  • 影响计算引擎的任务数量,比如每个小的文件都会生成一个Map任务

2)数据输入小文件处理

  • 合并小文件:对小文件进行归档(Har)、自定义Inputformat将小文件存储成SequenceFile文件。

  • 采用ConbinFileInputFormat来作为输入,解决输入端大量小文件场景

  • 对于大量小文件Job,可以开启JVM重用会减少45%运行时间

3)Map阶段

  • 增大环形缓冲区大小。由100m扩大到200m

  • 增大环形缓冲区溢写的比例。由80%扩大到90%

  • 减少对溢写文件的merge次数。(10个文件,一次20个merge)

  • 不影响实际业务的前提下,采用Combiner提前合并,减少 I/O

4)Reduce阶段

  • 合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致 Map、Reduce任务间竞争资源,造成处理超时等错误。

  • 设置Map、Reduce共存:调整 slowstart.completedmaps 参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间

  • 规避使用Reduce,因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

  • 增加每个Reduce去Map中拿数据的并行数

  • 集群性能可以的前提下,增大Reduce端存储数据内存的大小

5)IO传输

  • 采用数据压缩的方式,减少网络IO的的时间

  • 使用SequenceFile二进制文件

6)整体

  • MapTask默认内存大小为1G,可以增加MapTask内存大小为4

  • ReduceTask默认内存大小为1G,可以增加ReduceTask内存大小为4-5g

  • 可以增加MapTask的cpu核数,增加ReduceTask的CPU核数

  • 增加每个Container的CPU核数和内存大小

  • 调整每个Map Task和Reduce Task最大重试次数

7)压缩

详细查看Hadoop3.x的mapreduce压缩P58

Yarn的job提交流程

job提交过程详解

(1)作业提交

1)Clinet调用job.waitForCompletion方法,向整个集群提交MapReduce作业**

2)Clinet向RM申请一个作业id**

3)RM给Clinet返回该job资源的提交路径和作业id**

4)Client 提交 jar 包、切片信息和配置文件到指定的资源提交路径。**

5)Client 提交完资源后,向 RM 申请运行 MrAppMaster。

(2)作业初始化

6)当 RM 收到 Client 的请求后,将该 job 添加到容量调度器

7)某一个空闲的 NM 领取到该 Job。

8)该 NM 创建 Container,并产生 MRAppmaster。

9)下载 Client 提交的资源到本地。

(3)任务分配

10)MrAppMaster 向 RM 申请运行多个 MapTask 任务资源。

11)RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager

分别领取任务并创建容器。

(4)任务运行

12)MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个NodeManager 分别启动 MapTask,MapTask 对数据分区排序。

13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。

14)ReduceTask 向 MapTask 获取相应分区的数据

15)程序运行完毕后,MR 会向 RM 申请注销自己。

(5)进度和状态更新

YARN 中的任务将其进度和状态(包括 counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval 设置)向应用管理器请求进度更新, 展示给用户。

(6)作业完成

除了向应用管理器请求作业进度外, 客户端每 5 秒都会通过调用 waitForCompletion()来检查作业是否完成。时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后, 应用管理器和 Container 会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

其中简略版对应的步骤分别如下:
1、client向RM提交应用程序,其中包括启动该应用的ApplicationMaster的必须信息,例如ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等
2、ResourceManager启动一个container用于运行ApplicationMaster
3、启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳
4、ApplicationMaster向ResourceManager发送请求,申请相应数目的container
5、申请成功的container,由ApplicationMaster进行初始化。container的启动信息初始化后,AM与对应的NodeManager通信,要求NM启动container
6、NM启动container
7、container运行期间,ApplicationMaster对container进行监控。container通过RPC协议向对应的AM汇报自己的进度和状态等信息
8、应用运行结束后,ApplicationMaster向ResourceManager注销自己,并允许属于它的container被收回

Yarn默认的调度器,分类,以及它们之间的区别

1)Hadoop调度器主要分为三类:

  • FIFO Scheduler:先进先出调度器:优先提交的,优先执行,后面提交的等待【生产环境不会使用】

  • Capacity Scheduler:容量调度器:允许看创建多个任务对列,多个任务对列可以同时执行。但是一个队列内部还是先进先出。【Hadoop2.7.2默认的调度器】

  • Fair Scheduler:公平调度器:第一个程序在启动时可以占用其他队列的资源(100%占用),当其他队列有任务提交时,占用资源的队列需要将资源还给该任务。还资源的时候,效率比较慢。【CDH版本的yarn调度器默认】

Hadoop的参数优化

  • 在hdfs-site.xml文件中配置多目录,最好提前配置好,否则更改目录需要重新启动集群

  • NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作

  • 编辑日志存储路径dfs.namenode.edits.dir设置与镜像文件存储路径dfs.namenode.name.dir尽量分开,达到最低写入延迟

  • 服务器节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量

  • 单个任务可申请的最多物理内存量,默认是8192(MB

Hadoop宕机

如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)。

如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。

Hadoop数据倾斜问题

1)提前在map进行combine,减少传输的数据量

在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。

如果导致数据倾斜的key 大量分布在不同的mapper的时候,这种方法就不是很有效了

2)数据倾斜的key 大量分布在不同的mapper

在这种情况,大致有如下几种方法:

  • 「局部聚合加全局聚合」

第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer 中进行局部聚合,数量就会大大降低。

第二次mapreduce,去掉key的随机前缀,进行全局聚合。

「思想」:二次mr,第一次将key随机散列到不同 reducer 进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。

这个方法进行两次mapreduce,性能稍差

「增加Reducer,提升并行度」

JobConf.setNumReduceTasks(int)

  • 「实现自定义分区」

根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer

NameNode和secondarynameNode工作机制

思考:NameNode中的元数据是存储在哪里的?

首先,我们做个假设,如果存储在NameNode节点的磁盘中,因为经常需要进行随机访问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的FsImage。

这样又会带来新的问题,当在内存中的元数据更新时,如果同时更新FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。因此,引入Edits文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。这样,一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。

但是,如果长时间添加数据到Edits中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。因此,需要定期进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并。

  1. 第一阶段:NameNode启动

(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。

(2)客户端对元数据进行增删改的请求。

(3)NameNode记录操作日志,更新滚动日志。

(4)NameNode在内存中对数据进行增删改。

  1. 第二阶段:Secondary NameNode工作

(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。

(2)Secondary NameNode请求执行CheckPoint。

(3)NameNode滚动正在写的Edits日志。

(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。

(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。

(6)生成新的镜像文件fsimage.chkpoint。

(7)拷贝fsimage.chkpoint到NameNode。

(8)NameNode将fsimage.chkpoint重新命名成fsimage。

------------恢复内容结束------------

**------------恢复内容结束------------**

原文地址:https://www.cnblogs.com/xiguabigdata/p/15003522.html