Java OkHttp框架源码深入解析

作者:niuyongzhi 时间:2023-11-29 03:22:50 

1.OkHttp发起网络请求

可以通过OkHttpClient发起一个网络请求

//创建一个Client,相当于打开一个浏览器
OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
//创建一个请求。
       Request request = new Request.Builder()
               .url("http://www.baidu.com")
               .method("GET",null)
               .build();
   //调用Client 创建一个Call。
       Call call = okHttpClient.newCall(request);
       //Call传入一个回调函数,并加入到请求队列。
       call.enqueue(new Callback() {
           @Override
           public void onFailure(Call call, IOException e) {
           }
           @Override
           public void onResponse(Call call, Response response) throws IOException {
           }
       });
}

通过Retrofit发起一个OkHttp请求

Retrofit retrofit = new Retrofit.Builder()
               .baseUrl("http://www.baidu.com/")
               .build();
       NetInterface netInterface = retrofit.create(NetInterface.class);
       Call<Person> call = netInterface.getPerson();
       call.enqueue(new Callback<Person>() {
           @Override
           public void onResponse(Call<Person> call, Response<Person> response) {
           }
           @Override
           public void onFailure(Call<Person> call, Throwable t) {
           }
});

以上两种方式都是通过call.enqueue() 把网络请求加入到请求队列的。

这个call是RealCall的一个对象。

public void enqueue(Callback responseCallback) {
   client.dispatcher().enqueue(new AsyncCall(responseCallback));
 }

这里有两个判断条件

runningAsyncCalls.size() < maxRequests如果运行队列数量大于最大数量,

runningCallsForHost(call) < maxRequestsPerHost并且访问同一台服务器的请求数量大于最大数量,请求会放入等待队列,否则加入运行队列,直接执行。

//等待队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//运行队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//运行队列数量最大值
private int maxRequests = 64;
//访问不同主机的最大数量
private int maxRequestsPerHost = 5;
dispatcher.java
synchronized void enqueue(AsyncCall call) {
   if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
     runningAsyncCalls.add(call);
     executorService().execute(call);
   } else {
     readyAsyncCalls.add(call);
   }
 }

接下来看这行代码executorService().execute(call);

executorService()拿到一个线程池实例,

public synchronized ExecutorService executorService() {
   if (executorService == null) {
     executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
   }
   return executorService;

execute(call)执行任务,发起网络请求。

//AsyncCall.java
@Override protected void execute() {
      try {
      //这个方法去请求网络,会返回Respose
       Response response = getResponseWithInterceptorChain();
       //请求成功,回调接口
        responseCallback.onResponse(RealCall.this, response);
      }catch(Exceptrion e){
           //失败回调
        responseCallback.onFailure(RealCall.this, e);
      }finally {
         //从当前运行队列中删除这个请求
         client.dispatcher().finished(this);
     }
}

getResponseWithInterceptorChain()

这行代码,使用了设计模式中的责任链模式。

//这个方法命名:通过 * 链,获取Response
 Response getResponseWithInterceptorChain() throws IOException {
   // Build a full stack of interceptors.
   List<Interceptor> interceptors = new ArrayList<>();
    // 这个我们自己定义的 * 。
   interceptors.addAll(client.interceptors());
   //重试和重定向 *
   interceptors.add(retryAndFollowUpInterceptor);
   //请求头 *
   interceptors.add(new BridgeInterceptor(client.cookieJar()));
   //缓存 *
   interceptors.add(new CacheInterceptor(client.internalCache()));
   //连接 *
   interceptors.add(new ConnectInterceptor(client));
   if (!forWebSocket) {
     interceptors.addAll(client.networkInterceptors());
   }
   //访问 *
   interceptors.add(new CallServerInterceptor(forWebSocket));
   // * 责任链
   Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
       originalRequest, this, eventListener, client.connectTimeoutMillis(),
       client.readTimeoutMillis(), client.writeTimeoutMillis());
   //执行 * 集合中的 *
   return chain.proceed(originalRequest);
 }

责任链模式中,链条的上游持有下游对象的引用。这样能够保证在链条上的每一个对象,都能对其符合条件的任务进行处理。

