take subscription options in pool.SubMany*

This commit is contained in:
fiatjaf
2024-09-24 12:05:22 -03:00
parent 39f7a99894
commit 1b786ab213

77
pool.go
View File

@@ -173,16 +173,32 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
// SubMany opens a subscription with the given filters to multiple relays // SubMany opens a subscription with the given filters to multiple relays
// the subscriptions only end when the context is canceled // the subscriptions only end when the context is canceled
func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan RelayEvent { func (pool *SimplePool) SubMany(
return pool.subMany(ctx, urls, filters, true) ctx context.Context,
urls []string,
filters Filters,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subMany(ctx, urls, filters, true, opts)
} }
// SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays // SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent { func (pool *SimplePool) SubManyNonUnique(
return pool.subMany(ctx, urls, filters, false) ctx context.Context,
urls []string,
filters Filters,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subMany(ctx, urls, filters, false, opts)
} }
func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent { func (pool *SimplePool) subMany(
ctx context.Context,
urls []string,
filters Filters,
unique bool,
opts []SubscriptionOption,
) chan RelayEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
_ = cancel // do this so `go vet` will stop complaining _ = cancel // do this so `go vet` will stop complaining
events := make(chan RelayEvent) events := make(chan RelayEvent)
@@ -228,7 +244,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
hasAuthed = false hasAuthed = false
subscribe: subscribe:
sub, err = relay.Subscribe(ctx, filters) sub, err = relay.Subscribe(ctx, filters, opts...)
if err != nil { if err != nil {
goto reconnect goto reconnect
} }
@@ -313,16 +329,32 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
} }
// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE // SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE
func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan RelayEvent { func (pool *SimplePool) SubManyEose(
return pool.subManyEose(ctx, urls, filters, true) ctx context.Context,
urls []string,
filters Filters,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subManyEose(ctx, urls, filters, true, opts)
} }
// SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays // SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent { func (pool *SimplePool) SubManyEoseNonUnique(
return pool.subManyEose(ctx, urls, filters, false) ctx context.Context,
urls []string,
filters Filters,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.subManyEose(ctx, urls, filters, false, opts)
} }
func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent { func (pool *SimplePool) subManyEose(
ctx context.Context,
urls []string,
filters Filters,
unique bool,
opts []SubscriptionOption,
) chan RelayEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
events := make(chan RelayEvent) events := make(chan RelayEvent)
@@ -349,7 +381,7 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
hasAuthed := false hasAuthed := false
subscribe: subscribe:
sub, err := relay.Subscribe(ctx, filters) sub, err := relay.Subscribe(ctx, filters, opts...)
if sub == nil { if sub == nil {
debugLogf("error subscribing to %s with %v: %s", relay, filters, err) debugLogf("error subscribing to %s with %v: %s", relay, filters, err)
return return
@@ -416,13 +448,14 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F
func (pool *SimplePool) batchedSubMany( func (pool *SimplePool) batchedSubMany(
ctx context.Context, ctx context.Context,
dfs []DirectedFilters, dfs []DirectedFilters,
subFn func(context.Context, []string, Filters, bool) chan RelayEvent, subFn func(context.Context, []string, Filters, bool, []SubscriptionOption) chan RelayEvent,
opts []SubscriptionOption,
) chan RelayEvent { ) chan RelayEvent {
res := make(chan RelayEvent) res := make(chan RelayEvent)
for _, df := range dfs { for _, df := range dfs {
go func(df DirectedFilters) { go func(df DirectedFilters) {
for ie := range subFn(ctx, []string{df.Relay}, df.Filters, true) { for ie := range subFn(ctx, []string{df.Relay}, df.Filters, true, opts) {
res <- ie res <- ie
} }
}(df) }(df)
@@ -432,11 +465,19 @@ func (pool *SimplePool) batchedSubMany(
} }
// BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same. // BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same.
func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilters) chan RelayEvent { func (pool *SimplePool) BatchedSubMany(
return pool.batchedSubMany(ctx, dfs, pool.subMany) ctx context.Context,
dfs []DirectedFilters,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.batchedSubMany(ctx, dfs, pool.subMany, opts)
} }
// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays. // BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays.
func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilters) chan RelayEvent { func (pool *SimplePool) BatchedSubManyEose(
return pool.batchedSubMany(ctx, dfs, pool.subManyEose) ctx context.Context,
dfs []DirectedFilters,
opts ...SubscriptionOption,
) chan RelayEvent {
return pool.batchedSubMany(ctx, dfs, pool.subManyEose, opts)
} }