From f157f8eb9bb26c6fa89346c1a9f9af0e74808832 Mon Sep 17 00:00:00 2001 From: Vitor Pamplona Date: Fri, 1 Nov 2024 18:16:57 -0400 Subject: [PATCH] - Fixes lack of zap updates when there is a private zap the user cannot decrypt or when a nwc payment fails. - Moves the parallel processing to a utils class --- .../vitorpamplona/amethyst/ParallelUtils.kt | 92 ++++++++++ .../vitorpamplona/amethyst/model/Account.kt | 37 ++-- .../com/vitorpamplona/amethyst/model/Note.kt | 160 ++++++++++++------ .../amethyst/service/Nip96Uploader.kt | 9 +- .../amethyst/service/ZapPaymentHandler.kt | 15 +- .../service/notifications/RegisterAccounts.kt | 51 ++---- .../ui/screen/loggedIn/AccountViewModel.kt | 51 +----- 7 files changed, 246 insertions(+), 169 deletions(-) create mode 100644 amethyst/src/main/java/com/vitorpamplona/amethyst/ParallelUtils.kt diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ParallelUtils.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ParallelUtils.kt new file mode 100644 index 000000000..629b93706 --- /dev/null +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ParallelUtils.kt @@ -0,0 +1,92 @@ +/** + * Copyright (c) 2024 Vitor Pamplona + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the + * Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN + * AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ +package com.vitorpamplona.amethyst + +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withTimeoutOrNull +import kotlin.coroutines.resume + +/** + * Launches an async coroutine for each item, runs the + * function and waits for everybody to finsih + */ +suspend fun launchAndWaitAll( + items: List, + asyncFunc: suspend (T) -> Unit, +) { + coroutineScope { + val jobs = + items.map { next -> + async { + asyncFunc(next) + } + } + + // runs in parallel to avoid overcrowding Amber. + withTimeoutOrNull(15000) { + jobs.joinAll() + } + } +} + +/** + * Runs the function and waits for 10 seconds for any result. + */ +suspend inline fun tryAndWait(crossinline asyncFunc: (CancellableContinuation) -> Unit): T? = + withTimeoutOrNull(10000) { + suspendCancellableCoroutine { continuation -> + asyncFunc(continuation) + } + } + +/** + * Runs an async coroutine for each one of the items, + * runs the request for that item, + * and gathers all the results in the output map. + */ +suspend fun collectSuccessfulOperations( + items: List, + runRequestFor: (T, (K) -> Unit) -> Unit, + output: MutableMap = mutableMapOf(), + onReady: suspend (MutableMap) -> Unit, +) { + if (items.isEmpty()) { + onReady(output) + return + } + + launchAndWaitAll(items) { + val result = + tryAndWait { continuation -> + runRequestFor(it) { result: K -> continuation.resume(result) } + } + + if (result != null) { + output[it] = result + } + } + + onReady(output) +} diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Account.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Account.kt index 65fca25f6..46b5b013c 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Account.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Account.kt @@ -35,6 +35,7 @@ import com.vitorpamplona.amethyst.BuildConfig import com.vitorpamplona.amethyst.service.FileHeader import com.vitorpamplona.amethyst.service.NostrLnZapPaymentResponseDataSource import com.vitorpamplona.amethyst.service.checkNotInMainThread +import com.vitorpamplona.amethyst.tryAndWait import com.vitorpamplona.amethyst.ui.tor.TorType import com.vitorpamplona.ammolite.relays.Client import com.vitorpamplona.ammolite.relays.Constants @@ -129,8 +130,6 @@ import kotlinx.coroutines.flow.transformLatest import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withTimeoutOrNull import java.math.BigDecimal import java.util.Locale import java.util.UUID @@ -897,11 +896,9 @@ class Account( } suspend fun waitToDecrypt(peopleListFollows: GeneralListEvent): LiveFollowList? = - withTimeoutOrNull(1000) { - suspendCancellableCoroutine { continuation -> - decryptLiveFollows(peopleListFollows) { - continuation.resume(it) - } + tryAndWait { continuation -> + decryptLiveFollows(peopleListFollows) { + continuation.resume(it) } } @@ -921,11 +918,9 @@ class Account( suspend fun decryptPeopleList(event: PeopleListEvent?): PeopleListEvent.UsersAndWords { if (event == null) return PeopleListEvent.UsersAndWords() - return withTimeoutOrNull(1000) { - suspendCancellableCoroutine { continuation -> - event.publicAndPrivateUsersAndWords(signer) { - continuation.resume(it) - } + return tryAndWait { continuation -> + event.publicAndPrivateUsersAndWords(signer) { + continuation.resume(it) } } ?: PeopleListEvent.UsersAndWords() } @@ -933,11 +928,9 @@ class Account( suspend fun decryptMuteList(event: MuteListEvent?): PeopleListEvent.UsersAndWords { if (event == null) return PeopleListEvent.UsersAndWords() - return withTimeoutOrNull(1000) { - suspendCancellableCoroutine { continuation -> - event.publicAndPrivateUsersAndWords(signer) { - continuation.resume(it) - } + return tryAndWait { continuation -> + event.publicAndPrivateUsersAndWords(signer) { + continuation.resume(it) } } ?: PeopleListEvent.UsersAndWords() } @@ -992,11 +985,9 @@ class Account( emit(null) } else { emit( - withTimeoutOrNull(1000) { - suspendCancellableCoroutine { continuation -> - userState.user.latestBookmarkList?.privateTags(signer) { - continuation.resume(userState.user.latestBookmarkList) - } + tryAndWait { continuation -> + userState.user.latestBookmarkList?.privateTags(signer) { + continuation.resume(userState.user.latestBookmarkList) } }, ) @@ -1337,7 +1328,7 @@ class Account( zappedNote?.isZappedBy(userProfile(), this, onWasZapped) } - fun calculateZappedAmount( + suspend fun calculateZappedAmount( zappedNote: Note?, onReady: (BigDecimal) -> Unit, ) { diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Note.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Note.kt index bb88bf0b1..135316dc5 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Note.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/model/Note.kt @@ -25,9 +25,11 @@ import androidx.compose.runtime.Stable import androidx.lifecycle.LiveData import androidx.lifecycle.MediatorLiveData import androidx.lifecycle.distinctUntilChanged +import com.vitorpamplona.amethyst.launchAndWaitAll import com.vitorpamplona.amethyst.service.NostrSingleEventDataSource import com.vitorpamplona.amethyst.service.checkNotInMainThread import com.vitorpamplona.amethyst.service.firstFullCharOrEmoji +import com.vitorpamplona.amethyst.tryAndWait import com.vitorpamplona.amethyst.ui.actions.relays.updated import com.vitorpamplona.amethyst.ui.note.combineWith import com.vitorpamplona.amethyst.ui.note.toShortenHex @@ -65,13 +67,12 @@ import com.vitorpamplona.quartz.events.WrappedEvent import com.vitorpamplona.quartz.signers.NostrSigner import com.vitorpamplona.quartz.utils.TimeUtils import com.vitorpamplona.quartz.utils.containsAny +import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withTimeoutOrNull import java.math.BigDecimal import kotlin.coroutines.resume @@ -423,31 +424,36 @@ open class Note( } } - private fun recursiveIsPaidByCalculation( + private suspend fun isPaidByCalculation( account: Account, - remainingZapPayments: List>, + zapEvents: List>, onWasZappedByAuthor: () -> Unit, ) { - if (remainingZapPayments.isEmpty()) { + if (zapEvents.isEmpty()) { return } - val next = remainingZapPayments.first() + var hasSentOne = false - val zapResponseEvent = next.second?.event as? LnZapPaymentResponseEvent - if (zapResponseEvent != null) { - account.decryptZapPaymentResponseEvent(zapResponseEvent) { response -> - if ( - response is PayInvoiceSuccessResponse && - account.isNIP47Author(zapResponseEvent.requestAuthor()) - ) { + launchAndWaitAll(zapEvents) { next -> + val zapResponseEvent = next.second?.event as? LnZapPaymentResponseEvent + + if (zapResponseEvent != null) { + val result = + tryAndWait { continuation -> + account.decryptZapPaymentResponseEvent(zapResponseEvent) { response -> + if ( + response is PayInvoiceSuccessResponse && + account.isNIP47Author(zapResponseEvent.requestAuthor()) + ) { + continuation.resume(true) + } + } + } + + if (!hasSentOne && result == true) { + hasSentOne = true onWasZappedByAuthor() - } else { - recursiveIsPaidByCalculation( - account, - remainingZapPayments.minus(next), - onWasZappedByAuthor, - ) } } } @@ -457,14 +463,14 @@ open class Note( option: Int?, user: User, account: Account, - remainingZapEvents: Map, + zapEvents: Map, onWasZappedByAuthor: () -> Unit, ) { - if (remainingZapEvents.isEmpty()) { + if (zapEvents.isEmpty()) { return } - remainingZapEvents.forEach { next -> + zapEvents.forEach { next -> val zapRequest = next.key.event as LnZapRequestEvent val zapEvent = next.value?.event as? LnZapEvent @@ -487,11 +493,9 @@ open class Note( } else { if (account.isWriteable()) { val result = - withTimeoutOrNull(1000) { - suspendCancellableCoroutine { continuation -> - zapRequest.decryptPrivateZap(account.signer) { - continuation.resume(it) - } + tryAndWait { continuation -> + zapRequest.decryptPrivateZap(account.signer) { + continuation.resume(it) } } @@ -512,7 +516,7 @@ open class Note( ) { isZappedByCalculation(null, user, account, zaps, onWasZappedByAuthor) if (account.userProfile() == user) { - recursiveIsPaidByCalculation(account, zapPayments.toList(), onWasZappedByAuthor) + isPaidByCalculation(account, zapPayments.toList(), onWasZappedByAuthor) } } @@ -564,23 +568,79 @@ open class Note( zapsAmount = sumOfAmounts } - private fun recursiveZappedAmountCalculation( - invoiceSet: LinkedHashSet, - remainingZapPayments: List>, + private suspend fun zappedAmountCalculation( + startAmount: BigDecimal, + paidInvoiceSet: LinkedHashSet, + zapPayments: List>, signer: NostrSigner, - output: BigDecimal, onReady: (BigDecimal) -> Unit, ) { - if (remainingZapPayments.isEmpty()) { - onReady(output) + if (zapPayments.isEmpty()) { + onReady(startAmount) return } - val next = remainingZapPayments.first() + var output: BigDecimal = startAmount - (next.second?.event as? LnZapPaymentResponseEvent)?.response(signer) { noteEvent -> + launchAndWaitAll(zapPayments) { next -> + val result = + tryAndWait { continuation -> + processZapAmountFromResponse( + next.first, + next.second, + continuation, + signer, + ) + } + + if (result != null && !paidInvoiceSet.contains(result.invoice)) { + paidInvoiceSet.add(result.invoice) + output = output.add(result.amount) + } + } + + onReady(output) + } + + private fun processZapAmountFromResponse( + paymentRequest: Note, + paymentResponse: Note?, + continuation: CancellableContinuation, + signer: NostrSigner, + ) { + val nwcRequest = paymentRequest.event as? LnZapPaymentRequestEvent + val nwcResponse = paymentResponse?.event as? LnZapPaymentResponseEvent + + if (nwcRequest != null && nwcResponse != null) { + processZapAmountFromResponse( + nwcRequest, + nwcResponse, + continuation, + signer, + ) + } else { + continuation.resume(null) + } + } + + class InvoiceAmount( + val invoice: String, + val amount: BigDecimal, + ) + + private fun processZapAmountFromResponse( + nwcRequest: LnZapPaymentRequestEvent, + nwcResponse: LnZapPaymentResponseEvent, + continuation: CancellableContinuation, + signer: NostrSigner, + ) { + // if we can decrypt the reply + nwcResponse.response(signer) { noteEvent -> + // if it is a sucess if (noteEvent is PayInvoiceSuccessResponse) { - (next.first.event as? LnZapPaymentRequestEvent)?.lnInvoice(signer) { invoice -> + // if we can decrypt the invoice + nwcRequest.lnInvoice(signer) { invoice -> + // if we can parse the amount val amount = try { LnInvoiceUtil.getAmountInSats(invoice) @@ -589,26 +649,20 @@ open class Note( null } - var newAmount = output - - if (amount != null && !invoiceSet.contains(invoice)) { - invoiceSet.add(invoice) - newAmount += amount + // avoid double counting + if (amount != null) { + continuation.resume(InvoiceAmount(invoice, amount)) + } else { + continuation.resume(null) } - - recursiveZappedAmountCalculation( - invoiceSet, - remainingZapPayments.minus(next), - signer, - newAmount, - onReady, - ) } + } else { + continuation.resume(null) } } } - fun zappedAmountWithNWCPayments( + suspend fun zappedAmountWithNWCPayments( signer: NostrSigner, onReady: (BigDecimal) -> Unit, ) { @@ -619,11 +673,11 @@ open class Note( val invoiceSet = LinkedHashSet(zaps.size + zapPayments.size) zaps.forEach { (it.value?.event as? LnZapEvent)?.lnInvoice()?.let { invoiceSet.add(it) } } - recursiveZappedAmountCalculation( + zappedAmountCalculation( + zapsAmount, invoiceSet, zapPayments.toList(), signer, - zapsAmount, onReady, ) } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/Nip96Uploader.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/Nip96Uploader.kt index 4db170ef7..45a220a6f 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/Nip96Uploader.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/Nip96Uploader.kt @@ -32,11 +32,10 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.vitorpamplona.amethyst.BuildConfig import com.vitorpamplona.amethyst.R import com.vitorpamplona.amethyst.model.Account +import com.vitorpamplona.amethyst.tryAndWait import com.vitorpamplona.amethyst.ui.stringRes import com.vitorpamplona.ammolite.service.HttpClientManager import kotlinx.coroutines.delay -import kotlinx.coroutines.suspendCancellableCoroutine -import kotlinx.coroutines.withTimeoutOrNull import okhttp3.MediaType.Companion.toMediaType import okhttp3.MultipartBody import okhttp3.Request @@ -304,10 +303,8 @@ class Nip96Uploader( } suspend fun nip98Header(url: String): String? = - withTimeoutOrNull(5000) { - suspendCancellableCoroutine { continuation -> - nip98Header(url, "POST") { authorizationToken -> continuation.resume(authorizationToken) } - } + tryAndWait { continuation -> + nip98Header(url, "POST") { authorizationToken -> continuation.resume(authorizationToken) } } fun nip98Header( diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/ZapPaymentHandler.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/ZapPaymentHandler.kt index c85bdca61..7cf1b585b 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/ZapPaymentHandler.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/ZapPaymentHandler.kt @@ -23,13 +23,12 @@ package com.vitorpamplona.amethyst.service import android.content.Context import androidx.compose.runtime.Immutable import com.vitorpamplona.amethyst.R +import com.vitorpamplona.amethyst.collectSuccessfulOperations import com.vitorpamplona.amethyst.model.Account import com.vitorpamplona.amethyst.model.LocalCache import com.vitorpamplona.amethyst.model.Note import com.vitorpamplona.amethyst.model.User -import com.vitorpamplona.amethyst.service.NostrUserProfileDataSource.user import com.vitorpamplona.amethyst.service.lnurl.LightningAddressResolver -import com.vitorpamplona.amethyst.ui.screen.loggedIn.collectSuccessfulSigningOperations import com.vitorpamplona.amethyst.ui.stringRes import com.vitorpamplona.quartz.events.AdvertisedRelayListEvent import com.vitorpamplona.quartz.events.AppDefinitionEvent @@ -195,8 +194,8 @@ class ZapPaymentHandler( )?.readRelays() }?.toSet() - collectSuccessfulSigningOperations( - operationsInput = zapsToSend, + collectSuccessfulOperations( + items = zapsToSend, runRequestFor = { next: ZapSplitSetup, onReady -> if (next.isLnAddress) { prepareZapRequestIfNeeded(note, pollOption, message, zapType) { zapRequestJson -> @@ -239,8 +238,8 @@ class ZapPaymentHandler( var progressAllPayments = 0.00f val totalWeight = invoices.sumOf { it.first.weight } - collectSuccessfulSigningOperations, AssembleInvoiceReturn>( - operationsInput = invoices, + collectSuccessfulOperations, AssembleInvoiceReturn>( + items = invoices, runRequestFor = { splitZapRequestPair: Pair, onReady -> assembleInvoice( splitSetup = splitZapRequestPair.first, @@ -272,8 +271,8 @@ class ZapPaymentHandler( ) { var progressAllPayments = 0.00f - collectSuccessfulSigningOperations( - operationsInput = invoices, + collectSuccessfulOperations( + items = invoices, runRequestFor = { invoice: String, onReady -> account.sendZapPaymentRequestFor( bolt11 = invoice, diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/notifications/RegisterAccounts.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/notifications/RegisterAccounts.kt index 1084cc338..34579cf64 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/service/notifications/RegisterAccounts.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/service/notifications/RegisterAccounts.kt @@ -25,18 +25,15 @@ import com.vitorpamplona.amethyst.AccountInfo import com.vitorpamplona.amethyst.Amethyst import com.vitorpamplona.amethyst.BuildConfig import com.vitorpamplona.amethyst.LocalPreferences +import com.vitorpamplona.amethyst.launchAndWaitAll import com.vitorpamplona.amethyst.model.AccountSettings +import com.vitorpamplona.amethyst.tryAndWait import com.vitorpamplona.ammolite.service.HttpClientManager import com.vitorpamplona.quartz.events.RelayAuthEvent import com.vitorpamplona.quartz.signers.NostrSignerExternal import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext -import kotlinx.coroutines.withTimeoutOrNull import okhttp3.MediaType.Companion.toMediaType import okhttp3.Request import okhttp3.RequestBody.Companion.toRequestBody @@ -63,38 +60,26 @@ class RegisterAccounts( return } - coroutineScope { - val jobs = - remainingTos.map { accountRelayPair -> - async { - val result = - withTimeoutOrNull(10000) { - suspendCancellableCoroutine { continuation -> - val signer = accountRelayPair.first.createSigner() - // TODO: Modify the external launcher to launch as different users. - // Right now it only registers if Amber has already approved this signature - if (signer is NostrSignerExternal) { - signer.launcher.registerLauncher( - launcher = { }, - contentResolver = Amethyst.instance::contentResolverFn, - ) - } + launchAndWaitAll(remainingTos) { accountRelayPair -> + val result = + tryAndWait { continuation -> + val signer = accountRelayPair.first.createSigner() + // TODO: Modify the external launcher to launch as different users. + // Right now it only registers if Amber has already approved this signature + if (signer is NostrSignerExternal) { + signer.launcher.registerLauncher( + launcher = { }, + contentResolver = Amethyst.instance::contentResolverFn, + ) + } - RelayAuthEvent.create(accountRelayPair.second, notificationToken, signer) { result -> - continuation.resume(result) - } - } - } - - if (result != null) { - output.add(result) - } + RelayAuthEvent.create(accountRelayPair.second, notificationToken, signer) { result -> + continuation.resume(result) } } - // runs in parallel to avoid overcrowding Amber. - withTimeoutOrNull(15000) { - jobs.joinAll() + if (result != null) { + output.add(result) } } diff --git a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt index bdc3f23b7..f7abdb54e 100644 --- a/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt +++ b/amethyst/src/main/java/com/vitorpamplona/amethyst/ui/screen/loggedIn/AccountViewModel.kt @@ -36,6 +36,7 @@ import coil3.imageLoader import coil3.request.ImageRequest import com.vitorpamplona.amethyst.Amethyst import com.vitorpamplona.amethyst.R +import com.vitorpamplona.amethyst.collectSuccessfulOperations import com.vitorpamplona.amethyst.commons.compose.GenericBaseCache import com.vitorpamplona.amethyst.commons.compose.GenericBaseCacheAsync import com.vitorpamplona.amethyst.model.Account @@ -108,9 +109,7 @@ import kotlinx.coroutines.CancellationException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.async import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharingStarted @@ -121,12 +120,8 @@ import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.stateIn -import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext -import kotlinx.coroutines.withTimeoutOrNull -import kotlin.coroutines.resume @Immutable open class ToastMsg @@ -524,8 +519,8 @@ class AccountViewModel( ) }.toMutableMap() - collectSuccessfulSigningOperations( - operationsInput = zaps.filter { (it.request.event as? LnZapRequestEvent)?.isPrivateZap() == true }, + collectSuccessfulOperations( + items = zaps.filter { (it.request.event as? LnZapRequestEvent)?.isPrivateZap() == true }, runRequestFor = { next, onReady -> checkNotInMainThread() @@ -628,8 +623,8 @@ class AccountViewModel( ) }.toMutableMap() - collectSuccessfulSigningOperations, ZapAmountCommentNotification>( - operationsInput = myList, + collectSuccessfulOperations, ZapAmountCommentNotification>( + items = myList, runRequestFor = { next, onReady -> innerDecryptAmountMessage(next.first, next.second, onReady) }, @@ -1679,42 +1674,6 @@ class AccountViewModel( val nip19: Nip19Bech32.ParseReturn, ) -public suspend fun collectSuccessfulSigningOperations( - operationsInput: List, - runRequestFor: (T, (K) -> Unit) -> Unit, - output: MutableMap = mutableMapOf(), - onReady: suspend (MutableMap) -> Unit, -) { - if (operationsInput.isEmpty()) { - onReady(output) - return - } - - coroutineScope { - val jobs = - operationsInput.map { - async { - val result = - withTimeoutOrNull(10000) { - suspendCancellableCoroutine { continuation -> - runRequestFor(it) { result: K -> continuation.resume(result) } - } - } - if (result != null) { - output[it] = result - } - } - } - - // runs in parallel to avoid overcrowding Amber. - withTimeoutOrNull(15000) { - jobs.joinAll() - } - } - - onReady(output) -} - @Composable fun mockAccountViewModel(): AccountViewModel { val sharedPreferencesViewModel: SharedPreferencesViewModel = viewModel()