mapreduce mapper key
- 作者: WC叶子
- 来源: 51数据库
- 2020-09-26
从Map到Reduce
MapReduce其实是分治算法的一种实现,其处理过程亦和用管道命令来处理十分相似,一些简单的文本字符的处理甚至也可以使用Unix的管道命令来替代,从处理流程的角度来看大概如下:
cat input | grep | sort | uniq -c | cat > output # Input -> Map -> Shuffle & Sort -> Reduce -> Output
简单的流程图如下:
对于Shuffle,简单地说就是将Map的输出通过一定的算法划分到合适的Reducer中进行处理。Sort当然就是对中间的结果进行按key排序,因为Reducer的输入是严格要求按key排序的。
Input->Map->Shuffle&Sort->Reduce->Output只是从宏观的角度对MapReduce的简单描述,实际在MapReduce的框架中,即从编程的角度来看,其处理流程是Input->Map->Sort->Combine->Partition->Reduce->Output。用之前的对温度进行统计的例子来讲述这些过程。
Input Phase
输入的数据需要以一定的格式传递给Mapper的,格式有多种,如TextInputFormat、DBInputFormat、SequenceFileInput等等,可以使用JobConf.setInputFormat来设置,这个过程还应该包括对输入的数据进行任务粒度划分(split)然后再传递给Mapper。在温度的例子中,由于处理的都是文本数据,输入的格式使用默认的TextInputFormat即可。
Map Phase
对输入的key、value对进行处理,输出的是key、value的集合,即map (k1, v1) -> list(k2, v2),使用JobConf.setMapperClass设置自己的Mapper。在例子中,将(行号、温度的文本数据)作为key/value输入,经过处理后,从温度的文件数据中提取出日期中的年份和该日的温度数据,形成新的key/value对,最后以list(年, 温度)的结果输出,如[(1950, 10), (1960, 40), (1960, 5)]。
Sort Phase
对Mapper输出的数据进行排序,可以通过JobConf.setOutputKeyComparatorClass来设置自己的排序规则。在例子中,经过排序之后,输出的list集合是按年份进行排序的list(年, 温度),如[(1950, 10), (1950, 5), (1960, 40)]。
Combine Phase
这个阶段是将中间结果中有相同的key的<key, value>对合并成一对,Combine的过程与Reduce很相似,使用的甚至是Reduce的接口。通过Combine能够减少<key, value>的集合数量,从而减少网络流量。Combine只是一个可选的优化过程,并且无论Combine执行多少次(>=0),都会使Reducer产生相同的输出,使用JobConf.setCombinerClass来设置自定义的Combine Class。在例子中,假如map1产生出的结果为[(1950, 0), (1950, 20), (1950, 10)],在map2产生出的结果为[(1950, 15), (1950, 25)],这两组数据作为Reducer的输入并经过Reducer处理后的年最高温度结果为(1950, 25),然而当在Mapper之后加了Combine(Combine先过滤出最高温度),则map1的输出是[(1950, 20)]和map2的输出是[(1950, 25)],虽然其他的三组数据被抛弃了,但是对于Reducer的输出而言,处理后的年最高温度依然是(1950, 25)。
Partition Phase
把Mapper任务输出的中间结果按key的范围划分成R份(R是预先定义的Reduce任务的个数),默认的划分算法是”(key.hashCode() & Integer.MAX_VALUE) % numPartitions”,这样保证了某一范围的key一定是由某个Reducer来处理,简化了Reducer的处理流程,使用JobConf.setPartitionClass来设置自定义的Partition Class。在例子中,默认就自然是对年份进行取模了。
Reduce Phase
Reducer获取Mapper输出的中间结果,作为输入对某一key范围区间进行处理,使用JobConf.setReducerClass来设置。在例子中,与Combine Phase中的处理是一样的,把各个Mapper传递过来的数据计算年最高温度。
Output Phase
Reducer的输出格式和Mapper的输入格式是相对应的,当然Reducer的输出还可以作为另一个Mapper的输入继续进行处理。
MapReduce其实是分治算法的一种实现,其处理过程亦和用管道命令来处理十分相似,一些简单的文本字符的处理甚至也可以使用Unix的管道命令来替代,从处理流程的角度来看大概如下:
cat input | grep | sort | uniq -c | cat > output # Input -> Map -> Shuffle & Sort -> Reduce -> Output
简单的流程图如下:
对于Shuffle,简单地说就是将Map的输出通过一定的算法划分到合适的Reducer中进行处理。Sort当然就是对中间的结果进行按key排序,因为Reducer的输入是严格要求按key排序的。
Input->Map->Shuffle&Sort->Reduce->Output只是从宏观的角度对MapReduce的简单描述,实际在MapReduce的框架中,即从编程的角度来看,其处理流程是Input->Map->Sort->Combine->Partition->Reduce->Output。用之前的对温度进行统计的例子来讲述这些过程。
Input Phase
输入的数据需要以一定的格式传递给Mapper的,格式有多种,如TextInputFormat、DBInputFormat、SequenceFileInput等等,可以使用JobConf.setInputFormat来设置,这个过程还应该包括对输入的数据进行任务粒度划分(split)然后再传递给Mapper。在温度的例子中,由于处理的都是文本数据,输入的格式使用默认的TextInputFormat即可。
Map Phase
对输入的key、value对进行处理,输出的是key、value的集合,即map (k1, v1) -> list(k2, v2),使用JobConf.setMapperClass设置自己的Mapper。在例子中,将(行号、温度的文本数据)作为key/value输入,经过处理后,从温度的文件数据中提取出日期中的年份和该日的温度数据,形成新的key/value对,最后以list(年, 温度)的结果输出,如[(1950, 10), (1960, 40), (1960, 5)]。
Sort Phase
对Mapper输出的数据进行排序,可以通过JobConf.setOutputKeyComparatorClass来设置自己的排序规则。在例子中,经过排序之后,输出的list集合是按年份进行排序的list(年, 温度),如[(1950, 10), (1950, 5), (1960, 40)]。
Combine Phase
这个阶段是将中间结果中有相同的key的<key, value>对合并成一对,Combine的过程与Reduce很相似,使用的甚至是Reduce的接口。通过Combine能够减少<key, value>的集合数量,从而减少网络流量。Combine只是一个可选的优化过程,并且无论Combine执行多少次(>=0),都会使Reducer产生相同的输出,使用JobConf.setCombinerClass来设置自定义的Combine Class。在例子中,假如map1产生出的结果为[(1950, 0), (1950, 20), (1950, 10)],在map2产生出的结果为[(1950, 15), (1950, 25)],这两组数据作为Reducer的输入并经过Reducer处理后的年最高温度结果为(1950, 25),然而当在Mapper之后加了Combine(Combine先过滤出最高温度),则map1的输出是[(1950, 20)]和map2的输出是[(1950, 25)],虽然其他的三组数据被抛弃了,但是对于Reducer的输出而言,处理后的年最高温度依然是(1950, 25)。
Partition Phase
把Mapper任务输出的中间结果按key的范围划分成R份(R是预先定义的Reduce任务的个数),默认的划分算法是”(key.hashCode() & Integer.MAX_VALUE) % numPartitions”,这样保证了某一范围的key一定是由某个Reducer来处理,简化了Reducer的处理流程,使用JobConf.setPartitionClass来设置自定义的Partition Class。在例子中,默认就自然是对年份进行取模了。
Reduce Phase
Reducer获取Mapper输出的中间结果,作为输入对某一key范围区间进行处理,使用JobConf.setReducerClass来设置。在例子中,与Combine Phase中的处理是一样的,把各个Mapper传递过来的数据计算年最高温度。
Output Phase
Reducer的输出格式和Mapper的输入格式是相对应的,当然Reducer的输出还可以作为另一个Mapper的输入继续进行处理。
推荐阅读
