Processing math: 100%
大数据技术原理与应用 - (7). MapReduce

大数据技术原理与应用 - (7). MapReduce

【第三篇】 - 大数据处理与分析, 《大数据技术原理与应用, 林子雨》

本篇介绍大数据处理与分析的相关技术,包括

MapReduce 是一种并行编程模型,用于大规模数据集 (大于 1 TB) 的并行运算,它将复杂的、运行于大规模集群上的并行计算过程高度抽象到两个函数: MapReduce

MapReduce 概述

MapReduce 模型简介

  • MapReduce 将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数: Map 和 Reduce
  • 编程容易,不需要掌握分布式并行编程细节,也可以很容易把自己的程序运行在分布式系统上,完成海量数据的计算
  • MapReduce 采用 “分而治之” 策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片 (split),这些分片可以被多个 Map 任务并行处理
  • MapReduce 设计的一个理念就是 “计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销
  • MapReduce 框架采用了 Master/ Slave 架构,包括一个 Master 和若干个 Slave。Master 上运行 JobTracker,Slave 上运行 TaskTracker
  • Hadoop 框架是用 Java 实现的,但是,MapReduce 应用程序则不一定要用 Java 来写

适合用 MapReduce 来处理的数据集需要满足一个前提条件:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。

Map 和 Reduce 函数

函数
输入
输出
说明
Map <k1,v1>
如:
<行号, “a b c”>
List(<k2,v2>) 如:
<”a”, 1>
<”b”, 1>
<”c”, 1>
1.将小数据集进一步解析成一批 <key,value> 对,输入Map函数中进行处理
2.每一个输入的 <k1,v1> 会输出一批 <k2,v2><k2,v2> 是计算的中间结果
Reduce <k2,List(v2)>
如:
< “a”, <1,1,1> >
<k3,v3> 如:
<”a”, 3>
输入的中间结果 <k2,List(v2)>中的 List(v2) 表示是一批属于同一个 k2value
  • Map 函数将输入地元素转换成 <key,value> 形式地键值对,键和值地类型也是任意的,其中键不同于一般的标志属性,即键没有唯一性,不能作为输出地身份标识。

MapReduce 体系结构

MapReduce 主要有以下 4 个部分组成:

  1. Client
    • 用户编写的 MapReduce 程序通过 Client 提交到 JobTracker 端
    • 用户可通过 Client 提供的一些接口查看作业运行状态
  2. JobTracker
    • JobTracker 负责资源监控和作业调度
    • JobTracker 监控所有 TaskTracker 与 Job 的健康状况,一旦发现失败,就将相应的任务转移到其他节点
    • JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器 (TaskScheduler),而调度器会在资源出现空闲时,选择合适的任务去使用这些资源
  3. TaskTracker
    • TaskTracker 会周期性地通过“心跳”将本节点上资源的使用情况和任务的运行进度汇报给 JobTracker,同时接收 JobTracker 发送过来的命令并执行相应的操作 (如启动新任务、杀死任务等)
    • TaskTracker 使用 “slot” 等量划分本节点上的资源量 (CPU、内存等)。一个 Task 获取到一个 slot 后才有机会运行,而 Hadoop 调度器的作用就是将各个TaskTracker 上的空闲 slot 分配给 Task 使用。slot 分为 Map slot 和 Reduce slot 两种,分别供 MapTask 和 Reduce Task 使用
  4. Task
    • Task 分为 Map Task 和 Reduce Task 两种,均由 TaskTracker 启动

MapReduce 工作流程

工作流程概述

MapReduce 的核心思想是“分而治之”。如上图所示,把一个大的数据拆分成许多小的数据块再多台机器上并行处理,对于一个大 MapReduce 作业:

  1. 首先会被拆分成许多个 Map 任务在多台机器上并行执行,每个 Map 任务通常运行在数据存储的节点上,以避免额外的数据传输开销。
  2. Map 任务结束后,会生成以 <key,value> 形式的中间结果。
  3. 这些中间结果会被分到多个 Reduce 任务在多台机器上执行,具有相同 key<key,value> 会被发送到同一个 Reduce 任务那里
  4. Reduce 任务会对中间结果进行汇总计算得到最后结果,并输出到分布式文件系统中。

需要指出:

  • 不同的 Map 任务之间不会进行通信
  • 不同的 Reduce 任务之间也不会发生任何信息交换
  • 用户不能显式地从一台机器向另一台机器发送消息
  • 所有的数据交换都是通过 MapReduce 框架自身去实现的

MapReduce 各个执行阶段

下面是 MapReduce 算法的执行过程

  1. MapReduce 框架使用 InputFormat 模块做 Map 前的预处理,比如验证输入的格式是否符合输入的定义;然后,将输入文件切分为逻辑上的多个 InputSplitInputSlit 是 MapReduce 对文件进行处理和运算的输入单位,只是一个逻辑概念。
  2. 由于 InputSplit 并非物理切分,因此需要 RecordReader (RR) 根据 InputSplit 中的信息来处理 InputSplit 中的具体记录,加载数据并转换为适合 Map 任务读取的键值对,输入给 Map 任务
  3. Map 任务会根据用户自定义的映射规则,输出一系列的 <key,value> 作为中间结果。
  4. 对 Map 输出的中间结果 <key,value> 进行 Shuffle 操作得到有序的 <key,valuelist>
  5. Reduce 以一系列 <key,valuelist> 中间结果作为输入,执行用户定义的逻辑,输出结果给 OutputFormat 模块。
  6. OutputFormat 模块会验证输出目录是否已经存在以及输出结果类型是否符合配置文件中的配置类型,如果都满足,就输出 Reduce 的结果到分布式文件系统。

Split (分片) 的解释

执行过程中的第 1 步提到 InputSplit 只是一个逻辑概念,并非物理切分:

  • HDFS 以固定大小的 block 为基本单位存储数据,而对于 MapReduce 而言,其处理单位是 split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。

Map 任务的数量

  • Hadoop 为每个 split 创建一个 Map 任务,split 的多少决定了 Map 任务的数目。大多数情况下,理想的分片大小是一个 HDFS 块。

Reduce 任务的数量

  • 最优的 Reduce 任务个数取决于集群中可用的 Reduce 任务槽 (Reduce slot) 的数目
  • 通常设置比 Reduce 任务槽数目稍微小一些的 Reduce 任务个数 (这样可以预留一些系统资源处理可能发生的错误)

Shuffle 过程详解

Shuffle 是指对 Map 输出结果进行一系列的分区 (Portiton)、排序 (Sort)、合并 (Combine)、归并 (Merge) 等处理并交给 Reduce 的过程。

Map 端的 Shuffle 过程

Map 的输出结果首先被写入缓存,当缓存满时,就启动溢写操作,把缓存中的数据写入磁盘文件、并清空缓存。

  • 当启动溢写操作时,首先需要吧缓存中的数据进行分区 (Portition),然后对每个分区的数据进行排序 (Sort)合并 (Combine),之后再写入磁盘。
  • 每次溢写操作会生成一个新的磁盘文件,随着 Map 任务的执行,会生成多个磁盘溢写文件。
  • 在 Map 任务全部结束前,这些溢写文件会被归并 (Merge) 成一个大的磁盘文件,然后通知相应的 Reduce 任务来领取属于自己的数据。

Combine 和 Merge 的区别: 两个键值对 <”a”, 1> 和 <”a”, 1>,

  • 如果合并,会得到 <”a”, 2>,
  • 如果归并,会得到 <”a”, <1, 1>>

Reduce 端的 Shuffle 过程

  • Reduce 任务通过 RPC 向 JobTracker 询问 Map 任务是否已经完成,若完成,则领取数据
  • Reduce 领取数据先放入缓存,来自不同 Map 机器,先归并 (Merge),再合并 (Combine),写入磁盘
  • 多个溢写文件归并成一个或多个大文件,文件中的键值对是排序的
  • 当数据很少时,不需要溢写到磁盘,直接在缓存中归并,然后输出给Reduce

