From 9b2d1018f213bfb5f4da54714953fa738eabf1f7 Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Tue, 21 Nov 2023 14:20:53 -0700 Subject: [PATCH] htlcswitch+peer: add flush api and lifecycle hooks to ChannelUpdateHandler We also add dummy implementations to channelLink and various mocks. --- go.mod | 2 +- go.sum | 4 +- htlcswitch/interfaces.go | 48 ++++++++++ htlcswitch/link.go | 39 ++++++++ htlcswitch/link_test.go | 188 +++++++++++++++++++++++++++++++++++++++ htlcswitch/mock.go | 19 ++++ peer/test_utils.go | 21 +++++ 7 files changed, 318 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 3e7dcc8ec..6591ec6df 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/lightningnetwork/lightning-onion v1.2.1-0.20230823005744-06182b1d7d2f github.com/lightningnetwork/lnd/cert v1.2.2 github.com/lightningnetwork/lnd/clock v1.1.1 - github.com/lightningnetwork/lnd/fn v1.0.0 + github.com/lightningnetwork/lnd/fn v1.0.1 github.com/lightningnetwork/lnd/healthcheck v1.2.3 github.com/lightningnetwork/lnd/kvdb v1.4.4 github.com/lightningnetwork/lnd/queue v1.1.1 diff --git a/go.sum b/go.sum index e3ee04c30..9d4624e67 100644 --- a/go.sum +++ b/go.sum @@ -449,8 +449,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bq github.com/lightningnetwork/lnd/clock v1.0.1/go.mod h1:KnQudQ6w0IAMZi1SgvecLZQZ43ra2vpDNj7H/aasemg= github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0= github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ= -github.com/lightningnetwork/lnd/fn v1.0.0 h1:I5VG9AD63mOQ89RMQEu7HRI1r68wn8yz539LoylUIKM= -github.com/lightningnetwork/lnd/fn v1.0.0/go.mod h1:XV+0vBXSnh3aUjskJUv58TOpsveiXQ+ac8rEnXZDGFc= +github.com/lightningnetwork/lnd/fn v1.0.1 h1:4nAxKpGKgk4/xRQKxvim3BW0QM34S4BH6QghWZVjsko= +github.com/lightningnetwork/lnd/fn v1.0.1/go.mod h1:XV+0vBXSnh3aUjskJUv58TOpsveiXQ+ac8rEnXZDGFc= github.com/lightningnetwork/lnd/healthcheck v1.2.3 h1:oqhOOy8WmIEa6RBkYKC0mmYZkhl8T2kGD97n9jpML8o= github.com/lightningnetwork/lnd/healthcheck v1.2.3/go.mod h1:eDxH3dEwV9DeBW/6inrmlVh1qBOFV0AI14EEPnGt9gc= github.com/lightningnetwork/lnd/kvdb v1.4.4 h1:bCv63rVCvzqj1BkagN/EWTov6NDDgYEG/t0z2HepRMk= diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 92e071f13..f39de2a73 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -138,8 +138,56 @@ type ChannelUpdateHandler interface { // clean. This can be used with dynamic commitment negotiation or coop // close negotiation which require a clean channel state. ShutdownIfChannelClean() error + + // EnableAdds sets the ChannelUpdateHandler state to allow + // UpdateAddHtlc's in the specified direction. It returns an error if + // the state already allowed those adds. + EnableAdds(direction LinkDirection) error + + // DiableAdds sets the ChannelUpdateHandler state to allow + // UpdateAddHtlc's in the specified direction. It returns an error if + // the state already disallowed those adds. + DisableAdds(direction LinkDirection) error + + // IsFlushing returns true when UpdateAddHtlc's are disabled in the + // direction of the argument. + IsFlushing(direction LinkDirection) bool + + // OnFlushedOnce adds a hook that will be called the next time the + // channel state reaches zero htlcs. This hook will only ever be called + // once. If the channel state already has zero htlcs, then this will be + // called immediately. + OnFlushedOnce(func()) + + // OnCommitOnce adds a hook that will be called the next time a + // CommitSig message is sent in the argument's LinkDirection. This hook + // will only ever be called once. If no CommitSig is owed in the + // argument's LinkDirection, then we will call this hook immediately. + OnCommitOnce(LinkDirection, func()) } +// CommitHookID is a value that is used to uniquely identify hooks in the +// ChannelUpdateHandler's commitment update lifecycle. You should never need to +// construct one of these by hand, nor should you try. +type CommitHookID uint64 + +// FlushHookID is a value that is used to uniquely identify hooks in the +// ChannelUpdateHandler's flush lifecycle. You should never need to construct +// one of these by hand, nor should you try. +type FlushHookID uint64 + +// LinkDirection is used to query and change any link state on a per-direction +// basis. +type LinkDirection bool + +const ( + // Incoming is the direction from the remote peer to our node. + Incoming LinkDirection = false + + // Outgoing is the direction from our node to the remote peer. + Outgoing LinkDirection = true +) + // ChannelLink is an interface which represents the subsystem for managing the // incoming htlc requests, applying the changes to the channel, and also // propagating/forwarding it to htlc switch. diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 49965a97a..def2e2f01 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -547,6 +547,45 @@ func (l *channelLink) EligibleToForward() bool { l.isReestablished() } +// EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in +// the specified direction. It returns an error if the state already allowed +// those adds. +func (l *channelLink) EnableAdds(LinkDirection) error { + // TODO(proofofkeags): Implement + return nil +} + +// DiableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in +// the specified direction. It returns an error if the state already disallowed +// those adds. +func (l *channelLink) DisableAdds(LinkDirection) error { + // TODO(proofofkeags): Implement + return nil +} + +// IsFlushing returns true when UpdateAddHtlc's are disabled in the direction of +// the argument. +func (l *channelLink) IsFlushing(LinkDirection) bool { + // TODO(proofofkeags): Implement + return false +} + +// OnFlushedOnce adds a hook that will be called the next time the +// channel state reaches zero htlcs. This hook will only ever be called +// once. If the channel state already has zero htlcs, then this will be +// called immediately. +func (l *channelLink) OnFlushedOnce(func()) { + // TODO(proofofkeags): Implement +} + +// OnCommitOnce adds a hook that will be called the next time a CommitSig +// message is sent in the argument's LinkDirection. This hook will only ever be +// called once. If no CommitSig is owed in the argument's LinkDirection, then +// we will call this hook immediately. +func (l *channelLink) OnCommitOnce(LinkDirection, func()) { + // TODO(proofofkeags): Implement +} + // isReestablished returns true if the link has successfully completed the // channel reestablishment dance. func (l *channelLink) isReestablished() bool { diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index baaa002a2..5dce84c3a 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -8,6 +8,7 @@ import ( "encoding/binary" "fmt" "io" + prand "math/rand" "net" "reflect" "runtime" @@ -7041,3 +7042,190 @@ func TestChannelLinkShortFailureRelay(t *testing.T) { default: } } + +// TestLinkFlushApiDirectionIsolation tests whether the calls to EnableAdds and +// DisableAdds are correctly isolated based off of direction (Incoming and +// Outgoing). This means that the state of the Outgoing flush should always be +// unaffected by calls to EnableAdds/DisableAdds on the Incoming direction and +// vice-versa. +func TestLinkFlushApiDirectionIsolation(t *testing.T) { + aliceLink, _, _, _, _, _ := + newSingleLinkTestHarness( + t, 5*btcutil.SatoshiPerBitcoin, + 1*btcutil.SatoshiPerBitcoin, + ) + + for i := 0; i < 10; i++ { + if prand.Uint64()%2 == 0 { + //nolint:errcheck + aliceLink.EnableAdds(Outgoing) + require.False(t, aliceLink.IsFlushing(Outgoing)) + } else { + //nolint:errcheck + aliceLink.DisableAdds(Outgoing) + require.True(t, aliceLink.IsFlushing(Outgoing)) + } + require.False(t, aliceLink.IsFlushing(Incoming)) + } + + //nolint:errcheck + aliceLink.EnableAdds(Outgoing) + + for i := 0; i < 10; i++ { + if prand.Uint64()%2 == 0 { + //nolint:errcheck + aliceLink.EnableAdds(Incoming) + require.False(t, aliceLink.IsFlushing(Incoming)) + } else { + //nolint:errcheck + aliceLink.DisableAdds(Incoming) + require.True(t, aliceLink.IsFlushing(Incoming)) + } + require.False(t, aliceLink.IsFlushing(Outgoing)) + } +} + +// TestLinkFlushApiGateStateIdempotence tests whether successive calls to +// EnableAdds or DisableAdds (without the other one in between) result in both +// no state change in the flush state AND that the second call results in an +// error (to inform the caller that the call was unnecessary in case it implies +// a bug in their logic). +func TestLinkFlushApiGateStateIdempotence(t *testing.T) { + aliceLink, _, _, _, _, _ := + newSingleLinkTestHarness( + t, 5*btcutil.SatoshiPerBitcoin, + 1*btcutil.SatoshiPerBitcoin, + ) + + for _, dir := range []LinkDirection{Incoming, Outgoing} { + require.Nil(t, aliceLink.DisableAdds(dir)) + require.True(t, aliceLink.IsFlushing(dir)) + + require.NotNil(t, aliceLink.DisableAdds(dir)) + require.True(t, aliceLink.IsFlushing(dir)) + + require.Nil(t, aliceLink.EnableAdds(dir)) + require.False(t, aliceLink.IsFlushing(dir)) + + require.NotNil(t, aliceLink.EnableAdds(dir)) + require.False(t, aliceLink.IsFlushing(dir)) + } +} + +func TestLinkOutgoingCommitHooksCalled(t *testing.T) { + aliceLink, _, batchTicker, start, _, err := + newSingleLinkTestHarness( + t, 5*btcutil.SatoshiPerBitcoin, + btcutil.SatoshiPerBitcoin, + ) + require.NoError(t, err) + + require.NoError(t, start(), "could not start link") + + hookCalled := make(chan struct{}) + aliceLink.OnCommitOnce(Outgoing, func() { + close(hookCalled) + }) + + select { + case <-hookCalled: + t.Fatal("hook called prematurely") + case <-time.NewTimer(time.Second).C: + } + + batchTicker <- time.Now() + + // Send a second tick just to ensure the hook isn't called more than + // once. + batchTicker <- time.Now() + + select { + case <-hookCalled: + case <-time.NewTimer(time.Second).C: + t.Fatal("hook not called") + } +} + +func TestLinkFlushHooksCalled(t *testing.T) { + aliceLink, bobChannel, _, start, _, err := + newSingleLinkTestHarness( + t, 5*btcutil.SatoshiPerBitcoin, + btcutil.SatoshiPerBitcoin, + ) + require.NoError(t, err) + + require.NoError(t, start(), "could not start link") + + //nolint:forcetypeassert + aliceMsgs := aliceLink.(*channelLink).cfg.Peer.(*mockPeer).sentMsgs + + ctx := linkTestContext{ + t: t, + aliceLink: aliceLink, + bobChannel: bobChannel, + aliceMsgs: aliceMsgs, + } + + hookCalled := make(chan struct{}) + + assertHookCalled := func(shouldBeCalled bool) { + select { + case <-hookCalled: + require.True( + t, shouldBeCalled, "hook called prematurely", + ) + case <-time.NewTimer(time.Millisecond).C: + require.False(t, shouldBeCalled, "hook not called") + } + } + + //nolint:forcetypeassert + htlc := generateHtlc(t, aliceLink.(*channelLink), 0) + + // A <- add -- B + ctx.sendHtlcBobToAlice(htlc) + + // A <- sig -- B + ctx.sendCommitSigBobToAlice(1) + + // A -- rev -> B + ctx.receiveRevAndAckAliceToBob() + + // Register flush hook + aliceLink.OnFlushedOnce(func() { + close(hookCalled) + }) + + // Channel is not clean, hook should not be called + assertHookCalled(false) + + // A -- sig -> B + ctx.receiveCommitSigAliceToBob(1) + assertHookCalled(false) + + // A <- rev -- B + ctx.sendRevAndAckBobToAlice() + assertHookCalled(false) + + // A -- set -> B + ctx.receiveSettleAliceToBob() + assertHookCalled(false) + + // A -- sig -> B + ctx.receiveCommitSigAliceToBob(0) + assertHookCalled(false) + + // A <- rev -- B + ctx.sendRevAndAckBobToAlice() + assertHookCalled(false) + + // A <- sig -- B + ctx.sendCommitSigBobToAlice(0) + // since there is no pause point between alice receiving CommitSig and + // sending RevokeAndAck, we don't assert the hook hasn't been called + // here. + + // A -- rev -> B + ctx.receiveRevAndAckAliceToBob() + assertHookCalled(true) +} diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 9b0453dea..6d21a9519 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -907,6 +907,25 @@ func (f *mockChannelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) { return f.shortChanID, nil } +func (f *mockChannelLink) EnableAdds(linkDirection LinkDirection) error { + // TODO(proofofkeags): Implement + return nil +} +func (f *mockChannelLink) DisableAdds(linkDirection LinkDirection) error { + // TODO(proofofkeags): Implement + return nil +} +func (f *mockChannelLink) IsFlushing(linkDirection LinkDirection) bool { + // TODO(proofofkeags): Implement + return false +} +func (f *mockChannelLink) OnFlushedOnce(func()) { + // TODO(proofofkeags): Implement +} +func (f *mockChannelLink) OnCommitOnce(LinkDirection, func()) { + // TODO(proofofkeags): Implement +} + var _ ChannelLink = (*mockChannelLink)(nil) func newDB() (*channeldb.DB, func(), error) { diff --git a/peer/test_utils.go b/peer/test_utils.go index c7509c0c8..4993d3af3 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -509,6 +509,27 @@ type mockMessageConn struct { readRaceDetectingCounter int } +func (m *mockUpdateHandler) EnableAdds( + htlcswitch.LinkDirection, +) error { + // TODO(proofofkeags): Implement + return nil +} +func (m *mockUpdateHandler) DisableAdds(htlcswitch.LinkDirection) error { + // TODO(proofofkeags): Implement + return nil +} +func (m *mockUpdateHandler) IsFlushing(htlcswitch.LinkDirection) bool { + // TODO(proofofkeags): Implement + return false +} +func (m *mockUpdateHandler) OnFlushedOnce(func()) { + // TODO(proofofkeags): Implement +} +func (m *mockUpdateHandler) OnCommitOnce(htlcswitch.LinkDirection, func()) { + // TODO(proofofkeags): Implement +} + func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn { return &mockMessageConn{ t: t,