- Fixes lack of zap updates when there is a private zap the user cannot decrypt or when a nwc payment fails.

- Moves the parallel processing to a utils class
This commit is contained in:
Vitor Pamplona 2024-11-01 18:16:57 -04:00
parent d96f5953d5
commit f157f8eb9b
7 changed files with 246 additions and 169 deletions

View File

@ -0,0 +1,92 @@
/**
* Copyright (c) 2024 Vitor Pamplona
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to use,
* copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
* Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN
* AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.vitorpamplona.amethyst
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.coroutines.resume
/**
* Launches an async coroutine for each item, runs the
* function and waits for everybody to finsih
*/
suspend fun <T> launchAndWaitAll(
items: List<T>,
asyncFunc: suspend (T) -> Unit,
) {
coroutineScope {
val jobs =
items.map { next ->
async {
asyncFunc(next)
}
}
// runs in parallel to avoid overcrowding Amber.
withTimeoutOrNull(15000) {
jobs.joinAll()
}
}
}
/**
* Runs the function and waits for 10 seconds for any result.
*/
suspend inline fun <T> tryAndWait(crossinline asyncFunc: (CancellableContinuation<T>) -> Unit): T? =
withTimeoutOrNull(10000) {
suspendCancellableCoroutine { continuation ->
asyncFunc(continuation)
}
}
/**
* Runs an async coroutine for each one of the items,
* runs the request for that item,
* and gathers all the results in the output map.
*/
suspend fun <T, K> collectSuccessfulOperations(
items: List<T>,
runRequestFor: (T, (K) -> Unit) -> Unit,
output: MutableMap<T, K> = mutableMapOf(),
onReady: suspend (MutableMap<T, K>) -> Unit,
) {
if (items.isEmpty()) {
onReady(output)
return
}
launchAndWaitAll(items) {
val result =
tryAndWait { continuation ->
runRequestFor(it) { result: K -> continuation.resume(result) }
}
if (result != null) {
output[it] = result
}
}
onReady(output)
}

View File

@ -35,6 +35,7 @@ import com.vitorpamplona.amethyst.BuildConfig
import com.vitorpamplona.amethyst.service.FileHeader
import com.vitorpamplona.amethyst.service.NostrLnZapPaymentResponseDataSource
import com.vitorpamplona.amethyst.service.checkNotInMainThread
import com.vitorpamplona.amethyst.tryAndWait
import com.vitorpamplona.amethyst.ui.tor.TorType
import com.vitorpamplona.ammolite.relays.Client
import com.vitorpamplona.ammolite.relays.Constants
@ -129,8 +130,6 @@ import kotlinx.coroutines.flow.transformLatest
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
import java.math.BigDecimal
import java.util.Locale
import java.util.UUID
@ -897,11 +896,9 @@ class Account(
}
suspend fun waitToDecrypt(peopleListFollows: GeneralListEvent): LiveFollowList? =
withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
decryptLiveFollows(peopleListFollows) {
continuation.resume(it)
}
tryAndWait { continuation ->
decryptLiveFollows(peopleListFollows) {
continuation.resume(it)
}
}
@ -921,11 +918,9 @@ class Account(
suspend fun decryptPeopleList(event: PeopleListEvent?): PeopleListEvent.UsersAndWords {
if (event == null) return PeopleListEvent.UsersAndWords()
return withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
event.publicAndPrivateUsersAndWords(signer) {
continuation.resume(it)
}
return tryAndWait { continuation ->
event.publicAndPrivateUsersAndWords(signer) {
continuation.resume(it)
}
} ?: PeopleListEvent.UsersAndWords()
}
@ -933,11 +928,9 @@ class Account(
suspend fun decryptMuteList(event: MuteListEvent?): PeopleListEvent.UsersAndWords {
if (event == null) return PeopleListEvent.UsersAndWords()
return withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
event.publicAndPrivateUsersAndWords(signer) {
continuation.resume(it)
}
return tryAndWait { continuation ->
event.publicAndPrivateUsersAndWords(signer) {
continuation.resume(it)
}
} ?: PeopleListEvent.UsersAndWords()
}
@ -992,11 +985,9 @@ class Account(
emit(null)
} else {
emit(
withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
userState.user.latestBookmarkList?.privateTags(signer) {
continuation.resume(userState.user.latestBookmarkList)
}
tryAndWait { continuation ->
userState.user.latestBookmarkList?.privateTags(signer) {
continuation.resume(userState.user.latestBookmarkList)
}
},
)
@ -1337,7 +1328,7 @@ class Account(
zappedNote?.isZappedBy(userProfile(), this, onWasZapped)
}
fun calculateZappedAmount(
suspend fun calculateZappedAmount(
zappedNote: Note?,
onReady: (BigDecimal) -> Unit,
) {

View File

@ -25,9 +25,11 @@ import androidx.compose.runtime.Stable
import androidx.lifecycle.LiveData
import androidx.lifecycle.MediatorLiveData
import androidx.lifecycle.distinctUntilChanged
import com.vitorpamplona.amethyst.launchAndWaitAll
import com.vitorpamplona.amethyst.service.NostrSingleEventDataSource
import com.vitorpamplona.amethyst.service.checkNotInMainThread
import com.vitorpamplona.amethyst.service.firstFullCharOrEmoji
import com.vitorpamplona.amethyst.tryAndWait
import com.vitorpamplona.amethyst.ui.actions.relays.updated
import com.vitorpamplona.amethyst.ui.note.combineWith
import com.vitorpamplona.amethyst.ui.note.toShortenHex
@ -65,13 +67,12 @@ import com.vitorpamplona.quartz.events.WrappedEvent
import com.vitorpamplona.quartz.signers.NostrSigner
import com.vitorpamplona.quartz.utils.TimeUtils
import com.vitorpamplona.quartz.utils.containsAny
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
import java.math.BigDecimal
import kotlin.coroutines.resume
@ -423,31 +424,36 @@ open class Note(
}
}
private fun recursiveIsPaidByCalculation(
private suspend fun isPaidByCalculation(
account: Account,
remainingZapPayments: List<Pair<Note, Note?>>,
zapEvents: List<Pair<Note, Note?>>,
onWasZappedByAuthor: () -> Unit,
) {
if (remainingZapPayments.isEmpty()) {
if (zapEvents.isEmpty()) {
return
}
val next = remainingZapPayments.first()
var hasSentOne = false
val zapResponseEvent = next.second?.event as? LnZapPaymentResponseEvent
if (zapResponseEvent != null) {
account.decryptZapPaymentResponseEvent(zapResponseEvent) { response ->
if (
response is PayInvoiceSuccessResponse &&
account.isNIP47Author(zapResponseEvent.requestAuthor())
) {
launchAndWaitAll(zapEvents) { next ->
val zapResponseEvent = next.second?.event as? LnZapPaymentResponseEvent
if (zapResponseEvent != null) {
val result =
tryAndWait { continuation ->
account.decryptZapPaymentResponseEvent(zapResponseEvent) { response ->
if (
response is PayInvoiceSuccessResponse &&
account.isNIP47Author(zapResponseEvent.requestAuthor())
) {
continuation.resume(true)
}
}
}
if (!hasSentOne && result == true) {
hasSentOne = true
onWasZappedByAuthor()
} else {
recursiveIsPaidByCalculation(
account,
remainingZapPayments.minus(next),
onWasZappedByAuthor,
)
}
}
}
@ -457,14 +463,14 @@ open class Note(
option: Int?,
user: User,
account: Account,
remainingZapEvents: Map<Note, Note?>,
zapEvents: Map<Note, Note?>,
onWasZappedByAuthor: () -> Unit,
) {
if (remainingZapEvents.isEmpty()) {
if (zapEvents.isEmpty()) {
return
}
remainingZapEvents.forEach { next ->
zapEvents.forEach { next ->
val zapRequest = next.key.event as LnZapRequestEvent
val zapEvent = next.value?.event as? LnZapEvent
@ -487,11 +493,9 @@ open class Note(
} else {
if (account.isWriteable()) {
val result =
withTimeoutOrNull(1000) {
suspendCancellableCoroutine { continuation ->
zapRequest.decryptPrivateZap(account.signer) {
continuation.resume(it)
}
tryAndWait { continuation ->
zapRequest.decryptPrivateZap(account.signer) {
continuation.resume(it)
}
}
@ -512,7 +516,7 @@ open class Note(
) {
isZappedByCalculation(null, user, account, zaps, onWasZappedByAuthor)
if (account.userProfile() == user) {
recursiveIsPaidByCalculation(account, zapPayments.toList(), onWasZappedByAuthor)
isPaidByCalculation(account, zapPayments.toList(), onWasZappedByAuthor)
}
}
@ -564,23 +568,79 @@ open class Note(
zapsAmount = sumOfAmounts
}
private fun recursiveZappedAmountCalculation(
invoiceSet: LinkedHashSet<String>,
remainingZapPayments: List<Pair<Note, Note?>>,
private suspend fun zappedAmountCalculation(
startAmount: BigDecimal,
paidInvoiceSet: LinkedHashSet<String>,
zapPayments: List<Pair<Note, Note?>>,
signer: NostrSigner,
output: BigDecimal,
onReady: (BigDecimal) -> Unit,
) {
if (remainingZapPayments.isEmpty()) {
onReady(output)
if (zapPayments.isEmpty()) {
onReady(startAmount)
return
}
val next = remainingZapPayments.first()
var output: BigDecimal = startAmount
(next.second?.event as? LnZapPaymentResponseEvent)?.response(signer) { noteEvent ->
launchAndWaitAll(zapPayments) { next ->
val result =
tryAndWait { continuation ->
processZapAmountFromResponse(
next.first,
next.second,
continuation,
signer,
)
}
if (result != null && !paidInvoiceSet.contains(result.invoice)) {
paidInvoiceSet.add(result.invoice)
output = output.add(result.amount)
}
}
onReady(output)
}
private fun processZapAmountFromResponse(
paymentRequest: Note,
paymentResponse: Note?,
continuation: CancellableContinuation<InvoiceAmount?>,
signer: NostrSigner,
) {
val nwcRequest = paymentRequest.event as? LnZapPaymentRequestEvent
val nwcResponse = paymentResponse?.event as? LnZapPaymentResponseEvent
if (nwcRequest != null && nwcResponse != null) {
processZapAmountFromResponse(
nwcRequest,
nwcResponse,
continuation,
signer,
)
} else {
continuation.resume(null)
}
}
class InvoiceAmount(
val invoice: String,
val amount: BigDecimal,
)
private fun processZapAmountFromResponse(
nwcRequest: LnZapPaymentRequestEvent,
nwcResponse: LnZapPaymentResponseEvent,
continuation: CancellableContinuation<InvoiceAmount?>,
signer: NostrSigner,
) {
// if we can decrypt the reply
nwcResponse.response(signer) { noteEvent ->
// if it is a sucess
if (noteEvent is PayInvoiceSuccessResponse) {
(next.first.event as? LnZapPaymentRequestEvent)?.lnInvoice(signer) { invoice ->
// if we can decrypt the invoice
nwcRequest.lnInvoice(signer) { invoice ->
// if we can parse the amount
val amount =
try {
LnInvoiceUtil.getAmountInSats(invoice)
@ -589,26 +649,20 @@ open class Note(
null
}
var newAmount = output
if (amount != null && !invoiceSet.contains(invoice)) {
invoiceSet.add(invoice)
newAmount += amount
// avoid double counting
if (amount != null) {
continuation.resume(InvoiceAmount(invoice, amount))
} else {
continuation.resume(null)
}
recursiveZappedAmountCalculation(
invoiceSet,
remainingZapPayments.minus(next),
signer,
newAmount,
onReady,
)
}
} else {
continuation.resume(null)
}
}
}
fun zappedAmountWithNWCPayments(
suspend fun zappedAmountWithNWCPayments(
signer: NostrSigner,
onReady: (BigDecimal) -> Unit,
) {
@ -619,11 +673,11 @@ open class Note(
val invoiceSet = LinkedHashSet<String>(zaps.size + zapPayments.size)
zaps.forEach { (it.value?.event as? LnZapEvent)?.lnInvoice()?.let { invoiceSet.add(it) } }
recursiveZappedAmountCalculation(
zappedAmountCalculation(
zapsAmount,
invoiceSet,
zapPayments.toList(),
signer,
zapsAmount,
onReady,
)
}

View File

@ -32,11 +32,10 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.vitorpamplona.amethyst.BuildConfig
import com.vitorpamplona.amethyst.R
import com.vitorpamplona.amethyst.model.Account
import com.vitorpamplona.amethyst.tryAndWait
import com.vitorpamplona.amethyst.ui.stringRes
import com.vitorpamplona.ammolite.service.HttpClientManager
import kotlinx.coroutines.delay
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.MultipartBody
import okhttp3.Request
@ -304,10 +303,8 @@ class Nip96Uploader(
}
suspend fun nip98Header(url: String): String? =
withTimeoutOrNull(5000) {
suspendCancellableCoroutine { continuation ->
nip98Header(url, "POST") { authorizationToken -> continuation.resume(authorizationToken) }
}
tryAndWait { continuation ->
nip98Header(url, "POST") { authorizationToken -> continuation.resume(authorizationToken) }
}
fun nip98Header(

View File

@ -23,13 +23,12 @@ package com.vitorpamplona.amethyst.service
import android.content.Context
import androidx.compose.runtime.Immutable
import com.vitorpamplona.amethyst.R
import com.vitorpamplona.amethyst.collectSuccessfulOperations
import com.vitorpamplona.amethyst.model.Account
import com.vitorpamplona.amethyst.model.LocalCache
import com.vitorpamplona.amethyst.model.Note
import com.vitorpamplona.amethyst.model.User
import com.vitorpamplona.amethyst.service.NostrUserProfileDataSource.user
import com.vitorpamplona.amethyst.service.lnurl.LightningAddressResolver
import com.vitorpamplona.amethyst.ui.screen.loggedIn.collectSuccessfulSigningOperations
import com.vitorpamplona.amethyst.ui.stringRes
import com.vitorpamplona.quartz.events.AdvertisedRelayListEvent
import com.vitorpamplona.quartz.events.AppDefinitionEvent
@ -195,8 +194,8 @@ class ZapPaymentHandler(
)?.readRelays()
}?.toSet()
collectSuccessfulSigningOperations<ZapSplitSetup, SignAllZapRequestsReturn>(
operationsInput = zapsToSend,
collectSuccessfulOperations<ZapSplitSetup, SignAllZapRequestsReturn>(
items = zapsToSend,
runRequestFor = { next: ZapSplitSetup, onReady ->
if (next.isLnAddress) {
prepareZapRequestIfNeeded(note, pollOption, message, zapType) { zapRequestJson ->
@ -239,8 +238,8 @@ class ZapPaymentHandler(
var progressAllPayments = 0.00f
val totalWeight = invoices.sumOf { it.first.weight }
collectSuccessfulSigningOperations<Pair<ZapSplitSetup, SignAllZapRequestsReturn>, AssembleInvoiceReturn>(
operationsInput = invoices,
collectSuccessfulOperations<Pair<ZapSplitSetup, SignAllZapRequestsReturn>, AssembleInvoiceReturn>(
items = invoices,
runRequestFor = { splitZapRequestPair: Pair<ZapSplitSetup, SignAllZapRequestsReturn>, onReady ->
assembleInvoice(
splitSetup = splitZapRequestPair.first,
@ -272,8 +271,8 @@ class ZapPaymentHandler(
) {
var progressAllPayments = 0.00f
collectSuccessfulSigningOperations<String, Boolean>(
operationsInput = invoices,
collectSuccessfulOperations<String, Boolean>(
items = invoices,
runRequestFor = { invoice: String, onReady ->
account.sendZapPaymentRequestFor(
bolt11 = invoice,

View File

@ -25,18 +25,15 @@ import com.vitorpamplona.amethyst.AccountInfo
import com.vitorpamplona.amethyst.Amethyst
import com.vitorpamplona.amethyst.BuildConfig
import com.vitorpamplona.amethyst.LocalPreferences
import com.vitorpamplona.amethyst.launchAndWaitAll
import com.vitorpamplona.amethyst.model.AccountSettings
import com.vitorpamplona.amethyst.tryAndWait
import com.vitorpamplona.ammolite.service.HttpClientManager
import com.vitorpamplona.quartz.events.RelayAuthEvent
import com.vitorpamplona.quartz.signers.NostrSignerExternal
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.Request
import okhttp3.RequestBody.Companion.toRequestBody
@ -63,38 +60,26 @@ class RegisterAccounts(
return
}
coroutineScope {
val jobs =
remainingTos.map { accountRelayPair ->
async {
val result =
withTimeoutOrNull(10000) {
suspendCancellableCoroutine { continuation ->
val signer = accountRelayPair.first.createSigner()
// TODO: Modify the external launcher to launch as different users.
// Right now it only registers if Amber has already approved this signature
if (signer is NostrSignerExternal) {
signer.launcher.registerLauncher(
launcher = { },
contentResolver = Amethyst.instance::contentResolverFn,
)
}
launchAndWaitAll(remainingTos) { accountRelayPair ->
val result =
tryAndWait { continuation ->
val signer = accountRelayPair.first.createSigner()
// TODO: Modify the external launcher to launch as different users.
// Right now it only registers if Amber has already approved this signature
if (signer is NostrSignerExternal) {
signer.launcher.registerLauncher(
launcher = { },
contentResolver = Amethyst.instance::contentResolverFn,
)
}
RelayAuthEvent.create(accountRelayPair.second, notificationToken, signer) { result ->
continuation.resume(result)
}
}
}
if (result != null) {
output.add(result)
}
RelayAuthEvent.create(accountRelayPair.second, notificationToken, signer) { result ->
continuation.resume(result)
}
}
// runs in parallel to avoid overcrowding Amber.
withTimeoutOrNull(15000) {
jobs.joinAll()
if (result != null) {
output.add(result)
}
}

View File

@ -36,6 +36,7 @@ import coil3.imageLoader
import coil3.request.ImageRequest
import com.vitorpamplona.amethyst.Amethyst
import com.vitorpamplona.amethyst.R
import com.vitorpamplona.amethyst.collectSuccessfulOperations
import com.vitorpamplona.amethyst.commons.compose.GenericBaseCache
import com.vitorpamplona.amethyst.commons.compose.GenericBaseCacheAsync
import com.vitorpamplona.amethyst.model.Account
@ -108,9 +109,7 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
@ -121,12 +120,8 @@ import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.coroutines.resume
@Immutable open class ToastMsg
@ -524,8 +519,8 @@ class AccountViewModel(
)
}.toMutableMap()
collectSuccessfulSigningOperations<CombinedZap, ZapAmountCommentNotification>(
operationsInput = zaps.filter { (it.request.event as? LnZapRequestEvent)?.isPrivateZap() == true },
collectSuccessfulOperations<CombinedZap, ZapAmountCommentNotification>(
items = zaps.filter { (it.request.event as? LnZapRequestEvent)?.isPrivateZap() == true },
runRequestFor = { next, onReady ->
checkNotInMainThread()
@ -628,8 +623,8 @@ class AccountViewModel(
)
}.toMutableMap()
collectSuccessfulSigningOperations<Pair<Note, Note?>, ZapAmountCommentNotification>(
operationsInput = myList,
collectSuccessfulOperations<Pair<Note, Note?>, ZapAmountCommentNotification>(
items = myList,
runRequestFor = { next, onReady ->
innerDecryptAmountMessage(next.first, next.second, onReady)
},
@ -1679,42 +1674,6 @@ class AccountViewModel(
val nip19: Nip19Bech32.ParseReturn,
)
public suspend fun <T, K> collectSuccessfulSigningOperations(
operationsInput: List<T>,
runRequestFor: (T, (K) -> Unit) -> Unit,
output: MutableMap<T, K> = mutableMapOf(),
onReady: suspend (MutableMap<T, K>) -> Unit,
) {
if (operationsInput.isEmpty()) {
onReady(output)
return
}
coroutineScope {
val jobs =
operationsInput.map {
async {
val result =
withTimeoutOrNull(10000) {
suspendCancellableCoroutine { continuation ->
runRequestFor(it) { result: K -> continuation.resume(result) }
}
}
if (result != null) {
output[it] = result
}
}
}
// runs in parallel to avoid overcrowding Amber.
withTimeoutOrNull(15000) {
jobs.joinAll()
}
}
onReady(output)
}
@Composable
fun mockAccountViewModel(): AccountViewModel {
val sharedPreferencesViewModel: SharedPreferencesViewModel = viewModel()