关于springboot响应式编程整合webFlux的问题

作者:pshdhx_albert 时间:2023-12-07 07:25:55 

在servlet3.0标准之前,是每一个请求对应一个线程。如果此时一个线程出现了高延迟,就会产生阻塞问题,从而导致整个服务出现严重的性能情况,因为一旦要调用第三方接口,就有可能出现这样的操作了。早期的处理方式只能是手工控制线程。

在servlet3.0标准之后,为了解决此类问题,所以提供了异步响应的支持。在异步响应处理结构中,可以将耗时操作的部分交由一个专属的异步线程进行响应处理,同时请求的线程资源将被释放,并将该线程返回到线程池中,以供其他用户使用,这样的操作机制将极大的提升程序的并发性能。

对于以上给出的响应式编程支持,仅仅是一些原生的支持模式,而现在既然基于springboot程序开发,那么就需要考虑一些更简单的整合。

而在spring中实现响应式编程,那么则需要使用到spring webFlux,该组件是一个重新构建的且基于Reactive Streams标准实现的异步非阻塞Web开发框架,以Reactor开发框架为基础,可以更加容易实现高并发访问下的请求处理模型。在springboot2.x版本中提供了webFlux依赖模块,该模块有两种模型实现:一种是基于功能性端点的方式,另一种是基于SpringMVC注解方式。

 Maven引入

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

整合处理器:

package com.example.oldguy.myWebFlux.handler;

import com.example.oldguy.myVo.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@Component
@Slf4j
public class MessageHandler {
   public Mono<Message> echoHandler(Message message){
       log.info("【{}】业务层接收处理数据:{}",Thread.currentThread().getName());
       message.setTitle("【】"+Thread.currentThread().getName()+"】"+message.getTitle());
       message.setContent("【】"+Thread.currentThread().getName()+"】"+message.getContent());
       return Mono.create(item->item.success(message)); //实现数据响应
   }
}

整合控制器:

package com.example.oldguy.myController;

