编程开发 购物 网址 游戏 小说 歌词 快照 开发 股票 美女 新闻 笑话 | 汉字 软件 日历 阅读 下载 图书馆 编程 China
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
移动开发 架构设计 编程语言 互联网 开发经验 Web前端 开发总结
开发杂谈 系统运维 研发管理 数据库 云 计 算 Java开发
VC(MFC) Delphi VB C++(C语言) C++ Builder 其它开发语言 云计算 Java开发 .Net开发 IOS开发 Android开发 PHP语言 JavaScript
ASP语言 HTML(CSS) HTML5 Apache MSSQL数据库 Oracle数据库 PowerBuilder Informatica 其它数据库 硬件及嵌入式开发 Linux开发资料
  编程开发知识库 -> 云计算 -> hadoop编程(8)-MapReduce案例:次排序(Secondary Sort)详解 -> 正文阅读
 

[云计算]hadoop编程(8)-MapReduce案例:次排序(Secondary Sort)详解[第1页]

所谓次排序,是指在最终输出中key关联着一个排好序的value-list,例如这样的需求:
/**
* 按年份(月份)列出温度,温度升序排列<br>
* 年-月 温度1,日期1 温度2,日期2
* 1990-1 -300,day:2 -200,day:1
*/
思路
根据我们掌握的知识,我们知道在reduce之前,数据会按key进行归并,假设我们以年-月作为map的输出键,那么同一年月的温度与日期将归并到一起供Reducer处理,在reduce函数中我们可以对value-list按温度排序。这就是第一种最易想到的解决方案:内存排序。
这个方案不具备可伸缩性,因为归约器要接收一个给定键的所有值,这可能导致归约器耗尽内存。
第二种方案是使用MapReduce框架对归约器值排序。我们知道,Reducer需要从各个节点下载同一个分区的数据,下载之后要排序分组然后归并(针对键)。利用这个特性,我们可以将自然键和次键(用于排序)构造成Maper输出的组合键,利用MapReduce会对maper输出的中间键排序的特性完成次排序。
组合键与Mapper
我们设计的组合键是(自然键,次键),在本例中最终的键是年-月,因此我们的组合键应当有year,month,temperature三个属性。
代码片段:

    private IntWritable year;
    private IntWritable month;
    private IntWritable temperature;

    public CompositeKey() {
      this(new IntWritable(), new IntWritable(), new IntWritable());
    }

    public CompositeKey(IntWritable year, IntWritable month, IntWritable temperature) {
      this.year = year;
      this.month = month;
      this.temperature = temperature;
    }

    public IntWritable getYear() {
      return year;
    }

    public IntWritable getMonth() {
      return month;
    }

    public IntWritable getTemperature() {
      return temperature;
    }

注意:需要默认构造器,且为域成员赋初始值。
组合键必须可被序列化,Hadoop的序列化方式要求实现Writable接口,通常情况下我们会实现其子接口WritableComparable,完成Writable和Comparable定义的接口方法,意味着我们的组合键可以被序列化也可以用于排序。
代码片段:

    @Override
    public int compareTo(CompositeKey other) {
      if (year.compareTo(other.year) == 0) {
        return month.compareTo(other.month);
      }
      return year.compareTo(other.year);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      year.write(out);
      month.write(out);
      temperature.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      year.readFields(in);
      month.readFields(in);
      temperature.readFields(in);
    }

注意:这里的比较逻辑,只依赖年和月,这里的比较逻辑将用于后面的分组,所以只考虑自然键的部分。
另外,我们不考虑自然键的顺序,分区算法就用默认的HashPartitioner即可,考虑到想听的年月必须在一个分区,我们要重写hashcode和equals方法:

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      CompositeKey that = (CompositeKey) o;

      if (!year.equals(that.year)) return false;
      return month.equals(that.month);
    }

    @Override
    public int hashCode() {
      int result = year.hashCode();
      result = 31 * result + month.hashCode();
      return result;
    }

再来看Mapper的代码,它只需像前几章的示例那样解析原始文本,构建CompositeKey的示例并输出即可:

