Python 官方文档:入门教程 => 点击学习
目录ZooKeeper数据模型ZooKeeper服务端常用命令ZooKeeper客户端命令使用Curator api操作Zookeeper建立连接Watch事件监听分布式锁实现概述Z
./zkServer.sh start
./zkServer.sh status
./zkServer.sh stop
./zkServer.sh restart
@Test
public void testConnect() {
//重试策略
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000, 10);
//第一种方式
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.130.120:2181", 60 * 1000, 15 * 1000, retry);
//第二种方式
CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.130.120:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retry).namespace("hrbu").build();
//开启连接
client.start();
}
参数解读
sessionTimeoutMs – session timeout (会话超时时间)
connectionTimeoutMs – connection timeout (连接超时时间)
retryPolicy – retry policy to use (重试策略)
会话超时时间和连接超时时间有默认值。
第二种链式编程的方式可以指定一个工作空间,在此客户端下的所有操作都会将此工作空间作为根目录。
注意
如果使用的是云服务器需要将指定端口打开
firewall-cmd --zone=public --add-port=2181/tcp --permanent
开放端口
firewall-cmd --zone=public --list-ports
查看已经开放的端口
systemctl restart firewalld
重启防火墙生效
最后别忘了在服务器的安全组里面添加端口,将2181端口打开
添加节点
@Test
public void testCreate1() throws Exception {
//基本创建
CreateBuilder createBuilder = client.create();
//创建时不指定数据,会将当前客户端ip存到里面
createBuilder.forPath("/app1");
//指定数据
createBuilder.forPath("/app2", "hello".getBytes());
}
@Test
public void testCreate2() throws Exception {
CreateBuilder createBuilder = client.create();
//设置节点类型,默认的类型是持久化
//CreateMode是枚举类型
createBuilder.withMode(CreateMode.EPHEMERAL).forPath("/app3");
}
@Test
public void testCreate3() throws Exception {
CreateBuilder createBuilder = client.create();
//创建多级节点,如果父节点不存在,则创建父节点。
createBuilder.creatingParentContainersIfNeeded().forPath("/app4/app4_1");
}
查询节点
@Test
public void testGet() throws Exception {
//查询数据
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
//查询子节点
List<String> strings = client.getChildren().forPath("/app4");
strings.forEach(System.out::println);
//查询节点状态信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
}
修改节点
@Test
public void testSet() throws Exception {
//修改数据
client.setData().forPath("/app1","hrbu".getBytes());
//根据版本修改
int version = 0;
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
version = stat.getVersion();
client.setData().withVersion(version).forPath("/app1", "HRBU".getBytes());
}
删除节点
@Test
public void testDelete() throws Exception {
//删除单个节点
client.delete().forPath("/app4/app4_1");
//删除带有子节点的节点
client.delete().deletinGChildrenIfNeeded().forPath("/app4");
//强制删除
client.delete().guaranteed().forPath("/app4");
//回调
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("执行删除操作");
}
}).forPath("/app4");
}
NodeCache
@Test
public void testNodeCache() throws Exception {
//NodeCache:指定一个节点注册监听器
//创建NodeCache对象
final NodeCache nodeCache = new NodeCache(client, "/app1");
//注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("app1节点发生变化");
//获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("变化后的节点:"+new String(data));
}
});
//开启监听,如果为true,则开启则开启监听,加载缓冲数据
nodeCache.start(true);
}
PathChildrenCache
@Test
public void testPathChildrenCache() throws Exception {
//PathChildrenCache:监听某个节点的所有子节点
//创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/hrbu", true);
//绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点发生变化");
System.out.println(pathChildrenCacheEvent);
//监听子节点的数据变更,并且得到变更后的数据
//获取类型
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//判断类型
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
//获取数据
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
});
//开启
pathChildrenCache.start();
}
TreeCache
@Test
public void testTreeCache() throws Exception {
//创建监听器
TreeCache treeCache = new TreeCache(client, "/");
//注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("节点发生变化");
System.out.println(treeCacheEvent);
}
});
//开启
treeCache.start();
}
在Curator中有五种锁方案:
package com.hrbu.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class Ticket12306 implements Runnable{
private int tickets = 10;//数据库的票数
private InterProcessMutex lock ;
public Ticket12306(){
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("8.130.32.75:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
//开启连接
client.start();
lock = new InterProcessMutex(client,"/lock");
}
@Override
public void run() {
while(true){
//获取锁
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0){
System.out.println(Thread.currentThread()+":"+tickets);
Thread.sleep(100);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
package com.hrbu.curator;
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//创建客户端
Thread t1 = new Thread(ticket12306,"携程");
Thread t2 = new Thread(ticket12306,"飞猪");
t1.start();
t2.start();
}
}
到此这篇关于ZooKeeper命令及JavaAPI操作的文章就介绍到这了,更多相关ZooKeeper JavaAPI操作内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!
--结束END--
本文标题: ZooKeeper命令及JavaAPI操作代码
本文链接: https://lsjlt.com/news/199707.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-03-01
2024-03-01
2024-03-01
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0