Rxjava功能操作符的使用方法详解

作者:Genten程泽翔 时间:2021-06-10 06:57:16 

Rxjava功能个人感觉很好用,里面的一些操作符很方便,Rxjava有:被观察者,观察者,订阅者,

被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据

下面把Rxjava常用的模型代码列出来,还有一些操作符的运用:

依赖:


compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
 compile 'io.reactivex.rxjava2:rxjava:2.1.5'

这个是另一种解析数据的方法,阿里巴巴旗下的,听说是解析最快的解析器。。。。


compile 'com.alibaba:fastjson:1.2.39'

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.TextView;

import com.alibaba.fastjson.JSONObject;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class MainActivity extends AppCompatActivity {

private TextView name;

@Override
 protected void onCreate(Bundle savedInstanceState) {
   super.onCreate(savedInstanceState);
   setContentView(R.layout.activity_main);

name = (TextView) findViewById(R.id.name);
   //用来调用下面的方法,监听。
   name.setOnClickListener(new View.OnClickListener() {
     @Override
     public void onClick(View v) {

interval();
     }
   });
 }

//例1:Observer
 public void observer() {
   //观察者
   Observer<string> observer = new Observer<string>() {
     @Override
     public void onSubscribe(@NonNull Disposable d) {

}
     @Override
     public void onNext(@NonNull String s) {
       //接收从被观察者中返回的数据
       System.out.println("onNext :" + s);
     }
     @Override
     public void onError(@NonNull Throwable e) {

}
     @Override
     public void onComplete() {

}
   };
   //被观察者
   Observable<string> observable = new Observable<string>() {
     @Override
     protected void subscribeActual(Observer<!--? super String--> observer) {
       observer.onNext("11111");
       observer.onNext("22222");
       observer.onComplete();
     }
   };
   //产生了订阅
   observable.subscribe(observer);
 }

//例2:Flowable
 private void flowable(){
   //被观察者
   Flowable.create(new FlowableOnSubscribe<string>() {
     @Override
     public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {
       for (int i = 0; i < 100; i++) {
         e.onNext(i+"");
       }
     }
     //背压的策略,buffer缓冲区        观察者
     //背压一共给了五种策略
     // BUFFER、
     // DROP、打印前128个,后面的删除
     // ERROR、
     // LATEST、打印前128个和最后一个,其余删除
     // MISSING
     //这里的策略若不是BUFFER 那么,会出现著名的:MissingBackpressureException错误
   }, BackpressureStrategy.BUFFER).subscribe(new Consumer<string>() {
     @Override
     public void accept(String s) throws Exception {
       System.out.println("subscribe accept"+s);
       Thread.sleep(1000);
     }
   });
 }

//例3:线程调度器 Scheduler
 public void flowable1(){
   Flowable.create(new FlowableOnSubscribe<string>() {
     @Override
     public void subscribe(@NonNull FlowableEmitter<string> e) throws Exception {
       for (int i = 0; i < 100; i++) {
         //输出在哪个线程
         System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
         e.onNext(i+"");
       }
     }
   },BackpressureStrategy.BUFFER)
       //被观察者一般放在子线程
       .subscribeOn(Schedulers.io())
       //观察者一般放在主线程
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Consumer<string>() {
         @Override
         public void accept(String s) throws Exception {
           System.out.println("s"+ s);
           Thread.sleep(100);
           //输出在哪个线程
           System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
         }
       });
 }

