開發高并發系統時有三把利器用來保護系統:緩存、降級和限流。
api網關作為所有請求的入口,請求量大,我們可以通過對并發訪問的請求進行限速來保護系統的可用性。
常用的限流算法比如有令牌桶算法,漏桶算法,計數器算法等。
在zuul中我們可以自己去實現限流的功能 (zuul中如何限流在我的書 《spring cloud微服務-全棧技術與案例解析》 中有詳細講解) ,spring cloud gateway的出現本身就是用來替代zuul的。
要想替代那肯定得有強大的功能,除了性能上的優勢之外,spring cloud gateway還提供了很多新功能,比如今天我們要講的限流操作,使用起來非常簡單,今天我們就來學習在如何在spring cloud gateway中進行限流操作。
目前限流提供了基于redis的實現,我們需要增加對應的依賴:
1
2
3
4
|
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis-reactive</artifactid> </dependency> |
可以通過keyresolver來指定限流的key,比如我們需要根據用戶來做限流,ip來做限流等等。
ip限流
1
2
3
4
|
@bean public keyresolver ipkeyresolver() { return exchange -> mono.just(exchange.getrequest().getremoteaddress().gethostname()); } |
通過exchange對象可以獲取到請求信息,這邊用了hostname,如果你想根據用戶來做限流的話這邊可以獲取當前請求的用戶id或者用戶名就可以了,比如:
用戶限流
使用這種方式限流,請求路徑中必須攜帶userid參數。
1
2
3
4
|
@bean keyresolver userkeyresolver() { return exchange -> mono.just(exchange.getrequest().getqueryparams().getfirst( "userid" )); } |
接口限流
獲取請求地址的uri作為限流key。
1
2
3
4
|
@bean keyresolver apikeyresolver() { return exchange -> mono.just(exchange.getrequest().getpath().value()); } |
然后配置限流的過濾器信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
server: port: 8084 spring: redis: host: 127.0 . 0.1 port: 6379 cloud: gateway: routes: - id: fsh-house uri: lb: //fsh-house predicates: - path=/house/** filters: - name: requestratelimiter args: redis-rate-limiter.replenishrate: 10 redis-rate-limiter.burstcapacity: 20 key-resolver: "#{@ipkeyresolver}" |
- filter名稱必須是requestratelimiter
- redis-rate-limiter.replenishrate:允許用戶每秒處理多少個請求
- redis-rate-limiter.burstcapacity:令牌桶的容量,允許在一秒鐘內完成的最大請求數
- key-resolver:使用spel按名稱引用bean
可以訪問接口進行測試,這時候redis中會有對應的數據:
127.0.0.1:6379> keys *
1) "request_rate_limiter.{localhost}.timestamp"
2) "request_rate_limiter.{localhost}.tokens"
大括號中就是我們的限流key,這邊是ip,本地的就是localhost
- timestamp:存儲的是當前時間的秒數,也就是system.currenttimemillis() / 1000或者instant.now().getepochsecond()
- tokens:存儲的是當前這秒鐘的對應的可用的令牌數量
spring cloud gateway目前提供的限流還是相對比較簡單的,在實際中我們的限流策略會有很多種情況,比如:
- 每個接口的限流數量不同,可以通過配置中心動態調整
- 超過的流量被拒絕后可以返回固定的格式給調用方
- 對某個服務進行整體限流(這個大家可以思考下用spring cloud gateway如何實現,其實很簡單)
- ……
當然我們也可以通過重新redisratelimiter來實現自己的限流策略,這個我們后面再進行介紹。
限流源碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
// routeid也就是我們的fsh-house,id就是限流的key,也就是localhost。 public mono<response> isallowed(string routeid, string id) { // 會判斷redisratelimiter是否初始化了 if (! this .initialized.get()) { throw new illegalstateexception( "redisratelimiter is not initialized" ); } // 獲取routeid對應的限流配置 config routeconfig = getconfig().getordefault(routeid, defaultconfig); if (routeconfig == null ) { throw new illegalargumentexception( "no configuration found for route " + routeid); } // 允許用戶每秒做多少次請求 int replenishrate = routeconfig.getreplenishrate(); // 令牌桶的容量,允許在一秒鐘內完成的最大請求數 int burstcapacity = routeconfig.getburstcapacity(); try { // 限流key的名稱(request_rate_limiter.{localhost}.timestamp,request_rate_limiter.{localhost}.tokens) list<string> keys = getkeys(id); // the arguments to the lua script. time() returns unixtime in seconds. list<string> scriptargs = arrays.aslist(replenishrate + "" , burstcapacity + "" , instant.now().getepochsecond() + "" , "1" ); // allowed, tokens_left = redis.eval(script, keys, args) // 執行lua腳本 flux<list< long >> flux = this .redistemplate.execute( this .script, keys, scriptargs); // .log("redisratelimiter", level.finer); return flux.onerrorresume(throwable -> flux.just(arrays.aslist(1l, -1l))) .reduce( new arraylist< long >(), (longs, l) -> { longs.addall(l); return longs; }) .map(results -> { boolean allowed = results.get( 0 ) == 1l; long tokensleft = results.get( 1 ); response response = new response(allowed, getheaders(routeconfig, tokensleft)); if (log.isdebugenabled()) { log.debug( "response: " + response); } return response; }); } catch (exception e) { log.error( "error determining if user allowed from redis" , e); } return mono.just( new response( true , getheaders(routeconfig, -1l))); } |
lua腳本在:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
local tokens_key = keys[ 1 ] local timestamp_key = keys[ 2 ] --redis.log(redis.log_warning, "tokens_key " .. tokens_key) local rate = tonumber(argv[ 1 ]) local capacity = tonumber(argv[ 2 ]) local now = tonumber(argv[ 3 ]) local requested = tonumber(argv[ 4 ]) local fill_time = capacity/rate local ttl = math.floor(fill_time* 2 ) --redis.log(redis.log_warning, "rate " .. argv[ 1 ]) --redis.log(redis.log_warning, "capacity " .. argv[ 2 ]) --redis.log(redis.log_warning, "now " .. argv[ 3 ]) --redis.log(redis.log_warning, "requested " .. argv[ 4 ]) --redis.log(redis.log_warning, "filltime " .. fill_time) --redis.log(redis.log_warning, "ttl " .. ttl) local last_tokens = tonumber(redis.call( "get" , tokens_key)) if last_tokens == nil then last_tokens = capacity end --redis.log(redis.log_warning, "last_tokens " .. last_tokens) local last_refreshed = tonumber(redis.call( "get" , timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.log_warning, "last_refreshed " .. last_refreshed) local delta = math.max( 0 , now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.log_warning, "delta " .. delta) --redis.log(redis.log_warning, "filled_tokens " .. filled_tokens) --redis.log(redis.log_warning, "allowed_num " .. allowed_num) --redis.log(redis.log_warning, "new_tokens " .. new_tokens) redis.call( "setex" , tokens_key, ttl, new_tokens) redis.call( "setex" , timestamp_key, ttl, now) return { allowed_num, new_tokens } |
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://mp.weixin.qq.com/s/70i4X_b-gtegofF-8W-uBQ