- Adds a new observer by kind and author

- Uses NIP-17 relays for DMs
- Changes default connection to account from NIP17 relays
- Only sends wraps to the appropriate relays.
This commit is contained in:
Vitor Pamplona
2024-05-28 19:13:03 -04:00
parent f8dfc06a06
commit 51a6eb4974
7 changed files with 245 additions and 29 deletions

View File

@@ -104,7 +104,7 @@ class ServiceManager {
} }
if (myAccount != null) { if (myAccount != null) {
val relaySet = myAccount.activeRelays() ?: myAccount.convertLocalRelays() val relaySet = myAccount.connectToRelays.value
Log.d("Relay", "Service Manager Connect Connecting ${relaySet.size}") Log.d("Relay", "Service Manager Connect Connecting ${relaySet.size}")
Client.reconnect(relaySet) Client.reconnect(relaySet)
@@ -112,10 +112,9 @@ class ServiceManager {
collectorJob = null collectorJob = null
collectorJob = collectorJob =
scope.launch { scope.launch {
myAccount.userProfile().flow().relays.stateFlow.collect { myAccount.connectToRelaysFlow.collect {
if (isStarted) { if (isStarted) {
val newRelaySet = myAccount.activeRelays() ?: myAccount.convertLocalRelays() Client.reconnect(it, onlyIfChanged = true)
Client.reconnect(newRelaySet, onlyIfChanged = true)
} }
} }
} }

View File

@@ -120,7 +120,6 @@ import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combineTransform import kotlinx.coroutines.flow.combineTransform
import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flattenMerge import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.transformLatest import kotlinx.coroutines.flow.transformLatest
@@ -227,6 +226,38 @@ class Account(
class ListNameNotePair(val listName: String, val event: GeneralListEvent?) class ListNameNotePair(val listName: String, val event: GeneralListEvent?)
val connectToRelaysFlow =
combineTransform(
getDMRelayListFlow(),
userProfile().flow().relays.stateFlow,
) { dmRelayList, userProfile ->
val newRelaySet = activeRelays() ?: convertLocalRelays()
val newDMRelaySet = (dmRelayList.note.event as? ChatMessageRelayListEvent)?.relays()
if (newDMRelaySet == null) {
emit(newRelaySet)
} else {
var mappedRelaySet =
newRelaySet.map {
if (newDMRelaySet?.contains(it.url) == true) {
Relay(it.url, true, true, it.activeTypes + FeedType.PRIVATE_DMS)
} else {
it
}
}
newDMRelaySet.forEach { newUrl ->
if (mappedRelaySet.filter { it.url == newUrl }.isEmpty()) {
mappedRelaySet = mappedRelaySet + Relay(newUrl, true, true, setOf(FeedType.PRIVATE_DMS))
}
}
emit(mappedRelaySet.toTypedArray())
}
}
val connectToRelays = connectToRelaysFlow.stateIn(scope, SharingStarted.Eagerly, activeRelays() ?: convertLocalRelays())
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
val liveKind3FollowsFlow: Flow<LiveFollowLists> = val liveKind3FollowsFlow: Flow<LiveFollowLists> =
userProfile().flow().follows.stateFlow.transformLatest { userProfile().flow().follows.stateFlow.transformLatest {
@@ -1821,13 +1852,28 @@ class Account(
val id = mine.firstOrNull()?.id val id = mine.firstOrNull()?.id
val mineNote = if (id == null) null else LocalCache.getNoteIfExists(id) val mineNote = if (id == null) null else LocalCache.getNoteIfExists(id)
signedEvents.wraps.forEach { signedEvents.wraps.forEach { wrap ->
// Creates an alias // Creates an alias
if (mineNote != null && it.recipientPubKey() != keyPair.pubKey.toHexKey()) { if (mineNote != null && wrap.recipientPubKey() != keyPair.pubKey.toHexKey()) {
LocalCache.getOrAddAliasNote(it.id, mineNote) LocalCache.getOrAddAliasNote(wrap.id, mineNote)
} }
Client.send(it) val receiver = wrap.recipientPubKey()
if (receiver != null) {
val relayList =
(
LocalCache.getAddressableNoteIfExists(ChatMessageRelayListEvent.createAddressTag(receiver))
?.event as? ChatMessageRelayListEvent
)?.relays()?.ifEmpty { null }
if (relayList != null) {
Client.sendPrivately(signedEvent = wrap, relayList = relayList)
} else {
Client.send(wrap)
}
} else {
Client.send(wrap)
}
} }
} }
@@ -2427,8 +2473,7 @@ class Account(
?: FeedType.values().toSet() ?: FeedType.values().toSet()
Relay(it.key, it.value.read, it.value.write, localFeedTypes) Relay(it.key, it.value.read, it.value.write, localFeedTypes)
} } ?: return null
?: return null
// Ugly, but forces nostr.band as the only search-supporting relay today. // Ugly, but forces nostr.band as the only search-supporting relay today.
// TODO: Remove when search becomes more available. // TODO: Remove when search becomes more available.
@@ -2564,10 +2609,18 @@ class Account(
} }
} }
fun getDMRelayList(): ChatMessageRelayListEvent? { fun getDMRelayListNote(): AddressableNote {
return LocalCache.getOrCreateAddressableNote( return LocalCache.getOrCreateAddressableNote(
ChatMessageRelayListEvent.createAddressATag(signer.pubKey), ChatMessageRelayListEvent.createAddressATag(signer.pubKey),
).event as? ChatMessageRelayListEvent )
}
fun getDMRelayListFlow(): StateFlow<NoteState> {
return getDMRelayListNote().flow().metadata.stateFlow
}
fun getDMRelayList(): ChatMessageRelayListEvent? {
return getDMRelayListNote().event as? ChatMessageRelayListEvent
} }
fun saveDMRelayList(dmRelays: List<String>) { fun saveDMRelayList(dmRelays: List<String>) {

View File

@@ -26,6 +26,7 @@ import androidx.compose.runtime.Stable
import com.vitorpamplona.amethyst.Amethyst import com.vitorpamplona.amethyst.Amethyst
import com.vitorpamplona.amethyst.commons.data.DeletionIndex import com.vitorpamplona.amethyst.commons.data.DeletionIndex
import com.vitorpamplona.amethyst.commons.data.LargeCache import com.vitorpamplona.amethyst.commons.data.LargeCache
import com.vitorpamplona.amethyst.model.observables.LatestByKindAndAuthor
import com.vitorpamplona.amethyst.model.observables.LatestByKindWithETag import com.vitorpamplona.amethyst.model.observables.LatestByKindWithETag
import com.vitorpamplona.amethyst.service.checkNotInMainThread import com.vitorpamplona.amethyst.service.checkNotInMainThread
import com.vitorpamplona.amethyst.service.relays.Relay import com.vitorpamplona.amethyst.service.relays.Relay
@@ -120,10 +121,12 @@ import kotlinx.collections.immutable.persistentSetOf
import kotlinx.collections.immutable.toImmutableList import kotlinx.collections.immutable.toImmutableList
import kotlinx.collections.immutable.toImmutableSet import kotlinx.collections.immutable.toImmutableSet
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import java.io.File import java.io.File
import java.io.FileOutputStream import java.io.FileOutputStream
import java.io.IOException import java.io.IOException
@@ -143,12 +146,13 @@ object LocalCache {
val deletionIndex = DeletionIndex() val deletionIndex = DeletionIndex()
val observablesByKindAndETag = ConcurrentHashMap<Int, ConcurrentHashMap<HexKey, LatestByKindWithETag<Event>>>(10) private val observablesByKindAndETag = ConcurrentHashMap<Int, ConcurrentHashMap<HexKey, LatestByKindWithETag<Event>>>(10)
private val observablesByKindAndAuthor = ConcurrentHashMap<Int, ConcurrentHashMap<HexKey, LatestByKindAndAuthor<Event>>>(10)
fun <T : Event> observeETag( fun <T : Event> observeETag(
kind: Int, kind: Int,
eventId: HexKey, eventId: HexKey,
onCreate: () -> LatestByKindWithETag<T>, scope: CoroutineScope,
): LatestByKindWithETag<T> { ): LatestByKindWithETag<T> {
var eTagList = observablesByKindAndETag.get(kind) var eTagList = observablesByKindAndETag.get(kind)
@@ -162,17 +166,54 @@ object LocalCache {
return if (value != null) { return if (value != null) {
value value
} else { } else {
val newObject = onCreate() as LatestByKindWithETag<Event> val newObject = LatestByKindWithETag<T>(kind, eventId) as LatestByKindWithETag<Event>
val obj = eTagList.putIfAbsent(eventId, newObject) ?: newObject val obj = eTagList.putIfAbsent(eventId, newObject) ?: newObject
if (obj == newObject) {
// initialize
scope.launch(Dispatchers.IO) {
obj.init()
}
}
obj obj
} as LatestByKindWithETag<T> } as LatestByKindWithETag<T>
} }
fun <T : Event> observeAuthor(
kind: Int,
pubkey: HexKey,
scope: CoroutineScope,
): LatestByKindAndAuthor<T> {
var authorObsList = observablesByKindAndAuthor.get(kind)
if (authorObsList == null) {
authorObsList = ConcurrentHashMap<HexKey, LatestByKindAndAuthor<T>>(1) as ConcurrentHashMap<HexKey, LatestByKindAndAuthor<Event>>
observablesByKindAndAuthor.put(kind, authorObsList)
}
val value = authorObsList.get(pubkey)
return if (value != null) {
value
} else {
val newObject = LatestByKindAndAuthor<T>(kind, pubkey) as LatestByKindAndAuthor<Event>
val obj = authorObsList.putIfAbsent(pubkey, newObject) ?: newObject
if (obj == newObject) {
// initialize
scope.launch(Dispatchers.IO) {
obj.init()
}
}
obj
} as LatestByKindAndAuthor<T>
}
fun updateObservables(event: Event) { fun updateObservables(event: Event) {
val observablesOfKind = observablesByKindAndETag[event.kind()] ?: return val observablesOfKind = observablesByKindAndETag[event.kind()] ?: return
event.forEachTaggedEvent { event.forEachTaggedEvent {
observablesOfKind[it]?.updateIfMatches(event) observablesOfKind[it]?.updateIfMatches(event)
} }
observablesByKindAndAuthor[event.kind()]?.get(event.pubKey)?.updateIfMatches(event)
} }
fun checkGetOrCreateUser(key: String): User? { fun checkGetOrCreateUser(key: String): User? {
@@ -367,6 +408,8 @@ object LocalCache {
if (event.createdAt > (user.latestContactList?.createdAt ?: 0) && !event.tags.isEmpty()) { if (event.createdAt > (user.latestContactList?.createdAt ?: 0) && !event.tags.isEmpty()) {
user.updateContactList(event) user.updateContactList(event)
// Log.d("CL", "Consumed contact list ${user.toNostrUri()} ${event.relays()?.size}") // Log.d("CL", "Consumed contact list ${user.toNostrUri()} ${event.relays()?.size}")
updateObservables(event)
} }
} }
@@ -2049,6 +2092,22 @@ object LocalCache {
notes.forEach { _, it -> it.clearFlow() } notes.forEach { _, it -> it.clearFlow() }
addressables.forEach { _, it -> it.clearFlow() } addressables.forEach { _, it -> it.clearFlow() }
users.forEach { _, it -> it.clearFlow() } users.forEach { _, it -> it.clearFlow() }
observablesByKindAndETag.forEach { _, list ->
list.forEach { key, value ->
if (value.canDelete()) {
list.remove(key)
}
}
}
observablesByKindAndAuthor.forEach { _, list ->
list.forEach { key, value ->
if (value.canDelete()) {
list.remove(key)
}
}
}
} }
fun pruneOldAndHiddenMessages(account: Account) { fun pruneOldAndHiddenMessages(account: Account) {

View File

@@ -20,6 +20,7 @@
*/ */
package com.vitorpamplona.amethyst.model.observables package com.vitorpamplona.amethyst.model.observables
import com.vitorpamplona.amethyst.model.AddressableNote
import com.vitorpamplona.amethyst.model.Note import com.vitorpamplona.amethyst.model.Note
object CreatedAtComparator : Comparator<Note> { object CreatedAtComparator : Comparator<Note> {
@@ -41,3 +42,23 @@ object CreatedAtComparator : Comparator<Note> {
} }
} }
} }
object CreatedAtComparatorAddresses : Comparator<AddressableNote> {
override fun compare(
first: AddressableNote?,
second: AddressableNote?,
): Int {
val firstEvent = first?.event
val secondEvent = second?.event
return if (firstEvent == null && secondEvent == null) {
0
} else if (firstEvent == null) {
1
} else if (secondEvent == null) {
-1
} else {
firstEvent.createdAt().compareTo(secondEvent.createdAt())
}
}
}

View File

@@ -0,0 +1,72 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.amethyst.model.observables
import com.vitorpamplona.amethyst.model.AddressableNote
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.quartz.events.Event
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
class LatestByKindAndAuthor<T : Event>(private val kind: Int, private val pubkey: String) {
private val _latest = MutableStateFlow<T?>(null)
val latest = _latest.asStateFlow()
fun matches(event: T) = event.kind == kind && event.pubKey == pubkey
fun updateIfMatches(event: T) {
if (matches(event)) {
if (event.createdAt > (_latest.value?.createdAt ?: 0)) {
_latest.tryEmit(event)
}
}
}
fun canDelete(): Boolean {
return _latest.subscriptionCount.value == 0
}
suspend fun init() {
val latestNote =
if ((kind in 10000..19999) || (kind in 30000..39999)) {
LocalCache.addressables.maxOrNullOf(
filter = { idHex: String, note: AddressableNote ->
note.event?.let {
it.kind() == kind && it.pubKey() == pubkey
} == true
},
comparator = CreatedAtComparatorAddresses,
)?.event as? T
} else {
LocalCache.notes.maxOrNullOf(
filter = { idHex: String, note: Note ->
note.event?.let {
it.kind() == kind && it.pubKey() == pubkey
} == true
},
comparator = CreatedAtComparator,
)?.event as? T
}
_latest.tryEmit(latestNote)
}
}

View File

@@ -158,6 +158,19 @@ object Client : RelayPool.Listener {
} }
} }
fun sendPrivately(
signedEvent: EventInterface,
relayList: List<String>,
) {
checkNotInMainThread()
relayList.forEach { relayUrl ->
RelayPool.getOrCreateRelay(relayUrl, setOf(FeedType.PRIVATE_DMS), { }) {
it.sendOverride(signedEvent)
}
}
}
fun close(subscriptionId: String) { fun close(subscriptionId: String) {
RelayPool.close(subscriptionId) RelayPool.close(subscriptionId)
subscriptions = subscriptions.minus(subscriptionId) subscriptions = subscriptions.minus(subscriptionId)

View File

@@ -49,7 +49,6 @@ import com.vitorpamplona.amethyst.model.UrlCachedPreviewer
import com.vitorpamplona.amethyst.model.User import com.vitorpamplona.amethyst.model.User
import com.vitorpamplona.amethyst.model.UserState import com.vitorpamplona.amethyst.model.UserState
import com.vitorpamplona.amethyst.model.observables.CreatedAtComparator import com.vitorpamplona.amethyst.model.observables.CreatedAtComparator
import com.vitorpamplona.amethyst.model.observables.LatestByKindWithETag
import com.vitorpamplona.amethyst.service.CashuProcessor import com.vitorpamplona.amethyst.service.CashuProcessor
import com.vitorpamplona.amethyst.service.CashuToken import com.vitorpamplona.amethyst.service.CashuToken
import com.vitorpamplona.amethyst.service.HttpClientManager import com.vitorpamplona.amethyst.service.HttpClientManager
@@ -74,8 +73,10 @@ import com.vitorpamplona.quartz.encoders.HexKey
import com.vitorpamplona.quartz.encoders.Nip11RelayInformation import com.vitorpamplona.quartz.encoders.Nip11RelayInformation
import com.vitorpamplona.quartz.encoders.Nip19Bech32 import com.vitorpamplona.quartz.encoders.Nip19Bech32
import com.vitorpamplona.quartz.events.AddressableEvent import com.vitorpamplona.quartz.events.AddressableEvent
import com.vitorpamplona.quartz.events.ChatMessageRelayListEvent
import com.vitorpamplona.quartz.events.ChatroomKey import com.vitorpamplona.quartz.events.ChatroomKey
import com.vitorpamplona.quartz.events.ChatroomKeyable import com.vitorpamplona.quartz.events.ChatroomKeyable
import com.vitorpamplona.quartz.events.ContactListEvent
import com.vitorpamplona.quartz.events.DraftEvent import com.vitorpamplona.quartz.events.DraftEvent
import com.vitorpamplona.quartz.events.Event import com.vitorpamplona.quartz.events.Event
import com.vitorpamplona.quartz.events.EventInterface import com.vitorpamplona.quartz.events.EventInterface
@@ -131,6 +132,9 @@ class AccountViewModel(val account: Account, val settings: SettingsState) : View
val userFollows: LiveData<UserState> = account.userProfile().live().follows.map { it } val userFollows: LiveData<UserState> = account.userProfile().live().follows.map { it }
val userRelays: LiveData<UserState> = account.userProfile().live().relays.map { it } val userRelays: LiveData<UserState> = account.userProfile().live().relays.map { it }
val kind3Relays: StateFlow<ContactListEvent?> = observeByAuthor(ContactListEvent.KIND, account.signer.pubKey)
val dmRelays: StateFlow<ChatMessageRelayListEvent?> = observeByAuthor(ChatMessageRelayListEvent.KIND, account.signer.pubKey)
val toasts = MutableSharedFlow<ToastMsg?>(0, 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) val toasts = MutableSharedFlow<ToastMsg?>(0, 3, onBufferOverflow = BufferOverflow.DROP_OLDEST)
var serviceManager: ServiceManager? = null var serviceManager: ServiceManager? = null
@@ -183,19 +187,14 @@ class AccountViewModel(val account: Account, val settings: SettingsState) : View
kind: Int, kind: Int,
eTag: HexKey, eTag: HexKey,
): StateFlow<T?> { ): StateFlow<T?> {
val observable = return LocalCache.observeETag<T>(kind = kind, eventId = eTag, viewModelScope).latest
LocalCache.observeETag<T>( }
kind = kind,
eventId = eTag,
) {
LatestByKindWithETag<T>(kind, eTag)
}
viewModelScope.launch(Dispatchers.IO) { fun <T : Event> observeByAuthor(
observable.init() kind: Int,
} pubkeyHex: HexKey,
): StateFlow<T?> {
return observable.latest return LocalCache.observeAuthor<T>(kind = kind, pubkey = pubkeyHex, viewModelScope).latest
} }
fun reactToOrDelete( fun reactToOrDelete(