diff --git a/pool.go b/pool.go new file mode 100644 index 0000000..8de7142 --- /dev/null +++ b/pool.go @@ -0,0 +1,123 @@ +package nostr + +import ( + "context" + "sync" + + syncmap "github.com/SaveTheRbtz/generic-sync-map-go" +) + +type SimplePool struct { + Relays map[string]*Relay + + mutex sync.Mutex +} + +func NewSimplePool() *SimplePool { + return &SimplePool{ + Relays: make(map[string]*Relay), + } +} + +func (pool *SimplePool) EnsureRelay(url string) *Relay { + nm := NormalizeURL(url) + + pool.mutex.Lock() + defer pool.mutex.Unlock() + + relay, ok := pool.Relays[nm] + if ok { + // already connected, unlock and return + return relay + } else { + var err error + // when connecting to a relay we want the connection to persist forever if possible, so use a new context + relay, err = RelayConnect(context.Background(), nm) + if err != nil { + return nil + } + + pool.Relays[nm] = relay + return relay + } +} + +// SubMany opens a subscription with the given filters to multiple relays +// the subscriptions only end when the context is canceled +func (pool *SimplePool) SubMany( + ctx context.Context, + urls []string, + filters Filters, +) chan *Event { + uniqueEvents := make(chan *Event) + seenAlready := syncmap.MapOf[string, struct{}]{} + + for _, url := range urls { + go func(nm string) { + sub, _ := pool.EnsureRelay(nm).Subscribe(ctx, filters) + if sub == nil { + return + } + + for evt := range sub.Events { + // dispatch unique events to client + if _, ok := seenAlready.LoadOrStore(evt.ID, struct{}{}); !ok { + uniqueEvents <- evt + } + } + }(NormalizeURL(url)) + } + + return uniqueEvents +} + +// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE +func (pool *SimplePool) SubManyEose( + ctx context.Context, + urls []string, + filters Filters, +) chan *Event { + ctx, cancel := context.WithCancel(ctx) + + uniqueEvents := make(chan *Event) + seenAlready := syncmap.MapOf[string, struct{}]{} + wg := sync.WaitGroup{} + wg.Add(len(urls)) + + go func() { + // this will happen when all subscriptions get an eose (or when they die) + wg.Wait() + cancel() + close(uniqueEvents) + }() + + for _, url := range urls { + go func(nm string) { + sub, _ := pool.EnsureRelay(nm).Subscribe(ctx, filters) + if sub == nil { + wg.Done() + return + } + + defer wg.Done() + + for { + select { + case <-sub.EndOfStoredEvents: + return + case evt, more := <-sub.Events: + if !more { + return + } + + // dispatch unique events to client + if _, ok := seenAlready.LoadOrStore(evt.ID, struct{}{}); !ok { + uniqueEvents <- evt + } + } + } + }(NormalizeURL(url)) + } + + return uniqueEvents +}