Java扩展库RxJava的基本结构与适用场景小结

作者:hi大头鬼hi 时间:2022-12-27 10:03:15 

基本结构

我们先来看一段最基本的代码,分析这段代码在RxJava中是如何实现的。


Observable.OnSubscribe<String> onSubscriber1 = new Observable.OnSubscribe<String>() {
 @Override
 public void call(Subscriber<? super String> subscriber) {
   subscriber.onNext("1");
   subscriber.onCompleted();
 }
};
Subscriber<String> subscriber1 = new Subscriber<String>() {
 @Override
 public void onCompleted() {

}

@Override
 public void onError(Throwable e) {

}

@Override
 public void onNext(String s) {

}
};

Observable.create(onSubscriber1)
   .subscribe(subscriber1);

首先我们来看一下Observable.create的代码


public final static <T> Observable<T> create(OnSubscribe<T> f) {
 return new Observable<T>(hook.onCreate(f));
}

protected Observable(OnSubscribe<T> f) {
 this.onSubscribe = f;
}

直接就是调用了Observable的构造函数来创建一个新的Observable对象,这个对象我们暂时标记为observable1,以便后面追溯。
同时,会将我们传入的OnSubscribe对象onSubscribe1保存在observable1的onSubscribe属性中,这个属性在后面的上下文中很重要,大家留心一下。

接下来我们来看看subscribe方法。


public final Subscription subscribe(Subscriber<? super T> subscriber) {
 return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
 ...
 subscriber.onStart();
 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
 return hook.onSubscribeReturn(subscriber);
}

可以看到,subscribe之后,就直接调用了observable1.onSubscribe.call方法,也就是我们代码中的onSubscribe1对象的call方法
,传入的参数就是我们代码中定义的subscriber1对象。call方法中所做的事情就是调用传入的subscriber1对象的onNext和onComplete方法。
这样就实现了观察者和被观察者之间的通讯,是不是很简单?


public void call(Subscriber<? super String> subscriber) {
 subscriber.onNext("1");
 subscriber.onCompleted();
}

RxJava使用场景小结

1.取数据先检查缓存的场景
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的


final Observable<String> memory = Observable.create(new Observable.OnSubscribe<String>() {
 @Override
 public void call(Subscriber<? super String> subscriber) {
   if (memoryCache != null) {
     subscriber.onNext(memoryCache);
   } else {
     subscriber.onCompleted();
   }
 }
});
Observable<String> disk = Observable.create(new Observable.OnSubscribe<String>() {
 @Override
 public void call(Subscriber<? super String> subscriber) {
   String cachePref = rxPreferences.getString("cache").get();
   if (!TextUtils.isEmpty(cachePref)) {
     subscriber.onNext(cachePref);
   } else {
     subscriber.onCompleted();
   }
 }
});

Observable<String> network = Observable.just("network");

//主要就是靠concat operator来实现
Observable.concat(memory, disk, network)
.first()
.subscribeOn(Schedulers.newThread())
.subscribe(s -> {
 memoryCache = "memory";
 System.out.println("--------------subscribe: " + s);
});

2.界面需要等到多个接口并发取完数据,再更新


//拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者
private void testMerge() {
 Observable<String> observable1 = DemoUtils.createObservable1().subscribeOn(Schedulers.newThread());
 Observable<String> observable2 = DemoUtils.createObservable2().subscribeOn(Schedulers.newThread());

Observable.merge(observable1, observable2)
     .subscribeOn(Schedulers.newThread())
     .subscribe(System.out::println);
}

3.一个接口的请求依赖另一个API请求返回的数据

举个例子,我们经常在需要登陆之后,根据拿到的token去获取消息列表。

这里用RxJava主要解决嵌套回调的问题,有一个专有名词叫Callback hell


NetworkService.getToken("username", "password")
 .flatMap(s -> NetworkService.getMessage(s))
 .subscribe(s -> {
   System.out.println("message: " + s);
 });

4.界面按钮需要防止连续点击的情况


RxView.clicks(findViewById(R.id.btn_throttle))
 .throttleFirst(1, TimeUnit.SECONDS)
 .subscribe(aVoid -> {
   System.out.println("click");
 });

5.响应式的界面

比如勾选了某个checkbox,自动更新对应的preference


SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
   .subscribe(checked.asAction());

6.复杂的数据变换


Observable.just("1", "2", "2", "3", "4", "5")
 .map(Integer::parseInt)
 .filter(s -> s > 1)
 .distinct()
 .take(3)
 .reduce((integer, integer2) -> integer.intValue() + integer2.intValue())
 .subscribe(System.out::println);//9
标签:Java,RxJava
0
投稿

猜你喜欢

  • C#简单的向量用法实例教程

    2022-09-27 09:57:29
  • 详解Spring Security中的HttpBasic登录验证模式

    2023-12-03 03:58:14
  • java弹幕小游戏1.0版本

    2021-12-06 04:42:48
  • Android编程之canvas绘制各种图形(点,直线,弧,圆,椭圆,文字,矩形,多边形,曲线,圆角矩形)

    2023-10-16 12:16:14
  • java8 利用reduce实现将列表中的多个元素的属性求和并返回操作

    2021-09-29 06:53:38
  • Java客户端调用.NET的WebService实例

    2023-11-03 17:22:00
  • 秒懂Java枚举类型(enum)

    2023-03-30 07:39:41
  • c#中Invoke与BeginInvoke的用法及说明

    2023-06-10 12:39:49
  • 必须要学会的JMM与volatile

    2021-07-30 14:07:53
  • java 回调机制的实例详解

    2023-12-04 10:03:04
  • SpringBoot实现埋点监控

    2022-11-27 06:32:07
  • Android应用程序转到后台并回到前台判断方法

    2022-11-12 19:49:35
  • RandomAccessFile简介_动力节点Java学院整理

    2021-12-18 09:02:15
  • Kotlin协程之Flow基础原理示例解析

    2021-10-17 21:07:44
  • IDEA提高开发效率的7个插件(推荐)

    2021-10-16 15:28:35
  • Flutter实现图文并茂的列表

    2023-11-10 21:21:45
  • Android Service中使用Toast无法正常显示问题的解决方法

    2022-01-23 08:38:12
  • 详解Java Spring AOP

    2023-09-06 15:40:02
  • Android开发签名知识梳理总结

    2023-03-15 03:52:02
  • Android开发实现的IntentUtil跳转多功能工具类【包含视频、音频、图片、摄像头等操作功能】

    2023-05-27 23:58:01
  • asp之家 软件编程 m.aspxhome.com