Breakes the Command and Message parsers out of the Relay class

This commit is contained in:
Vitor Pamplona
2025-01-14 11:47:52 -05:00
parent 3ac303c724
commit e36e49cc88
22 changed files with 879 additions and 232 deletions

View File

@@ -21,7 +21,7 @@
package com.vitorpamplona.ammolite.relays package com.vitorpamplona.ammolite.relays
import com.vitorpamplona.ammolite.relays.relays.RelayState import com.vitorpamplona.ammolite.relays.relays.RelayState
import com.vitorpamplona.ammolite.relays.relays.SimpleRelay import com.vitorpamplona.ammolite.relays.relays.SimpleClientRelay
import com.vitorpamplona.ammolite.relays.relays.Subscription import com.vitorpamplona.ammolite.relays.relays.Subscription
import com.vitorpamplona.ammolite.relays.relays.SubscriptionCollection import com.vitorpamplona.ammolite.relays.relays.SubscriptionCollection
import com.vitorpamplona.ammolite.relays.relays.sockets.WebsocketBuilderFactory import com.vitorpamplona.ammolite.relays.relays.sockets.WebsocketBuilderFactory
@@ -96,14 +96,12 @@ class Relay(
val activeTypes: Set<FeedType>, val activeTypes: Set<FeedType>,
socketBuilderFactory: WebsocketBuilderFactory, socketBuilderFactory: WebsocketBuilderFactory,
subs: SubscriptionManager, subs: SubscriptionManager,
) : SimpleRelay.Listener { ) : SimpleClientRelay.Listener {
private var listeners = setOf<Listener>() private var listeners = setOf<Listener>()
val relaySubFilter = RelaySubFilter(url, activeTypes, subs) val relaySubFilter = RelaySubFilter(url, activeTypes, subs)
val inner = val inner =
SimpleRelay(url, socketBuilderFactory.build(forceProxy), relaySubFilter, RelayStats.get(url)).apply { SimpleClientRelay(url, socketBuilderFactory.build(forceProxy), relaySubFilter, this@Relay, RelayStats.get(url))
register(this@Relay)
}
val brief = RelayBriefInfoCache.get(url) val brief = RelayBriefInfoCache.get(url)
@@ -119,11 +117,11 @@ class Relay(
fun connect() = inner.connect() fun connect() = inner.connect()
fun connectAndRun(onConnected: () -> Unit) { fun connectAndRunAfterSync(onConnected: () -> Unit) {
// BRB is crashing OkHttp Deflater object :( // BRB is crashing OkHttp Deflater object :(
if (url.contains("brb.io")) return if (url.contains("brb.io")) return
inner.connectAndRun(onConnected) inner.connectAndRunAfterSync(onConnected)
} }
fun sendOutbox() = inner.sendOutbox() fun sendOutbox() = inner.sendOutbox()
@@ -135,13 +133,13 @@ class Relay(
filters: List<TypedFilter>, filters: List<TypedFilter>,
) { ) {
if (read) { if (read) {
inner.sendFilter(requestId, relaySubFilter.filter(filters)) inner.sendRequest(requestId, relaySubFilter.filter(filters))
} }
} }
fun connectAndSendFiltersIfDisconnected() = inner.connectAndSendFiltersIfDisconnected() fun connectAndSendFiltersIfDisconnected() = inner.connectAndSendFiltersIfDisconnected()
fun renewFilters() = inner.renewFilters() fun renewFilters() = inner.renewSubscriptions()
fun sendOverride(signedEvent: Event) = inner.send(signedEvent) fun sendOverride(signedEvent: Event) = inner.send(signedEvent)
@@ -161,53 +159,59 @@ class Relay(
activeTypes == other.feedTypes activeTypes == other.feedTypes
override fun onEvent( override fun onEvent(
relay: SimpleRelay, relay: SimpleClientRelay,
subscriptionId: String, subscriptionId: String,
event: Event, event: Event,
afterEOSE: Boolean, afterEOSE: Boolean,
) = listeners.forEach { it.onEvent(this, subscriptionId, event, afterEOSE) } ) = listeners.forEach { it.onEvent(this, subscriptionId, event, afterEOSE) }
override fun onError( override fun onError(
relay: SimpleRelay, relay: SimpleClientRelay,
subscriptionId: String, subscriptionId: String,
error: Error, error: Error,
) = listeners.forEach { it.onError(this, subscriptionId, error) } ) = listeners.forEach { it.onError(this, subscriptionId, error) }
override fun onEOSE( override fun onEOSE(
relay: SimpleRelay, relay: SimpleClientRelay,
subscriptionId: String, subscriptionId: String,
) = listeners.forEach { it.onEOSE(this, subscriptionId) } ) = listeners.forEach { it.onEOSE(this, subscriptionId) }
override fun onRelayStateChange( override fun onRelayStateChange(
relay: SimpleRelay, relay: SimpleClientRelay,
type: RelayState, type: RelayState,
) = listeners.forEach { it.onRelayStateChange(this, type) } ) = listeners.forEach { it.onRelayStateChange(this, type) }
override fun onSendResponse( override fun onSendResponse(
relay: SimpleRelay, relay: SimpleClientRelay,
eventId: String, eventId: String,
success: Boolean, success: Boolean,
message: String, message: String,
) = listeners.forEach { it.onSendResponse(this, eventId, success, message) } ) = listeners.forEach { it.onSendResponse(this, eventId, success, message) }
override fun onAuth( override fun onAuth(
relay: SimpleRelay, relay: SimpleClientRelay,
challenge: String, challenge: String,
) = listeners.forEach { it.onAuth(this, challenge) } ) = listeners.forEach { it.onAuth(this, challenge) }
override fun onNotify( override fun onNotify(
relay: SimpleRelay, relay: SimpleClientRelay,
description: String, description: String,
) = listeners.forEach { it.onNotify(this, description) } ) = listeners.forEach { it.onNotify(this, description) }
override fun onClosed(
relay: SimpleClientRelay,
subscriptionId: String,
message: String,
) { }
override fun onSend( override fun onSend(
relay: SimpleRelay, relay: SimpleClientRelay,
msg: String, msg: String,
success: Boolean, success: Boolean,
) = listeners.forEach { it.onSend(this, msg, success) } ) = listeners.forEach { it.onSend(this, msg, success) }
override fun onBeforeSend( override fun onBeforeSend(
relay: SimpleRelay, relay: SimpleClientRelay,
event: Event, event: Event,
) = listeners.forEach { it.onBeforeSend(this, event) } ) = listeners.forEach { it.onBeforeSend(this, event) }

View File

@@ -69,10 +69,7 @@ class RelayPool : Relay.Listener {
} else { } else {
addRelay(relay) addRelay(relay)
relay.connectAndRun { relay.connectAndRunAfterSync {
relay.renewFilters()
relay.sendOutbox()
whenConnected(relay) whenConnected(relay)
GlobalScope.launch(Dispatchers.IO) { GlobalScope.launch(Dispatchers.IO) {

View File

@@ -21,25 +21,36 @@
package com.vitorpamplona.ammolite.relays.relays package com.vitorpamplona.ammolite.relays.relays
import android.util.Log import android.util.Log
import com.vitorpamplona.ammolite.relays.relays.commands.toClient.AuthMessage
import com.vitorpamplona.ammolite.relays.relays.commands.toClient.ClosedMessage
import com.vitorpamplona.ammolite.relays.relays.commands.toClient.EoseMessage
import com.vitorpamplona.ammolite.relays.relays.commands.toClient.EventMessage
import com.vitorpamplona.ammolite.relays.relays.commands.toClient.NoticeMessage
import com.vitorpamplona.ammolite.relays.relays.commands.toClient.NotifyMessage
import com.vitorpamplona.ammolite.relays.relays.commands.toClient.OkMessage
import com.vitorpamplona.ammolite.relays.relays.commands.toClient.ToClientParser
import com.vitorpamplona.ammolite.relays.relays.commands.toRelay.AuthCmd
import com.vitorpamplona.ammolite.relays.relays.commands.toRelay.CloseCmd
import com.vitorpamplona.ammolite.relays.relays.commands.toRelay.CountCmd
import com.vitorpamplona.ammolite.relays.relays.commands.toRelay.EventCmd
import com.vitorpamplona.ammolite.relays.relays.commands.toRelay.ReqCmd
import com.vitorpamplona.ammolite.relays.relays.sockets.WebSocket import com.vitorpamplona.ammolite.relays.relays.sockets.WebSocket
import com.vitorpamplona.ammolite.relays.relays.sockets.WebSocketListener import com.vitorpamplona.ammolite.relays.relays.sockets.WebSocketListener
import com.vitorpamplona.ammolite.relays.relays.sockets.WebsocketBuilder import com.vitorpamplona.ammolite.relays.relays.sockets.WebsocketBuilder
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.nip01Core.HexKey import com.vitorpamplona.quartz.nip01Core.HexKey
import com.vitorpamplona.quartz.nip01Core.core.Event import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
import com.vitorpamplona.quartz.nip01Core.relays.Filter import com.vitorpamplona.quartz.nip01Core.relays.Filter
import com.vitorpamplona.quartz.nip42RelayAuth.RelayAuthEvent import com.vitorpamplona.quartz.nip42RelayAuth.RelayAuthEvent
import com.vitorpamplona.quartz.utils.TimeUtils import com.vitorpamplona.quartz.utils.TimeUtils
import com.vitorpamplona.quartz.utils.bytesUsedInMemory import com.vitorpamplona.quartz.utils.bytesUsedInMemory
import com.vitorpamplona.quartz.utils.joinToStringLimited
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.cancellation.CancellationException import kotlin.coroutines.cancellation.CancellationException
class SimpleRelay( class SimpleClientRelay(
val url: String, val url: String,
val socketBuilder: WebsocketBuilder, val socketBuilder: WebsocketBuilder,
val subs: SubscriptionCollection, val subs: SubscriptionCollection,
val listener: Listener,
val stats: RelayStat = RelayStat(), val stats: RelayStat = RelayStat(),
) { ) {
companion object { companion object {
@@ -47,7 +58,6 @@ class SimpleRelay(
const val RECONNECTING_IN_SECONDS = 60 * 3 const val RECONNECTING_IN_SECONDS = 60 * 3
} }
private var listeners = setOf<Listener>()
private var socket: WebSocket? = null private var socket: WebSocket? = null
private var isReady: Boolean = false private var isReady: Boolean = false
private var usingCompression: Boolean = false private var usingCompression: Boolean = false
@@ -56,31 +66,29 @@ class SimpleRelay(
private var afterEOSEPerSubscription = mutableMapOf<String, Boolean>() private var afterEOSEPerSubscription = mutableMapOf<String, Boolean>()
private val authResponse = mutableMapOf<HexKey, Boolean>() private val authResponseWatcher = mutableMapOf<HexKey, Boolean>()
private val authChallengesSent = mutableSetOf<String>() private val authChallengesSent = mutableSetOf<String>()
/**
* Auth procedures require us to keep track of the outgoing events
* to make sure the relay waits for the auth to finish and send them.
*/
private val outboxCache = mutableMapOf<HexKey, Event>() private val outboxCache = mutableMapOf<HexKey, Event>()
private var connectingMutex = AtomicBoolean() private var connectingMutex = AtomicBoolean()
fun register(listener: Listener) { private val parser = ToClientParser()
listeners = listeners.plus(listener)
}
fun unregister(listener: Listener) { fun isConnectionStarted(): Boolean = socket != null
listeners = listeners.minus(listener)
}
fun isConnected(): Boolean = socket != null fun isConnected(): Boolean = socket != null && isReady
fun connect() { fun connect() = connectAndRunOverride(::sendEverything)
connectAndRun {
checkNotInMainThread()
// Sends everything. fun sendEverything() {
renewFilters() renewSubscriptions()
sendOutbox() sendOutbox()
} }
}
fun sendOutbox() { fun sendOutbox() {
synchronized(outboxCache) { synchronized(outboxCache) {
@@ -90,7 +98,14 @@ class SimpleRelay(
} }
} }
fun connectAndRun(onConnected: () -> Unit) { fun connectAndRunAfterSync(onConnected: () -> Unit) {
connectAndRunOverride {
sendEverything()
onConnected()
}
}
fun connectAndRunOverride(onConnected: () -> Unit) {
Log.d("Relay", "Relay.connect $url isAlreadyConnecting: ${connectingMutex.get()}") Log.d("Relay", "Relay.connect $url isAlreadyConnecting: ${connectingMutex.get()}")
// If there is a connection, don't wait. // If there is a connection, don't wait.
@@ -99,8 +114,6 @@ class SimpleRelay(
} }
try { try {
checkNotInMainThread()
if (socket != null) { if (socket != null) {
connectingMutex.set(false) connectingMutex.set(false)
return return
@@ -129,7 +142,6 @@ class SimpleRelay(
pingMillis: Long, pingMillis: Long,
compression: Boolean, compression: Boolean,
) { ) {
checkNotInMainThread()
Log.d("Relay", "Connect onOpen $url $socket") Log.d("Relay", "Connect onOpen $url $socket")
markConnectionAsReady(pingMillis, compression) markConnectionAsReady(pingMillis, compression)
@@ -137,22 +149,19 @@ class SimpleRelay(
// Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url") // Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url")
onConnected() onConnected()
listeners.forEach { it.onRelayStateChange(this@SimpleRelay, RelayState.CONNECTED) } listener.onRelayStateChange(this@SimpleClientRelay, RelayState.CONNECTED)
} }
override fun onMessage(text: String) { override fun onMessage(text: String) {
checkNotInMainThread()
stats.addBytesReceived(text.bytesUsedInMemory()) stats.addBytesReceived(text.bytesUsedInMemory())
try { try {
processNewRelayMessage(text) processNewRelayMessage(text)
} catch (e: Throwable) { } catch (e: Throwable) {
if (e is CancellationException) throw e if (e is CancellationException) throw e
e.printStackTrace() stats.newError("Error processing: $text")
text.chunked(2000) { chunked -> Log.e("Relay", "Error processing: $text")
listeners.forEach { it.onError(this@SimpleRelay, "", Error("Problem with $chunked")) } listener.onError(this@SimpleClientRelay, "", Error("Error processing $text"))
}
} }
} }
@@ -160,55 +169,45 @@ class SimpleRelay(
code: Int, code: Int,
reason: String, reason: String,
) { ) {
checkNotInMainThread()
Log.w("Relay", "Relay onClosing $url: $reason") Log.w("Relay", "Relay onClosing $url: $reason")
listeners.forEach { listener.onRelayStateChange(this@SimpleClientRelay, RelayState.DISCONNECTING)
it.onRelayStateChange(this@SimpleRelay, RelayState.DISCONNECTING)
}
} }
override fun onClosed( override fun onClosed(
code: Int, code: Int,
reason: String, reason: String,
) { ) {
checkNotInMainThread()
markConnectionAsClosed() markConnectionAsClosed()
Log.w("Relay", "Relay onClosed $url: $reason") Log.w("Relay", "Relay onClosed $url: $reason")
listeners.forEach { it.onRelayStateChange(this@SimpleRelay, RelayState.DISCONNECTED) } listener.onRelayStateChange(this@SimpleClientRelay, RelayState.DISCONNECTED)
} }
override fun onFailure( override fun onFailure(
t: Throwable, t: Throwable,
responseMessage: String?, response: String?,
) { ) {
checkNotInMainThread()
socket?.cancel() // 1000, "Normal close" socket?.cancel() // 1000, "Normal close"
// checks if this is an actual failure. Closing the socket generates an onFailure as well. // checks if this is an actual failure. Closing the socket generates an onFailure as well.
if (!(socket == null && (t.message == "Socket is closed" || t.message == "Socket closed"))) { if (!(socket == null && (t.message == "Socket is closed" || t.message == "Socket closed"))) {
stats.newError(responseMessage ?: t.message ?: "onFailure event from server: ${t.javaClass.simpleName}") stats.newError(response ?: t.message ?: "onFailure event from server: ${t.javaClass.simpleName}")
} }
// Failures disconnect the relay. // Failures disconnect the relay.
markConnectionAsClosed() markConnectionAsClosed()
Log.w("Relay", "Relay onFailure $url, $responseMessage $responseMessage ${t.message} $socket") Log.w("Relay", "Relay onFailure $url, $response $response ${t.message} $socket")
t.printStackTrace() t.printStackTrace()
listeners.forEach { listener.onError(
it.onError( this@SimpleClientRelay,
this@SimpleRelay,
"", "",
Error("WebSocket Failure. Response: $responseMessage. Exception: ${t.message}", t), Error("WebSocket Failure. Response: $response. Exception: ${t.message}", t),
) )
} }
} }
}
fun markConnectionAsReady( fun markConnectionAsReady(
pingInMs: Long, pingInMs: Long,
@@ -229,115 +228,69 @@ class SimpleRelay(
} }
fun processNewRelayMessage(newMessage: String) { fun processNewRelayMessage(newMessage: String) {
val msgArray = EventMapper.mapper.readTree(newMessage) when (val msg = parser.parse(newMessage)) {
is EventMessage -> {
when (val type = msgArray.get(0).asText()) { // Log.w("Relay", "Relay onEVENT $url $newMessage")
"EVENT" -> { listener.onEvent(this, msg.subId, msg.event, afterEOSEPerSubscription[msg.subId] == true)
val subscriptionId = msgArray.get(1).asText()
val event = EventMapper.fromJson(msgArray.get(2))
// Log.w("Relay", "Relay onEVENT ${event.kind} $url, $subscriptionId ${msgArray.get(2)}")
// filter results: subs.isActive(subscriptionId) && subs.match(subscriptionId, event)
if (true) {
listeners.forEach {
it.onEvent(
this@SimpleRelay,
subscriptionId,
event,
afterEOSEPerSubscription[subscriptionId] == true,
)
} }
} else { is EoseMessage -> {
val filter = // Log.w("Relay", "Relay onEOSE $url $newMessage")
subs.getFilters(subscriptionId).joinToStringLimited( afterEOSEPerSubscription[msg.subId] = true
separator = ",", listener.onEOSE(this@SimpleClientRelay, msg.subId)
limit = 19,
prefix = """["REQ","$subscriptionId",""",
postfix = "]",
) {
it.toJson()
} }
is NoticeMessage -> {
Log.w("Relay", "Subscription $filter is not active or the filter does not match the event ${msgArray.get(2)}") // Log.w("Relay", "Relay onNotice $url, $newMessage")
stats.newNotice(msg.message)
listener.onError(this@SimpleClientRelay, msg.message, Error("Relay sent notice: $msg.message"))
} }
} is OkMessage -> {
"EOSE" -> Log.w("Relay", "Relay on OK $url, $newMessage")
listeners.forEach {
val subscriptionId = msgArray.get(1).asText()
afterEOSEPerSubscription[subscriptionId] = true // if this is the OK of an auth event, renew all subscriptions and resend all outgoing events.
// Log.w("Relay", "Relay onEOSE $url $subscriptionId") if (authResponseWatcher.containsKey(msg.eventId)) {
it.onEOSE(this@SimpleRelay, subscriptionId) val wasAlreadyAuthenticated = authResponseWatcher.get(msg.eventId)
} authResponseWatcher.put(msg.eventId, msg.success)
"NOTICE" -> if (wasAlreadyAuthenticated != true && msg.success) {
listeners.forEach { sendEverything()
val message = msgArray.get(1).asText()
Log.w("Relay", "Relay onNotice $url, $message")
stats.newNotice(message)
it.onError(this@SimpleRelay, message, Error("Relay sent notice: $message"))
}
"OK" ->
listeners.forEach {
val eventId = msgArray[1].asText()
val success = msgArray[2].asBoolean()
val message = if (msgArray.size() > 2) msgArray[3].asText() else ""
Log.w("Relay", "Relay on OK $url, $eventId, $success, $message")
if (authResponse.containsKey(eventId)) {
val wasAlreadyAuthenticated = authResponse.get(eventId)
authResponse.put(eventId, success)
if (wasAlreadyAuthenticated != true && success) {
renewFilters()
sendOutbox()
} }
} }
if (outboxCache.contains(eventId) && !message.startsWith("auth-required")) { // remove from cache for any error that is not an auth required error.
// for auth required, we will do the auth and try to send again.
if (outboxCache.contains(msg.eventId) && !msg.message.startsWith("auth-required")) {
synchronized(outboxCache) { synchronized(outboxCache) {
outboxCache.remove(eventId) outboxCache.remove(msg.eventId)
} }
} }
if (!success) { if (!msg.success) {
stats.newNotice("Failed to receive $eventId: $message") stats.newNotice("Rejected event $msg.eventId: $msg.message")
} }
it.onSendResponse(this@SimpleRelay, eventId, success, message) listener.onSendResponse(this@SimpleClientRelay, msg.eventId, msg.success, msg.message)
} }
"AUTH" -> is AuthMessage -> {
listeners.forEach { // Log.d("Relay", "Relay onAuth $url, $newMessage")
Log.w("Relay", "Relay onAuth $url, ${ msgArray[1].asText()}") listener.onAuth(this@SimpleClientRelay, msg.challenge)
it.onAuth(this@SimpleRelay, msgArray[1].asText())
} }
"NOTIFY" -> is NotifyMessage -> {
listeners.forEach { // Log.w("Relay", "Relay onNotify $url, $newMessage")
// Log.w("Relay", "Relay onNotify $url, ${msg[1].asString}") listener.onNotify(this@SimpleClientRelay, msg.message)
it.onNotify(this@SimpleRelay, msgArray[1].asText()) }
is ClosedMessage -> {
// Log.w("Relay", "Relay Closed Subscription $url, $newMessage")
listener.onClosed(this@SimpleClientRelay, msg.subscriptionId, msg.message)
} }
"CLOSED" -> listeners.forEach { Log.w("Relay", "Relay Closed Subscription $url, $newMessage") }
else -> { else -> {
stats.newError("Unsupported message: $newMessage") stats.newError("Unsupported message: $newMessage")
listeners.forEach {
Log.w("Relay", "Unsupported message: $newMessage") Log.w("Relay", "Unsupported message: $newMessage")
it.onError( listener.onError(this, "", Error("Unsupported message: $newMessage"))
this@SimpleRelay,
"",
Error("Unknown type $type on channel. Msg was $newMessage"),
)
}
} }
} }
} }
fun disconnect() { fun disconnect() {
Log.d("Relay", "Relay.disconnect $url") Log.d("Relay", "Relay.disconnect $url")
checkNotInMainThread()
lastConnectTentative = 0L // this is not an error, so prepare to reconnect as soon as requested. lastConnectTentative = 0L // this is not an error, so prepare to reconnect as soon as requested.
socket?.cancel() socket?.cancel()
socket = null socket = null
@@ -349,30 +302,38 @@ class SimpleRelay(
fun resetEOSEStatuses() { fun resetEOSEStatuses() {
afterEOSEPerSubscription = LinkedHashMap(afterEOSEPerSubscription.size) afterEOSEPerSubscription = LinkedHashMap(afterEOSEPerSubscription.size)
authResponse.clear() authResponseWatcher.clear()
authChallengesSent.clear() authChallengesSent.clear()
} }
fun sendFilter( fun sendRequest(
requestId: String, requestId: String,
filters: List<Filter>, filters: List<Filter>,
) { ) {
checkNotInMainThread() if (isConnectionStarted()) {
if (isConnected()) {
if (isReady) { if (isReady) {
if (filters.isNotEmpty()) { if (filters.isNotEmpty()) {
writeToSocket( writeToSocket(ReqCmd.toJson(requestId, filters))
filters.joinToStringLimited( afterEOSEPerSubscription[requestId] = false
separator = ",", }
limit = 19, }
prefix = """["REQ","$requestId",""", } else {
postfix = "]", // waits 60 seconds to reconnect after disconnected.
) { if (TimeUtils.now() > lastConnectTentative + RECONNECTING_IN_SECONDS) {
it.toJson() // sends all filters after connection is successful.
}, connect()
) }
}
}
fun sendCount(
requestId: String,
filters: List<Filter>,
) {
if (isConnectionStarted()) {
if (isReady) {
if (filters.isNotEmpty()) {
writeToSocket(CountCmd.toJson(requestId, filters))
afterEOSEPerSubscription[requestId] = false afterEOSEPerSubscription[requestId] = false
} }
} }
@@ -386,30 +347,23 @@ class SimpleRelay(
} }
fun connectAndSendFiltersIfDisconnected() { fun connectAndSendFiltersIfDisconnected() {
checkNotInMainThread()
if (socket == null) { if (socket == null) {
// waits 60 seconds to reconnect after disconnected. // waits 60 seconds to reconnect after disconnected.
if (TimeUtils.now() > lastConnectTentative + RECONNECTING_IN_SECONDS) { if (TimeUtils.now() > lastConnectTentative + RECONNECTING_IN_SECONDS) {
// println("sendfilter Only if Disconnected ${url} ")
connect() connect()
} }
} }
} }
fun renewFilters() { fun renewSubscriptions() {
// Force update all filters after AUTH. // Force update all filters after AUTH.
subs.allSubscriptions().forEach { subs.allSubscriptions().forEach {
sendFilter(requestId = it.id, it.filters) sendRequest(requestId = it.id, it.filters)
} }
} }
fun send(signedEvent: Event) { fun send(signedEvent: Event) {
checkNotInMainThread() listener.onBeforeSend(this@SimpleClientRelay, signedEvent)
listeners.forEach { listener ->
listener.onBeforeSend(this@SimpleRelay, signedEvent)
}
if (signedEvent is RelayAuthEvent) { if (signedEvent is RelayAuthEvent) {
sendAuth(signedEvent) sendAuth(signedEvent)
@@ -428,9 +382,9 @@ class SimpleRelay(
// 4. auth is sent // 4. auth is sent
// ... // ...
if (!authChallengesSent.contains(challenge)) { if (!authChallengesSent.contains(challenge)) {
authResponse.put(signedEvent.id, false) authResponseWatcher[signedEvent.id] = false
authChallengesSent.add(challenge) authChallengesSent.add(challenge)
writeToSocket("""["AUTH",${signedEvent.toJson()}]""") writeToSocket(AuthCmd.toJson(signedEvent))
} }
} }
@@ -439,36 +393,27 @@ class SimpleRelay(
outboxCache.put(signedEvent.id, signedEvent) outboxCache.put(signedEvent.id, signedEvent)
} }
if (isConnected()) { if (isConnectionStarted()) {
if (isReady) { if (isReady) {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""") writeToSocket(EventCmd.toJson(signedEvent))
} }
} else { } else {
// sends all filters after connection is successful. // automatically sends all filters after connection is successful.
connectAndRun { connect()
// Sends everything.
renewFilters()
sendOutbox()
}
} }
} }
private fun writeToSocket(str: String) { private fun writeToSocket(str: String) {
if (socket == null) { if (socket == null) {
listeners.forEach { listener ->
listener.onError( listener.onError(
this@SimpleRelay, this@SimpleClientRelay,
"", "",
Error("Failed to send $str. Relay is not connected."), Error("Failed to send $str. Relay is not connected."),
) )
} }
}
socket?.let { socket?.let {
checkNotInMainThread()
val result = it.send(str) val result = it.send(str)
listeners.forEach { listener -> listener.onSend(this@SimpleClientRelay, str, result)
listener.onSend(this@SimpleRelay, str, result)
}
stats.addBytesSent(str.bytesUsedInMemory()) stats.addBytesSent(str.bytesUsedInMemory())
Log.d("Relay", "Relay send $url (${str.length} chars) $str") Log.d("Relay", "Relay send $url (${str.length} chars) $str")
@@ -476,60 +421,65 @@ class SimpleRelay(
} }
fun close(subscriptionId: String) { fun close(subscriptionId: String) {
writeToSocket("""["CLOSE","$subscriptionId"]""") writeToSocket(CloseCmd.toJson(subscriptionId))
} }
interface Listener { interface Listener {
fun onEvent( fun onEvent(
relay: SimpleRelay, relay: SimpleClientRelay,
subscriptionId: String, subscriptionId: String,
event: Event, event: Event,
afterEOSE: Boolean, afterEOSE: Boolean,
) )
fun onEOSE( fun onEOSE(
relay: SimpleRelay, relay: SimpleClientRelay,
subscriptionId: String, subscriptionId: String,
) )
fun onError( fun onError(
relay: SimpleRelay, relay: SimpleClientRelay,
subscriptionId: String, subscriptionId: String,
error: Error, error: Error,
) )
fun onSendResponse(
relay: SimpleRelay,
eventId: String,
success: Boolean,
message: String,
)
fun onAuth( fun onAuth(
relay: SimpleRelay, relay: SimpleClientRelay,
challenge: String, challenge: String,
) )
fun onRelayStateChange( fun onRelayStateChange(
relay: SimpleRelay, relay: SimpleClientRelay,
type: RelayState, type: RelayState,
) )
/** Relay sent a notification */
fun onNotify( fun onNotify(
relay: SimpleRelay, relay: SimpleClientRelay,
description: String, description: String,
) )
fun onClosed(
relay: SimpleClientRelay,
subscriptionId: String,
message: String,
)
fun onBeforeSend( fun onBeforeSend(
relay: SimpleRelay, relay: SimpleClientRelay,
event: Event, event: Event,
) )
fun onSend( fun onSend(
relay: SimpleRelay, relay: SimpleClientRelay,
msg: String, msg: String,
success: Boolean, success: Boolean,
) )
fun onSendResponse(
relay: SimpleClientRelay,
eventId: String,
success: Boolean,
message: String,
)
} }
} }

View File

@@ -0,0 +1,37 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toClient
import com.fasterxml.jackson.databind.JsonNode
class AuthMessage(
val challenge: String,
) : Message {
companion object {
const val LABEL = "AUTH"
@JvmStatic
fun parse(msgArray: JsonNode): AuthMessage =
AuthMessage(
msgArray.get(1).asText(),
)
}
}

View File

@@ -0,0 +1,39 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toClient
import com.fasterxml.jackson.databind.JsonNode
class ClosedMessage(
val subscriptionId: String,
val message: String,
) : Message {
companion object {
const val LABEL = "CLOSED"
@JvmStatic
fun parse(msgArray: JsonNode): ClosedMessage =
ClosedMessage(
msgArray[1].asText(),
msgArray[2].asText(),
)
}
}

View File

@@ -0,0 +1,37 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toClient
import com.fasterxml.jackson.databind.JsonNode
class EoseMessage(
val subId: String,
) : Message {
companion object {
const val LABEL = "EOSE"
@JvmStatic
fun parse(msgArray: JsonNode): EoseMessage =
EoseMessage(
msgArray.get(1).asText(),
)
}
}

View File

@@ -0,0 +1,41 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toClient
import com.fasterxml.jackson.databind.JsonNode
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
class EventMessage(
val subId: String,
val event: Event,
) : Message {
companion object {
const val LABEL = "EVENT"
@JvmStatic
fun parse(msgArray: JsonNode): EventMessage =
EventMessage(
msgArray.get(1).asText(),
EventMapper.fromJson(msgArray.get(2)),
)
}
}

View File

@@ -0,0 +1,23 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toClient
interface Message

View File

@@ -0,0 +1,37 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toClient
import com.fasterxml.jackson.databind.JsonNode
class NoticeMessage(
val message: String,
) : Message {
companion object {
const val LABEL = "NOTICE"
@JvmStatic
fun parse(msgArray: JsonNode): NoticeMessage =
NoticeMessage(
msgArray.get(1).asText(),
)
}
}

View File

@@ -0,0 +1,37 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toClient
import com.fasterxml.jackson.databind.JsonNode
class NotifyMessage(
val message: String,
) : Message {
companion object {
const val LABEL = "NOTIFY"
@JvmStatic
fun parse(msgArray: JsonNode): NotifyMessage =
NotifyMessage(
msgArray.get(1).asText(),
)
}
}

View File

@@ -0,0 +1,42 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toClient
import com.fasterxml.jackson.databind.JsonNode
import com.vitorpamplona.quartz.nip01Core.HexKey
class OkMessage(
val eventId: HexKey,
val success: Boolean,
val message: String,
) : Message {
companion object {
const val LABEL = "OK"
@JvmStatic
fun parse(msgArray: JsonNode): OkMessage =
OkMessage(
msgArray[1].asText(),
msgArray[2].asBoolean(),
if (msgArray.size() > 2) msgArray[3].asText() else "",
)
}
}

View File

@@ -0,0 +1,40 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toClient
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
class ToClientParser {
fun parse(newMessage: String): Message? {
val msgArray = EventMapper.mapper.readTree(newMessage)
val type = msgArray.get(0).asText()
return when (type) {
EventMessage.LABEL -> EventMessage.parse(msgArray)
EoseMessage.LABEL -> EoseMessage.parse(msgArray)
NoticeMessage.LABEL -> NoticeMessage.parse(msgArray)
OkMessage.LABEL -> OkMessage.parse(msgArray)
AuthMessage.LABEL -> AuthMessage.parse(msgArray)
NotifyMessage.LABEL -> NotifyMessage.parse(msgArray)
ClosedMessage.LABEL -> ClosedMessage.parse(msgArray)
else -> null
}
}
}

View File

@@ -0,0 +1,42 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toRelay
import com.fasterxml.jackson.databind.JsonNode
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
import com.vitorpamplona.quartz.nip42RelayAuth.RelayAuthEvent
class AuthCmd(
val event: RelayAuthEvent,
) : Command {
companion object {
const val LABEL = "AUTH"
@JvmStatic
fun toJson(authEvent: RelayAuthEvent): String = """["AUTH",${authEvent.toJson()}]"""
@JvmStatic
fun parse(msgArray: JsonNode): AuthCmd =
AuthCmd(
EventMapper.fromJson(msgArray.get(1)) as RelayAuthEvent,
)
}
}

View File

@@ -0,0 +1,40 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toRelay
import com.fasterxml.jackson.databind.JsonNode
class CloseCmd(
val subscriptionId: String,
) : Command {
companion object {
const val LABEL = "CLOSE"
@JvmStatic
fun toJson(subscriptionId: String): String = """["CLOSE","$subscriptionId"]"""
@JvmStatic
fun parse(msgArray: JsonNode): CloseCmd =
CloseCmd(
msgArray.get(1).asText(),
)
}
}

View File

@@ -0,0 +1,23 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toRelay
interface Command

View File

@@ -0,0 +1,65 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toRelay
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
import com.vitorpamplona.quartz.nip01Core.relays.Filter
import com.vitorpamplona.quartz.nip01Core.relays.FilterDeserializer
import com.vitorpamplona.quartz.utils.joinToStringLimited
class CountCmd(
val subscriptionId: String,
val filters: List<Filter>,
) : Command {
companion object {
const val LABEL = "COUNT"
@JvmStatic
fun toJson(
requestId: String,
filters: List<Filter>,
): String =
filters.joinToStringLimited(
separator = ",",
limit = 19,
prefix = """["COUNT","$requestId",""",
postfix = "]",
) {
it.toJson()
}
@JvmStatic
fun parse(msgArray: JsonNode): CountCmd {
val filters = mutableListOf<Filter>()
for (i in 2 until msgArray.size()) {
val json = EventMapper.mapper.readTree(msgArray.get(i).asText())
if (json is ObjectNode) {
filters.add(FilterDeserializer.fromJson(json))
}
}
return CountCmd(msgArray.get(1).asText(), filters)
}
}
}

View File

@@ -0,0 +1,42 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toRelay
import com.fasterxml.jackson.databind.JsonNode
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
class EventCmd(
val event: Event,
) : Command {
companion object {
const val LABEL = "EVENT"
@JvmStatic
fun toJson(event: Event): String = """["EVENT",${event.toJson()}]"""
@JvmStatic
fun parse(msgArray: JsonNode): EventCmd =
EventCmd(
EventMapper.fromJson(msgArray.get(1)),
)
}
}

View File

@@ -0,0 +1,66 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toRelay
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
import com.vitorpamplona.quartz.nip01Core.relays.Filter
import com.vitorpamplona.quartz.nip01Core.relays.FilterDeserializer
import com.vitorpamplona.quartz.utils.joinToStringLimited
class ReqCmd(
val subscriptionId: String,
val filters: List<Filter>,
) : Command {
companion object {
const val LABEL = "REQ"
@JvmStatic
fun toJson(
requestId: String,
filters: List<Filter>,
limit: Int = 19,
): String =
filters.joinToStringLimited(
separator = ",",
limit = limit,
prefix = """["REQ","$requestId",""",
postfix = "]",
) {
it.toJson()
}
@JvmStatic
fun parse(msgArray: JsonNode): ReqCmd {
val filters = mutableListOf<Filter>()
for (i in 2 until msgArray.size()) {
val json = EventMapper.mapper.readTree(msgArray.get(i).asText())
if (json is ObjectNode) {
filters.add(FilterDeserializer.fromJson(json))
}
}
return ReqCmd(msgArray.get(1).asText(), filters)
}
}
}

View File

@@ -0,0 +1,38 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.commands.toRelay
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
class ToRelayParser {
fun parse(newMessage: String): Command? {
val msgArray = EventMapper.mapper.readTree(newMessage)
val type = msgArray.get(0).asText()
return when (type) {
AuthCmd.LABEL -> AuthCmd.parse(msgArray)
CloseCmd.LABEL -> CloseCmd.parse(msgArray)
CountCmd.LABEL -> CountCmd.parse(msgArray)
EventCmd.LABEL -> EventCmd.parse(msgArray)
ReqCmd.LABEL -> ReqCmd.parse(msgArray)
else -> null
}
}
}

View File

@@ -40,7 +40,7 @@ import org.junit.runner.RunWith
* result. Modify your code to see how it affects performance. * result. Modify your code to see how it affects performance.
*/ */
@RunWith(AndroidJUnit4::class) @RunWith(AndroidJUnit4::class)
class EventHasherBenchmark { class EventCmdHasherBenchmark {
@get:Rule val benchmarkRule = BenchmarkRule() @get:Rule val benchmarkRule = BenchmarkRule()
@Test @Test

View File

@@ -38,7 +38,7 @@ import java.security.MessageDigest
* result. Modify your code to see how it affects performance. * result. Modify your code to see how it affects performance.
*/ */
@RunWith(AndroidJUnit4::class) @RunWith(AndroidJUnit4::class)
class EventSerializerBenchmark { class EventCmdSerializerBenchmark {
@get:Rule val benchmarkRule = BenchmarkRule() @get:Rule val benchmarkRule = BenchmarkRule()
@Test @Test

View File

@@ -0,0 +1,47 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.quartz.nip01Core.relays
import com.fasterxml.jackson.databind.node.ObjectNode
class FilterDeserializer {
companion object {
fun fromJson(jsonObject: ObjectNode): Filter {
val tags = mutableListOf<String>()
jsonObject.fieldNames().forEach {
if (it.startsWith("#")) {
tags.add(it.substring(1))
}
}
return Filter(
ids = jsonObject.get("ids").map { it.asText() },
authors = jsonObject.get("authors").map { it.asText() },
kinds = jsonObject.get("kinds").map { it.asInt() },
tags = tags.associateWith { jsonObject.get(it).map { it.asText() } },
since = jsonObject.get("since").asLong(),
until = jsonObject.get("until").asLong(),
limit = jsonObject.get("limit").asInt(),
search = jsonObject.get("search").asText(),
)
}
}
}