返回顶部
首页 > 资讯 > 后端开发 > Python >java连接zookeeper实现zookeeper教程
  • 453
分享到

java连接zookeeper实现zookeeper教程

2024-04-02 19:04:59 453人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

目录java连接ZooKeeper实现zookeeperZookeeperJavaapi基本操作1.连接客户端2.恢复回话3.创建节点4.修改节点5.删除节点6.查询节点7.查询子节

java连接zookeeper实现zookeeper

Java服务端连接Zookeeper,进行节点信息的获取,管理…整理成一个基本工具

添加依赖:


<dependency>
   <groupId>org.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.3.6</version>
</dependency>

具体代码如下:


package com; 
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;  
 
public class BaseZookeeper implements Watcher{ 
   private ZooKeeper zookeeper;
    
   private static final int SESSION_TIME_OUT = 2000;
   private CountDownLatch countDownLatch = new CountDownLatch(1);
   @Override
   public void process(WatchedEvent event) {
      if (event.getState() == KeeperState.SyncConnected) {
         System.out.println("Watch received event");
         countDownLatch.countDown();
      }
   }   
  
   
   public void connectZookeeper(String host) throws Exception{
      zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
      countDownLatch.await();
      System.out.println("zookeeper connection success");
   }
  
   
   public String createnode(String path,String data) throws Exception{
      return this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
   }
  
   
   public List<String> getChildren(String path) throws KeeperException, InterruptedException{
      List<String> children = zookeeper.getChildren(path, false);
      return children;
   }
  
   
   public String getData(String path) throws KeeperException, InterruptedException{
      byte[] data = zookeeper.getData(path, false, null);
      if (data == null) {
         return "";
      }
      return new String(data);
   }
  
   
   public Stat setData(String path,String data) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.setData(path, data.getBytes(), -1);
      return stat;
   }
  
   
   public void deleteNode(String path) throws InterruptedException, KeeperException{
      zookeeper.delete(path, -1);
   }
  
   
   public String getCTime(String path) throws KeeperException, InterruptedException{
      Stat stat = zookeeper.exists(path, false);
      return String.valueOf(stat.getCtime());
   }
  
   
   public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{
      int childenNum = zookeeper.getChildren(path, false).size();
      return childenNum;
   }
   
   public void closeConnection() throws InterruptedException{
      if (zookeeper != null) {
         zookeeper.close();
      }
   }  
}  
 

测试


public class Demo { 
    public static void main(String[] args) throws Exception {
        BaseZookeeper zookeeper = new BaseZookeeper();
        zookeeper.connectZookeeper("192.168.0.1:2181"); 
        List<String> children = zookeeper.getChildren("/");
        System.out.println(children);
    } 
}

ZookeeperJavaAPI基本操作

Zookeeper官方提供了两种语言的API,Java和C,在这里只演示JavaAPI

操作API的类中的变量,一下方法都会使用到


static Logger logg = LoggerFactory.getLogger(ZKApi.class);
private static final String zkServerPath = "10.33.57.28:2181";
private static final String zkServerPath = "127.0.0.1:2181";
private static final Integer timeOut = 5000;
private static Stat stat = new Stat(); 

以及实现接口Watcher的实现方法process


