Moves the relay observer and reconnection to Service Manager's scope.

This commit is contained in:
Vitor Pamplona
2023-12-05 21:48:43 -05:00
parent ce2cfb7c86
commit 332a2f41b6
4 changed files with 93 additions and 14 deletions

View File

@ -31,8 +31,12 @@ import com.vitorpamplona.amethyst.service.relays.Client
import com.vitorpamplona.quartz.encoders.bechToBytes import com.vitorpamplona.quartz.encoders.bechToBytes
import com.vitorpamplona.quartz.encoders.decodePublicKeyAsHexOrNull import com.vitorpamplona.quartz.encoders.decodePublicKeyAsHexOrNull
import com.vitorpamplona.quartz.encoders.toHexKey import com.vitorpamplona.quartz.encoders.toHexKey
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@ -41,6 +45,9 @@ class ServiceManager {
private var isStarted: Boolean = false // to not open amber in a loop trying to use auth relays and registering for notifications private var isStarted: Boolean = false // to not open amber in a loop trying to use auth relays and registering for notifications
private var account: Account? = null private var account: Account? = null
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private var collectorJob: Job? = null
private fun start(account: Account) { private fun start(account: Account) {
this.account = account this.account = account
start() start()
@ -78,6 +85,17 @@ class ServiceManager {
Log.d("Relay", "Service Manager Connect Connecting ${relaySet.size}") Log.d("Relay", "Service Manager Connect Connecting ${relaySet.size}")
Client.reconnect(relaySet) Client.reconnect(relaySet)
collectorJob?.cancel()
collectorJob = null
collectorJob = scope.launch {
myAccount.userProfile().flow().relays.stateFlow.collect {
if (isStarted) {
val newRelaySet = myAccount.activeRelays() ?: myAccount.convertLocalRelays()
Client.reconnect(newRelaySet, onlyIfChanged = true)
}
}
}
// start services // start services
NostrAccountDataSource.account = myAccount NostrAccountDataSource.account = myAccount
NostrAccountDataSource.otherAccounts = LocalPreferences.allSavedAccounts().mapNotNull { NostrAccountDataSource.otherAccounts = LocalPreferences.allSavedAccounts().mapNotNull {
@ -113,6 +131,9 @@ class ServiceManager {
private fun pause() { private fun pause() {
Log.d("ServiceManager", "Pausing Relay Services") Log.d("ServiceManager", "Pausing Relay Services")
collectorJob?.cancel()
collectorJob = null
NostrAccountDataSource.stopSync() NostrAccountDataSource.stopSync()
NostrHomeDataSource.stopSync() NostrHomeDataSource.stopSync()
NostrChannelDataSource.stopSync() NostrChannelDataSource.stopSync()
@ -192,4 +213,9 @@ class ServiceManager {
fun pauseForGood() { fun pauseForGood() {
forceRestart(null, false, true) forceRestart(null, false, true)
} }
fun pauseForGoodAndClearAccount() {
account = null
forceRestart(null, false, true)
}
} }

View File

@ -1874,11 +1874,6 @@ class Account(
return (activeRelays() ?: convertLocalRelays()).filter { it.write } return (activeRelays() ?: convertLocalRelays()).filter { it.write }
} }
fun reconnectIfRelaysHaveChanged() {
val newRelaySet = activeRelays() ?: convertLocalRelays()
Client.reconnect(newRelaySet, true)
}
fun isAllHidden(users: Set<HexKey>): Boolean { fun isAllHidden(users: Set<HexKey>): Boolean {
return users.all { isHidden(it) } return users.all { isHidden(it) }
} }
@ -1957,7 +1952,7 @@ class Account(
).toSet() ).toSet()
} }
suspend fun saveRelayList(value: List<RelaySetupInfo>) { fun saveRelayList(value: List<RelaySetupInfo>) {
try { try {
localRelays = value.toSet() localRelays = value.toSet()
return sendNewRelayList(value.associate { it.url to ContactListEvent.ReadWrite(it.read, it.write) }) return sendNewRelayList(value.associate { it.url to ContactListEvent.ReadWrite(it.read, it.write) })
@ -2003,13 +1998,6 @@ class Account(
} }
suspend fun registerObservers() = withContext(Dispatchers.Main) { suspend fun registerObservers() = withContext(Dispatchers.Main) {
// Observes relays to restart connections
userProfile().live().relays.observeForever {
GlobalScope.launch(Dispatchers.IO) {
reconnectIfRelaysHaveChanged()
}
}
// saves contact list for the next time. // saves contact list for the next time.
userProfile().live().follows.observeForever { userProfile().live().follows.observeForever {
GlobalScope.launch(Dispatchers.IO) { GlobalScope.launch(Dispatchers.IO) {

View File

@ -25,6 +25,7 @@ import com.vitorpamplona.quartz.events.UserMetadata
import com.vitorpamplona.quartz.events.toImmutableListOfLists import com.vitorpamplona.quartz.events.toImmutableListOfLists
import kotlinx.collections.immutable.persistentSetOf import kotlinx.collections.immutable.persistentSetOf
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import java.math.BigDecimal import java.math.BigDecimal
@Stable @Stable
@ -123,6 +124,7 @@ class User(val pubkeyHex: String) {
} }
liveSet?.innerRelays?.invalidateData() liveSet?.innerRelays?.invalidateData()
flowSet?.relays?.invalidateData()
} }
fun addReport(note: Note) { fun addReport(note: Note) {
@ -356,6 +358,7 @@ class User(val pubkeyHex: String) {
} }
var liveSet: UserLiveSet? = null var liveSet: UserLiveSet? = null
var flowSet: UserFlowSet? = null
fun live(): UserLiveSet { fun live(): UserLiveSet {
if (liveSet == null) { if (liveSet == null) {
@ -383,6 +386,47 @@ class User(val pubkeyHex: String) {
} }
} }
} }
@Synchronized
fun createOrDestroyFlowSync(create: Boolean) {
if (create) {
if (flowSet == null) {
flowSet = UserFlowSet(this)
}
} else {
if (flowSet != null && flowSet?.isInUse() == false) {
flowSet?.destroy()
flowSet = null
}
}
}
fun flow(): UserFlowSet {
if (flowSet == null) {
createOrDestroyFlowSync(true)
}
return flowSet!!
}
fun clearFlow() {
if (flowSet != null && flowSet?.isInUse() == false) {
createOrDestroyFlowSync(false)
}
}
}
@Stable
class UserFlowSet(u: User) {
// Observers line up here.
val relays = UserBundledRefresherFlow(u)
fun isInUse(): Boolean {
return relays.stateFlow.subscriptionCount.value > 0
}
fun destroy() {
relays.destroy()
}
} }
@Stable @Stable
@ -491,6 +535,27 @@ class UserBundledRefresherLiveData(val user: User) : LiveData<UserState>(UserSta
} }
} }
@Stable
class UserBundledRefresherFlow(val user: User) {
// Refreshes observers in batches.
private val bundler = BundledUpdate(500, Dispatchers.IO)
val stateFlow = MutableStateFlow(UserState(user))
fun destroy() {
bundler.cancel()
}
fun invalidateData() {
checkNotInMainThread()
bundler.invalidate() {
checkNotInMainThread()
stateFlow.emit(UserState(user))
}
}
}
class UserLoadingLiveData<Y>(val user: User, initialValue: Y?) : MediatorLiveData<Y>(initialValue) { class UserLoadingLiveData<Y>(val user: User, initialValue: Y?) : MediatorLiveData<Y>(initialValue) {
override fun onActive() { override fun onActive() {
super.onActive() super.onActive()

View File

@ -57,7 +57,7 @@ class AccountStateViewModel() : ViewModel() {
_accountContent.update { AccountState.LoggedOff } _accountContent.update { AccountState.LoggedOff }
viewModelScope.launch(Dispatchers.IO) { viewModelScope.launch(Dispatchers.IO) {
serviceManager?.pauseForGood() serviceManager?.pauseForGoodAndClearAccount()
} }
} }