From 95cc0783fd8a5f21f89d1bfc38de67a368c4bc30 Mon Sep 17 00:00:00 2001 From: Vitor Pamplona Date: Wed, 29 Oct 2025 18:30:00 -0400 Subject: [PATCH] Adds a req that returns the list of events in order of arrival --- .../reqs/NostrClientStaticReqAsStateFlow.kt | 102 +++++++++++++++ .../NostrClientSubscriptionAsFlowTest.kt | 121 ++++++++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 quartz/src/commonMain/kotlin/com/vitorpamplona/quartz/nip01Core/relay/client/reqs/NostrClientStaticReqAsStateFlow.kt create mode 100644 quartz/src/jvmAndroidTest/kotlin/com/vitorpamplona/quartz/nip01Core/relay/NostrClientSubscriptionAsFlowTest.kt diff --git a/quartz/src/commonMain/kotlin/com/vitorpamplona/quartz/nip01Core/relay/client/reqs/NostrClientStaticReqAsStateFlow.kt b/quartz/src/commonMain/kotlin/com/vitorpamplona/quartz/nip01Core/relay/client/reqs/NostrClientStaticReqAsStateFlow.kt new file mode 100644 index 000000000..65897db25 --- /dev/null +++ b/quartz/src/commonMain/kotlin/com/vitorpamplona/quartz/nip01Core/relay/client/reqs/NostrClientStaticReqAsStateFlow.kt @@ -0,0 +1,102 @@ +/** + * Copyright (c) 2025 Vitor Pamplona + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the + * Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN + * AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +package com.vitorpamplona.quartz.nip01Core.relay.client.reqs + +import com.vitorpamplona.quartz.nip01Core.core.Event +import com.vitorpamplona.quartz.nip01Core.core.HexKey +import com.vitorpamplona.quartz.nip01Core.relay.client.INostrClient +import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter +import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl +import com.vitorpamplona.quartz.nip01Core.relay.normalizer.RelayUrlNormalizer +import com.vitorpamplona.quartz.utils.RandomInstance +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow + +/** + * 1. Subscribes to a req while the flow is active + * 2. Stores the incoming events in a list, where: + * - Before EOSE, events are added to the end of the list + * - After EOSE, events are added to the beginning of the list + * 3. If the client disconnects and reconnects, the client + * will resend the req with the same filter, which will download + * the same events again, where: + * - They will be ignored if they are already in the list. + * - They will be added to the beginning of the list if they are new. + */ +fun INostrClient.reqResultsInOrderAsFlow( + relay: String, + filters: List, +) = reqResultsInOrderAsFlow(RelayUrlNormalizer.normalize(relay), filters) + +fun INostrClient.reqResultsInOrderAsFlow( + relay: String, + filter: Filter, +) = reqResultsInOrderAsFlow(RelayUrlNormalizer.normalize(relay), listOf(filter)) + +fun INostrClient.reqResultsInOrderAsFlow( + relay: NormalizedRelayUrl, + filter: Filter, +) = reqResultsInOrderAsFlow(relay, listOf(filter)) + +fun INostrClient.reqResultsInOrderAsFlow( + relay: NormalizedRelayUrl, + filters: List, +): Flow> = + callbackFlow { + val subId = RandomInstance.randomChars(10) + var hasBeenLive = false + val eventIds = mutableSetOf() + val events = mutableListOf() + + val listener = + object : IRequestListener { + override fun onEvent( + event: Event, + isLive: Boolean, + relay: NormalizedRelayUrl, + forFilters: List?, + ) { + if (event.id !in eventIds) { + if (hasBeenLive) { + events.add(0, event) + } else { + events.add(event) + } + eventIds.add(event.id) + trySend(events.toList()) + } + } + + override fun onEose( + relay: NormalizedRelayUrl, + forFilters: List?, + ) { + hasBeenLive = true + } + } + + openReqSubscription(subId, mapOf(relay to filters), listener) + + awaitClose { + close(subId) + } + } diff --git a/quartz/src/jvmAndroidTest/kotlin/com/vitorpamplona/quartz/nip01Core/relay/NostrClientSubscriptionAsFlowTest.kt b/quartz/src/jvmAndroidTest/kotlin/com/vitorpamplona/quartz/nip01Core/relay/NostrClientSubscriptionAsFlowTest.kt new file mode 100644 index 000000000..958a09742 --- /dev/null +++ b/quartz/src/jvmAndroidTest/kotlin/com/vitorpamplona/quartz/nip01Core/relay/NostrClientSubscriptionAsFlowTest.kt @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2025 Vitor Pamplona + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the + * Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN + * AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +package com.vitorpamplona.quartz.nip01Core.relay + +import com.vitorpamplona.quartz.nip01Core.core.Event +import com.vitorpamplona.quartz.nip01Core.metadata.MetadataEvent +import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient +import com.vitorpamplona.quartz.nip01Core.relay.client.reqs.reqResultsInOrderAsFlow +import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter +import com.vitorpamplona.quartz.utils.Log +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals + +class NostrClientSubscriptionAsFlowTest : BaseNostrClientTest() { + fun List.printDates(): String { + val starting = this[0].createdAt + return joinToString { (it.createdAt - starting).toString() } + } + + @Test + fun testNostrClientSubscriptionAsFlow() = + runTest { + val appScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) + val client = NostrClient(socketBuilder, appScope) + + val flow = + client.reqResultsInOrderAsFlow( + relay = "wss://relay.damus.io", + filter = + Filter( + kinds = listOf(MetadataEvent.KIND), + limit = 10, + ), + ) + + var feedStates = listOf() + val job = + launch { + flow.collect { + Log.d("ZZ", "List timestamp deltas ${it.printDates()}") + feedStates = it + } + } + + // Advance the test dispatcher to ensure emissions are processed + while (feedStates.size < 10) { + advanceUntilIdle() + } + + job.cancel() // Cancel the collection job + + client.disconnect() + appScope.cancel() + + assertEquals(10, feedStates.size) + } + + @Test + fun testNostrClientSubscriptionAsFlowDebouncing() = + runTest { + val appScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) + val client = NostrClient(socketBuilder, appScope) + + val flow = + client.reqResultsInOrderAsFlow( + relay = "wss://relay.damus.io", + filter = + Filter( + kinds = listOf(MetadataEvent.KIND), + limit = 10, + ), + ) + + var feedStates = listOf() + val job = + launch { + flow.debounce(100).collect { + Log.d("ZZ", "List timestamp deltas ${it.printDates()}") + feedStates = it + } + } + + // Advance the test dispatcher to ensure emissions are processed + while (feedStates.size < 10) { + advanceUntilIdle() + } + + job.cancel() // Cancel the collection job + + client.disconnect() + appScope.cancel() + + assertEquals(10, feedStates.size) + } +}