2023-05-05 19:05:11 -03:00
|
|
|
package nostr
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2025-01-17 13:44:50 -03:00
|
|
|
"errors"
|
2023-05-09 00:08:06 -03:00
|
|
|
"fmt"
|
2024-07-29 14:58:25 -03:00
|
|
|
"math"
|
2025-01-03 01:15:12 -03:00
|
|
|
"net/http"
|
2024-02-24 18:44:37 -03:00
|
|
|
"slices"
|
2023-12-07 21:37:41 -03:00
|
|
|
"strings"
|
2023-05-05 19:05:11 -03:00
|
|
|
"sync"
|
2025-02-16 17:36:05 -03:00
|
|
|
"sync/atomic"
|
2023-05-30 17:44:25 -03:00
|
|
|
"time"
|
2023-05-05 19:05:11 -03:00
|
|
|
|
2024-11-16 16:59:24 -03:00
|
|
|
"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
|
2024-01-10 06:54:52 -03:00
|
|
|
"github.com/puzpuzpuz/xsync/v3"
|
2023-05-05 19:05:11 -03:00
|
|
|
)
|
|
|
|
|
2023-11-24 17:24:36 +09:00
|
|
|
// seenAlreadyDropTick is the period of the ticker used by long-lived
// subscriptions to prune their "already seen" event-id cache; it is also the
// age beyond which a cached id is dropped.
const seenAlreadyDropTick = time.Minute
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// SimplePool manages connections to multiple relays, ensures they are reopened when necessary and not duplicated.
|
2023-05-05 19:05:11 -03:00
|
|
|
type SimplePool struct {
|
2023-11-16 14:51:33 -03:00
|
|
|
Relays *xsync.MapOf[string, *Relay]
|
2023-05-06 14:32:39 -03:00
|
|
|
Context context.Context
|
2023-05-05 19:05:11 -03:00
|
|
|
|
2024-09-19 12:28:42 -03:00
|
|
|
authHandler func(context.Context, RelayEvent) error
|
2025-01-17 13:44:50 -03:00
|
|
|
cancel context.CancelCauseFunc
|
2024-05-29 17:08:15 -03:00
|
|
|
|
2025-01-15 00:12:44 -03:00
|
|
|
eventMiddleware func(RelayEvent)
|
|
|
|
duplicateMiddleware func(relay string, id string)
|
|
|
|
queryMiddleware func(relay string, pubkey string, kind int)
|
2024-06-27 16:49:50 -03:00
|
|
|
|
2024-05-29 17:08:15 -03:00
|
|
|
// custom things not often used
|
2024-09-27 17:35:43 -03:00
|
|
|
penaltyBoxMu sync.Mutex
|
|
|
|
penaltyBox map[string][2]float64
|
2025-01-03 01:15:12 -03:00
|
|
|
relayOptions []RelayOption
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// DirectedFilter combines a Filter with a specific relay URL.
|
2025-01-17 13:44:50 -03:00
|
|
|
type DirectedFilter struct {
|
|
|
|
Filter
|
2024-02-24 18:44:37 -03:00
|
|
|
Relay string
|
|
|
|
}
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// RelayEvent represents an event received from a specific relay.
|
2024-09-19 12:28:42 -03:00
|
|
|
type RelayEvent struct {
|
2023-09-30 19:16:30 -03:00
|
|
|
*Event
|
|
|
|
Relay *Relay
|
|
|
|
}
|
|
|
|
|
2025-02-07 18:01:59 -03:00
|
|
|
func (ie RelayEvent) String() string { return fmt.Sprintf("[%s] >> %s", ie.Relay.URL, ie.Event) }
|
2024-05-22 08:49:46 -03:00
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// PoolOption is an interface for options that can be applied to a SimplePool.
|
2023-12-07 21:37:41 -03:00
|
|
|
type PoolOption interface {
|
2024-06-27 16:49:50 -03:00
|
|
|
ApplyPoolOption(*SimplePool)
|
2023-12-07 21:37:41 -03:00
|
|
|
}
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// NewSimplePool creates a new SimplePool with the given context and options.
|
2023-12-07 21:37:41 -03:00
|
|
|
func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool {
|
2025-01-17 13:44:50 -03:00
|
|
|
ctx, cancel := context.WithCancelCause(ctx)
|
2023-05-06 14:32:39 -03:00
|
|
|
|
2023-12-07 21:37:41 -03:00
|
|
|
pool := &SimplePool{
|
2024-01-10 06:54:52 -03:00
|
|
|
Relays: xsync.NewMapOf[string, *Relay](),
|
2023-05-06 14:32:39 -03:00
|
|
|
|
|
|
|
Context: ctx,
|
|
|
|
cancel: cancel,
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
2023-12-07 21:37:41 -03:00
|
|
|
|
|
|
|
for _, opt := range opts {
|
2024-06-27 16:49:50 -03:00
|
|
|
opt.ApplyPoolOption(pool)
|
2023-12-07 21:37:41 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
return pool
|
|
|
|
}
|
|
|
|
|
2025-01-03 01:15:12 -03:00
|
|
|
// WithRelayOptions sets options that will be used on every relay instance created by this pool.
|
|
|
|
func WithRelayOptions(ropts ...RelayOption) withRelayOptionsOpt {
|
|
|
|
return ropts
|
|
|
|
}
|
|
|
|
|
|
|
|
type withRelayOptionsOpt []RelayOption
|
|
|
|
|
|
|
|
func (h withRelayOptionsOpt) ApplyPoolOption(pool *SimplePool) {
|
|
|
|
pool.relayOptions = h
|
|
|
|
}
|
|
|
|
|
2023-12-07 21:37:41 -03:00
|
|
|
// WithAuthHandler must be a function that signs the auth event when called.
|
|
|
|
// it will be called whenever any relay in the pool returns a `CLOSED` message
|
|
|
|
// with the "auth-required:" prefix, only once for each relay
|
2024-09-19 12:28:42 -03:00
|
|
|
type WithAuthHandler func(ctx context.Context, authEvent RelayEvent) error
|
2023-12-07 21:37:41 -03:00
|
|
|
|
2024-06-27 16:49:50 -03:00
|
|
|
func (h WithAuthHandler) ApplyPoolOption(pool *SimplePool) {
|
2023-12-07 21:37:41 -03:00
|
|
|
pool.authHandler = h
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
|
|
|
|
2024-07-29 14:58:25 -03:00
|
|
|
// WithPenaltyBox just sets the penalty box mechanism so relays that fail to connect
|
|
|
|
// or that disconnect will be ignored for a while and we won't attempt to connect again.
|
|
|
|
func WithPenaltyBox() withPenaltyBoxOpt { return withPenaltyBoxOpt{} }
|
|
|
|
|
|
|
|
type withPenaltyBoxOpt struct{}
|
|
|
|
|
|
|
|
func (h withPenaltyBoxOpt) ApplyPoolOption(pool *SimplePool) {
|
|
|
|
pool.penaltyBox = make(map[string][2]float64)
|
|
|
|
go func() {
|
|
|
|
sleep := 30.0
|
|
|
|
for {
|
|
|
|
time.Sleep(time.Duration(sleep) * time.Second)
|
|
|
|
|
|
|
|
pool.penaltyBoxMu.Lock()
|
|
|
|
nextSleep := 300.0
|
|
|
|
for url, v := range pool.penaltyBox {
|
|
|
|
remainingSeconds := v[1]
|
|
|
|
remainingSeconds -= sleep
|
|
|
|
if remainingSeconds <= 0 {
|
|
|
|
pool.penaltyBox[url] = [2]float64{v[0], 0}
|
|
|
|
continue
|
|
|
|
} else {
|
|
|
|
pool.penaltyBox[url] = [2]float64{v[0], remainingSeconds}
|
|
|
|
}
|
|
|
|
|
|
|
|
if remainingSeconds < nextSleep {
|
|
|
|
nextSleep = remainingSeconds
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
sleep = nextSleep
|
|
|
|
pool.penaltyBoxMu.Unlock()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
2024-06-27 16:49:50 -03:00
|
|
|
// WithEventMiddleware is a function that will be called with all events received.
|
2024-09-19 12:28:42 -03:00
|
|
|
type WithEventMiddleware func(RelayEvent)
|
2024-06-27 16:49:50 -03:00
|
|
|
|
|
|
|
func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) {
|
2024-12-24 00:15:10 -03:00
|
|
|
pool.eventMiddleware = h
|
|
|
|
}
|
|
|
|
|
2025-01-15 00:12:44 -03:00
|
|
|
// WithDuplicateMiddleware is a function that will be called with all duplicate ids received.
|
|
|
|
type WithDuplicateMiddleware func(relay string, id string)
|
|
|
|
|
|
|
|
func (h WithDuplicateMiddleware) ApplyPoolOption(pool *SimplePool) {
|
|
|
|
pool.duplicateMiddleware = h
|
|
|
|
}
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// WithAuthorKindQueryMiddleware is a function that will be called with every combination of relay+pubkey+kind queried
|
2024-12-24 00:15:10 -03:00
|
|
|
// in a .SubMany*() call -- when applicable (i.e. when the query contains a pubkey and a kind).
|
2025-01-03 00:12:33 -03:00
|
|
|
type WithAuthorKindQueryMiddleware func(relay string, pubkey string, kind int)
|
2024-12-24 00:15:10 -03:00
|
|
|
|
2025-01-03 00:12:33 -03:00
|
|
|
func (h WithAuthorKindQueryMiddleware) ApplyPoolOption(pool *SimplePool) {
|
2024-12-24 00:15:10 -03:00
|
|
|
pool.queryMiddleware = h
|
2024-06-27 16:49:50 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
_ PoolOption = (WithAuthHandler)(nil)
|
|
|
|
_ PoolOption = (WithEventMiddleware)(nil)
|
2024-07-29 14:58:25 -03:00
|
|
|
_ PoolOption = WithPenaltyBox()
|
2025-01-03 01:15:12 -03:00
|
|
|
_ PoolOption = WithRelayOptions(WithRequestHeader(http.Header{}))
|
2024-06-27 16:49:50 -03:00
|
|
|
)
|
2023-12-07 21:37:41 -03:00
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// EnsureRelay ensures that a relay connection exists and is active.
|
|
|
|
// If the relay is not connected, it attempts to connect.
|
2023-05-09 00:08:06 -03:00
|
|
|
func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
|
2023-05-05 19:05:11 -03:00
|
|
|
nm := NormalizeURL(url)
|
2024-05-29 08:39:24 -03:00
|
|
|
defer namedLock(nm)()
|
2023-05-05 19:05:11 -03:00
|
|
|
|
2023-11-16 14:51:33 -03:00
|
|
|
relay, ok := pool.Relays.Load(nm)
|
2024-07-29 14:58:25 -03:00
|
|
|
if ok && relay == nil {
|
|
|
|
if pool.penaltyBox != nil {
|
|
|
|
pool.penaltyBoxMu.Lock()
|
|
|
|
defer pool.penaltyBoxMu.Unlock()
|
|
|
|
v, _ := pool.penaltyBox[nm]
|
|
|
|
if v[1] > 0 {
|
|
|
|
return nil, fmt.Errorf("in penalty box, %fs remaining", v[1])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if ok && relay.IsConnected() {
|
2023-05-05 19:05:11 -03:00
|
|
|
// already connected, unlock and return
|
2023-05-09 00:08:06 -03:00
|
|
|
return relay, nil
|
2024-07-29 14:58:25 -03:00
|
|
|
}
|
2024-05-29 17:08:15 -03:00
|
|
|
|
2024-07-29 14:58:25 -03:00
|
|
|
// try to connect
|
|
|
|
// we use this ctx here so when the pool dies everything dies
|
2025-01-17 13:44:50 -03:00
|
|
|
ctx, cancel := context.WithTimeoutCause(
|
|
|
|
pool.Context,
|
|
|
|
time.Second*15,
|
|
|
|
errors.New("connecting to the relay took too long"),
|
|
|
|
)
|
2024-07-29 14:58:25 -03:00
|
|
|
defer cancel()
|
2023-05-05 19:05:11 -03:00
|
|
|
|
2025-01-03 01:15:12 -03:00
|
|
|
relay = NewRelay(context.Background(), url, pool.relayOptions...)
|
2024-09-26 19:18:34 -03:00
|
|
|
if err := relay.Connect(ctx); err != nil {
|
2024-07-29 14:58:25 -03:00
|
|
|
if pool.penaltyBox != nil {
|
|
|
|
// putting relay in penalty box
|
|
|
|
pool.penaltyBoxMu.Lock()
|
|
|
|
defer pool.penaltyBoxMu.Unlock()
|
|
|
|
v, _ := pool.penaltyBox[nm]
|
|
|
|
pool.penaltyBox[nm] = [2]float64{v[0] + 1, 30.0 + math.Pow(2, v[0]+1)}
|
|
|
|
}
|
|
|
|
return nil, fmt.Errorf("failed to connect: %w", err)
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
2024-07-29 14:58:25 -03:00
|
|
|
|
|
|
|
pool.Relays.Store(nm, relay)
|
|
|
|
return relay, nil
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// PublishResult represents the result of publishing an event to a relay.
|
2024-10-18 07:08:12 -03:00
|
|
|
type PublishResult struct {
|
|
|
|
Error error
|
|
|
|
RelayURL string
|
|
|
|
Relay *Relay
|
|
|
|
}
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// PublishMany publishes an event to multiple relays and returns a channel of results emitted as they're received.
|
2024-10-18 07:08:12 -03:00
|
|
|
func (pool *SimplePool) PublishMany(ctx context.Context, urls []string, evt Event) chan PublishResult {
|
|
|
|
ch := make(chan PublishResult, len(urls))
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for _, url := range urls {
|
|
|
|
relay, err := pool.EnsureRelay(url)
|
|
|
|
if err != nil {
|
|
|
|
ch <- PublishResult{err, url, nil}
|
|
|
|
} else {
|
|
|
|
err = relay.Publish(ctx, evt)
|
|
|
|
ch <- PublishResult{err, url, relay}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
close(ch)
|
|
|
|
}()
|
|
|
|
|
|
|
|
return ch
|
|
|
|
}
|
|
|
|
|
2025-02-12 15:50:08 -03:00
|
|
|
// SubscribeMany opens a subscription with the given filter to multiple relays
|
|
|
|
// the subscriptions ends when the context is canceled or when all relays return a CLOSED.
|
|
|
|
func (pool *SimplePool) SubscribeMany(
|
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
|
|
|
filter Filter,
|
|
|
|
opts ...SubscriptionOption,
|
|
|
|
) chan RelayEvent {
|
|
|
|
return pool.subMany(ctx, urls, Filters{filter}, nil, opts...)
|
|
|
|
}
|
|
|
|
|
|
|
|
// FetchMany opens a subscription, much like SubscribeMany, but it ends as soon as all Relays
|
|
|
|
// return an EOSE message.
|
|
|
|
func (pool *SimplePool) FetchMany(
|
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
|
|
|
filter Filter,
|
|
|
|
opts ...SubscriptionOption,
|
|
|
|
) chan RelayEvent {
|
|
|
|
return pool.SubManyEose(ctx, urls, Filters{filter}, opts...)
|
|
|
|
}
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// Deprecated: SubMany is deprecated: use SubscribeMany instead.
|
2024-09-24 12:05:22 -03:00
|
|
|
func (pool *SimplePool) SubMany(
|
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
|
|
|
filters Filters,
|
|
|
|
opts ...SubscriptionOption,
|
2025-01-28 16:12:53 -03:00
|
|
|
) chan RelayEvent {
|
|
|
|
return pool.subMany(ctx, urls, filters, nil, opts...)
|
|
|
|
}
|
|
|
|
|
2025-02-12 15:50:08 -03:00
|
|
|
// SubscribeManyNotifyEOSE is like SubscribeMany, but takes a channel that is closed when
|
2025-01-28 16:12:53 -03:00
|
|
|
// all subscriptions have received an EOSE
|
2025-02-12 15:50:08 -03:00
|
|
|
func (pool *SimplePool) SubscribeManyNotifyEOSE(
|
2025-01-28 16:12:53 -03:00
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
2025-02-12 15:50:08 -03:00
|
|
|
filter Filter,
|
2025-01-28 16:12:53 -03:00
|
|
|
eoseChan chan struct{},
|
|
|
|
opts ...SubscriptionOption,
|
|
|
|
) chan RelayEvent {
|
2025-02-12 15:50:08 -03:00
|
|
|
return pool.subMany(ctx, urls, Filters{filter}, eoseChan, opts...)
|
2025-01-28 16:12:53 -03:00
|
|
|
}
|
|
|
|
|
2025-03-12 00:46:43 -03:00
|
|
|
// ReplaceableKey identifies a replaceable or addressable event by its author
// and "d" tag value.
type ReplaceableKey struct {
	PubKey string // author public key
	D      string // value of the event's "d" tag
}
|
|
|
|
|
|
|
|
// FetchManyReplaceable is like FetchMany, but deduplicates replaceable and addressable events and returns
|
|
|
|
// only the latest for each "d" tag.
|
|
|
|
func (pool *SimplePool) FetchManyReplaceable(
|
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
|
|
|
filter Filter,
|
|
|
|
opts ...SubscriptionOption,
|
|
|
|
) *xsync.MapOf[ReplaceableKey, *Event] {
|
|
|
|
ctx, cancel := context.WithCancelCause(ctx)
|
|
|
|
|
|
|
|
results := xsync.NewMapOf[ReplaceableKey, *Event]()
|
|
|
|
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
wg.Add(len(urls))
|
|
|
|
|
|
|
|
seenAlreadyLatest := xsync.NewMapOf[ReplaceableKey, Timestamp]()
|
|
|
|
opts = append(opts, WithCheckDuplicateReplaceable(func(rk ReplaceableKey, ts Timestamp) bool {
|
|
|
|
latest, _ := seenAlreadyLatest.Load(rk)
|
|
|
|
if ts > latest {
|
|
|
|
seenAlreadyLatest.Store(rk, ts)
|
|
|
|
return false // just stored the most recent
|
|
|
|
}
|
|
|
|
return true // already had one that was more recent
|
|
|
|
}))
|
|
|
|
|
|
|
|
for _, url := range urls {
|
|
|
|
go func(nm string) {
|
|
|
|
defer wg.Done()
|
|
|
|
|
|
|
|
if mh := pool.queryMiddleware; mh != nil {
|
|
|
|
if filter.Kinds != nil && filter.Authors != nil {
|
|
|
|
for _, kind := range filter.Kinds {
|
|
|
|
for _, author := range filter.Authors {
|
|
|
|
mh(nm, author, kind)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
relay, err := pool.EnsureRelay(nm)
|
|
|
|
if err != nil {
|
|
|
|
debugLogf("error connecting to %s with %v: %s", nm, filter, err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
hasAuthed := false
|
|
|
|
|
|
|
|
subscribe:
|
|
|
|
sub, err := relay.Subscribe(ctx, Filters{filter}, opts...)
|
|
|
|
if err != nil {
|
|
|
|
debugLogf("error subscribing to %s with %v: %s", relay, filter, err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case <-sub.EndOfStoredEvents:
|
|
|
|
return
|
|
|
|
case reason := <-sub.ClosedReason:
|
|
|
|
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
|
|
|
|
// relay is requesting auth. if we can we will perform auth and try again
|
|
|
|
err := relay.Auth(ctx, func(event *Event) error {
|
|
|
|
return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
|
|
|
|
})
|
|
|
|
if err == nil {
|
|
|
|
hasAuthed = true // so we don't keep doing AUTH again and again
|
|
|
|
goto subscribe
|
|
|
|
}
|
|
|
|
}
|
|
|
|
debugLogf("CLOSED from %s: '%s'\n", nm, reason)
|
|
|
|
return
|
|
|
|
case evt, more := <-sub.Events:
|
|
|
|
if !more {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
ie := RelayEvent{Event: evt, Relay: relay}
|
|
|
|
if mh := pool.eventMiddleware; mh != nil {
|
|
|
|
mh(ie)
|
|
|
|
}
|
|
|
|
|
|
|
|
results.Store(ReplaceableKey{evt.PubKey, evt.Tags.GetD()}, evt)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}(NormalizeURL(url))
|
|
|
|
}
|
|
|
|
|
|
|
|
// this will happen when all subscriptions get an eose (or when they die)
|
|
|
|
wg.Wait()
|
|
|
|
cancel(errors.New("all subscriptions ended"))
|
|
|
|
|
|
|
|
return results
|
|
|
|
}
|
|
|
|
|
2025-01-28 16:12:53 -03:00
|
|
|
func (pool *SimplePool) subMany(
|
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
|
|
|
filters Filters,
|
|
|
|
eoseChan chan struct{},
|
|
|
|
opts ...SubscriptionOption,
|
2024-09-24 12:05:22 -03:00
|
|
|
) chan RelayEvent {
|
2025-02-12 14:32:22 -03:00
|
|
|
ctx, cancel := context.WithCancelCause(ctx)
|
2023-12-01 20:49:45 -03:00
|
|
|
_ = cancel // do this so `go vet` will stop complaining
|
2024-09-19 12:28:42 -03:00
|
|
|
events := make(chan RelayEvent)
|
2024-01-10 06:54:52 -03:00
|
|
|
seenAlready := xsync.NewMapOf[string, Timestamp]()
|
2023-11-24 17:24:36 +09:00
|
|
|
ticker := time.NewTicker(seenAlreadyDropTick)
|
2023-12-15 00:27:56 +09:00
|
|
|
|
2025-01-28 16:12:53 -03:00
|
|
|
eoseWg := sync.WaitGroup{}
|
|
|
|
eoseWg.Add(len(urls))
|
|
|
|
if eoseChan != nil {
|
|
|
|
go func() {
|
|
|
|
eoseWg.Wait()
|
|
|
|
close(eoseChan)
|
|
|
|
}()
|
|
|
|
}
|
2023-05-05 19:05:11 -03:00
|
|
|
|
2023-09-16 07:51:22 -03:00
|
|
|
pending := xsync.NewCounter()
|
2023-12-01 20:49:45 -03:00
|
|
|
pending.Add(int64(len(urls)))
|
2024-01-18 11:49:16 -03:00
|
|
|
for i, url := range urls {
|
|
|
|
url = NormalizeURL(url)
|
|
|
|
urls[i] = url
|
|
|
|
if idx := slices.Index(urls, url); idx != i {
|
|
|
|
// skip duplicate relays in the list
|
2025-01-28 16:12:53 -03:00
|
|
|
eoseWg.Done()
|
2024-01-18 11:49:16 -03:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2025-02-16 17:36:05 -03:00
|
|
|
eosed := atomic.Bool{}
|
2025-01-29 22:12:42 -03:00
|
|
|
firstConnection := true
|
|
|
|
|
2023-05-05 19:05:11 -03:00
|
|
|
go func(nm string) {
|
2023-12-01 20:49:45 -03:00
|
|
|
defer func() {
|
|
|
|
pending.Dec()
|
|
|
|
if pending.Value() == 0 {
|
|
|
|
close(events)
|
2025-02-12 14:32:22 -03:00
|
|
|
cancel(fmt.Errorf("aborted: %w", context.Cause(ctx)))
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
2025-02-16 18:45:09 -03:00
|
|
|
if eosed.CompareAndSwap(false, true) {
|
2025-01-29 22:12:42 -03:00
|
|
|
eoseWg.Done()
|
|
|
|
}
|
2023-12-01 20:49:45 -03:00
|
|
|
}()
|
2023-10-03 15:43:18 -03:00
|
|
|
|
2023-12-07 21:37:41 -03:00
|
|
|
hasAuthed := false
|
2023-12-02 15:12:45 -03:00
|
|
|
interval := 3 * time.Second
|
2023-12-01 20:49:45 -03:00
|
|
|
for {
|
2023-10-03 15:43:18 -03:00
|
|
|
select {
|
2023-12-02 19:14:54 +09:00
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
2023-12-02 14:59:16 -03:00
|
|
|
var sub *Subscription
|
|
|
|
|
2024-12-24 00:15:10 -03:00
|
|
|
if mh := pool.queryMiddleware; mh != nil {
|
|
|
|
for _, filter := range filters {
|
|
|
|
if filter.Kinds != nil && filter.Authors != nil {
|
|
|
|
for _, kind := range filter.Kinds {
|
|
|
|
for _, author := range filter.Authors {
|
|
|
|
mh(nm, author, kind)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-12-02 19:14:54 +09:00
|
|
|
relay, err := pool.EnsureRelay(nm)
|
|
|
|
if err != nil {
|
2025-01-29 22:12:42 -03:00
|
|
|
// if we never connected to this just fail
|
|
|
|
if firstConnection {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// otherwise (if we were connected and got disconnected) keep trying to reconnect
|
2025-02-03 10:34:43 -03:00
|
|
|
debugLogf("%s reconnecting because connection failed\n", nm)
|
2023-12-02 14:59:16 -03:00
|
|
|
goto reconnect
|
2023-12-02 19:14:54 +09:00
|
|
|
}
|
2025-01-29 22:12:42 -03:00
|
|
|
firstConnection = false
|
2023-12-07 21:37:41 -03:00
|
|
|
hasAuthed = false
|
2023-12-02 19:14:54 +09:00
|
|
|
|
2023-12-07 21:37:41 -03:00
|
|
|
subscribe:
|
2025-01-16 17:38:03 -03:00
|
|
|
sub, err = relay.Subscribe(ctx, filters, append(opts, WithCheckDuplicate(func(id, relay string) bool {
|
2025-01-15 00:12:44 -03:00
|
|
|
_, exists := seenAlready.Load(id)
|
|
|
|
if exists && pool.duplicateMiddleware != nil {
|
|
|
|
pool.duplicateMiddleware(relay, id)
|
|
|
|
}
|
|
|
|
return exists
|
2025-01-15 14:26:00 -03:00
|
|
|
}))...)
|
|
|
|
if err != nil {
|
2025-02-03 10:34:43 -03:00
|
|
|
debugLogf("%s reconnecting because subscription died\n", nm)
|
2023-12-02 14:59:16 -03:00
|
|
|
goto reconnect
|
2023-12-02 19:14:54 +09:00
|
|
|
}
|
|
|
|
|
2023-12-15 00:27:56 +09:00
|
|
|
go func() {
|
|
|
|
<-sub.EndOfStoredEvents
|
2025-01-28 16:12:53 -03:00
|
|
|
|
|
|
|
// guard here otherwise a resubscription will trigger a duplicate call to eoseWg.Done()
|
2025-02-16 17:36:05 -03:00
|
|
|
if eosed.CompareAndSwap(false, true) {
|
2025-01-28 16:12:53 -03:00
|
|
|
eoseWg.Done()
|
|
|
|
}
|
2023-12-15 00:27:56 +09:00
|
|
|
}()
|
|
|
|
|
2023-12-02 15:12:45 -03:00
|
|
|
// reset interval when we get a good subscription
|
|
|
|
interval = 3 * time.Second
|
|
|
|
|
2023-12-02 19:14:54 +09:00
|
|
|
for {
|
2023-12-01 20:49:45 -03:00
|
|
|
select {
|
2023-12-02 19:14:54 +09:00
|
|
|
case evt, more := <-sub.Events:
|
|
|
|
if !more {
|
2023-12-02 15:12:45 -03:00
|
|
|
// this means the connection was closed for weird reasons, like the server shut down
|
|
|
|
// so we will update the filters here to include only events seem from now on
|
|
|
|
// and try to reconnect until we succeed
|
|
|
|
now := Now()
|
|
|
|
for i := range filters {
|
|
|
|
filters[i].Since = &now
|
|
|
|
}
|
2025-02-03 10:34:43 -03:00
|
|
|
debugLogf("%s reconnecting because sub.Events is broken\n", nm)
|
2023-12-02 14:59:16 -03:00
|
|
|
goto reconnect
|
2023-12-02 19:14:54 +09:00
|
|
|
}
|
2024-06-27 16:49:50 -03:00
|
|
|
|
2024-09-19 12:28:42 -03:00
|
|
|
ie := RelayEvent{Event: evt, Relay: relay}
|
2024-12-24 00:15:10 -03:00
|
|
|
if mh := pool.eventMiddleware; mh != nil {
|
2024-06-27 16:49:50 -03:00
|
|
|
mh(ie)
|
|
|
|
}
|
|
|
|
|
2025-01-15 00:12:44 -03:00
|
|
|
seenAlready.Store(evt.ID, evt.CreatedAt)
|
2024-06-27 16:49:50 -03:00
|
|
|
|
2023-12-02 19:14:54 +09:00
|
|
|
select {
|
2024-06-27 16:49:50 -03:00
|
|
|
case events <- ie:
|
2023-12-02 19:14:54 +09:00
|
|
|
case <-ctx.Done():
|
2024-09-11 21:05:34 -03:00
|
|
|
return
|
2023-12-02 19:14:54 +09:00
|
|
|
}
|
|
|
|
case <-ticker.C:
|
2025-02-16 17:36:05 -03:00
|
|
|
if eosed.Load() {
|
2023-12-02 19:14:54 +09:00
|
|
|
old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
|
2025-01-15 00:12:44 -03:00
|
|
|
for id, value := range seenAlready.Range {
|
2023-12-02 19:14:54 +09:00
|
|
|
if value < old {
|
2023-12-02 14:37:32 -03:00
|
|
|
seenAlready.Delete(id)
|
2023-12-02 19:14:54 +09:00
|
|
|
}
|
2025-01-15 00:12:44 -03:00
|
|
|
}
|
2023-12-02 19:14:54 +09:00
|
|
|
}
|
|
|
|
case reason := <-sub.ClosedReason:
|
2023-12-07 21:37:41 -03:00
|
|
|
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
|
|
|
|
// relay is requesting auth. if we can we will perform auth and try again
|
2024-09-19 12:28:42 -03:00
|
|
|
err := relay.Auth(ctx, func(event *Event) error {
|
|
|
|
return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
|
|
|
|
})
|
|
|
|
if err == nil {
|
2023-12-07 21:37:41 -03:00
|
|
|
hasAuthed = true // so we don't keep doing AUTH again and again
|
|
|
|
goto subscribe
|
|
|
|
}
|
|
|
|
} else {
|
2025-02-03 10:34:43 -03:00
|
|
|
debugLogf("CLOSED from %s: '%s'\n", nm, reason)
|
2023-12-07 21:37:41 -03:00
|
|
|
}
|
2025-01-28 16:12:53 -03:00
|
|
|
|
2023-12-02 19:14:54 +09:00
|
|
|
return
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
2023-11-24 17:24:36 +09:00
|
|
|
}
|
2023-10-03 15:43:18 -03:00
|
|
|
}
|
2023-12-02 14:59:16 -03:00
|
|
|
|
|
|
|
reconnect:
|
2023-12-02 15:12:45 -03:00
|
|
|
// we will go back to the beginning of the loop and try to connect again and again
|
|
|
|
// until the context is canceled
|
|
|
|
time.Sleep(interval)
|
|
|
|
interval = interval * 17 / 10 // the next time we try we will wait longer
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
2024-01-18 11:49:16 -03:00
|
|
|
}(url)
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
|
|
|
|
2023-10-02 14:16:16 -03:00
|
|
|
return events
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// Deprecated: SubManyEose is deprecated: use FetchMany instead.
|
2024-09-24 12:05:22 -03:00
|
|
|
func (pool *SimplePool) SubManyEose(
|
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
|
|
|
filters Filters,
|
|
|
|
opts ...SubscriptionOption,
|
|
|
|
) chan RelayEvent {
|
2025-01-17 13:44:50 -03:00
|
|
|
seenAlready := xsync.NewMapOf[string, bool]()
|
|
|
|
return pool.subManyEoseNonOverwriteCheckDuplicate(ctx, urls, filters, WithCheckDuplicate(func(id, relay string) bool {
|
|
|
|
_, exists := seenAlready.Load(id)
|
|
|
|
if exists && pool.duplicateMiddleware != nil {
|
|
|
|
pool.duplicateMiddleware(relay, id)
|
|
|
|
}
|
|
|
|
return exists
|
|
|
|
}), seenAlready, opts...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (pool *SimplePool) subManyEoseNonOverwriteCheckDuplicate(
|
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
|
|
|
filters Filters,
|
|
|
|
wcd WithCheckDuplicate,
|
|
|
|
seenAlready *xsync.MapOf[string, bool],
|
|
|
|
opts ...SubscriptionOption,
|
|
|
|
) chan RelayEvent {
|
|
|
|
ctx, cancel := context.WithCancelCause(ctx)
|
2023-05-05 19:05:11 -03:00
|
|
|
|
2024-09-19 12:28:42 -03:00
|
|
|
events := make(chan RelayEvent)
|
2023-05-05 19:05:11 -03:00
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
wg.Add(len(urls))
|
|
|
|
|
2025-01-17 13:44:50 -03:00
|
|
|
opts = append(opts, wcd)
|
|
|
|
|
2023-05-05 19:05:11 -03:00
|
|
|
go func() {
|
|
|
|
// this will happen when all subscriptions get an eose (or when they die)
|
|
|
|
wg.Wait()
|
2025-01-17 13:44:50 -03:00
|
|
|
cancel(errors.New("all subscriptions ended"))
|
2023-10-02 14:16:16 -03:00
|
|
|
close(events)
|
2023-05-05 19:05:11 -03:00
|
|
|
}()
|
|
|
|
|
|
|
|
for _, url := range urls {
|
|
|
|
go func(nm string) {
|
2023-07-08 08:15:34 -03:00
|
|
|
defer wg.Done()
|
|
|
|
|
2024-12-24 00:15:10 -03:00
|
|
|
if mh := pool.queryMiddleware; mh != nil {
|
|
|
|
for _, filter := range filters {
|
|
|
|
if filter.Kinds != nil && filter.Authors != nil {
|
|
|
|
for _, kind := range filter.Kinds {
|
|
|
|
for _, author := range filter.Authors {
|
|
|
|
mh(nm, author, kind)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-05-09 00:08:06 -03:00
|
|
|
relay, err := pool.EnsureRelay(nm)
|
|
|
|
if err != nil {
|
2025-01-17 17:39:24 -03:00
|
|
|
debugLogf("error connecting to %s with %v: %s", nm, filters, err)
|
2023-05-09 00:08:06 -03:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2023-12-07 21:37:41 -03:00
|
|
|
hasAuthed := false
|
|
|
|
|
|
|
|
subscribe:
|
2025-01-17 13:44:50 -03:00
|
|
|
sub, err := relay.Subscribe(ctx, filters, opts...)
|
2025-01-15 14:26:00 -03:00
|
|
|
if err != nil {
|
2023-07-11 15:25:02 -03:00
|
|
|
debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
|
2023-05-05 19:05:11 -03:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
2023-07-08 08:15:34 -03:00
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
2023-05-05 19:05:11 -03:00
|
|
|
case <-sub.EndOfStoredEvents:
|
|
|
|
return
|
2023-12-01 13:37:24 -03:00
|
|
|
case reason := <-sub.ClosedReason:
|
2023-12-07 21:37:41 -03:00
|
|
|
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
|
|
|
|
// relay is requesting auth. if we can we will perform auth and try again
|
2024-09-19 12:28:42 -03:00
|
|
|
err := relay.Auth(ctx, func(event *Event) error {
|
|
|
|
return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
|
|
|
|
})
|
2023-12-09 13:21:35 -03:00
|
|
|
if err == nil {
|
2023-12-07 21:37:41 -03:00
|
|
|
hasAuthed = true // so we don't keep doing AUTH again and again
|
|
|
|
goto subscribe
|
|
|
|
}
|
|
|
|
}
|
2025-02-03 10:34:43 -03:00
|
|
|
debugLogf("CLOSED from %s: '%s'\n", nm, reason)
|
2023-12-01 13:37:24 -03:00
|
|
|
return
|
2023-05-05 19:05:11 -03:00
|
|
|
case evt, more := <-sub.Events:
|
|
|
|
if !more {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2024-09-19 12:28:42 -03:00
|
|
|
ie := RelayEvent{Event: evt, Relay: relay}
|
2024-12-24 00:15:10 -03:00
|
|
|
if mh := pool.eventMiddleware; mh != nil {
|
2024-06-27 16:49:50 -03:00
|
|
|
mh(ie)
|
|
|
|
}
|
|
|
|
|
2025-01-15 00:12:44 -03:00
|
|
|
seenAlready.Store(evt.ID, true)
|
2023-10-03 15:43:18 -03:00
|
|
|
|
|
|
|
select {
|
2024-06-27 16:49:50 -03:00
|
|
|
case events <- ie:
|
2023-10-03 15:43:18 -03:00
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
}
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}(NormalizeURL(url))
|
|
|
|
}
|
|
|
|
|
2023-10-02 14:16:16 -03:00
|
|
|
return events
|
2023-05-05 19:05:11 -03:00
|
|
|
}
|
2023-08-06 19:57:08 -03:00
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// CountMany aggregates count results from multiple relays using NIP-45 HyperLogLog
|
2024-11-16 16:59:24 -03:00
|
|
|
func (pool *SimplePool) CountMany(
|
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
|
|
|
filter Filter,
|
|
|
|
opts []SubscriptionOption,
|
|
|
|
) int {
|
2025-01-03 00:12:33 -03:00
|
|
|
hll := hyperloglog.New(0) // offset is irrelevant here
|
2024-11-16 16:59:24 -03:00
|
|
|
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
wg.Add(len(urls))
|
|
|
|
for _, url := range urls {
|
|
|
|
go func(nm string) {
|
|
|
|
defer wg.Done()
|
|
|
|
relay, err := pool.EnsureRelay(url)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
ce, err := relay.countInternal(ctx, Filters{filter}, opts...)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if len(ce.HyperLogLog) != 256 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
hll.MergeRegisters(ce.HyperLogLog)
|
|
|
|
}(NormalizeURL(url))
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
return int(hll.Count())
|
|
|
|
}
|
|
|
|
|
2023-08-06 19:57:08 -03:00
|
|
|
// QuerySingle returns the first event returned by the first relay, cancels everything else.
|
2025-02-07 18:02:19 -03:00
|
|
|
func (pool *SimplePool) QuerySingle(
|
|
|
|
ctx context.Context,
|
|
|
|
urls []string,
|
|
|
|
filter Filter,
|
|
|
|
opts ...SubscriptionOption,
|
|
|
|
) *RelayEvent {
|
2025-01-17 13:44:50 -03:00
|
|
|
ctx, cancel := context.WithCancelCause(ctx)
|
2025-02-07 18:02:19 -03:00
|
|
|
for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}, opts...) {
|
2025-01-17 13:44:50 -03:00
|
|
|
cancel(errors.New("got the first event and ended successfully"))
|
2023-09-30 19:16:30 -03:00
|
|
|
return &ievt
|
2023-08-06 19:57:08 -03:00
|
|
|
}
|
2025-01-17 13:44:50 -03:00
|
|
|
cancel(errors.New("SubManyEose() didn't get yield events"))
|
2023-08-06 19:57:08 -03:00
|
|
|
return nil
|
|
|
|
}
|
2024-02-24 18:44:37 -03:00
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// BatchedSubManyEose performs batched subscriptions to multiple relays with different filters.
|
2025-01-17 13:44:50 -03:00
|
|
|
func (pool *SimplePool) BatchedSubManyEose(
|
2024-02-24 18:44:37 -03:00
|
|
|
ctx context.Context,
|
2025-01-17 13:44:50 -03:00
|
|
|
dfs []DirectedFilter,
|
|
|
|
opts ...SubscriptionOption,
|
2024-09-19 12:28:42 -03:00
|
|
|
) chan RelayEvent {
|
|
|
|
res := make(chan RelayEvent)
|
2025-01-17 13:44:50 -03:00
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
wg.Add(len(dfs))
|
|
|
|
seenAlready := xsync.NewMapOf[string, bool]()
|
2024-02-24 18:44:37 -03:00
|
|
|
|
2024-02-25 11:22:30 -03:00
|
|
|
for _, df := range dfs {
|
2025-01-17 13:44:50 -03:00
|
|
|
go func(df DirectedFilter) {
|
|
|
|
for ie := range pool.subManyEoseNonOverwriteCheckDuplicate(ctx,
|
|
|
|
[]string{df.Relay},
|
|
|
|
Filters{df.Filter},
|
|
|
|
WithCheckDuplicate(func(id, relay string) bool {
|
|
|
|
_, exists := seenAlready.Load(id)
|
|
|
|
if exists && pool.duplicateMiddleware != nil {
|
|
|
|
pool.duplicateMiddleware(relay, id)
|
|
|
|
}
|
|
|
|
return exists
|
|
|
|
}), seenAlready, opts...) {
|
2024-02-24 18:44:37 -03:00
|
|
|
res <- ie
|
|
|
|
}
|
2025-01-17 13:44:50 -03:00
|
|
|
|
|
|
|
wg.Done()
|
2024-02-25 11:22:30 -03:00
|
|
|
}(df)
|
2024-02-24 18:44:37 -03:00
|
|
|
}
|
|
|
|
|
2025-01-17 13:44:50 -03:00
|
|
|
go func() {
|
|
|
|
wg.Wait()
|
|
|
|
close(res)
|
|
|
|
}()
|
2024-02-24 18:44:37 -03:00
|
|
|
|
2025-01-17 13:44:50 -03:00
|
|
|
return res
|
2024-02-24 18:44:37 -03:00
|
|
|
}
|
|
|
|
|
2025-03-04 11:08:31 -03:00
|
|
|
// Close closes the pool with the given reason.
|
2025-01-17 13:44:50 -03:00
|
|
|
func (pool *SimplePool) Close(reason string) {
|
|
|
|
pool.cancel(fmt.Errorf("pool closed with reason: '%s'", reason))
|
2024-02-24 18:44:37 -03:00
|
|
|
}
|