diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go index cfe0a2cf0..975380af6 100644 --- a/routing/missioncontrol_store.go +++ b/routing/missioncontrol_store.go @@ -38,12 +38,15 @@ const ( // Also changes to mission control parameters can be applied to historical data. // Finally, it enables importing raw data from an external source. type missionControlStore struct { - done chan struct{} - wg sync.WaitGroup - db kvdb.Backend - queueMx sync.Mutex + done chan struct{} + wg sync.WaitGroup + db kvdb.Backend + + // queueCond is signalled when items are put into the queue. + queueCond *sync.Cond // queue stores all pending payment results not yet added to the store. + // Access is protected by the queueCond.L mutex. queue *list.List // keys holds the stored MC store item keys in the order of storage. @@ -101,6 +104,7 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int, return &missionControlStore{ done: make(chan struct{}), db: db, + queueCond: sync.NewCond(&sync.Mutex{}), queue: list.New(), keys: keys, keysMap: keysMap, @@ -111,8 +115,8 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int, // clear removes all results from the db. func (b *missionControlStore) clear() error { - b.queueMx.Lock() - defer b.queueMx.Unlock() + b.queueCond.L.Lock() + defer b.queueCond.L.Unlock() err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { if err := tx.DeleteTopLevelBucket(resultsKey); err != nil { @@ -267,14 +271,19 @@ func deserializeResult(k, v []byte) (*paymentResult, error) { // AddResult adds a new result to the db. func (b *missionControlStore) AddResult(rp *paymentResult) { - b.queueMx.Lock() - defer b.queueMx.Unlock() + b.queueCond.L.Lock() b.queue.PushBack(rp) + b.queueCond.L.Unlock() + + b.queueCond.Signal() } // stop stops the store ticker goroutine. func (b *missionControlStore) stop() { close(b.done) + + b.queueCond.Signal() + b.wg.Wait() } @@ -283,19 +292,51 @@ func (b *missionControlStore) run() { b.wg.Add(1) go func() { - ticker := time.NewTicker(b.flushInterval) - defer ticker.Stop() defer b.wg.Done() + timer := time.NewTimer(b.flushInterval) + + // Immediately stop the timer. It will be started once new + // items are added to the store. As the doc for time.Timer + // states, every call to Stop() done on a timer that is not + // known to have been fired needs to be checked and the timer's + // channel needs to be drained appropriately. This could happen + // if the flushInterval is very small (e.g. 1 nanosecond). + if !timer.Stop() { + <-timer.C + } + for { + // Wait for the queue to not be empty. + b.queueCond.L.Lock() + for b.queue.Front() == nil { + b.queueCond.Wait() + + select { + case <-b.done: + b.queueCond.L.Unlock() + + return + default: + } + } + b.queueCond.L.Unlock() + + // Restart the timer. + timer.Reset(b.flushInterval) + select { - case <-ticker.C: + case <-timer.C: if err := b.storeResults(); err != nil { log.Errorf("Failed to update mission "+ "control store: %v", err) } case <-b.done: + // Release the timer's resources. + if !timer.Stop() { + <-timer.C + } return } } @@ -306,16 +347,16 @@ func (b *missionControlStore) run() { func (b *missionControlStore) storeResults() error { // We copy a reference to the queue and clear the original queue to be // able to release the lock. - b.queueMx.Lock() + b.queueCond.L.Lock() l := b.queue if l.Len() == 0 { - b.queueMx.Unlock() + b.queueCond.L.Unlock() return nil } b.queue = list.New() - b.queueMx.Unlock() + b.queueCond.L.Unlock() var ( keys *list.List