一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java 從零開始手寫 RPC-timeout 超時處理

Java 從零開始手寫 RPC-timeout 超時處理

2021-10-29 22:53今日頭條老馬嘯西風 Java教程

前面我們實現了通用的 rpc,但是存在一個問題,同步獲取響應的時候沒有超時處理。如果 server 掛掉了,或者處理太慢,客戶端也不可能一直傻傻的等。

Java 從零開始手寫 RPC-timeout 超時處理

必要性

前面我們實現了通用的 rpc,但是存在一個問題,同步獲取響應的時候沒有超時處理。

如果 server 掛掉了,或者處理太慢,客戶端也不可能一直傻傻的等。

當外部的調用超過指定的時間后,就直接報錯,避免無意義的資源消耗。

思路

調用的時候,將開始時間保留。

獲取的時候檢測是否超時。

同時創建一個線程,用來檢測是否有超時的請求。

實現

思路

調用的時候,將開始時間保留。

獲取的時候檢測是否超時。

同時創建一個線程,用來檢測是否有超時的請求。

超時檢測線程

為了不影響正常業務的性能,我們另起一個線程檢測調用是否已經超時。

  1. packagecom.github.houbb.rpc.client.invoke.impl;
  2.  
  3.  
  4. importcom.github.houbb.heaven.util.common.ArgUtil;
  5. importcom.github.houbb.rpc.common.rpc.domain.RpcResponse;
  6. importcom.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory;
  7. importcom.github.houbb.rpc.common.support.time.impl.Times;
  8.  
  9.  
  10. importjava.util.Map;
  11. importjava.util.concurrent.ConcurrentHashMap;
  12.  
  13.  
  14. /**
  15. *超時檢測線程
  16. *@authorbinbin.hou
  17. *@since0.0.7
  18. */
  19. publicclassTimeoutCheckThreadimplementsRunnable{
  20.  
  21.  
  22. /**
  23. *請求信息
  24. *@since0.0.7
  25. */
  26. privatefinalConcurrentHashMaprequestMap;
  27.  
  28.  
  29. /**
  30. *請求信息
  31. *@since0.0.7
  32. */
  33. privatefinalConcurrentHashMapresponseMap;
  34.  
  35.  
  36. /**
  37. *新建
  38. *@paramrequestMap請求Map
  39. *@paramresponseMap結果map
  40. *@since0.0.7
  41. */
  42. publicTimeoutCheckThread(ConcurrentHashMaprequestMap,
  43. ConcurrentHashMapresponseMap){
  44. ArgUtil.notNull(requestMap,"requestMap");
  45. this.requestMap=requestMap;
  46. this.responseMap=responseMap;
  47. }
  48.  
  49.  
  50. @Override
  51. publicvoidrun(){
  52. for(Map.Entryentry:requestMap.entrySet()){
  53. longexpireTime=entry.getValue();
  54. longcurrentTime=Times.time();
  55.  
  56.  
  57. if(currentTime>expireTime){
  58. finalStringkey=entry.getKey();
  59. //結果設置為超時,從請求map中移除
  60. responseMap.putIfAbsent(key,RpcResponseFactory.timeout());
  61. requestMap.remove(key);
  62. }
  63. }
  64. }
  65.  
  66.  
  67. }

這里主要存儲請求,響應的時間,如果超時,則移除對應的請求。

線程啟動

在 DefaultInvokeService 初始化時啟動:

  1. finalRunnabletimeoutThread=newTimeoutCheckThread(requestMap,responseMap);
  2. Executors.newScheduledThreadPool(1)
  3. .scheduleAtFixedRate(timeoutThread,60,60,TimeUnit.SECONDS);

DefaultInvokeService

原來的設置結果,獲取結果是沒有考慮時間的,這里加一下對應的判斷。

設置請求時間

•添加請求 addRequest

會將過時的時間直接放入 map 中。

因為放入是一次操作,查詢可能是多次。

