1 Curator介绍

image-20221021110647223

原生zookeeperAPI的不足

  • 连接对象异步创建,需要开发人员自行编码等待
  • 连接没有自动重连超时机制
  • watcher一次注册生效一次
  • 不支持递归创建树形节点

curator特点

  • 解决session会话超时重连
  • watcher反复注册
  • 简化开发api
  • 遵循Fluent风格API

2 Curator API

image-20221021110851930


image-20221021111527322

image-20221021111542040

image-20221021111638914

image-20221021111745294

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yppah</groupId>
    <artifactId>curator</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
        <!--curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!--日志-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
log4j.rootLogger=off,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n

image-20221021135629154

2.1 建立连接

image-20221021153448534

package com.yppah.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.Test;

/**
 * @Author: haifei
 * @Date: 2022/10/21 11:34
 */
public class CuratorTest {

    /**
     * 建立连接-方式1
     */
    @Test
    public void testConnect() {
        /*
         * @param connectString       连接字符串。zk-server地址和端口 "192.168.38.9:2181,192.168.38.10:2181,..."
         * @param sessionTimeoutMs    会话超时时间 单位ms
         * @param connectionTimeoutMs 连接超时时间 单位ms
         * @param retryPolicy         重试策略
         */

        // 1、获取重试策略对象:3秒休眠,重试10次
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 10);
        // 2、获取连接对象
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "192.168.38.9:2181",
                60 * 1000, //默认值
                15 * 1000, //默认值
                retryPolicy
        );
        // 3、开启连接
        client.start();
    }

    /**
     * 建立连接-方式2:链式编程
     */
    @Test
    public void testConnect2() {
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.38.9:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("yppah") //CuratorFramework支持名称空间设置,即自定义zk根目录名字,将来所有操作都会默认添加一个yppah前缀节点,即\yppah\+
                .build();
        client.start();
    }

}

image-20221021135924890

image-20221021153615453

2.2 添加节点

package com.yppah.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @Author: haifei
 * @Date: 2022/10/21 15:30
 */
public class CuratorTest2 {

    private CuratorFramework client3;

    @Before
    public void testConnect3() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client3 = CuratorFrameworkFactory.builder()
                .connectString("192.168.1.114:2181") //192.168.38.9  192.168.1.114
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("ohh")
                .build();
        client3.start();
    }

    @After
    public void close() {
        if (client3 != null) {
            client3.close();
        }
    }

    /**
     * 创建节点:create 持久 临时 顺序 数据
     * 1. 基本创建 :create().forPath("")
     * 2. 创建节点 带有数据:create().forPath("",data)
     * 3. 设置节点的类型:create().withMode().forPath(""[,data])
     * 4. 创建多级节点  /app4/p1 :create().creatingParentsIfNeeded().forPath(""[,data])
     */
    @Test
    public void testCreate() throws Exception {
        //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
        String path = client3.create().forPath("/app1");
        System.out.println(path);
    }

    @Test
    public void testCreate2() throws Exception {
        //创建节点,指定数据
        String path = client3.create().forPath("/app2", "waiwai".getBytes());
        System.out.println(path);
    }

    @Test
    public void testCreate3() throws Exception {
        //设置节点的类型(默认持久化)
        String path = client3.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
        System.out.println(path);
//        PERSISTENT(0, false, false, false, false),
//        PERSISTENT_SEQUENTIAL(2, false, true, false, false),
//        EPHEMERAL(1, true, false, false, false),
//        EPHEMERAL_SEQUENTIAL(3, true, true, false, false),
//        CONTAINER(4, false, false, true, false),
//        PERSISTENT_WITH_TTL(5, false, false, false, true),
//        PERSISTENT_SEQUENTIAL_WITH_TTL(6, false, true, false, true);
    }

    @Test
    public void testCreate4() throws Exception {
        //创建多级节点  /app4/p1
        // creatingParentsIfNeeded():如果父节点不存在,则创建父节点
        String path = client3.create().creatingParentsIfNeeded().forPath("/app4/p1");
        System.out.println(path);
    }
}


image-20221021142105744

image-20221021153747153

image-20221021154105141

ps:可以看出命名空间并不是自定义了根目录\,而是给所有待操作的节点加了一个前缀节点


image-20221021155109609

image-20221021155341214

2.3 查询节点

