必要性
前面我們實現了通用的 rpc,但是存在一個問題,同步獲取響應的時候沒有超時處理。
如果 server 掛掉了,或者處理太慢,客戶端也不可能一直傻傻的等。
當外部的調用超過指定的時間后,就直接報錯,避免無意義的資源消耗。
思路
調用的時候,將開始時間保留。
獲取的時候檢測是否超時。
同時創建一個線程,用來檢測是否有超時的請求。
實現
思路
調用的時候,將開始時間保留。
獲取的時候檢測是否超時。
同時創建一個線程,用來檢測是否有超時的請求。
超時檢測線程
為了不影響正常業務的性能,我們另起一個線程檢測調用是否已經超時。
- packagecom.github.houbb.rpc.client.invoke.impl;
- importcom.github.houbb.heaven.util.common.ArgUtil;
- importcom.github.houbb.rpc.common.rpc.domain.RpcResponse;
- importcom.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory;
- importcom.github.houbb.rpc.common.support.time.impl.Times;
- importjava.util.Map;
- importjava.util.concurrent.ConcurrentHashMap;
- /**
- *超時檢測線程
- *@authorbinbin.hou
- *@since0.0.7
- */
- publicclassTimeoutCheckThreadimplementsRunnable{
- /**
- *請求信息
- *@since0.0.7
- */
-
privatefinalConcurrentHashMap
requestMap; - /**
- *請求信息
- *@since0.0.7
- */
-
privatefinalConcurrentHashMap
responseMap; - /**
- *新建
- *@paramrequestMap請求Map
- *@paramresponseMap結果map
- *@since0.0.7
- */
-
publicTimeoutCheckThread(ConcurrentHashMap
requestMap, -
ConcurrentHashMap
responseMap){ - ArgUtil.notNull(requestMap,"requestMap");
- this.requestMap=requestMap;
- this.responseMap=responseMap;
- }
- @Override
- publicvoidrun(){
-
for(Map.Entry
entry:requestMap.entrySet()){ - longexpireTime=entry.getValue();
- longcurrentTime=Times.time();
- if(currentTime>expireTime){
- finalStringkey=entry.getKey();
- //結果設置為超時,從請求map中移除
- responseMap.putIfAbsent(key,RpcResponseFactory.timeout());
- requestMap.remove(key);
- }
- }
- }
- }
這里主要存儲請求,響應的時間,如果超時,則移除對應的請求。
線程啟動
在 DefaultInvokeService 初始化時啟動:
- finalRunnabletimeoutThread=newTimeoutCheckThread(requestMap,responseMap);
- Executors.newScheduledThreadPool(1)
- .scheduleAtFixedRate(timeoutThread,60,60,TimeUnit.SECONDS);
DefaultInvokeService
原來的設置結果,獲取結果是沒有考慮時間的,這里加一下對應的判斷。
設置請求時間
•添加請求 addRequest
會將過時的時間直接放入 map 中。
因為放入是一次操作,查詢可能是多次。
所以時間在放入的時候計算完成。
- @Override
- publicInvokeServiceaddRequest(StringseqId,longtimeoutMills){
- LOG.info("[Client]startaddrequestforseqId:{},timeoutMills:{}",seqId,
- timeoutMills);
- finallongexpireTime=Times.time()+timeoutMills;
- requestMap.putIfAbsent(seqId,expireTime);
- returnthis;
- }
設置請求結果
•添加響應 addResponse
1.如果 requestMap 中已經不存在這個請求信息,則說明可能超時,直接忽略存入結果。
2.此時檢測是否出現超時,超時直接返回超時信息。
3.放入信息后,通知其他等待的所有進程。
- @Override
- publicInvokeServiceaddResponse(StringseqId,RpcResponserpcResponse){
- //1.判斷是否有效
- LongexpireTime=this.requestMap.get(seqId);
- //如果為空,可能是這個結果已經超時了,被定時job移除之后,響應結果才過來。直接忽略
- if(ObjectUtil.isNull(expireTime)){
- returnthis;
- }
- //2.判斷是否超時
- if(Times.time()>expireTime){
- LOG.info("[Client]seqId:{}信息已超時,直接返回超時結果。",seqId);
- rpcResponse=RpcResponseFactory.timeout();
- }
- //這里放入之前,可以添加判斷。
- //如果seqId必須處理請求集合中,才允許放入。或者直接忽略丟棄。
- //通知所有等待方
- responseMap.putIfAbsent(seqId,rpcResponse);
- LOG.info("[Client]獲取結果信息,seqId:{},rpcResponse:{}",seqId,rpcResponse);
- LOG.info("[Client]seqId:{}信息已經放入,通知所有等待方",seqId);
- //移除對應的requestMap
- requestMap.remove(seqId);
- LOG.info("[Client]seqId:{}removefromrequestmap",seqId);
- synchronized(this){
- this.notifyAll();
- }
- returnthis;
- }
獲取請求結果
•獲取相應 getResponse
1.如果結果存在,直接返回響應結果
2.否則進入等待。
3.等待結束后獲取結果。
- @Override
- publicRpcResponsegetResponse(StringseqId){
- try{
- RpcResponserpcResponse=this.responseMap.get(seqId);
- if(ObjectUtil.isNotNull(rpcResponse)){
- LOG.info("[Client]seq{}對應結果已經獲取:{}",seqId,rpcResponse);
- returnrpcResponse;
- }
- //進入等待
- while(rpcResponse==null){
- LOG.info("[Client]seq{}對應結果為空,進入等待",seqId);
- //同步等待鎖
- synchronized(this){
- this.wait();
- }
- rpcResponse=this.responseMap.get(seqId);
- LOG.info("[Client]seq{}對應結果已經獲取:{}",seqId,rpcResponse);
- }
- returnrpcResponse;
- }catch(InterruptedExceptione){
- thrownewRpcRuntimeException(e);
- }
- }
可以發現獲取部分的邏輯沒變,因為超時會返回一個超時對象:RpcResponseFactory.timeout();
這是一個非常簡單的實現,如下:
- packagecom.github.houbb.rpc.common.rpc.domain.impl;
- importcom.github.houbb.rpc.common.exception.RpcTimeoutException;
- importcom.github.houbb.rpc.common.rpc.domain.RpcResponse;
- /**
- *響應工廠類
- *@authorbinbin.hou
- *@since0.0.7
- */
- publicfinalclassRpcResponseFactory{
- privateRpcResponseFactory(){}
- /**
- *超時異常信息
- *@since0.0.7
- */
- privatestaticfinalDefaultRpcResponseTIMEOUT;
- static{
- TIMEOUT=newDefaultRpcResponse();
- TIMEOUT.error(newRpcTimeoutException());
- }
- /**
- *獲取超時響應結果
- *@return響應結果
- *@since0.0.7
- */
- publicstaticRpcResponsetimeout(){
- returnTIMEOUT;
- }
- }
響應結果指定一個超時異常,這個異常會在代理處理結果時拋出:
- RpcResponserpcResponse=proxyContext.invokeService().getResponse(seqId);
- Throwableerror=rpcResponse.error();
- if(ObjectUtil.isNotNull(error)){
- throwerror;
- }
- returnrpcResponse.result();
測試代碼
服務端
我們故意把服務端的實現添加沉睡,其他保持不變。
- publicclassCalculatorServiceImplimplementsCalculatorService{
- publicCalculateResponsesum(CalculateRequestrequest){
- intsum=request.getOne()+request.getTwo();
- //故意沉睡3s
- try{
- TimeUnit.SECONDS.sleep(3);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- returnnewCalculateResponse(true,sum);
- }
- }
客戶端
設置對應的超時時間為 1S,其他不變:
- publicstaticvoidmain(String[]args){
- //服務配置信息
-
ReferenceConfig
config=newDefaultReferenceConfig (); - config.serviceId(ServiceIdConst.CALC);
- config.serviceInterface(CalculatorService.class);
- config.addresses("localhost:9527");
- //設置超時時間為1S
- config.timeout(1000);
- CalculatorServicecalculatorService=config.reference();
- CalculateRequestrequest=newCalculateRequest();
- request.setOne(10);
- request.setTwo(20);
- CalculateResponseresponse=calculatorService.sum(request);
- System.out.println(response);
- }
日志如下:
- .log.integration.adaptors.stdout.StdOutExImpl'adapter.
- [INFO][2021-10-0514:59:40.974][main][c.g.h.r.c.c.RpcClient.connect]-RPC服務開始啟動客戶端
- ...
- [INFO][2021-10-0514:59:42.504][main][c.g.h.r.c.c.RpcClient.connect]-RPC服務啟動客戶端完成,監聽地址localhost:9527
- [INFO][2021-10-0514:59:42.533][main][c.g.h.r.c.p.ReferenceProxy.invoke]-[Client]startcallremotewithrequest:DefaultRpcRequest{seqId='62e126d9a0334399904509acf8dfe0bb',createTime=1633417182525,serviceId='calc',methodName='sum',paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest],paramValues=[CalculateRequest{one=10,two=20}]}
- [INFO][2021-10-0514:59:42.534][main][c.g.h.r.c.i.i.DefaultInvokeService.addRequest]-[Client]startaddrequestforseqId:62e126d9a0334399904509acf8dfe0bb,timeoutMills:1000
- [INFO][2021-10-0514:59:42.535][main][c.g.h.r.c.p.ReferenceProxy.invoke]-[Client]startcallchannelid:00e04cfffe360988-000004bc-00000000-1178e1265e903c4c-7975626f
- ...
- Exceptioninthread"main"com.github.houbb.rpc.common.exception.RpcTimeoutException
-
atcom.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory.
(RpcResponseFactory.java:23) - atcom.github.houbb.rpc.client.invoke.impl.DefaultInvokeService.addResponse(DefaultInvokeService.java:72)
- atcom.github.houbb.rpc.client.handler.RpcClientHandler.channelRead0(RpcClientHandler.java:43)
- atio.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
- atio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
- atio.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
- atio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
- atio.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
- atio.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
- atio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
- atio.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
- atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
- atio.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
- atio.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
- atio.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
- atio.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
- atio.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
- atio.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
- atio.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
- atio.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
- atjava.lang.Thread.run(Thread.java:748)
- ...
- [INFO][2021-10-0514:59:45.615][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]seqId:62e126d9a0334399904509acf8dfe0bb信息已超時,直接返回超時結果。
- [INFO][2021-10-0514:59:45.617][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]獲取結果信息,seqId:62e126d9a0334399904509acf8dfe0bb,rpcResponse:DefaultRpcResponse{seqId='null',error=com.github.houbb.rpc.common.exception.RpcTimeoutException,result=null}
- [INFO][2021-10-0514:59:45.617][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]seqId:62e126d9a0334399904509acf8dfe0bb信息已經放入,通知所有等待方
- [INFO][2021-10-0514:59:45.618][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]seqId:62e126d9a0334399904509acf8dfe0bbremovefromrequestmap
- [INFO][2021-10-0514:59:45.618][nioEventLoopGroup-2-1][c.g.h.r.c.c.RpcClient.channelRead0]-[Client]responseis:DefaultRpcResponse{seqId='62e126d9a0334399904509acf8dfe0bb',error=null,result=CalculateResponse{success=true,sum=30}}
- [INFO][2021-10-0514:59:45.619][main][c.g.h.r.c.i.i.DefaultInvokeService.getResponse]-[Client]seq62e126d9a0334399904509acf8dfe0bb對應結果已經獲取:DefaultRpcResponse{seqId='null',error=com.github.houbb.rpc.common.exception.RpcTimeoutException,result=null}
- ...
可以發現,超時異常。
不足之處
對于超時的處理可以拓展為雙向的,比如服務端也可以指定超時限制,避免資源的浪費。
原文鏈接:https://www.toutiao.com/a7018512258305278500/