irpas技术客

zookeeper应用实战之分布式锁_、楽._zookeeper分布式锁

irpas 3217

1. 什么是分布式锁?

我们先来看这样一个场景,如下图所示,两个用户同时去抢购秒杀商品,当秒杀服务同时收到秒杀请求时,都去进行库存扣减,此时在没有做任何处理的情况下,就会导致库存数量变成负数从而导致超卖现象。

这种情况下如果是单体项目,我们一般会选择加锁的方式来避免并发的问题。但是在分布式场景中,采用传统的锁并不能解决跨进程并发的问题,所以需要引入一个分布式锁,来解决多个节点之间的访问控制。

2. Zookeeper如何实现分布式锁

实现分布式的方式有很多种,本文主要讲述如何使用zookeeper实现分布式锁。我们可以基于zookeeper的两种特性来实现分布式锁,首先我们来看第一种:

2.1 唯一节点特性

我们可以基于唯一节点特性来实现分布式锁的操作,如下图所示。多个应用程序去抢占锁资源时,只需要在指定节点上创建一个 /Lock 节点,由于Zookeeper中节点的唯一性特性,使得只会有一个用户成功创建 /Lock 节点,剩下没有创建成功的用户表示竞争锁失败。

这种方法虽然能达到目的,但是会有一个问题,如下图所示,假设有非常多的节点需要等待获得锁,那么等待的方式自然是使用watcher机制来监听/lock节点的删除事件,一旦发现该节点被删除说明之前获得锁的节点已经释放了锁,那么此时剩下的B、C、D节点会同时收到删除事件从而去竞争锁,这个过程会产生惊群效应。

什么是“惊群效应”呢?简单来说就是如果存在许多的客户端在等待获取锁,当成功获取到锁的进程释放该节点后,所有处于等待状态的客户端都会被唤醒,这个时候zookeeper会在短时间内发送大量子节点变更事件给所有待获取锁的客户端,然后实际情况是只会有一个客户端获得锁。如果在集群规模比较大的情况下,会对zookeeper服务器的性能产生比较的影响。

2.2 有序节点

为了解决惊群效应,我们可以采用Zookeeper的有序节点特性来实现分布式锁。

如下图所示,每个客户端都往指定的节点下注册一个临时有序节点,越早创建的节点,节点的顺序编号就越小,那么我们可以判断子节点中最小的节点设置为获得锁。如果自己的节点不是所有子节点中最小的,意味着还没有获得锁。这个的实现和前面单节点实现的差异性在于,每个节点只需要监听比自己小的节点,当比自己小的节点删除以后,客户端会收到watcher事件,此时再次判断自己的节点是不是所有子节点中最小的,如果是则获得锁,否则就不断重复这个过程,这样就不会导致羊群效应,因为每个客户端只需要监控一个节点。

使用有序节点实现分布式锁的流程大致如下:

3. Curator实现分布式锁

在日常开发种,我们无需自己去实现分布式锁,只需要使用Curator即可实现分布式锁。

为了实现分布式锁,我们先演示一个存在并发异常的场景。

3.1 商品抢购场景

SQL

