DirectedFilters to accept a list of filters.

This commit is contained in:
fiatjaf
2024-02-24 18:51:56 -03:00
parent 28b34794f4
commit e6ef78c509

20
pool.go
View File

@@ -24,8 +24,8 @@ type SimplePool struct {
cancel context.CancelFunc cancel context.CancelFunc
} }
type DirectedFilter struct { type DirectedFilters struct {
Filter Filters
Relay string Relay string
} }
@@ -317,23 +317,25 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F
func (pool *SimplePool) batchedSubMany( func (pool *SimplePool) batchedSubMany(
ctx context.Context, ctx context.Context,
dfs []DirectedFilter, dfs []DirectedFilters,
subFn func(context.Context, []string, Filters, bool) chan IncomingEvent, subFn func(context.Context, []string, Filters, bool) chan IncomingEvent,
) chan IncomingEvent { ) chan IncomingEvent {
type batch struct { type batch struct {
filter Filter filters Filters
relays []string relays []string
} }
batches := make([]batch, 0, len(dfs)) batches := make([]batch, 0, len(dfs))
for _, df := range dfs { for _, df := range dfs {
idx := slices.IndexFunc(batches, func(b batch) bool { return FilterEqual(b.filter, df.Filter) }) idx := slices.IndexFunc(batches, func(b batch) bool {
return slices.EqualFunc(b.filters, df.Filters, FilterEqual)
})
if idx != -1 { if idx != -1 {
batches[idx].relays = append(batches[idx].relays, df.Relay) batches[idx].relays = append(batches[idx].relays, df.Relay)
} else { } else {
relays := make([]string, 0, 10) relays := make([]string, 0, 10)
relays = append(relays, df.Relay) relays = append(relays, df.Relay)
batches = append(batches, batch{filter: df.Filter, relays: relays}) batches = append(batches, batch{filters: df.Filters, relays: relays})
} }
} }
@@ -341,7 +343,7 @@ func (pool *SimplePool) batchedSubMany(
for _, b := range batches { for _, b := range batches {
go func(b batch) { go func(b batch) {
for ie := range subFn(ctx, b.relays, Filters{b.filter}, true) { for ie := range subFn(ctx, b.relays, b.filters, true) {
res <- ie res <- ie
} }
}(b) }(b)
@@ -351,11 +353,11 @@ func (pool *SimplePool) batchedSubMany(
} }
// BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same. // BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same.
func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilter) chan IncomingEvent { func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilters) chan IncomingEvent {
return pool.batchedSubMany(ctx, dfs, pool.subMany) return pool.batchedSubMany(ctx, dfs, pool.subMany)
} }
// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays. // BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays.
func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilter) chan IncomingEvent { func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilters) chan IncomingEvent {
return pool.batchedSubMany(ctx, dfs, pool.subManyEose) return pool.batchedSubMany(ctx, dfs, pool.subManyEose)
} }