MapReduce能够执行大型数据集间的“连接”(join)操作,但是,自己从头写相关代码来执行连接的确非常棘手。除了写MapReduce程序,还可以考虑采用更高级的框架,如Pig、Hive、Cascading、Cruc或Spark等,它们都将连接操作视为整个实现的核心部分。
先简要地描述待解决的问题。假设有两个数据集:气象站数据库和天气记录数据集,并考虑如何合二为一。一个典型的查询是:输出各气象站的历史信息,同时各行记录也包含气象站的元数据信息。
连接操作的具体实现技术取决于数据集的规模及分区方式。如果一个数据集很大(例如天气记录)而另外一个集合很小,以至于可以分发到集群中的每一个节点之中(例如气象站元数据),则可以执行一个MapReduce作业,将各个气象站的天气记录放到一块(例如,根据气象站ID执行部分排序),从而实现连接。mapper或reducer根据各气象站ID从较小的数据集合中找到气象站元数据,使元数据能够被写到各条记录之中。
连接操作如果由mapper执行则称为“map端连接”;如果由reducer执行,则称为“reduce端连接”。
如果两个数据集的规模均很大,以至于没有哪个数据集可以被完全复制到集群的每个节点,我们仍然可以使用MapReduce来进行连接,至于到底采用map端连接还是reduce端连接,则取决于数据的组织方式。最常见的一个例子便是用户数据库和用户活动日志(例如访问日志)。对于一个热门服务来说,将用户数据库(或日志)分发到所有MapReduce节点中是行不通的。
1.map端连接
在两个大规模输入数据集之间的map端连接会在数据到达map函数之前就执行连接操作。为达到该目的,各map的输入数据必须先分区并且以特定方式排序。各个输入数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。听起来似乎要求非常严格(的确如此),但这的确合乎MapReduce作业的输出。
map端连接操作可以连接多个作业的输出,只要这些作业的reducer数量相同、键相同并且输出文件是不可切分的(例如,借助于小于一个HDFS块,或进行gzip压缩来实现)。在天气例子中,如果气象站文件以气象站ID部分排序,记录文件也以气象站ID部分排序,而且reducer的数量相同,则就满足了执行map端连接的前提条件。
利用org.apache.hadoop.mapreduce.join中的CompositeInputFormat类来运行一个map端连接。CompositeInputFormat类的输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置,连接表达式的语法简单。详情与示例可参见包文档。
org.apache.hadoop.examples.Join是一个通用的执行map端连接的命令行程序样例。该例运行一个基于多个输入数据集的mapper和reducer的MapReduce作业,以执行给定的连接操作。
2.reduce端连接
由于reduce端连接并不要求输入数据集符合特定结构,因而reduce端连接比map端连接更为常用。但是,由于两个数据集均需经过MapReduce的shuffle过程,所以reduce端连接的效率往往要低一些。基本思路是mapper为各个记录标记源,并且使用连接键作为map输出键,使键相同的记录放在同一个reducer中。以下技术能帮助实现reduce端连接。
(1)多输入(www.zuozong.com)
数据集的输入源往往打多种格式,因此可以使用MultipleInputs类来方便地解析和标注各个源。
(2)辅助排序
reducer将从两个源中选出键相同的记录,但这些记录不保证是经过排序的。然而,为了更好地执行连接操作,一个源的数据排列在另一个源的数据前是非常重要的。以天气数据连接为例,对应每个键,气象站记录的值必须是最先看到的,这样reducer能够将气象站名称填到天气记录之中再马上输出。虽然也可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况,因为其中任何一组的记录数量可能非常庞大,远远超出reducer的可用内存容量。
为标记每个记录,我们使用TextPair类,包括键(存储气象站ID)和“标记”。在这里,“标记”是一个虚拟的字段,其唯一目的是用于记录的排序,使气象站记录比天气记录先到达。一种简单的做法就是:对于气象站记录,“标记”值为0;对于天气记录,“标记”值为1。下面两个示例分别描述了执行该任务的两个mapper类。
例:在reduce端连接中,标记气象站记录的mapper
例:在reduce端连接中标记天气记录的mapper
reducer知道自己会先接收气象站记录。因此从中抽取出值,并将其作为后续每条输出记录的一部分写到输出文件。如下例所示。
例:用于连接已标记的气象站记录和天气记录的reducer
上述代码假设天气记录的每个气象站ID恰巧与气象站数据集中的一条记录准确匹配。如果该假设不成立,则需要泛化代码,使用另一个TextPair将标记放入值的对象中。reduce()方法在处理天气记录之前,要能够区分哪些记录是气象站名称,检测(和处理)缺失或重复的记录。
将作业连接在一起通过驱动类来完成。这里,关键点在于根据组合键的第一个字段(即气象站ID)进行分区和分组,即使用一个自定义的partitioner(即KeyPartitioner)和一个自定义的分组comparator(FirstComparator,作为TextPair的嵌套类)。
例:对天气记录和气象站名称执行连接操作
在样本数据上运行这个程序,获得以下输出:
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。