RxJava 触发流基本原理源码解析

作者:itbird01 时间:2023-06-24 06:02:57 

本节,我们从Rxjava使用代码入手,去结合自己已有的知识体系,加查阅部分源码验证的方式,来一起探索一下Rxjava实现的基本原理。

为了本文原理分析环节,可以被更多的人理解、学习,所以小编从初学者的角度,从使用入手,一点点的分析了其中的源码细节、思想,建议大家随着本文的章节步骤,一步一步的来阅读,才能更快、更好的理解Rxjava的真正的思想精髓,也为我们之后的实践课程留一个好的底子。

触发流

到目前为止,我们讲了构建流、订阅流,但是依然没有触发真正的observer中的事件,例如:

@Override
  public void onSubscribe(@NonNull Disposable d) {
      Log.d(TAG, "onSubscribe");
  }
  @Override
  public void onNext(@NonNull String s) {
      Log.d(TAG, "onNext s = " + s);
  }
  @Override
  public void onError(@NonNull Throwable e) {
      Log.d(TAG, "onError");
  }
  @Override
  public void onComplete() {
      Log.d(TAG, "onComplete");
  }

各位看官,莫急莫急,且听老衲娓娓道来。

还记得上面的订阅流吗?订阅流从右往左执行的,执行到最后的observable,执行了它的subscribe方法。我们从使用代码知道,最左端的observable是啥来着,大家还记得吗?当然是ObservableJust

private void test() {
//第一步:just调用
   Observable.just("https://img-blog.csdn.net/20160903083319668")
   //第二步:map调用
           .map(new Function<String, Bitmap>() {
               @Override
               public Bitmap apply(String s) throws Exception {
                   //Bitmap bitmap = downloadImage(s);
                   return null;
               }
           })
           //第三步:subscribeOn、observeOn调用
           .subscribeOn(Schedulers.newThread())
           .observeOn(AndroidSchedulers.mainThread())
           //第四步:subscribe调用
           .subscribe(new Observer<Bitmap>() {
               @Override
               public void onSubscribe() {
                   Log.d(TAG, "onSubscribe");
               }
               @Override
               public void onNext(Bitmap s) {
                   Log.d(TAG, "onNext s = " + s);
               }
               @Override
               public void onError(Throwable e) {
                   Log.e(TAG, "onError ", e);
               }
               @Override
               public void onComplete() {
                   Log.d(TAG, "onComplete");
               }
           });
}

我们就顺坡下驴,看一下ObservableJust的subscribe方法做啥了

public final class ObservableJust<T> extends Observable<T> implements ScalarSupplier<T> {
   private final T value;
   public ObservableJust(final T value) {
       this.value = value;
   }
   @Override
   protected void subscribeActual(Observer<? super T> observer) {
       ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
       observer.onSubscribe(sd);
       sd.run();
   }
   @Override
   public T get() {
       return value;
   }
}

仔细一看,这里面没有subscribe方法,那么肯定就是调用父类observable的subscribe方法了

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
   ObjectHelper.requireNonNull(observer, "observer is null");
   try {
       //对象封装,暂时不是重点,我们跳过
       observer = RxJavaPlugins.onSubscribe(this, observer);
       //判空
       ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
       subscribeActual(observer);
   } catch (NullPointerException e) { // NOPMD
       throw e;
   } catch (Throwable e) {
       Exceptions.throwIfFatal(e);
       // can't call onError because no way to know if a Disposable has been set or not
       // can't call onSubscribe because the call might have set a Subscription already
       RxJavaPlugins.onError(e);
       NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
       npe.initCause(e);
       throw npe;
   }
}

大家看到这里,其实关键在于,最终调用了一个subscribeActual方法,所以我们继续看子类ObservableJust的subscribeActual方法干啥了?

@Override
   protected void subscribeActual(Observer<? super T> observer) {
       ScalarDisposable<T> sd = new ScalarDisposable<>(observer, value);
       observer.onSubscribe(sd);
       sd.run();
   }