DROP TABLE IF EXISTS `goods_stock`; CREATE TABLE `goods_stock` ( `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT, `goods_no` int(11) NOT NULL COMMENT '商品编号', `stock` int(11) NULL DEFAULT NULL COMMENT '库存', `isActive` smallint(6) NULL DEFAULT NULL COMMENT '是否上架(1上,0不是)', PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

整个项目采用spring boot+mybatis-plus框架,代码一键生成。主要编写controller层代码即可:

这个抢购接口乍一看好像没啥问题,但实际上是存在问题的,因为他不具有原子性,在高并发场景下会造成数据多扣减。

我们可以使用jmeter对这个接口进行压测,用1500个线程,库存数量设置成100,监视数据库中库存的变化发现,整个库存变化过程是非常混乱的。可能一会数字变小,但是一会又变大了…

@RestController @RequestMapping("/goods-stock") public class GoodsStockController { @Autowired private IGoodsStockService goodsStockService; @GetMapping("/{goodsNo}") public String purchase(@PathVariable("goodsNo") Integer goodsNo) throws Exception { QueryWrapper<GoodsStock> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("goods_no", goodsNo); GoodsStock goodsStock = goodsStockService.getOne(queryWrapper); Thread.sleep(new Random().nextInt(1000)); //增加问题出现的频率 if (goodsStock == null) { return "指定商品不存在"; } if (goodsStock.getStock().intValue() < 1) { return "库存不够"; } goodsStock.setStock(goodsStock.getStock() - 1); boolean res = goodsStockService.updateById(goodsStock); if (res) { return "抢购:" + goodsNo + "成功"; } return "抢购失败"; } } 3.2 引入Zookeeper实现分布式锁

针对以上问题,我们可以引入zookeeper。curator客户端对于锁这块做了一些封装,curator提供了InterProcessMutex 这样一个api。除了分布式锁之外,还提供了leader选举、分布式队列等常用的功能。

InterProcessMutex:分布式可重入排它锁

InterProcessSemaphoreMutex:分布式排它锁

InterProcessReadWriteLock:分布式读写锁

具体的使用方法如下:

引入pom

<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.0</version> </dependency>

CuratorConfig

@Configuration public class CuratorConfig { @Bean public CuratorFramework curatorFramework() { CuratorFramework curatorFramework = CuratorFrameworkFactory .builder() .connectString("localhost:2181") .sessionTimeoutMs(15000) .connectionTimeoutMs(20000) .retryPolicy(new ExponentialBackoffRetry(1000, 10)) .build(); curatorFramework.start(); return curatorFramework; } }

修改接口,增加锁机制

@Scope(scopeName = "prototype") @RestController @RequestMapping("/goods-stock") public class GoodsStockController { @Autowired private IGoodsStockService goodsStockService; @Autowired private CuratorFramework curatorFramework; @GetMapping("/{goodsNo}") public String purchase(@PathVariable("goodsNo") Integer goodsNo) throws Exception { QueryWrapper<GoodsStock> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("goods_no", goodsNo); //基于临时有序节点来实现的分布式锁. InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/Locks"); try { lock.acquire(); //抢占分布式锁资源(阻塞的) GoodsStock goodsStock = goodsStockService.getOne(queryWrapper); Thread.sleep(new Random().nextInt(1000)); if (goodsStock == null) { return "指定商品不存在"; } if (goodsStock.getStock().intValue() < 1) { return "库存不够"; } goodsStock.setStock(goodsStock.getStock() - 1); boolean res = goodsStockService.updateById(goodsStock); if (res) { return "抢购书籍:" + goodsNo + "成功"; } } finally { lock.release(); //释放锁 } return "抢购失败"; } }

修改完成后,我们继续通过jmeter压测,可以看到就不存在库存超卖问题了。

4. Curator实现分布式锁的源码分析

前面我们已经理解的Zookeeper实现分布式锁的原理,以及基于Curator完成了分布式锁的使用,那么我们继续来分析Curator是如何基于代码实现这一过程。

4.1 抢占锁的逻辑 Curator构造函数 InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) { this.threadData = Maps.newConcurrentMap(); // maxLeases=1,表示可以获得分布式锁的线程数量(跨JVM)为1,即为互斥锁 // 锁节点的名称前缀,lock-0000001, 后面部分是有序递增的序列号 this.basePath = PathUtils.validatePath(path); // internals的类型为LockInternals,InterProcessMutex将分布式锁的申请和释放操作委托 给internals执行 this.internals = new LockInternals(client, driver, path, lockName, maxLeases); } acquire方法

调用acquire方法,该方法有两个重载方法,另外一个是带超时时间,当等待超时没有获得锁则放弃锁的占用。

public void acquire() throws Exception { //无限等待的阻塞方法 if (!this.internalLock(-1L, (TimeUnit)null)) { throw new IOException("Lost connection while trying to acquire lock: " + this.basePath); } } internalLock private boolean internalLock(long time, TimeUnit unit) throws Exception { //得到当前线程 Thread currentThread = Thread.currentThread(); //使用threadData存储线程重入的情况 InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread); if (lockData != null) { //同一线程再次acquire,首先判断当前的映射表内(threadData)是否有该线程的锁信息,如果有则原子+1,然后返回 lockData.lockCount.incrementAndGet(); return true; } else { // 映射表内没有对应的锁信息,尝试通过LockInternals获取锁 String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes()); if (lockPath != null) { // 成功获取锁,记录信息到映射表 InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath); this.threadData.put(currentThread, newLockData); return true; } else { return false; } } } // 锁信息 // Zookeeper中一个临时顺序节点对应一个“锁”,但让锁生效激活需要排队(公平锁),下面会继续分析 private static class LockData { final Thread owningThread; final String lockPath; final AtomicInteger lockCount; private LockData(Thread owningThread, String lockPath) { this.lockCount = new AtomicInteger(1); // 分布式锁重入次数 this.owningThread = owningThread; this.lockPath = lockPath; } } attemptLock

尝试获得锁,实际上是向zookeeper注册一个临时有序节点,并且判断当前创建的节点的顺序是否是最小节点。如果是则表示获得锁成功:

// 尝试获取锁,并返回锁对应的Zookeeper临时顺序节点的路径 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { long startMillis = System.currentTimeMillis(); // 无限等待时,millisToWait为null Long millisToWait = unit != null ? unit.toMillis(time) : null; // 创建ZNode节点时的数据内容,无关紧要,这里为null byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes; int retryCount = 0; // 当前已经重试次数,与CuratorFramework的重试策略有关 // 在Zookeeper中创建的临时顺序节点的路径,相当于一把待激活的分布式锁 // 激活条件:同级目录子节点,名称排序最小(排队,公平锁) String ourPath = null; boolean hasTheLock = false; // 是否已经完成尝试获取分布式锁的操作 boolean isDone = false; while(!isDone) { isDone = true; try { // 从InterProcessMutex的构造函数可知实际driver为 StandardLockInternalsDriver的实例 // 在Zookeeper中创建临时顺序节点 ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes); // 循环等待来激活分布式锁,实现锁的公平性 hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath); } catch (NoNodeException var14) { // 容错处理,不影响主逻辑的理解,可跳过 // 因为会话过期等原因,StandardLockInternalsDriver因为无法找到创建的临时顺序节点而抛出NoNodeException异常 if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) { throw var14; } isDone = false; } } // 成功获得分布式锁,返回临时顺序节点的路径,上层将其封装成锁信息记录在映射表,方便锁重入 return hasTheLock ? ourPath : null; } createsTheLock

在Zookeeper中创建临时顺序节点.

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception { String ourPath; // lockNodeBytes不为null则作为数据节点内容,否则采用默认内容(IP地址) if (lockNodeBytes != null) { // creatingParentContainersIfNeeded:用于创建容器节点 // withProtection:临时子节点会添加GUID前缀 ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes); } else { // CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点,Zookeeper能保证在节点产生的顺序性 // 依据顺序来激活分布式锁,从而也实现了分布式锁的公平性 ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path); } return ourPath; } internalLockLoop

循环等待来激活分布式锁,实现锁的公平性

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; // 是否已经持有分布式锁 boolean doDelete = false; // 是否需要删除子节点 try { if (this.revocable.get() != null) { ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath); } //在没有获得锁的情况下持续循环 while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) { List<String> children = this.getSortedChildren(); // 获取排序后的子节点列表 // 获取前面自己创建的临时顺序子节点的名称 String sequenceNodeName = ourPath.substring(this.basePath.length() + 1); // 实现锁的公平性的核心逻辑 PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases); if (predicateResults.getsTheLock()) { // 获得了锁,中断循环,继续返回上层 haveTheLock = true; } else { // 没有获得到锁,监听上一临时顺序节点 String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // exists()会导致导致资源泄漏,因此exists()可以监听不存在的 ZNode,因此采用getData() // 上一临时顺序节点如果被删除,会唤醒当前线程继续竞争锁,正常情况下 能直接获得锁,因为锁是公平的 ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath); if (millisToWait == null) { //是否有超时机制 this.wait(); //不限时等待 } else { millisToWait = millisToWait - (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if (millisToWait > 0L) { this.wait(millisToWait); //限时等待被唤醒 } else { doDelete = true; // 获取锁超时,标记删除之前创建的临时顺序节点 break; } } } catch (NoNodeException var19) { } } } } } catch (Exception var21) { ThreadUtils.checkInterrupted(var21); doDelete = true; // 标记删除,在finally删除之前创建的临时顺序节点(后台不断尝试) throw var21; // 重新抛出,尝试重新获取锁 } finally { if (doDelete) { this.deleteOurPath(ourPath); //删除当前节点 } } return haveTheLock; } driver.getsTheLock

StandardLockInternalsDriver

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception { // 之前创建的临时顺序节点在排序后的子节点列表中的索引 int ourIndex = children.indexOf(sequenceNodeName); // 校验之前创建的临时顺序节点是否有效 validateOurIndex(sequenceNodeName, ourIndex); // 锁公平性的核心逻辑 // 由InterProcessMutex的构造函数可知,maxLeases为1,即只有ourIndex为0时,线程才能持有锁,或者说该线程创建的临时顺序节点激活了锁 // Zookeeper的临时顺序节点特性能保证跨多个JVM的线程并发创建节点时的顺序性,越早创建临时顺序节点成功的线程会更早地激活锁或获得锁 boolean getsTheLock = ourIndex < maxLeases; // 如果已经获得了锁,则无需监听任何节点,否则需要监听上一顺序节点(ourIndex-1) // 因为锁是公平的,因此无需监听除了(ourIndex-1)以外的所有节点,这是为了减少羊群效应,非常巧妙的设计!! String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases); // 返回获取锁的结果,交由上层继续处理(添加监听等操作) return new PredicateResults(pathToWatch, getsTheLock); } static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException { if (ourIndex < 0) { // 容错处理,可跳过 // 由于会话过期或连接丢失等原因,该线程创建的临时顺序节点被Zookeeper服务端删除,往外抛出NoNodeException // 如果在重试策略允许范围内,则进行重新尝试获取锁,这会重新重新生成临时顺序节点 throw new NoNodeException("Sequential path not found: " + sequenceNodeName); } } 4.2 释放锁的逻辑 release public void release() throws Exception { Thread currentThread = Thread.currentThread(); InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread); if (lockData == null) { // 无法从映射表中获取锁信息,表示当前没有持有锁 throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath); } else { // 锁是可重入的,初始值为1,原子-1到0,锁才释放 int newLockCount = lockData.lockCount.decrementAndGet(); if (newLockCount <= 0) { if (newLockCount < 0) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath); } else { try { // lockData != null && newLockCount == 0,释放锁资源 this.internals.releaseLock(lockData.lockPath); } finally { // 最后从映射表中移除当前线程的锁信息 this.threadData.remove(currentThread); } } } } } releaseLock final void releaseLock(String lockPath) throws Exception { // 移除订阅事件 this.client.removeWatchers(); this.revocable.set((Object)null); // 删除临时顺序节点,只会触发后一顺序节点去获取锁,理论上不存在竞争,只排队,非抢占,公平锁,先到先得 this.deleteOurPath(lockPath); } private void deleteOurPath(String ourPath) throws Exception { try { // 后台不断尝试删除 ((ChildrenDeletable)this.client.delete().guaranteed()).forPath(ourPath); } catch (NoNodeException var3) { } } 4.3 锁撤销

InterProcessMutex支持一种协商撤销互斥锁的机制, 可以用于死锁的情况。

想要撤销一个互斥锁可以调用下面这个方法:

void makeRevocable(RevocationSpec entry) { this.revocable.set(entry); }

这个方法可以让锁持有者来处理撤销动作。 当其他进程/线程想要你释放锁时,就会回调参数中的监听器方法。 但是,此方法不是强制撤销的,是一种协商机制。

当想要去撤销/释放一个锁时,可以通过 Revoker 中的静态方法来发出请求:

Revoker.attemptRevoke();

public static void attemptRevoke(CuratorFramework client, String path) throws Exception { try { client.setData().forPath(path, LockInternals.REVOKE_MESSAGE); } catch (NoNodeException var3) { } } path :加锁的zk节点path,通常可以通过 InterProcessMutex.getParticipantNodes() 获得。

这个方法会发出撤销某个锁的请求。如果锁的持有者注册了上述的 RevocationListener监听器,那么就会调用监听器方法协商撤销锁。

5. 项目地址

zookeeper-demo


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #zookeeper分布式锁 #1 #2