但是在上面的 * 构成责任链中,是把 * ,放在了一个集合中。

第一个参数interceptors 是一个 * 的集合。

第五个参数0是集合的index,RealInterceptorChain就是根据这个索引值+1,

对chain.proceed方法循环调用,进行集合遍历,并执行 * 中定义的方法的。

这个责任链模式,并没有明确的指定下游对象是什么,而是通过集合index值的变化,动态的指定的。

Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0......)
  chain.proceed(originalRequest);
  public Response proceed(Request request,...){
   //构建一个index+1的 * 链
   RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
               connection, index + 1,....);
       //拿到当前的 *
       Interceptor interceptor = interceptors.get(index);
       //调用 * intercept(next)方法,
       //在这个方法中继续调用realChain.proceed(),从而进行循环调用,index索引值再加1.
       Response response = interceptor.intercept(next);
}

2.OkHttp的连接器

1)RetryAndFollowUpInterceptor:重试和重定向 *

public Response intercept(Chain chain){
     while (true) {
       Response response;
         try {
         //创建StreamAllocation对象,这个对象会在连接 * 中用到
           StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
                 createAddress(request.url()), call, eventListener, callStackTrace);
             this.streamAllocation = streamAllocation;
            调用责任链下游 *
            response = realChain.proceed(request, streamAllocation, null, null);
           } catch (RouteException e) {
                // The attempt to connect via a route failed. The request will not have been sent.
                路由异常,请求还没发出去。
                这样这个recover(),如果返回的是false,则抛出异常,不再重试
                如果返回的是true,则执行下面的continue,进行下一次while循环,进行重试,重新发起网络请求。
                if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
                  throw e.getFirstConnectException();
                }
                releaseConnection = false;
               continue;
            } catch (IOException e) {
                // An attempt to communicate with a server failed. The request may have been sent.
                请求已经发出去了,但是和服务器连接失败了。
                这个recover()返回值的处理逻辑和上面异常一样。
                boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
                if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
                releaseConnection = false;
                continue;
              }
            } finally {//finally是必定会执行到的,不管上面的catch中执行的是continue还是thow
               // We're throwing an unchecked exception. Release any resources.
               if (releaseConnection) {
                 streamAllocation.streamFailed(null);
                 streamAllocation.release();
               }
             }
            在这个重试 * 中,okhttp的做法很巧妙。先是在外面有一个while循环,如果发生异常,
            会在recover方法中对异常类型进行判断,如果不符合属于重试,则返回false,并thow e,结束while循环。
            如果符合重试的条件,则返回true,在上面的catch代码块中执行continue方法,进入下一个while循环。
           //如果请求正常,并且返回了response,则会进行重定向的逻辑判断
           followUpRequest在这个方法中会根据ResponseCode,状态码进行重定向的判断,
           Request followUp;
                try {
                  followUp = followUpRequest(response, streamAllocation.route());
                } catch (IOException e) {
                  streamAllocation.release();
                  throw e;
                }
                如果flolowUp 为null,则不需要重定向,直接返回response
                if (followUp == null) {
                  if (!forWebSocket) {
                    streamAllocation.release();
                  }
                  return response;
                }
                 如果flolowUp 不为null,则进行重定向了请求
              如果重定向次数超过MAX_FOLLOW_UPS=20次,则抛出异常,结束while循环
             if (++followUpCount > MAX_FOLLOW_UPS) {
                    streamAllocation.release();
                    throw new ProtocolException("Too many follow-up requests: " + followUpCount);
                  }
                  if (followUp.body() instanceof UnrepeatableRequestBody) {
                    streamAllocation.release();
                    throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
                  }
                  if (!sameConnection(response, followUp.url())) {
                    streamAllocation.release();
                    //从重定向请求中拿到url,封装一个新的streamAllocation对象,
                    streamAllocation = new StreamAllocation(client.connectionPool(),
                        createAddress(followUp.url()), call, eventListener, callStackTrace);
                    this.streamAllocation = streamAllocation;
                  } else if (streamAllocation.codec() != null) {
                    throw new IllegalStateException("Closing the body of " + response
                        + " didn't close its backing stream. Bad interceptor?");
                  }
                  //将重定向请求赋值给request 进入下一个重定向的请求的while循环,继续走上面的while循环代码
                  request = followUp;
                  priorResponse = response;
                }
}
  //只有这个方法返回值为false都不进行重试。
  private boolean recover(IOException e, StreamAllocation streamAllocation,
      boolean requestSendStarted, Request userRequest) {
    streamAllocation.streamFailed(e);
    // The application layer has forbidden retries.
    应用层禁止重试。可以通过OkHttpClient进行配置(默认是允许的)
    if (!client.retryOnConnectionFailure()) return false;
    // We can't send the request body again.
    if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
    // This exception is fatal. 致命的异常
    判断是否属于重试的异常
    if (!isRecoverable(e, requestSendStarted)) return false;
    // No more routes to attempt.
    没有更多可以连接的路由线路
    if (!streamAllocation.hasMoreRoutes()) return false;
    // For failure recovery, use the same route selector with a new connection.
    return true;
  }
 只有这个方法返回false,都不进行重试。
