improve SimplePool authHandler, rename IncomingEvent to RelayEvent so we can use it there.

This commit is contained in:
fiatjaf
2024-09-19 12:28:42 -03:00
parent c07528eb42
commit 8327310d52
3 changed files with 31 additions and 26 deletions

53
pool.go
View File

@ -21,10 +21,10 @@ type SimplePool struct {
Relays *xsync.MapOf[string, *Relay] Relays *xsync.MapOf[string, *Relay]
Context context.Context Context context.Context
authHandler func(*Event) error authHandler func(context.Context, RelayEvent) error
cancel context.CancelFunc cancel context.CancelFunc
eventMiddleware []func(IncomingEvent) eventMiddleware []func(RelayEvent)
// custom things not often used // custom things not often used
signatureChecker func(Event) bool signatureChecker func(Event) bool
@ -37,12 +37,12 @@ type DirectedFilters struct {
Relay string Relay string
} }
type IncomingEvent struct { type RelayEvent struct {
*Event *Event
Relay *Relay Relay *Relay
} }
func (ie IncomingEvent) String() string { func (ie RelayEvent) String() string {
return fmt.Sprintf("[%s] >> %s", ie.Relay.URL, ie.Event) return fmt.Sprintf("[%s] >> %s", ie.Relay.URL, ie.Event)
} }
@ -70,7 +70,7 @@ func NewSimplePool(ctx context.Context, opts ...PoolOption) *SimplePool {
// WithAuthHandler must be a function that signs the auth event when called. // WithAuthHandler must be a function that signs the auth event when called.
// it will be called whenever any relay in the pool returns a `CLOSED` message // it will be called whenever any relay in the pool returns a `CLOSED` message
// with the "auth-required:" prefix, only once for each relay // with the "auth-required:" prefix, only once for each relay
type WithAuthHandler func(authEvent *Event) error type WithAuthHandler func(ctx context.Context, authEvent RelayEvent) error
func (h WithAuthHandler) ApplyPoolOption(pool *SimplePool) { func (h WithAuthHandler) ApplyPoolOption(pool *SimplePool) {
pool.authHandler = h pool.authHandler = h
@ -114,7 +114,7 @@ func (h withPenaltyBoxOpt) ApplyPoolOption(pool *SimplePool) {
// WithEventMiddleware is a function that will be called with all events received. // WithEventMiddleware is a function that will be called with all events received.
// more than one can be passed at a time. // more than one can be passed at a time.
type WithEventMiddleware func(IncomingEvent) type WithEventMiddleware func(RelayEvent)
func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) { func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) {
pool.eventMiddleware = append(pool.eventMiddleware, h) pool.eventMiddleware = append(pool.eventMiddleware, h)
@ -173,19 +173,19 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
// SubMany opens a subscription with the given filters to multiple relays // SubMany opens a subscription with the given filters to multiple relays
// the subscriptions only end when the context is canceled // the subscriptions only end when the context is canceled
func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan RelayEvent {
return pool.subMany(ctx, urls, filters, true) return pool.subMany(ctx, urls, filters, true)
} }
// SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays // SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent {
return pool.subMany(ctx, urls, filters, false) return pool.subMany(ctx, urls, filters, false)
} }
func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
_ = cancel // do this so `go vet` will stop complaining _ = cancel // do this so `go vet` will stop complaining
events := make(chan IncomingEvent) events := make(chan RelayEvent)
seenAlready := xsync.NewMapOf[string, Timestamp]() seenAlready := xsync.NewMapOf[string, Timestamp]()
ticker := time.NewTicker(seenAlreadyDropTick) ticker := time.NewTicker(seenAlreadyDropTick)
@ -255,7 +255,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
goto reconnect goto reconnect
} }
ie := IncomingEvent{Event: evt, Relay: relay} ie := RelayEvent{Event: evt, Relay: relay}
for _, mh := range pool.eventMiddleware { for _, mh := range pool.eventMiddleware {
mh(ie) mh(ie)
} }
@ -284,7 +284,10 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
case reason := <-sub.ClosedReason: case reason := <-sub.ClosedReason:
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed { if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
// relay is requesting auth. if we can we will perform auth and try again // relay is requesting auth. if we can we will perform auth and try again
if err := relay.Auth(ctx, pool.authHandler); err == nil { err := relay.Auth(ctx, func(event *Event) error {
return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
})
if err == nil {
hasAuthed = true // so we don't keep doing AUTH again and again hasAuthed = true // so we don't keep doing AUTH again and again
goto subscribe goto subscribe
} }
@ -310,19 +313,19 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
} }
// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE // SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE
func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan RelayEvent {
return pool.subManyEose(ctx, urls, filters, true) return pool.subManyEose(ctx, urls, filters, true)
} }
// SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays // SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent { func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan RelayEvent {
return pool.subManyEose(ctx, urls, filters, false) return pool.subManyEose(ctx, urls, filters, false)
} }
func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent { func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan RelayEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
events := make(chan IncomingEvent) events := make(chan RelayEvent)
seenAlready := xsync.NewMapOf[string, bool]() seenAlready := xsync.NewMapOf[string, bool]()
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(urls)) wg.Add(len(urls))
@ -361,7 +364,9 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
case reason := <-sub.ClosedReason: case reason := <-sub.ClosedReason:
if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed { if strings.HasPrefix(reason, "auth-required:") && pool.authHandler != nil && !hasAuthed {
// relay is requesting auth. if we can we will perform auth and try again // relay is requesting auth. if we can we will perform auth and try again
err := relay.Auth(ctx, pool.authHandler) err := relay.Auth(ctx, func(event *Event) error {
return pool.authHandler(ctx, RelayEvent{Event: event, Relay: relay})
})
if err == nil { if err == nil {
hasAuthed = true // so we don't keep doing AUTH again and again hasAuthed = true // so we don't keep doing AUTH again and again
goto subscribe goto subscribe
@ -374,7 +379,7 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
return return
} }
ie := IncomingEvent{Event: evt, Relay: relay} ie := RelayEvent{Event: evt, Relay: relay}
for _, mh := range pool.eventMiddleware { for _, mh := range pool.eventMiddleware {
mh(ie) mh(ie)
} }
@ -399,7 +404,7 @@ func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters
} }
// QuerySingle returns the first event returned by the first relay, cancels everything else. // QuerySingle returns the first event returned by the first relay, cancels everything else.
func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *IncomingEvent { func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *RelayEvent {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}) { for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}) {
@ -411,9 +416,9 @@ func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter F
func (pool *SimplePool) batchedSubMany( func (pool *SimplePool) batchedSubMany(
ctx context.Context, ctx context.Context,
dfs []DirectedFilters, dfs []DirectedFilters,
subFn func(context.Context, []string, Filters, bool) chan IncomingEvent, subFn func(context.Context, []string, Filters, bool) chan RelayEvent,
) chan IncomingEvent { ) chan RelayEvent {
res := make(chan IncomingEvent) res := make(chan RelayEvent)
for _, df := range dfs { for _, df := range dfs {
go func(df DirectedFilters) { go func(df DirectedFilters) {
@ -427,11 +432,11 @@ func (pool *SimplePool) batchedSubMany(
} }
// BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same. // BatchedSubMany fires subscriptions only to specific relays, but batches them when they are the same.
func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilters) chan IncomingEvent { func (pool *SimplePool) BatchedSubMany(ctx context.Context, dfs []DirectedFilters) chan RelayEvent {
return pool.batchedSubMany(ctx, dfs, pool.subMany) return pool.batchedSubMany(ctx, dfs, pool.subMany)
} }
// BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays. // BatchedSubManyEose is like BatchedSubMany, but ends upon receiving EOSE from all relays.
func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilters) chan IncomingEvent { func (pool *SimplePool) BatchedSubManyEose(ctx context.Context, dfs []DirectedFilters) chan RelayEvent {
return pool.batchedSubMany(ctx, dfs, pool.subManyEose) return pool.batchedSubMany(ctx, dfs, pool.subManyEose)
} }

View File

@ -70,7 +70,7 @@ func NewSystem(mods ...SystemModifier) *System {
} }
sys.Pool = nostr.NewSimplePool(context.Background(), sys.Pool = nostr.NewSimplePool(context.Background(),
nostr.WithEventMiddleware(sys.trackEventHints), nostr.WithEventMiddleware(sys.TrackEventHints),
nostr.WithPenaltyBox(), nostr.WithPenaltyBox(),
) )

View File

@ -7,7 +7,7 @@ import (
"github.com/nbd-wtf/go-nostr/sdk/hints" "github.com/nbd-wtf/go-nostr/sdk/hints"
) )
func (sys *System) trackEventHints(ie nostr.IncomingEvent) { func (sys *System) TrackEventHints(ie nostr.RelayEvent) {
if IsVirtualRelay(ie.Relay.URL) { if IsVirtualRelay(ie.Relay.URL) {
return return
} }