Refactors Relay class to remove the Synchronized block

This commit is contained in:
Vitor Pamplona
2023-07-25 10:23:20 -04:00
parent b919b165d6
commit cd6f9576cc
3 changed files with 139 additions and 120 deletions

View File

@@ -112,7 +112,7 @@ object Client : RelayPool.Listener {
val relay = Relay(url, true, true, feedTypes ?: emptySet(), HttpClient.getProxy())
RelayPool.addRelay(relay)
relay.requestAndWatch {
relay.connectAndRun {
allSubscriptions().forEach {
relay.sendFilter(requestId = it)
}

View File

@@ -15,6 +15,7 @@ import okhttp3.WebSocket
import okhttp3.WebSocketListener
import java.net.Proxy
import java.time.Duration
import java.util.concurrent.atomic.AtomicBoolean
enum class FeedType {
FOLLOWS, PUBLIC_CHATS, PRIVATE_DMS, GLOBAL, SEARCH, WALLET_CONNECT
@@ -29,14 +30,16 @@ class Relay(
var activeTypes: Set<FeedType> = FeedType.values().toSet(),
proxy: Proxy?
) {
val seconds = if (proxy != null) 20L else 10L
val duration = Duration.ofSeconds(seconds)
companion object {
// waits 3 minutes to reconnect once things fail
const val RECONNECTING_IN_SECONDS = 60 * 3
}
private val httpClient = OkHttpClient.Builder()
.proxy(proxy)
.readTimeout(duration)
.connectTimeout(duration)
.writeTimeout(duration)
.readTimeout(Duration.ofSeconds(if (proxy != null) 20L else 10L))
.connectTimeout(Duration.ofSeconds(if (proxy != null) 20L else 10L))
.writeTimeout(Duration.ofSeconds(if (proxy != null) 20L else 10L))
.followRedirects(true)
.followSslRedirects(true)
.build()
@@ -50,9 +53,9 @@ class Relay(
var spamCounter = 0
var errorCounter = 0
var ping: Long? = null
var pingInMs: Long? = null
var closingTime = 0L
var closingTimeInSeconds = 0L
var afterEOSE = false
@@ -68,10 +71,8 @@ class Relay(
return socket != null
}
@Synchronized
fun requestAndWatch() {
checkNotInMainThread()
requestAndWatch {
fun connect() {
connectAndRun {
checkNotInMainThread()
// Sends everything.
@@ -81,9 +82,16 @@ class Relay(
}
}
@Synchronized
fun requestAndWatch(onConnected: (Relay) -> Unit) {
private var connectingBlock = AtomicBoolean()
fun connectAndRun(onConnected: (Relay) -> Unit) {
// If there is a connection, don't wait.
if (connectingBlock.getAndSet(true)) {
return
}
checkNotInMainThread()
if (socket != null) return
try {
@@ -91,132 +99,143 @@ class Relay(
.header("User-Agent", "Amethyst/${BuildConfig.VERSION_NAME}")
.url(url.trim())
.build()
val listener = object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
checkNotInMainThread()
socket = httpClient.newWebSocket(request, RelayListener(onConnected))
} catch (e: Exception) {
errorCounter++
markConnectionAsClosed()
Log.e("Relay", "Relay Invalid $url")
e.printStackTrace()
} finally {
connectingBlock.set(false)
}
}
afterEOSE = false
isReady = true
ping = response.receivedResponseAtMillis - response.sentRequestAtMillis
// Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url")
onConnected(this@Relay)
inner class RelayListener(val onConnected: (Relay) -> Unit) : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
checkNotInMainThread()
listeners.forEach { it.onRelayStateChange(this@Relay, Type.CONNECT, null) }
markConnectionAsReady(response.receivedResponseAtMillis - response.sentRequestAtMillis)
// Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url")
onConnected(this@Relay)
listeners.forEach { it.onRelayStateChange(this@Relay, Type.CONNECT, null) }
}
override fun onMessage(webSocket: WebSocket, text: String) {
checkNotInMainThread()
eventDownloadCounterInBytes += text.bytesUsedInMemory()
try {
processNewRelayMessage(text)
} catch (t: Throwable) {
t.printStackTrace()
text.chunked(2000) { chunked ->
listeners.forEach { it.onError(this@Relay, "", Error("Problem with $chunked")) }
}
}
}
override fun onMessage(webSocket: WebSocket, text: String) {
checkNotInMainThread()
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
checkNotInMainThread()
eventDownloadCounterInBytes += text.bytesUsedInMemory()
listeners.forEach {
it.onRelayStateChange(
this@Relay,
Type.DISCONNECTING,
null
)
}
}
try {
val msg = Event.gson.fromJson(text, JsonElement::class.java).asJsonArray
val type = msg[0].asString
val channel = msg[1].asString
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
checkNotInMainThread()
when (type) {
"EVENT" -> {
val event = Event.fromJson(msg[2], Client.lenient)
markConnectionAsClosed()
// Log.w("Relay", "Relay onEVENT $url, $channel")
listeners.forEach {
it.onEvent(this@Relay, channel, event)
if (afterEOSE) {
it.onRelayStateChange(this@Relay, Type.EOSE, channel)
}
}
}
"EOSE" -> listeners.forEach {
afterEOSE = true
// Log.w("Relay", "Relay onEOSE $url, $channel")
it.onRelayStateChange(this@Relay, Type.EOSE, channel)
}
"NOTICE" -> listeners.forEach {
Log.w("Relay", "Relay onNotice $url, $channel")
it.onError(this@Relay, channel, Error("Relay sent notice: " + channel))
}
"OK" -> listeners.forEach {
Log.w("Relay", "Relay on OK $url, ${msg[1].asString}, ${msg[2].asBoolean}, ${msg[3].asString}")
it.onSendResponse(this@Relay, msg[1].asString, msg[2].asBoolean, msg[3].asString)
}
"AUTH" -> listeners.forEach {
// Log.w("Relay", "Relay$url, ${msg[1].asString}")
it.onAuth(this@Relay, msg[1].asString)
}
else -> listeners.forEach {
// Log.w("Relay", "Relay something else $url, $channel")
it.onError(
this@Relay,
channel,
Error("Unknown type $type on channel $channel. Msg was $text")
)
}
}
} catch (t: Throwable) {
t.printStackTrace()
text.chunked(2000) { chunked ->
listeners.forEach { it.onError(this@Relay, "", Error("Problem with $chunked")) }
}
}
}
listeners.forEach { it.onRelayStateChange(this@Relay, Type.DISCONNECT, null) }
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
checkNotInMainThread()
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
checkNotInMainThread()
listeners.forEach {
it.onRelayStateChange(
this@Relay,
Type.DISCONNECTING,
null
)
}
}
errorCounter++
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
checkNotInMainThread()
socket?.close(1000, "Normal close")
// Failures disconnect the relay.
markConnectionAsClosed()
socket = null
isReady = false
afterEOSE = false
closingTime = TimeUtils.now()
listeners.forEach { it.onRelayStateChange(this@Relay, Type.DISCONNECT, null) }
}
Log.w("Relay", "Relay onFailure $url, ${response?.message} $response")
t.printStackTrace()
listeners.forEach {
it.onError(this@Relay, "", Error("WebSocket Failure. Response: $response. Exception: ${t.message}", t))
}
}
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
checkNotInMainThread()
fun markConnectionAsReady(pingInMs: Long) {
this.afterEOSE = false
this.isReady = true
this.pingInMs = pingInMs
}
errorCounter++
fun markConnectionAsClosed() {
this.socket = null
this.isReady = false
this.afterEOSE = false
this.closingTimeInSeconds = TimeUtils.now()
}
socket?.close(1000, "Normal close")
// Failures disconnect the relay.
socket = null
isReady = false
afterEOSE = false
closingTime = TimeUtils.now()
fun processNewRelayMessage(newMessage: String) {
val msgArray = Event.gson.fromJson(newMessage, JsonElement::class.java).asJsonArray
val type = msgArray[0].asString
val channel = msgArray[1].asString
Log.w("Relay", "Relay onFailure $url, ${response?.message} $response")
t.printStackTrace()
listeners.forEach {
it.onError(this@Relay, "", Error("WebSocket Failure. Response: $response. Exception: ${t.message}", t))
when (type) {
"EVENT" -> {
val event = Event.fromJson(msgArray[2], Client.lenient)
// Log.w("Relay", "Relay onEVENT $url, $channel")
listeners.forEach {
it.onEvent(this@Relay, channel, event)
if (afterEOSE) {
it.onRelayStateChange(this@Relay, Type.EOSE, channel)
}
}
}
socket = httpClient.newWebSocket(request, listener)
} catch (e: Exception) {
errorCounter++
isReady = false
afterEOSE = false
closingTime = TimeUtils.now()
Log.e("Relay", "Relay Invalid $url")
e.printStackTrace()
"EOSE" -> listeners.forEach {
afterEOSE = true
// Log.w("Relay", "Relay onEOSE $url, $channel")
it.onRelayStateChange(this@Relay, Type.EOSE, channel)
}
"NOTICE" -> listeners.forEach {
Log.w("Relay", "Relay onNotice $url, $channel")
it.onError(this@Relay, channel, Error("Relay sent notice: " + channel))
}
"OK" -> listeners.forEach {
Log.w("Relay", "Relay on OK $url, ${msgArray[1].asString}, ${msgArray[2].asBoolean}, ${msgArray[3].asString}")
it.onSendResponse(this@Relay, msgArray[1].asString, msgArray[2].asBoolean, msgArray[3].asString)
}
"AUTH" -> listeners.forEach {
// Log.w("Relay", "Relay$url, ${msg[1].asString}")
it.onAuth(this@Relay, msgArray[1].asString)
}
else -> listeners.forEach {
// Log.w("Relay", "Relay something else $url, $channel")
it.onError(
this@Relay,
channel,
Error("Unknown type $type on channel $channel. Msg was $newMessage")
)
}
}
}
fun disconnect() {
// httpClient.dispatcher.executorService.shutdown()
closingTime = TimeUtils.now()
closingTimeInSeconds = TimeUtils.now()
socket?.close(1000, "Normal close")
socket = null
isReady = false
@@ -241,9 +260,9 @@ class Relay(
}
} else {
// waits 60 seconds to reconnect after disconnected.
if (TimeUtils.now() > closingTime + 60) {
if (TimeUtils.now() > closingTimeInSeconds + RECONNECTING_IN_SECONDS) {
// sends all filters after connection is successful.
requestAndWatch()
connect()
}
}
}
@@ -254,9 +273,9 @@ class Relay(
if (socket == null) {
// waits 60 seconds to reconnect after disconnected.
if (TimeUtils.now() > closingTime + 60) {
if (TimeUtils.now() > closingTimeInSeconds + RECONNECTING_IN_SECONDS) {
// println("sendfilter Only if Disconnected ${url} ")
requestAndWatch()
connect()
}
}
}

View File

@@ -51,7 +51,7 @@ object RelayPool : Relay.Listener {
fun requestAndWatch() {
checkNotInMainThread()
relays.forEach { it.requestAndWatch() }
relays.forEach { it.connect() }
}
fun sendFilter(subscriptionId: String) {