之前在工作中使用到过MapReduce的排序,当时对于这个平台的理解还比较浅显,选择的是一个最为简单的方式,就是只用一个Recude来做。因为Map之后到Reduce阶段,为了Merge的方便,MapReduce的实现会自己依据key值进行排序,这样得出的结果就是一个整体排序的结果。而如果使用超过一个Reduce任务的话,所得的结果是每个part内部有序,但是整体是需要进行merge才可以得到最终的全体有序的。今天读了《Hadoop权威指南》中的第8章,对使用Hadoop这一MapReduce的Java实现进行排序有所了解,在此进行简单的总结。
首先我们来看一下Hadoop中内部Map和Reduce两个阶段所做的排序,可以使用下图来说明。
对MapReduce或者Hadoop有所了解的人可能都知道,所谓对于key值的排序,其实是在Map阶段进行的,而Rduce阶段所做的工作是对各个Map任务的结果进行Merge工作,这样就能保证整体是有序的。如果想在使用多个Reduce任务的情况下保证结果有序,我们可以做的是在上图中的partition阶段进行控制,使分配到每个reduce
task的数据块为数值区域独立的,即比如整体数据在0~50之间,划分为5个Reduce任务的话,可以0~10区间的数据到第一个Reduce
Task,10~20之间的到第二个,以此类推。但是这样就存在一个问题,划分出的各个任务中的数据可能并不是均等的,这样某些Reduce
Task处理了很多数据,而其他的处理了很少的数据。Hadoop提供了RandomSampler类(位于InputSampler类中)来进行随机取样,然后按照取样结果对值域进行划分。一个示例代码如下:
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
implements Tool {
@Override
public int run(String[] args) throws Exception {
JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (conf == null) {
return -1;
}
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setCompressOutput(conf, true);
SequenceFileOutputFormat
.setOutputCompressorClass(conf, GzipCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(conf,
CompressionType.BLOCK);
conf.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
0.1, 10000, 10);
Path input = FileInputFormat.getInputPaths(conf)[0];
input = input.makeQualified(input.getFileSystem(conf));
Path partitionFile = new Path(input, "_partitions");
TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
InputSampler.writePartitionFile(conf, sampler);
// Add to DistributedCache
URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
DistributedCache.addCacheFile(partitionUri, conf);
DistributedCache.createSymlink(conf);
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(
new SortByTemperatureUsingTotalOrderPartitioner(), args);
System.exit(exitCode);
}
}
使用上述程序执行所得的结果会是多个划分,每个划分内部是有序的,而且第i个划分的key值会比i+1个划分的key值都要小。这样,就可以不需要进行再一步的merge,就可以得到整体的上有序结果。
关于排序,一个更加有意思的应用是所谓的Secondary
Sort,亦即在保证第一个key值有序的情况下,对第二个key值也要保证有序(可以是升序或者降序)。此处的一个实现方法是将这两个需要排序的部分都作为key值,使用IntPair进行存储,然后自己实现一个继承自WritableComparator的名为KeyComparator的用于比较的类,其代码如下:
public static class KeyComparator extends WritableComparator {
protected KeyComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
if (cmp != 0) {
return cmp;
}
return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); // reverse
}
}
这里对于第二列是得到降序的结果,在conf设置的时候,可以使用conf.setOutputKeyComparatorClass(KeyComparator.class);语句进行设置。这样执行计算程序的话,会存在一个问题,因为将两个int型的值共同作为key值来处理,在Map阶段结束后进行Partition的划分的时候,就会同样依照这个总key值进行划分,我们想要两个值,比如(1900,20)和(1900,23)被放到同一个Reduce任务中进行处理就无法实现,于是我们需要实现自己的Partitioner接口,代码如下:
public static class FirstPartitioner implements
Partitioner<IntPair, NullWritable> {
@Override
public void configure(JobConf job) {
}
@Override
public int getPartition(IntPair key, NullWritable value, int numPartitions) {
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
同样在配置过程中使用conf.setPartitionerClass(FirstPartitioner.class);语句进行设置。除此之外,需要进行控制的还有一个Reduce中的Group
by操作,方法是实现一个GroupComparator类,其中的比较只使用第一个键值即可,代码如下:
public static class GroupComparator extends WritableComparator {
protected GroupComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
return IntPair.compare(ip1.getFirst(), ip2.getFirst());
}
}
需要设置的是conf.setOutputValueGroupingComparator(GroupComparator.class);。这样就可以实现Secondary
Sort过程了。
分享到:
相关推荐
mapreduce实现全栈排序,简单算法已经在文档中说明,想要了解的可以查看!
很简单的,面向新手的一个MapReduce实例,对数据进行排序,对大数据理解很有帮助。
mapreduce二次排序,年份升序,按照年份聚合,气温降序
MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序
NULL 博文链接:https://username2.iteye.com/blog/2274802
利用采样器实现mapreduce任务输出全排序大数据-MapReduce
在hadoop平台上,用mapreduce编程实现大数据的词频统计
基于MapReduce,实现并行计算环境下的网页排序算法
基于MapReduce的简单倒排索引的建立
数据排序MapReduce实例 对输入文件中每行进行排序,输入文件中的每行内容为一个数字,即一个数据。
本次实验,在 Hadoop 平台上,使用 MapReduce 实现了数据的全局排序。将详细阐述了实现所需环境及过程。用阿里云服务器安装, OS: Ubuntu20.04 LTS . Hadoop 支持用三种模式启动:单机模式、伪分布式模式、分布式...
利用MapReduce实现了求学生成绩的最大值,最小值,及成绩分布。结合我的博客“MapReduce之学生平均成绩”看,效果更好。
实现mr的wordcount功能和自定义分区的功能、自定义排序功能;com.ellis.mr1为类似wc功能,com.ellis.mr2为自定义分区功能,com.ellis.mr3为自定义排序功能
四川大学IT企业实训,拓思爱诺大数据第二次作业,MapReduce编程,包括Hadoop wordcount程序,及flowcount流量统计程序,包括重写排序及分区函数
该文档为学习基本排序算法过程中的学习笔记,大部分内容从网络上...所以快排、归并以及堆排是必须要掌握的排序算法,这都在MapReduce内部使用的排序算法,学习Hadoop的必须过程。 所谓算法稳定性即能够保证排序前两个相
(实践一)数值概要中combiner的作用 在前述数值概要的运用中,加入不同的combiner,测试不同...在ctrip数据集上进行Top 10排序。 (实践五)去重的用户—针对ctrip数据集去重 对ctrip数据集中的product-id进行去重操作
完整的二次排序具有多个层次的排序功能,可以有效提高系统的处理性能。...需要注意的是,这多个层次的排序功能均只能针对Key进行,而不能针对Value进行排序。在业务应用时, 尽量将需要排序的字段放到Key中去。
对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。
利用MapReduce框架实现了关于音乐播放网站的两个简单问题。主要解决了多个Map多个Reduce的连接问题,二次排序问题,关于Key降序排序的问题。
3)很多人的误解在 Map 阶段,如果不使用 Combiner便不会排序,这是错误的,不管你用不用 Combiner,Map Task 均会对产生的数据排序(如果没有 Reduce Task,则不会排序,实际上 Map 阶段的排序就是为了减轻 Reduce...