前言 我们在使用缓存型数据库时,如Redis ,为保证缓存数据库的高可用,提高缓存数据库的读写性能,最简单的方式是我们做缓存数据库的读写分离,组成Master-Master 或者Master-Slave 的形式,或者搭建缓存数据库集群。
当数据量更大时,和数据库类似,我们可以对缓存数据库进行“分库分表”操作。
在对缓存数据库进行“分库分表”操作中,便会涉及到分布式缓存算法的一些内容。我们来看下。
正文 现在假设我们有一个网站,需要使用缓存数据库Redis 存储图片资源,存储的格式为键值对,key值为图片名称,value为该图片所在文件服务器的路径,我们需要根据文件名查找该文件所在文件服务器上的路径,数据量大概有2000W左右,按照我们约定的规则进行分库,规则就是随机分配,我们可以部署8台缓存服务器,每台服务器大概含有500W条数据,并且进行主从复制,示意图如下:
由于规则随机,所以我们的数据可能存储在任何一组Redis 中,我们如果想找某张图片,比如“a.png”,则需要遍历4组Redis 查询,这显然是不能接受的。
我们可以使用数据库“分库分表”的一些规则来,如按照Hash值,按照类别,按照某个字段值来进行分配。
Redis 作为通用缓存数据库,我们按照数据key的Hash值来对数据分类较好,因为Hash值不涉及到具体的业务逻辑。
使用Hash 可以知道,我们使用Hash,每一张图片在分库时都可以定位到特定的服务器,如下图:
上图中,假设我们查找的是”a.png”,由于有4台服务器(排除从库),因此公式为hash(a.png) % 4 = 2 ,可知定位到了第2号服务器,这样的话就不会遍历所有的服务器,大大提升了性能。
使用Hash的问题 上述的方式虽然提升了性能,我们不再需要对整个Redis服务器进行遍历。但是,使用上述Hash算法进行缓存时,会出现一些缺陷,主要体现在服务器数量变动的时候,所有缓存的位置都要发生改变。
试想一下,如果4台缓存服务器已经不能满足我们的缓存需求,那么我们应该怎么做呢?
很简单,多增加几台缓存服务器不就行了。
假设我们增加了一台缓存服务器,那么缓存服务器的数量就由4台变成了5台。那么原本hash(a.png) % 4 = 2 的公式就变成了 hash(a.png) % 5 =? 。
假设有20个数据需要存储,在有4个Redis 节点的时候如下图:
当我们添加1个Redis 节点之后,数据的分布如下图所示:
上图中蓝色部分代表与4个节点时存储位置一致的数据,其命中率为:4/20=20%。也就是说这种情况带来的结果就是当服务器数量变动时,很多缓存的位置都要发生改变。
也就是当服务器数量发生改变时,所有缓存在一定时间内是失效的,当应用无法从缓存获取数据时,大量请求会调用数据库获取数据,导致数据库瘫痪(缓存雪崩)。
同样的,如果一台服务器出现故障,我们要将其移除,缓存服务器数量从4台变为3台,也会出现上述问题。
我们应该避免这种问题,这种问题是由于Hash算法本身的缘故,使用取模法进行缓存时,这种情况是无法避免的。
为了解决上述问题,一致性Hash算法诞生了。
一致性Hash算法 一致性Hash算法也是使用取模的方法,只是,上述的取模法是对服务器的数量进行取模,而一致性Hash算法是对2^32-1 取模。
简单来说,一致性Hash算法将整个Hash值空间组织成一个虚拟的圆环,如假设某Hash函数H的值空间为0-2^32-1 (即Hash值是一个32位无符号整形),整个Hash空间环如下:
整个空间按顺时针方向组织。0~2^32-1 在零点中方向重合。
下一步将各个服务器使用Hash进行处理,具体可以选择服务器的ip或主机名作为关键字进行Hash,这样每台机器就能确定其在Hash环上的位置,这里假设将上文中四台服务器使用ip地址Hash后在环空间的位置如下:
接下来使用如下算法定位数据访问到相应服务器:将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针“行走”,第一台遇到的服务器就是其应该定位到的服务器。
例如我们有Object A 、Object B 、Object C 、Object D 四个数据对象,经过哈希计算后,在环空间上的位置如下:
根据一致性哈希算法,数据A 会被定为到Node A 上,B 被定为到Node B 上,C 被定为到Node C 上,D 被定为到Node D 上。
一致性Hash算法的容错性和可扩展性 下面分析一致性哈希算法的容错性和可扩展性。
现假设Node C 不幸宕机,可以看到此时对象A 、B 、D 不会受到影响,只有C 对象被重定位到Node D 。一般的,在一致性哈希算法中,如果一台服务器不可用,则受影响的数据仅仅是此服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它不会受到影响。
下面考虑另外一种情况,如果在系统中增加一台服务器Node X ,如下图所示:
此时对象Object A 、B 、D 不受影响,只有对象C 需要重定位到新的Node X 。一般的,在一致性哈希算法中,如果增加一台服务器,则受影响的数据仅仅是新服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它数据也不会受到影响。
综上所述,一致性哈希算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性。
一致性Hash算法的数据倾斜问题 另外,一致性哈希算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜问题。例如系统中只有两台服务器,其环分布如下,
此时必然造成大量数据集中到Node A 上,而只有极少量会定位到Node B 上。
为了解决这种数据倾斜问题,一致性哈希算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点。具体做法可以在服务器ip或主机名的后面增加编号来实现。
例如上面的情况,可以为每台服务器计算三个虚拟节点,于是可以分别计算 Node A#1 、Node A#2 、Node A#3 、Node B#1 、Node B#2 、Node B#3 的哈希值,于是形成六个虚拟节点:
同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射,例如定位到Node A#1 、Node A#2 、Node A#3 三个虚拟节点的数据均定位到Node A 上。这样就解决了服务节点少时数据倾斜的问题。
在实际应用中,通常将虚拟节点数设置为32甚至更大,因此即使很少的服务节点也能做到相对均匀的数据分布。
Java简单实现 下面我们通过代码来简单实现下一致性Hash算法的效果。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 public class ConsistHash <K extends Comparable ,V > { public static final long MAX_VALUE = 0xffffffffL ; public static final long MIN_VALUE = 0 ; public final List<Long> list = Collections.synchronizedList(new ArrayList<>()); public final Map<Long,RedisServer<K,V>> cache = new HashMap<>(); private List<String> serverList; public ConsistHash (List<String> serverList) { this .serverList = serverList; if (CollectionUtils.isEmpty(serverList)){ throw new RuntimeException("服务器列表为空!" ); } initHashLoop(serverList); } private long hash (Object object) { long hash; int hashCode = object.hashCode(); if (hashCode < 0 ){ hash = (long )(-hashCode) + (long )Integer.MAX_VALUE; }else { hash = hashCode; } return hash; } private void initHashLoop (List<String> serverList) { for (String s : serverList) { long h = hash(s); list.add(h); cache.put(h,new RedisServer<>()); } Collections.sort(list); } public void set (K key,V value) { long h = hash(key); for (int i = 0 ; i < list.size(); i++) { long temp = list.get(i); if (i == 0 && h <= temp){ cache.get(temp).set(key,value); break ; } if (i+1 < list.size() && h > temp && h <= list.get(i+1 )){ cache.get(list.get(i+1 )).set(key,value); break ; } if (h > list.get(list.size() -1 )){ cache.get(list.get(0 )).set(key,value); break ; } } } public V get (K key) { long h = hash(key); for (int i = 0 ; i < list.size(); i++) { long temp = list.get(i); if (i == 0 && h <= temp){ return cache.get(temp).get(key); } if (i+1 < list.size() && h > temp && h <= list.get(i+1 )){ return cache.get(list.get(i+1 )).get(key); } if (h > list.get(list.size() -1 )){ return cache.get(list.get(0 )).get(key); } } return null ; } public void removeServer (String server) { Long h = hash(server); list.remove(h); cache.remove(h); } public void addServer (String server) { Long h = hash(server); list.add(h); Collections.sort(list); cache.put(h,new RedisServer<>()); } public void print () { StringJoiner sj1 = new StringJoiner("," ,"[" ,"]" ); list.forEach(e->{ sj1.add(e.toString()); }); System.out.println(sj1); StringJoiner sj2 = new StringJoiner("\n" ); cache.forEach((k,v)->{ StringJoiner sj3 = new StringJoiner("," ); v.map.forEach((a,b)->{ sj3.add("[" +a+"->" +b+"]" ); }); sj2.add(k+":{" +sj3.toString()+"}" ); }); System.out.println(sj2.toString()); } public class RedisServer <K extends Comparable ,V > { private final Map<K,V> map = new ConcurrentHashMap<>(); public V get (K key) { return map.get(key); } public void set (K key,V value) { map.put(key,value); } } public static void main (String[] args) { List<String> list = new ArrayList<>(); list.add("10.1.1.2" ); list.add("10.2.2.3" ); list.add("10.3.4.4" ); list.add("10.10.2.5" ); ConsistHash<String,String> consistHash = new ConsistHash<>(list); consistHash.set("key1111133132311" ,"key11111" ); consistHash.set("qweqrgdfgdgdgdgdf" ,"qweqrgdfgdgdgdgdf" ); consistHash.set("12232433sfddsf" ,"12232433sfddsf" ); consistHash.set("xsdsf567899" ,"xsdsf567899" ); consistHash.set("12qwqs" ,"12qwqs" ); consistHash.set("ccc" ,"ccc" ); consistHash.set("ffffffff" ,"ffffffff" ); System.out.println("宕机前:" ); consistHash.print(); consistHash.removeServer("10.1.1.2" ); System.out.println("宕机后:" ); consistHash.print(); System.out.println("数据重新分布:" ); consistHash.set("key1111133132311" ,"key11111" ); consistHash.set("qweqrgdfgdgdgdgdf" ,"qweqrgdfgdgdgdgdf" ); consistHash.set("12232433sfddsf" ,"12232433sfddsf" ); consistHash.set("xsdsf567899" ,"xsdsf567899" ); consistHash.set("12qwqs" ,"12qwqs" ); consistHash.set("ccc" ,"ccc" ); consistHash.set("ffffffff" ,"ffffffff" ); consistHash.print(); consistHash.addServer("10.1.1.2" ); System.out.println("添加一个服务器:" ); consistHash.set("key1111133132311" ,"key11111" ); consistHash.set("qweqrgdfgdgdgdgdf" ,"qweqrgdfgdgdgdgdf" ); consistHash.set("12232433sfddsf" ,"12232433sfddsf" ); consistHash.set("xsdsf567899" ,"xsdsf567899" ); consistHash.set("12qwqs" ,"12qwqs" ); consistHash.set("ccc" ,"ccc" ); consistHash.set("ffffffff" ,"ffffffff" ); consistHash.print(); } }
上述代码中,需要注意的点如下:
我们用了一个简易的RedisServer
类来模拟Redis 服务; Hash算法也非常简单,采用了Java 中对象的hashCode
方法,如果取到负值,变为正值并加上Integer.MAX_VALUE
; 其实上面的Hash算法直接用hashCode
方法也是可以的,当然Hash数据范围变为Integer.MIN_VALUE
- Integer.MAX_VALUE
; 我们运行测试类,可以看到其实服务器Hash后分布并不是很均匀的(数据倾斜),我们可以通过上面说的“虚拟节点”来解决,这儿就不上代码了。 总结 本文中,我们了解到了什么是一致性Hash算法,并通过代码加深了对算法的理解。
由于分布式系统中,每个节点都有可能新增或失效,如何保证系统能够正常运行,并对外提供稳定服务,一致性Hash算法即提供了处理这种问题的一个思路。
参考资料 一致性Hash算法原理 一致性Hash算法