所以時間在放入的時候計算完成。

  1. @Override
  2. publicInvokeServiceaddRequest(StringseqId,longtimeoutMills){
  3. LOG.info("[Client]startaddrequestforseqId:{},timeoutMills:{}",seqId,
  4. timeoutMills);
  5. finallongexpireTime=Times.time()+timeoutMills;
  6. requestMap.putIfAbsent(seqId,expireTime);
  7. returnthis;
  8. }

設置請求結果

•添加響應 addResponse

1.如果 requestMap 中已經不存在這個請求信息,則說明可能超時,直接忽略存入結果。

2.此時檢測是否出現超時,超時直接返回超時信息。

3.放入信息后,通知其他等待的所有進程。

  1. @Override
  2. publicInvokeServiceaddResponse(StringseqId,RpcResponserpcResponse){
  3. //1.判斷是否有效
  4. LongexpireTime=this.requestMap.get(seqId);
  5. //如果為空,可能是這個結果已經超時了,被定時job移除之后,響應結果才過來。直接忽略
  6. if(ObjectUtil.isNull(expireTime)){
  7. returnthis;
  8. }
  9.  
  10.  
  11. //2.判斷是否超時
  12. if(Times.time()>expireTime){
  13. LOG.info("[Client]seqId:{}信息已超時,直接返回超時結果。",seqId);
  14. rpcResponse=RpcResponseFactory.timeout();
  15. }
  16.  
  17.  
  18. //這里放入之前,可以添加判斷。
  19. //如果seqId必須處理請求集合中,才允許放入。或者直接忽略丟棄。
  20. //通知所有等待方
  21. responseMap.putIfAbsent(seqId,rpcResponse);
  22. LOG.info("[Client]獲取結果信息,seqId:{},rpcResponse:{}",seqId,rpcResponse);
  23. LOG.info("[Client]seqId:{}信息已經放入,通知所有等待方",seqId);
  24. //移除對應的requestMap
  25. requestMap.remove(seqId);
  26. LOG.info("[Client]seqId:{}removefromrequestmap",seqId);
  27. synchronized(this){
  28. this.notifyAll();
  29. }
  30. returnthis;
  31. }

獲取請求結果

•獲取相應 getResponse

1.如果結果存在,直接返回響應結果

2.否則進入等待。

3.等待結束后獲取結果。

  1. @Override
  2. publicRpcResponsegetResponse(StringseqId){
  3. try{
  4. RpcResponserpcResponse=this.responseMap.get(seqId);
  5. if(ObjectUtil.isNotNull(rpcResponse)){
  6. LOG.info("[Client]seq{}對應結果已經獲取:{}",seqId,rpcResponse);
  7. returnrpcResponse;
  8. }
  9. //進入等待
  10. while(rpcResponse==null){
  11. LOG.info("[Client]seq{}對應結果為空,進入等待",seqId);
  12. //同步等待鎖
  13. synchronized(this){
  14. this.wait();
  15. }
  16. rpcResponse=this.responseMap.get(seqId);
  17. LOG.info("[Client]seq{}對應結果已經獲取:{}",seqId,rpcResponse);
  18. }
  19. returnrpcResponse;
  20. }catch(InterruptedExceptione){
  21. thrownewRpcRuntimeException(e);
  22. }
  23. }

可以發現獲取部分的邏輯沒變,因為超時會返回一個超時對象:RpcResponseFactory.timeout();

這是一個非常簡單的實現,如下:

  1. packagecom.github.houbb.rpc.common.rpc.domain.impl;
  2.  
  3.  
  4. importcom.github.houbb.rpc.common.exception.RpcTimeoutException;
  5. importcom.github.houbb.rpc.common.rpc.domain.RpcResponse;
  6.  
  7.  
  8. /**
  9. *響應工廠類
  10. *@authorbinbin.hou
  11. *@since0.0.7
  12. */
  13. publicfinalclassRpcResponseFactory{
  14.  
  15.  
  16. privateRpcResponseFactory(){}
  17.  
  18.  
  19. /**
  20. *超時異常信息
  21. *@since0.0.7
  22. */
  23. privatestaticfinalDefaultRpcResponseTIMEOUT;
  24.  
  25.  
  26. static{
  27. TIMEOUT=newDefaultRpcResponse();
  28. TIMEOUT.error(newRpcTimeoutException());
  29. }
  30.  
  31.  
  32. /**
  33. *獲取超時響應結果
  34. *@return響應結果
  35. *@since0.0.7
  36. */
  37. publicstaticRpcResponsetimeout(){
  38. returnTIMEOUT;
  39. }
  40.  
  41.  
  42. }

