+ * Copyright (c) 2020 https://www.tim4.dev
+package app.example.coroutines
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.channels.consumeEach
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.launch
+interface IWebSocketChannel {
+ fun getIncoming(): Flow<RawData>
+ fun isClosed(): Boolean
+ fun send(data: RawData)
+ private val scope: CoroutineScope
+ private val socket: WebSocket
+ private val incoming = Channel<RawData>()
+ private val outgoing = Channel<RawData>()
+ private val incomingFlow: Flow<RawData> = incoming.consumeAsFlow()
+ val okHttpClient = OkHttpClient.Builder()
+ val request = Request.Builder()
+ okHttpClient.newWebSocket(request, WebSocketChannelListener(incoming, outgoing))
+ // Trigger shutdown of the dispatcher's executor so this process can exit cleanly.
+ okHttpClient.dispatcher().executorService().shutdown()
+ // Everything that goes into the outgoing channel is sent to the web socket.
+ // Everything that gets into the incoming channel is sent to incomingFlow.
+ scope.launch(Dispatchers.IO) {
+ override fun getIncoming(): Flow<RawData> {
+ override fun isClosed(): Boolean {
+ return incoming.isClosedForReceive || outgoing.isClosedForSend
+ scope.launch(Dispatchers.IO) {
+ socket.close(code, reason) // note: Channels will close in WebSocket.onClosed
+ override fun send(data: RawData) {
+ scope.launch(Dispatchers.IO) {
+ inner class WebSocketChannelListener(
+ private val incoming: Channel<RawData>,
+ private val outgoing: Channel<RawData>
+ ) : WebSocketListener() {
+ override fun onOpen(webSocket: WebSocket, response: Response) {}
+ override fun onMessage(webSocket: WebSocket, text: String) {
+ scope.launch(Dispatchers.IO) {
+ incoming.send(RawData(text))
+ override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
+ scope.launch(Dispatchers.IO) {
+ incoming.send(RawData(bytes.toString()))
+ override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {}
+ override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
+ override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
+ private lateinit var channel: IWebSocketChannel
+ fun webSocketCreate(scope: CoroutineScope): Flow<RawData> {
+ channel = WebSocketChannel(scope)
+ return channel.getIncoming()
+ fun webSocketSend(data: RawData) {