public static class SecondarySortMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {
    private static final int MISSING = 9999;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString(); // 整行的数据
      String year = line.substring(15, 19); // 年份
      String month = line.substring(19, 21); // 月份
      String day = line.substring(21, 23); // 月份
      int airTemperature;  // 某次记录的温度
      if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
        airTemperature = Integer.parseInt(line.substring(88, 92));
      } else {
        airTemperature = Integer.parseInt(line.substring(87, 92));
      }
      String quality = line.substring(92, 93); // 质量代码
      int yearInt = Integer.parseInt(year);
      int monthInt = Integer.parseInt(month);
      //提取有效数据
      if (airTemperature != MISSING && quality.matches("[01459]")) {
        final CompositeKey compositeKey = new CompositeKey(new IntWritable(yearInt), new IntWritable(monthInt), new IntWritable(airTemperature));
        context.write(compositeKey, new Text(airTemperature + ",day:" + day ));
      }
    }
  }

分组和排序比较器
前面我们重写了组合键的hashcode,可以按年和月的hashcode来分区。接下来我们要保证拥有同样年月的组合键被换分到同一个组:

  /**
   * 分组比较器,用于将相同的key发送至同一个reduce函数.
   * 对于组合键来说,截取其自然键部分作为比较即可。
   */
  private static class GroupingComparator extends WritableComparator {
    public GroupingComparator() {
      super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      CompositeKey k1 = (CompositeKey) a;
      CompositeKey k2 = (CompositeKey) b;
      int cmp = k1.compareTo(k2);
      return cmp;
    }
  }

因为CompositeKey实现了WritableComparable接口,所以这里的代码比较简单。
接下来是排序比较器,它决定着归并顺序:

  /**
   * Key比较器,用于排序,考虑完整的组合键顺序
   */
  private static class KeySortingComparator extends WritableComparator {
    public KeySortingComparator() {
      super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      CompositeKey k1 = (CompositeKey) a;
      CompositeKey k2 = (CompositeKey) b;
      int cmp = k1.compareTo(k2);
      if (cmp != 0)
        return cmp;
      return k1.getTemperature().compareTo(k2.getTemperature());
    }
  }

这里我们就一定要注意了!如果组合键的年月相同,我们继续用温度作为比较,温度低的组合键排在前面,这样在归并的时候就会按温度排序了。
Reducer代码
数据在到Reducer之前其实已经按<组合键,有序温度集合>的方式组织起来了,所以Reducer的任务比较简单,就考虑输出的格式就行:

  public static class SecondarySortReducer extends Reducer<CompositeKey, Text, Text, Text> {
    @Override
    protected void reduce(CompositeKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      StringBuilder outKey = new StringBuilder();
      outKey.append(key.getYear().get()).append("-").append(key.getMonth().get());
      StringBuilder outValue = new StringBuilder();
      for (Iterator<Text> iter = values.iterator(); iter.hasNext(); ) {
        outValue.append(iter.next().toString()).append("\t");
      }
      context.write(new Text(outKey.toString()), new Text(outValue.toString()));
    }
  }

总结
总结起来,完成次排序关键这么几点:
自然键和次键(需要有序的键)=组合键 组合键要实现Writable接口 按自然键分区 按自然键分组 按组合键排序(自然键相同时考虑次键) 全部代码整合

package org.lanqiao.mr.secondarysort;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.lanqiao.mr.DriverUtil;
import org.lanqiao.mr.JobUtil;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

/**
 * 按年份(月份)列出温度,温度升序排列<br>
 *   年-月   温度1,日期1  温度2,日期2
 *   1990-1 -300,day:2 -200,day:1
 */
