Makes sure Websocket dispatchers are using IO threads

This commit is contained in:
Vitor Pamplona
2023-02-18 19:08:52 -05:00
parent 2ac5174b78
commit 138b22348c

View File

@@ -1,6 +1,7 @@
package com.vitorpamplona.amethyst.service.relays package com.vitorpamplona.amethyst.service.relays
import java.util.UUID import java.util.UUID
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import nostr.postr.events.Event import nostr.postr.events.Event
@@ -79,7 +80,7 @@ object Client: RelayPool.Listener {
override fun onEvent(event: Event, subscriptionId: String, relay: Relay) { override fun onEvent(event: Event, subscriptionId: String, relay: Relay) {
// Releases the Web thread for the new payload. // Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly. // May need to add a processing queue if processing new events become too costly.
GlobalScope.launch { GlobalScope.launch(Dispatchers.IO) {
listeners.forEach { it.onEvent(event, subscriptionId, relay) } listeners.forEach { it.onEvent(event, subscriptionId, relay) }
} }
} }
@@ -87,7 +88,7 @@ object Client: RelayPool.Listener {
override fun onError(error: Error, subscriptionId: String, relay: Relay) { override fun onError(error: Error, subscriptionId: String, relay: Relay) {
// Releases the Web thread for the new payload. // Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly. // May need to add a processing queue if processing new events become too costly.
GlobalScope.launch { GlobalScope.launch(Dispatchers.IO) {
listeners.forEach { it.onError(error, subscriptionId, relay) } listeners.forEach { it.onError(error, subscriptionId, relay) }
} }
} }
@@ -95,7 +96,7 @@ object Client: RelayPool.Listener {
override fun onRelayStateChange(type: Relay.Type, relay: Relay, channel: String?) { override fun onRelayStateChange(type: Relay.Type, relay: Relay, channel: String?) {
// Releases the Web thread for the new payload. // Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly. // May need to add a processing queue if processing new events become too costly.
GlobalScope.launch { GlobalScope.launch(Dispatchers.IO) {
listeners.forEach { it.onRelayStateChange(type, relay, channel) } listeners.forEach { it.onRelayStateChange(type, relay, channel) }
} }
} }
@@ -103,7 +104,7 @@ object Client: RelayPool.Listener {
override fun onSendResponse(eventId: String, success: Boolean, message: String, relay: Relay) { override fun onSendResponse(eventId: String, success: Boolean, message: String, relay: Relay) {
// Releases the Web thread for the new payload. // Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly. // May need to add a processing queue if processing new events become too costly.
GlobalScope.launch { GlobalScope.launch(Dispatchers.IO) {
listeners.forEach { it.onSendResponse(eventId, success, message, relay) } listeners.forEach { it.onSendResponse(eventId, success, message, relay) }
} }
} }