- Hadoop 2.X HDFS源码剖析
- 徐鹏
- 9874字
- 2024-01-05 17:29:21
1.2.1 Hadoop RPC接口
Hadoop RPC调用使得HDFS进程能够像本地调用一样调用另一个进程中的方法,并且可以传递Java基本类型或者自定义类作为参数,同时接收返回值。如果远程进程在调用过程中出现异常,本地进程也会收到对应的异常。目前Hadoop RPC调用是基于Protobuf实现的,我们会在第2章中介绍底层的具体实现,本节主要介绍Hadoop RPC接口的定义。Hadoop RPC接口主要定义在org.apache.hadoop.hdfs.protocol包和org.apache.hadoop.hdfs.server.protocol包中,包括以下几个接口。
■ ClientProtocol:ClientProtocol定义了客户端与名字节点间的接口,这个接口定义的方法非常多,客户端对文件系统的所有操作都需要通过这个接口,同时客户端读、写文件等操作也需要先通过这个接口与Namenode协商之后,再进行数据块的读出和写入操作。
■ ClientDatanodeProtocol:客户端与数据节点间的接口。ClientDatanodeProtocol中定义的方法主要是用于客户端获取数据节点信息时调用,而真正的数据读写交互则是通过流式接口进行的。
■ DatanodeProtocol:数据节点通过这个接口与名字节点通信,同时名字节点会通过这个接口中方法的返回值向数据节点下发指令。注意,这是名字节点与数据节点通信的唯一方式。这个接口非常重要,数据节点会通过这个接口向名字节点注册、汇报数据块的全量以及增量的存储情况。同时,名字节点也会通过这个接口中方法的返回值,将名字节点指令带回该数据块,根据这些指令,数据节点会执行数据块的复制、删除以及恢复操作。
■ InterDatanodeProtocol:数据节点与数据节点间的接口,数据节点会通过这个接口和其他数据节点通信。这个接口主要用于数据块的恢复操作,以及同步数据节点上存储的数据块副本的信息。
■ NamenodeProtocol:第二名字节点与名字节点间的接口。由于Hadoop2.X中引入了HA机制,检查点操作也不再由第二名字节点执行了,所以NamenodeProtocol我们就不详细介绍了。
■ 其他接口:主要包括安全相关接口(RefreshAuthorizationPolicyProtocol、RefreshUser MappingsProtocol)、HA相关接口(HAServiceProtocol)等。HA相关接口的实现和定义我们将在第3章的HA小节中介绍。
下面我们重点介绍ClientProtocol、ClientDatanodeProtocol、DatanodeProtocol、InterDatanode Protocol和NamenodeProtocol等接口的定义。
1.ClientProtocol
ClientProtocol定义了所有由客户端发起的、由Namenode响应的操作。这个接口非常大,有80多个方法,我们把这个接口中的方法分为如下几类。
■ HDFS文件读相关的操作。
■ HDFS文件写以及追加写的相关操作。
■ 管理HDFS命名空间(namespace)的相关操作。
■ 系统问题与管理相关的操作。
■ 快照相关的操作。
■ 缓存相关的操作。
■ 其他操作。
HDFS文件读操作、HDFS文件写与追加写操作,以及命名空间的管理操作,这三个部分都可以在FileSystem类中找到对应的方法,这些方法都是用来支持Hadoop文件系统实现的。对于系统问题与管理相关的操作,则是由DFSAdmin这个工具类发起的,其中的方法是用于支持管理员配置和管理HDFS的。而快照和缓存则都是Hadoop2.X中引入的新特性,ClientProtocol中也有对应的方法用于支持这两个新特性。当然,ClientProtocol中还包括安全、XAttr等方法,这部分不是重点,我们就不再详细介绍了。
(1)读数据相关方法
ClientProtocol中与客户端读取文件相关的方法主要有两个:getBlockLocations()和reportBadBlocks()。
客户端会调用ClientProtocol.getBlockLocations()方法获取HDFS文件指定范围内所有数据块的位置信息。这个方法的参数是HDFS文件的文件名以及读取范围,返回值是文件指定范围内所有数据块的文件名以及它们的位置信息,使用LocatedBlocks对象封装。每个数据块的位置信息指的是存储这个数据块副本的所有Datanode的信息,这些Datanode会以与当前客户端的距离远近排序。客户端读取数据时,会首先调用getBlockLocations()方法获取HDFS文件的所有数据块的位置信息,然后客户端会根据这些位置信息从数据节点读取数据块。ClientProtocol.getBlockLocations()方法的定义如下:
客户端会调用ClientProtocol.reportBadBlocks()方法向Namenode汇报错误的数据块。当客户端从数据节点读取数据块且发现数据块的校验和并不正确时,就会调用这个方法向Namenode汇报这个错误的数据块信息。ClientProtocol.reportBadBlocks()方法的定义如下:
(2)写/追加写数据相关方法
在HDFS客户端操作中最重要的一部分就是写入一个新的HDFS文件,或者打开一个已有的HDFS文件并执行追加写操作。ClientProtocol中定义了8个方法支持HDFS文件的写操作:create()、append()、addBlock()、complete()、abandonBlock()、getAddtionnalDatanodes()、updateBlockForPipeline()和updatePipeline()。
create()方法用于在HDFS的文件系统目录树中创建一个新的空文件,创建的路径由src参数指定。这个空文件创建后对于其他的客户端是“可读”的,但是这些客户端不能删除、重命名或者移动这个文件,直到这个文件被关闭或者租约过期。客户端写一个新的文件时,会首先调用create()方法在文件系统目录树中创建一个空文件,然后调用addBlock()方法获取存储文件数据的数据块的位置信息,最后客户端就可以根据位置信息建立数据流管道,向数据节点写入数据了。create()方法的定义如下:
append()方法用于打开一个已有的文件,如果这个文件的最后一个数据块没有写满,则返回这个数据块的位置信息(使用LocatedBlock对象封装);如果这个文件的最后一个数据块正好写满,则创建一个新的数据块并添加到这个文件中,然后返回这个新添加的数据块的位置信息。客户端追加写一个已有文件时,会先调用append()方法获取最后一个可写数据块的位置信息,然后建立数据流管道,并向数据节点写入追加的数据。如果客户端将这个数据块写满,与create()方法一样,客户端会调用addBlock()方法获取新的数据块。
客户端调用addBlock()方法向指定文件添加一个新的数据块,并获取存储这个数据块副本的所有数据节点的位置信息(使用LocatedBlock对象封装)。要特别注意的是,调用addBlock()方法时还要传入上一个数据块的引用。Namenode在分配新的数据块时,会顺便提交上一个数据块,这里previous参数就是上一个数据块的引用。excludeNodes参数则是数据节点的黑名单,保存了客户端无法连接的一些数据节点,建议Namenode在分配保存数据块副本的数据节点时不要考虑这些节点。favoredNodes参数则是客户端所希望的保存数据块副本的数据节点的列表。客户端调用addBlock()方法获取新的数据块的位置信息后,会建立到这些数据节点的数据流管道,并通过数据流管道将数据写入数据节点。addBlock()方法的定义如下:
当客户端完成了整个文件的写入操作后,会调用complete()方法通知Namenode。这个操作会提交新写入HDFS文件的所有数据块,当这些数据块的副本数量满足系统配置的最小副本系数(默认值为1),也就是该文件的所有数据块至少有一个有效副本时,complete()方法会返回true,这时Namenode中文件的状态也会从构建中状态转换为正常状态;否则,complete()会返回false,客户端就需要重复调用complete()操作,直至该方法返回true。
上面描述的5个方法都是在正常流程时,客户端写文件必须调用的方法。但是对于一个分布式系统来说,写流程中涉及的任何一个节点都有可能出现故障。出现故障的情况也是需要考虑在内的,所以ClientProtocol定义了abandonBlock()、getAdditionnalDatanode()、updateBlockForPipeline()以及updatePipeline()等方法,用于在异常情况下进行恢复操作。
客户端调用abandonBlock()方法放弃一个新申请的数据块。考虑下面这种情况:当客户端获取了一个新申请的数据块,发现无法建立到存储这个数据块副本的某些数据节点的连接时,会调用abandonBlock()方法通知名字节点放弃这个数据块,之后客户端会再次调用addBlock()方法获取新的数据块,并在传入参数时将无法连接的数据节点放入excludeNodes参数列表中,以避免Namenode将数据块的副本分配到该节点上,造成客户端再次无法连接这个节点的情况。
abandonBlock()方法用于处理客户端建立数据流管道时数据节点出现故障的情况。那么,如果客户端已经成功建立了数据流管道,在客户端写某个数据块时,存储这个数据块副本的某个数据节点出现了错误该如何处理呢?这个操作就比较复杂了,客户端首先会调用getAdditionalDatanode()方法向Namenode申请一个新的Datanode来替代出现故障的Datanode。然后客户端会调用updateBlockForPipeline()方法向Namenode申请为这个数据块分配新的时间戳,这样故障节点上的没能写完整的数据块的时间戳就会过期,在后续的块汇报操作中会被删除。最后客户端就可以使用新的时间戳建立新的数据流管道,来执行对数据块的写操作了。数据流管道建立成功后,客户端还需要调用updatePipeline()方法更新Namenode中当前数据块的数据流管道信息。至此,一个完整的恢复操作结束。
上面我们描述的都是在写数据操作时数据节点发生故障的情况,包括了数据流管道建立时以及建立后数据节点发生故障的情况。在写数据的过程中,Client节点也有可能在任意时刻发生故障,为了预防这种情况,对于任意一个Client打开的文件都需要Client定期调用ClientProtocol.renewLease()方法更新租约(关于租约请参考第3章中租约相关小节)。如果Namenode长时间没有收到Client的租约更新消息,就会认为Client发生故障,这时就会触发一次租约恢复操作,关闭文件并且同步所有数据节点上这个文件数据块的状态,确保HDFS系统中这个文件是正确且一致保存的。
如果在写操作时,名字节点发生故障该如何处理呢?这就要涉及HDFS的HA架构了,请读者参考第3章的HA部分。
(3)命名空间管理的相关方法
ClientProtocol中有很重要的一部分操作是对Namenode命名空间的修改。我们知道FileSystem类也定义了对文件系统命名空间修改操作的API(FileSystem类抽象了一个文件系统对外提供的API接口),HDFS则满足FileSystem类抽象的所有方法。表1-1总结了FileSystem API与ClientProtocol接口的对应关系。
表1-1 FileSystem API与ClientProtocol接口的对应关系
通过表1-1我们可以看出,ClientProtocol中涉及的命名空间管理的方法都有与之对应的HDFS文件系统API,且方法的名称和参数很相近。在这里我们以setReplication()为例,setReplication()方法在ClientProtocol中的定义是:
在FileSystem接口中的定义是:
可以看到,setReplication()方法在FileSystem和ClientProtocol中定义的方法名及参数都很接近,大部分情况下ClientProtocol接口方法定义的参数更多,可以很好地支持FileSystemAPI定义的操作。
(4)系统问题与管理操作
ClientProtocol中另一个重要的部分就是支持DFSAdmin工具的接口方法,DFSAdmin是供HDFS管理员管理HDFS集群的命令行工具。一个典型的dfsadmin命令如下所示,管理员可以添加不同的参数以触发HDFS进行相应的操作。
表1-2给出了ClientProtocol中定义的接口方法与dfsadmin命令参数之间的对应关系,我们会重点讲解几个比较重要的方法。
表1-2 ClientProtocol中定义的接口方法与dfsadmin命令参数之间的对应关系
续表
首先看一下setSafeMode()方法,这里涉及一个非常重要的概念——安全模式。安全模式是Namenode的一种状态,处于安全模式中的Namenode不接受客户端对命名空间的修改操作,整个命名空间都处于只读状态。同时,Namenode也不会向Datanode下发任何数据块的复制、删除指令。管理员可以通过dfsadmin setSafemode命令触发Namenode进入或者退出安全模式,同时还可以使用这个命令查询安全模式的状态。需要注意的是,刚刚启动的Namenode会直接自动进入安全模式,当Namenode中保存的满足最小副本系数的数据块达到一定的比例时,Namenode会自动退出安全模式。而对于用户通过dfsAdmin方式触发Namenode进入安全模式的情况,则只能由管理员手动关闭安全模式,Namenode不可以自动退出。dfsadmin setSafemode命令正是通过调用ClientProtocol.setSafeMode()方法实现的。setSafeMode()方法的定义如下:
了解了安全模式之后,我们来看看必须在安全模式中才能进行的两个操作。`-saveNamespace`用于将整个命名空间保存到新的fsimage文件中,并且重置editlog文件;而`-rollEdits`则会触发重置editlog文件的操作,关闭当前正在写入的editlog文件,开启一个新的editlog文件(fsiamge与editlog文件请参考第3章的fsimage小节)。
refreshNodes()方法会触发Namenode刷新数据节点列表。管理员可以通过include文件指定可以连接到Namenode的数据节点列表,通过exclude文件指定不能连接到Namenode的数据节点列表。每当管理员修改了这两个配置文件后,都需要通过`-refreshNodes`选项触发Namenode刷新数据节点列表,这个操作会造成Namenode从文件系统中移除已有的数据节点,或者添加新的数据节点(请参考第3章的数据节点管理小节)。
finalizeUpgrade()和rollingUpgrade()操作都是与Namenode升级相关的,管理员可以通过`-rollingUpgrade`选项触发Namenode进行升级操作。当Namenode成功地执行了升级操作后,管理员可以通过`-finalizeUpgrade`提交升级操作,提交升级操作会删除升级操作创建的一些临时目录,提交升级操作之后就不可以再回滚了(请参考第4章的Storage小节)。
对于其他方法,请读者参考表1-2中的说明,这里不再详细介绍了。
(5)快照相关操作
Hadoop 2.X添加了新的快照特性,用户可以为HDFS的任意路径创建快照。快照保存了一个时间点上HDFS某个路径中所有数据的拷贝,快照可以将失效的集群回滚到之前一个正常的时间点上。用户可以通过`hdfs dfs`命令执行创建、删除以及重命名快照等操作,ClientProtocol也定义了对应的方法来支持快照命令。
需要特别注意的是,在创建快照之前,必须先通过`hdfs dfsadmin -allowSnapshot`命令开启目录的快照功能,否则不可以在该目录上创建快照。表1-3给出了快照操作与ClientProtocol中相关方法的对应关系,请读者参考(快照相关内容我们会在第3章的快照小节中介绍)。
表1-3 快照操作与ClientProtocol中相关方法的对应关系
(6)缓存相关操作
HDFS 2.3版本添加了集中式缓存管理(HDFS Centralized Cache Management)功能。用户可以指定一些经常被使用的数据或者高优先级任务对应的数据,让它们常驻内存而不被淘汰到磁盘上,这对于提升Hadoop系统和上层应用的执行效率与实时性有很大的帮助。
这里涉及两个概念。
■ cache directive:表示要被缓存到内存的文件或者目录。
■ cache pool:用于管理一系列的cache directive,类似于命名空间。同时使用UNIX风格的文件读、写、执行权限管理机制。
表1-4总结了缓存相关命令与ClientProtocol方法之间的对应关系,请读者参考(缓存相关内容我们会在第3章的缓存管理小节中介绍)。
表1-4 缓存相关命令与ClientProtocol方法之间的对应关系
续表
(7)其他操作
安全相关以及XAttr相关命令,主要都是增加、删除以及List操作,这里就不再详细介绍了
2.ClientDatanodeProtocol
ClientDatanodeProtocol定义了Client与Datanode之间的接口。相比ClientProtocol,ClientDatanodeProtocol的定义简单很多,如图1-2所示。
图1-2 ClientDatanodeProtocol定义
ClientDatanodeProtocol中定义的接口可以分为两部分:一部分是支持HDFS文件读取操作的,例如getReplicaVisibleLength()以及getBlockLocalPathInfo();另一部分是支持DFSAdmin中与数据节点管理相关的命令。下面我们就看一下图1-2中所示9个方法的具体定义。
(1)getReplicaVisibleLength()
客户端会调用getReplicaVisibleLength()方法从数据节点获取某个数据块副本真实的数据长度。当客户端读取一个HDFS文件时,需要获取这个文件对应的所有数据块的长度,用于建立数据块的输入流,然后读取数据。但是Namenode元数据中文件的最后一个数据块长度与Datanode实际存储的可能不一致,所以客户端在创建输入流时就需要调用getReplicaVisibleLength()方法从Datanode获取这个数据块的真实长度。
(2)getBlockLocalPathInfo()
HDFS对于本地读取,也就是Client和保存该数据块的Datanode在同一台物理机器上时,是有很多优化的。Client会调用ClientProtocol.getBlockLocalPathInfo()方法获取指定数据块文件以及数据块校验文件在当前节点上的本地路径,然后利用这个本地路径执行本地读取操作,而不是通过流式接口执行远程读取,这样也就大大优化了读取的性能。
在HDFS 2.6版本中,客户端会通过调用DataTransferProtocol接口从数据节点获取数据块文件的文件描述符,然后打开并读取文件以实现短路读操作,而不是通过ClientDatanodeProtoco接口。客户端的短路读操作请参考第5章的文件短路读操作小节。
(3)refreshNamenodes()
在用户管理员命令中有一个`hdfs dfsadmindatanodehost:port`命令,用于触发指定的Datanode重新加载配置文件,停止服务那些已经从配置文件中删除的块池(blockPool),开始服务新添加的块池。块池的概念请参考第4章的Datanode逻辑结构小节。
这条命令底层就是由ClientDatanodeProtocol.refreshNamenodes()方法实现的,客户端会通过这个接口触发对应的Datanode执行操作。
(4)deleteBlockPool()
在用户管理员命令中还有一个与块池管理相关的`hdfs dfsadmin-deleteBlockPool datanode-host:port blockpoolId [force]`命令,用于从指定Datanode删除blockpoolId对应的块池,如果force参数被设置了,那么无论这个块池目录中有没有数据都会被强制删除;否则,只有这个块池目录为空的情况下才会被删除。需要注意的是,如果Datanode还在服务这个块池,这个命令的执行将会失败。要停止一个数据节点服务指定的块池,需要调用上面提到的refreshNamenodes()方法。
deleteBlockPool()方法有两个参数,其中blockpoolId用于设置要被删除的块池ID;force用于设置是否强制删除。
(5)getHdfsBlocksMetadata()
getHdfsBlocksMetadata()方法主要用于获取数据块是存储在指定Datanode的哪个卷(volume)上的,这个方法主要是为了支持DistributedFileSystem.getFileBlockStorageLocations()方法。关于卷(volume)的定义请参考第4章。
(6)shutdownDatanode()
shutdownDatanode()方法用于关闭一个数据节点,这个方法主要是为了支持管理命令`hdfs dfsadmin-shutdownDatanode <datanode_host:ipc_port> [upgrade]`。
(7)getDatanodeInfo()
getDatanodeInfo()方法用于获取指定Datanode的信息,这里的信息包括Datanode运行的HDFS版本、Datanode配置的HDFS版本,以及Datanode的启动时间。对应于管理命令`hdfs dfsadmin-getDatanodeInfo`。
(8)startReconfiguration()
startReconfiguration()方法用于触发Datanode异步地从磁盘重新加载配置,并且应用该配置。这个方法用于支持管理命令` hdfs dfsadmin-getDatanodeInfo-reconfigstart`。
(9)getReconfigurationStatus()
我们知道startReconfiguration()方法是异步地加载配置操作,所以HDFS提供了getReconfigurationStatus()方法用于查询上一次触发的重新加载配置操作的运行情况。对应于管理命令`hdfs dfsadmin-getDatanodeInfo-reconfigstartstatus`。我们可以看到getReconfigurationStatus()方法和startReconfiguration()方法对应的管理命令是一样的,只不过参数不同,一个是start,一个是status。
3.DatanodeProtocol
ClientProtocol和DatanodeProtocol都是由客户端发起调用的接口,下面我们介绍服务器间的接口。DatanodeProtocol是Datanode与Namenode间的接口,Datanode会使用这个接口与Namenode握手、注册、发送心跳、进行全量以及增量的数据块汇报。Namenode会在Datanode的心跳响应中携带名字节点指令,Datanode收到名字节点指令之后会执行对应的操作。要特别注意的是,Namenode向Datanode下发名字节点指令是没有任何其他接口的,只会通过DatanodeProtocol的返回值来下发命令。
DatanodeProtocol定义的方法如图1-3所示,我们可以将DatanodeProtocol定义的方法分为三种类型:Datanode启动相关、心跳相关以及数据块读写相关。下面将会分别介绍这三种类型的方法,以及Namenode向Datanode下发的指令。
图1-3 DatanodeProtocol定义
(1)Datanode启动相关方法
一个完整的Datanode启动操作会与Namenode进行4次交互,也就是调用4次DatanodeProtocol定义的方法。首先调用versionRequest()与Namenode进行握手操作,然后调用registerDatanode()向Namenode注册当前的Datanode,接着调用blockReport()汇报Datanode上存储的所有数据块,最后调用cacheReport()汇报Datanode缓存的所有数据块。
我们首先看一下versionRequest()方法。Datanode启动时会首先调用versionRequest()方法与Namenode进行握手。这个方法的返回值是一个NamespaceInfo对象,NamespaceInfo对象会封装当前HDFS集群的命名空间信息,包括存储系统的布局版本号(layoutversion)、当前的命名空间的ID(namespaceId)、集群ID(clusterId)、文件系统的创建时间(ctime)、构建时的HDFS版本号(buildVersion)、块池ID(blockpoolId)、当前的软件版本号(softwareVersion)等。Datanode获取到NamespaceInfo对象后,就会比较Datanode当前的HDFS版本号和Namenode的HDFS版本号,如果Datanode版本与Namenode版本不能协同工作,则抛出异常,Datanode也就无法注册到该Namenode上。如果当前Datanode上已经有了文件存储的目录,那么Datanode还会检查Datanode存储上的块池ID、文件系统ID以及集群ID与Namenode返回的是否一致。
成功进行握手操作后,Datanode会调用ClientProtocol.registerDatanode()方法向Namenode注册当前的Datanode,这个方法的参数是一个DatanodeRegistration对象,它封装了DatanodeID、Datanode的存储系统的布局版本号(layoutversion)、当前命名空间的ID(namespaceId)、集群ID(clusterId)、文件系统的创建时间(ctime)以及Datanode当前的软件版本号(softwareVersion)。名字节点会判断Datanode的软件版本号与Namenode的软件版本号是否兼容,如果兼容则进行注册操作,并返回一个DatanodeRegistration对象供Datanode后续处理逻辑使用。
Datanode成功向Namenode注册之后,Datanode会通过调用DatanodeProtocol.blockReport()方法向Namenode上报它管理的所有数据块的信息。这个方法需要三个参数:Datanode Registration用于标识当前的Datanode;poolId用于标识数据块所在的块池ID;reports是一个StorageBlockReport对象的数组,每个StorageBlockReport对象都用于记录Datanode上一个存储空间存储的数据块。这里需要特别注意的是,上报的数据块是以长整型数组保存的,每个已经提交的数据块(finalized)以3个长整型来表示,每个构建中的数据块(under-construction)以4个长整型来表示。之所以不使用ExtendedBlock对象保存上报的数据块,是因为这样可以减少blockReport()操作所使用的内存,Namenode接收到消息时,不需要创建大量的ExtendedBlock对象,只需要不断地从长整型数组中提取数据块即可。
Namenode接收到blockReport()请求之后,会根据Datanode上报的数据块存储情况建立数据块与数据节点之间的对应关系。同时,Namenode会在blockReport()的响应中携带名字节点指令,通知数据节点进行重新注册、发送心跳、备份或者删除Datanode本地磁盘上数据块副本的操作。这些名字节点指令都是以DatanodeCommand对象封装的,我们会在DatanodeCommand小节详细介绍名字节点指令以及DatanodeCommand对象。
blockReport()方法只在Datanode启动时以及指定间隔时执行一次。在这里间隔是由dfs.blockreport.intervalMsec参数配置的,默认是6小时执行一次。cacheReport()方法与blockReport()方法是完全一致的,只不过汇报的是当前Datanode上缓存的所有数据块。cacheReport()方法的定义如下:
(2)心跳相关方法
我们知道分布式系统的节点之间大多采用心跳来维护节点的健康状态。HDFS也是一样,Datanode会定期(由dfs.heartbeat.interval配置项配置,默认是3秒)向Namenode发送心跳,如果Namenode长时间没有接到Datanode发送的心跳,则Namenode会认为该Datanode失效。
ClientProtocol.sendHeartbeat()方法就是用于心跳汇报的接口,除了携带标识Datanode身份的DatanodeRegistration对象外,还包括数据节点上所有存储的状态、缓存的状态、正在写文件数据的连接数、读写数据使用的线程数等。
sendHeartbeat()会返回一个HeartbeatResponse对象,这个对象包含了Namenode向Datanode发送的名字节点指令,以及当前Namenode的HA状态。需要特别注意的是,在开启了HA的HDFS集群中,Datanode是需要同时向Active Namenode以及Standby Namenode发送心跳的,不过只有ActiveNamenode才能向Datanode下发名字节点指令。
(3)数据块读写相关方法
上面两个小节我们介绍了Datanode启动时以及发送心跳时与Namenode交互的方法。这一小节将介绍Datanode在进行数据块读写操作时与Namenode交互的方法,包括DatanodeProtocol中的reportBadBlocks()、blockReceivedAndDeleted()以及commitBlockSynchronization()方法。下面我们依次介绍这三个方法的定义及使用。
reportBadBlocks()与ClientProtocol.reportBadBlocks()方法很类似,Datanode会调用这个方法向Namenode汇报损坏的数据块。Datanode会在三种情况下调用这个方法:DataBlockScanner线程定期扫描数据节点上存储的数据块,发现数据块的校验出现错误时;数据流管道写数据时,Datanode接受了一个新的数据块,进行数据块校验操作出现错误时;进行数据块复制操作(DataTransfer),Datanode读取本地存储的数据块时,发现本地数据块副本的长度小于Namenode记录的长度,则认为该数据块已经无效,会调用reportBadBlocks()方法。reportBadBlocks()方法的参数是LocatedBlock对象,这个对象描述了出现错误数据块的位置,Namenode收到reportBadBlocks()请求后,会下发数据块副本删除指令删除错误的数据块。
Datanode会定期(默认是5分钟,不可以配置)调用blockReceivedAndDeleted()方法向Namenode汇报Datanode新接受的数据块或者删除的数据块。Datanode接受一个数据块,可能是因为Client写入了新的数据块,或者从别的Datanode上复制一个数据块到当前Datanode。Datanode删除一个数据块,则有可能是因为该数据块的副本数量过多,Namenode向当前Datanode下发了删除数据块副本的指令。我们可以把blockReceivedAndDeleted()方法理解为blockReport()的增量汇报,这个方法的参数包括DatanodeRegistration对象、增量汇报数据块所在的块池ID,以及StorageReceivedDeletedBlocks对象的数组,这里的StorageReceived DeletedBlocks对象封装了Datanode的一个数据存储上新添加以及删除的数据块集合。Namenode接受了这个请求之后,会更新它内存中数据块与数据节点的对应关系。
DatanodeProtocol中与数据块读写相关的最后一个方法是commitBlockSynchronization(),这个方法用于在租约恢复操作时同步数据块的状态。在租约恢复操作时,主数据节点完成所有租约恢复协调操作后调用commitBlockSynchronization()方法同步Datanode和Namenode上数据块的状态,所以commitBlockSynchronization()方法包含了大量的参数。对于租约恢复,我们会在第3章的租约管理小节以及第4章的文件系统数据集小节中介绍,请读者参考这两个章节内容。
(4)其他方法
DataProtocol中最后一个方法是errorReport(),该方法用于向名字节点上报运行过程中发生的一些状况,如磁盘不可用等,这个方法在调试时非常有用。由于这个方法并不涉及具体的逻辑,我们就不再详细介绍了。
(5)DatanodeCommand
通过前面几个小节的介绍我们知道,sendHeartbeat()、blockReport()以及cacheReport()方法的返回值都会携带Namenode向Datanode下发的名字节点指令。在HDFS中,使用DatanodeCommand类描述Namenode向Datanode发出的名字节点指令。DatanodeCommand类以及它的子类结构如图1-4所示。
图1-4 DatanodeCommand类结构
DatanodeCommand是所有名字节点的基类,它一共有7个子类,但在DatanodeProtocol中一共定义了10个名字节点指令,每个指令都有一个唯一的编号与之对应,代码如下:
可以看到,DatanodeProtocol中定义的指令类型数量和DatanodeCommand子类的数量并不一致。这是因为指令编号DNA_SHUTDOWN已经废弃不用了,Datanode接收到DNA_SHUTDOWN指令后会直接抛出UnsupportedOperationException异常。关闭Datanode是通过调用ClientDatanodeProtocol.shutdownDatanode()方法来触发的。
这里同时要注意的是,DNA_TRANSFER、DNA_RECOVERBLOCK以及DNA_INVALIDATE都是通过BlockCommand子类来封装的,只不过参数不同。DNA_TRANSFER指令用于触发数据节点的数据块复制操作,当HDFS系统中某个数据块的副本数小于配置的副本系数时,Namenode会通过DNA_TRANSFER指令通知某个拥有这个数据块副本的Datanode将该数据块复制到其他数据节点上。DNA_INVALIDATE用于通知Datanode删除数据节点上的指定数据块,这是因为Namenode发现了某个数据块的副本数已经超过了配置的副本系数,这时Namenode会通知某个数据节点删除这个数据节点上多余的数据块副本。当客户端在写文件时发生异常退出,会造成数据流管道中不同数据节点上数据块状态的不一致,这时Namenode会从数据流管道中选出一个数据节点作为主恢复节点,协调数据流管道中的其他数据节点进行租约恢复操作,以同步这个数据块的状态。此时Namenode就会向这个数据节点下发DNA_RECOVERBLOCK指令,通知数据节点开始租约恢复操作。
至于DatanodeCommand中其他类的作用都比较简单,这里就不再单独介绍了。
4.InterDatanodeProtocol
介绍完了Datanode与Namenode之间的接口,我们来介绍InterDatanodeProtocol——Datanode与Datanode之间的接口。InterDatanodeProtocol接口主要用于租约恢复操作,如图1-5所示,InterDatanodeProtocol只有initReplicaRecovery()和updateReplicaUnderRecovery()两个方法。
图1-5 InterDatanodeProtocol接口
客户端打开一个文件进行写操作时,首先要获取这个文件的租约,并且还需要定期更新租约。当Namenode的租约监控线程发现某个HDFS文件租约长期没有更新时,就会认为写这个文件的客户端发生异常,这时Namenode就需要触发租约恢复操作——同步数据流管道中所有Datanode上该文件数据块的状态,并强制关闭这个文件。
租约恢复的控制并不是由Namenode负责的,而是Namenode从数据流管道中选出一个主恢复节点,然后通过下发DatanodeCommand的恢复指令触发这个数据节点控制租约恢复操作,也就是由这个主恢复节点协调整个租约恢复操作的过程。主恢复节点会调用InterDatanodeProtocol接口来指挥数据流管道的其他数据节点进行租约恢复。租约恢复操作其实很简单,就是将数据流管道中所有数据节点上保存的同一个数据块状态(时间戳和数据块长度)同步一致。当成功完成租约恢复后,主恢复节点会调用DatanodeProtocol.commitBlock Synchronization()方法同步名字节点上该数据块的时间戳和数据块长度,保持名字节点和数据节点的一致。
由于数据流管道中同一个数据块状态(长度和时间戳)在不同的Datanode上可能是不一致的,所以主恢复节点会首先调用InterDatanodeProtocol.initReplicaRecovery()方法获取数据流管道中所有数据节点上保存的指定数据块的状态,这里的数据块状态使用ReplicaRecoveryInfo类封装。主恢复节点会根据收集到的这些状态,确定一个当前数据块的新长度,并且使用Namenode下发的recoverId作为数据块的新时间戳。
主恢复节点计算出数据块的新长度后,就会调用InterDatanodeProtocol.updateReplicaUnder Recovery()方法将数据流管道中所有节点上该数据块的长度同步为新的长度,将数据块的时间戳同步为新的时间戳。
当完成了所有的同步操作后,主恢复节点节就可以调用DatanodeProtocol.commitBlock Synchronization()将Namenode上该数据块的长度和时间戳同步为新的长度和时间戳,这样Datanode和Namenode的数据也就一致了。
5.NamenodeProtocol
NamenodeProtocol定义了第二名字节点与名字节点之间的接口。由于在Hadoop 2.X架构中第二名字节点的功能已经完全被Standby节点所取代,这个接口我们就不再详细介绍了。图1-6给出了NamenodeProtocol定义的所有方法,读者可以参考这个图读取相关的代码。
图1-6 NamenodeProtocol定义