简单谈谈RxJava和多线程并发

作者:zjutkz 时间:2023-08-02 00:27:52 

前言

相信对于RxJava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,错误的使用RxJava,是会来带非常多的问题的。

RxJava与并发

首先让我们来看一段RxJava协议的原文:


Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

如上所述,RxJava对多线程并发其实并没有做非常的多保护,这段话中说,如果多个Observables从多个线程中发射数据,必须要满足happens-before原则。

下面来看一个简单的例子:


final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer integer) {
 unSafeCount = unSafeCount + integer;
 Log.d("TAG", "onNext: " + unSafeCount);
}
});

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
 final int unit = 1;
 for(int i = 0;i < 10;i++) {
  new Thread(new Runnable() {
   @Override
   public void run() {
    for (int j = 0; j < 1000; j++) {
     subject.onNext(unit);
    }
   }
  }).start();
 }
}
});

这是一个最典型的多线程问题,从10个线程中发射数据并相加,这样最终得到的答案是小于10000的。虽然使用了RxJava,但是这样的使用对于并发是没有意义的,因为RxJava并没有去处理并发带来的问题。我们可以看下subject的onNext方法的源码,里面很简单,就是调用了对应observer的onNext方法而已。不止是这样,绝大多数的Subject都是线程不安全的,所以当你在使用这样的类的时候(典型场景就是自制的RxBus),如果从多个线程中发射数据,那你就要小心了。

对于这样的问题,有两种解决方案:

第一种就是简单的使用传统的解决方法,比如用AtomicInteger代替int。

第二种则是使用RxJava的解决方案,在这里就是用SerializedSubject去代替Subject:


final PublishSubject<Integer> subject = PublishSubject.create();

subject.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer integer) {
 unSafeCount = unSafeCount + integer;
 count.addAndGet(integer);

Log.d("TAG", "onNext: " + count);
}
});

final SerializedSubject<Integer, Integer> ser = new SerializedSubject<Integer, Integer>(subject);

findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
 final int unit = 1;

for(int i = 0;i < 10;i++){
  new Thread(new Runnable() {
   @Override
   public void run() {
    for(int j = 0;j < 1000;j++){
     ser.onNext(unit);
    }
   }
  }).start();
 }
}
});

可以看一下SerializedSubject的onNext方法做了什么:


@Override
public void onNext(T t) {
if (terminated) {
 return;
}
synchronized (this) {
 if (terminated) {
  return;
 }
 if (emitting) {
  FastList list = queue;
  if (list == null) {
   list = new FastList();
   queue = list;
  }
  list.add(nl.next(t));
  return;
 }
 emitting = true;
}
try {
 actual.onNext(t);
} catch (Throwable e) {
 terminated = true;
 Exceptions.throwOrReport(e, actual, t);
 return;
}
for (;;) {
 for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
  FastList list;
  synchronized (this) {
   list = queue;
   if (list == null) {
    emitting = false;
    return;
   }
   queue = null;
  }
  for (Object o : list.array) {
   if (o == null) {
    break;
   }
   try {
    if (nl.accept(actual, o)) {
     terminated = true;
     return;
    }
   } catch (Throwable e) {
    terminated = true;
    Exceptions.throwIfFatal(e);
    actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
    return;
   }
  }
 }
}
}

处理方式很简单,如果有其他线程在发射数据,那就将数据放置到队列中,等待下次发射。这保证了同一时间只会有一个线程调用onNext,onComplete和onError这些方法。

但是这样操作显然是会造成性能的影响的,所以RxJava并不会把所有的操作都打上线程安全的标签。

在这里就要引申出一个问题,那就是使用者对create方法的滥用,其实这个方法不应该被使用者频繁的调用的,因为你必须要小心的处理所有的数据发射,接收的逻辑。相反的,使用已有的操作符能很好的解决这个问题,所以下次大家在遇到问题的时候不要简单的使用create去自己写,而是应该想想有没有现成的操作符可以完成相应的需求。

RxJava中的一些操作符

RxJava中有一些操作符也和多线程并发有关,下面让我来讲一讲merge和concat,以及他们的一些变种操作符。

对于多线程发射数据,有时候我们需要得到的结果也保持和发射时候一样的顺序,这个时候如果我们使用merge这个操作符去结合多个发射源,那么就会产生一定的问题了(例子中做了非常不好的示范——使用了create操作符,请大家不要学习这样的写法,这里单纯是为了求证结果)。


Observable o1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> subscriber) {
 new Thread(new Runnable() {
  @Override
  public void run() {
   try {
    Thread.sleep(1000);
    subscriber.onNext(1);
    subscriber.onCompleted();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
 }).start();
}
});
Observable o2 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
 subscriber.onNext(2);
 subscriber.onCompleted();
}
});

Observable.merge(o1,o2)
 .subscribe(new Subscriber<Integer>() {
  @Override
  public void onCompleted() {

}

@Override
  public void onError(Throwable e) {

}

@Override
  public void onNext(Integer i) {
   Log.d("TAG", "onNext: " + i);
  }
 });

对于这样的场景,我们得到的答案将是2,1而不是先得到o1发射的数据,再获取o2的数据。

究其原因,就是因为merge其实就是给什么传什么,也不会去管数据发射的顺序:


@Override
public void onNext(Observable<? extends T> t) {
 if (t == null) {
   return;
 }
 if (t == Observable.empty()) {
   emitEmpty();
 } else
 if (t instanceof ScalarSynchronousObservable) {
   tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
 } else {
   InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
   addInner(inner);
   t.unsafeSubscribe(inner);
   emit();
 }
}

可以看到在经过lift操作之后,对应的中间人MergeSubscriber的onNext,没有什么多余的代码,所以在多个Observable从多线程中发射数据的时候,顺序当然不能得到保证。

一个单词说明这个问题:interleaving——交错。merge后的数据源可能是交错的。由于merge有这样数据交错的问题,所以它的变种—flatMap也会有同样的问题。

对于这样的场景,我们可以使用concat操作符来完成:


Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes.

根据文档,我们知道concat操作符是一个接一个的处理数据源的数据的。


if (wip.getAndIncrement() != 0) {
 return;
}

final int delayErrorMode = this.delayErrorMode;

for (;;) {
 if (actual.isUnsubscribed()) {
   return;
 }

if (!active) {
   if (delayErrorMode == BOUNDARY) {
     if (error.get() != null) {
       Throwable ex = ExceptionsUtils.terminate(error);
       if (!ExceptionsUtils.isTerminated(ex)) {
         actual.onError(ex);
       }
       return;
     }
   }

boolean mainDone = done;
   Object v = queue.poll();
   boolean empty = v == null;

if (mainDone && empty) {
     Throwable ex = ExceptionsUtils.terminate(error);
     if (ex == null) {
       actual.onCompleted();
     } else
     if (!ExceptionsUtils.isTerminated(ex)) {
       actual.onError(ex);
     }
     return;
   }

if (!empty) {

Observable<? extends R> source;

try {
       source = mapper.call(NotificationLite.<T>instance().getValue(v));
     } catch (Throwable mapperError) {
       Exceptions.throwIfFatal(mapperError);
       drainError(mapperError);
       return;
     }

if (source == null) {
       drainError(new NullPointerException("The source returned by the mapper was null"));
       return;
     }

if (source != Observable.empty()) {

if (source instanceof ScalarSynchronousObservable) {
         ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;

active = true;

arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
       } else {
         ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
         inner.set(innerSubscriber);

if (!innerSubscriber.isUnsubscribed()) {
           active = true;

source.unsafeSubscribe(innerSubscriber);
         } else {
           return;
         }
       }
       request(1);
     } else {
       request(1);
       continue;
     }
   }
 }
 if (wip.decrementAndGet() == 0) {
   break;
 }
}

通过源码我们可以知道,active字段就保证了如果上一个数据源还没有发射完数据,就会一直在for循环中等待,直到上一个数据源发射完了数据重置了active字段。

对于concat,其实还存在一个问题,那就是多个Observable变成了串行,会大大的增加整个RxJava事件流的处理时间,对于这个场景,我们可以使用concatEager来解决。concatEager的源码就不带大家分析了,有兴趣的同学可以自行查看。

总结

这篇文章比较短,讲的东西也比较浅显,其实就是讨论了一下RxJava中多线程并发的几个问题。最后我想说,RxJava并不是什么高大上的东西,在你的项目引入之前,要考虑一下是否真的有必要这么做。就算真的有场景需要RxJava,也请不要一口气把项目中所有的操作都换成RxJava,一些简单的操作不一定需要使用RxJava的操作符的实现,用了反而降低了代码的可读性,切勿为了使用Rx而使用Rx。

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流。

来源:http://zjutkz.net/2017/02/09/浅谈RxJava与多线程并发/

标签:rxjava,多线程并发
0
投稿

猜你喜欢

  • c#中XML解析文件出错解决方法

    2022-01-21 00:38:50
  • mybatis-plus 查询传入参数Map,返回List<Map>方式

    2022-01-05 13:39:24
  • 一文带你了解Java中的函数式编程

    2022-08-02 07:32:29
  • java获取json中的全部键值对实例

    2023-08-23 19:53:03
  • C#实现中英文混合字符串截取的方法

    2022-07-11 10:02:35
  • c# 日历控件的实现

    2022-08-21 11:02:22
  • android 微信 sdk api调用不成功解决方案

    2023-08-27 15:31:45
  • SpringBoot整合Elasticsearch游标查询的示例代码(scroll)

    2022-02-11 02:02:13
  • android当前apn的状态以及获取方法

    2022-06-20 14:52:25
  • Tablayout简单使用方法总结

    2022-01-08 16:27:37
  • 如何利用泛型封装通用的service层

    2023-05-15 04:55:43
  • Mybatis注解增删改查的实例代码

    2022-03-31 01:26:15
  • C#开发Winform实现文件操作案例

    2022-04-28 15:30:53
  • Java通过FTP服务器上传下载文件的方法

    2021-08-15 07:26:39
  • SpringCloud Zuul自定义filter代码实例

    2023-12-03 02:49:50
  • Android studio编写简单的手电筒APP

    2023-11-29 18:43:01
  • SpringMVC框架实现图片上传与下载

    2022-01-12 23:50:52
  • SpringCloud如何使用Eureka实现服务之间的传递数据

    2022-02-17 18:47:37
  • Android布局技巧之使用ViewStub

    2023-03-22 06:34:10
  • Java压缩/解压文件的实现代码

    2023-08-26 04:02:56
  • asp之家 软件编程 m.aspxhome.com