use @mmalmi json string preparsing speedup for duplicate events

- get rid of "nonUnique" variants of subMany as we can now relay on CheckDuplicate to track relays.
- add global pool tracker duplicateMiddleware.
This commit is contained in:
fiatjaf 2025-01-15 00:12:44 -03:00
parent faa4fabffe
commit 795f9516ae
5 changed files with 106 additions and 68 deletions

View File

@ -1,6 +1,7 @@
package nostr package nostr
import ( import (
"bytes"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -117,3 +118,36 @@ func isLowerHex(thing string) bool {
} }
return true return true
} }
func extractSubID(jsonStr []byte) string {
// look for "EVENT" pattern
start := bytes.Index(jsonStr, []byte(`"EVENT"`))
if start == -1 {
return ""
}
// move to the next quote
offset := bytes.Index(jsonStr[start+7:], []byte{'"'})
start += 7 + offset + 1
// find the ending quote
end := bytes.Index(jsonStr[start:], []byte{'"'})
// get the contents
return string(jsonStr[start : start+end])
}
func extractEventID(jsonStr []byte) string {
// look for "id": pattern
start := bytes.Index(jsonStr, []byte(`"id":`))
if start == -1 {
return ""
}
// move to the next quote
offset := bytes.Index(jsonStr[start+4:], []byte{'"'})
start += 4 + offset + 1
// get 64 characters of the id
return string(jsonStr[start : start+64])
}

File diff suppressed because one or more lines are too long

99
pool.go
View File

