watchtower/wtmock: add in-memory impl of Queue

This commit adds an in-memory implementation of the Queue interface.
This can be used for tests.
This commit is contained in:
Elle Mouton
2023-02-09 12:37:57 +02:00
parent 5cc7438d33
commit 66f6bf3955
4 changed files with 161 additions and 13 deletions

View File

@@ -131,6 +131,10 @@ type DB interface {
// update identified by seqNum was received and saved. The returned
// lastApplied will be recorded.
AckUpdate(id *wtdb.SessionID, seqNum, lastApplied uint16) error
// GetDBQueue returns a BackupID Queue instance under the given name
// space.
GetDBQueue(namespace []byte) wtdb.Queue[*wtdb.BackupID]
}
// AuthDialer connects to a remote node using an authenticated transport, such

View File

@@ -4,7 +4,9 @@ import (
"testing"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/watchtower/wtclient"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtmock"
"github.com/stretchr/testify/require"
)
@@ -13,21 +15,53 @@ import (
func TestDiskQueue(t *testing.T) {
t.Parallel()
// Set up a temporary bolt backend.
dbCfg := &kvdb.BoltConfig{DBTimeout: kvdb.DefaultDBTimeout}
dbs := []struct {
name string
init clientDBInit
}{
{
name: "bbolt",
init: func(t *testing.T) wtclient.DB {
dbCfg := &kvdb.BoltConfig{
DBTimeout: kvdb.DefaultDBTimeout,
}
// Construct the ClientDB.
bdb, err := wtdb.NewBoltBackendCreator(
true, t.TempDir(), "wtclient.db",
)(dbCfg)
require.NoError(t, err)
// Construct the ClientDB.
db, err := wtdb.OpenClientDB(bdb)
require.NoError(t, err)
t.Cleanup(func() {
err = db.Close()
require.NoError(t, err)
})
return db
},
},
{
name: "mock",
init: func(t *testing.T) wtclient.DB {
return wtmock.NewClientDB()
},
},
}
for _, database := range dbs {
db := database
t.Run(db.name, func(t *testing.T) {
t.Parallel()
testQueue(t, db.init(t))
})
}
}
func testQueue(t *testing.T, db wtclient.DB) {
namespace := []byte("test-namespace")
queue := db.GetDBQueue(namespace)

View File

@@ -49,6 +49,8 @@ type ClientDB struct {
nextIndex uint32
indexes map[keyIndexKey]uint32
legacyIndexes map[wtdb.TowerID]uint32
queues map[string]wtdb.Queue[*wtdb.BackupID]
}
// NewClientDB initializes a new mock ClientDB.
@@ -68,6 +70,7 @@ func NewClientDB() *ClientDB {
indexes: make(map[keyIndexKey]uint32),
legacyIndexes: make(map[wtdb.TowerID]uint32),
closableSessions: make(map[wtdb.SessionID]uint32),
queues: make(map[string]wtdb.Queue[*wtdb.BackupID]),
}
}
@@ -568,6 +571,21 @@ func (m *ClientDB) AckUpdate(id *wtdb.SessionID, seqNum,
return wtdb.ErrCommittedUpdateNotFound
}
// GetDBQueue returns a BackupID Queue instance under the given name space.
func (m *ClientDB) GetDBQueue(namespace []byte) wtdb.Queue[*wtdb.BackupID] {
m.mu.Lock()
defer m.mu.Unlock()
if q, ok := m.queues[string(namespace)]; ok {
return q
}
q := NewQueueDB[*wtdb.BackupID]()
m.queues[string(namespace)] = q
return q
}
// ListClosableSessions fetches and returns the IDs for all sessions marked as
// closable.
func (m *ClientDB) ListClosableSessions() (map[wtdb.SessionID]uint32, error) {

View File

@@ -0,0 +1,92 @@
package wtmock
import (
"container/list"
"fmt"
"sync"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
)
// DiskQueueDB is an in-memory implementation of the wtclient.Queue interface.
type DiskQueueDB[T any] struct {
disk *list.List
mu sync.RWMutex
}
// NewQueueDB constructs a new DiskQueueDB.
func NewQueueDB[T any]() wtdb.Queue[T] {
return &DiskQueueDB[T]{
disk: list.New(),
}
}
// Len returns the number of tasks in the queue.
//
// NOTE: This is part of the wtclient.Queue interface.
func (d *DiskQueueDB[T]) Len() (uint64, error) {
d.mu.Lock()
defer d.mu.Unlock()
return uint64(d.disk.Len()), nil
}
// Push adds new T items to the tail of the queue.
//
// NOTE: This is part of the wtclient.Queue interface.
func (d *DiskQueueDB[T]) Push(items ...T) error {
d.mu.Lock()
defer d.mu.Unlock()
for _, item := range items {
d.disk.PushBack(item)
}
return nil
}
// PopUpTo attempts to pop up to n items from the queue. If the queue is empty,
// then ErrEmptyQueue is returned.
//
// NOTE: This is part of the Queue interface.
func (d *DiskQueueDB[T]) PopUpTo(n int) ([]T, error) {
d.mu.Lock()
defer d.mu.Unlock()
if d.disk.Len() == 0 {
return nil, wtdb.ErrEmptyQueue
}
num := n
if d.disk.Len() < n {
num = d.disk.Len()
}
tasks := make([]T, 0, num)
for i := 0; i < num; i++ {
e := d.disk.Front()
task, ok := d.disk.Remove(e).(T)
if !ok {
return nil, fmt.Errorf("queue item not of type %T",
task)
}
tasks = append(tasks, task)
}
return tasks, nil
}
// PushHead pushes new T items to the head of the queue.
//
// NOTE: This is part of the wtclient.Queue interface.
func (d *DiskQueueDB[T]) PushHead(items ...T) error {
d.mu.Lock()
defer d.mu.Unlock()
for i := len(items) - 1; i >= 0; i-- {
d.disk.PushFront(items[i])
}
return nil
}