From e1971d12c05e8a8e1c8abf929568685ba8997b98 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Wed, 15 Jan 2025 14:26:00 -0300 Subject: [PATCH] WithCheckDuplicate(), let's see if this works. --- pool.go | 14 ++++++-------- relay.go | 6 ++++-- sdk/specific_event.go | 25 +------------------------ subscription.go | 12 ++++++++++-- 4 files changed, 21 insertions(+), 36 deletions(-) diff --git a/pool.go b/pool.go index 93b8f71..b40a37d 100644 --- a/pool.go +++ b/pool.go @@ -287,15 +287,14 @@ func (pool *SimplePool) SubMany( hasAuthed = false subscribe: - sub = relay.PrepareSubscription(ctx, filters, opts...) - sub.CheckDuplicate = func(id, relay string) bool { + sub, err = relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(relay, id string) bool { _, exists := seenAlready.Load(id) if exists && pool.duplicateMiddleware != nil { pool.duplicateMiddleware(relay, id) } return exists - } - if err := sub.Fire(); err != nil { + }))...) + if err != nil { goto reconnect } @@ -418,15 +417,14 @@ func (pool *SimplePool) SubManyEose( hasAuthed := false subscribe: - sub := relay.PrepareSubscription(ctx, filters, opts...) - sub.CheckDuplicate = func(id, relay string) bool { + sub, err := relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(relay, id string) bool { _, exists := seenAlready.Load(id) if exists && pool.duplicateMiddleware != nil { pool.duplicateMiddleware(relay, id) } return exists - } - if err := sub.Fire(); err != nil { + }))...) + if err != nil { debugLogf("error subscribing to %s with %v: %s", relay, filters, err) return } diff --git a/relay.go b/relay.go index c3db83e..f5518d6 100644 --- a/relay.go +++ b/relay.go @@ -225,8 +225,8 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error // as we skip handling duplicate events subid := extractSubID(message) subscription, ok := r.Subscriptions.Load(subIdToSerial(subid)) - if ok && subscription.CheckDuplicate != nil { - if !subscription.CheckDuplicate(extractEventID(message[10+len(subid):]), r.URL) { + if ok && subscription.checkDuplicate != nil { + if !subscription.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) { continue } } @@ -426,6 +426,8 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts . switch o := opt.(type) { case WithLabel: label = string(o) + case WithCheckDuplicate: + sub.checkDuplicate = o } } diff --git a/sdk/specific_event.go b/sdk/specific_event.go index fac6674..4c8caa2 100644 --- a/sdk/specific_event.go +++ b/sdk/specific_event.go @@ -5,7 +5,6 @@ import ( "fmt" "slices" "sync" - "time" "github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/nip19" @@ -125,30 +124,8 @@ attempts: // actually fetch the event here countdown := 6.0 subManyCtx := ctx - subMany := sys.Pool.SubManyEose - if attempt.slowWithRelays { - subMany = sys.Pool.SubManyEoseNonUnique - } - if attempt.slowWithRelays { - // keep track of where we have actually found the event so we can show that - var cancel context.CancelFunc - subManyCtx, cancel = context.WithTimeout(ctx, time.Second*6) - defer cancel() - - go func() { - for { - time.Sleep(100 * time.Millisecond) - if countdown <= 0 { - cancel() - break - } - countdown -= 0.1 - } - }() - } - - for ie := range subMany( + for ie := range sys.Pool.SubManyEose( subManyCtx, attempt.relays, nostr.Filters{filter}, diff --git a/subscription.go b/subscription.go index e481d6e..202e155 100644 --- a/subscription.go +++ b/subscription.go @@ -33,7 +33,7 @@ type Subscription struct { // if it is not nil, CheckDuplicate will be called for every event received // if it returns true that event will not be processed further. - CheckDuplicate func(id string, relay string) bool + checkDuplicate func(id string, relay string) bool match func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints live atomic.Bool @@ -62,7 +62,15 @@ type WithLabel string func (_ WithLabel) IsSubscriptionOption() {} -var _ SubscriptionOption = (WithLabel)("") +// WithCheckDuplicate sets checkDuplicate on the subscription +type WithCheckDuplicate func(relay, id string) bool + +func (_ WithCheckDuplicate) IsSubscriptionOption() {} + +var ( + _ SubscriptionOption = (WithLabel)("") + _ SubscriptionOption = (WithCheckDuplicate)(nil) +) func (sub *Subscription) start() { <-sub.Context.Done()