Move all relay and datasource to ammolite package

This commit is contained in:
greenart7c3
2024-06-24 10:52:34 -03:00
parent b00781ec32
commit 66e4021d7b
70 changed files with 392 additions and 244 deletions

View File

@@ -0,0 +1,126 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import androidx.compose.runtime.Stable
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean
/** This class is designed to have a waiting time between two calls of invalidate */
@Stable
class BundledUpdate(
val delay: Long,
val dispatcher: CoroutineDispatcher = Dispatchers.Default,
) {
val scope = CoroutineScope(dispatcher + SupervisorJob())
private var onlyOneInBlock = AtomicBoolean()
private var invalidatesAgain = false
fun invalidate(
ignoreIfDoing: Boolean = false,
onUpdate: suspend () -> Unit,
) {
if (onlyOneInBlock.getAndSet(true)) {
if (!ignoreIfDoing) {
invalidatesAgain = true
}
return
}
scope.launch(dispatcher) {
try {
onUpdate()
delay(delay)
if (invalidatesAgain) {
onUpdate()
}
} finally {
withContext(NonCancellable) {
invalidatesAgain = false
onlyOneInBlock.set(false)
}
}
}
}
fun cancel() {
scope.cancel()
}
}
/** This class is designed to have a waiting time between two calls of invalidate */
@Stable
class BundledInsert<T>(
val delay: Long,
val dispatcher: CoroutineDispatcher = Dispatchers.Default,
) {
val scope = CoroutineScope(dispatcher + SupervisorJob())
private var onlyOneInBlock = AtomicBoolean()
private var queue = LinkedBlockingQueue<T>()
fun invalidateList(
newObject: T,
onUpdate: suspend (Set<T>) -> Unit,
) {
checkNotInMainThread()
queue.put(newObject)
if (onlyOneInBlock.getAndSet(true)) {
return
}
scope.launch(dispatcher) {
try {
val mySet = mutableSetOf<T>()
queue.drainTo(mySet)
if (mySet.isNotEmpty()) {
onUpdate(mySet)
}
delay(delay)
val mySet2 = mutableSetOf<T>()
queue.drainTo(mySet2)
if (mySet2.isNotEmpty()) {
onUpdate(mySet2)
}
} finally {
withContext(NonCancellable) { onlyOneInBlock.set(false) }
}
}
}
fun cancel() {
scope.cancel()
}
}

View File

