pool's .FetchManyReplaceable() and amends to previous commit.

This commit is contained in:
fiatjaf 2025-03-12 00:46:43 -03:00
parent 441f94563f
commit cc23d81e80
6 changed files with 142 additions and 10 deletions

@ -155,6 +155,21 @@ func extractEventID(jsonStr string) string {
return jsonStr[start : start+64]
}
func extractEventPubKey(jsonStr string) string {
// look for "pubkey" pattern
start := strings.Index(jsonStr, `"pubkey"`)
if start == -1 {
return ""
}
// move to the next quote
offset := strings.IndexRune(jsonStr[start+8:], '"')
start += 8 + offset + 1
// get 64 characters of the pubkey
return jsonStr[start : start+64]
}
func extractDTag(jsonStr string) string {
// look for ["d", pattern
start := strings.Index(jsonStr, `["d"`)

@ -36,6 +36,17 @@ func TestIDExtract(t *testing.T) {
}
}
func TestPubKeyExtract(t *testing.T) {
{
data := `{"kind":1,"id":"6b5988e9471fa340880a40df815befc69c901420facfb670acd8308012088f16","pubkey":"67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171","created_at":1736909072,"tags":[["e","cfdf18b78527455097515545be4ccbe17e9b88f64539a566c632e405e2c0d08a","","root"],["e","f1ec9c301383be082f1860f7e24e49164d855bfab67f8e5c3ed17f6f3f867cca","","reply"],["p","1afe0c74e3d7784eba93a5e3fa554a6eeb01928d12739ae8ba4832786808e36d"],["p","8aa642e26e65072139e10db59646a89aa7538a59965aab3ed89191d71967d6c3"],["p","f4d89779148ccd245c8d50914a284fd62d97cb0fb68b797a70f24a172b522db9"],["p","18905d0a5d623ab81a98ba98c582bd5f57f2506c6b808905fc599d5a0b229b08"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"],["p","45f195cffcb8c9724efc248f0507a2fb65b579dfabe7cd35398598163cab7627"]],"content":"🫡","sig":"d21aaf43963b07a3cb5f85ac8809c2b2e4dd3269195f4d810e1b7650895178fe01cf685ab3ee93f193cdde1f8d17419ff05332c6e3fc7429bbbe3d70016b8638"}`
require.Equal(t, "67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171", extractEventPubKey(data))
}
{
data := `{"kind":1,"pubkey":"67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171","created_at":1736909072,"tags":[["e","cfdf18b78527455097515545be4ccbe17e9b88f64539a566c632e405e2c0d08a","","root"],["e","f1ec9c301383be082f1860f7e24e49164d855bfab67f8e5c3ed17f6f3f867cca","","reply"],["p","1afe0c74e3d7784eba93a5e3fa554a6eeb01928d12739ae8ba4832786808e36d"],["p","8aa642e26e65072139e10db59646a89aa7538a59965aab3ed89191d71967d6c3"],["p","f4d89779148ccd245c8d50914a284fd62d97cb0fb68b797a70f24a172b522db9"],["p","18905d0a5d623ab81a98ba98c582bd5f57f2506c6b808905fc599d5a0b229b08"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"],["p","45f195cffcb8c9724efc248f0507a2fb65b579dfabe7cd35398598163cab7627"]],"content":"🫡","sig":"d21aaf43963b07a3cb5f85ac8809c2b2e4dd3269195f4d810e1b7650895178fe01cf685ab3ee93f193cdde1f8d17419ff05332c6e3fc7429bbbe3d70016b8638","id": "6b5988e9471fa340880a40df815befc69c901420facfb670acd8308012088f16" }`
require.Equal(t, "67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171", extractEventPubKey(data))
}
}
func TestSubIdExtract(t *testing.T) {
{
data := `["EVENT", "xxz" ,{"kind":1,"id":"6b5988e9471fa340880a40df815befc69c901420facfb670acd8308012088f16","pubkey":"67ada8e344532cbf82f0e702472e24c7896e0e1c96235eacbaaa4b8616052171","created_at":1736909072,"tags":[["e","cfdf18b78527455097515545be4ccbe17e9b88f64539a566c632e405e2c0d08a","","root"],["e","f1ec9c301383be082f1860f7e24e49164d855bfab67f8e5c3ed17f6f3f867cca","","reply"],["p","1afe0c74e3d7784eba93a5e3fa554a6eeb01928d12739ae8ba4832786808e36d"],["p","8aa642e26e65072139e10db59646a89aa7538a59965aab3ed89191d71967d6c3"],["p","f4d89779148ccd245c8d50914a284fd62d97cb0fb68b797a70f24a172b522db9"],["p","18905d0a5d623ab81a98ba98c582bd5f57f2506c6b808905fc599d5a0b229b08"],["p","9a0e2043afaa056a12b8bbe77ac4c3185c0e2bc46b12aac158689144323c0e3c"],["p","45f195cffcb8c9724efc248f0507a2fb65b579dfabe7cd35398598163cab7627"]],"content":"🫡","sig":"d21aaf43963b07a3cb5f85ac8809c2b2e4dd3269195f4d810e1b7650895178fe01cf685ab3ee93f193cdde1f8d17419ff05332c6e3fc7429bbbe3d70016b8638"}]`

101
pool.go

@ -278,6 +278,107 @@ func (pool *SimplePool) SubscribeManyNotifyEOSE(
return pool.subMany(ctx, urls, Filters{filter}, eoseChan, opts...)
}
type ReplaceableKey struct {
PubKey string
D string
}
// FetchManyReplaceable is like FetchMany, but deduplicates replaceable and addressable events and returns
// only the latest for each "d" tag.
func (pool *SimplePool) FetchManyReplaceable(
ctx context.Context,
urls []string,
filter Filter,
opts ...SubscriptionOption,
) *xsync.MapOf[ReplaceableKey, *Event] {
ctx, cancel := context.WithCancelCause(ctx)
results := xsync.NewMapOf[ReplaceableKey, *Event]()
wg := sync.WaitGroup{}
wg.Add(len(urls))
seenAlreadyLatest := xsync.NewMapOf[ReplaceableKey, Timestamp]()
opts = append(opts, WithCheckDuplicateReplaceable(func(rk ReplaceableKey, ts Timestamp) bool {
latest, _ := seenAlreadyLatest.Load(rk)
if ts > latest {
seenAlreadyLatest.Store(rk, ts)
return false // just stored the most recent
}
return true // already had one that was more recent
}))
for _, url := range urls {
go func(nm string) {
defer wg.Done()
if mh := pool.queryMiddleware; mh != nil {
if filter.Kinds != nil && filter.Authors != nil {
for _, kind := range filter.Kinds {
for _, author := range filter.Authors {
mh(nm, author, kind)
}
}
}
}
relay, err := pool.EnsureRelay(nm)
if err != nil {
debugLogf("error connecting to %s with %v: %s", nm, filter, err)
return
}
hasAuthed := false
subscribe:
sub, err := relay.Subscribe(ctx, Filters{filter}, opts...)
if err != nil {
debugLogf("error subscribing to %s with %v: %s", relay, filter, err)
return
}
for {
select {
case <-ctx.Done():
return
case <-sub.EndOfStoredEvents:
return
case reason := <-sub.ClosedReason:
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
// relay is requesting auth. if we can we will perform auth and try again
err := relay.Auth(ctx, func(event *Event) error {
return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
})
if err == nil {
hasAuthed = true // so we don't keep doing AUTH again and again
goto subscribe
}
}
debugLogf("CLOSED from %s: '%s'\n", nm, reason)
return
case evt, more := <-sub.Events:
if !more {
return
}
ie := RelayEvent{Event: evt, Relay: relay}
if mh := pool.eventMiddleware; mh != nil {
mh(ie)
}
results.Store(ReplaceableKey{evt.PubKey, evt.Tags.GetD()}, evt)
}
}
}(NormalizeURL(url))
}
// this will happen when all subscriptions get an eose (or when they die)
wg.Wait()
cancel(errors.New("all subscriptions ended"))
return results
}
func (pool *SimplePool) subMany(
ctx context.Context,
urls []string,

@ -232,13 +232,18 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
// as we skip handling duplicate events
subid := extractSubID(message)
sub, ok := r.Subscriptions.Load(subIdToSerial(subid))
if ok && sub.checkDuplicate != nil {
if sub.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) {
continue
}
} else if sub.checkDuplicateReplaceable != nil {
if sub.checkDuplicateReplaceable(extractDTag(message), extractTimestamp(message)) {
continue
if ok {
if sub.checkDuplicate != nil {
if sub.checkDuplicate(extractEventID(message[10+len(subid):]), r.URL) {
continue
}
} else if sub.checkDuplicateReplaceable != nil {
if sub.checkDuplicateReplaceable(
ReplaceableKey{extractEventPubKey(message), extractDTag(message)},
extractTimestamp(message),
) {
continue
}
}
}

@ -39,7 +39,7 @@ type Subscription struct {
// if it is not nil, checkDuplicateReplaceable will be called for every event received
// if it returns true that event will not be processed further.
checkDuplicateReplaceable func(d string, ts Timestamp) bool
checkDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
match func(*Event) bool // this will be either Filters.Match or Filters.MatchIgnoringTimestampConstraints
live atomic.Bool
@ -68,7 +68,7 @@ type WithCheckDuplicate func(id, relay string) bool
func (_ WithCheckDuplicate) IsSubscriptionOption() {}
// WithCheckDuplicateReplaceable sets checkDuplicateReplaceable on the subscription
type WithCheckDuplicateReplaceable func(d string, ts Timestamp) bool
type WithCheckDuplicateReplaceable func(rk ReplaceableKey, ts Timestamp) bool
func (_ WithCheckDuplicateReplaceable) IsSubscriptionOption() {}

@ -56,7 +56,7 @@ type Tags []Tag
// GetD gets the first "d" tag (for parameterized replaceable events) value or ""
func (tags Tags) GetD() string {
for _, v := range tags {
if v.StartsWith([]string{"d", ""}) {
if len(v) >= 2 && v[0] == "d" {
return v[1]
}
}