// Source: go-nostr/nip60/stash.go (267 lines, 6.3 KiB, Go)

package nip60
import (
"context"
"fmt"
"iter"
"strings"
"sync"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/nbd-wtf/go-nostr"
)
// WalletStash groups the wallets loaded for one keyer together with the token
// and history entries that showed up before the wallet they point to.
//
// The embedded mutex is taken by EnsureWallet and Wallets while they access
// the wallets map.
type WalletStash struct {
	sync.Mutex

	// wallets maps a wallet identifier to its wallet.
	wallets map[string]*Wallet

	// pendingHistory holds history entries not yet assigned to a wallet,
	// keyed by the identifier of the wallet they reference.
	pendingHistory map[string][]HistoryEntry

	// pendingTokens holds tokens not yet assigned to a wallet, keyed by the
	// identifier of the wallet they reference.
	pendingTokens map[string][]Token

	// pendingDeletions lists the ids of token events that should be deleted.
	pendingDeletions []string

	// kr is the keyer the stash was loaded with.
	kr nostr.Keyer

	// Changes emits a stream of events that must be published whenever
	// something changes.
	Changes chan nostr.Event

	// Processed emits an error or nil every time an event is processed.
	Processed chan error

	// Stable is closed when we have gotten an EOSE from all relays.
	Stable chan struct{}
}
// LoadStash subscribes on the given relays to the wallet (37375), token
// (7375) and history (7376) events authored by kr's public key, plus the
// kind-5 deletions tagged with k=7375 by the same author, and hands the
// resulting event stream to loadStash.
//
// It returns nil when the public key cannot be obtained from kr.
func LoadStash(
	ctx context.Context,
	kr nostr.Keyer,
	pool *nostr.SimplePool,
	relays []string,
) *WalletStash {
	author, err := kr.GetPublicKey(ctx)
	if err != nil {
		return nil
	}

	filters := nostr.Filters{
		{Kinds: []int{37375, 7375, 7376}, Authors: []string{author}},
		{Kinds: []int{5}, Tags: nostr.TagMap{"k": []string{"7375"}}, Authors: []string{author}},
	}

	// eose is handed to the pool so it can signal EOSE to loadStash
	eose := make(chan struct{})
	incoming := pool.SubManyNotifyEOSE(ctx, relays, filters, eose)

	return loadStash(ctx, kr, incoming, eose)
}
// loadStash builds a WalletStash and starts the two goroutines that feed it:
// one consumes events until the events channel is closed, the other waits for
// eoseChan and then closes Stable.
//
// Deletions seen before EOSE are queued in pendingDeletions and applied only
// once eoseChan is signalled; after that they are applied immediately.
//
// For every consumed event one value is sent on Processed (nil on success, an
// error otherwise), except for wallet events older than the wallet already
// loaded, which are dropped without a notification. Processed is unbuffered,
// so the caller has to keep receiving from it for events to keep flowing.
func loadStash(
	ctx context.Context,
	kr nostr.Keyer,
	events chan nostr.RelayEvent,
	eoseChan chan struct{},
) *WalletStash {
	wl := &WalletStash{
		wallets:          make(map[string]*Wallet, 1),
		pendingTokens:    make(map[string][]Token),
		pendingHistory:   make(map[string][]HistoryEntry),
		pendingDeletions: make([]string, 0, 128),
		kr:               kr,
		Changes:          make(chan nostr.Event),
		Processed:        make(chan error),
		Stable:           make(chan struct{}),
	}

	// eosed is shared by both goroutines below and is only read or written
	// while wl's lock is held.
	eosed := false

	go func() {
		<-eoseChan

		// take the lock: the event goroutine reads eosed and mutates
		// pendingDeletions and the wallets' tokens concurrently
		wl.Lock()
		eosed = true
		// check all pending deletions and delete stuff locally
		for _, id := range wl.pendingDeletions {
			wl.removeDeletedToken(id)
		}
		wl.pendingDeletions = nil
		wl.Unlock()

		// best-effort grace period so events already in flight get handled
		// before Stable is closed
		time.Sleep(100 * time.Millisecond)
		close(wl.Stable)
	}()

	go func() {
		for ie := range events {
			wl.Lock()
			notify, err := wl.processStashEvent(ctx, kr, ie, eosed)
			// release the lock before the blocking send so a slow Processed
			// consumer cannot stall the EOSE goroutine, EnsureWallet or Wallets
			wl.Unlock()

			if notify {
				wl.Processed <- err
			}
		}
	}()

	return wl
}

