mirror of https://github.com/lightningnetwork/lnd.git
wtclient: demonstrate overflow queue flake
In this commit, some temporary variables and logic are added to the DiskOverflowQueue for easy stop/go control from unit tests. These are then used to write a temporary unit test that demonstrates a race condition which can leave the queue in memory mode when it should be in disk mode. This new code & test will be removed once the issue has been fixed.
parent 26c466aa80
commit d4fefda10a
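For context, the flake that the new test reproduces is a check-then-act race on the queue's mode flag: a producer observes that the queue is in disk mode, is delayed before its disk write lands, and in the meantime the consumer drains the disk queue and switches the mode back to memory. The standalone sketch below is not lnd code; the names toDisk and diskItems and the timings are purely illustrative. It only shows how such an interleaving ends with an item on disk while the mode already reports memory:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var (
		toDisk    atomic.Bool  // queue mode: true = disk mode.
		diskItems atomic.Int64 // number of items written to disk.
		wg        sync.WaitGroup
	)

	// The queue starts out in disk mode.
	toDisk.Store(true)

	// Producer: checks the mode, then (after a delay standing in for the
	// gated disk write in the real queue) pushes its item to disk.
	wg.Add(1)
	go func() {
		defer wg.Done()
		if toDisk.Load() {
			// Delayed between the check and the act.
			time.Sleep(100 * time.Millisecond)
			diskItems.Add(1)
		}
	}()

	// Consumer: drains the disk queue and flips the mode back to memory
	// before the producer's delayed write lands.
	time.Sleep(10 * time.Millisecond)
	toDisk.Store(false)

	wg.Wait()
	fmt.Printf("mode reports memory, items on disk: %d\n", diskItems.Load())
}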
@@ -92,6 +92,21 @@ type DiskOverflowQueue[T any] struct {
 	// task.
 	leftOverItem3 *T
 
+	// waitForDiskWriteSignal is true if blockDiskWrite should be used to
+	// wait for an explicit signal before writing an item to disk. This is
+	// for the purpose of testing only and will be removed after the fix of
+	// a test flake has been demonstrated.
+	waitForDiskWriteSignal bool
+	blockDiskWrite         chan struct{}
+
+	// waitForFeedOutputSignal is true if startFeedOutputChan should be used
+	// to wait for an explicit signal before allowing the feedOutput
+	// goroutine to begin its duties. This is for the purpose of testing
+	// only and will be removed after the fix of a test flake has been
+	// demonstrated.
+	waitForFeedOutputSignal bool
+	startFeedOutputChan     chan struct{}
+
 	quit chan struct{}
 	wg   sync.WaitGroup
 }
@@ -106,14 +121,16 @@ func NewDiskOverflowQueue[T any](db wtdb.Queue[T], maxQueueSize uint64,
 	}
 
 	q := &DiskOverflowQueue[T]{
-		log:               logger,
-		db:                db,
-		inputList:         list.New(),
-		newDiskItemSignal: make(chan struct{}, 1),
-		inputChan:         make(chan *internalTask[T]),
-		memQueue:          make(chan T, maxQueueSize-2),
-		outputChan:        make(chan T),
-		quit:              make(chan struct{}),
+		log:                 logger,
+		db:                  db,
+		inputList:           list.New(),
+		newDiskItemSignal:   make(chan struct{}, 1),
+		inputChan:           make(chan *internalTask[T]),
+		memQueue:            make(chan T, maxQueueSize-2),
+		outputChan:          make(chan T),
+		blockDiskWrite:      make(chan struct{}, 1),
+		startFeedOutputChan: make(chan struct{}, 1),
+		quit:                make(chan struct{}),
 	}
 	q.inputListCond = sync.NewCond(&q.inputListMu)
 
@@ -130,6 +147,27 @@ func (q *DiskOverflowQueue[T]) Start() error {
 	return err
 }
 
+// allowDiskWrite is used to explicitly signal that a disk write may take place.
+// This is for the purposes of testing only and will be removed once a specific
+// test flake fix has been demonstrated.
+func (q *DiskOverflowQueue[T]) allowDiskWrite() {
+	select {
+	case q.blockDiskWrite <- struct{}{}:
+	default:
+	}
+}
+
+// startFeedOutput is used to explicitly signal that the feedOutput goroutine
+// may start.
+// This is for the purposes of testing only and will be removed once a specific
+// test flake fix has been demonstrated.
+func (q *DiskOverflowQueue[T]) startFeedOutput() {
+	select {
+	case q.startFeedOutputChan <- struct{}{}:
+	default:
+	}
+}
+
 // start kicks off all the goroutines that are required to manage the queue.
 func (q *DiskOverflowQueue[T]) start() error {
 	numDisk, err := q.db.Len()
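Both helpers added above follow the same gating pattern: the test grants a one-shot permit with a non-blocking send into a capacity-1 channel, and the gated code blocks on a receive from that channel (or on quit). Buffering the channel with capacity 1 means a permit granted before the worker reaches the gate is parked rather than lost, and repeated grants coalesce into at most one pending permit. Below is a minimal sketch of the pattern on its own, using illustrative names (gate, allow, worker) rather than the queue's fields:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Capacity-1 "permit" channel: a grant issued before the worker reaches
	// the gate sits in the buffer instead of being lost.
	gate := make(chan struct{}, 1)
	quit := make(chan struct{})
	done := make(chan struct{})

	// allow grants a permit without ever blocking the caller; extra grants
	// coalesce because the buffer holds at most one.
	allow := func() {
		select {
		case gate <- struct{}{}:
		default:
		}
	}

	// Worker: waits at the gate before doing the gated work, but still
	// honours shutdown via quit.
	go func() {
		defer close(done)
		select {
		case <-gate:
			fmt.Println("permit received, doing gated work")
		case <-quit:
			fmt.Println("shut down before a permit arrived")
		}
	}()

	allow()
	allow() // No-op: the first permit is still pending.

	select {
	case <-done:
	case <-time.After(time.Second):
		close(quit)
		<-done
	}
}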
@@ -345,6 +383,21 @@ func (q *DiskOverflowQueue[T]) pushToActiveQueue(task T) bool {
 	// If the queue is in disk mode then any new items should be put
 	// straight into the disk queue.
 	if q.toDisk.Load() {
+
+		// If waitForDiskWriteSignal is true, then we wait here for
+		// an explicit signal before writing the item to disk. This is
+		// for testing only and will be removed once a specific test
+		// flake fix has been demonstrated.
+		if q.waitForDiskWriteSignal {
+			select {
+			case <-q.blockDiskWrite:
+			case <-q.quit:
+				q.leftOverItem3 = &task
+
+				return false
+			}
+		}
+
 		err := q.db.Push(task)
 		if err != nil {
 			// Log and back off for a few seconds and then
@@ -545,6 +598,18 @@ func (q *DiskOverflowQueue[T]) feedOutputChan() {
 		q.wg.Done()
 	}()
 
+	// If waitForFeedOutputSignal is true, then we wait here for an explicit
+	// signal before starting the main loop of the function. This is for
+	// testing only and will be removed once a specific test flake fix has
+	// been demonstrated.
+	if q.waitForFeedOutputSignal {
+		select {
+		case <-q.startFeedOutputChan:
+		case <-q.quit:
+			return
+		}
+	}
+
 	for {
 		select {
 		case nextTask, ok := <-q.memQueue:
@@ -38,6 +38,10 @@ func TestDiskOverflowQueue(t *testing.T) {
 			name: "start stop queue",
 			run:  testStartStopQueue,
 		},
+		{
+			name: "demo flake",
+			run:  demoFlake,
+		},
 	}
 
 	initDB := func() wtdb.Queue[*wtdb.BackupID] {
@@ -71,6 +75,97 @@ func TestDiskOverflowQueue(t *testing.T) {
 	}
 }
 
+// demoFlake is a temporary test demonstrating a race condition that currently
+// exists in the DiskOverflowQueue. It contrives a scenario that results in
+// the queue being in memory mode when it really should be in disk mode.
+func demoFlake(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
+	// Generate some backup IDs that we want to add to the queue.
+	tasks := genBackupIDs(10)
+
+	// New mock logger.
+	log := newMockLogger(t.Logf)
+
+	// Init the queue with the mock DB.
+	q, err := NewDiskOverflowQueue[*wtdb.BackupID](
+		db, maxInMemItems, log,
+	)
+	require.NoError(t, err)
+
+	// Set the two test variables to true so that we have more control over
+	// the feedOutput goroutine and disk writes from the test.
+	q.waitForDiskWriteSignal = true
+	q.waitForFeedOutputSignal = true
+
+	// Start the queue.
+	require.NoError(t, q.Start())
+
+	// Initially there should be no items on disk.
+	assertNumDisk(t, db, 0)
+
+	// Start filling up the queue. The maxInMemItems is 5, meaning that the
+	// memQueue capacity is 3. Since the feedOutput goroutine has not yet
+	// started, these first 3 items (tasks 0-2) will fill the memQueue.
+	enqueue(t, q, tasks[0])
+	enqueue(t, q, tasks[1])
+	enqueue(t, q, tasks[2])
+
+	// Adding task 3 is expected to result in the mode being changed to disk
+	// mode.
+	enqueue(t, q, tasks[3])
+
+	// Show that the mode does actually change to disk mode.
+	err = wait.Predicate(func() bool {
+		return q.toDisk.Load()
+	}, waitTime)
+	require.NoError(t, err)
+
+	// Allow task 3 to be written to disk. This will send a signal on
+	// newDiskItemsSignal.
+	q.allowDiskWrite()
+
+	// Task 3 will almost immediately be popped from disk again due to
+	// the newDiskItemsSignal causing feedMemQueue to call feedFromDisk.
+	waitForNumDisk(t, db, 0)
+
+	// Enqueue task 4 but don't allow it to be written to disk yet.
+	enqueue(t, q, tasks[4])
+
+	// Wait a bit just to make sure that task 4 has passed the
+	// if q.toDisk.Load() check in pushToActiveQueue and is waiting on the
+	// allowDiskWrite signal.
+	time.Sleep(time.Second)
+
+	// Now, start the feedOutput goroutine. This will pop 1 item from the
+	// memQueue, meaning that feedMemQueue will now manage to push an item
+	// onto the memQueue & will go onto the next iteration of feedFromDisk,
+	// which will then see that there are no items on disk and so will
+	// change the mode to memory-mode.
+	q.startFeedOutput()
+
+	err = wait.Predicate(func() bool {
+		return !q.toDisk.Load()
+	}, waitTime)
+	require.NoError(t, err)
+
+	// Now, we allow task 4 to be written to disk. NOTE that this is
+	// happening while the mode is memory mode! This is the bug! This will
+	// result in a newDiskItemsSignal being sent, meaning that feedMemQueue
+	// will read from disk and block on pushing the new item to memQueue.
+	q.allowDiskWrite()
+
+	// Now, if we enqueue task 5, it will _not_ be written to disk since the
+	// queue is currently in memory mode. This is the bug that needs to be
+	// fixed.
+	enqueue(t, q, tasks[5])
+	q.allowDiskWrite()
+
+	// Show that there are no items on disk at this point. When the bug is
+	// fixed, this should be changed to 1.
+	waitForNumDisk(t, db, 0)
+
+	require.NoError(t, q.Stop())
+}
+
 // testOverflowToDisk is a basic test that ensures that the queue correctly
 // overflows items to disk and then correctly reloads them.
 func testOverflowToDisk(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {
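The test above avoids fixed sleeps when asserting on the mode flag by polling with wait.Predicate until the condition holds or a timeout elapses. For readers unfamiliar with that helper, the following is a rough, illustrative sketch of the general shape of such a predicate-polling function; it is an assumption for illustration only, not the helper the test actually imports:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// waitPredicate polls pred every 10ms until it returns true or the timeout
// expires. Illustrative stand-in for a predicate-polling test helper.
func waitPredicate(pred func() bool, timeout time.Duration) error {
	deadline := time.After(timeout)
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-deadline:
			return errors.New("predicate not satisfied before timeout")
		case <-ticker.C:
			if pred() {
				return nil
			}
		}
	}
}

func main() {
	var flag atomic.Bool
	go func() {
		time.Sleep(50 * time.Millisecond)
		flag.Store(true)
	}()

	// Poll until the flag flips, much like the test polls q.toDisk.Load().
	fmt.Println(waitPredicate(flag.Load, time.Second))
}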