> 文档中心 > 最全最详细的Zookeeper学习总结

最全最详细的Zookeeper学习总结


Zookeeper简介

分布式系统定义及⾯临的问题

Zookeeper最为主要的使用场景是作为分布式系统的分布式协同服务 。

我们将分布式系统定义为:分布式系统是同时跨越多个物理主机,独立运行的多个软件所组成系统。

分布式系统的协调工作就是通过某种方式,让每个节点的信息能够同步和共享。这依赖于服务进程之间的通信。通信方式有两种:

  • 通过网络进行信息共享
    这就像现实中,开发leader在会上把任务传达下去,组员通过听leader命令或者看leader的邮件知道自己要干什么。当任务分配有变化时,leader会单独告诉组员,或者再次召开会议。信息通过人与人之间的直接沟通,完成传递。
  • 通过共享存储
    这就好比开发eader按照约定的时间和路径,把任务分配表放到了svn 上,组员每天去svn 上拉取最新的任务分配表,然后干活。其中svn就是共享存储。更好一点的做法是, 当svn文件版本更新时,触发邮件通知,每个组员再去拉取最新的任务分配表。文样做更好,因为每次更新,组员都能第一时间得到消息,从而让自己手中的任务分配表永远是最新的。此种方式依赖于中央存储。整个过程如下图所示∶
    在这里插入图片描述

Zookeeper如何解决分布式系统面临的问题

ZooKeeper对分布式系统的协调,使用的是第二种方式,共享存储。其实共享存储,分布式应用也需要和存储进行网络通信。
实际上,通过ZooKeeper实现分布式协同的原理,和项目组通过SVN同步工作任务的例子是一样的。ZooKeeper就像是SVN。存储了任务的分配、完成情况等共享信息。每个分布式应用的节点就是组员. 订阅这些共享信息。当主节点(组leader),对某个从节点的分工信息作出改变时,相关订阅的从节点得到|zookeeper的通知,取得自口最新的任务分配。完成工作后。把完成情况存储到lzookeeper。主节点订阅了该任务的完成情况信息。所以将得到zookeeper的完工的通知。参考下图,是不是和前面项目组通过svn分配工作的例子一模一样? 仅仅是把svn和邮件系统合二为一,以ZooKeeper代替
在这里插入图片描述
注∶Slave节点要想获取ZooKeeper的更新通知,需事先在关心的数据节点上设置观察点。

大多数分布式系统中出现的问题,都源于信息的共享出了问题。如果各个节点间信息不能及时共享和同步,那么就会在协作过程中产生各种问题。ZooKeeper解决协同问题的关键,就是在于保证分布式系统信息的一致性。

ZooKeeper的基本概念

Zookeeper是一个开源的分布式协调服务,其设计目标是将那些复杂的且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一些简单的接口提供给用户使用。zookeeper是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如数据订阅/发布、负载均衡、命名服务、集群管理、分布式锁和分布式队列等功能

基本概念

  • 集群角色
    通常在分布式系统中,构成一个集群的每一台机器都有自己的角色,最典型的集群就是Master/Slave模式(主备模式),此情况下把所有能够处理写操作的机器称为Master机器,把所有通过异步复制方式获取最新数据,并提供读服务的机器为Slave机器。

    而在Zookeeper中,这些概念被颠覆了。它没有沿用传递的Master/Slave概念,而是引入了Leader、Follower、Observer三种角色。Zookeeper集群中的所有机器通过Leader选举来选定一台被称为Leader的机器,Leader服务器为客户端提供读和写服务,除Leader外,其他机器包括Follower和Observer,Follower和Observer都能提供读服务,唯一的区别在于Observer不参与Leader选举过程,不参与写操作的过半写成功策略,因此Observer可以在不影响写性能的情况下提升集群的性能。

  • 会话(session)
    Session指客户端会话,一个客户端连接是指客户端和服务端之间的一个TCP长连接,Zookeeper对外的服务端口默认为2181,客户端启动的时候,首先会与服务器建立一个TCP连接,从第一次连接建立开始,客户端会话的生命周期也开始了,通过这个连接,客户端能够心跳检测与服务器保持有效的会话,也能够向Zookeeper服务器发送请求并接受响应,同时还能够通过该连接接受来自服务器的Watch事件通知。

  • 数据节点(Znode)
    在谈到分布式的时候,我们通常说的"节点"是指组成集群的每一台机器。然而,在ZooKeeper中,"节点"分为两类,
    第一类同样是指构成集群的机器,我们称之为机器节点;
    第二类则是指数据模型中的数据单元,我们称之为数据节点一ZNode。
    ZooKeeper将所有数据存储在内存中,数据模型是一棵树(ZNode Tree),由斜杠(/)进行分割的路径,就是一个Znode,例如/app/path1。每个ZNode上都会保存自己的数据内容,同时还会保存一系列属性信息。

  • 版本
    刚刚我们提到,Zookeeper的每个Znode上都会存储数据,对于每个ZNode,Zookeeper都会为其维护一个叫作Stat的数据结构,Stat记录了这个ZNode的三个数据版本,分别是version (当前ZNode的版本)、cversion(当前ZNode子节点的版本)、aversion(当前ZNode的ACL版本)。

  • Watcher(事件监听器)
    Wathcer(事件监听器),是Zookeeper中一个很重要的特性,Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将事件通知到感兴趣的客户端,该机制是Zookeeper实现分布式协调服务的重要特性

  • ACL
    Zookeeper采用ACL(Access Control Lists)策略来进行权限控制,其定义了如下五种权限∶

    • CREATE∶创建子节点的权限。
    • READ∶获取节点数据和子节点列表的权限。
    • WRITE∶更新节点数据的权限。
    • DELETE∶删除子节点的权限。
    • ADMIN∶设置节点ACL的权限。
      其中需要注意的是,CREATE和DELETE这两种权限都是针对子节点的权限控制

Zookeeper环境搭建

Zookeeper的搭建方式

Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式。

  • 单机模式∶Zookeeper只运行在一台服务器上,适合测试环境;
  • 集群模式∶Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个"集合体"
  • 伪集群模式∶就是在一台服务器上运行多个Zookeeper实例;

单机模式搭建∶

Zookeeper安装以linux环境为例∶

  1. 下载
    首先我们下载稳定版本的zookeeper, http://zookeeper.apache.org/releases.html

  2. 上传
    下载完成后,将zookeeper压缩包zookeeper-3.4.14.tar.gz上传到inux系统

  3. 解压缩压缩包

    tar -zxVf zookeeper-3.4.14.tar.gZ
  4. 进入 zookeeper-3.4.14目录,创建 data文件夹

    cd zookeeper-3.4.14 mkdir data
  5. 修改配置文件名称

    cd confmv zoo_sample.cfg zoo.cfg
  6. 修改zoo.cfg中的data属性

    dataDir=/root/zookeeper-3.4.14/data
  7. zookeeper服务启动
    进入bin目录,启动服务,输入命令。

    ./zkServer.sh start

    输出以下内容标识启动成功
    最全最详细的Zookeeper学习总结
    关闭服务输入命令

    ./zkServer.sh stop

    输出以下提示信息
    最全最详细的Zookeeper学习总结
    查看状态

    ./zkServer.sh status

    如果启动状态,提示
    在这里插入图片描述
    如果未启动状态,提示∶
    最全最详细的Zookeeper学习总结

伪集群模式∶

Zookeeper不但可以在单机上运行单机模式Zookeeper,而且可以在单机模拟集群模式 Zookeeper的运行,也就是将不同实例运行在同一台机器,用端口进行区分,伪集群模式为我们体验Zookeeper和做一些尝试性的实验提供了很大的便利。比如,我们在测试的时候,可以先使用少量数据在伪集群模式下进行测试。当测试可行的时候,再将数据移植到集群模式进行真实的数据实验。这样不但保证了它的可行性,同时大大提高了实验的效率。这种搭建方式,比较简便,成本比较低。适合测试和学习

注意事项∶
一台机器上部署了3个server,,也就是说单台机器及上运行多个Zookeeper实例。这种情况下,必须保证每个配置文档的各个端口号不能冲突,除clientPort不同之外,dataDir也不同。另外,还要在dataDir所对应的目录中创建myid文件来指定对应的Zookeeper服务器实例

  • clientPort端口∶
    如果在1台机器上部署多个server,那么每台机器都要不同的 clientPort,比如 server1是2181,server2 是2182,server3是2183
  • dataDir和dataLogDir∶
    dataDir和dataLogDir也需要区分下,将数据文件和日志文件分开存放,同时每个server的这两变量所对应的路径都是不同的
  • server.X和myid∶
    server.X这个数字就是对应,data/myid中的数字。在3个server的myid文件中分别写入了1,2,3,那么每个server中的z00.cfg都配 server.1 server.2.server.3就行了。因为在同一台机器上,后面连着的2 个端口,3个server都不要一样,否则端口冲突。

