mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-05 08:41:42 +02:00
missioncontrolstore: avoid ticker when there is no work to be done.
This modifies the mission control store to avoid running the one second ticker for flushing data when there is no work to be done. This improves performance of a quiscent LN node by avoiding a one second interval busy loop that does nothing when there are no payments flowing through the node.
This commit is contained in:
@@ -38,12 +38,15 @@ const (
|
||||
// Also changes to mission control parameters can be applied to historical data.
|
||||
// Finally, it enables importing raw data from an external source.
|
||||
type missionControlStore struct {
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
db kvdb.Backend
|
||||
queueMx sync.Mutex
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
db kvdb.Backend
|
||||
|
||||
// queueCond is signalled when items are put into the queue.
|
||||
queueCond *sync.Cond
|
||||
|
||||
// queue stores all pending payment results not yet added to the store.
|
||||
// Access is protected by the queueCond.L mutex.
|
||||
queue *list.List
|
||||
|
||||
// keys holds the stored MC store item keys in the order of storage.
|
||||
@@ -101,6 +104,7 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int,
|
||||
return &missionControlStore{
|
||||
done: make(chan struct{}),
|
||||
db: db,
|
||||
queueCond: sync.NewCond(&sync.Mutex{}),
|
||||
queue: list.New(),
|
||||
keys: keys,
|
||||
keysMap: keysMap,
|
||||
@@ -111,8 +115,8 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int,
|
||||
|
||||
// clear removes all results from the db.
|
||||
func (b *missionControlStore) clear() error {
|
||||
b.queueMx.Lock()
|
||||
defer b.queueMx.Unlock()
|
||||
b.queueCond.L.Lock()
|
||||
defer b.queueCond.L.Unlock()
|
||||
|
||||
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
|
||||
if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
|
||||
@@ -267,14 +271,19 @@ func deserializeResult(k, v []byte) (*paymentResult, error) {
|
||||
|
||||
// AddResult adds a new result to the db.
|
||||
func (b *missionControlStore) AddResult(rp *paymentResult) {
|
||||
b.queueMx.Lock()
|
||||
defer b.queueMx.Unlock()
|
||||
b.queueCond.L.Lock()
|
||||
b.queue.PushBack(rp)
|
||||
b.queueCond.L.Unlock()
|
||||
|
||||
b.queueCond.Signal()
|
||||
}
|
||||
|
||||
// stop stops the store ticker goroutine.
|
||||
func (b *missionControlStore) stop() {
|
||||
close(b.done)
|
||||
|
||||
b.queueCond.Signal()
|
||||
|
||||
b.wg.Wait()
|
||||
}
|
||||
|
||||
@@ -283,19 +292,51 @@ func (b *missionControlStore) run() {
|
||||
b.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(b.flushInterval)
|
||||
defer ticker.Stop()
|
||||
defer b.wg.Done()
|
||||
|
||||
timer := time.NewTimer(b.flushInterval)
|
||||
|
||||
// Immediately stop the timer. It will be started once new
|
||||
// items are added to the store. As the doc for time.Timer
|
||||
// states, every call to Stop() done on a timer that is not
|
||||
// known to have been fired needs to be checked and the timer's
|
||||
// channel needs to be drained appropriately. This could happen
|
||||
// if the flushInterval is very small (e.g. 1 nanosecond).
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
|
||||
for {
|
||||
// Wait for the queue to not be empty.
|
||||
b.queueCond.L.Lock()
|
||||
for b.queue.Front() == nil {
|
||||
b.queueCond.Wait()
|
||||
|
||||
select {
|
||||
case <-b.done:
|
||||
b.queueCond.L.Unlock()
|
||||
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
b.queueCond.L.Unlock()
|
||||
|
||||
// Restart the timer.
|
||||
timer.Reset(b.flushInterval)
|
||||
|
||||
select {
|
||||
case <-ticker.C:
|
||||
case <-timer.C:
|
||||
if err := b.storeResults(); err != nil {
|
||||
log.Errorf("Failed to update mission "+
|
||||
"control store: %v", err)
|
||||
}
|
||||
|
||||
case <-b.done:
|
||||
// Release the timer's resources.
|
||||
if !timer.Stop() {
|
||||
<-timer.C
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -306,16 +347,16 @@ func (b *missionControlStore) run() {
|
||||
func (b *missionControlStore) storeResults() error {
|
||||
// We copy a reference to the queue and clear the original queue to be
|
||||
// able to release the lock.
|
||||
b.queueMx.Lock()
|
||||
b.queueCond.L.Lock()
|
||||
l := b.queue
|
||||
|
||||
if l.Len() == 0 {
|
||||
b.queueMx.Unlock()
|
||||
b.queueCond.L.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
b.queue = list.New()
|
||||
b.queueMx.Unlock()
|
||||
b.queueCond.L.Unlock()
|
||||
|
||||
var (
|
||||
keys *list.List
|
||||
|
Reference in New Issue
Block a user