第五章 健康医疗大数据处理技术

面对人们越来越多的医疗卫生服务需求,医院传统系统架构无法及时应对大数据的机遇和挑战,越来越多的医疗机构开始充分利用健康医疗大数据的处理技术,加速改变医院信息化的传统模式,实现数字化医院建设的转型升级。利用先进的大数据技术,将医疗体系中分离的医疗资源通过电子化的方式整合,实现医疗信息、资源的共享与互动,从而改善医疗服务质量,提升医疗信息化的灵活性和效率,实现系统整体部署、按需配置、集约管理,满足其大规模应用中的多种要求。

第一节 医学大数据技术的分类

从大数据的自身特征及产生领域来看,大数据的来源广泛,数据类型和处理方法千差万别,但大数据的基本处理流程都是一致的。按照大数据处理的实际需要和一般过程,可将大数据领域里的技术分为大数据采集、存储、处理和呈现等相关技术。对于“小数据”而言,一般数据来源单一、数据集规模较小,因而不需要太大的存储介质和太强的计算处理能力,往往采用现有关系型数据库技术或者并行数据仓库等技术即可实现存储、处理和利用。但是,大数据环境下数据来源广泛、数据的类型更加多样,需要收集、存储和处理分析的数据体量庞大,对数据的应用及展现能力要求很高,并且十分注重数据处理的时效性及可用性。按照大数据处理的一般流程,大数据技术可以按以下方式分类:

一、大数据采集技术

大数据采集是指通过各种方式获得数量庞大、类型众多的结构化、半结构化及非结构化的海量数据,是大数据处理流程中最基础的一步。在数据的实时性和可靠性的要求下,需要实现以分布式平台为基础的高速、高可靠数据的抓取或采集(extract)数据全映像的大数据收集技术,实现高速数据解析、转换(transform)与装载(load)的大数据整合技术,以及实现数据一致性与安全性保证的大数据安全技术。

二、大数据存储和管理技术

大数据存储与管理要同时解决大数据在物理层面和逻辑层面的存储和管理问题。在物理层面上,需要构建可靠的分布式文件系统,例如HDFS,提供高可用的、高容错的、弹性可配置的、高效低成本的大数据存储技术。在逻辑层面上,需要研究大数据建模技术,提供分布式的非关系型大数据管理与处理能力,异构数据的数据融合和数据组织能力。

三、大数据分析技术

大数据分析是整个大数据处理流程中最核心的组成部分,通过分析大数据,发掘数据中的价值,传统的数据处理分析方法已经不能满足大数据环境下数据分析的需求,在海量数据的前提下,单纯地依靠单服务器的计算能力,已经不能满足大数据处理时效性的要求,可以通过MapReduce等并行处理技术来提高数据的处理速度,并使系统具备可扩展性和高可用性等特征。

四、大数据应用技术

将大数据分析结果解释与呈现给用户是大数据处理过程的最终目的。传统的数据显示方式不能满足大数据分析结果复杂性和数据量大的要求。
从加州大学伯克利分校提出的应用于数据分析的软件栈视角出发,大数据处理方式可以总结为如下三个类型:批量数据处理(batch data processing)、基于实时数据流的数据处理(streaming data processing)、基于历史数据的交互式查询(interactive query)。关于这三种类型大数据处理,现在已经有很多比较成熟的开源框架可以处理,如可以采用MapReduce、Spark架构进行批量数据处理;可以采用Storm、Spark Streaming进行流式计算;可以采用Google Dremel、Mpp技术和Hive进行交互式查询。下文将从以上三个方面具体阐述医疗行业大数据常用的处理技术。

第二节 批处理技术

一、批处理技术的概念

单家医院数据通常在几TB级别(不含影像),但随着精准医疗的发展和基因数据的引入,以及医疗影像越来越广泛地使用,再加上医疗行业的要求,医疗数据必须长期保存,医疗数据量正在经历快速增长。在医疗大数据的处理过程中,不可避免地会产生大量临时数据存储需求,医学大数据平台的存储必须是可扩展的,而且成本要可控。此时需要用到批处理技术对大量的静态医学数据或离线数据进行计算和处理,该技术手段主要是为了解决规模较大的非实时性数据分析,它更加关注计算框架的数据吞吐量。
在目前的批处理计算框架中,最具代表性的当属Google公司设计的Map Reduce架构。Map Reduce在海量数据环境、需要保证可伸缩性的前提下,通过使用合适的查询优化和索引技术提供较好的数据处理性能。MapReduce模型简单、易于理解、易于使用,能将一部分比较繁琐的细节隐藏起来,最大程度地简化程序员的工作。此外,机器学习和数据挖掘算法等大量数据处理问题,都能够利用MapReduce实现。

二、MapReduce编程框架

MapReduce是一种编程抽象模型,通过简单的接口来实现自动的并行化和大规模的分布式计算,保证解决方案高效、可伸缩。MapReduce一词最早由Google公司在一篇名为 MapReduce-Simplified Data Processing on Large Clusters的论文中提出,文中提到设计这个抽象模型的灵感来自Lisp和许多其他函数式语言的Map和Reduce的原语,由于意识到大多数的运算都包含这样的操作:在输入数据的逻辑记录上应用Map操作得出一个中间key/value pair集合,然后在所有具有相同key值的value值上应用Reduce操作,从而达到合并中间的数据,得到一个想要的结果。使用MapReduce模型,再结合用户实现的Map和Reduce函数,就可以非常容易地实现大规模并行化计算。
(一)编程模型
MapReduce编程模型的原理是:利用一个输入key/value pair集合来产生一个输出的key/value pair集合。MapReduce库的用户用两个函数表达这个计算——Map和Reduce。
用户自定义的Map函数接收一个输入的key/value pair值,然后产生一个中间key/value pair值的集合。映射器(mapper)签名如下:
Map():(Key1,Value1)->[(Key2,Value2)]
MapReduce库把所有具有相同中间key值的中间value值集合在一起,然后传递给Reduce函数。
用户自定义的Reduce函数接收一个中间key的值I和相关的一个value值的集合。Reduce函数合并这些value值,形成一个较小的value值的集合。一般情况下,每次Reduce函数调用只产生0或1个输出value值。通常通过一个迭代器把中间value值提供给Reduce函数,这样就可以处理无法全部放入内存中的大量value值的集合。
图5-1为一个基本模型,图中输入数据被划分为5个分区,每个分区分别发送一个映射器。各个映射器会生成任意数目的键-值对。当所有的映射器完成工作后,这些键会经过排序、洗牌,然后发送给规约器。最后,规约器将生成所需的输出,每个规约器同样可以包含任意数目的新键-值对。
图5-1 MapReduce数据流程
(二)执行概括
通过将Map调用的输入数据自动分割为M个数据片段的集合,Map调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使用分区函数将Map调用产生的中间key值分成R个不同分区[如hash(key)mod R],Reduce调用也被分布到多台机器上执行。分区数量R和分区函数由用户来指定。
图5-2展示了MapReduce实现中操作的全部流程。当用户调用MapReduce函数时,将发生下面的一系列动作:
1.用户程序首先调用的MapReduce库将输入文件分成M个数据片段,然后用户程序在机群中创建大量的程序副本。
2.这些程序副本中有一个特殊的程序——Master。副本中其他的程序都是Worker程序,由Master分配任务。有M个Map任务(对应数据分片)和R个Reduce任务将被分配,Master将一个Map任务或Reduce任务分配给一个空闲的Worker。
3.被分配了Map任务的Worker程序读取相关的输入数据片段,从输入的数据片段中解析出key/value pair,然后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出中间key/value pair,并缓存在内存中。
4.缓存中的key/value pair通过分区函数分成R个区域,之后周期性地写入到本地磁盘上。缓存的key/value pair在本地磁盘上的存储位置将被回传给Master,由Master负责把这些存储位置再传送给Reduce Worker。
图5-2 MapReduce执行概括
5.当Reduce Worker程序接收到Master程序发来的数据存储位置信息后,使用RPC从Map Worker所在主机的磁盘上读取这些缓存数据。当Reduce Worker读取了所有的中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。如果中间数据太大,无法在内存中完成排序,那么就要在外部进行排序。
6.Reduce Worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce Worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件。
7.当所有的Map和Reduce任务都完成之后,Master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。
在成功完成任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)。一般情况下,不需要将这R个输出文件合并成一个文件,这些文件经常作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。
上述执行流程表明,MapReduce是一个不共享数据处理模型,意味着所有映射器可以独立地工作,而且映射器完成它们的任务时,规约器也能独立地开始工作(映射器和规约器之间不共享任何数据或临界区)。基于这种不共享范式,能有效提高并行性。
下面从Map任务和Reduce任务的层次来分析工作原理(MapReduce on Hadoop,Google的MapReduce实现是一个专用解决方案,没有开源)。
图5-3展示了MapReduce操作的其中一条流程。
1.每个输入分片会让一个Map任务来处理,Map输出的结果会暂时放在一个环形内存缓冲区中,当该缓冲区快要溢出时,会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。
2.在写入磁盘之前,后台线程首先根据Reduce任务的数目将数据划分为相同数目的分区,也就是一个Reduce任务对应一个分区的数据。这样做是为了避免有些Reduce任务分配到大量数据,而有些Reduce任务却被分配到很少数据,甚至没有被分配到数据的尴尬局面。其实分区就是对数据进行Hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combine操作,这样做的目的是让尽可能少的数据写入到磁盘。
3.当Map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和Combine操作,目的有两个:①尽量减少每次写入磁盘的数据量;②尽量减少下一复制阶段网络传输的数据量。最后合并成一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以对数据进行压缩。
图5-3 MapReduce执行流程
4.Reduce会接收到不同Map任务传来的数据,并且每个Map传来的数据都是有序的。如果Reduce接收的数据量足够小,则直接存储在内存中;如果数据量超过了该缓冲区大小的一定比例,则对数据合并后溢写到磁盘中。
5.随着文件的增多,后台线程会将它们合并成一个更大的有序的文件,最后一次合并的结果直接输入到Reduce函数,处理后得到输出数据。
(三)适用场景
很多情况下MapReduce都很适用:①必须处理大量数据;②需要利用并行分布式计算、数据存储与数据本地化;③可以独立地完成很多任务而无须同步;④可以利用排序和洗牌;⑤需要容错性,不能接收任务失败。
下面情况并不是很适合使用Mapreduce:①需要迭代方法,比如K均值聚类中,需要反复地读取之前的迭代结果(可以使用类似Spark的数据集位于内存流中的分布式架构);②一个值的计算依赖于上一个计算的值,比如斐波那契数列;③数据集很小,完全可以在一台机器上计算;④需要同步处理共享数据;⑤一个操作依赖其他操作;⑥基本操作是处理器敏感型操作。
MapReduce是实现分布式计算的一项开创性技术,主要设计用于批处理,所以不要期望它能够在非常短的时间内迅速得到结果,当然适当地使用集群,可以得到近实时的响应。

