用户登录
用户注册

分享至

Android使用okHttp实现WebSocket源码分析

  • 作者: 嗷嗷de奥特曼
  • 来源: 51数据库
  • 2021-11-09

项目中需要实现一个类似心跳包的内容,语音通话期间,需要不断的请求服务器发送数据,确保正在连接中,开始想到的是使用service开启子线程,定时请求接口发送数据,但数据量很大时,http请求头部太大,每次请求需要发送重复的头数据,非常消耗流量,于是考虑使用长连接的方式。Android中实现WebSocket有很多种方式,这里说一下okHttp的方式。先来大致了解一下websocket。

  • WebSocket属于双向通信协议,Http是单向的
  • webSocket是需要握手进行建立连接的,只是在建立握手时,数据是通过HTTP传输的,但建立之后传输数据不需要http协议
  • websocket的请求链接不是http://这种,而是以ws://开头
  • websocket的Connection链接类型是 Upgrade:websocket,表示将该请求升级为websocket。
  • websocket的头部响应码是101,表示本次链接的http协议即将被更改为upgrade:websocket 指定的协议。

使用okhttp封装websocketdemo

public class WebSocketDemo {

    private final String TAG = WebSocketDemo.class.getSimpleName();

    private OkHttpClient CLIENT;
    private WebSocket mWebSocket;

    private static WebSocketDemo ourInstance;

    public static WebSocketDemo getDefault() {
        if (ourInstance == null) {
            synchronized (WebSocketDemo.class) {
                if (ourInstance == null) {
                    ourInstance = new WebSocketDemo();
                }
            }
        }
        return ourInstance;
    }

    private WebSocketDemo() {
        CLIENT = new OkHttpClient.Builder()
                .writeTimeout(5, TimeUnit.SECONDS)
                .readTimeout(5, TimeUnit.SECONDS)
                .connectTimeout(5, TimeUnit.SECONDS)
                .build();
    }

    public void connect(String url) {
        if (mWebSocket != null) {
            mWebSocket.cancel();
        }
        Request request = new Request.Builder()
                .url(url)
                .build();
        mWebSocket = CLIENT.newWebSocket(request, new SocketListener());
    }

    public void sendMessage(String message) {
        mWebSocket.send(message);
    }

    public void sendMessage(byte... data) {
        ByteString bs = ByteString.of(data);
        mWebSocket.send(bs);
    }

    public void close(int code, String reason) {
        mWebSocket.close(code, reason);
    }

    class SocketListener extends WebSocketListener {

        @Override
        public void onOpen(WebSocket webSocket, Response response) {
            super.onOpen(webSocket, response);
            Log.i(TAG, "onOpen response=" + response);
        }

        @Override
        public void onMessage(WebSocket webSocket, String text) {
            super.onMessage(webSocket, text);
            Log.i(TAG, "onMessage text=" + text);
        }

        @Override
        public void onMessage(WebSocket webSocket, ByteString bytes) {
            super.onMessage(webSocket, bytes);
            Log.i(TAG, "onMessage bytes=" + bytes);
        }

        @Override
        public void onClosing(WebSocket webSocket, int code, String reason) {
            super.onClosing(webSocket, code, reason);
            Log.i(TAG, "onClosing code=" + code);
        }

        @Override
        public void onClosed(WebSocket webSocket, int code, String reason) {
            super.onClosed(webSocket, code, reason);
            Log.i(TAG, "onClosed code=" + code);
        }

        @Override
        public void onFailure(WebSocket webSocket, Throwable t, Response response) {
            super.onFailure(webSocket, t, response);
            Log.i(TAG, "onFailure t=" + t.getMessage());
        }
    }

}

