- Bugfix to mark relay connections as AfterEOSE when the relay client or server closes the subscription

- Adds an arrival time to log the time EOSEs were received in case the listener is running async processes later.
This commit is contained in:
Vitor Pamplona
2025-05-13 14:43:28 -04:00
parent 2d5732bcce
commit 50cde1bc3e
9 changed files with 47 additions and 50 deletions

View File

@@ -42,6 +42,7 @@ class RelayLogger(
event: Event,
subscriptionId: String,
relay: Relay,
arrivalTime: Long,
afterEOSE: Boolean,
) {
Log.d(TAG, "Relay onEVENT ${relay.url} ($subscriptionId - $afterEOSE) ${event.toJson()}")

View File

@@ -46,6 +46,7 @@ class RelaySpeedLogger(
event: Event,
subscriptionId: String,
relay: Relay,
arrivalTime: Long,
afterEOSE: Boolean,
) {
val now = TimeUtils.now() / 10

View File

@@ -25,12 +25,9 @@ import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.relay.RelayState
import com.vitorpamplona.quartz.nip01Core.relay.sockets.WebsocketBuilderFactory
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.UUID
@@ -42,13 +39,6 @@ import java.util.concurrent.TimeUnit
*/
class NostrClient(
private val websocketBuilder: WebsocketBuilderFactory,
private val listenerScope: CoroutineScope =
CoroutineScope(
Dispatchers.Default + SupervisorJob() +
CoroutineExceptionHandler { _, throwable ->
Log.e("NostrClient", "Caught exception: ${throwable.message}", throwable)
},
),
) : RelayPool.Listener {
private val relayPool: RelayPool = RelayPool()
private val activeSubscriptions: MutableSubscriptionManager = MutableSubscriptionManager()
@@ -135,12 +125,14 @@ class NostrClient(
event: Event,
subId: String,
relay: Relay,
arrivalTime: Long,
afterEOSE: Boolean,
) {
if (subId == subscriptionId) {
onResponse(event)
unsubscribe(this)
close(subscriptionId)
onResponse(event)
}
}
},
@@ -186,6 +178,7 @@ class NostrClient(
override fun onEOSE(
relay: Relay,
subscriptionId: String,
arrivalTime: Long,
) {
latch.countDown()
Log.d("sendAndWaitForResponse", "onEOSE relay ${relay.url} count: ${latch.count}")
@@ -320,32 +313,28 @@ class NostrClient(
event: Event,
subscriptionId: String,
relay: Relay,
arrivalTime: Long,
afterEOSE: Boolean,
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
listenerScope.launch {
listeners.forEach { it.onEvent(event, subscriptionId, relay, afterEOSE) }
}
listeners.forEach { it.onEvent(event, subscriptionId, relay, arrivalTime, afterEOSE) }
}
override fun onEOSE(
relay: Relay,
subscriptionId: String,
arrivalTime: Long,
) {
listenerScope.launch {
listeners.forEach { it.onEOSE(relay, subscriptionId) }
}
listeners.forEach { it.onEOSE(relay, subscriptionId, arrivalTime) }
}
override fun onRelayStateChange(
type: RelayState,
relay: Relay,
) {
listenerScope.launch {
listeners.forEach { it.onRelayStateChange(type, relay) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onSendResponse(
@@ -356,10 +345,8 @@ class NostrClient(
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
listenerScope.launch {
listeners.forEach { it.onSendResponse(eventId, success, message, relay) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onAuth(
@@ -368,7 +355,7 @@ class NostrClient(
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
listenerScope.launch { listeners.forEach { it.onAuth(relay, challenge) } }
listeners.forEach { it.onAuth(relay, challenge) }
}
@OptIn(DelicateCoroutinesApi::class)
@@ -378,10 +365,8 @@ class NostrClient(
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
listenerScope.launch {
listeners.forEach { it.onNotify(relay, description) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onSend(
@@ -389,20 +374,16 @@ class NostrClient(
msg: String,
success: Boolean,
) {
listenerScope.launch {
listeners.forEach { it.onSend(relay, msg, success) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onBeforeSend(
relay: Relay,
event: Event,
) {
listenerScope.launch {
listeners.forEach { it.onBeforeSend(relay, event) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onError(
@@ -410,10 +391,8 @@ class NostrClient(
subscriptionId: String,
relay: Relay,
) {
listenerScope.launch {
listeners.forEach { it.onError(error, subscriptionId, relay) }
}
}
fun subscribe(listener: Listener) {
listeners = listeners.plus(listener)
@@ -439,6 +418,7 @@ class NostrClient(
event: Event,
subscriptionId: String,
relay: Relay,
arrivalTime: Long,
afterEOSE: Boolean,
) = Unit
@@ -446,6 +426,7 @@ class NostrClient(
open fun onEOSE(
relay: Relay,
subscriptionId: String,
arrivalTime: Long,
) = Unit
/** Connected to or disconnected from a relay */

View File

@@ -161,8 +161,9 @@ class Relay(
relay: SimpleClientRelay,
subscriptionId: String,
event: Event,
time: Long,
afterEOSE: Boolean,
) = listeners.forEach { it.onEvent(this, subscriptionId, event, afterEOSE) }
) = listeners.forEach { it.onEvent(this, subscriptionId, event, time, afterEOSE) }
override fun onError(
relay: SimpleClientRelay,
@@ -173,7 +174,8 @@ class Relay(
override fun onEOSE(
relay: SimpleClientRelay,
subscriptionId: String,
) = listeners.forEach { it.onEOSE(this, subscriptionId) }
time: Long,
) = listeners.forEach { it.onEOSE(this, subscriptionId, time) }
override fun onRelayStateChange(
relay: SimpleClientRelay,
@@ -219,12 +221,14 @@ class Relay(
relay: Relay,
subscriptionId: String,
event: Event,
time: Long,
afterEOSE: Boolean,
)
fun onEOSE(
relay: Relay,
subscriptionId: String,
time: Long,
)
fun onError(

View File

@@ -170,12 +170,14 @@ class RelayPool : Relay.Listener {
event: Event,
subscriptionId: String,
relay: Relay,
arrivalTime: Long,
afterEOSE: Boolean,
)
fun onEOSE(
relay: Relay,
subscriptionId: String,
arrivalTime: Long,
)
fun onRelayStateChange(
@@ -222,9 +224,10 @@ class RelayPool : Relay.Listener {
relay: Relay,
subscriptionId: String,
event: Event,
arrivalTime: Long,
afterEOSE: Boolean,
) {
listeners.forEach { it.onEvent(event, subscriptionId, relay, afterEOSE) }
listeners.forEach { it.onEvent(event, subscriptionId, relay, arrivalTime, afterEOSE) }
}
override fun onError(
@@ -239,8 +242,9 @@ class RelayPool : Relay.Listener {
override fun onEOSE(
relay: Relay,
subscriptionId: String,
arrivalTime: Long,
) {
listeners.forEach { it.onEOSE(relay, subscriptionId) }
listeners.forEach { it.onEOSE(relay, subscriptionId, arrivalTime) }
updateStatus()
}

View File

@@ -38,6 +38,7 @@ class EventCollector(
event: Event,
subscriptionId: String,
relay: Relay,
arrivalTime: Long,
afterEOSE: Boolean,
) {
onEvent(event, relay)

View File

@@ -27,7 +27,6 @@ import com.vitorpamplona.ammolite.relays.Relay
import com.vitorpamplona.ammolite.relays.TypedFilter
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.relay.RelayState
import com.vitorpamplona.quartz.utils.TimeUtils
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import java.util.concurrent.atomic.AtomicBoolean
@@ -55,6 +54,7 @@ abstract class NostrDataSource(
event: Event,
subscriptionId: String,
relay: Relay,
arrivalTime: Long,
afterEOSE: Boolean,
) {
if (subscriptions.contains(subscriptionId)) {
@@ -63,7 +63,7 @@ abstract class NostrDataSource(
consume(event, relay)
if (afterEOSE) {
markAsEOSE(subscriptionId, relay)
markAsEOSE(subscriptionId, relay, arrivalTime)
}
}
}
@@ -71,9 +71,10 @@ abstract class NostrDataSource(
override fun onEOSE(
relay: Relay,
subscriptionId: String,
arrivalTime: Long,
) {
if (subscriptions.contains(subscriptionId)) {
markAsEOSE(subscriptionId, relay)
markAsEOSE(subscriptionId, relay, arrivalTime)
}
}
@@ -253,10 +254,10 @@ abstract class NostrDataSource(
open fun markAsEOSE(
subscriptionId: String,
relay: Relay,
arrivalTime: Long,
) {
subscriptions[subscriptionId]?.callEose(
// in case people's clock is slighly off.
TimeUtils.oneMinuteAgo(),
arrivalTime,
relay.url,
)
}

View File

@@ -39,6 +39,7 @@ class RelayLogger(
event: Event,
subscriptionId: String,
relay: Relay,
arrivalTime: Long,
afterEOSE: Boolean,
) {
Log.d("Relay", "Relay onEVENT ${relay.url} ($subscriptionId - $afterEOSE) ${event.toJson()}")

View File

@@ -234,13 +234,12 @@ class SimpleClientRelay(
fun processNewRelayMessage(newMessage: String) {
when (val msg = parser.parse(newMessage)) {
is EventMessage -> {
// Log.w("Relay", "Relay onEVENT $url $newMessage")
listener.onEvent(this, msg.subId, msg.event, afterEOSEPerSubscription[msg.subId] == true)
listener.onEvent(this, msg.subId, msg.event, TimeUtils.now(), afterEOSEPerSubscription[msg.subId] == true)
}
is EoseMessage -> {
// Log.w("Relay", "Relay onEOSE $url $newMessage")
afterEOSEPerSubscription[msg.subId] = true
listener.onEOSE(this@SimpleClientRelay, msg.subId)
listener.onEOSE(this, msg.subId, TimeUtils.now())
}
is NoticeMessage -> {
// Log.w("Relay", "Relay onNotice $url, $newMessage")
@@ -282,6 +281,7 @@ class SimpleClientRelay(
listener.onNotify(this@SimpleClientRelay, msg.message)
}
is ClosedMessage -> {
afterEOSEPerSubscription[msg.subscriptionId] = false
// Log.w("Relay", "Relay Closed Subscription $url, $newMessage")
listener.onClosed(this@SimpleClientRelay, msg.subscriptionId, msg.message)
}
@@ -318,8 +318,8 @@ class SimpleClientRelay(
if (isConnectionStarted()) {
if (isReady) {
if (filters.isNotEmpty()) {
writeToSocket(ReqCmd.toJson(requestId, filters))
afterEOSEPerSubscription[requestId] = false
writeToSocket(ReqCmd.toJson(requestId, filters))
}
}
} else {
@@ -339,8 +339,8 @@ class SimpleClientRelay(
if (isConnectionStarted()) {
if (isReady) {
if (filters.isNotEmpty()) {
writeToSocket(CountCmd.toJson(requestId, filters))
afterEOSEPerSubscription[requestId] = false
writeToSocket(CountCmd.toJson(requestId, filters))
}
}
} else {
@@ -423,6 +423,7 @@ class SimpleClientRelay(
fun close(subscriptionId: String) {
writeToSocket(CloseCmd.toJson(subscriptionId))
afterEOSEPerSubscription[subscriptionId] = false
}
interface Listener {
@@ -433,6 +434,7 @@ class SimpleClientRelay(
relay: SimpleClientRelay,
subscriptionId: String,
event: Event,
arrivalTime: Long,
afterEOSE: Boolean,
)
@@ -442,6 +444,7 @@ class SimpleClientRelay(
fun onEOSE(
relay: SimpleClientRelay,
subscriptionId: String,
arrivalTime: Long,
)
/**