diff --git a/watchtower/wtclient/queue.go b/watchtower/wtclient/queue.go index f2e660457..8581ea888 100644 --- a/watchtower/wtclient/queue.go +++ b/watchtower/wtclient/queue.go @@ -92,21 +92,6 @@ type DiskOverflowQueue[T any] struct { // task. leftOverItem3 *T - // waitForDiskWriteSignal is true if blockDiskWrite should be used to - // wait for an explicit signal before writing an item to disk. This is - // for the purpose of testing only and will be removed after the fix of - // a test flake has been demonstrated. - waitForDiskWriteSignal bool - blockDiskWrite chan struct{} - - // waitForFeedOutputSignal is true if startFeedOutputChan should be used - // to wait for an explicit signal before allowing the feedOutput - // goroutine to begin its duties. This is for the purpose of testing - // only and will be removed after the fix of a test flake has been - // demonstrated. - waitForFeedOutputSignal bool - startFeedOutputChan chan struct{} - quit chan struct{} wg sync.WaitGroup } @@ -121,16 +106,14 @@ func NewDiskOverflowQueue[T any](db wtdb.Queue[T], maxQueueSize uint64, } q := &DiskOverflowQueue[T]{ - log: logger, - db: db, - inputList: list.New(), - newDiskItemSignal: make(chan struct{}, 1), - inputChan: make(chan *internalTask[T]), - memQueue: make(chan T, maxQueueSize-2), - outputChan: make(chan T), - blockDiskWrite: make(chan struct{}, 1), - startFeedOutputChan: make(chan struct{}, 1), - quit: make(chan struct{}), + log: logger, + db: db, + inputList: list.New(), + newDiskItemSignal: make(chan struct{}, 1), + inputChan: make(chan *internalTask[T]), + memQueue: make(chan T, maxQueueSize-2), + outputChan: make(chan T), + quit: make(chan struct{}), } q.inputListCond = sync.NewCond(&q.inputListMu) @@ -147,27 +130,6 @@ func (q *DiskOverflowQueue[T]) Start() error { return err } -// allowDiskWrite is used to explicitly signal that a disk write may take place. -// This is for the purposes of testing only and will be removed once a specific -// test flake fix has been demonstrated. -func (q *DiskOverflowQueue[T]) allowDiskWrite() { - select { - case q.blockDiskWrite <- struct{}{}: - default: - } -} - -// startFeedOutput is used to explicitly signal that the feedOutput goroutine -// may start. -// This is for the purposes of testing only and will be removed once a specific -// test flake fix has been demonstrated. -func (q *DiskOverflowQueue[T]) startFeedOutput() { - select { - case q.startFeedOutputChan <- struct{}{}: - default: - } -} - // start kicks off all the goroutines that are required to manage the queue. func (q *DiskOverflowQueue[T]) start() error { numDisk, err := q.db.Len() @@ -383,21 +345,6 @@ func (q *DiskOverflowQueue[T]) pushToActiveQueue(task T) bool { // If the queue is in disk mode then any new items should be put // straight into the disk queue. if q.toDisk.Load() { - - // If waitForDiskWriteSignal is true, then we wait here for - // an explicit signal before writing the item to disk. This is - // for testing only and will be removed once a specific test - // flake fix has been demonstrated. - if q.waitForDiskWriteSignal { - select { - case <-q.blockDiskWrite: - case <-q.quit: - q.leftOverItem3 = &task - - return false - } - } - err := q.db.Push(task) if err != nil { // Log and back off for a few seconds and then @@ -605,18 +552,6 @@ func (q *DiskOverflowQueue[T]) feedOutputChan() { q.wg.Done() }() - // If waitForFeedOutputSignal is true, then we wait here for an explicit - // signal before starting the main loop of the function. This is for - // testing only and will be removed once a specific test flake fix has - // been demonstrated. - if q.waitForFeedOutputSignal { - select { - case <-q.startFeedOutputChan: - case <-q.quit: - return - } - } - for { select { case nextTask, ok := <-q.memQueue: diff --git a/watchtower/wtclient/queue_test.go b/watchtower/wtclient/queue_test.go index e87ca1480..529acb49a 100644 --- a/watchtower/wtclient/queue_test.go +++ b/watchtower/wtclient/queue_test.go @@ -38,10 +38,6 @@ func TestDiskOverflowQueue(t *testing.T) { name: "start stop queue", run: testStartStopQueue, }, - { - name: "demo flake", - run: demoFlake, - }, } initDB := func() wtdb.Queue[*wtdb.BackupID] { @@ -75,101 +71,6 @@ func TestDiskOverflowQueue(t *testing.T) { } } -// demoFlake is a temporary test demonstrating the fix of a race condition that -// existed in the DiskOverflowQueue. It contrives a scenario that once resulted -// in the queue being in memory mode when it really should be in disk mode. -func demoFlake(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) { - // Generate some backup IDs that we want to add to the queue. - tasks := genBackupIDs(10) - - // New mock logger. - log := newMockLogger(t.Logf) - - // Init the queue with the mock DB. - q, err := NewDiskOverflowQueue[*wtdb.BackupID]( - db, maxInMemItems, log, - ) - require.NoError(t, err) - - // Set the two test variables to true so that we have more control over - // the feedOutput goroutine and disk writes from the test. - q.waitForDiskWriteSignal = true - q.waitForFeedOutputSignal = true - - // Start the queue. - require.NoError(t, q.Start()) - - // Initially there should be no items on disk. - assertNumDisk(t, db, 0) - - // Start filling up the queue. The maxInMemItems is 5, meaning that the - // memQueue capacity is 3. Since the feedOutput goroutine has not yet - // started, these first 3 items (tasks 0-2) will fill the memQueue. - enqueue(t, q, tasks[0]) - enqueue(t, q, tasks[1]) - enqueue(t, q, tasks[2]) - - // Adding task 3 is expected to result in the mode being changed to disk - // mode. - enqueue(t, q, tasks[3]) - - // Show that the mode does actually change to disk mode. - err = wait.Predicate(func() bool { - return q.toDisk.Load() - }, waitTime) - require.NoError(t, err) - - // Allow task 3 to be written to disk. This will send a signal on - // newDiskItemsSignal. - q.allowDiskWrite() - - // Task 3 will almost immediately be popped from disk again due to - // the newDiskItemsSignal causing feedMemQueue to call feedFromDisk. - waitForNumDisk(t, db, 0) - - // Enqueue task 4 but don't allow it to be written to disk yet. - enqueue(t, q, tasks[4]) - - // Wait a bit just to make sure that task 4 has passed the - // if q.toDisk.Load() check in pushToActiveQueue and is waiting on the - // allowDiskWrite signal. - time.Sleep(time.Second) - - // Now, start the feedOutput goroutine. This will pop 1 item from the - // memQueue meaning that feedMemQueue will now manage to push an item - // onto the memQueue & will go onto the next iteration of feedFromDisk - // which will then see that there are no items on disk and so will - // change the mode to memory-mode. - q.startFeedOutput() - - err = wait.Predicate(func() bool { - return !q.toDisk.Load() - }, waitTime) - require.NoError(t, err) - - // Now, we allow task 4 to be written to disk. This will result in a - // newDiskItemsSignal being sent meaning that feedMemQueue will read - // from disk and block on pushing the new item to memQueue. - q.allowDiskWrite() - - // The above will result in feeMemQueue switching the mode to disk-mode. - err = wait.Predicate(func() bool { - return q.toDisk.Load() - }, waitTime) - require.NoError(t, err) - - // Now, if we enqueue task 5, it _will_ be written to disk since the - // queue is currently in disk mode. - enqueue(t, q, tasks[5]) - q.allowDiskWrite() - - // Show that there is an item on disk at this point. This demonstrates - // that the bug has been fixed. - waitForNumDisk(t, db, 1) - - require.NoError(t, q.Stop()) -} - // testOverflowToDisk is a basic test that ensures that the queue correctly // overflows items to disk and then correctly reloads them. func testOverflowToDisk(t *testing.T, db wtdb.Queue[*wtdb.BackupID]) {