//例4:http请求网络,map转化器,fastjson解析器
 public void map1(){
   Observable.create(new ObservableOnSubscribe<string>() {
     @Override
     public void subscribe(@NonNull final ObservableEmitter<string> e) throws Exception {
       OkHttpClient client = new OkHttpClient();
       Request request = new Request.Builder()
           .url("https://qhb.2dyt.com/Bwei/login")
           .build();
       client.newCall(request).enqueue(new Callback() {
         @Override
         public void onFailure(Call call, IOException e) {

}

@Override
         public void onResponse(Call call, Response response) throws IOException {
           String result = response.body().string();
           e.onNext(result);
         }
       });
     }
   })
       //map转换器 flatmap(无序),concatmap(有序)
       .map(new Function<string, bean="">() {
     @Override
     public Bean apply(@NonNull String s) throws Exception {
       //用fastjson来解析数据
       return JSONObject.parseObject(s,Bean.class);
     }
   }).subscribe(new Consumer<bean>() {
     @Override
     public void accept(Bean bean) throws Exception {
       System.out.println("bean = "+ bean.toString() );
     }
   });
 }

//常见rxjava操作符
 //例 定时发送消息
 public void interval(){
   Observable.interval(2,1, TimeUnit.SECONDS)
       .take(10)
       .subscribe(new Consumer<long>() {
         @Override
         public void accept(Long aLong) throws Exception {
           System.out.println("aLong = " + aLong);
         }
       });
 }

//例 zip字符串合并
 public void zip(){
   Observable observable1 = Observable.create(new ObservableOnSubscribe<string>() {
     @Override
     public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {
       e.onNext("1");
       e.onNext("2");
       e.onNext("3");
       e.onNext("4");
       e.onComplete();

}
   });
   Observable observable2 = Observable.create(new ObservableOnSubscribe<string>() {
     @Override
     public void subscribe(@NonNull ObservableEmitter<string> e) throws Exception {
       e.onNext("A");
       e.onNext("B");
       e.onNext("C");
       e.onNext("D");
       e.onComplete();
     }
   });

Observable.zip(observable1, observable2, new BiFunction<string,string,string>() {
     @Override
     public String apply(@NonNull String o, @NonNull String o2) throws Exception {
       return o + o2;
     }
   }).subscribe(new Consumer<string>() {
     @Override
     public void accept(String o) throws Exception {
       System.out.println("o"+ o);
     }
   });
 }

来源:https://www.2cto.com/kf/201710/691536.html

标签:rxjava,操作符
0
投稿

猜你喜欢

  • 详解C#中的session用法

    2022-10-29 22:03:13
  • Java使用entrySet方法获取Map集合中的元素

    2022-08-06 17:41:05
  • 如何将写好的.py/.java程序变成.exe文件详解

    2022-04-06 09:22:14
  • C# Winform选项卡集成窗体详解

    2021-08-12 17:13:55
  • Spring 事务事件监控及实现原理解析

    2023-12-24 04:20:20
  • idea生成类注释和方法注释的正确方法(推荐)

    2022-09-11 03:45:14
  • JDK1.8中的ConcurrentHashMap源码分析

    2023-11-27 06:02:32
  • 深入理解Java设计模式之代理模式

    2022-01-14 07:42:00
  • java实现微信公众号发送模版消息

    2022-04-23 08:09:11
  • 关于java String中intern的深入讲解

    2023-01-24 18:18:36
  • C#实现添加多行文本水印到Word文档

    2023-03-22 07:45:33
  • java使用TimerTask定时器获取指定网络数据

    2022-08-14 10:52:20
  • Android使用Canvas对象实现刮刮乐效果

    2021-11-27 02:53:36
  • java迷宫算法的理解(递归分割,递归回溯,深搜,广搜)

    2022-10-22 10:36:31
  • C#中lock用法详解

    2021-06-07 23:24:05
  • Spring Boot示例分析讲解自动化装配机制核心注解

    2022-07-26 15:56:14
  • 基于C#动手实现网络服务器Web Server

    2023-01-21 20:13:28
  • Java中GUI工具包AWT和Swing用法介绍

    2022-02-06 09:02:35
  • C#编程读取文档Doc、Docx及Pdf内容的方法

    2023-01-16 06:59:29
  • Java经典算法汇总之选择排序(SelectionSort)

    2021-12-23 03:59:52
  • asp之家 软件编程 m.aspxhome.com