public void process(WatchedEvent event) {
    try {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            byte[] resByt = new byte[0];
            resByt = zk.getData("/test1", false, stat);
            String resStr = new String(resByt);
            System.out.println("更改后的值:" + resStr);
            System.out.println("版本号的变化:" + stat.getVersion());
            System.out.println("-------");
            countDown.countDown();
        }else if(event.getType() == Event.EventType.NodeChildrenChanged){
            System.out.println("NodeChildrenChanged");
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            List<String> srcChildList = zk.getChildren(event.getPath(), false);
            for (String child:srcChildList){
                System.out.println(child);
            }
            countDown.countDown();
        }else if(event.getType() == Event.EventType.NodeCreated){
            countDown.countDown();
        }else if (event.getType() == Event.EventType.NodeCreated){
            countDown.countDown();
        }
    }  catch (KeeperException e) {
        e.printStackTrace();
    }  catch (InterruptedException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

1.连接客户端

创建客户端连接使用Zookeeper类的构造函数

Zookeeper构造函数总共四个如下:



public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
	long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

连接客户端代码


public static ZooKeeper getZkConnect() throws IOException {
    ZooKeeper zk = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
    logg.debug("连接状态:{}", zk.getState());
    return zk;
}

DEBUG [main] - zookeeper.disableAutoWatchReset is false
DEBUG [main] - 连接状态:CONNECTING

2.恢复回话


public static void recoveryConnect() throws IOException, InterruptedException {
    ZooKeeper zooKeeper = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
    long sessionId = zooKeeper.getSessionId();
    byte[] sessionPasswd = zooKeeper.getSessionPasswd();
    logg.debug("开始连接服务器 . . .");
    logg.debug("连接状态:{}",zooKeeper.getState());
    new Thread().sleep(1000 );
    logg.debug("开始重连 . . . ");
    ZooKeeper zooSession = new ZooKeeper(zkServerPath, timeOut, new ZKApi(), sessionId, sessionPasswd);
    logg.debug("重连状态:{}",zooSession.getState());
    new Thread().sleep(200);
    logg.debug("重连状态:{}",zooSession.getState());
}

DEBUG [main] - 开始连接服务器 . . .
DEBUG [main] - 连接状态:CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
 INFO [main-SendThread(hdfa67:2181)] - Opening Socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
 INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0011, neGotiated timeout = 5000
DEBUG [main] - 开始重连 . . . 
 INFO [main] - Initiating client connection, connectString=10.33.57.67:2181 sessionTimeout=5000 watcher=ZKApi@73a28541 sessionId=0 sessionPasswd=<hidden>
DEBUG [main] - 重连状态:CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
 INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
 INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
 INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0012, negotiated timeout = 5000
DEBUG [main] - 重连状态:CONNECTED

3.创建节点

创建节点通过zk客户端对象的create方法进行创建,主要有两个方法:一种是同步,一种是异步,接下来的修改等方法同样如此,就不多加解释了



public String create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode,  StrinGCallback cb, Object ctx)

public static void createZkNode1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    String result = zk.create("/test1", "test-data".getBytes(), 
    	ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//创建一个/test的持续节点
    System.out.println(result);
//输出/test1
public static void createZkNode2() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    
    String ctx = "{'create': 'success'}";
    zk.create("/test2", "test-data".getBytes(), 
    	ZooDefs.Ids.OPEN_ACL_UNSAFE,
    	CreateMode.PERSISTENT,new CreateCallBack() ,ctx);
    new Thread().sleep(2000);//需要暂停一会,否则创建失败
}

4.修改节点


public Stat setData(final String path, byte data[], int version)
public void setData(final String path, byte data[], int version,
        StatCallback cb, Object ctx)

public static void setZkNode1() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    Stat stat = zk.setData("/test1", "modifyed-data".getBytes(), 0);
    System.out.println(stat.getVersion());
}
public static void setZkNode2() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    String ctx = "{'modify': 'success'}";
    zk.setData("/test1", "modifyed-data".getBytes(),0,new ModifyCalback(),ctx);
    new Thread().sleep(1000);//必须加上,否则回掉不成功
}

5.删除节点


public void delete(final String path, int version)
public void delete(final String path, int version, VoidCallback cb,
        Object ctx)

public static void deleteZkNode1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    zk.delete("/test1",1);//不能够删除子节点
}
public static void deleteZkNode2() throws IOException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    String ctx = "{'delete': 'success'}";
    zk.delete("/test2",0,new DeleteCallBack(),ctx);//不能够删除子节点
    new Thread().sleep(1000);//必须加上,否则回掉不成功
}

6.查询节点


public byte[] getData(String path, boolean watch, Stat stat)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx)             
public void getData(String path, boolean watch, DataCallback cb, Object ctx)        

public static CountDownLatch countDown = new CountDownLatch(1);
public static void selectData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    byte[] data = zk.getData("/test1", true, stat);
    String s = new String(data);
    System.out.println("value: "+s);
    countDown.await();
}

if (event.getType() == Event.EventType.NodeDataChanged) {
            ZooKeeper zk = null;
            zk = ZKApi.getZkConnect();
            byte[] resByt = new byte[0];
            resByt = zk.getData("/test1", false, stat);
            String resStr = new String(resByt);
            System.out.println("更改后的值:" + resStr);
            System.out.println("版本号的变化:" + stat.getVersion());
            System.out.println("-------");
            countDown.countDown();
        

由于更改之后,触发了监听器,再次在命令行中进行更改,出现了一下结果。

在这里插入图片描述

7.查询子节点

查询子节点的方法


public List<String> getChildren(final String path, Watcher watcher)
public List<String> getChildren(String path, boolean watch)
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)   
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public List<String> getChildren(String path, boolean watch, Stat stat)
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
            

