start

  保证部分请求可被正常处理,确保系统在高负载情况下仍能运行,避免部分用户占用过多资源;

限流的对象:

IP:存在误判的可能,即将不同的用户看作同一个用户,因为IP可能相同; MAC/设备标识符/设备序列号:web端少用,一方面属于隐私不好获取,另一方面MAC在链路层协议中,仅在局域网内可见,当请求传输到互联网时,MAC地址不会被保留,所以也没办法拿到; 客户端:用户/请求; 服务端:服务实例;

限流的阈值设定:

  通过压测获得接口能承受的最大并发量;   或者说设置错误率的限制,如果请求数量大于阈值(防止请求量过少,错误率抖动),并且错误率也超过阈值,则可以开始限流; 实际也可以通过业务的预估并发量设置,或者说预估真实用户的请求速度,当频繁触及阈值时,可以考虑提供阈值;

基于redis,使用lua脚本实现限流:

由于分布式环境下,请求经过负载均衡后可能落在不同的机器上,需要redis计数(单位时间内访问次数); 当限流过程中,出现系统错误(即大概率redis宕机),此时的请求可以选择:下游接口承受能力弱时,干掉,避免打挂下游接口;下游接口承受能力强或者可用性要求高时,保留,保证系统处理的请求尽可能正常运行;

ddos:

防御参考Cloudflare;

限流策略

  1. 设定请求数量的最大阈值;
  2. 对请求进行分类,不同请求应用不同限流规则;
  3. 向用户提供反馈,如错误信息、重试时间;
  4. 可以考虑在高流量期间放宽限制;

Nginx代理层

http {   
  # 定义一个限流区域,使用共享内存存储状态
  # $binary_remote_addr 表示按客户端的 IP 地址进行限流
  # zone=mylimit:10m 表示分配 10MB 的共享内存用于存储请求计数信息,区域命名为 mylimit
  # rate=1r/s 表示每个 IP 每秒允许一个请求
  limit_req_zone $binary_remote_addr zone=mylimit:10m rate=1r/s;
  
  server {    
    # 监听80端口       
    listen 80;
    
    # 定义一个location块,用于匹配特定的请求路径    
    location /api/ {         
      # 应用限流规则
      # burst=5 表示允许的瞬时突发流量,最多可以允许 5 个突发请求超出速率限制
      # 但超过这个数量的请求会被延迟或拒绝
      # nodelay 表示超过速率限制但在 burst 允许范围内的请求不会被延迟,而是立即处理。
      limit_req zone=mylimit burst=5 nodelay;
      
      # 代理请求到后端服务       
      proxy_pass http://backend/;       
    }  
  }
}

令牌桶

原理

  • 令牌生成:系统按照固定的速率生成令牌,并放入令牌桶中;
  • 令牌存储:令牌桶有一个固定的容量上限,当令牌桶满时,多余的令牌被丢弃;
  • 请求处理:每个请求到来时,需要从令牌桶中取出相应数量的令牌。如果桶中有足够的令牌,请求被处理;否则,请求被拒绝或延迟直到有足够的令牌; 特点
  • 突发流量处理:可以处理突发流量,因为令牌可以积累,当有突发请求时,可以使用积累的令牌;
  • 通过令牌生成速率控制平均速率;
  • 方便应对并发出口流量;
  • 令牌生产速度固定,消费速度不固定;
// 利用channel生产/消费令牌
type TokenBucketLimit struct {
	intervalPerSecond int64
	buckets           chan struct{}
	closeCh           chan struct{}
}

func NewTokenBucketLimit(interval int64, capacity int) *TokenBucketLimit {
	return &TokenBucketLimit{
		intervalPerSecond: interval,
		buckets:           make(chan struct{}, capacity),
	}
}

func (l *TokenBucketLimit) NewServerInterceptor() grpc.UnaryServerInterceptor {
	interval := time.Second / time.Duration(l.intervalPerSecond)
	ticker := time.NewTicker(interval)
	go func() {
        defer ticker.Stop()
		for {
			select {
			case <-l.closeCh:
				return
			case <-ticker.C:
				select {
				case l.buckets <- struct{}{}:

				default:

				}
			}
		}
	}()
	// 此处可调整限流的粒度
	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		select {
		case <-l.buckets:
			return handler(ctx, req)
		default: // 并发越高,越不能阻塞,如果不高,此处也可以阻塞等待,直到超时
			return nil, errors.New("too many requests")
		}
	}
}

// Close 仅可调用一次
func (l *TokenBucketLimit) Close() {
	close(l.closeCh)
}
// 用锁保证令牌数的并发安全,也可换原子变量等
type TokenBucket struct {
	mu         sync.Mutex
	capacity   int              // 桶的容量
	tokens     int              // 桶中当前的令牌数
	refillRate float64          // 是令牌的每秒的填充速率
	lastRefill time.Time        // 记录上次填充令牌的时间
}

func NewTokenBucket(capacity int, refillRate float64) *TokenBucket {
	// 初始化时桶可被填满,tokens 和 capacity 相等
	return &TokenBucket{
		capacity:   capacity,
		tokens:     capacity,
		refillRate: refillRate,
		lastRefill: time.Now(),
	}
}

