Moves subscription management from Ammolite to Quatrz

This commit is contained in:
Vitor Pamplona
2025-07-11 08:51:02 -04:00
parent a94c8673d5
commit 2ecfeb6e7f
36 changed files with 128 additions and 48 deletions

View File

@@ -1,53 +0,0 @@
/**
* Copyright (c) 2025 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.datasources
import com.vitorpamplona.quartz.nip01Core.relay.client.single.newSubId
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import kotlin.contracts.ExperimentalContracts
data class Subscription(
val id: String = newSubId(),
val onEose: ((time: Long, relayUrl: NormalizedRelayUrl) -> Unit)? = null,
) {
private var filters: Map<NormalizedRelayUrl, List<Filter>>? = null // Inactive when null
fun reset() {
filters = null
}
fun updateFilters(newFilters: Map<NormalizedRelayUrl, List<Filter>>?) {
filters = newFilters
}
fun filters() = filters
@OptIn(ExperimentalContracts::class)
fun isActive() = filters != null
fun callEose(
time: Long,
relay: NormalizedRelayUrl,
) {
onEose?.let { it(time, relay) }
}
}

View File

@@ -1,126 +0,0 @@
/**
* Copyright (c) 2025 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.datasources
import com.vitorpamplona.quartz.nip01Core.core.Event
import com.vitorpamplona.quartz.nip01Core.relay.client.NostrClient
import com.vitorpamplona.quartz.nip01Core.relay.client.listeners.IRelayClientListener
import com.vitorpamplona.quartz.nip01Core.relay.client.single.IRelayClient
import com.vitorpamplona.quartz.nip01Core.relay.filters.Filter
import com.vitorpamplona.quartz.nip01Core.relay.normalizer.NormalizedRelayUrl
import com.vitorpamplona.quartz.utils.LargeCache
/**
* Semantically groups Nostr filters and subscriptions in data source objects that
* maintain the desired active filter with the relay.
*/
class SubscriptionController(
val client: NostrClient,
) {
private val subscriptions = LargeCache<String, Subscription>()
private val stats = SubscriptionStats()
private val clientListener =
object : IRelayClientListener {
override fun onEvent(
relay: IRelayClient,
subId: String,
event: Event,
arrivalTime: Long,
afterEOSE: Boolean,
) {
if (subscriptions.containsKey(subId)) {
stats.add(subId, event.kind)
if (afterEOSE) {
subscriptions.get(subId)?.callEose(arrivalTime, relay.url)
}
}
}
override fun onEOSE(
relay: IRelayClient,
subId: String,
arrivalTime: Long,
) {
if (subscriptions.containsKey(subId)) {
subscriptions.get(subId)?.callEose(arrivalTime, relay.url)
}
}
}
init {
client.subscribe(clientListener)
}
fun destroy() {
client.unsubscribe(clientListener)
}
fun printStats(tag: String) = stats.printCounter(tag)
fun getSub(subId: String) = subscriptions.get(subId)
fun requestNewSubscription(
subId: String,
onEOSE: ((Long, NormalizedRelayUrl) -> Unit)? = null,
): Subscription = Subscription(subId, onEose = onEOSE).also { subscriptions.put(it.id, it) }
fun dismissSubscription(subId: String) = getSub(subId)?.let { dismissSubscription(it) }
fun dismissSubscription(subscription: Subscription) {
client.close(subscription.id)
subscription.reset()
subscriptions.remove(subscription.id)
}
fun updateRelays() {
val currentFilters =
subscriptions.associateWith { id, sub ->
client.getSubscriptionFiltersOrNull(id)
}
subscriptions.forEach { id, sub ->
updateRelaysIfNeeded(id, sub.filters(), currentFilters[id])
}
}
fun updateRelaysIfNeeded(
subId: String,
updatedFilters: Map<NormalizedRelayUrl, List<Filter>>?,
currentFilters: Map<NormalizedRelayUrl, List<Filter>>?,
) {
if (currentFilters != null) {
if (updatedFilters == null) {
// was active and is not active anymore, just close.
client.close(subId)
} else {
client.sendRequest(subId, updatedFilters)
}
} else {
if (updatedFilters == null) {
// was not active and is still not active, does nothing
} else {
// was not active and becomes active, sends the entire filter.
client.sendRequest(subId, updatedFilters)
}
}
}
}

View File

@@ -1,58 +0,0 @@
/**
* Copyright (c) 2025 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays.datasources
import android.util.Log
import com.vitorpamplona.quartz.utils.LargeCache
class SubscriptionStats {
data class Counter(
val subscriptionId: String,
val eventKind: Int,
) {
var counter: Int = 0
}
private var eventCounter = LargeCache<Int, Counter>()
private fun eventCounterIndex(
str1: String,
str2: Int,
): Int = 31 * str1.hashCode() + str2.hashCode()
fun add(
subscriptionId: String,
eventKind: Int,
) {
val key = eventCounterIndex(subscriptionId, eventKind)
val stats = eventCounter.getOrCreate(key) { Counter(subscriptionId, eventKind) }
stats.counter++
}
fun printCounter(tag: String) {
eventCounter.forEach { _, stats ->
Log.d(
tag,
"Received Events ${stats.subscriptionId} ${stats.eventKind}: ${stats.counter}",
)
}
}
}