nip60: Processed is also a function now.

This commit is contained in:
fiatjaf 2025-01-30 16:07:20 -03:00
parent b86d5d52bb
commit 14c4101a03
3 changed files with 26 additions and 40 deletions

View File

@ -151,19 +151,8 @@ func TestWalletRoundtrip(t *testing.T) {
// load wallets from events
walletStash := loadStash(ctx, kr, eventChan, make(chan struct{}))
var errorChanErr error
go func() {
for {
errorChanErr = <-walletStash.Processed
if errorChanErr != nil {
return
}
}
}()
<-done
time.Sleep(time.Millisecond * 200)
require.NoError(t, errorChanErr, "errorChan shouldn't have received any errors: %w", errorChanErr)
// compare loaded wallets with original ones
loadedWallet1 := walletStash.wallets[wallet1.Identifier]

View File

@ -32,8 +32,8 @@ type WalletStash struct {
isHistory bool,
)
// Processed emits an error or nil every time an event is processed
Processed chan error
// Processed, if not nil, is called every time a received event is processed
Processed func(*nostr.Event, error)
// Stable is closed when we have gotten an EOSE from all relays
Stable chan struct{}
@ -100,7 +100,6 @@ func loadStash(
pendingHistory: make(map[string][]HistoryEntry),
pendingDeletions: make([]string, 0, 128),
kr: kr,
Processed: make(chan error),
Stable: make(chan struct{}),
}
@ -138,8 +137,10 @@ func loadStash(
wl: wl,
}
if err := wallet.parse(ctx, kr, ie.Event); err != nil {
if wl.Processed != nil {
wl.Processed(ie.Event, err)
}
wl.Unlock()
wl.Processed <- fmt.Errorf("event %s failed: %w", ie.Event, err)
continue
}
@ -182,21 +183,27 @@ func loadStash(
case 7375: // token
ref := ie.Event.Tags.GetFirst([]string{"a", ""})
if ref == nil {
if wl.Processed != nil {
wl.Processed(ie.Event, fmt.Errorf("event missing 'a' tag"))
}
wl.Unlock()
wl.Processed <- fmt.Errorf("event %s missing 'a' tag", ie.Event)
continue
}
spl := strings.SplitN((*ref)[1], ":", 3)
if len(spl) < 3 {
if wl.Processed != nil {
wl.Processed(ie.Event, fmt.Errorf("event with invalid 'a' tag"))
}
wl.Unlock()
wl.Processed <- fmt.Errorf("event %s invalid 'a' tag", ie.Event)
continue
}
token := Token{}
if err := token.parse(ctx, kr, ie.Event); err != nil {
if wl.Processed != nil {
wl.Processed(ie.Event, err)
}
wl.Unlock()
wl.Processed <- fmt.Errorf("event %s failed: %w", ie.Event, err)
continue
}
@ -226,21 +233,27 @@ func loadStash(
case 7376: // history
ref := ie.Event.Tags.GetFirst([]string{"a", ""})
if ref == nil {
if wl.Processed != nil {
wl.Processed(ie.Event, fmt.Errorf("event missing 'a' tag"))
}
wl.Unlock()
wl.Processed <- fmt.Errorf("event %s missing 'a' tag", ie.Event)
continue
}
spl := strings.SplitN((*ref)[1], ":", 3)
if len(spl) < 3 {
if wl.Processed != nil {
wl.Processed(ie.Event, fmt.Errorf("event with invalid 'a' tag"))
}
wl.Unlock()
wl.Processed <- fmt.Errorf("event %s invalid 'a' tag", ie.Event)
continue
}
he := HistoryEntry{}
if err := he.parse(ctx, kr, ie.Event); err != nil {
if wl.Processed != nil {
wl.Processed(ie.Event, err)
}
wl.Unlock()
wl.Processed <- fmt.Errorf("event %s failed: %w", ie.Event, err)
continue
}
@ -255,7 +268,9 @@ func loadStash(
}
}
wl.Processed <- nil
if wl.Processed != nil {
wl.Processed(ie.Event, nil)
}
wl.Unlock()
}
}()

View File

@ -57,24 +57,6 @@ func TestWalletTransfer(t *testing.T) {
pool.PublishMany(ctx, testRelays, event)
}
// handle events from both stashes
go func() {
for {
select {
case err := <-stash1.Processed:
if err != nil {
t.Errorf("stash1 processing error: %v", err)
}
case err := <-stash2.Processed:
if err != nil {
t.Errorf("stash2 processing error: %v", err)
}
case <-ctx.Done():
return
}
}
}()
// wait for initial load
select {
case <-stash1.Stable: