Zookeeper概述


1.初识Zookeeper

1.1Zookeeper概念

  • Zookeeper是Apache Hadoop(大数据相关框架)项目下的一个子项目,是一个树形目录服务

  • Zookeeper翻译过来就是动物园管理员,它是用来管理Hadoop(大象)、Hive(蜜蜂)、Pig(小猪)的管理员。简称zk

  • Zookeeper是一个分布式的、开源的分布式应用程序打的协调服务。

  • Zookeeper 提供的主要功能包括:

    • 配置管理

    image-20210901105631484

    • 分布式锁

    image-20210901183325658

    • 集群管理

image-20210901183457882

2.Zookeeper命令操作

2.1数据模型

  • Zookeeper是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。
  • 这里的每一个节点都被称为ZNode,每个节点上都会保存自己的数据和节点信息。
  • 节点可以拥有子节点,同时也也允许少量(1MB)数据存储在该节点之下。
  • 节点可以分为四大类:
    1. PERSISTENT 持久化节点
    2. EPHEMERAL 临时节点:-e
    3. PERSISTENT_SEQUENTIAL 持久化顺序节点:-s
    4. EPHEMERAL_SEQUENTIAL 临时顺序节点: -es

image-20210901184831866

2.2服务端常用命令

image-20210903211709275

  • 启动 Zookeeper服务: ./zkServer.sh start
  • 查看 Zookeeper服务: ./zkServer.sh status
  • 停止 Zookeeper服务: ./zkServer.sh stop
  • 重启 Zookeeper服务: ./zkServer.sh restart

2.3客户端常用命令

  1. 启动客户端:

    1
    ./zkCli.sh [-server 192.168.66.66.2181] 
  2. 退出:

    1
    quit
  3. 查看节点目录:

    1
    ls /[xxx]
  4. 创建节点:

    1
    create /[xxx]/xxx2 hy    不能重复创建节点
  5. 获取节点数据 :

    1
    get /[xxxx]
  6. 设置数据 :

    1
    set /[xxx]  hy
  7. 删除节点

    1
    delete /[xxx]        节点有子节点不能直接delete需要使用deleteall
  8. 创建临时节点(客户端断开再次打开失效):

    1
    create -e /[xxx]/xxx2
  9. 创建顺序节点(同一个节点名编号不同,所有节点共用一个编号)

    1
    create -s /[xxx]/xxx2
  10. ls2 和 ls-s 都可以查看详细信息,ls2已经淘汰

注:userservice下存放的数据是服务ip地址

image-20210903230359803

3.javaAPI操作

3.1Curator 介绍

  • Curator是Apache Zookeeper的java客户端库。
  • 常见的Zookeeper java API:
    • 原生Java API
    • ZkClient
    • Curator
  • Curator项目的目标是简化Zookeeper客户端的使用
  • Curator最初是Netfix研发的,后来捐献了Apache基金会,目前是Apache的顶级项目。
  • 官网
  • 高版本的Curator兼容低版本的Zookeeper,低版本的Curator不能兼容高版本的Zookeeper

    3.2Curator API 常用操作

3.2.1 建立连接

  • 第一种方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
        /**
    * 建立连接
    */
    @Test
    public void testConnect(){
    // 第一种方式
    /**
    * @param connectString 连接字符串。zk server 地址和端口 "192.168.66.66:2181,192.168.66.66:2181"
    * @param sessionTimeoutMs 会话超时事件单位毫秒
    * @param connectionTimeoutMS 连接超时事件单位毫秒
    * @param retryPolicy 重试策略:重试一次,重试多次,一直重试,间隔重试等
    */
    // 每隔三秒重试一次一共重试十次
    RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
    CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.66.66:2181", 60000, 15000, retryPolicy);
    // 开启连接
    client.start();
    }
    }
  • 第二种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    /**
* 建立连接
*/
@Test
public void testConnect(){
// 第二种方式
/**
* @param connectString 连接字符串。zk server 地址和端口 "192.168.66.66:2181,192.168.66.66:2181"
* @param sessionTimeoutMs 会话超时事件单位毫秒
* @param connectionTimeoutMS 连接超时事件单位毫秒
* @param retryPolicy 重试策略:重试一次,重试多次,一直重试,间隔重试等
*/
// 每隔三秒重试一次一共重试十次
RetryPolicy retryPolicy=new ExponentialBackoffRetry(3000,10);
// 名称空间,自动加根节点
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.66.66:2181").sessionTimeoutMs(60000).connectionTimeoutMs(15000).retryPolicy(retryPolicy).namespace("hy").build();
client.start();
}
}

注:执行完记得调用close方法关闭client。

3.2.2 创建节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    @Test
public void testCrete() throws Exception {
// 1.基本创建
// 如果创建节点,没有指定数据,则默认将当前客户端的ip地址作为数据储存
String path = client.create().forPath("/app1");
// 2.带有数据
String path2 = client.create().forPath("/app2","jiamianqishi".getBytes());
// 3.设置节点的类型(默认:持久化)
String path3 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","jiamianqishi2".getBytes());
// 4.创建多级节点 /app1/p1
String path4 = client.create().creatingParentsIfNeeded().forPath("/app4/app4_1","jiamianqishi2".getBytes());




}

