From 98a2a39521a0c7d3c9698e5e95a89cec018c21f0 Mon Sep 17 00:00:00 2001
From: fiatjaf <fiatjaf@gmail.com>
Date: Fri, 5 May 2023 19:05:11 -0300
Subject: [PATCH] add SimplePool.

---
 pool.go | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 123 insertions(+)
 create mode 100644 pool.go

diff --git a/pool.go b/pool.go
new file mode 100644
index 0000000..8de7142
--- /dev/null
+++ b/pool.go
@@ -0,0 +1,123 @@
+package nostr
+
+import (
+	"context"
+	"sync"
+
+	syncmap "github.com/SaveTheRbtz/generic-sync-map-go"
+)
+
+type SimplePool struct {
+	Relays map[string]*Relay
+
+	mutex sync.Mutex
+}
+
+func NewSimplePool() *SimplePool {
+	return &SimplePool{
+		Relays: make(map[string]*Relay),
+	}
+}
+
+func (pool *SimplePool) EnsureRelay(url string) *Relay {
+	nm := NormalizeURL(url)
+
+	pool.mutex.Lock()
+	defer pool.mutex.Unlock()
+
+	relay, ok := pool.Relays[nm]
+	if ok {
+		// already connected, unlock and return
+		return relay
+	} else {
+		var err error
+		// when connecting to a relay we want the connection to persist forever if possible, so use a new context
+		relay, err = RelayConnect(context.Background(), nm)
+		if err != nil {
+			return nil
+		}
+
+		pool.Relays[nm] = relay
+		return relay
+	}
+}
+
+// SubMany opens a subscription with the given filters to multiple relays
+// the subscriptions only end when the context is canceled
+func (pool *SimplePool) SubMany(
+	ctx context.Context,
+	urls []string,
+	filters Filters,
+) chan *Event {
+	uniqueEvents := make(chan *Event)
+	seenAlready := syncmap.MapOf[string, struct{}]{}
+
+	for _, url := range urls {
+		go func(nm string) {
+			sub, _ := pool.EnsureRelay(nm).Subscribe(ctx, filters)
+			if sub == nil {
+				return
+			}
+
+			for evt := range sub.Events {
+				// dispatch unique events to client
+				if _, ok := seenAlready.LoadOrStore(evt.ID, struct{}{}); !ok {
+					uniqueEvents <- evt
+				}
+			}
+		}(NormalizeURL(url))
+	}
+
+	return uniqueEvents
+}
+
+// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE
+func (pool *SimplePool) SubManyEose(
+	ctx context.Context,
+	urls []string,
+	filters Filters,
+) chan *Event {
+	ctx, cancel := context.WithCancel(ctx)
+
+	uniqueEvents := make(chan *Event)
+	seenAlready := syncmap.MapOf[string, struct{}]{}
+	wg := sync.WaitGroup{}
+	wg.Add(len(urls))
+
+	go func() {
+		// this will happen when all subscriptions get an eose (or when they die)
+		wg.Wait()
+		cancel()
+		close(uniqueEvents)
+	}()
+
+	for _, url := range urls {
+		go func(nm string) {
+			sub, _ := pool.EnsureRelay(nm).Subscribe(ctx, filters)
+			if sub == nil {
+				wg.Done()
+				return
+			}
+
+			defer wg.Done()
+
+			for {
+				select {
+				case <-sub.EndOfStoredEvents:
+					return
+				case evt, more := <-sub.Events:
+					if !more {
+						return
+					}
+
+					// dispatch unique events to client
+					if _, ok := seenAlready.LoadOrStore(evt.ID, struct{}{}); !ok {
+						uniqueEvents <- evt
+					}
+				}
+			}
+		}(NormalizeURL(url))
+	}
+
+	return uniqueEvents
+}