mirror of
https://github.com/vitorpamplona/amethyst.git
synced 2025-09-29 06:52:50 +02:00
Avoids race condition when update EOSEs
This commit is contained in:
@@ -14,6 +14,7 @@ import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.launch
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import kotlin.Error
|
||||
|
||||
abstract class NostrDataSource(val debugName: String) {
|
||||
@@ -23,6 +24,7 @@ abstract class NostrDataSource(val debugName: String) {
|
||||
data class Counter(var counter: Int)
|
||||
|
||||
private var eventCounter = mapOf<String, Counter>()
|
||||
var changingFilters = AtomicBoolean()
|
||||
|
||||
fun printCounter() {
|
||||
eventCounter.forEach {
|
||||
@@ -59,7 +61,10 @@ abstract class NostrDataSource(val debugName: String) {
|
||||
|
||||
if (type == Relay.Type.EOSE && subscriptionId != null && subscriptionId in subscriptions.keys) {
|
||||
// updates a per subscripton since date
|
||||
subscriptions[subscriptionId]?.updateEOSE(TimeUtils.now(), relay.url)
|
||||
subscriptions[subscriptionId]?.updateEOSE(
|
||||
TimeUtils.fiveMinutesAgo(), // in case people's clock is slighly off.
|
||||
relay.url
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,6 +141,8 @@ abstract class NostrDataSource(val debugName: String) {
|
||||
// saves the current content to only update if it changes
|
||||
val currentFilters = activeSubscriptions.associate { it.id to it.toJson() }
|
||||
|
||||
changingFilters.getAndSet(true)
|
||||
|
||||
updateChannelFilters()
|
||||
|
||||
// Makes sure to only send an updated filter when it actually changes.
|
||||
@@ -167,6 +174,8 @@ abstract class NostrDataSource(val debugName: String) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
changingFilters.getAndSet(false)
|
||||
}
|
||||
|
||||
open fun consume(event: Event, relay: Relay) {
|
||||
|
@@ -134,6 +134,9 @@ object NostrSingleEventDataSource : NostrDataSource("SingleEventFeed") {
|
||||
}
|
||||
|
||||
val singleEventChannel = requestNewChannel { time, relayUrl ->
|
||||
// Ignores EOSE if it is in the middle of a filter change.
|
||||
if (changingFilters.get()) return@requestNewChannel
|
||||
|
||||
checkNotInMainThread()
|
||||
|
||||
eventsToWatch.forEach {
|
||||
|
Reference in New Issue
Block a user