`
jiaweihao1987
  • 浏览: 16883 次
  • 性别: Icon_minigender_1
  • 来自: 北京
最近访客 更多访客>>
社区版块
存档分类
最新评论

使用MapReduce进行排序

阅读更多

 

      之前在工作中使用到过MapReduce的排序,当时对于这个平台的理解还比较浅显,选择的是一个最为简单的方式,就是只用一个Recude来做。因为Map之后到Reduce阶段,为了Merge的方便,MapReduce的实现会自己依据key值进行排序,这样得出的结果就是一个整体排序的结果。而如果使用超过一个Reduce任务的话,所得的结果是每个part内部有序,但是整体是需要进行merge才可以得到最终的全体有序的。今天读了《Hadoop权威指南》中的第8章,对使用Hadoop这一MapReduce的Java实现进行排序有所了解,在此进行简单的总结。

      首先我们来看一下Hadoop中内部Map和Reduce两个阶段所做的排序,可以使用下图来说明。

 

MapReduce的过程

       对MapReduce或者Hadoop有所了解的人可能都知道,所谓对于key值的排序,其实是在Map阶段进行的,而Rduce阶段所做的工作是对各个Map任务的结果进行Merge工作,这样就能保证整体是有序的。如果想在使用多个Reduce任务的情况下保证结果有序,我们可以做的是在上图中的partition阶段进行控制,使分配到每个reduce task的数据块为数值区域独立的,即比如整体数据在0~50之间,划分为5个Reduce任务的话,可以0~10区间的数据到第一个Reduce Task,10~20之间的到第二个,以此类推。但是这样就存在一个问题,划分出的各个任务中的数据可能并不是均等的,这样某些Reduce Task处理了很多数据,而其他的处理了很少的数据。Hadoop提供了RandomSampler类(位于InputSampler类中)来进行随机取样,然后按照取样结果对值域进行划分。一个示例代码如下:

 

public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
		implements Tool {
	@Override
	public int run(String[] args) throws Exception {
		JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
		if (conf == null) {
			return -1;
		}
		conf.setInputFormat(SequenceFileInputFormat.class);
		conf.setOutputKeyClass(IntWritable.class);
		conf.setOutputFormat(SequenceFileOutputFormat.class);
		SequenceFileOutputFormat.setCompressOutput(conf, true);
		SequenceFileOutputFormat
				.setOutputCompressorClass(conf, GzipCodec.class);
		SequenceFileOutputFormat.setOutputCompressionType(conf,
				CompressionType.BLOCK);
		conf.setPartitionerClass(TotalOrderPartitioner.class);
		InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(
				0.1, 10000, 10);
		Path input = FileInputFormat.getInputPaths(conf)[0];
		input = input.makeQualified(input.getFileSystem(conf));
		Path partitionFile = new Path(input, "_partitions");
		TotalOrderPartitioner.setPartitionFile(conf, partitionFile);
		InputSampler.writePartitionFile(conf, sampler);
		// Add to DistributedCache
		URI partitionUri = new URI(partitionFile.toString() + "#_partitions");
		DistributedCache.addCacheFile(partitionUri, conf);
		DistributedCache.createSymlink(conf);
		JobClient.runJob(conf);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(
				new SortByTemperatureUsingTotalOrderPartitioner(), args);
		System.exit(exitCode);
	}
}

      使用上述程序执行所得的结果会是多个划分,每个划分内部是有序的,而且第i个划分的key值会比i+1个划分的key值都要小。这样,就可以不需要进行再一步的merge,就可以得到整体的上有序结果。

 

      关于排序,一个更加有意思的应用是所谓的Secondary Sort,亦即在保证第一个key值有序的情况下,对第二个key值也要保证有序(可以是升序或者降序)。此处的一个实现方法是将这两个需要排序的部分都作为key值,使用IntPair进行存储,然后自己实现一个继承自WritableComparator的名为KeyComparator的用于比较的类,其代码如下:

 

public static class KeyComparator extends WritableComparator {
	protected KeyComparator() {
		super(IntPair.class, true);
	}

	@Override
	public int compare(WritableComparable w1, WritableComparable w2) {
		IntPair ip1 = (IntPair) w1;
		IntPair ip2 = (IntPair) w2;
		int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
		if (cmp != 0) {
			return cmp;
		}
		return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); // reverse
	}
}

 

       这里对于第二列是得到降序的结果,在conf设置的时候,可以使用conf.setOutputKeyComparatorClass(KeyComparator.class);语句进行设置。这样执行计算程序的话,会存在一个问题,因为将两个int型的值共同作为key值来处理,在Map阶段结束后进行Partition的划分的时候,就会同样依照这个总key值进行划分,我们想要两个值,比如(1900,20)和(1900,23)被放到同一个Reduce任务中进行处理就无法实现,于是我们需要实现自己的Partitioner接口,代码如下:

 

public static class FirstPartitioner implements
		Partitioner<IntPair, NullWritable> {
	@Override
	public void configure(JobConf job) {
	}

	@Override
	public int getPartition(IntPair key, NullWritable value, int numPartitions) {
		return Math.abs(key.getFirst() * 127) % numPartitions;
	}
}

 

      同样在配置过程中使用conf.setPartitionerClass(FirstPartitioner.class);语句进行设置。除此之外,需要进行控制的还有一个Reduce中的Group by操作,方法是实现一个GroupComparator类,其中的比较只使用第一个键值即可,代码如下:

public static class GroupComparator extends WritableComparator {
	protected GroupComparator() {
		super(IntPair.class, true);
	}

	@Override
	public int compare(WritableComparable w1, WritableComparable w2) {
		IntPair ip1 = (IntPair) w1;
		IntPair ip2 = (IntPair) w2;
		return IntPair.compare(ip1.getFirst(), ip2.getFirst());
	}
}
       需要设置的是conf.setOutputValueGroupingComparator(GroupComparator.class);。这样就可以实现Secondary Sort过程了。
0
0
分享到:
评论

相关推荐

    mapreduce实现全栈排序

    mapreduce实现全栈排序,简单算法已经在文档中说明,想要了解的可以查看!

    分布式文件系统实例-mapreduce-排序

    很简单的,面向新手的一个MapReduce实例,对数据进行排序,对大数据理解很有帮助。

    mapreduce二次排序

    mapreduce二次排序,年份升序,按照年份聚合,气温降序

    MapReduce二次排序

    MapReduce二次排序代码实现。 二次排序这里指:在以key为关键字的排序基础上,同时按照value排序

    MapReduce2中自定义排序分组

    NULL 博文链接:https://username2.iteye.com/blog/2274802

    利用采样器实现mapreduce任务输出全排序

    利用采样器实现mapreduce任务输出全排序大数据-MapReduce

    大数据 hadoop mapreduce 词频统计

    在hadoop平台上,用mapreduce编程实现大数据的词频统计

    基于MapReduce的网页排序算法

    基于MapReduce,实现并行计算环境下的网页排序算法

    基于MapReduce的简单倒排索引的建立

    基于MapReduce的简单倒排索引的建立

    数据排序MapReduce实例

    数据排序MapReduce实例 对输入文件中每行进行排序,输入文件中的每行内容为一个数字,即一个数据。

    使用Java MapReduce实现数据全局排序【100012685】

    本次实验,在 Hadoop 平台上,使用 MapReduce 实现了数据的全局排序。将详细阐述了实现所需环境及过程。用阿里云服务器安装, OS: Ubuntu20.04 LTS . Hadoop 支持用三种模式启动:单机模式、伪分布式模式、分布式...

    基于MapReduce的学生平均成绩统计

    利用MapReduce实现了求学生成绩的最大值,最小值,及成绩分布。结合我的博客“MapReduce之学生平均成绩”看,效果更好。

    mapreduce wc单词计数 自定义分区 自定义排序实现

    实现mr的wordcount功能和自定义分区的功能、自定义排序功能;com.ellis.mr1为类似wc功能,com.ellis.mr2为自定义分区功能,com.ellis.mr3为自定义排序功能

    拓思爱诺大数据-第二次作业MapReduce编程

    四川大学IT企业实训,拓思爱诺大数据第二次作业,MapReduce编程,包括Hadoop wordcount程序,及flowcount流量统计程序,包括重写排序及分区函数

    基本排序算法及其在MapReduce的应用

    该文档为学习基本排序算法过程中的学习笔记,大部分内容从网络上...所以快排、归并以及堆排是必须要掌握的排序算法,这都在MapReduce内部使用的排序算法,学习Hadoop的必须过程。 所谓算法稳定性即能够保证排序前两个相

    Mapreduce-实践

    (实践一)数值概要中combiner的作用 在前述数值概要的运用中,加入不同的combiner,测试不同...在ctrip数据集上进行Top 10排序。 (实践五)去重的用户—针对ctrip数据集去重 对ctrip数据集中的product-id进行去重操作

    MapReduce模型--二次排序

    完整的二次排序具有多个层次的排序功能,可以有效提高系统的处理性能。...需要注意的是,这多个层次的排序功能均只能针对Key进行,而不能针对Value进行排序。在业务应用时, 尽量将需要排序的字段放到Key中去。

    Hadoop中MapReduce基本案例及代码(五)

    对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。 (可选)分组后对数据进行归约。 注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。

    MapReduce的小应用

    利用MapReduce框架实现了关于音乐播放网站的两个简单问题。主要解决了多个Map多个Reduce的连接问题,二次排序问题,关于Key降序排序的问题。

    2018最新BAT大数据面试题.docx

    3)很多人的误解在 Map 阶段,如果不使用 Combiner便不会排序,这是错误的,不管你用不用 Combiner,Map Task 均会对产生的数据排序(如果没有 Reduce Task,则不会排序,实际上 Map 阶段的排序就是为了减轻 Reduce...

Global site tag (gtag.js) - Google Analytics