mirror of
https://github.com/vitorpamplona/amethyst.git
synced 2025-10-09 22:52:36 +02:00
Makes sure the NostrClient stays off in the background
This commit is contained in:
@@ -37,6 +37,7 @@ import kotlinx.coroutines.CoroutineScope
|
|||||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.FlowPreview
|
import kotlinx.coroutines.FlowPreview
|
||||||
|
import kotlinx.coroutines.NonCancellable.isActive
|
||||||
import kotlinx.coroutines.flow.SharingStarted
|
import kotlinx.coroutines.flow.SharingStarted
|
||||||
import kotlinx.coroutines.flow.combine
|
import kotlinx.coroutines.flow.combine
|
||||||
import kotlinx.coroutines.flow.flowOn
|
import kotlinx.coroutines.flow.flowOn
|
||||||
@@ -80,6 +81,11 @@ class NostrClient(
|
|||||||
|
|
||||||
private var listeners = setOf<IRelayClientListener>()
|
private var listeners = setOf<IRelayClientListener>()
|
||||||
|
|
||||||
|
// controls the state of the client in such a way that if it is active
|
||||||
|
// new filters will be sent to the relays and a potential reconnect can
|
||||||
|
// be triggered.
|
||||||
|
private var isActive = false
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whatches for any changes in the relay list from subscriptions or outbox
|
* Whatches for any changes in the relay list from subscriptions or outbox
|
||||||
* and updates the relayPool as needed.
|
* and updates the relayPool as needed.
|
||||||
@@ -117,10 +123,15 @@ class NostrClient(
|
|||||||
fun allAvailableRelays() = relayPool.getAll()
|
fun allAvailableRelays() = relayPool.getAll()
|
||||||
|
|
||||||
// Reconnects all relays that may have disconnected
|
// Reconnects all relays that may have disconnected
|
||||||
fun connect() = relayPool.connect()
|
fun connect() {
|
||||||
|
isActive = true
|
||||||
|
relayPool.connect()
|
||||||
|
}
|
||||||
|
|
||||||
// Reconnects all relays that may have disconnected
|
fun disconnect() {
|
||||||
fun disconnect() = relayPool.disconnect()
|
isActive = false
|
||||||
|
relayPool.disconnect()
|
||||||
|
}
|
||||||
|
|
||||||
@Synchronized
|
@Synchronized
|
||||||
fun reconnect(onlyIfChanged: Boolean = false) {
|
fun reconnect(onlyIfChanged: Boolean = false) {
|
||||||
@@ -200,26 +211,28 @@ class NostrClient(
|
|||||||
val oldFilters = activeRequests.getSubscriptionFiltersOrNull(subId) ?: emptyMap()
|
val oldFilters = activeRequests.getSubscriptionFiltersOrNull(subId) ?: emptyMap()
|
||||||
activeRequests.addOrUpdate(subId, filters)
|
activeRequests.addOrUpdate(subId, filters)
|
||||||
|
|
||||||
val allRelays = filters.keys + oldFilters.keys
|
if (isActive) {
|
||||||
|
val allRelays = filters.keys + oldFilters.keys
|
||||||
|
|
||||||
allRelays.forEach { relay ->
|
allRelays.forEach { relay ->
|
||||||
val oldFilters = oldFilters[relay]
|
val oldFilters = oldFilters[relay]
|
||||||
val newFilters = filters[relay]
|
val newFilters = filters[relay]
|
||||||
|
|
||||||
if (newFilters.isNullOrEmpty()) {
|
if (newFilters.isNullOrEmpty()) {
|
||||||
// some relays are not in this sub anymore. Stop their subscriptions
|
// some relays are not in this sub anymore. Stop their subscriptions
|
||||||
relayPool.close(relay, subId)
|
relayPool.close(relay, subId)
|
||||||
} else if (oldFilters.isNullOrEmpty()) {
|
} else if (oldFilters.isNullOrEmpty()) {
|
||||||
// new relays were added. Start a new sub in them
|
// new relays were added. Start a new sub in them
|
||||||
relayPool.sendRequest(relay, subId, newFilters)
|
relayPool.sendRequest(relay, subId, newFilters)
|
||||||
} else if (needsToResendRequest(oldFilters, newFilters)) {
|
} else if (needsToResendRequest(oldFilters, newFilters)) {
|
||||||
// filters were changed enough (not only an update in since) to warn a new update
|
// filters were changed enough (not only an update in since) to warn a new update
|
||||||
relayPool.sendRequest(relay, subId, newFilters)
|
relayPool.sendRequest(relay, subId, newFilters)
|
||||||
} else {
|
} else {
|
||||||
// makes sure the relay wakes up if it was disconnected by the server
|
// makes sure the relay wakes up if it was disconnected by the server
|
||||||
// upon connection, the relay will run the default Sync and update all
|
// upon connection, the relay will run the default Sync and update all
|
||||||
// filters, including this one.
|
// filters, including this one.
|
||||||
relayPool.connectIfDisconnected(relay)
|
relayPool.connectIfDisconnected(relay)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -231,26 +244,28 @@ class NostrClient(
|
|||||||
val oldFilters = activeCounts.getSubscriptionFiltersOrNull(subId) ?: emptyMap()
|
val oldFilters = activeCounts.getSubscriptionFiltersOrNull(subId) ?: emptyMap()
|
||||||
activeCounts.addOrUpdate(subId, filters)
|
activeCounts.addOrUpdate(subId, filters)
|
||||||
|
|
||||||
val allRelays = filters.keys + oldFilters.keys
|
if (isActive) {
|
||||||
|
val allRelays = filters.keys + oldFilters.keys
|
||||||
|
|
||||||
allRelays.forEach { relay ->
|
allRelays.forEach { relay ->
|
||||||
val oldFilters = oldFilters[relay]
|
val oldFilters = oldFilters[relay]
|
||||||
val newFilters = filters[relay]
|
val newFilters = filters[relay]
|
||||||
|
|
||||||
if (newFilters.isNullOrEmpty()) {
|
if (newFilters.isNullOrEmpty()) {
|
||||||
// some relays are not in this sub anymore. Stop their subscriptions
|
// some relays are not in this sub anymore. Stop their subscriptions
|
||||||
relayPool.close(relay, subId)
|
relayPool.close(relay, subId)
|
||||||
} else if (oldFilters.isNullOrEmpty()) {
|
} else if (oldFilters.isNullOrEmpty()) {
|
||||||
// new relays were added. Start a new sub in them
|
// new relays were added. Start a new sub in them
|
||||||
relayPool.sendCount(relay, subId, newFilters)
|
relayPool.sendCount(relay, subId, newFilters)
|
||||||
} else if (needsToResendRequest(oldFilters, newFilters)) {
|
} else if (needsToResendRequest(oldFilters, newFilters)) {
|
||||||
// filters were changed enough (not only an update in since) to warn a new update
|
// filters were changed enough (not only an update in since) to warn a new update
|
||||||
relayPool.sendCount(relay, subId, newFilters)
|
relayPool.sendCount(relay, subId, newFilters)
|
||||||
} else {
|
} else {
|
||||||
// makes sure the relay wakes up if it was disconnected by the server
|
// makes sure the relay wakes up if it was disconnected by the server
|
||||||
// upon connection, the relay will run the default Sync and update all
|
// upon connection, the relay will run the default Sync and update all
|
||||||
// filters, including this one.
|
// filters, including this one.
|
||||||
relayPool.connectIfDisconnected(relay)
|
relayPool.connectIfDisconnected(relay)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -259,7 +274,9 @@ class NostrClient(
|
|||||||
event: Event,
|
event: Event,
|
||||||
connectedRelay: NormalizedRelayUrl,
|
connectedRelay: NormalizedRelayUrl,
|
||||||
) {
|
) {
|
||||||
relayPool.getRelay(connectedRelay)?.send(event)
|
if (isActive) {
|
||||||
|
relayPool.getRelay(connectedRelay)?.send(event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun send(
|
fun send(
|
||||||
@@ -267,7 +284,9 @@ class NostrClient(
|
|||||||
relayList: Set<NormalizedRelayUrl>,
|
relayList: Set<NormalizedRelayUrl>,
|
||||||
) {
|
) {
|
||||||
eventOutbox.markAsSending(event, relayList)
|
eventOutbox.markAsSending(event, relayList)
|
||||||
relayPool.send(event, relayList)
|
if (isActive) {
|
||||||
|
relayPool.send(event, relayList)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun close(subscriptionId: String) {
|
fun close(subscriptionId: String) {
|
||||||
|
@@ -442,7 +442,7 @@ open class BasicRelayClient(
|
|||||||
override fun close(subscriptionId: String) {
|
override fun close(subscriptionId: String) {
|
||||||
// avoids sending closes for subscriptions that were never sent to this relay.
|
// avoids sending closes for subscriptions that were never sent to this relay.
|
||||||
if (afterEOSEPerSubscription.containsKey(subscriptionId)) {
|
if (afterEOSEPerSubscription.containsKey(subscriptionId)) {
|
||||||
writeToSocket(CloseCmd.Companion.toJson(subscriptionId))
|
writeToSocket(CloseCmd.toJson(subscriptionId))
|
||||||
afterEOSEPerSubscription[subscriptionId] = false
|
afterEOSEPerSubscription[subscriptionId] = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user