compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
 compile 'io.reactivex.rxjava2:rxjava:2.1.5'


compile 'com.alibaba:fastjson:1.2.39'

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.TextView;

import com.alibaba.fastjson.JSONObject;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class MainActivity extends AppCompatActivity {

private TextView name;

 protected void onCreate(Bundle savedInstanceState) {

name = (TextView) findViewById(R.id.name);
   name.setOnClickListener(new View.OnClickListener() {
     public void onClick(View v) {


 public void observer() {
   Observer<String> observer = new Observer<String>() {
     public void onSubscribe(@NonNull Disposable d) {

     public void onNext(@NonNull String s) {
       System.out.println("onNext :" + s);
     public void onError(@NonNull Throwable e) {

     public void onComplete() {

   Observable<String> observable = new Observable<String>() {
     protected void subscribeActual(Observer<? super String> observer) {

 private void flowable(){
   Flowable.create(new FlowableOnSubscribe<String>() {
     public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
       for (int i = 0; i < 100; i++) {
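     // Backpressure strategy: how emitted items are buffered for the observer
     // BUFFER,
     // DROP: prints the first 128 items, the rest are dropped
     // ERROR,
     // LATEST: prints the first 128 items and the last one, the rest are dropped
     // MISSING
     // With a non-buffering strategy such as ERROR, a consumer that cannot keep up
     // triggers the well-known MissingBackpressureException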
   }, BackpressureStrategy.BUFFER).subscribe(new Consumer<String>() {
     public void accept(String s) throws Exception {
       System.out.println("subscribe accept"+s);

// Example 3: thread scheduling with Scheduler
 public void flowable1(){
   Flowable.create(new FlowableOnSubscribe<String>() {
     public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
       for (int i = 0; i < 100; i++) {
         System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());
       .subscribe(new Consumer<String>() {
         public void accept(String s) throws Exception {
           System.out.println("s"+ s);
           System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());

 public void map1(){
   Observable.create(new ObservableOnSubscribe<String>() {
     public void subscribe(@NonNull final ObservableEmitter<String> e) throws Exception {
       OkHttpClient client = new OkHttpClient();
       Request request = new Request.Builder()
       client.newCall(request).enqueue(new Callback() {
         public void onFailure(Call call, IOException e) {


         public void onResponse(Call call, Response response) throws IOException {
           String result = response.body().string();
       // map operator; see also flatMap (unordered) and concatMap (ordered)
       .map(new Function<String, Bean>() {
     public Bean apply(@NonNull String s) throws Exception {
       return JSONObject.parseObject(s,Bean.class);
   }).subscribe(new Consumer<Bean>() {
     public void accept(Bean bean) throws Exception {
       System.out.println("bean = "+ bean.toString() );

 // Example: emitting values on a timer
 public void interval(){
   Observable.interval(2,1, TimeUnit.SECONDS)
       .subscribe(new Consumer<Long>() {
         public void accept(Long aLong) throws Exception {
           System.out.println("aLong = " + aLong);

// Example: zip, merging strings
 public void zip(){
   Observable observable1 = Observable.create(new ObservableOnSubscribe<String>() {
     public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {

   Observable observable2 = Observable.create(new ObservableOnSubscribe<String>() {
     public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {

Observable.zip(observable1, observable2, new BiFunction<String,String,String>() {
     public String apply(@NonNull String o, @NonNull String o2) throws Exception {
       return o + o2;
   }).subscribe(new Consumer<String>() {
     public void accept(String o) throws Exception {
       System.out.println("o"+ o);




