MapReduce设计模式

时间:2022-05-07
本文章向大家介绍MapReduce设计模式,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

一:概要模式 1:简介 概要设计模式更接近简单的MR应用,因为基于键将数据分组是MR范型的核心功能,所有的键将被分组汇入reducer中 本章涉及的概要模式有数值概要(numerical summarization),倒排索引(inverted index),计数器计数(counting with counter)2:概要设计模式包含 2.1:关于Combiner和paritioner combiner:reducer之前调用reducer函数,对数据进行聚合,极大的减少通过网络传输到reducer端的key/value数量,适用的条件是你可以任意的改变值的顺序,并且可以随意的将计算进行分组,同时需要注意的是一个combiner函数只对一个map函数有作用 partitioner:许多概要模式通过定制partitioner函数实现更优的将键值对分发到n个reducer中,着这样的需求场景会比较少,但如果任务的执行时间要求很高,数据量非常大,且存在数据倾斜的情况,定制partitioner将是非常有效的解决方案 源码分析请点击 编程实例请点击 2.2:数值概要模式 2.2.1:数值概要模式:计算数据聚合统计的一般性模式 2.2.2:数值概要应用的场景需要满足以下亮点: 1:要处理的数据是数值数据或者计数 2:数据可以按照某些特定的字段分组 2.2.3:适用场景: 1:单词计数 (可以使用combiner) 2:最大值/最小值/计数 (可以使用combiner) 3:平均值 (可以使用combiner,但必须做相应的处理,即迂回算法,举例如下) 给定用户的评论列表,按天计算每小时的评论长度 Map:context.write(1,tuple(1,1小时的平均长度)) reducer: 处理:sum += tulpe.gethour * tuple.getavrg count += tuple.gethour 输出: key=1 value=sum/count 4:中位数/标准差2.3:倒排索引概要

      适用场景:通常用在需要快速搜索查询响应的场景,可以对一个查询结果进行预处理并存储在一个[数据库](http://lib.csdn.net/base/mysql)中
     [倒排索引实战1](http://blog.csdn.net/gamer_gyt/article/details/47101351)         [倒排索引实战2](http://blog.csdn.net/gamer_gyt/article/details/50014143)

2.4:计数器计数 已知应用 统计记录数:简单的对指定时间段的记录数进行统计是很常见的,统计小数量级的唯一实例计数 汇总:用来执行对数据的某些字段进行汇总 二:过滤模式 1:简介 过滤模式也可以被认为是一种搜索形式,如果你对找出所有具备特定信息的记录感兴趣,就可以过滤掉不匹配搜索条件的其他记录,与大多数基础模式类似,过滤作为一种抽象模式为其他模式服务,过滤简单的对某一条记录进行评估,并基于某个条件作出判断,以确定当前这条记录是保留还是丢弃 2:适用场景 2.1:过滤, 使用过滤的唯一必要条件是数据可以被解析成记录,并可以通过非常特定的准则来确定它们是否需要保留,不需要reducer函数 近距离观察数据:准备一个特定的子集,子集中的记录有某些共同属性或者具备某些有趣的特性,需要进一步深入的分析。 跟踪某个事件的线索:从一个较大数据集中抽取一个连续事件作为线索来做案例研究。 分布式grep:通过一个正则表达式匹配每一行,输出满足条件的行 数据清理:数据有时是畸形的,不完整的 或者是格式错误的,过滤可以用于验证每一条数据是否满足记录,将不满足的数据删除 ** 简单随机抽样:可以使用随机返回True or False的评估函数做过滤,可以通过调小true返回的概率实现对结果集合大小的控制 ** 移除低分值数据:将不满足某个特定阀值的记录过滤出去 2.2:布隆过滤, 对每一条记录,抽取其中一个特征,如果抽取的特性是布隆过滤中所表示的值的集合成员,则保留记录 移除大多数不受监视的值:最直接的使用案例是清楚不感兴趣的值 对成本很高的集合成员资格检查做数据的预先过滤: 2.3:Top10,不管输入数据的大小是多少,你都可以精确的知道输出的结果的记录数 异类分析: 选取感兴趣的数据: 引人注目的指标面板: 2.4:去重,过滤掉数据集中的相似数据,找出唯一的集合 数据去重: 代码举例 抽取重复值: 规避内连接的数据膨胀:

三:数据组织模式 1:分层结构模式 分层模式是从数据中创造出不同于原结构的新纪录 适用场景:数据源被外部链接,数据是结构化的并且是基于行的 <MultipleInputs类:用于指定多个Mapper任务进行不同格式文件的输入>2:分区和分箱模式 分区:将记录进行分类(即分片,分区或者分箱),但他并不关心记录的顺序,目地是将数据集中相似的记录分成不同的,更小的数据集,在该模式下数据是通过自定义Map的分区器进行分区的。 分箱:是在不考虑记录顺序的情况下对记录进行分类,目的是将数据集中每条记录归档到一个或者多个举例 两者的不同之处在于分箱是在Map阶段对数据进行拆分,其好处是减少reduce的工作量,通常使资源分布更有效,缺点是每个mapper将为每个可能输出的箱子创建文件,对后续的分析十分不利3:全排序和混排模式 全排序:关注的是数据从记录到记录的顺序,目的是能够按照指定的键进行并行排序。适用的范围是排序的键必须具有可比性只有这样数据才能被排序 混排序:关注记录在数据集中的顺序,目的是将一个给定的记录完全随机化4:数据生成模式 四:连接模式

SQL连接模式包括内连接和外连接eg:A表 B表 内连接:只连接两个表中都用的外键连接(eg 以ID作为连接键,只连接有相同ID) 外连接:1:做外连接 以用户ID为外键的A+B做外连接 以A表为基准,A表数据全部显示,B表中不在A表中的ID显示为null2:右外连接 和做外连接相反3:全外连接 左外连接和右外连接的合并,有相同ID 的显示,没有相同ID的显示为NULL 反连接:全外连接减去内连接的结果1:reduce端连接: 相当其他连接模式来讲用时最长,但是也是实现简单并且支持所有不同类型的操作 适用场景:1:多个大数据需要按一个外键做链接操作,如果除了一个数据集以外,其他所有的数据集都可以放入内存,可以尝试使用复制连接 2:你需要灵活的执行任意类型的连接操作 等效的SQL:Select user.id,user.location,comment.uprotes from user [inner | left | right] join comments on user.id=comments.userid 优化方案:可以使用布隆过滤器执行reduce端的连接2:复制连接: 是一种特殊类型的连接操作,是在一个打的数据集和许多小的数据集之间通过MAP端执行的连接的操作,该模式完全消除了混排数据到reduce的需求 适用场景: 1:要执行的连接类型是由内连接或者左外连接,且大的输入数据集在连接操作符的“左边”时 2:除一个大的数据集外,所有的数据集都可以存入每个Map任务的内存中 性能分析:因为不需要reduce,因此在所有连接模式是最快的一种,代价是数据量,数据要能完全的储存在JVM中,这极大的受限于你愿意为每个Map和reduce分配多少内存 3:组合连接: 是一种非常特殊的连接操作,他可以在map端对许多非常大的格式化输入做连接,需要预先组织好的或者是使用特定的方式预处理过的,即在使用这个类型的连接操作之前,必须按照外键对数据集进行排序个分区,并以一种非常特殊的方式读入数据集 Hadoop通过CompositeInputFormat来支持组合连接方式 仅适用于内连接和全外连,每一个mapper的输入都需要按照指定的方式做分区和排序,对于每一个输入数据集都要分成相同数目的分区,此外,对应于某个特定的外链所做的所有记录必须处于同一分区中 通常情况下这发生在几个作业的输出有相同数量的reducer和相同的外键,并且输出文件是不可拆分的即不大于一个hdfs文件快的大小或是gzip压缩的 适用场景: 1:需要执行的是内连接或者全外连接 2:所有的数据集都足够大 3:所有的数据集都可以用相同的外键当mapper的输入键读取 4:所有的数据集有相同的数据的分区 5:数据集不会经常改变 6:每一个分区都是按照外键排序的,并且所有的外键都出现在关联分区的每个数据集中4:笛卡尔积: 是一种有效的将多个输入源的灭一个记录跟所有其他记录配对的方式适用场景: 1:需要分析各个记录的所有配对之间的关系 2:没有其他方法可以解决这个问题 3:对执行时间没有限制 等效的SQL:SELECT * FROM t1,t2 等效的PIG:CROSS a,b; 五:元模式 关于模式的模式1:作业链 针对MapReduce处理小的文件时,优化的办法是可以在作业中始终执行CombineFileInputFormat加载间歇性的输出,在进入mapper处理之前,CombineFileInputFormat会将小的块组合在一起形成较大的输入split当执行做个作业的作业链时,可以使用job.submit方法代替job.waitForCompletion()来并行的启动多个作业,调用submit方法后会立即返回至当前线程,而作业在后台运行,该方法允许一次执行多个任务, job.isComplete()是检查一个作业是否完成的非阻塞方法,该方法可以通过不断轮询的方式判断所有作业是否完成如果检测到一个依赖的作业失败了,此时你应该退出整个作业链,而不是试图让他继续示例:(1)基本作业(2)并行作业链(3)关于Shelll脚本(4)关于JobControl2:链折叠 链折叠是应用于MapReduce作业链的一种优化方法,基本上他是一个经验法则,即每一条记录都可以提交至多个mapper或者一个reducer,然后再交给一个mapper这种合并处理能够减少很多读取文件和传输数据的时间,作业链的这种结构使得这种方法是可行的,因为map阶段是完全无法共享的,因此map并不关心数据的组织形式和或者数据有没有分组,在构建大的作业链时,通过将作业链折叠,使得map阶段合并起来带来很大的性能提升链折叠的主要优点是减少mapreduce管道中移动的数据量作业链中有许多模式,可以通过下面介绍的这些方法来查找和确认哪些可以折叠(1)看看作业链的map阶段,如果多个map阶段是相邻的,将他们合并到一个阶段(2)如果作业链是以map阶段结束,将这个阶段移动到前一个reducer里边,他除去了写临时数据的IO操作,然后在reduce中执行只有map的作业,这同一也能减少任务启动的开销(3)注意,作业链的第一个map阶段无法 从下一个优化中获益,尽可能的在减少数据量(如过滤)的操作和增加数据量(如丰富)的操作之间拆分每个map阶段(合并或者其他)注意:(1)合并阶段需要大量的内存,例如将5个复制连接合并在一起可能不是一个好的选择,因为他将可能超过任务可用的总内存,在这些情况下,最好将这些操作分开(2)不管一个作业是不是作业链,都要尽早尽可能的去过滤掉更多的数据,mr作业开销最大的部分通常都是管道推送数据:加载数据,混排/排序阶段,以及存储数据实现折叠链有两种主要方法:(1)手动裁剪然后将代码粘贴在一起(2)使用特殊类ChainMapper和ChainReducer(特殊类介绍参考:http://www.iteye.com/topic/1134144)3:作业归并 和作业链折叠一样,作业归并是另一种减少MR管道IO管道的优化方法,通过作业归并可以使得加载同一份数据的两个不相关作业共享MR管道,作业归并最主要的优点是数据只需要加载和解析一次。先决条件是:两个作业必须有相同的中间键和输出格式,因为他们将共享管道,因而需要使用相同的数据类型,如果这的确是一个问题的话,可以使用序列化或者多态,但会增加复制度作业归并步骤如下:(1)将两个mapper代码放在一起(2)在mapper中生成键和值时,需要用标签加以标记,以区别map源(3)在reducer中,在解析出标签后使用if语句切换到相应的reducer代码中去执行(4)使用multipleOutputs将作业的输出分来

六:输入输出模式 自定义输入与输出

Hadoop自定义输入和输出 Hadoop允许用户修改从磁盘加载数据的方式,修改方式有两种: 1:配置如何根据HDFS的块生成连续的输入分块,配置记录在map阶段如何实现。 为此将要用到的两个类即,RecordReader和InputFormat 2:hadoop也允许用户通过类似的方式修改数据的存储形式 通过OutputFormat和RecordWriter实现

1:生成数据 这个模式下是只有Map的 (1)InputFormat凭空创建split (2)RecordReader读入虚的split并根据他生成随机记录 (3)某些情况下,能够在split中赋予一些信息,告诉recordreader生成什么 (4)通常情况下,IdentityMap仅仅将读入的数据原样输出 2:外部源输出 外部源输出详细描述:在作业提交之前,OutputFormat将验证作业配置中指定的输出规范。RecordReader负责将所有的键值对写入外部源 性能分析:必须小心数据接收者能否处理并行连接。有1000个任务将数据写入到单个SQL数据库中,者=这工作起来并不好,为避免这种情况你可能不得不让每个reducer多处理一些数据以减少写入到数据接收者的并行度,如果数据接收者支持并行写入,那么这未必是个问题。 3:外部源输入 在MapReduce中数据是以并行的方式加载而不是以串行的方式,为了能够大规模的读取数据,源需要有定义良好的边界

MR实现该模式的瓶颈将是数据源或网络,数据源对于多连接可能不具很好的扩展性,同时给定的数据源可能与MR集群的网络不在同一个网络环境下 4:分区裁剪 分区裁剪模式将通过配置决定框架如何选取输入split以及如何基于文件名过滤加载到MR作业的文件 描述:分区裁剪模式是在InputFormat中实现的,其中getsplit方法是我们需要特别注意的,因为他确定了要创建的输入split,进而确定map任务的个数, RecordReader的实现依赖于数据是如何存储的