package com.yppah.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

/**
 * @Author: haifei
 * @Date: 2022/10/21 15:30
 */
public class CuratorTest3 {

    private CuratorFramework client3;

    @Before
    public void testConnect3() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client3 = CuratorFrameworkFactory.builder()
                .connectString("101.43.128.227:2181") //192.168.38.9  192.168.1.114
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("ohh")
                .build();
        client3.start();
    }

    @After
    public void close() {
        if (client3 != null) {
            client3.close();
        }
    }

    /**
     * 查询节点:
     * 1. 查询数据:get: getData().forPath()
     * 2. 查询子节点: ls: getChildren().forPath()
     * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
     */
    @Test
    public void testGet() throws Exception {
        //1. 查询数据:get
        byte[] data = client3.getData().forPath("/app1");
        System.out.println(new String(data));
    }

    @Test
    public void testGet2() throws Exception {
        //2. 查询查询子节点: ls
        List<String> path = client3.getChildren().forPath("/"); //即查询“/ohh下的子节点”
        System.out.println(path);
    }

    @Test
    public void testGet3() throws Exception {
        Stat stat = new Stat();
        System.out.println(stat);
        //3. 查询节点状态信息:ls -s
        client3.getData().storingStatIn(stat).forPath("/app1");
        System.out.println(stat);
    }
}

image-20221022083903909

image-20221022083942739

image-20221022084931398

image-20221022085005748

image-20221022085545861

image-20221022085514832

image-20221022085812665

2.4 修改节点

package com.yppah.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @Author: haifei
 * @Date: 2022/10/22 9:03
 */
public class CuratorTest4 {

    private CuratorFramework client3;

    @Before
    public void testConnect3() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client3 = CuratorFrameworkFactory.builder()
                .connectString("101.43.128.227:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("ohh")
                .build();
        client3.start();
    }

    @After
    public void close() {
        if (client3 != null) {
            client3.close();
        }
    }

    /**
     * 修改数据
     * 1. 基本修改数据:setData().forPath()
     * 2. 根据版本修改: setData().withVersion().forPath()
     *      * version 是通过查询出来的。目的就是为了让其他客户端或者线程 不干扰我。
     */
    @Test
    public void testSet() throws Exception {
        client3.setData().forPath("/app1", "yppah".getBytes());
    }

    @Test
    public void testSet2() throws Exception {
        Stat stat = new Stat();
        client3.getData().storingStatIn(stat).forPath("/app1"); //查询节点状态信息:ls -s
        int version = stat.getVersion();
        System.out.println(version);
        client3.setData().withVersion(version).forPath("/app1", "baby".getBytes());
    }

}

image-20221022090843928

image-20221022090916422

image-20221022090942252

image-20221022092114923

image-20221022092317351

2.5 删除节点

package com.yppah.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * @Author: haifei
 * @Date: 2022/10/22 9:29
 */
public class CuratorTest5 {

    private CuratorFramework client3;

    @Before
    public void testConnect3() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client3 = CuratorFrameworkFactory.builder()
                .connectString("101.43.128.227:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("ohh")
                .build();
        client3.start();
    }

    @After
    public void close() {
        if (client3 != null) {
            client3.close();
        }
    }

    /**
     * 删除节点: delete deleteall
     * 1. 删除单个节点:delete().forPath("/app1");
     * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
     * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。  client.delete().guaranteed().forPath("/app2");
     * 4. 回调:inBackground
     */
    @Test
    public void testDel() throws Exception {
        //1、删除单个节点
        client3.delete().forPath("/app1");
    }

    @Test
    public void testDel2() throws Exception {
        //2、删除带有子节点的节点
        client3.delete().deletingChildrenIfNeeded().forPath("/app4");
    }

    @Test
    public void testDel3() throws Exception {
        //3、必须成功的删除
        client3.delete().guaranteed().forPath("/app2");
    }

    @Test
    public void testDel4() throws Exception {
        //4、回调
        client3.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                System.out.println("我被删除了");
                System.out.println(curatorEvent);
            }
        }).forPath("/app1");
    }

}

image-20221022102253159

image-20221022102316821

image-20221022102512036

image-20221022102529627

image-20221022102556204

image-20221022102758308

image-20221022102816118

image-20221022103227784

原文地址:http://www.cnblogs.com/yppah/p/16812698.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性