三、Spark架构和原理

Spark是加州大学伯克利分校的AMP Labs开发的开源分布式轻量级通用计算框架,基于内存设计。对比MapReduce,Spark继承了MapReduce的线性扩展性和容错性,同时对它做了一些重要扩展(图5-4)。
首先,Spark摒弃了MapReduce先Map再Reduce这样的严厉方式,Spark引擎可以执行更通用的有向无环图(DAG)算子,在MapReduce中需要将中间结果写入分布式文件系统时,Spark能将中间结果直接传到流水作业线的下一步。一个Spark流程可以包含多个MapReduce任务,具有更高级的抽象,它类似于Dryad,Dryad也是从MapReduce衍生出来的,起源于微软研究院。
图5-4 Spark软件栈
其次,Spark补充了这种能力,通过提供许多转换操作,用户可以更自然地表达计算逻辑,更加面向开发人员。
最后,Spark扩展了MapReduce的内存计算能力。它的弹性分布式数据集(RDD)抽象,使开发人员将流水线上的任何点物化在跨集群节点的内存中,后续步骤如果需要相同的数据集时,不必重新计算或从磁盘加载。这个特性使Spark可以应用于以前分布式处理引擎无法胜任的应用场景。
Spark非常适合用于涉及大量迭代的算法,这些算法需要多次遍历相同数据集,Spark也适用于反应式应用,这些应用需要扫描大量内存数据并快速响应用户的查询。
Spark Core提供了Spark最基础与核心的功能,如图5-5所示,主要包括以下功能:
图5-5 Spark架构
1.SparkContext模块
通常而言,Driver Application的执行与输出都是通过SparkContext来完成的。在正式提交Application之前,首先需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储能力、计算能力、缓存、测量系统、文件服务、Web服务等内容,应用程序开发者只需要使用SparkContext提供的API完成功能开发。SparkContext内置的DAGScheduler负责创建Job,将DAG中的RDD划分到不同的Stage,提交Stage等功能。内置的TaskScheduler负责资源的申请、任务的提交及请求集群对任务的调度等工作。
2.存储体系
Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘IO,提升了任务执行的效率,使得Spark适用于实时计算、流式计算等场景。此外,Spark还提供了以内存为中心的高容错的分布式文件系统Tachyon供用户进行选择。Tachyon能够为Spark提供可靠的内存级的文件共享服务。
3.计算引擎
计算引擎由SparkContext中的DAGScheduler、RDD以及具体节点上的Executor负责执行的Map和Reduce任务组成。DAGScheduler和RDD虽然位于SparkContext内部,但是在任务正式提交与执行之前会将Job中的RDD组织成有向无环图(DAG),并对Stage进行划分,决定了任务执行阶段任务的数量、迭代计算、Shuffle等过程。
4.部署模式
由于单节点不足以提供足够的存储和计算能力,所以作为大数据处理的Spark在SparkContext的TaskScheduler组件中提供了对Standalone部署模式的实现和Yarn(Yet another Resource Negotiator)、Mesos等分布式资源管理系统的支持。通过使用Standalone、Yarn、Mesos等部署模式为Task分配计算资源,提高任务的并发执行效率。
相对于MapReduce而言,Spark更加易用,支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。Spark支持交互式的Python和Scala的shell,这意味着可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法,而不是像以前一样,需要打包、上传集群、验证等。这对于原型开发非常重要。Spark具有通用性,可以用于批处理、交互式查询(通过SparkSQL)、实时流处理(通过SparkStreaming)、机器学习(通过SparkMLlib)和图计算(通过SparkGraphX),这些不同类型的处理都可以在同一个应用中无缝使用,并且没有牺牲性能。另外,Spark具有可融合性,可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和ApacheMesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这些都是Spark的优势之处。

四、BSP框架

BSP模型,即整体同步并行模型,英文全称为Bulk Synchronous Parallel,由哈佛大学Viliant和牛津大学Bill McColl在20世纪90年代提出,是一种异步的MIMD-DM模型,提供块间同步处理、块内异步并行的计算。BSP处理模型要求多个计算处理单元提供计算资源。
BSP模型的计算逻辑有一系列的迭代步组成,其中每一步迭代称为一个“超级步”。每个超级步分为三个阶段:本地计算、消息通信和全局同步。
1.本地计算
应用计算逻辑的独立计算过程。计算单元在这一阶段互相独立平行计算,计算的数据在本地的内存中存储。在该阶段,计算单元之间没有关联的针对自身的数据执行计算逻辑。
2.消息通信
在本地计算阶段完成后,各个计算单元要通过通信技术交换各自的数据信息,而没有执行任何的数据计算处理。
3.全局同步
保证每个计算单元在下一超级步开始时当前超级步中的处理全部结束,即块间的同步。全局同步虽然可能造成因各个处理单元负载及实际处理情况而导致的进度参差不齐的短板现象,即在整个计算过程中往往存在个别单元进展较慢,其他计算单元被动同步等待的情况,但是同时也避免了因异步造成的死锁问题。
采用基于BSP模型实现大规模图处理,首先由Google公司提出,处理平台名称是Pregel,是该公司专门用于处理图计算或迭代计算的系统。Google公司利用MapReduce处理80%的任务,Pregel处理20%的任务。Pregel实现提出了“以顶点为中心计算,消息传递的计算模型”,这个设计使用户的执行逻辑十分简单,用户只要控制顶点的执行逻辑即可,不需要全局考虑数据的全局处理流程。
BSP模型处理图迭代过程比MapReduce优势明显主要体现在如下方面:
1.BSP模型处理过程本身具有迭代的性质,BSP迭代过程称为超级步。BSP计算平台在执行迭代处理时不需要以云平台作业的形式串行处理而产生额外的作业提交、调度、存储过程等开销。
2.BSP处理过程中的本地计算明确说明计算针对本地存储资源的数据,因此BSP类系统的实现在原始数据的初始化加载后一般不会发生迁移变化,每次进行超级步计算时计算的对象都是初始化加载的数据。
3.BSP系统执行计算流程时,通信过程一般以消息的形式交换中间结果,保证各个处理单元上的数据在下一超级步中能够及时更新。这显然相对于MapReduce全局Shuffle处理排序及网络通信针对所有数据(原始数据及中间结果)更有优势,特别是针对中间结果数据比原始数据更小的情况特别有效。

