From cc23d81e80553acd2a6a7966f754a70b4ed8dddb Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Wed, 12 Mar 2025 00:46:43 -0300 Subject: [PATCH] pool's .FetchManyReplaceable() and amends to previous commit. --- helpers.go | 15 +++++++ helpers_test.go | 11 ++++++ pool.go | 101 ++++++++++++++++++++++++++++++++++++++++++++++++ relay.go | 19 +++++---- subscription.go | 4 +- tags.go | 2 +- 6 files changed, 142 insertions(+), 10 deletions(-) diff --git a/helpers.go b/helpers.go index 4fb6cb0..0ec7f63 100644 --- a/helpers.go +++ b/helpers.go @@ -155,6 +155,21 @@ func extractEventID(jsonStr string) string { return jsonStr[start : start+64] } +func extractEventPubKey(jsonStr string) string { + // look for "pubkey" pattern + start := strings.Index(jsonStr, `"pubkey"`) + if start == -1 { + return "" + } + + // move to the next quote + offset := strings.IndexRune(jsonStr[start+8:], '"') + start += 8 + offset + 1 + + // get 64 characters of the pubkey + return jsonStr[start : start+64] +} + func extractDTag(jsonStr string) string { // look for ["d", pattern start := strings.Index(jsonStr, `["d"`) diff --git a/helpers_test.go b/helpers_test.go index e7872f5..a22744b 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -36,6 +36,17 @@ func TestIDExtract(t *testing.T) { } } +func TestPubKeyExtract(t *testing.T) { + { + data := `{"kind":1,"id":"6b5988e9471fa340880a40df815befc69c901420facfb670acd8308012088f16","pubkey":"67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171","created_at":1736909072,"tags":[["e","cfdf18b78527455097515545be4ccbe17e9b88f64539a566c632e405e2c0d08a","","root"],["e","f1ec9c301383be082f1860f7e24e49164d855bfab67f8e5c3ed17f6f3f867cca","","reply"],["p","1afe0c74e3d7784eba93a5e3fa554a6eeb01928d12739ae8ba4832786808e36d"],["p","8aa642e26e65072139e10db59646a89aa7538a59965aab3ed89191d71967d6c3"],["p","f4d89779148ccd245c8d50914a284fd62d97cb0fb68b797a70f24a172b522db9"],["p","18905d0a5d623ab81a98ba98c582bd5f57f2506c6b808905fc599d5a0b229b08"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"],["p","45f195cffcb8c9724efc248f0507a2fb65b579dfabe7cd35398598163cab7627"]],"content":"🫡","sig":"d21aaf43963b07a3cb5f85ac8809c2b2e4dd3269195f4d810e1b7650895178fe01cf685ab3ee93f193cdde1f8d17419ff05332c6e3fc7429bbbe3d70016b8638"}` + require.Equal(t, "67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171", extractEventPubKey(data)) + } + { + data := `{"kind":1,"pubkey":"67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171","created_at":1736909072,"tags":[["e","cfdf18b78527455097515545be4ccbe17e9b88f64539a566c632e405e2c0d08a","","root"],["e","f1ec9c301383be082f1860f7e24e49164d855bfab67f8e5c3ed17f6f3f867cca","","reply"],["p","1afe0c74e3d7784eba93a5e3fa554a6eeb01928d12739ae8ba4832786808e36d"],["p","8aa642e26e65072139e10db59646a89aa7538a59965aab3ed89191d71967d6c3"],["p","f4d89779148ccd245c8d50914a284fd62d97cb0fb68b797a70f24a172b522db9"],["p","18905d0a5d623ab81a98ba98c582bd5f57f2506c6b808905fc599d5a0b229b08"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"],["p","45f195cffcb8c9724efc248f0507a2fb65b579dfabe7cd35398598163cab7627"]],"content":"🫡","sig":"d21aaf43963b07a3cb5f85ac8809c2b2e4dd3269195f4d810e1b7650895178fe01cf685ab3ee93f193cdde1f8d17419ff05332c6e3fc7429bbbe3d70016b8638","id": "6b5988e9471fa340880a40df815befc69c901420facfb670acd8308012088f16" }` + require.Equal(t, "67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171", extractEventPubKey(data)) + } +} + func TestSubIdExtract(t *testing.T) { { data := `["EVENT", "xxz" ,{"kind":1,"id":"6b5988e9471fa340880a40df815befc69c901420facfb670acd8308012088f16","pubkey":"67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171","created_at":1736909072,"tags":[["e","cfdf18b78527455097515545be4ccbe17e9b88f64539a566c632e405e2c0d08a","","root"],["e","f1ec9c301383be082f1860f7e24e49164d855bfab67f8e5c3ed17f6f3f867cca","","reply"],["p","1afe0c74e3d7784eba93a5e3fa554a6eeb01928d12739ae8ba4832786808e36d"],["p","8aa642e26e65072139e10db59646a89aa7538a59965aab3ed89191d71967d6c3"],["p","f4d89779148ccd245c8d50914a284fd62d97cb0fb68b797a70f24a172b522db9"],["p","18905d0a5d623ab81a98ba98c582bd5f57f2506c6b808905fc599d5a0b229b08"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"],["p","45f195cffcb8c9724efc248f0507a2fb65b579dfabe7cd35398598163cab7627"]],"content":"🫡","sig":"d21aaf43963b07a3cb5f85ac8809c2b2e4dd3269195f4d810e1b7650895178fe01cf685ab3ee93f193cdde1f8d17419ff05332c6e3fc7429bbbe3d70016b8638"}]` diff --git a/pool.go b/pool.go index 9b8ba97..514f7d9 100644 --- a/pool.go +++ b/pool.go @@ -278,6 +278,107 @@ func (pool *SimplePool) SubscribeManyNotifyEOSE( return pool.subMany(ctx, urls, Filters{filter}, eoseChan, opts...) } +type ReplaceableKey struct { + PubKey string + D string +} + +// FetchManyReplaceable is like FetchMany, but deduplicates replaceable and addressable events and returns +// only the latest for each "d" tag. +func (pool *SimplePool) FetchManyReplaceable( + ctx context.Context, + urls []string, + filter Filter, + opts ...SubscriptionOption, +) *xsync.MapOf[ReplaceableKey, *Event] { + ctx, cancel := context.WithCancelCause(ctx) + + results := xsync.NewMapOf[ReplaceableKey, *Event]() + + wg := sync.WaitGroup{} + wg.Add(len(urls)) + + seenAlreadyLatest := xsync.NewMapOf[ReplaceableKey, Timestamp]() + opts = append(opts, WithCheckDuplicateReplaceable(func(rk ReplaceableKey, ts Timestamp) bool { + latest, _ := seenAlreadyLatest.Load(rk) + if ts > latest { + seenAlreadyLatest.Store(rk, ts) + return false // just stored the most recent + } + return true // already had one that was more recent + })) + + for _, url := range urls { + go func(nm string) { + defer wg.Done() + + if mh := pool.queryMiddleware; mh != nil { + if filter.Kinds != nil && filter.Authors != nil { + for _, kind := range filter.Kinds { + for _, author := range filter.Authors { + mh(nm, author, kind) + } + } + } + } + + relay, err := pool.EnsureRelay(nm) + if err != nil { + debugLogf("error connecting to %s with %v: %s", nm, filter, err) + return + } + + hasAuthed := false + + subscribe: + sub, err := relay.Subscribe(ctx, Filters{filter}, opts...) + if err != nil { + debugLogf("error subscribing to %s with %v: %s", relay, filter, err) + return + } + + for { + select { + case <-ctx.Done(): + return + case <-sub.EndOfStoredEvents: + return + case reason := <-sub.ClosedReason: + if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed { + // relay is requesting auth. if we can we will perform auth and try again + err := relay.Auth(ctx, func(event *Event) error { + return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay}) + }) + if err == nil { + hasAuthed = true // so we don't keep doing AUTH again and again + goto subscribe + } + } + debugLogf("CLOSED from %s: '%s'\n", nm, reason) + return + case evt, more := <-sub.Events: + if !more { + return + } + + ie := RelayEvent{Event: evt, Relay: relay} + if mh := pool.eventMiddleware; mh != nil { + mh(ie) + } + + results.Store(ReplaceableKey{evt.PubKey, evt.Tags.GetD()}, evt) + } + } + }(NormalizeURL(url)) + } + + // this will happen when all subscriptions get an eose (or when they die) + wg.Wait() + cancel(errors.New("all subscriptions ended")) + + return results +} + func (pool *SimplePool) subMany( ctx context.Context, urls []string, diff --git a/relay.go b/relay.go index e38c190..04f3568 100644 --- a/relay.go +++ b/relay.go @@ -232,13 +232,18 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error // as we skip handling duplicate events subid := extractSubID(message) sub, ok := r.Subscriptions.Load(subIdToSerial(subid)) - if ok && sub.checkDuplicate != nil { - if sub.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) { - continue - } - } else if sub.checkDuplicateReplaceable != nil { - if sub.checkDuplicateReplaceable(extractDTag(message), extractTimestamp(message)) { - continue + if ok { + if sub.checkDuplicate != nil { + if sub.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) { + continue + } + } else if sub.checkDuplicateReplaceable != nil { + if sub.checkDuplicateReplaceable( + ReplaceableKey{extractEventPubKey(message), extractDTag(message)}, + extractTimestamp(message), + ) { + continue + } } } diff --git a/subscription.go b/subscription.go index 5c8de42..298dd39 100644 --- a/subscription.go +++ b/subscription.go @@ -39,7 +39,7 @@ type Subscription struct { // if it is not nil, checkDuplicateReplaceable will be called for every event received // if it returns true that event will not be processed further. - checkDuplicateReplaceable func(d string, ts Timestamp) bool + checkDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool match func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints live atomic.Bool @@ -68,7 +68,7 @@ type WithCheckDuplicate func(id, relay string) bool func (_ WithCheckDuplicate) IsSubscriptionOption() {} // WithCheckDuplicateReplaceable sets checkDuplicateReplaceable on the subscription -type WithCheckDuplicateReplaceable func(d string, ts Timestamp) bool +type WithCheckDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool func (_ WithCheckDuplicateReplaceable) IsSubscriptionOption() {} diff --git a/tags.go b/tags.go index 0e11297..5818361 100644 --- a/tags.go +++ b/tags.go @@ -56,7 +56,7 @@ type Tags []Tag // GetD gets the first "d" tag (for parameterized replaceable events) value or "" func (tags Tags) GetD() string { for _, v := range tags { - if v.StartsWith([]string{"d", ""}) { + if len(v) >= 2 && v[0] == "d" { return v[1] } }