diff --git a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt index 75a2a5be6..e922670dd 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt @@ -112,7 +112,7 @@ object Client : RelayPool.Listener { val relay = Relay(url, true, true, feedTypes ?: emptySet(), HttpClient.getProxy()) RelayPool.addRelay(relay) - relay.requestAndWatch { + relay.connectAndRun { allSubscriptions().forEach { relay.sendFilter(requestId = it) } diff --git a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Relay.kt b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Relay.kt index 4fe979957..74bdc5cba 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Relay.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Relay.kt @@ -15,6 +15,7 @@ import okhttp3.WebSocket import okhttp3.WebSocketListener import java.net.Proxy import java.time.Duration +import java.util.concurrent.atomic.AtomicBoolean enum class FeedType { FOLLOWS, PUBLIC_CHATS, PRIVATE_DMS, GLOBAL, SEARCH, WALLET_CONNECT @@ -29,14 +30,16 @@ class Relay( var activeTypes: Set = FeedType.values().toSet(), proxy: Proxy? ) { - val seconds = if (proxy != null) 20L else 10L - val duration = Duration.ofSeconds(seconds) + companion object { + // waits 3 minutes to reconnect once things fail + const val RECONNECTING_IN_SECONDS = 60 * 3 + } private val httpClient = OkHttpClient.Builder() .proxy(proxy) - .readTimeout(duration) - .connectTimeout(duration) - .writeTimeout(duration) + .readTimeout(Duration.ofSeconds(if (proxy != null) 20L else 10L)) + .connectTimeout(Duration.ofSeconds(if (proxy != null) 20L else 10L)) + .writeTimeout(Duration.ofSeconds(if (proxy != null) 20L else 10L)) .followRedirects(true) .followSslRedirects(true) .build() @@ -50,9 +53,9 @@ class Relay( var spamCounter = 0 var errorCounter = 0 - var ping: Long? = null + var pingInMs: Long? = null - var closingTime = 0L + var closingTimeInSeconds = 0L var afterEOSE = false @@ -68,10 +71,8 @@ class Relay( return socket != null } - @Synchronized - fun requestAndWatch() { - checkNotInMainThread() - requestAndWatch { + fun connect() { + connectAndRun { checkNotInMainThread() // Sends everything. @@ -81,9 +82,16 @@ class Relay( } } - @Synchronized - fun requestAndWatch(onConnected: (Relay) -> Unit) { + private var connectingBlock = AtomicBoolean() + + fun connectAndRun(onConnected: (Relay) -> Unit) { + // If there is a connection, don't wait. + if (connectingBlock.getAndSet(true)) { + return + } + checkNotInMainThread() + if (socket != null) return try { @@ -91,132 +99,143 @@ class Relay( .header("User-Agent", "Amethyst/${BuildConfig.VERSION_NAME}") .url(url.trim()) .build() - val listener = object : WebSocketListener() { - override fun onOpen(webSocket: WebSocket, response: Response) { - checkNotInMainThread() + socket = httpClient.newWebSocket(request, RelayListener(onConnected)) + } catch (e: Exception) { + errorCounter++ + markConnectionAsClosed() + Log.e("Relay", "Relay Invalid $url") + e.printStackTrace() + } finally { + connectingBlock.set(false) + } + } - afterEOSE = false - isReady = true - ping = response.receivedResponseAtMillis - response.sentRequestAtMillis - // Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url") - onConnected(this@Relay) + inner class RelayListener(val onConnected: (Relay) -> Unit) : WebSocketListener() { + override fun onOpen(webSocket: WebSocket, response: Response) { + checkNotInMainThread() - listeners.forEach { it.onRelayStateChange(this@Relay, Type.CONNECT, null) } + markConnectionAsReady(response.receivedResponseAtMillis - response.sentRequestAtMillis) + + // Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url") + onConnected(this@Relay) + + listeners.forEach { it.onRelayStateChange(this@Relay, Type.CONNECT, null) } + } + + override fun onMessage(webSocket: WebSocket, text: String) { + checkNotInMainThread() + + eventDownloadCounterInBytes += text.bytesUsedInMemory() + + try { + processNewRelayMessage(text) + } catch (t: Throwable) { + t.printStackTrace() + text.chunked(2000) { chunked -> + listeners.forEach { it.onError(this@Relay, "", Error("Problem with $chunked")) } } + } + } - override fun onMessage(webSocket: WebSocket, text: String) { - checkNotInMainThread() + override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { + checkNotInMainThread() - eventDownloadCounterInBytes += text.bytesUsedInMemory() + listeners.forEach { + it.onRelayStateChange( + this@Relay, + Type.DISCONNECTING, + null + ) + } + } - try { - val msg = Event.gson.fromJson(text, JsonElement::class.java).asJsonArray - val type = msg[0].asString - val channel = msg[1].asString + override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { + checkNotInMainThread() - when (type) { - "EVENT" -> { - val event = Event.fromJson(msg[2], Client.lenient) + markConnectionAsClosed() - // Log.w("Relay", "Relay onEVENT $url, $channel") - listeners.forEach { - it.onEvent(this@Relay, channel, event) - if (afterEOSE) { - it.onRelayStateChange(this@Relay, Type.EOSE, channel) - } - } - } - "EOSE" -> listeners.forEach { - afterEOSE = true - // Log.w("Relay", "Relay onEOSE $url, $channel") - it.onRelayStateChange(this@Relay, Type.EOSE, channel) - } - "NOTICE" -> listeners.forEach { - Log.w("Relay", "Relay onNotice $url, $channel") - it.onError(this@Relay, channel, Error("Relay sent notice: " + channel)) - } - "OK" -> listeners.forEach { - Log.w("Relay", "Relay on OK $url, ${msg[1].asString}, ${msg[2].asBoolean}, ${msg[3].asString}") - it.onSendResponse(this@Relay, msg[1].asString, msg[2].asBoolean, msg[3].asString) - } - "AUTH" -> listeners.forEach { - // Log.w("Relay", "Relay$url, ${msg[1].asString}") - it.onAuth(this@Relay, msg[1].asString) - } - else -> listeners.forEach { - // Log.w("Relay", "Relay something else $url, $channel") - it.onError( - this@Relay, - channel, - Error("Unknown type $type on channel $channel. Msg was $text") - ) - } - } - } catch (t: Throwable) { - t.printStackTrace() - text.chunked(2000) { chunked -> - listeners.forEach { it.onError(this@Relay, "", Error("Problem with $chunked")) } - } - } - } + listeners.forEach { it.onRelayStateChange(this@Relay, Type.DISCONNECT, null) } + } - override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { - checkNotInMainThread() + override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { + checkNotInMainThread() - listeners.forEach { - it.onRelayStateChange( - this@Relay, - Type.DISCONNECTING, - null - ) - } - } + errorCounter++ - override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { - checkNotInMainThread() + socket?.close(1000, "Normal close") + // Failures disconnect the relay. + markConnectionAsClosed() - socket = null - isReady = false - afterEOSE = false - closingTime = TimeUtils.now() - listeners.forEach { it.onRelayStateChange(this@Relay, Type.DISCONNECT, null) } - } + Log.w("Relay", "Relay onFailure $url, ${response?.message} $response") + t.printStackTrace() + listeners.forEach { + it.onError(this@Relay, "", Error("WebSocket Failure. Response: $response. Exception: ${t.message}", t)) + } + } + } - override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { - checkNotInMainThread() + fun markConnectionAsReady(pingInMs: Long) { + this.afterEOSE = false + this.isReady = true + this.pingInMs = pingInMs + } - errorCounter++ + fun markConnectionAsClosed() { + this.socket = null + this.isReady = false + this.afterEOSE = false + this.closingTimeInSeconds = TimeUtils.now() + } - socket?.close(1000, "Normal close") - // Failures disconnect the relay. - socket = null - isReady = false - afterEOSE = false - closingTime = TimeUtils.now() + fun processNewRelayMessage(newMessage: String) { + val msgArray = Event.gson.fromJson(newMessage, JsonElement::class.java).asJsonArray + val type = msgArray[0].asString + val channel = msgArray[1].asString - Log.w("Relay", "Relay onFailure $url, ${response?.message} $response") - t.printStackTrace() - listeners.forEach { - it.onError(this@Relay, "", Error("WebSocket Failure. Response: $response. Exception: ${t.message}", t)) + when (type) { + "EVENT" -> { + val event = Event.fromJson(msgArray[2], Client.lenient) + + // Log.w("Relay", "Relay onEVENT $url, $channel") + listeners.forEach { + it.onEvent(this@Relay, channel, event) + if (afterEOSE) { + it.onRelayStateChange(this@Relay, Type.EOSE, channel) } } } - - socket = httpClient.newWebSocket(request, listener) - } catch (e: Exception) { - errorCounter++ - isReady = false - afterEOSE = false - closingTime = TimeUtils.now() - Log.e("Relay", "Relay Invalid $url") - e.printStackTrace() + "EOSE" -> listeners.forEach { + afterEOSE = true + // Log.w("Relay", "Relay onEOSE $url, $channel") + it.onRelayStateChange(this@Relay, Type.EOSE, channel) + } + "NOTICE" -> listeners.forEach { + Log.w("Relay", "Relay onNotice $url, $channel") + it.onError(this@Relay, channel, Error("Relay sent notice: " + channel)) + } + "OK" -> listeners.forEach { + Log.w("Relay", "Relay on OK $url, ${msgArray[1].asString}, ${msgArray[2].asBoolean}, ${msgArray[3].asString}") + it.onSendResponse(this@Relay, msgArray[1].asString, msgArray[2].asBoolean, msgArray[3].asString) + } + "AUTH" -> listeners.forEach { + // Log.w("Relay", "Relay$url, ${msg[1].asString}") + it.onAuth(this@Relay, msgArray[1].asString) + } + else -> listeners.forEach { + // Log.w("Relay", "Relay something else $url, $channel") + it.onError( + this@Relay, + channel, + Error("Unknown type $type on channel $channel. Msg was $newMessage") + ) + } } } fun disconnect() { // httpClient.dispatcher.executorService.shutdown() - closingTime = TimeUtils.now() + closingTimeInSeconds = TimeUtils.now() socket?.close(1000, "Normal close") socket = null isReady = false @@ -241,9 +260,9 @@ class Relay( } } else { // waits 60 seconds to reconnect after disconnected. - if (TimeUtils.now() > closingTime + 60) { + if (TimeUtils.now() > closingTimeInSeconds + RECONNECTING_IN_SECONDS) { // sends all filters after connection is successful. - requestAndWatch() + connect() } } } @@ -254,9 +273,9 @@ class Relay( if (socket == null) { // waits 60 seconds to reconnect after disconnected. - if (TimeUtils.now() > closingTime + 60) { + if (TimeUtils.now() > closingTimeInSeconds + RECONNECTING_IN_SECONDS) { // println("sendfilter Only if Disconnected ${url} ") - requestAndWatch() + connect() } } } diff --git a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/RelayPool.kt b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/RelayPool.kt index d2562529f..a5fe3a219 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/RelayPool.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/RelayPool.kt @@ -51,7 +51,7 @@ object RelayPool : Relay.Listener { fun requestAndWatch() { checkNotInMainThread() - relays.forEach { it.requestAndWatch() } + relays.forEach { it.connect() } } fun sendFilter(subscriptionId: String) {