// Snippets
// Created by tim4dev
/*
* The MIT License
* Copyright (c) 2020 https://www.tim4.dev
*/
package app.example.coroutines
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import okhttp3.*
import okio.ByteString
/**
 * A single web socket message, carried as text.
 *
 * @property json the message text; no parsing or validation is performed here.
 */
data class RawData(val json: String)
/**
 * Two-way message channel over a web socket: outgoing messages are handed over with [send],
 * incoming messages are observed through [getIncoming].
 */
interface IWebSocketChannel {

    /** Returns the stream of messages received from the remote side. */
    fun getIncoming(): Flow<RawData>

    /** Returns `true` once the channel can no longer be used to exchange messages. */
    fun isClosed(): Boolean

    /**
     * Requests that the connection be closed.
     *
     * @param code web socket status code sent with the close request; defaults to 1000
     *   (the "normal closure" status code of RFC 6455).
     * @param reason optional human-readable explanation, or `null` for none.
     */
    fun close(code: Int = 1000, reason: String? = null)

    /** Queues [data] for delivery to the remote side. */
    fun send(data: RawData)
}
/**
 * [IWebSocketChannel] implementation backed by an OkHttp [WebSocket].
 *
 * Constructing an instance immediately opens a connection to `Constant.WS_URL` and launches,
 * in [scope], a coroutine that forwards everything written to the outgoing channel to the socket.
 * Messages received from the socket are written to the incoming channel, which is exposed as a
 * [Flow] by [getIncoming].
 *
 * NOTE(review): every received message is forwarded from its own coroutine on `Dispatchers.IO`,
 * so nothing visible here guarantees that messages are delivered in arrival order — confirm
 * whether ordering matters to callers.
 *
 * @param scope scope in which all coroutines of this channel are launched.
 */
class WebSocketChannel(
    private val scope: CoroutineScope
) : IWebSocketChannel {

    private val socket: WebSocket
    private val incoming = Channel<RawData>()
    private val outgoing = Channel<RawData>()
    private val incomingFlow: Flow<RawData> = incoming.consumeAsFlow()

    init {
        val okHttpClient = OkHttpClient.Builder()
            .build()
        val request = Request.Builder()
            .url(Constant.WS_URL)
            .build()

        socket =
            okHttpClient.newWebSocket(request, WebSocketChannelListener(incoming, outgoing))

        // Trigger shutdown of the dispatcher's executor so this process can exit cleanly.
        okHttpClient.dispatcher().executorService().shutdown()

        // Everything that goes into the outgoing channel is sent to the web socket.
        // Everything that gets into the incoming channel is sent to incomingFlow.
        scope.launch(Dispatchers.IO) {
            try {
                outgoing.consumeEach {
                    socket.send(it.json)
                }
            } finally {
                // Close the socket directly rather than through close(): close() launches a new
                // coroutine in `scope`, and if this block runs because `scope` was cancelled
                // that coroutine would never start, leaving the connection open.
                socket.close(NORMAL_CLOSURE, null)
            }
        }
    }

    /** Returns the flow of messages received from the socket. */
    override fun getIncoming(): Flow<RawData> = incomingFlow

    /** Returns `true` when the incoming channel is closed for receive or the outgoing one for send. */
    override fun isClosed(): Boolean =
        incoming.isClosedForReceive || outgoing.isClosedForSend

    /**
     * Asks the socket to close, from a coroutine launched in [scope].
     *
     * @param code web socket status code passed to the socket.
     * @param reason optional explanation passed to the socket.
     */
    override fun close(
        code: Int,
        reason: String?
    ) {
        scope.launch(Dispatchers.IO) {
            socket.close(code, reason) // note: Channels will close in WebSocket.onClosed
        }
    }

    /** Writes [data] to the outgoing channel from a coroutine launched in [scope]. */
    override fun send(data: RawData) {
        scope.launch(Dispatchers.IO) {
            outgoing.send(data)
        }
    }

    /**
     * Bridges OkHttp callbacks to the two channels: received messages go to [incoming], and
     * both channels are closed when the socket is closed or fails.
     */
    inner class WebSocketChannelListener(
        private val incoming: Channel<RawData>,
        private val outgoing: Channel<RawData>
    ) : WebSocketListener() {

        override fun onOpen(webSocket: WebSocket, response: Response) {}

        /** Forwards a text message to the incoming channel. */
        override fun onMessage(webSocket: WebSocket, text: String) {
            scope.launch(Dispatchers.IO) {
                incoming.send(RawData(text))
            }
        }

        /** Forwards a binary message to the incoming channel, decoded as UTF-8 text. */
        override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
            scope.launch(Dispatchers.IO) {
                // utf8() yields the payload itself; ByteString.toString() only yields a
                // debug rendering such as "[hex=...]", which is not the message content.
                incoming.send(RawData(bytes.utf8()))
            }
        }

        override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {}

        /** Closes both channels normally once the socket has been closed. */
        override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
            incoming.close()
            outgoing.close()
        }

        /** Closes both channels with [t] as the cause when the socket fails. */
        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            incoming.close(t)
            outgoing.close(t)
        }
    }

    private companion object {
        /** Status code used when the sender coroutine shuts the socket down; same value as the `close` default. */
        const val NORMAL_CLOSURE = 1000
    }
}
/**
 * Usage example: owns a single [IWebSocketChannel] and exposes it to callers.
 *
 * [webSocketCreate] must be called before [webSocketSend], because the channel is a
 * `lateinit` property that is only assigned there.
 */
class Repository {

    private lateinit var channel: IWebSocketChannel

    /**
     * Opens a new [WebSocketChannel] in [scope], keeps it as the current channel and
     * returns its flow of incoming messages.
     */
    fun webSocketCreate(scope: CoroutineScope): Flow<RawData> =
        WebSocketChannel(scope)
            .also { created -> channel = created }
            .getIncoming()

    /** Sends [data] through the current channel. */
    fun webSocketSend(data: RawData) = channel.send(data)
}
// Comments (0)
// You can clone a snippet to your computer for local editing. Learn more.