【论文阅读】MapReduce: Simplified Data Processing on Large Clusters

原博客链接:https://tanxinyu.work/mapreduce-thesis/

相关背景

在 20 世纪初,包括本文作者在内的 Google 的很多程序员,为了处理海量的原始数据,已经实现了数以百计的、专用的计算方法。这些计算方法用来处理大量的原始数据,比如,文档抓取(类似网络爬虫的程序)、Web 请求日志等等;也为了计算处理各种类型的衍生数据,比如倒排索引、Web 文档的图结构的各种表示形势、每台主机上网络爬虫抓取的页面数量的汇总、每天被请求的最多的查询的集合等等。

要解决的问题

大多数以上提到的数据处理运算在概念上很容易理解。然而由于输入的数据量巨大,因此要想在可接受的时间内完成运算,只有将这些计算分布在成百上千的主机上。如何处理并行计算、如何分发数据、如何处理错误?所有这些问题综合在一起,需要大量的代码处理,因此也使得原本简单的运算变得难以处理。

解决方法

模型

为了解决上述复杂的问题,本文设计一个新的抽象模型,使用这个抽象模型,用户只要表述想要执行的简单运算即可,而不必关心并行计算、容错、数据分布、负载均衡等复杂的细节,这些问题都被封装在了一个库里面:利用一个输入 key/value pair 集合来产生一个输出的 key/value pair 集合。

MapReduce 库的用户可以用两个函数表达这个计算:Map 和 Reduce。

  • 用户自定义的 Map 函数接受一个输入的 key/value pair 值,然后产生一个中间 key/value pair 值的集合。MapReduce 库把所有具有相同中间 key 值 I 的中间 value 值集合在一起后按照一定的规律传递给 reduce 函数。
  • 用户自定义的 Reduce 函数接受一个中间 key 的值 I 和相关的一个 value 值的集合。Reduce 函数合并这些 value 值,形成一个较小的 value 值的集合。一般的,每次 Reduce 函数调用只产生 0 或 1 个输出 value 值。通常 Map 通过一个迭代器把中间 value 值提供给 Reduce 函数,这样 Reduce Worker 就可以处理无法全部放入内存中的大量的 value 值的集合。
    在概念上,用户定义的 Map 和 Reduce 函数都有相关联的类型:
1
2
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)

比如,输入的 key 和 value 值与输出的 key 和 value 值在类型上推导的域不同。此外,中间 key 和 value 值与输出 key 和 value 值在类型上推导的域相同。

执行流程

通过将 Map 调用的输入数据自动分割为 M 个数据片段的集合,Map 调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使用分区函数将 Map 调用产生的中间 key 值分成 R 个不同分区(例如,hash(key) mod R),Reduce 调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。

执行流程

上图展示了 MapReduce 实现中操作的全部流程。当用户调用 MapReduce 函数时,将发生下面的一系列动作:

  1. 用户程序首先调用的 MapReduce 库将输入文件分成 M 个数据片度,每个数据片段的大小一般从 16MB 到 64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。
  2. 这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是 worker 程序,由 master 分配任务。有 M 个 Map 任务和 R 个 Reduce 任务将被分配,master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。
  3. 被分配了 map 任务的 worker 程序读取相关的输入数据片段,从输入的数据片段中解析出 key/value pair,然后把 key/value pair 传递给用户自定义的 Map 函数,由 Map 函数生成并输出的中间 key/value pair,并缓存在内存中。
  4. 缓存中的 key/value pair 通过分区函数分成 R 个区域,之后周期性的写入到本地磁盘上。缓存的 key/value pair 在本地磁盘上的存储位置将被回传给 master,由 master 负责把这些存储位置再传送给 Reduce worker
  5. 当 Reduce worker 程序接收到 master 程序发来的数据存储位置信息后,使用 RPC 从 Map worker 所在主机的磁盘上读取这些缓存数据。当 Reduce worker 读取了所有的中间数据后,通过对 key 进行排序后使得具有相同 key 值的数据聚合在一起。由于许多不同的 key 值会映射到相同的 Reduce 任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
  6. Reduce worker 程序遍历排序后的中间数据,对于每一个唯一的中间 key 值,Reduce worker 程序将这个 key 值和它相关的中间 value 值的集合传递给用户自定义的 Reduce 函数。Reduce 函数的输出被追加到所属分区的输出文件。
  7. 当所有的 Map 和 Reduce 任务都完成之后,master 唤醒用户程序。在这个时候,在用户程序里的对 MapReduce 调用才返回。

容错

worker 故障

master 与 worker 之间同步心跳,对于失效的 worker,根据其类型来做进一步处理:

  • Map worker 故障:由于 Map 任务将数据临时存储在本地,所以需要重新执行。
  • Reduce worker 故障:由于 Reduce 任务将数据存储在全局文件系统中 ,所以不需要重新执行。

master 故障

MapReduce 任务重新执行

