irpas技术客

SpringBoot 整合zookeeper_老鼠扛刀满街找猫@_zookeeper整合springboot

未知 1586

文章目录 SpringBoot 整合zookeeper1 ZkClient1.1 相关依赖1.2 ZookeeperConfig1.3 ZkService 2 Apache Curator(推荐使用)2.1 相关依赖2.2 ZkCuratorConfiguration2.3 ZkCurator api 3. Curator 事件监听3.1 NodeCache 节点缓存的监听3.2 PathChildrenCache 子节点监听3.2.1 重载方法介绍3.2.2 启动模式3.2.3 代码实例 3.3 Tree Cache 节点树缓存3.3.1 代码示例

SpringBoot 整合zookeeper

Zookeeper原生Java API、ZKClient和Apache Curator 区别对比

1 ZkClient 1.1 相关依赖 <properties> <zookeeper.version>3.6.0</zookeeper.version> </properties> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> <!--需要排查日志,否则会冲突 --> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> 1.2 ZookeeperConfig application.properties zookeeper.address=127.0.0.1:2181 zookeeper.timeout=10000 ZookeeperConfig.java import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.beans.factory.annotation.Value; import java.util.concurrent.CountDownLatch; @Configuration public class ZookeeperConfig { private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfig.class); /** * zookeeper 地址 */ @Value("${zookeeper.address}") private String connect; /** * connect 超时时间 */ @Value("${zookeeper.timeout}") private int timeout; /** * zookeeper client * @return */ @Bean(name = "zkClient") public ZooKeeper zkClient() { ZooKeeper zooKeeper = null; try { final CountDownLatch countDownLatch = new CountDownLatch(1); //连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码 // 可指定多台服务地址 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 zooKeeper = new ZooKeeper(connect, timeout, new Watcher() { @Override public void process(WatchedEvent event) { if (Event.KeeperState.SyncConnected == event.getState()) { //如果收到了服务端的响应事件,连接成功 countDownLatch.countDown(); } } }); countDownLatch.await(); logger.info("【初始化ZooKeeper连接状态....】={}", zooKeeper.getState()); } catch (Exception e) { logger.error("初始化ZooKeeper连接异常....】={}", e); } return zooKeeper; } } 1.3 ZkService import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; @Service public class ZkService { private static final Logger logger = LoggerFactory.getLogger(ZkService.class); @Autowired private ZooKeeper zkClient; /** * 判断指定节点是否存在 * * @param path * @param needWatch 指定是否复用zookeeper中默认的Watcher * @return */ public Stat exists(String path, boolean needWatch) { try { return zkClient.exists(path, needWatch); } catch (Exception e) { logger.error("【断指定节点是否存在异常】{},{}", path, e); return null; } } /** * 检测结点是否存在 并设置监听事件 * 三种监听类型: 创建,删除,更新 * * @param path * @param watcher 传入指定的监听类 * @return */ public Stat exists(String path, Watcher watcher) { try { return zkClient.exists(path, watcher); } catch (Exception e) { logger.error("【断指定节点是否存在异常】{},{}", path, e); return null; } } /** * 创建持久化节点 * * @param path * @param data */ public boolean createNode(String path, String data) { try { zkClient.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); return true; } catch (Exception e) { logger.error("【创建持久化节点异常】{},{},{}", path, data, e); return false; } } /** * 修改持久化节点 * * @param path * @param data */ public boolean updateNode(String path, String data) { try { //zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1. //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查 zkClient.setData(path, data.getBytes(), -1); return true; } catch (Exception e) { logger.error("【修改持久化节点异常】{},{},{}", path, data, e); return false; } } /** * 删除持久化节点 * * @param path */ public boolean deleteNode(String path) { try { //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查 zkClient.delete(path, -1); return true; } catch (Exception e) { logger.error("【删除持久化节点异常】{},{}", path, e); return false; } } /** * 获取当前节点的子节点(不包含孙子节点) * * @param path 父节点path */ public List<String> getChildren(String path) throws KeeperException, InterruptedException { List<String> list = zkClient.getChildren(path, false); return list; } /** * 获取指定节点的值 * * @param path * @return */ public String getData(String path, Watcher watcher) { try { Stat stat = new Stat(); byte[] bytes = zkClient.getData(path, watcher, stat); return new String(bytes); } catch (Exception e) { e.printStackTrace(); return null; } } } 2 Apache Curator(推荐使用) 2.1 相关依赖 <properties> <curator.version>4.3.0</curator.version> </properties> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>${curator.version}</version> </dependency> 2.2 ZkCuratorConfiguration application.properties #重试次数 curator.retryCount=5 #重试间隔时间 curator.elapsedTimeMs=5000 # zookeeper 地址 curator.address=127.0.0.1:2181 # session超时时间 curator.sessionTimeoutMs=60000 # 连接超时时间 curator.connectionTimeoutMs=5000 ZkCuratorConfiguration.java import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.beans.factory.annotation.Value; @Configuration public class ZkCuratorConfiguration { // 重试次数 @Value("${curator.retryCount}") private int retryCount; // 重试间隔时间 @Value("${curator.elapsedTimeMs}") private int elapsedTimeMs; // zookeeper 地址 @Value("${curator.address}") private String address; // session超时时间 @Value("${curator.sessionTimeoutMs}") private int sessionTimeoutMs; // 连接超时时间 @Value("${curator.connectionTimeoutMs}") private int connectionTimeoutMs; @Bean(initMethod = "start") public CuratorFramework curatorFramework() { return CuratorFrameworkFactory.newClient( address, sessionTimeoutMs, connectionTimeoutMs, new RetryNTimes(retryCount, elapsedTimeMs)); } } 2.3 ZkCurator api import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.concurrent.TimeUnit; @Service public class ZkCuratorService { private static Logger logger = LoggerFactory.getLogger(ZkCuratorService.class); @Value("${service.index}") private String index; public static String CHARSET_NAME = "utf-8"; @Autowired private CuratorFramework client; /** * 创建节点(若创建节点的父节点不存在会先创建父节点再创建子节点) * @param path 节点路径 * @param value 值 * @param modeType 节点类型 */ public void createNode(String path, String value, String modeType) throws Exception { if (null == client.checkExists().forPath(path)) { if(CreateMode.PERSISTENT.equals(modeType)) { // 持久型节点 client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,value.getBytes(CHARSET_NAME)); } else if(CreateMode.EPHEMERAL.equals(modeType)) { //临时节点 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,value.getBytes(CHARSET_NAME)); } else if(CreateMode.PERSISTENT_SEQUENTIAL.equals(modeType)) { // 持久类型顺序性节点 client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(path,value.getBytes(CHARSET_NAME)); } else if(CreateMode.EPHEMERAL_SEQUENTIAL.equals(modeType)) { // 临时类型顺序性节点 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path,value.getBytes(CHARSET_NAME)); } } } //创建节点 public void createNodeSimple(String path) throws Exception { if (null == client.checkExists().forPath(path)) { client.create().creatingParentsIfNeeded().forPath(path); } } //删除节点 public void deleteNodeSimple(String path) throws Exception { if (null != client.checkExists().forPath(path)) { client.delete().deletingChildrenIfNeeded().forPath(path); } } //设置数据 public void setData(String path, String data) throws Exception { if(null == client.checkExists().forPath(path)) return; client.setData().forPath(path, data.getBytes(CHARSET_NAME)); } //添加临时节点数据 public void appendPersistentData(String path, String data) throws Exception { PersistentEphemeralNode node = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, path, data.getBytes(CHARSET_NAME)); node.start(); node.waitForInitialCreate(3, TimeUnit.SECONDS); } // 删除节点及其子节点 public void deletingChildrenIfNeeded(String path) throws Exception { if (null == client.checkExists().forPath(path)) return; // 递归删除节点 client.delete().deletingChildrenIfNeeded().forPath(path); } } 3. Curator 事件监听 Curator 事件有两种模式,一种是标准的观察模式,一种是缓存监听模式。标准的监听模式是使用Watcher 监听器。第二种缓存监听模式引入了一种本地缓存视图的Cache机制,来实现对Zookeeper服务端事件监听。Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache是一种缓存机制,可以借助Cache实现监听。简单来说,Cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。Cache事件监听的种类有3种Path Cache,Node Cache,Tree Cache。 3.1 NodeCache 节点缓存的监听 Curator引入的Cache缓存实现,是一个系列,包括了Node Cache 、Path Cache、Tree Cache三组类。其中Node Cache节点缓存可以用于ZNode节点的监听,Path Cache子节点缓存用于ZNode的子节点的监听,而Tree Cache树缓存是Path Cache的增强,不光能监听子节点,也能监听ZNode节点自身。Node Cache,可以用于监控本节点的新增,删除,更新。流程代码 // 1. 创建NodeCache 实例 NodeCache nodeCache = new NodeCache(client, "/path"); // 2. 添加监听事件 nodeCache.getListenable().addListener(new NodeCacheListener() { // 当监听的节点发生变化(增 删 改),触发执行 @Override public void nodeChanged() throws Exception { ChildData childData = nodeCache.getCurrentData(); logger.info("ZNode节点状态改变, path={}", childData.getPath()); logger.info("ZNode节点状态改变, data={}", new String(childData.getData(), "Utf-8")); logger.info("ZNode节点状态改变, stat={}", childData.getStat()); } }); // 3. 开启监听 nodeCache.start(); 3.2 PathChildrenCache 子节点监听 只能监听子节点,监听不到当前节点不能递归监听,子节点下的子节点不能递归监控 3.2.1 重载方法介绍 public PathChildrenCache(CuratorFramework client, String path,boolean cacheData) public PathChildrenCache(CuratorFramework client, String path,boolean cacheData, boolean dataIsCompressed,final ExecutorService executorService) public PathChildrenCache(CuratorFramework client, String path,boolean cacheData, boolean dataIsCompressed,ThreadFactory threadFactory) public PathChildrenCache(CuratorFramework client, String path,boolean cacheData, ThreadFactory threadFactory)

参数介绍:

CuratorFramework:Curator的客户端path:监听的节点路径cacheData:表示是否把节点内容缓存起来,如果cacheData为true,那么接收到节点列表变更事件的同时,会将获得节点内容。dataIsCompressed:表示是否对节点数据进行压缩executorService 和threadFactory:线程池工厂,当PathChildrenCache内部需要开启新的线程执行时,使用该线程池工厂来创建线程,通过传入的线程池或者线程工厂,来异步处理监听事件 3.2.2 启动模式 NORMAL:启动时,异步初始化cache,完成后不会发出通知。BUILD_INITIAL_CACHE:启动时,同步初始化cache,以及创建cache后,就从服务器拉取对应的数据。POST_INITIALIZED_EVENT:启动时,异步初始化cache,初始化完成触发PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener会收到该事件的通知。 3.2.3 代码实例 // 1. 创建PathChildrenCache实例 PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/path", true); // 2. 添加监听时间 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { logger.info("事件:" + event.getType().toString().toUpperCase()); logger.info("触发节点:" + event.getData().getPath()); if (Objects.nonNull(event.getData())) { logger.info("触发节点数据:" + new String(event.getData().getData())); } } }); // 3. 启动 并设置启动模式 默认:StartMode.NORMAL pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL); 3.3 Tree Cache 节点树缓存

Tree Cache可以看做是上两种的合体,Tree Cache观察的是当前ZNode节点的所有数据。而TreeCache节点树缓存是PathChildrenCache的增强,不光能监听子节点,也能监听节点自身。

3.3.1 代码示例 // 1. 创建TreeCache实例 TreeCache treeCache = new TreeCache(client, "/path"); // 2. 添加监听事件 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { logger.info("事件:" + event.getType().toString().toUpperCase()); logger.info("触发节点:" + event.getData().getPath()); if (Objects.nonNull(event.getData())) { logger.info("触发节点数据:" + new String(event.getData().getData())); } } }); // 3. 启动 treeCache.start();

最后,说明下事件的类型,对应于节点的增加、修改、删除,TreeCache 的事件类型为: (1)NODE_ADDED (2)NODE_UPDATED (3)NODE_REMOVED 这一点,与Path Cache 的事件类型不同,与Path Cache 的事件类型为: (1)CHILD_ADDED (2)CHILD_UPDATED (3)CHILD_REMOVED


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #springboot #curator #区别对比1 #ZkClient11 #相关依赖