diff --git a/app/src/main/java/com/vitorpamplona/amethyst/ServiceManager.kt b/app/src/main/java/com/vitorpamplona/amethyst/ServiceManager.kt index 239e46ad1..1ec243aa6 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/ServiceManager.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/ServiceManager.kt @@ -31,8 +31,12 @@ import com.vitorpamplona.amethyst.service.relays.Client import com.vitorpamplona.quartz.encoders.bechToBytes import com.vitorpamplona.quartz.encoders.decodePublicKeyAsHexOrNull import com.vitorpamplona.quartz.encoders.toHexKey +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -41,6 +45,9 @@ class ServiceManager { private var isStarted: Boolean = false // to not open amber in a loop trying to use auth relays and registering for notifications private var account: Account? = null + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + private var collectorJob: Job? = null + private fun start(account: Account) { this.account = account start() @@ -78,6 +85,17 @@ class ServiceManager { Log.d("Relay", "Service Manager Connect Connecting ${relaySet.size}") Client.reconnect(relaySet) + collectorJob?.cancel() + collectorJob = null + collectorJob = scope.launch { + myAccount.userProfile().flow().relays.stateFlow.collect { + if (isStarted) { + val newRelaySet = myAccount.activeRelays() ?: myAccount.convertLocalRelays() + Client.reconnect(newRelaySet, onlyIfChanged = true) + } + } + } + // start services NostrAccountDataSource.account = myAccount NostrAccountDataSource.otherAccounts = LocalPreferences.allSavedAccounts().mapNotNull { @@ -113,6 +131,9 @@ class ServiceManager { private fun pause() { Log.d("ServiceManager", "Pausing Relay Services") + collectorJob?.cancel() + collectorJob = null + NostrAccountDataSource.stopSync() NostrHomeDataSource.stopSync() NostrChannelDataSource.stopSync() @@ -192,4 +213,9 @@ class ServiceManager { fun pauseForGood() { forceRestart(null, false, true) } + + fun pauseForGoodAndClearAccount() { + account = null + forceRestart(null, false, true) + } } diff --git a/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt b/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt index 28887af22..247a94cc9 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/model/Account.kt @@ -1874,11 +1874,6 @@ class Account( return (activeRelays() ?: convertLocalRelays()).filter { it.write } } - fun reconnectIfRelaysHaveChanged() { - val newRelaySet = activeRelays() ?: convertLocalRelays() - Client.reconnect(newRelaySet, true) - } - fun isAllHidden(users: Set): Boolean { return users.all { isHidden(it) } } @@ -1957,7 +1952,7 @@ class Account( ).toSet() } - suspend fun saveRelayList(value: List) { + fun saveRelayList(value: List) { try { localRelays = value.toSet() return sendNewRelayList(value.associate { it.url to ContactListEvent.ReadWrite(it.read, it.write) }) @@ -2003,13 +1998,6 @@ class Account( } suspend fun registerObservers() = withContext(Dispatchers.Main) { - // Observes relays to restart connections - userProfile().live().relays.observeForever { - GlobalScope.launch(Dispatchers.IO) { - reconnectIfRelaysHaveChanged() - } - } - // saves contact list for the next time. userProfile().live().follows.observeForever { GlobalScope.launch(Dispatchers.IO) { diff --git a/app/src/main/java/com/vitorpamplona/amethyst/model/User.kt b/app/src/main/java/com/vitorpamplona/amethyst/model/User.kt index 23d942e73..618fee816 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/model/User.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/model/User.kt @@ -25,6 +25,7 @@ import com.vitorpamplona.quartz.events.UserMetadata import com.vitorpamplona.quartz.events.toImmutableListOfLists import kotlinx.collections.immutable.persistentSetOf import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableStateFlow import java.math.BigDecimal @Stable @@ -123,6 +124,7 @@ class User(val pubkeyHex: String) { } liveSet?.innerRelays?.invalidateData() + flowSet?.relays?.invalidateData() } fun addReport(note: Note) { @@ -356,6 +358,7 @@ class User(val pubkeyHex: String) { } var liveSet: UserLiveSet? = null + var flowSet: UserFlowSet? = null fun live(): UserLiveSet { if (liveSet == null) { @@ -383,6 +386,47 @@ class User(val pubkeyHex: String) { } } } + + @Synchronized + fun createOrDestroyFlowSync(create: Boolean) { + if (create) { + if (flowSet == null) { + flowSet = UserFlowSet(this) + } + } else { + if (flowSet != null && flowSet?.isInUse() == false) { + flowSet?.destroy() + flowSet = null + } + } + } + + fun flow(): UserFlowSet { + if (flowSet == null) { + createOrDestroyFlowSync(true) + } + return flowSet!! + } + + fun clearFlow() { + if (flowSet != null && flowSet?.isInUse() == false) { + createOrDestroyFlowSync(false) + } + } +} + +@Stable +class UserFlowSet(u: User) { + // Observers line up here. + val relays = UserBundledRefresherFlow(u) + + fun isInUse(): Boolean { + return relays.stateFlow.subscriptionCount.value > 0 + } + + fun destroy() { + relays.destroy() + } } @Stable @@ -491,6 +535,27 @@ class UserBundledRefresherLiveData(val user: User) : LiveData(UserSta } } +@Stable +class UserBundledRefresherFlow(val user: User) { + // Refreshes observers in batches. + private val bundler = BundledUpdate(500, Dispatchers.IO) + val stateFlow = MutableStateFlow(UserState(user)) + + fun destroy() { + bundler.cancel() + } + + fun invalidateData() { + checkNotInMainThread() + + bundler.invalidate() { + checkNotInMainThread() + + stateFlow.emit(UserState(user)) + } + } +} + class UserLoadingLiveData(val user: User, initialValue: Y?) : MediatorLiveData(initialValue) { override fun onActive() { super.onActive() diff --git a/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/AccountStateViewModel.kt b/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/AccountStateViewModel.kt index 173d36ea7..22acc1fc8 100644 --- a/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/AccountStateViewModel.kt +++ b/app/src/main/java/com/vitorpamplona/amethyst/ui/screen/AccountStateViewModel.kt @@ -57,7 +57,7 @@ class AccountStateViewModel() : ViewModel() { _accountContent.update { AccountState.LoggedOff } viewModelScope.launch(Dispatchers.IO) { - serviceManager?.pauseForGood() + serviceManager?.pauseForGoodAndClearAccount() } }