mirror of
https://github.com/vitorpamplona/amethyst.git
synced 2025-04-09 04:18:11 +02:00
add more logs, add optional listener, fix await call
This commit is contained in:
parent
62a2cba18b
commit
d0190c87a2
@ -28,6 +28,7 @@ import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
@ -132,44 +133,35 @@ object Client : RelayPool.Listener {
|
||||
RelayPool.sendFilter(subscriptionId, filters)
|
||||
}
|
||||
|
||||
fun sendAndWaitForResponse(
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
suspend fun sendAndWaitForResponse(
|
||||
signedEvent: EventInterface,
|
||||
relay: String? = null,
|
||||
feedTypes: Set<FeedType>? = null,
|
||||
relayList: List<RelaySetupInfo>? = null,
|
||||
onDone: (() -> Unit)? = null,
|
||||
additionalListener: Listener? = null,
|
||||
timeoutInSeconds: Long = 30,
|
||||
) {
|
||||
checkNotInMainThread()
|
||||
|
||||
val size = if (relay != null) 1 else relayList?.size ?: RelayPool.availableRelays()
|
||||
val latch = CountDownLatch(size)
|
||||
val relayErrors = mutableMapOf<String, String>()
|
||||
|
||||
Log.d("Relay", "Waiting for $size responses")
|
||||
Log.d("sendAndWaitForResponse", "Waiting for $size responses")
|
||||
|
||||
subscribe(
|
||||
val subscription =
|
||||
object : Listener() {
|
||||
override fun onEvent(
|
||||
event: Event,
|
||||
subscriptionId: String,
|
||||
relay: Relay,
|
||||
afterEOSE: Boolean,
|
||||
) {
|
||||
if (event.id() == signedEvent.id()) {
|
||||
unsubscribe(this)
|
||||
latch.countDown()
|
||||
Log.d("Relay", "Received response for ${event.id()} relay ${relay.url} count: ${latch.count}")
|
||||
}
|
||||
}
|
||||
|
||||
override fun onError(
|
||||
error: Error,
|
||||
subscriptionId: String,
|
||||
relay: Relay,
|
||||
) {
|
||||
unsubscribe(this)
|
||||
latch.countDown()
|
||||
Log.d("Relay", "Error from relay ${relay.url} count: ${latch.count} error: $error")
|
||||
relayErrors[relay.url]?.let {
|
||||
latch.countDown()
|
||||
}
|
||||
Log.d("sendAndWaitForResponse", "onError Error from relay ${relay.url} count: ${latch.count} error: $error")
|
||||
}
|
||||
|
||||
override fun onRelayStateChange(
|
||||
@ -178,9 +170,8 @@ object Client : RelayPool.Listener {
|
||||
subscriptionId: String?,
|
||||
) {
|
||||
if (type == Relay.StateType.DISCONNECT) {
|
||||
unsubscribe(this)
|
||||
latch.countDown()
|
||||
Log.d("Relay", "Disconnected from relay ${relay.url} count: ${latch.count}")
|
||||
Log.d("sendAndWaitForResponse", "onRelayStateChange Disconnected from relay ${relay.url} count: ${latch.count}")
|
||||
}
|
||||
}
|
||||
|
||||
@ -191,16 +182,27 @@ object Client : RelayPool.Listener {
|
||||
relay: Relay,
|
||||
) {
|
||||
if (eventId == signedEvent.id()) {
|
||||
unsubscribe(this)
|
||||
latch.countDown()
|
||||
Log.d("Relay", "Received response for $eventId from relay ${relay.url} count: ${latch.count}")
|
||||
Log.d("sendAndWaitForResponse", "onSendResponse Received response for $eventId from relay ${relay.url} count: ${latch.count} message $message success $success")
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
send(signedEvent, relay, feedTypes, relayList, onDone)
|
||||
latch.await(timeoutInSeconds, TimeUnit.SECONDS)
|
||||
subscribe(subscription)
|
||||
additionalListener?.let { subscribe(it) }
|
||||
|
||||
val job =
|
||||
GlobalScope.launch(Dispatchers.IO) {
|
||||
send(signedEvent, relay, feedTypes, relayList, onDone)
|
||||
}
|
||||
job.join()
|
||||
|
||||
runBlocking {
|
||||
latch.await(timeoutInSeconds, TimeUnit.SECONDS)
|
||||
}
|
||||
Log.d("sendAndWaitForResponse", "countdown finished")
|
||||
unsubscribe(subscription)
|
||||
additionalListener?.let { unsubscribe(it) }
|
||||
}
|
||||
|
||||
fun sendFilterOnlyIfDisconnected(
|
||||
|
Loading…
x
Reference in New Issue
Block a user