MapReduce 应用程序执行过程

MapRduce 过程详解

实例分析:WordCount

问题描述

  • 输入: 一个包含大量单词的文本文件

  • 输出: 文件中每个单词及其出现次数 (频数),并按照单词字母顺序排序,每个单词和其频数占一行,单词和频数之间有间隔

输入 输出
Hello World,
Hello Hadoop,
Hello MapReduce
Hadoop 1
Hello 3
MapReduce 1
World 1

WordCount 执行过程实例

MapReduce 的具体应用

MapReduce可以很好地应用于各种计算问题

  • 关系代数运算(选择、投影、并、交、差、连接)
    • 关系的选择运算
    • 关系的投影运算
    • 关系的并、交、差运算
    • 关系的自然连接运算
  • 分组与聚合运算
  • 矩阵-向量乘法
  • 矩阵乘法

用 MapReduce 实现关系的自然连接

  • 假设有关系 R(A,B) 和 S(B,C),对二者进行自然连接操作
  • 使用 Map 过程,把来自 R 的每个元组 <a,b> 转换成一个键值对 <b,<R,a>> ,其中的键就是属性B的值。把关系 R 包含到值中,这样做使得我们可以在 Reduce 阶段,只把那些来自 R 的元组和来自 S 的元组进行匹配。类似地,使用 Map 过程,把来自 S 的每个元组 <b,c>,转换成一个键值对 <b,<S,c>>
  • 所有具有相同B值的元组被发送到同一个 Reduce 进程中,Reduce 进程的任务是,把来自关系 R 和 S 的、具有相同属性 B 值的元组进行合并
  • Reduce 进程的输出则是连接后的元组 <a,b,c>,输出被写到一个单独的输出文件中

MapReduce 编程实践 (Hadoop 3.1.3)

编程实践来自于厦门大学数据库实验室林子雨老师编写的案例:http://dblab.xmu.edu.cn/blog/2481-2/
MapReduce是谷歌公司的核心计算模型,Hadoop开源实现了MapReduce。MapReduce将复杂的、运行于大规模集群上的并行计算过程高度抽象到了两个函数:Map和Reduce,并极大地方便了分布式编程工作,编程人员在不会分布式并行编程的情况下,也可以很容易将自己的程序运行在分布式系统上,完成海量数据的计算。
本教程以一个词频统计任务为主线,详细介绍MapReduce基础编程方法。环境是Ubuntu18.04(或Ubuntu16.04或Ubuntu14.04)、Hadoop3.1.3,开发工具是Eclipse。

词频统计任务要求

首先,在 Linux 系统本地创建两个文件,即文件 wordfile1.txtwordfile2.txt。在实际应用中,这两个文件可能会非常大,会被分布存储到多个节点上。但是,为了简化任务,这里的两个文件只包含几行简单的内容。需要说明的是,针对这两个小数据集样本编写的 MapReduce 词频统计程序,不作任何修改,就可以用来处理大规模数据集的词频统计。

文件 wordfile1.txt 的内容如下:

1
2
I love Spark
I love Hadoop

文件 wordfile2.txt 的内容如下:

1
2
Hadoop is good
Spark is fast

假设 HDFS 中有一个 /user/hadoop/input 文件夹,并且文件夹为空,请把文件 wordfile1.txtwordfile2.txt 上传到 HDFS中 的 input 文件夹下。现在需要设计一个词频统计程序,统计 input 文件夹下所有文件中每个单词的出现次数,也就是说,程序应该输出如下形式的结果:

1
2
3
4
5
6
7
fast  1
good 1
Hadoop 2
I 2
is 2
love 2
Spark 2

WordCount 程序代码:

wordcount.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public WordCount() {
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true)?0:1);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
}

详细过程请访问林子雨老师的教程——MapReduce编程实践(Hadoop3.1.3)


Comments

Related Issues not found

Please contact @zengzhanhang to initialize the comment

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×