Adds an outbox cache to the Relay class in order to resend events after authentication.

This commit is contained in:
Vitor Pamplona
2024-07-09 17:30:16 -04:00
parent 217870244b
commit c7061066e9

View File

@@ -21,7 +21,6 @@
package com.vitorpamplona.ammolite.relays package com.vitorpamplona.ammolite.relays
import android.util.Log import android.util.Log
import com.vitorpamplona.ammolite.BuildConfig
import com.vitorpamplona.ammolite.service.HttpClientManager import com.vitorpamplona.ammolite.service.HttpClientManager
import com.vitorpamplona.ammolite.service.checkNotInMainThread import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.encoders.HexKey import com.vitorpamplona.quartz.encoders.HexKey
@@ -75,7 +74,7 @@ class Relay(
private var afterEOSEPerSubscription = mutableMapOf<String, Boolean>() private var afterEOSEPerSubscription = mutableMapOf<String, Boolean>()
private val authResponse = mutableMapOf<HexKey, Boolean>() private val authResponse = mutableMapOf<HexKey, Boolean>()
private val sendWhenReady = mutableListOf<EventInterface>() private val outboxCache = mutableMapOf<HexKey, EventInterface>()
fun register(listener: Listener) { fun register(listener: Listener) {
listeners = listeners.plus(listener) listeners = listeners.plus(listener)
@@ -93,6 +92,15 @@ class Relay(
// Sends everything. // Sends everything.
renewFilters() renewFilters()
sendOutbox()
}
}
private fun sendOutbox() {
synchronized(outboxCache) {
outboxCache.values.forEach {
send(it)
}
} }
} }
@@ -157,13 +165,6 @@ class Relay(
// Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url") // Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url")
onConnected(this@Relay) onConnected(this@Relay)
synchronized(sendWhenReady) {
sendWhenReady.forEach {
send(it)
}
sendWhenReady.clear()
}
listeners.forEach { it.onRelayStateChange(this@Relay, StateType.CONNECT, null) } listeners.forEach { it.onRelayStateChange(this@Relay, StateType.CONNECT, null) }
} }
@@ -310,8 +311,17 @@ class Relay(
if (authResponse.containsKey(eventId)) { if (authResponse.containsKey(eventId)) {
val wasAlreadyAuthenticated = authResponse.get(eventId) val wasAlreadyAuthenticated = authResponse.get(eventId)
authResponse.put(eventId, success) authResponse.put(eventId, success)
println("AABBCC Auth Response $url $wasAlreadyAuthenticated $success")
if (wasAlreadyAuthenticated != true && success) { if (wasAlreadyAuthenticated != true && success) {
renewFilters() renewFilters()
sendOutbox()
}
}
if (outboxCache.contains(eventId) && !message.startsWith("auth-required")) {
Log.w("Relay", "Relay on OK $url with message `$message`")
synchronized(outboxCache) {
outboxCache.remove(eventId)
} }
} }
@@ -325,7 +335,7 @@ class Relay(
} }
"AUTH" -> "AUTH" ->
listeners.forEach { listeners.forEach {
// Log.w("Relay", "Relay onAuth $url, ${msg[1].asString}") Log.w("Relay", "Relay onAuth $url, ${ msgArray[1].asText()}")
it.onAuth(this@Relay, msgArray[1].asText()) it.onAuth(this@Relay, msgArray[1].asText())
} }
"NOTIFY" -> "NOTIFY" ->
@@ -477,23 +487,7 @@ class Relay(
sendAuth(signedEvent) sendAuth(signedEvent)
} else { } else {
if (write) { if (write) {
if (isConnected()) { sendEvent(signedEvent)
if (isReady) {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
} else {
synchronized(sendWhenReady) {
sendWhenReady.add(signedEvent)
}
}
} else {
// sends all filters after connection is successful.
connectAndRun {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
// Sends everything.
renewFilters()
}
}
} }
} }
} }
@@ -504,21 +498,20 @@ class Relay(
} }
private fun sendEvent(signedEvent: EventInterface) { private fun sendEvent(signedEvent: EventInterface) {
synchronized(outboxCache) {
outboxCache.put(signedEvent.id(), signedEvent)
}
if (isConnected()) { if (isConnected()) {
if (isReady) { if (isReady) {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""") writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
} else {
synchronized(sendWhenReady) {
sendWhenReady.add(signedEvent)
}
} }
} else { } else {
// sends all filters after connection is successful. // sends all filters after connection is successful.
connectAndRun { connectAndRun {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
// Sends everything. // Sends everything.
renewFilters() renewFilters()
sendOutbox()
} }
} }
} }
@@ -541,9 +534,9 @@ class Relay(
} }
RelayStats.addBytesSent(url, str.bytesUsedInMemory()) RelayStats.addBytesSent(url, str.bytesUsedInMemory())
if (BuildConfig.DEBUG) { // if (BuildConfig.DEBUG) {
Log.d("Relay", "Relay send $url $str") Log.d("Relay", "Relay send $url $str")
} // }
} }
} }