From 4fbab45a5fcd3a85672e15ef68898ff2ab511276 Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Mon, 8 Apr 2024 18:17:00 -0700 Subject: [PATCH] htlcswitch: defer processRemoteAdds when quiescent In this commit we defer processRemoteAdds using a new mechanism on the quiescer where we capture a closure that needs to be run. We do this because we need to avoid the scenario where we send back immediate resolutions to the newly added HTLCs when quiescent as it is a protocol violation. It is not enough for us to simply defer sending the messages since the purpose of quiescence itself is to have well-defined and agreed upon channel state. If, for whatever reason, the node (or connection) is restarted between when these hooks are captured and when they are ultimately run, they will be resolved by the resolveFwdPkgs logic when the link comes back up. In a future commit we will explicitly call the quiescer's resume method when it is OK for htlc traffic to commence. --- htlcswitch/link.go | 16 +++++++++++++- htlcswitch/quiescer.go | 44 +++++++++++++++++++++++++++++++++++++ htlcswitch/quiescer_test.go | 35 +++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 1 deletion(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 449c81179..980f4dcb6 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -2500,8 +2500,22 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } } + // If we can send updates then we can process adds in case we + // are the exit hop and need to send back resolutions, or in + // case there are validity issues with the packets. Otherwise + // we defer the action until resume. + // + // We are free to process the settles and fails without this + // check since processing those can't result in further updates + // to this channel link. + if l.quiescer.CanSendUpdates() { + l.processRemoteAdds(fwdPkg) + } else { + l.quiescer.OnResume(func() { + l.processRemoteAdds(fwdPkg) + }) + } l.processRemoteSettleFails(fwdPkg) - l.processRemoteAdds(fwdPkg) // If the link failed during processing the adds, we must // return to ensure we won't attempted to update the state diff --git a/htlcswitch/quiescer.go b/htlcswitch/quiescer.go index 300988c7a..3fa30597b 100644 --- a/htlcswitch/quiescer.go +++ b/htlcswitch/quiescer.go @@ -90,6 +90,11 @@ type Quiescer struct { // resolve when we complete quiescence. activeQuiescenceReq fn.Option[StfuReq] + // resumeQueue is a slice of hooks that will be called when the quiescer + // is resumed. These are actions that needed to be deferred while the + // channel was quiescent. + resumeQueue []func() + sync.RWMutex } @@ -400,3 +405,42 @@ func (q *Quiescer) initStfu(req StfuReq) { q.localInit = true q.activeQuiescenceReq = fn.Some(req) } + +// OnResume accepts a no return closure that will run when the quiescer is +// resumed. +func (q *Quiescer) OnResume(hook func()) { + q.Lock() + defer q.Unlock() + + q.onResume(hook) +} + +// onResume accepts a no return closure that will run when the quiescer is +// resumed. +func (q *Quiescer) onResume(hook func()) { + q.resumeQueue = append(q.resumeQueue, hook) +} + +// Resume runs all of the deferred actions that have accumulated while the +// channel has been quiescent and then resets the quiescer state to its initial +// state. +func (q *Quiescer) Resume() { + q.Lock() + defer q.Unlock() + + q.resume() +} + +// resume runs all of the deferred actions that have accumulated while the +// channel has been quiescent and then resets the quiescer state to its initial +// state. +func (q *Quiescer) resume() { + for _, hook := range q.resumeQueue { + hook() + } + q.localInit = false + q.remoteInit = false + q.sent = false + q.received = false + q.resumeQueue = nil +} diff --git a/htlcswitch/quiescer_test.go b/htlcswitch/quiescer_test.go index 5c6e2fa4c..77cf9e45b 100644 --- a/htlcswitch/quiescer_test.go +++ b/htlcswitch/quiescer_test.go @@ -380,3 +380,38 @@ func TestQuiescerTieBreaker(t *testing.T) { } } } + +// TestQuiescerResume ensures that the hooks that are attached to the quiescer +// are called when we call the resume method and no earlier. +func TestQuiescerResume(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + + require.NoError( + t, harness.quiescer.RecvStfu( + msg, harness.pendingUpdates.Remote, + ), + ) + require.NoError( + t, harness.quiescer.SendOwedStfu( + harness.pendingUpdates.Local, + ), + ) + + require.True(t, harness.quiescer.IsQuiescent()) + var resumeHooksCalled = false + harness.quiescer.OnResume(func() { + resumeHooksCalled = true + }) + require.False(t, resumeHooksCalled) + + harness.quiescer.Resume() + require.True(t, resumeHooksCalled) + require.False(t, harness.quiescer.IsQuiescent()) +}