okhttp3 源码分析

在上一篇 Retrofit 学习第三弹—源码分析篇 分析了 Retrofit 的源码,分析到请求 Call 位置,是调用的 okhttp3 中的 OkHttpClient 来完成请求的,所以 Retrofit 是基于 okhttp3 的一个封装,通过注解来设定参数构造出 Request,然后通过 OkHttpClient 创建 Call 实例。下面就来分析下 okhttp3 的流程。

1. okhttp 使用回顾

简单看下 okhttp Get 的请求过程:

public static String request(String url) throws IOException {
// 构建 OkHttpClient
OkHttpClient client = genericClient(headMaps);
//新建一个 Request 对象
Request request = new Request.Builder()
.url(url)
.build();
// 同步请求        
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
return response.body().string();
}else{
throw new IOException("Unexpected code " + response);
}
}

// 构建 OkHttpClient 实例
private static OkHttpClient genericClient(Map<String, String> headMaps) {
// HttpLoggingInterceptor
HttpLoggingInterceptor httpLoggingInterceptor = new HttpLoggingInterceptor(
new HttpLoggingInterceptor.Logger() {
@Override
public void log(String message) {
LogUtil.d("httpInfo", message);
}
}
);
httpLoggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
OkHttpClient httpClient = new OkHttpClient.Builder()
// 添加 header 信息
.addInterceptor(new Interceptor() {
@Override
public Response intercept(Chain chain) throws IOException {
Request.Builder newBuilder = chain.request().newBuilder();
if (headMaps != null && !headMaps.isEmpty()) {
newBuilder.headers(Headers.of(headMaps));
}
return chain.proceed(newBuilder.build());
}
})
// 添加 log 打印
.addNetworkInterceptor(httpLoggingInterceptor)
// 连接超时时间
.connectTimeout(60, TimeUnit.SECONDS)
// 读取超时时间
.readTimeout(60 * 30, TimeUnit.SECONDS)
// 写入超时时间
.writeTimeout(60 * 30, TimeUnit.SECONDS)
.build();

return httpClient;
}

以上就是 OkHttpClient 构建后,完成 get 同步请求的一个过程,包括通过 OkHttpClient.Builder 设置超时时间,设置拦截器等,后面会对自己设置的拦截器部分详细讲解。

2. 整体流程

okhttp_process

先来看下 okhttp 的整体流程,先对整体流程有个印象,然后再一步步分析具体的步骤:

  • (1) 直接构建 OkHttpClient;或者通过 Builder 构建,设置参数

  • (2) 构建 Request,url、header、body、请求方法等

  • (3) 构建 RealCall,通过 RealCall 执行请求,同步 execute() 或者异步 enqueue(Callback responseCallback)

  • (4) 只要通过 getResponseWithInterceptorChain() 方法,开启请求的责任链:自定义拦截器、RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor、CallServerInterceptor

  • (5) 中间异常处理,请求成功 Response 处理等

3. OkHttpClient

创建 OkHttpClient 是通过 Builder 构建的,看下 Builder 中的参数:

final Dispatcher dispatcher;  //分发器
final Proxy proxy;  //代理
final List<Protocol> protocols; //协议 Http1 Http2
final List<ConnectionSpec> connectionSpecs; //传输层版本和连接协议
final List<Interceptor> interceptors; //拦截器
final List<Interceptor> networkInterceptors; //网络拦截器,一般设置 HttpLoggingInterceptor
final ProxySelector proxySelector; //代理选择
final CookieJar cookieJar; //cookie
final Cache cache; //缓存
final InternalCache internalCache;  //内部缓存
final SocketFactory socketFactory;  //socket 工厂
final SSLSocketFactory sslSocketFactory; //安全套接层socket 工厂,用于HTTPS
final CertificateChainCleaner certificateChainCleaner; // 验证确认响应证书 适用 HTTPS 请求连接的主机名。
final HostnameVerifier hostnameVerifier;    //  主机名字确认
final CertificatePinner certificatePinner;  //  证书链
final Authenticator proxyAuthenticator;     //代理身份验证
final Authenticator authenticator;      // 本地身份验证
final ConnectionPool connectionPool;    //连接池,复用连接
final Dns dns;  //域名
final boolean followSslRedirects;  //安全套接层重定向
final boolean followRedirects;  //本地重定向
final boolean retryOnConnectionFailure; //重试连接失败
final int connectTimeout;    //连接超时
final int readTimeout; //read 超时
final int writeTimeout; //write 超时 

