前言 在分布式环境下,为了应对高并发,通常有以下几种手段,优先级从高到低依次为缓存、限流、降级、熔断。
缓存即是缓存热点数据,以便请求来时可以做出快速反映,减小数据库压力。
降级和熔断广义上来讲也算是限流的一种,本质上也是阻止请求进来。
今天这篇文章我们主要来讲一讲限流的两种算法以及实现方式。
正文 限流的意义 对于一个对外服务应用,我们为什么要限流?它的意义在哪里呢?
我们知道,对于对外应用,有很多情况会导致流量增大:
需要注意流量变“大”是相对的,相对于我们服务所能承受的流量。
比如我们服务支持10000QPS的处理能力,如果每秒处理5000个请求,显然不大,但如果服务只支持1000QPS的处理能力,那每秒5000的请求对于服务器显然“大”了。
如果长时间这样,显然会导致我们的服务熔断或挂掉,为了保证服务器稳定,我们自然要对流量进行控制,这就是限流。
PS:限流的优先级在缓存之后,因此对于这种情况,我们应首先尽可能的提高服务的QPS能力,优化逻辑,缓存热点数据等。
漏桶算法 算法内容 漏桶算法(Leaky Bucket)的原理比较简单:水(请求)先进入到漏桶里,人为设置一个最大出水速率,漏桶以<=出水速率的速度出水,当水流入速度过大会直接溢出(拒绝服务)。
如下图:
该算法思想如下:
比如设置请求速率为1000QPS,容量池为5000,当请求小于1000QPS时,正常进行处理; 当请求为2000QPS时,每秒处理1000个请求后会剩下1000个请求; 当第5s时,容量已满,后续新请求丢弃,直到容量池内有请求被处理掉。 可以看到这种算法强制限制请求速率,缺点是十分明显的:
无法面对突发的大流量 —— 比如上述例子,设置请求速率为1000QPS,应用在绝大多数情况下请求小于1000QPS,突然某天活动,用户访问激增,那么此种限流会导致大多数用户无法正常访问服务。 无法有效利用资源 —— 虽然我们服务处理能力是1000QPS,但这不是绝对的,比如前6s为2000QPS,后面时间为500QPS,一小段时间服务器资源是可以承受这段请求压力的,但是漏桶算法这种情况下会丢弃一部分请求。 相关代码实现 相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class LeakyBucketLimit { private final int RATE = 10 ; private final int BURST = 50 ; private long lastTime = System.currentTimeMillis() /1000 ; private int count; private void refresh () { long now = System.currentTimeMillis() / 1000 ; count = (int )Math.max(0 ,count - (now - lastTime)*RATE); lastTime = now; } public boolean limit () { refresh(); if (count < BURST){ count++; return true ; }else { return false ; } } public static void main (String[] args) throws Exception { LeakyBucketLimit limit = new LeakyBucketLimit(); for (int i = 0 ; i < 100 ; i++) { System.out.println(i+"->" +limit.limit()); } } }
运行结果如下:
可以看到当达到容量50时请求就会被拒绝。
PS:需要注意的是,实际中当请求完成后,会被释放,池子里请求是动态增加减少的过程。
通常来说,实际中使用漏桶算法来进行限流的场景并不多。
令牌桶算法 算法内容 令牌桶算法(Token Bucket)是网络流量整形(Traffic Shaping)和限流(Rate Limiting)中最常使用的一种算法,它可用于控制发送到网络上数据的数量并允许突发数据的发送。
令牌桶算法可以认为是对漏桶算法的一种改进,主要在于令牌桶算法在限制平均调用速率的同时还允许一定程度的突发调用。
令牌桶算法的实现原理如下图:
整个过程如下:
系统以恒定的速率产生令牌,然后将令牌放入令牌桶中; 令牌桶有一个容量,当令牌桶满了的时候,再向其中放入的令牌就会被丢弃; 每次一个请求过来,需要从令牌桶中获取一个令牌,假设有令牌,那么提供服务;假设没有令牌,那么拒绝服务。 现在我们来看下为什么令牌桶可以防止一定程度上的突发流量。
假设我们想要调用速率为1000QPS,那么设置令牌的生成速度1000个/s,第一秒请求800个,那么会余下200个令牌,在第二秒请求时,就支持1200个请求了。
需要注意的是令牌的生成并不是无上限的,也必须有个容量,比如令牌的生成速度1000个/s,前5秒请求200个/s,那么第6秒支持4000个请求,但这远超系统承受能力,因此令牌桶算法需要设置桶中令牌的上限。
由于令牌桶算法允许一定程度的突发调用,因此在限流场景里比漏桶算法更加广泛。
RateLimiter Google 开源工具包Guava 提供了限流工具类RateLimiter
,该类就是基于令牌桶算法来完成限流,非常易于使用。
RateLimiter
经常用于限制对一些物理资源或者逻辑资源的访问速率。它支持两种获取permits
接口,一种是如果拿不到立刻返回false,一种会阻塞等待一段时间看能不能拿到。
RateLimiter
和 Java 中的信号量(java.util.concurrent.Semaphore
)类似,Semaphore
通常用于限制并发量。
我们通过Maven 引入相关Jar 包。
1 2 3 4 5 6 <dependency > <groupId > com.google.guava</groupId > <artifactId > guava</artifactId > <version > 30.1.1-jre</version > </dependency >
找到RateLimiter
,我们在源码注释中可以看到如下一些代码及说明。
比如我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个:
1 2 3 4 5 6 7 final RateLimiter rateLimiter = RateLimiter.create(2.0 ); void submitTasks (List<Runnable> tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); executor.execute(task); } }
想象下我们制造了一个数据流,并希望以每秒5kb的速率处理它。可以通过要求每个字节代表一个许可,然后指定每秒5000个许可来完成:
1 2 3 4 5 6 final RateLimiter rateLimiter = RateLimiter.create(5000.0 );void submitPacket (byte [] packet) { rateLimiter.acquire(packet.length); networkService.send(packet); }
有一点非常重要,那就是请求的许可数从来不会影响到请求本身的限制(调用acquire(1) 和调用acquire(1000) 将得到相同的限制效果,如果存在这样的调用的话),但会影响下一次请求的限制,也就是说,如果一个高开销的任务抵达一个空闲的RateLimiter
,它会被马上许可,但是下一个请求会经历额外的限制,从而来偿付高开销任务。注意:RateLimiter
并不提供公平性的保证。
因此,可以认为RateLimiter
支持预消费的能力。突发流量的处理,在令牌桶算法中有两种方式,一种是有足够的令牌才能消费,一种是先消费后还令牌。
RateLimiter相关源码 RateLimiter
其实是一个abstract类
,但是它提供了几个static
方法用于创建RateLimiter
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public abstract class RateLimiter { public static RateLimiter create (double permitsPerSecond) { } public static RateLimiter create (double permitsPerSecond, long warmupPeriod, TimeUnit unit) { } public static RateLimiter create (double permitsPerSecond, Duration warmupPeriod) { return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS); } }
RateLimiter
提供了两个获取令牌的方法,不带参数表示获取一个令牌。如果没有令牌则一直等待,返回等待的时间(单位为秒),没有被限流则直接返回0.0。
1 2 public double acquire () ;public double acquire (int permits) ;
尝试获取令牌,分为待超时时间和不带超时时间两种。
1 2 3 4 5 6 7 8 public boolean tryAcquire () ;public boolean tryAcquire (int permits) ;public boolean tryAcquire (long timeout, TimeUnit unit) ;public boolean tryAcquire (int permits, long timeout, TimeUnit unit) ;
我们从acquire
来看RateLimiter
如何实现限流。相关代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public double acquire (int permits) { long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L ); } final long reserve (int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } final long reserveAndGetWaitLength (int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0 ); } abstract long reserveEarliestAvailable (int permits, long nowMicros) ;
RateLimiter
的具体实现SmoothRateLimiter
里的reserveEarliestAvailable
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 final long reserveEarliestAvailable (int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this .storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this .storedPermits, storedPermitsToSpend) + (long ) (freshPermits * stableIntervalMicros); this .nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this .storedPermits -= storedPermitsToSpend; return returnValue; } abstract long storedPermitsToWaitTime (double storedPermits, double permitsToTake) ;
storedPermitsToWaitTime
方法有两种实现,因为获取存储的令牌由于资源不足有可能需要等待时间。
一种是资源确实不足,这些剩余的资源我们是可以继续使用的;另一种是提供资源的服务过去还没准备好(预热期)。
为此,RateLimiter
对于storedPermitsToWaitTime
有两种实现策略,即SmoothBursty
和SmoothWarmingUp
。
SmoothBursty SmoothBursty
使用storedPermits
不需要额外等待时间。并且默认maxBurstSeconds
为1,因此maxPermits
为permitsPerSecond
,即最多可以存储1秒的剩余令牌,比如QPS=5,则maxPermits=5。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 static final class SmoothBursty extends SmoothRateLimiter {final double maxBurstSeconds;SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super (stopwatch); this .maxBurstSeconds = maxBurstSeconds; } @Override void doSetRate (double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this .maxPermits; maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0 ) ? 0.0 : storedPermits * maxPermits / oldMaxPermits; } } @Override long storedPermitsToWaitTime (double storedPermits, double permitsToTake) { return 0L ; } @Override double coolDownIntervalMicros () { return stableIntervalMicros; } }
SmoothWarmingUp SmoothWarmingUp
的maxPermits
等于预热(warmup)期间能产生的令牌数,比如QPS=4,warmup为2秒,则maxPermits=8。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 static final class SmoothWarmingUp extends SmoothRateLimiter { private final long warmupPeriodMicros; private double slope; private double thresholdPermits; private double coldFactor; SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { super (stopwatch); this .warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); this .coldFactor = coldFactor; } @Override void doSetRate (double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; double coldIntervalMicros = stableIntervalMicros * coldFactor; thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = 0.0 ; } else { storedPermits = (oldMaxPermits == 0.0 ) ? maxPermits : storedPermits * maxPermits / oldMaxPermits; } } @Override long storedPermitsToWaitTime (double storedPermits, double permitsToTake) { double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0 ; if (availablePermitsAboveThreshold > 0.0 ) { double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long ) (permitsAboveThresholdToTake * length / 2.0 ); permitsToTake -= permitsAboveThresholdToTake; } micros += (long ) (stableIntervalMicros * permitsToTake); return micros; } private double permitsToTime (double permits) { return stableIntervalMicros + permits * slope; } @Override double coolDownIntervalMicros () { return warmupPeriodMicros / maxPermits; } }
RateLimiter的限制 需要注意的是RateLimiter
虽然很强大,但是它只支持单机环境。
比如我们服务集群中有5台服务器,要保证1000QPS的集群接口调用量,RateLimiter
就无法实现了。
集群控流通常的做法是采用Redis 来进行限制,主要有两种方式:
固定窗口计数 结合lua脚本,实现分布式的令牌桶算法 我们这儿就暂不做详细讨论。
总结 本文我们介绍了漏桶算法和令牌桶算法,了解了它们各自的特点。
对漏桶算法进行了简单的代码实现,对令牌桶算法我们分析了Guava 提供了限流工具类RateLimiter
。
除了讨论了它们的特点,也说明了它们的一些不足。
不过无论哪种限流算法,都有自己的应用之处,具体场景需要具体分析。