mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-07-08 22:41:00 +02:00
htlcswitch: add quiescence timeout that is aborted by Resume
This commit is contained in:
@ -490,6 +490,10 @@ func NewChannelLink(cfg ChannelLinkConfig,
|
||||
sendMsg: func(s lnwire.Stfu) error {
|
||||
return cfg.Peer.SendMessage(false, &s)
|
||||
},
|
||||
timeoutDuration: defaultQuiescenceTimeout,
|
||||
onTimeout: func() {
|
||||
cfg.Peer.Disconnect(ErrQuiescenceTimeout)
|
||||
},
|
||||
})
|
||||
} else {
|
||||
qsm = &quiescerNoop{}
|
||||
|
@ -3,6 +3,7 @@ package htlcswitch
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/fn"
|
||||
"github.com/lightningnetwork/lnd/lntypes"
|
||||
@ -42,7 +43,15 @@ var (
|
||||
ErrPendingLocalUpdates = fmt.Errorf(
|
||||
"stfu send attempted with pending local updates",
|
||||
)
|
||||
|
||||
// ErrQuiescenceTimeout indicates that the quiescer has been quiesced
|
||||
// beyond the allotted time.
|
||||
ErrQuiescenceTimeout = fmt.Errorf(
|
||||
"quiescence timeout",
|
||||
)
|
||||
)
|
||||
|
||||
const defaultQuiescenceTimeout = 30 * time.Second
|
||||
|
||||
type StfuReq = fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]]
|
||||
|
||||
@ -108,6 +117,17 @@ type QuiescerCfg struct {
|
||||
// sendMsg is a function that can be used to send an Stfu message over
|
||||
// the wire.
|
||||
sendMsg func(lnwire.Stfu) error
|
||||
|
||||
// timeoutDuration is the Duration that we will wait from the moment the
|
||||
// channel is considered quiescent before we call the onTimeout function
|
||||
timeoutDuration time.Duration
|
||||
|
||||
// onTimeout is a function that will be called in the event that the
|
||||
// Quiescer has not been resumed before the timeout is reached. If
|
||||
// Quiescer.Resume is called before the timeout has been raeached, then
|
||||
// onTimeout will not be called until the quiescer reaches a quiescent
|
||||
// state again.
|
||||
onTimeout func()
|
||||
}
|
||||
|
||||
// QuiescerLive is a state machine that tracks progression through the
|
||||
@ -140,6 +160,10 @@ type QuiescerLive struct {
|
||||
// channel was quiescent.
|
||||
resumeQueue []func()
|
||||
|
||||
// timeoutTimer is a field that is used to hold onto the timeout job
|
||||
// when we reach quiescence.
|
||||
timeoutTimer *time.Timer
|
||||
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@ -195,6 +219,10 @@ func (q *QuiescerLive) recvStfu(msg lnwire.Stfu,
|
||||
// If so, we will try to resolve any outstanding StfuReqs.
|
||||
q.tryResolveStfuReq()
|
||||
|
||||
if q.isQuiescent() {
|
||||
q.startTimeout()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -398,6 +426,10 @@ func (q *QuiescerLive) sendOwedStfu(numPendingLocalUpdates uint64) error {
|
||||
// state. If so, we will try to resolve any outstanding
|
||||
// StfuReqs.
|
||||
q.tryResolveStfuReq()
|
||||
|
||||
if q.isQuiescent() {
|
||||
q.startTimeout()
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
@ -480,6 +512,10 @@ func (q *QuiescerLive) Resume() {
|
||||
// channel has been quiescent and then resets the quiescer state to its initial
|
||||
// state.
|
||||
func (q *QuiescerLive) resume() {
|
||||
// since we are resuming we want to cancel the quiescence timeout
|
||||
// action.
|
||||
q.cancelTimeout()
|
||||
|
||||
for _, hook := range q.resumeQueue {
|
||||
hook()
|
||||
}
|
||||
@ -490,6 +526,34 @@ func (q *QuiescerLive) resume() {
|
||||
q.resumeQueue = nil
|
||||
}
|
||||
|
||||
// startTimeout starts the timeout function that fires if the quiescer remains
|
||||
// in a quiesced state for too long. If this function is called multiple times
|
||||
// only the last one will have an effect.
|
||||
func (q *QuiescerLive) startTimeout() {
|
||||
if q.cfg.onTimeout == nil {
|
||||
return
|
||||
}
|
||||
|
||||
old := q.timeoutTimer
|
||||
|
||||
q.timeoutTimer = time.AfterFunc(q.cfg.timeoutDuration, q.cfg.onTimeout)
|
||||
|
||||
if old != nil {
|
||||
old.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// cancelTimeout cancels the timeout function that would otherwise fire if the
|
||||
// quiescer remains in a quiesced state too long. If this function is called
|
||||
// before startTimeout or after another call to cancelTimeout, the effect will
|
||||
// be a noop.
|
||||
func (q *QuiescerLive) cancelTimeout() {
|
||||
if q.timeoutTimer != nil {
|
||||
q.timeoutTimer.Stop()
|
||||
q.timeoutTimer = nil
|
||||
}
|
||||
}
|
||||
|
||||
type quiescerNoop struct{}
|
||||
|
||||
var _ Quiescer = (*quiescerNoop)(nil)
|
||||
|
@ -3,6 +3,7 @@ package htlcswitch
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/fn"
|
||||
"github.com/lightningnetwork/lnd/lntypes"
|
||||
@ -417,3 +418,58 @@ func TestQuiescerResume(t *testing.T) {
|
||||
require.True(t, resumeHooksCalled)
|
||||
require.False(t, harness.quiescer.IsQuiescent())
|
||||
}
|
||||
|
||||
func TestQuiescerTimeoutTriggers(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
harness := initQuiescerTestHarness(lntypes.Local)
|
||||
|
||||
msg := lnwire.Stfu{
|
||||
ChanID: cid,
|
||||
Initiator: true,
|
||||
}
|
||||
|
||||
timeoutGate := make(chan struct{})
|
||||
|
||||
harness.quiescer.cfg.timeoutDuration = time.Second
|
||||
harness.quiescer.cfg.onTimeout = func() { close(timeoutGate) }
|
||||
|
||||
err := harness.quiescer.RecvStfu(msg, harness.pendingUpdates.Remote)
|
||||
require.NoError(t, err)
|
||||
err = harness.quiescer.SendOwedStfu(harness.pendingUpdates.Local)
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case <-timeoutGate:
|
||||
case <-time.After(2 * harness.quiescer.cfg.timeoutDuration):
|
||||
t.Fatal("quiescence timeout did not trigger")
|
||||
}
|
||||
}
|
||||
|
||||
func TestQuiescerTimeoutAborts(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
harness := initQuiescerTestHarness(lntypes.Local)
|
||||
|
||||
msg := lnwire.Stfu{
|
||||
ChanID: cid,
|
||||
Initiator: true,
|
||||
}
|
||||
|
||||
timeoutGate := make(chan struct{})
|
||||
|
||||
harness.quiescer.cfg.timeoutDuration = time.Second
|
||||
harness.quiescer.cfg.onTimeout = func() { close(timeoutGate) }
|
||||
|
||||
err := harness.quiescer.RecvStfu(msg, harness.pendingUpdates.Remote)
|
||||
require.NoError(t, err)
|
||||
err = harness.quiescer.SendOwedStfu(harness.pendingUpdates.Local)
|
||||
require.NoError(t, err)
|
||||
harness.quiescer.Resume()
|
||||
|
||||
select {
|
||||
case <-timeoutGate:
|
||||
t.Fatal("quiescence timeout triggered despite being resumed")
|
||||
case <-time.After(2 * harness.quiescer.cfg.timeoutDuration):
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user