前言
Zookeeper是可以用来实现分布式锁的。
要了解它,我们先简单说下分布式锁吧。
我们知道,在系统中,当我们访问公共资源并对资源进行一些操作时,为防止出现问题,需要对公共资源依次访问,如常见的多线程售票模型等。由于在一个系统中,我们可以使用锁(如ReentrantLock)或者synchronized关键字等Java方法处理。
但是,当系统逐渐由单系统转换为分布式系统、微服务时,情况就变得复杂了,比如有某共享资源,比如有1个奖品,一个应用查询到有奖品并尝试发给用户A,另一个应用也查询到有奖品并尝试发给用户B,这样A,B均显示有奖品,实际上我们的奖品数量是不足的。如常见的秒杀系统,抽奖系统等。
这时候就需要一种全局的互斥机制来控制应用对共享资源的访问,这就是所谓的分布式锁。
PS:分布式锁的实现也可以基于缓存(如Redis)实现,亦可以通过数据库(乐观锁等)实现,实际中要确实使用到分布式锁,基于缓存的实现还是要偏多一些的。
根据上面所述,下面的图是比较好理解的。
我们再来说下分布式锁应具备的一些特点。
- 同一时间只允许一台机器(服务)的一个线程执行。
- 为整个系统必要业务提供服务,应当是高可用的。
- 性能应得以保证,不能在获取锁和释放锁过程中浪费太多资源或时间。
- 分布式锁应当具备失效机制,避免死锁发生。
- 应当具有可重入特性。
- 应当有非阻塞的特点,某个服务没有获取到锁,应返回获取失败,不能阻塞。
下面我们来用Zookeeper实现我们的分布式锁。
正文
我们在之前封装framework-zookeeper时,用到了下面这个依赖。
1 2 3 4 5
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
|
这里面有一个类InterProcessMutex,这是分布式锁使用的关键类。
PS:其实它已经实现了分布式锁,我们来使用下它吧。
我们创建一个Test,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @RunWith(SpringRunner.class) @SpringBootTest public class DemoApplicationTests { ExecutorService executorService = Executors.newCachedThreadPool(); static int TEST = 5; @Test public void test1() throws Exception{ for(int i=0;i<10;i++){ executorService.execute(()->{ try{ if(TEST>0){ doSomething(); } }catch (Exception e){ e.printStackTrace(); } }); } Thread.sleep(100000); } public void doSomething() throws Exception{ Thread.sleep(1000); TEST--; System.out.println(Thread.currentThread().getId()+"--"+Thread.currentThread().getName()+"执行,TEST="+TEST); } }
|
考虑到多应用分布式比较麻烦,我这里的Test使用多线程模拟分布式请求,用了线程池管理了10个线程,假设TEST静态变量为共享资源,如果TEST数量大于0的时候,我们执行doSomething方法,假设这个方法执行需要一秒,执行后TEST减一。
我们运行Test后,可以看到输出结果。
可以看到出现了负数。
我们引入InterProcessMutex,在判断TEST之前对其加分布式锁,锁的zk基路径我们定为/zwt/lock。调用acquire获取锁,完成业务逻辑后调用release方法释放锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Autowired CuratorZKClient client; @Test public void test1() throws Exception{ for(int i=0;i<10;i++){ executorService.execute(()->{ try{ InterProcessMutex interProcessMutex = new InterProcessMutex(client.getCuratorFramework(),"/zwt/lock"); interProcessMutex.acquire(); if(TEST>0){ doSomething(); } interProcessMutex.release(); }catch (Exception e){ e.printStackTrace(); } }); } Thread.sleep(100000); }
|
运行Test查看结果。
可以看到结果有序的减一最后到0结束。说明了InterProcessMutex实现了我们的分布式锁的功能。
它是如何实现的呢?
我们在上面的doSomething代码里加一些输出。如下:
1 2 3 4 5 6 7 8 9 10
| public void doSomething() throws Exception{ Thread.sleep(1000); List<String> list=client.getNodes("/zwt/lock"); list.forEach((e)->{ System.out.print(e+" "); }); System.out.println(); TEST--; System.out.println(Thread.currentThread().getId()+"--"+Thread.currentThread().getName()+"执行,TEST="+TEST); }
|
继续测试,如下结果:
数据大致如下:
可以看到,Zookeeper创建了10个临时顺序节点,每次会找到最小的节点并删除。其实这就是InterProcessMutex这个类的实现分布式锁的原理。
我们可以看下它的相关代码。
我们从acquire方法看起,调用了internalLock方法,而后调用了
attemptLock方法,这个方法会通过createsTheLock去创建锁。
可以看到createsTheLock方法里创建了临时有序节点。
再看下internalLockLoop这个方法,有些大。
可以看到会拿到有序的子节点,getSortedChildren。
然后尝试去获取锁(从最小的节点开始),getsTheLock会先获取比自己小的节点,要是自己是最小的节点就会获得锁。
拿到后就设置haveTheLock为true,没有拿到,就添加watcher,监听比自己小的节点。
然后根据设置的等待时间判断是否超时从而进行等待或者退出。
最后,如果到了时间或者出现异常,doDelete为true,就会删除节点。
再来看下release方法,里面的主要方法releaseLock,可以看到它调用了上面的deleteOurPath方法删除创建的临时节点。
在锁的获取和释放方法里可以看到下面这些地方,它可以保证我们的分布式锁具有可重入的性质。其通过lockCount(AtomicInteger )实现的,统计重入次数。
1
| final AtomicInteger lockCount = new AtomicInteger(1);
|
Zookeeper分布式锁的基本内容就是这些了,我们来总结下Zookeeper分布式锁的步骤:
- 指定一个存放锁目录(这儿我们指定的/zwt/lock)。
- 线程A想要获取锁,就需要在该目录下创建临时有序节点。
- 获取该目录下的所有子节点,然后获取比自己小的兄弟节点,如果不存在,说明自己是最小节点,那么就去获得锁。
- 线程B同线程A,创建好节点后获取目录下所有子节点,判断自己不是最小的,就会对获得锁的节点添加监听。
- 线程A处理完后释放锁,删除自己的节点,并通知,线程B监听后判断自己是不是最小节点,是的话会获取锁,不是的话在添加对当前获得锁的线程的监听。
通过上面我们可以看到Zookeeper分布式锁的一些优点,如高可用性(由Zookeeper保证)、可重入性、不会出现死锁(临时节点程序出现异常断开连接后会被删除也就失去了锁)等。
和一些缺点,如需要创建临时节点、删除临时节点,性能上肯定有一些影响。
其它
我们可以在对其进行简单封装形成自己的分布式锁工具类。
相关代码如下:
提供一个分布式锁的接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public interface DistributedReentrantLock {
boolean tryLock() throws InterruptedException;
boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;
void unlock(); }
|
同时我们使用Zookeeper的InterProcessMutex去完成相关实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| public class ZKDistributedReentrantLock implements DistributedReentrantLock { private static final Logger logger = LoggerFactory.getLogger(ZKDistributedReentrantLock.class);
private static final ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor( 10, new BasicThreadFactory.Builder().namingPattern("scheduled-pool-%d").build() );
public static final String ROOT_PATH = "/LOCK/";
private static final long DELAY_TIME_FOR_CLEAN = 1000;
private InterProcessMutex interProcessMutex;
private String path;
private CuratorFramework client; private volatile boolean isLockSuccess; public ZKDistributedReentrantLock(CuratorFramework client, String lockId) { this.client = client; this.path = ROOT_PATH + lockId; interProcessMutex = new InterProcessMutex(client, this.path); } public ZKDistributedReentrantLock(CuratorZKClient zkClient, String lockId) { this.client = zkClient.getCuratorFramework(); this.path = ROOT_PATH + lockId; interProcessMutex = new InterProcessMutex(client, this.path); }
@Override public boolean tryLock() throws InterruptedException { return tryLock(-1, null); }
@Override public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { try { isLockSuccess = interProcessMutex.acquire(timeout, unit); logger.debug("{} lock result:{}",this.path,isLockSuccess); return isLockSuccess; } catch (InterruptedException e) { throw e; } catch (Exception e) { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage(),e); } }
@Override public void unlock() { if(isLockSuccess) { try { isLockSuccess = false; interProcessMutex.release(); } catch (Throwable e) { logger.error(e.getMessage(), e); } finally { executorService.schedule(new Cleaner(client, path), DELAY_TIME_FOR_CLEAN, TimeUnit.MILLISECONDS); } logger.debug("{} success unlock.",this.path); } } static class Cleaner implements Runnable { private CuratorFramework client; private String path; public Cleaner(CuratorFramework client, String path) { this.client = client; this.path = path; } @Override public void run() { try { List list = client.getChildren().forPath(path); if (list == null || list.isEmpty()) { client.delete().forPath(path); } } catch (KeeperException.NoNodeException | KeeperException.NotEmptyException e1) { } catch (Exception e) { logger.error(e.getMessage(), e); } } } }
|
实现还是比较简单的,tryLock方法主要就是使用了interProcessMutex的acquire方法,成功后记录isLockSuccess状态,失败后除了调用release方法、把isLockSuccess变为false外,还应尝试清除刚才已经创建的业务lockId节点(线程池)。
测试相关代码:
1 2 3 4 5 6 7 8 9 10 11 12 13
| ZKConfig zkConfig = new ZKConfig();
CuratorZKClient zkClient=new CuratorZKClient(zkConfig); DistributedReentrantLock lock = new ZKDistributedReentrantLock(zkClient,"test"); try{ if(lock.tryLock()){ } }catch (Exception e){ }finally { lock.unlock(); }
|
总结
通过对Zookeeper实现分布式锁的学习理解,我们又看到了Zookeeper的另外一个用途,对Zookeeper有了更深入的理解,也是蛮不错的一次学习体验。
有时间我会在研究下其它方式实现的分布式锁及其一些特点。