響應結果指定一個超時異常,這個異常會在代理處理結果時拋出:

  1. RpcResponserpcResponse=proxyContext.invokeService().getResponse(seqId);
  2. Throwableerror=rpcResponse.error();
  3. if(ObjectUtil.isNotNull(error)){
  4. throwerror;
  5. }
  6. returnrpcResponse.result();

測試代碼

服務端

我們故意把服務端的實現添加沉睡,其他保持不變。

  1. publicclassCalculatorServiceImplimplementsCalculatorService{
  2.  
  3.  
  4. publicCalculateResponsesum(CalculateRequestrequest){
  5. intsum=request.getOne()+request.getTwo();
  6.  
  7.  
  8. //故意沉睡3s
  9. try{
  10. TimeUnit.SECONDS.sleep(3);
  11. }catch(InterruptedExceptione){
  12. e.printStackTrace();
  13. }
  14.  
  15.  
  16. returnnewCalculateResponse(true,sum);
  17. }
  18.  
  19.  
  20. }

客戶端

設置對應的超時時間為 1S,其他不變:

  1. publicstaticvoidmain(String[]args){
  2. //服務配置信息
  3. ReferenceConfigconfig=newDefaultReferenceConfig();
  4. config.serviceId(ServiceIdConst.CALC);
  5. config.serviceInterface(CalculatorService.class);
  6. config.addresses("localhost:9527");
  7. //設置超時時間為1S
  8. config.timeout(1000);
  9.  
  10.  
  11. CalculatorServicecalculatorService=config.reference();
  12. CalculateRequestrequest=newCalculateRequest();
  13. request.setOne(10);
  14. request.setTwo(20);
  15.  
  16.  
  17. CalculateResponseresponse=calculatorService.sum(request);
  18. System.out.println(response);
  19. }

