watchtowers: add thread safe min-heap

In this commit, a thread-safe min-heap is implemented. It will carry
sessionCloseItems which carry a sessionID and a block height at which
the session should be closed.
This commit is contained in:
Elle Mouton
2022-10-21 11:30:23 +02:00
parent 16008c0032
commit 2b08d3443f
3 changed files with 162 additions and 12 deletions

View File

@@ -271,6 +271,8 @@ type TowerClient struct {
sessionQueue *sessionQueue
prevTask *backupTask
closableSessionQueue *sessionCloseMinHeap
backupMu sync.Mutex
summaries wtdb.ChannelSummaries
chanCommitHeights map[lnwire.ChannelID]uint64
@@ -322,18 +324,19 @@ func New(config *Config) (*TowerClient, error) {
}
c := &TowerClient{
cfg: cfg,
log: plog,
pipeline: newTaskPipeline(plog),
chanCommitHeights: make(map[lnwire.ChannelID]uint64),
activeSessions: make(sessionQueueSet),
summaries: chanSummaries,
statTicker: time.NewTicker(DefaultStatInterval),
stats: new(ClientStats),
newTowers: make(chan *newTowerMsg),
staleTowers: make(chan *staleTowerMsg),
forceQuit: make(chan struct{}),
quit: make(chan struct{}),
cfg: cfg,
log: plog,
pipeline: newTaskPipeline(plog),
chanCommitHeights: make(map[lnwire.ChannelID]uint64),
activeSessions: make(sessionQueueSet),
summaries: chanSummaries,
closableSessionQueue: newSessionCloseMinHeap(),
statTicker: time.NewTicker(DefaultStatInterval),
stats: new(ClientStats),
newTowers: make(chan *newTowerMsg),
staleTowers: make(chan *staleTowerMsg),
forceQuit: make(chan struct{}),
quit: make(chan struct{}),
}
// perUpdate is a callback function that will be used to inspect the

View File

@@ -0,0 +1,95 @@
package wtclient
import (
"sync"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
)
// sessionCloseMinHeap is a thread-safe min-heap implementation that stores
// sessionCloseItem items and prioritises the item with the lowest block height.
type sessionCloseMinHeap struct {
queue queue.PriorityQueue
mu sync.Mutex
}
// newSessionCloseMinHeap constructs a new sessionCloseMineHeap.
func newSessionCloseMinHeap() *sessionCloseMinHeap {
return &sessionCloseMinHeap{}
}
// Len returns the length of the queue.
func (h *sessionCloseMinHeap) Len() int {
h.mu.Lock()
defer h.mu.Unlock()
return h.queue.Len()
}
// Empty returns true if the queue is empty.
func (h *sessionCloseMinHeap) Empty() bool {
h.mu.Lock()
defer h.mu.Unlock()
return h.queue.Empty()
}
// Push adds an item to the priority queue.
func (h *sessionCloseMinHeap) Push(item *sessionCloseItem) {
h.mu.Lock()
defer h.mu.Unlock()
h.queue.Push(item)
}
// Pop removes the top most item from the queue.
func (h *sessionCloseMinHeap) Pop() *sessionCloseItem {
h.mu.Lock()
defer h.mu.Unlock()
if h.queue.Empty() {
return nil
}
item := h.queue.Pop()
return item.(*sessionCloseItem) //nolint:forcetypeassert
}
// Top returns the top most item from the queue without removing it.
func (h *sessionCloseMinHeap) Top() *sessionCloseItem {
h.mu.Lock()
defer h.mu.Unlock()
if h.queue.Empty() {
return nil
}
item := h.queue.Top()
return item.(*sessionCloseItem) //nolint:forcetypeassert
}
// sessionCloseItem represents a session that is ready to be deleted.
type sessionCloseItem struct {
// sessionID is the ID of the session in question.
sessionID wtdb.SessionID
// deleteHeight is the block height after which we can delete the
// session.
deleteHeight uint32
}
// Less returns true if the current item's delete height is less than the
// other sessionCloseItem's delete height. This results in lower block heights
// being popped first from the heap.
//
// NOTE: this is part of the queue.PriorityQueueItem interface.
func (s *sessionCloseItem) Less(other queue.PriorityQueueItem) bool {
o := other.(*sessionCloseItem).deleteHeight //nolint:forcetypeassert
return s.deleteHeight < o
}
var _ queue.PriorityQueueItem = (*sessionCloseItem)(nil)

View File

@@ -0,0 +1,52 @@
package wtclient
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestSessionCloseMinHeap asserts that the sessionCloseMinHeap behaves as
// expected.
func TestSessionCloseMinHeap(t *testing.T) {
t.Parallel()
heap := newSessionCloseMinHeap()
require.Nil(t, heap.Pop())
require.Nil(t, heap.Top())
require.True(t, heap.Empty())
require.Zero(t, heap.Len())
// Add an item with height 10.
item1 := &sessionCloseItem{
sessionID: [33]byte{1, 2, 3},
deleteHeight: 10,
}
heap.Push(item1)
require.Equal(t, item1, heap.Top())
require.False(t, heap.Empty())
require.EqualValues(t, 1, heap.Len())
// Add a bunch more items with heights 1, 2, 6, 11, 6, 30, 9.
heap.Push(&sessionCloseItem{deleteHeight: 1})
heap.Push(&sessionCloseItem{deleteHeight: 2})
heap.Push(&sessionCloseItem{deleteHeight: 6})
heap.Push(&sessionCloseItem{deleteHeight: 11})
heap.Push(&sessionCloseItem{deleteHeight: 6})
heap.Push(&sessionCloseItem{deleteHeight: 30})
heap.Push(&sessionCloseItem{deleteHeight: 9})
// Now pop from the queue and assert that the items are returned in
// ascending order.
require.EqualValues(t, 1, heap.Pop().deleteHeight)
require.EqualValues(t, 2, heap.Pop().deleteHeight)
require.EqualValues(t, 6, heap.Pop().deleteHeight)
require.EqualValues(t, 6, heap.Pop().deleteHeight)
require.EqualValues(t, 9, heap.Pop().deleteHeight)
require.EqualValues(t, 10, heap.Pop().deleteHeight)
require.EqualValues(t, 11, heap.Pop().deleteHeight)
require.EqualValues(t, 30, heap.Pop().deleteHeight)
require.Nil(t, heap.Pop())
require.Zero(t, heap.Len())
}