From cf4851660268996155ca25d41162a56395c815ba Mon Sep 17 00:00:00 2001 From: Vitor Pamplona Date: Mon, 7 Jul 2025 13:37:57 -0400 Subject: [PATCH] Makes sure Filters are not updated if the since is the only change --- .../vitorpamplona/amethyst/model/Account.kt | 6 +- .../service/notifications/RegisterAccounts.kt | 28 ++-- .../ComposeSubscriptionManager.kt | 10 -- .../ComposeSubscriptionManagerControls.kt | 4 - .../MutableComposeSubscriptionManager.kt | 10 -- .../eoseManagers/BaseEoseManager.kt | 34 ++-- .../eoseManagers/PerUniqueIdEoseManager.kt | 4 +- .../PerUserAndFollowListEoseManager.kt | 4 +- .../eoseManagers/PerUserEoseManager.kt | 5 +- .../eoseManagers/SingleSubEoseManager.kt | 4 +- .../SingleSubNoEoseCacheEoseManager.kt | 5 +- .../account/AccountFilterAssembler.kt | 6 +- .../metadata/AccountMetadataEoseManager.kt | 2 +- .../ChannelFinderFilterAssemblyGroup.kt | 4 - .../event/EventFinderFilterAssembler.kt | 4 - .../nwc/NWCPaymentFilterAssembler.kt | 4 - .../user/UserFinderFilterAssembler.kt | 4 - .../searchCommand/SearchFilterAssembler.kt | 4 - .../amethyst/ui/note/RelayCompose.kt | 3 +- .../ui/screen/loggedIn/AccountViewModel.kt | 16 -- .../datasource/ChatroomFilterAssembler.kt | 6 +- .../datasource/ChannelFilterAssembler.kt | 6 +- .../datasource/ChatroomListFilterAssembler.kt | 6 +- .../datasource/CommunityFilterAssembler.kt | 6 +- .../datasource/DiscoveryFilterAssembler.kt | 6 +- .../datasource/GeoHashFilterAssembler.kt | 6 +- .../datasource/HashtagFilterAssembler.kt | 6 +- .../home/datasource/HomeFilterAssembler.kt | 6 +- .../datasource/UserProfileFilterAssembler.kt | 4 - .../datasources/ThreadFilterAssembler.kt | 6 +- .../video/datasource/VideoFilterAssembler.kt | 6 +- .../relays/datasources/Subscription.kt | 60 ++----- .../datasources/SubscriptionController.kt | 153 ++++------------- .../SubscriptionControllerService.kt | 33 ---- .../relays/datasources/SubscriptionSet.kt | 47 ------ .../relays/datasources/SubscriptionStats.kt | 22 ++- .../nip01Core/relay/client/NostrClient.kt | 154 +++++++++++++++--- .../NostrClientSingleDownloadExt.kt | 7 +- .../relay/client/pool/PoolSubscription.kt | 29 ---- .../client/pool/PoolSubscriptionRepository.kt | 38 ++--- .../relay/client/pool/RelayBasedFilter.kt | 8 + .../nip01Core/relay/client/pool/RelayPool.kt | 40 ++++- .../client/single/basic/BasicRelayClient.kt | 2 +- .../vitorpamplona/quartz/utils/LargeCache.kt | 104 +++++++++++- 44 files changed, 397 insertions(+), 525 deletions(-) delete mode 100644 ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionControllerService.kt delete mode 100644 ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionSet.kt delete mode 100644 quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/PoolSubscription.kt diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Account.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Account.kt index 1a2b26af0..ace9e8406 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Account.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Account.kt @@ -97,7 +97,6 @@ import com.vitorpamplona.quartz.nip01Core.crypto.KeyPair import com.vitorpamplona.quartz.nip01Core.hints.EventHintBundle import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient import com.vitorpamplona.quartz.nip01Core.relay.client.acessories.downloadFirstEvent -import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl @@ -953,9 +952,8 @@ class Account( it.host?.let { host -> client.downloadFirstEvent( filters = - note.relays.map { relay -> - RelayBasedFilter( - relay, + note.relays.associateWith { relay -> + listOf( Filter( ids = listOf(host.id), ), diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/notifications/RegisterAccounts.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/notifications/RegisterAccounts.kt index bac952a2c..9a43fb398 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/notifications/RegisterAccounts.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/notifications/RegisterAccounts.kt @@ -96,26 +96,26 @@ class RegisterAccounts( val readyToSend = accounts .mapNotNull { - Log.d(tag, "Register Account ${it.npub}") + if (it.hasPrivKey || it.loggedInWithExternalSigner) { + Log.d(tag, "Register Account ${it.npub}") - val acc = LocalPreferences.loadCurrentAccountFromEncryptedStorage(it.npub) - if (acc != null && acc.isWriteable()) { - val nip65Read = acc.backupNIP65RelayList?.readRelaysNorm() ?: emptyList() + val acc = LocalPreferences.loadCurrentAccountFromEncryptedStorage(it.npub) + if (acc != null && acc.isWriteable()) { + val nip65Read = acc.backupNIP65RelayList?.readRelaysNorm() ?: emptyList() - Log.d(tag, "Register Account ${it.npub} NIP65 Reads ${nip65Read.joinToString(", ") { it.url } }") + Log.d(tag, "Register Account ${it.npub} NIP65 Reads ${nip65Read.joinToString(", ") { it.url } }") - val nip17Read = acc.backupDMRelayList?.relays() ?: emptyList() + val nip17Read = acc.backupDMRelayList?.relays() ?: emptyList() - Log.d(tag, "Register Account ${it.npub} NIP17 Reads ${nip17Read.joinToString(", ") { it.url } }") + Log.d(tag, "Register Account ${it.npub} NIP17 Reads ${nip17Read.joinToString(", ") { it.url } }") - val readKind3Relays = acc.backupContactList?.relays()?.mapNotNull { if (it.value.read) it.key else null } ?: emptyList() + val relays = (nip65Read + nip17Read) - Log.d(tag, "Register Account ${it.npub} Kind3 Reads ${readKind3Relays.joinToString(", ") { it.url } }") - - val relays = (nip65Read + nip17Read + readKind3Relays) - - if (relays.isNotEmpty()) { - Pair(acc, relays) + if (relays.isNotEmpty()) { + Pair(acc, relays) + } else { + null + } } else { null } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/ComposeSubscriptionManager.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/ComposeSubscriptionManager.kt index 236d469ee..4c9bb3f66 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/ComposeSubscriptionManager.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/ComposeSubscriptionManager.kt @@ -34,14 +34,8 @@ abstract class ComposeSubscriptionManager : ComposeSubscriptionManagerControl fun subscribe(query: T?) { if (query == null) return - val wasEmpty = composeSubscriptions.isEmpty() - composeSubscriptions.put(query, query) - if (wasEmpty) { - start() - } - invalidateKeys() } @@ -52,10 +46,6 @@ abstract class ComposeSubscriptionManager : ComposeSubscriptionManagerControl composeSubscriptions.remove(query) invalidateKeys() - - if (composeSubscriptions.isEmpty()) { - stop() - } } fun allKeys() = composeSubscriptions.keys diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/ComposeSubscriptionManagerControls.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/ComposeSubscriptionManagerControls.kt index 6a1d54fd1..479bde071 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/ComposeSubscriptionManagerControls.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/ComposeSubscriptionManagerControls.kt @@ -21,10 +21,6 @@ package com.vitorpamplona.amethyst.service.relayClient.composeSubscriptionManagers interface ComposeSubscriptionManagerControls { - fun start() - - fun stop() - fun invalidateKeys() fun invalidateFilters() diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/MutableComposeSubscriptionManager.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/MutableComposeSubscriptionManager.kt index a9520a230..9966f901d 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/MutableComposeSubscriptionManager.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/composeSubscriptionManagers/MutableComposeSubscriptionManager.kt @@ -45,8 +45,6 @@ abstract class MutableComposeSubscriptionManager( fun subscribe(query: T?) { if (query == null) return - val wasEmpty = composeSubscriptions.isEmpty() - composeSubscriptions[query]?.cancel() composeSubscriptions[query] = scope.launch { @@ -55,10 +53,6 @@ abstract class MutableComposeSubscriptionManager( } } - if (wasEmpty) { - start() - } - invalidateKeys() } @@ -70,10 +64,6 @@ abstract class MutableComposeSubscriptionManager( composeSubscriptions.remove(query) invalidateKeys() - - if (composeSubscriptions.isEmpty()) { - stop() - } } fun allKeys() = composeSubscriptions.keys diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/BaseEoseManager.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/BaseEoseManager.kt index fb5fce646..f90d643bd 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/BaseEoseManager.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/BaseEoseManager.kt @@ -22,49 +22,41 @@ package com.vitorpamplona.amethyst.service.relayClient.eoseManagers import android.util.Log import com.vitorpamplona.amethyst.isDebug +import com.vitorpamplona.ammolite.relays.BundledUpdate import com.vitorpamplona.ammolite.relays.datasources.SubscriptionController import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient +import kotlinx.coroutines.Dispatchers abstract class BaseEoseManager( val client: NostrClient, val allKeys: () -> Set, ) { - val orchestrator = - SubscriptionController(client) { - if (isDebug) Log.d("${this.javaClass.simpleName}", "Updating Subscriptions") - updateSubscriptions(allKeys()) - } + val orchestrator = SubscriptionController(client) abstract fun updateSubscriptions(keys: Set) fun printStats() = orchestrator.printStats(this.javaClass.simpleName) - fun invalidateFilters() = orchestrator.invalidateFilters() + // Refreshes observers in batches. + private val bundler = BundledUpdate(300, Dispatchers.Default) - fun start() { - orchestrator.start() - if (isDebug) { - Log.d("${this.javaClass.simpleName}", "Start") + fun invalidateFilters() { + bundler.invalidate { + forceInvalidate() } } - fun stop() { - orchestrator.stop() - if (isDebug) { - Log.d("${this.javaClass.simpleName}", "Stop") - } + fun forceInvalidate() { + updateSubscriptions(allKeys()) + + orchestrator.updateRelays() } fun destroy() { + bundler.cancel() orchestrator.destroy() if (isDebug) { Log.d("${this.javaClass.simpleName}", "Destroy, Unsubscribe") } } - - fun init() { - if (isDebug) { - Log.d("${this.javaClass.simpleName}", "Init, Subscribe") - } - } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUniqueIdEoseManager.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUniqueIdEoseManager.kt index 8cf1b6df9..2a9f0169b 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUniqueIdEoseManager.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUniqueIdEoseManager.kt @@ -25,6 +25,7 @@ import com.vitorpamplona.amethyst.service.relays.SincePerRelayMap import com.vitorpamplona.ammolite.relays.datasources.Subscription import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter +import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl import kotlin.collections.distinctBy @@ -90,7 +91,8 @@ abstract class PerUniqueIdEoseManager( uniqueSubscribedAccounts.forEach { val mainKey = id(it) - findOrCreateSubFor(it).relayBasedFilters = updateFilter(it, since(it))?.ifEmpty { null } + val newFilters = updateFilter(it, since(it))?.ifEmpty { null } + findOrCreateSubFor(it).updateFilters(newFilters?.groupByRelay()) updated.add(mainKey) } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUserAndFollowListEoseManager.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUserAndFollowListEoseManager.kt index 0c0da81ec..948060682 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUserAndFollowListEoseManager.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUserAndFollowListEoseManager.kt @@ -26,6 +26,7 @@ import com.vitorpamplona.amethyst.service.relays.SincePerRelayMap import com.vitorpamplona.ammolite.relays.datasources.Subscription import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter +import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl /** @@ -89,7 +90,8 @@ abstract class PerUserAndFollowListEoseManager( uniqueSubscribedAccounts.forEach { val user = user(it) val sub = findOrCreateSubFor(it) - sub.relayBasedFilters = updateFilter(it, since(it))?.ifEmpty { null } + val newFilters = updateFilter(it, since(it))?.ifEmpty { null } + sub.updateFilters(newFilters?.groupByRelay()) updated.add(user) } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUserEoseManager.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUserEoseManager.kt index 1854f57d1..404ab6143 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUserEoseManager.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/PerUserEoseManager.kt @@ -26,6 +26,7 @@ import com.vitorpamplona.amethyst.service.relays.SincePerRelayMap import com.vitorpamplona.ammolite.relays.datasources.Subscription import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter +import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl import kotlin.collections.distinctBy @@ -87,7 +88,9 @@ abstract class PerUserEoseManager( uniqueSubscribedAccounts.forEach { val user = user(it) - findOrCreateSubFor(it).relayBasedFilters = updateFilter(it, since(it))?.ifEmpty { null } + val newFilters = updateFilter(it, since(it))?.ifEmpty { null } + + findOrCreateSubFor(it).updateFilters(newFilters?.groupByRelay()) updated.add(user) } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/SingleSubEoseManager.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/SingleSubEoseManager.kt index 992465e0c..c11aff657 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/SingleSubEoseManager.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/SingleSubEoseManager.kt @@ -24,6 +24,7 @@ import com.vitorpamplona.amethyst.service.relays.EOSERelayList import com.vitorpamplona.amethyst.service.relays.SincePerRelayMap import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter +import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl import kotlin.collections.distinctBy @@ -64,8 +65,9 @@ abstract class SingleSubEoseManager( override fun updateSubscriptions(keys: Set) { val uniqueSubscribedAccounts = keys.distinctBy { distinct(it) } + val newFilters = updateFilter(uniqueSubscribedAccounts, since())?.ifEmpty { null } - sub.relayBasedFilters = updateFilter(uniqueSubscribedAccounts, since())?.ifEmpty { null } + sub.updateFilters(newFilters?.groupByRelay()) } abstract fun updateFilter( diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/SingleSubNoEoseCacheEoseManager.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/SingleSubNoEoseCacheEoseManager.kt index 86a72b96d..90d8d79fa 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/SingleSubNoEoseCacheEoseManager.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/eoseManagers/SingleSubNoEoseCacheEoseManager.kt @@ -22,6 +22,7 @@ package com.vitorpamplona.amethyst.service.relayClient.eoseManagers import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter +import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay import kotlin.collections.distinctBy /** @@ -43,8 +44,8 @@ abstract class SingleSubNoEoseCacheEoseManager( override fun updateSubscriptions(keys: Set) { val uniqueSubscribedAccounts = keys.distinctBy { distinct(it) } - - sub.relayBasedFilters = updateFilter(uniqueSubscribedAccounts)?.ifEmpty { null } + val newFilters = updateFilter(uniqueSubscribedAccounts)?.ifEmpty { null } + sub.updateFilters(newFilters?.groupByRelay()) } abstract fun updateFilter(keys: List): List? diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/account/AccountFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/account/AccountFilterAssembler.kt index cd4be9b40..68fea5dac 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/account/AccountFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/account/AccountFilterAssembler.kt @@ -47,15 +47,11 @@ class AccountFilterAssembler( AccountNotificationsEoseManager(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/account/metadata/AccountMetadataEoseManager.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/account/metadata/AccountMetadataEoseManager.kt index 61f9a7b7b..044a23d75 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/account/metadata/AccountMetadataEoseManager.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/account/metadata/AccountMetadataEoseManager.kt @@ -38,7 +38,7 @@ class AccountMetadataEoseManager( client: NostrClient, allKeys: () -> Set, ) : PerUserEoseManager(client, allKeys) { - override fun user(query: AccountQueryState) = query.account.userProfile() + override fun user(key: AccountQueryState) = key.account.userProfile() fun relayFlow(query: AccountQueryState) = query.account.outboxRelays.flow diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/channel/ChannelFinderFilterAssemblyGroup.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/channel/ChannelFinderFilterAssemblyGroup.kt index cc794358f..797d2ccce 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/channel/ChannelFinderFilterAssemblyGroup.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/channel/ChannelFinderFilterAssemblyGroup.kt @@ -44,10 +44,6 @@ class ChannelFinderFilterAssemblyGroup( ChannelMetadataAndLiveActivityWatcherSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateFilters() = group.forEach { it.invalidateFilters() } override fun invalidateKeys() = invalidateFilters() diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/event/EventFinderFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/event/EventFinderFilterAssembler.kt index 0ba866dcf..beaf7d251 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/event/EventFinderFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/event/EventFinderFilterAssembler.kt @@ -40,10 +40,6 @@ class EventFinderFilterAssembler( EventWatcherSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateFilters() = group.forEach { it.invalidateFilters() } override fun invalidateKeys() = invalidateFilters() diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/nwc/NWCPaymentFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/nwc/NWCPaymentFilterAssembler.kt index 4e4d91572..71ca89d70 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/nwc/NWCPaymentFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/nwc/NWCPaymentFilterAssembler.kt @@ -41,10 +41,6 @@ class NWCPaymentFilterAssembler( NWCPaymentWatcherSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateFilters() = group.forEach { it.invalidateFilters() } override fun invalidateKeys() = invalidateFilters() diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/user/UserFinderFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/user/UserFinderFilterAssembler.kt index b7e6b8a52..c736b6e45 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/user/UserFinderFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/reqCommand/user/UserFinderFilterAssembler.kt @@ -42,10 +42,6 @@ class UserFinderFilterAssembler( UserWatcherSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateFilters() = group.forEach { it.invalidateFilters() } override fun invalidateKeys() = invalidateFilters() diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/searchCommand/SearchFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/searchCommand/SearchFilterAssembler.kt index caa17059f..754332de8 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/searchCommand/SearchFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/relayClient/searchCommand/SearchFilterAssembler.kt @@ -49,10 +49,6 @@ class SearchFilterAssembler( SearchWatcherSubAssembler(cache, client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateFilters() = group.forEach { it.invalidateFilters() } override fun invalidateKeys() = invalidateFilters() diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/note/RelayCompose.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/note/RelayCompose.kt index 4fa5b6d89..45ff9323d 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/note/RelayCompose.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/note/RelayCompose.kt @@ -109,7 +109,8 @@ private fun RelayOptions( onAddRelay: () -> Unit, onRemoveRelay: () -> Unit, ) { - val userState by accountViewModel.normalizedKind3RelaySetFlow.collectAsStateWithLifecycle() + val userState by accountViewModel.account.trustedRelays.flow + .collectAsStateWithLifecycle() if (!userState.contains(relay.url)) { AddRelayButton(onAddRelay) diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt index 6056e32b1..e45801ec9 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt @@ -161,22 +161,6 @@ class AccountViewModel( var firstRoute: Route? = null - val normalizedKind3RelaySetFlow: StateFlow> = - account - .userProfile() - .flow() - .relays.stateFlow - .map { contactListState -> - contactListState.user.latestContactList - ?.relays() - ?.keys ?: emptySet() - }.flowOn(Dispatchers.Default) - .stateIn( - viewModelScope, - SharingStarted.WhileSubscribed(10000, 10000), - emptySet(), - ) - val toastManager = ToastManager() val feedStates = AccountFeedContentStates(this) diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/privateDM/datasource/ChatroomFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/privateDM/datasource/ChatroomFilterAssembler.kt index 2ba8587ca..740d4cddc 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/privateDM/datasource/ChatroomFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/privateDM/datasource/ChatroomFilterAssembler.kt @@ -41,15 +41,11 @@ class ChatroomFilterAssembler( ChatroomFilterSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/publicChannels/datasource/ChannelFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/publicChannels/datasource/ChannelFilterAssembler.kt index b0faabfc6..bc579d454 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/publicChannels/datasource/ChannelFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/publicChannels/datasource/ChannelFilterAssembler.kt @@ -44,15 +44,11 @@ class ChannelFilterAssembler( ChannelFromUserFilterSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/rooms/datasource/ChatroomListFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/rooms/datasource/ChatroomListFilterAssembler.kt index 4649f1c2b..fa72099c3 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/rooms/datasource/ChatroomListFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/chats/rooms/datasource/ChatroomListFilterAssembler.kt @@ -39,15 +39,11 @@ class ChatroomListFilterAssembler( FollowingEphemeralChatSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/communities/datasource/CommunityFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/communities/datasource/CommunityFilterAssembler.kt index bc9087f18..e69a2dad9 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/communities/datasource/CommunityFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/communities/datasource/CommunityFilterAssembler.kt @@ -37,15 +37,11 @@ class CommunityFilterAssembler( CommunityFeedFilterSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/discover/datasource/DiscoveryFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/discover/datasource/DiscoveryFilterAssembler.kt index e820995bc..b3a9c8f08 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/discover/datasource/DiscoveryFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/discover/datasource/DiscoveryFilterAssembler.kt @@ -41,15 +41,11 @@ class DiscoveryFilterAssembler( DiscoveryFollowsDiscoverySubAssembler3(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/geohash/datasource/GeoHashFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/geohash/datasource/GeoHashFilterAssembler.kt index 088fbe0da..a1d84cf27 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/geohash/datasource/GeoHashFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/geohash/datasource/GeoHashFilterAssembler.kt @@ -45,15 +45,11 @@ class GeoHashFilterAssembler( GeoHashFeedFilterSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/hashtag/datasource/HashtagFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/hashtag/datasource/HashtagFilterAssembler.kt index f21cce322..ada311d14 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/hashtag/datasource/HashtagFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/hashtag/datasource/HashtagFilterAssembler.kt @@ -40,15 +40,11 @@ class HashtagFilterAssembler( HashtagFeedFilterSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/home/datasource/HomeFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/home/datasource/HomeFilterAssembler.kt index 06638abdc..43d1cd770 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/home/datasource/HomeFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/home/datasource/HomeFilterAssembler.kt @@ -46,15 +46,11 @@ class HomeFilterAssembler( // MixGeohashHashtagsCommunityEoseManager(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/profile/datasource/UserProfileFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/profile/datasource/UserProfileFilterAssembler.kt index be10dce70..f2a5a1d03 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/profile/datasource/UserProfileFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/profile/datasource/UserProfileFilterAssembler.kt @@ -42,10 +42,6 @@ class UserProfileFilterAssembler( UserProfileZapsFilterSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateFilters() = group.forEach { it.invalidateFilters() } override fun invalidateKeys() = invalidateFilters() diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/threadview/datasources/ThreadFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/threadview/datasources/ThreadFilterAssembler.kt index 3e9152ff8..59659205f 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/threadview/datasources/ThreadFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/threadview/datasources/ThreadFilterAssembler.kt @@ -40,15 +40,11 @@ class ThreadFilterAssembler( ThreadEventLoaderSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/video/datasource/VideoFilterAssembler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/video/datasource/VideoFilterAssembler.kt index d1f95d119..99a3d8f21 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/video/datasource/VideoFilterAssembler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/video/datasource/VideoFilterAssembler.kt @@ -40,15 +40,11 @@ class VideoFilterAssembler( VideoOutboxEventsFilterSubAssembler(client, ::allKeys), ) - override fun start() = group.forEach { it.start() } - - override fun stop() = group.forEach { it.stop() } - override fun invalidateKeys() = invalidateFilters() override fun invalidateFilters() = group.forEach { it.invalidateFilters() } - override fun destroy() = group.forEach { it.start() } + override fun destroy() = group.forEach { it.destroy() } override fun printStats() = group.forEach { it.printStats() } } diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/Subscription.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/Subscription.kt index a5595470f..364613cdb 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/Subscription.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/Subscription.kt @@ -20,70 +20,34 @@ */ package com.vitorpamplona.ammolite.relays.datasources -import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter import com.vitorpamplona.quartz.nip01Core.relay.client.single.newSubId import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl -import kotlin.collections.forEachIndexed +import kotlin.contracts.ExperimentalContracts data class Subscription( val id: String = newSubId(), val onEose: ((time: Long, relayUrl: NormalizedRelayUrl) -> Unit)? = null, ) { - var relayBasedFilters: List? = null // Inactive when null + private var filters: Map>? = null // Inactive when null fun reset() { - relayBasedFilters = null + filters = null } + fun updateFilters(newFilters: Map>?) { + filters = newFilters + } + + fun filters() = filters + + @OptIn(ExperimentalContracts::class) + fun isActive() = filters != null + fun callEose( time: Long, relay: NormalizedRelayUrl, ) { onEose?.let { it(time, relay) } } - - fun hasChangedFiltersFrom(otherFilters: List?): Boolean { - if (relayBasedFilters == null && otherFilters == null) return false - if (relayBasedFilters?.size != otherFilters?.size) return true - - relayBasedFilters?.forEachIndexed { index, relaySetFilter -> - val otherFilter = otherFilters?.getOrNull(index) ?: return true - - if (relaySetFilter.relay != otherFilter.relay) return true - - return isDifferent(relaySetFilter.filter, otherFilter.filter) - } - return false - } - - fun isDifferent( - filter1: Filter, - filter2: Filter, - ): Boolean { - // Does not check SINCE on purpose. Avoids replacing the filter if SINCE was all that changed. - // fast check - if (filter1.authors?.size != filter2.authors?.size || - filter1.ids?.size != filter2.ids?.size || - filter1.tags?.size != filter2.tags?.size || - filter1.kinds?.size != filter2.kinds?.size || - filter1.limit != filter2.limit || - filter1.search?.length != filter2.search?.length || - filter1.until != filter2.until - ) { - return true - } - - // deep check - if (filter1.ids != filter2.ids || - filter1.authors != filter2.authors || - filter1.tags != filter2.tags || - filter1.kinds != filter2.kinds || - filter1.search != filter2.search - ) { - return true - } - - return false - } } diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionController.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionController.kt index 5dd295a5d..d1767f3fa 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionController.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionController.kt @@ -20,16 +20,13 @@ */ package com.vitorpamplona.ammolite.relays.datasources -import com.vitorpamplona.ammolite.relays.BundledUpdate import com.vitorpamplona.quartz.nip01Core.core.Event import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientListener -import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient +import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl -import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.Dispatchers -import java.util.concurrent.atomic.AtomicBoolean +import com.vitorpamplona.quartz.utils.LargeCache /** * Semantically groups Nostr filters and subscriptions in data source objects that @@ -37,13 +34,9 @@ import java.util.concurrent.atomic.AtomicBoolean */ class SubscriptionController( val client: NostrClient, - val updateSubscriptions: () -> Unit, -) : SubscriptionControllerService { - private val subscriptions = SubscriptionSet() - private var active: Boolean = false - private val changingFilters = AtomicBoolean() - - val stats = SubscriptionStats() +) { + private val subscriptions = LargeCache() + private val stats = SubscriptionStats() private val clientListener = object : IRelayClientListener { @@ -54,10 +47,10 @@ class SubscriptionController( arrivalTime: Long, afterEOSE: Boolean, ) { - if (subscriptions.contains(subId)) { + if (subscriptions.containsKey(subId)) { stats.add(subId, event.kind) if (afterEOSE) { - runAfterEOSE(subId, relay, arrivalTime) + subscriptions.get(subId)?.callEose(arrivalTime, relay.url) } } } @@ -67,145 +60,63 @@ class SubscriptionController( subId: String, arrivalTime: Long, ) { - if (subscriptions.contains(subId)) { - runAfterEOSE(subId, relay, arrivalTime) + if (subscriptions.containsKey(subId)) { + subscriptions.get(subId)?.callEose(arrivalTime, relay.url) } } } - private fun runAfterEOSE( - subscriptionId: String, - relay: IRelayClient, - arrivalTime: Long, - ) { - subscriptions[subscriptionId]?.callEose(arrivalTime, relay.url) - } - init { client.subscribe(clientListener) } - override fun destroy() { - // makes sure to run - stop() + fun destroy() { client.unsubscribe(clientListener) - bundler.cancel() } - override fun start() { - active = true - invalidateFilters() - } - - @OptIn(DelicateCoroutinesApi::class) - override fun stop() { - active = false - - subscriptions.forEach { subscription -> - client.close(subscription.id) - subscription.reset() - } - } - - override fun printStats(tag: String) = stats.printCounter(tag) + fun printStats(tag: String) = stats.printCounter(tag) fun getSub(subId: String) = subscriptions.get(subId) - fun requestNewSubscription(onEOSE: ((Long, NormalizedRelayUrl) -> Unit)? = null): Subscription = subscriptions.newSub(onEOSE) + fun requestNewSubscription(onEOSE: ((Long, NormalizedRelayUrl) -> Unit)? = null): Subscription = Subscription(onEose = onEOSE).also { subscriptions.put(it.id, it) } fun dismissSubscription(subId: String) = getSub(subId)?.let { dismissSubscription(it) } fun dismissSubscription(subscription: Subscription) { client.close(subscription.id) subscription.reset() - subscriptions.remove(subscription) + subscriptions.remove(subscription.id) } - fun isUpdatingFilters() = changingFilters.get() - - // Refreshes observers in batches. - private val bundler = BundledUpdate(300, Dispatchers.Default) - - override fun invalidateFilters() { - bundler.invalidate { - // println("DataSource: ${this.javaClass.simpleName} InvalidateFilters") - - // adds the time to perform the refresh into this delay - // holding off new updates in case of heavy refresh routines. - resetFiltersSuspend() - } - } - - private fun resetFiltersSuspend() { - // only runs one at a time. Ignores the others - if (changingFilters.compareAndSet(false, true)) { - try { - resetFiltersSuspendInner() - } finally { - changingFilters.getAndSet(false) + fun updateRelays() { + val currentFilters = + subscriptions.associateWith { id, sub -> + client.getSubscriptionFiltersOrNull(id) } - } - } - private fun resetFiltersSuspendInner() { - // saves the channels that are currently active - val activeSubscriptions = subscriptions.actives() - // saves the current content to only update if it changes - val currentFilters = activeSubscriptions.associate { it.id to client.getSubscriptionFiltersOrNull(it.id) } - - // updates all filters - updateSubscriptions() - - // Makes sure to only send an updated filter when it actually changes. - subscriptions.forEach { newSubscriptionFilters -> - val currentFilters = currentFilters[newSubscriptionFilters.id] - updateRelaysIfNeeded(newSubscriptionFilters, currentFilters) + subscriptions.forEach { id, sub -> + updateRelaysIfNeeded(id, sub.filters(), currentFilters[id]) } } fun updateRelaysIfNeeded( - updatedSubscription: Subscription, - currentFilters: List?, + subId: String, + updatedFilters: Map>?, + currentFilters: Map>?, ) { - val updatedSubscriptionNewFilters = updatedSubscription.relayBasedFilters - - val isActive = client.isActive(updatedSubscription.id) - - if (!isActive && updatedSubscriptionNewFilters != null) { - // Filter was removed from the active list - // but it is supposed to be there. Send again. - if (active) { - client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters) + if (currentFilters != null) { + if (updatedFilters == null) { + // was active and is not active anymore, just close. + client.close(subId) + } else { + client.sendRequest(subId, updatedFilters) } } else { - if (currentFilters != null) { - if (updatedSubscriptionNewFilters == null) { - // was active and is not active anymore, just close. - client.close(updatedSubscription.id) - } else { - // was active and is still active, check if it has changed. - if (updatedSubscription.hasChangedFiltersFrom(currentFilters)) { - client.close(updatedSubscription.id) - if (active) { - client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters) - } - } else { - // hasn't changed, does nothing. - // unless the relay has disconnected, then reconnect. - if (active) { - client.sendFilterOnlyIfDisconnected(updatedSubscription.id, updatedSubscriptionNewFilters) - } - } - } + if (updatedFilters == null) { + // was not active and is still not active, does nothing } else { - if (updatedSubscriptionNewFilters == null) { - // was not active and is still not active, does nothing - } else { - // was not active and becomes active, sends the filter. - if (active) { - client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters) - } - } + // was not active and becomes active, sends the entire filter. + client.sendRequest(subId, updatedFilters) } } } diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionControllerService.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionControllerService.kt deleted file mode 100644 index 0f422a732..000000000 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionControllerService.kt +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright (c) 2025 Vitor Pamplona - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the - * Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN - * AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION - * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ -package com.vitorpamplona.ammolite.relays.datasources - -interface SubscriptionControllerService { - fun start() - - fun stop() - - fun invalidateFilters() - - fun destroy() - - fun printStats(tag: String) -} diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionSet.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionSet.kt deleted file mode 100644 index c40e3a42a..000000000 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionSet.kt +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Copyright (c) 2025 Vitor Pamplona - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the - * Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN - * AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION - * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ -package com.vitorpamplona.ammolite.relays.datasources - -import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl - -class SubscriptionSet { - private var subscriptions = mapOf() - - fun contains(subId: String) = subscriptions.containsKey(subId) - - fun add(sub: Subscription) { - subscriptions = subscriptions + Pair(sub.id, sub) - } - - fun remove(subId: String) { - subscriptions = subscriptions.minus(subId) - } - - fun remove(sub: Subscription) = remove(sub.id) - - fun newSub(onEOSE: ((Long, NormalizedRelayUrl) -> Unit)? = null): Subscription = Subscription(onEose = onEOSE).also { add(it) } - - fun forEach(action: (Subscription) -> Unit) = subscriptions.values.forEach(action) - - operator fun get(subId: String) = subscriptions[subId] - - fun actives() = subscriptions.values.filter { it.relayBasedFilters != null } -} diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionStats.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionStats.kt index 554a30cf1..03e333050 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionStats.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/datasources/SubscriptionStats.kt @@ -21,17 +21,19 @@ package com.vitorpamplona.ammolite.relays.datasources import android.util.Log +import com.vitorpamplona.quartz.utils.LargeCache class SubscriptionStats { data class Counter( val subscriptionId: String, val eventKind: Int, - var counter: Int, - ) + ) { + var counter: Int = 0 + } - private var eventCounter = mapOf() + private var eventCounter = LargeCache() - fun eventCounterIndex( + private fun eventCounterIndex( str1: String, str2: Int, ): Int = 31 * str1.hashCode() + str2.hashCode() @@ -41,19 +43,15 @@ class SubscriptionStats { eventKind: Int, ) { val key = eventCounterIndex(subscriptionId, eventKind) - val keyValue = eventCounter[key] - if (keyValue != null) { - keyValue.counter++ - } else { - eventCounter = eventCounter + Pair(key, Counter(subscriptionId, eventKind, 1)) - } + val stats = eventCounter.getOrCreate(key) { Counter(subscriptionId, eventKind) } + stats.counter++ } fun printCounter(tag: String) { - eventCounter.forEach { + eventCounter.forEach { _, stats -> Log.d( tag, - "Received Events ${it.value.subscriptionId} ${it.value.eventKind}: ${it.value.counter}", + "Received Events ${stats.subscriptionId} ${stats.eventKind}: ${stats.counter}", ) } } diff --git a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/NostrClient.kt b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/NostrClient.kt index 56dcf7955..ea512b6ca 100644 --- a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/NostrClient.kt +++ b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/NostrClient.kt @@ -25,12 +25,12 @@ import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientLis import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.RelayState import com.vitorpamplona.quartz.nip01Core.relay.client.pool.PoolEventOutboxRepository import com.vitorpamplona.quartz.nip01Core.relay.client.pool.PoolSubscriptionRepository -import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayPool import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient import com.vitorpamplona.quartz.nip01Core.relay.client.single.basic.BasicRelayClient import com.vitorpamplona.quartz.nip01Core.relay.client.single.newSubId import com.vitorpamplona.quartz.nip01Core.relay.client.stats.RelayStats +import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl import com.vitorpamplona.quartz.nip01Core.relay.sockets.WebsocketBuilder import kotlinx.coroutines.CoroutineScope @@ -44,14 +44,15 @@ import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.stateIn /** - * The Nostr Client manages a relay pool, keeps active subscriptions and manages sending of events. + * The Nostr Client manages a relay pool, keeps active subscriptions and manages re-sending of events. */ class NostrClient( private val websocketBuilder: WebsocketBuilder, private val scope: CoroutineScope, ) : IRelayClientListener { private val relayPool: RelayPool = RelayPool(this, ::buildRelay) - private val activeSubscriptions: PoolSubscriptionRepository = PoolSubscriptionRepository() + private val activeRequests: PoolSubscriptionRepository = PoolSubscriptionRepository() + private val activeCounts: PoolSubscriptionRepository = PoolSubscriptionRepository() private val eventOutbox: PoolEventOutboxRepository = PoolEventOutboxRepository() private var listeners = setOf() @@ -62,19 +63,20 @@ class NostrClient( */ private val allRelays = combine( - activeSubscriptions.relays, + activeRequests.relays, + activeCounts.relays, eventOutbox.relays, - ) { subs, outbox -> - subs + outbox + ) { reqs, counts, outbox -> + reqs + counts + outbox }.onStart { - activeSubscriptions.relays.value + eventOutbox.relays.value + activeRequests.relays.value + activeCounts.relays.value + eventOutbox.relays.value }.onEach { relayPool.updatePool(it) }.flowOn(Dispatchers.Default) .stateIn( scope, SharingStarted.Companion.Eagerly, - activeSubscriptions.relays.value + eventOutbox.relays.value, + activeRequests.relays.value + activeCounts.relays.value + eventOutbox.relays.value, ) fun buildRelay(relay: NormalizedRelayUrl): IRelayClient = @@ -84,7 +86,8 @@ class NostrClient( listener = relayPool, stats = RelayStats.get(relay), ) { liveRelay -> - activeSubscriptions.forEachSub(relay, liveRelay::sendRequest) + activeRequests.forEachSub(relay, liveRelay::sendRequest) + activeCounts.forEachSub(relay, liveRelay::sendCount) eventOutbox.forEachUnsentEvent(relay, liveRelay::send) } @@ -109,20 +112,124 @@ class NostrClient( } } - fun sendFilter( - subscriptionId: String = newSubId(), - filters: List = listOf(), - ) { - activeSubscriptions.addOrUpdate(subscriptionId, filters) - relayPool.sendRequest(subscriptionId, filters) + fun needsToResendRequest( + oldFilters: List, + newFilters: List, + ): Boolean { + if (oldFilters.size != newFilters.size) return true + + oldFilters.forEachIndexed { index, oldFilter -> + val newFilter = newFilters.getOrNull(index) ?: return true + + return needsToResendRequest(oldFilter, newFilter) + } + return false } - fun sendFilterOnlyIfDisconnected( - subscriptionId: String = newSubId(), - filters: List = listOf(), + /** + * Checks if the filter has changed, with a special case for when the since changes due to new + * EOSE times. + */ + fun needsToResendRequest( + oldFilter: Filter, + newFilter: Filter, + ): Boolean { + // Does not check SINCE on purpose. Avoids replacing the filter if SINCE was all that changed. + // fast check + if (oldFilter.authors?.size != newFilter.authors?.size || + oldFilter.ids?.size != newFilter.ids?.size || + oldFilter.tags?.size != newFilter.tags?.size || + oldFilter.kinds?.size != newFilter.kinds?.size || + oldFilter.limit != newFilter.limit || + oldFilter.search?.length != newFilter.search?.length || + oldFilter.until != newFilter.until + ) { + return true + } + + // deep check + if (oldFilter.ids != newFilter.ids || + oldFilter.authors != newFilter.authors || + oldFilter.tags != newFilter.tags || + oldFilter.kinds != newFilter.kinds || + oldFilter.search != newFilter.search + ) { + return true + } + + if (oldFilter.since != null) { + if (newFilter.since == null) { + // went was checking the future only and now wants everything + return true + } else if (oldFilter.since > newFilter.since) { + // went backwards in time, forces update + return true + } + } + + return false + } + + fun sendRequest( + subId: String = newSubId(), + filters: Map>, ) { - activeSubscriptions.addOrUpdate(subscriptionId, filters) - relayPool.connectIfDisconnected() + val oldFilters = activeRequests.getSubscriptionFiltersOrNull(subId) ?: emptyMap() + activeRequests.addOrUpdate(subId, filters) + + val allRelays = filters.keys + oldFilters.keys + + allRelays.forEach { relay -> + val oldFilters = oldFilters[relay] + val newFilters = filters[relay] + + if (newFilters.isNullOrEmpty()) { + // some relays are not in this sub anymore. Stop their subscriptions + relayPool.close(relay, subId) + } else if (oldFilters.isNullOrEmpty()) { + // new relays were added. Start a new sub in them + relayPool.sendRequest(relay, subId, newFilters) + } else if (needsToResendRequest(oldFilters, newFilters)) { + // filters were changed enough (not only an update in since) to warn a new update + relayPool.sendRequest(relay, subId, newFilters) + } else { + // makes sure the relay wakes up if it was disconnected by the server + // upon connection, the relay will run the default Sync and update all + // filters, including this one. + relayPool.connectIfDisconnected(relay) + } + } + } + + fun sendCount( + subId: String = newSubId(), + filters: Map>, + ) { + val oldFilters = activeCounts.getSubscriptionFiltersOrNull(subId) ?: emptyMap() + activeCounts.addOrUpdate(subId, filters) + + val allRelays = filters.keys + oldFilters.keys + + allRelays.forEach { relay -> + val oldFilters = oldFilters[relay] + val newFilters = filters[relay] + + if (newFilters.isNullOrEmpty()) { + // some relays are not in this sub anymore. Stop their subscriptions + relayPool.close(relay, subId) + } else if (oldFilters.isNullOrEmpty()) { + // new relays were added. Start a new sub in them + relayPool.sendCount(relay, subId, newFilters) + } else if (needsToResendRequest(oldFilters, newFilters)) { + // filters were changed enough (not only an update in since) to warn a new update + relayPool.sendCount(relay, subId, newFilters) + } else { + // makes sure the relay wakes up if it was disconnected by the server + // upon connection, the relay will run the default Sync and update all + // filters, including this one. + relayPool.connectIfDisconnected(relay) + } + } } fun sendIfExists( @@ -142,11 +249,10 @@ class NostrClient( fun close(subscriptionId: String) { relayPool.close(subscriptionId) - activeSubscriptions.remove(subscriptionId) + activeRequests.remove(subscriptionId) + activeCounts.remove(subscriptionId) } - fun isActive(subscriptionId: String): Boolean = activeSubscriptions.isActive(subscriptionId) - override fun onEvent( relay: IRelayClient, subId: String, @@ -236,7 +342,7 @@ class NostrClient( listeners = listeners.minus(listener) } - fun getSubscriptionFiltersOrNull(subId: String): List? = activeSubscriptions.getSubscriptionFiltersOrNull(subId) + fun getSubscriptionFiltersOrNull(subId: String): Map>? = activeRequests.getSubscriptionFiltersOrNull(subId) fun relayStatusFlow() = relayPool.statusFlow } diff --git a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/acessories/NostrClientSingleDownloadExt.kt b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/acessories/NostrClientSingleDownloadExt.kt index 29fff8941..dbee706e9 100644 --- a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/acessories/NostrClientSingleDownloadExt.kt +++ b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/acessories/NostrClientSingleDownloadExt.kt @@ -23,9 +23,10 @@ package com.vitorpamplona.quartz.nip01Core.relay.client.acessories import com.vitorpamplona.quartz.nip01Core.core.Event import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientListener -import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient import com.vitorpamplona.quartz.nip01Core.relay.client.single.newSubId +import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter +import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.delay @@ -33,7 +34,7 @@ import kotlinx.coroutines.launch fun NostrClient.downloadFirstEvent( subscriptionId: String = newSubId(), - filters: List = listOf(), + filters: Map>, onResponse: (Event) -> Unit, ) { val listener = @@ -56,7 +57,7 @@ fun NostrClient.downloadFirstEvent( subscribe(listener) - sendFilter(subscriptionId, filters) + sendRequest(subscriptionId, filters) GlobalScope.launch(Dispatchers.IO) { delay(30000) diff --git a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/PoolSubscription.kt b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/PoolSubscription.kt deleted file mode 100644 index 1f48aa7cc..000000000 --- a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/PoolSubscription.kt +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Copyright (c) 2025 Vitor Pamplona - * - * Permission is hereby granted, free of charge, to any person obtaining a copy of - * this software and associated documentation files (the "Software"), to deal in - * the Software without restriction, including without limitation the rights to use, - * copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the - * Software, and to permit persons to whom the Software is furnished to do so, - * subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN - * AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION - * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ -package com.vitorpamplona.quartz.nip01Core.relay.client.pool - -import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl - -class PoolSubscription( - var filters: List = emptyList(), -) { - fun toFilter(relay: NormalizedRelayUrl) = filters.mapNotNull { it.toFilter(relay) } -} diff --git a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/PoolSubscriptionRepository.kt b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/PoolSubscriptionRepository.kt index 19a0aa404..c24064a12 100644 --- a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/PoolSubscriptionRepository.kt +++ b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/PoolSubscriptionRepository.kt @@ -22,20 +22,17 @@ package com.vitorpamplona.quartz.nip01Core.relay.client.pool import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl +import com.vitorpamplona.quartz.utils.LargeCache import kotlinx.coroutines.flow.MutableStateFlow class PoolSubscriptionRepository { - private var subscriptions = mapOf() + private var subscriptions = LargeCache>>() val relays = MutableStateFlow(setOf()) fun updateRelays() { val myRelays = mutableSetOf() - subscriptions.values.forEach { - it.filters.forEach { - if (!myRelays.contains(it.relay)) { - myRelays.add(it.relay) - } - } + subscriptions.forEach { sub, perRelayFilters -> + myRelays.addAll(perRelayFilters.keys) } if (relays.value != myRelays) { @@ -45,20 +42,15 @@ class PoolSubscriptionRepository { fun addOrUpdate( subscriptionId: String, - filters: List = listOf(), + filters: Map>, ) { - val currentFilter = subscriptions[subscriptionId] - if (currentFilter == null) { - subscriptions = subscriptions + Pair(subscriptionId, PoolSubscription(filters)) - } else { - currentFilter.filters = filters - } + subscriptions.put(subscriptionId, filters) updateRelays() } fun remove(subscriptionId: String) { - if (subscriptions.contains(subscriptionId)) { - subscriptions = subscriptions.minus(subscriptionId) + if (subscriptions.containsKey(subscriptionId)) { + subscriptions.remove(subscriptionId) updateRelays() } } @@ -67,9 +59,9 @@ class PoolSubscriptionRepository { relay: NormalizedRelayUrl, run: (String, List) -> Unit, ) { - subscriptions.forEach { (subId, filters) -> - val filters = filters.toFilter(relay) - if (filters.isNotEmpty()) { + subscriptions.forEach { subId, filters -> + val filters = filters[relay] + if (!filters.isNullOrEmpty()) { run(subId, filters) } else { null @@ -77,11 +69,5 @@ class PoolSubscriptionRepository { } } - fun isActive(subscriptionId: String): Boolean = subscriptions.contains(subscriptionId) - - fun allSubscriptions(): Map = subscriptions - - fun getSubscriptionFilters(subId: String): List = subscriptions[subId]?.filters ?: emptyList() - - fun getSubscriptionFiltersOrNull(subId: String): List? = subscriptions[subId]?.filters + fun getSubscriptionFiltersOrNull(subId: String): Map>? = subscriptions.get(subId) } diff --git a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/RelayBasedFilter.kt b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/RelayBasedFilter.kt index 2383936e7..b5ebfe358 100644 --- a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/RelayBasedFilter.kt +++ b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/RelayBasedFilter.kt @@ -40,3 +40,11 @@ class RelayBasedFilter( null } } + +fun List.groupByRelay(): Map> { + val result = mutableMapOf>() + for (relayBasedFilter in this) { + result.getOrPut(relayBasedFilter.relay) { mutableListOf() }.add(relayBasedFilter.filter) + } + return result +} diff --git a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/RelayPool.kt b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/RelayPool.kt index 3c96b0e86..e17954840 100644 --- a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/RelayPool.kt +++ b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/pool/RelayPool.kt @@ -26,6 +26,7 @@ import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.EmptyClientList import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientListener import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.RelayState import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient +import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl import com.vitorpamplona.quartz.utils.LargeCache import kotlinx.coroutines.flow.MutableStateFlow @@ -33,7 +34,7 @@ import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlin.collections.forEach import kotlin.collections.isNotEmpty -import kotlin.collections.mapNotNull +import kotlin.collections.isNullOrEmpty val UnsupportedRelayCreation: (url: NormalizedRelayUrl) -> IRelayClient = { throw UnsupportedOperationException("Cannot create new relays") @@ -77,30 +78,48 @@ class RelayPool( relay.connectAndSyncFiltersIfDisconnected() } + fun connectIfDisconnected(relay: NormalizedRelayUrl) = relays.get(relay)?.connectAndSyncFiltersIfDisconnected() + fun disconnect() = relays.forEach { url, relay -> relay.disconnect() } fun sendRequest( + relay: NormalizedRelayUrl, subId: String, - filters: List, + filters: List, + ) { + relays.get(relay)?.sendRequest(subId, filters) + } + + fun sendRequest( + subId: String, + filters: Map>, ) { relays.forEach { url, relay -> - val filters = filters.mapNotNull { it.toFilter(url) } - if (filters.isNotEmpty()) { + val filters = filters[relay.url] + if (!filters.isNullOrEmpty()) { relay.sendRequest(subId, filters) } } } - fun sendCounter( + fun sendCount( + relay: NormalizedRelayUrl, subId: String, - filters: List, + filters: List, + ) { + relays.get(relay)?.sendRequest(subId, filters) + } + + fun sendCount( + subId: String, + filters: Map>, ) { relays.forEach { url, relay -> - val filters = filters.mapNotNull { it.toFilter(url) } - if (filters.isNotEmpty()) { + val filters = filters[relay.url] + if (!filters.isNullOrEmpty()) { relay.sendCount(subId, filters) } } @@ -111,6 +130,11 @@ class RelayPool( relay.close(subscriptionId) } + fun close( + relay: NormalizedRelayUrl, + subscriptionId: String, + ) = relays.get(relay)?.close(subscriptionId) + fun send( signedEvent: Event, list: Set, diff --git a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/single/basic/BasicRelayClient.kt b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/single/basic/BasicRelayClient.kt index 7e48fac95..80bb399f4 100644 --- a/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/single/basic/BasicRelayClient.kt +++ b/quartz/src/main/java/com/vitorpamplona/quartz/nip01Core/relay/client/single/basic/BasicRelayClient.kt @@ -293,7 +293,7 @@ open class BasicRelayClient( } private fun processClosed(msg: ClosedMessage) { - // Log.w(logTag, "Relay Closed Subscription $newMessage") + Log.w(logTag, "Relay Closed Subscription ${msg.subscriptionId} ${msg.message}") afterEOSEPerSubscription[msg.subscriptionId] = false listener.onClosed(this@BasicRelayClient, msg.subscriptionId, msg.message) } diff --git a/quartz/src/main/java/com/vitorpamplona/quartz/utils/LargeCache.kt b/quartz/src/main/java/com/vitorpamplona/quartz/utils/LargeCache.kt index 65d5cff60..e2678d9f0 100644 --- a/quartz/src/main/java/com/vitorpamplona/quartz/utils/LargeCache.kt +++ b/quartz/src/main/java/com/vitorpamplona/quartz/utils/LargeCache.kt @@ -20,8 +20,10 @@ */ package com.vitorpamplona.quartz.utils +import android.R.attr.value import java.util.concurrent.ConcurrentSkipListMap import java.util.function.BiConsumer +import kotlin.collections.LinkedHashMap class LargeCache { private val cache = ConcurrentSkipListMap() @@ -170,14 +172,32 @@ class LargeCache { return runner.count } - private fun innerForEach(runner: BiConsumer) { - // val (value, elapsed) = - // measureTimedValue { - cache.forEach(runner) - // } - // println("LargeCache full loop $elapsed \t for $runner") + public fun associate(transform: (K, V) -> Pair): Map { + val runner = BiAssociateCollector(size(), transform) + innerForEach(runner) + return runner.results + } - listOf(1, 2, 3).joinToString() + public fun associateNotNull(transform: (K, V) -> Pair?): Map { + val runner = BiAssociateNotNullCollector(size(), transform) + innerForEach(runner) + return runner.results + } + + public fun associateWith(transform: (K, V) -> U?): Map { + val runner = BiAssociateWithCollector(size(), transform) + innerForEach(runner) + return runner.results + } + + public fun associateNotNullWith(transform: (K, V) -> U): Map { + val runner = BiAssociateNotNullWithCollector(size(), transform) + innerForEach(runner) + return runner.results + } + + private fun innerForEach(runner: BiConsumer) { + cache.forEach(runner) } fun joinToString( @@ -255,6 +275,13 @@ fun interface BiMapper { ): R? } +fun interface BiMapperNotNull { + fun map( + k: K, + v: V, + ): R +} + class BiMapCollector( val mapper: BiMapper, ) : BiConsumer { @@ -271,6 +298,69 @@ class BiMapCollector( } } +class BiAssociateCollector( + val size: Int, + val mapper: BiMapperNotNull>, +) : BiConsumer { + var results: LinkedHashMap = LinkedHashMap(size) + + override fun accept( + k: K, + v: V, + ) { + val pair = mapper.map(k, v) + results.put(pair.first, pair.second) + } +} + +class BiAssociateNotNullCollector( + val size: Int, + val mapper: BiMapper?>, +) : BiConsumer { + var results: LinkedHashMap = LinkedHashMap(size) + + override fun accept( + k: K, + v: V, + ) { + val pair = mapper.map(k, v) + if (pair != null) { + results.put(pair.first, pair.second) + } + } +} + +class BiAssociateWithCollector( + val size: Int, + val mapper: BiMapper, +) : BiConsumer { + var results: LinkedHashMap = LinkedHashMap(size) + + override fun accept( + k: K, + v: V, + ) { + results.put(k, mapper.map(k, v)) + } +} + +class BiAssociateNotNullWithCollector( + val size: Int, + val mapper: BiMapper, +) : BiConsumer { + var results: LinkedHashMap = LinkedHashMap(size) + + override fun accept( + k: K, + v: V, + ) { + val newValue = mapper.map(k, v) + if (newValue != null) { + results.put(k, newValue) + } + } +} + class BiMapUniqueCollector( val mapper: BiMapper, ) : BiConsumer {