第三节 流处理技术

大数据平台除了需要支持医疗机构数据的全量导入处理之外,必须支持新数据的增量导入。但前述的离线数据分析技术,通常需要对全量数据进行处理,在成本上是非常不合算的。因此必须要有适合的增量数据处理的架构支持。流式大数据实时处理可以为大数据驱动的深度学习提供计算框架支撑。作为一种针对流数据的实时计算模型,流式计算可以有效缩短整个链路的数据流延迟,简化实时计算逻辑,减少计算成本,最终有效满足大数据业务的实时处理需求,因而不断提升重要性,开启了未来计算的新时代。

一、流的概念

流式数据(data stream),也称数据流,是大数据环境下的一种数据形态,其理论诞生于20世纪末,并在云计算和物联网发展下逐步成为当前的研究热点。数据流与传统的数据是相对的,与静态、批处理和持久化的数据库相比,流式计算以连续、无边界和瞬时性为特征,适合高速并发和大规模数据实时处理的场景。实时的数据流,存在以下几个特征:
1.实时、高速
数据以高并发的方式迅速到达,业务计算要求快速、连续、响应,数据处理的速度至少能够匹配数据到达的速度。
2.无边界
数据到达、处理和向后传递均是持续不断的。从全局的角度看,累计数据的规模随时间推移不断扩大;从数据的局部角度看,窗口技术(滑动窗口、标界窗口等)可以限定处理数据的范围,但是这一部分数据是不断变化的,有新数据到达,同时又有数据因为过期而被移除。
3.瞬间性和有限持久性
通常情况下,原始数据在单遍扫描、处理后被丢弃,并不进行保存,只有计算结果和部分中间数据在有限时间内被保存和向后传递。
4.价值的时间偏倚性
随时间的流逝,数据中所蕴含的知识价值也在衰减,也即流中数据项的重要程度是不同的,最近到达的数据往往比先到达的数据更有价值。
数据流作为一种数据模型的研究,起源于1988年Digital Equipment Corporation的技术报告,该报告第一次给出了“data stream”这一名称和定义。2002年以后,随着在ACM SIGMOD、VLDB等数据处理顶级会议上出现了一系列研究工作和学术界发布了一批原型系统,数据流发展成为一个研究方向,特别是在云计算、物联网方兴未艾的当今,许多有影响力的互联网企业,如IBM、Yahoo、Twitter和Facebook等公司,纷纷推出了自己的产品。本质上,数据流是一类数据项以追加递增方式形成的无限集合。
数据流发展自数据库的研究领域,源于静态数据处理的传统数据库技术已经不能满足实时数据处理的需求。数据流和数据库两种数据处理技术,在基本思路和方法路线上完全不同,它们之间的对比如表5-1所示。
表5-1 数据流与数据库的比较
1.面向的数据不同 数据库适合处理静态/离线数据,而且数据之间满足关系代数的范式。数据流针对无边界、离散的在线数据,数据项与数据库的元组相比,有时间戳的概念。同时,数据流持续不断地到达,更新频率比数据库高数个量级。
2.使用模式不同 数据库维护数据,应用提交查询请求,以提取的方式获得返回结果。数据流管理系统维护查询,数据不断到达,应用通过系统的推送获得连续的结果。
3.查询实现不同 数据库可以多遍扫描,以随机访问的方式获取数据;对给定的查询通常以事务的方式一次性返回;处理结束后原始数据仍然是持久化的。数据流单遍扫描、顺序访问持续到达的数据,根据注册的查询连续返回结果;原始数据只在内存中暂存,扫描完成后丢弃,处理结果往往也是有限的持久化。

二、业界两种典型的流引擎

