mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-02 03:54:26 +02:00
fn: add new ConcurrentQueue[T] data structure
This is a version of the queue we have elsewhere, but we can get rid of all the casting and interface usage in favor of a type param.
This commit is contained in:
128
fn/conc_queue.go
Normal file
128
fn/conc_queue.go
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
package fn
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/lightninglabs/neutrino/cache/lru"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ConcurrentQueue is a typed concurrent-safe FIFO queue with unbounded
|
||||||
|
// capacity. Clients interact with the queue by pushing items into the in
|
||||||
|
// channel and popping items from the out channel. There is a goroutine that
|
||||||
|
// manages moving items from the in channel to the out channel in the correct
|
||||||
|
// order that must be started by calling Start().
|
||||||
|
type ConcurrentQueue[T any] struct {
|
||||||
|
started sync.Once
|
||||||
|
stopped sync.Once
|
||||||
|
|
||||||
|
chanIn chan T
|
||||||
|
chanOut chan T
|
||||||
|
overflow *lru.List[T]
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
quit chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
|
||||||
|
// the capacity of the output channel. When the size of the queue is below this
|
||||||
|
// threshold, pushes do n[?12;4$yot incur the overhead of the less efficient overflow
|
||||||
|
// structure.
|
||||||
|
func NewConcurrentQueue[T any](bufferSize int) *ConcurrentQueue[T] {
|
||||||
|
return &ConcurrentQueue[T]{
|
||||||
|
chanIn: make(chan T),
|
||||||
|
chanOut: make(chan T, bufferSize),
|
||||||
|
overflow: lru.NewList[T](),
|
||||||
|
quit: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChanIn returns a channel that can be used to push new items into the queue.
|
||||||
|
func (cq *ConcurrentQueue[T]) ChanIn() chan<- T {
|
||||||
|
return cq.chanIn
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChanOut returns a channel that can be used to pop items from the queue.
|
||||||
|
func (cq *ConcurrentQueue[T]) ChanOut() <-chan T {
|
||||||
|
return cq.chanOut
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins a goroutine that manages moving items from the in channel to the
|
||||||
|
// out channel. The queue tries to move items directly to the out channel
|
||||||
|
// minimize overhead, but if the out channel is full it pushes items to an
|
||||||
|
// overflow queue. This must be called before using the queue.
|
||||||
|
func (cq *ConcurrentQueue[T]) Start() {
|
||||||
|
cq.started.Do(cq.start)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cq *ConcurrentQueue[T]) start() {
|
||||||
|
cq.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer cq.wg.Done()
|
||||||
|
|
||||||
|
readLoop:
|
||||||
|
for {
|
||||||
|
nextElement := cq.overflow.Front()
|
||||||
|
if nextElement == nil {
|
||||||
|
// Overflow queue is empty so incoming items can
|
||||||
|
// be pushed directly to the output channel. If
|
||||||
|
// output channel is full though, push to
|
||||||
|
// overflow.
|
||||||
|
select {
|
||||||
|
case item, ok := <-cq.chanIn:
|
||||||
|
if !ok {
|
||||||
|
break readLoop
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case cq.chanOut <- item:
|
||||||
|
// Optimistically push directly
|
||||||
|
// to chanOut.
|
||||||
|
default:
|
||||||
|
cq.overflow.PushBack(item)
|
||||||
|
}
|
||||||
|
case <-cq.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Overflow queue is not empty, so any new items
|
||||||
|
// get pushed to the back to preserve order.
|
||||||
|
select {
|
||||||
|
case item, ok := <-cq.chanIn:
|
||||||
|
if !ok {
|
||||||
|
break readLoop
|
||||||
|
}
|
||||||
|
cq.overflow.PushBack(item)
|
||||||
|
case cq.chanOut <- nextElement.Value:
|
||||||
|
cq.overflow.Remove(nextElement)
|
||||||
|
case <-cq.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Incoming channel has been closed. Empty overflow queue into
|
||||||
|
// the outgoing channel.
|
||||||
|
nextElement := cq.overflow.Front()
|
||||||
|
for nextElement != nil {
|
||||||
|
select {
|
||||||
|
case cq.chanOut <- nextElement.Value:
|
||||||
|
cq.overflow.Remove(nextElement)
|
||||||
|
case <-cq.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
nextElement = cq.overflow.Front()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close outgoing channel.
|
||||||
|
close(cq.chanOut)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop ends the goroutine that moves items from the in channel to the out
|
||||||
|
// channel. This does not clear the queue state, so the queue can be restarted
|
||||||
|
// without dropping items.
|
||||||
|
func (cq *ConcurrentQueue[T]) Stop() {
|
||||||
|
cq.stopped.Do(func() {
|
||||||
|
close(cq.quit)
|
||||||
|
cq.wg.Wait()
|
||||||
|
})
|
||||||
|
}
|
@@ -1,3 +1,5 @@
|
|||||||
module github.com/lightningnetwork/lnd/fn
|
module github.com/lightningnetwork/lnd/fn
|
||||||
|
|
||||||
go 1.19
|
go 1.19
|
||||||
|
|
||||||
|
require github.com/lightninglabs/neutrino/cache v1.1.2
|
||||||
|
6
fn/go.sum
Normal file
6
fn/go.sum
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/lightninglabs/neutrino/cache v1.1.2 h1:C9DY/DAPaPxbFC+xNNEI/z1SJY9GS3shmlu5hIQ798g=
|
||||||
|
github.com/lightninglabs/neutrino/cache v1.1.2/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
Reference in New Issue
Block a user