代码实现


public static CountDownLatch countDown = new CountDownLatch(1);
public static void selectchildData1() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    List<String> srcChildList = zk.getChildren("/test", true, stat);
    for (String child:srcChildList){
        System.out.println(child);
    }
    countDown.await();
}

if(event.getType() == Event.EventType.NodeChildrenChanged){
    System.out.println("NodeChildrenChanged");
    ZooKeeper zk = null;
    zk = ZKApi.getZkConnect();
    List<String> srcChildList = zk.getChildren(event.getPath(), false);
    for (String child:srcChildList){
        System.out.println(child);
    }

运行结果完成后,触监听器,再次删除test1

在这里插入图片描述

第二种异步方式实现


public static void selectchildData2() throws IOException, KeeperException, InterruptedException{
    ZooKeeper zk = getZkConnect();
    String ctx = "{'selectChild': 'success'}";
    zk.getChildren("/test",false,new ChildrenCallback(),ctx);
    new Thread().sleep(1000);
}

8.使用递归得到所有的节点


public static void selectchildData3() throws IOException, KeeperException, InterruptedException{
   getChild("/");
}
public static void getChild(String path) throws IOException, KeeperException, InterruptedException {
    System.out.println(path);
    ZooKeeper zk = getZkConnect();
    List<String> childrenList = zk.getChildren(path, false, stat);
    if(childrenList.isEmpty() || childrenList ==null)
        return;
    for(String s:childrenList){
        if(path.equals("/"))
           getChild(path+s);
        else {
            getChild(path+"/"+s);
        }
    }
}

运行结果:

/zookeeper
/zookeeper/config
/zookeeper/quota
/ldd
/ldd/l
/loo
/t1
/test1
/seq
/seq/seq30000000002
/seq/seq20000000001
/seq/se0000000003
/seq/seq10000000000

9.判断节点是否存在


public static void existNode() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = getZkConnect();
    Stat stat = zk.exists("/ff", true);
    System.out.println(stat);
}
//输出null则不存在

10.自定义权限


public static void oneSelfACL() throws Exception {
    ZooKeeper zk = getZkConnect();
    ArrayList<ACL> acls = new ArrayList<ACL>();
  //  zk.create("/test1","test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //所有人均可访问
    Id id1 = new Id("digest", ACLUtils.getDigestUserPassword("id1:123456"));
    Id id2 = new Id("digest", ACLUtils.getDigestUserPassword("id2:123456"));
   // Id ipId = new Id("ip","127.0.0.1");ip设置
    // acls.add(new ACL(ZooDefs.Perms.ALL,id1));
    acls.add(new ACL(ZooDefs.Perms.ALL,id1));
    acls.add(new ACL(ZooDefs.Perms.DELETE,id2));
    //注册过的用户必须通过addAuthInfo才可以操作节点
    zk.addAuthInfo("digest","id1:123456".getBytes());
    zk.create("/test2","test2-data".getBytes(), acls,CreateMode.PERSISTENT);
}

结果如下:

在这里插入图片描述

直接登录id1由于在程序已经注册完成

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

--结束END--

本文标题: java连接zookeeper实现zookeeper教程

