前言
今天我们来总结下Redis在项目中的一些应用。
Redis在实际项目中除了可以作为缓存或者持久化数据库外,还能解决项目中遇到的一些棘手的问题。
正文
限流/防高频问题
这基本上属于项目中一个比较经典的问题了,我们以防止用户高频访问来举例,大多数的项目中都是通过Redis来解决高频访问问题的。
我们知道,对于高频访问问题,要有以下3要素:单位时间、单位时间限制访问次数、超频后的限制访问时间长。
我们定义一个Bean,控制这几个参数,当然也可以直接使用配置文件的方式进行配置等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class LimitRule {
private int seconds;
private int limitCount;
private int lockTime; public boolean enableLimitLock() { return getLockTime() > 0 && getLimitCount() > 0; } }
|
Redis有一种数据结构,名字为Zset,可以通过方法zadd添加元素,通过zcount统计记录数,我们可以用Zset的有序集的value来存放访问时间,判断超频时,只需要用zcount判断单位时间seconds内Zset里的元素数据是否超过limitCount即可,超过后即为该用户添加一个锁定lockTime的Redis key。
因此,我们代码大致如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class HighFreqLimit {
private static final String REDIS_VISIT_KEY_FORMAT = "redis.visit:user:zset:%s"; private static final String REDIS_LIMIT_KEY_FORMAT = "redis.limit:user:%s"; private RedisUtil redisUtil;
public HighFreqLimit(RedisUtil redisUtil){ this.redisUtil = redisUtil; }
public void checkLimit1(String userNo,LimitRule limitRule){
if(!limitRule.enableLimitLock()){ return; }
String redisKeyUserVisitZset = String.format(REDIS_VISIT_KEY_FORMAT,userNo); String redisUserFreqLimitKey = String.format(REDIS_LIMIT_KEY_FORMAT,userNo); if (redisUtil.exists(redisUserFreqLimitKey)) { throw new RuntimeException("您操作的太快了,请稍后访问"); }
long currentTimeMillis=System.currentTimeMillis(); String visitInfo = userNo +":"+ System.currentTimeMillis(); redisUtil.zadd(redisKeyUserVisitZset,System.currentTimeMillis(), visitInfo); redisUtil.expire(redisKeyUserVisitZset, limitRule.getSeconds()); long startTimeMillis = currentTimeMillis - limitRule.getSeconds() * 1000; long visitCount = redisUtil.zcount(redisKeyUserVisitZset, startTimeMillis, currentTimeMillis); if (visitCount > limitRule.getLimitCount()) { redisUtil.setString(redisUserFreqLimitKey, visitInfo); redisUtil.expire(redisUserFreqLimitKey, limitRule.getLockTime()); throw new RuntimeException("您操作的太快了,请稍后访问"); } } }
|
Redis在2.6版本后支持Lua表达式,因此我们也可以构建使用Lua表达式来解决上述问题。
构建Lua表达式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| private static final String REDIS_VISIT_KEY_FORMAT2 = "redis.visit:user:lua:%s";
public void checkLimit2(String userNo, LimitRule limitRule) { String redisKeyUserVisit = String.format(REDIS_VISIT_KEY_FORMAT2,userNo); long count; List<String> keys = new ArrayList<String>(); keys.add(redisKeyUserVisit); List<String> args = new ArrayList<String>(); args.add(limitRule.getLimitCount() + ""); args.add(limitRule.getSeconds() + ""); args.add(limitRule.getLockTime() + ""); count = Long.parseLong(redisUtil.getJedisFactory().getJedisCluster().eval(buildLuaScript(limitRule), keys, args) + ""); if(count > limitRule.getLimitCount()){ throw new RuntimeException("您操作的太快了,请稍后访问"); } } private String buildLuaScript(LimitRule limitRule) { StringBuilder lua = new StringBuilder(); lua.append("\nlocal c"); lua.append("\nc = redis.call('get',KEYS[1])"); lua.append("\nif c and tonumber(c) > tonumber(ARGV[1]) then"); lua.append("\nreturn c;"); lua.append("\nend"); lua.append("\nc = redis.call('incr',KEYS[1])"); lua.append("\nif tonumber(c) == 1 then"); lua.append("\nredis.call('expire',KEYS[1],ARGV[2])"); lua.append("\nend"); if (limitRule.enableLimitLock()) { lua.append("\nif tonumber(c) > tonumber(ARGV[1]) then"); lua.append("\nredis.call('expire',KEYS[1],ARGV[3])"); lua.append("\nend"); } lua.append("\nreturn c;"); return lua.toString(); }
|
对于上述表达式,KEYS[1]即为redisKeyUserVisit,可以看到先进行取值,如果有值并且值比limitCount大就返回了,根据后面count > limitRule.getLimitCount()
的判断说明已经超频了,如果不大于该值,则进行自增,如果该值是1,说明单位时间第一次访问,就设置它的单位时间过期,然后如果该值超频后会这是这个Key的过期时间为lockTime。
序列号生成问题
项目中另一种常见的情况就是流水号的生成了,很多业务流水号有如下格式 XXXX2019040100001 等,我最近的一个项目就有类似的复杂需求,这种情况下我们可以使用Redis来生成某一天的自增流水号,大致如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| public class SequenceUtils {
private RedisUtil redisUtil;
public SequenceUtils(RedisUtil redisUtil){ this.redisUtil = redisUtil; }
private static final int DEFAULT_LENGTH = 5; private static final int ONE_DAY_TIME = 24*60*60;
private static final String REDIS_CACHE_KEY = "redis.serialnumber:%s:%s";
private String getSequenceWithZeroPrefix(long seq) { String str = String.valueOf(seq); int len = str.length(); if (len >= DEFAULT_LENGTH) { throw new RuntimeException("Sequence generate failed!"); } int rest = DEFAULT_LENGTH - len; StringBuilder sb = new StringBuilder(); for (int i = 0; i < rest; i++) { sb.append('0'); } sb.append(str); return sb.toString(); }
private String getSequenceNoZeroPrefix(long seq){ return String.valueOf(seq); }
public String generate(String bizCode,boolean needZero){ String date = DateFormatUtils.format(new Date(),"yyyyMMdd"); String key = String.format(REDIS_CACHE_KEY,bizCode,date); long sequence = redisUtil.incr(key); redisUtil.expire(key,ONE_DAY_TIME);
String seq; if(needZero){ seq = getSequenceWithZeroPrefix(sequence); }else{ seq = getSequenceNoZeroPrefix(sequence); } StringBuffer sb = new StringBuffer(); sb.append(bizCode).append(date).append(seq);
return sb.toString(); } }
|
为保证绝对可靠,还可以进行改善,当Redis拿不到值时可以去数据库初始化今天的起始流水号等,这儿不再过多介绍,可以看到主要就是利用了Redis的自增incr和指定时间过期expire这两个关键方法。
分布式锁
还可以使用Redis做分布式锁,相比较之前说的Zookeeper实现分布式锁,使用Redis实现分布式锁,最明显的优点就是指令为内存操作,速度较快,性能较高;但缺点也比较明显,使用Redis实现分布式锁较为复杂,需要考虑超时、原子性、误删等情形,较为复杂,且由于没有等待锁的队列,等待锁只能依靠客户端自旋,效率较为低下。反观ZK实现的分布式锁,有等待锁的队列,但是添加删除节点性能较低。
我们使用Redis来简单实现一个分布式锁。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| public class RedisLock {
private final static String LOCK_PREFIX="redis.lock:%s";
private RedisUtil redisUtil;
public RedisLock(RedisUtil redisUtil){ this.redisUtil = redisUtil; }
public long tryLock(String key,long lockTimeOut){ key= String.format(LOCK_PREFIX,key); long expireTime = 0; expireTime = System.currentTimeMillis() + lockTimeOut +1; if(redisUtil.setStringIfNotExists(key, String.valueOf(expireTime))==1){ return expireTime; }else { String curLockTimeStr = redisUtil.getString(key); if (StringUtils.isBlank(curLockTimeStr) || System.currentTimeMillis() > Long.valueOf(curLockTimeStr)) { expireTime = System.currentTimeMillis() + lockTimeOut +1; curLockTimeStr = redisUtil.getSet(key, String.valueOf(expireTime)); if (StringUtils.isBlank(curLockTimeStr) || System.currentTimeMillis() > Long.valueOf(curLockTimeStr)){ return expireTime; }else { return 0; } }else { return 0; } } }
public long lock(String key,long lockTimeOut,long perSleep) throws InterruptedException{ key= String.format(LOCK_PREFIX,key); long starttime = System.currentTimeMillis(); long sleep = (perSleep==0 ? lockTimeOut/ 10 : perSleep); long expireTime = 0; for (;;) { expireTime = System.currentTimeMillis() + lockTimeOut +1; if (redisUtil.setStringIfNotExists(key, String.valueOf(expireTime)) == 1) { return expireTime; }else { String curLockTimeStr = redisUtil.getString(key); if (StringUtils.isBlank(curLockTimeStr) || System.currentTimeMillis() > Long.valueOf(curLockTimeStr)) { expireTime = System.currentTimeMillis() + lockTimeOut +1; curLockTimeStr = redisUtil.getSet(key, String.valueOf(expireTime)); if (StringUtils.isBlank(curLockTimeStr) || System.currentTimeMillis() > Long.valueOf(curLockTimeStr)){ return expireTime; }else { Thread.sleep(sleep); } }else { Thread.sleep(sleep); } } if (lockTimeOut > 0 && ((System.currentTimeMillis() - starttime) >= lockTimeOut)) { expireTime = 0; return expireTime; } } }
public void unlock(String key,long expireTime){ key= String.format(LOCK_PREFIX,key); if (System.currentTimeMillis()-expireTime>0) { return ; } String curLockTimeStr = redisUtil.getString(key); if (StringUtils.isNotBlank(curLockTimeStr) && Long.valueOf(curLockTimeStr)>System.currentTimeMillis()) { redisUtil.delKey(key); } } }
|
可以看到在等待锁的阶段,如果设置超时时间,则客户端只能自旋等待锁,如果在指定时间内未获得锁,就会超时。
可以看到主要逻辑是,首先使用setStringIfNotExists (对应Redis的set str NX 命令)方法尝试设置key,如果成功说明获得锁,返回超时时间;如果不成功说明已经有程序在使用该锁,需要判断剩余过期时间,如果没有剩余过期时间,再尝试获得锁,否则线程sleep剩余过期时间。
可以看到,我们需要指定锁的使用时间,如果不指定时间,有可能会造成死锁等问题。
配置中心
Redis也可以用来实现配置中心的相关功能。
Redis 做配置中心,需要结合数据库来实现以确保稳定性。
数据库设计一张配置表用于存储配置数据,在Redis可以将数据存入哈希表来进行处理。
当然我们新增修改或者删除数据时需要同时对Redis和数据库进行操作。
并可以添加数据从Redis刷新到数据库和从数据库刷新到Redis等功能。
部分代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| public class RedisConfigCenter {
public static final String CONFIG_CENTER_KEY = "redis.configcenter:hash:key"; private RedisUtil redisUtil; public RedisConfigCenter(RedisUtil redisUtil){ this.redisUtil = redisUtil; }
public boolean insertData(Map<String,String> insertData){ boolean exists = redisUtil.exists(CONFIG_CENTER_KEY); if(exists){ Map<String,String> redisMap = redisUtil.hashGetAll(CONFIG_CENTER_KEY); insertData.putAll(redisMap); } redisUtil.hashMultipleSet(CONFIG_CENTER_KEY,insertData);
return true; }
public boolean updateData(Map<String,String> updateData){ boolean exists = redisUtil.exists(CONFIG_CENTER_KEY); if(!exists){ throw new RuntimeException("请先新增数据!"); } Map<String,String> redisMap = redisUtil.hashGetAll(CONFIG_CENTER_KEY); redisMap.putAll(updateData); redisUtil.hashMultipleSet(CONFIG_CENTER_KEY,updateData); return true; }
public boolean deleteData(List<String> deleteKeys){ boolean exists = redisUtil.exists(CONFIG_CENTER_KEY); if(!exists){ throw new RuntimeException("请先新增数据!"); } Map<String,String> redisMap = redisUtil.hashGetAll(CONFIG_CENTER_KEY); deleteKeys.forEach(key->{ redisMap.remove(key); }); redisUtil.hashMultipleSet(CONFIG_CENTER_KEY,redisMap); return true; }
public Map<String,String> selectData(){ Map<String,String> map = new HashMap<>(); boolean exists = redisUtil.exists(CONFIG_CENTER_KEY); if(!exists){ return map; } map = redisUtil.hashGetAll(CONFIG_CENTER_KEY); return map; }
public boolean refreshToRedis(){ Map<String,String> map = new HashMap<>(); redisUtil.hashMultipleSet(CONFIG_CENTER_KEY,map); return true; }
public boolean refreshToDataBase(){ Map<String,String> map = redisUtil.hashGetAll(CONFIG_CENTER_KEY); return true; } }
|
当然,为保证数据可靠性,可以启动两个线程,指定时间从Redis刷新到数据库或者从数据库刷新到Redis的功能。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
private static boolean refreshRedisThreadStop = false; private static boolean refreshDataBaseThreadStop = false;
ExecutorService executorService = Executors.newFixedThreadPool(2);
public void init() { executorService.submit(() ->{ while (!refreshRedisThreadStop) { try { TimeUnit.MINUTES.sleep(30); refreshToRedis(); } catch (Exception e) { e.printStackTrace(); } } }); executorService.submit(() ->{ while (!refreshDataBaseThreadStop) { try { TimeUnit.MINUTES.sleep(30); refreshToDataBase(); } catch (Exception e) { e.printStackTrace(); } } }); }
|
结语
通过以上对Redis在应用中的各个功能的应用,让我们对Redis的应用有了更深入的了解,以及对Redis的应用场景有了更加深刻的认识。
Redis 在项目中基本上已经是很平常的存在了,如何使用好它,解决棘手问题乃是我们的重中之重。