(一)Storm
Apache Storm是一个分布式流式计算引擎,现已成为Apache的顶级项目。像Hadoop离线处理批量数据一样,Storm可以实时处理流数据,其架构依赖Zookeeper。Storm已被许多知名企业应用于实时计算领域,如淘宝实时分析、阿里云Galaxy实时计算、携程网站性能监控等。Storm的用户可以使用多种编程语言进行实时计算的开发。
1.Storm架构
(1)Apache Storm系统采用典型的主从(Master-Slave)架构方式,如图5-6所示,由Nimbus和Supervisor两类角色构成。调度相关的信息存储在Zookeeper集群。
图5-6 Storm集群架构
(2)Nimbus是集群的Master节点,主要职责是管理和协调在集群上运行的拓扑,并对其运行过程进行监控,包括拓扑任务的发布、将任务分发给具体的进程、在任务运行异常时重新指派等。Nimbus主要通过Zookeeper将任务信息分配给Supervisor上具体的进程并且实时监控任务的执行状态。
(3)Supervisor是集群的Slave节点,其工作包括监听和接收Nimbus分发的任务,并管理自身节点上Worker进程的生命周期。一个任务拓扑通常由多个Worker来协调完成。
(4)Zookeeper是分布式服务框架。Storm主要使用Zookeeper来协调Nimbus和Supervisor,其功能包括:①存储客户端提交的任务;②存储集群服务器的状态信息和应用的配置信息;③存储Supervisor以及Worker的运行状态和心跳信号。Nimbus利用Zookeeper存储的信息来监控集群和任务的运行状态。
2.可扩展、可靠的分布式流式数据处理
(1)可集成多样化数据源的流式数据:
Storm可以集成任何消息队列和数据库系统,从中获取数据进行处理。拓扑(topology)的源组件(spout),抽象了数据获取的接口,使之可集成至Storm集群中。Storm可以通过源组件插件集成多种数据源。Storm可以将数据库作为数据源方便地集成。对于开发人员,仅需要打开数据库链接并进行数据存取操作,其他的复杂过程,如并行化、数据分片和失败后重新读取,可以由Storm完成。
(2)简便的变成概念和接口:
Storm用简单易用的API,编程人员只需要转换从数据获取的数据为自定义的数据项(tuple),便可以使用Storm的其他功能进行处理。Storm有三个主要的编程概念:源组件、处理组件(bolt)、拓扑。
1)源组件:
是流式数据的源头,是一个计算作业的起始单元,它封装数据源中的数据为Storm可以识别的数据项。
2)处理组件:
是处理过程单元,从输入流中获取一定数量的数据项处理后,将结果作为输出流发送。流式数据处理的业务逻辑大部分都是在这里实现的。
3)拓扑:
是由源组件和处理组件为点组成的网络,网络中的边便是一个处理组件订阅了某个或某些其他处理组件或源组件的输出流。拓扑可以是任意复杂多阶段流计算的网络。
(3)可扩展性:
拓扑天然具有并行性,可以跨机器甚至集群执行。拓扑中各个不同的组建,可以配置为各自不同的并行度。通过用户提交的负载平衡的命令,拓扑可以适应变化环境的集群,自动调整组建的任务在各个机器间的分布式布局。
(4)容错能力:
Storm具有适应性的容错能力。当工作进程(worker)失败时,Storm可以自动重启失败进程;当一个节点(supervisor)宕机时,其上的所有工作进程都会在其他节点被重启。对于Storm的守护进程,nimbus和supervisor被设计为无状态和快速恢复,也即当这些守护进程失败时,它们可以通过重启被恢复而不会产生其他额外的影响。
(5)数据处理过程的保障:
Storm保证每个数据项能够被完全处理。这是Storm的核心机制之一,高效地追踪到达拓扑的每一个数据项的处理过程。当处理失败或者处理超时,对应的数据项通过源组件重新获取后发送。
(6)可使用多种编程语言开发:
Storm被设计成可以使用多种语言进行编程的方式。Storm的核心实际使用了Thrift接口定义,用于约束通信协议和提交拓扑。由于Thrift跨语言的能力,拓扑可以使用多样的编程语言来描述。类似地,源组件和处理组件也是可以使用多种编程语言实现的。
(7)易于部署和操作:
Storm的集群仅需要少量的配置和安装工作,可以方便地部署和启动。这种能力使得Storm很容易被产品化。此外,在业务计算被提交后,Storm仍可以方便地进行修改和配置操作。
(8)免费和开源:
Storm在Eclipse Public License(EPL)协议下开放源码和使用。EPL协议是一个自由的开源协议,允许用户的Storm应用开源或者作为自行有产权封闭。
(二)Spark Streaming
Spark Streaming是Spark的扩展系统,用于支持连续的流式计算。作为UC Berkeley元计算软件套件的一部分,Spark Streaming是建立在Spark上的应用框架,利用Spark的底层框架作为其执行基础。
开发Spark Streaming的动机,来源于当前互联网系统处理数据的实时性需求和批处理模型之间的矛盾。例如,大型网站的统计、分析、入侵检测和垃圾邮件过滤等。Spark Streaming的设计目标就是为这类应用提供近似实时的数据处理(小于1ms的延迟)。Spark Streaming在追求低延迟处理目标的同时,还为故障和超时等提供了类似批处理系统的容错机制,力求在扩展系统规模和提供简便的编程模型的同时,最大化成本效益。在Spark框架之上,Spark Streaming实现了流式数据处理的系统,主要包含如下特征:
1.Spark Streaming的计算流程,是将流式计算分解成一系列短小的批处理作业。计算的批处理引擎是Spark,把Spark Streaming的输入数据按照设定的批大小(如1秒)分成一段段数据(discretized stream,DStream),把每一段数据都转换成Spark中的弹性分布数据集(resilient distributed dataset,RDD),然后将Spark Streaming中对Dstream的Transformation操作变为针对Spark中RDD的Transformation,并将RDD经过操作转换为中间结果保存在内存中。整个流式计算格局业务的需求可以对中间的结果进行叠加,或者存储到外部设备(图5-7)。
2.Spark Streaming的容错性,是通过Spark中RDD的容错机制实现的。每一个RDD都是一个不可变的分布式、可重算的数据集,其记录着确定性的操作继承关系,所以只要输入数据是可容错的,那么任意一个RDD分区出错或不可用,都可以利用原始输入数据通过转换操作重新算出。RDD中任意的分区出错,都可以并行地在其他机器上将实际的分区计算出来。
图5-7 Spark Streaming的架构与计算流程
3.计算的准实时性 一般短小的批处理作业范围是0.5~2秒,Spark Streaming能够满足除对实时性要求非常高(如高频实时交易)之外的所有流式准实时计算场景。Storm可配置的延迟是毫秒级的,Spark Streaming被称为“准实时”数据处理系统。
4.Spark Streaming拓展性依赖于Spark,可线性拓展到百个节点,在四核CPU节点的吞吐量可以以秒级的延迟处理6GB/s的数据量。
5.Spark Streaming的编程模型和Spark的编程模型一致,因为它们建立在同一框架模型上。对于Spark来说,编程就是对RDD的操作,对于Spark Streaming来说,就是对Dstream的操作。编程模型的高度一致,使得Spark Streaming无缝地将业务逻辑在实时处理和批处理上切换和复用。
6.Spark Streaming提供了一套高效、可容错的准实时大规模流式处理框架,它能和批处理计算及准实时查询共存于同一个软件栈。这种兼容性不仅降低了学习成本,更重要的是保障了业务逻辑的持续性和复用能力。目前从学术界开源出来的Spark已经成为Apache孵化项目,这势必会扩大它的应用范围并加速其推广。

第四节 交互式分析

一、交互式分析的概念

交互式分析,形如传统BI领域的一种分析,强调根据与用户交互而进行快速的数据分析,典型的应用就是数据钻取。例如,在BI中,可以对于数据进行切片和多粒度的聚合,从而通过多维分析技术实现数据钻取。交互式分析包含了与用户的交互设计和数据快速分析。交互式设计是一种让产品易用、有效的技术,它致力于了解目标用户和他们的期望,了解用户在同产品交互时的行为,了解用户本身的心理和行为特点,同时,还包括了解各种有效的交互方式,并对它们进行增强和扩充。交互设计还涉及多个学科,以及和多领域、多背景人员的沟通。

二、Google Dremel简介

(一)Google Dremel设计
随着Hadoop的流行,大规模的数据分析系统已经越来越普及。数据分析师需要一个能将数据“玩转”的交互式系统。如此一来,就可以非常方便快捷地浏览数据,建立分析模型。Dremel系统主要有以下特点:
1.Dremel是一个大规模系统 在一个PB级别的数据集上面,将任务缩短到秒级,无疑需要大量的并发。磁盘的顺序读速度在100MB/S上下,那么在1秒内处理1TB数据,意味着至少需要有1万个磁盘的并发读。Google公司一向是用廉价机器办大事的好手。但是机器越多,出问题概率越大,如此大的集群规模需要有足够的容错考虑,保证整个分析的速度不被集群中的个别慢(坏)节点影响。
2.Dremel是MR交互式查询能力不足的补充 和MapReduce一样,Dremel也需要和数据运行在一起,将计算移动到数据上面。所以它需要GFS这样的文件系统作为存储层。在设计之初,Dremel并非是MapReduce的替代品,它只是可以执行非常快的分析,在使用的时候,常常用它来处理MapReduce的结果集或者用来建立分析原型。
3.Dremel的数据模型是嵌套(nested)的,互联网数据常常是非关系型的。Dremel还需要有一个灵活的数据模型,这个数据模型至关重要。Dremel支持一个嵌套(nested)的数据模型,类似于Json。传统的关系模型,由于不可避免地有大量的Join操作,在处理如此大规模的数据时,往往有心无力。
4.Dremel中的数据是用列式存储的 使用列式存储、分析的时候,可以只扫描需要的那部分数据,减少CPU和磁盘的访问量。同时列式存储是压缩友好的,使用压缩,可以综合CPU和磁盘,发挥最大的效能。对于关系型数据,如果使用列式存储,我们都很有经验。但是对于嵌套(nested)的结构,Dremel也可以用列存储,非常值得我们学习。
5.Dremel结合了Web搜索和并行DBMS的技术 首先,它借鉴了Web搜索中“查询树”的概念,将一个相对巨大复杂的查询,分割成较小、较简单的查询。大事化小,小事化了,能并发的在大量节点上跑。其次,和并行DBMS类似,Dremel提供了一个SQL-like的接口,就像Hive和Pig。
(二)Google Dremel查询方式
Dremel可以使用一种SQL-like的语法查询嵌套数据。由于Dremel的数据是只读的,并且会密集地发起多次类似的请求,所以可以保留上次请求的信息,优化下次请求的explain过程。explain是一个树状架构,当Client发出一个请求,根节点收到请求,根据metadata,将其分解到枝叶,直到位于数据上面的叶子(Server)。他们扫描处理数据,又不断汇总到根节点。
举个例子:对于请求:
SELECT A,COUNT(B)FROM T GROUP BY A
根节点收到请求,会根据数据的分区请求,将请求变成可以拆分的样子。原来的请求会变为:
SELECT A,SUM(c)FROM(R1 UNION ALL...Rn)GROUP BY A
R1,…RN是T的分区计算出的结果集。越大的表有越多的分区,越多的分区可以越好地支持并发。
然后再将请求切分,发送到每个分区的叶子(Server)上面去,对于每个Server
Ri=SELECT A,COUNT(B)AS c FROM Ti GROUP BY A
结构集一定会比原始数据小很多,处理起来也更快。根服务器可以很快地将数据汇总。具体的聚合方式,可以使用现有的并行数据库技术。
Dremel是一个多用户的系统。切割分配任务的时候,还需要考虑用户优先级和负载均衡。对于大型系统,还需要考虑容错,如果一个Server出现故障或变慢,不能让整个查询受到明显影响。
通常情况下,每个计算节点执行多个任务。例如,技巧中有3000个Server,每个Server使用8个线程,可以有24 000个计算单元。如果一张表可以划分为100 000个区,就意味着大约每个计算单元需要计算5个区。在执行的过程中,如果某一个计算单元太忙,就会另外启动一个来计算,这个过程是动态分配的。
对于GFS这样的存储,一份数据一般有3份拷贝,计算单元很容易就能分配到数据所在的节点上,典型的情况可以到达95%的命中率。
Dremel还有一个配置,就是在执行查询的时候,可以指定扫描部分分区,比如可以扫描30%的分区,在使用的时候,相当于随机抽样,加快查询。
(三)Google Dremel与Hadoop
Dremel不是用来替代MapReduce,而是和其更好地结合。Hadoop的Hive、Pig无法提供及时的查询,而Dremel的快速查询技术可以给Hadoop提供有力的补充。同时Dremel可以用来分析MapReduce的结果集,只需要将MapReduce的OutputFormat修改为Dremel的格式,就可以几乎不引入额外开销,将数据导入Dremel。使用Dremel来开发数据分析模型,MapReduce来执行数据分析模型。
Hadoop的Hive、Pig现在也有了列存储的模式,架构上和Dremel接近。但是无论存储结构还是计算方式都没有Dremel精致。对Hadoop实时性的改进一直是个热点话题。要想在Hadoop中复制一个Dremel,并且相对现有解决方案有突破,Hadoop自身需要进行一些改进。一个是HDFS需要对并发细碎的数据读性能有大的改进,HDFS需要更加的低延迟。另外,Hadoop需要不仅仅支持MapReduce这一种计算框架。其他部分,Hadoop都有对应的开源组件。

