C# 实现Zookeeper分布式锁的参考示例

Zookeeper分布式锁的原理是巧妙的是使用了znode临时节点的特点和监听(watcher)机制,监听机制很简单,就是我们可以给znode添加一个监听器,当znode节点状态发生改变时(如:数据内容改变,节点被删除),会通知到监听器。本文讲解使用c#实现该功能

  分布式锁 

  互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,后期随着用户量的增加,服务器的压力也越来越大,响应速度越来越慢,甚至出现服务器崩溃的情况。

  为解决服务器压力太大,响应慢的特点,分布式系统部署出现了。

  简单的说,就是我们将系统资源部署到多台服务器中,然后使用一台服务器做入口代理,根据一些决策将接收到的请求转发到资源服务器,这也就是我们常说的 反向代理(一般就是使用nginx)

   虽然分布式解决了服务器压力的问题,但也带来了新的问题。

  比如,我们有一个下单统计的功能,当完成下单后,需要执行统计功能,而在高访问的情况下,可能有两个下单请求(A和B)同时完成,然后一起执行了统计功能,这样可能导致的结果就是A请求未将B请求数据统计在内,而B请求可能也未将A请求数据统计在内,这样就造成了数据的统计错,这个问题的产生的根本原因就是统计功能的并发导致的,如果是单点部署的系统,我们简单的使用一个锁操作就能完成了,但是在分布式环境下,A和B请求可能同时运行在两个服务器中,普通的锁就不能起到效果了,这个时候就要使用分布式锁了。 

  Zookeeper分布式锁原理

  分布式锁的实现发放有多种,简单的,我们可以使用数据库表去实现它,也可以使用redis去实现它,这里要使用的Zookeeper去实现分布式锁

  Zookeeper分布式锁的原理是巧妙的是使用了znode临时节点的特点和监听(watcher)机制,监听机制很简单,就是我们可以给znode添加一个监听器,当znode节点状态发生改变时(如:数据内容改变,节点被删除),会通知到监听器。

  前面几节介绍过znode有三种类型  

  PERSISTENT:持久节点,即使在创建该特定znode的客户端断开连接后,持久节点仍然存在。默认情况下,除非另有说明,否则所有znode都是持久的。
  EPHEMERAL:临时节点,客户端是连接状态时,临时节点就是有效的。当客户端与ZooKeeper集合断开连接时,临时节点会自动删除。临时节点不允许有子节点。临时节点在leader选举中起着重要作用。
  SEQUENTIAL:顺序节点,可以是持久的或临时的。当一个新的znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径,顺序节点在锁定和同步中起重要作用。

  其中,顺序节点,可以是持久的或临时的,而临时节点有个特点,就是它属于创建它的那个会话,当会话断开,临时节点就会自动删除,如果在临时节点上注册了监听器,那么监听器就会收到通知,如果临时节点有了时间顺序,那我们为实现分布式锁就又有一个想法:

  假如在Zookeeper中有一个znode节点/Locker

  1、当client1连接Zookeeper时,先判断/Locker节点是否存在子节点,如果没有子节点,那么会在/Locker节点下创建一个临时顺序的znode节点,假如是/client1,表示client1获取了锁状态,client1可以继续执行。

  2、当client2连接Zookeeper时,先判断/Locker节点是否存在子节点,发现已经存在子节点了,然后获取/Locker下的所有子节点,同时按时间顺序排序,在最后一个节点,也就是/client1节点上注册一个监听器(watcher1),同时在/Locker节点下创建一个临时顺序的znode节点,假如是/client2。同时client2将被阻塞,而阻塞状态的释放是在监听器(watcher1)中的。

  3、当client3连接Zookeeper时,先判断/Locker节点是否存在子节点,发现已经存在子节点了,然后获取/Locker下的所有子节点,同时按时间顺序排序,在最后一个节点,也就是/client2节点上注册一个监听器(watcher2),同时在/Locker节点下创建一个临时顺序的znode节点,假如是/client3。同时client2将被阻塞,而阻塞状态的释放是在监听器(watcher2)中的。

  以此类推。

  4、当client1执行完操作了,断开Zookeeper的连接,因为/client1是临时顺序节点,于是将会自动删除,而client2已经往/client1节点中注册了一个监听器(watcher1),于是watcher1将会受到通知,而watcher1又会释放client2的阻塞状态。于是client2获取锁状态,继续执行。

  5、当client2执行完操作了,断开Zookeeper的连接,因为/client2是临时顺序节点,于是将会自动删除,而client3已经往/client2节点中注册了一个监听器(watcher2),于是watcher2将会受到通知,而watcher2又会释放client3的阻塞状态。于是client3获取锁状态,继续执行。

  以此类推。

  这样,不管分布式环境中有几台服务器,都可以保证程序的排队似的执行了。

  C#实现Zookeeper分布式锁

  上一节有封装过一个ZookeeperHelper的辅助类(Zookeeper基础教程(四):C#连接使用Zookeeper),使用这个辅助类实现了一个ZookeeperLocker类: 

 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AspNetCore.ZookeeperConsole { ///  /// 基于Zookeeper的分布式锁 ///  public class ZookeeperLocker : IDisposable { ///  /// 单点锁 ///  static object locker = new object(); ///  /// Zookeeper集群地址 ///  string[] address; ///  /// Zookeeper操作辅助类 ///  ZookeeperHelper zookeeperHelper; ///  /// 构造函数 ///  /// 分布式锁的根路径 /// 集群地址 public ZookeeperLocker(string lockerPath, params string[] address) : this(lockerPath, 0, address) { } ///  /// 构造函数 ///  /// 分布式锁的根路径 /// 回话过期时间 /// 集群地址 public ZookeeperLocker(string lockerPath, int sessionTimeout, params string[] address) { this.address = address.ToArray(); zookeeperHelper = new ZookeeperHelper(address, lockerPath); if (sessionTimeout > 0) { zookeeperHelper.SessionTimeout = sessionTimeout; } if (!zookeeperHelper.Connect()) { throw new Exception("connect failed:" + string.Join(",", address)); } lock (locker) { if (!zookeeperHelper.Exists())//根节点不存在则创建 { zookeeperHelper.SetData("", "", true); } } } ///  /// 生成一个锁 ///  /// 返回锁名 public string CreateLock() { var path = Guid.NewGuid().ToString().Replace("-", ""); while (zookeeperHelper.Exists(path)) { path = Guid.NewGuid().ToString().Replace("-", ""); } return CreateLock(path); } ///  /// 使用指定的路径名称设置锁 ///  /// 锁名,不能包含路径分隔符(/) /// 返回锁名 public string CreateLock(string path) { if (path.Contains("/")) { throw new ArgumentException("invalid path"); } return zookeeperHelper.SetData(path, "", false, true); } ///  /// 获取锁 ///  /// 锁名 /// 如果获得锁返回true,否则一直等待 public bool Lock(string path) { return LockAsync(path).GetAwaiter().GetResult(); } ///  /// 获取锁 ///  /// 锁名 /// 超时时间,单位:毫秒 /// 如果获得锁返回true,否则等待指定时间后返回false public bool Lock(string path, int millisecondsTimeout) { return LockAsync(path, millisecondsTimeout).GetAwaiter().GetResult(); } ///  /// 异步获取锁等等 ///  /// 锁名 /// 如果获得锁返回true,否则一直等待 public async Task LockAsync(string path) { return await LockAsync(path, System.Threading.Timeout.Infinite); } ///  /// 异步获取锁等等 ///  /// 锁名 /// 超时时间,单位:毫秒 /// 如果获得锁返回true,否则等待指定时间后返回false public async Task LockAsync(string path, int millisecondsTimeout) { var array = await zookeeperHelper.GetChildrenAsync("", true); if (array != null && array.Length > 0) { var first = array.FirstOrDefault(); if (first == path)//正好是优先级最高的,则获得锁 { return true; } var index = array.ToList().IndexOf(path); if (index > 0) { //否则添加监听 var are = new AutoResetEvent(false); var watcher = new NodeWatcher(); watcher.NodeDeleted += (ze) => { are.Set(); }; if (await zookeeperHelper.WatchAsync(array[index - 1], watcher))//监听顺序节点中的前一个节点 { if (!are.WaitOne(millisecondsTimeout)) { return false; } } are.Dispose(); } else { throw new InvalidOperationException($"no locker found in path:{zookeeperHelper.CurrentPath}"); } } return true; } ///  /// 释放资源 ///  public void Dispose() { zookeeperHelper.Dispose(); } } }

  现在写个程序可以模拟一下

 using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace AspNetCore.ZookeeperConsole { class Program { static void Main(string[] args) { //Zookeeper连接字符串,采用host:port格式,多个地址之间使用逗号(,)隔开 string[] address = new string[] { "192.168.209.133:2181", "192.168.209.133:2181", "192.168.209.133:2181" }; //会话超时时间,单位毫秒 int sessionTimeOut = 10000; //锁节点根路径 string lockerPath = "/Locker"; for (var i = 0; i <10; i++) { string client = "client" + i; //多线程模拟并发 new Thread(() => { using (ZookeeperLocker zookeeperLocker = new ZookeeperLocker(lockerPath, sessionTimeOut, address)) { string path = zookeeperLocker.CreateLock(); if (zookeeperLocker.Lock(path)) { //模拟处理过程 Console.WriteLine($"【{client}】获得锁:{DateTime.Now}"); Thread.Sleep(3000); Console.WriteLine($"【{client}】处理完成:{DateTime.Now}"); } else { Console.WriteLine($"【{client}】获得锁失败:{DateTime.Now}"); } } }).Start(); } Console.ReadKey(); } } }

  运行结果如下:

   

   可以发现,锁功能是实现了的

  如果程序运行中打印日志:Client session timed out, have not heard from server in 8853ms for sessionid 0x1000000ec5500b2

  或者直接抛出异常:org.apache.zookeeper.KeeperException.ConnectionLossException:“Exception_WasThrown”

  只需要适当调整sessionTimeOut时间即可

以上就是C# 实现Zookeeper分布式锁的参考示例的详细内容,更多请关注0133技术站其它相关文章!

赞(0) 打赏
未经允许不得转载:0133技术站首页 » 其他教程