源码分析

  • 懒汉式单例获取,保证唯一且线程安全,并创造okhttpclient实例
  • 连接方法,通过client.newWebSocket()创建WebSocket对象,方法中创建RealWebSocke,并开始连接
  /**
   * Uses {@code request} to connect a new web socket.
   */
  @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
    RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
    webSocket.connect(this);
    return webSocket;
  }
  public RealWebSocket(Request request, WebSocketListener listener, Random random,
      long pingIntervalMillis) {
    if (!"GET".equals(request.method())) {
      throw new IllegalArgumentException("Request must be GET: " + request.method());
    }
    this.originalRequest = request;
    this.listener = listener;
    this.random = random;
    this.pingIntervalMillis = pingIntervalMillis;

    byte[] nonce = new byte[16];
    random.nextBytes(nonce);
    this.key = ByteString.of(nonce).base64();

    this.writerRunnable = new Runnable() {
      @Override public void run() {
        try {
          while (writeOneFrame()) {
          }
        } catch (IOException e) {
          failWebSocket(e, null);
        }
      }
    };
  }
  • 通过RealWebSocket的创建内容,分析出请求必须是get请求方式,原因分析过,websocket协议需要发送一个http的get请求进行握手;接下来有一些默认赋值,最后实例了一个wirterRunnable,方法里主要是while循环不断的从队列中取出数据,发送到服务器。
  • 接下来就是真正的connect方法
  public void connect(OkHttpClient client) {
    client = client.newBuilder()
        .eventListener(EventListener.NONE)
        .protocols(ONLY_HTTP1)
        .build();
    final Request request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .build();
    call = Internal.instance.newWebSocketCall(client, request);
    call.enqueue(new Callback() {
      @Override public void onResponse(Call call, Response response) {
        try {
          checkResponse(response);
        } catch (ProtocolException e) {
          failWebSocket(e, response);
          closeQuietly(response);
          return;
        }

        // Promote the HTTP streams into web socket streams.
        StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
        streamAllocation.noNewStreams(); // Prevent connection pooling!
        Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);

        // Process all web socket messages.
        try {
          listener.onOpen(RealWebSocket.this, response);
          String name = "OkHttp WebSocket " + request.url().redact();
          initReaderAndWriter(name, streams);
          streamAllocation.connection().socket().setSoTimeout(0);
          loopReader();
        } catch (Exception e) {
          failWebSocket(e, null);
        }
      }

      @Override public void onFailure(Call call, IOException e) {
        failWebSocket(e, null);
      }
    });
  }
  • 通过该方法可以看到upgrade-websocket、connection-upgrade设置好头信息,构建符合websocket协议握手规范的http请求;enqueue()方法中,有个checkResponse()方法,其主要功能是校验回应值相关信息,包括code值,connection和upgrade等内容。
void checkResponse(Response response) throws ProtocolException {
    if (response.code() != 101) {
      throw new ProtocolException("Expected HTTP 101 response but was '"
          + response.code() + " " + response.message() + "'");
    }

    String headerConnection = response.header("Connection");
    if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
      throw new ProtocolException("Expected 'Connection' header value 'Upgrade' but was '"
          + headerConnection + "'");
    }

    String headerUpgrade = response.header("Upgrade");
    if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
      throw new ProtocolException(
          "Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
    }

    String headerAccept = response.header("Sec-WebSocket-Accept");
    String acceptExpected = ByteString.encodeUtf8(key + WebSocketProtocol.ACCEPT_MAGIC)
        .sha1().base64();
    if (!acceptExpected.equals(headerAccept)) {
      throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
          + acceptExpected + "' but was '" + headerAccept + "'");
    }
  }
  • 链接并校验通过后,通过流来传递数据,链接过程中设置超时时间为0,也就是永不超时,维持这个链接。最后调用loopReader()循环读取服务器消息。
  /** Receive frames until there are no more. Invoked only by the reader thread. */
  public void loopReader() throws IOException {
    while (receivedCloseCode == -1) {
      // This method call results in one or more onRead* methods being called on this thread.
      reader.processNextFrame();
    }
  }
  void processNextFrame() throws IOException {
    readHeader();
    if (isControlFrame) {
      readControlFrame();
    } else {
      readMessageFrame();
    }
  }
  private void readHeader() throws IOException {
    if (closed) throw new IOException("closed");

    ...

    int b1 = source.readByte() & 0xff;

    ...
  }
  • readHeader()方法主要关注一下readByte()方法,最后调用的是输入流的read方法,这里也是IO阻塞模型,等待接收消息。

至此websocket的源码分析已解析完,希望能帮助各位朋友们,有哪块说的不对的,欢迎指出

软件
前端设计
程序设计
Java相关