4.7 分布式数据处理
分布式系统通常用于收集、访问和操作大型数据集。 例如,本章前面描述的数据库系统可以操作存储在多台机器上的数据集。 任何一台机器都不可能包含响应查询所需的数据,因此需要通信来处理请求。
本节研究一个典型的大数据处理场景,在这个场景中,一个数据集太大,无法由一台机器处理,而是分布在多台机器之间,每台机器处理数据集的一部分。处理的结果通常必须跨机器聚合,以便一台机器的计算结果可以与其他机器结合。为了协调这种分布式数据处理,我们将讨论一个称为MapReduce的编程框架。
使用MapReduce创建分布式数据处理应用程序结合了本文中提出的许多思想。 应用程序是用纯函数表示的,这些纯函数用于映射大型数据集,然后将映射的值序列减少到最终结果中。
在MapReduce程序中,使用函数式编程中熟悉的概念来最大限度地发挥优势。MapReduce要求用于映射和减少数据的函数是纯函数。一般来说,仅用纯函数表示的程序在如何执行方面具有相当大的灵活性。子表达式可以以任意顺序并行计算,而不会影响最终结果。MapReduce应用程序并行地评估许多纯函数,重新排序计算,以便在分布式系统中有效地执行。
MapReduce的主要优点是:
处理数据和组合结果的map和reduce函数。
机器之间的通信和协调。
协调机制处理分布式计算中出现的许多问题,如机器故障、网络故障和进度监控。虽然管理这些问题会给MapReduce应用程序带来一些复杂性,但应用程序开发人员不会看到这些复杂性。相反,构建一个MapReduce应用程序只需要指定上面(1)中的map和reduce函数;分布式计算的挑战通过抽象来隐藏。
4.7.1 MapReduce
MapReduce框架假设输入的是一个任意类型的大的、无序的输入流。例如,在一些庞大的语料库中,每个输入可能是一行文本。计算分三个步骤进行。
map函数应用于每个输入,输出任意类型的零个或多个中间键值对。
中间键值对都按键进行分组,这样具有相同键值的对就可以归纳到一起。
reduce函数结合给定键k的值;它输出0个或多个值,每个值在最终输出中与k相关联。
为了执行这个计算,MapReduce框架创建任务(可能在不同的机器上),这些任务在计算中扮演不同的角色。map任务将map函数应用于输入数据的某个子集并输出中间键-值对。reduce任务按键对键值对进行排序和分组,然后对每个键的值应用reduce函数。 map和reduce任务之间的所有通信都由框架处理,正如按键分组中间键-值对的任务一样。
为了在MapReduce应用程序中使用多台机器,多个mapper在一个map阶段并行运行,多个reducers在一个reduce阶段并行运行。在这些阶段之间,sort阶段通过对键值对进行排序将它们分组在一起,以便具有相同键值的所有键值对都相邻。
考虑一个文本语料库中元音的计数问题。 我们可以使用MapReduce框架来解决这个问题,并选择适当的map和reduce函数。 map函数接受一行文本作为输入,并输出键-值对,其中键是元音,值是一个计数。 零计数从输出中被省略:
reduce函数是Python内置的sum函数,它接受一个遍历值(给定键的所有值)的迭代器作为输入,并返回它们的和。
4.7.2 本地实现
为了指定一个MapReduce应用程序,我们需要一个MapReduce框架的实现,我们可以在其中插入map和reduce函数。 在下一节中,我们将使用开源Hadoop实现。 在本节中,我们将使用Unix操作系统的内置工具开发一个最小的实现。
Unix操作系统在用户程序和计算机的底层硬件之间创建了一个抽象屏障。它为程序之间的通信提供了一种机制,特别是通过允许一个程序使用另一个程序的输出。 在他们关于Unix编程的开创性著作中,Kernigham和Pike断言到:“系统的力量更多地来自程序之间的关系,而不是程序本身。”
通过在第一行添加注释,指示应该使用Python 3解释器执行程序,可以将Python源文件转换为Unix程序。Unix程序的输入是一个可迭代对象,称为标准输入,可以通过sys.stdin访问。 在这个对象上迭代会产生具有字符串值的文本行。 Unix程序的输出称为标准输出,可以通过sys.stdout访问。内置的print函数将一行文本写入标准输出。下面的Unix程序将其输入的每一行反向写入输出:
如果将该程序保存到一个名为rev.py的文件中,则可以将其作为Unix程序执行。首先,我们需要告诉操作系统我们已经创建了一个可执行程序:
接下来,我们可以向这个程序传递输入。一个程序的输入可以来自另一个程序。这种效果是使用|符号(称为“管道”)实现的,它将程序在管道之前的输出传输到管道之后的程序中。nslookup程序输出IP地址的主机名(本例中为New York Times):
cat程序输出文件的内容。因此,rev.py程序可用于反转rev.py文件的内容:
这些工具足以让我们实现一个基本的MapReduce框架。这个版本只有一个map任务和一个reduce任务,它们都是用Python实现的Unix程序。我们使用以下命令运行一个完整的MapReduce应用程序:
mapper.py和reducer.py程序必须实现map函数和reduce函数,以及一些简单的输入和输出行为。例如,为了实现上面描述的元音计数应用程序,我们将编写以下count_vowels_mapper.py程序:
此外,我们将编写如下的sum_reducer.py程序:
mr模块是本文的附带模块,它提供emit函数来发送键-值对,以及group_values_by_key函数来将具有相同键的值组合在一起。该模块还包括MapReduce的Hadoop分布式实现接口。最后,假设我们有以下名为haiku.txt的输入文件:
使用Unix管道的本地执行给我们俳句中每个元音的计数:
4.7.3 分布式实现
Hadoop是MapReduce框架的一个开源实现,它在一个机器集群上执行MapReduce应用程序,分发输入数据和计算,以实现高效的并行处理。它的流接口允许任意Unix程序定义map和reduce函数。 事实上,count_vowels_mapper.py和sum_reducer.py可以直接与Hadoop安装一起使用,以计算大型文本语料库上的元音计数。
与我们简单的本地MapReduce实现相比,Hadoop提供了一些优势。 首先是速度:map和reduce函数是在不同机器上同时运行的不同任务下并行应用的。 第二个是容错:当一个任务由于任何原因失败时,它的结果可以由另一个任务重新计算,以完成整个计算。 第三个是监控:框架提供了一个用户界面来跟踪MapReduce应用程序的进度。
为了使用提供的mapreduce.py模块运行元音计数应用程序,安装Hadoop,将Hadoop的赋值语句更改为本地安装的根目录,将一组文本文件复制到Hadoop分布式文件系统中,然后运行:
其中[input]和[output]是Hadoop文件系统中的目录。有关Hadoop流接口和系统使用的更多信息,请参阅Hadoop流文档。
Last updated
Was this helpful?