Moving Incoming Payload processing to the Global Thread to release Web Threads faster.

This commit is contained in:
Vitor Pamplona
2023-02-14 09:17:12 -05:00
parent 385a1c3849
commit 743c23f3c8
2 changed files with 23 additions and 8 deletions

View File

@@ -1,6 +1,8 @@
package com.vitorpamplona.amethyst.service.relays package com.vitorpamplona.amethyst.service.relays
import java.util.UUID import java.util.UUID
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import nostr.postr.events.Event import nostr.postr.events.Event
/** /**
@@ -50,7 +52,7 @@ object Client: RelayPool.Listener {
RelayPool.send(signedEvent) RelayPool.send(signedEvent)
} }
fun close(subscriptionId: String){ fun close(subscriptionId: String) {
RelayPool.close(subscriptionId) RelayPool.close(subscriptionId)
} }
@@ -61,19 +63,35 @@ object Client: RelayPool.Listener {
} }
override fun onEvent(event: Event, subscriptionId: String, relay: Relay) { override fun onEvent(event: Event, subscriptionId: String, relay: Relay) {
listeners.forEach { it.onEvent(event, subscriptionId, relay) } // Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch {
listeners.forEach { it.onEvent(event, subscriptionId, relay) }
}
} }
override fun onError(error: Error, subscriptionId: String, relay: Relay) { override fun onError(error: Error, subscriptionId: String, relay: Relay) {
listeners.forEach { it.onError(error, subscriptionId, relay) } // Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch {
listeners.forEach { it.onError(error, subscriptionId, relay) }
}
} }
override fun onRelayStateChange(type: Relay.Type, relay: Relay, channel: String?) { override fun onRelayStateChange(type: Relay.Type, relay: Relay, channel: String?) {
listeners.forEach { it.onRelayStateChange(type, relay, channel) } // Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch {
listeners.forEach { it.onRelayStateChange(type, relay, channel) }
}
} }
override fun onSendResponse(eventId: String, success: Boolean, message: String, relay: Relay) { override fun onSendResponse(eventId: String, success: Boolean, message: String, relay: Relay) {
listeners.forEach { it.onSendResponse(eventId, success, message, relay) } // Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch {
listeners.forEach { it.onSendResponse(eventId, success, message, relay) }
}
} }
fun subscribe(listener: Listener) { fun subscribe(listener: Listener) {

View File

@@ -22,9 +22,6 @@ class Relay(
var activeTypes: Set<FeedType> = FeedType.values().toSet(), var activeTypes: Set<FeedType> = FeedType.values().toSet(),
) { ) {
private val httpClient = OkHttpClient.Builder() private val httpClient = OkHttpClient.Builder()
.connectTimeout(100, TimeUnit.SECONDS)
.readTimeout(100, TimeUnit.SECONDS)
.callTimeout(100, TimeUnit.SECONDS)
.followRedirects(true) .followRedirects(true)
.followSslRedirects(true) .followSslRedirects(true)
.build(); .build();