三、MPP DB技术

(一)MPP数据库的概念与分类
1.MPP数据库的定义
MPP,即massive parallel processing,是系统架构角度的一种服务器分类方法。根据系统架构,商用服务器主要分为三类,对称多处理器结构(symmetric multi-processor,SMP)、非一致存储访问结构(non-uniform memory access,NUMA),以及海量并行处理结构(massive parallel processing,MPP)。
MPP数据库是指架构于MPP服务器之上的数据仓库环境。因为典型的数据仓库环境具有大量复杂的数据处理和综合分析,具有很高的I/O处理能力,并且存储系统需要足够的I/O宽带与之匹配,而MPP服务器架构具有非常优越的并行处理能力,适合复杂的数据综合分析与处理环境,其节点互联网络的I/O性能非常突出,所以MPP架构非常适合构建大型数据仓库。当然,这也需要借助支持MPP技术的关系型数据库系统来屏蔽节点之间负载平衡与调度的复杂性。
2.MPP架构分类
MPP架构非常复杂,需要数据库系统来屏蔽节点间的负载平衡和调度复杂度,所以数据库架构的设计非常重要。在数据库架构设计中,主要分为Share Disk和Share Nothing(表5-2)。
表5-2 Share Disk和Share Nothing对比表
Sharding架构其实就是指Share Nothing架构,它把某张表从物理存储上水平分割,并分配给多台服务器(或多个实例),每台服务器可以独立工作,具备共同的Schema,如MySQL Proxy和Google的各种架构,只需要增加服务器数量就可以增加处理能力和容量。
(二)常见的MPP数据库架构
1.Greenplum架构
(1)原理分析:
最早采用MPP架构的是Teradata数据库,整体上采用Share Nothing架构进行组织。Greenplum数据库在开源的PostgreSQL的基础上采用了MPP架构,作出了性能非常强大的关系型分布式数据仓库。为了兼容Hadoop生态,又推出了HAWQ,分析引擎保留了Greenplum的高性能引擎,下层存储不再采用本地硬盘而改用HDFS,规避本地硬盘可靠性差的问题,同时融入Hadoop生态。Greenplum采用Share Nothing架构(MPP),主机、操作系统内存、存储都是自我控制的,不存在共享。该架构主要由Master Host(主节点)、Segment Host(工作节点)、Interconnect(内部通信)三大部分组成。Greenplum主要优点是大规模的并行处理能力。
1)大规模存储:
Greenplum数据库通过将数据分布到多个节点上来实现规模数据的存储。数据库的瓶颈经常发生在I/O方面,数据库的诸多性能问题最终总能归咎到I/O上。Greenplum采用分而治之的方法,将数据规律地分布到节点上,充分利用Segment主机的I/O能力,以此让系统达到最大的I/O能力(主要是带宽),如图5-8所示。在Greenplum中,每张表都是分布到所有节点上的。Master Host首先通过对表的某列或多列进行Hash运算,然后根据Hash结果将表的数据分布到Segment Host中。在整个过程中,Master Host中不存在任何用户数据,只是对客户端进行访问控制和存储表分布逻辑的元数据。
图5-8 Greenplum存储架构
2)并行处理:
Greenplum的并行处理主要体现在外部表并行装载、并行备份恢复与并行查询处理三个方面。数据的并行装载主要采用外部表或者Web表方式,通常情况下通过gpdist程序来实现,如图5-9所示。
Gpfdist程序能够以370MB/s的速度装载TEXT格式文件,以200MB/s的速度装载CSV格式文件。在ETL宽带为1GB的情况下,可以同时运行3个gpfdist程序装载TEXT格式文件,或者同时运行5个gpfdist程序装载CSV格式文件。例如,图5-9中采用两个gpfdist程序进行数据装载。可以根据实际环境,通过配置postgresql.conf参数文件来优化装载性能。查询性能的强弱往往由查询优化器的水平来决定,Greenplum主节点负责解析SQL与生成执行计划。
(2)优势与缺点:
Greenplum的高性能得益于其良好的体系架构。在MPP架构中,每个SMP节点也可以运行自己的操作系统、数据库等。每个节点内的CPU不能访问另一个节点的内存。节点之间的信息交互是通过节点互联网络实现的,这个过程一般称为数据重分配(data redistribution)。与传统的SMP架构明显不同,通常情况下,MPP架构因为要在不同处理单元之间传送信息,所以它的效率要比SMP差一点,但这也不是绝对的,因为MPP架构不共享资源,因此,对它而言,资源要比SMP多,当需要处理的事务达到一定规模时,MPP的效率要比SMP高。这需要视通信时间占用计算时间的比例而定,如果通信时间比较多,那么MPP架构就不占优势;相反,通信时间比较少,那么MPP可以充分发挥资源的优势,达到高效率。在当前使用的OTLP程序中,用户访问一个中心数据库,如果采用SMP架构,其效率要比采用MPP架构高得多。MPP架构在决策支持和数据挖掘方面有明显优势,可以这样说,如果操作相互之间没有什么关系,处理单元之间需要进行的通信比较少,那么采用MPP架构较好,相反就不合适了。
图5-9 Greenplum并行处理
(3)适用场景:
Greenplum数据引擎是为新一代数据仓库和大规模分析处理而建立的软件解决方案,其最大的特点是不需要高端的硬件支持仍然可以支撑大规模的高性能数据仓库和商业智能查询。在数据仓库、商业智能的应用上,尤其在海量数据的处理方面Greenplum表现出极其优异的性能。
传统数据库侧重交易处理,关注的是多用户的同时的双向操作,在保障即时性的要求下,系统通过内存来处理数据的分配、读写等操作,存在IO瓶颈。分析型数据库是以实时多维分析技术作为基础,对数据进行多角度的模拟和归纳,从而得出数据中所包含的信息和知识。Greenplum虽然是关系型数据库产品,但是它具有查询速度快、数据装载速度快、批量DML处理快的主要特点,而且性能可以随着硬件的添加呈线性增加,拥有非常良好的可扩展性。因此,Greenplum主要适用于面向分析的应用,比如构建企业级ODS/EDW、数据集市等。在国内,阿里巴巴(中国)网络技术有限公司,从2008年开始引入Greenplum,将原有的Oracle RAC迁移到Greenplum上,作为数据仓库的计算中心,其中一个应用就是通过分析用户的网络点击日志进行产品的关联分析。支付宝在2008年也引入了Greenplum数据库作为数据中心。国内还有很多银行也引入了Greenplum作为基础的数据平台等。在TB级的数据仓库的OLAP应用中,Greenplum在易用性和性能方面有着很大的优势。
2.DB2 DPF架构
(1)原理分析:
IBM推出的ISAS(IBM smart analytics system)一体机解决方案里面装载的DB2的DPF(database partitioning feature)版本,采用的也是MPP架构,每个数据库都有独立的日志、引擎、锁、缓存管理。服务器之间通过万兆交换机交换数据。服务器内部通过share memory实现相互访问。服务器为16核,每个核对应8GB内存和一个RAID组。
1)分区技术:
在MPP架构中解决了各个节点的并行处理问题。Greenplum和DB2 DPF都采取了同样的思路——表分区,就是将一张完整的表,通过Hash算法,尽量均衡地分布在不同的节点上。
2)数据装载:
Greenplum Master节点只承担少量的控制功能,以及和客户端的交互,完全不承担任何计算。DB2 DPF装载必须由admin节点来完成,通过admin节点上的多进程对数据进行分发。装载需要消耗一定的性能。与之相反的是,Greenplum在进行数据装载时,不是一般想象的存在一个中心数据分发节点,而是所有节点同时读取数据,然后根据Hash算法将属于自己的数据留下,将其他节点的数据通过网络直接传送给它们,所以数据装载的速度非常快。
3)数据压缩技术:
DB2 DPF压缩基于表级别,目前不能指定压缩级别。提供类似WinZip的压缩级别。ISAS压缩的特点是不仅对数据进行压缩,索引和临时表也会自动压缩。Greenplum支持两种压缩算法:一种是ZLIB,压缩比较高,提供1~9个级别,数字越大,压缩比越高;另一种是QUICKLZ,其压缩比较小,相应的CPU负荷比较低。4.2版本以后提供一种新的、基于列的压缩算法RLE,提供基于列级别的压缩。
4)在线扩容:
二者都支持在线扩容,扩容时,表数据需要重新分布。在进行表重分布时,是一张一张表进行的。正在进行数据重分布的表不能加载数据。这点类似于Greenplum,Greenplum在进行数据重分布时,正在重分布的表不能读写。另外,Greenplum会自动去掉唯一性限制,所以在进行表重分布时,遇到重估的行不会报错,所以可能导致ETL出错。
DPF是稳定的特性,通过它可以将数据库分成多个数据库分区。每个数据库分区有它自己的一组计算资源,包括CPU和存储。MDC是在DB2 Version 8中引入的,通过它可以在物理上将在多维上具有类似值的行聚合在一起放在磁盘上。这种聚合能为常见分析性查询提供高效的I/O,提高检索数据的效率。TP是在DB2 9中引入的,与MDC类似,它也可以将具有近似值的行存储在一起。TP的优势在于为表添加或删除大量数据这个方面,即转入和转出。
数据库分区、表分区和MDC能同时应用在一个设计中。要部署大型应用程序,最好在同一个数据库设计中实现数据库分区、表分区和MDC,以满足应用的多样化需求。可以应用数据库分区获得扩展性,并确保在逻辑分区之间均匀分布数据,表分区可以方便查询分区消除和数据转出,MDC可以用来提高查询性能和方便转入数据。
(2)适用场景:
DB2的并发性非常好,金融机构用的比较多。
3.Sysbase IQ架构
(1)原理分析:
Sysbase IQ在16.0版本以前都采用Share Everything架构,所有的数据都存放在一个共享的SAN存储中,和Oracle RAC的架构类似。从IQ 16.0 SP10开始引入Share Nothing Multiplex/MPP的支持。“Share Nothing Multiplex”是一个在大数据环境下针对大规模并行处理(massively parallel processing,MPP)的存储和处理架构。在这个存储架构下,主数据(primary data)存储在一组节点中的直插式存储(direct-attached storage,DAS)设备集合中,而不是存储在一个共享存储区域网络设备中(SAN存储设备中)。
Share Nothing Multiplex的优势主要有以下几点:
1)I/O扩展能力提升:
此前的IQ Multipex采用Share Disk存储架构来存放数据,当节点增加、数据量不断增长时,共享存储成为瓶颈。Share Nothing Multiplex采用分布式存储技术,极大地提升了I/O扩展能力。每个节点中的本地DAS存储设备能够实现比共享SAN存储设备更高的I/O性能。IQ 16.0 SP10引入的Share Nothing storage类似于Hadoop的HDFS,使用每个节点自己本地的存储存放用户数据的子集。在IQ Multiplex中,可以同时使用Share Nothing dbspaces;在一个物理集群中,Share Nothing Multiplex和Share Disk Multiplex可以同时存在。
2)提供更强的数据保护:
每个节点中的DAS设备可以指定镜像文件,通过镜像技术能够提升数据的高可用性。这与Hadoop的HDFS也有一些类似,都是通过数据冗余技术来避免数据损坏、丢失。
3)存储设备管理变得相对简单:
管理IQ Multiplex共享存储设备时,需要管理裸设备。Share Disk存储结构要求每个节点上的设备路径一定要相同,并且指向一定要正确。当节点数量比较多或者共享裸设备数量比较多时,会造成管理负担,并且容易出错。IQ 6.0 SP10的Share Nothing Multiplex采用了分布式存储架构,在每个节点上可以使用裸设备,也可以使用文件系统中的文件。
4)数据库备份的变化:
由于采用了节点本地的设备镜像技术,因而IQ Share Nothing Multiplex的数据保护能力得到了进一步的提升(类似于Hadoop HDFS采用的数据冗余技术)。对于Share Nothing Multiplex来说,数据库备份被分解到各个节点,每个节点只负责备份自己拥有的本地设备上的数据,并且节点之间可以并行备份。这降低了原来Share Disk Multiplex备份时对于备份设备的压力。
(2)优势与局限性
1)列存储:
IQ以列存储数据,而不是行,这与其他所有关系型数据库引擎广泛使用的存储方式相反。在其他关系型数据库内核中,数据库的一张表典型表示为一条数据库页链,每个数据页中有一行或多行数据记录。在数据仓库应用中,从查询性能的观点出发,这种存储方式是所有可能的数据存储方式中最不可取的。在IQ中,每张表是一组相互独立的页链的集合,每条页链代表表中的一列。如果有100张表,就将有100条相互独立的页链,每一列都有一条页链与之相对应,而不像其他数据库引擎,一张表对应一条页链。列存储所固有的优越性在于:大多数数据仓库应用的查询只关心表中所有列的一个很小的子集,从而可以以很少的磁盘I/O得到查询结果。
2)数据压缩:
Sysbase IQ使用了数据压缩。这是由于数据按列存储时,相邻的字段值具有相同的数据类型,其二进制值的范围通常也要小的多,所以压缩更容易,压缩比例更高。Sybase IQ对列存储的数据通常能得到大于50%的压缩。更大的压缩比例,加上大页面I/O,使得Sybase IQ获得优良的查询性能的同时,坚守了存储空间的需求。
3)查询效果瞬间响应:
IQ通过列存储、革命性的位图索引方法及智能的动态访问技术实现了更快的查询响应速度,比传统的数据库查询速度提高10~1000倍。