import com.example.oldguy.myVo.Message;
import com.example.oldguy.myWebFlux.handler.MessageHandler;
import com.example.oldguy.mytask.MyThreadTask;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.WebDataBinder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.InitBinder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import java.beans.PropertyEditorSupport;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 异步线程的处理机制
*/
@RestController
@RequestMapping("/message/*")
@Slf4j
@Api(tags = "异步处理")
public class AsyncController {
   @Autowired
   private ThreadPoolTaskExecutor threadPoolTaskExecutor;
   private MyThreadTask task;
   private MessageHandler messageHandler;
   /**
    * 日期转换
    * @param
    * @return
    */
   private static final DateTimeFormatter LOCAL_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
   @InitBinder
   public void initBinder(WebDataBinder binder){
       binder.registerCustomEditor(Date.class,new PropertyEditorSupport(){
           @Override
           public void setAsText(String text) throws IllegalArgumentException {
               LocalDate localDate = LocalDate.parse(text,LOCAL_DATE_FORMAT);
               Instant instant = localDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant();
               super.setValue(Date.from(instant));
           }
       });
   }
   @GetMapping("runnable")
   @ApiOperation("异常处理Runnable")
   public Object message(String message) {
       log.info("外部线程:{}", Thread.currentThread().getName());
       HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
       DeferredResult<String> result = new DeferredResult<>(6000L); //设置异步响应
       this.threadPoolTaskExecutor.execute(new Runnable() { //线程核心任务
           @SneakyThrows
           public void run() {
               log.info("内部线程:{}",Thread.currentThread().getName());
               TimeUnit.SECONDS.sleep(7);
               result.setResult("[echo]"+message); //执行最终的响应
       result.onCompletion(new Runnable() { //完成处理线程
               log.info("完成线程:{}",Thread.currentThread().getName()); //日志输出
       result.onTimeout(new Runnable() {
               log.info("超时线程:{}",Thread.currentThread().getName());
               result.setResult("【请求超时】"+request.getRequestURI()); //超时路径
       return  result;
   @GetMapping("task")
   @ApiOperation("task异步任务开启")
   public Object messageTask(String message){
       log.info("外部线程{}",Thread.currentThread().getName());
       this.task.startTaskHander();
       return "【echo】"+message;
   @GetMapping("webflux")
   @ApiOperation("整合webflux")
   public Object echo(Message message){
       log.info("接收用户信息,用户方发送的参数为message={}",message);
       return this.messageHandler.echoHandler(message);
}

页面响应:

关于springboot响应式编程整合webFlux的问题

控制台响应:

2021-11-30 15:04:06.946  INFO 22884 --- [nio-1999-exec-1] c.e.oldguy.myController.AsyncController  : 接收用户信息,用户方发送的参数为message=Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)
2021-11-30 15:04:06.947  INFO 22884 --- [nio-1999-exec-1] c.e.o.myWebFlux.handler.MessageHandler   : 【http-nio-1999-exec-1】业务层接收处理数据:Message(title=pansd, pubdate=Tue Nov 30 00:00:00 CST 2021, content=come on baby)

 webFlux响应map和List


//webFlux响应集合
   public Flux<Message> list(Message message){
       List<Message> messageList = new ArrayList<>();
       for(int i=0;i<10;i++){
           Message m = new Message();
           m.setTitle(i+"--"+message.getTitle());
           m.setContent(i+"--"+message.getContent());
           m.setPubdate(message.getPubdate());
           messageList.add(m);
       }
       return Flux.fromIterable(messageList);
   }
   public Flux<Map.Entry<String,Message>> map(Message message){
       Map<String,Message> map = new HashMap<>();
       for(int i=0;i<10;i++){
           Message m = new Message();
           m.setTitle(i+"--"+message.getTitle());
           m.setContent(i+"--"+message.getContent());
           m.setPubdate(message.getPubdate());
           map.put("pansd-"+i,m);
       }
//        Set<Map.Entry<String, Message>> entries = map.entrySet();
       return Flux.fromIterable(map.entrySet());
   }
@GetMapping("webfluxList")
@ApiOperation("整合webfluxList")
public Object echoList(Message message){
   log.info("接收用户信息,用户方发送的参数为message={}",message);
   return this.messageHandler.list(message);
}

@GetMapping("webfluxMap")
@ApiOperation("整合webfluxMap")
public Object echoMap(Message message){
   log.info("接收用户信息,用户方发送的参数为message={}",message);
   return this.messageHandler.map(message);
}

来源:https://blog.csdn.net/pshdhx/article/details/121636277

标签:springboot,响应式编程,webFlux
0
投稿

猜你喜欢

  • ehcache模糊批量移除缓存的方法

    2023-01-11 12:30:37
  • Java jvm中Code Cache案例详解

    2022-02-04 17:00:53
  • 如何从UA分辨出Android设备类型

    2023-09-03 00:37:44
  • java中public class与class的区别详解

    2023-04-09 11:49:40
  • java使用common-fileupload实现文件上传

    2022-03-06 03:21:08
  • 将Java对象序列化成JSON和XML格式的实例

    2022-09-23 23:39:11
  • MyBatis动态SQL表达式详解

    2023-11-29 00:42:36
  • IDEA中项目集成git提交代码的详细步骤

    2021-09-08 04:33:39
  • C#中怎样从指定字符串中查找并替换字符串?

    2023-09-30 14:26:24
  • Java设计模式之命令模式_动力节点Java学院整理

    2023-12-11 13:32:03
  • Java序列化与反序列化的实例分析讲解

    2022-09-16 05:58:39
  • 详解Java中的ThreadLocal

    2022-08-19 17:48:43
  • Spring Boot Admin实践详解

    2023-08-25 06:57:53
  • htmlcleaner使用方法及xpath语法初探

    2023-04-11 07:14:06
  • 关于线程池你不得不知道的一些设置

    2021-06-08 11:34:45
  • 详解C#如何实现读写ini文件

    2022-02-04 23:15:27
  • C#实现文件与字符串互转的方法详解

    2023-04-26 03:18:12
  • Java设计模式之工厂模式

    2023-12-18 01:40:50
  • Android基础教程数据存储之文件存储

    2023-08-05 18:18:10
  • Java 使用 HttpClient 发送 GET请求和 POST请求

    2023-07-23 07:56:13
  • asp之家 软件编程 m.aspxhome.com