diff --git a/fn/conc_queue.go b/fn/conc_queue.go new file mode 100644 index 000000000..0d0dde207 --- /dev/null +++ b/fn/conc_queue.go @@ -0,0 +1,128 @@ +package fn + +import ( + "sync" + + "github.com/lightninglabs/neutrino/cache/lru" +) + +// ConcurrentQueue is a typed concurrent-safe FIFO queue with unbounded +// capacity. Clients interact with the queue by pushing items into the in +// channel and popping items from the out channel. There is a goroutine that +// manages moving items from the in channel to the out channel in the correct +// order that must be started by calling Start(). +type ConcurrentQueue[T any] struct { + started sync.Once + stopped sync.Once + + chanIn chan T + chanOut chan T + overflow *lru.List[T] + + wg sync.WaitGroup + quit chan struct{} +} + +// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is +// the capacity of the output channel. When the size of the queue is below this +// threshold, pushes do n[?12;4$yot incur the overhead of the less efficient overflow +// structure. +func NewConcurrentQueue[T any](bufferSize int) *ConcurrentQueue[T] { + return &ConcurrentQueue[T]{ + chanIn: make(chan T), + chanOut: make(chan T, bufferSize), + overflow: lru.NewList[T](), + quit: make(chan struct{}), + } +} + +// ChanIn returns a channel that can be used to push new items into the queue. +func (cq *ConcurrentQueue[T]) ChanIn() chan<- T { + return cq.chanIn +} + +// ChanOut returns a channel that can be used to pop items from the queue. +func (cq *ConcurrentQueue[T]) ChanOut() <-chan T { + return cq.chanOut +} + +// Start begins a goroutine that manages moving items from the in channel to the +// out channel. The queue tries to move items directly to the out channel +// minimize overhead, but if the out channel is full it pushes items to an +// overflow queue. This must be called before using the queue. +func (cq *ConcurrentQueue[T]) Start() { + cq.started.Do(cq.start) +} + +func (cq *ConcurrentQueue[T]) start() { + cq.wg.Add(1) + go func() { + defer cq.wg.Done() + + readLoop: + for { + nextElement := cq.overflow.Front() + if nextElement == nil { + // Overflow queue is empty so incoming items can + // be pushed directly to the output channel. If + // output channel is full though, push to + // overflow. + select { + case item, ok := <-cq.chanIn: + if !ok { + break readLoop + } + select { + case cq.chanOut <- item: + // Optimistically push directly + // to chanOut. + default: + cq.overflow.PushBack(item) + } + case <-cq.quit: + return + } + } else { + // Overflow queue is not empty, so any new items + // get pushed to the back to preserve order. + select { + case item, ok := <-cq.chanIn: + if !ok { + break readLoop + } + cq.overflow.PushBack(item) + case cq.chanOut <- nextElement.Value: + cq.overflow.Remove(nextElement) + case <-cq.quit: + return + } + } + } + + // Incoming channel has been closed. Empty overflow queue into + // the outgoing channel. + nextElement := cq.overflow.Front() + for nextElement != nil { + select { + case cq.chanOut <- nextElement.Value: + cq.overflow.Remove(nextElement) + case <-cq.quit: + return + } + nextElement = cq.overflow.Front() + } + + // Close outgoing channel. + close(cq.chanOut) + }() +} + +// Stop ends the goroutine that moves items from the in channel to the out +// channel. This does not clear the queue state, so the queue can be restarted +// without dropping items. +func (cq *ConcurrentQueue[T]) Stop() { + cq.stopped.Do(func() { + close(cq.quit) + cq.wg.Wait() + }) +} diff --git a/fn/either.go b/fn/either.go new file mode 100644 index 000000000..aedc9fe5b --- /dev/null +++ b/fn/either.go @@ -0,0 +1,48 @@ +package fn + +// Either is a type that can be either left or right. +type Either[L any, R any] struct { + left Option[L] + right Option[R] +} + +// NewLeft returns an Either with a left value. +func NewLeft[L any, R any](l L) Either[L, R] { + return Either[L, R]{left: Some(l), right: None[R]()} +} + +// NewRight returns an Either with a right value. +func NewRight[L any, R any](r R) Either[L, R] { + return Either[L, R]{left: None[L](), right: Some(r)} +} + +// WhenLeft executes the given function if the Either is left. +func (e Either[L, R]) WhenLeft(f func(L)) { + e.left.WhenSome(f) +} + +// WhenRight executes the given function if the Either is right. +func (e Either[L, R]) WhenRight(f func(R)) { + e.right.WhenSome(f) +} + +// IsLeft returns true if the Either is left. +func (e Either[L, R]) IsLeft() bool { + return e.left.IsSome() +} + +// IsRight returns true if the Either is right. +func (e Either[L, R]) IsRight() bool { + return e.right.IsSome() +} + +// MapLeft maps the left value of the Either to a new value. +func MapLeft[L any, R any, O any](f func(L) O) func(Either[L, R]) Option[O] { + return func(e Either[L, R]) Option[O] { + if e.IsLeft() { + return MapOption(f)(e.left) + } + + return None[O]() + } +} diff --git a/fn/events.go b/fn/events.go new file mode 100644 index 000000000..6ad182436 --- /dev/null +++ b/fn/events.go @@ -0,0 +1,142 @@ +package fn + +import ( + "fmt" + "sync" + "sync/atomic" + "time" +) + +const ( + // DefaultQueueSize is the default size to use for concurrent queues. + DefaultQueueSize = 10 +) + +var ( + // nextID is the next subscription ID that will be used for a new event + // receiver. This MUST be used atomically. + nextID uint64 +) + +// EventReceiver is a struct type that holds two queues for new and removed +// items respectively. +type EventReceiver[T any] struct { + // id is the internal process-unique ID of the subscription. + id uint64 + + // NewItemCreated is sent to when a new item was created successfully. + NewItemCreated *ConcurrentQueue[T] + + // ItemRemoved is sent to when an existing item was removed. + ItemRemoved *ConcurrentQueue[T] +} + +// ID returns the internal process-unique ID of the subscription. +func (e *EventReceiver[T]) ID() uint64 { + return e.id +} + +// Stop stops the receiver from processing events. +func (e *EventReceiver[T]) Stop() { + e.NewItemCreated.Stop() + e.ItemRemoved.Stop() +} + +// NewEventReceiver creates a new event receiver with concurrent queues of the +// given size. +func NewEventReceiver[T any](queueSize int) *EventReceiver[T] { + created := NewConcurrentQueue[T](queueSize) + created.Start() + removed := NewConcurrentQueue[T](queueSize) + removed.Start() + + id := atomic.AddUint64(&nextID, 1) + + return &EventReceiver[T]{ + id: id, + NewItemCreated: created, + ItemRemoved: removed, + } +} + +// EventPublisher is an interface type for a component that offers event based +// subscriptions for publishing events. +type EventPublisher[T any, Q any] interface { + // RegisterSubscriber adds a new subscriber for receiving events. The + // deliverExisting boolean indicates whether already existing items + // should be sent to the NewItemCreated channel when the subscription is + // started. An optional deliverFrom can be specified to indicate from + // which timestamp/index/marker onward existing items should be + // delivered on startup. If deliverFrom is nil/zero/empty then all + // existing items will be delivered. + RegisterSubscriber(receiver *EventReceiver[T], deliverExisting bool, + deliverFrom Q) error + + // RemoveSubscriber removes the given subscriber and also stops it from + // processing events. + RemoveSubscriber(subscriber *EventReceiver[T]) error +} + +// Event is a generic event that can be sent to a subscriber. +type Event interface { + Timestamp() time.Time +} + +// EventDistributor is a struct type that helps to distribute events to multiple +// subscribers. +type EventDistributor[T any] struct { + // subscribers is a map of components that want to be notified on new + // events, keyed by their subscription ID. + subscribers map[uint64]*EventReceiver[T] + + // subscriberMtx guards the subscribers map and access to the + // subscriptionID. + subscriberMtx sync.Mutex +} + +// NewEventDistributor creates a new event distributor of the declared type. +func NewEventDistributor[T any]() *EventDistributor[T] { + return &EventDistributor[T]{ + subscribers: make(map[uint64]*EventReceiver[T]), + } +} + +// RegisterSubscriber adds a new subscriber for receiving events. +func (d *EventDistributor[T]) RegisterSubscriber(subscriber *EventReceiver[T]) { + d.subscriberMtx.Lock() + defer d.subscriberMtx.Unlock() + + d.subscribers[subscriber.ID()] = subscriber +} + +// RemoveSubscriber removes the given subscriber and also stops it from +// processing events. +func (d *EventDistributor[T]) RemoveSubscriber( + subscriber *EventReceiver[T]) error { + + d.subscriberMtx.Lock() + defer d.subscriberMtx.Unlock() + + _, ok := d.subscribers[subscriber.ID()] + if !ok { + return fmt.Errorf("subscriber with ID %d not found", + subscriber.ID()) + } + + subscriber.Stop() + delete(d.subscribers, subscriber.ID()) + + return nil +} + +// NotifySubscribers sends the given events to all subscribers. +func (d *EventDistributor[T]) NotifySubscribers(events ...T) { + d.subscriberMtx.Lock() + for i := range events { + event := events[i] + for id := range d.subscribers { + d.subscribers[id].NewItemCreated.ChanIn() <- event + } + } + d.subscriberMtx.Unlock() +} diff --git a/fn/func.go b/fn/func.go new file mode 100644 index 000000000..056f84aef --- /dev/null +++ b/fn/func.go @@ -0,0 +1,17 @@ +package fn + +// Reducer represents a function that takes an accumulator and the value, then +// returns a new accumulator. +type Reducer[T, V any] func(accum T, value V) T + +// Reduce takes a slice of something, and a reducer, and produces a final +// accumulated value. +func Reduce[T any, V any, S []V](s S, f Reducer[T, V]) T { + var accum T + + for _, x := range s { + accum = f(accum, x) + } + + return accum +} diff --git a/fn/go.mod b/fn/go.mod index 6879fc382..4d1528efe 100644 --- a/fn/go.mod +++ b/fn/go.mod @@ -1,3 +1,8 @@ module github.com/lightningnetwork/lnd/fn go 1.19 + +require ( + github.com/lightninglabs/neutrino/cache v1.1.2 + golang.org/x/exp v0.0.0-20231226003508-02704c960a9b +) diff --git a/fn/go.sum b/fn/go.sum new file mode 100644 index 000000000..d066b9820 --- /dev/null +++ b/fn/go.sum @@ -0,0 +1,8 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/lightninglabs/neutrino/cache v1.1.2 h1:C9DY/DAPaPxbFC+xNNEI/z1SJY9GS3shmlu5hIQ798g= +github.com/lightninglabs/neutrino/cache v1.1.2/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +golang.org/x/exp v0.0.0-20231226003508-02704c960a9b h1:kLiC65FbiHWFAOu+lxwNPujcsl8VYyTYYEZnsOO1WK4= +golang.org/x/exp v0.0.0-20231226003508-02704c960a9b/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/fn/option.go b/fn/option.go index ec0d346f5..59012effb 100644 --- a/fn/option.go +++ b/fn/option.go @@ -48,6 +48,22 @@ func (o Option[A]) UnwrapOr(a A) A { return a } +// UnwrapOrFunc is used to extract a value from an option, and we supply a +// thunk to be evaluated in the case when the Option is empty. +func (o Option[A]) UnwrapOrFunc(f func() A) A { + return ElimOption(o, f, func(a A) A { return a }) +} + +// UnwrapOrFuncErr is used to extract a value from an option, and we supply a +// thunk to be evaluated in the case when the Option is empty. +func (o Option[A]) UnwrapOrFuncErr(f func() (A, error)) (A, error) { + if o.isSome { + return o.some, nil + } + + return f() +} + // WhenSome is used to conditionally perform a side-effecting function that // accepts a value of the type that parameterizes the option. If this function // performs no side effects, WhenSome is useless. @@ -117,6 +133,19 @@ func MapOption[A, B any](f func(A) B) func(Option[A]) Option[B] { } } +// MapOptionZ transforms a pure function A -> B into one that will operate +// inside the Option context. Unlike MapOption, this function will return the +// default/zero argument of the return type if the Option is empty. +func MapOptionZ[A, B any](o Option[A], f func(A) B) B { + var zero B + + if o.IsNone() { + return zero + } + + return f(o.some) +} + // LiftA2Option transforms a pure function (A, B) -> C into one that will // operate in an Option context. For the returned function, if either of its // arguments are None, then the result will be None. diff --git a/fn/queue.go b/fn/queue.go new file mode 100644 index 000000000..7b8bc7e4c --- /dev/null +++ b/fn/queue.go @@ -0,0 +1,51 @@ +package fn + +// Queue is a generic queue implementation. +type Queue[T any] struct { + items []T +} + +// NewQueue creates a new Queue. +func NewQueue[T any](startingItems ...T) Queue[T] { + return Queue[T]{ + items: startingItems, + } +} + +// Enqueue adds one or more an items to the end of the Queue. +func (q *Queue[T]) Enqueue(value ...T) { + q.items = append(q.items, value...) +} + +// Dequeue removes an element from the front of the Queue. If there're no items +// in the queue, then None is returned. +func (q *Queue[T]) Dequeue() Option[T] { + if len(q.items) == 0 { + return None[T]() + } + + value := q.items[0] + q.items = q.items[1:] + + return Some(value) +} + +// Peek returns the first item in the queue without removing it. If the queue +// is empty, then None is returned. +func (q *Queue[T]) Peek() Option[T] { + if q.IsEmpty() { + return None[T]() + } + + return Some(q.items[0]) +} + +// IsEmpty returns true if the Queue is empty +func (q *Queue[T]) IsEmpty() bool { + return len(q.items) == 0 +} + +// Size returns the number of items in the Queue +func (q *Queue[T]) Size() int { + return len(q.items) +} diff --git a/fn/recv.go b/fn/recv.go new file mode 100644 index 000000000..254118076 --- /dev/null +++ b/fn/recv.go @@ -0,0 +1,38 @@ +package fn + +import ( + "fmt" + "time" +) + +// RecvOrTimeout attempts to recv over chan c, returning the value. If the +// timeout passes before the recv succeeds, an error is returned +func RecvOrTimeout[T any](c <-chan T, timeout time.Duration) (T, error) { + select { + case m := <-c: + return m, nil + + case <-time.After(timeout): + var zero T + return zero, fmt.Errorf("timeout hit") + } +} + +// RecvResp takes three channels: a response channel, an error channel and a +// quit channel. If either of these channels are sent on, then the function +// will exit with that response. This can be used to wait for a response, +// error, or a quit signal. +func RecvResp[T any](r <-chan T, e <-chan error, q <-chan struct{}) (T, error) { + var noResp T + + select { + case resp := <-r: + return resp, nil + + case err := <-e: + return noResp, err + + case <-q: + return noResp, fmt.Errorf("quitting") + } +} diff --git a/fn/send.go b/fn/send.go new file mode 100644 index 000000000..4258ca536 --- /dev/null +++ b/fn/send.go @@ -0,0 +1,13 @@ +package fn + +// SendOrQuit attempts to and a message through channel c. If this succeeds, +// then bool is returned. Otherwise if a quit signal is received first, then +// false is returned. +func SendOrQuit[T any, Q any](c chan<- T, msg T, quit chan Q) bool { + select { + case c <- msg: + return true + case <-quit: + return false + } +} diff --git a/fn/set.go b/fn/set.go new file mode 100644 index 000000000..51368b210 --- /dev/null +++ b/fn/set.go @@ -0,0 +1,95 @@ +package fn + +import "golang.org/x/exp/maps" + +// Set is a generic set using type params that supports the following +// operations: diff, union, intersection, and subset. +type Set[T comparable] map[T]struct{} + +// NewSet returns a new set with the given elements. +func NewSet[T comparable](elems ...T) Set[T] { + s := make(Set[T]) + for _, e := range elems { + s.Add(e) + } + return s +} + +// Add adds an element to the set. +func (s Set[T]) Add(e T) { + s[e] = struct{}{} +} + +// Remove removes an element from the set. +func (s Set[T]) Remove(e T) { + delete(s, e) +} + +// Contains returns true if the set contains the element. +func (s Set[T]) Contains(e T) bool { + _, ok := s[e] + return ok +} + +// Diff returns the difference between two sets. +func (s Set[T]) Diff(other Set[T]) Set[T] { + diff := make(Set[T]) + for e := range s { + if !other.Contains(e) { + diff.Add(e) + } + } + return diff +} + +// Union returns the union of two sets. +func (s Set[T]) Union(other Set[T]) Set[T] { + union := make(Set[T]) + for e := range s { + union.Add(e) + } + for e := range other { + union.Add(e) + } + return union +} + +// Intersect returns the intersection of two sets. +func (s Set[T]) Intersect(other Set[T]) Set[T] { + intersect := make(Set[T]) + for e := range s { + if other.Contains(e) { + intersect.Add(e) + } + } + return intersect +} + +// Subset returns true if the set is a subset of the other set. +func (s Set[T]) Subset(other Set[T]) bool { + for e := range s { + if !other.Contains(e) { + return false + } + } + return true +} + +// Equal returns true if the set is equal to the other set. +func (s Set[T]) Equal(other Set[T]) bool { + return s.Subset(other) && other.Subset(s) +} + +// ToSlice returns the set as a slice. +func (s Set[T]) ToSlice() []T { + return maps.Keys(s) +} + +// SetDiff returns all the items that are in the first set but not in the +// second. +func SetDiff[T comparable](a, b []T) []T { + setA := NewSet(a...) + setB := NewSet(b...) + + return setA.Diff(setB).ToSlice() +}