日志如下:

  1. .log.integration.adaptors.stdout.StdOutExImpl'adapter.
  2. [INFO][2021-10-0514:59:40.974][main][c.g.h.r.c.c.RpcClient.connect]-RPC服務開始啟動客戶端
  3. ...
  4. [INFO][2021-10-0514:59:42.504][main][c.g.h.r.c.c.RpcClient.connect]-RPC服務啟動客戶端完成,監聽地址localhost:9527
  5. [INFO][2021-10-0514:59:42.533][main][c.g.h.r.c.p.ReferenceProxy.invoke]-[Client]startcallremotewithrequest:DefaultRpcRequest{seqId='62e126d9a0334399904509acf8dfe0bb',createTime=1633417182525,serviceId='calc',methodName='sum',paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest],paramValues=[CalculateRequest{one=10,two=20}]}
  6. [INFO][2021-10-0514:59:42.534][main][c.g.h.r.c.i.i.DefaultInvokeService.addRequest]-[Client]startaddrequestforseqId:62e126d9a0334399904509acf8dfe0bb,timeoutMills:1000
  7. [INFO][2021-10-0514:59:42.535][main][c.g.h.r.c.p.ReferenceProxy.invoke]-[Client]startcallchannelid:00e04cfffe360988-000004bc-00000000-1178e1265e903c4c-7975626f
  8. ...
  9. Exceptioninthread"main"com.github.houbb.rpc.common.exception.RpcTimeoutException
  10. atcom.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory.(RpcResponseFactory.java:23)
  11. atcom.github.houbb.rpc.client.invoke.impl.DefaultInvokeService.addResponse(DefaultInvokeService.java:72)
  12. atcom.github.houbb.rpc.client.handler.RpcClientHandler.channelRead0(RpcClientHandler.java:43)
  13. atio.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
  14. atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
  15. atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
  16. atio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
  17. atio.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
  18. atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
  19. atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
  20. atio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
  21. atio.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
  22. atio.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
  23. atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
  24. atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
  25. atio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
  26. atio.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
  27. atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
  28. atio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
  29. atio.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
  30. atio.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
  31. atio.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
  32. atio.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
  33. atio.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
  34. atio.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
  35. atio.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
  36. atio.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
  37. atjava.lang.Thread.run(Thread.java:748)
  38. ...
  39. [INFO][2021-10-0514:59:45.615][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]seqId:62e126d9a0334399904509acf8dfe0bb信息已超時,直接返回超時結果。
  40. [INFO][2021-10-0514:59:45.617][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]獲取結果信息,seqId:62e126d9a0334399904509acf8dfe0bb,rpcResponse:DefaultRpcResponse{seqId='null',error=com.github.houbb.rpc.common.exception.RpcTimeoutException,result=null}
  41. [INFO][2021-10-0514:59:45.617][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]seqId:62e126d9a0334399904509acf8dfe0bb信息已經放入,通知所有等待方
  42. [INFO][2021-10-0514:59:45.618][nioEventLoopGroup-2-1][c.g.h.r.c.i.i.DefaultInvokeService.addResponse]-[Client]seqId:62e126d9a0334399904509acf8dfe0bbremovefromrequestmap
  43. [INFO][2021-10-0514:59:45.618][nioEventLoopGroup-2-1][c.g.h.r.c.c.RpcClient.channelRead0]-[Client]responseis:DefaultRpcResponse{seqId='62e126d9a0334399904509acf8dfe0bb',error=null,result=CalculateResponse{success=true,sum=30}}
  44. [INFO][2021-10-0514:59:45.619][main][c.g.h.r.c.i.i.DefaultInvokeService.getResponse]-[Client]seq62e126d9a0334399904509acf8dfe0bb對應結果已經獲取:DefaultRpcResponse{seqId='null',error=com.github.houbb.rpc.common.exception.RpcTimeoutException,result=null}
  45. ...

可以發現,超時異常。

不足之處

對于超時的處理可以拓展為雙向的,比如服務端也可以指定超時限制,避免資源的浪費。

原文鏈接:https://www.toutiao.com/a7018512258305278500/

延伸 · 閱讀

精彩推薦
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7482021-02-04
主站蜘蛛池模板: 深夜影院a | 无人在线高清免费看 | 极品一区 | 男人狂躁女人下半身 | 国产真实伦对白在线播放 | 午夜福利理论片在线播放 | 亚洲精品视频免费在线观看 | 精品亚洲麻豆1区2区3区 | 国色天香社区在线视频播放 | 久久国产精品福利影集 | 精品国产午夜久久久久九九 | 精品欧美| 亚州免费一级毛片 | 午夜福利08550 | 95视频免费看片 | 手机看片日韩1024你懂的首页 | 久久这里只有精品视频e | 日韩欧美一区二区三区免费观看 | 古代翁熄乩伦小说h | 日本啊v在线观看 | 成人精品免费网站 | 久久精品国产亚洲AV热无遮挡 | 操国产美女| 国产精品日韩在线观看 | 欧美日韩一区二区三区在线视频 | 四虎在线网站 | 成年人视频免费在线播放 | 99久9在线视频 | 久久高清一级毛片 | 处女私拍| 被18号每天强行榨干acg | 窝窝午夜理伦影院 | 93版高校教师 | 69一级毛片 | 色美| 黑人巨大和日本娇小中出 | 日韩毛片免费 | 国产成人精品s8sp视频 | 国内久久婷婷综合欲色啪 | 亚洲精品AV无码永久无码 | 91在线精品老司机免费播放 |