Reactor中的onErrorContinue 和 onErrorResume

作者:十二又十三 时间:2022-12-01 14:30:58 

前言

这似乎是 Reactor 的热门搜索之一,至少当我在谷歌中输入 onErrorContinue 时,onErrorResume 会在它旁边弹出。让我把我的测试代码和我的一些解释粘贴在下面。

1 基础功能

这是一个简单的函数,将 5 个连续的数字分别乘以 2,然后相加,当 i==2 时抛出一个异常:

public static void main(String... args) {
     Flux.range(1,5)
             .doOnNext(i -> System.out.println("input=" + i))
             .map(i -> i == 2 ? i / 0 : i)
             .map(i -> i * 2)
             .reduce((i,j) -> i+j)
             .doOnNext(i -> System.out.println("sum=" + i))
             .block();
 }

显然,输出如下:

input=1
input=2
Exception in thread "main" java.lang.ArithmeticException: / by zero

2 只有 onErrorResume ()

public static void main(String... args) {
     Flux.range(1,5)
             .doOnNext(i -> System.out.println("input=" + i))
             .map(i -> i == 2 ? i / 0 : i)
             .map(i -> i * 2)
             .onErrorResume(err -> {
                 log.info("onErrorResume");
                 return Flux.empty();
             })
             .reduce((i,j) -> i+j)
             .doOnNext(i -> System.out.println("sum=" + i))
             .block();
 }

输出如下:

input=1
input=2
17:40:47.828 [main] INFO com.example.demo.config.TestRunner - onErrorResume
sum=2

正如官方文档描述(onErrorResume)的那样,onErrorResume 用返回内容替换 Flux,因此在 2 之后的数据不会被处理。唯一值得一提的是,onErrorResume() 不必马上在错误之后捕获异常。在 onErrorContinue() 的官网文档中(onErrorContinue),只有 onErrorContinue() 强调了了 upstream 关键词,但显然,它还有其他含义。

3 只有 onErrorContinue()

public static void main(String... args) {
 Flux.range(1,5)
         .doOnNext(i -> System.out.println("input=" + i))
         .map(i -> i == 2 ? i / 0 : i)
         .map(i -> i * 2)
         .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
         .reduce((i,j) -> i+j)
         .doOnNext(i -> System.out.println("sum=" + i))
         .block();
}

输出:

input=1
input=2
17:43:10.656 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26

显然,onErrorContinue 会丢弃错误数据 2, 然后继续数字 3 直到 5。

4 onErrorResume() 然后 onErrorContinue()

public static void main(String... args) {
   Flux.range(1,5)
           .doOnNext(i -> System.out.println("input=" + i))
           .map(i -> i == 2 ? i / 0 : i)
           .map(i -> i * 2)
           .onErrorResume(err -> {
               log.info("onErrorResume");
               return Flux.empty();
           })
           .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
           .reduce((i,j) -> i+j)
           .doOnNext(i -> System.out.println("sum=" + i))
           .block();
}

输出和上面一样:

input=1
input=2
17:47:05.789 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26

这样的结果,你想到了吗?onErrorContinue() 会在 onErrorResume() 得到错误之前处理这个错误。当两个错误处理函数在同一个函数中的时候很明显,但是当你的函数中只有 onErrorResume(),而一些调用者实际上有 onErrorContinue() 时,你的 onErrorResume() 没有被调用的原因可能就不那么明显了。

5 使用 onErrorResume() 模拟 onErrorContinue()

有些博客建议我们完全不用 onErrorContinue(),且在所有场景中仅用 onErrorResume()。但是上述示例已经展示了它们会产生不同的结果。那我们怎么实现呢?

public static void main(String... args) {
   Flux.range(1,5)
           .doOnNext(i -> System.out.println("input=" + i))
           .flatMap(i -> Mono.just(i)
                   .map(j -> j == 2 ? j / 0 : j)
                   .map(j -> j * 2)
                   .onErrorResume(err -> {
                       System.out.println("onErrorResume");
                       return Mono.empty();
                   })
           )
           .reduce((i,j) -> i+j)
           .doOnNext(i -> System.out.println("sum=" + i))
           .block();
}

因此,本质上是将可能在 flatMap 或 concatMap 中抛出错误的操作包装起来,并在其上使用 onErrorResume()。这样,它会产生相同的结果:

input=1
input=2
onErrorResume
input=3
input=4
input=5
sum=26

6 使用 onErrorResume() 和下游的 onErrorContinue() 模拟 onErrorContinue()

有时候,onErrorContinue() 放在调用程序中,您无法控制它。但你仍然需要 onErrorResume()。你该怎么办?

public static void main(String... args) {
   Flux.range(1,5)
           .doOnNext(i -> System.out.println("input=" + i))
           .flatMap(i -> Mono.just(i)
                   .map(j -> j == 2 ? j / 0 : j)
                   .map(j -> j * 2)
                   .onErrorResume(err -> {
                       System.out.println("onErrorResume");
                       return Mono.empty();
                   })
                   .onErrorStop()
           )
           .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
           .reduce((i,j) -> i+j)
           .doOnNext(i -> System.out.println("sum=" + i))
           .block();
}

秘诀是在 onErrorResume() 代码块的末尾添加 onErrorStop() ——这会阻塞 onErrorContinue(),这样它就不会在 onErrorResume() 之前占用错误。尝试删除 onErrorStop(),你会看到 onErrorContinue() 在 onErrorResume 之前弹出。

来源:https://blog.csdn.net/Prepared/article/details/126473838

标签:Reactor,onErrorContinue,onErrorResume
0
投稿

猜你喜欢

  • 结合线程池实现apache kafka消费者组的误区及解决方法

    2023-08-06 15:40:31
  • 一文详解如何在控制台显示MyBatis的SQL语句

    2023-01-09 06:43:38
  • java获取注册ip实例

    2023-11-03 23:01:12
  • JPA Specification常用查询+排序实例

    2023-11-23 04:56:32
  • 深入学习java中的Groovy 和 Scala 类

    2023-04-09 10:51:29
  • Java增加自定义注解进行校验入参详解

    2023-01-05 13:34:25
  • Android Studio3.2中导出jar包的过程详解

    2021-10-14 07:06:05
  • SpringBoot在RequestBody中使用枚举参数案例详解

    2022-12-15 05:16:30
  • C# 重写ComboBox实现下拉任意组件的方法

    2022-01-24 03:07:29
  • Android 设置应用全屏的两种解决方法

    2023-05-07 01:27:50
  • MyBatis的9种动态标签详解

    2021-06-21 19:03:40
  • C#检查字符串是否是合法URL地址的方法

    2022-08-09 16:24:06
  • Spring Boot实现分布式锁的自动释放的示例代码

    2023-10-17 11:06:24
  • Qt for Android开发实例教程

    2023-06-27 10:00:39
  • C++内存模型与名称空间概念讲解

    2023-07-15 03:43:43
  • springboot如何使用logback-spring配置日志格式,并分环境配置

    2023-11-10 04:37:34
  • 浅谈C#与Java两种语言的比较

    2023-09-26 13:05:19
  • Android视频播放器屏幕左侧边随手指上下滑动亮度调节功能的原理实现

    2022-09-12 23:27:10
  • 基于Transactional事务的使用以及注意说明

    2022-02-24 12:23:08
  • 总结Java的Struts框架的异常处理方法

    2022-04-12 01:29:44
  • asp之家 软件编程 m.aspxhome.com