private boolean isRecoverable(IOException e, boolean requestSendStarted) {
  // If there was a protocol problem, don't recover.
  出现了协议异常,不再重试
  if (e instanceof ProtocolException) {
    return false;
  }
  // If there was an interruption don't recover, but if there was a timeout connecting to a route
  // we should try the next route (if there is one).
  requestSendStarted为false时,并且异常类型为Scoket超时异常,将会进行下一次重试
  if (e instanceof InterruptedIOException) {
    return e instanceof SocketTimeoutException && !requestSendStarted;
  }
  // Look for known client-side or negotiation errors that are unlikely to be fixed by trying
  // again with a different route.
  如果是一个握手异常,并且证书出现问题,则不能重试
  if (e instanceof SSLHandshakeException) {
    // If the problem was a CertificateException from the X509TrustManager,
    // do not retry.
    if (e.getCause() instanceof CertificateException) {
      return false;
    }
  }

2)BridgeInterceptor 桥 * :连接服务器的桥梁,主要是在请求头中设置一些参数配置

如:请求内容长度,编码,gzip压缩等。

public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    RequestBody body = userRequest.body();
   if (body != null) {
     MediaType contentType = body.contentType();
     if (contentType != null) {
       requestBuilder.header("Content-Type", contentType.toString());
     }
     ..................
   }
   在请求头中添加gizp,是否压缩
 boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }
   //cookies
    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }
    调用责任链中下一个 * 的方法,网络请求得到的数据封装到networkResponse中
    Response networkResponse = chain.proceed(requestBuilder.build());
   对cookie进行处理
   HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
   如果设置了gzip,则会对networkResponse进行解压缩。
    if (transparentGzip
           && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
           && HttpHeaders.hasBody(networkResponse)) {
         GzipSource responseBody = new GzipSource(networkResponse.body().source());
         Headers strippedHeaders = networkResponse.headers().newBuilder()
             .removeAll("Content-Encoding")
             .removeAll("Content-Length")
             .build();
         responseBuilder.headers(strippedHeaders);
         String contentType = networkResponse.header("Content-Type");
         responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
       }
   return responseBuilder.build();
}

3)CacheInterceptor缓存 *

public Response intercept(Chain chain){
  //  this.cache = DiskLruCache.create(fileSystem, directory, 201105, 2, maxSize);
   这个缓存在底层使用的是DiskLruCache
   //以request为key从缓存中拿到response。
    Response cacheCandidate = cache != null
           ? cache.get(chain.request()): null;
    long now = System.currentTimeMillis();
    //缓存策略
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
  // If we're forbidden from using the network and the cache is insufficient, fail.
  //如果请求和响应都为null,直接返回504
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }
  // If we don't need the network, we're done.
  //如果请求为null,缓存不为null,则直接使用缓存。
      if (networkRequest == null) {
        return cacheResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .build();
      }
    Response networkResponse = null;
       try {
         //调用责任链下一个 *
         networkResponse = chain.proceed(networkRequest);
       } finally {
       }
     Response response = networkResponse.newBuilder()
           .cacheResponse(stripBody(cacheResponse))
           .networkResponse(stripBody(networkResponse))
           .build();
    // Offer this request to the cache.
    //将响应存入缓存。
     CacheRequest cacheRequest = cache.put(response);
}