// processStashEvent applies one incoming event to the stash. The caller must
// hold wl's lock. eosed tells whether EOSE was already seen, i.e. whether
// deletions are applied right away instead of being queued.
//
// The first result reports whether a value must be sent on Processed for this
// event; the second result is that value.
func (wl *WalletStash) processStashEvent(
	ctx context.Context,
	kr nostr.Keyer,
	ie nostr.RelayEvent,
	eosed bool,
) (bool, error) {
	switch ie.Event.Kind {
	case 5: // deletion
		for _, tag := range ie.Event.Tags.All([]string{"e", ""}) {
			wl.applyOrQueueDeletion(tag[1], eosed)
		}

	case 37375: // wallet
		wallet := &Wallet{wl: wl}
		if err := wallet.parse(ctx, kr, ie.Event); err != nil {
			return true, fmt.Errorf("event %s failed: %w", ie.Event, err)
		}
		id := wallet.Identifier

		// if we already have a wallet with this identifier then we must be careful
		if curr, ok := wl.wallets[id]; ok {
			// NOTE(review): curr.event is dereferenced unconditionally, but
			// EnsureWallet stores wallets without setting it — confirm this
			// cannot be reached with an unset event.
			if curr.event.CreatedAt > ie.Event.CreatedAt {
				// the metadata we have is newer: ignore this event, without
				// emitting anything on Processed
				return false, nil
			}
			// otherwise transfer history events and tokens to the new wallet object
			wallet.Tokens = curr.Tokens
			wallet.History = curr.History
		}

		// assign everything that was waiting for this wallet, then forget it
		wallet.History = append(wallet.History, wl.pendingHistory[id]...)
		delete(wl.pendingHistory, id)

		wallet.tokensMu.Lock()
		wallet.Tokens = append(wallet.Tokens, wl.pendingTokens[id]...)
		delete(wl.pendingTokens, id)
		wallet.tokensMu.Unlock()

		// finally save the new wallet object
		wl.wallets[id] = wallet

	case 7375: // token
		id, err := stashEventWalletID(ie)
		if err != nil {
			return true, err
		}

		token := Token{}
		if perr := token.parse(ctx, kr, ie.Event); perr != nil {
			return true, fmt.Errorf("event %s failed: %w", ie.Event, perr)
		}

		if wallet, ok := wl.wallets[id]; ok {
			wallet.tokensMu.Lock()
			wallet.Tokens = append(wallet.Tokens, token)
			wallet.tokensMu.Unlock()
		} else {
			wl.pendingTokens[id] = append(wl.pendingTokens[id], token)
		}

		// keep track of tokens that were deleted by this one, if any
		for _, del := range token.Deleted {
			wl.applyOrQueueDeletion(del, eosed)
		}

	case 7376: // history
		id, err := stashEventWalletID(ie)
		if err != nil {
			return true, err
		}

		he := HistoryEntry{}
		if perr := he.parse(ctx, kr, ie.Event); perr != nil {
			return true, fmt.Errorf("event %s failed: %w", ie.Event, perr)
		}

		if wallet, ok := wl.wallets[id]; ok {
			wallet.History = append(wallet.History, he)
		} else {
			wl.pendingHistory[id] = append(wl.pendingHistory[id], he)
		}
	}

	return true, nil
}

// applyOrQueueDeletion removes the token event with the given id right away
// when eosed is true, and otherwise queues the id in pendingDeletions. The
// caller must hold wl's lock.
func (wl *WalletStash) applyOrQueueDeletion(eventID string, eosed bool) {
	if eosed {
		wl.removeDeletedToken(eventID)
		return
	}
	wl.pendingDeletions = append(wl.pendingDeletions, eventID)
}

// stashEventWalletID returns the third ":"-separated component of the value of
// the event's first "a" tag, which is used as the wallet identifier. It
// returns an error when the tag is missing or has fewer than three components.
func stashEventWalletID(ie nostr.RelayEvent) (string, error) {
	ref := ie.Event.Tags.GetFirst([]string{"a", ""})
	if ref == nil {
		return "", fmt.Errorf("event %s missing 'a' tag", ie.Event)
	}

	spl := strings.SplitN((*ref)[1], ":", 3)
	if len(spl) < 3 {
		return "", fmt.Errorf("event %s invalid 'a' tag", ie.Event)
	}
	return spl[2], nil
}
// removeDeletedToken drops, from every wallet in the stash, each token whose
// originating event has the given id. Tokens without an event are left alone.
//
// Removal overwrites the match with the last token and shrinks the slice by
// one, so the order of the remaining tokens is not preserved. This method
// takes no lock itself.
func (wl *WalletStash) removeDeletedToken(eventId string) {
	for _, wallet := range wl.wallets {
		// walk from the end so the token swapped into a freed slot has
		// already been checked
		idx := len(wallet.Tokens)
		for idx > 0 {
			idx--

			ev := wallet.Tokens[idx].event
			if ev == nil || ev.ID != eventId {
				continue
			}

			last := len(wallet.Tokens) - 1
			wallet.Tokens[idx] = wallet.Tokens[last]
			wallet.Tokens = wallet.Tokens[:last]
		}
	}
}
// EnsureWallet returns the wallet stored under id. When there is none yet it
// creates one with a freshly generated private key, stores it under id and
// returns it.
//
// The stash lock is held for the whole call. It panics if the private key
// cannot be generated.
func (wl *WalletStash) EnsureWallet(id string) *Wallet {
	wl.Lock()
	defer wl.Unlock()

	existing, found := wl.wallets[id]
	if found {
		return existing
	}

	privkey, err := btcec.NewPrivateKey()
	if err != nil {
		panic(err)
	}

	created := &Wallet{
		wl:         wl,
		Identifier: id,
		PrivateKey: privkey,
		PublicKey:  privkey.PubKey(),
	}
	wl.wallets[id] = created
	return created
}
// Wallets returns an iterator over all wallets currently in the stash, in map
// (unspecified) order.
//
// The stash lock is acquired when iteration starts and released when it ends,
// so the loop body must not call anything that locks the stash again (such as
// EnsureWallet).
func (wl *WalletStash) Wallets() iter.Seq[*Wallet] {
	return func(yield func(*Wallet) bool) {
		wl.Lock()
		defer wl.Unlock()

		for _, wallet := range wl.wallets {
			keepGoing := yield(wallet)
			if !keepGoing {
				break
			}
		}
	}
}