故障语义保证

当用户提供的 Map 和 Reduce 操作是输入确定性函数(即相同的输入产生相同的输出)时,MapReduce 的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。

  • Map worker 任务的原子提交:每个 Map 任务生成 R 个本地临时文件,当一个 Map 任务完成时,worker 发送一个包含 R 个临时文件名的完成消息给 master。如果 master 从一个已经完成的 Map 任务再次接收到一个完成消息,master 将忽略这个消息;
  • Reduce worker 任务的原子提交:当 Reduce 任务完成时,Reduce worker 进程以原子的方式把临时文件重命名为最终的输出文件。如果同一个 Reduce 任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行。MapReduce 依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个 Reduce 任务产生的数据。

存储位置优化

核心思想:本地读文件以减少流量消耗

MapReduce 的 master 在调度 Map 任务时会考虑输入文件的位置信息,尽量将一个 Map 任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master 将尝试在保存有输入数据拷贝的机器附近的机器上执行 Map 任务(例如,分配到一个和包含输入数据的机器在一个交换机里的 worker 机器上执行)。

任务粒度

理想情况下,M 和 R 应当比集群中 worker 的机器数量要多得多。在每台 worker 机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度:失效机器上执行的大量 Map 任务都可以分布到所有其他的 worker 机器上去执行。

实际使用时建议用户选择合适的 M 值,以使得每一个独立任务都是处理大约 16M 到 64M 的输入数据(这样,上面描写的输入数据本地存储优化策略才最有效),另外,也建议把 R 值设置使用的 worker 机器数量的小倍数。比如:M=200000,R=5000,使用 2000 台 worker 机器。

备用任务

影响一个 MapReduce 的总执行时间最通常的因素是“落伍者”:在运算过程中,如果有一台机器花了很长的时间才完成最后几个 Map 或 Reduce 任务,导致 MapReduce 操作总的执行时间超过预期。

为了解决落伍者的问题,当一个 MapReduce 操作接近完成的时候,master 调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。无论是最初的执行进程、还是备用(backup)任务进程完成了任务,MapReduce 都把这个任务标记成为已经完成。此个机制通常只会占用比正常操作多几个百分点的计算资源。但能减少近 50% 的任务完成总时间。

技巧

分区函数

MapReduce 缺省的分区函数是使用 hash 方法(比如,hash(key) mod R) 进行分区。hash 方法能产生非常平衡的分区。然而,有的时候,其它的一些分区函数对 key 值进行的分区将非常有用。比如,输出的 key 值是 URLs,有的用户希望每个主机的所有条目保持在同一个输出文件中。为了支持类似的情况,MapReduce 库的用户需要提供专门的分区函数。例如,使用“hash(Hostname(urlkey))mod R”作为分区函数就可以把所有来自同一个主机的 URLs 保存在同一个输出文件中。

顺序保证

MapReduce 确保在给定的分区中,中间 key/value pair 数据的处理顺序是按照 key 值增量顺序处理的。这样的顺序保证对每个分成生成一个有序的输出文件,这对于需要对输出文件按 key 值随机存取的应用非常有意义,对在排序输出的数据集也很有帮助。

Combiner 函数

在某些情况下,Map 函数产生的中间 key 值的重复数据会占很大的比重,并且,用户自定义的 Reduce 函数满足结合律和交换律。在 2.1 节的词数统计程序是个很好的例子。由于词频率倾向于一个 zipf 分布(齐夫分布),每个 Map 任务将产生成千上万个这样的记录。所有的这些记录将通过网络被发送到一个单独的 Reduce 任务,然后由这个 Reduce 任务把所有这些记录累加起来产生一个数字。MapReduce 允许用户指定一个可选的 combiner 函数,combiner 函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送出去。

Combiner 函数在每台执行 Map 任务的机器上都会被执行一次。一般情况下,Combiner 和 Reduce 函数是一样的。Combiner 函数和 Reduce 函数之间唯一的区别是 MapReduce 库怎样控制函数的输出。Reduce 函数的输出被保存在最终的输出文件里,而 Combiner 函数的输出被写到中间文件里,然后被发送给 Reduce 任务。

部分的合并中间结果可以显著的提高一些 MapReduce 操作的速度。

输入和输出的类型

支持常用的类型,可以通过提供一个简单的 Reader 接口实现来支持一个新的输入类型。Reader 并非一定要从文件中读取数据,比如可以很容易的实现一个从数据库里读记录的 Reader,或者从内存中的数据结构读取数据的 Reader。

副作用

在某些情况下,MapReduce 的使用者发现,如果在 Map 或 Reduce 操作过程中增加辅助的输出文件会比较省事。MapReduce 依靠程序 writer 把这种“副作用”变成原子的和幂等的。通常应用程序首先把输出结果写到一个临时文件中,在输出全部数据之后,在使用系统级的原子操作 rename 重新命名这个临时文件。