4)ConnectInterceptor 连接 * 。当一个请求发出,需要建立连接,然后再通过流进行读写。

public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
   Request request = realChain.request();
   //在重定向 * 中创建,
   StreamAllocation streamAllocation = realChain.streamAllocation();
   boolean doExtensiveHealthChecks = !request.method().equals("GET");
   //从连接池中,找到一个可以复用的连接,
   HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
  // RealConnection 中封装了一个Socket和一个Socket连接池
   RealConnection connection = streamAllocation.connection();
   //调用下一个 *
   return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
//遍历连接池
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
   assert (Thread.holdsLock(this));
   for (RealConnection connection : connections) {
     if (connection.isEligible(address, route)) {
       streamAllocation.acquire(connection, true);
       return connection;
     }
   }
   return null;
 }
 public boolean isEligible(Address address, @Nullable Route route) {
   // If this connection is not accepting new streams, we're done.
   if (allocations.size() >= allocationLimit || noNewStreams) return false;
   // If the non-host fields of the address don't overlap, we're done.
   if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
   // If the host exactly matches, we're done: this connection can carry the address.
   从连接池中找到一个连接参数一致且并未占用的连接
   if (address.url().host().equals(this.route().address().url().host())) {
     return true; // This connection is a perfect match.
 }

5)CallServerInterceptor 请求服务器 *

/** This is the last interceptor in the chain. It makes a network call to the server. */
这是责任链中最后一个 * ,这个会去请求服务器。
public Response intercept(Chain chain) throws IOException {
     RealInterceptorChain realChain = (RealInterceptorChain) chain;
     HttpCodec httpCodec = realChain.httpStream();
     StreamAllocation streamAllocation = realChain.streamAllocation();
     RealConnection connection = (RealConnection) realChain.connection();
     Request request = realChain.request();
     //将请求头写入缓存
     httpCodec.writeRequestHeaders(request);
     return response;

来源:https://blog.csdn.net/niuyongzhi/article/details/126464240

标签:Java,OkHttp,框架,源码
0
投稿

猜你喜欢

  • 详解使用JRebel插件实现SpringBoot应用代码热加载

    2021-06-15 10:39:00
  • Java自定义实现链队列详解

    2023-06-22 12:47:31
  • Java实现FTP上传与下载功能

    2021-09-22 18:28:51
  • AsyncTask官方文档教程整理

    2023-07-31 20:25:08
  • 代码分析c++中string类

    2023-04-24 17:15:29
  • 解决logback的日志文件路径问题

    2023-06-13 10:48:09
  • java文件的重命名与移动操作实例代码

    2022-06-16 04:18:12
  • c#读取XML多级子节点

    2023-10-10 21:03:44
  • Android下拉刷新控件PullToRefresh实例解析

    2022-01-27 08:01:14
  • Android权限管理之Permission权限机制及使用详解

    2023-04-07 09:12:45
  • 初识Spring Boot框架和快速入门

    2022-10-17 00:58:52
  • Spring Boot 整合 Apache Dubbo的示例代码

    2021-10-09 03:52:07
  • Java动态代理静态代理实例分析

    2023-11-14 18:06:07
  • 详解在Spring Boot框架下使用WebSocket实现消息推送

    2023-03-08 01:32:05
  • java mybatis如何操作postgresql array数组类型

    2023-04-25 22:59:37
  • WPF利用WindowChrome实现自定义窗口

    2021-06-08 07:33:55
  • Java数据结构及算法实例:三角数字

    2023-08-24 17:52:25
  • Android中fragment与activity之间的交互(两种实现方式)

    2021-06-23 13:11:31
  • SpringBoot集成yitter-idgenerator(雪花漂移)分布式Id自增的实现

    2022-03-27 10:47:08
  • 解决maven build 无反应,直接terminated的问题

    2021-11-28 18:56:13
  • asp之家 软件编程 m.aspxhome.com