很多参数我们在使用时都是默认的,即使我们不设置参数 Builder 中也会给出默认的参数,如超时时间

connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;

4. RealCall

有了 OkHttpClient 实例,然后需要一个 Request 参数,设置 url,请求方法等,通过 newCall(Request request) 方法来得到 RealCall,RealCall 是请求执行的对象,请求分为同步请求和异步请求

  • void enqueue(Callback responseCallback) 异步请求

  • Response execute() 同步请求

上面例子中 Response response = client.newCall(request).execute(); 开启了整个 GET 请求。

RealCall 构造方法:

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
// 失败重试以及重定向 拦截器
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

同步请求 execute()

public Response execute() throws IOException {
// (1) 一个 Call 只能请求一次,重复请求抛出异常
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
// (2) 将请求添加到分发器的同步请求集合中
client.dispatcher().executed(this);
// (3) 开启请求责任链,请求真正执行的位置
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
// (4) 请求结束,将请求从分发器的同步结合中移除
client.dispatcher().finished(this);
}
}

(1) 一个 Call 对象只能执行一次,重复请求抛出异常,再次请求会重新创建 RealCall 对象;

(2) 将请求添加到 dispatcher 分发器的同步请求集合中,对于同步请求,分发器仅仅是记录一下同步请求的集合,并没有做更多的操作;

(3) okhttp 网络请求是通过一连串的拦截器来完成的,基于责任链的方式,也是最重要的过程;

(4) 请求结束,告知分发器,将请求从同步集合中移除。

对于同步请求 Dispatcher 并没有过多参与,仅仅是在请求时将请求添加同步请求集合中, 请求完毕,从集合中移除;而异步请求,分发器才真正发挥作用, 里面是通过线程池来执行异步请求。

请求真正执行的位置是第 (3) 步,开启请求的责任链

Response getResponseWithInterceptorChain() throws IOException {
//  interceptors 集合
List<Interceptor> interceptors = new ArrayList<>();
// (1) 使用者自定义拦截器
interceptors.addAll(client.interceptors());
// (2) 失败重试以及重定向 拦截器
interceptors.add(retryAndFollowUpInterceptor);
// (3) 桥接拦截器
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// (4) 缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
// (5) 连接拦截器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// (6) 网络拦截器,一般是设置打印 log 的拦截器,HttpLoggingInterceptor
interceptors.addAll(client.networkInterceptors());
}
// (7) 请求服务器拦截器(最后一步)
interceptors.add(new CallServerInterceptor(forWebSocket));
// (8) 创建责任链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// (9) 开启责任链
return chain.proceed(originalRequest);
}

getResponseWithInterceptorChain() 方法主要是创建各个拦截器,并添加到集合中,然后创建责任链 RealInterceptorChain,该责任链持有各个拦截器,开启责任链,执行每一个拦截器,最终完成网络请求。注意拦截器添加的顺序,也就是责任链的执行顺序。

5. 责任链模式

在分析各个拦截器执行的过程前,插入责任链模式的一个小例子,方便对责任链的理解。

责任链可以抽象成一个链条 IChain

// 责任链
public interface IChain {
void proceed();
}

链条有 N 个节点,每个节点都有一个节点处理器 INodeHandler

// 节点处理器
public interface INodeHandler {

void handle(IChain chain);
}