跳过损坏的记录

每个 worker 进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。 在执行 Map 或者 Reduce 操作之前,MapReduce 库通过全局变量保存记录序号。如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过 UDP 包向 master 发送处理的最后一条记录的序号。当 master 看到在处理某条特定记录不止失败一次时,master 就标志着条记录需要被跳过,并且在下次重新执行相关的 Map 或者 Reduce 任务的时候跳过这条记录。

本地执行

支持本地串行执行以方便调试

状态信息

master 支持嵌入 HTTP 服务器以显示一组状态信息页面,用户可以监控各种执行状态。状态信息页面显示了包括计算执行的进度,比如已经完成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等

计数器

MapReduce 库使用计数器统计不同事件发生次数。比如,用户可能想统计已经处理了多少个单词、已经索引的多少篇 German 文档等等。

这些计数器的值周期性的从各个单独的 worker 机器上传递给 master(附加在 ping 的应答包中传递)。master 把执行成功的 Map 和 Reduce 任务的计数器值进行累计,当 MapReduce 操作完成之后,返回给用户代码。

计数器当前的值也会显示在 master 的状态页面上,这样用户就可以看到当前计算的进度。当累加计数器的值的时候,master 要检查重复运行的 Map 或者 Reduce 任务,避免重复累加(之前提到的备用任务和失效后重新执行任务这两种情况会导致相同的任务被多次执行)。

应用场景

分布式的 Grep:Map 函数输出匹配某个模式的一行,Reduce 函数是一个恒等函数,即把中间数据复制到输出。

  • 计算 URL 访问频率:Map 函数处理日志中 web 页面请求的记录,然后输出 (URL,1)。Reduce 函数把相同 URL 的 value 值都累加起来,产生 (URL, 记录总数)结果。
    网络链接倒排:Map 函数在源页面(source)中搜索所有的链接目标(target)并输出为 (target,source)。Reduce 函数把给定链接目标(target)的链接组合成一个列表,输出 (target,list(source))。
  • 每个主机的检索词向量:检索词向量用一个(词,频率)列表来概述出现在文档或文档集中的最重要的一些词。Map 函数为每一个输入文档输出(主机名,检索词向量),其中主机名来自文档的 URL。Reduce 函数接收给定主机的所有文档的检索词向量,并把这些检索词向量加在一起,丢弃掉低频的检索词,输出一个最终的(主机名,检索词向量)。
  • 倒排索引:Map 函数分析每个文档输出一个(词,文档号)的列表,Reduce 函数的输入是一个给定词的所有(词,文档号),排序所有的文档号,输出(词,list(文档号))。所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。
    分布式排序:Map 函数从每个记录提取 key,输出 (key,record)。Reduce 函数不改变任何的值。这个运算依赖分区机制和排序属性。

经验分享

  • 约束编程模式使得并行和分布式计算非常容易,也易于构造容错的计算环境;
  • 网络带宽是稀有资源。大量的系统优化是针对减少网络传输量为目的的:本地优化策略使大量的数据从本地磁盘读取,中间文件写入本地磁盘、并且只写一份中间文件也节约了网络带宽。
  • 多次执行相同的任务可以减少硬件配置不平衡带来的负面影响,同时解决了由于机器失效导致的数据丢失问题。

创新之处

  • 通过简单的接口实现了自动的并行化和大规模的分布式计算,通过使用 MapReduce 模型接口实现了在大量普通 PC 机上的高性能计算。
  • 向工业界证明了 MapReduce 模型在分布式计算上的可行性,拉开了分布式计算的序幕并影响了其后所有的计算框架,包括现在流行的批处理框架 Spark 和流处理框架 Flink 都很受其影响。

不足之处

  • 基于历史局限性和当时的成本考虑,没有利用内存去更高效的处理数据,不过也为 Spark 提供了思路。
  • 没有将资料调度和计算调度分离,使得 MapReduce 系统看起来较为冗杂。在开源的 Hadoop 生态中,MapReduce 现只关注于计算,具体的资源调度由 Yarn 管理。

相关系统

  • 分布式存储系统:GFS/Colossus/HDFS
  • 批处理框架:Spark
  • 流处理框架:Flink
  • 高可用机制:Chubby/ZooKeeper

相关资料

补充MapReduce论文示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include "mapreduce/mapreduce.h"

// User’s map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;

// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;

if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);

// User’s reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}

// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);

int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);

MapReduceSpecification spec;

// Store list of input files into "spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}

// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");

// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class("Adder");

// Tuning parameters: use at most 2000
// machines and 100 MB of memory per task
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);

// Now run it
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();

// Done: ’result’ structure contains info
// about counters, time taken, number of
// machines used, etc.

return 0;
}

【论文阅读】MapReduce: Simplified Data Processing on Large Clusters
http://example.com/2023/11/05/MapReducePaperRead/
作者
滑滑蛋
发布于
2023年11月5日
许可协议