@ -26,8 +26,9 @@ type SimplePool struct {
authHandler func(context.Context, RelayEvent) error authHandler func(context.Context, RelayEvent) error
cancel context.CancelFunc cancel context.CancelFunc
eventMiddleware func(RelayEvent) eventMiddleware func(RelayEvent)
queryMiddleware func(relay string, pubkey string, kind int) duplicateMiddleware func(relay string, id string)
queryMiddleware func(relay string, pubkey string, kind int)
// custom things not often used // custom things not often used
penaltyBoxMu sync.Mutex penaltyBoxMu sync.Mutex
@ -133,6 +134,13 @@ func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) {
pool.eventMiddleware = h pool.eventMiddleware = h
} }
// WithDuplicateMiddleware is a function that will be called with all duplicate ids received.
type WithDuplicateMiddleware func(relay string, id string)
func (h WithDuplicateMiddleware) ApplyPoolOption(pool *SimplePool) {
pool.duplicateMiddleware = h
}
// WithQueryMiddleware is a function that will be called with every combination of relay+pubkey+kind queried // WithQueryMiddleware is a function that will be called with every combination of relay+pubkey+kind queried
// in a .SubMany*() call -- when applicable (i.e. when the query contains a pubkey and a kind). // in a .SubMany*() call -- when applicable (i.e. when the query contains a pubkey and a kind).
type WithAuthorKindQueryMiddleware func(relay string, pubkey string, kind int) type WithAuthorKindQueryMiddleware func(relay string, pubkey string, kind int)
@ -221,26 +229,6 @@ func (pool *SimplePool) SubMany(
urls []string, urls []string,
filters Filters, filters Filters,
opts ...SubscriptionOption, opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subMany(ctx, urls, filters, true, opts)
}
// SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyNonUnique(
ctx context.Context,
urls []string,
filters Filters,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subMany(ctx, urls, filters, false, opts)
}
func (pool *SimplePool) subMany(
ctx context.Context,
urls []string,
filters Filters,
unique bool,
opts []SubscriptionOption,
) chan RelayEvent { ) chan RelayEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
_ = cancel // do this so `go vet` will stop complaining _ = cancel // do this so `go vet` will stop complaining
@ -299,8 +287,15 @@ func (pool *SimplePool) subMany(
hasAuthed = false hasAuthed = false
subscribe: subscribe:
sub, err = relay.Subscribe(ctx, filters, opts...) sub = relay.PrepareSubscription(ctx, filters, opts...)
if err != nil { sub.CheckDuplicate = func(id, relay string) bool {
_, exists := seenAlready.Load(id)
if exists && pool.duplicateMiddleware != nil {
pool.duplicateMiddleware(relay, id)
}
return exists
}
if err := sub.Fire(); err != nil {
goto reconnect goto reconnect
} }
@ -331,11 +326,7 @@ func (pool *SimplePool) subMany(
mh(ie) mh(ie)
} }
if unique { seenAlready.Store(evt.ID, evt.CreatedAt)
if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen {
continue
}
}
select { select {
case events <- ie: case events <- ie:
@ -345,12 +336,11 @@ func (pool *SimplePool) subMany(
case <-ticker.C: case <-ticker.C:
if eose { if eose {
old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
seenAlready.Range(func(id string, value Timestamp) bool { for id, value := range seenAlready.Range {
if value < old { if value < old {
seenAlready.Delete(id) seenAlready.Delete(id)
} }
return true }
})
} }
case reason := <-sub.ClosedReason: case reason := <-sub.ClosedReason:
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed { if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
@ -389,26 +379,6 @@ func (pool *SimplePool) SubManyEose(
urls []string, urls []string,
filters Filters, filters Filters,
opts ...SubscriptionOption, opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subManyEose(ctx, urls, filters, true, opts)
}
// SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyEoseNonUnique(
ctx context.Context,
urls []string,
filters Filters,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subManyEose(ctx, urls, filters, false, opts)
}
func (pool *SimplePool) subManyEose(
ctx context.Context,
urls []string,
filters Filters,
unique bool,
opts []SubscriptionOption,
) chan RelayEvent { ) chan RelayEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
@ -448,8 +418,15 @@ func (pool *SimplePool) subManyEose(
hasAuthed := false hasAuthed := false
subscribe: subscribe:
sub, err := relay.Subscribe(ctx, filters, opts...) sub := relay.PrepareSubscription(ctx, filters, opts...)
if sub == nil { sub.CheckDuplicate = func(id, relay string) bool {
_, exists := seenAlready.Load(id)
if exists && pool.duplicateMiddleware != nil {
pool.duplicateMiddleware(relay, id)
}
return exists
}
if err := sub.Fire(); err != nil {
debugLogf("error subscribing to %s with %v: %s", relay, filters, err) debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
return return
} }
@ -483,11 +460,7 @@ func (pool *SimplePool) subManyEose(
mh(ie) mh(ie)
} }
if unique { seenAlready.Store(evt.ID, true)
if _, seen := seenAlready.LoadOrStore(evt.ID, true); seen {
continue
}
}
select { select {
case events <- ie: case events <- ie:
@ -548,14 +521,14 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F
func (pool *SimplePool) batchedSubMany( func (pool *SimplePool) batchedSubMany(
ctx context.Context, ctx context.Context,
dfs []DirectedFilters, dfs []DirectedFilters,
subFn func(context.Context, []string, Filters, bool, []SubscriptionOption) chan RelayEvent, subFn func(context.Context, []string, Filters, ...SubscriptionOption) chan RelayEvent,
opts []SubscriptionOption, opts []SubscriptionOption,
) chan RelayEvent { ) chan RelayEvent {
res := make(chan RelayEvent) res := make(chan RelayEvent)
for _, df := range dfs { for _, df := range dfs {
go func(df DirectedFilters) { go func(df DirectedFilters) {
for ie := range subFn(ctx, []string{df.Relay}, df.Filters, true, opts) { for ie := range subFn(ctx, []string{df.Relay}, df.Filters, opts...) {
res <- ie res <- ie
} }
}(df) }(df)
@ -570,7 +543,7 @@ func (pool *SimplePool) BatchedSubMany(
dfs []DirectedFilters, dfs []DirectedFilters,
opts ...SubscriptionOption, opts ...SubscriptionOption,
) chan RelayEvent { ) chan RelayEvent {
return pool.batchedSubMany(ctx, dfs, pool.subMany, opts) return pool.batchedSubMany(ctx, dfs, pool.SubMany, opts)
} }
// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays. // BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays.
@ -579,5 +552,5 @@ func (pool *SimplePool) BatchedSubManyEose(
dfs []DirectedFilters, dfs []DirectedFilters,
opts ...SubscriptionOption, opts ...SubscriptionOption,
) chan RelayEvent { ) chan RelayEvent {
return pool.batchedSubMany(ctx, dfs, pool.subManyEose, opts) return pool.batchedSubMany(ctx, dfs, pool.SubManyEose, opts)
} }

View File

@ -220,6 +220,17 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
message := buf.Bytes() message := buf.Bytes()
debugLogf("{%s} received %v\n", r.URL, message) debugLogf("{%s} received %v\n", r.URL, message)
// if this is an "EVENT" we will have this preparser logic that should speed things up a little
// as we skip handling duplicate events
subid := extractSubID(message)
subscription, ok := r.Subscriptions.Load(subIdToSerial(subid))
if ok && subscription.CheckDuplicate != nil {
if !subscription.CheckDuplicate(extractEventID(message[10+len(subid):]), r.URL) {
continue
}
}
envelope := ParseMessage(message) envelope := ParseMessage(message)
if envelope == nil { if envelope == nil {
if r.customHandler != nil { if r.customHandler != nil {
@ -242,11 +253,8 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
} }
r.challenge = *env.Challenge r.challenge = *env.Challenge
case *EventEnvelope: case *EventEnvelope:
if env.SubscriptionID == nil { // we already have the subscription from the pre-check above, so we can just reuse it
continue if subscription == nil {
}
if subscription, ok := r.Subscriptions.Load(subIdToSerial(*env.SubscriptionID)); !ok {
// InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID) // InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID)
continue continue
} else { } else {

View File

@ -31,6 +31,10 @@ type Subscription struct {
// Context will be .Done() when the subscription ends // Context will be .Done() when the subscription ends
Context context.Context Context context.Context
// if it is not nil, CheckDuplicate will be called for every event received
// if it returns true that event will not be processed further.
CheckDuplicate func(id string, relay string) bool
match func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints match func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
live atomic.Bool live atomic.Bool
eosed atomic.Bool eosed atomic.Bool