Separates Typed Relay class from a SimpleRelay class to prepare to move to Quartz

This commit is contained in:
Vitor Pamplona 2025-01-13 19:15:20 -05:00
parent 9b0f24ba94
commit e3e90229ce
32 changed files with 984 additions and 618 deletions

View File

@ -55,7 +55,7 @@ import kotlin.time.measureTimedValue
class Amethyst : Application() {
val applicationIOScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
val client: NostrClient = NostrClient(OkHttpWebSocket.Builder())
val client: NostrClient = NostrClient(OkHttpWebSocket.BuilderFactory())
// Service Manager is only active when the activity is active.
val serviceManager = ServiceManager(client, applicationIOScope)

View File

@ -2456,9 +2456,9 @@ object LocalCache {
if (event is AddressableEvent && relay != null) {
// updates relay with a new event.
getAddressableNoteIfExists(event.addressTag())?.let {
it.event?.let { existingEvent ->
if (existingEvent.createdAt > event.createdAt) {
getAddressableNoteIfExists(event.addressTag())?.let { note ->
note.event?.let { existingEvent ->
if (existingEvent.createdAt > event.createdAt && !note.hasRelay(relay)) {
Log.d("LocalCache", "Updating ${relay.url} with a new version of ${event.toJson()} to ${existingEvent.toJson()}")
relay.send(existingEvent)
}

View File

@ -433,6 +433,8 @@ open class Note(
}
}
fun hasRelay(relay: Relay) = relay.brief !in relays
fun addRelay(relay: Relay) {
if (relay.brief !in relays) {
addRelaySync(relay.brief)

View File

@ -307,9 +307,9 @@ class User(
relay: Relay,
eventTime: Long,
) {
val here = relaysBeingUsed[relay.url]
val here = relaysBeingUsed[relay.brief.url]
if (here == null) {
relaysBeingUsed = relaysBeingUsed + Pair(relay.url, RelayInfo(relay.url, eventTime, 1))
relaysBeingUsed = relaysBeingUsed + Pair(relay.brief.url, RelayInfo(relay.brief.url, eventTime, 1))
} else {
if (eventTime > here.lastEvent) {
here.lastEvent = eventTime

View File

@ -20,9 +20,10 @@
*/
package com.vitorpamplona.amethyst.service.okhttp
import com.vitorpamplona.ammolite.sockets.WebSocket
import com.vitorpamplona.ammolite.sockets.WebSocketListener
import com.vitorpamplona.ammolite.sockets.WebsocketBuilder
import com.vitorpamplona.ammolite.relays.relays.sockets.WebSocket
import com.vitorpamplona.ammolite.relays.relays.sockets.WebSocketListener
import com.vitorpamplona.ammolite.relays.relays.sockets.WebsocketBuilder
import com.vitorpamplona.ammolite.relays.relays.sockets.WebsocketBuilderFactory
import okhttp3.Request
import okhttp3.Response
@ -73,14 +74,19 @@ class OkHttpWebSocket(
) = out.onFailure(t, response?.message)
}
class Builder : WebsocketBuilder {
class Builder(
val forceProxy: Boolean,
) : WebsocketBuilder {
override fun build(
url: String,
forceProxy: Boolean,
out: WebSocketListener,
) = OkHttpWebSocket(url, forceProxy, out)
}
class BuilderFactory : WebsocketBuilderFactory {
override fun build(forceProxy: Boolean) = Builder(forceProxy)
}
override fun cancel() {
socket?.cancel()
}

View File

@ -53,7 +53,7 @@ import com.vitorpamplona.amethyst.ui.stringRes
import com.vitorpamplona.amethyst.ui.theme.StdHorzSpacer
import com.vitorpamplona.amethyst.ui.theme.StdVertSpacer
import com.vitorpamplona.amethyst.ui.theme.imageModifier
import com.vitorpamplona.ammolite.relays.RelayStat
import com.vitorpamplona.ammolite.relays.relays.RelayStat
@OptIn(ExperimentalMaterial3Api::class)
@Composable

View File

@ -53,7 +53,7 @@ import com.vitorpamplona.amethyst.ui.stringRes
import com.vitorpamplona.amethyst.ui.theme.StdHorzSpacer
import com.vitorpamplona.amethyst.ui.theme.StdVertSpacer
import com.vitorpamplona.amethyst.ui.theme.imageModifier
import com.vitorpamplona.ammolite.relays.RelayStat
import com.vitorpamplona.ammolite.relays.relays.RelayStat
@OptIn(ExperimentalMaterial3Api::class)
@Composable

View File

@ -61,7 +61,7 @@ import com.vitorpamplona.amethyst.ui.theme.RowColSpacing
import com.vitorpamplona.amethyst.ui.theme.StdHorzSpacer
import com.vitorpamplona.amethyst.ui.theme.grayText
import com.vitorpamplona.ammolite.relays.Constants
import com.vitorpamplona.ammolite.relays.RelayStat
import com.vitorpamplona.ammolite.relays.relays.RelayStat
@Composable
fun AllRelayListView(

View File

@ -23,7 +23,7 @@ package com.vitorpamplona.amethyst.ui.actions.relays
import androidx.compose.runtime.Immutable
import com.vitorpamplona.ammolite.relays.FeedType
import com.vitorpamplona.ammolite.relays.RelayBriefInfoCache
import com.vitorpamplona.ammolite.relays.RelayStat
import com.vitorpamplona.ammolite.relays.relays.RelayStat
import com.vitorpamplona.quartz.nip01Core.HexKey
@Immutable

View File

@ -94,7 +94,7 @@ import com.vitorpamplona.ammolite.relays.Constants
import com.vitorpamplona.ammolite.relays.Constants.activeTypesGlobalChats
import com.vitorpamplona.ammolite.relays.FeedType
import com.vitorpamplona.ammolite.relays.RelayBriefInfoCache
import com.vitorpamplona.ammolite.relays.RelayStat
import com.vitorpamplona.ammolite.relays.relays.RelayStat
import com.vitorpamplona.quartz.nip65RelayList.RelayUrlFormatter
import kotlinx.coroutines.launch

View File

@ -63,7 +63,7 @@ import com.vitorpamplona.amethyst.ui.theme.ThemeComparisonColumn
import com.vitorpamplona.amethyst.ui.theme.allGoodColor
import com.vitorpamplona.amethyst.ui.theme.largeRelayIconModifier
import com.vitorpamplona.ammolite.relays.COMMON_FEED_TYPES
import com.vitorpamplona.ammolite.relays.RelayStat
import com.vitorpamplona.ammolite.relays.relays.RelayStat
@OptIn(ExperimentalLayoutApi::class)
@Composable

View File

@ -40,7 +40,7 @@ import com.vitorpamplona.amethyst.ui.stringRes
import com.vitorpamplona.amethyst.ui.theme.ButtonBorder
import com.vitorpamplona.amethyst.ui.theme.Size10dp
import com.vitorpamplona.amethyst.ui.theme.placeholderText
import com.vitorpamplona.ammolite.relays.RelayStat
import com.vitorpamplona.ammolite.relays.relays.RelayStat
import com.vitorpamplona.quartz.nip65RelayList.RelayUrlFormatter
@Composable

View File

@ -23,7 +23,7 @@ package com.vitorpamplona.ammolite.relays
import com.vitorpamplona.quartz.nip65RelayList.RelayUrlFormatter
object Constants {
val activeTypes = setOf(FeedType.FOLLOWS, FeedType.PRIVATE_DMS)
val activeTypesFollows = setOf(FeedType.FOLLOWS, FeedType.PRIVATE_DMS)
val activeTypesChats = setOf(FeedType.FOLLOWS, FeedType.PUBLIC_CHATS, FeedType.PRIVATE_DMS)
val activeTypesGlobalChats = setOf(FeedType.FOLLOWS, FeedType.PUBLIC_CHATS, FeedType.PRIVATE_DMS, FeedType.GLOBAL)
val activeTypesSearch = setOf(FeedType.SEARCH)
@ -35,7 +35,7 @@ object Constants {
RelaySetupInfo(RelayUrlFormatter.normalize("wss://relay.nostr.bg"), read = true, write = true, feedTypes = activeTypesChats),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nostr.oxtr.dev"), read = true, write = true, feedTypes = activeTypesChats),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nostr.fmt.wiz.biz"), read = true, write = false, feedTypes = activeTypesChats),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://relay.damus.io"), read = true, write = true, feedTypes = activeTypes),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://relay.damus.io"), read = true, write = true, feedTypes = activeTypesFollows),
// Global
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nostr.mom"), read = true, write = true, feedTypes = activeTypesGlobalChats),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nos.lol"), read = true, write = true, feedTypes = activeTypesGlobalChats),

View File

@ -21,8 +21,9 @@
package com.vitorpamplona.ammolite.relays
import android.util.Log
import com.vitorpamplona.ammolite.relays.relays.RelayState
import com.vitorpamplona.ammolite.relays.relays.sockets.WebsocketBuilderFactory
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.ammolite.sockets.WebsocketBuilder
import com.vitorpamplona.quartz.nip01Core.core.Event
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
@ -38,7 +39,7 @@ import java.util.concurrent.TimeUnit
* published through multiple relays. Events are stored with their respective persona.
*/
class NostrClient(
private val websocketBuilder: WebsocketBuilder,
private val websocketBuilder: WebsocketBuilderFactory,
) : RelayPool.Listener {
private val relayPool: RelayPool = RelayPool()
private val subscriptions: MutableSubscriptionManager = MutableSubscriptionManager()
@ -174,17 +175,24 @@ class NostrClient(
Log.d("sendAndWaitForResponse", "onError Error from relay ${relay.url} count: ${latch.count} error: $error")
}
override fun onRelayStateChange(
type: Relay.StateType,
override fun onEOSE(
relay: Relay,
subscriptionId: String?,
subscriptionId: String,
) {
if (type == Relay.StateType.DISCONNECT || type == Relay.StateType.EOSE) {
latch.countDown()
Log.d("sendAndWaitForResponse", "onEOSE relay ${relay.url} count: ${latch.count}")
}
override fun onRelayStateChange(
type: RelayState,
relay: Relay,
) {
if (type == RelayState.DISCONNECTED) {
latch.countDown()
}
if (type == Relay.StateType.CONNECT) {
if (type == RelayState.CONNECTED) {
Log.d("sendAndWaitForResponse", "${type.name} Sending event to relay ${relay.url} count: ${latch.count}")
relay.sendOverride(signedEvent)
relay.send(signedEvent)
}
Log.d("sendAndWaitForResponse", "onRelayStateChange ${type.name} from relay ${relay.url} count: ${latch.count}")
}
@ -312,16 +320,18 @@ class NostrClient(
}
}
override fun onRelayStateChange(
type: Relay.StateType,
override fun onEOSE(
relay: Relay,
channel: String?,
subscriptionId: String,
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
// GlobalScope.launch(Dispatchers.Default) {
listeners.forEach { it.onRelayStateChange(type, relay, channel) }
// }
listeners.forEach { it.onEOSE(relay, subscriptionId) }
}
override fun onRelayStateChange(
type: RelayState,
relay: Relay,
) {
listeners.forEach { it.onRelayStateChange(type, relay) }
}
@OptIn(DelicateCoroutinesApi::class)
@ -420,10 +430,15 @@ class NostrClient(
) = Unit
/** Connected to or disconnected from a relay */
open fun onRelayStateChange(
type: Relay.StateType,
open fun onEOSE(
relay: Relay,
subscriptionId: String,
) = Unit
/** Connected to or disconnected from a relay */
open fun onRelayStateChange(
type: RelayState,
relay: Relay,
subscriptionId: String?,
) = Unit
/** When an relay saves or rejects a new event. */

View File

@ -21,6 +21,7 @@
package com.vitorpamplona.ammolite.relays
import android.util.Log
import com.vitorpamplona.ammolite.relays.relays.RelayState
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.utils.TimeUtils
@ -93,25 +94,20 @@ abstract class NostrDataSource(
}
}
override fun onRelayStateChange(
type: Relay.StateType,
override fun onEOSE(
relay: Relay,
subscriptionId: String?,
subscriptionId: String,
) {
// if (subscriptions.containsKey(subscriptionId)) {
// Log.d(this@NostrDataSource.javaClass.simpleName, "Relay ${relay.url} ${subscriptionId}
// ${type.name}")
// }
if (
type == Relay.StateType.EOSE &&
subscriptionId != null &&
subscriptions.containsKey(subscriptionId)
) {
if (subscriptions.containsKey(subscriptionId)) {
markAsEOSE(subscriptionId, relay)
}
}
override fun onRelayStateChange(
type: RelayState,
relay: Relay,
) {}
override fun onSendResponse(
eventId: String,
success: Boolean,

View File

@ -20,20 +20,14 @@
*/
package com.vitorpamplona.ammolite.relays
import android.util.Log
import com.vitorpamplona.ammolite.BuildConfig
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.ammolite.sockets.WebSocket
import com.vitorpamplona.ammolite.sockets.WebSocketListener
import com.vitorpamplona.ammolite.sockets.WebsocketBuilder
import com.vitorpamplona.quartz.nip01Core.HexKey
import com.vitorpamplona.ammolite.relays.relays.RelayState
import com.vitorpamplona.ammolite.relays.relays.SimpleRelay
import com.vitorpamplona.ammolite.relays.relays.Subscription
import com.vitorpamplona.ammolite.relays.relays.SubscriptionCollection
import com.vitorpamplona.ammolite.relays.relays.sockets.WebsocketBuilderFactory
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
import com.vitorpamplona.quartz.nip01Core.relays.Filter
import com.vitorpamplona.quartz.nip42RelayAuth.RelayAuthEvent
import com.vitorpamplona.quartz.utils.TimeUtils
import com.vitorpamplona.quartz.utils.bytesUsedInMemory
import kotlinx.coroutines.CancellationException
import java.util.concurrent.atomic.AtomicBoolean
enum class FeedType {
FOLLOWS,
@ -53,35 +47,50 @@ val COMMON_FEED_TYPES =
val EVENT_FINDER_TYPES =
setOf(FeedType.FOLLOWS, FeedType.PUBLIC_CHATS, FeedType.GLOBAL)
class RelaySubFilter(
val url: String,
val activeTypes: Set<FeedType>,
val subs: SubscriptionManager,
) : SubscriptionCollection {
override fun allSubscriptions(): List<Subscription> =
subs.allSubscriptions().mapNotNull { filter ->
val filters = filter(filter.value)
if (filters.isNotEmpty()) {
Subscription(filter.key, filters)
} else {
null
}
}
fun filter(filters: List<TypedFilter>): List<Filter> =
filters.mapNotNull { filter ->
if (activeTypes.any { it in filter.types } && filter.filter.isValidFor(url)) {
filter.filter.toRelay(url)
} else {
null
}
}
}
class Relay(
val url: String,
val read: Boolean = true,
val write: Boolean = true,
val forceProxy: Boolean = false,
val activeTypes: Set<FeedType>,
val socketBuilder: WebsocketBuilder,
val subs: SubscriptionManager,
) {
companion object {
// waits 3 minutes to reconnect once things fail
const val RECONNECTING_IN_SECONDS = 60 * 3
}
socketBuilderFactory: WebsocketBuilderFactory,
subs: SubscriptionManager,
) : SimpleRelay.Listener {
private var listeners = setOf<Listener>()
val relaySubFilter = RelaySubFilter(url, activeTypes, subs)
val inner =
SimpleRelay(url, socketBuilderFactory.build(forceProxy), relaySubFilter, RelayStats.get(url)).apply {
register(this@Relay)
}
val brief = RelayBriefInfoCache.get(url)
private var listeners = setOf<Listener>()
private var socket: WebSocket? = null
private var isReady: Boolean = false
private var usingCompression: Boolean = false
private var lastConnectTentative: Long = 0L
private var afterEOSEPerSubscription = mutableMapOf<String, Boolean>()
private val authResponse = mutableMapOf<HexKey, Boolean>()
private val authChallengesSent = mutableSetOf<String>()
private val outboxCache = mutableMapOf<HexKey, Event>()
fun register(listener: Listener) {
listeners = listeners.plus(listener)
}
@ -90,460 +99,43 @@ class Relay(
listeners = listeners.minus(listener)
}
fun isConnected(): Boolean = socket != null
fun isConnected() = inner.isConnected()
fun connect() {
connectAndRun {
checkNotInMainThread()
fun connect() = inner.connect()
// Sends everything.
renewFilters()
sendOutbox()
}
}
fun sendOutbox() {
synchronized(outboxCache) {
outboxCache.values.forEach {
send(it)
}
}
}
private var connectingBlock = AtomicBoolean()
fun connectAndRun(onConnected: (Relay) -> Unit) {
Log.d("Relay", "Relay.connect $url proxy: $forceProxy isAlreadyConnecting: ${connectingBlock.get()}")
fun connectAndRun(onConnected: () -> Unit) {
// BRB is crashing OkHttp Deflater object :(
if (url.contains("brb.io")) return
// If there is a connection, don't wait.
if (connectingBlock.getAndSet(true)) {
return
}
try {
checkNotInMainThread()
if (socket != null) {
connectingBlock.set(false)
return
}
lastConnectTentative = TimeUtils.now()
socket = socketBuilder.build(url, forceProxy, RelayListener(onConnected))
socket?.connect()
} catch (e: Exception) {
if (e is CancellationException) throw e
RelayStats.newError(url, e.message ?: "Error trying to connect: ${e.javaClass.simpleName}")
markConnectionAsClosed()
e.printStackTrace()
} finally {
connectingBlock.set(false)
}
inner.connectAndRun(onConnected)
}
inner class RelayListener(
val onConnected: (Relay) -> Unit,
) : WebSocketListener {
override fun onOpen(
pingMillis: Long,
compression: Boolean,
) {
checkNotInMainThread()
Log.d("Relay", "Connect onOpen $url $socket")
fun sendOutbox() = inner.sendOutbox()
markConnectionAsReady(pingMillis, compression)
// Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url")
onConnected(this@Relay)
listeners.forEach { it.onRelayStateChange(this@Relay, StateType.CONNECT, null) }
}
override fun onMessage(text: String) {
checkNotInMainThread()
RelayStats.addBytesReceived(url, text.bytesUsedInMemory())
try {
processNewRelayMessage(text)
} catch (e: Throwable) {
if (e is CancellationException) throw e
e.printStackTrace()
text.chunked(2000) { chunked ->
listeners.forEach { it.onError(this@Relay, "", Error("Problem with $chunked")) }
}
}
}
override fun onClosing(
code: Int,
reason: String,
) {
checkNotInMainThread()
Log.w("Relay", "Relay onClosing $url: $reason")
listeners.forEach {
it.onRelayStateChange(
this@Relay,
StateType.DISCONNECTING,
null,
)
}
}
override fun onClosed(
code: Int,
reason: String,
) {
checkNotInMainThread()
markConnectionAsClosed()
Log.w("Relay", "Relay onClosed $url: $reason")
listeners.forEach { it.onRelayStateChange(this@Relay, StateType.DISCONNECT, null) }
}
override fun onFailure(
t: Throwable,
responseMessage: String?,
) {
checkNotInMainThread()
socket?.cancel() // 1000, "Normal close"
// checks if this is an actual failure. Closing the socket generates an onFailure as well.
if (!(socket == null && (t.message == "Socket is closed" || t.message == "Socket closed"))) {
RelayStats.newError(url, responseMessage ?: t.message ?: "onFailure event from server: ${t.javaClass.simpleName}")
}
// Failures disconnect the relay.
markConnectionAsClosed()
Log.w("Relay", "Relay onFailure $url, $responseMessage $responseMessage ${t.message} $socket")
t.printStackTrace()
listeners.forEach {
it.onError(
this@Relay,
"",
Error("WebSocket Failure. Response: $responseMessage. Exception: ${t.message}", t),
)
}
}
}
fun markConnectionAsReady(
pingInMs: Long,
usingCompression: Boolean,
) {
this.resetEOSEStatuses()
this.isReady = true
this.usingCompression = usingCompression
RelayStats.setPing(url, pingInMs)
}
fun markConnectionAsClosed() {
this.socket = null
this.isReady = false
this.usingCompression = false
this.resetEOSEStatuses()
}
fun processNewRelayMessage(newMessage: String) {
val msgArray = EventMapper.mapper.readTree(newMessage)
when (val type = msgArray.get(0).asText()) {
"EVENT" -> {
val subscriptionId = msgArray.get(1).asText()
val event = EventMapper.fromJson(msgArray.get(2))
// Log.w("Relay", "Relay onEVENT ${event.kind} $url, $subscriptionId ${msgArray.get(2)}")
listeners.forEach {
it.onEvent(
this@Relay,
subscriptionId,
event,
afterEOSEPerSubscription[subscriptionId] == true,
)
}
}
"EOSE" ->
listeners.forEach {
val subscriptionId = msgArray.get(1).asText()
afterEOSEPerSubscription[subscriptionId] = true
// Log.w("Relay", "Relay onEOSE $url $subscriptionId")
it.onRelayStateChange(this@Relay, StateType.EOSE, subscriptionId)
}
"NOTICE" ->
listeners.forEach {
val message = msgArray.get(1).asText()
Log.w("Relay", "Relay onNotice $url, $message")
RelayStats.newNotice(url, message)
it.onError(this@Relay, message, Error("Relay sent notice: $message"))
}
"OK" ->
listeners.forEach {
val eventId = msgArray[1].asText()
val success = msgArray[2].asBoolean()
val message = if (msgArray.size() > 2) msgArray[3].asText() else ""
Log.w("Relay", "Relay on OK $url, $eventId, $success, $message")
if (authResponse.containsKey(eventId)) {
val wasAlreadyAuthenticated = authResponse.get(eventId)
authResponse.put(eventId, success)
if (wasAlreadyAuthenticated != true && success) {
renewFilters()
sendOutbox()
}
}
if (outboxCache.contains(eventId) && !message.startsWith("auth-required")) {
synchronized(outboxCache) {
outboxCache.remove(eventId)
}
}
if (!success) {
RelayStats.newNotice(url, "Failed to receive $eventId: $message")
}
it.onSendResponse(this@Relay, eventId, success, message)
}
"AUTH" ->
listeners.forEach {
Log.w("Relay", "Relay onAuth $url, ${ msgArray[1].asText()}")
it.onAuth(this@Relay, msgArray[1].asText())
}
"NOTIFY" ->
listeners.forEach {
// Log.w("Relay", "Relay onNotify $url, ${msg[1].asString}")
it.onNotify(this@Relay, msgArray[1].asText())
}
"CLOSED" -> listeners.forEach { Log.w("Relay", "Relay Closed Subscription $url, $newMessage") }
else -> {
RelayStats.newError(url, "Unsupported message: $newMessage")
listeners.forEach {
Log.w("Relay", "Unsupported message: $newMessage")
it.onError(
this@Relay,
"",
Error("Unknown type $type on channel. Msg was $newMessage"),
)
}
}
}
}
fun disconnect() {
Log.d("Relay", "Relay.disconnect $url")
checkNotInMainThread()
lastConnectTentative = 0L // this is not an error, so prepare to reconnect as soon as requested.
socket?.cancel()
socket = null
isReady = false
usingCompression = false
resetEOSEStatuses()
}
fun resetEOSEStatuses() {
afterEOSEPerSubscription = LinkedHashMap(afterEOSEPerSubscription.size)
authResponse.clear()
authChallengesSent.clear()
}
fun disconnect() = inner.disconnect()
fun sendFilter(
requestId: String,
filters: List<TypedFilter>,
) {
checkNotInMainThread()
if (read) {
if (isConnected()) {
if (isReady) {
val relayFilters =
filters.filter { filter ->
activeTypes.any { it in filter.types } && filter.filter.isValidFor(url)
}
if (relayFilters.isNotEmpty()) {
writeToSocket(
relayFilters.joinToStringLimited(
separator = ",",
limit = 19,
prefix = """["REQ","$requestId",""",
postfix = "]",
) {
it.filter.toJson(url)
},
)
afterEOSEPerSubscription[requestId] = false
}
}
} else {
// waits 60 seconds to reconnect after disconnected.
if (TimeUtils.now() > lastConnectTentative + RECONNECTING_IN_SECONDS) {
// sends all filters after connection is successful.
connect()
}
}
inner.sendFilter(requestId, relaySubFilter.filter(filters))
}
}
fun <T> Iterable<T>.joinToStringLimited(
separator: CharSequence = ", ",
prefix: CharSequence = "",
postfix: CharSequence = "",
limit: Int = -1,
transform: ((T) -> CharSequence)? = null,
): String {
val buffer = StringBuilder()
buffer.append(prefix)
var count = 0
for (element in this) {
if (limit < 0 || count <= limit) {
if (++count > 1) buffer.append(separator)
when {
transform != null -> buffer.append(transform(element))
element is CharSequence? -> buffer.append(element)
element is Char -> buffer.append(element)
else -> buffer.append(element.toString())
}
} else {
break
}
}
buffer.append(postfix)
return buffer.toString()
}
fun connectAndSendFiltersIfDisconnected() = inner.connectAndSendFiltersIfDisconnected()
fun connectAndSendFiltersIfDisconnected() {
checkNotInMainThread()
fun renewFilters() = inner.renewFilters()
if (socket == null) {
// waits 60 seconds to reconnect after disconnected.
if (TimeUtils.now() > lastConnectTentative + RECONNECTING_IN_SECONDS) {
// println("sendfilter Only if Disconnected ${url} ")
connect()
}
}
}
fun renewFilters() {
// Force update all filters after AUTH.
subs.allSubscriptions().forEach {
sendFilter(requestId = it.key, it.value)
}
}
// This function sends the event regardless of the relay being write or not.
fun sendOverride(signedEvent: Event) {
checkNotInMainThread()
listeners.forEach { listener ->
listener.onBeforeSend(this@Relay, signedEvent)
}
if (signedEvent is RelayAuthEvent) {
sendAuth(signedEvent)
} else {
sendEvent(signedEvent)
}
}
fun sendOverride(signedEvent: Event) = inner.send(signedEvent)
fun send(signedEvent: Event) {
checkNotInMainThread()
listeners.forEach { listener ->
listener.onBeforeSend(this@Relay, signedEvent)
}
if (signedEvent is RelayAuthEvent) {
sendAuth(signedEvent)
} else {
if (write) {
sendEvent(signedEvent)
}
if (signedEvent is RelayAuthEvent || write) {
inner.send(signedEvent)
}
}
private fun sendAuth(signedEvent: RelayAuthEvent) {
val challenge = signedEvent.challenge() ?: ""
// only send replies to new challenges to avoid infinite loop:
// 1. Auth is sent
// 2. auth is rejected
// 3. auth is requested
// 4. auth is sent
// ...
if (!authChallengesSent.contains(challenge)) {
authResponse.put(signedEvent.id, false)
authChallengesSent.add(challenge)
writeToSocket("""["AUTH",${signedEvent.toJson()}]""")
}
}
private fun sendEvent(signedEvent: Event) {
synchronized(outboxCache) {
outboxCache.put(signedEvent.id, signedEvent)
}
if (isConnected()) {
if (isReady) {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
}
} else {
// sends all filters after connection is successful.
connectAndRun {
// Sends everything.
renewFilters()
sendOutbox()
}
}
}
private fun writeToSocket(str: String) {
if (socket == null) {
listeners.forEach { listener ->
listener.onError(
this@Relay,
"",
Error("Failed to send $str. Relay is not connected."),
)
}
}
socket?.let {
checkNotInMainThread()
val result = it.send(str)
listeners.forEach { listener ->
listener.onSend(this@Relay, str, result)
}
RelayStats.addBytesSent(url, str.bytesUsedInMemory())
if (BuildConfig.DEBUG || BuildConfig.BUILD_TYPE == "benchmark") {
Log.d("Relay", "Relay send $url (${str.length} chars) $str")
}
}
}
fun close(subscriptionId: String) {
writeToSocket("""["CLOSE","$subscriptionId"]""")
}
fun close(subscriptionId: String) = inner.close(subscriptionId)
fun isSameRelayConfig(other: RelaySetupInfoToConnect): Boolean =
url == other.url &&
@ -552,22 +144,58 @@ class Relay(
read == other.read &&
activeTypes == other.feedTypes
enum class StateType {
// Websocket connected
CONNECT,
override fun onEvent(
relay: SimpleRelay,
subscriptionId: String,
event: Event,
afterEOSE: Boolean,
) = listeners.forEach { it.onEvent(this, subscriptionId, event, afterEOSE) }
// Websocket disconnecting
DISCONNECTING,
override fun onError(
relay: SimpleRelay,
subscriptionId: String,
error: Error,
) = listeners.forEach { it.onError(this, subscriptionId, error) }
// Websocket disconnected
DISCONNECT,
override fun onEOSE(
relay: SimpleRelay,
subscriptionId: String,
) = listeners.forEach { it.onEOSE(this, subscriptionId) }
// End Of Stored Events
EOSE,
}
override fun onRelayStateChange(
relay: SimpleRelay,
type: RelayState,
) = listeners.forEach { it.onRelayStateChange(this, type) }
override fun onSendResponse(
relay: SimpleRelay,
eventId: String,
success: Boolean,
message: String,
) = listeners.forEach { it.onSendResponse(this, eventId, success, message) }
override fun onAuth(
relay: SimpleRelay,
challenge: String,
) = listeners.forEach { it.onAuth(this, challenge) }
override fun onNotify(
relay: SimpleRelay,
description: String,
) = listeners.forEach { it.onNotify(this, description) }
override fun onSend(
relay: SimpleRelay,
msg: String,
success: Boolean,
) = listeners.forEach { it.onSend(this, msg, success) }
override fun onBeforeSend(
relay: SimpleRelay,
event: Event,
) = listeners.forEach { it.onBeforeSend(this, event) }
interface Listener {
/** A new message was received */
fun onEvent(
relay: Relay,
subscriptionId: String,
@ -575,6 +203,11 @@ class Relay(
afterEOSE: Boolean,
)
fun onEOSE(
relay: Relay,
subscriptionId: String,
)
fun onError(
relay: Relay,
subscriptionId: String,
@ -593,32 +226,26 @@ class Relay(
challenge: String,
)
/**
* Connected to or disconnected from a relay
*
* @param type is 0 for disconnect and 1 for connect
*/
fun onRelayStateChange(
relay: Relay,
type: StateType,
channel: String?,
type: RelayState,
)
/** Relay sent an invoice */
/** Relay sent a notification */
fun onNotify(
relay: Relay,
description: String,
)
fun onBeforeSend(
relay: Relay,
event: Event,
)
fun onSend(
relay: Relay,
msg: String,
success: Boolean,
)
fun onBeforeSend(
relay: Relay,
event: Event,
)
}
}

View File

@ -21,6 +21,7 @@
package com.vitorpamplona.ammolite.relays
import androidx.compose.runtime.Immutable
import com.vitorpamplona.ammolite.relays.relays.RelayState
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.nip01Core.core.Event
import kotlinx.coroutines.Dispatchers
@ -173,10 +174,14 @@ class RelayPool : Relay.Listener {
afterEOSE: Boolean,
)
fun onRelayStateChange(
type: Relay.StateType,
fun onEOSE(
relay: Relay,
subscriptionId: String,
)
fun onRelayStateChange(
type: RelayState,
relay: Relay,
channel: String?,
)
fun onSendResponse(
@ -232,15 +237,19 @@ class RelayPool : Relay.Listener {
updateStatus()
}
override fun onEOSE(
relay: Relay,
subscriptionId: String,
) {
listeners.forEach { it.onEOSE(relay, subscriptionId) }
updateStatus()
}
override fun onRelayStateChange(
relay: Relay,
type: Relay.StateType,
channel: String?,
type: RelayState,
) {
listeners.forEach { it.onRelayStateChange(type, relay, channel) }
if (type != Relay.StateType.EOSE) {
updateStatus()
}
listeners.forEach { it.onRelayStateChange(type, relay) }
}
override fun onSendResponse(

View File

@ -20,8 +20,7 @@
*/
package com.vitorpamplona.ammolite.relays
import androidx.collection.LruCache
import com.vitorpamplona.quartz.utils.TimeUtils
import com.vitorpamplona.ammolite.relays.relays.RelayStat
object RelayStats {
private val innerCache = mutableMapOf<String, RelayStat>()
@ -70,67 +69,3 @@ object RelayStats {
get(url).newSpam(explanation)
}
}
class RelayStat(
var receivedBytes: Long = 0L,
var sentBytes: Long = 0L,
var spamCounter: Long = 0L,
var errorCounter: Long = 0L,
var pingInMs: Long = 0L,
) {
val messages = LruCache<RelayDebugMessage, RelayDebugMessage>(100)
fun newNotice(notice: String?) {
val debugMessage =
RelayDebugMessage(
type = RelayDebugMessageType.NOTICE,
message = notice ?: "No error message provided",
)
messages.put(debugMessage, debugMessage)
}
fun newError(error: String?) {
errorCounter++
val debugMessage =
RelayDebugMessage(
type = RelayDebugMessageType.ERROR,
message = error ?: "No error message provided",
)
messages.put(debugMessage, debugMessage)
}
fun addBytesReceived(bytesUsedInMemory: Long) {
receivedBytes += bytesUsedInMemory
}
fun addBytesSent(bytesUsedInMemory: Long) {
sentBytes += bytesUsedInMemory
}
fun newSpam(spamDescriptor: String) {
spamCounter++
val debugMessage =
RelayDebugMessage(
type = RelayDebugMessageType.SPAM,
message = spamDescriptor,
)
messages.put(debugMessage, debugMessage)
}
}
class RelayDebugMessage(
val type: RelayDebugMessageType,
val message: String,
val time: Long = TimeUtils.now(),
)
enum class RelayDebugMessageType {
SPAM,
NOTICE,
ERROR,
}

View File

@ -21,8 +21,11 @@
package com.vitorpamplona.ammolite.relays.filters
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.relays.Filter
interface IPerRelayFilter {
fun toRelay(forRelay: String): Filter
fun toJson(forRelay: String): String
fun match(

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.vitorpamplona.quartz.nip01Core.HexKey
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
import com.vitorpamplona.quartz.nip01Core.relays.Filter
import com.vitorpamplona.quartz.nip01Core.relays.FilterMatcher
import com.vitorpamplona.quartz.nip01Core.relays.FilterSerializer
@ -45,6 +46,8 @@ class SinceAuthorPerRelayFilter(
// don't send it.
override fun isValidFor(forRelay: String) = authors == null || !authors[forRelay].isNullOrEmpty()
override fun toRelay(forRelay: String) = Filter(ids, authors?.get(forRelay), kinds, tags, since?.get(forRelay)?.time, until, limit, search)
override fun toJson(forRelay: String): String = FilterSerializer.toJson(ids, authors?.get(forRelay), kinds, tags, since?.get(forRelay)?.time, until, limit, search)
override fun match(

View File

@ -23,6 +23,7 @@ package com.vitorpamplona.ammolite.relays.filters
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
import com.vitorpamplona.quartz.nip01Core.relays.Filter
import com.vitorpamplona.quartz.nip01Core.relays.FilterMatcher
import com.vitorpamplona.quartz.nip01Core.relays.FilterSerializer
@ -41,6 +42,8 @@ class SincePerRelayFilter(
) : IPerRelayFilter {
override fun isValidFor(url: String) = true
override fun toRelay(forRelay: String) = Filter(ids, authors, kinds, tags, since?.get(forRelay)?.time, until, limit, search)
override fun toJson(forRelay: String) = FilterSerializer.toJson(ids, authors, kinds, tags, since?.get(forRelay)?.time, until, limit, search)
override fun match(

View File

@ -0,0 +1,88 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays
import androidx.collection.LruCache
import com.vitorpamplona.quartz.utils.TimeUtils
class RelayStat(
var receivedBytes: Long = 0L,
var sentBytes: Long = 0L,
var spamCounter: Long = 0L,
var errorCounter: Long = 0L,
var pingInMs: Long = 0L,
) {
val messages = LruCache<RelayDebugMessage, RelayDebugMessage>(100)
fun newNotice(notice: String?) {
val debugMessage =
RelayDebugMessage(
type = RelayDebugMessageType.NOTICE,
message = notice ?: "No error message provided",
)
messages.put(debugMessage, debugMessage)
}
fun newError(error: String?) {
errorCounter++
val debugMessage =
RelayDebugMessage(
type = RelayDebugMessageType.ERROR,
message = error ?: "No error message provided",
)
messages.put(debugMessage, debugMessage)
}
fun addBytesReceived(bytesUsedInMemory: Long) {
receivedBytes += bytesUsedInMemory
}
fun addBytesSent(bytesUsedInMemory: Long) {
sentBytes += bytesUsedInMemory
}
fun newSpam(spamDescriptor: String) {
spamCounter++
val debugMessage =
RelayDebugMessage(
type = RelayDebugMessageType.SPAM,
message = spamDescriptor,
)
messages.put(debugMessage, debugMessage)
}
}
class RelayDebugMessage(
val type: RelayDebugMessageType,
val message: String,
val time: Long = TimeUtils.now(),
)
enum class RelayDebugMessageType {
SPAM,
NOTICE,
ERROR,
}

View File

@ -0,0 +1,32 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays
enum class RelayState {
// Websocket connected
CONNECTED,
// Websocket disconnecting
DISCONNECTING,
// Websocket disconnected
DISCONNECTED,
}

View File

@ -0,0 +1,520 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays
import android.util.Log
import com.vitorpamplona.ammolite.relays.relays.sockets.WebSocket
import com.vitorpamplona.ammolite.relays.relays.sockets.WebSocketListener
import com.vitorpamplona.ammolite.relays.relays.sockets.WebsocketBuilder
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.nip01Core.HexKey
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.jackson.EventMapper
import com.vitorpamplona.quartz.nip01Core.relays.Filter
import com.vitorpamplona.quartz.nip42RelayAuth.RelayAuthEvent
import com.vitorpamplona.quartz.utils.TimeUtils
import com.vitorpamplona.quartz.utils.bytesUsedInMemory
import com.vitorpamplona.quartz.utils.joinToStringLimited
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.cancellation.CancellationException
class SimpleRelay(
val url: String,
val socketBuilder: WebsocketBuilder,
val subs: SubscriptionCollection,
val stats: RelayStat = RelayStat(),
) {
companion object {
// waits 3 minutes to reconnect once things fail
const val RECONNECTING_IN_SECONDS = 60 * 3
}
private var listeners = setOf<Listener>()
private var socket: WebSocket? = null
private var isReady: Boolean = false
private var usingCompression: Boolean = false
private var lastConnectTentative: Long = 0L
private var afterEOSEPerSubscription = mutableMapOf<String, Boolean>()
private val authResponse = mutableMapOf<HexKey, Boolean>()
private val authChallengesSent = mutableSetOf<String>()
private val outboxCache = mutableMapOf<HexKey, Event>()
private var connectingMutex = AtomicBoolean()
fun register(listener: Listener) {
listeners = listeners.plus(listener)
}
fun unregister(listener: Listener) {
listeners = listeners.minus(listener)
}
fun isConnected(): Boolean = socket != null
fun connect() {
connectAndRun {
checkNotInMainThread()
// Sends everything.
renewFilters()
sendOutbox()
}
}
fun sendOutbox() {
synchronized(outboxCache) {
outboxCache.values.forEach {
send(it)
}
}
}
fun connectAndRun(onConnected: () -> Unit) {
Log.d("Relay", "Relay.connect $url isAlreadyConnecting: ${connectingMutex.get()}")
// If there is a connection, don't wait.
if (connectingMutex.getAndSet(true)) {
return
}
try {
checkNotInMainThread()
if (socket != null) {
connectingMutex.set(false)
return
}
lastConnectTentative = TimeUtils.now()
socket = socketBuilder.build(url, RelayListener(onConnected))
socket?.connect()
} catch (e: Exception) {
if (e is CancellationException) throw e
stats.newError(e.message ?: "Error trying to connect: ${e.javaClass.simpleName}")
markConnectionAsClosed()
e.printStackTrace()
} finally {
connectingMutex.set(false)
}
}
inner class RelayListener(
val onConnected: () -> Unit,
) : WebSocketListener {
override fun onOpen(
pingMillis: Long,
compression: Boolean,
) {
checkNotInMainThread()
Log.d("Relay", "Connect onOpen $url $socket")
markConnectionAsReady(pingMillis, compression)
// Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url")
onConnected()
listeners.forEach { it.onRelayStateChange(this@SimpleRelay, RelayState.CONNECTED) }
}
override fun onMessage(text: String) {
checkNotInMainThread()
stats.addBytesReceived(text.bytesUsedInMemory())
try {
processNewRelayMessage(text)
} catch (e: Throwable) {
if (e is CancellationException) throw e
e.printStackTrace()
text.chunked(2000) { chunked ->
listeners.forEach { it.onError(this@SimpleRelay, "", Error("Problem with $chunked")) }
}
}
}
override fun onClosing(
code: Int,
reason: String,
) {
checkNotInMainThread()
Log.w("Relay", "Relay onClosing $url: $reason")
listeners.forEach {
it.onRelayStateChange(this@SimpleRelay, RelayState.DISCONNECTING)
}
}
override fun onClosed(
code: Int,
reason: String,
) {
checkNotInMainThread()
markConnectionAsClosed()
Log.w("Relay", "Relay onClosed $url: $reason")
listeners.forEach { it.onRelayStateChange(this@SimpleRelay, RelayState.DISCONNECTED) }
}
override fun onFailure(
t: Throwable,
responseMessage: String?,
) {
checkNotInMainThread()
socket?.cancel() // 1000, "Normal close"
// checks if this is an actual failure. Closing the socket generates an onFailure as well.
if (!(socket == null && (t.message == "Socket is closed" || t.message == "Socket closed"))) {
stats.newError(responseMessage ?: t.message ?: "onFailure event from server: ${t.javaClass.simpleName}")
}
// Failures disconnect the relay.
markConnectionAsClosed()
Log.w("Relay", "Relay onFailure $url, $responseMessage $responseMessage ${t.message} $socket")
t.printStackTrace()
listeners.forEach {
it.onError(
this@SimpleRelay,
"",
Error("WebSocket Failure. Response: $responseMessage. Exception: ${t.message}", t),
)
}
}
}
fun markConnectionAsReady(
pingInMs: Long,
usingCompression: Boolean,
) {
this.resetEOSEStatuses()
this.isReady = true
this.usingCompression = usingCompression
stats.pingInMs = pingInMs
}
fun markConnectionAsClosed() {
this.socket = null
this.isReady = false
this.usingCompression = false
this.resetEOSEStatuses()
}
fun processNewRelayMessage(newMessage: String) {
val msgArray = EventMapper.mapper.readTree(newMessage)
when (val type = msgArray.get(0).asText()) {
"EVENT" -> {
val subscriptionId = msgArray.get(1).asText()
val event = EventMapper.fromJson(msgArray.get(2))
// Log.w("Relay", "Relay onEVENT ${event.kind} $url, $subscriptionId ${msgArray.get(2)}")
listeners.forEach {
it.onEvent(
this@SimpleRelay,
subscriptionId,
event,
afterEOSEPerSubscription[subscriptionId] == true,
)
}
}
"EOSE" ->
listeners.forEach {
val subscriptionId = msgArray.get(1).asText()
afterEOSEPerSubscription[subscriptionId] = true
// Log.w("Relay", "Relay onEOSE $url $subscriptionId")
it.onEOSE(this@SimpleRelay, subscriptionId)
}
"NOTICE" ->
listeners.forEach {
val message = msgArray.get(1).asText()
Log.w("Relay", "Relay onNotice $url, $message")
stats.newNotice(message)
it.onError(this@SimpleRelay, message, Error("Relay sent notice: $message"))
}
"OK" ->
listeners.forEach {
val eventId = msgArray[1].asText()
val success = msgArray[2].asBoolean()
val message = if (msgArray.size() > 2) msgArray[3].asText() else ""
Log.w("Relay", "Relay on OK $url, $eventId, $success, $message")
if (authResponse.containsKey(eventId)) {
val wasAlreadyAuthenticated = authResponse.get(eventId)
authResponse.put(eventId, success)
if (wasAlreadyAuthenticated != true && success) {
renewFilters()
sendOutbox()
}
}
if (outboxCache.contains(eventId) && !message.startsWith("auth-required")) {
synchronized(outboxCache) {
outboxCache.remove(eventId)
}
}
if (!success) {
stats.newNotice("Failed to receive $eventId: $message")
}
it.onSendResponse(this@SimpleRelay, eventId, success, message)
}
"AUTH" ->
listeners.forEach {
Log.w("Relay", "Relay onAuth $url, ${ msgArray[1].asText()}")
it.onAuth(this@SimpleRelay, msgArray[1].asText())
}
"NOTIFY" ->
listeners.forEach {
// Log.w("Relay", "Relay onNotify $url, ${msg[1].asString}")
it.onNotify(this@SimpleRelay, msgArray[1].asText())
}
"CLOSED" -> listeners.forEach { Log.w("Relay", "Relay Closed Subscription $url, $newMessage") }
else -> {
stats.newError("Unsupported message: $newMessage")
listeners.forEach {
Log.w("Relay", "Unsupported message: $newMessage")
it.onError(
this@SimpleRelay,
"",
Error("Unknown type $type on channel. Msg was $newMessage"),
)
}
}
}
}
fun disconnect() {
Log.d("Relay", "Relay.disconnect $url")
checkNotInMainThread()
lastConnectTentative = 0L // this is not an error, so prepare to reconnect as soon as requested.
socket?.cancel()
socket = null
isReady = false
usingCompression = false
resetEOSEStatuses()
}
fun resetEOSEStatuses() {
afterEOSEPerSubscription = LinkedHashMap(afterEOSEPerSubscription.size)
authResponse.clear()
authChallengesSent.clear()
}
fun sendFilter(
requestId: String,
filters: List<Filter>,
) {
checkNotInMainThread()
if (isConnected()) {
if (isReady) {
if (filters.isNotEmpty()) {
writeToSocket(
filters.joinToStringLimited(
separator = ",",
limit = 19,
prefix = """["REQ","$requestId",""",
postfix = "]",
) {
it.toJson()
},
)
afterEOSEPerSubscription[requestId] = false
}
}
} else {
// waits 60 seconds to reconnect after disconnected.
if (TimeUtils.now() > lastConnectTentative + RECONNECTING_IN_SECONDS) {
// sends all filters after connection is successful.
connect()
}
}
}
fun connectAndSendFiltersIfDisconnected() {
checkNotInMainThread()
if (socket == null) {
// waits 60 seconds to reconnect after disconnected.
if (TimeUtils.now() > lastConnectTentative + RECONNECTING_IN_SECONDS) {
// println("sendfilter Only if Disconnected ${url} ")
connect()
}
}
}
fun renewFilters() {
// Force update all filters after AUTH.
subs.allSubscriptions().forEach {
sendFilter(requestId = it.id, it.filters)
}
}
fun send(signedEvent: Event) {
checkNotInMainThread()
listeners.forEach { listener ->
listener.onBeforeSend(this@SimpleRelay, signedEvent)
}
if (signedEvent is RelayAuthEvent) {
sendAuth(signedEvent)
} else {
sendEvent(signedEvent)
}
}
fun sendAuth(signedEvent: RelayAuthEvent) {
val challenge = signedEvent.challenge() ?: ""
// only send replies to new challenges to avoid infinite loop:
// 1. Auth is sent
// 2. auth is rejected
// 3. auth is requested
// 4. auth is sent
// ...
if (!authChallengesSent.contains(challenge)) {
authResponse.put(signedEvent.id, false)
authChallengesSent.add(challenge)
writeToSocket("""["AUTH",${signedEvent.toJson()}]""")
}
}
fun sendEvent(signedEvent: Event) {
synchronized(outboxCache) {
outboxCache.put(signedEvent.id, signedEvent)
}
if (isConnected()) {
if (isReady) {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
}
} else {
// sends all filters after connection is successful.
connectAndRun {
// Sends everything.
renewFilters()
sendOutbox()
}
}
}
private fun writeToSocket(str: String) {
if (socket == null) {
listeners.forEach { listener ->
listener.onError(
this@SimpleRelay,
"",
Error("Failed to send $str. Relay is not connected."),
)
}
}
socket?.let {
checkNotInMainThread()
val result = it.send(str)
listeners.forEach { listener ->
listener.onSend(this@SimpleRelay, str, result)
}
stats.addBytesSent(str.bytesUsedInMemory())
Log.d("Relay", "Relay send $url (${str.length} chars) $str")
}
}
fun close(subscriptionId: String) {
writeToSocket("""["CLOSE","$subscriptionId"]""")
}
interface Listener {
fun onEvent(
relay: SimpleRelay,
subscriptionId: String,
event: Event,
afterEOSE: Boolean,
)
fun onEOSE(
relay: SimpleRelay,
subscriptionId: String,
)
fun onError(
relay: SimpleRelay,
subscriptionId: String,
error: Error,
)
fun onSendResponse(
relay: SimpleRelay,
eventId: String,
success: Boolean,
message: String,
)
fun onAuth(
relay: SimpleRelay,
challenge: String,
)
fun onRelayStateChange(
relay: SimpleRelay,
type: RelayState,
)
/** Relay sent a notification */
fun onNotify(
relay: SimpleRelay,
description: String,
)
fun onBeforeSend(
relay: SimpleRelay,
event: Event,
)
fun onSend(
relay: SimpleRelay,
msg: String,
success: Boolean,
)
}
}

View File

@ -0,0 +1,29 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays
import com.vitorpamplona.quartz.nip01Core.relays.Filter
import java.util.UUID
class Subscription(
val id: String = UUID.randomUUID().toString().substring(0, 4),
val filters: List<Filter>,
)

View File

@ -0,0 +1,25 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays
interface SubscriptionCollection {
fun allSubscriptions(): List<Subscription>
}

View File

@ -18,7 +18,7 @@
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.sockets
package com.vitorpamplona.ammolite.relays.relays.sockets
interface WebSocket {
fun connect()

View File

@ -18,7 +18,7 @@
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.sockets
package com.vitorpamplona.ammolite.relays.relays.sockets
interface WebSocketListener {
fun onOpen(

View File

@ -18,12 +18,11 @@
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.sockets
package com.vitorpamplona.ammolite.relays.relays.sockets
interface WebsocketBuilder {
fun build(
url: String,
forceProxy: Boolean,
out: WebSocketListener,
): WebSocket
}

View File

@ -0,0 +1,25 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.relays.sockets
interface WebsocketBuilderFactory {
fun build(forceProxy: Boolean): WebsocketBuilder
}

View File

@ -25,6 +25,7 @@ import com.vitorpamplona.quartz.nip01Core.HexKey
import com.vitorpamplona.quartz.nip01Core.core.AddressableEvent
import com.vitorpamplona.quartz.nip01Core.signers.NostrSigner
import com.vitorpamplona.quartz.nip01Core.tags.addressables.ATag
import com.vitorpamplona.quartz.nip01Core.tags.addressables.dTag
import com.vitorpamplona.quartz.nip01Core.tags.hashtags.hashtags
import com.vitorpamplona.quartz.nip10Notes.BaseTextNoteEvent
import com.vitorpamplona.quartz.utils.TimeUtils
@ -39,7 +40,7 @@ class LongTextNoteEvent(
sig: HexKey,
) : BaseTextNoteEvent(id, pubKey, createdAt, KIND, tags, content, sig),
AddressableEvent {
override fun dTag() = tags.firstOrNull { it.size > 1 && it[0] == "d" }?.get(1) ?: ""
override fun dTag() = tags.dTag()
override fun address(relayHint: String?) = ATag(kind, pubKey, dTag(), relayHint)

View File

@ -0,0 +1,48 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.quartz.utils
fun <T> Iterable<T>.joinToStringLimited(
separator: CharSequence,
prefix: CharSequence,
postfix: CharSequence,
limit: Int,
transform: ((T) -> CharSequence)?,
): String {
val buffer = StringBuilder()
buffer.append(prefix)
var count = 0
for (element in this) {
if (limit < 0 || count <= limit) {
if (++count > 1) buffer.append(separator)
when {
transform != null -> buffer.append(transform(element))
element is CharSequence? -> buffer.append(element)
element is Char -> buffer.append(element)
else -> buffer.append(element.toString())
}
} else {
break
}
}
buffer.append(postfix)
return buffer.toString()
}