本文链接: https://lsjlt.com/news/156983.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • java连接zookeeper实现zookeeper教程
    目录java连接zookeeper实现zookeeperZookeeperJavaAPI基本操作1.连接客户端2.恢复回话3.创建节点4.修改节点5.删除节点6.查询节点7.查询子节...
    99+
    2024-04-02
  • Python与ZooKeeper集群连接
    由于项目的需要,需要学习Python客户端连接ZooKeeper集群,并实现创建临时节点、获得指定的路径下的信息、监听子节点变化的功能。 环境配置 ZooKeeper集群的安装可以参考http://blog.csdn.net/mr...
    99+
    2023-01-31
    集群 Python ZooKeeper
  • java连接zookeeper的3种方式小结
    目录java连接zookeeper3种方式1、使用zookeeper原始api2、使用ZkClient客户端连接,这种连接比较简单3、使用curator连接Java集成zookeep...
    99+
    2024-04-02
  • 解决java连接zookeeper很慢的问题
    目录java连接zookeeper很慢记一次惨痛的zookeeper连接教训java连接zookeeper很慢 最近在学习zookeeper,但是在连接zookeeper服务端时很慢...
    99+
    2024-04-02
  • C#如何连接使用Zookeeper
      Zookeeper作为分布式的服务框架,虽然是java写的,但是强大的C#也可以连接使用。   C#要连接使用Zookeeper,需要借助第三方插件,而现在主要有两个插件可供使用...
    99+
    2024-04-02
  • Java调用Zookeeper的实现步骤
    目录watch机制常用APIJAVA调用watch机制 Zookeeper watch是一种监听通知机制,可以随时监听一些数据的变化,从而实现数据的及时性。 Zookeeper所有的...
    99+
    2024-04-02
  • 如何解决java连接zookeeper很慢的问题
    这篇文章主要为大家展示了“如何解决java连接zookeeper很慢的问题”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“如何解决java连接zookeeper很慢的问题”这篇文章吧。java连接...
    99+
    2023-06-25
  • SpringBoot整合Zookeeper详细教程
    目录一、引言二、引入依赖三、编写客户端3.1、ZookeeperConfig3.2、ZookeeperWatches3.3、ZookeeperController一、引言 使用原生的...
    99+
    2022-12-23
    SpringBoot整合Zookeeper SpringBoot Zookeeper
  • Java如何实现ZooKeeper分布式锁
    这篇文章主要介绍了Java如何实现ZooKeeper分布式锁,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。什么是分布式锁在我们进行单机应用开发,涉及并发同步的时候,我们往往采...
    99+
    2023-06-29
  • apache-zookeeper-3.7.1 安装部署教程
    apache-zookeeper-3.7.1 安装部署 下载地址:https://mirrors.bfsu.edu.cn/apache/zookeeper/ apache-zookeeper-3.7.11.下载直接解压,...
    99+
    2023-02-03
    apache-zookeeper-3.7.1 安装 apache-zookeeper-3.7.1
  • apache-zookeeper-3.7.1 安装部署教程
    apache-zookeeper-3.7.1 安装部署 下载地址:https://mirrors.bfsu.edu.cn/apache/zookeeper/ apache-zooke...
    99+
    2023-02-03
    apache-zookeeper-3.7.1 安装 apache-zookeeper-3.7.1
  • java操作zookeeper实例代码
    本篇内容主要讲解“java操作zookeeper实例代码”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java操作zookeeper实例代码”吧!package...
    99+
    2024-04-02
  • IDEA整合Dubbo+Zookeeper+SpringBoot实现
    目录1. 提出需求2. 环境准备3. 业务接口整合4. 服务提供者5. 服务消费者6. 测试7. 最后附上另外两种配置文件整合方式7.1 XML整合配置文件7.2 配置类整合本文主要...
    99+
    2024-04-02
  • zookeeper如何实现竞争锁
    这篇文章给大家分享的是有关zookeeper如何实现竞争锁的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。package com.hgs.sharelocks;import java.util....
    99+
    2023-06-02
  • ZooKeeper框架教程Curator分布式锁实现及源码分析
    目录  如何使用InterProcessMutex  实现思路   代码实现概述  InterProcessMutex源码分析&nb...
    99+
    2024-04-02
  • Docker快速安装Zookeeper的详细教程
    Docker快速安装Zookeeper 换了公司后很久没用过Zookeeper,最近因为在搞Elastic-Job需要用到,这里简单记录下用Docker搭建过程,下一篇会讲解怎么快...
    99+
    2024-04-02
  • ZooKeeper Java API编程的示例分析
    这篇文章主要为大家展示了“ZooKeeper Java API编程的示例分析”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“ZooKeeper Java API编程的示例分析”这篇文章吧。开发应用...
    99+
    2023-05-30
    zookeeper java
  • zookeeper分布式锁如何实现
    这篇文章主要介绍“zookeeper分布式锁如何实现”,在日常操作中,相信很多人在zookeeper分布式锁如何实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”zookeeper分布式锁如何实现”的疑惑有所...
    99+
    2023-06-27
  • Zookeeper如何实现分布式锁
    这篇“Zookeeper如何实现分布式锁”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Zookeeper如何实现分布式锁”文...
    99+
    2023-06-27
  • 基于ZooKeeper实现队列源码
    实现原理先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下Zookeepe...
    99+
    2023-05-31
    java zookeeper zookeepe
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作