lnd/watchtower/wtclient/queue_test.go
Elle Mouton 3d201dde50 wtclient: ensure correct disk mode for overflow queue
Before this commit, the watchtower client's DiskOverflowQueue could,
_if no consumer was reading from the queue_, build up backup tasks in
memory instead of overflowing them to disk as expected. This commit
fixes this by ensuring that if a task is read from disk, the queue is
switched to disk mode so that any new items are persisted to disk in
this scenario.

The unit test added in the previous commit is updated here to show that
the fix does in fact resolve the flake.
2024-01-15 17:28:17 +02:00
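
The idea behind the fix, as a minimal sketch. The overflowQueue type and the
popFromDisk callback below are illustrative stand-ins for the real
DiskOverflowQueue internals (assuming only sync/atomic from the standard
library); they are not lnd's actual code:

// Sketch only: illustrative stand-ins, not the real DiskOverflowQueue.
type overflowQueue[T any] struct {
	toDisk   atomic.Bool // true while new items must be persisted to disk.
	memQueue chan T      // bounded in-memory buffer.
}

// feedFromDisk drains tasks persisted to disk into the in-memory queue.
// The fix described above: since a task was just read from disk, switch
// to disk mode _before_ handing it to memQueue, so that a concurrent
// enqueue cannot slip its task into memory ahead of the on-disk backlog.
func (q *overflowQueue[T]) feedFromDisk(popFromDisk func() (T, bool)) {
	for {
		task, ok := popFromDisk()
		if !ok {
			// Disk is empty; it is now safe to fall back to
			// memory mode.
			q.toDisk.Store(false)
			return
		}

		// Ensure disk mode since an item was read from disk.
		q.toDisk.Store(true)
		q.memQueue <- task
	}
}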


package wtclient

import (
	"sync"
	"testing"
	"time"

	"github.com/btcsuite/btclog"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lntest/wait"
	"github.com/lightningnetwork/lnd/watchtower/wtdb"
	"github.com/lightningnetwork/lnd/watchtower/wtmock"
	"github.com/stretchr/testify/require"
)

const (
	maxInMemItems = 5
	waitTime      = time.Second * 2
)

// TestDiskOverflowQueue tests that the DiskOverflowQueue behaves as expected.
func TestDiskOverflowQueue(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name string
		run  func(*testing.T, wtdb.Queue[*wtdb.BackupID])
	}{
		{
			name: "overflow to disk",
			run:  testOverflowToDisk,
		},
		{
			name: "startup with smaller buffer size",
			run:  testRestartWithSmallerBufferSize,
		},
		{
			name: "start stop queue",
			run:  testStartStopQueue,
		},
		{
			name: "demo flake",
			run:  demoFlake,
		},
	}

	initDB := func() wtdb.Queue[*wtdb.BackupID] {
		dbCfg := &kvdb.BoltConfig{
			DBTimeout: kvdb.DefaultDBTimeout,
		}

		bdb, err := wtdb.NewBoltBackendCreator(
			true, t.TempDir(), "wtclient.db",
		)(dbCfg)
		require.NoError(t, err)

		db, err := wtdb.OpenClientDB(bdb)
		require.NoError(t, err)

		t.Cleanup(func() {
			require.NoError(t, db.Close())
		})

		return db.GetDBQueue([]byte("test-namespace"))
	}

	for _, test := range tests {
		test := test

		t.Run(test.name, func(tt *testing.T) {
			tt.Parallel()

			test.run(tt, initDB())
		})
	}
}

// demoFlake is a temporary test demonstrating the fix of a race condition that
// existed in the DiskOverflowQueue. It contrives a scenario that once resulted
// in the queue being in memory mode when it really should be in disk mode.
func demoFlake(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
	// Generate some backup IDs that we want to add to the queue.
	tasks := genBackupIDs(10)

	// New mock logger.
	log := newMockLogger(t.Logf)

	// Init the queue with the mock DB.
	q, err := NewDiskOverflowQueue[*wtdb.BackupID](
		db, maxInMemItems, log,
	)
	require.NoError(t, err)

	// Set the two test variables to true so that we have more control
	// over the feedOutput goroutine and disk writes from the test.
	q.waitForDiskWriteSignal = true
	q.waitForFeedOutputSignal = true

	// Start the queue.
	require.NoError(t, q.Start())

	// Initially there should be no items on disk.
	assertNumDisk(t, db, 0)

	// Start filling up the queue. The maxInMemItems is 5, meaning that
	// the memQueue capacity is 3. Since the feedOutput goroutine has not
	// yet started, these first 3 items (tasks 0-2) will fill the memQueue.
	enqueue(t, q, tasks[0])
	enqueue(t, q, tasks[1])
	enqueue(t, q, tasks[2])

	// Adding task 3 is expected to result in the mode being changed to
	// disk mode.
	enqueue(t, q, tasks[3])

	// Show that the mode does actually change to disk mode.
	err = wait.Predicate(func() bool {
		return q.toDisk.Load()
	}, waitTime)
	require.NoError(t, err)

	// Allow task 3 to be written to disk. This will send a signal on
	// newDiskItemsSignal.
	q.allowDiskWrite()

	// Task 3 will almost immediately be popped from disk again due to
	// the newDiskItemsSignal causing feedMemQueue to call feedFromDisk.
	waitForNumDisk(t, db, 0)

	// Enqueue task 4 but don't allow it to be written to disk yet.
	enqueue(t, q, tasks[4])

	// Wait a bit just to make sure that task 4 has passed the
	// "if q.toDisk.Load()" check in pushToActiveQueue and is waiting on
	// the allowDiskWrite signal.
	time.Sleep(time.Second)

	// Now, start the feedOutput goroutine. This will pop 1 item from the
	// memQueue, meaning that feedMemQueue will now manage to push an item
	// onto the memQueue & will go onto the next iteration of feedFromDisk,
	// which will then see that there are no items on disk and so will
	// change the mode to memory mode.
	q.startFeedOutput()
	err = wait.Predicate(func() bool {
		return !q.toDisk.Load()
	}, waitTime)
	require.NoError(t, err)

	// Now, we allow task 4 to be written to disk. This will result in a
	// newDiskItemsSignal being sent, meaning that feedMemQueue will read
	// from disk and block on pushing the new item to memQueue.
	q.allowDiskWrite()

	// The above will result in feedMemQueue switching the mode to disk
	// mode.
	err = wait.Predicate(func() bool {
		return q.toDisk.Load()
	}, waitTime)
	require.NoError(t, err)

	// Now, if we enqueue task 5, it _will_ be written to disk since the
	// queue is currently in disk mode.
	enqueue(t, q, tasks[5])
	q.allowDiskWrite()

	// Show that there is an item on disk at this point. This demonstrates
	// that the bug has been fixed.
	waitForNumDisk(t, db, 1)

	require.NoError(t, q.Stop())
}

// testOverflowToDisk is a basic test that ensures that the queue correctly
// overflows items to disk and then correctly reloads them.
func testOverflowToDisk(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
	// Generate some backup IDs that we want to add to the queue.
	tasks := genBackupIDs(10)

	// New mock logger.
	log := newMockLogger(t.Logf)

	// Init the queue with the mock DB.
	q, err := NewDiskOverflowQueue[*wtdb.BackupID](
		db, maxInMemItems, log,
	)
	require.NoError(t, err)

	// Start the queue.
	require.NoError(t, q.Start())

	// Initially there should be no items on disk.
	assertNumDisk(t, db, 0)

	// Start filling up the queue.
	enqueue(t, q, tasks[0])
	enqueue(t, q, tasks[1])
	enqueue(t, q, tasks[2])
	enqueue(t, q, tasks[3])
	enqueue(t, q, tasks[4])

	// The queue should now be full, so any new items should be persisted
	// to disk.
	enqueue(t, q, tasks[5])
	waitForNumDisk(t, db, 1)

	// Now pop all items from the queue to ensure that the item from disk
	// is loaded in properly once there is space.
	require.Equal(t, tasks[0], getNext(t, q, 0))
	require.Equal(t, tasks[1], getNext(t, q, 1))
	require.Equal(t, tasks[2], getNext(t, q, 2))
	require.Equal(t, tasks[3], getNext(t, q, 3))
	require.Equal(t, tasks[4], getNext(t, q, 4))
	require.Equal(t, tasks[5], getNext(t, q, 5))

	// There should no longer be any items in the disk queue.
	assertNumDisk(t, db, 0)

	require.NoError(t, q.Stop())
}

// testRestartWithSmallerBufferSize tests that if the queue is restarted with
// a smaller in-memory buffer size than it was initially started with, then
// tasks are still loaded in the correct order.
func testRestartWithSmallerBufferSize(t *testing.T,
	db wtdb.Queue[*wtdb.BackupID]) {

	const (
		firstMaxInMemItems  = 5
		secondMaxInMemItems = 2
	)

	// Generate some backup IDs that we want to add to the queue.
	tasks := genBackupIDs(10)

	// New mock logger.
	log := newMockLogger(t.Logf)

	// Init the queue with the mock DB and an initial max in-mem items
	// number.
	q, err := NewDiskOverflowQueue[*wtdb.BackupID](
		db, firstMaxInMemItems, log,
	)
	require.NoError(t, err)
	require.NoError(t, q.Start())

	// Add 7 items to the queue. The first 5 will go into the in-mem
	// queue, the other 2 will be persisted to the main disk queue.
	enqueue(t, q, tasks[0])
	enqueue(t, q, tasks[1])
	enqueue(t, q, tasks[2])
	enqueue(t, q, tasks[3])
	enqueue(t, q, tasks[4])
	enqueue(t, q, tasks[5])
	enqueue(t, q, tasks[6])

	waitForNumDisk(t, db, 2)

	// Now stop the queue and re-initialise it with a smaller buffer
	// maximum.
	require.NoError(t, q.Stop())

	// Check that there are now 7 items in the disk queue.
	waitForNumDisk(t, db, 7)

	// Re-init the queue with a smaller max buffer size.
	q, err = NewDiskOverflowQueue[*wtdb.BackupID](
		db, secondMaxInMemItems, log,
	)
	require.NoError(t, err)
	require.NoError(t, q.Start())

	// Once more we shall repeat the above restart process just to ensure
	// that in-memory items are correctly re-written to and read from the
	// DB.
	waitForNumDisk(t, db, 5)
	require.NoError(t, q.Stop())
	waitForNumDisk(t, db, 7)

	q, err = NewDiskOverflowQueue[*wtdb.BackupID](
		db, secondMaxInMemItems, log,
	)
	require.NoError(t, err)
	require.NoError(t, q.Start())
	waitForNumDisk(t, db, 5)

	// Make sure that items are popped off the queue in the correct order.
	require.Equal(t, tasks[0], getNext(t, q, 0))
	require.Equal(t, tasks[1], getNext(t, q, 1))
	require.Equal(t, tasks[2], getNext(t, q, 2))
	require.Equal(t, tasks[3], getNext(t, q, 3))
	require.Equal(t, tasks[4], getNext(t, q, 4))
	require.Equal(t, tasks[5], getNext(t, q, 5))
	require.Equal(t, tasks[6], getNext(t, q, 6))

	require.NoError(t, q.Stop())
}

// testStartStopQueue is a stress test that pushes a large number of tasks
// through the queue while also restarting the queue a couple of times
// throughout.
func testStartStopQueue(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
	// Generate a lot of backup IDs that we want to add to the queue one
	// after the other.
	tasks := genBackupIDs(200_000)

	// New mock logger.
	log := newMockLogger(t.Logf)

	// Init the queue with the mock DB.
	q, err := NewDiskOverflowQueue[*wtdb.BackupID](
		db, DefaultMaxTasksInMemQueue, log,
	)
	require.NoError(t, err)

	// Start the queue.
	require.NoError(t, q.Start())

	// Initially there should be no items on disk.
	assertNumDisk(t, db, 0)

	// We need to guard the queue with a mutex since we will be stopping,
	// re-creating and starting the queue multiple times.
	var (
		queueMtx sync.RWMutex
		wg       sync.WaitGroup
		sendDone = make(chan struct{})
	)

	// This goroutine will constantly try to add new items to the queue,
	// even if the queue is stopped.
	wg.Add(1)
	go func() {
		defer wg.Done()

		for idx := range tasks {
			queueMtx.RLock()
			err := q.QueueBackupID(tasks[idx])
			require.NoError(t, err)
			queueMtx.RUnlock()
		}
	}()

	// This goroutine will repeatedly stop, re-create and start the queue
	// until we're done sending items.
	wg.Add(1)
	go func() {
		defer wg.Done()

		numRestarts := 0
		for {
			select {
			case <-sendDone:
				t.Logf("Restarted queue %d times",
					numRestarts)

				return

			case <-time.After(100 * time.Millisecond):
			}

			queueMtx.Lock()
			require.NoError(t, q.Stop())
			q, err = NewDiskOverflowQueue[*wtdb.BackupID](
				db, DefaultMaxTasksInMemQueue, log,
			)
			require.NoError(t, err)
			require.NoError(t, q.Start())
			queueMtx.Unlock()

			numRestarts++
		}
	}()

	// We should be able to read all items from the queue, unaffected by
	// restarts, other than needing to wait for the queue to be started
	// again.
	results := make([]*wtdb.BackupID, 0, len(tasks))
	for i := 0; i < len(tasks); i++ {
		queueMtx.RLock()
		task := getNext(t, q, i)
		queueMtx.RUnlock()

		results = append(results, task)
	}
	close(sendDone)
	require.Equal(t, tasks, results)

	require.NoError(t, q.Stop())
	wg.Wait()
}
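
// getNext pops the next item off of the given DiskOverflowQueue, failing the
// test if it is not received within the wait timeout.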
func getNext(t *testing.T, q *DiskOverflowQueue[*wtdb.BackupID],
	i int) *wtdb.BackupID {

	var item *wtdb.BackupID
	select {
	case item = <-q.NextBackupID():
	case <-time.After(waitTime):
		t.Fatalf("task %d not received in time", i)
	}

	return item
}
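
// enqueue adds the given task to the given DiskOverflowQueue and asserts that
// no error occurs.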
func enqueue(t *testing.T, q *DiskOverflowQueue[*wtdb.BackupID],
	task *wtdb.BackupID) {

	err := q.QueueBackupID(task)
	require.NoError(t, err)
}
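
// waitForNumDisk waits until the given disk queue contains exactly num items,
// failing the test on timeout.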
func waitForNumDisk(t *testing.T, db wtdb.Queue[*wtdb.BackupID], num int) {
	err := wait.Predicate(func() bool {
		n, err := db.Len()
		require.NoError(t, err)

		return n == uint64(num)
	}, waitTime)
	require.NoError(t, err)
}
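
// assertNumDisk asserts that the given disk queue currently contains exactly
// num items.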
func assertNumDisk(t *testing.T, db wtdb.Queue[*wtdb.BackupID], num int) {
	n, err := db.Len()
	require.NoError(t, err)

	require.EqualValues(t, num, n)
}
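
// genBackupIDs generates num sequential BackupIDs to use as test tasks.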
func genBackupIDs(num int) []*wtdb.BackupID {
	ids := make([]*wtdb.BackupID, num)
	for i := 0; i < num; i++ {
		ids[i] = newBackupID(i)
	}

	return ids
}
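
// newBackupID constructs a BackupID that is unique for the given id by using
// the id as the commit height.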
func newBackupID(id int) *wtdb.BackupID {
	return &wtdb.BackupID{CommitHeight: uint64(id)}
}

// BenchmarkDiskOverflowQueue benchmarks the performance of adding and removing
// items from the DiskOverflowQueue using an in-memory disk db.
func BenchmarkDiskOverflowQueue(b *testing.B) {
	enqueue := func(q *DiskOverflowQueue[*wtdb.BackupID],
		task *wtdb.BackupID) {

		err := q.QueueBackupID(task)
		require.NoError(b, err)
	}

	getNext := func(q *DiskOverflowQueue[*wtdb.BackupID],
		i int) *wtdb.BackupID {

		var item *wtdb.BackupID
		select {
		case item = <-q.NextBackupID():
		case <-time.After(time.Second * 2):
			b.Fatalf("task %d not received in time", i)
		}

		return item
	}

	// Generate some backup IDs that we want to add to the queue.
	tasks := genBackupIDs(b.N)

	// Create a mock db.
	db := wtmock.NewQueueDB[*wtdb.BackupID]()

	// New mock logger.
	log := newMockLogger(b.Logf)

	// Init the queue with the mock DB.
	q, err := NewDiskOverflowQueue[*wtdb.BackupID](db, 5, log)
	require.NoError(b, err)

	// Start the queue.
	require.NoError(b, q.Start())

	// Start filling up the queue.
	for n := 0; n < b.N; n++ {
		enqueue(q, tasks[n])
	}

	// Pop all the items off the queue.
	for n := 0; n < b.N; n++ {
		require.Equal(b, tasks[n], getNext(q, n))
	}

	require.NoError(b, q.Stop())
}
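
// mockLogger implements the btclog.Logger interface by forwarding calls to
// the given log function. Note that only Errorf is overridden below; calling
// any other method will panic since the embedded btclog.Logger is nil.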
type mockLogger struct {
	log func(string, ...any)

	btclog.Logger
}
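
// newMockLogger constructs a new mockLogger from the given log function.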
func newMockLogger(logger func(string, ...any)) *mockLogger {
	return &mockLogger{log: logger}
}

// Errorf formats the message according to the format specifier and writes it
// to the log.
//
// NOTE: this is part of the btclog.Logger interface.
func (l *mockLogger) Errorf(format string, params ...any) {
	l.log("[ERR]: "+format, params...)
}