四、SQL on Hadoop

随着数据库种类的不断增多,使得各类数据都能存在数据库汇总,这对于大数据查询和分析都大有裨益。但是随之产生了另一个问题:传统的SQL语言不能应对目前的非关系型数据库,而传统的数据仓库厂家又对Hadoop了解不深,对Hadoop不能很快适应,这就使得SQL on Hadoop技术发展起来。SQL on Hadoop是直接建立在Hadoop上的SQL查询,保证了Hadoop性能,以及巧用了SQL的灵活性。SQL基本操作是把传统的SQL语言进行中间转换后再操作,并且它能够让企业把信息管理能力从结构化数据延伸到非结构化数据。
(一)Hive:基本的Hadoop分析
Hive是建立在Hadoop上的数据仓库基础构架,提供了一个被称为Hive查询语言(简称HiveQL或HQL)的SQL方言,来查询存储在Hadoop集群中的数据。HQL不仅允许了解SQL的用户查询数据,也允许了解MapReduce开发者的开发自定义的Mpper、Ruducer来处理内建的Mapper和Reducer完成不了的复杂的分析工作。Hive提供工具进行数据提取、转化、加载(ETL),一种存储、查询和分析存储在Hadoop的大规模数据的机制。Hive和数据库相比较见表5-3。
表5-3 Hive和数据库相比较
Hive在大规模数据集上不能够实现低延迟快速查询,这是由于Hive是基于静态批处理的Hadoop构建的,而Hadoop的延迟和开销一般较高。Hive把用户的HiveQL语句经解释器转换为MapReduce作业提交给Hadoop集群,之后Hadoop监控作业执行过程,返回作业执行结果给用户。
1.Hive的技术架构
Hadoop和MapReduce是Hive架构的根基。Hive架构包括如下组件:CLI(Command Line Interface)、JDBC/ODBC、Thrift Server、Web GUI、MetaStore和Driver(Complier、Optimizer和Executor)。这些组件可以分为两大类:服务端组件和客户端组件,如表5-4、图5-10所示。
表5-4 Hive组件定义及分类
图5-10 Hive的架构
2.Hive的数据模型
(1)Hive数据库:
与传统数据库相似,在第三方数据库里Hive数据库实际是一张表。
(2)内部表(table):
Hive的内部表的概念和关系型数据库中类似。每个内部表存放数据时,在Hive中都有一个相应的目录。在删除表时,元数据与数据都会被删除。内部表创建和数据加载是两个过程,但能够在同一个语句完成,在加载数据时,实际数目会被移动到数据仓库目录中,之后会直接在数据仓库目录中完成对数据的访问。
(3)外部表:
外部表指向HDFS中已有的数据,它能够创建内部表和分区。它在元数据的组织上和内部表是一样的,但在实际数据存储时有很大的差异。外部表只有一个过程,加载数据和创建表同时完成。实际数据存储在Location后面指定的HDFS路径中,实际数据不会移动到数据仓库目录汇总。删除一个Exter Table时,也仅删除这个链接。
(4)分区(partition):
分区与数据库中的分区列的密集索引类似,但Hive中分区的组织方式和数据库中不同。在Hive中,表中的一个分区与表下的一个目录对应,并且所有分区的数据都存储在这个目录中。
(5)桶(bucket):
桶是将表的列通过Hush算法进一步分解成不同的文件存储,目的是为了并行,每个桶对应一个文件。
(6)Hive视图:
Hive视图类似于传统数据库的视图。视图是只读的,它基于的基本表若发生改变,数据增加不会影响视图的呈现,但是删除后,会出现故障。
Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的SQL查询功能,可以将SQL语句转换为MapReduce任务运行。Hive支持HSQL,这是一种类SQL。也正是由于这种机制,导致Hive最大的缺点是慢。Map/Reduce调度本身只适合批量、长周期任务,类似查询这种要求短、平、快的业务,代价太高。
(二)实时互动的SQL:Impala和Drill
1.Impala
Impala是Cloudera公司的实时查询开源项目,提供SQL语义,能查询存储在Hadoop的HDFS和Hbase中的PB级大数据。Impala可以看成Google Dremel架构和MPP结构的混合体。根据官网的产品实测,Impala比原来基于Mapreducede Hive QL查询速度提高了3~90倍,这也是它最大的优点。
Impala最初是参照Dremel系统进行设计的。Dremel实现了嵌套型数据的列存储,并使用了多层查询树,使得任务可以在数千个节点上并行执行和聚合结果,这使得它在大数据上实现交互式的响应速度。
Impala拥有全局统一的元数据存储和调度,接口采用的是比较通用的HiveSQL和JDBC/ODBC等。Impala查询是完全的分布式并行处理,它的分布式引擎由Query Planner、Query Coordinator、Query Executor三部分组成,它可以在HDFS或Hbase上使用Select、Jion和统计函数直接进行数据查询,很大程度上减少了延时。由于Impala可以在HDFS或HBASE的存储和元数据上直接查询,所以它拥有Hadoop的灵活性、横向扩充性和低成本的特点。Impala架构如图5-11所示。
图5-11 Impala架构
Impala是由Statestore和Impalad两个基本单元组成的。Statestore用来监控集群中各个节点的健康状况,提供节点注册、错误检测等功能。Impala在每个节点运行后台服务Impalad,Impalad用来响应外部请求,并完成实际的查询处理。Impalad主要包含Query Planner、Query Coordinator和Query Exec Engine三个模块。Query Planner接收来自SQL APP和ODBC的查询,然后将查询转换为许多子查询。Query Coordinator负责将这些子查询分发到各个节点上,由各个节点上的Query Exec Engine负责子查询的执行,最后返回子查询的结果,这些中间结果经过聚集之后最终返回给用户。
Impala执行一个查询的过程为:客户端发送SQL查询请求,Impalad作为协调者接受请求,对请求进行解析、语义分析,将解析结果(一个解析树)和语义分析结果(请求的全局信息,如表名、列名、等价类等)打包后发送给作为请求计划者(QueryPlanner)的Impalad,请求计划者则根据输入信息构建一个查询计划,构建过程分为两步,第一步是将解析树翻译成一个不可执行的单节点计划树,这棵树的节点可以是HDFS扫描、Hash Join、Hash Aggregation等,这一步的主要工作是将谓词分配给最底层的节点、根据等价类推断谓词、对表分区进行剪枝、基于开销进行计划优化等。第二步是将第一步的结果作为输入,以最小化数据移动和最大化扫描本地化为目标生成一个分布式执行计划,例如,对于Join操作,有广播和分区两种策略,根据对网络传输开销的估计,计划会选择开销较小的一种策略,而对于聚集操作,则会先在一个节点上做完预聚集(pre-aggregation)后,再发送到其他节点做合并聚集(merge-aggregation)。在生成分布式计划树后,根据数据交换边界对树进行划分,划分完的每个部分作为一个计划分片,它是Impala后台的执行单元。然后,计划分片会被分发到集群中并执行,查询的结果会汇聚到协调者,再由协调者返回给客户端。
Impala采用的是与Hive相同的元数据、SQL语法、ODBC驱动程序和用户接口(Hue Beeswax),可以在使用ClouDera Hadoop(CDH)产品时,与批处理和实时查询的平台是统一的。
2.Drill
Apache Drill是一个能够对大数据进行实时分布式查询的引擎,目前它已经成为Apache的顶级项目,Drill是开源版本的Google Dremel。所有这些数据都可以像使用传统数据库的针对表查询一样进行快速实时的查询。Drill是一个专门用于互动分析大型数据集的分布式系统,采用标准的SQL来进行大数据的互动分析。客户端使用任何SQL工具输入一个SQL语句后,会通过Drill的ODBC驱动器连接到Drill的驱动,并经过SQL查询的Parser进行解析,经Query Planner进行计划,在节点的Drillbit上进行执行,多个Drillbit共同完成大规模的查询执行。查询过程与其核心模块如图5-12、图5-13所示。
图5-12 Drillbit查询过程
图5-13 Drillbit核心模块
Drill的体系架构包含4层:①通过解析用户查询来构建执行计划的查询语言;②执行查询计划器所生成的物理计划的低时延分布式执行引擎;③支持各类数据格式嵌套的数据格式;④支持各类数据源的可扩展,它初始关注Hadoop。Drill由于本身特性,具有查询快、体系结构开放、现代化等特点。交互式分析作为基于Map-reduce技术的批处理分析的良好补充,在交互式分析的使用案例中,一般的做法本质上与Drill类似,都是去构建执行引擎,或者说去建构一些数据切片,并能够快速地串起来,最终得到分析结果。