开始搭建伪集群

  1. 下载
    首先我们下载最新稳定版本的zookeeper

  2. 上传
    下载完成后,将zookeeper压缩包 zookeeper-3.4.14.tar.gz上传到linux系统

  3. 解压 压缩包
    创建目录zkcluster

    mkdir zkcluster
  4. 解压 zookeeper-3.4.14.tar.gz到zkcluster目录下

    tar -zxvf zookeeper-3.4.14.tar.gz -C /zkcluster
  5. 改变名称

    mv zookeeper-3.4.14 zookeeper01
  6. 复制并改名

    cp -r zookeeper01/ zookeeper02 cp -r zookeeper01/ zookeeper03
  7. 分别在zookeeper01、zookeeper02、zookeeper03目录下创建data及logs目录

    mkdir data cd data mkdir logs
  8. 修改配置文件名称

    cd confmv zoO_sample.cfg zoo.cfg
  9. 配置每一个Zookeeper 的dataDir(zo0.cfg)clientPort分别为218121822183

    clientPort=2181dataDir=/zkcluster/zookeeper01/datadataLogDir=/zkcluster/zookeeper01/data/logs
    clientPort=2182dataDir=/zkcluster/zookeeper02/datadataLogDir=/zkcluster/zookeeper02/data/logs
    clientPort=2183dataDir=/zkcluster/zookeeper03/datadataLogDir=/zkcluster/zookeeper03/data/logs
  10. 配置集群

    • 在每个zookeeper的 data 目录下创建一个myid 文件,内容分别是1、2、3。这个文件就是记录每个服务器的ID

      touch myid
    • 在每一个z0okeeper 的 zoo.cfg配置客户端访问端口(clientPort)和集群服务器IP列表。

    server.1=10.211.55.4:2881:3881 server.2=10.211.55.4:2882:3882 server.3=10.211.55.4:2883:3883#server.服务器ID=服务器IP地址∶服务器之间通信端口∶服务器之间投票选举端口
  11. 启动集群
    依次启动三个zk实例

Zookeeper基本使用

ZooKeeper系统模型

ZooKeeper数据模型Znode
在ZooKeeper中,数据信息被保存在一个个数据节点上,这些节点被称为znode。ZNode 是
ZooKeeper 中最小数据单位,在 ZNode下面又可以再挂ZNode,这样一层层下去就形成了一个层次化命名空间 ZNode 树,我们称为 ZNode Tree,它采用了类似文件系统的层级树状结构进行管理。见下图示例
在这里插入图片描述
在 Zookeeper中,每一个数据节点都是一个 ZNode,上图根目录下有两个节点,分别是∶ app1和app2,其中 app1 下面又有三个子节点,所有ZNode按层次化进行组织,形成这么一颗树,ZNode的节点路径标识方式和Unix文件系统路径非常相似,都是由一系列使用斜杠(/)进行分割的路径表示,开发人员可以向这个节点写入数据,也可以在这个节点下面创建子节点。

ZNode 的类型

Zookeeper的znode tree是由一系列数据节点组成的,那接下来,我们就对数据节点做详细讲解
Zookeeper 节点类型可以分为三大类∶

  • 持久性节点(Persistent)
  • 临时性节点(Ephemeral)
  • 顺序性节点(Sequential)

在开发中在创建节点的时候通过组合可以生成以下四种节点类型∶ 持久节点、持久顺序节点,临时节点、临时顺序节点。不同类型的节点则会有不同的生命周期。

  • 持久节点∶
    是Zookeeper中最常见的一种节点类型,所谓持久节点,就是指节点被创建后会一直存在服务器,直到删除操作主动清除
  • 持久顺序节点∶
    就是有顺序的持久节点,节点特性和持久节点是一样的.只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序。
  • 临时节点∶
    就是会被自动清理掉的节点.它的生命周期和客户端会话绑在一起.客户端会话结束、节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点。
  • 临时顺序节点∶
    就是有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀

事务ID
事务是对物理和抽象的应用状态上的操作集合。狭义上的事务通常指的是数据库事务,一般包含了一系列对数据库有序的读写操作,这些数据库事务具有所谓的ACID特性,即原子性(Atomic)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。

而在ZooKeeper中,事务是指能够改变ZooKeeper服务器状态的操作,我们也称之为事务操作或更新操作,一般包括数据节点创建与删除、数据节点内容更新等操作。对于每一个事务请求,ZooKeeper都会为其分配一个全局唯一的事务ID,用 ZXID来表示,通常是一个64位的数字。每一个ZXID 对应一次更新操作,从这些ZXID中可以间接地识别出ZooKeeper处理这些更新操作请求的全局顺序

ZNode 的状态信息

在这里插入图片描述
整个ZNode 节点内容包括两部分∶节点数据内容和节点状态信息。图中quota 是数据内容,其他的属于状态信息。那么这些状态信息都有什么含义呢?
cZxid 就是Create zZXID,表示节点被创建时的事务ID。ctime 就是 Create Time,表示节点创建时间。

mzxid 就是 Modified zXID,表示节点最后一次被修改时的事务ID。mtime 就是 Modified Time,表示节点最后一次被修改的时间。p2xid 表示该节点的子节点列表最后一次被修改时的事务 ID。只有子节点列表变更才会更新 pzxid,子节点内容变更不会更新。cversion 表示子节点的版本号。dataVersion 表示内容版本号。aclVersion 标识acl版本ephemeral0wner 表示创建该临时节点时的会话 sessionID,如果是持久性节点那么值为 0 dataLength 表示数据长度。numChildren 表示直系子节点数。

Watcher–数据变更通知

Zookeeper使用Watcher机制实现分布式数据的发布/订阅功能

一个典型的发布/订阅模型系统定义了一种 一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。

在 ZooKeeper 中,引入了Watcher 机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个Watcher 监听,当服务端的一些指定事件触发了这个Watcher,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。

整个Watcher注册与通知过程如图所示。
在这里插入图片描述
Zookeeper的Watcher机制主要包括客户端线程、客户端WatcherManager、Zookeeper服务器三部分。

具体工作流程为∶客户端在向Zookeeper服务器注册的同时,会将Watcher对象存储在客户端的WatcherManager当中。当Zookeeper服务器触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象来执行回调逻辑。

ACL保障数据的安全

Zookeeper作为一个分布式协调框架,其内部存储了分布式系统运行时状态的元数据,这些元数据会直接影响基于Zookeeper进行构造的分布式系统的运行状态,因此,如何保障系统中数据的安全,从而避免因误操作所带来的数据随意变更而导致的数据库异常十分重要。在Zookeeper中,提供了一套完善的ACL(Access Control List)权限控制机制来保障数据的安全。