实际例子:假设有一个汽车生产线,生产组装汽车 Car,简化分为 3 个步骤,组装车身,安装轮胎,添加内饰。汽车生产线相当于责任链,组装的 3 个步骤相当于 3 个节点处理器。处理过程开始时,首先将各个节点处理器添加到责任链的节点处理器结合中,每个节点执行完毕,改变责任链中当前需要执行的索引,也就是完成一个步骤,告知责任链需要执行下一个步骤,最终所有节点都执行完毕,一个汽车也就生产完了。下面给出主要的代码。

汽车类 Car

public class Car {

private String body;
private String inner;
private String wheel;

public Car() {
}

...
}

汽车生产线责任链

public class CarChain implements IChain {

private Car car;
private List<INodeHandler> handlerList = new ArrayList<>();
private int index;

public CarChain(Car car) {
this.car = car;
}

public Car getCar() {
return car;
}

public void setCar(Car car) {
this.car = car;
}

public List<INodeHandler> getHandlerList() {
return handlerList;
}

public void setHandlerList(List<INodeHandler> handlerList) {
this.handlerList = handlerList;
}

public int getIndex() {
return index;
}

public void setIndex(int index) {
this.index = index;
}

// 责任链执行,每次执行索引对应的节点
@Override
public void proceed() {
if (handlerList == null || handlerList.size() < 1){
throw new RuntimeException("handlerList is empty");
}
if (index >= handlerList.size()) {
System.out.println("Car is finished");
System.out.println("A new car : " + car.toString());
return;
}
handlerList.get(index).handle(this);
}
}

责任链的 3 个节点处理器

// 车身节点处理器
public class CarBodyHandler implements INodeHandler{

@Override
public void handle(IChain chain) {
CarChain carChain = (CarChain) chain;
carChain.getCar().setBody("Body");
carChain.setIndex(carChain.getIndex() + 1);
carChain.proceed();
}
}

// 轮胎节点处理器
public class CarWheelHandler implements INodeHandler{

@Override
public void handle(IChain chain) {
CarChain carChain = (CarChain) chain;
carChain.getCar().setWheel("Wheel");
carChain.setIndex(carChain.getIndex() + 1);
carChain.proceed();
}
}

// 内饰节点处理器
public class CarInnerHandler implements INodeHandler{

@Override
public void handle(IChain chain) {
CarChain carChain = (CarChain) chain;
carChain.getCar().setInner("Inner");
carChain.setIndex(carChain.getIndex() + 1);
carChain.proceed();
}
}

测试责任链

public static void main(String[] args) {
Car car = new Car();
System.out.println("before -- " + car.toString());
CarChain carChain = new CarChain(car);
List<INodeHandler> nodeHandlers = new ArrayList<>();
nodeHandlers.add(new CarBodyHandler());
nodeHandlers.add(new CarWheelHandler());
nodeHandlers.add(new CarInnerHandler());
carChain.setHandlerList(nodeHandlers);
carChain.proceed();
}

6. RealInterceptorChain

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();

calls++;

// 假设已经有了流 stream,确保 connection 和 请求的端口是一致的,是可用的
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}

// 假设已经有了流 stream,确保前一个拦截器调用了 责任链的  proceed()  方法
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}

// 调用下一个拦截器,将索引 +1 处理,可以看到和上面的例子的方式是一样的
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
// 调用拦截器进行处理
Response response = interceptor.intercept(next);

// 确保如果不是最后一个拦截器,仅需要调用 proceed() 方法
// 因为在这个方法中传入的参数是前面拦截器一步一步准备的,以便于最后一个拦截器请求服务器
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}

// 请求结果不能为空
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
// 请求结果的 body 不能为空
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}

return response;
}

RealInterceptorChain 是执行的责任链,传入的参数很多,有些参数不是开始就有的,需要每个拦截器在执行过程中准备的,所以需要除了最后一个拦截器 CallServerInterceptor 以外,其他拦截器都需要调用 RealInterceptorChain 的 proceed() 方法,以便于为下一步准备参数。

7. Interceptor

下面来看看各个拦截器,根据执行的顺序来分析。

注意:上面提到,除了 拦截器 CallServerInterceptor 以外,其他拦截器都需要调用 RealInterceptorChain 的 proceed() 方法,分析每个拦截器时,我们需要关注一下。

