Adds a req that returns the list of events in order of arrival

This commit is contained in:
Vitor Pamplona
2025-10-29 18:30:00 -04:00
parent f9f7cdf12f
commit 95cc0783fd
2 changed files with 223 additions and 0 deletions

View File

@@ -0,0 +1,102 @@
/**
* Copyright (c) 2025 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.quartz.nip01Core.relay.client.reqs
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.core.HexKey
import com.vitorpamplona.quartz.nip01Core.relay.client.INostrClient
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.RelayUrlNormalizer
import com.vitorpamplona.quartz.utils.RandomInstance
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
/**
* 1. Subscribes to a req while the flow is active
* 2. Stores the incoming events in a list, where:
* - Before EOSE, events are added to the end of the list
* - After EOSE, events are added to the beginning of the list
* 3. If the client disconnects and reconnects, the client
* will resend the req with the same filter, which will download
* the same events again, where:
* - They will be ignored if they are already in the list.
* - They will be added to the beginning of the list if they are new.
*/
fun INostrClient.reqResultsInOrderAsFlow(
relay: String,
filters: List<Filter>,
) = reqResultsInOrderAsFlow(RelayUrlNormalizer.normalize(relay), filters)
fun INostrClient.reqResultsInOrderAsFlow(
relay: String,
filter: Filter,
) = reqResultsInOrderAsFlow(RelayUrlNormalizer.normalize(relay), listOf(filter))
fun INostrClient.reqResultsInOrderAsFlow(
relay: NormalizedRelayUrl,
filter: Filter,
) = reqResultsInOrderAsFlow(relay, listOf(filter))
fun INostrClient.reqResultsInOrderAsFlow(
relay: NormalizedRelayUrl,
filters: List<Filter>,
): Flow<List<Event>> =
callbackFlow {
val subId = RandomInstance.randomChars(10)
var hasBeenLive = false
val eventIds = mutableSetOf<HexKey>()
val events = mutableListOf<Event>()
val listener =
object : IRequestListener {
override fun onEvent(
event: Event,
isLive: Boolean,
relay: NormalizedRelayUrl,
forFilters: List<Filter>?,
) {
if (event.id !in eventIds) {
if (hasBeenLive) {
events.add(0, event)
} else {
events.add(event)
}
eventIds.add(event.id)
trySend(events.toList())
}
}
override fun onEose(
relay: NormalizedRelayUrl,
forFilters: List<Filter>?,
) {
hasBeenLive = true
}
}
openReqSubscription(subId, mapOf(relay to filters), listener)
awaitClose {
close(subId)
}
}

View File

@@ -0,0 +1,121 @@
/**
* Copyright (c) 2025 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.quartz.nip01Core.relay
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.metadata.MetadataEvent
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.reqs.reqResultsInOrderAsFlow
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.utils.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
class NostrClientSubscriptionAsFlowTest : BaseNostrClientTest() {
fun List<Event>.printDates(): String {
val starting = this[0].createdAt
return joinToString { (it.createdAt - starting).toString() }
}
@Test
fun testNostrClientSubscriptionAsFlow() =
runTest {
val appScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
val client = NostrClient(socketBuilder, appScope)
val flow =
client.reqResultsInOrderAsFlow(
relay = "wss://relay.damus.io",
filter =
Filter(
kinds = listOf(MetadataEvent.KIND),
limit = 10,
),
)
var feedStates = listOf<Event>()
val job =
launch {
flow.collect {
Log.d("ZZ", "List timestamp deltas ${it.printDates()}")
feedStates = it
}
}
// Advance the test dispatcher to ensure emissions are processed
while (feedStates.size < 10) {
advanceUntilIdle()
}
job.cancel() // Cancel the collection job
client.disconnect()
appScope.cancel()
assertEquals(10, feedStates.size)
}
@Test
fun testNostrClientSubscriptionAsFlowDebouncing() =
runTest {
val appScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
val client = NostrClient(socketBuilder, appScope)
val flow =
client.reqResultsInOrderAsFlow(
relay = "wss://relay.damus.io",
filter =
Filter(
kinds = listOf(MetadataEvent.KIND),
limit = 10,
),
)
var feedStates = listOf<Event>()
val job =
launch {
flow.debounce(100).collect {
Log.d("ZZ", "List timestamp deltas ${it.printDates()}")
feedStates = it
}
}
// Advance the test dispatcher to ensure emissions are processed
while (feedStates.size < 10) {
advanceUntilIdle()
}
job.cancel() // Cancel the collection job
client.disconnect()
appScope.cancel()
assertEquals(10, feedStates.size)
}
}