我们可以从三个方面来理解ACL机制∶权限模式(Scheme)、授权对象(ID)、权限(Permission),通常使用"scheme∶ id∶permission"来标识一个有效的ACL信息。

  • 权限模式∶Scheme
    权限模式用来确定权限验证过程中使用的检验策略,有如下四种模式∶

    1. IP
      IP模式就是通过IP地址粒度来进行权限控制,如"ip∶192.168.0.110"表示权限控制针对该IP地址,同时IP模式可以支持按照网段方式进行配置,如"ip∶192.168.0.1/24"表示针对192.168.0.*这个网段进行权限控制。
    2. Digest
      Digest是最常用的权限控制模式,要更符合我们对权限控制的认识.其使
      用"username∶password"形式的权限标识来进行权限配置,便于区分不同应用来进行权限控制。当我们通过"username∶password"形式配置了权限标识后,Zookeeper会先后对其进行SHA-1加密和BASE64编码。
    3. World
      World是一种最开放的权限控制模式,这种权限控制方式几乎没有任何作用,数据节点的访问权限对所有用户开放,即所有用户都可以在不进行任何权限校验的情况下操作ZooKeeper上的数据。另外,World模式也可以看作是一种特殊的Digest模式,它只有一个权限标识,即"world∶anyone"。
    4. Super
      Super模式,顾名思义就是超级用户的意思,也是一种特殊的Digest模式。在Super模式下,超级用户可以对任意ZooKeeper上的数据节点进行任何操作。
  • 授权对象∶ID
    授权对象指的是权限赋予的用户或一个指定实体,例如IP地址或是机器等。在不同的权限模式下,授权对象是不同的,表中列出了各个权限模式和授权对象之间的对应关系。

    权限模式 授权对象
    IP 通常是一个IP地址或IP段∶例如∶192.168.10.110或192.168.10.1/24
    Digest 自定义,通常是username∶BASE64(SHA-1username∶password)例如∶zm:sdfndslIndlksfn7c=
    Digest 只有一个ID∶ anyone
    Super 超级用户
  • 权限
    权限就是指那些通过权限检查后可以被允许执行的操作。在ZooKeeper中,所有对数据的操作权限分为以下五大类∶

    1. CREATE((C)∶数据节点的创建权限,允许授权对象在该数据节点下创建子节点。
    2. DELETE(D)∶子节点的删除权限,允许授权对象删除该数据节点的子节点。
    3. READ(R)数据节点的读取权限,允许授权对象访问该数据节点并读取其数据内容或子节点列表等。
    4. WRITE(W)∶数据节点的更新权限,允许授权对象对该数据节点进行更新操作。
    5. ADMIN(A)∶ 数据节点的管理权限,允许授权对象对该数据节点进行 ACL 相关的设置操作。

ZooKeeper命令行操作

现在已经搭建起了一个能够正常运行的zookeeper服务了,所以接下来,就是来借助客户端来对zookeeper的数据节点进行操作

首先,进入到zookeeper的bin目录之后

通过zkClient进入zookeeper客户端命令行

./zkcli.sh 连接本地的zookeeper服务器./zkCli.sh -server ip∶port 连接指定的服务器

连接成功之后,系统会输出Zookeeper的相关环境及配置信息等信息。输入help之后,屏幕会输出可用的Zookeeper命令,如下图所示
在这里插入图片描述

创建节点
使用create命令,可以创建一个Zookeeper节点,如

create [-s][-e] path data acl其中,-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点;acl用来进行权限控制。
  1. 创建顺序节点
    使用create-s /zk-test 123命令创建zk-test顺序节点

    Create -s/zk-test 123

    最全最详细的Zookeeper学习总结
    执行完后,就在根节点下创建了一个叫做/zk-test的节点,该节点内容就是123,同时可以看到创建的zk-test节点后面添加了一串数字以示区别

  2. 创建临时节点
    使用 create-e /zk-temp 123命令创建zk-templ临时节
    最全最详细的Zookeeper学习总结
    临时节点在客户端会话结束后,就会自动删除,下面使用quit命令退出客户端
    在这里插入图片描述

    再次使用客户端连接服务端,并使用Is/命令查看根目录下的节点
    最全最详细的Zookeeper学习总结
    可以看到根目录下已经不存在zk-temp临时节点了

  3. 创建永久节点
    使用 create /zk-permanent 123命令创建zk-permanent永久节点
    最全最详细的Zookeeper学习总结
    可以看到永久节点不同于顺序节点,不会自动在后面添加一串数字

  4. 读取节点
    与读取相关的命令有ls 命令和get 命令

    Is命令可以列出Zookeeper指定节点下的所有子节点.但只能查看指定节点下的第一级的所有子节点;

    1s path其中,path表示的是指定数据节点的节点路径

    get命令可以获取Zookeeper指定节点的数据内容和属性信息。

    get path

    若获取根节点下面的所有子节点,使用ls/命令即可
    最全最详细的Zookeeper学习总结
    若想获取/zk-permanent的数据内容和属性,可使用如下命令∶get /zk-permanent
    在这里插入图片描述
    从上面的输出信息中,我们可以看到,第一行是节点/zk-permanent 的数据内容,其他几行则是创建该节点的事务ID(cZxid)、最后一次更新该节点的事务ID(mZxid)和最后一次更新该节点的时间(mtime)等属性信息

  5. 更新节点
    
使用set命令,可以更新指定节点的数据内容,用法如下

    

set path data [ version ]

    其中,data就是要更新的新内容,version表示数据版本,在zookeeper中,节点的数据是有版本概念的,这个参数用于指定本次更新操作是基于Znode的哪一个数据版本进行的,如将/zk-permanent节点的数据更新为456,可以使用如下命令∶
    

在这里插入图片描述

    现在dataVersion已经变为1了,表示进行了更新

  6. 删除节点
    
使用delete命令可以删除Zookeeper上的指定节点,用法如下

    delete path [version]

    其中version也是表示数据版本,使用delete /zk-permanent 命令即可删除/zk-permanent节点
    最全最详细的Zookeeper学习总结
    可以看到,已经成功删除/zk-permanent节点。值得注意的是,若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点

Zookeeper的api使用

Zookeeper作为一个分布式框架,主要用来解决分布式一致性问题,它提供了简单的分布式原语,并且对多种编程语言提供了API,所以接下来重点来看下Zookeeper的java客户端API使用方式
Zookeeper API共包含五个包,分别为∶

  1. org.apache.2ookeeper
  2. org.apache.zookeeper.data
  3. org.apache.zookeeper.server
  4. org.apache.zookeeper.server.quorum
  5. org.apache.zookeeper.server.upgrade
    其中org.apache.z0okeeper,包含Zookeeper类,他是我们编程时最常用的类文件。这个类是Zookeeper客户端的主要类文件。如果要使用Zookeeper服务,应用程序首先必须创建一个Zookeeper 实例,这时就需要使用此类。一旦客户端和Zookeeper服务端建立起了连接,Zookeeper系统将会给本次连接会话分配一个ID值,并且客户端将会周期性的向服务器端发送心跳来维持会话连接。只要连接有效,客户端就可以使用Zookeeper API来做相应处理了。

准备工作∶

  1. 导入依赖

    <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.14</version></dependency>
  2. 建立会话

    public class CreateSession implements Watcher {//countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束private static CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws InterruptedException,IOException {/*客户端可以通过创建⼀个zk实例来连接zk服务器new Zookeeper(connectString,sesssionTimeOut,Wather)connectString: 连接地址:IP:端⼝sesssionTimeOut:会话超时时间:单位毫秒Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)*/ZooKeeper zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new CreateSession());System.out.println(zooKeeper.getState());countDownLatch.await();//表示会话真正建⽴System.out.println("=========Client Connected tozookeeper==========");}// 当前类实现了Watcher接⼝,重写了process⽅法,该⽅法负责处理来⾃Zookeeper服务端的 watcher通知,在收到服务端发送过来的SyncConnected事件之后,解除主程序在CountDownLatch上的等待阻塞,⾄此,会话创建完毕public void process(WatchedEvent watchedEvent) {//当连接创建了,服务端发送给客户端SyncConnected事件if(watchedEvent.getState() == Event.KeeperState.SyncConnected){countDownLatch.countDown();} }}

注意,ZooKeeper 客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立好一个可用的会话.在会话的生命周期中处于"CONNECTING"的状态。 当该会话真正创建完毕后ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话。

  1. 创建节点

    public class CreateNote implements Watcher {//countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zooKeeper;public static void main(String[] args) throws Exception {zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new CreateNote());countDownLatch.await();}public void process(WatchedEvent watchedEvent) {//当连接创建了,服务端发送给客户端SyncConnected事件if(watchedEvent.getState() == Event.KeeperState.SyncConnected){countDownLatch.countDown(); }//调⽤创建节点⽅法try {createNodeSync(); } catch (Exception e) {e.printStackTrace(); } } private void createNodeSync() throws Exception {/* path :节点创建的路径* data[] :节点创建要保存的数据,是个byte类型的* acl :节点创建的权限信息(4种类型)* ANYONE_ID_UNSAFE : 表示任何⼈* AUTH_IDS :此ID仅可⽤于设置ACL。它将被客户机验证的ID替换。* OPEN_ACL_UNSAFE :这是⼀个完全开放的ACL(常⽤)-->world:anyone* CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限* createMode :创建节点的类型(4种类型)* PERSISTENT:持久节点* PERSISTENT_SEQUENTIAL:持久顺序节点* EPHEMERAL:临时节点* EPHEMERAL_SEQUENTIAL:临时顺序节点String node = zookeeper.create(path,data,acl,createMode);*/String node_PERSISTENT = zooKeeper.create("/learn_persistent", "持久节点内 容".getBytes("utf-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);String node_PERSISTENT_SEQUENTIAL = zooKeeper.create("/learn_persistent_sequential", "持久节点内容".getBytes("utf-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);String node_EPERSISTENT = zooKeeper.create("/learn_ephemeral", "临时节点内 容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("创建的持久节点是:"+node_PERSISTENT);System.out.println("创建的持久顺序节点是:"+node_PERSISTENT_SEQUENTIAL);System.out.println("创建的临时节点是:"+node_EPERSISTENT); }}
  2. 获取节点数据

    public class GetNoteData implements Watcher {//countDownLatch这个类使⼀个线程等待,主要不让main⽅法结束private static CountDownLatch countDownLatch = new CountDownLatch(1);private static ZooKeeper zooKeeper;public static void main(String[] args) throws Exception {zooKeeper = new ZooKeeper("10.211.55.4:2181", 10000, new GetNoteDate());Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent watchedEvent) {//⼦节点列表发⽣变化时,服务器会发出NodeChildrenChanged通知,但不会把变化情况告诉给客户端// 需要客户端⾃⾏获取,且通知是⼀次性的,需反复注册监听if(watchedEvent.getType() ==Event.EventType.NodeChildrenChanged){//再次获取节点数据try {List<String> children = zooKeeper.getChildren(watchedEvent.getPath(), true);System.out.println(children); } catch (KeeperException e) {e.printStackTrace(); } catch (InterruptedException e) {e.printStackTrace(); }}//当连接创建了,服务端发送给客户端SyncConnected事件if(watchedEvent.getState() == Event.KeeperState.SyncConnected){try {//调⽤获取单个节点数据⽅法getNoteDate();getChildrens(); } catch (KeeperException e) {e.printStackTrace(); } catch (InterruptedException e) {e.printStackTrace(); }} }private static void getNoteData() throws Exception {/* path : 获取数据的路径* watch : 是否开启监听* stat : 节点状态信息* null: 表示获取最新版本的数据* zk.getData(path, watch, stat);*/byte[] data = zooKeeper.getData("/learn_persistent/learn-children", true, null);System.out.println(new String(data,"utf-8")); } private static void getChildrens() throws KeeperException, InterruptedException {/*path:路径watch:是否要启动监听,当⼦节点列表发⽣变化,会触发监听zooKeeper.getChildren(path, watch);*/List<String> children = zooKeeper.getChildren("/learn_persistent", true);System.out.println(children); }}
  3. 修改节点数据

    public class updateNote implements Watcher {private static ZooKeeper zooKeeper;public static void main(String[] args) throws Exception {zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new updateNote());Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent watchedEvent) {//当连接创建了,服务端发送给客户端SyncConnected事件try {updateNodeSync(); } catch (Exception e) {e.printStackTrace(); } } private void updateNodeSync() throws Exception {/*path:路径data:要修改的内容 byte[]version:为-1,表示对最新版本的数据进⾏修改zooKeeper.setData(path, data,version);*/byte[] data = zooKeeper.getData("/learn_persistent", false, null);System.out.println("修改前的值:"+new String(data));//修改 stat:状态信息对象 -1:最新版本Stat stat = zooKeeper.setData("/learn_persistent", "客户端修改内 容".getBytes(), -1);byte[] data2 = zooKeeper.getData("/learn_persistent", false, null);System.out.println("修改后的值:"+new String(data2)); }}
  4. 删除节点

    public class DeleteNote implements Watcher {private static ZooKeeper zooKeeper;public static void main(String[] args) throws Exception {zooKeeper = new ZooKeeper("10.211.55.4:2181", 5000, new DeleteNote());Thread.sleep(Integer.MAX_VALUE);}public void process(WatchedEvent watchedEvent) {//当连接创建了,服务端发送给客户端SyncConnected事件try {deleteNodeSync(); } catch (Exception e) {e.printStackTrace(); }}private void deleteNodeSync() throws KeeperException, InterruptedException {/*zooKeeper.exists(path,watch) :判断节点是否存在zookeeper.delete(path,version) : 删除节点*/Stat exists = zooKeeper.exists("/learn_persistent/learn-children", false);System.out.println(exists == null ? "该节点不存在":"该节点存在");zooKeeper.delete("/learn_persistent/learn-children",-1);Stat exists2 = zooKeeper.exists("/learn_persistent/learn-children", false);System.out.println(exists2 == null ? "该节点不存在":"该节点存在"); }}

Zookeeper-开源客户端

ZkClient

ZkClient是Github上一个开源的zookeeper客户端,在Zookeeper原生API接口之上进行了包装,是一个更易用的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能。
接下来,还是从创建会话、创建节点、读取数据、更新数据、删除节点等方面来介绍如何使用zkClient 这个z0okeeper客户端

  1. 添加依赖∶
    在pom.xml文件中添加如下内容

    <dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.2</version></dependency>
  2. 创建会话∶
    使用ZkClient可以轻松的创建会话,连接到服务端。

    package com.hust.grid.leesf.zkclient.examples;import java.io.IOException;import org.I0Itec.zkclient.ZkClient;public class CreateSession {/*创建⼀个zkClient实例来进⾏连接注意:zkClient通过对zookeeperAPI内部包装,将这个异步的会话创建过程同步化了*/public static void main(String[] args) {ZkClient zkClient = new ZkClient("127.0.0.1:2181");System.out.println("ZooKeeper session established.");}}

    运行结果∶ZooKeeper session established.
    结果表明已经成功创建会话。

  3. 创建节点
    ZkClient提供了递归创建节点的接口,即其帮助开发者先完成父节点的创建,再创建子节点

    import org.I0Itec.zkclient.ZkClient;public class Create_Node_Sample {public static void main(String[] args) {ZkClient zkClient = new ZkClient("127.0.0.1:2181");System.out.println("ZooKeeper session established.");//createParents的值设置为true,可以递归创建节点zkClient.createPersistent("/learn-zkClient/learn-c1",true);System.out.println("success create znode."); }}内;

    运行结果∶success create znode.
    结果表明已经成功创建了节点,值得注意的是,在原生态接口中是无法创建成功的(父节点不存在),但是通过ZkClient通过设置createParents参数为true可以递归的先创建父节点,再创建子节点

  4. 删除节点
    ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点。

    package com.hust.grid.leesf.zkclient.examples;import org.I0Itec.zkclient.ZkClient;public class Del_Data_Sample {public static void main(String[] args) throws Exception {String path = "/learn-zkClient/learn-c1";ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);zkClient.deleteRecursive(path);System.out.println("success delete znode.");}}

    运行结果∶Success delete znode.
    结果表明ZkClient可直接删除带子节点的父节点,因为其底层先删除其所有子节点,然后再删除父节点

  5. 获取子节点

    package com.hust.grid.leesf.zkclient.examples;import java.util.List;import org.I0Itec.zkclient.IZkChildListener;import org.I0Itec.zkclient.ZkClient;public class Get_Children_Sample {public static void main(String[] args) throws Exception {ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);List<String> children = zkClient.getChildren("/learn-zkClient");System.out.println(children);//注册监听事件zkClient.subscribeChildChanges(path, new IZkChildListener() {public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {System.out.println(parentPath + " ' s child changed, currentChilds:" + currentChilds); } });zkClient.createPersistent("/learn-zkClient");Thread.sleep(1000);zkClient.createPersistent("/learn-zkClient/c1");Thread.sleep(1000);zkClient.delete("/learn-zkClient/c1");Thread.sleep(1000);zkClient.delete(path);Thread.sleep(Integer.MAX_VALUE); }}

    运行结果:

    /zk-book 's child changed, currentChilds:[]/zk-book 's child changed, currentChilds:[c1]/zk-book 's child changed, currentChilds:[]/zk-book 's child changed, currentChilds:null

    结果表明∶

    客户端可以对一个不存在的节点进行子节点变更的监听。

    一日客户端对一个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端
。
    该节点本身的创建或删除也会通知到客户端。

  6. 获取数据(节点是否存在、更新、删除)

    public class Get_Data_Sample {public static void main(String[] args) throws InterruptedException {String path = "/learn-zkClient-Ep";ZkClient zkClient = new ZkClient("127.0.0.1:2181");//判断节点是否存在boolean exists = zkClient.exists(path);if (!exists){zkClient.createEphemeral(path, "123"); }//注册监听zkClient.subscribeDataChanges(path, new IZkDataListener() {public void handleDataChange(String path, Object data) throws Exception {System.out.println(path+"该节点内容被更新,更新后的内容"+data); } public void handleDataDeleted(String s) throws Exception {System.out.println(s+" 该节点被删除"); } });//获取节点内容Object o = zkClient.readData(path);System.out.println(o);//更新zkClient.writeData(path,"4567");Thread.sleep(1000);//删除zkClient.delete(path);Thread.sleep(1000); }}

    运行结果:

    123/learn-zkClient-Ep该节点内容被更新,更新后的内容4567/learn-zkClient-Ep 该节点被删除

    结果表明可以成功监听节点数据变化或删除事件。

Curator客户端

curator是Netfli公司开源的一套Zookeeper客户端框架,和ZKClient一样,Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连,反复注册Watcher和
NodeExistsException异常等,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了基于Fluent的编程风格支持

  1. 添加依赖
    在pom.xml文件中添加如下内容∶

    <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version></dependency>
  2. 创建会话
    Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactory工厂类来实现的。具体如下∶

    • 使用CuratorFramework这个工厂类的两个静态方法来创建一个客户端

      public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

      其中参数RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略,默认提供了以下实现,分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)

    • 通过调用CuratorFramework中的start(()方法来启动会话

      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);client.start();
      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000,1000,retryPolicy);client.start();

      其实进一步查看源代码可以得知,其实这两种方法内部实现一样,只是对外包装成不同的方法。它们的底层都是通过第三个方法builder来实现的

      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);private static CuratorFramework Client = CuratorFrameworkFactory.builder().connectString("serverl:2181,server2:2181,server3:2181"). sessionTimeoutMs (50000).connectionTimeoutMs(30000).retryPolicy(retryPolicy).build();client.start();

      参数

      • connectString∶zk的server地址,多个server之间使用英文逗号分隔开
      • connectionTimeoutMs∶连接超时时间,如上是30s,默认是15s
      • sessionTimeoutMs∶会话超时时间,如上是50s,默认是60s
      • retryPolicy∶失败重试策略
        • ExponentialBackoffRetry∶构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
          • baseSleepTimeMs∶初始的sleep时间,用于计算之后的每次重试的sleep时间,
            • 计算公式∶当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<retryCount+1).)
            • maxRetries∶最大重试次数
            • maxSleepMs∶最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大时间是Integer.MAX_VALUE毫秒。
        • 其他,查看org.apache.curator.RetryPolicy接口的实现类
        • start()∶完成会话的创建
    package com.hust.grid.leesf.curator.examples;import org.apache.curator.RetryPolicy;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;public class Create_Session_Sample {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);client.start();System.out.println("Zookeeper session1 established. ");CuratorFramework client1 = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") //server地址 .sessionTimeoutMs(5000) // 会话超时时间 .connectionTimeoutMs(3000) // 连接超时时间 .retryPolicy(retryPolicy) // 重试策略 .namespace("base") // ᇿ⽴命名空间/base .build(); //client1.start();System.out.println("Zookeeper session2 established. "); } }

    运行结果∶Zookeeper session1 established.Zookeeper session2 established

    需要注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离

  3. 创建节点
    
curator提供了一系列Fluent风格的接口,通过使用Fluent编程风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。

    
下面简单介绍一下常用的几个节点创建场景。


    
 1.创建一个初始内容为空的节点

    client.create().forPath(path);

    Curator默认创建的是持久节点,内容为空。

    2.创建一个包含内容的节点

    client.create().forPath(path,"我是内容".getBytes());

    Curator和ZkClient不同的是依旧采用Zookeeper原生API的风格,内容使用byte【】作为方法参数。

    3.递归创建父节点,并选择节点类型

    Client.Create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPa th(path);

    creatingParentslfNeeded这个接口非常有用,在使用ZooKeeper 的过程中,开发人员经常会碰到NoNodeException 异常,其中一个可能的原因就是试图对一个不存在的父节点创建子节点。因此,开发人员不得不在每次创建节点之前,都判断一下该父节点是否存在一一这个处理通常比较麻烦。在使用Curator 之后,通过调用creatingParentslfNeeded接口,Curator 就能够自动地递归创建所有需要的父节点。

    下面通过一个实际例子来演示如何在代码中使用这些API。

    package com.hust.grid.leesf.curator.examples;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") //server地址 .sessionTimeoutMs(5000) // 会话超时时间 .connectionTimeoutMs(3000) // 连接超时时间 .retryPolicy(new ExponentialBackoffRetry(1000,5)) // 重试策略 .build();client.start();System.out.println("Zookeeper session established. ");//添加节点String path = "/learn-curator/c1";client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());Thread.sleep(1000);System.out.println("success create znode"+path); }

    运行结果∶Zookeeper session established.success create znode/learn-curator/c1
    其中,也创建了learn-curator/c1的父节点learn-curator节点。

  4. 删除节点
    删除节点的方法也是基于Fluent方式来进行操作,不同类型的操作调用 新增不同的方法调用即可。

    1. 删除一个子节点

      client.delete().forPath(path);
    2. 删除节点并递归删除其子节点

      client.delete().deletingChildrenIfNeeded().forPath(path);
    3. 指定版本进行删除

      client.delete().withVersion(1).forPath(path);

      如果此版本已经不存在,则删除异常,异常信息如下。

      org.apache.zookeeper.KeeperException$BadVersionException:KeeperErrorCode =BadVersion for
    4. 强制保证删除一个节点

      client.delete().guaranteed().forPath(path);

      只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到一些网络异常的情况,此guaranteed的强制删除就会很有效果。
      演示实例∶

      package com.hust.grid.leesf.curator.examples;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.data.Stat;public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") //server地址 .sessionTimeoutMs(5000) // 会话超时时间 .connectionTimeoutMs(3000) // 连接超时时间 .retryPolicy(new ExponentialBackoffRetry(1000,5)) // 重试策略 .build(); client.start();System.out.println("Zookeeper session established. ");//删除节点String path = "/learn-curator";client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);System.out.println("success create znode"+path);  }

      运行结果∶Zookeeper session established.success create znode/learn-curator
      结果表明成功删除/learn-curator节点

  5. 获取数据

    获取节点数据内容API相当简单,同时Curator提供了传入一个Stat变量的方式来存储服务器端返回的最新的节点状态信息

    // 普通查询client.getData().forPath(path);// 包含状态查询Stat stat = new Stat();client.getData().storingStatIn(stat).forPath(path);

    演示:

    package com.hust.grid.leesf.curator.examples;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.data.Stat;public class Get_Node_Sample {public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") //server地址 .sessionTimeoutMs(5000) // 会话超时时间 .connectionTimeoutMs(3000) // 连接超时时间 .retryPolicy(new ExponentialBackoffRetry(1000,5)) //重试策略 .build(); client.start();System.out.println("Zookeeper session established. ");//添加节点String path = "/learn-curator/c1";client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());System.out.println("success create znode"+path);//获取节点数据Stat stat = new Stat();byte[] bytes = client.getData().storingStatIn(stat).forPath(path);System.out.println(new String(bytes)); } }

    运行结果:Zookeeper session established. success create znode/learn-curator/c1 init
    结果表明成功获取了节点的数据

  6. 更新数据
    

更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常。

    // 普通更新
 client.setData().forPath(path,"新内容".getBytes());// 指定版本更新
 client.setData().withVersion(1).forPath(path);

    版本不一致异常信息∶

    org.apache.zookeeper.KeeperExceptionSBadversionException:KeeperErrorCode =BadVersion for

    案例演示∶

    package com.hust.grid.leesf.curator.examples;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.data.Stat;public class Set_Node_Sample {public static void main(String[] args) throws Exception {CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("127.0.0.1:2181") //server地址 .sessionTimeoutMs(5000) // 会话超时时间 .connectionTimeoutMs(3000) // 连接超时时间 .retryPolicy(new ExponentialBackoffRetry(1000,5)) //重试策略 .build(); //client.start();System.out.println("Zookeeper session established. ");String path = "/learn-curator/c1";//获取节点数据Stat stat = new Stat();byte[] bytes = client.getData().storingStatIn(stat).forPath(path);System.out.println(new String(bytes));//更新节点数据int version = client.setData().withVersion(stat.getVersion()).forPath(path).getVersion();System.out.println("Success set node for : " + path + ", new version: "+version);client.setData().withVersion(stat.getVersion()).forPath(path).getVersion(); } }

    运行结果:

    Zookeeper session established.initSuccess set node for : /learn-curator/c1, new version: 1Exception in thread "main"org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =BadVersion for /learn-curator/c1

    结果表明当携带的数据版本不一致时,无法完成更新操作。

Zookeeper应用场景

ZooKeeper是一个典型的发布/订阅模式的分布式数据管理与协调框架,我们可以使用它来进行分布式数据的发布与订阅。另一方面,通过对ZooKeeper中丰富的数据节点类型进行交叉使用,配合Watcher 事件通知机制,可以非常方便地构建一系列分布式应用中都会涉及的核心功能,如数据发布/订阅、命名服务、集群管理、Master选举、分布式锁和分布式队列等。那接下来就针对这些典型的分布式应用场景来做下介绍

数据发布/订阅

数据发布/订阅(Publish/Subscribe)系统,即所谓的配置中心,顾名思义就是发布者将数据发布到ZooKeeper的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的。实现配置信息的集中式管理和数据的动态更新。

发布/订阅系统一般有两种设计模式,分别是推(Push)模式和拉(PulI)模式。在推模式中,服务端主动将数据更新发送给所有订阅的客户端;而拉模式则是由客户端主动发起请求来获取最新数据,通常客户端都采用定时进行轮询拉取的方式。

ZooKeeper 采用的是推拉相结合的方式∶ 客户端向服务端注册自己需要关注的节点,一旦该节点的数据发生变更,那么服务端就会向相应的客户端发送Watcher事件通知,客户端接收到这个消息通知之后,需要主动到服务端获取最新的数据。

如果将配置信息存放到ZooKeeper上进行集中管理,那么通常情况下,应用在启动的时候都会主动到Zo0Keeper服务端上进行一次配置信息的获取,同时,在指定节点上注册一个Watcher监听,这样一来,但凡配置信息发生变更,服务端都会实时通知到所有订阅的客户端,从而达到实时获取最新配置信息的目的。

下面我们通过一个"配置管理"的实际案例来展示ZooKeeper在"数据发布/订阅"场景下的使用方式。

在我们平常的应用系统开发中,经常会碰到这样的需求∶系统中需要使用一些通用的配置信息,例如机器列表信息、运行时的开关配置、数据库配置信息等。这些全局配置信息通常具备以下3个特性。

1. 数据量通常比较小。


2. 数据内容在运行时会发生动态变化。

3. 集群中各机器共享,配置一致。

对干这类配置信息,一般的做法通常可以选择将其存储在本地配置文件或是内存变量中。无论采用哪种方式,其实都可以简单地实现配置管理,在集群机器规模不大、配置变更不是特别频繁的情况下,无论刚刚提到的哪种方式,都能够非常方便地解决配置管理的问题。但是.一日机器规模变大,日配置信息变更越来越频繁后,我们发现依靠现有的这两种方式解决配置管理就变得越来越困难了。我们既希望能够快速地做到全局配置信息的变更,同时希望变更成本足够小.因此我们必须寻求一种更为分布式化的解决方案

接下来我们就以一个"数据库切换"的应用场景展开,看看如何使用ZooKeeper来实现配置管理∶

  • 配置存储
    在进行配置管理之前,首先我们需要将初始化配置信息存储到Zookeeper上去,一般情况下,我们可以在Zookeeper上选取一个数据节点用于配置信息的存储,例如∶/app1/database_config
    /corfgseverapp1database config

    配置管理的z0okeeper节点示意图
    我们将需要管理的配置信息写入到该数据节点中去,例如∶

    #数据库配置信息#DBCPdbcp.driverClassName=com.mysql.jdbc.Driverdbcp.dbJDBCUrl=jdbc:mysgl://127.0.0.1:3306/learn-test dbcp.username=zm dbcp.pasSword=1234 dbcp.maxActive=30 dbcp.maxIdle=10
  • 配置获取
    集群中每台机器在启动初始化阶段,首先会从上面提到的ZooKeeper配置节点上读取数据库信息.同时,客户端还需要在该配置节点上注册一个数据变更的Watcher监听,一日发生节点数据变更.所有订阅的客户端都能够获取到数据变更通知。

  • 配置变更
    在系统运行过程中,可能会出现需要进行数据库切换的情况,这个时候就需要进行配置变更。借助ZooKeeper,我们只需要对ZooKeeper上配置节点的内容进行更新,ZooKeeper就能够帮我们将数据变更的通知发送到各个客户端,每个客户端在接收到这个变更通知后。就可以重新进行最新数据的获取。

命名服务

命名服务(Name Service)也是分布式系统中比较常见的一类场景,是分布式系统最基本的公共服务之一。在分布式系统中,被命名的实体通常可以是集群中的机器、提供的服务地址或远程对象等一—这些我们都可以统称它们为名字(Name),其中较为常见的就是一些分布式服务框架(如RPC、RMI)中的服务地址列表,通过使用命名服务,客户端应用能够根据指定名字来获取资源的实体、服务地址和提供者的信息等。

ZooKeeper 提供的命名服务功能能够帮助应用系统通过一个资源引用的方式来实现对资源的定位与使用。另外,广义上命名服务的资源定位都不是真正意义的实体资源一一在分布式环境中.上层应用仅仅需要一个全局唯一的名字,类似于数据库中的唯一主键。

所以接下来。我们来看看如何使用ZooKeeper来实现一套分布式全局唯一ID的分配机制

所谓ID,就是一个能够唯一标识某个对象的标识符。在我们熟悉的关系型数据库中,各个表都需要一个主键来唯一标识每条数据库记录,这个主键就是这样的唯一ID。在过去的单库单表型系统中,通常可以使用数据库字段自带的auto increment属性来自动为每条数据库记录生成一个唯一的ID,数据库会保证生成的这个ID在全局唯一。但是随着数据库数据规模的不断增大,分库分表随之出现,而auto_increment属性仅能针对单一表中的记录自动生成ID.因此在这种情况下,就无法再依靠数据库的auto_increment属性来唯一标识一条记录了。于是,我们必须寻求一种能够在分布式环境下生成全局唯ID的方法。

一说起全局唯一 ID,相信大家都会联想到UUID。没错,UUID 是通用唯一识别码(Universally Unique Identifier)的简称,是一种在分布式系统中广泛使用的用于唯一标识元素的标准 确实,UUID 是一个非常不错的全局唯一ID生成方式,能够非常简便地保证分布式环境中的唯一性。一个标准的UUID是一个包含32 位字符和4个短线的字符串,例如"e70f1357-f260-46ff-a32d-53a086c57ade"。UUID的优势自然不必多说,我们重点来看看它的缺陷。

  • 长度过长
    UUID 最大的问题就在于生成的字符串过长。显然,和数据库中的 INT类型相比,存储一个UUID需要花费更多的空间。
  • 含义不明
    上面我们已经看到一个典型的 UUID 是类似于"e70f1357-f260-46ff-a32d-53a086c57ade"的一个字符串。根据这个字符串,开发人员从字面上基本看不出任何其表达的含义,这将会大大影响问题排查和开发调试的效率。

所以接下来,我们结合一个分布式任务调度系统来看看如何使用ZooKeepe来实现这类全局唯一ID的生成。 之前我们已经提到,通过调用ZooKeeper节点创建的API接口可以创建一个顺序节点,并且在API返回值中会返回这个节点的完整名字。利用这个特性,我们就可以借助ZooKeeper来生成全局唯一的ID 了,如下图
在这里插入图片描述

全局唯一ID生成的ZooKeeper节点示意图
说明

对于一个任务列表的主键,使用ZooKeeper生成唯一ID的基本步骤∶

  • 所有客户端都会根据自己的任务类型,在指定类型的任务下面通过调用create()接口来创建一个
顺序节点,例如创建"iob-"节点。
  • 节点创建完毕后,create()接口会返回一个完整的节点名,例如"job-000000003"。
  • 客户端拿到这个返回值后,拼接上 type类型,例如"type2-job-0000000003",这就可以作为一个
全局唯一的ID了。

在ZooKeeper中,每一个数据节点都能够维护一份子节点的顺序顺列,当客户端对其创建一个顺序子节点的时候 ZooKeeper 会自动以后缀的形式在其子节点上添加一个序号,在这个场景中就是利用了ZooKeeper的这个特性
。

集群管理

随着分布式系统规模的日益扩大,集群中的机器规模也随之变大,那如何更好地进行集群管理也显得越来越重要了。所谓集群管理,包括集群监控与集群控制两大块,前者侧重对集群运行时状态的收集,后者则是对集群进行操作与控制。
在日常开发和运维过程中,我们经常会有类似于如下的需求∶

● 如何快速的统计出当前生产环境下一共有多少台机器
● 如何快速的获取到机器上下线的情况
● 如何实时监控集群中每台主机的运行时状态

在传统的基于Agent的分布式集群管理体系中,都是通过在集群中的每台机器上部署一个 Agent,由这个Agent 负责主动向指定的一个监控中心系统(监控中心系统负责将所有数据进行集中处理,形成一系列报表,并负责实时报警,以下简称"监控中心")汇报自己所在机器的状态。在集群规模适中的场景卜, 这确实是一种在牛产实践中厂泛使用的解决方案.能够快速有效地实现分布式环境集群临控 但是一旦系统的业务场景增多,集群规模变大之后,该解决方案的弊端也就显现出来了。

大规模升级困难
以客户端形式存在的 Agent,在大规模使用后,一旦遇上需要大规模升级的情况,就非常麻烦,在升级成本和升级进度的控制上面临巨大的挑战。

统一的Agent无法满足多样的需求
对于机器的CPU使用率、负载(Load)、内存使用率、网络吞吐以及磁盘容量等机器基本的物理状态,使用统一的Agent来进行监控或许都可以满足。但是,如果需要深入应用内部,对一些业务状态进行监控,例如,在一个分布式消息中间件中,希望监控到每个消费者对消息的消费状态;或者在一个分布式任务调度系统中,需要对每个机器上任务的执行情况进行监控。很显然,对于这些业务耦合紧密的监控需求,不适合由一个统一的Agent来提供。

编程语言多样性
随着越来越多编程语言的出现,各种异构系统层出不穷。如果使用传统的Agent方式,那么需要提供各种语言的 Agent 客户端。另一方面,"监控中心"在对异构系统的数据进行整合上面临巨大挑战。

Zookeeper的两大特性∶

  1. 客户端如果对Zookeeper的数据节点注册Watcher监听,那么当该数据节点的内容或是其子节点列表发生变更时,Zookeeper服务器就会向订阅的客户端发送变更通知。
  2. 对在Zookeeper上创建的临时节点,一旦客户端与服务器之间的会话失效,那么临时节点也会被自动删除

利用其两大特性,可以实现集群机器存活监控系统,若监控系统在/clusterServers节点上注册一个Watcher监听,那么但凡进行动态添加机器的操作,就会在/clusterServers节点下创建一个临时节点∶/clusterServers/【Hostname】,这样,监控系统就能够实时监测机器的变动情况。

下面通过分布式日志收集系统这个典型应用来学习Zookeeper如何实现集群管理。

分布式日志收集系统

分布式日志收集系统的核心工作就是收集分布在不同机器上的系统日志。在这里我们重点来看分布式日志系统(以下简称"日志系统")的收集器模块。

在一个典型的日志系统的架构设计中,整个日志系统会把所有需要收集的日志机器(我们以"日志源机器"代表此类机器)分为多个组别. 每个组别对应一个收集器. 这个收集器其实就是一个后台机器 (我们以"收集器机器"代表此类机器),用于收集日志。

对于大规模的分布式日志收集系统场景,通常需要解决两个问题∶

·变化的日志源机器在生产环境中,伴随着机器的变动,每个应用的机器几平每天都是在变化的(机器硬件问题、扩容、机房迁移或是网络问题等都会导致一个应用的机器变化),也就是说每个组别中的日志源机器通常是在不断变化的· 变化的收集器机器日志收集系统自身也会有机器的变更或扩容,于是会出现新的收集器机器加入或是老的收集器机器退出的情况。

无论是日志源机器还是收集器机器的变更.最终都可以归结为如何快速,合理、动态地为每个收集器分配对应的日志源机器。这也成为了整个日志系统正确稳定运转的前提,也是日志收集过程中最大的技术挑战之一,在这种情况下,我们就可以引入zookeeper了,下面我们就来看ZooKeeper在这个场号中的使用。

使用Zookeeper的场景步骤如下

  • 注册收集器机器
    使用ZooKeeper来进行日志系统收集器的注册,典型做法是在ZooKeeper上创建一个节点作为收集器的根节点,例如/logs/collector(下文我们以"收集器节点"代表该数据节点),每个收集器机器在启动的时候,都会在收集器节点下创建自己的节点,例如/logs/collector/[Hostname]
    在这里插入图片描述

  • 任务分发
    待所有收集器机器都创建好自己对应的节点后,系统根据收集器节点下子节点的个数,将所有日志源机器分成对应的若干组,然后将分组后的机器列表分别写到这些收集器机器创建的子节点(例如/logS/collector/host1)上去。这样一来,每个收集器机器都能够从自己对应的收集器节点上获取日志源机器列表,进而开始进行日志收集工作。

  • 状态汇报
    完成收集器机器的注册以及任务分发后,我们还要考虑到这些机器随时都有挂掉的可能。因此,针对这个问题,我们需要有一个收集器的状态汇报机制∶每个收集器机器在创建完自己的专属节点后,还需要在对应的子节点上创建一个状态子节点,例如/logs/collector/host1/status.每个收集器机器都需要定期向该节点写入自己的状态信息。我们可以把这种策略看作是一种心跳检测机制,通常收集器机器都会在这个节点中写入日志收集进度信息。日志系统根据该状态子节点的最后更新时间来判断对应的收集器机器是否存活。

  • 动态分配
    如果收集器机器挂掉或是扩容了,就需要动态地进行收集任务的分配。在运行过程中,日志系统始终关注着/logs/collector这个节点下所有子节点的变更,一旦检测到有收集器机器停止汇报或是有新的收集器机器加入,就要开始进行任务的重新分配。无论是针对收集器机器停止汇报还是新机器加入的情况,日志系统都需要将之前分配给该收集器的所有仟务进行转移。为了解决这个问题.通常有两种做法∶

    • 全局动态分配
      这是一种简单粗暴的做法,在出现收集器机器挂掉或是新机器加入的时候,日志系统需要根据新的收集器机器列表,立即对所有的日志源机器重新进行一次分组,然后将其分配给剩下的收集器机器。

    • 局部动态分配
      全局动态分配方式虽然策略简单,但是存存一个问题∶ 一个或部分收集器机器的变更,就会导致全局动态任务的分配,影响面比较大,因此风险也就比较大。所谓局部动态分配,顾名思义就是在小范围内进行任务的动态分配。在这种策略中,每个收集器机器在汇报自己日志收集状态的同时,也会把自己的负载汇报上去。请注意,这里提到的负载并不仅仅只是简单地指机器CPU负载(Load),而是一个对当前收集器任务执行的综合评估,这个评估算法和ZooKeeper本身并没有太大的关系,这里不再赘述。

      在这种策略中,如果一个收集器机器挂了,那么日志系统就会把之前分配给这个机器的任务重新分配到那些负载较低的机器上去。同样,如果有新的收集器机器加入,会从那些负 载高的机器上转移部分任务给这个新加入的机器。

      上述步骤已经完整的说明了整个日志收集系统的工作流程,其中有两点注意事项∶

      • 节点类型在/logs/collector节点下创建临时节点可以很好的判断机器是否存活,但是,若机器挂了,其节点会被删除,记录在节点上的日志源机器列表也被清除,所以需要选择持久 节点来标识每一台机器。同时在节点下分别创建 /logs/collector/Hostnamel/status节点来表征每一个收集器机器的状态,这样,既能实现对所有机器的监控,同时机器挂掉后,依然能够将分配任务还原。
      • 日志系统节点监听
        若采用Watcher机制,那么通知的消息量的网络开销非常大,需要采用日志系统主动轮询收集器节点的策略,这样可以节省网络流量,但是存在一定的延时。

Master选举

Master选举是一个在分布式系统中非常常见的应用场景。分布式最核心的特性就是能够将具有独立计算能力的系统单元部署在不同的机器上,构成一个完整的分布式系统。而与此同时,实际场景中往往也需要在这些分布在不同机器上的独立系统单元中选出一个所谓的"老大",在计算机中、我们称之为Maste。

在分布式系统中,Master往往用来协调集群中其他系统单元,具有对分布式系统状态变更的决定权。例如,在一些读写分离的应用场景中,客户端的写请求往往是由 Master来处理的;而在另一些场景中,Master则常常负责处理一些复杂的逻辑,并将处理结果同步给集群中其他系统单元。Master选举可以说是ZooKeeper最典型的应用场景了,接下来,我们就结合"一种海量数据处理与共享模型"这个具体例子来看看 ZooKeeper在集群Master选举中的应用场景。

在分布式环境中,经常会碰到这样的应用场景∶ 集群中的所有系统单元需要对前端业务提供数据,比如一个商品 ID,或者是一个网站轮播广告的广告 ID(通常出现在一些广告投放系统中)等,而这些商品ID或是广告ID往往需要从一系列的海量数据处理中计算得到——这通常是一个非常耗费I/O 和CPU资源的过程。鉴于该计算过程的复杂性,如果让集群中的所有机器都执行这个计算逻辑的话,那么将耗费非常多的资源。一种比较好的方法就是只让集群中的部分,甚至只让其中的一台机器去处理数据计算,一旦计算出数据结果,就可以共享给整个集群中的其他所有客户端机器,这样可以大大减少重复劳动.提升性能。这里我们以一个简单的广告投放系统后台场景为例来讲解这个模型。
在这里插入图片描述

整个系统大体上可以分成客户端集群、分布式缓存系统、海量数据处理总线和 ZooKeeper四个部分

首先我们来看整个系统的运行机制。图中的Client集群每天定时会通过ZooKeeper来实现Master选举。选举产生Master客户端之后,这个Master就会负责进行一系列的海量数据处理,最终计算得到一个数据结果,井将其放置在个内存仔/数据库中。同时,Master还需要通知集群中其他所有的客户端从这个内存/数据库中共享计算结果。

接下去,我们将重点来看 Master 选举的过程。首先来明确下 Master 选举的需求∶在集群的所有机器中选举出一台机器作为Master。针对这个需求,通常情况下,我们可以选择常见的关系型数据库中的主键特性来实现∶ 集群中的所有机器都向数据库中插入一条相同主键 ID 的记录,数据库会帮助我们自动进行主键冲突检查,也就是说.所有进行插入操作的客户端机器中,只有一台机器能够成功,那么,我们就认为向数据库中成功插入数据的客户端机器成为Master。

借助数据库的这种方案确实可行,依靠关系型数据库的主键特性能够很好地保证在集群中选举出唯一的一个Master。但是我们需要考虑的另一个问题是,如果当前选举出的Master挂了,那么该如何处理?谁来告诉我Master挂了呢?显然,关系型数据库没法通知我们这个事件。那么,如果使用ZooKeeper是否可以做到这一点呢?那在之前,我们介绍了ZooKeeper创建节点的API接口,其中一个重要特性便是∶利用ZooKeeper的强一致性,能够很好保证在分布式高并发情况下节点的创建一定能够保证全局唯一性,即ZooKeeper将会保证客户端无法重复创建一个已经存在的数据节点。也就是说,如果同时有多个客户端请求创建同一个节点,那么最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很容易地在分布式环境中进行Master选举了。
/master_election202011-11binding

在这个系统中,首先会在 ZooKeeper 上创建一个日期节点,例如"2020-11-11

客户端集群每天都会定时往ZooKeeper 上创建一个临时节点,例如/master_election/2020-11-11/binding。在这个过程中,只有一个客户端能够成功创建这个节点,那么这个客户端所在的机器就成为了Master。同时,其他没有在ZooKeeper上成功创建节点的客户端,都会在节点 /master_election/2020-11-11上注册一个子节点变更的 Watcher,用于监控当前的 Master 机器是否存活,一旦发现当前的 Master 挂了,那么其余的客户端将会重新进行Master选举。

从上面的讲解中,我们可以看到,如果仅仅只是想实现Master选举的话,那么其实只需要有一个能够保证数据唯一性的组件即可,例如关系型数据库的主键模型就是非常不错的选择。但是,如果希望能够快速地进行集群 Master 动态选举,那么就可以基于 ZooKeeper来实现。

分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要通过一些互斥手段来防止彼此之间的干扰,以保证一致性,在这种情况下,就需要使用分布式锁了。

在平时的实际项目开发中,我们往往很少会去在意分布式锁,而是依赖于关系型数据库固有的排他性来实现不同进程之间的互斥。这确实是一种非常简便且被广泛使用的分布式锁实现方式。然而有一个不争的事实是,目前绝大多数大型分布式系统的性能瓶颈都集中在数据库操作上。因此,如果上层业务再给数据库添加一些额外的锁,例如行锁、表锁甚至是繁重的事务处理,那么就会让数据库更加不堪重负

下面我们来看看使用ZooKeeper如何实现分布式锁,这里主要讲解排他锁和共享锁两类分布式锁。

排他锁

排他锁(Exclusive Locks,简称X锁),又称为写锁或独占锁,是一种基本的锁类型。如果事务T1对数据对象 O1加上了排他锁,那么在整个加锁期间,只允许事务 T1对 O1进行读取和更新操作,其他任何事务都不能再对这个数据对象进行任何类型的操作——直到T1释放了排他锁。

从上面讲解的排他锁的基本概念中,我们可以看到,排他锁的核心是如何保证当前有且仅有一个事务获得锁,并且锁被释放后,所有正在等待获取锁的事务都能够被通知到。

下面我们就来看看如何借助ZooKeeper实现排他锁∶

  • 定义锁
    在通常的lava开发编程中,有两种常见的方式可以用来定义锁,分别是synchronized机制和IDK5提供的ReentrantLock。然而,在ZooKeeper中,没有类似于这样的API可以直接使用,而是通过 ZooKeeper 上的数据节点来表示一个锁,例如/exclusive_lock/lock节点就可以被定义为一个锁,如图∶
    最全最详细的Zookeeper学习总结

  • 获取锁
    在需要获取排他锁时,所有的客户端都会试图通过调用 create()接口,在 /exclusive_lock节点下创建临时子节点/exclusive lock/lock。在前面,我们也介绍了,ZooKeeper 会保证在所有的客户端中。最终只有一个客户端能够创建成功,那么就可以认为该客户端获取了锁。同时,所有没有获取到锁的客户端就需要到/exclusive lock节点上注册一个子节点变更的Watcher监听,以便实时监听到ock节点的变更情况

  • 释放锁
    在"定义锁"部分,我们已经提到,/exclusive_lock/lock是一个临时节点,因此在以下两种情况下,都有可能释放锁。·当前获取锁的客户端机器发生宕机,那么ZooKeeper上的这个临时节点就会被移除。·正常执行完业务逻辑后,客户端就会主动将自己创建的临时节点删除。无论在什么情况下移除了lock节点,ZooKeeper都会通知所有在/exclusive_lock节点上注册了子节点变更Watcher监听的客户端。这些客户端在接收到通知后,再次重新发起分布式锁获取,即重复"获取锁"过程。整个排他锁的获取和释放流程,如下图
    <img src="https://img-blog.csdnimg.cn/0e6ae9c3a092489591e4b68c424163dc.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQ2FwdGFpbiBMZW8=,size_20,color_FFFFFF,t_70,g_se,x_16" alt="缺取锁是否已经被其他事务等待锁Y获取z-仓建lock 临时节点

共享锁

共享锁(Shared Locks,简称S锁),又称为读锁,同样是一种基本的锁类型。

如果事务T1对数据对象O1加上了共享锁,那么当前事务只能对O1进行读取操作,其他事务也只能对这个数据对象加共享锁一-直到该数据对象上的所有共享锁都被释放。

共享锁和排他锁最根本的区别在于,加上排他锁后,数据对象只对一个事务可见。而加上共享锁后,数据对所有事务都可见。
下面我们就来看看如何借助ZooKeeper来实现共享锁。

  • 定义锁
    和排他锁一样,同样是通过 ZooKeeper 上的数据节点来表示一个锁,是一个类似于
    "/shared_lock/【Hostname】-请求类型-序号"的临时顺序节点,例如/shared_lock/host1-R-0000000001,那么,这个节点就代表了一个共享锁,如图所示∶
    /sharedlocKhost1-R-000001host2-W-0000002host3-R-00000003

  • 获取锁
    在需要获取共享锁时,所有客户端都会到/shared_lock 这个节点下面创建一个临时顺序节点,如果当前是读请求,那么就创建例如/shared_lock/host1-R-0000000001的节点;如果是写请求,那么就创建例如/shared_lock/host2-W-0000000002的节点。

    判断读写顺序
    通过Zookeeper来确定分布式读写顺序,大致分为四步

    • 创建完节点后,获取/shared lock节点下所有子节点,并对该节点变更注册监听。
    • 确定自己的节点序号在所有子节点中的顺序。
    • 对于读请求∶ 若没有比自己序号小的子节点或所有比自己序号小的子节点都是读请求,那么表明自己已经成功获取到共享锁,同时开始执行读取逻辑,若有写请求,则需要等待。对于写请求∶若自己不是序号最小的子节点,那么需要等待。
    • 接收到watcher通知后,重复步骤1
  • 释放锁,其释放锁的流程与独占锁一致。

羊群效应

上面讲解的这个共享锁实现,大体上能够满足一般的分布式集群竞争锁的需求,并且性能都还可以一一这里说的一般场景是指集群规模不是特别大,一般是在10台机器以内。但是如果机器规模扩大之后,会有什么问题呢?我们着重来看上面"判断读写顺序"过程的步骤3,结合下面的图,看看实际运行中的情况
/shared_lockhost1-R-0000001host2-W-0000002host3-R-0000003host4-W-0000004host5-W-0000005

针对如上图所示的情况进行分析

  • host1首先进行读操作,完成后将节点/shared_lock/host1-R-00000001删除。
  • 余下4台机器均收到这个节点移除的通知,然后重新从/shared lock节点上获取一份新的子节点列表。
  • 每台机器判断自己的读写顺序,其中host2检测到自己序号最小,于是进行写操作,余
    下的机器则继续等待。
  • 继续
    可以看到,host1客户端在移除自己的共享锁后.Zookeeper发送了子节点更变Watcher通知给所有机器,然而除了给host2产生影响外,对其他机器没有任何作用。大量的Watcher通知和子节点列表获取两个操作会重复运行,这样不仅会对zookeeper服务器造成巨大的性能影响影响和网络开销,更为严重的是,如果同一时间有多个节点对应的客户端完成事务或是事务中断引起节点消失,ZooKeeper服务器就会在短时间内向其余客户端发送大量的事件通知,这就是所谓的羊群效应

上面这个ZooKeeper分布式共享锁实现中出现羊群效应的根源在于,没有找准客户端真正的关注点。我们再来回顾一下上面的分布式锁竞争过程.它的核心逻辑在干∶ 判断自己是否是所有子节点中序号最小的。于是,很容易可以联想到.每个节点对应的客户端只需要关注比自己序号小的那个相关节点的变更情况就可以了一一而不需要关注全局的子列表变更情况。

可以有如下改动来避免羊群效应。

改进后的分布式锁实现∶
首先,我们需要肯定的一点是,上面提到的共享锁实现,从整体思路上来说完全正确。这里主要的改动在于∶每个锁竞争者,只需要关注/shared lock节点下序号比自己小的那个节点是否存在即可,具体实现如下。

  • 客户端调用create接口常见类似于/shared_lock/【Hostname】-请求类型-序号的临时顺序节点。
  • 客户端调用getChildren接口获取所有已经创建的子节点列表(不注册任何Watcher)。
  • 如果无法获取共享锁,就调用exist接口来对比自己小的节点注册Watcher。对于读请求∶向比自己序号小的最后一个写请求节点注册Watcher临听。对干写请求∶向比自己序号小的最后一个节点注册Watcher监听。
  • 等待Watcher通知,继续进入步骤2。

此方案改动主要在于∶每个锁竞争者,只需要关注/shared_lock节点下序号比自己小的那个节点是否存在即可。
在这里插入图片描述

注意 相信很多同学都会觉得改进后的分布式锁实现相对来说比较麻烦。确实如此,如同在多线程并发编程实践中,我们会去尽量缩小锁的范围——对于分布式锁实现的改进其实也是同样的思路。那么对于开发人员来说,是否必须按照改进后的思路来设计实现自己的分布式锁呢?答案是否定的。在具体的实际开发过程中,我们提倡根据具体的业务场景和集群规模来选择适合自己的分布式锁实现∶在集群规模不大、网络资源丰富的情况下,第一种分布式锁实现方式是简单实用的选择;而如果集群规模达到一定程度,并且希望能够精细化地控制分布式锁机制,那么就可以试试改进版的分布式锁实现。

分布式队列

分布式队列可以简单分为两大类∶一种是常规的FIFO先入先出队列模型,还有一种是 等待队列元素聚集后统一安排处理执行的Barrier模型

  • FIFO先入先出
    FIFO(First Input First Output,先入先出), FIFO 队列是一种非常典型且应用广泛的按序执行的队列模型∶先进入队列的请求操作先完成后,才会开始处理后面的请求。
    使用ZooKeeper实现FIFO队列,和之前提到的共享锁的实现非常类似。FIFO队列就类似于一个全写的共享锁模型,大体的设计思路其实非常简单∶所有客户端都会到/queue_fifo 这个节点下面创建一个临时顺序节点,例如如/queue_fifo/host1-00000001。
    /queue_ffhost1-0000001host2-00000002host5-0000005
    创建完节点后,根据如下4个步骤来确定执行顺序。

    • 通过调用getChildren接口来获取/queue fifo节点的所有子节点,即获取队列中所有的元素。
    • 确定自己的节点序号在所有子节点中的顺序。
    • 如果自己的序号不是最小,那么需要等待,同时向比自己序号小的最后一个节点注册Watcher监听。
    • 接收到Watcher通知后,重复步骤1。
  • Barrier∶分布式屏障
    Barrier原意是指障碍物、屏障,而在分布式系统中,特指系统之间的一个协调条件,规定了一个队列的元素必须都集聚后才能统一进行安排.否则一直等待。这往往出现在那些大规模分布式并行计算的应用场景上∶最终的合并计算需要基干很多并行计算的子结果来进行。这些队列其实是在 FIEO队列的基础上进行了增强,大致的设计思想如下∶开始时,/queue_barrier 节点是一个已经存在的默认节点,并且将其节点的数据内容赋值为一个数字n来代表Barrier值,例如n=10表示只有当/queue barrier节点下的子节点个数达到10后,才会打开Barrier。之后,所有的客户端都会到/queue_barrie节点下创建一个临时节点,例如/queue_barrier/host1,如图所示。
    在这里插入图片描述

创建完节点后,按照如下步骤执行。

  • 通过调用getData接口获取/queue_barrier节点的数据内容∶10。
  • 通过调用getChildren接口获取/queue_barrier节点下的所有子节点,同时注册对子节点变更的Watcher监听。
  • 统计子节点的个数。
  • 如果子节点个数还不足10个,那么需要等待。
  • 接受到Wacher通知后,重复步骤2
    在这里插入图片描述

内容发现有点多,先更新这一部分吧,还有ZK的深入进阶和源码分析,咱们下篇接着聊!