2.2 分布式计算编程模型MapReduce

MapReduce是Google提出的一个软件架构,是一种处理海量数据的并行编程模型,用于大规模数据集(通常大于1TB)的并行运算。Map(映射)、Reduce(化简)的概念和主要思想,都是从函数式编程语言和矢量编程语言借鉴来的。正是由于MapReduce有函数式编程语言和矢量编程语言的共性,这种编程模型特别适合非结构化和结构化的海量数据的搜索、挖掘、分析与机器智能学习等。

2.2.1 产生背景

MapReduce这种并行编程模型思想最早是在1995年被提出的,Darlington等人首次提出了“map”和“fold”的概念,与Google现在所使用的“Map”和“Reduce”的思想相吻合。

与传统的分布式程序设计相比,MapReduce封装了并行处理、容错处理、本地化计算、负载均衡等细节,还提供了一个简单而强大的接口。通过这个接口,可以把大尺度的计算自动地并发和分布执行,使编程变得非常容易。另外,MapReduce也具有较好的通用性,大量不同的问题都可以简单地通过MapReduce来解决。

MapReduce把对数据集的大规模操作,分发给一个主节点管理下的各分节点共同完成,通过这种方式实现任务的可靠执行与容错机制。在每个时间周期,主节点都会对分节点的工作状态进行标记。一旦将分节点状态标记为死亡状态,则这个节点的所有任务都将分配给其他分节点重新执行。

据相关统计,每使用一次Google搜索引擎,Google的后台服务器就要进行1011次运算。这么庞大的运算量,如果没有好的负载均衡机制,有些服务器的利用率会很低,有些则会负荷太重,有些甚至可能死机,这些都会影响系统对用户的服务质量。而使用MapReduce这种编程模型,就保持了服务器之间的均衡,提高了整体效率。

2.2.2 编程模型

MapReduce的运行模型如图2-2所示。图中有M个Map操作和R个Reduce操作。

简单地说,一个Map函数就是对一部分原始数据进行指定的操作。每个Map操作都针对不同的原始数据,因此Map与Map之间是互相独立的,这使它们可以充分并行化。一个Reduce操作就是对每个Map产生的一部分中间结果进行合并操作,每个Reduce所处理的Map中间结果是互不交叉的,所有Reduce产生的最终结果经过简单连接就形成了完整的结果集,因此Reduce也可以在并行环境下执行。

在编程的时候,开发者需要编写两个主要函数:

图2-2 MapReduce的运行模型

Map和Reduce的输入参数及输出结果根据应用的不同而有所不同。Map的输入参数是in_key和in_value,它指明了Map需要处理的原始数据是哪些。Map的输出结果是一组<key,value>对,这是经过Map操作后所产生的中间结果。在进行Reduce操作之前,系统已经将所有Map产生的中间结果进行了归类处理,使得相同key对应的一系列value能够集结在一起提供给一个Reduce进行归并处理,也就是说,Reduce的输入参数是(key,[value1,…,valuem])。Reduce的工作是对这些对应相同key的value值进行归并处理,最终形成(key,final_value)的结果。这样,一个Reduce处理了一个key,所有Reduce的结果并在一起就是最终结果。

例如,假设我们想用MapReduce来计算一个大型文本文件中各单词出现的次数,Map的输入参数指明了需要处理哪部分数据,以“(在文本中的起始位置,需要处理的数据长度)”表示,经过Map处理,形成一批中间结果“<单词,出现次数>”。而Reduce处理这些中间结果,将相同单词出现的次数进行累加,得到每个单词总的出现次数。

2.2.3 实现机制

MapReduce操作的执行流程如图2-3所示。

用户程序调用MapReduce函数后,会引起下面的操作过程(图2-3中的数字标示和下面的数字标示相同)。

(1)MapReduce函数首先把输入文件分成M块,每块大概16~64MB(可以通过参数决定),接着在集群的机器上执行分派处理程序。

(2)分派处理程序中有一个程序比较特别,它是主控程序Master。剩下的分派处理程序都作为Master分派工作的Worker(工作机)。总共有M个Map任务和R个Reduce任务需要分派,Master选择空闲的Worker来分配这些Map或Reduce任务。

图2-3 MapReduce操作的执行流程

(3)一个被分配了Map任务的Worker读取并处理相关的输入块。它处理输入的数据,并且将分析出的<key,value>对传递给用户定义的Map函数。Map函数产生的中间结果<key,value>对暂时缓冲到内存。

(4)这些缓冲到内存的中间结果将被定时写到本地硬盘,这些数据通过分区函数分成R个区。中间结果在本地硬盘的位置信息将被发送回Master,然后Master负责把这些位置信息传送给Reduce Worker。

(5)当Master通知Reduce Worker关于中间<key,value>对的位置时,它调用远程过程,从Map Worker的本地硬盘上读取缓冲的中间数据。当Reduce Worker读到所有的中间数据时,它就使用中间key进行排序,这样可使有相同key的值都在一起。因为有许多不同key的Map都对应相同的Reduce任务,所以,排序是必需的。如果中间结果集过于庞大,那么就需要使用外排序。

(6)Reduce Worker根据每个唯一中间key来遍历所有排序后的中间数据,并且把key和相关的中间结果值集传递给用户定义的Reduce函数。Reduce函数的结果将被写入一个最终的输出文件。

最终,当所有的Map任务和Reduce任务都完成的时候,Master激活用户程序。此时MapReduce返回用户程序的调用点。

由于MapReduce在成百上千台机器上处理海量数据,因此容错机制是不可或缺的。总的来说,MapReduce通过重新执行失效的地方来实现容错。

1.Master失效

Master会周期性地设置检查点(Checkpoint),并导出Master的数据。一旦某个任务失效,系统就从最近的一个检查点恢复并重新执行。由于只有一个Master在运行,如果Master失效了,则只能终止整个MapReduce程序的运行并重新开始。

2.Worker失效

相对于Master失效而言,Worker失效算是一种常见的状态。Master会周期性地给Worker发送ping命令,如果有Worker没有应答,则Master认为该Worker失效,终止对这个Worker的任务调度,把失效Worker的任务调度到其他Worker上重新执行。

2.2.4 案例分析

排序通常用于衡量分布式数据处理框架的数据处理能力,下面介绍如何利用MapReduce进行数据排序。假设有一批海量的数据,每个数据都是由26个字母组成的字符串,原始的数据集是完全无序的,怎样通过MapReduce完成排序工作,使其有序(字典序)呢?可通过以下三个步骤来完成。

(1)对原始的数据进行分割(Split),得到N个不同的数据分块,如图2-4所示。

图2-4 数据分块

(2)对每个数据分块都启动一个Map进行处理。采用桶排序的方法,按照首字母将每个Map中的字符串分配到26个不同的桶中。图2-5是Map的过程及其得到的中间结果。

图2-5 Map的过程及其得到的中间结果

(3)对于Map得到的中间结果,启动26个Reduce。按照首字母将Map中不同桶中的字符串集放置到相应的Reduce中进行处理。具体来说就是首字母为a的字符串全部放在Reduce1中处理,首字母为b的字符串全部放在Reduce2中处理,以此类推。每个Reduce对于其中的字符串进行排序,结果直接输出。由于Map过程中已经做到了首字母有序,Reduce输出的结果就是最终的排序结果。这一过程如图2-6所示。

图2-6 Reduce过程

从上述过程中可以看出,由于能够实现处理过程的完全并行化,因此利用MapReduce处理海量数据是非常合适的。