diff --git a/pool.go b/pool.go index 1113d15..eb52dee 100644 --- a/pool.go +++ b/pool.go @@ -329,12 +329,15 @@ func (pool *SimplePool) FetchManyReplaceable( seenAlreadyLatest := xsync.NewMapOf[ReplaceableKey, Timestamp]() opts = append(opts, WithCheckDuplicateReplaceable(func(rk ReplaceableKey, ts Timestamp) bool { - latest, _ := seenAlreadyLatest.Load(rk) - if ts > latest { - seenAlreadyLatest.Store(rk, ts) - return false // just stored the most recent - } - return true // already had one that was more recent + updated := false + seenAlreadyLatest.Compute(rk, func(latest Timestamp, _ bool) (newValue Timestamp, delete bool) { + if ts > latest { + updated = true // we are updating the most recent + return ts, false + } + return latest, false // the one we had was already more recent + }) + return updated })) for _, url := range urls { @@ -538,8 +541,6 @@ func (pool *SimplePool) subMany( mh(ie) } - seenAlready.Store(evt.ID, evt.CreatedAt) - select { case events <- ie: case <-ctx.Done(): @@ -593,14 +594,16 @@ func (pool *SimplePool) SubManyEose( filters Filters, opts ...SubscriptionOption, ) chan RelayEvent { - seenAlready := xsync.NewMapOf[string, bool]() - return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters, WithCheckDuplicate(func(id, relay string) bool { - _, exists := seenAlready.Load(id) - if exists && pool.duplicateMiddleware != nil { - pool.duplicateMiddleware(relay, id) - } - return exists - }), seenAlready, opts...) + seenAlready := xsync.NewMapOf[string, struct{}]() + return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters, + WithCheckDuplicate(func(id, relay string) bool { + _, exists := seenAlready.LoadOrStore(id, struct{}{}) + if exists && pool.duplicateMiddleware != nil { + pool.duplicateMiddleware(relay, id) + } + return exists + }), + opts...) } func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate( @@ -608,7 +611,6 @@ func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate( urls []string, filters Filters, wcd WithCheckDuplicate, - seenAlready *xsync.MapOf[string, bool], opts ...SubscriptionOption, ) chan RelayEvent { ctx, cancel := context.WithCancelCause(ctx) @@ -686,8 +688,6 @@ func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate( mh(ie) } - seenAlready.Store(evt.ID, true) - select { case events <- ie: case <-ctx.Done(): @@ -759,7 +759,7 @@ func (pool *SimplePool) BatchedSubManyEose( res := make(chan RelayEvent) wg := sync.WaitGroup{} wg.Add(len(dfs)) - seenAlready := xsync.NewMapOf[string, bool]() + seenAlready := xsync.NewMapOf[string, struct{}]() for _, df := range dfs { go func(df DirectedFilter) { @@ -767,12 +767,12 @@ func (pool *SimplePool) BatchedSubManyEose( []string{df.Relay}, Filters{df.Filter}, WithCheckDuplicate(func(id, relay string) bool { - _, exists := seenAlready.Load(id) + _, exists := seenAlready.LoadOrStore(id, struct{}{}) if exists && pool.duplicateMiddleware != nil { pool.duplicateMiddleware(relay, id) } return exists - }), seenAlready, opts..., + }), opts..., ) { select { case res <- ie: