htlcswitch: implement noop quiescer

In this commit we implement a noop quiescer that we will use when
the feature hasn't been negotiated. This will make it far easier to
manage quiescence operations without having a number of if statements
in the link logic.
This commit is contained in:
Keagan McClelland
2024-08-09 20:25:44 -07:00
parent 5906ca2537
commit 48ee643c0d
2 changed files with 102 additions and 37 deletions

View File

@@ -46,6 +46,51 @@ var (
type StfuReq = fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]] type StfuReq = fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]]
// Quiescer is the public interface of the quiescence mechanism. Callers of the
// quiescence API should not need any methods besides the ones detailed here.
type Quiescer interface {
// IsQuiescent returns true if the state machine has been driven all the
// way to completion. If this returns true, processes that depend on
// channel quiescence may proceed.
IsQuiescent() bool
// QuiescenceInitiator determines which ChannelParty is the initiator of
// quiescence for the purposes of downstream protocols. If the channel
// is not currently quiescent, this method will return
// ErrNoDownstreamLeader.
QuiescenceInitiator() fn.Result[lntypes.ChannelParty]
// InitStfu instructs the quiescer that we intend to begin a quiescence
// negotiation where we are the initiator. We don't yet send stfu yet
// because we need to wait for the link to give us a valid opportunity
// to do so.
InitStfu(req StfuReq)
// RecvStfu is called when we receive an Stfu message from the remote.
RecvStfu(stfu lnwire.Stfu, numRemotePendingUpdates uint64) error
// CanRecvUpdates returns true if we haven't yet received an Stfu which
// would mark the end of the remote's ability to send updates.
CanRecvUpdates() bool
// CanSendUpdates returns true if we haven't yet sent an Stfu which
// would mark the end of our ability to send updates.
CanSendUpdates() bool
// SendOwedStfu sends Stfu if it owes one. It returns an error if the
// state machine is in an invalid state.
SendOwedStfu(numPendingLocalUpdates uint64) error
// OnResume accepts a no return closure that will run when the quiescer
// is resumed.
OnResume(hook func())
// Resume runs all of the deferred actions that have accumulated while
// the channel has been quiescent and then resets the quiescer state to
// its initial state.
Resume()
}
// QuiescerCfg is a config structure used to initialize a quiescer giving it the // QuiescerCfg is a config structure used to initialize a quiescer giving it the
// appropriate functionality to interact with the channel state that the // appropriate functionality to interact with the channel state that the
// quiescer must syncrhonize with. // quiescer must syncrhonize with.
@@ -65,9 +110,9 @@ type QuiescerCfg struct {
sendMsg func(lnwire.Stfu) error sendMsg func(lnwire.Stfu) error
} }
// Quiescer is a state machine that tracks progression through the quiescence // QuiescerLive is a state machine that tracks progression through the
// protocol. // quiescence protocol.
type Quiescer struct { type QuiescerLive struct {
cfg QuiescerCfg cfg QuiescerCfg
// localInit indicates whether our path through this state machine was // localInit indicates whether our path through this state machine was
@@ -100,13 +145,13 @@ type Quiescer struct {
// NewQuiescer creates a new quiescer for the given channel. // NewQuiescer creates a new quiescer for the given channel.
func NewQuiescer(cfg QuiescerCfg) Quiescer { func NewQuiescer(cfg QuiescerCfg) Quiescer {
return Quiescer{ return &QuiescerLive{
cfg: cfg, cfg: cfg,
} }
} }
// RecvStfu is called when we receive an Stfu message from the remote. // RecvStfu is called when we receive an Stfu message from the remote.
func (q *Quiescer) RecvStfu(msg lnwire.Stfu, func (q *QuiescerLive) RecvStfu(msg lnwire.Stfu,
numPendingRemoteUpdates uint64) error { numPendingRemoteUpdates uint64) error {
q.Lock() q.Lock()
@@ -116,7 +161,7 @@ func (q *Quiescer) RecvStfu(msg lnwire.Stfu,
} }
// recvStfu is called when we receive an Stfu message from the remote. // recvStfu is called when we receive an Stfu message from the remote.
func (q *Quiescer) recvStfu(msg lnwire.Stfu, func (q *QuiescerLive) recvStfu(msg lnwire.Stfu,
numPendingRemoteUpdates uint64) error { numPendingRemoteUpdates uint64) error {
// At the time of this writing, this check that we have already received // At the time of this writing, this check that we have already received
@@ -155,7 +200,7 @@ func (q *Quiescer) recvStfu(msg lnwire.Stfu,
// MakeStfu is called when we are ready to send an Stfu message. It returns the // MakeStfu is called when we are ready to send an Stfu message. It returns the
// Stfu message to be sent. // Stfu message to be sent.
func (q *Quiescer) MakeStfu( func (q *QuiescerLive) MakeStfu(
numPendingLocalUpdates uint64) fn.Result[lnwire.Stfu] { numPendingLocalUpdates uint64) fn.Result[lnwire.Stfu] {
q.RLock() q.RLock()
@@ -166,7 +211,7 @@ func (q *Quiescer) MakeStfu(
// makeStfu is called when we are ready to send an Stfu message. It returns the // makeStfu is called when we are ready to send an Stfu message. It returns the
// Stfu message to be sent. // Stfu message to be sent.
func (q *Quiescer) makeStfu( func (q *QuiescerLive) makeStfu(
numPendingLocalUpdates uint64) fn.Result[lnwire.Stfu] { numPendingLocalUpdates uint64) fn.Result[lnwire.Stfu] {
if q.sent { if q.sent {
@@ -190,7 +235,7 @@ func (q *Quiescer) makeStfu(
// OweStfu returns true if we owe the other party an Stfu. We owe the remote an // OweStfu returns true if we owe the other party an Stfu. We owe the remote an
// Stfu when we have received but not yet sent an Stfu, or we are the initiator // Stfu when we have received but not yet sent an Stfu, or we are the initiator
// but have not yet sent an Stfu. // but have not yet sent an Stfu.
func (q *Quiescer) OweStfu() bool { func (q *QuiescerLive) OweStfu() bool {
q.RLock() q.RLock()
defer q.RUnlock() defer q.RUnlock()
@@ -200,13 +245,13 @@ func (q *Quiescer) OweStfu() bool {
// oweStfu returns true if we owe the other party an Stfu. We owe the remote an // oweStfu returns true if we owe the other party an Stfu. We owe the remote an
// Stfu when we have received but not yet sent an Stfu, or we are the initiator // Stfu when we have received but not yet sent an Stfu, or we are the initiator
// but have not yet sent an Stfu. // but have not yet sent an Stfu.
func (q *Quiescer) oweStfu() bool { func (q *QuiescerLive) oweStfu() bool {
return (q.received || q.localInit) && !q.sent return (q.received || q.localInit) && !q.sent
} }
// NeedStfu returns true if the remote owes us an Stfu. They owe us an Stfu when // NeedStfu returns true if the remote owes us an Stfu. They owe us an Stfu when
// we have sent but not yet received an Stfu. // we have sent but not yet received an Stfu.
func (q *Quiescer) NeedStfu() bool { func (q *QuiescerLive) NeedStfu() bool {
q.RLock() q.RLock()
defer q.RUnlock() defer q.RUnlock()
@@ -215,7 +260,7 @@ func (q *Quiescer) NeedStfu() bool {
// needStfu returns true if the remote owes us an Stfu. They owe us an Stfu when // needStfu returns true if the remote owes us an Stfu. They owe us an Stfu when
// we have sent but not yet received an Stfu. // we have sent but not yet received an Stfu.
func (q *Quiescer) needStfu() bool { func (q *QuiescerLive) needStfu() bool {
q.RLock() q.RLock()
defer q.RUnlock() defer q.RUnlock()
@@ -225,7 +270,7 @@ func (q *Quiescer) needStfu() bool {
// IsQuiescent returns true if the state machine has been driven all the way to // IsQuiescent returns true if the state machine has been driven all the way to
// completion. If this returns true, processes that depend on channel quiescence // completion. If this returns true, processes that depend on channel quiescence
// may proceed. // may proceed.
func (q *Quiescer) IsQuiescent() bool { func (q *QuiescerLive) IsQuiescent() bool {
q.RLock() q.RLock()
defer q.RUnlock() defer q.RUnlock()
@@ -235,14 +280,14 @@ func (q *Quiescer) IsQuiescent() bool {
// isQuiescent returns true if the state machine has been driven all the way to // isQuiescent returns true if the state machine has been driven all the way to
// completion. If this returns true, processes that depend on channel quiescence // completion. If this returns true, processes that depend on channel quiescence
// may proceed. // may proceed.
func (q *Quiescer) isQuiescent() bool { func (q *QuiescerLive) isQuiescent() bool {
return q.sent && q.received return q.sent && q.received
} }
// QuiescenceInitiator determines which ChannelParty is the initiator of // QuiescenceInitiator determines which ChannelParty is the initiator of
// quiescence for the purposes of downstream protocols. If the channel is not // quiescence for the purposes of downstream protocols. If the channel is not
// currently quiescent, this method will return ErrNoQuiescenceInitiator. // currently quiescent, this method will return ErrNoQuiescenceInitiator.
func (q *Quiescer) QuiescenceInitiator() fn.Result[lntypes.ChannelParty] { func (q *QuiescerLive) QuiescenceInitiator() fn.Result[lntypes.ChannelParty] {
q.RLock() q.RLock()
defer q.RUnlock() defer q.RUnlock()
@@ -252,7 +297,7 @@ func (q *Quiescer) QuiescenceInitiator() fn.Result[lntypes.ChannelParty] {
// quiescenceInitiator determines which ChannelParty is the initiator of // quiescenceInitiator determines which ChannelParty is the initiator of
// quiescence for the purposes of downstream protocols. If the channel is not // quiescence for the purposes of downstream protocols. If the channel is not
// currently quiescent, this method will return ErrNoQuiescenceInitiator. // currently quiescent, this method will return ErrNoQuiescenceInitiator.
func (q *Quiescer) quiescenceInitiator() fn.Result[lntypes.ChannelParty] { func (q *QuiescerLive) quiescenceInitiator() fn.Result[lntypes.ChannelParty] {
switch { switch {
case !q.isQuiescent(): case !q.isQuiescent():
return fn.Err[lntypes.ChannelParty](ErrNoQuiescenceInitiator) return fn.Err[lntypes.ChannelParty](ErrNoQuiescenceInitiator)
@@ -274,7 +319,7 @@ func (q *Quiescer) quiescenceInitiator() fn.Result[lntypes.ChannelParty] {
// CanSendUpdates returns true if we haven't yet sent an Stfu which would mark // CanSendUpdates returns true if we haven't yet sent an Stfu which would mark
// the end of our ability to send updates. // the end of our ability to send updates.
func (q *Quiescer) CanSendUpdates() bool { func (q *QuiescerLive) CanSendUpdates() bool {
q.RLock() q.RLock()
defer q.RUnlock() defer q.RUnlock()
@@ -283,13 +328,13 @@ func (q *Quiescer) CanSendUpdates() bool {
// canSendUpdates returns true if we haven't yet sent an Stfu which would mark // canSendUpdates returns true if we haven't yet sent an Stfu which would mark
// the end of our ability to send updates. // the end of our ability to send updates.
func (q *Quiescer) canSendUpdates() bool { func (q *QuiescerLive) canSendUpdates() bool {
return !q.sent && !q.localInit return !q.sent && !q.localInit
} }
// CanRecvUpdates returns true if we haven't yet received an Stfu which would // CanRecvUpdates returns true if we haven't yet received an Stfu which would
// mark the end of the remote's ability to send updates. // mark the end of the remote's ability to send updates.
func (q *Quiescer) CanRecvUpdates() bool { func (q *QuiescerLive) CanRecvUpdates() bool {
q.RLock() q.RLock()
defer q.RUnlock() defer q.RUnlock()
@@ -298,12 +343,12 @@ func (q *Quiescer) CanRecvUpdates() bool {
// canRecvUpdates returns true if we haven't yet received an Stfu which would // canRecvUpdates returns true if we haven't yet received an Stfu which would
// mark the end of the remote's ability to send updates. // mark the end of the remote's ability to send updates.
func (q *Quiescer) canRecvUpdates() bool { func (q *QuiescerLive) canRecvUpdates() bool {
return !q.received return !q.received
} }
// CanSendStfu returns true if we can send an Stfu. // CanSendStfu returns true if we can send an Stfu.
func (q *Quiescer) CanSendStfu(numPendingLocalUpdates uint64) bool { func (q *QuiescerLive) CanSendStfu(numPendingLocalUpdates uint64) bool {
q.RLock() q.RLock()
defer q.RUnlock() defer q.RUnlock()
@@ -311,12 +356,12 @@ func (q *Quiescer) CanSendStfu(numPendingLocalUpdates uint64) bool {
} }
// canSendStfu returns true if we can send an Stfu. // canSendStfu returns true if we can send an Stfu.
func (q *Quiescer) canSendStfu(numPendingLocalUpdates uint64) bool { func (q *QuiescerLive) canSendStfu(numPendingLocalUpdates uint64) bool {
return numPendingLocalUpdates == 0 && !q.sent return numPendingLocalUpdates == 0 && !q.sent
} }
// CanRecvStfu returns true if we can receive an Stfu. // CanRecvStfu returns true if we can receive an Stfu.
func (q *Quiescer) CanRecvStfu(numPendingRemoteUpdates uint64) bool { func (q *QuiescerLive) CanRecvStfu(numPendingRemoteUpdates uint64) bool {
q.RLock() q.RLock()
defer q.RUnlock() defer q.RUnlock()
@@ -324,13 +369,13 @@ func (q *Quiescer) CanRecvStfu(numPendingRemoteUpdates uint64) bool {
} }
// canRecvStfu returns true if we can receive an Stfu. // canRecvStfu returns true if we can receive an Stfu.
func (q *Quiescer) canRecvStfu(numPendingRemoteUpdates uint64) bool { func (q *QuiescerLive) canRecvStfu(numPendingRemoteUpdates uint64) bool {
return numPendingRemoteUpdates == 0 && !q.received return numPendingRemoteUpdates == 0 && !q.received
} }
// SendOwedStfu sends Stfu if it owes one. It returns an error if the state // SendOwedStfu sends Stfu if it owes one. It returns an error if the state
// machine is in an invalid state. // machine is in an invalid state.
func (q *Quiescer) SendOwedStfu(numPendingLocalUpdates uint64) error { func (q *QuiescerLive) SendOwedStfu(numPendingLocalUpdates uint64) error {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
@@ -339,7 +384,7 @@ func (q *Quiescer) SendOwedStfu(numPendingLocalUpdates uint64) error {
// sendOwedStfu sends Stfu if it owes one. It returns an error if the state // sendOwedStfu sends Stfu if it owes one. It returns an error if the state
// machine is in an invalid state. // machine is in an invalid state.
func (q *Quiescer) sendOwedStfu(numPendingLocalUpdates uint64) error { func (q *QuiescerLive) sendOwedStfu(numPendingLocalUpdates uint64) error {
if !q.oweStfu() || !q.canSendStfu(numPendingLocalUpdates) { if !q.oweStfu() || !q.canSendStfu(numPendingLocalUpdates) {
return nil return nil
} }
@@ -360,7 +405,7 @@ func (q *Quiescer) sendOwedStfu(numPendingLocalUpdates uint64) error {
// TryResolveStfuReq attempts to resolve the active quiescence request if the // TryResolveStfuReq attempts to resolve the active quiescence request if the
// state machine has reached a quiescent state. // state machine has reached a quiescent state.
func (q *Quiescer) TryResolveStfuReq() { func (q *QuiescerLive) TryResolveStfuReq() {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
@@ -369,7 +414,7 @@ func (q *Quiescer) TryResolveStfuReq() {
// tryResolveStfuReq attempts to resolve the active quiescence request if the // tryResolveStfuReq attempts to resolve the active quiescence request if the
// state machine has reached a quiescent state. // state machine has reached a quiescent state.
func (q *Quiescer) tryResolveStfuReq() { func (q *QuiescerLive) tryResolveStfuReq() {
q.activeQuiescenceReq.WhenSome( q.activeQuiescenceReq.WhenSome(
func(req StfuReq) { func(req StfuReq) {
if q.isQuiescent() { if q.isQuiescent() {
@@ -383,7 +428,7 @@ func (q *Quiescer) tryResolveStfuReq() {
// InitStfu instructs the quiescer that we intend to begin a quiescence // InitStfu instructs the quiescer that we intend to begin a quiescence
// negotiation where we are the initiator. We don't yet send stfu yet because // negotiation where we are the initiator. We don't yet send stfu yet because
// we need to wait for the link to give us a valid opportunity to do so. // we need to wait for the link to give us a valid opportunity to do so.
func (q *Quiescer) InitStfu(req StfuReq) { func (q *QuiescerLive) InitStfu(req StfuReq) {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
@@ -393,7 +438,7 @@ func (q *Quiescer) InitStfu(req StfuReq) {
// initStfu instructs the quiescer that we intend to begin a quiescence // initStfu instructs the quiescer that we intend to begin a quiescence
// negotiation where we are the initiator. We don't yet send stfu yet because // negotiation where we are the initiator. We don't yet send stfu yet because
// we need to wait for the link to give us a valid opportunity to do so. // we need to wait for the link to give us a valid opportunity to do so.
func (q *Quiescer) initStfu(req StfuReq) { func (q *QuiescerLive) initStfu(req StfuReq) {
if q.localInit { if q.localInit {
req.Resolve(fn.Errf[lntypes.ChannelParty]( req.Resolve(fn.Errf[lntypes.ChannelParty](
"quiescence already requested", "quiescence already requested",
@@ -408,7 +453,7 @@ func (q *Quiescer) initStfu(req StfuReq) {
// OnResume accepts a no return closure that will run when the quiescer is // OnResume accepts a no return closure that will run when the quiescer is
// resumed. // resumed.
func (q *Quiescer) OnResume(hook func()) { func (q *QuiescerLive) OnResume(hook func()) {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
@@ -417,14 +462,14 @@ func (q *Quiescer) OnResume(hook func()) {
// onResume accepts a no return closure that will run when the quiescer is // onResume accepts a no return closure that will run when the quiescer is
// resumed. // resumed.
func (q *Quiescer) onResume(hook func()) { func (q *QuiescerLive) onResume(hook func()) {
q.resumeQueue = append(q.resumeQueue, hook) q.resumeQueue = append(q.resumeQueue, hook)
} }
// Resume runs all of the deferred actions that have accumulated while the // Resume runs all of the deferred actions that have accumulated while the
// channel has been quiescent and then resets the quiescer state to its initial // channel has been quiescent and then resets the quiescer state to its initial
// state. // state.
func (q *Quiescer) Resume() { func (q *QuiescerLive) Resume() {
q.Lock() q.Lock()
defer q.Unlock() defer q.Unlock()
@@ -434,7 +479,7 @@ func (q *Quiescer) Resume() {
// resume runs all of the deferred actions that have accumulated while the // resume runs all of the deferred actions that have accumulated while the
// channel has been quiescent and then resets the quiescer state to its initial // channel has been quiescent and then resets the quiescer state to its initial
// state. // state.
func (q *Quiescer) resume() { func (q *QuiescerLive) resume() {
for _, hook := range q.resumeQueue { for _, hook := range q.resumeQueue {
hook() hook()
} }
@@ -444,3 +489,21 @@ func (q *Quiescer) resume() {
q.received = false q.received = false
q.resumeQueue = nil q.resumeQueue = nil
} }
type quiescerNoop struct{}
var _ Quiescer = (*quiescerNoop)(nil)
func (q *quiescerNoop) InitStfu(req StfuReq) {
req.Resolve(fn.Errf[lntypes.ChannelParty]("quiescence not supported"))
}
func (q *quiescerNoop) RecvStfu(_ lnwire.Stfu, _ uint64) error { return nil }
func (q *quiescerNoop) CanRecvUpdates() bool { return true }
func (q *quiescerNoop) CanSendUpdates() bool { return true }
func (q *quiescerNoop) SendOwedStfu(_ uint64) error { return nil }
func (q *quiescerNoop) IsQuiescent() bool { return false }
func (q *quiescerNoop) OnResume(hook func()) { hook() }
func (q *quiescerNoop) Resume() {}
func (q *quiescerNoop) QuiescenceInitiator() fn.Result[lntypes.ChannelParty] {
return fn.Err[lntypes.ChannelParty](ErrNoQuiescenceInitiator)
}

View File

@@ -14,7 +14,7 @@ var cid = lnwire.ChannelID(bytes.Repeat([]byte{0x00}, 32))
type quiescerTestHarness struct { type quiescerTestHarness struct {
pendingUpdates lntypes.Dual[uint64] pendingUpdates lntypes.Dual[uint64]
quiescer Quiescer quiescer *QuiescerLive
conn <-chan lnwire.Stfu conn <-chan lnwire.Stfu
} }
@@ -27,14 +27,16 @@ func initQuiescerTestHarness(
conn: conn, conn: conn,
} }
harness.quiescer = NewQuiescer(QuiescerCfg{ quiescer, _ := NewQuiescer(QuiescerCfg{
chanID: cid, chanID: cid,
channelInitiator: channelInitiator, channelInitiator: channelInitiator,
sendMsg: func(msg lnwire.Stfu) error { sendMsg: func(msg lnwire.Stfu) error {
conn <- msg conn <- msg
return nil return nil
}, },
}) }).(*QuiescerLive)
harness.quiescer = quiescer
return harness return harness
} }