Zookeeper 伪集群安装部署操作 Java操作Zookeeper实现分布式锁和队列
* 分布式事物 * CAP理论 * CAP定理的应用 * BASE理论 * 2PC提交 * 3PC提交 * zookeeper集群的角色 * ZAB协议的核心思想 * ZAB协议两种模式 * ZAB协议消息广播流程图 * ZAB协议奔溃恢复 * SpringBoot集成Zookeeper实战
阶段1:提交事物请求
阶段二:执行事物提交
中断事务
优缺点
ZAB协议的核心是定义了对于那些会改变Zookeeper服务器数据状态的事务请求的处理方式即:
pom.xml引入zk依赖
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.5</version> </dependency>
application.yml配置
zookeeper: address: 127.0.0.1:2181 timeout: 4000
ZookeeperConfig配置类
@Configuration public class ZookeeperConfig { private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfig.class); @Value("${zookeeper.address}") private String connectString; @Value("${zookeeper.timeout}") private int timeout; @Bean(name = "zkClient") public ZooKeeper zkClient(){ ZooKeeper zooKeeper=null; try { final CountDownLatch countDownLatch = new CountDownLatch(1); //连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码 // 可指定多台服务地址 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() { @Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected==event.getState()){ //如果收到了服务端的响应事件,连接成功 countDownLatch.countDown(); } } }); countDownLatch.await(); logger.info("【初始化ZooKeeper连接状态....】={}",zooKeeper.getState()); }catch (Exception e){ logger.error("初始化ZooKeeper连接异常....】={}",e); } return zooKeeper; } }
zk客户端zpi封装工具类
@Component public class ZkApi { private static final Logger logger = LoggerFactory.getLogger(ZkApi.class); @Autowired private ZooKeeper zkClient; /** * 判断指定节点是否存在 * @param path * @param needWatch 指定是否复用zookeeper中默认的Watcher * @return */ public Stat exists(String path, boolean needWatch){ try { return zkClient.exists(path,needWatch); } catch (Exception e) { logger.error("【断指定节点是否存在异常】{},{}",path,e); return null; } } /** * 检测结点是否存在 并设置监听事件 * 三种监听类型: 创建,删除,更新 * * @param path * @param watcher 传入指定的监听类 * @return */ public Stat exists(String path,Watcher watcher ){ try { return zkClient.exists(path,watcher); } catch (Exception e) { logger.error("【断指定节点是否存在异常】{},{}",path,e); return null; } } /** * 创建持久化节点 * @param path * @param data */ public boolean createNode(String path, String data){ try { zkClient.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); return true; } catch (Exception e) { logger.error("【创建持久化节点异常】{},{},{}",path,data,e); return false; } } /** * 修改持久化节点 * @param path * @param data */ public boolean updateNode(String path, String data){ try { //zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1. //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查 zkClient.setData(path,data.getBytes(),-1); return true; } catch (Exception e) { logger.error("【修改持久化节点异常】{},{},{}",path,data,e); return false; } } /** * 删除持久化节点 * @param path */ public boolean deleteNode(String path){ try { //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查 zkClient.delete(path,-1); return true; } catch (Exception e) { logger.error("【删除持久化节点异常】{},{}",path,e); return false; } } /** * 获取当前节点的子节点(不包含孙子节点) * @param path 父节点path */ public List<String> getChildren(String path) throws KeeperException, InterruptedException{ List<String> list = zkClient.getChildren(path, false); return list; } /** * 获取指定节点的值 * @param path * @return */ public String getData(String path,Watcher watcher){ try { Stat stat=new Stat(); byte[] bytes=zkClient.getData(path,watcher,stat); return new String(bytes); }catch (Exception e){ e.printStackTrace(); return null; } } /** * 测试方法 初始化 */ @PostConstruct public void init(){ String path="/zk-watcher-2"; logger.info("【执行初始化测试方法。。。。。。。。。。。。】"); createNode(path,"测试"); String value=getData(path,new WatcherApi()); logger.info("【执行初始化测试方法getData返回值。。。。。。。。。。。。】={}",value); // 删除节点出发 监听事件 deleteNode(path); } }
实现Watcher监听
public class WatcherApi implements Watcher { private static final Logger logger = LoggerFactory.getLogger(WatcherApi.class); @Override public void process(WatchedEvent event) { logger.info("【Watcher监听事件】={}",event.getState()); logger.info("【监听路径为】={}",event.getPath()); logger.info("【监听的类型为】={}",event.getType()); // 三种监听类型: 创建,删除,更新 } }
原文链接:https://blog.csdn.net/u010391342/article/details/100404588