SpringBoot整合Zookeeper详细教程

作者:擅长开发Bug的Mr.NaCl 时间:2022-07-24 11:33:09 

一、引言

使用原生的zookeeper时候会遇到watcher一次注册生效一次等情况,因此使用curator

curator是Netflix公司开源的一个 zookeeper客户端原生API接口上进行了包装,解决了很多问题并提供Zookeeper分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等的应用的抽象封装

二、引入依赖

<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.6.2</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-framework</artifactId>
   <version>5.2.0</version>
</dependency>
<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>5.2.0</version>
</dependency>
<dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>fastjson</artifactId>
   <version>2.0.10</version>
</dependency>

三、编写客户端

要改Windows的host文件。host文件位置是C:\Windows\System32\drivers\etc

SpringBoot整合Zookeeper详细教程

3.1、ZookeeperConfig

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZookeeperConfig {
   //集群地址,分割不能有空格 在程序里写connectString不可使用ip,必须使用主机名
   private String connectString = "master:2181,slave1:2181,slave2:2181";
   //连接超时时间
   private int sessionTimeout = 5000;
   //会话存活时间,根据业务灵活指定
   private Integer sessionTimeOut=5000;
   //重试机制时间参数
   private Integer sleepMsBetweenRetry=1000;
   //重试机制重试次数
   private Integer maxRetries=3;
   //命名空间(父节点名称)
   private String namespace="";
   /**
    - `session`重连策略
    - `RetryPolicy retry Policy = new RetryOneTime(3000);`
    - 说明:三秒后重连一次,只重连一次
    - `RetryPolicy retryPolicy = new RetryNTimes(3,3000);`
    - 说明:每三秒重连一次,重连三次
    - `RetryPolicy retryPolicy = new RetryUntilElapsed(1000,3000);`
    - 说明:每三秒重连一次,总等待时间超过个`10`秒后停止重连
    - `RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3)`
    - 说明:这个策略的重试间隔会越来越长
    - 公式:`baseSleepTImeMs * Math.max(1,random.nextInt(1 << (retryCount + 1)))`
    - `baseSleepTimeMs` = `1000` 例子中的值
    - `maxRetries` = `3` 例子中的值
    */
   @Bean("curatorClient")
   public CuratorFramework curatorClient() throws Exception {
       CuratorFramework client = CuratorFrameworkFactory.builder()
               .connectString(connectString)
               .connectionTimeoutMs(sessionTimeout)
               .sessionTimeoutMs(sessionTimeOut)
               //session重连策略
               .retryPolicy(new ExponentialBackoffRetry(sleepMsBetweenRetry,maxRetries))
               //设置命名空间 在操作节点的时候,会以这个为父节点
               .namespace(namespace)
               .build();
       client.start();
       //注册 *
       ZookeeperWatches watches = new ZookeeperWatches(client);
       watches.znodeWatcher();
       watches.znodeChildrenWatcher();
       return client;
   }
}

3.2、ZookeeperWatches

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.zookeeper.data.Stat;
public class ZookeeperWatches {
   private CuratorFramework client;
   public ZookeeperWatches(CuratorFramework client) {
       this.client = client;
   }
   public void znodeWatcher() throws Exception {
       NodeCache nodeCache = new NodeCache(client, "/");
       nodeCache.start();
       nodeCache.getListenable().addListener(new NodeCacheListener() {
           @Override
           public void nodeChanged() throws Exception {
               System.out.println("=======节点改变===========");
               String path = nodeCache.getPath();
               String currentDataPath = nodeCache.getCurrentData().getPath();
               String currentData = new String(nodeCache.getCurrentData().getData());
               Stat stat = nodeCache.getCurrentData().getStat();
               System.out.println("path:"+path);
               System.out.println("currentDataPath:"+currentDataPath);
               System.out.println("currentData:"+currentData);
           }
       });
       System.out.println("节点监听注册完成");
   }
   public void znodeChildrenWatcher() throws Exception {
       PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/",true);
       pathChildrenCache.start();
       pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
           @Override
           public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
               System.out.println("=======节点子节点改变===========");
               PathChildrenCacheEvent.Type type = event.getType();
               String childrenData = new String(event.getData().getData());
               String childrenPath = event.getData().getPath();
               Stat childrenStat = event.getData().getStat();
               System.out.println("子节点监听类型:"+type);
               System.out.println("子节点路径:"+childrenPath);
               System.out.println("子节点数据:"+childrenData);
               System.out.println("子节点元数据:"+childrenStat);
           }
       });
       System.out.println("子节点监听注册完成");
   }
}

3.3、ZookeeperController