3.2.3 删除节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    /**delete\deleteall
* 1.删除单个节点
* 2.删除带有子节点的节点
* 3.必须成功的删除
* 4.回调
* @throws Exception
*/
@Test
public void testdelete() throws Exception {
//1.删除单个节点
client.delete().forPath("/app1");
// 2.删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
// 3.必须成功的删除为了解决发送过程中客户端和服务端连接抖动可能会导致删除失败
client.delete().guaranteed().forPath("/app2");
// 4.回调,删除操作后执行回调函数
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("我被删除了");
System.out.println(curatorEvent);
}
}).forPath("/app2");


}

3.2.4 修改节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  /**
* 修改:
* 1.普通修改:
* 2.版本校验修改
* @throws Exception
*/
@Test
public void testset() throws Exception {
// 1普通修改
client.setData().forPath("/app1","ryu".getBytes());
// 2.版本校验修改,目的是其他线程或客户端不能干扰
// 2.1从状态信息中获取版本信息

Stat status=new Stat();

client.getData().storingStatIn(status).forPath("/app1");
int version=status.getVersion();
// 2.2修改时校验版本,如果版本没变就能修改
client.setData().withVersion(version).forPath("/app1","long".getBytes());


}

3.2.5 查询节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
   /**
* 查询节点:
* 1.查询数据:get
* 2.查询子节点:ls
* 3.查询详细信息: ls -s
* @throws Exception
*/
@Test
public void testGet1() throws Exception {
// 1.查询数据:get
byte []data=client.getData().forPath("/app1");
System.out.println(new String(data));
// 2.查询子节点:ls
List<String> list = client.getChildren().forPath("/");
System.out.println(list);
// 3.查询状态信息
Stat status=new Stat();
client.getData().storingStatIn(status).forPath("/app1");
// 节点的状态信息打印
System.out.println(status);
}

3.2.6 Watch事件监听

  • Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将事件通知到感兴趣的客户端上去,该机制时Zookeeper实现分布式协调服务的重要特性。

  • Zookeeper中引入了Watcher机制来实现了发布\订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

  • Zookeeper原生支持通过注册watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册watcher,比较繁琐。

  • Curator引入了Cache来实现对Zookeeper服务端事件的监听。

  • Zookeeper提供了三种Watcher:

    1. NodeCache:只监听某一个特定的节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
        /**
    * NodeCache: 给指定一个节点注册监听器
    */
    @Test
    public void testNodeCache() throws Exception {
    // 1.创建NodeCache对象
    // 参数:客户端 节点 是否压缩
    NodeCache nodeCache=new NodeCache(client, "/app1",false);
    // 2.注册监听
    nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
    System.out.println("节点变化了");
    byte[] app1Datas = client.getData().forPath("/app1");
    System.out.println(new String(app1Datas));



    }
    });
    // 3.开启监听 如果设置为true,则开启监听时加载缓存数据
    nodeCache.start(true);

    while (true){

    }


    }
    1. PathChildrenCache:监控一个ZNode的子节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
        /**
    * PathChildrenCache: 给指定一个节点注册监听器
    */
    @Test
    public void tesPathChildrenCache() throws Exception {
    // 1.参数列表:客户端对象、节点、是否缓存数据、是否压缩、线程池
    // 创建PathChildrenCache对象
    PathChildrenCache pathChildrenCache=new PathChildrenCache(client,"/app2",true);
    // 2.绑定监听器
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
    if (pathChildrenCacheEvent.getType()== PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED){
    System.out.println("重新连接了");
    }
    else if(pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
    System.out.println("数据变化了");
    System.out.println(pathChildrenCacheEvent.getData());
    System.out.println(new String(pathChildrenCacheEvent.getData().getData()));

    }


    }
    });
    pathChildrenCache.start();



    while (true){

    }


    }
    1. TreeCache:可以监控整个树上所有的节点(本身和自身的子节点),类似于NodeCache和PathChildrenCache的组合
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
        /**
    * TreeCache: 监听某个节点自己和所有子节点们
    */
    @Test
    public void testTreeCache() throws Exception {
    // 1.参数列表:客户端对象、节点
    // 创建TreeCache对象
    TreeCache treeCache=new TreeCache(client,"/app2");
    // 2.绑定监听器
    treeCache.getListenable().addListener(new TreeCacheListener() {
    @Override
    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
    System.out.println("节点变化了");
    System.out.println(treeCacheEvent);

    }
    }
    );



    while (true){

    }


    }

3.3分布式锁

  • 在我们进行单机应用开发,涉及并发

模拟12306售票案例


文章作者: 哈雅布撒
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 哈雅布撒 !
  目录