7.1 自定义拦截器

在最开始的 get 请求的例子中,添加了一个自定义的拦截器,用于设置网络请求的 header,主要通过责任链 Chain 拿到请求,然后重新构建一个请求,构建过程中将 header 设置进去,然后调用 proceed() 方法,向下传递。

OkHttpClient httpClient = new OkHttpClient.Builder()
// 添加 header 信息
.addInterceptor(new Interceptor() {
@Override
public Response intercept(Chain chain) throws IOException {
Request.Builder newBuilder = chain.request().newBuilder();
if (headMaps != null && !headMaps.isEmpty()) {
newBuilder.headers(Headers.of(headMaps));
}
return chain.proceed(newBuilder.build());
}
})
.build();

7.2 RetryAndFollowUpInterceptor

@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();

StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;

int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}

Response response;
boolean releaseConnection = true;
try {
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// 通过路线连接失败,请求将不会再发送
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// 与服务器尝试通信失败,请求不会再发送。
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// 抛出未检查的异常,释放资源
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}

// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}

Request followUp = followUpRequest(response, streamAllocation.route());

if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}

closeQuietly(response.body());

if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}

if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}

if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}

request = followUp;
priorResponse = response;
}
}

(1) 失败重试和重定向拦截器主要构建 StreamAllocation 参数,StreamAllocation 类主要协调 Connections、Streams、Calls,Connections 是对远程服务器连接的物理 socket;Streams是在 Connections 上的 Http 请求/响应 对;Calls 是一系列的 Streams

(2)通过调用 chain.proceed(request, streamAllocation, null, null);向下传递,并对很多种异常情况做出了处理

(3) 对返回结果 response 处理

7.3 BridgeInterceptor

@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();

// (1)将用户的request转换为发送到server的请求
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}

if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}

Response networkResponse = chain.proceed(requestBuilder.build());

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

// (2)将服务器的响应转换成返回结果的响应,主要添加一些附加信息,如请求等
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);

if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}

return responseBuilder.build();
}

BridgeInterceptor 是桥接拦截器,主要将用户请求转换成对服务器的请求,从代码中可以看出,添加了很多请求头中的信息,如 Content-Type、Content-Length、Host、Connection、Accept-Encoding、Cookie、User-Agent;同时在请求结果回来时,也能够对响应做出转换,在响应中添加附加信息等。

7.4 CacheInterceptor

@Override 
public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();
// 创建缓存
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
cache.trackResponse(strategy);
}

// 有缓存,但是不可用,关闭缓存的结果
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// 不允许网络请求时返回失败的结果
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// 不需要请求时,从缓存中取出结果
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
// 继续执行责任链
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

// 如果有缓存结果,需要选择性返回响应
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// 更新缓存
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}

Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();

if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}

if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}

return response;
}

CacheInterceptor 缓存拦截器没有过多说的,主要是对请求进行缓存,然后继续执行责任链,同样, 当响应返回时,需要根据结果对比缓存的响应,选择性返回,并且更新缓存。

7.5 ConnectInterceptor

RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

return realChain.proceed(request, streamAllocation, httpCodec, connection);
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();

try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}

连接拦截器主要是构建 RealConnection 对象和 HttpCodec 对象,其中 RealConnection 是在连接池中查找,HttpCodec 是对 Http 请求和相应的一个编码和解码的抽象,它的构建需要 StreamAllocation、RealConnection,StreamAllocation 在 RetryAndFollowUpInterceptor 中已经构建,RealConnection 是从连接池中查找。有了 streamAllocation, httpCodec, connection 这个几个参数,通过调用 realChain.proceed(request, streamAllocation, httpCodec, connection); 传递到责任链的下一个步骤中。

7.6 netWorkInterceptor

这一个拦截器是我们自己添加的拦截器,如果没有设置,则不会执行,这个拦截器一般是用于打印 log 信息,将请求信息和响应信息打印出来,一般使用 HttpLoggingInterceptor,我们只需要自定义一下打印的格式即可。