@RestController
@RequestMapping(value = "/zookeeper")
public class ZookeeperController {
   @Resource(name = "curatorClient")
   private CuratorFramework client;
   @RequestMapping("/createZnode")
   public String createZnode(){
       String path = "/nacl";
       String data = "shuaige";
       List<ACL> aclList = new ArrayList<>();
       Id id = new Id("world", "anyone");
       aclList.add(new ACL(ZooDefs.Perms.ALL, id));
       try {
           client.create()
                   .creatingParentsIfNeeded()  //没有父节点时 创建父节点
                   .withMode(CreateMode.PERSISTENT)  //节点类型
                   .withACL(aclList)   //配置权限
                   .forPath(path, data.getBytes());
       } catch (Exception e) {
           e.printStackTrace();
           return "节点创建失败"+e.getMessage();
       }
       return "节点创建成功";
   }
   @RequestMapping("/selectZnode")
   public String  selectZnode(){
       HashMap<String,String> hashMap=new HashMap();
       String path="/nacl";
       Stat stat;
       try {
           stat = client.checkExists().forPath(path);
           if (stat == null) {
               hashMap.put("Error","不存在该节点");
           }
           String dataString = new String(client.getData().forPath(path));
           hashMap.put(path, dataString);
           hashMap.put("stat", stat.toString());
       } catch (Exception e) {
           e.printStackTrace();
       }
       return JSON.toJSONString(hashMap);
   }
   @RequestMapping("/selectChildrenZnode")
   public String selectChildrenZnode(){
       Map<String, String> hashMap = new HashMap<>();
       String path="/";
       try {
           List<String> list = client.getChildren().forPath(path);
           for (String s : list) {
               String dataString = new String(client.getData().forPath(path+s));
               hashMap.put(path+s, dataString);
           }
       } catch (Exception e) {
           e.printStackTrace();
       }
       return JSON.toJSONString(hashMap);
   }
   @RequestMapping("/setData")
   public String setData() {
       String path="/nacl";
       String data="big";
       Integer version=0;
       HashMap<String,String> hashMap=new HashMap<>();
       try {
           Stat stat = client.setData().withVersion(version).forPath(path, data.getBytes());
           hashMap.put("success", "修改成功");
           hashMap.put("version", String.valueOf(stat.getVersion()));
       } catch (Exception e) {
           e.printStackTrace();
           hashMap.put("error", "修改失败:"+e.getMessage());
       }
       return JSON.toJSONString(hashMap);
   }
   @RequestMapping("/delete")
   public String delete() {
       HashMap<String,String> hashMap=new HashMap<>();
       String path="/nacl";
       String data="big";
       Integer version=1;
       try {
           client.delete().withVersion(version).forPath(path);
           hashMap.put("success", "删除成功");
       } catch (Exception e) {
           e.printStackTrace();
           hashMap.put("error", "删除失败:"+e.getMessage());
       }
       return JSON.toJSONString(hashMap);
   }
   @RequestMapping("/createAsyncZnode")
   public String createAsyncZnode(){
       String path = "/nacl";
       String data = "shuaige";
       try {
           client.create()
                   .creatingParentsIfNeeded()
                   .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                   //异步回调   增删改都有异步方法
                   .inBackground(new BackgroundCallback() {
                       @Override
                       public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                           System.out.println("异步回调--获取权限:"+client.getACL().forPath(path));
                           System.out.println("异步回调--获取数据:"+new String(client.getData().forPath(path)));
                           System.out.println("异步回调--获取事件名称:"+event.getName());
                           System.out.println("异步回调--获取事件类型:"+event.getType());
                       }
                   })
                   .forPath(path, data.getBytes());
       } catch (Exception e) {
           e.printStackTrace();
           return "节点创建失败"+e.getMessage();
       }
       return "节点创建成功";
   }
}

来源:https://blog.csdn.net/lushixuan12345/article/details/128201573

标签:SpringBoot,整合,Zookeeper
0
投稿

猜你喜欢

  • android实现通知栏下载更新app示例

    2022-01-06 01:58:56
  • Android中三种注入事件方法比较

    2022-11-20 18:17:23
  • Java传入用户名和密码并自动提交表单实现登录到其他系统的实例代码

    2023-09-20 00:40:46
  • JavaFX之TableView的使用详解

    2022-07-16 06:32:24
  • 浅谈Java数值类型的转换与强制转换

    2022-07-01 15:24:00
  • c#基础知识---委托,匿名函数,lambda

    2023-06-12 18:18:07
  • Intellij Idea中进行Mybatis逆向工程的实现

    2021-06-05 00:08:53
  • Unity实现俄罗斯方块

    2021-05-28 13:37:08
  • C++智能指针读书笔记

    2022-02-24 06:51:28
  • Idea2020.2创建JavaWeb项目(部署Tomcat)方法详解

    2023-11-02 13:29:52
  • Java中synchronized关键字引出的多种锁 问题

    2021-06-05 04:42:45
  • Java反射 JavaBean对象自动生成插入,更新,删除,查询sql语句操作

    2022-04-05 20:07:11
  • java使用jacob实现word转pdf

    2023-05-12 11:45:33
  • 将cantk runtime嵌入到现有的APP中的方法

    2021-12-04 06:06:09
  • 详解C#中通过委托来实现回调函数功能的方法

    2021-08-14 05:02:58
  • Spring session 获取当前账户登录数的实例代码

    2022-10-17 10:02:05
  • C# 使用Log4net添加日志记录的方法

    2021-11-16 01:57:23
  • java分页工具类的使用方法

    2023-08-17 02:00:14
  • Spring Cloud Config实现分布式配置中心

    2022-04-25 16:28:07
  • java中JVM中如何存取数据和相关信息详解

    2023-08-10 03:49:46
  • asp之家 软件编程 m.aspxhome.com