自定義 webflux 容器配置
配置代碼
@Component public class ContainerConfig extends ReactiveWebServerFactoryCustomizer { public ContainerConfig(ServerProperties serverProperties) { super(serverProperties); } @Override public void customize(ConfigurableReactiveWebServerFactory factory) { super.customize(factory); NettyReactiveWebServerFactory nettyFactory = (NettyReactiveWebServerFactory) factory; nettyFactory.setResourceFactory(null); nettyFactory.addServerCustomizers(server -> server.tcpConfiguration(tcpServer -> tcpServer.runOn(LoopResources.create("mfilesvc", Runtime.getRuntime().availableProcessors() * 4, Runtime.getRuntime().availableProcessors() * 8, true)) .selectorOption(CONNECT_TIMEOUT_MILLIS, 200) ).channelGroup(new ChannelGroup()) ); } @Override public int getOrder() { return -10; } }
服務(wù)重啟時(shí) 報(bào)錯(cuò)
SpringContextShutdownHook Socket couldn"t be stopped within 3000ms
解決方案
初識Spring WebFlux
在我的認(rèn)識中,大部分人都在用SpringMVC(包括我自己)。在最近的學(xué)習(xí)中,發(fā)現(xiàn)spring5中有一個(gè)和SpringMVC平級的東西Spring WebFlux,接下來初步認(rèn)識一下這是個(gè)什么東東?
Spring Web新的改變
眾所周知Spring MVC是同步阻塞的IO模型,當(dāng)我們在處理一個(gè)耗時(shí)的任務(wù)時(shí),如上傳文件,服務(wù)器的處理線程會一直處于等待狀態(tài),等待文件的上傳,這期間什么也做不了,等到文件上傳完畢后可能需要寫入,寫入的過程線程又只能在那等待,非常浪費(fèi)資源。為了避免這類資源的浪費(fèi),Spring WebFlux應(yīng)運(yùn)而生,在Spring WebFlux中若文件還沒上傳完畢,線程可以先去做其他事情,當(dāng)文件上傳完畢后會通知線程,線程再來處理,后續(xù)寫入也是類似的,通過異步非阻塞機(jī)制節(jié)省了系統(tǒng)資源,極大的提高了系統(tǒng)的并發(fā)量。這兩種形式,是不是像極了BIO和NIO這兩種形式,實(shí)際上,SpringMVC和Spring WebFlux也就是這兩種IO特點(diǎn)的體現(xiàn)。
以下為官網(wǎng)的介紹:
Spring WebFlux的特性
1.異步非阻塞
如上文所說,線程不需要一直處于等待狀態(tài),Spring WebFlux很好的體現(xiàn)了NIO的異步非阻塞思想。
2.響應(yīng)式(reactive)編程
響應(yīng)式編程是一種新的編程風(fēng)格,其特點(diǎn)是異步和并發(fā)、事件驅(qū)動、推送PUSH機(jī)制一級觀察者模式的衍生。reactive引用允許開發(fā)者構(gòu)建事件驅(qū)動,可擴(kuò)展性,彈性的反應(yīng)系統(tǒng):提供高度敏感的實(shí)時(shí)用戶體驗(yàn)感覺,可伸縮性和彈性的引用程序棧的支持,隨時(shí)可以部署在多核和云計(jì)算架構(gòu)。
Reactive的主要接口:
- Publisher:發(fā)布者,數(shù)據(jù)的生產(chǎn)端
- Subscriber:消費(fèi)者,此處可以定義獲取到數(shù)據(jù)后響應(yīng)的操作
- Processor:消費(fèi)者與發(fā)布者之間的數(shù)據(jù)處理
- back pressure:背壓,消費(fèi)者告訴發(fā)布者自己能處理多少數(shù)據(jù)
消費(fèi)者的回調(diào)方法:
- onSubscribe:訂閱關(guān)系處理,用它來響應(yīng)發(fā)布者
- onNext:接收到數(shù)據(jù)后會響應(yīng)的方法
- onError:出現(xiàn)錯(cuò)誤時(shí)處理的方法
- onComplete:任務(wù)完成后響應(yīng)的方法
3.適配多種web容器
既然Spring WebFlux很好的體現(xiàn)了NIO的異步非阻塞思想。作為首屈一指的NIO框架netty,便是Spring WebFlux默認(rèn)的運(yùn)行容器。此外,大家熟悉的Tomcat、Jetty等Servlet容器,也能運(yùn)行Spring WebFlux,前提是容器需要支持Servlet3.1,因?yàn)榉亲枞鸌O是使用了Servlet3.1的特性。
Spring WebFlux簡單實(shí)踐
本文默認(rèn)開發(fā)環(huán)境是JDK8,開發(fā)工具是IDEA。實(shí)踐分兩部分內(nèi)容,第一部分與SpringMVC對比開發(fā)中的不一樣的地方;第二部分為Spring WebFlux獨(dú)有的響應(yīng)式編程的簡單實(shí)踐。
在webflux中,Mono代表返回0或1個(gè)元素(相當(dāng)于一個(gè)對象)。Flux代表返回0-n個(gè)元素(相當(dāng)于集合)
1.工程創(chuàng)建
新建springboot工程。
選擇Web -> Spring Reactive Web 創(chuàng)建
或者在springboot工程中pom文件添加依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
2.Controller中與SpringMVC的對比
在controller中,webflux的寫法可以和springMVC的寫法類似
@RestController @Slf4j public class UserController { @RequestMapping("/index") public String index(){ log.info("springmvc index begin"); String result="cc666"; log.info("springmvc index end"); return result; } @RequestMapping("/index2") public Mono<String> index2(){ log.info("webflux index begin"); Mono<String> result=Mono.just("666cc"); log.info("webflux index end"); return result; } }
3.異步非阻塞的體現(xiàn)
上面已經(jīng)實(shí)現(xiàn)了初步的數(shù)據(jù)返回,不過webflux和springmvc目前來看沒有什么區(qū)別,已知springmvc線程執(zhí)行時(shí)會阻塞的,webflux線程是異步非阻塞的。下面修改一下代碼,在獲取數(shù)據(jù)的時(shí)候加一些額外的耗時(shí)操作,看看webflux是否是真的異步非阻塞
@RestController @Slf4j @AllArgsConstructor public class UserController { /** * 模擬耗時(shí)查詢操作 */ public String createStr(){ try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } return "cc666cc"; } @RequestMapping("/index") public String index(){ log.info("springmvc index begin"); String result=this.createStr(); log.info("springmvc index end"); return result; } @RequestMapping("/index2") public Mono<String> index2(){ log.info("webflux index begin"); Mono<String> result=Mono.fromSupplier(()->this.createStr()); log.info("webflux index end"); return result; } }
通過日志結(jié)果,可以很明顯的發(fā)現(xiàn),雖然前端頁面展示的效果是一樣的,但springmvc是等待后返回結(jié)果;而webflux是先執(zhí)行,等有結(jié)果后,再返回結(jié)果。由此體現(xiàn)了webflux異步非阻塞的特性
springmvc
2020-08-04 21:28:57.430 INFO 14156 --- [ctor-http-nio-2] c.w.webflux.controller.UserController : springmvc index begin
2020-08-04 21:29:00.430 INFO 14156 --- [ctor-http-nio-2] c.w.webflux.controller.UserController : springmvc index end
webflux
2020-08-04 21:29:09.640 INFO 14156 --- [ctor-http-nio-2] c.w.webflux.controller.UserController : webflux index begin
2020-08-04 21:29:09.641 INFO 14156 --- [ctor-http-nio-2] c.w.webflux.controller.UserController : webflux index end
4.添加數(shù)據(jù)庫支持
對于數(shù)據(jù)庫的支持,webflux用到的是r2dbc這樣一個(gè)東西。
R2DBC(Reactive Relational Database Connectivity)是一個(gè)使用反應(yīng)式驅(qū)動集成關(guān)系數(shù)據(jù)庫的孵化器。Spring Data R2DBC運(yùn)用熟悉的Spring抽象和repository 支持R2DBC。基于此,在響應(yīng)式程序棧上使用關(guān)系數(shù)據(jù)訪問技術(shù),構(gòu)建由Spring驅(qū)動的程序?qū)⒆兊梅浅:唵巍?/p>
在pom中引入依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-r2dbc</artifactId> </dependency> <dependency> <groupId>com.github.jasync-sql</groupId> <artifactId>jasync-r2dbc-mysql</artifactId> <version>1.1.3</version> </dependency>
在application.yml中加入數(shù)據(jù)源:
spring: r2dbc: url: r2dbc:mysql://127.0.0.1:3306/study username: xxx password: xxx
5.Dao的編寫
Dao的編寫和springmvc中類似,本文中繼承了ReactiveCrudRepository類,是Repository的一個(gè)實(shí)現(xiàn)類,其中實(shí)現(xiàn)了簡單的crud操作,model和dao的實(shí)現(xiàn):
@Table("user") @Data @AllArgsConstructor @NoArgsConstructor public class User { @Id private Long id; private String username; private String password; }
public interface UserDao extends ReactiveCrudRepository<User,Long> { }
6.Controller的編寫
controller的編寫還是和springmvc類似,這里采用RESTful的方式
@RestController @AllArgsConstructor public class UserController { private final UserDao userDao; @GetMapping("/findAll") public Flux<User> findAll(){ return userDao.findAll(); } @PostMapping("/save") public Mono save(@RequestBody User user){ return this.userDao.save(user); } @DeleteMapping("/delete/{id}") public Mono delete(@PathVariable Long id){ return this.userDao.deleteById(id); } @GetMapping("/get/{id}") public Mono get(@PathVariable Long id){ return this.userDao.findById(id); } }
7.響應(yīng)式編程Handler的編寫
在使用上,webflux可以和springmvc類似,不過webflux也有自己的一套響應(yīng)式編程的寫法,先定義handler,類似于controller,不過只有業(yè)務(wù)處理的代碼,其中就用到了reactive模擬的request(ServerRequest )和response(ServerResponse),同樣的實(shí)現(xiàn)簡單的crud功能:
@Component @AllArgsConstructor public class UserHandler { private final UserDao userDao; public Mono<ServerResponse> saveUser(ServerRequest request){ Mono<User> mono=request.bodyToMono(User.class); User user = mono.block(); return ServerResponse.ok().build(this.userDao.save(user).then()); } public Mono<ServerResponse> deleteById(ServerRequest request){ Long id=Long.parseLong(request.pathVariable("id")); return ServerResponse.ok().build(this.userDao.deleteById(id).then()); } public Mono<ServerResponse> getByid(ServerRequest request){ Long id=Long.parseLong(request.pathVariable("id")); Mono<User> mono = this.userDao.findById(id); return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(mono,User.class); } public Mono<ServerResponse> findAll(ServerRequest request){ Flux<User> all = this.userDao.findAll(); return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(all,User.class); } }
8.響應(yīng)式編程Route的編寫
在handler中我們編寫了處理代碼,但是怎么通過請求地址訪問呢?
學(xué)習(xí)過vue的老鐵們應(yīng)該知道,vue是通過路由定義地址和頁面的對應(yīng)關(guān)系的,vue也是響應(yīng)式編程的一種體現(xiàn)。webflux中也是通過類似的方式來實(shí)現(xiàn)的,在Route中定義規(guī)則:
@Configuration public class UserRoute { @Bean public RouterFunction<ServerResponse> routeUser(UserHandler userHandler){ return RouterFunctions .route(RequestPredicates.GET("findAll2") .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),userHandler::findAll) .andRoute(RequestPredicates.GET("/get2/{id}") .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),userHandler::getByid) .andRoute(RequestPredicates.DELETE("/delete2/{id}") .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),userHandler::deleteById) .andRoute(RequestPredicates.POST("/save2") .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),userHandler::saveUser); } }
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://blog.csdn.net/IT_liuzhiyuan/article/details/105871279