五、数据仓库

(一)数据仓库的定义及特点
数据仓库(data warehouse)是在企业管理和决策中集成的、面向主题的、与时间相关的、不可修改的数据集合。
数据仓库主要功能是组织透过资讯系统的联机事务处理(OLTP)经年累月所累积的大量资料,透过数据仓库理论所特有的资料储存架构,作有系统的分析整理,有利于各种分析方法如联机分析处理(OLAP)、数据挖掘(data mining)的进行,进而支持如决策支持系统(DSS)、主管资讯系统(EIS)的创建,帮助决策者快速有效地分析出有价值的资讯。首先,数据仓库于企业数据库并不相同,企业数据库只是数据的集合,而数据仓库用于企业战略性决策分析,处理面向分析型数据。其次,数据仓库将多个异构数据有效地清洗、转换,加载集成管理人员需要的格式,然后按照既定的主题进行重组。
数据仓库有以下几个特点:
1.面向主题
普通的操作型数据库是以数据集合面向事物处理,一般以应用程序为中心、底层组织存放的。数据仓库以一定的主题进行组织,面向主题的数据仓库可以依据主题对数据分析对象进行高层次的稳定一致性操作,从而得到企业决策人员所得到的统一、准确的多维度数据,通常一个主题与多个信息系统密切相关。
2.集成
数据仓库的集成性指的是数据仓库中的数据往往是分散且独立的,需要对数据进行抽取、清洗、组合、转换等一系列操作。统一数据的异构性,消除数据中的差错值、矛盾值,然后对数据进行重组,使得数据仓库可以得出主题域所需要的数据,达到企业全局的一致性。
3.相对稳定
数据仓库数据的稳定性指的是,数据仓库一般的操作是加载、刷新、查询,很少像操作型数据库那样需要实时更新数据。仓库存放的数据一般是历史数据的集合,所以在加载、转化、集合好历史数据后,基本都是查询工作,数据仓库可以保证稳定性,实现企业的决策分析。
4.时间特性
操作型数据库系统提供的数据一般是指包含某一个时间段的数据,反映的是当前信息,而数据仓库依据主题域预测出未来数据的变化趋势及数据架构模式。决策者通过直观的数据变化趋势分析企业的发展轨迹和未来的方向,所以说数据仓库的作用不仅是提供数据,还要给决策者提供一些策略上的建议等。
数据仓库的数据源一般来自操作系统,也就是说,必须在某个时点从操作型系统获取数据并将其导入数据仓库,这个过程就是通常所说的抽取(extract)、转换(transform)和装载(load)过程,简称ETL过程。之所以不直接在操作型系统上执行分析查询,而是从操作型系统抽取数据,最主要有以下两个原因:
1.在操作型系统上直接执行分析查询会使业务系统受到影响,很可能使其变慢甚至宕机。
2.在操作型系统中很可能查不到分析所需要的数据。出于性能的考虑,操作型系统一般都不会保留很长的历史记录,而只是保留近期活跃的数据。数据仓库理论上应该保留所有决策需要的数据,即除了活跃数据外,还应该包含大量的历史归档数据。
转换是保证数据仓库数据一致和性能优良的关键步骤。操作型数据大多是分散在多个业务系统之中,彼此间缺乏联系,同一数据在系统间还可能存在二义性,很难成为对于整个企业一致的数据视图。对数据仓库的操作,具有典型的大数据量、低并发、绝大多数是读操作的特点。基于以上两个原因,从操作型系统抽取来的原始数据要经过一系列的数据清洗、加工和转换,使其成为一致的便于查询和使用的格式。这些转换包括数据类型转换、日期时间标准化、把规范化模式逆规范化为星型模式等。
装载操作实际上就是把转换后的数据导入数据仓库的表中,给下游的数据集市、OLAP系统或BI系统准备好可供查询的数据。
(二)数据仓库的作用
1.仓库提供加强的商业智能
商业智能可以防止管理人员和企业高管发生误操作,管理人员可以不用凭借直觉作出决策。商业智能的数据仓库模块可以作用于多个业务流程中,如市场划分、财务预测、销售管理、市场预测、人群受众预测、互联网投放预算等。
2.数据仓库可提供报表和图表
数据仓库可以提供不同事务系统的多维度报表和图表功能,其中数据来自不同的事务处理系统。业务用户可以快速地从多个数据源得到一个数据趋势图,保证了报表和图表反映的是整个企业的完整、一致的信息。业务人员可节约大量时间和资金直接观测到数据,生成报表。
3.数据仓库能够多维度分析
数据仓库可以把元数据清洗、转化、汇总,然后输出多维数据报表,简化了数据的分析处理逻辑,并从不同维度的角度观察同一组数据的展示方式,从而有效帮助业务人员得到数据的多维展示效果。在广告DSP平台里,通常需要对时间、地域、人群属性、兴趣等多维度进行分析。应用分析可以在查询中对不同数据进行各种维度的组合比较和计算。
4.数据仓库是数据挖掘的关键
数据挖掘就是对已有的数据进行识别,对未来的数据发展情况作出预测。由于数据仓库能够实现企业的全局性、准确性和一致性,所以在数据仓库的基础上进行数据挖掘,一方面是对企业业务目前运营状况的总结,还能够分析和预测业务未来的发展轨迹和转型方向。
(三)数据仓库系统的框架
数据仓库系统(data warehouse system,DWS)就是对仓库中的原始数据进行一系列抽取、清洗、转换、汇总、集合的操作。当数据集合好之后,需要借用层次较好的工具对数据进行更新、使用、加载等,用以支持数据仓库的管理。图5-14就是数据仓库的系统框架。
图5-14 数据仓库的系统框架
1.源数据
源数据是数据仓库系统的根基,构建整个系统的数据源,包括整个企业需要的内外部信息。内部信息包括存放于关系型数据库管理系统中各种业务分析数据、操作记录日志和计划决策文件等。外部信息则包括竞品分析文档、市场规模、法律条规、行业需求条例的信息。
2.仓库管理
仓库管理通过一系列工具做到对数据的归档、安全、备份、维护等操作。在确定数据仓库信息需求后,建模分析,然后确定源数据到目标数据仓库的抽取、清洗、加载、计算等一系列操作,最后通过主题域将不同维度数据划分到相应合理分配的物理存储结构中。
3.数据分析工具
数据分析工具提供实际决策问题所需要的各种工具,如用户查询报表工具、数据管理工具、实时线上分析工具等,以满足决策支持系统的根本要求。
(何彩升 孙琳)

参考文献

1.J Dean SG.MapReduce:Simplified Data Processing on Large Clusters.Communications of the ACM,2008,51:13.
2.Valiant LG.A bridging model for parallel computation.Comm Acm,1990,33:103-111.
3.Library WPBulk synchronous parallel.(2011).
4.Reed FJ,BenjaminZooKeeper:Distributed Process Coordination:O'Reilly Media,Inc.(2013).
5.Toshniwal A,Taneja S,Shukla A,et al.Storm@twitter.2014:147-156.
6.M.Kornacker AB,V.Bittorf,T.Bobrovytsky,et al.Impala:A modern,open-source sql engine for hadoop.CIDR,2015.
7.张志彬.数据仓库建设.广东科技,2004:77-79.
8.岑琴.商业智能BI在劳动密集型企业产品营销中的应用研究.浙江师范大学,2007.
9.陈艳羽.数据仓库技术在吉林省通信公司社区综合营销系统中的应用.东北师范大学,2008.