From 51a6eb4974c8ed8c91af9a4b8e52aae5039c83f5 Mon Sep 17 00:00:00 2001 From: Vitor Pamplona Date: Tue, 28 May 2024 19:13:03 -0400 Subject: [PATCH] - Adds a new observer by kind and author - Uses NIP-17 relays for DMs - Changes default connection to account from NIP17 relays - Only sends wraps to the appropriate relays. --- .../vitorpamplona/amethyst/ServiceManager.kt | 7 +- .../vitorpamplona/amethyst/model/Account.kt | 71 +++++++++++++++--- .../amethyst/model/LocalCache.kt | 65 ++++++++++++++++- .../model/observables/CreatedAtComparator.kt | 21 ++++++ .../observables/LatestByKindAndAuthor.kt | 72 +++++++++++++++++++ .../amethyst/service/relays/Client.kt | 13 ++++ .../ui/screen/loggedIn/AccountViewModel.kt | 25 ++++--- 7 files changed, 245 insertions(+), 29 deletions(-) create mode 100644 app/src/main/java/com/vitorpamplona/amethyst/model/observables/LatestByKindAndAuthor.kt diff --git a/app/src/main/java/com/vitorpamplona/amethyst/ServiceManager.kt b/app/src/main/java/com/vitorpamplona/amethyst/ServiceManager.kt index d86cd26bb..b87b2e5c2 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/ServiceManager.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/ServiceManager.kt @@ -104,7 +104,7 @@ class ServiceManager { } if (myAccount != null) { - val relaySet = myAccount.activeRelays() ?: myAccount.convertLocalRelays() + val relaySet = myAccount.connectToRelays.value Log.d("Relay", "Service Manager Connect Connecting ${relaySet.size}") Client.reconnect(relaySet) @@ -112,10 +112,9 @@ class ServiceManager { collectorJob = null collectorJob = scope.launch { - myAccount.userProfile().flow().relays.stateFlow.collect { + myAccount.connectToRelaysFlow.collect { if (isStarted) { - val newRelaySet = myAccount.activeRelays() ?: myAccount.convertLocalRelays() - Client.reconnect(newRelaySet, onlyIfChanged = true) + Client.reconnect(it, onlyIfChanged = true) } } } diff --git a/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt b/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt index 5dbf72f7a..1cc3c7e69 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt @@ -120,7 +120,6 @@ import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.combineTransform import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flattenMerge -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.mapLatest import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.flow.transformLatest @@ -227,6 +226,38 @@ class Account( class ListNameNotePair(val listName: String, val event: GeneralListEvent?) + val connectToRelaysFlow = + combineTransform( + getDMRelayListFlow(), + userProfile().flow().relays.stateFlow, + ) { dmRelayList, userProfile -> + val newRelaySet = activeRelays() ?: convertLocalRelays() + val newDMRelaySet = (dmRelayList.note.event as? ChatMessageRelayListEvent)?.relays() + + if (newDMRelaySet == null) { + emit(newRelaySet) + } else { + var mappedRelaySet = + newRelaySet.map { + if (newDMRelaySet?.contains(it.url) == true) { + Relay(it.url, true, true, it.activeTypes + FeedType.PRIVATE_DMS) + } else { + it + } + } + + newDMRelaySet.forEach { newUrl -> + if (mappedRelaySet.filter { it.url == newUrl }.isEmpty()) { + mappedRelaySet = mappedRelaySet + Relay(newUrl, true, true, setOf(FeedType.PRIVATE_DMS)) + } + } + + emit(mappedRelaySet.toTypedArray()) + } + } + + val connectToRelays = connectToRelaysFlow.stateIn(scope, SharingStarted.Eagerly, activeRelays() ?: convertLocalRelays()) + @OptIn(ExperimentalCoroutinesApi::class) val liveKind3FollowsFlow: Flow = userProfile().flow().follows.stateFlow.transformLatest { @@ -1821,13 +1852,28 @@ class Account( val id = mine.firstOrNull()?.id val mineNote = if (id == null) null else LocalCache.getNoteIfExists(id) - signedEvents.wraps.forEach { + signedEvents.wraps.forEach { wrap -> // Creates an alias - if (mineNote != null && it.recipientPubKey() != keyPair.pubKey.toHexKey()) { - LocalCache.getOrAddAliasNote(it.id, mineNote) + if (mineNote != null && wrap.recipientPubKey() != keyPair.pubKey.toHexKey()) { + LocalCache.getOrAddAliasNote(wrap.id, mineNote) } - Client.send(it) + val receiver = wrap.recipientPubKey() + if (receiver != null) { + val relayList = + ( + LocalCache.getAddressableNoteIfExists(ChatMessageRelayListEvent.createAddressTag(receiver)) + ?.event as? ChatMessageRelayListEvent + )?.relays()?.ifEmpty { null } + + if (relayList != null) { + Client.sendPrivately(signedEvent = wrap, relayList = relayList) + } else { + Client.send(wrap) + } + } else { + Client.send(wrap) + } } } @@ -2427,8 +2473,7 @@ class Account( ?: FeedType.values().toSet() Relay(it.key, it.value.read, it.value.write, localFeedTypes) - } - ?: return null + } ?: return null // Ugly, but forces nostr.band as the only search-supporting relay today. // TODO: Remove when search becomes more available. @@ -2564,10 +2609,18 @@ class Account( } } - fun getDMRelayList(): ChatMessageRelayListEvent? { + fun getDMRelayListNote(): AddressableNote { return LocalCache.getOrCreateAddressableNote( ChatMessageRelayListEvent.createAddressATag(signer.pubKey), - ).event as? ChatMessageRelayListEvent + ) + } + + fun getDMRelayListFlow(): StateFlow { + return getDMRelayListNote().flow().metadata.stateFlow + } + + fun getDMRelayList(): ChatMessageRelayListEvent? { + return getDMRelayListNote().event as? ChatMessageRelayListEvent } fun saveDMRelayList(dmRelays: List) { diff --git a/app/src/main/java/com/vitorpamplona/amethyst/model/LocalCache.kt b/app/src/main/java/com/vitorpamplona/amethyst/model/LocalCache.kt index 81eb50a7d..c2616010f 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/model/LocalCache.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/model/LocalCache.kt @@ -26,6 +26,7 @@ import androidx.compose.runtime.Stable import com.vitorpamplona.amethyst.Amethyst import com.vitorpamplona.amethyst.commons.data.DeletionIndex import com.vitorpamplona.amethyst.commons.data.LargeCache +import com.vitorpamplona.amethyst.model.observables.LatestByKindAndAuthor import com.vitorpamplona.amethyst.model.observables.LatestByKindWithETag import com.vitorpamplona.amethyst.service.checkNotInMainThread import com.vitorpamplona.amethyst.service.relays.Relay @@ -120,10 +121,12 @@ import kotlinx.collections.immutable.persistentSetOf import kotlinx.collections.immutable.toImmutableList import kotlinx.collections.immutable.toImmutableSet import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.launch import java.io.File import java.io.FileOutputStream import java.io.IOException @@ -143,12 +146,13 @@ object LocalCache { val deletionIndex = DeletionIndex() - val observablesByKindAndETag = ConcurrentHashMap>>(10) + private val observablesByKindAndETag = ConcurrentHashMap>>(10) + private val observablesByKindAndAuthor = ConcurrentHashMap>>(10) fun observeETag( kind: Int, eventId: HexKey, - onCreate: () -> LatestByKindWithETag, + scope: CoroutineScope, ): LatestByKindWithETag { var eTagList = observablesByKindAndETag.get(kind) @@ -162,17 +166,54 @@ object LocalCache { return if (value != null) { value } else { - val newObject = onCreate() as LatestByKindWithETag + val newObject = LatestByKindWithETag(kind, eventId) as LatestByKindWithETag val obj = eTagList.putIfAbsent(eventId, newObject) ?: newObject + if (obj == newObject) { + // initialize + scope.launch(Dispatchers.IO) { + obj.init() + } + } obj } as LatestByKindWithETag } + fun observeAuthor( + kind: Int, + pubkey: HexKey, + scope: CoroutineScope, + ): LatestByKindAndAuthor { + var authorObsList = observablesByKindAndAuthor.get(kind) + + if (authorObsList == null) { + authorObsList = ConcurrentHashMap>(1) as ConcurrentHashMap> + observablesByKindAndAuthor.put(kind, authorObsList) + } + + val value = authorObsList.get(pubkey) + + return if (value != null) { + value + } else { + val newObject = LatestByKindAndAuthor(kind, pubkey) as LatestByKindAndAuthor + val obj = authorObsList.putIfAbsent(pubkey, newObject) ?: newObject + if (obj == newObject) { + // initialize + scope.launch(Dispatchers.IO) { + obj.init() + } + } + obj + } as LatestByKindAndAuthor + } + fun updateObservables(event: Event) { val observablesOfKind = observablesByKindAndETag[event.kind()] ?: return event.forEachTaggedEvent { observablesOfKind[it]?.updateIfMatches(event) } + + observablesByKindAndAuthor[event.kind()]?.get(event.pubKey)?.updateIfMatches(event) } fun checkGetOrCreateUser(key: String): User? { @@ -367,6 +408,8 @@ object LocalCache { if (event.createdAt > (user.latestContactList?.createdAt ?: 0) && !event.tags.isEmpty()) { user.updateContactList(event) // Log.d("CL", "Consumed contact list ${user.toNostrUri()} ${event.relays()?.size}") + + updateObservables(event) } } @@ -2049,6 +2092,22 @@ object LocalCache { notes.forEach { _, it -> it.clearFlow() } addressables.forEach { _, it -> it.clearFlow() } users.forEach { _, it -> it.clearFlow() } + + observablesByKindAndETag.forEach { _, list -> + list.forEach { key, value -> + if (value.canDelete()) { + list.remove(key) + } + } + } + + observablesByKindAndAuthor.forEach { _, list -> + list.forEach { key, value -> + if (value.canDelete()) { + list.remove(key) + } + } + } } fun pruneOldAndHiddenMessages(account: Account) { diff --git a/app/src/main/java/com/vitorpamplona/amethyst/model/observables/CreatedAtComparator.kt b/app/src/main/java/com/vitorpamplona/amethyst/model/observables/CreatedAtComparator.kt index fb6ba16f4..1996f4585 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/model/observables/CreatedAtComparator.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/model/observables/CreatedAtComparator.kt @@ -20,6 +20,7 @@ */ package com.vitorpamplona.amethyst.model.observables +import com.vitorpamplona.amethyst.model.AddressableNote import com.vitorpamplona.amethyst.model.Note object CreatedAtComparator : Comparator { @@ -41,3 +42,23 @@ object CreatedAtComparator : Comparator { } } } + +object CreatedAtComparatorAddresses : Comparator { + override fun compare( + first: AddressableNote?, + second: AddressableNote?, + ): Int { + val firstEvent = first?.event + val secondEvent = second?.event + + return if (firstEvent == null && secondEvent == null) { + 0 + } else if (firstEvent == null) { + 1 + } else if (secondEvent == null) { + -1 + } else { + firstEvent.createdAt().compareTo(secondEvent.createdAt()) + } + } +} diff --git a/app/src/main/java/com/vitorpamplona/amethyst/model/observables/LatestByKindAndAuthor.kt b/app/src/main/java/com/vitorpamplona/amethyst/model/observables/LatestByKindAndAuthor.kt new file mode 100644 index 000000000..41fb72896 --- /dev/null +++ b/app/src/main/java/com/vitorpamplona/amethyst/model/observables/LatestByKindAndAuthor.kt @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2024 Vitor Pamplona + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the + * Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN + * AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +package com.vitorpamplona.amethyst.model.observables + +import com.vitorpamplona.amethyst.model.AddressableNote +import com.vitorpamplona.amethyst.model.LocalCache +import com.vitorpamplona.amethyst.model.Note +import com.vitorpamplona.quartz.events.Event +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.asStateFlow + +class LatestByKindAndAuthor(private val kind: Int, private val pubkey: String) { + private val _latest = MutableStateFlow(null) + val latest = _latest.asStateFlow() + + fun matches(event: T) = event.kind == kind && event.pubKey == pubkey + + fun updateIfMatches(event: T) { + if (matches(event)) { + if (event.createdAt > (_latest.value?.createdAt ?: 0)) { + _latest.tryEmit(event) + } + } + } + + fun canDelete(): Boolean { + return _latest.subscriptionCount.value == 0 + } + + suspend fun init() { + val latestNote = + if ((kind in 10000..19999) || (kind in 30000..39999)) { + LocalCache.addressables.maxOrNullOf( + filter = { idHex: String, note: AddressableNote -> + note.event?.let { + it.kind() == kind && it.pubKey() == pubkey + } == true + }, + comparator = CreatedAtComparatorAddresses, + )?.event as? T + } else { + LocalCache.notes.maxOrNullOf( + filter = { idHex: String, note: Note -> + note.event?.let { + it.kind() == kind && it.pubKey() == pubkey + } == true + }, + comparator = CreatedAtComparator, + )?.event as? T + } + + _latest.tryEmit(latestNote) + } +} diff --git a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt index cd6bff1c0..d0265b791 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/service/relays/Client.kt @@ -158,6 +158,19 @@ object Client : RelayPool.Listener { } } + fun sendPrivately( + signedEvent: EventInterface, + relayList: List, + ) { + checkNotInMainThread() + + relayList.forEach { relayUrl -> + RelayPool.getOrCreateRelay(relayUrl, setOf(FeedType.PRIVATE_DMS), { }) { + it.sendOverride(signedEvent) + } + } + } + fun close(subscriptionId: String) { RelayPool.close(subscriptionId) subscriptions = subscriptions.minus(subscriptionId) diff --git a/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt b/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt index b214b1e0a..6ad0ac818 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt @@ -49,7 +49,6 @@ import com.vitorpamplona.amethyst.model.UrlCachedPreviewer import com.vitorpamplona.amethyst.model.User import com.vitorpamplona.amethyst.model.UserState import com.vitorpamplona.amethyst.model.observables.CreatedAtComparator -import com.vitorpamplona.amethyst.model.observables.LatestByKindWithETag import com.vitorpamplona.amethyst.service.CashuProcessor import com.vitorpamplona.amethyst.service.CashuToken import com.vitorpamplona.amethyst.service.HttpClientManager @@ -74,8 +73,10 @@ import com.vitorpamplona.quartz.encoders.HexKey import com.vitorpamplona.quartz.encoders.Nip11RelayInformation import com.vitorpamplona.quartz.encoders.Nip19Bech32 import com.vitorpamplona.quartz.events.AddressableEvent +import com.vitorpamplona.quartz.events.ChatMessageRelayListEvent import com.vitorpamplona.quartz.events.ChatroomKey import com.vitorpamplona.quartz.events.ChatroomKeyable +import com.vitorpamplona.quartz.events.ContactListEvent import com.vitorpamplona.quartz.events.DraftEvent import com.vitorpamplona.quartz.events.Event import com.vitorpamplona.quartz.events.EventInterface @@ -131,6 +132,9 @@ class AccountViewModel(val account: Account, val settings: SettingsState) : View val userFollows: LiveData = account.userProfile().live().follows.map { it } val userRelays: LiveData = account.userProfile().live().relays.map { it } + val kind3Relays: StateFlow = observeByAuthor(ContactListEvent.KIND, account.signer.pubKey) + val dmRelays: StateFlow = observeByAuthor(ChatMessageRelayListEvent.KIND, account.signer.pubKey) + val toasts = MutableSharedFlow(0, 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) var serviceManager: ServiceManager? = null @@ -183,19 +187,14 @@ class AccountViewModel(val account: Account, val settings: SettingsState) : View kind: Int, eTag: HexKey, ): StateFlow { - val observable = - LocalCache.observeETag( - kind = kind, - eventId = eTag, - ) { - LatestByKindWithETag(kind, eTag) - } + return LocalCache.observeETag(kind = kind, eventId = eTag, viewModelScope).latest + } - viewModelScope.launch(Dispatchers.IO) { - observable.init() - } - - return observable.latest + fun observeByAuthor( + kind: Int, + pubkeyHex: HexKey, + ): StateFlow { + return LocalCache.observeAuthor(kind = kind, pubkey = pubkeyHex, viewModelScope).latest } fun reactToOrDelete(