Updates EOSE in the same thread of the new event to reduce the amount of coroutine launches.

This commit is contained in:
Vitor Pamplona 2023-12-02 14:54:34 -05:00
parent 66b8b04db3
commit 6ef1bee8a0
4 changed files with 20 additions and 18 deletions

View File

@ -35,7 +35,7 @@ abstract class NostrDataSource(val debugName: String) {
}
private val clientListener = object : Client.Listener() {
override fun onEvent(event: Event, subscriptionId: String, relay: Relay) {
override fun onEvent(event: Event, subscriptionId: String, relay: Relay, afterEOSE: Boolean) {
if (subscriptionId in subscriptions.keys) {
val key = "$debugName $subscriptionId ${event.kind}"
val keyValue = eventCounter.get(key)
@ -46,6 +46,9 @@ abstract class NostrDataSource(val debugName: String) {
}
consume(event, relay)
if (afterEOSE) {
markAsEOSE(subscriptionId, relay)
}
}
}
@ -62,11 +65,7 @@ abstract class NostrDataSource(val debugName: String) {
// }}")
if (type == Relay.StateType.EOSE && subscriptionId != null && subscriptionId in subscriptions.keys) {
// updates a per subscripton since date
subscriptions[subscriptionId]?.updateEOSE(
TimeUtils.oneMinuteAgo(), // in case people's clock is slighly off.
relay.url
)
markAsEOSE(subscriptionId, relay)
}
}
@ -221,6 +220,13 @@ abstract class NostrDataSource(val debugName: String) {
LocalCache.getNoteIfExists(eventId)?.addRelay(relay)
}
open fun markAsEOSE(subscriptionId: String, relay: Relay) {
subscriptions[subscriptionId]?.updateEOSE(
TimeUtils.oneMinuteAgo(), // in case people's clock is slighly off.
relay.url
)
}
abstract fun updateChannelFilters()
open fun auth(relay: Relay, challenge: String) = Unit
open fun notify(relay: Relay, description: String) = Unit

View File

@ -153,11 +153,11 @@ object Client : RelayPool.Listener {
}
@OptIn(DelicateCoroutinesApi::class)
override fun onEvent(event: Event, subscriptionId: String, relay: Relay) {
override fun onEvent(event: Event, subscriptionId: String, relay: Relay, afterEOSE: Boolean) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch(Dispatchers.Default) {
listeners.forEach { it.onEvent(event, subscriptionId, relay) }
listeners.forEach { it.onEvent(event, subscriptionId, relay, afterEOSE) }
}
}
@ -225,7 +225,7 @@ object Client : RelayPool.Listener {
/**
* A new message was received
*/
open fun onEvent(event: Event, subscriptionId: String, relay: Relay) = Unit
open fun onEvent(event: Event, subscriptionId: String, relay: Relay, afterEOSE: Boolean) = Unit
/**
* A new or repeat message was received

View File

@ -204,11 +204,7 @@ class Relay(
// Log.w("Relay", "Relay onEVENT ${event.kind} $url, $subscriptionId ${msgArray.get(2)}")
listeners.forEach {
it.onEvent(this@Relay, subscriptionId, event)
if (afterEOSEPerSubscription[subscriptionId] == true) {
Log.w("Relay", "Relay onEOSE $url $subscriptionId")
it.onRelayStateChange(this@Relay, StateType.EOSE, subscriptionId)
}
it.onEvent(this@Relay, subscriptionId, event, afterEOSEPerSubscription[subscriptionId] == true)
}
}
"EOSE" -> listeners.forEach {
@ -389,7 +385,7 @@ class Relay(
/**
* A new message was received
*/
fun onEvent(relay: Relay, subscriptionId: String, event: Event)
fun onEvent(relay: Relay, subscriptionId: String, event: Event, afterEOSE: Boolean)
fun onError(relay: Relay, subscriptionId: String, error: Error)

View File

@ -103,7 +103,7 @@ object RelayPool : Relay.Listener {
}
interface Listener {
fun onEvent(event: Event, subscriptionId: String, relay: Relay)
fun onEvent(event: Event, subscriptionId: String, relay: Relay, afterEOSE: Boolean)
fun onError(error: Error, subscriptionId: String, relay: Relay)
@ -116,8 +116,8 @@ object RelayPool : Relay.Listener {
fun onNotify(relay: Relay, description: String)
}
override fun onEvent(relay: Relay, subscriptionId: String, event: Event) {
listeners.forEach { it.onEvent(event, subscriptionId, relay) }
override fun onEvent(relay: Relay, subscriptionId: String, event: Event, afterEOSE: Boolean) {
listeners.forEach { it.onEvent(event, subscriptionId, relay, afterEOSE) }
}
override fun onError(relay: Relay, subscriptionId: String, error: Error) {