// Allow 方法用于判断当前请求是否被允许。
func (t *TokenBucket) Allow() bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	now := time.Now()

	// 计算自上次填充以来经过的秒数
	timeElapsed := float64(now.Unix() - t.lastRefill.Unix())

	// 计算应该添加的令牌数。
	tokensToAdd := t.refillRate * timeElapsed

	// 更新令牌数,但不超过桶的容量
	t.tokens += int(tokensToAdd)
	if t.tokens > t.capacity {
		t.tokens = t.capacity
	}

	// 如果桶中有令牌,则移除一个令牌并允许请求通过
	if t.tokens > 0 {
		t.tokens--
		t.lastRefill = now
		return true
	}

	// 拒绝请求
	return false
}

漏桶

原理

  • 请求进入漏桶:每个请求到来时进入漏桶队列;
  • 请求处理:系统按照固定的速率从漏桶中处理请求(或“漏出”请求);
  • 队列管理:漏桶有固定的容量上限,当漏桶满时,新的请求被丢弃; 特点
  • 不支持突发流量:不能有效处理突发流量,因为请求速率是固定的;
  • 请求处理速率是固定的;
  • 流入速率不固定,流出速率固定;
// 类似于令牌桶的实现,只不过桶的容量为0
type TokenBucketLimit struct {
	intervalPerSecond int64
	buckets           chan struct{}
	closeCh           chan struct{}
}

func NewTokenBucketLimit(interval int64) *TokenBucketLimit {
	return &TokenBucketLimit{
		intervalPerSecond: interval,
		buckets:           make(chan struct{}),
	}
}

func (l *TokenBucketLimit) process() {
    interval := time.Second / time.Duration(l.intervalPerSecond)
	ticker := time.NewTicker(interval)
    
    for {
        defer ticker.Stop()
        select {
        case <-l.closeCh:
            return
        case <-ticker.C:
            <-l.buckets
        }
    }
}

func (l *TokenBucketLimit) NewServerInterceptor() grpc.UnaryServerInterceptor {
	go l.process()
    
	// 此处可调整限流的粒度
	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		select {
		case l.buckets <-struct{}{}:
			return handler(ctx, req)
		default: // 并发越高,越不能阻塞,如果不高,此处也可以阻塞等待,直到超时
			return nil, errors.New("too many requests")
		}
	}
}

// Close 仅可调用一次
func (l *TokenBucketLimit) Close() {
	close(l.closeCh)
}

滑动窗口

可使用redis的lua脚本实现:

注:PEXPIRE 是用来防止数据无限增长的一种方式,而 ZREMRANGEBYSCORE 则是为了保证限流算法的准确性

local key = KEYS[1]                                 -- 限流对象(Redis 键名),通常是限流的用户或操作
local window = tonumber(ARGV[1])                    -- 时间窗口的大小(单位为毫秒)
local threshold = tonumber(ARGV[2])                 -- 限流的阈值,表示在窗口期内允许的最大操作次数
local now = tonumber(ARGV[3])                       -- 当前时间戳(单位为毫秒),外部传入

local min = now - window                            -- 窗口的起始时间
redis.call('ZREMRANGEBYSCORE', key, '-inf', min)    --从有序集合 key 中移除所有时间戳早于 min 的成员

local cnt = redis.call('ZCOUNT', key, '-inf', '+inf')
if cnt >= threshold then
    return "true"                                   -- 执行限流
else
    redis.call('ZADD', key, now, now)               -- 把 score 和 member 都设置成 now
    redis.call('PEXPIRE', key, window)
    return "false"
end

go本地实现(对内存和计算要求更高,简单实现,没考虑其他逻辑):

// SlidingWindowLimiter 结构体实现滑动窗口限流算法。
type SlidingWindowLimiter struct {
	mutex          sync.Mutex
	counters       []int
	limit          int
	windowStart    time.Time
	windowDuration time.Duration
	interval       time.Duration
}

// NewSlidingWindowLimiter 构造函数初始化 SlidingWindowLimiter 实例。
func NewSlidingWindowLimiter(limit int, windowDuration time.Duration, interval time.Duration) *SlidingWindowLimiter {
	buckets := int(windowDuration / interval)
	return &SlidingWindowLimiter{
		counters:       make([]int, buckets),
		limit:          limit,
		windowStart:    time.Now(),
		windowDuration: windowDuration,
		interval:       interval,
	}
}

// Allow 方法用于判断当前请求是否被允许,并实现滑动窗口的逻辑。
func (s *SlidingWindowLimiter) Allow() bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// 检查是否需要滑动窗口
	if time.Since(s.windowStart) > s.windowDuration {
		s.slideWindow()
	}

	now := time.Now()
	index := int((now.UnixNano() - s.windowStart.UnixNano()) / s.interval.Nanoseconds()) % len(s.counters)

	if s.counters[index] < s.limit {
		s.counters[index]++
		return true
	}
	return false
}

// slideWindow 方法实现滑动窗口逻辑,移除最旧的时间段并重置计数器。
func (s *SlidingWindowLimiter) slideWindow() {
	// 滑动窗口,忽略最旧的时间段
	copy(s.counters, s.counters[1:])
	// 重置最后一个时间段的计数器
	s.counters[len(s.counters)-1] = 0
	// 更新窗口开始时间
	s.windowStart = time.Now()
}

参考文章

架构师核心能力:限流的底层原理解析