From 0715e84f042737712ba14620b73f3c197a28fe5d Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 2 Jan 2024 17:12:19 -0800 Subject: [PATCH] fn: add new ConcurrentQueue[T] data structure This is a version of the queue we have elsewhere, but we can get rid of all the casting and interface usage in favor of a type param. --- fn/conc_queue.go | 128 +++++++++++++++++++++++++++++++++++++++++++++++ fn/go.mod | 2 + fn/go.sum | 6 +++ 3 files changed, 136 insertions(+) create mode 100644 fn/conc_queue.go create mode 100644 fn/go.sum diff --git a/fn/conc_queue.go b/fn/conc_queue.go new file mode 100644 index 000000000..0d0dde207 --- /dev/null +++ b/fn/conc_queue.go @@ -0,0 +1,128 @@ +package fn + +import ( + "sync" + + "github.com/lightninglabs/neutrino/cache/lru" +) + +// ConcurrentQueue is a typed concurrent-safe FIFO queue with unbounded +// capacity. Clients interact with the queue by pushing items into the in +// channel and popping items from the out channel. There is a goroutine that +// manages moving items from the in channel to the out channel in the correct +// order that must be started by calling Start(). +type ConcurrentQueue[T any] struct { + started sync.Once + stopped sync.Once + + chanIn chan T + chanOut chan T + overflow *lru.List[T] + + wg sync.WaitGroup + quit chan struct{} +} + +// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is +// the capacity of the output channel. When the size of the queue is below this +// threshold, pushes do n[?12;4$yot incur the overhead of the less efficient overflow +// structure. +func NewConcurrentQueue[T any](bufferSize int) *ConcurrentQueue[T] { + return &ConcurrentQueue[T]{ + chanIn: make(chan T), + chanOut: make(chan T, bufferSize), + overflow: lru.NewList[T](), + quit: make(chan struct{}), + } +} + +// ChanIn returns a channel that can be used to push new items into the queue. +func (cq *ConcurrentQueue[T]) ChanIn() chan<- T { + return cq.chanIn +} + +// ChanOut returns a channel that can be used to pop items from the queue. +func (cq *ConcurrentQueue[T]) ChanOut() <-chan T { + return cq.chanOut +} + +// Start begins a goroutine that manages moving items from the in channel to the +// out channel. The queue tries to move items directly to the out channel +// minimize overhead, but if the out channel is full it pushes items to an +// overflow queue. This must be called before using the queue. +func (cq *ConcurrentQueue[T]) Start() { + cq.started.Do(cq.start) +} + +func (cq *ConcurrentQueue[T]) start() { + cq.wg.Add(1) + go func() { + defer cq.wg.Done() + + readLoop: + for { + nextElement := cq.overflow.Front() + if nextElement == nil { + // Overflow queue is empty so incoming items can + // be pushed directly to the output channel. If + // output channel is full though, push to + // overflow. + select { + case item, ok := <-cq.chanIn: + if !ok { + break readLoop + } + select { + case cq.chanOut <- item: + // Optimistically push directly + // to chanOut. + default: + cq.overflow.PushBack(item) + } + case <-cq.quit: + return + } + } else { + // Overflow queue is not empty, so any new items + // get pushed to the back to preserve order. + select { + case item, ok := <-cq.chanIn: + if !ok { + break readLoop + } + cq.overflow.PushBack(item) + case cq.chanOut <- nextElement.Value: + cq.overflow.Remove(nextElement) + case <-cq.quit: + return + } + } + } + + // Incoming channel has been closed. Empty overflow queue into + // the outgoing channel. + nextElement := cq.overflow.Front() + for nextElement != nil { + select { + case cq.chanOut <- nextElement.Value: + cq.overflow.Remove(nextElement) + case <-cq.quit: + return + } + nextElement = cq.overflow.Front() + } + + // Close outgoing channel. + close(cq.chanOut) + }() +} + +// Stop ends the goroutine that moves items from the in channel to the out +// channel. This does not clear the queue state, so the queue can be restarted +// without dropping items. +func (cq *ConcurrentQueue[T]) Stop() { + cq.stopped.Do(func() { + close(cq.quit) + cq.wg.Wait() + }) +} diff --git a/fn/go.mod b/fn/go.mod index 6879fc382..b199b1e2c 100644 --- a/fn/go.mod +++ b/fn/go.mod @@ -1,3 +1,5 @@ module github.com/lightningnetwork/lnd/fn go 1.19 + +require github.com/lightninglabs/neutrino/cache v1.1.2 diff --git a/fn/go.sum b/fn/go.sum new file mode 100644 index 000000000..3c118d23a --- /dev/null +++ b/fn/go.sum @@ -0,0 +1,6 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/lightninglabs/neutrino/cache v1.1.2 h1:C9DY/DAPaPxbFC+xNNEI/z1SJY9GS3shmlu5hIQ798g= +github.com/lightninglabs/neutrino/cache v1.1.2/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=