Makes lists either immutable or puts them into a synchronized block for thread safety

This commit is contained in:
Vitor Pamplona
2023-01-17 08:42:00 -05:00
parent 2037b35af6
commit 0667a822f1
17 changed files with 116 additions and 86 deletions

View File

@@ -26,6 +26,7 @@ Amethyst brings the best social network to your Android phone. Just insert your
- [ ] Profile Edit - [ ] Profile Edit
- [ ] Relay Edit - [ ] Relay Edit
- [ ] Account Creation / Backup Guidance - [ ] Account Creation / Backup Guidance
- [ ] Message Sent feedback
# Development Overview # Development Overview

View File

@@ -67,9 +67,12 @@ class User(val pubkey: ByteArray) {
} }
fun updateFollows(newFollows: List<User>, updateAt: Long) { fun updateFollows(newFollows: List<User>, updateAt: Long) {
val toBeAdded = newFollows - follows val toBeAdded = synchronized(follows) {
val toBeRemoved = follows - newFollows newFollows - follows
}
val toBeRemoved = synchronized(follows) {
follows - newFollows
}
toBeAdded.forEach { toBeAdded.forEach {
follow(it) follow(it)
} }
@@ -89,6 +92,12 @@ class User(val pubkey: ByteArray) {
live.refresh() live.refresh()
} }
fun isFollowing(user: User): Boolean {
return synchronized(follows) {
follows.contains(user)
}
}
// Observers line up here. // Observers line up here.
val live: UserLiveData = UserLiveData(this) val live: UserLiveData = UserLiveData(this)

View File

@@ -57,10 +57,15 @@ object NostrAccountDataSource: NostrDataSource<Note>("AccountData") {
override fun feed(): List<Note> { override fun feed(): List<Note> {
val user = account.userProfile() val user = account.userProfile()
val follows = user.follows.map { it.pubkeyHex }.plus(user.pubkeyHex).toSet()
val follows = user.follows
val followKeys = synchronized(follows) {
follows.map { it.pubkeyHex }
}
val allowSet = followKeys.plus(user.pubkeyHex).toSet()
return LocalCache.notes.values return LocalCache.notes.values
.filter { (it.event is TextNoteEvent || it.event is RepostEvent) && it.author?.pubkeyHex in follows } .filter { (it.event is TextNoteEvent || it.event is RepostEvent) && it.author?.pubkeyHex in allowSet }
.sortedBy { it.event!!.createdAt } .sortedBy { it.event!!.createdAt }
.reversed() .reversed()
} }

View File

@@ -50,14 +50,14 @@ object NostrChatroomListDataSource: NostrDataSource<Note>("MailBoxFeed") {
val messagingWith = messages.keys().toList() val messagingWith = messages.keys().toList()
val privateMessages = messagingWith.mapNotNull { val privateMessages = messagingWith.mapNotNull {
messages[it]?.sortedBy { it.event?.createdAt }?.last { it.event != null } messages[it]?.sortedBy { it.event?.createdAt }?.lastOrNull { it.event != null }
} }
val publicChannels = account.followingChannels().map { val publicChannels = account.followingChannels().map {
it.notes.values.sortedBy { it.event?.createdAt }.last { it.event != null } it.notes.values.sortedBy { it.event?.createdAt }.lastOrNull { it.event != null }
} }
return (privateMessages + publicChannels).sortedBy { it.event?.createdAt }.reversed() return (privateMessages + publicChannels).filterNotNull().sortedBy { it.event?.createdAt }.reversed()
} }
override fun updateChannelFilters() { override fun updateChannelFilters() {

View File

@@ -29,17 +29,21 @@ object NostrHomeDataSource: NostrDataSource<Note>("HomeFeed") {
} }
fun createFollowAccountsFilter(): JsonFilter? { fun createFollowAccountsFilter(): JsonFilter? {
val follows = listOf(account.userProfile().pubkeyHex.substring(0, 6)).plus( val follows = account.userProfile().follows ?: emptySet()
account.userProfile().follows?.map {
it.pubkey.toHex().substring(0, 6)
} ?: emptyList()
)
if (follows.isEmpty()) return null val followKeys = synchronized(follows) {
follows.map {
it.pubkey.toHex().substring(0, 6)
}
}
val followSet = followKeys.plus(account.userProfile().pubkeyHex.substring(0, 6))
if (followSet.isEmpty()) return null
return JsonFilter( return JsonFilter(
kinds = listOf(TextNoteEvent.kind, RepostEvent.kind), kinds = listOf(TextNoteEvent.kind, RepostEvent.kind),
authors = follows, authors = followSet,
since = System.currentTimeMillis() / 1000 - (60 * 60 * 24 * 1), // 24 hours since = System.currentTimeMillis() / 1000 - (60 * 60 * 24 * 1), // 24 hours
) )
} }
@@ -64,10 +68,16 @@ object NostrHomeDataSource: NostrDataSource<Note>("HomeFeed") {
override fun feed(): List<Note> { override fun feed(): List<Note> {
val user = account.userProfile() val user = account.userProfile()
val follows = user.follows.map { it.pubkeyHex }.plus(user.pubkeyHex).toSet()
val follows = user.follows
val followKeys = synchronized(follows) {
follows.map { it.pubkeyHex }
}
val allowSet = followKeys.plus(user.pubkeyHex).toSet()
return LocalCache.notes.values return LocalCache.notes.values
.filter { (it.event is TextNoteEvent || it.event is RepostEvent) && it.author?.pubkeyHex in follows } .filter { (it.event is TextNoteEvent || it.event is RepostEvent) && it.author?.pubkeyHex in allowSet }
.sortedBy { it.event!!.createdAt } .sortedBy { it.event!!.createdAt }
.reversed() .reversed()
} }

View File

@@ -32,10 +32,12 @@ object NostrNotificationDataSource: NostrDataSource<Note>("GlobalFeed") {
} }
override fun feed(): List<Note> { override fun feed(): List<Note> {
return account.userProfile().taggedPosts val set = account.userProfile().taggedPosts
.filter { it.event != null } val filtered = synchronized(set) {
.sortedBy { it.event!!.createdAt } set.filter { it.event != null }
.reversed() }
return filtered.sortedBy { it.event!!.createdAt }.reversed()
} }
override fun updateChannelFilters() { override fun updateChannelFilters() {

View File

@@ -9,9 +9,9 @@ import nostr.postr.JsonFilter
import nostr.postr.events.TextNoteEvent import nostr.postr.events.TextNoteEvent
object NostrSingleEventDataSource: NostrDataSource<Note>("SingleEventFeed") { object NostrSingleEventDataSource: NostrDataSource<Note>("SingleEventFeed") {
val eventsToWatch = Collections.synchronizedList(mutableListOf<String>()) private var eventsToWatch = listOf<String>()
fun createRepliesAndReactionsFilter(): JsonFilter? { private fun createRepliesAndReactionsFilter(): JsonFilter? {
val reactionsToWatch = eventsToWatch.map { it.substring(0, 8) } val reactionsToWatch = eventsToWatch.map { it.substring(0, 8) }
if (reactionsToWatch.isEmpty()) { if (reactionsToWatch.isEmpty()) {
@@ -65,12 +65,12 @@ object NostrSingleEventDataSource: NostrDataSource<Note>("SingleEventFeed") {
} }
fun add(eventId: String) { fun add(eventId: String) {
eventsToWatch.add(eventId) eventsToWatch = eventsToWatch.plus(eventId)
resetFilters() resetFilters()
} }
fun remove(eventId: String) { fun remove(eventId: String) {
eventsToWatch.remove(eventId) eventsToWatch = eventsToWatch.minus(eventId)
resetFilters() resetFilters()
} }
} }

View File

@@ -7,7 +7,7 @@ import nostr.postr.JsonFilter
import nostr.postr.events.MetadataEvent import nostr.postr.events.MetadataEvent
object NostrSingleUserDataSource: NostrDataSource<Note>("SingleUserFeed") { object NostrSingleUserDataSource: NostrDataSource<Note>("SingleUserFeed") {
val usersToWatch = Collections.synchronizedList(mutableListOf<String>()) var usersToWatch = listOf<String>()
fun createUserFilter(): JsonFilter? { fun createUserFilter(): JsonFilter? {
if (usersToWatch.isEmpty()) return null if (usersToWatch.isEmpty()) return null
@@ -31,12 +31,12 @@ object NostrSingleUserDataSource: NostrDataSource<Note>("SingleUserFeed") {
} }
fun add(userId: String) { fun add(userId: String) {
usersToWatch.add(userId) usersToWatch = usersToWatch.plus(userId)
resetFilters() resetFilters()
} }
fun remove(userId: String) { fun remove(userId: String) {
usersToWatch.remove(userId) usersToWatch = usersToWatch.minus(userId)
resetFilters() resetFilters()
} }
} }

View File

@@ -35,7 +35,11 @@ object NostrUserProfileDataSource: NostrDataSource<Note>("UserProfileFeed") {
val notesChannel = requestNewChannel() val notesChannel = requestNewChannel()
override fun feed(): List<Note> { override fun feed(): List<Note> {
return user?.notes?.sortedBy { it.event?.createdAt }?.reversed() ?: emptyList() val notes = user?.notes ?: return emptyList()
val sortedNotes = synchronized(notes) {
notes.sortedBy { it.event?.createdAt }
}
return sortedNotes.reversed()
} }
override fun updateChannelFilters() { override fun updateChannelFilters() {

View File

@@ -22,7 +22,11 @@ object NostrUserProfileFollowersDataSource: NostrDataSource<User>("UserProfileFo
val followerChannel = requestNewChannel() val followerChannel = requestNewChannel()
override fun feed(): List<User> { override fun feed(): List<User> {
return user?.followers?.toList() ?: emptyList() val followers = user?.followers ?: emptyList()
return synchronized(followers) {
followers.toList()
}
} }
override fun updateChannelFilters() { override fun updateChannelFilters() {

View File

@@ -21,7 +21,7 @@ class ChannelMuteUserEvent (
} }
companion object { companion object {
const val kind = 43 const val kind = 44
fun create(reason: String, usersToMute: List<String>?, privateKey: ByteArray, createdAt: Long = Date().time / 1000): ChannelMuteUserEvent { fun create(reason: String, usersToMute: List<String>?, privateKey: ByteArray, createdAt: Long = Date().time / 1000): ChannelMuteUserEvent {
val content = reason val content = reason

View File

@@ -23,9 +23,9 @@ object Client: RelayPool.Listener {
* something. * something.
**/ **/
var lenient: Boolean = false var lenient: Boolean = false
private val listeners = Collections.synchronizedSet(HashSet<Listener>()) private var listeners = setOf<Listener>()
internal var relays = Constants.defaultRelays private var relays = Constants.defaultRelays
internal val subscriptions = ConcurrentHashMap<String, MutableList<JsonFilter>>() private val subscriptions = mutableMapOf<String, List<JsonFilter>>()
fun connect( fun connect(
relays: Array<Relay> = Constants.defaultRelays relays: Array<Relay> = Constants.defaultRelays
@@ -35,17 +35,9 @@ object Client: RelayPool.Listener {
this.relays = relays this.relays = relays
} }
fun requestAndWatch(
subscriptionId: String = UUID.randomUUID().toString().substring(0..10),
filters: MutableList<JsonFilter> = mutableListOf(JsonFilter())
) {
subscriptions[subscriptionId] = filters
RelayPool.requestAndWatch()
}
fun sendFilter( fun sendFilter(
subscriptionId: String = UUID.randomUUID().toString().substring(0..10), subscriptionId: String = UUID.randomUUID().toString().substring(0..10),
filters: MutableList<JsonFilter> = mutableListOf(JsonFilter()) filters: List<JsonFilter> = listOf(JsonFilter())
) { ) {
subscriptions[subscriptionId] = filters subscriptions[subscriptionId] = filters
RelayPool.sendFilter(subscriptionId) RelayPool.sendFilter(subscriptionId)
@@ -53,10 +45,10 @@ object Client: RelayPool.Listener {
fun sendFilterOnlyIfDisconnected( fun sendFilterOnlyIfDisconnected(
subscriptionId: String = UUID.randomUUID().toString().substring(0..10), subscriptionId: String = UUID.randomUUID().toString().substring(0..10),
filters: MutableList<JsonFilter> = mutableListOf(JsonFilter()) filters: List<JsonFilter> = listOf(JsonFilter())
) { ) {
subscriptions[subscriptionId] = filters subscriptions[subscriptionId] = filters
RelayPool.sendFilterOnlyIfDisconnected(subscriptionId) RelayPool.sendFilterOnlyIfDisconnected()
} }
fun send(signedEvent: Event) { fun send(signedEvent: Event) {
@@ -90,13 +82,22 @@ object Client: RelayPool.Listener {
} }
fun subscribe(listener: Listener) { fun subscribe(listener: Listener) {
listeners.add(listener) listeners = listeners.plus(listener)
} }
fun unsubscribe(listener: Listener): Boolean { fun unsubscribe(listener: Listener) {
return listeners.remove(listener) listeners = listeners.minus(listener)
} }
fun allSubscriptions(): List<String> {
return synchronized(subscriptions) {
subscriptions.keys.toList()
}
}
fun getSubscriptionFilters(subId: String): List<JsonFilter> {
return subscriptions[subId] ?: emptyList()
}
abstract class Listener { abstract class Listener {
/** /**

View File

@@ -2,7 +2,6 @@ package com.vitorpamplona.amethyst.service.relays
import com.google.gson.JsonElement import com.google.gson.JsonElement
import java.util.Collections import java.util.Collections
import nostr.postr.JsonFilter
import nostr.postr.events.Event import nostr.postr.events.Event
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
import okhttp3.Request import okhttp3.Request
@@ -16,27 +15,29 @@ class Relay(
var write: Boolean = true var write: Boolean = true
) { ) {
private val httpClient = OkHttpClient() private val httpClient = OkHttpClient()
private val listeners = Collections.synchronizedSet(HashSet<Listener>()) private var listeners = setOf<Listener>()
private var socket: WebSocket? = null private var socket: WebSocket? = null
fun register(listener: Listener) { fun register(listener: Listener) {
listeners.add(listener) listeners = listeners.plus(listener)
}
fun unregister(listener: Listener) {
listeners = listeners.minus(listener)
} }
fun isConnected(): Boolean { fun isConnected(): Boolean {
return socket != null return socket != null
} }
fun unregister(listener: Listener) = listeners.remove(listener) fun requestAndWatch() {
fun requestAndWatch(reconnectTs: Long? = null) {
val request = Request.Builder().url(url).build() val request = Request.Builder().url(url).build()
val listener = object : WebSocketListener() { val listener = object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) { override fun onOpen(webSocket: WebSocket, response: Response) {
// Sends everything. // Sends everything.
Client.subscriptions.forEach { Client.allSubscriptions().forEach {
sendFilter(requestId = it.key, reconnectTs = reconnectTs) sendFilter(requestId = it)
} }
listeners.forEach { it.onRelayStateChange(this@Relay, Type.CONNECT) } listeners.forEach { it.onRelayStateChange(this@Relay, Type.CONNECT) }
} }
@@ -101,28 +102,22 @@ class Relay(
socket?.close(1000, "Normal close") socket?.close(1000, "Normal close")
} }
fun sendFilter(requestId: String, reconnectTs: Long? = null) { fun sendFilter(requestId: String) {
if (socket == null) { if (socket == null) {
requestAndWatch(reconnectTs) requestAndWatch()
} else { } else {
val filters = if (reconnectTs != null) { val filters = Client.getSubscriptionFilters(requestId)
Client.subscriptions[requestId]?.let { if (filters.isNotEmpty()) {
it.map { filter ->
JsonFilter(filter.ids, filter.authors, filter.kinds, filter.tags, since = reconnectTs)
}
} ?: error("No filter(s) found.")
} else {
Client.subscriptions[requestId] ?: error("No filter(s) found.")
}
val request = """["REQ","$requestId",${filters.joinToString(",") { it.toJson() }}]""" val request = """["REQ","$requestId",${filters.joinToString(",") { it.toJson() }}]"""
//println("FILTERSSENT " + """["REQ","$requestId",${filters.joinToString(",") { it.toJson() }}]""") //println("FILTERSSENT " + """["REQ","$requestId",${filters.joinToString(",") { it.toJson() }}]""")
socket!!.send(request) socket!!.send(request)
} }
} }
}
fun sendFilterOnlyIfDisconnected(requestId: String, reconnectTs: Long? = null) { fun sendFilterOnlyIfDisconnected() {
if (socket == null) { if (socket == null) {
requestAndWatch(reconnectTs) requestAndWatch()
} }
} }

View File

@@ -9,8 +9,8 @@ import nostr.postr.events.Event
* RelayPool manages the connection to multiple Relays and lets consumers deal with simple events. * RelayPool manages the connection to multiple Relays and lets consumers deal with simple events.
*/ */
object RelayPool: Relay.Listener { object RelayPool: Relay.Listener {
private val relays = Collections.synchronizedList(ArrayList<Relay>()) private var relays = listOf<Relay>()
private val listeners = Collections.synchronizedSet(HashSet<Listener>()) private var listeners = setOf<Listener>()
fun availableRelays(): Int { fun availableRelays(): Int {
return relays.size return relays.size
@@ -29,7 +29,8 @@ object RelayPool: Relay.Listener {
} }
fun unloadRelays() { fun unloadRelays() {
relays.toList().forEach { removeRelay(it) } relays.forEach { it.unregister(this) }
relays = listOf()
} }
fun requestAndWatch() { fun requestAndWatch() {
@@ -40,8 +41,8 @@ object RelayPool: Relay.Listener {
relays.forEach { it.sendFilter(subscriptionId) } relays.forEach { it.sendFilter(subscriptionId) }
} }
fun sendFilterOnlyIfDisconnected(subscriptionId: String) { fun sendFilterOnlyIfDisconnected() {
relays.forEach { it.sendFilterOnlyIfDisconnected(subscriptionId) } relays.forEach { it.sendFilterOnlyIfDisconnected() }
} }
fun send(signedEvent: Event) { fun send(signedEvent: Event) {
@@ -61,19 +62,17 @@ object RelayPool: Relay.Listener {
relays += relay relays += relay
} }
fun removeRelay(relay: Relay): Boolean { fun removeRelay(relay: Relay) {
relay.unregister(this) relay.unregister(this)
return relays.remove(relay) relays = relays.minus(relay)
} }
fun getRelays(): List<Relay> = relays
fun register(listener: Listener) { fun register(listener: Listener) {
listeners.add(listener) listeners = listeners.plus(listener)
} }
fun unregister(listener: Listener): Boolean { fun unregister(listener: Listener) {
return listeners.remove(listener) listeners = listeners.minus(listener)
} }
interface Listener { interface Listener {

View File

@@ -71,7 +71,7 @@ fun MainTopBar(scaffoldState: ScaffoldState, accountViewModel: AccountViewModel)
) { ) {
IconButton( IconButton(
onClick = { onClick = {
Client.subscriptions.map { "${it.key} ${it.value.joinToString { it.toJson() }}" }.forEach { Client.allSubscriptions().map { "${it} ${Client.getSubscriptionFilters(it).joinToString { it.toJson() }}" }.forEach {
Log.d("CURRENT FILTERS", it) Log.d("CURRENT FILTERS", it)
} }
} }

View File

@@ -64,7 +64,7 @@ fun UserCompose(baseUser: User, accountViewModel: AccountViewModel, navControlle
} }
Column(modifier = Modifier.padding(start = 10.dp)) { Column(modifier = Modifier.padding(start = 10.dp)) {
if (accountState?.account?.userProfile()?.follows?.contains(user) == true) { if (accountState?.account?.userProfile()?.isFollowing(user) == true) {
UnfollowButton { accountState?.account?.unfollow(user) } UnfollowButton { accountState?.account?.unfollow(user) }
} else { } else {
FollowButton { accountState?.account?.follow(user) } FollowButton { accountState?.account?.follow(user) }

View File

@@ -143,7 +143,7 @@ fun ProfileScreen(userId: String?, accountViewModel: AccountViewModel, navContro
if (accountUser == user) { if (accountUser == user) {
EditButton() EditButton()
} else { } else {
if (accountUser.follows?.contains(user) == true) { if (accountUser.isFollowing(user) == true) {
UnfollowButton { account.unfollow(user) } UnfollowButton { account.unfollow(user) }
} else { } else {
FollowButton { account.follow(user) } FollowButton { account.follow(user) }