diff --git a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/Client.kt b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/Client.kt index e41972900..53c663582 100644 --- a/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/Client.kt +++ b/ammolite/src/main/java/com/vitorpamplona/ammolite/relays/Client.kt @@ -28,6 +28,7 @@ import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import java.util.UUID import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -132,44 +133,35 @@ object Client : RelayPool.Listener { RelayPool.sendFilter(subscriptionId, filters) } - fun sendAndWaitForResponse( + @OptIn(DelicateCoroutinesApi::class) + suspend fun sendAndWaitForResponse( signedEvent: EventInterface, relay: String? = null, feedTypes: Set? = null, relayList: List? = null, onDone: (() -> Unit)? = null, + additionalListener: Listener? = null, timeoutInSeconds: Long = 30, ) { checkNotInMainThread() val size = if (relay != null) 1 else relayList?.size ?: RelayPool.availableRelays() val latch = CountDownLatch(size) + val relayErrors = mutableMapOf() - Log.d("Relay", "Waiting for $size responses") + Log.d("sendAndWaitForResponse", "Waiting for $size responses") - subscribe( + val subscription = object : Listener() { - override fun onEvent( - event: Event, - subscriptionId: String, - relay: Relay, - afterEOSE: Boolean, - ) { - if (event.id() == signedEvent.id()) { - unsubscribe(this) - latch.countDown() - Log.d("Relay", "Received response for ${event.id()} relay ${relay.url} count: ${latch.count}") - } - } - override fun onError( error: Error, subscriptionId: String, relay: Relay, ) { - unsubscribe(this) - latch.countDown() - Log.d("Relay", "Error from relay ${relay.url} count: ${latch.count} error: $error") + relayErrors[relay.url]?.let { + latch.countDown() + } + Log.d("sendAndWaitForResponse", "onError Error from relay ${relay.url} count: ${latch.count} error: $error") } override fun onRelayStateChange( @@ -178,9 +170,8 @@ object Client : RelayPool.Listener { subscriptionId: String?, ) { if (type == Relay.StateType.DISCONNECT) { - unsubscribe(this) latch.countDown() - Log.d("Relay", "Disconnected from relay ${relay.url} count: ${latch.count}") + Log.d("sendAndWaitForResponse", "onRelayStateChange Disconnected from relay ${relay.url} count: ${latch.count}") } } @@ -191,16 +182,27 @@ object Client : RelayPool.Listener { relay: Relay, ) { if (eventId == signedEvent.id()) { - unsubscribe(this) latch.countDown() - Log.d("Relay", "Received response for $eventId from relay ${relay.url} count: ${latch.count}") + Log.d("sendAndWaitForResponse", "onSendResponse Received response for $eventId from relay ${relay.url} count: ${latch.count} message $message success $success") } } - }, - ) + } - send(signedEvent, relay, feedTypes, relayList, onDone) - latch.await(timeoutInSeconds, TimeUnit.SECONDS) + subscribe(subscription) + additionalListener?.let { subscribe(it) } + + val job = + GlobalScope.launch(Dispatchers.IO) { + send(signedEvent, relay, feedTypes, relayList, onDone) + } + job.join() + + runBlocking { + latch.await(timeoutInSeconds, TimeUnit.SECONDS) + } + Log.d("sendAndWaitForResponse", "countdown finished") + unsubscribe(subscription) + additionalListener?.let { unsubscribe(it) } } fun sendFilterOnlyIfDisconnected(