start
保证部分请求可被正常处理,确保系统在高负载情况下仍能运行,避免部分用户占用过多资源;
限流的对象:
IP:存在误判的可能,即将不同的用户看作同一个用户,因为IP可能相同; MAC/设备标识符/设备序列号:web端少用,一方面属于隐私不好获取,另一方面MAC在链路层协议中,仅在局域网内可见,当请求传输到互联网时,MAC地址不会被保留,所以也没办法拿到; 客户端:用户/请求; 服务端:服务实例;
限流的阈值设定:
通过压测获得接口能承受的最大并发量; 或者说设置错误率的限制,如果请求数量大于阈值(防止请求量过少,错误率抖动),并且错误率也超过阈值,则可以开始限流; 实际也可以通过业务的预估并发量设置,或者说预估真实用户的请求速度,当频繁触及阈值时,可以考虑提供阈值;
基于redis,使用lua脚本实现限流:
由于分布式环境下,请求经过负载均衡后可能落在不同的机器上,需要redis计数(单位时间内访问次数); 当限流过程中,出现系统错误(即大概率redis宕机),此时的请求可以选择:下游接口承受能力弱时,干掉,避免打挂下游接口;下游接口承受能力强或者可用性要求高时,保留,保证系统处理的请求尽可能正常运行;
ddos:
防御参考Cloudflare;
限流策略
- 设定请求数量的最大阈值;
- 对请求进行分类,不同请求应用不同限流规则;
- 向用户提供反馈,如错误信息、重试时间;
- 可以考虑在高流量期间放宽限制;
Nginx代理层
http {
# 定义一个限流区域,使用共享内存存储状态
# $binary_remote_addr 表示按客户端的 IP 地址进行限流
# zone=mylimit:10m 表示分配 10MB 的共享内存用于存储请求计数信息,区域命名为 mylimit
# rate=1r/s 表示每个 IP 每秒允许一个请求
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=1r/s;
server {
# 监听80端口
listen 80;
# 定义一个location块,用于匹配特定的请求路径
location /api/ {
# 应用限流规则
# burst=5 表示允许的瞬时突发流量,最多可以允许 5 个突发请求超出速率限制
# 但超过这个数量的请求会被延迟或拒绝
# nodelay 表示超过速率限制但在 burst 允许范围内的请求不会被延迟,而是立即处理。
limit_req zone=mylimit burst=5 nodelay;
# 代理请求到后端服务
proxy_pass http://backend/;
}
}
}
令牌桶
原理:
- 令牌生成:系统按照固定的速率生成令牌,并放入令牌桶中;
- 令牌存储:令牌桶有一个固定的容量上限,当令牌桶满时,多余的令牌被丢弃;
- 请求处理:每个请求到来时,需要从令牌桶中取出相应数量的令牌。如果桶中有足够的令牌,请求被处理;否则,请求被拒绝或延迟直到有足够的令牌; 特点:
- 突发流量处理:可以处理突发流量,因为令牌可以积累,当有突发请求时,可以使用积累的令牌;
- 通过令牌生成速率控制平均速率;
- 方便应对并发出口流量;
- 令牌生产速度固定,消费速度不固定;
// 利用channel生产/消费令牌
type TokenBucketLimit struct {
intervalPerSecond int64
buckets chan struct{}
closeCh chan struct{}
}
func NewTokenBucketLimit(interval int64, capacity int) *TokenBucketLimit {
return &TokenBucketLimit{
intervalPerSecond: interval,
buckets: make(chan struct{}, capacity),
}
}
func (l *TokenBucketLimit) NewServerInterceptor() grpc.UnaryServerInterceptor {
interval := time.Second / time.Duration(l.intervalPerSecond)
ticker := time.NewTicker(interval)
go func() {
defer ticker.Stop()
for {
select {
case <-l.closeCh:
return
case <-ticker.C:
select {
case l.buckets <- struct{}{}:
default:
}
}
}
}()
// 此处可调整限流的粒度
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
select {
case <-l.buckets:
return handler(ctx, req)
default: // 并发越高,越不能阻塞,如果不高,此处也可以阻塞等待,直到超时
return nil, errors.New("too many requests")
}
}
}
// Close 仅可调用一次
func (l *TokenBucketLimit) Close() {
close(l.closeCh)
}
// 用锁保证令牌数的并发安全,也可换原子变量等
type TokenBucket struct {
mu sync.Mutex
capacity int // 桶的容量
tokens int // 桶中当前的令牌数
refillRate float64 // 是令牌的每秒的填充速率
lastRefill time.Time // 记录上次填充令牌的时间
}
func NewTokenBucket(capacity int, refillRate float64) *TokenBucket {
// 初始化时桶可被填满,tokens 和 capacity 相等
return &TokenBucket{
capacity: capacity,
tokens: capacity,
refillRate: refillRate,
lastRefill: time.Now(),
}
}
// Allow 方法用于判断当前请求是否被允许。
func (t *TokenBucket) Allow() bool {
t.mu.Lock()
defer t.mu.Unlock()
now := time.Now()
// 计算自上次填充以来经过的秒数
timeElapsed := float64(now.Unix() - t.lastRefill.Unix())
// 计算应该添加的令牌数。
tokensToAdd := t.refillRate * timeElapsed
// 更新令牌数,但不超过桶的容量
t.tokens += int(tokensToAdd)
if t.tokens > t.capacity {
t.tokens = t.capacity
}
// 如果桶中有令牌,则移除一个令牌并允许请求通过
if t.tokens > 0 {
t.tokens--
t.lastRefill = now
return true
}
// 拒绝请求
return false
}
漏桶
原理:
- 请求进入漏桶:每个请求到来时进入漏桶队列;
- 请求处理:系统按照固定的速率从漏桶中处理请求(或“漏出”请求);
- 队列管理:漏桶有固定的容量上限,当漏桶满时,新的请求被丢弃; 特点:
- 不支持突发流量:不能有效处理突发流量,因为请求速率是固定的;
- 请求处理速率是固定的;
- 流入速率不固定,流出速率固定;
// 类似于令牌桶的实现,只不过桶的容量为0
type TokenBucketLimit struct {
intervalPerSecond int64
buckets chan struct{}
closeCh chan struct{}
}
func NewTokenBucketLimit(interval int64) *TokenBucketLimit {
return &TokenBucketLimit{
intervalPerSecond: interval,
buckets: make(chan struct{}),
}
}
func (l *TokenBucketLimit) process() {
interval := time.Second / time.Duration(l.intervalPerSecond)
ticker := time.NewTicker(interval)
for {
defer ticker.Stop()
select {
case <-l.closeCh:
return
case <-ticker.C:
<-l.buckets
}
}
}
func (l *TokenBucketLimit) NewServerInterceptor() grpc.UnaryServerInterceptor {
go l.process()
// 此处可调整限流的粒度
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
select {
case l.buckets <-struct{}{}:
return handler(ctx, req)
default: // 并发越高,越不能阻塞,如果不高,此处也可以阻塞等待,直到超时
return nil, errors.New("too many requests")
}
}
}
// Close 仅可调用一次
func (l *TokenBucketLimit) Close() {
close(l.closeCh)
}
滑动窗口
可使用redis的lua脚本实现:
注:
PEXPIRE
是用来防止数据无限增长的一种方式,而ZREMRANGEBYSCORE
则是为了保证限流算法的准确性;
local key = KEYS[1] -- 限流对象(Redis 键名),通常是限流的用户或操作
local window = tonumber(ARGV[1]) -- 时间窗口的大小(单位为毫秒)
local threshold = tonumber(ARGV[2]) -- 限流的阈值,表示在窗口期内允许的最大操作次数
local now = tonumber(ARGV[3]) -- 当前时间戳(单位为毫秒),外部传入
local min = now - window -- 窗口的起始时间
redis.call('ZREMRANGEBYSCORE', key, '-inf', min) --从有序集合 key 中移除所有时间戳早于 min 的成员
local cnt = redis.call('ZCOUNT', key, '-inf', '+inf')
if cnt >= threshold then
return "true" -- 执行限流
else
redis.call('ZADD', key, now, now) -- 把 score 和 member 都设置成 now
redis.call('PEXPIRE', key, window)
return "false"
end
go本地实现(对内存和计算要求更高,简单实现,没考虑其他逻辑):
// SlidingWindowLimiter 结构体实现滑动窗口限流算法。
type SlidingWindowLimiter struct {
mutex sync.Mutex
counters []int
limit int
windowStart time.Time
windowDuration time.Duration
interval time.Duration
}
// NewSlidingWindowLimiter 构造函数初始化 SlidingWindowLimiter 实例。
func NewSlidingWindowLimiter(limit int, windowDuration time.Duration, interval time.Duration) *SlidingWindowLimiter {
buckets := int(windowDuration / interval)
return &SlidingWindowLimiter{
counters: make([]int, buckets),
limit: limit,
windowStart: time.Now(),
windowDuration: windowDuration,
interval: interval,
}
}
// Allow 方法用于判断当前请求是否被允许,并实现滑动窗口的逻辑。
func (s *SlidingWindowLimiter) Allow() bool {
s.mutex.Lock()
defer s.mutex.Unlock()
// 检查是否需要滑动窗口
if time.Since(s.windowStart) > s.windowDuration {
s.slideWindow()
}
now := time.Now()
index := int((now.UnixNano() - s.windowStart.UnixNano()) / s.interval.Nanoseconds()) % len(s.counters)
if s.counters[index] < s.limit {
s.counters[index]++
return true
}
return false
}
// slideWindow 方法实现滑动窗口逻辑,移除最旧的时间段并重置计数器。
func (s *SlidingWindowLimiter) slideWindow() {
// 滑动窗口,忽略最旧的时间段
copy(s.counters, s.counters[1:])
// 重置最后一个时间段的计数器
s.counters[len(s.counters)-1] = 0
// 更新窗口开始时间
s.windowStart = time.Now()
}