目录
1. MapReduce基本介绍
什么是MapReduce
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
MapReduce思想
核心思想是:先分再合,分而治之,就是把一个复杂的问题先分为若干个小问题,计算后再汇总。map负责“分”,reduce负责“和”,这样做的好处是若干个小问题可以并行同时处理,彼此几乎无依赖关系,每个任务处理完都是一个局部的结果,最后reduce进行全局汇总计算,以此提升效率。不过前提是任务可以拆分,拆分之后没有依赖关系。
该思想在大数据上的应用
对相互间不具有计算依赖关系的大数据计算任务,实现并行最自然的办法就是采取MapReduce分而治之的策略。首先Map阶段进行拆分,把大数据拆分成若干份小数据,多个程序同时并行计算产生中间结果;然后是Reduce聚合阶段,通过程序对并行的结果进行最终的汇总计算,得出最终的结果。需要特别注意不可拆分的计算任务或相互间有依赖关系的数据无法进行并行计算。
MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序串行运行。MapReduce处理的数据类型是<key,value>键值对。实际使用中考虑每个阶段输入输出 key和value是什么。
MapReduce优点与缺点
- 优点:易于编程、良好的扩展性、高容错性、适合海量数据的离线处理
- 缺点:实时计算性能差、不能进行流式运算
MapReduce官方word count案例演示
首先在HDFS的/wordcount/input里准备一份单词数据
里面是几行单词:
现在任务是统计每个单词出现了几次,而且要使用mapreduce实现,在Hadoop的share/hadoop/mapreduce目录下官方提供了hadoop-mapreduce-examples-3.3.0.jar的jar包来实现简单的示例。执行下行代码,执行wordcount案例:
hadoop jar hadoop-mapreduce-examples-3.3.0.jar wordcount /wordcount/input /wordcount/output
可以看到首先是Connecting to ResourceManager连接yarn的RM申请资源,然后执行mapreduce,而且是map先100%了reduce才100%。
在指定的目标目录下生成了2个文件,一个是成功标识,一个是分区文件,这里只有一个分区,点开后可以看到已经完成了单词计数。
在word count案例中map阶段把输入的数据经过切割,全部标记1,输出就是<单词,1>。中间还有个shuffle阶段,经过默认的排序分区分组,key相同的单词会作为一组数据构成新的kv对。reduce阶段处理shuffle完的一组数据,该组数据就是该单词所有的键值对。对所有的1进行累加求和,就是单词的总次数。
读取数据组件,写出数据组件MR框架已经封装好,读取数据组件InputFormat、输出数据组件OutputFormat。
2. MapReduce原理
mapreduce超详细全流程:
- 把输入所有文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,切片个数 = block块的个数,一个块默认是128M,每一个切片由一个MapTask处理,因此MapTask个数=切片个数=block块个数,其最终还是由文件个数和大小综合决定。
- 接下来每个map都相同,以一个map为例,其中读取数据的组件TextInputFormat是按照行进行读取的,一次读取一行数据,默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。
- 调用Mapper类中的map方法转换成我们的自己想要的kv结果 上阶段中每解析出来的一个<k,v>,调用一次map方法。每次调用map方法会输出零个或多个键值对。
- 按照一定的规则对第三阶段输出的键值对进行分区。默认是只有1个reducetask,只要用户不设置永远默认1个,用户也可以通过代码job.setNumReduceTasks(N)进行设置 。map输出的结果默认按照HashPartitoner哈希取模来分配分区编号,也就是map输出key的hashcode和reducetask个数取模的余数就是分区编号,虽然不能保证平均分配,但key一样的会分到一个区。
- 将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等,缓冲区数据大于0.8阈值触发溢写操作,将数据写入本地磁盘,溢写操作单独的线程进行处理的,如果单条数据过大超过80M则不经过缓冲区直接写入磁盘,在将数据写入磁盘之前需要对每个分区数据按key进行一次排序的操作(快速排序)。
- 把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件,合并小文件的时候同样进行排序(归并排序),最终产生一个有序的大文件,如果设置了combinclass(需要算法满足结合律),先在map端对数据进行一个压缩,再进行传输,map任务结束,reduce任务开始。
- ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。根据我们的hash取模规则相同的key会拉取到同一个Reduce节点,但是一个Reduce节点可以有多个key。
- 在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作,同时进行排序(归并排序),先基于内容做合并排序 , 当达到阈值溢写磁盘与spill溢写类似。
- 执行用户提供的reduce计算,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。一个reduce输出一个文件。
Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。