Fixes reconnecting issue when DataSources were not active and become active later.

This commit is contained in:
Vitor Pamplona 2023-12-26 18:01:31 -05:00
parent 8ed960530c
commit 1cedc8a6c8
4 changed files with 62 additions and 37 deletions

View File

@ -30,7 +30,7 @@ abstract class NostrDataSource(val debugName: String) {
fun printCounter() {
eventCounter.forEach {
Log.d("STATE DUMP", "Received Events ${it.key}: ${it.value.counter}")
Log.d("STATE DUMP ${this.javaClass.simpleName}", "Received Events ${it.key}: ${it.value.counter}")
}
}
@ -45,6 +45,8 @@ abstract class NostrDataSource(val debugName: String) {
eventCounter = eventCounter + Pair(key, Counter(1))
}
// Log.d(this@NostrDataSource.javaClass.simpleName, "Relay ${relay.url}: ${event.kind}")
consume(event, relay)
if (afterEOSE) {
markAsEOSE(subscriptionId, relay)
@ -53,16 +55,18 @@ abstract class NostrDataSource(val debugName: String) {
}
override fun onError(error: Error, subscriptionId: String, relay: Relay) {
// Log.e("ERROR", "Relay ${relay.url}: ${error.message}")
// if (subscriptions.containsKey(subscriptionId)) {
// Log.e(
// this@NostrDataSource.javaClass.simpleName,
// "Relay OnError ${relay.url}: ${error.message}"
// )
// }
}
override fun onRelayStateChange(type: Relay.StateType, relay: Relay, subscriptionId: String?) {
// Log.d("RELAY", "Relay ${relay.url} ${when (type) {
// Relay.Type.CONNECT -> "connected."
// Relay.Type.DISCONNECT -> "disconnected."
// Relay.Type.DISCONNECTING -> "disconnecting."
// Relay.Type.EOSE -> "sent all events it had stored."
// }}")
// if (subscriptions.containsKey(subscriptionId)) {
// Log.d(this@NostrDataSource.javaClass.simpleName, "Relay ${relay.url} ${subscriptionId} ${type.name}")
// }
if (type == Relay.StateType.EOSE && subscriptionId != null && subscriptions.containsKey(subscriptionId)) {
markAsEOSE(subscriptionId, relay)
@ -88,12 +92,13 @@ abstract class NostrDataSource(val debugName: String) {
}
init {
Log.d("Init", "${this.javaClass.simpleName} Subscribe")
Log.d(this.javaClass.simpleName, "${this.javaClass.simpleName} Subscribe")
Client.subscribe(clientListener)
}
fun destroy() {
// makes sure to run
Log.d(this.javaClass.simpleName, "${this.javaClass.simpleName} Unsubscribe")
stop()
Client.unsubscribe(clientListener)
scope.cancel()
@ -177,32 +182,44 @@ abstract class NostrDataSource(val debugName: String) {
subscriptions.values.forEach { updatedSubscription ->
val updatedSubscriptionNewFilters = updatedSubscription.typedFilters
if (currentFilters.containsKey(updatedSubscription.id)) {
if (updatedSubscriptionNewFilters == null) {
// was active and is not active anymore, just close.
Client.close(updatedSubscription.id)
} else {
// was active and is still active, check if it has changed.
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
Client.close(updatedSubscription.id)
if (active) {
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
}
} else {
// hasn't changed, does nothing.
if (active) {
Client.sendFilterOnlyIfDisconnected(updatedSubscription.id, updatedSubscriptionNewFilters)
}
}
val isActive = Client.isActive(updatedSubscription.id)
if (!isActive && updatedSubscriptionNewFilters != null) {
// Filter was removed from the active list
if (active) {
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
}
} else {
if (updatedSubscriptionNewFilters == null) {
// was not active and is still not active, does nothing
if (currentFilters.containsKey(updatedSubscription.id)) {
if (updatedSubscriptionNewFilters == null) {
// was active and is not active anymore, just close.
Client.close(updatedSubscription.id)
} else {
// was active and is still active, check if it has changed.
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
Client.close(updatedSubscription.id)
if (active) {
Log.d(this@NostrDataSource.javaClass.simpleName, "Update Filter 1 ${updatedSubscription.id} ${Client.isSubscribed(clientListener)}")
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
}
} else {
// hasn't changed, does nothing.
if (active) {
Log.d(this@NostrDataSource.javaClass.simpleName, "Update Filter 2 ${updatedSubscription.id} ${Client.isSubscribed(clientListener)}")
Client.sendFilterOnlyIfDisconnected(updatedSubscription.id, updatedSubscriptionNewFilters)
}
}
}
} else {
// was not active and becomes active, sends the filter.
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
if (active) {
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
if (updatedSubscriptionNewFilters == null) {
// was not active and is still not active, does nothing
} else {
// was not active and becomes active, sends the filter.
if (updatedSubscription.toJson() != currentFilters[updatedSubscription.id]) {
if (active) {
Log.d(this@NostrDataSource.javaClass.simpleName, "Update Filter 3 ${updatedSubscription.id} ${Client.isSubscribed(clientListener)}")
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
}
}
}
}

View File

@ -86,7 +86,7 @@ object Client : RelayPool.Listener {
checkNotInMainThread()
subscriptions = subscriptions + Pair(subscriptionId, filters)
RelayPool.sendFilterOnlyIfDisconnected()
RelayPool.sendFilterOnlyIfDisconnected(subscriptionId)
}
fun send(
@ -152,6 +152,10 @@ object Client : RelayPool.Listener {
subscriptions = subscriptions.minus(subscriptionId)
}
fun isActive(subscriptionId: String): Boolean {
return subscriptions.contains(subscriptionId)
}
@OptIn(DelicateCoroutinesApi::class)
override fun onEvent(event: Event, subscriptionId: String, relay: Relay, afterEOSE: Boolean) {
// Releases the Web thread for the new payload.
@ -209,6 +213,10 @@ object Client : RelayPool.Listener {
listeners = listeners.plus(listener)
}
fun isSubscribed(listener: Listener): Boolean {
return listeners.contains(listener)
}
fun unsubscribe(listener: Listener) {
listeners = listeners.minus(listener)
}

View File

@ -290,7 +290,7 @@ class Relay(
if (filters.isNotEmpty()) {
val request = filters.joinToStringLimited(
separator = ",",
limit = 10,
limit = 20,
prefix = """["REQ","$requestId",""",
postfix = "]"
) { it.filter.toJson(url) }
@ -339,7 +339,7 @@ class Relay(
return buffer.toString()
}
fun sendFilterOnlyIfDisconnected() {
fun sendFilterOnlyIfDisconnected(subscriptionId: String) {
checkNotInMainThread()
if (socket == null) {

View File

@ -60,8 +60,8 @@ object RelayPool : Relay.Listener {
relays.forEach { it.sendFilter(subscriptionId) }
}
fun sendFilterOnlyIfDisconnected() {
relays.forEach { it.sendFilterOnlyIfDisconnected() }
fun sendFilterOnlyIfDisconnected(subscriptionId: String) {
relays.forEach { it.sendFilterOnlyIfDisconnected(subscriptionId) }
}
fun sendToSelectedRelays(list: List<Relay>, signedEvent: EventInterface) {