7.7 CallServerInterceptor

@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();

long sentRequestMillis = System.currentTimeMillis();
// (1)写入请求
realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);

Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}

if (responseBuilder == null) {
// (2)如果有请求体,写入请求体,POST 请求
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
streamAllocation.noNewStreams();
}
}

httpCodec.finishRequest();

// (3)请求码判断
int code = response.code();
if (code == 100) {
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

code = response.code();
}

realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);

if (forWebSocket && code == 101) {
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// (4) openResponseBody 获取响应体信息
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
// (5)往上级 ConnectInterceptor 返回 Response。
return response;
}

CallServerInterceptor 是请求责任链的最后一个动作,完成后就可以获取到 Response。CallServerInterceptor 主要完成这个几个任务,信息的写和读取主要是利用 Okio 来完成。

(1) 写入请求头信息。

(2) 有请求体的情况下,写入请求体信息。

(3) 结束请求。

(4) 读取响应头信息。

(5) 往上一级 ConnectInterceptor 返回一个网络请求回来的 Response。

小结

okhttp 的请求实际上就是分解为各个拦截器的动作,各个拦截器按照先后的顺序,依次执行,采用责任链模式,最终完成请求,责任链在请求过程中逐步携带请求的参数信息, 逐步的意思就是来源前面拦截器创建的参数,后面的拦截器依赖前面拦截器的参数,参数由责任链传递过来;此外,在请求结果返回时,责任链中又从后面回溯到前面,依次处理返回结果,就是这样一去一回的过程。

process1

异步请求

上面是同步请求的详细过程,下面再简单看一下异步请求的过程。

异步执行过程通过调用 client.newCall(request).enqueue(callback);

@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

realCall 的 enqueue() 方法实际上是调用分发器 dispatcher 的 enqueue() 方法,同时将 回调方法包装了一下,包装成一个 AsyncCall,AsyncCall 实际上是一个 Runnable。

synchronized void enqueue(AsyncCall call) {
// 正在异步执行的请求数量小于最大限制,并且正在要请求的这个 call,指向的目标服务器没有达到最大数量,小于 5 个
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else { 
// 添加到异步请求的准备集合当中
readyAsyncCalls.add(call);
}
}

(1) Dispatcher 将请求添加到集合中,添加过程中先判断正在异步执行的请求数量是否最大限制,并且正在要请求的这个 call,指向的目标服务器都否达到最大数量,小于默认 5 个,也就说当前请求数量没有达到最大数量,请求的目标主机 不超过 5 个时(可修改数量),就添加到正在执行的集合中,否则添加到等待集合中,等待执行。

(2) executorService 是一个线程池,异步执行 AsyncCall。

@Override 
protected void execute() {
boolean signalledCallback = false;
try {
// (1) 上面同步过程分析过的,网络请求的入口,责任链开启的地方
Response response = getResponseWithInterceptorChain();
// (2) 请求取消
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
// (3) 请求成功
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}

线程池开始执行一个 AsyncCall 时,调用它的 execute() 方法。可以看到实际上执行过程和同步过程基本一致,只不过最后请求结果返回时,通过 responseCallback 回调给请求者。

(1) getResponseWithInterceptorChain() 请求责任链开启,和上面同步分析过程一致

(2) 取消请求或者请求过程中发生异常,通过回调接口告知请求失败 responseCallback.onFailure()

(3) 请求成功 responseCallback.onResponse(RealCall.this, response);将请求结果返回

以上就是 okhttp 请求过程分析,里面涉及一些网络请求,okio 的东西,没有具体讲,后面再有文章单独来讲,重点需要掌握责任链这个设计模式的使用,文章中也有一个小例子,没有看懂 okhttp 责任链模式,可以回头看看上面的小例子,当然责任链模式还有其他形式,如通过每个节点来设置后续节点,这样就需要持有责任链。

参考

OKHttp源码解析

OKHTTP拦截器CallServerInterceptor的简单分析

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