pool: pre-update seenAlready atomically so there is no risk of the same event being parsed and dispatched twice racily.

This commit is contained in:
fiatjaf
2025-04-06 09:40:19 -03:00
parent 198dc6ebda
commit bba332aea6

44
pool.go
View File

@@ -329,12 +329,15 @@ func (pool *SimplePool) FetchManyReplaceable(
seenAlreadyLatest := xsync.NewMapOf[ReplaceableKey, Timestamp]() seenAlreadyLatest := xsync.NewMapOf[ReplaceableKey, Timestamp]()
opts = append(opts, WithCheckDuplicateReplaceable(func(rk ReplaceableKey, ts Timestamp) bool { opts = append(opts, WithCheckDuplicateReplaceable(func(rk ReplaceableKey, ts Timestamp) bool {
latest, _ := seenAlreadyLatest.Load(rk) updated := false
if ts > latest { seenAlreadyLatest.Compute(rk, func(latest Timestamp, _ bool) (newValue Timestamp, delete bool) {
seenAlreadyLatest.Store(rk, ts) if ts > latest {
return false // just stored the most recent updated = true // we are updating the most recent
} return ts, false
return true // already had one that was more recent }
return latest, false // the one we had was already more recent
})
return updated
})) }))
for _, url := range urls { for _, url := range urls {
@@ -538,8 +541,6 @@ func (pool *SimplePool) subMany(
mh(ie) mh(ie)
} }
seenAlready.Store(evt.ID, evt.CreatedAt)
select { select {
case events <- ie: case events <- ie:
case <-ctx.Done(): case <-ctx.Done():
@@ -593,14 +594,16 @@ func (pool *SimplePool) SubManyEose(
filters Filters, filters Filters,
opts ...SubscriptionOption, opts ...SubscriptionOption,
) chan RelayEvent { ) chan RelayEvent {
seenAlready := xsync.NewMapOf[string, bool]() seenAlready := xsync.NewMapOf[string, struct{}]()
return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters, WithCheckDuplicate(func(id, relay string) bool { return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters,
_, exists := seenAlready.Load(id) WithCheckDuplicate(func(id, relay string) bool {
if exists && pool.duplicateMiddleware != nil { _, exists := seenAlready.LoadOrStore(id, struct{}{})
pool.duplicateMiddleware(relay, id) if exists && pool.duplicateMiddleware != nil {
} pool.duplicateMiddleware(relay, id)
return exists }
}), seenAlready, opts...) return exists
}),
opts...)
} }
func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate( func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
@@ -608,7 +611,6 @@ func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
urls []string, urls []string,
filters Filters, filters Filters,
wcd WithCheckDuplicate, wcd WithCheckDuplicate,
seenAlready *xsync.MapOf[string, bool],
opts ...SubscriptionOption, opts ...SubscriptionOption,
) chan RelayEvent { ) chan RelayEvent {
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
@@ -686,8 +688,6 @@ func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
mh(ie) mh(ie)
} }
seenAlready.Store(evt.ID, true)
select { select {
case events <- ie: case events <- ie:
case <-ctx.Done(): case <-ctx.Done():
@@ -759,7 +759,7 @@ func (pool *SimplePool) BatchedSubManyEose(
res := make(chan RelayEvent) res := make(chan RelayEvent)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(dfs)) wg.Add(len(dfs))
seenAlready := xsync.NewMapOf[string, bool]() seenAlready := xsync.NewMapOf[string, struct{}]()
for _, df := range dfs { for _, df := range dfs {
go func(df DirectedFilter) { go func(df DirectedFilter) {
@@ -767,12 +767,12 @@ func (pool *SimplePool) BatchedSubManyEose(
[]string{df.Relay}, []string{df.Relay},
Filters{df.Filter}, Filters{df.Filter},
WithCheckDuplicate(func(id, relay string) bool { WithCheckDuplicate(func(id, relay string) bool {
_, exists := seenAlready.Load(id) _, exists := seenAlready.LoadOrStore(id, struct{}{})
if exists && pool.duplicateMiddleware != nil { if exists && pool.duplicateMiddleware != nil {
pool.duplicateMiddleware(relay, id) pool.duplicateMiddleware(relay, id)
} }
return exists return exists
}), seenAlready, opts..., }), opts...,
) { ) {
select { select {
case res <- ie: case res <- ie: