Scenario
Releasing a microservice usually means building a package with the new code, killing the running application, swapping in the new package, and starting it again.
Spring Cloud uses Eureka as the registry, and Eureka tolerates a certain amount of staleness in the service list: even after an instance has been removed from the list, clients may keep sending requests to its address for a while. So requests can land on an instance that is in the middle of being redeployed and fail.
We tune the refresh time of the service list to keep the data as fresh as possible, but no matter how aggressively we tune it, there is always a window in which the data is inconsistent.
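For context, "tuning the refresh time" typically means shortening the intervals at which the Eureka client fetches the registry and Ribbon refreshes its server list. The snippet below is only an illustration of that kind of setting; the property names and values are my own example, not part of the original post, and shorter intervals trade extra registry traffic for fresher data:

eureka:
  client:
    registry-fetch-interval-seconds: 5   # how often the client pulls the registry (default 30s)
ribbon:
  ServerListRefreshInterval: 5000        # how often Ribbon refreshes its server list, in ms (default 30000)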
So we turn to a retry mechanism: while instance a is restarting, instance b in the same cluster can still serve traffic. With retries in place, the scenario above can fall through to b and the caller still gets a correct response.
Steps
The following changes are needed. First, the Ribbon configuration:
ribbon:
  ReadTimeout: 10000
  ConnectTimeout: 10000
  MaxAutoRetries: 0
  MaxAutoRetriesNextServer: 1
  OkToRetryOnAllOperations: false
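A quick sanity check on what these values mean for a single call (my own note, not from the original post): Ribbon makes (1 + MaxAutoRetries) attempts against the first server and then tries up to MaxAutoRetriesNextServer other servers with the same per-server count, so the total is roughly (1 + MaxAutoRetries) × (1 + MaxAutoRetriesNextServer) = (1 + 0) × (1 + 1) = 2 attempts: one against the original instance and, if that fails, one against the next instance — exactly the a-to-b failover described in the scenario above.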
Add the spring-retry dependency:
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>
Taking Zuul as an example, retries also need to be enabled explicitly:
zuul.retryable=true
Hitting a problem
However, things rarely go that smoothly. Testing showed that the retry mechanism did kick in, but it did not retry against the other, healthy instance as I expected. I was forced to dig into the source code, and eventually found that the cause was a bug in the framework itself — one that has since been fixed, so upgrading the version is enough.
Code analysis
Versions used:
spring-cloud-netflix-core:1.3.6.RELEASE
spring-retry:1.2.1.RELEASE
Spring Cloud dependency management:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Because retries are enabled, requests to the application go through RetryableRibbonLoadBalancingHttpClient.execute:
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
    final RequestConfig.Builder builder = RequestConfig.custom();
    IClientConfig config = configOverride != null ? configOverride : this.config;
    builder.setConnectTimeout(config.get(CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
    builder.setSocketTimeout(config.get(CommonClientConfigKey.ReadTimeout, this.readTimeout));
    builder.setRedirectsEnabled(config.get(CommonClientConfigKey.FollowRedirects, this.followRedirects));

    final RequestConfig requestConfig = builder.build();
    final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
    RetryCallback retryCallback = new RetryCallback() {
        @Override
        public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception {
            //on retries the policy will choose the server and set it in the context
            //extract the server and update the request being made
            RibbonApacheHttpRequest newRequest = request;
            if (context instanceof LoadBalancedRetryContext) {
                ServiceInstance service = ((LoadBalancedRetryContext) context).getServiceInstance();
                if (service != null) {
                    //Reconstruct the request URI using the host and port set in the retry context
                    newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
                            newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
                            newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
                            newRequest.getURI().getFragment()));
                }
            }
            newRequest = getSecureRequest(request, configOverride);
            HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
            final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
            if (retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
                if (CloseableHttpResponse.class.isInstance(httpResponse)) {
                    ((CloseableHttpResponse) httpResponse).close();
                }
                throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
                        httpResponse.getStatusLine().getStatusCode());
            }
            return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
        }
    };
    return this.executeWithRetry(request, retryPolicy, retryCallback);
}
Here a RetryCallback is created first, and then this.executeWithRetry(request, retryPolicy, retryCallback) is executed.
RetryCallback.doWithRetry clearly contains the code of the actual request, which means executeWithRetry ultimately calls back into RetryCallback.doWithRetry (the method below is doExecute from spring-retry's RetryTemplate):
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
        RecoveryCallback<T> recoveryCallback, RetryState state)
        throws E, ExhaustedRetryException {

    RetryPolicy retryPolicy = this.retryPolicy;
    BackOffPolicy backOffPolicy = this.backOffPolicy;

    // Allow the retry policy to initialise itself...
    RetryContext context = open(retryPolicy, state);
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("RetryContext retrieved: " + context);
    }

    // Make sure the context is available globally for clients who need
    // it...
    RetrySynchronizationManager.register(context);

    Throwable lastException = null;

    boolean exhausted = false;
    try {

        // Give clients a chance to enhance the context...
        boolean running = doOpenInterceptors(retryCallback, context);

        if (!running) {
            throw new TerminatedRetryException(
                    "Retry terminated abnormally by interceptor before first attempt");
        }

        // Get or Start the backoff context...
        BackOffContext backOffContext = null;
        Object resource = context.getAttribute("backOffContext");

        if (resource instanceof BackOffContext) {
            backOffContext = (BackOffContext) resource;
        }

        if (backOffContext == null) {
            backOffContext = backOffPolicy.start(context);
            if (backOffContext != null) {
                context.setAttribute("backOffContext", backOffContext);
            }
        }

        /*
         * We allow the whole loop to be skipped if the policy or context already
         * forbid the first try. This is used in the case of external retry to allow a
         * recovery in handleRetryExhausted without the callback processing (which
         * would throw an exception).
         */
        while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Retry: count=" + context.getRetryCount());
                }
                // Reset the last exception, so if we are successful
                // the close interceptors will not think we failed...
                lastException = null;
                return retryCallback.doWithRetry(context);
            }
            catch (Throwable e) {

                lastException = e;

                try {
                    registerThrowable(retryPolicy, state, context, e);
                }
                catch (Exception ex) {
                    throw new TerminatedRetryException("Could not register throwable", ex);
                }
                finally {
                    doOnErrorInterceptors(retryCallback, context, e);
                }

                if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                    try {
                        backOffPolicy.backOff(backOffContext);
                    }
                    catch (BackOffInterruptedException ex) {
                        lastException = e;
                        // back off was prevented by another thread - fail the retry
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Abort retry because interrupted: count="
                                    + context.getRetryCount());
                        }
                        throw ex;
                    }
                }

                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
                }

                if (shouldRethrow(retryPolicy, context, state)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Rethrow in retry for policy: count="
                                + context.getRetryCount());
                    }
                    throw RetryTemplate.<E>wrapIfNecessary(e);
                }

            }

            /*
             * A stateful attempt that can retry may rethrow the exception before now,
             * but if we get this far in a stateful retry there's a reason for it,
             * like a circuit breaker or a rollback classifier.
             */
            if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                break;
            }
        }

        if (state == null && this.logger.isDebugEnabled()) {
            this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
        }

        exhausted = true;
        return handleRetryExhausted(recoveryCallback, context, state);

    }
    catch (Throwable e) {
        throw RetryTemplate.<E>wrapIfNecessary(e);
    }
    finally {
        close(retryPolicy, context, state, lastException == null || exhausted);
        doCloseInterceptors(retryCallback, context, lastException);
        RetrySynchronizationManager.clear();
    }
}
The retry mechanism is implemented in a while loop: when retryCallback.doWithRetry(context) throws, the exception is caught and retryPolicy decides whether to retry. Pay particular attention to registerThrowable(retryPolicy, state, context, e): it not only decides whether to retry, but when a retry is due it also chooses a new server and puts it into the context, which is then carried into the next call to retryCallback.doWithRetry(context). That is how switching to another instance on retry is supposed to work.
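To make the "choose a new server and put it into the context" step concrete, here is a rough sketch of what the Ribbon-backed retry policy does when an attempt fails. This is an illustration written from memory of RibbonLoadBalancedRetryPolicy, not a quote of the actual source; the helper and field names are assumptions:

// Illustrative sketch only - not the exact spring-cloud-netflix source.
public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
    // If no more retries are allowed on the current server but another server
    // may still be tried, ask the load balancer for a new instance and store it
    // in the retry context; doWithRetry() reads it back via getServiceInstance().
    if (!canRetrySameServer(context) && canRetryNextServer(context)) {
        context.setServiceInstance(loadBalanceChooser.choose(serviceId));
    }
    // Counters for same-server / next-server attempts are updated here (omitted).
}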
But why didn't my configuration switch instances? Debugging showed that registerThrowable(retryPolicy, state, context, e) did pick the right machine — a new, healthy one — yet when retryCallback.doWithRetry(context) ran, the request still went to the instance that was down.
So let's look at the retryCallback.doWithRetry(context) code again more carefully.
This line stands out:
newRequest = getSecureRequest(request, configOverride);

protected RibbonApacheHttpRequest getSecureRequest(RibbonApacheHttpRequest request, IClientConfig configOverride) {
    if (isSecure(configOverride)) {
        final URI secureUri = UriComponentsBuilder.fromUri(request.getUri())
                .scheme("https").build(true).toUri();
        return request.withNewUri(secureUri);
    }
    return request;
}
At this point newRequest has already been rebuilt from the retry context, while request is still the request from the previous attempt. The moment this line runs, newRequest is unconditionally overwritten from request, so the freshly chosen server is thrown away. Only here did we realise that this is a bug in the framework source.
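The essence of the fix, as I understand the upstream change (shown as a sketch of the idea rather than a quote of the actual commit), is simply to keep building on newRequest instead of going back to the original request:

// buggy: the rebuilt request carrying the newly chosen server is discarded
newRequest = getSecureRequest(request, configOverride);

// fixed: pass the rebuilt request, so the new host and port survive
newRequest = getSecureRequest(newRequest, configOverride);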
Issue: https://github.com/spring-cloud/spring-cloud-netflix/issues/2667
Summary
This was a fairly ordinary troubleshooting session. When the configuration did not behave as I expected, I first re-checked what each setting meant; after several fruitless attempts I moved on to breakpoint debugging. Because reproducing the scenario required one healthy instance and one instance going down, I simulated it hundreds of times before finally pinning the problem down to that single line. Even excellent open-source projects have bugs — don't treat them as gospel, and don't follow them blindly. On the other hand, the ability to read source code is an important problem-solving skill; I spent a lot of time just finding the entry point and locating the relevant code.
Original article: http://www.cnblogs.com/killbug/p/9150067.html