Makes sure Filters are not updated if the since is the only change

This commit is contained in:
Vitor Pamplona
2025-07-07 13:37:57 -04:00
parent 28d182cc55
commit cf48516602
44 changed files with 397 additions and 525 deletions

View File

@@ -97,7 +97,6 @@ import com.vitorpamplona.quartz.nip01Core.crypto.KeyPair
import com.vitorpamplona.quartz.nip01Core.hints.EventHintBundle
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.acessories.downloadFirstEvent
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
@@ -953,9 +952,8 @@ class Account(
it.host?.let { host ->
client.downloadFirstEvent(
filters =
note.relays.map { relay ->
RelayBasedFilter(
relay,
note.relays.associateWith { relay ->
listOf(
Filter(
ids = listOf(host.id),
),

View File

@@ -96,26 +96,26 @@ class RegisterAccounts(
val readyToSend =
accounts
.mapNotNull {
Log.d(tag, "Register Account ${it.npub}")
if (it.hasPrivKey || it.loggedInWithExternalSigner) {
Log.d(tag, "Register Account ${it.npub}")
val acc = LocalPreferences.loadCurrentAccountFromEncryptedStorage(it.npub)
if (acc != null && acc.isWriteable()) {
val nip65Read = acc.backupNIP65RelayList?.readRelaysNorm() ?: emptyList()
val acc = LocalPreferences.loadCurrentAccountFromEncryptedStorage(it.npub)
if (acc != null && acc.isWriteable()) {
val nip65Read = acc.backupNIP65RelayList?.readRelaysNorm() ?: emptyList()
Log.d(tag, "Register Account ${it.npub} NIP65 Reads ${nip65Read.joinToString(", ") { it.url } }")
Log.d(tag, "Register Account ${it.npub} NIP65 Reads ${nip65Read.joinToString(", ") { it.url } }")
val nip17Read = acc.backupDMRelayList?.relays() ?: emptyList()
val nip17Read = acc.backupDMRelayList?.relays() ?: emptyList()
Log.d(tag, "Register Account ${it.npub} NIP17 Reads ${nip17Read.joinToString(", ") { it.url } }")
Log.d(tag, "Register Account ${it.npub} NIP17 Reads ${nip17Read.joinToString(", ") { it.url } }")
val readKind3Relays = acc.backupContactList?.relays()?.mapNotNull { if (it.value.read) it.key else null } ?: emptyList()
val relays = (nip65Read + nip17Read)
Log.d(tag, "Register Account ${it.npub} Kind3 Reads ${readKind3Relays.joinToString(", ") { it.url } }")
val relays = (nip65Read + nip17Read + readKind3Relays)
if (relays.isNotEmpty()) {
Pair(acc, relays)
if (relays.isNotEmpty()) {
Pair(acc, relays)
} else {
null
}
} else {
null
}

View File

@@ -34,14 +34,8 @@ abstract class ComposeSubscriptionManager<T> : ComposeSubscriptionManagerControl
fun subscribe(query: T?) {
if (query == null) return
val wasEmpty = composeSubscriptions.isEmpty()
composeSubscriptions.put(query, query)
if (wasEmpty) {
start()
}
invalidateKeys()
}
@@ -52,10 +46,6 @@ abstract class ComposeSubscriptionManager<T> : ComposeSubscriptionManagerControl
composeSubscriptions.remove(query)
invalidateKeys()
if (composeSubscriptions.isEmpty()) {
stop()
}
}
fun allKeys() = composeSubscriptions.keys

View File

@@ -21,10 +21,6 @@
package com.vitorpamplona.amethyst.service.relayClient.composeSubscriptionManagers
interface ComposeSubscriptionManagerControls {
fun start()
fun stop()
fun invalidateKeys()
fun invalidateFilters()

View File

@@ -45,8 +45,6 @@ abstract class MutableComposeSubscriptionManager<T : MutableQueryState>(
fun subscribe(query: T?) {
if (query == null) return
val wasEmpty = composeSubscriptions.isEmpty()
composeSubscriptions[query]?.cancel()
composeSubscriptions[query] =
scope.launch {
@@ -55,10 +53,6 @@ abstract class MutableComposeSubscriptionManager<T : MutableQueryState>(
}
}
if (wasEmpty) {
start()
}
invalidateKeys()
}
@@ -70,10 +64,6 @@ abstract class MutableComposeSubscriptionManager<T : MutableQueryState>(
composeSubscriptions.remove(query)
invalidateKeys()
if (composeSubscriptions.isEmpty()) {
stop()
}
}
fun allKeys() = composeSubscriptions.keys

View File

@@ -22,49 +22,41 @@ package com.vitorpamplona.amethyst.service.relayClient.eoseManagers
import android.util.Log
import com.vitorpamplona.amethyst.isDebug
import com.vitorpamplona.ammolite.relays.BundledUpdate
import com.vitorpamplona.ammolite.relays.datasources.SubscriptionController
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import kotlinx.coroutines.Dispatchers
abstract class BaseEoseManager<T>(
val client: NostrClient,
val allKeys: () -> Set<T>,
) {
val orchestrator =
SubscriptionController(client) {
if (isDebug) Log.d("${this.javaClass.simpleName}", "Updating Subscriptions")
updateSubscriptions(allKeys())
}
val orchestrator = SubscriptionController(client)
abstract fun updateSubscriptions(keys: Set<T>)
fun printStats() = orchestrator.printStats(this.javaClass.simpleName)
fun invalidateFilters() = orchestrator.invalidateFilters()
// Refreshes observers in batches.
private val bundler = BundledUpdate(300, Dispatchers.Default)
fun start() {
orchestrator.start()
if (isDebug) {
Log.d("${this.javaClass.simpleName}", "Start")
fun invalidateFilters() {
bundler.invalidate {
forceInvalidate()
}
}
fun stop() {
orchestrator.stop()
if (isDebug) {
Log.d("${this.javaClass.simpleName}", "Stop")
}
fun forceInvalidate() {
updateSubscriptions(allKeys())
orchestrator.updateRelays()
}
fun destroy() {
bundler.cancel()
orchestrator.destroy()
if (isDebug) {
Log.d("${this.javaClass.simpleName}", "Destroy, Unsubscribe")
}
}
fun init() {
if (isDebug) {
Log.d("${this.javaClass.simpleName}", "Init, Subscribe")
}
}
}

View File

@@ -25,6 +25,7 @@ import com.vitorpamplona.amethyst.service.relays.SincePerRelayMap
import com.vitorpamplona.ammolite.relays.datasources.Subscription
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import kotlin.collections.distinctBy
@@ -90,7 +91,8 @@ abstract class PerUniqueIdEoseManager<T>(
uniqueSubscribedAccounts.forEach {
val mainKey = id(it)
findOrCreateSubFor(it).relayBasedFilters = updateFilter(it, since(it))?.ifEmpty { null }
val newFilters = updateFilter(it, since(it))?.ifEmpty { null }
findOrCreateSubFor(it).updateFilters(newFilters?.groupByRelay())
updated.add(mainKey)
}

View File

@@ -26,6 +26,7 @@ import com.vitorpamplona.amethyst.service.relays.SincePerRelayMap
import com.vitorpamplona.ammolite.relays.datasources.Subscription
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
/**
@@ -89,7 +90,8 @@ abstract class PerUserAndFollowListEoseManager<T>(
uniqueSubscribedAccounts.forEach {
val user = user(it)
val sub = findOrCreateSubFor(it)
sub.relayBasedFilters = updateFilter(it, since(it))?.ifEmpty { null }
val newFilters = updateFilter(it, since(it))?.ifEmpty { null }
sub.updateFilters(newFilters?.groupByRelay())
updated.add(user)
}

View File

@@ -26,6 +26,7 @@ import com.vitorpamplona.amethyst.service.relays.SincePerRelayMap
import com.vitorpamplona.ammolite.relays.datasources.Subscription
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import kotlin.collections.distinctBy
@@ -87,7 +88,9 @@ abstract class PerUserEoseManager<T>(
uniqueSubscribedAccounts.forEach {
val user = user(it)
findOrCreateSubFor(it).relayBasedFilters = updateFilter(it, since(it))?.ifEmpty { null }
val newFilters = updateFilter(it, since(it))?.ifEmpty { null }
findOrCreateSubFor(it).updateFilters(newFilters?.groupByRelay())
updated.add(user)
}

View File

@@ -24,6 +24,7 @@ import com.vitorpamplona.amethyst.service.relays.EOSERelayList
import com.vitorpamplona.amethyst.service.relays.SincePerRelayMap
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import kotlin.collections.distinctBy
@@ -64,8 +65,9 @@ abstract class SingleSubEoseManager<T>(
override fun updateSubscriptions(keys: Set<T>) {
val uniqueSubscribedAccounts = keys.distinctBy { distinct(it) }
val newFilters = updateFilter(uniqueSubscribedAccounts, since())?.ifEmpty { null }
sub.relayBasedFilters = updateFilter(uniqueSubscribedAccounts, since())?.ifEmpty { null }
sub.updateFilters(newFilters?.groupByRelay())
}
abstract fun updateFilter(

View File

@@ -22,6 +22,7 @@ package com.vitorpamplona.amethyst.service.relayClient.eoseManagers
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.groupByRelay
import kotlin.collections.distinctBy
/**
@@ -43,8 +44,8 @@ abstract class SingleSubNoEoseCacheEoseManager<T>(
override fun updateSubscriptions(keys: Set<T>) {
val uniqueSubscribedAccounts = keys.distinctBy { distinct(it) }
sub.relayBasedFilters = updateFilter(uniqueSubscribedAccounts)?.ifEmpty { null }
val newFilters = updateFilter(uniqueSubscribedAccounts)?.ifEmpty { null }
sub.updateFilters(newFilters?.groupByRelay())
}
abstract fun updateFilter(keys: List<T>): List<RelayBasedFilter>?

View File

@@ -47,15 +47,11 @@ class AccountFilterAssembler(
AccountNotificationsEoseManager(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -38,7 +38,7 @@ class AccountMetadataEoseManager(
client: NostrClient,
allKeys: () -> Set<AccountQueryState>,
) : PerUserEoseManager<AccountQueryState>(client, allKeys) {
override fun user(query: AccountQueryState) = query.account.userProfile()
override fun user(key: AccountQueryState) = key.account.userProfile()
fun relayFlow(query: AccountQueryState) = query.account.outboxRelays.flow

View File

@@ -44,10 +44,6 @@ class ChannelFinderFilterAssemblyGroup(
ChannelMetadataAndLiveActivityWatcherSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun invalidateKeys() = invalidateFilters()

View File

@@ -40,10 +40,6 @@ class EventFinderFilterAssembler(
EventWatcherSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun invalidateKeys() = invalidateFilters()

View File

@@ -41,10 +41,6 @@ class NWCPaymentFilterAssembler(
NWCPaymentWatcherSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun invalidateKeys() = invalidateFilters()

View File

@@ -42,10 +42,6 @@ class UserFinderFilterAssembler(
UserWatcherSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun invalidateKeys() = invalidateFilters()

View File

@@ -49,10 +49,6 @@ class SearchFilterAssembler(
SearchWatcherSubAssembler(cache, client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun invalidateKeys() = invalidateFilters()

View File

@@ -109,7 +109,8 @@ private fun RelayOptions(
onAddRelay: () -> Unit,
onRemoveRelay: () -> Unit,
) {
val userState by accountViewModel.normalizedKind3RelaySetFlow.collectAsStateWithLifecycle()
val userState by accountViewModel.account.trustedRelays.flow
.collectAsStateWithLifecycle()
if (!userState.contains(relay.url)) {
AddRelayButton(onAddRelay)

View File

@@ -161,22 +161,6 @@ class AccountViewModel(
var firstRoute: Route? = null
val normalizedKind3RelaySetFlow: StateFlow<Set<NormalizedRelayUrl>> =
account
.userProfile()
.flow()
.relays.stateFlow
.map { contactListState ->
contactListState.user.latestContactList
?.relays()
?.keys ?: emptySet()
}.flowOn(Dispatchers.Default)
.stateIn(
viewModelScope,
SharingStarted.WhileSubscribed(10000, 10000),
emptySet(),
)
val toastManager = ToastManager()
val feedStates = AccountFeedContentStates(this)

View File

@@ -41,15 +41,11 @@ class ChatroomFilterAssembler(
ChatroomFilterSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -44,15 +44,11 @@ class ChannelFilterAssembler(
ChannelFromUserFilterSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -39,15 +39,11 @@ class ChatroomListFilterAssembler(
FollowingEphemeralChatSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -37,15 +37,11 @@ class CommunityFilterAssembler(
CommunityFeedFilterSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -41,15 +41,11 @@ class DiscoveryFilterAssembler(
DiscoveryFollowsDiscoverySubAssembler3(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -45,15 +45,11 @@ class GeoHashFilterAssembler(
GeoHashFeedFilterSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -40,15 +40,11 @@ class HashtagFilterAssembler(
HashtagFeedFilterSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -46,15 +46,11 @@ class HomeFilterAssembler(
// MixGeohashHashtagsCommunityEoseManager(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -42,10 +42,6 @@ class UserProfileFilterAssembler(
UserProfileZapsFilterSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun invalidateKeys() = invalidateFilters()

View File

@@ -40,15 +40,11 @@ class ThreadFilterAssembler(
ThreadEventLoaderSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -40,15 +40,11 @@ class VideoFilterAssembler(
VideoOutboxEventsFilterSubAssembler(client, ::allKeys),
)
override fun start() = group.forEach { it.start() }
override fun stop() = group.forEach { it.stop() }
override fun invalidateKeys() = invalidateFilters()
override fun invalidateFilters() = group.forEach { it.invalidateFilters() }
override fun destroy() = group.forEach { it.start() }
override fun destroy() = group.forEach { it.destroy() }
override fun printStats() = group.forEach { it.printStats() }
}

View File

@@ -20,70 +20,34 @@
*/
package com.vitorpamplona.ammolite.relays.datasources
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.single.newSubId
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import kotlin.collections.forEachIndexed
import kotlin.contracts.ExperimentalContracts
data class Subscription(
val id: String = newSubId(),
val onEose: ((time: Long, relayUrl: NormalizedRelayUrl) -> Unit)? = null,
) {
var relayBasedFilters: List<RelayBasedFilter>? = null // Inactive when null
private var filters: Map<NormalizedRelayUrl, List<Filter>>? = null // Inactive when null
fun reset() {
relayBasedFilters = null
filters = null
}
fun updateFilters(newFilters: Map<NormalizedRelayUrl, List<Filter>>?) {
filters = newFilters
}
fun filters() = filters
@OptIn(ExperimentalContracts::class)
fun isActive() = filters != null
fun callEose(
time: Long,
relay: NormalizedRelayUrl,
) {
onEose?.let { it(time, relay) }
}
fun hasChangedFiltersFrom(otherFilters: List<RelayBasedFilter>?): Boolean {
if (relayBasedFilters == null && otherFilters == null) return false
if (relayBasedFilters?.size != otherFilters?.size) return true
relayBasedFilters?.forEachIndexed { index, relaySetFilter ->
val otherFilter = otherFilters?.getOrNull(index) ?: return true
if (relaySetFilter.relay != otherFilter.relay) return true
return isDifferent(relaySetFilter.filter, otherFilter.filter)
}
return false
}
fun isDifferent(
filter1: Filter,
filter2: Filter,
): Boolean {
// Does not check SINCE on purpose. Avoids replacing the filter if SINCE was all that changed.
// fast check
if (filter1.authors?.size != filter2.authors?.size ||
filter1.ids?.size != filter2.ids?.size ||
filter1.tags?.size != filter2.tags?.size ||
filter1.kinds?.size != filter2.kinds?.size ||
filter1.limit != filter2.limit ||
filter1.search?.length != filter2.search?.length ||
filter1.until != filter2.until
) {
return true
}
// deep check
if (filter1.ids != filter2.ids ||
filter1.authors != filter2.authors ||
filter1.tags != filter2.tags ||
filter1.kinds != filter2.kinds ||
filter1.search != filter2.search
) {
return true
}
return false
}
}

View File

@@ -20,16 +20,13 @@
*/
package com.vitorpamplona.ammolite.relays.datasources
import com.vitorpamplona.ammolite.relays.BundledUpdate
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientListener
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import java.util.concurrent.atomic.AtomicBoolean
import com.vitorpamplona.quartz.utils.LargeCache
/**
* Semantically groups Nostr filters and subscriptions in data source objects that
@@ -37,13 +34,9 @@ import java.util.concurrent.atomic.AtomicBoolean
*/
class SubscriptionController(
val client: NostrClient,
val updateSubscriptions: () -> Unit,
) : SubscriptionControllerService {
private val subscriptions = SubscriptionSet()
private var active: Boolean = false
private val changingFilters = AtomicBoolean()
val stats = SubscriptionStats()
) {
private val subscriptions = LargeCache<String, Subscription>()
private val stats = SubscriptionStats()
private val clientListener =
object : IRelayClientListener {
@@ -54,10 +47,10 @@ class SubscriptionController(
arrivalTime: Long,
afterEOSE: Boolean,
) {
if (subscriptions.contains(subId)) {
if (subscriptions.containsKey(subId)) {
stats.add(subId, event.kind)
if (afterEOSE) {
runAfterEOSE(subId, relay, arrivalTime)
subscriptions.get(subId)?.callEose(arrivalTime, relay.url)
}
}
}
@@ -67,145 +60,63 @@ class SubscriptionController(
subId: String,
arrivalTime: Long,
) {
if (subscriptions.contains(subId)) {
runAfterEOSE(subId, relay, arrivalTime)
if (subscriptions.containsKey(subId)) {
subscriptions.get(subId)?.callEose(arrivalTime, relay.url)
}
}
}
private fun runAfterEOSE(
subscriptionId: String,
relay: IRelayClient,
arrivalTime: Long,
) {
subscriptions[subscriptionId]?.callEose(arrivalTime, relay.url)
}
init {
client.subscribe(clientListener)
}
override fun destroy() {
// makes sure to run
stop()
fun destroy() {
client.unsubscribe(clientListener)
bundler.cancel()
}
override fun start() {
active = true
invalidateFilters()
}
@OptIn(DelicateCoroutinesApi::class)
override fun stop() {
active = false
subscriptions.forEach { subscription ->
client.close(subscription.id)
subscription.reset()
}
}
override fun printStats(tag: String) = stats.printCounter(tag)
fun printStats(tag: String) = stats.printCounter(tag)
fun getSub(subId: String) = subscriptions.get(subId)
fun requestNewSubscription(onEOSE: ((Long, NormalizedRelayUrl) -> Unit)? = null): Subscription = subscriptions.newSub(onEOSE)
fun requestNewSubscription(onEOSE: ((Long, NormalizedRelayUrl) -> Unit)? = null): Subscription = Subscription(onEose = onEOSE).also { subscriptions.put(it.id, it) }
fun dismissSubscription(subId: String) = getSub(subId)?.let { dismissSubscription(it) }
fun dismissSubscription(subscription: Subscription) {
client.close(subscription.id)
subscription.reset()
subscriptions.remove(subscription)
subscriptions.remove(subscription.id)
}
fun isUpdatingFilters() = changingFilters.get()
// Refreshes observers in batches.
private val bundler = BundledUpdate(300, Dispatchers.Default)
override fun invalidateFilters() {
bundler.invalidate {
// println("DataSource: ${this.javaClass.simpleName} InvalidateFilters")
// adds the time to perform the refresh into this delay
// holding off new updates in case of heavy refresh routines.
resetFiltersSuspend()
}
}
private fun resetFiltersSuspend() {
// only runs one at a time. Ignores the others
if (changingFilters.compareAndSet(false, true)) {
try {
resetFiltersSuspendInner()
} finally {
changingFilters.getAndSet(false)
fun updateRelays() {
val currentFilters =
subscriptions.associateWith { id, sub ->
client.getSubscriptionFiltersOrNull(id)
}
}
}
private fun resetFiltersSuspendInner() {
// saves the channels that are currently active
val activeSubscriptions = subscriptions.actives()
// saves the current content to only update if it changes
val currentFilters = activeSubscriptions.associate { it.id to client.getSubscriptionFiltersOrNull(it.id) }
// updates all filters
updateSubscriptions()
// Makes sure to only send an updated filter when it actually changes.
subscriptions.forEach { newSubscriptionFilters ->
val currentFilters = currentFilters[newSubscriptionFilters.id]
updateRelaysIfNeeded(newSubscriptionFilters, currentFilters)
subscriptions.forEach { id, sub ->
updateRelaysIfNeeded(id, sub.filters(), currentFilters[id])
}
}
fun updateRelaysIfNeeded(
updatedSubscription: Subscription,
currentFilters: List<RelayBasedFilter>?,
subId: String,
updatedFilters: Map<NormalizedRelayUrl, List<Filter>>?,
currentFilters: Map<NormalizedRelayUrl, List<Filter>>?,
) {
val updatedSubscriptionNewFilters = updatedSubscription.relayBasedFilters
val isActive = client.isActive(updatedSubscription.id)
if (!isActive && updatedSubscriptionNewFilters != null) {
// Filter was removed from the active list
// but it is supposed to be there. Send again.
if (active) {
client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
if (currentFilters != null) {
if (updatedFilters == null) {
// was active and is not active anymore, just close.
client.close(subId)
} else {
client.sendRequest(subId, updatedFilters)
}
} else {
if (currentFilters != null) {
if (updatedSubscriptionNewFilters == null) {
// was active and is not active anymore, just close.
client.close(updatedSubscription.id)
} else {
// was active and is still active, check if it has changed.
if (updatedSubscription.hasChangedFiltersFrom(currentFilters)) {
client.close(updatedSubscription.id)
if (active) {
client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
}
} else {
// hasn't changed, does nothing.
// unless the relay has disconnected, then reconnect.
if (active) {
client.sendFilterOnlyIfDisconnected(updatedSubscription.id, updatedSubscriptionNewFilters)
}
}
}
if (updatedFilters == null) {
// was not active and is still not active, does nothing
} else {
if (updatedSubscriptionNewFilters == null) {
// was not active and is still not active, does nothing
} else {
// was not active and becomes active, sends the filter.
if (active) {
client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
}
}
// was not active and becomes active, sends the entire filter.
client.sendRequest(subId, updatedFilters)
}
}
}

View File

@@ -1,33 +0,0 @@
/**
* Copyright (c) 2025 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.datasources
interface SubscriptionControllerService {
fun start()
fun stop()
fun invalidateFilters()
fun destroy()
fun printStats(tag: String)
}

View File

@@ -1,47 +0,0 @@
/**
* Copyright (c) 2025 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.datasources
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
class SubscriptionSet {
private var subscriptions = mapOf<String, Subscription>()
fun contains(subId: String) = subscriptions.containsKey(subId)
fun add(sub: Subscription) {
subscriptions = subscriptions + Pair(sub.id, sub)
}
fun remove(subId: String) {
subscriptions = subscriptions.minus(subId)
}
fun remove(sub: Subscription) = remove(sub.id)
fun newSub(onEOSE: ((Long, NormalizedRelayUrl) -> Unit)? = null): Subscription = Subscription(onEose = onEOSE).also { add(it) }
fun forEach(action: (Subscription) -> Unit) = subscriptions.values.forEach(action)
operator fun get(subId: String) = subscriptions[subId]
fun actives() = subscriptions.values.filter { it.relayBasedFilters != null }
}

View File

@@ -21,17 +21,19 @@
package com.vitorpamplona.ammolite.relays.datasources
import android.util.Log
import com.vitorpamplona.quartz.utils.LargeCache
class SubscriptionStats {
data class Counter(
val subscriptionId: String,
val eventKind: Int,
var counter: Int,
)
) {
var counter: Int = 0
}
private var eventCounter = mapOf<Int, Counter>()
private var eventCounter = LargeCache<Int, Counter>()
fun eventCounterIndex(
private fun eventCounterIndex(
str1: String,
str2: Int,
): Int = 31 * str1.hashCode() + str2.hashCode()
@@ -41,19 +43,15 @@ class SubscriptionStats {
eventKind: Int,
) {
val key = eventCounterIndex(subscriptionId, eventKind)
val keyValue = eventCounter[key]
if (keyValue != null) {
keyValue.counter++
} else {
eventCounter = eventCounter + Pair(key, Counter(subscriptionId, eventKind, 1))
}
val stats = eventCounter.getOrCreate(key) { Counter(subscriptionId, eventKind) }
stats.counter++
}
fun printCounter(tag: String) {
eventCounter.forEach {
eventCounter.forEach { _, stats ->
Log.d(
tag,
"Received Events ${it.value.subscriptionId} ${it.value.eventKind}: ${it.value.counter}",
"Received Events ${stats.subscriptionId} ${stats.eventKind}: ${stats.counter}",
)
}
}

View File

@@ -25,12 +25,12 @@ import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientLis
import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.RelayState
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.PoolEventOutboxRepository
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.PoolSubscriptionRepository
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayPool
import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
import com.vitorpamplona.quartz.nip01Core.relay.client.single.basic.BasicRelayClient
import com.vitorpamplona.quartz.nip01Core.relay.client.single.newSubId
import com.vitorpamplona.quartz.nip01Core.relay.client.stats.RelayStats
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import com.vitorpamplona.quartz.nip01Core.relay.sockets.WebsocketBuilder
import kotlinx.coroutines.CoroutineScope
@@ -44,14 +44,15 @@ import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.stateIn
/**
* The Nostr Client manages a relay pool, keeps active subscriptions and manages sending of events.
* The Nostr Client manages a relay pool, keeps active subscriptions and manages re-sending of events.
*/
class NostrClient(
private val websocketBuilder: WebsocketBuilder,
private val scope: CoroutineScope,
) : IRelayClientListener {
private val relayPool: RelayPool = RelayPool(this, ::buildRelay)
private val activeSubscriptions: PoolSubscriptionRepository = PoolSubscriptionRepository()
private val activeRequests: PoolSubscriptionRepository = PoolSubscriptionRepository()
private val activeCounts: PoolSubscriptionRepository = PoolSubscriptionRepository()
private val eventOutbox: PoolEventOutboxRepository = PoolEventOutboxRepository()
private var listeners = setOf<IRelayClientListener>()
@@ -62,19 +63,20 @@ class NostrClient(
*/
private val allRelays =
combine(
activeSubscriptions.relays,
activeRequests.relays,
activeCounts.relays,
eventOutbox.relays,
) { subs, outbox ->
subs + outbox
) { reqs, counts, outbox ->
reqs + counts + outbox
}.onStart {
activeSubscriptions.relays.value + eventOutbox.relays.value
activeRequests.relays.value + activeCounts.relays.value + eventOutbox.relays.value
}.onEach {
relayPool.updatePool(it)
}.flowOn(Dispatchers.Default)
.stateIn(
scope,
SharingStarted.Companion.Eagerly,
activeSubscriptions.relays.value + eventOutbox.relays.value,
activeRequests.relays.value + activeCounts.relays.value + eventOutbox.relays.value,
)
fun buildRelay(relay: NormalizedRelayUrl): IRelayClient =
@@ -84,7 +86,8 @@ class NostrClient(
listener = relayPool,
stats = RelayStats.get(relay),
) { liveRelay ->
activeSubscriptions.forEachSub(relay, liveRelay::sendRequest)
activeRequests.forEachSub(relay, liveRelay::sendRequest)
activeCounts.forEachSub(relay, liveRelay::sendCount)
eventOutbox.forEachUnsentEvent(relay, liveRelay::send)
}
@@ -109,20 +112,124 @@ class NostrClient(
}
}
fun sendFilter(
subscriptionId: String = newSubId(),
filters: List<RelayBasedFilter> = listOf(),
) {
activeSubscriptions.addOrUpdate(subscriptionId, filters)
relayPool.sendRequest(subscriptionId, filters)
fun needsToResendRequest(
oldFilters: List<Filter>,
newFilters: List<Filter>,
): Boolean {
if (oldFilters.size != newFilters.size) return true
oldFilters.forEachIndexed { index, oldFilter ->
val newFilter = newFilters.getOrNull(index) ?: return true
return needsToResendRequest(oldFilter, newFilter)
}
return false
}
fun sendFilterOnlyIfDisconnected(
subscriptionId: String = newSubId(),
filters: List<RelayBasedFilter> = listOf(),
/**
* Checks if the filter has changed, with a special case for when the since changes due to new
* EOSE times.
*/
fun needsToResendRequest(
oldFilter: Filter,
newFilter: Filter,
): Boolean {
// Does not check SINCE on purpose. Avoids replacing the filter if SINCE was all that changed.
// fast check
if (oldFilter.authors?.size != newFilter.authors?.size ||
oldFilter.ids?.size != newFilter.ids?.size ||
oldFilter.tags?.size != newFilter.tags?.size ||
oldFilter.kinds?.size != newFilter.kinds?.size ||
oldFilter.limit != newFilter.limit ||
oldFilter.search?.length != newFilter.search?.length ||
oldFilter.until != newFilter.until
) {
return true
}
// deep check
if (oldFilter.ids != newFilter.ids ||
oldFilter.authors != newFilter.authors ||
oldFilter.tags != newFilter.tags ||
oldFilter.kinds != newFilter.kinds ||
oldFilter.search != newFilter.search
) {
return true
}
if (oldFilter.since != null) {
if (newFilter.since == null) {
// went was checking the future only and now wants everything
return true
} else if (oldFilter.since > newFilter.since) {
// went backwards in time, forces update
return true
}
}
return false
}
fun sendRequest(
subId: String = newSubId(),
filters: Map<NormalizedRelayUrl, List<Filter>>,
) {
activeSubscriptions.addOrUpdate(subscriptionId, filters)
relayPool.connectIfDisconnected()
val oldFilters = activeRequests.getSubscriptionFiltersOrNull(subId) ?: emptyMap()
activeRequests.addOrUpdate(subId, filters)
val allRelays = filters.keys + oldFilters.keys
allRelays.forEach { relay ->
val oldFilters = oldFilters[relay]
val newFilters = filters[relay]
if (newFilters.isNullOrEmpty()) {
// some relays are not in this sub anymore. Stop their subscriptions
relayPool.close(relay, subId)
} else if (oldFilters.isNullOrEmpty()) {
// new relays were added. Start a new sub in them
relayPool.sendRequest(relay, subId, newFilters)
} else if (needsToResendRequest(oldFilters, newFilters)) {
// filters were changed enough (not only an update in since) to warn a new update
relayPool.sendRequest(relay, subId, newFilters)
} else {
// makes sure the relay wakes up if it was disconnected by the server
// upon connection, the relay will run the default Sync and update all
// filters, including this one.
relayPool.connectIfDisconnected(relay)
}
}
}
fun sendCount(
subId: String = newSubId(),
filters: Map<NormalizedRelayUrl, List<Filter>>,
) {
val oldFilters = activeCounts.getSubscriptionFiltersOrNull(subId) ?: emptyMap()
activeCounts.addOrUpdate(subId, filters)
val allRelays = filters.keys + oldFilters.keys
allRelays.forEach { relay ->
val oldFilters = oldFilters[relay]
val newFilters = filters[relay]
if (newFilters.isNullOrEmpty()) {
// some relays are not in this sub anymore. Stop their subscriptions
relayPool.close(relay, subId)
} else if (oldFilters.isNullOrEmpty()) {
// new relays were added. Start a new sub in them
relayPool.sendCount(relay, subId, newFilters)
} else if (needsToResendRequest(oldFilters, newFilters)) {
// filters were changed enough (not only an update in since) to warn a new update
relayPool.sendCount(relay, subId, newFilters)
} else {
// makes sure the relay wakes up if it was disconnected by the server
// upon connection, the relay will run the default Sync and update all
// filters, including this one.
relayPool.connectIfDisconnected(relay)
}
}
}
fun sendIfExists(
@@ -142,11 +249,10 @@ class NostrClient(
fun close(subscriptionId: String) {
relayPool.close(subscriptionId)
activeSubscriptions.remove(subscriptionId)
activeRequests.remove(subscriptionId)
activeCounts.remove(subscriptionId)
}
fun isActive(subscriptionId: String): Boolean = activeSubscriptions.isActive(subscriptionId)
override fun onEvent(
relay: IRelayClient,
subId: String,
@@ -236,7 +342,7 @@ class NostrClient(
listeners = listeners.minus(listener)
}
fun getSubscriptionFiltersOrNull(subId: String): List<RelayBasedFilter>? = activeSubscriptions.getSubscriptionFiltersOrNull(subId)
fun getSubscriptionFiltersOrNull(subId: String): Map<NormalizedRelayUrl, List<Filter>>? = activeRequests.getSubscriptionFiltersOrNull(subId)
fun relayStatusFlow() = relayPool.statusFlow
}

View File

@@ -23,9 +23,10 @@ package com.vitorpamplona.quartz.nip01Core.relay.client.acessories
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientListener
import com.vitorpamplona.quartz.nip01Core.relay.client.pool.RelayBasedFilter
import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
import com.vitorpamplona.quartz.nip01Core.relay.client.single.newSubId
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
@@ -33,7 +34,7 @@ import kotlinx.coroutines.launch
fun NostrClient.downloadFirstEvent(
subscriptionId: String = newSubId(),
filters: List<RelayBasedFilter> = listOf(),
filters: Map<NormalizedRelayUrl, List<Filter>>,
onResponse: (Event) -> Unit,
) {
val listener =
@@ -56,7 +57,7 @@ fun NostrClient.downloadFirstEvent(
subscribe(listener)
sendFilter(subscriptionId, filters)
sendRequest(subscriptionId, filters)
GlobalScope.launch(Dispatchers.IO) {
delay(30000)

View File

@@ -1,29 +0,0 @@
/**
* Copyright (c) 2025 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.quartz.nip01Core.relay.client.pool
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
class PoolSubscription(
var filters: List<RelayBasedFilter> = emptyList(),
) {
fun toFilter(relay: NormalizedRelayUrl) = filters.mapNotNull { it.toFilter(relay) }
}

View File

@@ -22,20 +22,17 @@ package com.vitorpamplona.quartz.nip01Core.relay.client.pool
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import com.vitorpamplona.quartz.utils.LargeCache
import kotlinx.coroutines.flow.MutableStateFlow
class PoolSubscriptionRepository {
private var subscriptions = mapOf<String, PoolSubscription>()
private var subscriptions = LargeCache<String, Map<NormalizedRelayUrl, List<Filter>>>()
val relays = MutableStateFlow(setOf<NormalizedRelayUrl>())
fun updateRelays() {
val myRelays = mutableSetOf<NormalizedRelayUrl>()
subscriptions.values.forEach {
it.filters.forEach {
if (!myRelays.contains(it.relay)) {
myRelays.add(it.relay)
}
}
subscriptions.forEach { sub, perRelayFilters ->
myRelays.addAll(perRelayFilters.keys)
}
if (relays.value != myRelays) {
@@ -45,20 +42,15 @@ class PoolSubscriptionRepository {
fun addOrUpdate(
subscriptionId: String,
filters: List<RelayBasedFilter> = listOf(),
filters: Map<NormalizedRelayUrl, List<Filter>>,
) {
val currentFilter = subscriptions[subscriptionId]
if (currentFilter == null) {
subscriptions = subscriptions + Pair(subscriptionId, PoolSubscription(filters))
} else {
currentFilter.filters = filters
}
subscriptions.put(subscriptionId, filters)
updateRelays()
}
fun remove(subscriptionId: String) {
if (subscriptions.contains(subscriptionId)) {
subscriptions = subscriptions.minus(subscriptionId)
if (subscriptions.containsKey(subscriptionId)) {
subscriptions.remove(subscriptionId)
updateRelays()
}
}
@@ -67,9 +59,9 @@ class PoolSubscriptionRepository {
relay: NormalizedRelayUrl,
run: (String, List<Filter>) -> Unit,
) {
subscriptions.forEach { (subId, filters) ->
val filters = filters.toFilter(relay)
if (filters.isNotEmpty()) {
subscriptions.forEach { subId, filters ->
val filters = filters[relay]
if (!filters.isNullOrEmpty()) {
run(subId, filters)
} else {
null
@@ -77,11 +69,5 @@ class PoolSubscriptionRepository {
}
}
fun isActive(subscriptionId: String): Boolean = subscriptions.contains(subscriptionId)
fun allSubscriptions(): Map<String, PoolSubscription> = subscriptions
fun getSubscriptionFilters(subId: String): List<RelayBasedFilter> = subscriptions[subId]?.filters ?: emptyList()
fun getSubscriptionFiltersOrNull(subId: String): List<RelayBasedFilter>? = subscriptions[subId]?.filters
fun getSubscriptionFiltersOrNull(subId: String): Map<NormalizedRelayUrl, List<Filter>>? = subscriptions.get(subId)
}

View File

@@ -40,3 +40,11 @@ class RelayBasedFilter(
null
}
}
fun List<RelayBasedFilter>.groupByRelay(): Map<NormalizedRelayUrl, List<Filter>> {
val result = mutableMapOf<NormalizedRelayUrl, MutableList<Filter>>()
for (relayBasedFilter in this) {
result.getOrPut(relayBasedFilter.relay) { mutableListOf() }.add(relayBasedFilter.filter)
}
return result
}

View File

@@ -26,6 +26,7 @@ import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.EmptyClientList
import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientListener
import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.RelayState
import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import com.vitorpamplona.quartz.utils.LargeCache
import kotlinx.coroutines.flow.MutableStateFlow
@@ -33,7 +34,7 @@ import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlin.collections.forEach
import kotlin.collections.isNotEmpty
import kotlin.collections.mapNotNull
import kotlin.collections.isNullOrEmpty
val UnsupportedRelayCreation: (url: NormalizedRelayUrl) -> IRelayClient = {
throw UnsupportedOperationException("Cannot create new relays")
@@ -77,30 +78,48 @@ class RelayPool(
relay.connectAndSyncFiltersIfDisconnected()
}
fun connectIfDisconnected(relay: NormalizedRelayUrl) = relays.get(relay)?.connectAndSyncFiltersIfDisconnected()
fun disconnect() =
relays.forEach { url, relay ->
relay.disconnect()
}
fun sendRequest(
relay: NormalizedRelayUrl,
subId: String,
filters: List<RelayBasedFilter>,
filters: List<Filter>,
) {
relays.get(relay)?.sendRequest(subId, filters)
}
fun sendRequest(
subId: String,
filters: Map<NormalizedRelayUrl, List<Filter>>,
) {
relays.forEach { url, relay ->
val filters = filters.mapNotNull { it.toFilter(url) }
if (filters.isNotEmpty()) {
val filters = filters[relay.url]
if (!filters.isNullOrEmpty()) {
relay.sendRequest(subId, filters)
}
}
}
fun sendCounter(
fun sendCount(
relay: NormalizedRelayUrl,
subId: String,
filters: List<RelayBasedFilter>,
filters: List<Filter>,
) {
relays.get(relay)?.sendRequest(subId, filters)
}
fun sendCount(
subId: String,
filters: Map<NormalizedRelayUrl, List<Filter>>,
) {
relays.forEach { url, relay ->
val filters = filters.mapNotNull { it.toFilter(url) }
if (filters.isNotEmpty()) {
val filters = filters[relay.url]
if (!filters.isNullOrEmpty()) {
relay.sendCount(subId, filters)
}
}
@@ -111,6 +130,11 @@ class RelayPool(
relay.close(subscriptionId)
}
fun close(
relay: NormalizedRelayUrl,
subscriptionId: String,
) = relays.get(relay)?.close(subscriptionId)
fun send(
signedEvent: Event,
list: Set<NormalizedRelayUrl>,

View File

@@ -293,7 +293,7 @@ open class BasicRelayClient(
}
private fun processClosed(msg: ClosedMessage) {
// Log.w(logTag, "Relay Closed Subscription $newMessage")
Log.w(logTag, "Relay Closed Subscription ${msg.subscriptionId} ${msg.message}")
afterEOSEPerSubscription[msg.subscriptionId] = false
listener.onClosed(this@BasicRelayClient, msg.subscriptionId, msg.message)
}

View File

@@ -20,8 +20,10 @@
*/
package com.vitorpamplona.quartz.utils
import android.R.attr.value
import java.util.concurrent.ConcurrentSkipListMap
import java.util.function.BiConsumer
import kotlin.collections.LinkedHashMap
class LargeCache<K, V> {
private val cache = ConcurrentSkipListMap<K, V>()
@@ -170,14 +172,32 @@ class LargeCache<K, V> {
return runner.count
}
private fun innerForEach(runner: BiConsumer<K, V>) {
// val (value, elapsed) =
// measureTimedValue {
cache.forEach(runner)
// }
// println("LargeCache full loop $elapsed \t for $runner")
public fun <T, U> associate(transform: (K, V) -> Pair<T, U>): Map<T, U> {
val runner = BiAssociateCollector(size(), transform)
innerForEach(runner)
return runner.results
}
listOf(1, 2, 3).joinToString()
public fun <T, U> associateNotNull(transform: (K, V) -> Pair<T, U>?): Map<T, U> {
val runner = BiAssociateNotNullCollector(size(), transform)
innerForEach(runner)
return runner.results
}
public fun <U> associateWith(transform: (K, V) -> U?): Map<K, U?> {
val runner = BiAssociateWithCollector(size(), transform)
innerForEach(runner)
return runner.results
}
public fun <U> associateNotNullWith(transform: (K, V) -> U): Map<K, U> {
val runner = BiAssociateNotNullWithCollector(size(), transform)
innerForEach(runner)
return runner.results
}
private fun innerForEach(runner: BiConsumer<K, V>) {
cache.forEach(runner)
}
fun joinToString(
@@ -255,6 +275,13 @@ fun interface BiMapper<K, V, R> {
): R?
}
fun interface BiMapperNotNull<K, V, R> {
fun map(
k: K,
v: V,
): R
}
class BiMapCollector<K, V, R>(
val mapper: BiMapper<K, V, R?>,
) : BiConsumer<K, V> {
@@ -271,6 +298,69 @@ class BiMapCollector<K, V, R>(
}
}
class BiAssociateCollector<K, V, T, U>(
val size: Int,
val mapper: BiMapperNotNull<K, V, Pair<T, U>>,
) : BiConsumer<K, V> {
var results: LinkedHashMap<T, U> = LinkedHashMap(size)
override fun accept(
k: K,
v: V,
) {
val pair = mapper.map(k, v)
results.put(pair.first, pair.second)
}
}
class BiAssociateNotNullCollector<K, V, T, U>(
val size: Int,
val mapper: BiMapper<K, V, Pair<T, U>?>,
) : BiConsumer<K, V> {
var results: LinkedHashMap<T, U> = LinkedHashMap(size)
override fun accept(
k: K,
v: V,
) {
val pair = mapper.map(k, v)
if (pair != null) {
results.put(pair.first, pair.second)
}
}
}
class BiAssociateWithCollector<K, V, U>(
val size: Int,
val mapper: BiMapper<K, V, U?>,
) : BiConsumer<K, V> {
var results: LinkedHashMap<K, U?> = LinkedHashMap(size)
override fun accept(
k: K,
v: V,
) {
results.put(k, mapper.map(k, v))
}
}
class BiAssociateNotNullWithCollector<K, V, U>(
val size: Int,
val mapper: BiMapper<K, V, U>,
) : BiConsumer<K, V> {
var results: LinkedHashMap<K, U> = LinkedHashMap(size)
override fun accept(
k: K,
v: V,
) {
val newValue = mapper.map(k, v)
if (newValue != null) {
results.put(k, newValue)
}
}
}
class BiMapUniqueCollector<K, V, R>(
val mapper: BiMapper<K, V, R?>,
) : BiConsumer<K, V> {