diff --git a/pool.go b/pool.go index b40a37d..4874ed2 100644 --- a/pool.go +++ b/pool.go @@ -287,7 +287,7 @@ func (pool *SimplePool) SubMany( hasAuthed = false subscribe: - sub, err = relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(relay, id string) bool { + sub, err = relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(id, relay string) bool { _, exists := seenAlready.Load(id) if exists && pool.duplicateMiddleware != nil { pool.duplicateMiddleware(relay, id) @@ -417,7 +417,7 @@ func (pool *SimplePool) SubManyEose( hasAuthed := false subscribe: - sub, err := relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(relay, id string) bool { + sub, err := relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(id, relay string) bool { _, exists := seenAlready.Load(id) if exists && pool.duplicateMiddleware != nil { pool.duplicateMiddleware(relay, id) diff --git a/relay.go b/relay.go index f5518d6..de6c3a4 100644 --- a/relay.go +++ b/relay.go @@ -226,7 +226,7 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error subid := extractSubID(message) subscription, ok := r.Subscriptions.Load(subIdToSerial(subid)) if ok && subscription.checkDuplicate != nil { - if !subscription.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) { + if subscription.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) { continue } } diff --git a/sdk/event_relays.go b/sdk/event_relays.go index 4716eaa..2d85ef5 100644 --- a/sdk/event_relays.go +++ b/sdk/event_relays.go @@ -1,9 +1,11 @@ package sdk import ( - "encoding/binary" "encoding/hex" "fmt" + "slices" + + "github.com/nbd-wtf/go-nostr/sdk/kvstore" ) const eventRelayPrefix = byte('r') @@ -19,15 +21,18 @@ func makeEventRelayKey(eventID []byte) []byte { func encodeRelayList(relays []string) []byte { totalSize := 0 for _, relay := range relays { - totalSize += 2 + len(relay) // 2 bytes for length prefix + totalSize += 1 + len(relay) // 1 byte for length prefix } buf := make([]byte, totalSize) offset := 0 for _, relay := range relays { - binary.LittleEndian.PutUint16(buf[offset:], uint16(len(relay))) - offset += 2 + if len(relay) > 256 { + continue + } + buf[offset] = uint8(len(relay)) + offset += 1 copy(buf[offset:], relay) offset += len(relay) } @@ -40,12 +45,12 @@ func decodeRelayList(data []byte) []string { offset := 0 for offset < len(data) { - if offset+2 > len(data) { + if offset+1 > len(data) { return nil // malformed } - length := int(binary.LittleEndian.Uint16(data[offset:])) - offset += 2 + length := int(data[offset]) + offset += 1 if offset+length > len(data) { return nil // malformed @@ -59,7 +64,7 @@ func decodeRelayList(data []byte) []string { return relays } -func (sys *System) trackEventRelayCommon(eventID string, relay string) { +func (sys *System) trackEventRelayCommon(eventID string, relay string, onlyIfItExists bool) { // decode the event ID hex into bytes idBytes, err := hex.DecodeString(eventID) if err != nil || len(idBytes) < 8 { @@ -74,20 +79,22 @@ func (sys *System) trackEventRelayCommon(eventID string, relay string) { var relays []string if data != nil { relays = decodeRelayList(data) - } else { - relays = make([]string, 0, 1) - } - // check if relay is already in list - for _, r := range relays { - if r == relay { - return data, nil // no change needed + // check if relay is already in list + if slices.Contains(relays, relay) { + return nil, kvstore.NoOp // no change needed } - } - // append new relay - relays = append(relays, relay) - return encodeRelayList(relays), nil + // append new relay + relays = append(relays, relay) + return encodeRelayList(relays), nil + } else if onlyIfItExists { + // when this flag exists and nothing was found we won't create anything + return nil, kvstore.NoOp + } else { + // nothing exists, so create it + return encodeRelayList([]string{relay}), nil + } }) } diff --git a/sdk/kvstore/badger/store.go b/sdk/kvstore/badger/store.go index 57b55c4..9ad7db2 100644 --- a/sdk/kvstore/badger/store.go +++ b/sdk/kvstore/badger/store.go @@ -76,7 +76,9 @@ func (s *Store) Update(key []byte, f func([]byte) ([]byte, error)) error { } newVal, err := f(val) - if err != nil { + if err == kvstore.NoOp { + return nil + } else if err != nil { return err } diff --git a/sdk/kvstore/lmdb/store.go b/sdk/kvstore/lmdb/store.go index 1be0e21..5863d9b 100644 --- a/sdk/kvstore/lmdb/store.go +++ b/sdk/kvstore/lmdb/store.go @@ -104,7 +104,9 @@ func (s *Store) Update(key []byte, f func([]byte) ([]byte, error)) error { } newVal, err := f(val) - if err != nil { + if err == kvstore.NoOp { + return nil + } else if err != nil { return err } diff --git a/sdk/kvstore/memory/store.go b/sdk/kvstore/memory/store.go index ff8a807..fca4b76 100644 --- a/sdk/kvstore/memory/store.go +++ b/sdk/kvstore/memory/store.go @@ -22,7 +22,7 @@ func NewStore() *Store { func (s *Store) Get(key []byte) ([]byte, error) { s.RLock() defer s.RUnlock() - + if val, ok := s.data[string(key)]; ok { // Return a copy to prevent modification of stored data cp := make([]byte, len(val)) @@ -35,7 +35,7 @@ func (s *Store) Get(key []byte) ([]byte, error) { func (s *Store) Set(key []byte, value []byte) error { s.Lock() defer s.Unlock() - + // Store a copy to prevent modification of stored data cp := make([]byte, len(value)) copy(cp, value) @@ -69,7 +69,9 @@ func (s *Store) Update(key []byte, f func([]byte) ([]byte, error)) error { } newVal, err := f(val) - if err != nil { + if err == kvstore.NoOp { + return nil + } else if err != nil { return err } diff --git a/sdk/kvstore/noop.go b/sdk/kvstore/noop.go new file mode 100644 index 0000000..4740278 --- /dev/null +++ b/sdk/kvstore/noop.go @@ -0,0 +1,5 @@ +package kvstore + +import "fmt" + +var NoOp = fmt.Errorf("noop") diff --git a/sdk/metadata.go b/sdk/metadata.go index d1f2535..96272af 100644 --- a/sdk/metadata.go +++ b/sdk/metadata.go @@ -88,9 +88,8 @@ func (sys System) FetchProfileFromInput(ctx context.Context, nip19OrNip05Code st hintType = hints.LastInNprofile } for _, r := range p.Relays { - nm := nostr.NormalizeURL(r) - if !IsVirtualRelay(nm) { - sys.Hints.Save(p.PublicKey, nm, hintType, nostr.Now()) + if !IsVirtualRelay(r) { + sys.Hints.Save(p.PublicKey, nostr.NormalizeURL(r), hintType, nostr.Now()) } } diff --git a/sdk/outbox.go b/sdk/outbox.go index 9804ee5..ee8387c 100644 --- a/sdk/outbox.go +++ b/sdk/outbox.go @@ -3,18 +3,26 @@ package sdk import ( "context" "fmt" + "strconv" "sync" "time" "github.com/nbd-wtf/go-nostr" ) +var outboxShortTermCache = [256]ostcEntry{} + +type ostcEntry struct { + pubkey string + relays []string + when time.Time +} + func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey string, n int) []string { - if relays, ok := sys.outboxShortTermCache.Get(pubkey); ok { - if len(relays) > n { - relays = relays[0:n] - } - return relays + ostcIndex, _ := strconv.ParseUint(pubkey[12:14], 16, 8) + now := time.Now() + if entry := outboxShortTermCache[ostcIndex]; entry.pubkey == pubkey && entry.when.Add(time.Minute*2).After(now) { + return entry.relays } // if we have it cached that means we have at least tried to fetch recently and it won't be tried again @@ -25,7 +33,11 @@ func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey string, n int) return []string{"wss://relay.damus.io", "wss://nos.lol"} } - sys.outboxShortTermCache.SetWithTTL(pubkey, relays, time.Minute*2) + // we save a copy of this slice to this cache (must be a copy otherwise + // we will have a reference to a thing that the caller to this function may change at will) + relaysCopy := make([]string, len(relays)) + copy(relaysCopy, relays) + outboxShortTermCache[ostcIndex] = ostcEntry{pubkey, relaysCopy, now} if len(relays) > n { relays = relays[0:n] diff --git a/sdk/replaceable_loader.go b/sdk/replaceable_loader.go index 19aa3be..d654187 100644 --- a/sdk/replaceable_loader.go +++ b/sdk/replaceable_loader.go @@ -202,7 +202,7 @@ func (sys *System) batchReplaceableRelayQueries( defer wg.Done() n := len(filter.Authors) - ctx, cancel := context.WithTimeout(ctx, time.Millisecond*450+time.Millisecond*50*time.Duration(n)) + ctx, cancel := context.WithTimeout(ctx, time.Millisecond*950+time.Millisecond*50*time.Duration(n)) defer cancel() received := 0 diff --git a/sdk/sdk_test.go b/sdk/sdk_test.go new file mode 100644 index 0000000..8540503 --- /dev/null +++ b/sdk/sdk_test.go @@ -0,0 +1,35 @@ +package sdk + +import ( + "context" + "testing" + + "github.com/nbd-wtf/go-nostr" + "github.com/stretchr/testify/require" +) + +func TestSystemFiatjaf(t *testing.T) { + sys := NewSystem() + ctx := context.Background() + + // get metadata + meta, err := sys.FetchProfileFromInput(ctx, "nprofile1qyxhwumn8ghj7mn0wvhxcmmvqyd8wumn8ghj7un9d3shjtnhv4ehgetjde38gcewvdhk6qpq80cvv07tjdrrgpa0j7j7tmnyl2yr6yr7l8j4s3evf6u64th6gkwswpnfsn") + require.NoError(t, err) + require.Equal(t, "fiatjaf", meta.Name) + + // check outbox relays + relays := sys.FetchOutboxRelays(ctx, meta.PubKey, 5) + require.Contains(t, relays, "wss://relay.westernbtc.com") + require.Contains(t, relays, "wss://pyramid.fiatjaf.com") + + // fetch notes + filter := nostr.Filter{ + Kinds: []int{1}, + Authors: []string{meta.PubKey}, + Limit: 5, + } + events, err := sys.FetchUserEvents(ctx, filter) + require.NoError(t, err) + require.NotEmpty(t, events[meta.PubKey]) + require.GreaterOrEqual(t, len(events[meta.PubKey]), 5) +} diff --git a/sdk/specific_event.go b/sdk/specific_event.go index 4c8caa2..adf791f 100644 --- a/sdk/specific_event.go +++ b/sdk/specific_event.go @@ -91,7 +91,7 @@ func (sys *System) FetchSpecificEvent( // (we do this after fetching author outbox relays because we are already going to prioritize these hints) now := nostr.Now() for _, relay := range priorityRelays { - sys.Hints.Save(author, relay, hints.LastInNevent, now) + sys.Hints.Save(author, nostr.NormalizeURL(relay), hints.LastInNevent, now) } // arrange these diff --git a/sdk/system.go b/sdk/system.go index 541b191..56626d6 100644 --- a/sdk/system.go +++ b/sdk/system.go @@ -43,9 +43,8 @@ type System struct { StoreRelay nostr.RelayStore - replaceableLoaders []*dataloader.Loader[string, *nostr.Event] - addressableLoaders []*dataloader.Loader[string, []*nostr.Event] - outboxShortTermCache cache.Cache32[[]string] + replaceableLoaders []*dataloader.Loader[string, *nostr.Event] + addressableLoaders []*dataloader.Loader[string, []*nostr.Event] } type SystemModifier func(sys *System) @@ -105,14 +104,11 @@ func NewSystem(mods ...SystemModifier) *System { "wss://search.nos.today", ), Hints: memoryh.NewHintDB(), - - outboxShortTermCache: cache_memory.New32[[]string](1000), } sys.Pool = nostr.NewSimplePool(context.Background(), nostr.WithAuthorKindQueryMiddleware(sys.TrackQueryAttempts), - nostr.WithEventMiddleware(sys.TrackEventHints), - nostr.WithEventMiddleware(sys.TrackEventRelays), + nostr.WithEventMiddleware(sys.TrackEventHintsAndRelays), nostr.WithDuplicateMiddleware(sys.TrackEventRelaysD), nostr.WithPenaltyBox(), ) diff --git a/sdk/tracker.go b/sdk/tracker.go index 78362b0..4d8fae6 100644 --- a/sdk/tracker.go +++ b/sdk/tracker.go @@ -20,7 +20,7 @@ func (sys *System) TrackQueryAttempts(relay string, author string, kind int) { sys.Hints.Save(author, relay, hints.LastFetchAttempt, nostr.Now()) } -func (sys *System) TrackEventHints(ie nostr.RelayEvent) { +func (sys *System) TrackEventHintsAndRelays(ie nostr.RelayEvent) { if IsVirtualRelay(ie.Relay.URL) { return } @@ -28,6 +28,10 @@ func (sys *System) TrackEventHints(ie nostr.RelayEvent) { return } + if ie.Kind != 0 && ie.Kind != 10002 { + sys.trackEventRelayCommon(ie.ID, ie.Relay.URL, false) + } + switch ie.Kind { case nostr.KindProfileMetadata: // this could be anywhere so it doesn't count @@ -43,7 +47,7 @@ func (sys *System) TrackEventHints(ie nostr.RelayEvent) { continue } if len(tag) == 2 || (tag[2] == "" || tag[2] == "write") { - sys.Hints.Save(ie.PubKey, tag[1], hints.LastInRelayList, ie.CreatedAt) + sys.Hints.Save(ie.PubKey, nostr.NormalizeURL(tag[1]), hints.LastInRelayList, ie.CreatedAt) } } case nostr.KindFollowList: @@ -59,11 +63,11 @@ func (sys *System) TrackEventHints(ie nostr.RelayEvent) { continue } if tag[0] == "p" && nostr.IsValidPublicKey(tag[1]) { - sys.Hints.Save(tag[1], tag[2], hints.LastInTag, ie.CreatedAt) + sys.Hints.Save(tag[1], nostr.NormalizeURL(tag[2]), hints.LastInTag, ie.CreatedAt) } } default: - // everything else may have hints + // everything else we track by relays and also check for hints sys.Hints.Save(ie.PubKey, ie.Relay.URL, hints.MostRecentEventFetched, ie.CreatedAt) for _, tag := range ie.Tags { @@ -77,7 +81,7 @@ func (sys *System) TrackEventHints(ie nostr.RelayEvent) { continue } if tag[0] == "p" && nostr.IsValidPublicKey(tag[1]) { - sys.Hints.Save(tag[1], tag[2], hints.LastInTag, ie.CreatedAt) + sys.Hints.Save(tag[1], nostr.NormalizeURL(tag[2]), hints.LastInTag, ie.CreatedAt) } } @@ -91,7 +95,7 @@ func (sys *System) TrackEventHints(ie nostr.RelayEvent) { continue } if nostr.IsValidPublicKey(ref.Profile.PublicKey) { - sys.Hints.Save(ref.Profile.PublicKey, relay, hints.LastInNprofile, ie.CreatedAt) + sys.Hints.Save(ref.Profile.PublicKey, nostr.NormalizeURL(relay), hints.LastInNprofile, ie.CreatedAt) } } } else if ref.Event != nil && nostr.IsValidPublicKey(ref.Event.Author) { @@ -102,17 +106,16 @@ func (sys *System) TrackEventHints(ie nostr.RelayEvent) { if p, err := url.Parse(relay); err != nil || (p.Scheme != "wss" && p.Scheme != "ws") { continue } - sys.Hints.Save(ref.Event.Author, relay, hints.LastInNevent, ie.CreatedAt) + sys.Hints.Save(ref.Event.Author, nostr.NormalizeURL(relay), hints.LastInNevent, ie.CreatedAt) } } } } } -func (sys *System) TrackEventRelays(ie nostr.RelayEvent) { - sys.trackEventRelayCommon(ie.ID, ie.Relay.URL) -} - func (sys *System) TrackEventRelaysD(relay, id string) { - sys.trackEventRelayCommon(id, relay) + if IsVirtualRelay(relay) { + return + } + sys.trackEventRelayCommon(id, relay, true /* we pass this flag so we'll skip creating entries for events that didn't pass the checks on the function above -- i.e. ephemeral events */) } diff --git a/subscription.go b/subscription.go index 202e155..bf9ccc3 100644 --- a/subscription.go +++ b/subscription.go @@ -63,7 +63,7 @@ type WithLabel string func (_ WithLabel) IsSubscriptionOption() {} // WithCheckDuplicate sets checkDuplicate on the subscription -type WithCheckDuplicate func(relay, id string) bool +type WithCheckDuplicate func(id, relay string) bool func (_ WithCheckDuplicate) IsSubscriptionOption() {}