Merge pull request #5618 from Crypt-iQ/coop_switch_sync_08092021

multi: optimistically shutdown link during coop close
This commit is contained in:
Olaoluwa Osuntokun
2021-09-20 18:09:07 -07:00
committed by GitHub
12 changed files with 490 additions and 57 deletions

View File

@@ -1184,11 +1184,9 @@ func waitUntilLinkActive(p *Brontide,
// The link may already be active by this point, and we may have missed the
// ActiveLinkEvent. Check if the link exists.
links, _ := p.cfg.Switch.GetLinksByInterface(p.cfg.PubKeyBytes)
for _, link := range links {
if link.ChanID() == cid {
return link
}
link := p.fetchLinkFromKeyAndCid(cid)
if link != nil {
return link
}
// If the link is nil, we must wait for it to be active.
@@ -1216,16 +1214,7 @@ func waitUntilLinkActive(p *Brontide,
// The link shouldn't be nil as we received an
// ActiveLinkEvent. If it is nil, we return nil and the
// calling function should catch it.
links, _ = p.cfg.Switch.GetLinksByInterface(
p.cfg.PubKeyBytes,
)
for _, link := range links {
if link.ChanID() == cid {
return link
}
}
return nil
return p.fetchLinkFromKeyAndCid(cid)
case <-p.quit:
return nil
@@ -2409,12 +2398,11 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
// cooperative channel closure.
chanCloser, ok := p.activeChanCloses[chanID]
if !ok {
// If we need to create a chan closer for the first time, then
// we'll check to ensure that the channel is even in the proper
// state to allow a co-op channel closure.
if len(channel.ActiveHtlcs()) != 0 {
return nil, fmt.Errorf("cannot co-op close " +
"channel w/ active htlcs")
// Optimistically try a link shutdown, erroring out if it
// failed.
if err := p.tryLinkShutdown(chanID); err != nil {
peerLog.Errorf("failed link shutdown: %v", err)
return nil, err
}
// We'll create a valid closing state machine in order to
@@ -2454,9 +2442,8 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
chanCloser = chancloser.NewChanCloser(
chancloser.ChanCloseCfg{
Channel: channel,
UnregisterChannel: p.cfg.Switch.RemoveLink,
BroadcastTx: p.cfg.Wallet.PublishTransaction,
Channel: channel,
BroadcastTx: p.cfg.Wallet.PublishTransaction,
DisableChannel: func(chanPoint wire.OutPoint) error {
return p.cfg.ChanStatusMgr.RequestDisable(chanPoint, false)
},
@@ -2570,11 +2557,18 @@ func (p *Brontide) handleLocalCloseReq(req *htlcswitch.ChanClose) {
return
}
// Optimistically try a link shutdown, erroring out if it
// failed.
if err := p.tryLinkShutdown(chanID); err != nil {
peerLog.Errorf("failed link shutdown: %v", err)
req.Err <- err
return
}
chanCloser := chancloser.NewChanCloser(
chancloser.ChanCloseCfg{
Channel: channel,
UnregisterChannel: p.cfg.Switch.RemoveLink,
BroadcastTx: p.cfg.Wallet.PublishTransaction,
Channel: channel,
BroadcastTx: p.cfg.Wallet.PublishTransaction,
DisableChannel: func(chanPoint wire.OutPoint) error {
return p.cfg.ChanStatusMgr.RequestDisable(chanPoint, false)
},
@@ -2701,6 +2695,55 @@ func (p *Brontide) handleLinkFailure(failure linkFailureReport) {
}
}
// tryLinkShutdown attempts to fetch a target link from the switch, calls
// ShutdownIfChannelClean to optimistically trigger a link shutdown, and
// removes the link from the switch. It returns an error if any step failed.
func (p *Brontide) tryLinkShutdown(cid lnwire.ChannelID) error {
// Fetch the appropriate link and call ShutdownIfChannelClean to ensure
// no other updates can occur.
chanLink := p.fetchLinkFromKeyAndCid(cid)
// If the link happens to be nil, return ErrChannelNotFound so we can
// ignore the close message.
if chanLink == nil {
return ErrChannelNotFound
}
// Else, the link exists, so attempt to trigger shutdown. If this
// fails, we'll send an error message to the remote peer.
if err := chanLink.ShutdownIfChannelClean(); err != nil {
return err
}
// Next, we remove the link from the switch to shut down all of the
// link's goroutines and remove it from the switch's internal maps. We
// don't call WipeChannel as the channel must still be in the
// activeChannels map to process coop close messages.
p.cfg.Switch.RemoveLink(cid)
return nil
}
// fetchLinkFromKeyAndCid fetches a link from the switch via the remote's
// public key and the channel id.
func (p *Brontide) fetchLinkFromKeyAndCid(
cid lnwire.ChannelID) htlcswitch.ChannelUpdateHandler {
var chanLink htlcswitch.ChannelUpdateHandler
// We don't need to check the error here, and can instead just loop
// over the slice and return nil.
links, _ := p.cfg.Switch.GetLinksByInterface(p.cfg.PubKeyBytes)
for _, link := range links {
if link.ChanID() == cid {
chanLink = link
break
}
}
return chanLink
}
// finalizeChanClosure performs the final clean up steps once the cooperative
// closure transaction has been fully broadcast. The finalized closing state
// machine should be passed in. Once the transaction has been sufficiently

View File

@@ -39,8 +39,10 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, noUpdate,
notifier, broadcastTxChan, noUpdate, mockSwitch,
)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
@@ -49,6 +51,9 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint())
mockLink := newMockUpdateHandler(chanID)
mockSwitch.links = append(mockSwitch.links, mockLink)
// We send a shutdown request to Alice. She will now be the responding
// node in this shutdown procedure. We first expect Alice to answer
// this shutdown request with a Shutdown message.
@@ -142,14 +147,20 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, noUpdate,
notifier, broadcastTxChan, noUpdate, mockSwitch,
)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUp()
chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint())
mockLink := newMockUpdateHandler(chanID)
mockSwitch.links = append(mockSwitch.links, mockLink)
// We make Alice send a shutdown request.
updateChan := make(chan interface{}, 1)
errChan := make(chan error, 1)
@@ -179,7 +190,6 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
aliceDeliveryScript := shutdownMsg.Address
// Bob will respond with his own Shutdown message.
chanID := shutdownMsg.ChannelID
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: lnwire.NewShutdown(chanID,
@@ -264,8 +274,10 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, noUpdate,
notifier, broadcastTxChan, noUpdate, mockSwitch,
)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
@@ -274,6 +286,9 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint())
mockLink := newMockUpdateHandler(chanID)
mockSwitch.links = append(mockSwitch.links, mockLink)
// Bob sends a shutdown request to Alice. She will now be the responding
// node in this shutdown procedure. We first expect Alice to answer this
// Shutdown request with a Shutdown message.
@@ -458,14 +473,20 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, noUpdate,
notifier, broadcastTxChan, noUpdate, mockSwitch,
)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUp()
chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint())
mockLink := newMockUpdateHandler(chanID)
mockSwitch.links = append(mockSwitch.links, mockLink)
// We make the initiator send a shutdown request.
updateChan := make(chan interface{}, 1)
errChan := make(chan error, 1)
@@ -496,7 +517,6 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
aliceDeliveryScript := shutdownMsg.Address
// Bob will answer the Shutdown message with his own Shutdown.
chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint())
respShutdown := lnwire.NewShutdown(chanID, dummyDeliveryScript)
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
@@ -793,20 +813,27 @@ func TestCustomShutdownScript(t *testing.T) {
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
// Open a channel.
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, test.update,
mockSwitch,
)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUp()
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
mockLink := newMockUpdateHandler(chanID)
mockSwitch.links = append(mockSwitch.links, mockLink)
// Request initiator to cooperatively close the channel, with
// a specified delivery address.
updateChan := make(chan interface{}, 1)
errChan := make(chan error, 1)
chanPoint := bobChan.ChannelPoint()
closeCommand := htlcswitch.ChanClose{
CloseType: htlcswitch.CloseRegular,
ChanPoint: chanPoint,

View File

@@ -54,7 +54,8 @@ var noUpdate = func(a, b *channeldb.OpenChannel) {}
// an updateChan function which can be used to modify the default values on
// the channel states for each peer.
func createTestPeer(notifier chainntnfs.ChainNotifier,
publTx chan *wire.MsgTx, updateChan func(a, b *channeldb.OpenChannel)) (
publTx chan *wire.MsgTx, updateChan func(a, b *channeldb.OpenChannel),
mockSwitch *mockMessageSwitch) (
*Brontide, *lnwallet.LightningChannel, func(), error) {
aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
@@ -306,7 +307,11 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
},
}
htlcSwitch := &mockMessageSwitch{}
// If mockSwitch is not set by the caller, set it to the default as the
// caller does not need to control it.
if mockSwitch == nil {
mockSwitch = &mockMessageSwitch{}
}
nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
@@ -349,7 +354,7 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
PubKeyBytes: pubKey,
ErrorBuffer: errBuffer,
ChainIO: chainIO,
Switch: htlcSwitch,
Switch: mockSwitch,
ChanActiveTimeout: chanActiveTimeout,
InterceptSwitch: htlcswitch.NewInterceptableSwitch(nil),
@@ -375,7 +380,9 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
// mockMessageSwitch is a mock implementation of the messageSwitch interface
// used for testing without relying on a *htlcswitch.Switch in unit tests.
type mockMessageSwitch struct{}
type mockMessageSwitch struct {
links []htlcswitch.ChannelUpdateHandler
}
// BestHeight currently returns a dummy value.
func (m *mockMessageSwitch) BestHeight() uint32 {
@@ -397,13 +404,44 @@ func (m *mockMessageSwitch) CreateAndAddLink(cfg htlcswitch.ChannelLinkConfig,
return nil
}
// GetLinksByInterface currently returns dummy values.
// GetLinksByInterface returns the active links.
func (m *mockMessageSwitch) GetLinksByInterface(pub [33]byte) (
[]htlcswitch.ChannelUpdateHandler, error) {
return nil, nil
return m.links, nil
}
// mockUpdateHandler is a mock implementation of the ChannelUpdateHandler
// interface. It is used in mockMessageSwitch's GetLinksByInterface method.
type mockUpdateHandler struct {
cid lnwire.ChannelID
}
// newMockUpdateHandler creates a new mockUpdateHandler.
func newMockUpdateHandler(cid lnwire.ChannelID) *mockUpdateHandler {
return &mockUpdateHandler{
cid: cid,
}
}
// HandleChannelUpdate currently does nothing.
func (m *mockUpdateHandler) HandleChannelUpdate(msg lnwire.Message) {}
// ChanID returns the mockUpdateHandler's cid.
func (m *mockUpdateHandler) ChanID() lnwire.ChannelID { return m.cid }
// Bandwidth currently returns a dummy value.
func (m *mockUpdateHandler) Bandwidth() lnwire.MilliSatoshi { return 0 }
// EligibleToForward currently returns a dummy value.
func (m *mockUpdateHandler) EligibleToForward() bool { return false }
// MayAddOutgoingHtlc currently returns nil.
func (m *mockUpdateHandler) MayAddOutgoingHtlc() error { return nil }
// ShutdownIfChannelClean currently returns nil.
func (m *mockUpdateHandler) ShutdownIfChannelClean() error { return nil }
type mockMessageConn struct {
t *testing.T