接续根据ScalarDisposable的run方法

public static final class ScalarDisposable<T>
   extends AtomicInteger
   implements QueueDisposable<T>, Runnable {
       private static final long serialVersionUID = 3880992722410194083L;
       final Observer<? super T> observer;
       final T value;
//...省略很多代码
       @Override
       public void run() {
           if (get() == START && compareAndSet(START, ON_NEXT)) {
           //可以看到这里执行了onNext、onComplete方法
               observer.onNext(value);
               if (get() == ON_NEXT) {
                   lazySet(ON_COMPLETE);
                   observer.onComplete();
               }
           }
       }
   }

小结

看到这里,我们知道了,开始一层一层的从左往右去调用observer的相关方法了。 由订阅流可知,每层的observable实际上拥有下一层的observer的代理类,所以自然而然,从最左边开始调用observer的相关方法开始,触发流,就是从左往右,一层一层的剥开之前包裹的observer,然后顺序调用里面的onNext、onComplete等方法。 不信,我们挑一个ObservableMap来验证一下。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
   final Function<? super T, ? extends U> function;
   public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
       super(source);
       this.function = function;
   }
   @Override
   public void subscribeActual(Observer<? super U> t) {
       source.subscribe(new MapObserver<T, U>(t, function));
   }
   static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
       final Function<? super T, ? extends U> mapper;
       MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
           super(actual);
           this.mapper = mapper;
       }
       @Override
       public void onNext(T t) {
           if (done) {
               return;
           }
           if (sourceMode != NONE) {
               downstream.onNext(null);
               return;
           }
           U v;
           try {
               v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
           } catch (Throwable ex) {
               fail(ex);
               return;
           }
           //此处调用了下游的observer的onNext方法
           downstream.onNext(v);
       }
   }
}

可以看到里面,的确调用了下游的observer的onNext方法。

来源:https://juejin.cn/post/7182841957583224890

标签:RxJava,触发流,原理
0
投稿

猜你喜欢

  • 如何使用C#将Tensorflow训练的.pb文件用在生产环境详解

    2023-02-12 21:25:59
  • SpringBoot集成Redis流程详解

    2022-11-08 21:38:11
  • Spring Security角色继承分析

    2022-02-03 03:39:12
  • Android7.0中关于ContentProvider组件详解

    2023-10-30 19:48:29
  • 基于SpringBoot启动类静态资源路径问题

    2023-07-20 05:53:16
  • C# 弹出窗口show()和showdialog()的两种方式

    2022-05-08 17:12:36
  • 通过Java带你了解网络IO模型

    2022-12-25 10:59:22
  • Java文件操作实例详解

    2023-11-25 10:29:40
  • springboot整合JSR303参数校验与全局异常处理的方法

    2023-10-06 01:31:40
  • Android依赖注入框架Dagger2的使用方法

    2021-12-19 20:25:39
  • android studio3.0.1无法启动Gradle守护进程的解决方法

    2022-05-03 01:06:28
  • SpringBoot 使用log4j2的配置过程

    2021-11-09 04:15:25
  • 详解C# 虚方法virtual

    2022-11-05 09:41:17
  • Java 开启多线程常见的4种方法

    2023-11-23 02:30:10
  • Android如何快速集成腾讯Bugly

    2021-11-26 00:52:26
  • Java实现堆排序(大根堆)的示例代码

    2023-09-13 15:24:01
  • 详解房卡麻将分析系列 "牌局回放" 之 播放处理

    2022-12-26 08:54:27
  • 基于JavaMail API收发邮件的方法

    2022-03-10 09:34:24
  • Spring内存缓存Caffeine的基本使用教程分享

    2023-05-26 00:30:33
  • Java实战之基于swing的QQ邮件收发功能实现

    2023-11-15 01:34:26
  • asp之家 软件编程 m.aspxhome.com