public class ListTemperatureUsingSecondarySort extends Configured implements Tool {
  public static class SecondarySortMapper extends Mapper<LongWritable, Text, CompositeKey, Text> {
    private static final int MISSING = 9999;

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString(); // 整行的数据
      String year = line.substring(15, 19); // 年份
      String month = line.substring(19, 21); // 月份
      String day = line.substring(21, 23); // 月份
      int airTemperature;  // 某次记录的温度
      if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
        airTemperature = Integer.parseInt(line.substring(88, 92));
      } else {
        airTemperature = Integer.parseInt(line.substring(87, 92));
      }
      String quality = line.substring(92, 93); // 质量代码
      int yearInt = Integer.parseInt(year);
      int monthInt = Integer.parseInt(month);
      //提取有效数据
      if (airTemperature != MISSING && quality.matches("[01459]")) {
        final CompositeKey compositeKey = new CompositeKey(new IntWritable(yearInt), new IntWritable(monthInt), new IntWritable(airTemperature));
        context.write(compositeKey, new Text(airTemperature + ",day:" + day ));
      }
    }
  }

  public static class SecondarySortReducer extends Reducer<CompositeKey, Text, Text, Text> {
    @Override
    protected void reduce(CompositeKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      StringBuilder outKey = new StringBuilder();
      outKey.append(key.getYear().get()).append("-").append(key.getMonth().get());
      StringBuilder outValue = new StringBuilder();
      for (Iterator<Text> iter = values.iterator(); iter.hasNext(); ) {
        outValue.append(iter.next().toString()).append("\t");
      }
      context.write(new Text(outKey.toString()), new Text(outValue.toString()));
    }
  }


  @Override
  public int run(String[] args) throws Exception {
    Job job = JobUtil.getJob(this, args);

    job.setMapperClass(SecondarySortMapper.class);
    job.setMapOutputKeyClass(CompositeKey.class);
    // job.setPartitionerClass();
    job.setGroupingComparatorClass(GroupingComparator.class);
    job.setSortComparatorClass(KeySortingComparator.class);

    job.setReducerClass(SecondarySortReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    ListTemperatureUsingSecondarySort tool = new ListTemperatureUsingSecondarySort();
    DriverUtil.runLocal(tool, args);
  }

  private static class CompositeKey implements WritableComparable<CompositeKey> {
    private IntWritable year;
    private IntWritable month;
    private IntWritable temperature;

    public CompositeKey() {
      this(new IntWritable(), new IntWritable(), new IntWritable());
    }

    public CompositeKey(IntWritable year, IntWritable month, IntWritable temperature) {
      this.year = year;
      this.month = month;
      this.temperature = temperature;
    }

    public IntWritable getYear() {
      return year;
    }

    public IntWritable getMonth() {
      return month;
    }

    public IntWritable getTemperature() {
      return temperature;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      CompositeKey that = (CompositeKey) o;

      if (!year.equals(that.year)) return false;
      return month.equals(that.month);
    }

    @Override
    public int hashCode() {
      int result = year.hashCode();
      result = 31 * result + month.hashCode();
      return result;
    }

    @Override
    public int compareTo(CompositeKey other) {
      if (year.compareTo(other.year) == 0) {
        return month.compareTo(other.month);
      }
      return year.compareTo(other.year);
    }

    @Override
    public void write(DataOutput out) throws IOException {
      year.write(out);
      month.write(out);
      temperature.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      year.readFields(in);
      month.readFields(in);
      temperature.readFields(in);
    }
  }

  /**
   * Key比较器,用于排序,考虑完整的组合键顺序
   */
  private static class KeySortingComparator extends WritableComparator {
    public KeySortingComparator() {
      super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      CompositeKey k1 = (CompositeKey) a;
      CompositeKey k2 = (CompositeKey) b;
      int cmp = k1.compareTo(k2);
      if (cmp != 0)
        return cmp;
      return k1.getTemperature().compareTo(k2.getTemperature());
    }
  }

  /**
   * 分组比较器,用于将相同的key发送至同一个reduce函数.
   * 对于组合键来说,截取其自然键部分作为比较即可。
   */
  private static class GroupingComparator extends WritableComparator {
    public GroupingComparator() {
      super(CompositeKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      CompositeKey k1 = (CompositeKey) a;
      CompositeKey k2 = (CompositeKey) b;
      int cmp = k1.compareTo(k2);
      return cmp;
    }
  }
}

阅读全文
版权声明:本文为博主原创文章,未经博主允许不得转载。
本文已收录于以下专栏:

发表评论
HTML/XML objective-c Delphi Ruby PHP C# C++ JavaScript Visual Basic Python Java CSS SQL 其它
相关文章推荐
Hadoop实现Secondary Sort(一)
Hadoop的MapReduce模型支持基于key的排序,即在一次MapReduce之后,结果都是按照key的大小排序的。但是在很多应用情况下,我们需要对映射在一个key下的value集合进行排序,即...
hit_hlj_sgy 2013-10-18 18:00 804 Hadoop MapReduce 深入理解!二次排序案例!
1. MapReduce 处理的数据类型1.1 必须实现 org.apache.hadoop.io.Writable 接口。需要实现数据的序列化与反序列化,这样才能在多个节点之间传输数据!示例:pub...
xuxiuning 2016-04-11 21:04 9319 Hadoop MapReduce应用案例——排序
1.实例描述 对输入文件中的数据进行排序。输入文件中的每行内容
chiclewu 2014-10-29 23:01 2345 Hadoop 之 Secondary Sort介绍
Hadoop 之 Secondary Sort介绍 --------------------------- 我们知道,在reduce之前,MP框架会对收到的对按K进行排序,而对于一个特定的K来说,...
amuseme_lu 2011-11-10 14:41 8568 Hadoop Map Reduce Secondary Sort
How to sort the value? Hadoop.The.Definitive.Guide.3rd.Edition show that answer: 1. Make the k...
wasaia 2014-05-27 14:30 406 Hadoop实现Secondary Sort (三)
一、背景 排序对于MR来说是个核心内容,如何做好排序十分的重要,这几天写了一些,总结一下,以供以后读阅。 二、准备 1、hadoop版本是0.20.2 2、输入的数据格式(这个很重要,看清楚格...
hit_hlj_sgy 2013-10-15 10:34 718 我的编程学习日志(8)--排序(冒泡,选择,快速以及sort函数)
关于排序最先学的就是后一个与前一个比较并交换的冒泡排序和记录下标的选择排序,这里就不多介绍了,只把它的代码贴出来,重点说一下快速排序。 一、冒泡,选择:          //冒泡   ...
u013011866 2014-09-21 16:36 275 Hadoop之HDFS分布式文件系统NameNode及Secondary NameNode详解
一、NameNode启动时如何维护元数据: 概念介绍:        Edits文件:NameNode在本地操作系统的文件都会保存在Edits日志文件中。也就是说当文件系统中的任何元数据产生操作时,都...
mmd0308 2017-07-07 14:05 675 【Hadoop】MapReduce笔记(三):MapReduce的Shuffle和Sort阶段详解
整体的Shuffle过程包含以下几个部分:Map端Shuffle、Sort阶段、Reduce端Shuffle。即是说:Shuffle过程横跨 map 和 reduce 两端,中间包含 sort 阶...
DianaCody 2014-09-23 20:08 5890 Hadoop二次排序及MapReduce处理流程实例详解
一、概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的,在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需...
lzm1340458776 2015-01-19 17:17 2215
zhengwei223 +关注
原创 86 粉丝 128 喜欢 0 码云  
他的最新文章 更多文章
hadoop编程(7)-MapReduce案例:使用TotalOrderPartitioner完成全局排序 hadoop编程(6)-MapReduce案例:Partitioner应用实例——全局排序 hadoop编程(5)-MapReduce案例:通过MinimalMapReduce进一步了解MR的机制 hadoop编程(4)-MapReduce案例:求每一年的最高温度
在线课程

【免费】搜狗机器翻译技术分享
讲师:

深度学习在推荐领域的应用和实践
讲师:吴岸城
热门文章 Shiro入门(2)Shiro提供的验证模块
23231
Shiro(4)默认鉴权与自定义鉴权
17991
Shiro(3)实现验证码认证
17348
Shiro入门(1)
16255
集成Hibernate Search做全文检索
14942
0
  云计算 最新文章
NLP06-Gensim源码简析[字典]
pandas中使用三元表达式
Docker基础笔记
大数据工程师必备技能图谱
ServiceComb中的数据最终一致性方案
63. Unique Paths II
手把手教你建github技术博客by hexo
hadoop datanode结点不启动导致dfs控制台显
spark on yarn架构简介
mapreduce程序本地模式调试
上一篇文章      下一篇文章      查看所有文章
加:2017-10-30 04:04:25  更:2017-10-30 04:04:40 
VC(MFC) Delphi VB C++(C语言) C++ Builder 其它开发语言 云计算 Java开发 .Net开发 IOS开发 Android开发 PHP语言 JavaScript
ASP语言 HTML(CSS) HTML5 Apache MSSQL数据库 Oracle数据库 PowerBuilder Informatica 其它数据库 硬件及嵌入式开发 Linux开发资料
360图书馆 软件开发资料 文字转语音 购物精选 软件下载 新闻资讯 小游戏 Chinese Culture 股票 三丰软件 开发 中国文化 网文精选 阅读网 看图 日历 万年历 2018年10日历
2018-10-19 0:54:50
多播视频美女直播
↓电视,电影,美女直播,迅雷资源↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  编程开发知识库