mirror of
https://github.com/vitorpamplona/amethyst.git
synced 2025-11-10 11:17:27 +01:00
Merge branch 'vitorpamplona:main' into followset-improvements
This commit is contained in:
@@ -107,7 +107,7 @@ kotlin {
|
||||
implementation(libs.kotlinx.serialization.json)
|
||||
|
||||
// in your shared module's dependencies block
|
||||
implementation( libs.kotlinx.datetime)
|
||||
// implementation( libs.kotlinx.datetime)
|
||||
|
||||
// immutable collections to avoid recomposition
|
||||
implementation(libs.kotlinx.collections.immutable)
|
||||
|
||||
@@ -83,17 +83,50 @@ class HexEncodingTest {
|
||||
assertFalse("`a", Hex.isHex("`a"))
|
||||
assertFalse("gg", Hex.isHex("gg"))
|
||||
assertFalse("fg", Hex.isHex("fg"))
|
||||
assertFalse("\uD83E\uDD70", Hex.isHex("\uD83E\uDD70"))
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalStdlibApi::class)
|
||||
@Test
|
||||
fun testRandomsIsHex() {
|
||||
val lowerCaseHexNeg = "ghijklmnopqrstuvwxyz"
|
||||
val upperCaseHexNeg = "GHIJKLMNOPQRSTUVWXYZ"
|
||||
|
||||
for (i in 0..10000) {
|
||||
val bytes = RandomInstance.bytes(32)
|
||||
val hex = bytes.toHexString(HexFormat.Default)
|
||||
assertTrue(hex, Hex.isHex(hex))
|
||||
val hexUpper = bytes.toHexString(HexFormat.UpperCase)
|
||||
assertTrue(hexUpper, Hex.isHex(hexUpper))
|
||||
|
||||
// scramble
|
||||
val negHex = hex.replaceFirst(hex.random(), lowerCaseHexNeg.random())
|
||||
val negHexUpper = hexUpper.replaceFirst(hexUpper.random(), upperCaseHexNeg.random())
|
||||
|
||||
assertFalse(negHex, Hex.isHex(negHex))
|
||||
assertFalse(negHexUpper, Hex.isHex(negHexUpper))
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalStdlibApi::class)
|
||||
@Test
|
||||
fun testRandomsIsHex64() {
|
||||
val lowerCaseHexNeg = "ghijklmnopqrstuvwxyz"
|
||||
val upperCaseHexNeg = "GHIJKLMNOPQRSTUVWXYZ"
|
||||
|
||||
for (i in 0..10000) {
|
||||
val bytes = RandomInstance.bytes(32)
|
||||
val hex = bytes.toHexString(HexFormat.Default)
|
||||
assertTrue(hex, Hex.isHex64(hex))
|
||||
val hexUpper = bytes.toHexString(HexFormat.UpperCase)
|
||||
assertTrue(hexUpper, Hex.isHex64(hexUpper))
|
||||
|
||||
// scramble
|
||||
val negHex = hex.replaceFirst(hex.random(), lowerCaseHexNeg.random())
|
||||
val negHexUpper = hexUpper.replaceFirst(hexUpper.random(), upperCaseHexNeg.random())
|
||||
|
||||
assertFalse(hex + ":" + negHex, Hex.isHex64(negHex))
|
||||
assertFalse(hexUpper + ":" + negHexUpper, Hex.isHex64(negHexUpper))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -42,7 +42,8 @@ class HintIndexer {
|
||||
|
||||
// 3.75MB for keys
|
||||
private val pubKeyHints = BloomFilterMurMur3(30_000_000, 10)
|
||||
private val relayDB = LargeCache<NormalizedRelayUrl, NormalizedRelayUrl>()
|
||||
|
||||
val relayDB = LargeCache<NormalizedRelayUrl, NormalizedRelayUrl>()
|
||||
|
||||
private fun add(
|
||||
id: ByteArray,
|
||||
|
||||
@@ -36,7 +36,10 @@ interface INostrClient {
|
||||
|
||||
fun disconnect()
|
||||
|
||||
fun reconnect(onlyIfChanged: Boolean = false)
|
||||
fun reconnect(
|
||||
onlyIfChanged: Boolean = false,
|
||||
ignoreRetryDelays: Boolean = false,
|
||||
)
|
||||
|
||||
fun isActive(): Boolean
|
||||
|
||||
@@ -78,7 +81,10 @@ object EmptyNostrClient : INostrClient {
|
||||
|
||||
override fun disconnect() { }
|
||||
|
||||
override fun reconnect(onlyIfChanged: Boolean) { }
|
||||
override fun reconnect(
|
||||
onlyIfChanged: Boolean,
|
||||
ignoreRetryDelays: Boolean,
|
||||
) { }
|
||||
|
||||
override fun isActive() = false
|
||||
|
||||
|
||||
@@ -37,13 +37,15 @@ import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.FlowPreview
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.debounce
|
||||
import kotlinx.coroutines.flow.flowOn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.sample
|
||||
import kotlinx.coroutines.flow.stateIn
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.launch
|
||||
|
||||
/**
|
||||
* The NostrClient manages Nostr relay operations, subscriptions, and event delivery. It maintains:
|
||||
@@ -85,7 +87,8 @@ class NostrClient(
|
||||
// controls the state of the client in such a way that if it is active
|
||||
// new filters will be sent to the relays and a potential reconnect can
|
||||
// be triggered.
|
||||
private var isActive = false
|
||||
// STARTS active
|
||||
private var isActive = true
|
||||
|
||||
/**
|
||||
* Whatches for any changes in the relay list from subscriptions or outbox
|
||||
@@ -115,12 +118,13 @@ class NostrClient(
|
||||
socketBuilder = websocketBuilder,
|
||||
listener = relayPool,
|
||||
stats = RelayStats.get(relay),
|
||||
scope = scope,
|
||||
) { liveRelay ->
|
||||
if (isActive) {
|
||||
activeRequests.forEachSub(relay, liveRelay::sendRequest)
|
||||
activeCounts.forEachSub(relay, liveRelay::sendCount)
|
||||
eventOutbox.forEachUnsentEvent(relay, liveRelay::send)
|
||||
scope.launch(Dispatchers.Default) {
|
||||
activeRequests.forEachSub(relay, liveRelay::sendRequest)
|
||||
activeCounts.forEachSub(relay, liveRelay::sendCount)
|
||||
eventOutbox.forEachUnsentEvent(relay, liveRelay::send)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,21 +141,35 @@ class NostrClient(
|
||||
|
||||
override fun isActive() = isActive
|
||||
|
||||
val myReconnectMutex = Mutex()
|
||||
class Reconnect(
|
||||
val onlyIfChanged: Boolean,
|
||||
val ignoreRetryDelays: Boolean,
|
||||
)
|
||||
|
||||
override fun reconnect(onlyIfChanged: Boolean) {
|
||||
if (myReconnectMutex.tryLock()) {
|
||||
try {
|
||||
if (onlyIfChanged) {
|
||||
relayPool.reconnectIfNeedsToORIfItIsTime()
|
||||
val refreshConnection = MutableStateFlow(Reconnect(false, false))
|
||||
|
||||
@OptIn(FlowPreview::class)
|
||||
val debouncingConnection =
|
||||
refreshConnection
|
||||
.debounce(200)
|
||||
.onEach {
|
||||
if (it.onlyIfChanged) {
|
||||
relayPool.reconnectIfNeedsTo(it.ignoreRetryDelays)
|
||||
} else {
|
||||
relayPool.disconnect()
|
||||
relayPool.connect()
|
||||
}
|
||||
} finally {
|
||||
myReconnectMutex.unlock()
|
||||
}
|
||||
}
|
||||
}.stateIn(
|
||||
scope,
|
||||
SharingStarted.Eagerly,
|
||||
false,
|
||||
)
|
||||
|
||||
override fun reconnect(
|
||||
onlyIfChanged: Boolean,
|
||||
ignoreRetryDelays: Boolean,
|
||||
) {
|
||||
refreshConnection.tryEmit(Reconnect(onlyIfChanged, ignoreRetryDelays))
|
||||
}
|
||||
|
||||
fun needsToResendRequest(
|
||||
@@ -228,7 +246,10 @@ class NostrClient(
|
||||
|
||||
if (newFilters.isNullOrEmpty()) {
|
||||
// some relays are not in this sub anymore. Stop their subscriptions
|
||||
relayPool.close(relay, subId)
|
||||
if (!oldFilters.isNullOrEmpty()) {
|
||||
// only update if the old filters are not already closed.
|
||||
relayPool.close(relay, subId)
|
||||
}
|
||||
} else if (oldFilters.isNullOrEmpty()) {
|
||||
// new relays were added. Start a new sub in them
|
||||
relayPool.sendRequest(relay, subId, newFilters)
|
||||
@@ -244,7 +265,7 @@ class NostrClient(
|
||||
}
|
||||
|
||||
// wakes up all the other relays
|
||||
relayPool.reconnectIfNeedsToORIfItIsTime()
|
||||
reconnect(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,7 +285,10 @@ class NostrClient(
|
||||
|
||||
if (newFilters.isNullOrEmpty()) {
|
||||
// some relays are not in this sub anymore. Stop their subscriptions
|
||||
relayPool.close(relay, subId)
|
||||
if (!oldFilters.isNullOrEmpty()) {
|
||||
// only update if the old filters are not already closed.
|
||||
relayPool.close(relay, subId)
|
||||
}
|
||||
} else if (oldFilters.isNullOrEmpty()) {
|
||||
// new relays were added. Start a new sub in them
|
||||
relayPool.sendCount(relay, subId, newFilters)
|
||||
@@ -280,7 +304,7 @@ class NostrClient(
|
||||
}
|
||||
|
||||
// wakes up all the other relays
|
||||
relayPool.reconnectIfNeedsToORIfItIsTime()
|
||||
reconnect(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -292,7 +316,7 @@ class NostrClient(
|
||||
relayPool.getRelay(connectedRelay)?.send(event)
|
||||
|
||||
// wakes up all the other relays
|
||||
relayPool.reconnectIfNeedsToORIfItIsTime()
|
||||
reconnect(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,11 +325,12 @@ class NostrClient(
|
||||
relayList: Set<NormalizedRelayUrl>,
|
||||
) {
|
||||
eventOutbox.markAsSending(event, relayList)
|
||||
|
||||
if (isActive) {
|
||||
relayPool.send(event, relayList)
|
||||
|
||||
// wakes up all the other relays
|
||||
relayPool.reconnectIfNeedsToORIfItIsTime()
|
||||
reconnect(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -333,6 +358,14 @@ class NostrClient(
|
||||
listeners.forEach { it.onEOSE(relay, subId, arrivalTime) }
|
||||
}
|
||||
|
||||
override fun onClosed(
|
||||
relay: IRelayClient,
|
||||
subId: String,
|
||||
message: String,
|
||||
) {
|
||||
listeners.forEach { it.onClosed(relay, subId, message) }
|
||||
}
|
||||
|
||||
override fun onRelayStateChange(
|
||||
relay: IRelayClient,
|
||||
type: RelayState,
|
||||
|
||||
@@ -34,10 +34,6 @@ class NostrClientSubscription(
|
||||
) : IRelayClientListener {
|
||||
private val subId = RandomInstance.randomChars(10)
|
||||
|
||||
init {
|
||||
client.subscribe(this)
|
||||
}
|
||||
|
||||
override fun onEvent(
|
||||
relay: IRelayClient,
|
||||
subId: String,
|
||||
@@ -57,4 +53,13 @@ class NostrClientSubscription(
|
||||
fun updateFilter() = client.openReqSubscription(subId, filter())
|
||||
|
||||
fun closeSubscription() = client.close(subId)
|
||||
|
||||
fun destroy() {
|
||||
client.unsubscribe(this)
|
||||
}
|
||||
|
||||
init {
|
||||
client.subscribe(this)
|
||||
updateFilter()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.IO
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.withTimeoutOrNull
|
||||
|
||||
@@ -46,7 +47,7 @@ suspend fun INostrClient.sendAndWaitForResponse(
|
||||
relayList: Set<NormalizedRelayUrl>,
|
||||
timeoutInSeconds: Long = 15,
|
||||
): Boolean {
|
||||
val resultChannel = Channel<Result>()
|
||||
val resultChannel = Channel<Result>(UNLIMITED)
|
||||
|
||||
Log.d("sendAndWaitForResponse", "Waiting for ${relayList.size} responses")
|
||||
|
||||
@@ -91,25 +92,28 @@ suspend fun INostrClient.sendAndWaitForResponse(
|
||||
// subscribe before sending the result.
|
||||
val resultSubscription =
|
||||
coroutineScope {
|
||||
async(Dispatchers.IO) {
|
||||
val receivedResults = mutableMapOf<NormalizedRelayUrl, Boolean>()
|
||||
// The withTimeout block will cancel the coroutine if the loop takes too long
|
||||
withTimeoutOrNull(timeoutInSeconds * 1000) {
|
||||
while (receivedResults.size < relayList.size) {
|
||||
val result = resultChannel.receive()
|
||||
val result =
|
||||
async(Dispatchers.IO) {
|
||||
val receivedResults = mutableMapOf<NormalizedRelayUrl, Boolean>()
|
||||
// The withTimeout block will cancel the coroutine if the loop takes too long
|
||||
withTimeoutOrNull(timeoutInSeconds * 1000) {
|
||||
while (receivedResults.size < relayList.size) {
|
||||
val result = resultChannel.receive()
|
||||
|
||||
val currentResult = receivedResults[result.relay]
|
||||
// do not override a successful result.
|
||||
if (currentResult == null || !currentResult) {
|
||||
receivedResults.put(result.relay, result.success)
|
||||
val currentResult = receivedResults[result.relay]
|
||||
// do not override a successful result.
|
||||
if (currentResult == null || !currentResult) {
|
||||
receivedResults.put(result.relay, result.success)
|
||||
}
|
||||
}
|
||||
}
|
||||
receivedResults
|
||||
}
|
||||
receivedResults
|
||||
}
|
||||
}
|
||||
|
||||
send(event, relayList)
|
||||
send(event, relayList)
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
val receivedResults = resultSubscription.await()
|
||||
|
||||
|
||||
@@ -27,14 +27,48 @@ import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.single.newSubId
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.RelayUrlNormalizer
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
|
||||
import kotlinx.coroutines.withTimeoutOrNull
|
||||
|
||||
suspend fun INostrClient.downloadFirstEvent(
|
||||
relay: String,
|
||||
filter: Filter,
|
||||
) = downloadFirstEvent(newSubId(), mapOf(RelayUrlNormalizer.normalize(relay) to listOf(filter)))
|
||||
|
||||
suspend fun INostrClient.downloadFirstEvent(
|
||||
relay: String,
|
||||
filters: List<Filter>,
|
||||
) = downloadFirstEvent(newSubId(), mapOf(RelayUrlNormalizer.normalize(relay) to filters))
|
||||
|
||||
suspend fun INostrClient.downloadFirstEvent(
|
||||
subscriptionId: String = newSubId(),
|
||||
relay: String,
|
||||
filters: List<Filter>,
|
||||
) = downloadFirstEvent(subscriptionId, mapOf(RelayUrlNormalizer.normalize(relay) to filters))
|
||||
|
||||
suspend fun INostrClient.downloadFirstEvent(
|
||||
relay: NormalizedRelayUrl,
|
||||
filter: Filter,
|
||||
) = downloadFirstEvent(newSubId(), mapOf(relay to listOf(filter)))
|
||||
|
||||
suspend fun INostrClient.downloadFirstEvent(
|
||||
relay: NormalizedRelayUrl,
|
||||
filters: List<Filter>,
|
||||
) = downloadFirstEvent(newSubId(), mapOf(relay to filters))
|
||||
|
||||
suspend fun INostrClient.downloadFirstEvent(
|
||||
subscriptionId: String = newSubId(),
|
||||
relay: NormalizedRelayUrl,
|
||||
filters: List<Filter>,
|
||||
) = downloadFirstEvent(subscriptionId, mapOf(relay to filters))
|
||||
|
||||
suspend fun INostrClient.downloadFirstEvent(
|
||||
subscriptionId: String = newSubId(),
|
||||
filters: Map<NormalizedRelayUrl, List<Filter>>,
|
||||
): Event? {
|
||||
val resultChannel = Channel<Event>()
|
||||
val resultChannel = Channel<Event>(UNLIMITED)
|
||||
|
||||
val listener =
|
||||
object : IRelayClientListener {
|
||||
|
||||
@@ -28,7 +28,6 @@ import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.RelayState
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
|
||||
import com.vitorpamplona.quartz.utils.TimeUtils
|
||||
import com.vitorpamplona.quartz.utils.cache.LargeCache
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
@@ -71,17 +70,7 @@ class RelayPool(
|
||||
|
||||
fun getRelay(url: NormalizedRelayUrl): IRelayClient? = relays.get(url)
|
||||
|
||||
var lastReconnectCall = TimeUtils.now()
|
||||
|
||||
fun reconnectIfNeedsToORIfItIsTime() {
|
||||
if (lastReconnectCall < TimeUtils.oneMinuteAgo()) {
|
||||
reconnectIfNeedsTo()
|
||||
|
||||
lastReconnectCall = TimeUtils.now()
|
||||
}
|
||||
}
|
||||
|
||||
fun reconnectIfNeedsTo() {
|
||||
fun reconnectIfNeedsTo(ignoreRetryDelays: Boolean = false) {
|
||||
relays.forEach { url, relay ->
|
||||
if (relay.isConnected()) {
|
||||
if (relay.needsToReconnect()) {
|
||||
@@ -91,7 +80,7 @@ class RelayPool(
|
||||
}
|
||||
} else {
|
||||
// relay is not connected. Connect if it is time
|
||||
relay.connectAndSyncFiltersIfDisconnected()
|
||||
relay.connectAndSyncFiltersIfDisconnected(ignoreRetryDelays)
|
||||
}
|
||||
}
|
||||
updateStatus()
|
||||
@@ -125,7 +114,7 @@ class RelayPool(
|
||||
subId: String,
|
||||
filters: List<Filter>,
|
||||
) {
|
||||
relays.get(relay)?.sendRequest(subId, filters)
|
||||
getOrCreateRelay(relay).sendRequest(subId, filters)
|
||||
}
|
||||
|
||||
fun sendRequest(
|
||||
@@ -145,7 +134,7 @@ class RelayPool(
|
||||
subId: String,
|
||||
filters: List<Filter>,
|
||||
) {
|
||||
relays.get(relay)?.sendCount(subId, filters)
|
||||
getOrCreateRelay(relay).sendCount(subId, filters)
|
||||
}
|
||||
|
||||
fun sendCount(
|
||||
|
||||
@@ -34,7 +34,7 @@ interface IRelayClient {
|
||||
|
||||
fun connectAndRunAfterSync(onConnected: () -> Unit)
|
||||
|
||||
fun connectAndSyncFiltersIfDisconnected()
|
||||
fun connectAndSyncFiltersIfDisconnected(ignoreRetryDelays: Boolean = false)
|
||||
|
||||
fun isConnected(): Boolean
|
||||
|
||||
|
||||
@@ -51,9 +51,6 @@ import com.vitorpamplona.quartz.nip42RelayAuth.RelayAuthEvent
|
||||
import com.vitorpamplona.quartz.utils.Log
|
||||
import com.vitorpamplona.quartz.utils.TimeUtils
|
||||
import com.vitorpamplona.quartz.utils.bytesUsedInMemory
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlin.concurrent.atomics.AtomicBoolean
|
||||
import kotlin.concurrent.atomics.ExperimentalAtomicApi
|
||||
import kotlin.coroutines.cancellation.CancellationException
|
||||
@@ -69,7 +66,7 @@ import kotlin.coroutines.cancellation.CancellationException
|
||||
* @property defaultOnConnect Callback executed after a successful connection, allowing subclasses to add initialization logic.
|
||||
*
|
||||
* Reconnection Strategy:
|
||||
* - Uses exponential backoff to retry connections, starting with [DELAY_TO_RECONNECT_IN_MSECS] (500ms).
|
||||
* - Uses exponential backoff to retry connections, starting with [DELAY_TO_RECONNECT_IN_SECS] (500ms).
|
||||
* - Doubles the delay between reconnection attempts in case of failure.
|
||||
*
|
||||
* Message Handling:
|
||||
@@ -82,13 +79,11 @@ open class BasicRelayClient(
|
||||
val socketBuilder: WebsocketBuilder,
|
||||
val listener: IRelayClientListener,
|
||||
val stats: RelayStat = RelayStat(),
|
||||
val scope: CoroutineScope,
|
||||
val defaultOnConnect: (BasicRelayClient) -> Unit = { },
|
||||
) : IRelayClient {
|
||||
companion object {
|
||||
// waits 3 minutes to reconnect once things fail
|
||||
// minimum wait time to reconnect: 1 second
|
||||
const val DELAY_TO_RECONNECT_IN_SECS = 1
|
||||
const val EVENT_MESSAGE_PREFIX = "[\"${EventMessage.LABEL}\""
|
||||
}
|
||||
|
||||
private val logTag = "Relay ${url.displayUrl()}"
|
||||
@@ -166,24 +161,13 @@ open class BasicRelayClient(
|
||||
|
||||
markConnectionAsReady(pingMillis, compression)
|
||||
|
||||
scope.launch(Dispatchers.Default) {
|
||||
onConnected()
|
||||
}
|
||||
onConnected()
|
||||
|
||||
listener.onRelayStateChange(this@BasicRelayClient, RelayState.CONNECTED)
|
||||
}
|
||||
|
||||
override fun onMessage(text: String) {
|
||||
// Log.d(logTag, "Receiving: $text")
|
||||
|
||||
if (text.startsWith(EVENT_MESSAGE_PREFIX)) {
|
||||
// defers the parsing of ["EVENTS" to avoid blocking the HTTP thread
|
||||
scope.launch(Dispatchers.Default) {
|
||||
consumeIncomingCommand(text, onConnected)
|
||||
}
|
||||
} else {
|
||||
consumeIncomingCommand(text, onConnected)
|
||||
}
|
||||
consumeIncomingMessage(text, onConnected)
|
||||
}
|
||||
|
||||
override fun onClosing(
|
||||
@@ -237,7 +221,7 @@ open class BasicRelayClient(
|
||||
}
|
||||
}
|
||||
|
||||
fun consumeIncomingCommand(
|
||||
fun consumeIncomingMessage(
|
||||
text: String,
|
||||
onConnected: () -> Unit,
|
||||
) {
|
||||
@@ -258,7 +242,7 @@ open class BasicRelayClient(
|
||||
} catch (e: Throwable) {
|
||||
if (e is CancellationException) throw e
|
||||
stats.newError("Error processing: $text")
|
||||
Log.e(logTag, "Error processing: $text")
|
||||
Log.e(logTag, "Error processing: $text", e)
|
||||
listener.onError(this@BasicRelayClient, "", Error("Error processing $text"))
|
||||
}
|
||||
}
|
||||
@@ -296,7 +280,7 @@ open class BasicRelayClient(
|
||||
}
|
||||
|
||||
private fun processEose(msg: EoseMessage) {
|
||||
// Log.w(logTag, "EOSE ${msg.subId}")
|
||||
Log.d(logTag, "EOSE ${msg.subId}")
|
||||
afterEOSEPerSubscription[msg.subId] = true
|
||||
listener.onEOSE(this, msg.subId, TimeUtils.now())
|
||||
}
|
||||
@@ -311,7 +295,7 @@ open class BasicRelayClient(
|
||||
msg: OkMessage,
|
||||
onConnected: () -> Unit,
|
||||
) {
|
||||
Log.w(logTag, "OK: ${msg.eventId} ${msg.success} ${msg.message}")
|
||||
Log.d(logTag, "OK: ${msg.eventId} ${msg.success} ${msg.message}")
|
||||
|
||||
// if this is the OK of an auth event, renew all subscriptions and resend all outgoing events.
|
||||
if (authResponseWatcher.containsKey(msg.eventId)) {
|
||||
@@ -403,10 +387,10 @@ open class BasicRelayClient(
|
||||
}
|
||||
}
|
||||
|
||||
override fun connectAndSyncFiltersIfDisconnected() {
|
||||
override fun connectAndSyncFiltersIfDisconnected(ignoreRetryDelays: Boolean) {
|
||||
if (!isConnectionStarted() && !connectingMutex.load()) {
|
||||
// waits 60 seconds to reconnect after disconnected.
|
||||
if (TimeUtils.now() > lastConnectTentativeInSeconds + delayToConnectInSeconds) {
|
||||
if (ignoreRetryDelays || TimeUtils.now() > lastConnectTentativeInSeconds + delayToConnectInSeconds) {
|
||||
upRelayDelayToConnect()
|
||||
connect()
|
||||
}
|
||||
|
||||
@@ -26,7 +26,6 @@ import com.vitorpamplona.quartz.nip01Core.relay.client.single.basic.BasicRelayCl
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.stats.RelayStat
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.sockets.WebsocketBuilder
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
|
||||
/**
|
||||
* This relay client saves any event that will be sent in an outbox
|
||||
@@ -37,13 +36,11 @@ class SimpleRelayClient(
|
||||
socketBuilder: WebsocketBuilder,
|
||||
listener: IRelayClientListener,
|
||||
stats: RelayStat = RelayStat(),
|
||||
scopeToParseEvents: CoroutineScope,
|
||||
defaultOnConnect: (BasicRelayClient) -> Unit = { },
|
||||
) : IRelayClient by BasicRelayClient(
|
||||
url,
|
||||
socketBuilder,
|
||||
OutboxCache(listener),
|
||||
stats,
|
||||
scopeToParseEvents,
|
||||
defaultOnConnect,
|
||||
)
|
||||
|
||||
@@ -58,13 +58,13 @@ class ContactListEvent(
|
||||
/**
|
||||
* Returns a list of p-tags that are verified as hex keys.
|
||||
*/
|
||||
fun verifiedFollowKeySet(): Set<HexKey> = tags.mapNotNullTo(HashSet(), ContactTag::parseValidKey)
|
||||
fun verifiedFollowKeySet(): Set<HexKey> = tags.mapNotNullTo(mutableSetOf(), ContactTag::parseValidKey)
|
||||
|
||||
/**
|
||||
* Returns a list of a-tags that are verified as correct.
|
||||
*/
|
||||
@Deprecated("Use CommunityListEvent instead.")
|
||||
fun verifiedFollowAddressSet(): Set<HexKey> = tags.mapNotNullTo(HashSet(), ATag::parseValidAddress)
|
||||
fun verifiedFollowAddressSet(): Set<HexKey> = tags.mapNotNullTo(mutableSetOf(), ATag::parseValidAddress)
|
||||
|
||||
fun unverifiedFollowKeySet() = tags.mapNotNull(ContactTag::parseKey)
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ import com.vitorpamplona.quartz.nip01Core.hints.types.PubKeyHint
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.RelayUrlNormalizer
|
||||
import com.vitorpamplona.quartz.nip19Bech32.decodePublicKey
|
||||
import com.vitorpamplona.quartz.utils.Hex
|
||||
import com.vitorpamplona.quartz.utils.Log
|
||||
import com.vitorpamplona.quartz.utils.arrayOfNotNull
|
||||
import com.vitorpamplona.quartz.utils.bytesUsedInMemory
|
||||
@@ -100,7 +101,11 @@ data class ContactTag(
|
||||
ensure(tag[0] == TAG_NAME) { return null }
|
||||
ensure(tag[1].length == 64) { return null }
|
||||
return try {
|
||||
decodePublicKey(tag[1]).toHexKey()
|
||||
if (Hex.isHex64(tag[1])) {
|
||||
tag[1]
|
||||
} else {
|
||||
null
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
Log.w("ContactListEvent", "Can't parse contact list pubkey ${tag.joinToString(", ")}", e)
|
||||
null
|
||||
|
||||
@@ -20,12 +20,6 @@
|
||||
*/
|
||||
package com.vitorpamplona.quartz.nip03Timestamp.ots
|
||||
|
||||
import kotlinx.datetime.TimeZone
|
||||
import kotlinx.datetime.number
|
||||
import kotlinx.datetime.toLocalDateTime
|
||||
import kotlin.time.ExperimentalTime
|
||||
import kotlin.time.Instant
|
||||
|
||||
/**
|
||||
* Class that lets us compare, sort, store and print timestamps.
|
||||
*/
|
||||
@@ -38,19 +32,12 @@ class VerifyResult(
|
||||
/**
|
||||
* Returns, if existing, a string representation describing the existence of a block attest
|
||||
*/
|
||||
@OptIn(ExperimentalTime::class)
|
||||
override fun toString(): String {
|
||||
if (height == 0 || timestamp == null) {
|
||||
return ""
|
||||
}
|
||||
|
||||
// 1. Create an Instant from the Unix timestamp (milliseconds)
|
||||
val instant = Instant.fromEpochMilliseconds(timestamp * 1000)
|
||||
|
||||
// 2. Convert the Instant to a LocalDateTime in the UTC time zone
|
||||
val dateTime = instant.toLocalDateTime(TimeZone.UTC)
|
||||
|
||||
return "block $height attests data existed as of ${dateTime.year}-${dateTime.month.number}-${dateTime.day} UTC"
|
||||
return "block $height attests data existed as of unix timestamp of $timestamp"
|
||||
}
|
||||
|
||||
override fun compareTo(other: VerifyResult): Int = this.height - other.height
|
||||
|
||||
@@ -36,23 +36,114 @@ object Hex {
|
||||
(LOWER_CASE_HEX[(it shr 4)].code shl 8) or LOWER_CASE_HEX[(it and 0xF)].code
|
||||
}
|
||||
|
||||
// 47ns in debug on the Emulator
|
||||
fun isHex(hex: String?): Boolean {
|
||||
if (hex.isNullOrEmpty()) return false
|
||||
if (hex == null) return false
|
||||
if (hex.length and 1 != 0) return false
|
||||
|
||||
try {
|
||||
for (c in hex.indices) {
|
||||
if (c < 0 || c > 255) return false
|
||||
if (hexToByte[hex[c].code] < 0) return false
|
||||
}
|
||||
return try {
|
||||
internalIsHex(hex, hexToByte)
|
||||
} catch (_: IllegalArgumentException) {
|
||||
// there are p tags with emoji's which makes the hex[c].code > 256
|
||||
return false
|
||||
false
|
||||
} catch (_: IndexOutOfBoundsException) {
|
||||
// there are p tags with emoji's which makes the hex[c].code > 256
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
// breaking this function away from the main one improves performance for some reason
|
||||
fun internalIsHex(
|
||||
hex: String,
|
||||
hexToByte: IntArray,
|
||||
): Boolean {
|
||||
for (c in hex.indices) {
|
||||
if (hexToByte[hex[c].code] < 0) return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// 30% faster than isHex
|
||||
fun isHex64(hex: String): Boolean =
|
||||
try {
|
||||
hexToByte[hex[0].code] >= 0 &&
|
||||
hexToByte[hex[1].code] >= 0 &&
|
||||
hexToByte[hex[2].code] >= 0 &&
|
||||
hexToByte[hex[3].code] >= 0 &&
|
||||
hexToByte[hex[4].code] >= 0 &&
|
||||
hexToByte[hex[5].code] >= 0 &&
|
||||
hexToByte[hex[6].code] >= 0 &&
|
||||
hexToByte[hex[7].code] >= 0 &&
|
||||
hexToByte[hex[8].code] >= 0 &&
|
||||
hexToByte[hex[9].code] >= 0 &&
|
||||
|
||||
hexToByte[hex[10].code] >= 0 &&
|
||||
hexToByte[hex[11].code] >= 0 &&
|
||||
hexToByte[hex[12].code] >= 0 &&
|
||||
hexToByte[hex[13].code] >= 0 &&
|
||||
hexToByte[hex[14].code] >= 0 &&
|
||||
hexToByte[hex[15].code] >= 0 &&
|
||||
hexToByte[hex[16].code] >= 0 &&
|
||||
hexToByte[hex[17].code] >= 0 &&
|
||||
hexToByte[hex[18].code] >= 0 &&
|
||||
hexToByte[hex[19].code] >= 0 &&
|
||||
|
||||
hexToByte[hex[20].code] >= 0 &&
|
||||
hexToByte[hex[21].code] >= 0 &&
|
||||
hexToByte[hex[22].code] >= 0 &&
|
||||
hexToByte[hex[23].code] >= 0 &&
|
||||
hexToByte[hex[24].code] >= 0 &&
|
||||
hexToByte[hex[25].code] >= 0 &&
|
||||
hexToByte[hex[26].code] >= 0 &&
|
||||
hexToByte[hex[27].code] >= 0 &&
|
||||
hexToByte[hex[28].code] >= 0 &&
|
||||
hexToByte[hex[29].code] >= 0 &&
|
||||
|
||||
hexToByte[hex[30].code] >= 0 &&
|
||||
hexToByte[hex[31].code] >= 0 &&
|
||||
hexToByte[hex[32].code] >= 0 &&
|
||||
hexToByte[hex[33].code] >= 0 &&
|
||||
hexToByte[hex[34].code] >= 0 &&
|
||||
hexToByte[hex[35].code] >= 0 &&
|
||||
hexToByte[hex[36].code] >= 0 &&
|
||||
hexToByte[hex[37].code] >= 0 &&
|
||||
hexToByte[hex[38].code] >= 0 &&
|
||||
hexToByte[hex[39].code] >= 0 &&
|
||||
|
||||
hexToByte[hex[40].code] >= 0 &&
|
||||
hexToByte[hex[41].code] >= 0 &&
|
||||
hexToByte[hex[42].code] >= 0 &&
|
||||
hexToByte[hex[43].code] >= 0 &&
|
||||
hexToByte[hex[44].code] >= 0 &&
|
||||
hexToByte[hex[45].code] >= 0 &&
|
||||
hexToByte[hex[46].code] >= 0 &&
|
||||
hexToByte[hex[47].code] >= 0 &&
|
||||
hexToByte[hex[48].code] >= 0 &&
|
||||
hexToByte[hex[49].code] >= 0 &&
|
||||
|
||||
hexToByte[hex[50].code] >= 0 &&
|
||||
hexToByte[hex[51].code] >= 0 &&
|
||||
hexToByte[hex[52].code] >= 0 &&
|
||||
hexToByte[hex[53].code] >= 0 &&
|
||||
hexToByte[hex[54].code] >= 0 &&
|
||||
hexToByte[hex[55].code] >= 0 &&
|
||||
hexToByte[hex[56].code] >= 0 &&
|
||||
hexToByte[hex[57].code] >= 0 &&
|
||||
hexToByte[hex[58].code] >= 0 &&
|
||||
hexToByte[hex[59].code] >= 0 &&
|
||||
|
||||
hexToByte[hex[60].code] >= 0 &&
|
||||
hexToByte[hex[61].code] >= 0 &&
|
||||
hexToByte[hex[62].code] >= 0 &&
|
||||
hexToByte[hex[63].code] >= 0
|
||||
} catch (_: IllegalArgumentException) {
|
||||
// there are p tags with emoji's which makes the hex[c].code > 256
|
||||
false
|
||||
} catch (_: IndexOutOfBoundsException) {
|
||||
// there are p tags with emoji's which makes the hex[c].code > 256
|
||||
false
|
||||
}
|
||||
|
||||
fun decode(hex: String): ByteArray {
|
||||
// faster version of hex decoder
|
||||
require(hex.length and 1 == 0)
|
||||
|
||||
@@ -32,6 +32,14 @@ class FilterSerializer : StdSerializer<Filter>(Filter::class.java) {
|
||||
) {
|
||||
gen.writeStartObject()
|
||||
|
||||
filter.kinds?.run {
|
||||
gen.writeArrayFieldStart("kinds")
|
||||
for (i in indices) {
|
||||
gen.writeNumber(this[i])
|
||||
}
|
||||
gen.writeEndArray()
|
||||
}
|
||||
|
||||
filter.ids?.run {
|
||||
gen.writeArrayFieldStart("ids")
|
||||
for (i in indices) {
|
||||
@@ -48,14 +56,6 @@ class FilterSerializer : StdSerializer<Filter>(Filter::class.java) {
|
||||
gen.writeEndArray()
|
||||
}
|
||||
|
||||
filter.kinds?.run {
|
||||
gen.writeArrayFieldStart("kinds")
|
||||
for (i in indices) {
|
||||
gen.writeNumber(this[i])
|
||||
}
|
||||
gen.writeEndArray()
|
||||
}
|
||||
|
||||
filter.tags?.run {
|
||||
entries.forEach { kv ->
|
||||
gen.writeArrayFieldStart("#${kv.key}")
|
||||
|
||||
@@ -24,6 +24,14 @@ import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.sockets.WebSocket
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.sockets.WebSocketListener
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.sockets.WebsocketBuilder
|
||||
import com.vitorpamplona.quartz.utils.Log
|
||||
import kotlinx.coroutines.CoroutineExceptionHandler
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.trySendBlocking
|
||||
import kotlinx.coroutines.launch
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
import okhttp3.Response
|
||||
@@ -35,6 +43,14 @@ class BasicOkHttpWebSocket(
|
||||
val httpClient: (NormalizedRelayUrl) -> OkHttpClient,
|
||||
val out: WebSocketListener,
|
||||
) : WebSocket {
|
||||
companion object {
|
||||
// Exists to avoid exceptions stopping the coroutine
|
||||
val exceptionHandler =
|
||||
CoroutineExceptionHandler { _, throwable ->
|
||||
Log.e("BasicOkHttpWebSocket", "WebsocketListener Caught exception: ${throwable.message}", throwable)
|
||||
}
|
||||
}
|
||||
|
||||
private var socket: OkHttpWebSocket? = null
|
||||
|
||||
override fun needsReconnect() = socket == null
|
||||
@@ -44,6 +60,15 @@ class BasicOkHttpWebSocket(
|
||||
|
||||
val listener =
|
||||
object : OkHttpWebSocketListener() {
|
||||
val scope = CoroutineScope(Dispatchers.Default + exceptionHandler)
|
||||
val incomingMessages: Channel<String> = Channel(Channel.UNLIMITED)
|
||||
val job = // Launch a coroutine to process messages from the channel.
|
||||
scope.launch {
|
||||
for (message in incomingMessages) {
|
||||
out.onMessage(message)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onOpen(
|
||||
webSocket: OkHttpWebSocket,
|
||||
response: Response,
|
||||
@@ -55,7 +80,13 @@ class BasicOkHttpWebSocket(
|
||||
override fun onMessage(
|
||||
webSocket: OkHttpWebSocket,
|
||||
text: String,
|
||||
) = out.onMessage(text)
|
||||
) {
|
||||
Log.d("OkHttpWebsocketListener", "Processing: $text")
|
||||
// Asynchronously send the received message to the channel.
|
||||
// `trySendBlocking` is used here for simplicity within the callback,
|
||||
// but it's important to understand potential thread blocking if the buffer is full.
|
||||
incomingMessages.trySendBlocking(text)
|
||||
}
|
||||
|
||||
override fun onClosing(
|
||||
webSocket: OkHttpWebSocket,
|
||||
@@ -67,13 +98,27 @@ class BasicOkHttpWebSocket(
|
||||
webSocket: OkHttpWebSocket,
|
||||
code: Int,
|
||||
reason: String,
|
||||
) = out.onClosed(code, reason)
|
||||
) {
|
||||
// Close the channel when the WebSocket connection is closed.
|
||||
incomingMessages.close()
|
||||
job.cancel()
|
||||
scope.cancel()
|
||||
|
||||
out.onClosed(code, reason)
|
||||
}
|
||||
|
||||
override fun onFailure(
|
||||
webSocket: OkHttpWebSocket,
|
||||
t: Throwable,
|
||||
response: Response?,
|
||||
) = out.onFailure(t, response?.code, response?.message)
|
||||
) {
|
||||
// Close the channel on failure, and propagate the error.
|
||||
incomingMessages.close()
|
||||
job.cancel()
|
||||
scope.cancel()
|
||||
|
||||
out.onFailure(t, response?.code, response?.message)
|
||||
}
|
||||
}
|
||||
|
||||
socket = httpClient(url).newWebSocket(request, listener)
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Vitor Pamplona
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
* this software and associated documentation files (the "Software"), to deal in
|
||||
* the Software without restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
|
||||
* Software, and to permit persons to whom the Software is furnished to do so,
|
||||
* subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
|
||||
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
package com.vitorpamplona.quartz.nip01Core.relay
|
||||
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.sockets.okhttp.BasicOkHttpWebSocket
|
||||
import okhttp3.OkHttpClient
|
||||
|
||||
open class BaseNostrClientTest {
|
||||
companion object {
|
||||
val rootClient = OkHttpClient.Builder().build()
|
||||
val socketBuilder = BasicOkHttpWebSocket.Builder { url -> rootClient }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,58 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Vitor Pamplona
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
* this software and associated documentation files (the "Software"), to deal in
|
||||
* the Software without restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
|
||||
* Software, and to permit persons to whom the Software is furnished to do so,
|
||||
* subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
|
||||
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
package com.vitorpamplona.quartz.nip01Core.relay
|
||||
|
||||
import com.vitorpamplona.quartz.nip01Core.metadata.MetadataEvent
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.accessories.downloadFirstEvent
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NostrClientFirstEventTest : BaseNostrClientTest() {
|
||||
@Test
|
||||
fun testDownloadFirstEvent() =
|
||||
runBlocking {
|
||||
val appScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
|
||||
val client = NostrClient(socketBuilder, appScope)
|
||||
|
||||
val event =
|
||||
client.downloadFirstEvent(
|
||||
relay = "wss://nos.lol",
|
||||
filter =
|
||||
Filter(
|
||||
kinds = listOf(MetadataEvent.KIND),
|
||||
authors = listOf("460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c"),
|
||||
),
|
||||
)
|
||||
|
||||
client.disconnect()
|
||||
appScope.cancel()
|
||||
|
||||
assertEquals(MetadataEvent.KIND, event?.kind)
|
||||
assertEquals("460c25e682fda7832b52d1f22d3d22b3176d972f60dcdc3212ed8c92ef85065c", event?.pubKey)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Vitor Pamplona
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
* this software and associated documentation files (the "Software"), to deal in
|
||||
* the Software without restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
|
||||
* Software, and to permit persons to whom the Software is furnished to do so,
|
||||
* subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
|
||||
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
package com.vitorpamplona.quartz.nip01Core.relay
|
||||
|
||||
import com.vitorpamplona.quartz.nip01Core.core.Event
|
||||
import com.vitorpamplona.quartz.nip01Core.metadata.MetadataEvent
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientListener
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.RelayUrlNormalizer
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withTimeoutOrNull
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NostrClientManualSubTest : BaseNostrClientTest() {
|
||||
@Test
|
||||
fun testEoseAfter100Events() =
|
||||
runBlocking {
|
||||
val appScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
|
||||
val client = NostrClient(socketBuilder, appScope)
|
||||
|
||||
val resultChannel = Channel<String>(UNLIMITED)
|
||||
val events = mutableListOf<String>()
|
||||
val mySubId = "test-sub-id-1"
|
||||
|
||||
val listener =
|
||||
object : IRelayClientListener {
|
||||
override fun onEvent(
|
||||
relay: IRelayClient,
|
||||
subId: String,
|
||||
event: Event,
|
||||
arrivalTime: Long,
|
||||
afterEOSE: Boolean,
|
||||
) {
|
||||
if (mySubId == subId) {
|
||||
resultChannel.trySend(event.id)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onEOSE(
|
||||
relay: IRelayClient,
|
||||
subId: String,
|
||||
arrivalTime: Long,
|
||||
) {
|
||||
if (mySubId == subId) {
|
||||
resultChannel.trySend("EOSE")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
client.subscribe(listener)
|
||||
|
||||
val filters =
|
||||
mapOf(
|
||||
RelayUrlNormalizer.normalize("wss://relay.damus.io") to
|
||||
listOf(
|
||||
Filter(
|
||||
kinds = listOf(MetadataEvent.KIND),
|
||||
limit = 100,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
client.openReqSubscription(mySubId, filters)
|
||||
|
||||
withTimeoutOrNull(30000) {
|
||||
while (events.size < 101) {
|
||||
val event = resultChannel.receive()
|
||||
events.add(event)
|
||||
}
|
||||
}
|
||||
|
||||
resultChannel.close()
|
||||
|
||||
client.close(mySubId)
|
||||
client.unsubscribe(listener)
|
||||
client.disconnect()
|
||||
|
||||
appScope.cancel()
|
||||
|
||||
assertEquals(101, events.size)
|
||||
assertEquals(true, events.take(100).all { it.length == 64 })
|
||||
assertEquals("EOSE", events[100])
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,137 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Vitor Pamplona
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
* this software and associated documentation files (the "Software"), to deal in
|
||||
* the Software without restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
|
||||
* Software, and to permit persons to whom the Software is furnished to do so,
|
||||
* subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
|
||||
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
package com.vitorpamplona.quartz.nip01Core.relay
|
||||
|
||||
import com.vitorpamplona.quartz.nip01Core.core.Event
|
||||
import com.vitorpamplona.quartz.nip01Core.metadata.MetadataEvent
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientListener
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.RelayUrlNormalizer
|
||||
import com.vitorpamplona.quartz.nip65RelayList.AdvertisedRelayListEvent
|
||||
import com.vitorpamplona.quartz.utils.Log
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withTimeoutOrNull
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NostrClientRepeatSubTest : BaseNostrClientTest() {
|
||||
@Test
|
||||
fun testRepeatSubEvents() =
|
||||
runBlocking {
|
||||
val appScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
|
||||
val client = NostrClient(socketBuilder, appScope)
|
||||
|
||||
val resultChannel = Channel<String>(UNLIMITED)
|
||||
val events = mutableListOf<String>()
|
||||
val mySubId = "test-sub-id-2"
|
||||
|
||||
val listener =
|
||||
object : IRelayClientListener {
|
||||
override fun onEvent(
|
||||
relay: IRelayClient,
|
||||
subId: String,
|
||||
event: Event,
|
||||
arrivalTime: Long,
|
||||
afterEOSE: Boolean,
|
||||
) {
|
||||
if (mySubId == subId) {
|
||||
resultChannel.trySend(event.id)
|
||||
}
|
||||
}
|
||||
|
||||
override fun onEOSE(
|
||||
relay: IRelayClient,
|
||||
subId: String,
|
||||
arrivalTime: Long,
|
||||
) {
|
||||
if (mySubId == subId) {
|
||||
resultChannel.trySend("EOSE")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
client.subscribe(listener)
|
||||
|
||||
val filters =
|
||||
mapOf(
|
||||
RelayUrlNormalizer.normalize("wss://relay.damus.io") to
|
||||
listOf(
|
||||
Filter(
|
||||
kinds = listOf(MetadataEvent.KIND),
|
||||
limit = 100,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
val filters2 =
|
||||
mapOf(
|
||||
RelayUrlNormalizer.normalize("wss://relay.damus.io") to
|
||||
listOf(
|
||||
Filter(
|
||||
kinds = listOf(AdvertisedRelayListEvent.KIND),
|
||||
limit = 100,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
coroutineScope {
|
||||
launch {
|
||||
withTimeoutOrNull(30000) {
|
||||
while (events.size < 202) {
|
||||
// simulates an update in the middle of the sub
|
||||
if (events.size == 1) {
|
||||
client.openReqSubscription(mySubId, filters2)
|
||||
}
|
||||
val event = resultChannel.receive()
|
||||
Log.d("OkHttpWebsocketListener", "Processing: ${events.size} $event")
|
||||
events.add(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
launch {
|
||||
client.openReqSubscription(mySubId, filters)
|
||||
}
|
||||
}
|
||||
|
||||
client.close(mySubId)
|
||||
client.unsubscribe(listener)
|
||||
client.disconnect()
|
||||
|
||||
appScope.cancel()
|
||||
|
||||
assertEquals(202, events.size)
|
||||
assertEquals(true, events.take(100).all { it.length == 64 })
|
||||
assertEquals("EOSE", events[100])
|
||||
assertEquals(true, events.drop(101).take(100).all { it.length == 64 })
|
||||
assertEquals("EOSE", events[201])
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Vitor Pamplona
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
* this software and associated documentation files (the "Software"), to deal in
|
||||
* the Software without restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
|
||||
* Software, and to permit persons to whom the Software is furnished to do so,
|
||||
* subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
|
||||
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
package com.vitorpamplona.quartz.nip01Core.relay
|
||||
|
||||
import com.vitorpamplona.quartz.nip01Core.crypto.KeyPair
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.accessories.sendAndWaitForResponse
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.RelayUrlNormalizer
|
||||
import com.vitorpamplona.quartz.nip01Core.signers.NostrSignerInternal
|
||||
import com.vitorpamplona.quartz.nip10Notes.TextNoteEvent
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NostrClientSendAndWaitTest : BaseNostrClientTest() {
|
||||
@Test
|
||||
fun testSendAndWaitForResponse() =
|
||||
runBlocking {
|
||||
val appScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
|
||||
val client = NostrClient(socketBuilder, appScope)
|
||||
|
||||
val randomSigner = NostrSignerInternal(KeyPair())
|
||||
|
||||
val event = randomSigner.sign(TextNoteEvent.build("Hello World"))
|
||||
|
||||
val resultDamus =
|
||||
client.sendAndWaitForResponse(
|
||||
event = event,
|
||||
relayList = setOf(RelayUrlNormalizer.normalize("wss://relay.damus.io")),
|
||||
)
|
||||
|
||||
val resultNos =
|
||||
client.sendAndWaitForResponse(
|
||||
event = event,
|
||||
relayList = setOf(RelayUrlNormalizer.normalize("wss://nos.lol")),
|
||||
)
|
||||
|
||||
client.disconnect()
|
||||
appScope.cancel()
|
||||
|
||||
assertEquals(true, resultDamus)
|
||||
assertEquals(false, resultNos)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
/**
|
||||
* Copyright (c) 2025 Vitor Pamplona
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
* this software and associated documentation files (the "Software"), to deal in
|
||||
* the Software without restriction, including without limitation the rights to use,
|
||||
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
|
||||
* Software, and to permit persons to whom the Software is furnished to do so,
|
||||
* subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
|
||||
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
||||
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
package com.vitorpamplona.quartz.nip01Core.relay
|
||||
|
||||
import com.vitorpamplona.quartz.nip01Core.core.Event
|
||||
import com.vitorpamplona.quartz.nip01Core.metadata.MetadataEvent
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClientSubscription
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
|
||||
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.RelayUrlNormalizer
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.withTimeoutOrNull
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class NostrClientSubscriptionTest : BaseNostrClientTest() {
|
||||
@Test
|
||||
fun testNostrClientSubscription() =
|
||||
runBlocking {
|
||||
val appScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
|
||||
val client = NostrClient(socketBuilder, appScope)
|
||||
|
||||
val resultChannel = Channel<Event>(UNLIMITED)
|
||||
val events = mutableSetOf<Event>()
|
||||
|
||||
val sub =
|
||||
NostrClientSubscription(
|
||||
client = client,
|
||||
filter = {
|
||||
mapOf(
|
||||
RelayUrlNormalizer.normalize("wss://relay.damus.io") to
|
||||
listOf(
|
||||
Filter(
|
||||
kinds = listOf(MetadataEvent.KIND),
|
||||
limit = 100,
|
||||
),
|
||||
),
|
||||
)
|
||||
},
|
||||
) { event ->
|
||||
assertEquals(MetadataEvent.KIND, event.kind)
|
||||
resultChannel.trySend(event)
|
||||
}
|
||||
|
||||
withTimeoutOrNull(30000) {
|
||||
while (events.size < 100) {
|
||||
val event = resultChannel.receive()
|
||||
events.add(event)
|
||||
}
|
||||
}
|
||||
|
||||
resultChannel.close()
|
||||
|
||||
sub.closeSubscription()
|
||||
|
||||
client.disconnect()
|
||||
appScope.cancel()
|
||||
|
||||
assertEquals(100, events.size)
|
||||
}
|
||||
}
|
||||
@@ -20,16 +20,24 @@
|
||||
*/
|
||||
package com.vitorpamplona.quartz.utils
|
||||
|
||||
import java.time.LocalTime
|
||||
import java.time.format.DateTimeFormatter
|
||||
|
||||
actual object Log {
|
||||
// Define a formatter for the desired output format (e.g., HH:mm:ss)
|
||||
val formatter: DateTimeFormatter? = DateTimeFormatter.ofPattern("HH:mm:ss.SSS")
|
||||
|
||||
fun time() = LocalTime.now().format(formatter)
|
||||
|
||||
actual fun w(
|
||||
tag: String,
|
||||
message: String,
|
||||
throwable: Throwable?,
|
||||
) {
|
||||
if (throwable != null) {
|
||||
println("WARN: [$tag] $message. Throwable: ${throwable.message}")
|
||||
println("${time()} WARN : [$tag] $message. Throwable: ${throwable.message}")
|
||||
} else {
|
||||
println("WARN: [$tag] $message")
|
||||
println("${time()} WARN : [$tag] $message")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,9 +47,9 @@ actual object Log {
|
||||
throwable: Throwable?,
|
||||
) {
|
||||
if (throwable != null) {
|
||||
println("ERROR: [$tag] $message. Throwable: ${throwable.message}")
|
||||
println("${time()} ERROR: [$tag] $message. Throwable: ${throwable.message}")
|
||||
} else {
|
||||
println("ERROR: [$tag] $message")
|
||||
println("${time()} ERROR: [$tag] $message")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -49,13 +57,13 @@ actual object Log {
|
||||
tag: String,
|
||||
message: String,
|
||||
) {
|
||||
println("DEBUG: [$tag] $message")
|
||||
println("${time()} DEBUG: [$tag] $message")
|
||||
}
|
||||
|
||||
actual fun i(
|
||||
tag: String,
|
||||
message: String,
|
||||
) {
|
||||
println("INFO: [$tag] $message")
|
||||
println("${time()} INFO : [$tag] $message")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user