Distributed Systems
Google MapReduce
Mar 20, 2020

无论是出于对分布式系统的兴趣,还是对一个未来的 Hadooper 来说,这篇论文都值得一读。号称为 Google “三驾马车”之一,同时也作为6.824 LEC 1的 preparation, 其重要性也随之体现。趁着时间充裕,抓紧读了读并尝试着手进行后面的 lab。

概念

Google 提出了一种 Map-Reduce 的模型。map 将一对键值对处理成另外一组中间键值对, reduce 将这些中间键值对中 key 相同的合并起来。 系统关注的除了处理数据本身之外,还有调度机器之间程序的运行、处理机器故障以及管理机器之间的通信。

编程模型

就如前面提到的,用户编写的 Map 将所有的输入处理成中间状态的键值对,MapReduce 根据中间状态的 Key 将其整理到一起(因为数据在不同的机器上),并将其交给 Reduce。

之后,同样是用户编写的 Reduce 将 Key 相同的中间键值对合并到一起。合并的数据集可能只是一小部分。

下面是一个单词计数的例子:

map (String key, String value)
    // key document name
    // value document content
    for each word w int value:
        EmitIntermediate (w, "1");

reduce (String key, Iterator value)
    // key a word
    // values a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt (v);
    Emit(AsString (result));

map 函数将出现过的单词与其出现过的次数相关联,这里是1。 reduce 函数将相同单词出现的次数求和。

比如,输入以下数据

hello Java
hello C
hello Java
hello C

map 将其处理之后,变成

hello 1
Java 1
hello 1
C 1
hello 1
Java 1
hello 1
C 1

其间通常会对这些数据进行 shuffle

hello 1
hello 1
hello 1
hello 1
Java 1
Java 1
C 1
C 1

最后,reduce 将这些数据合并

hello 4
Java 2
C 2

实现

执行过程的概括

输入数据被自动分成 M 份,然后分发给不同的机器上进行 map 处理。中间状态的 Key 被分成 R 份(比如 hash(key) mod R),然后被不同机器进行 reduce。其中 R 是由用户设置的。

MapReduce 首先将输入数据分成 M 份,之后在许多机器上启动 MapReduce 程序。其中有一个特殊的 master,它给其他的 workers 分配工作。共有 M 个 map 和 R 个 reduce 任务,master 将其中之一分配给空闲的 worker。map worker 读取输入文件,生成的中间键值对被缓存在内存中,这些中间键值对被定期地写入硬盘。worker 将这些数据的存储位置告诉 master,master 负责将这些位置告诉 reduce worker 以便其处理。reduce worker 得知这些地址以后,发起远程过程调用从 map worker 的硬盘上读取中间键值对,并在处理完成后将结果附加到输出文件中。

简单来说,就是 master 将任务分配给 map 和 reduce 两种 worker。map worker 处理完成之后存在本地,master 通知 reduce worker 来读取数据并处理。

Master 的数据结构

对于每个 map 和 reduce woker, master 需要储存他们的状态(空闲,正在处理,已完成),每个工作中的 worker 的标识。同时,master 也是中间数据从 map 到 reduce 传输的渠道。所以 master 也要储存其位置和 R 个中间键值对的大小。这两个数据在 map 完成时更新,之后被增量同步到 reduce 那里去。

我的一种实现:

type Master struct {
	/*
		each map and reduce task has a *state*: idle, in-progress, or completed
		here assigns:
		0 idle
		1 in-progress
        2 completed
        3 failed

		worker identity - status
	*/
	State map[string]int

	/*
		identity of each worker
	*/

	/*
		a work request *one* job, read *one or more* input files,
		write the task's output to *one or more* files

		to simplify, each task may have one or more input files,
		but one input file matches one output file

		intermediate file locations from map worker to reduce workers
	*/

	Locations []string

	//sizes of the R intermediate file regions produced by map task

	Sizes []string
}

容错

Worker

master 定期向 worker 发送心跳包,当一定时间时候如果没有收到回复,master 就会将其标记为 failed。所有该 worker 执行、完成的任务都会被其他 worker 重新执行(reduce worker 完成的除外,它的输出以及被存在 GFS 上了)。如果一个 map worker 执行的任务失败了并由其他 worker 接手,那么正在执行的 reduce 任务都会被重新执行。

Master

master fail 后会启动一个新的 master,从最新的检查点继续执行。

论文中需要注意的细节

优化

中间状态键值对被分成多份数据。在同一份数据中,键值对是以 Key 的顺序排列的(上面提到的 shuffle)。


可以看出,除去与分布式系统部分相关的功能来说,这个系统很容易理解。需要关注的点在 map 和 reduce 的处理(但这实际上是用户负责编写的),master 与 worker 之间的 RPC 及任务调度上。但是被忽略掉的往往就是最关键的的共识机制,而分布式系统的魅力也在于此。