htlcswitch: defer processRemoteAdds when quiescent

In this commit we defer processRemoteAdds using a new mechanism on
the quiescer where we capture a closure that needs to be run. We
do this because we need to avoid the scenario where we send back
immediate resolutions to the newly added HTLCs when quiescent as
it is a protocol violation. It is not enough for us to simply defer
sending the messages since the purpose of quiescence itself is to
have well-defined and agreed upon channel state. If, for whatever
reason, the node (or connection) is restarted between when these
hooks are captured and when they are ultimately run, they will
be resolved by the resolveFwdPkgs logic when the link comes back
up.

In a future commit we will explicitly call the quiescer's resume
method when it is OK for htlc traffic to commence.
This commit is contained in:
Keagan McClelland
2024-04-08 18:17:00 -07:00
parent 77fd8c6a21
commit 4fbab45a5f
3 changed files with 94 additions and 1 deletions

View File

@@ -2500,8 +2500,22 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
}
}
// If we can send updates then we can process adds in case we
// are the exit hop and need to send back resolutions, or in
// case there are validity issues with the packets. Otherwise
// we defer the action until resume.
//
// We are free to process the settles and fails without this
// check since processing those can't result in further updates
// to this channel link.
if l.quiescer.CanSendUpdates() {
l.processRemoteAdds(fwdPkg)
} else {
l.quiescer.OnResume(func() {
l.processRemoteAdds(fwdPkg)
})
}
l.processRemoteSettleFails(fwdPkg)
l.processRemoteAdds(fwdPkg)
// If the link failed during processing the adds, we must
// return to ensure we won't attempted to update the state

View File

@@ -90,6 +90,11 @@ type Quiescer struct {
// resolve when we complete quiescence.
activeQuiescenceReq fn.Option[StfuReq]
// resumeQueue is a slice of hooks that will be called when the quiescer
// is resumed. These are actions that needed to be deferred while the
// channel was quiescent.
resumeQueue []func()
sync.RWMutex
}
@@ -400,3 +405,42 @@ func (q *Quiescer) initStfu(req StfuReq) {
q.localInit = true
q.activeQuiescenceReq = fn.Some(req)
}
// OnResume accepts a no return closure that will run when the quiescer is
// resumed.
func (q *Quiescer) OnResume(hook func()) {
q.Lock()
defer q.Unlock()
q.onResume(hook)
}
// onResume accepts a no return closure that will run when the quiescer is
// resumed.
func (q *Quiescer) onResume(hook func()) {
q.resumeQueue = append(q.resumeQueue, hook)
}
// Resume runs all of the deferred actions that have accumulated while the
// channel has been quiescent and then resets the quiescer state to its initial
// state.
func (q *Quiescer) Resume() {
q.Lock()
defer q.Unlock()
q.resume()
}
// resume runs all of the deferred actions that have accumulated while the
// channel has been quiescent and then resets the quiescer state to its initial
// state.
func (q *Quiescer) resume() {
for _, hook := range q.resumeQueue {
hook()
}
q.localInit = false
q.remoteInit = false
q.sent = false
q.received = false
q.resumeQueue = nil
}

View File

@@ -380,3 +380,38 @@ func TestQuiescerTieBreaker(t *testing.T) {
}
}
}
// TestQuiescerResume ensures that the hooks that are attached to the quiescer
// are called when we call the resume method and no earlier.
func TestQuiescerResume(t *testing.T) {
t.Parallel()
harness := initQuiescerTestHarness(lntypes.Local)
msg := lnwire.Stfu{
ChanID: cid,
Initiator: true,
}
require.NoError(
t, harness.quiescer.RecvStfu(
msg, harness.pendingUpdates.Remote,
),
)
require.NoError(
t, harness.quiescer.SendOwedStfu(
harness.pendingUpdates.Local,
),
)
require.True(t, harness.quiescer.IsQuiescent())
var resumeHooksCalled = false
harness.quiescer.OnResume(func() {
resumeHooksCalled = true
})
require.False(t, resumeHooksCalled)
harness.quiescer.Resume()
require.True(t, resumeHooksCalled)
require.False(t, harness.quiescer.IsQuiescent())
}