From 50cde1bc3e7bd9059887df3b6ff106ffbcd51f0c Mon Sep 17 00:00:00 2001 From: Vitor Pamplona Date: Tue, 13 May 2025 14:43:28 -0400 Subject: [PATCH] - Bugfix to mark relay connections as AfterEOSE when the relay client or server closes the subscription - Adds an arrival time to log the time EOSEs were received in case the listener is running async processes later. --- .../service/relayClient/RelayLogger.kt | 1 + .../service/relayClient/RelaySpeedLogger.kt | 1 + .../ammolite/relays/NostrClient.kt | 53 ++++++------------- .../vitorpamplona/ammolite/relays/Relay.kt | 8 ++- .../ammolite/relays/RelayPool.kt | 8 ++- .../relays/datasources/EventCollector.kt | 1 + .../relays/datasources/NostrDataSource.kt | 11 ++-- .../relays/datasources/RelayLogger.kt | 1 + .../nip01Core/relay/SimpleClientRelay.kt | 13 +++-- 9 files changed, 47 insertions(+), 50 deletions(-) diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/RelayLogger.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/RelayLogger.kt index b25d09c80..2de74dd96 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/RelayLogger.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/RelayLogger.kt @@ -42,6 +42,7 @@ class RelayLogger( event: Event, subscriptionId: String, relay: Relay, + arrivalTime: Long, afterEOSE: Boolean, ) { Log.d(TAG, "Relay onEVENT ${relay.url} ($subscriptionId - $afterEOSE) ${event.toJson()}") diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/RelaySpeedLogger.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/RelaySpeedLogger.kt index f34380ada..1e1b7e566 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/RelaySpeedLogger.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/RelaySpeedLogger.kt @@ -46,6 +46,7 @@ class RelaySpeedLogger( event: Event, subscriptionId: String, relay: Relay, + arrivalTime: Long, afterEOSE: Boolean, ) { val now = TimeUtils.now() / 10 diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/NostrClient.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/NostrClient.kt index bd3422f2b..7b1876621 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/NostrClient.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/NostrClient.kt @@ -25,12 +25,9 @@ import com.vitorpamplona.ammolite.service.checkNotInMainThread import com.vitorpamplona.quartz.nip01Core.core.Event import com.vitorpamplona.quartz.nip01Core.relay.RelayState import com.vitorpamplona.quartz.nip01Core.relay.sockets.WebsocketBuilderFactory -import kotlinx.coroutines.CoroutineExceptionHandler -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import java.util.UUID @@ -42,13 +39,6 @@ import java.util.concurrent.TimeUnit */ class NostrClient( private val websocketBuilder: WebsocketBuilderFactory, - private val listenerScope: CoroutineScope = - CoroutineScope( - Dispatchers.Default + SupervisorJob() + - CoroutineExceptionHandler { _, throwable -> - Log.e("NostrClient", "Caught exception: ${throwable.message}", throwable) - }, - ), ) : RelayPool.Listener { private val relayPool: RelayPool = RelayPool() private val activeSubscriptions: MutableSubscriptionManager = MutableSubscriptionManager() @@ -135,12 +125,14 @@ class NostrClient( event: Event, subId: String, relay: Relay, + arrivalTime: Long, afterEOSE: Boolean, ) { if (subId == subscriptionId) { - onResponse(event) unsubscribe(this) close(subscriptionId) + + onResponse(event) } } }, @@ -186,6 +178,7 @@ class NostrClient( override fun onEOSE( relay: Relay, subscriptionId: String, + arrivalTime: Long, ) { latch.countDown() Log.d("sendAndWaitForResponse", "onEOSE relay ${relay.url} count: ${latch.count}") @@ -320,31 +313,27 @@ class NostrClient( event: Event, subscriptionId: String, relay: Relay, + arrivalTime: Long, afterEOSE: Boolean, ) { // Releases the Web thread for the new payload. // May need to add a processing queue if processing new events become too costly. - listenerScope.launch { - listeners.forEach { it.onEvent(event, subscriptionId, relay, afterEOSE) } - } + listeners.forEach { it.onEvent(event, subscriptionId, relay, arrivalTime, afterEOSE) } } override fun onEOSE( relay: Relay, subscriptionId: String, + arrivalTime: Long, ) { - listenerScope.launch { - listeners.forEach { it.onEOSE(relay, subscriptionId) } - } + listeners.forEach { it.onEOSE(relay, subscriptionId, arrivalTime) } } override fun onRelayStateChange( type: RelayState, relay: Relay, ) { - listenerScope.launch { - listeners.forEach { it.onRelayStateChange(type, relay) } - } + listeners.forEach { it.onRelayStateChange(type, relay) } } @OptIn(DelicateCoroutinesApi::class) @@ -356,9 +345,7 @@ class NostrClient( ) { // Releases the Web thread for the new payload. // May need to add a processing queue if processing new events become too costly. - listenerScope.launch { - listeners.forEach { it.onSendResponse(eventId, success, message, relay) } - } + listeners.forEach { it.onSendResponse(eventId, success, message, relay) } } @OptIn(DelicateCoroutinesApi::class) @@ -368,7 +355,7 @@ class NostrClient( ) { // Releases the Web thread for the new payload. // May need to add a processing queue if processing new events become too costly. - listenerScope.launch { listeners.forEach { it.onAuth(relay, challenge) } } + listeners.forEach { it.onAuth(relay, challenge) } } @OptIn(DelicateCoroutinesApi::class) @@ -378,9 +365,7 @@ class NostrClient( ) { // Releases the Web thread for the new payload. // May need to add a processing queue if processing new events become too costly. - listenerScope.launch { - listeners.forEach { it.onNotify(relay, description) } - } + listeners.forEach { it.onNotify(relay, description) } } @OptIn(DelicateCoroutinesApi::class) @@ -389,9 +374,7 @@ class NostrClient( msg: String, success: Boolean, ) { - listenerScope.launch { - listeners.forEach { it.onSend(relay, msg, success) } - } + listeners.forEach { it.onSend(relay, msg, success) } } @OptIn(DelicateCoroutinesApi::class) @@ -399,9 +382,7 @@ class NostrClient( relay: Relay, event: Event, ) { - listenerScope.launch { - listeners.forEach { it.onBeforeSend(relay, event) } - } + listeners.forEach { it.onBeforeSend(relay, event) } } @OptIn(DelicateCoroutinesApi::class) @@ -410,9 +391,7 @@ class NostrClient( subscriptionId: String, relay: Relay, ) { - listenerScope.launch { - listeners.forEach { it.onError(error, subscriptionId, relay) } - } + listeners.forEach { it.onError(error, subscriptionId, relay) } } fun subscribe(listener: Listener) { @@ -439,6 +418,7 @@ class NostrClient( event: Event, subscriptionId: String, relay: Relay, + arrivalTime: Long, afterEOSE: Boolean, ) = Unit @@ -446,6 +426,7 @@ class NostrClient( open fun onEOSE( relay: Relay, subscriptionId: String, + arrivalTime: Long, ) = Unit /** Connected to or disconnected from a relay */ diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/Relay.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/Relay.kt index a40758544..fb704e94a 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/Relay.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/Relay.kt @@ -161,8 +161,9 @@ class Relay( relay: SimpleClientRelay, subscriptionId: String, event: Event, + time: Long, afterEOSE: Boolean, - ) = listeners.forEach { it.onEvent(this, subscriptionId, event, afterEOSE) } + ) = listeners.forEach { it.onEvent(this, subscriptionId, event, time, afterEOSE) } override fun onError( relay: SimpleClientRelay, @@ -173,7 +174,8 @@ class Relay( override fun onEOSE( relay: SimpleClientRelay, subscriptionId: String, - ) = listeners.forEach { it.onEOSE(this, subscriptionId) } + time: Long, + ) = listeners.forEach { it.onEOSE(this, subscriptionId, time) } override fun onRelayStateChange( relay: SimpleClientRelay, @@ -219,12 +221,14 @@ class Relay( relay: Relay, subscriptionId: String, event: Event, + time: Long, afterEOSE: Boolean, ) fun onEOSE( relay: Relay, subscriptionId: String, + time: Long, ) fun onError( diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/RelayPool.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/RelayPool.kt index e347a4bcc..f3c9aa369 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/RelayPool.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/RelayPool.kt @@ -170,12 +170,14 @@ class RelayPool : Relay.Listener { event: Event, subscriptionId: String, relay: Relay, + arrivalTime: Long, afterEOSE: Boolean, ) fun onEOSE( relay: Relay, subscriptionId: String, + arrivalTime: Long, ) fun onRelayStateChange( @@ -222,9 +224,10 @@ class RelayPool : Relay.Listener { relay: Relay, subscriptionId: String, event: Event, + arrivalTime: Long, afterEOSE: Boolean, ) { - listeners.forEach { it.onEvent(event, subscriptionId, relay, afterEOSE) } + listeners.forEach { it.onEvent(event, subscriptionId, relay, arrivalTime, afterEOSE) } } override fun onError( @@ -239,8 +242,9 @@ class RelayPool : Relay.Listener { override fun onEOSE( relay: Relay, subscriptionId: String, + arrivalTime: Long, ) { - listeners.forEach { it.onEOSE(relay, subscriptionId) } + listeners.forEach { it.onEOSE(relay, subscriptionId, arrivalTime) } updateStatus() } diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/EventCollector.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/EventCollector.kt index aa3bdea35..b810c1c98 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/EventCollector.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/EventCollector.kt @@ -38,6 +38,7 @@ class EventCollector( event: Event, subscriptionId: String, relay: Relay, + arrivalTime: Long, afterEOSE: Boolean, ) { onEvent(event, relay) diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/NostrDataSource.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/NostrDataSource.kt index 4575d3059..74fc9f75f 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/NostrDataSource.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/NostrDataSource.kt @@ -27,7 +27,6 @@ import com.vitorpamplona.ammolite.relays.Relay import com.vitorpamplona.ammolite.relays.TypedFilter import com.vitorpamplona.quartz.nip01Core.core.Event import com.vitorpamplona.quartz.nip01Core.relay.RelayState -import com.vitorpamplona.quartz.utils.TimeUtils import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers import java.util.concurrent.atomic.AtomicBoolean @@ -55,6 +54,7 @@ abstract class NostrDataSource( event: Event, subscriptionId: String, relay: Relay, + arrivalTime: Long, afterEOSE: Boolean, ) { if (subscriptions.contains(subscriptionId)) { @@ -63,7 +63,7 @@ abstract class NostrDataSource( consume(event, relay) if (afterEOSE) { - markAsEOSE(subscriptionId, relay) + markAsEOSE(subscriptionId, relay, arrivalTime) } } } @@ -71,9 +71,10 @@ abstract class NostrDataSource( override fun onEOSE( relay: Relay, subscriptionId: String, + arrivalTime: Long, ) { if (subscriptions.contains(subscriptionId)) { - markAsEOSE(subscriptionId, relay) + markAsEOSE(subscriptionId, relay, arrivalTime) } } @@ -253,10 +254,10 @@ abstract class NostrDataSource( open fun markAsEOSE( subscriptionId: String, relay: Relay, + arrivalTime: Long, ) { subscriptions[subscriptionId]?.callEose( - // in case people's clock is slighly off. - TimeUtils.oneMinuteAgo(), + arrivalTime, relay.url, ) } diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/RelayLogger.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/RelayLogger.kt index 71f9c7c93..4cb900ab6 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/RelayLogger.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/RelayLogger.kt @@ -39,6 +39,7 @@ class RelayLogger( event: Event, subscriptionId: String, relay: Relay, + arrivalTime: Long, afterEOSE: Boolean, ) { Log.d("Relay", "Relay onEVENT ${relay.url} ($subscriptionId - $afterEOSE) ${event.toJson()}") diff --git a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/SimpleClientRelay.kt b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/SimpleClientRelay.kt index ff1e6bff3..22504e4d4 100644 --- a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/SimpleClientRelay.kt +++ b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/SimpleClientRelay.kt @@ -234,13 +234,12 @@ class SimpleClientRelay( fun processNewRelayMessage(newMessage: String) { when (val msg = parser.parse(newMessage)) { is EventMessage -> { - // Log.w("Relay", "Relay onEVENT $url $newMessage") - listener.onEvent(this, msg.subId, msg.event, afterEOSEPerSubscription[msg.subId] == true) + listener.onEvent(this, msg.subId, msg.event, TimeUtils.now(), afterEOSEPerSubscription[msg.subId] == true) } is EoseMessage -> { // Log.w("Relay", "Relay onEOSE $url $newMessage") afterEOSEPerSubscription[msg.subId] = true - listener.onEOSE(this@SimpleClientRelay, msg.subId) + listener.onEOSE(this, msg.subId, TimeUtils.now()) } is NoticeMessage -> { // Log.w("Relay", "Relay onNotice $url, $newMessage") @@ -282,6 +281,7 @@ class SimpleClientRelay( listener.onNotify(this@SimpleClientRelay, msg.message) } is ClosedMessage -> { + afterEOSEPerSubscription[msg.subscriptionId] = false // Log.w("Relay", "Relay Closed Subscription $url, $newMessage") listener.onClosed(this@SimpleClientRelay, msg.subscriptionId, msg.message) } @@ -318,8 +318,8 @@ class SimpleClientRelay( if (isConnectionStarted()) { if (isReady) { if (filters.isNotEmpty()) { - writeToSocket(ReqCmd.toJson(requestId, filters)) afterEOSEPerSubscription[requestId] = false + writeToSocket(ReqCmd.toJson(requestId, filters)) } } } else { @@ -339,8 +339,8 @@ class SimpleClientRelay( if (isConnectionStarted()) { if (isReady) { if (filters.isNotEmpty()) { - writeToSocket(CountCmd.toJson(requestId, filters)) afterEOSEPerSubscription[requestId] = false + writeToSocket(CountCmd.toJson(requestId, filters)) } } } else { @@ -423,6 +423,7 @@ class SimpleClientRelay( fun close(subscriptionId: String) { writeToSocket(CloseCmd.toJson(subscriptionId)) + afterEOSEPerSubscription[subscriptionId] = false } interface Listener { @@ -433,6 +434,7 @@ class SimpleClientRelay( relay: SimpleClientRelay, subscriptionId: String, event: Event, + arrivalTime: Long, afterEOSE: Boolean, ) @@ -442,6 +444,7 @@ class SimpleClientRelay( fun onEOSE( relay: SimpleClientRelay, subscriptionId: String, + arrivalTime: Long, ) /**