pool: implement optional penalty box mechanic.

This commit is contained in:
fiatjaf 2024-07-29 14:58:25 -03:00
parent 53ff9019d6
commit 077c14cef6

95
pool.go
View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log"
"math"
"slices"
"strings"
"sync"
@ -26,7 +27,9 @@ type SimplePool struct {
eventMiddleware []func(IncomingEvent)
// custom things not often used
SignatureChecker func(Event) bool
signatureChecker func(Event) bool
penaltyBoxMu sync.Mutex
penaltyBox map[string][2]float64
}
type DirectedFilters struct {
@ -73,6 +76,42 @@ func (h WithAuthHandler) ApplyPoolOption(pool *SimplePool) {
pool.authHandler = h
}
// WithPenaltyBox just sets the penalty box mechanism so relays that fail to connect
// or that disconnect will be ignored for a while and we won't attempt to connect again.
func WithPenaltyBox() withPenaltyBoxOpt { return withPenaltyBoxOpt{} }
type withPenaltyBoxOpt struct{}
func (h withPenaltyBoxOpt) ApplyPoolOption(pool *SimplePool) {
pool.penaltyBox = make(map[string][2]float64)
go func() {
sleep := 30.0
for {
time.Sleep(time.Duration(sleep) * time.Second)
pool.penaltyBoxMu.Lock()
nextSleep := 300.0
for url, v := range pool.penaltyBox {
remainingSeconds := v[1]
remainingSeconds -= sleep
if remainingSeconds <= 0 {
pool.penaltyBox[url] = [2]float64{v[0], 0}
continue
} else {
pool.penaltyBox[url] = [2]float64{v[0], remainingSeconds}
}
if remainingSeconds < nextSleep {
nextSleep = remainingSeconds
}
}
sleep = nextSleep
pool.penaltyBoxMu.Unlock()
}
}()
}
// WithEventMiddleware is a function that will be called with all events received.
// more than one can be passed at a time.
type WithEventMiddleware func(IncomingEvent)
@ -84,6 +123,7 @@ func (h WithEventMiddleware) ApplyPoolOption(pool *SimplePool) {
var (
_ PoolOption = (WithAuthHandler)(nil)
_ PoolOption = (WithEventMiddleware)(nil)
_ PoolOption = WithPenaltyBox()
)
func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
@ -91,27 +131,44 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {
defer namedLock(nm)()
relay, ok := pool.Relays.Load(nm)
if ok && relay.IsConnected() {
if ok && relay == nil {
if pool.penaltyBox != nil {
pool.penaltyBoxMu.Lock()
defer pool.penaltyBoxMu.Unlock()
v, _ := pool.penaltyBox[nm]
if v[1] > 0 {
return nil, fmt.Errorf("in penalty box, %fs remaining", v[1])
}
}
} else if ok && relay.IsConnected() {
// already connected, unlock and return
return relay, nil
} else {
var err error
// we use this ctx here so when the pool dies everything dies
ctx, cancel := context.WithTimeout(pool.Context, time.Second*15)
defer cancel()
opts := make([]RelayOption, 0, 1+len(pool.eventMiddleware))
if pool.SignatureChecker != nil {
opts = append(opts, WithSignatureChecker(pool.SignatureChecker))
}
if relay, err = RelayConnect(ctx, nm, opts...); err != nil {
return nil, fmt.Errorf("failed to connect: %w", err)
}
pool.Relays.Store(nm, relay)
return relay, nil
}
// try to connect
var err error
// we use this ctx here so when the pool dies everything dies
ctx, cancel := context.WithTimeout(pool.Context, time.Second*15)
defer cancel()
opts := make([]RelayOption, 0, 1+len(pool.eventMiddleware))
if pool.signatureChecker != nil {
opts = append(opts, WithSignatureChecker(pool.signatureChecker))
}
if relay, err = RelayConnect(ctx, nm, opts...); err != nil {
if pool.penaltyBox != nil {
// putting relay in penalty box
pool.penaltyBoxMu.Lock()
defer pool.penaltyBoxMu.Unlock()
v, _ := pool.penaltyBox[nm]
pool.penaltyBox[nm] = [2]float64{v[0] + 1, 30.0 + math.Pow(2, v[0]+1)}
}
return nil, fmt.Errorf("failed to connect: %w", err)
}
pool.Relays.Store(nm, relay)
return relay, nil
}
// SubMany opens a subscription with the given filters to multiple relays