@@ -0,0 +1,351 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import android.util.Log
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.events.Event
import com.vitorpamplona.quartz.events.EventInterface
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import java.util.UUID
/**
* The Nostr Client manages multiple personae the user may switch between. Events are received and
* published through multiple relays. Events are stored with their respective persona.
*/
object Client : RelayPool.Listener {
private var listeners = setOf<Listener>()
private var relays = emptyArray<Relay>()
private var subscriptions = mapOf<String, List<TypedFilter>>()
@Synchronized
fun reconnect(
relays: Array<RelaySetupInfo>?,
onlyIfChanged: Boolean = false,
) {
Log.d("Relay", "Relay Pool Reconnecting to ${relays?.size} relays: \n${relays?.joinToString("\n") { it.url + " " + it.read + " " + it.write + " " + it.feedTypes.joinToString(",") { it.name } }}")
checkNotInMainThread()
if (onlyIfChanged) {
if (!isSameRelaySetConfig(relays)) {
if (Client.relays.isNotEmpty()) {
RelayPool.disconnect()
RelayPool.unregister(this)
RelayPool.unloadRelays()
}
if (relays != null) {
val newRelays = relays.map { Relay(it.url, it.read, it.write, it.feedTypes) }
RelayPool.register(this)
RelayPool.loadRelays(newRelays)
RelayPool.requestAndWatch()
Client.relays = newRelays.toTypedArray()
}
}
} else {
if (Client.relays.isNotEmpty()) {
RelayPool.disconnect()
RelayPool.unregister(this)
RelayPool.unloadRelays()
}
if (relays != null) {
val newRelays = relays.map { Relay(it.url, it.read, it.write, it.feedTypes) }
RelayPool.register(this)
RelayPool.loadRelays(newRelays)
RelayPool.requestAndWatch()
Client.relays = newRelays.toTypedArray()
}
}
}
fun isSameRelaySetConfig(newRelayConfig: Array<RelaySetupInfo>?): Boolean {
if (relays.size != newRelayConfig?.size) return false
relays.forEach { oldRelayInfo ->
val newRelayInfo = newRelayConfig.find { it.url == oldRelayInfo.url } ?: return false
if (!oldRelayInfo.isSameRelayConfig(newRelayInfo)) return false
}
return true
}
fun sendFilter(
subscriptionId: String = UUID.randomUUID().toString().substring(0..10),
filters: List<TypedFilter> = listOf(),
) {
checkNotInMainThread()
subscriptions = subscriptions + Pair(subscriptionId, filters)
RelayPool.sendFilter(subscriptionId, filters)
}
fun sendFilterAndStopOnFirstResponse(
subscriptionId: String = UUID.randomUUID().toString().substring(0..10),
filters: List<TypedFilter> = listOf(),
onResponse: (Event) -> Unit,
) {
checkNotInMainThread()
subscribe(
object : Listener() {
override fun onEvent(
event: Event,
subId: String,
relay: Relay,
afterEOSE: Boolean,
) {
if (subId == subscriptionId) {
onResponse(event)
unsubscribe(this)
close(subscriptionId)
}
}
},
)
subscriptions = subscriptions + Pair(subscriptionId, filters)
RelayPool.sendFilter(subscriptionId, filters)
}
fun sendFilterOnlyIfDisconnected(
subscriptionId: String = UUID.randomUUID().toString().substring(0..10),
filters: List<TypedFilter> = listOf(),
) {
checkNotInMainThread()
subscriptions = subscriptions + Pair(subscriptionId, filters)
RelayPool.connectAndSendFiltersIfDisconnected()
}
fun send(
signedEvent: EventInterface,
relay: String? = null,
feedTypes: Set<FeedType>? = null,
relayList: List<RelaySetupInfo>? = null,
onDone: (() -> Unit)? = null,
) {
checkNotInMainThread()
if (relayList != null) {
RelayPool.sendToSelectedRelays(relayList, signedEvent)
} else if (relay == null) {
RelayPool.send(signedEvent)
} else {
RelayPool.getOrCreateRelay(relay, feedTypes, onDone) {
it.send(signedEvent)
}
}
}
fun sendPrivately(
signedEvent: EventInterface,
relayList: List<String>,
) {
checkNotInMainThread()
relayList.forEach { relayUrl ->
RelayPool.getOrCreateRelay(relayUrl, emptySet(), { }) {
it.sendOverride(signedEvent)
}
}
}
fun close(subscriptionId: String) {
RelayPool.close(subscriptionId)
subscriptions = subscriptions.minus(subscriptionId)
}
fun isActive(subscriptionId: String): Boolean {
return subscriptions.contains(subscriptionId)
}
@OptIn(DelicateCoroutinesApi::class)
override fun onEvent(
event: Event,
subscriptionId: String,
relay: Relay,
afterEOSE: Boolean,
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch(Dispatchers.Default) {
listeners.forEach { it.onEvent(event, subscriptionId, relay, afterEOSE) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onRelayStateChange(
type: Relay.StateType,
relay: Relay,
channel: String?,
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch(Dispatchers.Default) {
listeners.forEach { it.onRelayStateChange(type, relay, channel) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onSendResponse(
eventId: String,
success: Boolean,
message: String,
relay: Relay,
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch(Dispatchers.Default) {
listeners.forEach { it.onSendResponse(eventId, success, message, relay) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onAuth(
relay: Relay,
challenge: String,
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch(Dispatchers.Default) { listeners.forEach { it.onAuth(relay, challenge) } }
}
@OptIn(DelicateCoroutinesApi::class)
override fun onNotify(
relay: Relay,
description: String,
) {
// Releases the Web thread for the new payload.
// May need to add a processing queue if processing new events become too costly.
GlobalScope.launch(Dispatchers.Default) {
listeners.forEach { it.onNotify(relay, description) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onSend(
relay: Relay,
msg: String,
success: Boolean,
) {
GlobalScope.launch(Dispatchers.Default) {
listeners.forEach { it.onSend(relay, msg, success) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onBeforeSend(
relay: Relay,
event: EventInterface,
) {
GlobalScope.launch(Dispatchers.Default) {
listeners.forEach { it.onBeforeSend(relay, event) }
}
}
@OptIn(DelicateCoroutinesApi::class)
override fun onError(
error: Error,
subscriptionId: String,
relay: Relay,
) {
GlobalScope.launch(Dispatchers.Default) {
listeners.forEach { it.onError(error, subscriptionId, relay) }
}
}
fun subscribe(listener: Listener) {
listeners = listeners.plus(listener)
}
fun isSubscribed(listener: Listener): Boolean {
return listeners.contains(listener)
}
fun unsubscribe(listener: Listener) {
listeners = listeners.minus(listener)
}
fun allSubscriptions(): Map<String, List<TypedFilter>> {
return subscriptions
}
fun getSubscriptionFilters(subId: String): List<TypedFilter> {
return subscriptions[subId] ?: emptyList()
}
abstract class Listener {
/** A new message was received */
open fun onEvent(
event: Event,
subscriptionId: String,
relay: Relay,
afterEOSE: Boolean,
) = Unit
/** Connected to or disconnected from a relay */
open fun onRelayStateChange(
type: Relay.StateType,
relay: Relay,
subscriptionId: String?,
) = Unit
/** When an relay saves or rejects a new event. */
open fun onSendResponse(
eventId: String,
success: Boolean,
message: String,
relay: Relay,
) = Unit
open fun onAuth(
relay: Relay,
challenge: String,
) = Unit
open fun onNotify(
relay: Relay,
description: String,
) = Unit
open fun onSend(
relay: Relay,
msg: String,
success: Boolean,
) = Unit
open fun onBeforeSend(
relay: Relay,
event: EventInterface,
) = Unit
open fun onError(
error: Error,
subscriptionId: String,
relay: Relay,
) = Unit
}
}

View File

@@ -0,0 +1,61 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import com.vitorpamplona.quartz.encoders.RelayUrlFormatter
object Constants {
val activeTypes = setOf(FeedType.FOLLOWS, FeedType.PRIVATE_DMS)
val activeTypesChats = setOf(FeedType.FOLLOWS, FeedType.PUBLIC_CHATS, FeedType.PRIVATE_DMS)
val activeTypesGlobalChats =
setOf(FeedType.FOLLOWS, FeedType.PUBLIC_CHATS, FeedType.PRIVATE_DMS, FeedType.GLOBAL)
val activeTypesSearch = setOf(FeedType.SEARCH)
fun convertDefaultRelays(): Array<Relay> {
return defaultRelays.map { Relay(it.url, it.read, it.write, it.feedTypes) }.toTypedArray()
}
val defaultRelays =
arrayOf(
// Free relays for only DMs, Chats and Follows due to the amount of spam
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nostr.bitcoiner.social"), read = true, write = true, feedTypes = activeTypesChats),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://relay.nostr.bg"), read = true, write = true, feedTypes = activeTypesChats),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nostr.oxtr.dev"), read = true, write = true, feedTypes = activeTypesChats),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nostr.fmt.wiz.biz"), read = true, write = false, feedTypes = activeTypesChats),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://relay.damus.io"), read = true, write = true, feedTypes = activeTypes),
// Global
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nostr.mom"), read = true, write = true, feedTypes = activeTypesGlobalChats),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nos.lol"), read = true, write = true, feedTypes = activeTypesGlobalChats),
// Paid relays
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nostr.wine"), read = true, write = false, feedTypes = activeTypesGlobalChats),
// Supporting NIP-50
RelaySetupInfo(RelayUrlFormatter.normalize("wss://relay.nostr.band"), read = true, write = false, feedTypes = activeTypesSearch),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://nostr.wine"), read = true, write = false, feedTypes = activeTypesSearch),
RelaySetupInfo(RelayUrlFormatter.normalize("wss://relay.noswhere.com"), read = true, write = false, feedTypes = activeTypesSearch),
)
val defaultSearchRelaySet =
setOf(
RelayUrlFormatter.normalize("wss://relay.nostr.band"),
RelayUrlFormatter.normalize("wss://nostr.wine"),
RelayUrlFormatter.normalize("wss://relay.noswhere.com"),
)
}

View File

@@ -0,0 +1,27 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
class EOSETime(var time: Long) {
override fun toString(): String {
return time.toString()
}
}

View File

@@ -0,0 +1,87 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import com.vitorpamplona.quartz.events.Event
class JsonFilter(
val ids: List<String>? = null,
val authors: List<String>? = null,
val kinds: List<Int>? = null,
val tags: Map<String, List<String>>? = null,
val since: Map<String, EOSETime>? = null,
val until: Long? = null,
val limit: Int? = null,
val search: String? = null,
) {
fun toJson(forRelay: String? = null): String {
val factory = Event.mapper.nodeFactory
val filter =
factory.objectNode().apply {
ids?.run {
replace(
"ids",
factory.arrayNode(ids.size).apply { ids.forEach { add(it) } },
)
}
authors?.run {
replace(
"authors",
factory.arrayNode(authors.size).apply { authors.forEach { add(it) } },
)
}
kinds?.run {
replace(
"kinds",
factory.arrayNode(kinds.size).apply { kinds.forEach { add(it) } },
)
}
tags?.run {
entries.forEach { kv ->
replace(
"#${kv.key}",
factory.arrayNode(kv.value.size).apply { kv.value.forEach { add(it) } },
)
}
}
since?.run {
if (!isEmpty()) {
if (forRelay != null) {
val relaySince = get(forRelay)
if (relaySince != null) {
put("since", relaySince.time)
}
} else {
val jsonObjectSince = factory.objectNode()
entries.forEach { sincePairs ->
jsonObjectSince.put(sincePairs.key, "${sincePairs.value}")
}
put("since", jsonObjectSince)
}
}
}
until?.run { put("until", until) }
limit?.run { put("limit", limit) }
search?.run { put("search", search) }
}
return Event.mapper.writeValueAsString(filter)
}
}

View File

@@ -0,0 +1,315 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import android.util.Log
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.events.Event
import com.vitorpamplona.quartz.utils.TimeUtils
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
abstract class NostrDataSource(val debugName: String) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private var subscriptions = mapOf<String, Subscription>()
data class Counter(val subscriptionId: String, val eventKind: Int, var counter: Int)
private var eventCounter = mapOf<Int, Counter>()
var changingFilters = AtomicBoolean()
private var active: Boolean = false
fun printCounter() {
eventCounter.forEach {
Log.d(
"STATE DUMP ${this.javaClass.simpleName}",
"Received Events $debugName ${it.value.subscriptionId} ${it.value.eventKind}: ${it.value.counter}",
)
}
}
fun hashCodeFields(
str1: String,
str2: Int,
): Int {
return 31 * str1.hashCode() + str2.hashCode()
}
private val clientListener =
object : Client.Listener() {
override fun onEvent(
event: Event,
subscriptionId: String,
relay: Relay,
afterEOSE: Boolean,
) {
if (subscriptions.containsKey(subscriptionId)) {
val key = hashCodeFields(subscriptionId, event.kind)
val keyValue = eventCounter[key]
if (keyValue != null) {
keyValue.counter++
} else {
eventCounter = eventCounter + Pair(key, Counter(subscriptionId, event.kind, 1))
}
// Log.d(this@NostrDataSource.javaClass.simpleName, "Relay ${relay.url}: ${event.kind}")
consume(event, relay)
if (afterEOSE) {
markAsEOSE(subscriptionId, relay)
}
}
}
override fun onRelayStateChange(
type: Relay.StateType,
relay: Relay,
subscriptionId: String?,
) {
// if (subscriptions.containsKey(subscriptionId)) {
// Log.d(this@NostrDataSource.javaClass.simpleName, "Relay ${relay.url} ${subscriptionId}
// ${type.name}")
// }
if (
type == Relay.StateType.EOSE &&
subscriptionId != null &&
subscriptions.containsKey(subscriptionId)
) {
markAsEOSE(subscriptionId, relay)
}
}
override fun onSendResponse(
eventId: String,
success: Boolean,
message: String,
relay: Relay,
) {
if (success) {
markAsSeenOnRelay(eventId, relay)
}
}
override fun onAuth(
relay: Relay,
challenge: String,
) {
auth(relay, challenge)
}
override fun onNotify(
relay: Relay,
description: String,
) {
notify(relay, description)
}
}
init {
Log.d(this.javaClass.simpleName, "${this.javaClass.simpleName} Subscribe")
Client.subscribe(clientListener)
}
fun destroy() {
// makes sure to run
Log.d(this.javaClass.simpleName, "${this.javaClass.simpleName} Unsubscribe")
stop()
Client.unsubscribe(clientListener)
scope.cancel()
bundler.cancel()
}
open fun start() {
println("DataSource: ${this.javaClass.simpleName} Start")
active = true
resetFilters()
}
open fun stop() {
active = false
println("DataSource: ${this.javaClass.simpleName} Stop")
GlobalScope.launch(Dispatchers.IO) {
subscriptions.values.forEach { subscription ->
Client.close(subscription.id)
subscription.typedFilters = null
}
}
}
open fun stopSync() {
active = false
println("DataSource: ${this.javaClass.simpleName} Stop")
subscriptions.values.forEach { subscription ->
Client.close(subscription.id)
subscription.typedFilters = null
}
}
fun requestNewChannel(onEOSE: ((Long, String) -> Unit)? = null): Subscription {
val newSubscription = Subscription(UUID.randomUUID().toString().substring(0, 4), onEOSE)
subscriptions = subscriptions + Pair(newSubscription.id, newSubscription)
return newSubscription
}
fun dismissChannel(subscription: Subscription) {
Client.close(subscription.id)
subscriptions = subscriptions.minus(subscription.id)
}
// Refreshes observers in batches.
private val bundler = BundledUpdate(300, Dispatchers.IO)
fun invalidateFilters() {
bundler.invalidate {
// println("DataSource: ${this.javaClass.simpleName} InvalidateFilters")
// adds the time to perform the refresh into this delay
// holding off new updates in case of heavy refresh routines.
resetFiltersSuspend()
}
}
fun resetFilters() {
scope.launch(Dispatchers.IO) { resetFiltersSuspend() }
}
fun resetFiltersSuspend() {
println("DataSource: ${this.javaClass.simpleName} resetFiltersSuspend $active")
checkNotInMainThread()
// saves the channels that are currently active
val activeSubscriptions = subscriptions.values.filter { it.typedFilters != null }
// saves the current content to only update if it changes
val currentFilters = activeSubscriptions.associate { it.id to it.typedFilters }
changingFilters.getAndSet(true)
updateChannelFilters()
// Makes sure to only send an updated filter when it actually changes.
subscriptions.values.forEach { updatedSubscription ->
val updatedSubscriptionNewFilters = updatedSubscription.typedFilters
val isActive = Client.isActive(updatedSubscription.id)
if (!isActive && updatedSubscriptionNewFilters != null) {
// Filter was removed from the active list
if (active) {
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
}
} else {
if (currentFilters.containsKey(updatedSubscription.id)) {
if (updatedSubscriptionNewFilters == null) {
// was active and is not active anymore, just close.
Client.close(updatedSubscription.id)
} else {
// was active and is still active, check if it has changed.
if (updatedSubscription.hasChangedFiltersFrom(currentFilters[updatedSubscription.id])) {
Client.close(updatedSubscription.id)
if (active) {
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
}
} else {
// hasn't changed, does nothing.
if (active) {
Client.sendFilterOnlyIfDisconnected(
updatedSubscription.id,
updatedSubscriptionNewFilters,
)
}
}
}
} else {
if (updatedSubscriptionNewFilters == null) {
// was not active and is still not active, does nothing
} else {
// was not active and becomes active, sends the filter.
if (updatedSubscription.hasChangedFiltersFrom(currentFilters[updatedSubscription.id])) {
if (active) {
Log.d(
this@NostrDataSource.javaClass.simpleName,
"Update Filter 3 ${updatedSubscription.id} ${Client.isSubscribed(clientListener)}",
)
Client.sendFilter(updatedSubscription.id, updatedSubscriptionNewFilters)
}
}
}
}
}
}
changingFilters.getAndSet(false)
}
open fun consume(
event: Event,
relay: Relay,
) {
// LocalCache.verifyAndConsume(event, relay)
}
open fun markAsSeenOnRelay(
eventId: String,
relay: Relay,
) {
// val note = LocalCache.getNoteIfExists(eventId)
// val noteEvent = note?.event
// if (noteEvent is AddressableEvent) {
// LocalCache.getAddressableNoteIfExists(noteEvent.address().toTag())?.addRelay(relay)
// } else {
// note?.addRelay(relay)
// }
}
open fun markAsEOSE(
subscriptionId: String,
relay: Relay,
) {
subscriptions[subscriptionId]?.updateEOSE(
// in case people's clock is slighly off.
TimeUtils.oneMinuteAgo(),
relay.url,
)
}
abstract fun updateChannelFilters()
open fun auth(
relay: Relay,
challenge: String,
) = Unit
open fun notify(
relay: Relay,
description: String,
) = Unit
}

View File

@@ -0,0 +1,616 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import android.util.Log
import com.vitorpamplona.ammolite.BuildConfig
import com.vitorpamplona.ammolite.service.HttpClientManager
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.encoders.HexKey
import com.vitorpamplona.quartz.events.Event
import com.vitorpamplona.quartz.events.EventInterface
import com.vitorpamplona.quartz.events.RelayAuthEvent
import com.vitorpamplona.quartz.utils.TimeUtils
import com.vitorpamplona.quartz.utils.bytesUsedInMemory
import kotlinx.coroutines.CancellationException
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import java.util.concurrent.atomic.AtomicBoolean
enum class FeedType {
FOLLOWS,
PUBLIC_CHATS,
PRIVATE_DMS,
GLOBAL,
SEARCH,
WALLET_CONNECT,
}
val COMMON_FEED_TYPES =
setOf(FeedType.FOLLOWS, FeedType.PUBLIC_CHATS, FeedType.PRIVATE_DMS, FeedType.GLOBAL)
val EVENT_FINDER_TYPES =
setOf(FeedType.FOLLOWS, FeedType.PUBLIC_CHATS, FeedType.GLOBAL)
class Relay(
val url: String,
val read: Boolean = true,
val write: Boolean = true,
val activeTypes: Set<FeedType> = FeedType.values().toSet(),
) {
companion object {
// waits 3 minutes to reconnect once things fail
const val RECONNECTING_IN_SECONDS = 60 * 3
}
val brief = RelayBriefInfoCache.get(url)
private var listeners = setOf<Listener>()
private var socket: WebSocket? = null
private var isReady: Boolean = false
private var usingCompression: Boolean = false
private var lastConnectTentative: Long = 0L
private var afterEOSEPerSubscription = mutableMapOf<String, Boolean>()
private val authResponse = mutableMapOf<HexKey, Boolean>()
private val sendWhenReady = mutableListOf<EventInterface>()
fun register(listener: Listener) {
listeners = listeners.plus(listener)
}
fun unregister(listener: Listener) {
listeners = listeners.minus(listener)
}
fun isConnected(): Boolean = socket != null
fun connect() {
connectAndRun {
checkNotInMainThread()
// Sends everything.
renewFilters()
}
}
private var connectingBlock = AtomicBoolean()
fun connectAndRun(onConnected: (Relay) -> Unit) {
Log.d("Relay", "Relay.connect $url isAlreadyConnecting: ${connectingBlock.get()}")
// BRB is crashing OkHttp Deflater object :(
if (url.contains("brb.io")) return
// If there is a connection, don't wait.
if (connectingBlock.getAndSet(true)) {
return
}
try {
checkNotInMainThread()
if (socket != null) {
connectingBlock.set(false)
return
}
lastConnectTentative = TimeUtils.now()
val request =
Request
.Builder()
.header("User-Agent", HttpClientManager.getDefaultUserAgentHeader())
.url(url.trim())
.build()
socket = HttpClientManager.getHttpClientForUrl(url).newWebSocket(request, RelayListener(onConnected))
} catch (e: Exception) {
if (e is CancellationException) throw e
RelayStats.newError(url, e.message)
markConnectionAsClosed()
e.printStackTrace()
} finally {
connectingBlock.set(false)
}
}
inner class RelayListener(
val onConnected: (Relay) -> Unit,
) : WebSocketListener() {
override fun onOpen(
webSocket: WebSocket,
response: Response,
) {
checkNotInMainThread()
Log.d("Relay", "Connect onOpen $url $socket")
markConnectionAsReady(
pingInMs = response.receivedResponseAtMillis - response.sentRequestAtMillis,
usingCompression =
response.headers.get("Sec-WebSocket-Extensions")?.contains("permessage-deflate") ?: false,
)
// Log.w("Relay", "Relay OnOpen, Loading All subscriptions $url")
onConnected(this@Relay)
synchronized(sendWhenReady) {
sendWhenReady.forEach {
send(it)
}
sendWhenReady.clear()
}
listeners.forEach { it.onRelayStateChange(this@Relay, StateType.CONNECT, null) }
}
override fun onMessage(
webSocket: WebSocket,
text: String,
) {
checkNotInMainThread()
RelayStats.addBytesReceived(url, text.bytesUsedInMemory())
try {
processNewRelayMessage(text)
} catch (e: Throwable) {
if (e is CancellationException) throw e
e.printStackTrace()
text.chunked(2000) { chunked ->
listeners.forEach { it.onError(this@Relay, "", Error("Problem with $chunked")) }
}
}
}
override fun onClosing(
webSocket: WebSocket,
code: Int,
reason: String,
) {
checkNotInMainThread()
Log.w("Relay", "Relay onClosing $url: $reason")
listeners.forEach {
it.onRelayStateChange(
this@Relay,
StateType.DISCONNECTING,
null,
)
}
}
override fun onClosed(
webSocket: WebSocket,
code: Int,
reason: String,
) {
checkNotInMainThread()
markConnectionAsClosed()
Log.w("Relay", "Relay onClosed $url: $reason")
listeners.forEach { it.onRelayStateChange(this@Relay, StateType.DISCONNECT, null) }
}
override fun onFailure(
webSocket: WebSocket,
t: Throwable,
response: Response?,
) {
checkNotInMainThread()
socket?.cancel() // 1000, "Normal close"
// checks if this is an actual failure. Closing the socket generates an onFailure as well.
if (!(socket == null && (t.message == "Socket is closed" || t.message == "Socket closed"))) {
RelayStats.newError(url, response?.message ?: t.message)
Log.w("Relay", "Relay onFailure $url, ${response?.message} $response ${t.message} $socket")
t.printStackTrace()
listeners.forEach {
it.onError(
this@Relay,
"",
Error("WebSocket Failure. Response: $response. Exception: ${t.message}", t),
)
}
}
// Failures disconnect the relay.
markConnectionAsClosed()
}
}
fun markConnectionAsReady(
pingInMs: Long,
usingCompression: Boolean,
) {
this.resetEOSEStatuses()
this.isReady = true
this.usingCompression = usingCompression
RelayStats.setPing(url, pingInMs)
}
fun markConnectionAsClosed() {
this.socket = null
this.isReady = false
this.usingCompression = false
this.resetEOSEStatuses()
}
fun processNewRelayMessage(newMessage: String) {
val msgArray = Event.mapper.readTree(newMessage)
when (val type = msgArray.get(0).asText()) {
"EVENT" -> {
val subscriptionId = msgArray.get(1).asText()
val event = Event.fromJson(msgArray.get(2))
// Log.w("Relay", "Relay onEVENT ${event.kind} $url, $subscriptionId ${msgArray.get(2)}")
listeners.forEach {
it.onEvent(
this@Relay,
subscriptionId,
event,
afterEOSEPerSubscription[subscriptionId] == true,
)
}
}
"EOSE" ->
listeners.forEach {
val subscriptionId = msgArray.get(1).asText()
afterEOSEPerSubscription[subscriptionId] = true
// Log.w("Relay", "Relay onEOSE $url $subscriptionId")
it.onRelayStateChange(this@Relay, StateType.EOSE, subscriptionId)
}
"NOTICE" ->
listeners.forEach {
val message = msgArray.get(1).asText()
Log.w("Relay", "Relay onNotice $url, $message")
RelayStats.newNotice(url, message)
it.onError(this@Relay, message, Error("Relay sent notice: $message"))
}
"OK" ->
listeners.forEach {
val eventId = msgArray[1].asText()
val success = msgArray[2].asBoolean()
val message = if (msgArray.size() > 2) msgArray[3].asText() else ""
if (authResponse.containsKey(eventId)) {
val wasAlreadyAuthenticated = authResponse.get(eventId)
authResponse.put(eventId, success)
if (wasAlreadyAuthenticated != true && success) {
renewFilters()
}
}
Log.w("Relay", "Relay on OK $url, $eventId, $success, $message")
it.onSendResponse(this@Relay, eventId, success, message)
}
"AUTH" ->
listeners.forEach {
// Log.w("Relay", "Relay onAuth $url, ${msg[1].asString}")
it.onAuth(this@Relay, msgArray[1].asText())
}
"NOTIFY" ->
listeners.forEach {
// Log.w("Relay", "Relay onNotify $url, ${msg[1].asString}")
it.onNotify(this@Relay, msgArray[1].asText())
}
"CLOSED" -> listeners.forEach { Log.w("Relay", "Relay onClosed $url, $newMessage") }
else -> {
RelayStats.newError(url, "Unsupported message: $newMessage")
listeners.forEach {
Log.w("Relay", "Unsupported message: $newMessage")
it.onError(
this@Relay,
"",
Error("Unknown type $type on channel. Msg was $newMessage"),
)
}
}
}
}
fun disconnect() {
Log.d("Relay", "Relay.disconnect $url")
checkNotInMainThread()
lastConnectTentative = 0L // this is not an error, so prepare to reconnect as soon as requested.
socket?.cancel()
socket = null
isReady = false
usingCompression = false
resetEOSEStatuses()
}
fun resetEOSEStatuses() {
afterEOSEPerSubscription = LinkedHashMap(afterEOSEPerSubscription.size)
}
fun sendFilter(
requestId: String,
filters: List<TypedFilter>,
) {
checkNotInMainThread()
if (read) {
if (isConnected()) {
if (isReady) {
val relayFilters =
filters.filter { filter ->
activeTypes.any { it in filter.types }
}
if (relayFilters.isNotEmpty()) {
val request =
relayFilters.joinToStringLimited(
separator = ",",
limit = 20,
prefix = """["REQ","$requestId",""",
postfix = "]",
) {
it.filter.toJson(url)
}
writeToSocket(request)
afterEOSEPerSubscription[requestId] = false
}
}
} else {
// waits 60 seconds to reconnect after disconnected.
if (TimeUtils.now() > lastConnectTentative + RECONNECTING_IN_SECONDS) {
// sends all filters after connection is successful.
connect()
}
}
}
}
fun <T> Iterable<T>.joinToStringLimited(
separator: CharSequence = ", ",
prefix: CharSequence = "",
postfix: CharSequence = "",
limit: Int = -1,
transform: ((T) -> CharSequence)? = null,
): String {
val buffer = StringBuilder()
buffer.append(prefix)
var count = 0
for (element in this) {
if (limit < 0 || count <= limit) {
if (++count > 1) buffer.append(separator)
when {
transform != null -> buffer.append(transform(element))
element is CharSequence? -> buffer.append(element)
element is Char -> buffer.append(element)
else -> buffer.append(element.toString())
}
} else {
break
}
}
buffer.append(postfix)
return buffer.toString()
}
fun connectAndSendFiltersIfDisconnected() {
checkNotInMainThread()
if (socket == null) {
// waits 60 seconds to reconnect after disconnected.
if (TimeUtils.now() > lastConnectTentative + RECONNECTING_IN_SECONDS) {
// println("sendfilter Only if Disconnected ${url} ")
connect()
}
}
}
fun renewFilters() {
// Force update all filters after AUTH.
Client.allSubscriptions().forEach {
sendFilter(requestId = it.key, it.value)
}
}
// This function sends the event regardless of the relay being write or not.
fun sendOverride(signedEvent: EventInterface) {
checkNotInMainThread()
listeners.forEach { listener ->
listener.onBeforeSend(this@Relay, signedEvent)
}
if (signedEvent is RelayAuthEvent) {
sendAuth(signedEvent)
} else {
sendEvent(signedEvent)
}
}
fun send(signedEvent: EventInterface) {
checkNotInMainThread()
listeners.forEach { listener ->
listener.onBeforeSend(this@Relay, signedEvent)
}
if (signedEvent is RelayAuthEvent) {
sendAuth(signedEvent)
} else {
if (write) {
if (isConnected()) {
if (isReady) {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
} else {
synchronized(sendWhenReady) {
sendWhenReady.add(signedEvent)
}
}
} else {
// sends all filters after connection is successful.
connectAndRun {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
// Sends everything.
renewFilters()
}
}
}
}
}
private fun sendAuth(signedEvent: RelayAuthEvent) {
authResponse.put(signedEvent.id, false)
writeToSocket("""["AUTH",${signedEvent.toJson()}]""")
}
private fun sendEvent(signedEvent: EventInterface) {
if (isConnected()) {
if (isReady) {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
} else {
synchronized(sendWhenReady) {
sendWhenReady.add(signedEvent)
}
}
} else {
// sends all filters after connection is successful.
connectAndRun {
writeToSocket("""["EVENT",${signedEvent.toJson()}]""")
// Sends everything.
renewFilters()
}
}
}
private fun writeToSocket(str: String) {
socket?.let {
checkNotInMainThread()
val result = it.send(str)
listeners.forEach { listener ->
listener.onSend(this@Relay, str, result)
}
RelayStats.addBytesSent(url, str.bytesUsedInMemory())
if (BuildConfig.DEBUG) {
Log.d("Relay", "Relay send $url $str")
}
}
}
fun close(subscriptionId: String) {
writeToSocket("""["CLOSE","$subscriptionId"]""")
}
fun isSameRelayConfig(other: RelaySetupInfo): Boolean =
url == other.url &&
write == other.write &&
read == other.read &&
activeTypes == other.feedTypes
enum class StateType {
// Websocket connected
CONNECT,
// Websocket disconnecting
DISCONNECTING,
// Websocket disconnected
DISCONNECT,
// End Of Stored Events
EOSE,
}
interface Listener {
/** A new message was received */
fun onEvent(
relay: Relay,
subscriptionId: String,
event: Event,
afterEOSE: Boolean,
)
fun onError(
relay: Relay,
subscriptionId: String,
error: Error,
)
fun onSendResponse(
relay: Relay,
eventId: String,
success: Boolean,
message: String,
)
fun onAuth(
relay: Relay,
challenge: String,
)
/**
* Connected to or disconnected from a relay
*
* @param type is 0 for disconnect and 1 for connect
*/
fun onRelayStateChange(
relay: Relay,
type: StateType,
channel: String?,
)
/** Relay sent an invoice */
fun onNotify(
relay: Relay,
description: String,
)
fun onSend(
relay: Relay,
msg: String,
success: Boolean,
)
fun onBeforeSend(
relay: Relay,
event: EventInterface,
)
}
}

View File

@@ -0,0 +1,46 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import android.util.LruCache
import androidx.compose.runtime.Immutable
import com.vitorpamplona.quartz.encoders.RelayUrlFormatter
object RelayBriefInfoCache {
val cache = LruCache<String, RelayBriefInfo?>(50)
@Immutable
class RelayBriefInfo(
val url: String,
) {
val displayUrl: String = RelayUrlFormatter.displayUrl(url).intern()
val favIcon: String = "https://$displayUrl/favicon.ico".intern()
}
fun get(url: String): RelayBriefInfo {
val info = cache[url]
if (info != null) return info
val newInfo = RelayBriefInfo(url)
cache.put(url, newInfo)
return newInfo
}
}

View File

@@ -0,0 +1,326 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import androidx.compose.runtime.Immutable
import com.vitorpamplona.ammolite.service.checkNotInMainThread
import com.vitorpamplona.quartz.events.Event
import com.vitorpamplona.quartz.events.EventInterface
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
/**
* RelayPool manages the connection to multiple Relays and lets consumers deal with simple events.
*/
object RelayPool : Relay.Listener {
private var relays = listOf<Relay>()
private var listeners = setOf<Listener>()
// Backing property to avoid flow emissions from other classes
private var lastStatus = RelayPoolStatus(0, 0)
private val _statusFlow =
MutableSharedFlow<RelayPoolStatus>(1, 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val statusFlow: SharedFlow<RelayPoolStatus> = _statusFlow.asSharedFlow()
fun availableRelays(): Int {
return relays.size
}
fun connectedRelays(): Int {
return relays.count { it.isConnected() }
}
fun getRelay(url: String): Relay? {
return relays.firstOrNull { it.url == url }
}
fun getRelays(url: String): List<Relay> {
return relays.filter { it.url == url }
}
fun getAll() = relays
fun getOrCreateRelay(
url: String,
feedTypes: Set<FeedType>? = null,
onDone: (() -> Unit)? = null,
whenConnected: (Relay) -> Unit,
) {
synchronized(this) {
val matching = getRelays(url)
if (matching.isNotEmpty()) {
matching.forEach { whenConnected(it) }
} else {
/** temporary connection */
newSporadicRelay(
url,
feedTypes,
onConnected = whenConnected,
onDone = onDone,
)
}
}
}
@OptIn(DelicateCoroutinesApi::class)
fun newSporadicRelay(
url: String,
feedTypes: Set<FeedType>?,
onConnected: (Relay) -> Unit,
onDone: (() -> Unit)?,
timeout: Long = 60000,
) {
val relay = Relay(url, true, true, feedTypes ?: emptySet())
addRelay(relay)
relay.connectAndRun {
Client.allSubscriptions().forEach {
relay.sendFilter(it.key, it.value)
}
onConnected(relay)
GlobalScope.launch(Dispatchers.IO) {
delay(timeout) // waits for a reply
relay.disconnect()
removeRelay(relay)
if (onDone != null) {
onDone()
}
}
}
}
fun loadRelays(relayList: List<Relay>) {
if (!relayList.isNullOrEmpty()) {
relayList.forEach { addRelay(it) }
} else {
Constants.convertDefaultRelays().forEach { addRelay(it) }
}
}
fun unloadRelays() {
relays.forEach { it.unregister(this) }
relays = listOf()
}
fun requestAndWatch() {
checkNotInMainThread()
relays.forEach { it.connect() }
}
fun sendFilter(
subscriptionId: String,
filters: List<TypedFilter>,
) {
relays.forEach { relay ->
relay.sendFilter(subscriptionId, filters)
}
}
fun connectAndSendFiltersIfDisconnected() {
relays.forEach { it.connectAndSendFiltersIfDisconnected() }
}
fun sendToSelectedRelays(
list: List<RelaySetupInfo>,
signedEvent: EventInterface,
) {
list.forEach { relay -> relays.filter { it.url == relay.url }.forEach { it.sendOverride(signedEvent) } }
}
fun send(signedEvent: EventInterface) {
relays.forEach { it.send(signedEvent) }
}
fun sendOverride(signedEvent: EventInterface) {
relays.forEach { it.sendOverride(signedEvent) }
}
fun close(subscriptionId: String) {
relays.forEach { it.close(subscriptionId) }
}
fun disconnect() {
relays.forEach { it.disconnect() }
}
fun addRelay(relay: Relay) {
relay.register(this)
relays += relay
updateStatus()
}
fun removeRelay(relay: Relay) {
relay.unregister(this)
relays = relays.minus(relay)
updateStatus()
}
fun register(listener: Listener) {
listeners = listeners.plus(listener)
}
fun unregister(listener: Listener) {
listeners = listeners.minus(listener)
}
interface Listener {
fun onEvent(
event: Event,
subscriptionId: String,
relay: Relay,
afterEOSE: Boolean,
)
fun onRelayStateChange(
type: Relay.StateType,
relay: Relay,
channel: String?,
)
fun onSendResponse(
eventId: String,
success: Boolean,
message: String,
relay: Relay,
)
fun onAuth(
relay: Relay,
challenge: String,
)
fun onNotify(
relay: Relay,
description: String,
)
fun onSend(
relay: Relay,
msg: String,
success: Boolean,
)
fun onBeforeSend(
relay: Relay,
event: EventInterface,
)
fun onError(
error: Error,
subscriptionId: String,
relay: Relay,
)
}
override fun onEvent(
relay: Relay,
subscriptionId: String,
event: Event,
afterEOSE: Boolean,
) {
listeners.forEach { it.onEvent(event, subscriptionId, relay, afterEOSE) }
}
override fun onError(
relay: Relay,
subscriptionId: String,
error: Error,
) {
listeners.forEach { it.onError(error, subscriptionId, relay) }
updateStatus()
}
override fun onRelayStateChange(
relay: Relay,
type: Relay.StateType,
channel: String?,
) {
listeners.forEach { it.onRelayStateChange(type, relay, channel) }
if (type != Relay.StateType.EOSE) {
updateStatus()
}
}
override fun onSendResponse(
relay: Relay,
eventId: String,
success: Boolean,
message: String,
) {
listeners.forEach { it.onSendResponse(eventId, success, message, relay) }
}
override fun onAuth(
relay: Relay,
challenge: String,
) {
listeners.forEach { it.onAuth(relay, challenge) }
}
override fun onNotify(
relay: Relay,
description: String,
) {
listeners.forEach { it.onNotify(relay, description) }
}
override fun onSend(
relay: Relay,
msg: String,
success: Boolean,
) {
listeners.forEach { it.onSend(relay, msg, success) }
}
override fun onBeforeSend(
relay: Relay,
event: EventInterface,
) {
listeners.forEach { it.onBeforeSend(relay, event) }
}
private fun updateStatus() {
val connected = connectedRelays()
val available = availableRelays()
if (lastStatus.connected != connected || lastStatus.available != available) {
lastStatus = RelayPoolStatus(connected, available)
_statusFlow.tryEmit(lastStatus)
}
}
}
@Immutable
data class RelayPoolStatus(
val connected: Int,
val available: Int,
val isConnected: Boolean = connected > 0,
)

View File

@@ -0,0 +1,31 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import androidx.compose.runtime.Immutable
@Immutable
data class RelaySetupInfo(
val url: String,
val read: Boolean,
val write: Boolean,
val feedTypes: Set<FeedType>,
)

View File

@@ -0,0 +1,138 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import androidx.collection.LruCache
import com.vitorpamplona.quartz.utils.TimeUtils
object RelayStats {
private val innerCache = mutableMapOf<String, RelayStat>()
fun get(url: String): RelayStat {
return innerCache.getOrPut(url) { RelayStat() }
}
fun addBytesReceived(
url: String,
bytesUsedInMemory: Int,
) {
get(url).addBytesReceived(bytesUsedInMemory)
}
fun addBytesSent(
url: String,
bytesUsedInMemory: Int,
) {
get(url).addBytesSent(bytesUsedInMemory)
}
fun newError(
url: String,
error: String?,
) {
get(url).newError(error)
}
fun newNotice(
url: String,
notice: String?,
) {
get(url).newNotice(notice)
}
fun setPing(
url: String,
pingInMs: Long,
) {
get(url).pingInMs = pingInMs
}
fun newSpam(
url: String,
explanation: String,
) {
get(url).newSpam(explanation)
}
}
class RelayStat(
var receivedBytes: Long = 0L,
var sentBytes: Long = 0L,
var spamCounter: Long = 0L,
var errorCounter: Long = 0L,
var pingInMs: Long = 0L,
) {
val messages = LruCache<RelayDebugMessage, RelayDebugMessage>(100)
fun newNotice(notice: String?) {
val debugMessage =
RelayDebugMessage(
type = RelayDebugMessageType.NOTICE,
message = notice ?: "No error message provided",
)
messages.put(debugMessage, debugMessage)
}
fun newError(error: String?) {
errorCounter++
val debugMessage =
RelayDebugMessage(
type = RelayDebugMessageType.ERROR,
message = error ?: "No error message provided",
)
messages.put(debugMessage, debugMessage)
}
fun addBytesReceived(bytesUsedInMemory: Int) {
receivedBytes += bytesUsedInMemory
}
fun addBytesSent(bytesUsedInMemory: Int) {
sentBytes += bytesUsedInMemory
}
fun newSpam(spamDescriptor: String) {
spamCounter++
val debugMessage =
RelayDebugMessage(
type = RelayDebugMessageType.SPAM,
message = spamDescriptor,
)
messages.put(debugMessage, debugMessage)
}
}
class RelayDebugMessage(
val type: RelayDebugMessageType,
val message: String,
val time: Long = TimeUtils.now(),
)
enum class RelayDebugMessageType {
SPAM,
NOTICE,
ERROR,
}

View File

@@ -0,0 +1,70 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
import java.util.UUID
data class Subscription(
val id: String = UUID.randomUUID().toString().substring(0, 4),
val onEOSE: ((Long, String) -> Unit)? = null,
) {
var typedFilters: List<TypedFilter>? = null // Inactive when null
fun updateEOSE(
time: Long,
relay: String,
) {
onEOSE?.let { it(time, relay) }
}
fun hasChangedFiltersFrom(otherFilters: List<TypedFilter>?): Boolean {
if (typedFilters == null && otherFilters == null) return false
if (typedFilters?.size != otherFilters?.size) return true
typedFilters?.forEachIndexed { index, typedFilter ->
val otherFilter = otherFilters?.getOrNull(index) ?: return true
// Does not check SINCE on purpose. Avoids replacing the filter if SINCE was all that changed.
// fast check
if (typedFilter.filter.authors?.size != otherFilter.filter.authors?.size ||
typedFilter.filter.ids?.size != otherFilter.filter.ids?.size ||
typedFilter.filter.tags?.size != otherFilter.filter.tags?.size ||
typedFilter.filter.kinds?.size != otherFilter.filter.kinds?.size ||
typedFilter.filter.limit != otherFilter.filter.limit ||
typedFilter.filter.search?.length != otherFilter.filter.search?.length ||
typedFilter.filter.until != otherFilter.filter.until
) {
return true
}
// deep check
if (typedFilter.filter.ids != otherFilter.filter.ids ||
typedFilter.filter.authors != otherFilter.filter.authors ||
typedFilter.filter.tags != otherFilter.filter.tags ||
typedFilter.filter.kinds != otherFilter.filter.kinds ||
typedFilter.filter.search != otherFilter.filter.search
) {
return true
}
}
return false
}
}

View File

@@ -0,0 +1,26 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.relays
class TypedFilter(
val types: Set<FeedType>,
val filter: JsonFilter,
)

View File

@@ -39,7 +39,7 @@ object HttpClientManager {
private var defaultTimeout = DEFAULT_TIMEOUT_ON_WIFI
private var defaultHttpClient: OkHttpClient? = null
private var defaultHttpClientWithoutProxy: OkHttpClient? = null
private var internalInterceptor: Interceptor = DefaultContentTypeInterceptor()
private var userAgent: String = "Amethyst"
// fires off every time value of the property changes
private var internalProxy: Proxy? by
@@ -73,12 +73,14 @@ object HttpClientManager {
}
}
fun setDefaultInterceptor(interceptor: Interceptor) {
Log.d("HttpClient", "Changing interceptor")
this.internalInterceptor = interceptor
fun setDefaultUserAgent(userAgentHeader: String) {
Log.d("HttpClient", "Changing userAgent")
this.userAgent = userAgentHeader
this.defaultHttpClient = buildHttpClient(internalProxy, defaultTimeout)
}
fun getDefaultUserAgentHeader() = this.userAgent
private fun buildHttpClient(
proxy: Proxy?,
timeout: Duration,
@@ -90,20 +92,22 @@ object HttpClientManager {
.readTimeout(duration)
.connectTimeout(duration)
.writeTimeout(duration)
.addInterceptor(internalInterceptor)
.addInterceptor(DefaultContentTypeInterceptor(userAgent))
.followRedirects(true)
.followSslRedirects(true)
.build()
}
class DefaultContentTypeInterceptor : Interceptor {
class DefaultContentTypeInterceptor(
private val userAgentHeader: String,
) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
val originalRequest: Request = chain.request()
val requestWithUserAgent: Request =
originalRequest
.newBuilder()
.header("User-Agent", "Amethyst")
.header("User-Agent", userAgentHeader)
.build()
return chain.proceed(requestWithUserAgent)
}

View File

@@ -0,0 +1,34 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.ammolite.service
import android.os.Looper
import com.vitorpamplona.ammolite.BuildConfig
fun checkNotInMainThread() {
if (BuildConfig.DEBUG && isMainThread()) {
throw OnMainThreadException("It should not be in the MainThread")
}
}
fun isMainThread() = Looper.myLooper() == Looper.getMainLooper()
class OnMainThreadException(str: String) : RuntimeException(str)