From 0db9045e11becc431f9331122f2148b16bb86ec6 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:08 +0200 Subject: [PATCH 01/21] funding test: mark assert methods t.Helper --- fundingmanager_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 4fb31ee88..dee78d083 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -672,6 +672,8 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt, } func assertErrorNotSent(t *testing.T, msgChan chan lnwire.Message) { + t.Helper() + select { case <-msgChan: t.Fatalf("error sent unexpectedly") @@ -681,6 +683,8 @@ func assertErrorNotSent(t *testing.T, msgChan chan lnwire.Message) { } func assertErrorSent(t *testing.T, msgChan chan lnwire.Message) { + t.Helper() + var msg lnwire.Message select { case msg = <-msgChan: @@ -741,6 +745,8 @@ func assertFundingMsgSent(t *testing.T, msgChan chan lnwire.Message, func assertNumPendingReservations(t *testing.T, node *testNode, peerPubKey *btcec.PublicKey, expectedNum int) { + t.Helper() + serializedPubKey := newSerializedKey(peerPubKey) actualNum := len(node.fundingMgr.activeReservations[serializedPubKey]) if actualNum == expectedNum { @@ -753,6 +759,8 @@ func assertNumPendingReservations(t *testing.T, node *testNode, } func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum int) { + t.Helper() + var numPendingChans int for i := 0; i < testPollNumTries; i++ { // If this is not the first try, sleep before retrying. @@ -777,6 +785,8 @@ func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum i } func assertNumPendingChannelsRemains(t *testing.T, node *testNode, expectedNum int) { + t.Helper() + var numPendingChans int for i := 0; i < 5; i++ { // If this is not the first try, sleep before retrying. @@ -800,6 +810,7 @@ func assertNumPendingChannelsRemains(t *testing.T, node *testNode, expectedNum i func assertDatabaseState(t *testing.T, node *testNode, fundingOutPoint *wire.OutPoint, expectedState channelOpeningState) { + t.Helper() var state channelOpeningState var err error @@ -832,18 +843,24 @@ func assertDatabaseState(t *testing.T, node *testNode, func assertMarkedOpen(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() + assertDatabaseState(t, alice, fundingOutPoint, markedOpen) assertDatabaseState(t, bob, fundingOutPoint, markedOpen) } func assertFundingLockedSent(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() + assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent) assertDatabaseState(t, bob, fundingOutPoint, fundingLockedSent) } func assertAddedToRouterGraph(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() + assertDatabaseState(t, alice, fundingOutPoint, addedToRouterGraph) assertDatabaseState(t, bob, fundingOutPoint, addedToRouterGraph) } @@ -950,6 +967,8 @@ func assertChannelAnnouncements(t *testing.T, alice, bob *testNode, } func assertAnnouncementSignatures(t *testing.T, alice, bob *testNode) { + t.Helper() + // After the FundingLocked message is sent and six confirmations have // been reached, the channel will be announced to the greater network // by having the nodes exchange announcement signatures. @@ -1006,6 +1025,7 @@ func waitForOpenUpdate(t *testing.T, updateChan chan *lnrpc.OpenStatusUpdate) { func assertNoChannelState(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() assertErrChannelNotFound(t, alice, fundingOutPoint) assertErrChannelNotFound(t, bob, fundingOutPoint) @@ -1013,6 +1033,7 @@ func assertNoChannelState(t *testing.T, alice, bob *testNode, func assertErrChannelNotFound(t *testing.T, node *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() var state channelOpeningState var err error @@ -1036,6 +1057,8 @@ func assertErrChannelNotFound(t *testing.T, node *testNode, } func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) { + t.Helper() + // They should both send the new channel state to their peer. select { case c := <-alice.newChannels: From 5d1a7b0e7d810ee2ace264450e53d3723ab021be Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:09 +0200 Subject: [PATCH 02/21] funding test: correctly send remote peer on connected chan --- fundingmanager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index dee78d083..90779918c 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -394,7 +394,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, f.cfg.NotifyWhenOnline = func(peer [33]byte, connectedChan chan<- lnpeer.Peer) { - connectedChan <- testNode + connectedChan <- testNode.remotePeer } return testNode, nil From e189f7856731dafef1dd4469ab6933503a5fa8c0 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:09 +0200 Subject: [PATCH 03/21] fundingmanager: remove dead code Pending channels will never have a channel opening state, and hence will never be pending. --- fundingmanager.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index db159fae3..8ccf80a8f 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -613,17 +613,6 @@ func (f *fundingManager) start() error { fndgLog.Debugf("channel (%v) with opening state %v found", chanID, channelState) - if channel.IsPending { - // Set up the channel barriers again, to make sure - // waitUntilChannelOpen correctly waits until the - // opening process is completely over. - f.barrierMtx.Lock() - fndgLog.Tracef("Loading pending ChannelPoint(%v), "+ - "creating chan barrier", channel.FundingOutpoint) - f.newChanBarriers[chanID] = make(chan struct{}) - f.barrierMtx.Unlock() - } - // If we did find the channel in the opening state database, we // have seen the funding transaction being confirmed, but we // did not finish the rest of the setup procedure before we shut From 08cb31393482141eb832d5ec2f2b91e4f5243649 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:09 +0200 Subject: [PATCH 04/21] fundingmanager: move startup state check into advanceFundingState method This commit extracts the funding state check we do at startup into a new method advanceFundingState. In later commits we will modify this method to work for all funding state machine flows, not only on restart. --- fundingmanager.go | 206 +++++++++++++++++++++++++--------------------- 1 file changed, 110 insertions(+), 96 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 8ccf80a8f..4b40edf14 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -598,104 +598,10 @@ func (f *fundingManager) start() error { } for _, channel := range openChannels { - channelState, shortChanID, err := f.getChannelOpeningState( - &channel.FundingOutpoint) - if err == ErrChannelNotFound { - // Channel not in fundingManager's opening database, - // meaning it was successfully announced to the - // network. - continue - } else if err != nil { + err = f.advanceFundingState(channel) + if err != nil { return err } - - chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) - fndgLog.Debugf("channel (%v) with opening state %v found", - chanID, channelState) - - // If we did find the channel in the opening state database, we - // have seen the funding transaction being confirmed, but we - // did not finish the rest of the setup procedure before we shut - // down. We handle the remaining steps of this setup by - // continuing the procedure where we left off. - switch channelState { - case markedOpen: - // The funding transaction was confirmed, but we did not - // successfully send the fundingLocked message to the - // peer, so let's do that now. - f.wg.Add(1) - go func(dbChan *channeldb.OpenChannel) { - defer f.wg.Done() - - peerChan := make(chan lnpeer.Peer, 1) - - var peerKey [33]byte - copy(peerKey[:], dbChan.IdentityPub.SerializeCompressed()) - - f.cfg.NotifyWhenOnline(peerKey, peerChan) - - var peer lnpeer.Peer - select { - case peer = <-peerChan: - case <-f.quit: - return - } - err := f.handleFundingConfirmation( - peer, dbChan, shortChanID, - ) - if err != nil { - fndgLog.Errorf("Failed to handle "+ - "funding confirmation: %v", err) - return - } - }(channel) - - case fundingLockedSent: - // fundingLocked was sent to peer, but the channel - // was not added to the router graph and the channel - // announcement was not sent. - f.wg.Add(1) - go func(dbChan *channeldb.OpenChannel) { - defer f.wg.Done() - - err = f.addToRouterGraph(dbChan, shortChanID) - if err != nil { - fndgLog.Errorf("failed adding to "+ - "router graph: %v", err) - return - } - - // TODO(halseth): should create a state machine - // that can more easily be resumed from - // different states, to avoid this code - // duplication. - err = f.annAfterSixConfs(dbChan, shortChanID) - if err != nil { - fndgLog.Errorf("error sending channel "+ - "announcements: %v", err) - return - } - }(channel) - - case addedToRouterGraph: - // The channel was added to the Router's topology, but - // the channel announcement was not sent. - f.wg.Add(1) - go func(dbChan *channeldb.OpenChannel) { - defer f.wg.Done() - - err = f.annAfterSixConfs(dbChan, shortChanID) - if err != nil { - fndgLog.Errorf("error sending channel "+ - "announcement: %v", err) - return - } - }(channel) - - default: - fndgLog.Errorf("undefined channelState: %v", - channelState) - } } f.wg.Add(1) // TODO(roasbeef): tune @@ -920,6 +826,114 @@ func (f *fundingManager) reservationCoordinator() { } } +// advanceFundingState will advance the channel fromt the markedOpen state to +// the point where the channel is ready for operation. This includes sending +// funding locked to the peer, adding the channel to the router graph, and +// announcing the channel. +func (f *fundingManager) advanceFundingState( + channel *channeldb.OpenChannel) error { + + channelState, shortChanID, err := f.getChannelOpeningState( + &channel.FundingOutpoint) + if err == ErrChannelNotFound { + // Channel not in fundingManager's opening database, + // meaning it was successfully announced to the + // network. + return nil + } else if err != nil { + return err + } + + chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) + fndgLog.Debugf("channel (%v) with opening state %v found", + chanID, channelState) + + // If we did find the channel in the opening state database, we have + // seen the funding transaction being confirmed, but we did not finish + // the rest of the setup procedure before we shut down. We handle the + // remaining steps of this setup by continuing the procedure where we + // left off. + switch channelState { + + // The funding transaction was confirmed, but we did not successfully + // send the fundingLocked message to the peer, so let's do that now. + case markedOpen: + f.wg.Add(1) + go func(dbChan *channeldb.OpenChannel) { + defer f.wg.Done() + + peerChan := make(chan lnpeer.Peer, 1) + + var peerKey [33]byte + copy(peerKey[:], dbChan.IdentityPub.SerializeCompressed()) + + f.cfg.NotifyWhenOnline(peerKey, peerChan) + + var peer lnpeer.Peer + select { + case peer = <-peerChan: + case <-f.quit: + return + } + err := f.handleFundingConfirmation( + peer, dbChan, shortChanID, + ) + if err != nil { + fndgLog.Errorf("Failed to handle "+ + "funding confirmation: %v", err) + return + } + }(channel) + + // fundingLocked was sent to peer, but the channel was not added to the + // router graph and the channel announcement was not sent. + case fundingLockedSent: + f.wg.Add(1) + go func(dbChan *channeldb.OpenChannel) { + defer f.wg.Done() + + err = f.addToRouterGraph(dbChan, shortChanID) + if err != nil { + fndgLog.Errorf("failed adding to "+ + "router graph: %v", err) + return + } + + // TODO(halseth): should create a state machine + // that can more easily be resumed from + // different states, to avoid this code + // duplication. + err = f.annAfterSixConfs(dbChan, shortChanID) + if err != nil { + fndgLog.Errorf("error sending channel "+ + "announcements: %v", err) + return + } + }(channel) + + // The channel was added to the Router's topology, but the channel + // announcement was not sent. + case addedToRouterGraph: + f.wg.Add(1) + go func(dbChan *channeldb.OpenChannel) { + defer f.wg.Done() + + err = f.annAfterSixConfs(dbChan, shortChanID) + if err != nil { + fndgLog.Errorf("error sending channel "+ + "announcement: %v", err) + return + } + }(channel) + + default: + return fmt.Errorf("undefined channelState: %v", + channelState) + } + + return nil +} + // handlePendingChannels responds to a request for details concerning all // currently pending channels waiting for the final phase of the funding // workflow (funding txn confirmation). From cbf1fe6bb1dbc12a9dd6ad29cd5350c9b152e284 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:09 +0200 Subject: [PATCH 05/21] fundingmanager: define stateStep, make advanceFundingState sync This commit make the advanceFundingStateMethod synchronous. It will now query the database for a channel's opening state, and call the method stateStep until the channel has finished the opening procedure. --- fundingmanager.go | 181 +++++++++++++++++++++++++--------------------- 1 file changed, 100 insertions(+), 81 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 4b40edf14..a8fc7eaca 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -598,10 +598,8 @@ func (f *fundingManager) start() error { } for _, channel := range openChannels { - err = f.advanceFundingState(channel) - if err != nil { - return err - } + f.wg.Add(1) + go f.advanceFundingState(channel, nil) } f.wg.Add(1) // TODO(roasbeef): tune @@ -829,109 +827,130 @@ func (f *fundingManager) reservationCoordinator() { // advanceFundingState will advance the channel fromt the markedOpen state to // the point where the channel is ready for operation. This includes sending // funding locked to the peer, adding the channel to the router graph, and -// announcing the channel. -func (f *fundingManager) advanceFundingState( - channel *channeldb.OpenChannel) error { +// announcing the channel. The updateChan can be set non-nil to get +// OpenStatusUpdates. +// +// NOTE: This MUST be run as a goroutine. +func (f *fundingManager) advanceFundingState(channel *channeldb.OpenChannel, + updateChan chan<- *lnrpc.OpenStatusUpdate) { - channelState, shortChanID, err := f.getChannelOpeningState( - &channel.FundingOutpoint) - if err == ErrChannelNotFound { - // Channel not in fundingManager's opening database, - // meaning it was successfully announced to the - // network. - return nil - } else if err != nil { - return err + defer f.wg.Done() + + // We create the state-machine object which wraps the database state. + lnChannel, err := lnwallet.NewLightningChannel( + nil, channel, nil, + ) + if err != nil { + fndgLog.Errorf("Unable to create LightningChannel(%v): %v", + channel.FundingOutpoint, err) + return } - chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) - fndgLog.Debugf("channel (%v) with opening state %v found", - chanID, channelState) + for { + channelState, shortChanID, err := f.getChannelOpeningState( + &channel.FundingOutpoint, + ) + if err == ErrChannelNotFound { + // Channel not in fundingManager's opening database, + // meaning it was successfully announced to the + // network. + return + } else if err != nil { + fndgLog.Errorf("Unable to query database for "+ + "channel opening state(%v): %v", + channel.FundingOutpoint, err) + return + } + + // If we did find the channel in the opening state database, we + // have seen the funding transaction being confirmed, but there + // are still steps left of the setup procedure. We continue the + // procedure where we left off. + err = f.stateStep( + channel, lnChannel, shortChanID, channelState, + updateChan, + ) + if err != nil { + fndgLog.Errorf("Unable to advance state(%v): %v", + channel.FundingOutpoint, err) + return + } + } +} + +// stateStep advances the confirmed channel one step in the funding state +// machine. This method is synchronous and the new channel opening state will +// have been written to the database when it successfully returns. The +// updateChan can be set non-nil to get OpenStatusUpdates. +func (f *fundingManager) stateStep(channel *channeldb.OpenChannel, + lnChannel *lnwallet.LightningChannel, + shortChanID *lnwire.ShortChannelID, channelState channelOpeningState, + updateChan chan<- *lnrpc.OpenStatusUpdate) error { + + chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) + fndgLog.Debugf("Channel(%v) with ShortChanID %v has opening state %v", + chanID, shortChanID, channelState) - // If we did find the channel in the opening state database, we have - // seen the funding transaction being confirmed, but we did not finish - // the rest of the setup procedure before we shut down. We handle the - // remaining steps of this setup by continuing the procedure where we - // left off. switch channelState { // The funding transaction was confirmed, but we did not successfully // send the fundingLocked message to the peer, so let's do that now. case markedOpen: - f.wg.Add(1) - go func(dbChan *channeldb.OpenChannel) { - defer f.wg.Done() + peerChan := make(chan lnpeer.Peer, 1) - peerChan := make(chan lnpeer.Peer, 1) + var peerKey [33]byte + copy(peerKey[:], channel.IdentityPub.SerializeCompressed()) - var peerKey [33]byte - copy(peerKey[:], dbChan.IdentityPub.SerializeCompressed()) + f.cfg.NotifyWhenOnline(peerKey, peerChan) - f.cfg.NotifyWhenOnline(peerKey, peerChan) + var peer lnpeer.Peer + select { + case peer = <-peerChan: + case <-f.quit: + return ErrFundingManagerShuttingDown + } - var peer lnpeer.Peer - select { - case peer = <-peerChan: - case <-f.quit: - return - } - err := f.handleFundingConfirmation( - peer, dbChan, shortChanID, - ) - if err != nil { - fndgLog.Errorf("Failed to handle "+ - "funding confirmation: %v", err) - return - } - }(channel) + err := f.sendFundingLocked( + peer, channel, lnChannel, shortChanID, + ) + if err != nil { + return fmt.Errorf("failed sending fundingLocked: %v", + err) + } + + fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ + "sent FundingLocked", chanID, shortChanID) + return nil // fundingLocked was sent to peer, but the channel was not added to the // router graph and the channel announcement was not sent. case fundingLockedSent: - f.wg.Add(1) - go func(dbChan *channeldb.OpenChannel) { - defer f.wg.Done() + err := f.addToRouterGraph(channel, shortChanID) + if err != nil { + return fmt.Errorf("failed adding to "+ + "router graph: %v", err) + } - err = f.addToRouterGraph(dbChan, shortChanID) - if err != nil { - fndgLog.Errorf("failed adding to "+ - "router graph: %v", err) - return - } + fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ + "added to router graph", chanID, shortChanID) - // TODO(halseth): should create a state machine - // that can more easily be resumed from - // different states, to avoid this code - // duplication. - err = f.annAfterSixConfs(dbChan, shortChanID) - if err != nil { - fndgLog.Errorf("error sending channel "+ - "announcements: %v", err) - return - } - }(channel) + return nil // The channel was added to the Router's topology, but the channel // announcement was not sent. case addedToRouterGraph: - f.wg.Add(1) - go func(dbChan *channeldb.OpenChannel) { - defer f.wg.Done() + err := f.annAfterSixConfs(channel, shortChanID) + if err != nil { + return fmt.Errorf("error sending channel "+ + "announcement: %v", err) + } - err = f.annAfterSixConfs(dbChan, shortChanID) - if err != nil { - fndgLog.Errorf("error sending channel "+ - "announcement: %v", err) - return - } - }(channel) - - default: - return fmt.Errorf("undefined channelState: %v", - channelState) + fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ + "announced", chanID, shortChanID) + return nil } - return nil + return fmt.Errorf("undefined channelState: %v", channelState) } // handlePendingChannels responds to a request for details concerning all From e274c2fb7ca3dc07f65202f5a113e42b6d2237fd Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:09 +0200 Subject: [PATCH 06/21] fundingmanager: call advanceFundingState from handleFundingSigned Since the advanceFundingSigned now can resume a channel from any state, we resue the logic in handleFundingSigned instead of manually executing each step of the funding flow. --- fundingmanager.go | 85 ++++++++++++++++------------------------------- 1 file changed, 29 insertions(+), 56 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index a8fc7eaca..d875ebe86 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -934,6 +934,33 @@ func (f *fundingManager) stateStep(channel *channeldb.OpenChannel, fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ "added to router graph", chanID, shortChanID) + // Give the caller a final update notifying them that + // the channel is now open. + // TODO(roasbeef): only notify after recv of funding locked? + fundingPoint := channel.FundingOutpoint + cp := &lnrpc.ChannelPoint{ + FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{ + FundingTxidBytes: fundingPoint.Hash[:], + }, + OutputIndex: fundingPoint.Index, + } + + if updateChan != nil { + upd := &lnrpc.OpenStatusUpdate{ + Update: &lnrpc.OpenStatusUpdate_ChanOpen{ + ChanOpen: &lnrpc.ChannelOpenUpdate{ + ChannelPoint: cp, + }, + }, + } + + select { + case updateChan <- upd: + case <-f.quit: + return ErrFundingManagerShuttingDown + } + } + return nil // The channel was added to the Router's topology, but the channel @@ -1742,62 +1769,8 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { fndgLog.Debugf("Channel with ShortChanID %v now confirmed", shortChanID.ToUint64()) - // Go on adding the channel to the channel graph, and crafting - // channel announcements. - lnChannel, err := lnwallet.NewLightningChannel( - nil, completeChan, nil, - ) - if err != nil { - fndgLog.Errorf("failed creating lnChannel: %v", err) - return - } - - err = f.sendFundingLocked( - fmsg.peer, completeChan, lnChannel, shortChanID, - ) - if err != nil { - fndgLog.Errorf("failed sending fundingLocked: %v", err) - return - } - fndgLog.Debugf("FundingLocked for channel with ShortChanID "+ - "%v sent", shortChanID.ToUint64()) - - err = f.addToRouterGraph(completeChan, shortChanID) - if err != nil { - fndgLog.Errorf("failed adding to router graph: %v", err) - return - } - fndgLog.Debugf("Channel with ShortChanID %v added to "+ - "router graph", shortChanID.ToUint64()) - - // Give the caller a final update notifying them that - // the channel is now open. - // TODO(roasbeef): only notify after recv of funding locked? - upd := &lnrpc.OpenStatusUpdate{ - Update: &lnrpc.OpenStatusUpdate_ChanOpen{ - ChanOpen: &lnrpc.ChannelOpenUpdate{ - ChannelPoint: &lnrpc.ChannelPoint{ - FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{ - FundingTxidBytes: fundingPoint.Hash[:], - }, - OutputIndex: fundingPoint.Index, - }, - }, - }, - } - - select { - case resCtx.updates <- upd: - case <-f.quit: - return - } - - err = f.annAfterSixConfs(completeChan, shortChanID) - if err != nil { - fndgLog.Errorf("failed sending channel announcement: %v", - err) - return - } + f.wg.Add(1) + go f.advanceFundingState(completeChan, resCtx.updates) }() } From 49bbf0eb61eb18882f1d4a693dabf2850ecdd48e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:09 +0200 Subject: [PATCH 07/21] fundingmanager: call advanceFundingState in place of handleFundingConfirmation This commit removes the handleFundingConfirmation method, and instead hands the newly confirmed channel of to advanceFundingState, which will take the channel through the rest of the channel opening flow. --- fundingmanager.go | 92 +++++++++++------------------------------------ 1 file changed, 21 insertions(+), 71 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index d875ebe86..0394ba435 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -525,6 +525,8 @@ func (f *fundingManager) start() error { go func(ch *channeldb.OpenChannel) { go f.waitForFundingWithTimeout(ch, confChan, timeoutChan) + var shortChanID *lnwire.ShortChannelID + var ok bool select { case <-timeoutChan: // Timeout channel will be triggered if the number of blocks @@ -547,43 +549,30 @@ func (f *fundingManager) start() error { if err := ch.CloseChannel(closeInfo); err != nil { fndgLog.Errorf("Failed closing channel "+ "%v: %v", ch.FundingOutpoint, err) + return } + return case <-f.quit: // The fundingManager is shutting down, and will // resume wait on startup. - case shortChanID, ok := <-confChan: + return + + case shortChanID, ok = <-confChan: if !ok { fndgLog.Errorf("Waiting for funding" + "confirmation failed") return } - - // The funding transaction has confirmed, so - // we'll attempt to retrieve the remote peer - // to complete the rest of the funding flow. - peerChan := make(chan lnpeer.Peer, 1) - - var peerKey [33]byte - copy(peerKey[:], ch.IdentityPub.SerializeCompressed()) - - f.cfg.NotifyWhenOnline(peerKey, peerChan) - - var peer lnpeer.Peer - select { - case peer = <-peerChan: - case <-f.quit: - return - } - err := f.handleFundingConfirmation( - peer, ch, shortChanID, - ) - if err != nil { - fndgLog.Errorf("Failed to handle "+ - "funding confirmation: %v", err) - return - } + // Fallthrough. } + + // Success, funding transaction was confirmed. + fndgLog.Debugf("ChannelID(%v) is now fully confirmed! "+ + "(shortChanID=%v)", chanID, shortChanID) + + f.wg.Add(1) + go f.advanceFundingState(ch, nil) }(channel) } @@ -1604,14 +1593,12 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { } // Success, funding transaction was confirmed. - err := f.handleFundingConfirmation( - fmsg.peer, completeChan, shortChanID, - ) - if err != nil { - fndgLog.Errorf("failed to handle funding"+ - "confirmation: %v", err) - return - } + fndgLog.Debugf("ChannelID(%v) is now fully confirmed! "+ + "(shortChanID=%v)", channelID, shortChanID) + + f.wg.Add(1) + go f.advanceFundingState(completeChan, nil) + }() } @@ -2020,43 +2007,6 @@ func (f *fundingManager) waitForFundingConfirmation( f.localDiscoveryMtx.Unlock() } -// handleFundingConfirmation is a wrapper method for creating a new -// lnwallet.LightningChannel object, calling sendFundingLocked, -// addToRouterGraph, and annAfterSixConfs. This is called after the funding -// transaction is confirmed. -func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer, - completeChan *channeldb.OpenChannel, - shortChanID *lnwire.ShortChannelID) error { - - // We create the state-machine object which wraps the database state. - lnChannel, err := lnwallet.NewLightningChannel( - nil, completeChan, nil, - ) - if err != nil { - return err - } - - chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) - - fndgLog.Debugf("ChannelID(%v) is now fully confirmed!", chanID) - - err = f.sendFundingLocked(peer, completeChan, lnChannel, shortChanID) - if err != nil { - return fmt.Errorf("failed sending fundingLocked: %v", err) - } - err = f.addToRouterGraph(completeChan, shortChanID) - if err != nil { - return fmt.Errorf("failed adding to router graph: %v", err) - } - err = f.annAfterSixConfs(completeChan, shortChanID) - if err != nil { - return fmt.Errorf("failed sending channel announcement: %v", - err) - } - - return nil -} - // sendFundingLocked creates and sends the fundingLocked message. // This should be called after the funding transaction has been confirmed, // and the channelState is 'markedOpen'. From b2a7e42f449eb6b6651e56cd1eccc196b0b6b14e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:09 +0200 Subject: [PATCH 08/21] fundingmanager: commit to new states in stateStep This commit moves the saving of the new channelOpeningState to the stateStep method. --- fundingmanager.go | 70 ++++++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 0394ba435..29640c318 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -907,8 +907,22 @@ func (f *fundingManager) stateStep(channel *channeldb.OpenChannel, err) } + // As the fundingLocked message is now sent to the peer, the + // channel is moved to the next state of the state machine. It + // will be moved to the last state (actually deleted from the + // database) after the channel is finally announced. + err = f.saveChannelOpeningState( + &channel.FundingOutpoint, fundingLockedSent, + shortChanID, + ) + if err != nil { + return fmt.Errorf("error setting channel state to"+ + " fundingLockedSent: %v", err) + } + fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ "sent FundingLocked", chanID, shortChanID) + return nil // fundingLocked was sent to peer, but the channel was not added to the @@ -920,6 +934,19 @@ func (f *fundingManager) stateStep(channel *channeldb.OpenChannel, "router graph: %v", err) } + // As the channel is now added to the ChannelRouter's topology, + // the channel is moved to the next state of the state machine. + // It will be moved to the last state (actually deleted from + // the database) after the channel is finally announced. + err = f.saveChannelOpeningState( + &channel.FundingOutpoint, addedToRouterGraph, + shortChanID, + ) + if err != nil { + return fmt.Errorf("error setting channel state to"+ + " addedToRouterGraph: %v", err) + } + fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ "added to router graph", chanID, shortChanID) @@ -961,8 +988,20 @@ func (f *fundingManager) stateStep(channel *channeldb.OpenChannel, "announcement: %v", err) } + // We delete the channel opening state from our internal + // database as the opening process has succeeded. We can do + // this because we assume the AuthenticatedGossiper queues the + // announcement messages, and persists them in case of a daemon + // shutdown. + err = f.deleteChannelOpeningState(&channel.FundingOutpoint) + if err != nil { + return fmt.Errorf("error deleting channel state: %v", + err) + } + fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ "announced", chanID, shortChanID) + return nil } @@ -2068,17 +2107,6 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, } } - // As the fundingLocked message is now sent to the peer, the channel is - // moved to the next state of the state machine. It will be moved to the - // last state (actually deleted from the database) after the channel is - // finally announced. - err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, - fundingLockedSent, shortChanID) - if err != nil { - return fmt.Errorf("error setting channel state to"+ - " fundingLockedSent: %v", err) - } - return nil } @@ -2157,17 +2185,6 @@ func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel, return ErrFundingManagerShuttingDown } - // As the channel is now added to the ChannelRouter's topology, the - // channel is moved to the next state of the state machine. It will be - // moved to the last state (actually deleted from the database) after - // the channel is finally announced. - err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, - addedToRouterGraph, shortChanID) - if err != nil { - return fmt.Errorf("error setting channel state to"+ - " addedToRouterGraph: %v", err) - } - return nil } @@ -2309,15 +2326,6 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, "announced", &fundingPoint, shortChanID) } - // We delete the channel opening state from our internal database - // as the opening process has succeeded. We can do this because we - // assume the AuthenticatedGossiper queues the announcement messages, - // and persists them in case of a daemon shutdown. - err := f.deleteChannelOpeningState(&completeChan.FundingOutpoint) - if err != nil { - return fmt.Errorf("error deleting channel state: %v", err) - } - return nil } From b51b76b46987c414620906d17ec8f3f0385ea04a Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:10 +0200 Subject: [PATCH 09/21] fundingmanager: extract opening logic into new method handleFundingConfirmation This commit moves the opening logic found within waitForFundingConfirmation into a new method handleFundingConfirmation. This will make it easier to later break up waitForFundingConfirmation, and avoid code duplication. --- fundingmanager.go | 41 +++++++++++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 29640c318..5ef93a384 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -1990,12 +1990,35 @@ func (f *fundingManager) waitForFundingConfirmation( TxPosition: uint16(fundingPoint.Index), } + err = f.handleFundingConfirmation(completeChan, shortChanID) + if err != nil { + fndgLog.Errorf("Unable to handle funding confirmation: %v", err) + return + } + + select { + case confChan <- &shortChanID: + case <-f.quit: + return + } +} + +// handleFundingConfirmation marks a channel as open in the database, and set +// the channelOpeningState markedOpen. In addition it will report the now +// decided short channel ID to the switch, and close the local discovery signal +// for this channel. +func (f *fundingManager) handleFundingConfirmation( + completeChan *channeldb.OpenChannel, + shortChanID lnwire.ShortChannelID) error { + + fundingPoint := completeChan.FundingOutpoint + chanID := lnwire.NewChanIDFromOutPoint(&fundingPoint) + // Now that the channel has been fully confirmed, we'll mark it as open // within the database. if err := completeChan.MarkAsOpen(shortChanID); err != nil { - fndgLog.Errorf("error setting channel pending flag to false: "+ + return fmt.Errorf("error setting channel pending flag to false: "+ "%v", err) - return } // Inform the ChannelNotifier that the channel has transitioned from @@ -2013,12 +2036,10 @@ func (f *fundingManager) waitForFundingConfirmation( // TODO(halseth): make the two db transactions (MarkChannelAsOpen and // saveChannelOpeningState) atomic by doing them in the same transaction. // Needed to be properly fault-tolerant. - err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, markedOpen, - &shortChanID) + err := f.saveChannelOpeningState(&fundingPoint, markedOpen, &shortChanID) if err != nil { - fndgLog.Errorf("error setting channel state to markedOpen: %v", + return fmt.Errorf("error setting channel state to markedOpen: %v", err) - return } // As there might already be an active link in the switch with an @@ -2029,12 +2050,6 @@ func (f *fundingManager) waitForFundingConfirmation( fndgLog.Errorf("unable to report short chan id: %v", err) } - select { - case confChan <- &shortChanID: - case <-f.quit: - return - } - // Close the discoverySignal channel, indicating to a separate // goroutine that the channel now is marked as open in the database // and that it is acceptable to process funding locked messages @@ -2044,6 +2059,8 @@ func (f *fundingManager) waitForFundingConfirmation( close(discoverySignal) } f.localDiscoveryMtx.Unlock() + + return nil } // sendFundingLocked creates and sends the fundingLocked message. From 25f60948021c06869550cb09d98b01945e8a2ea4 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:10 +0200 Subject: [PATCH 10/21] fungingmanager: move handleFundingConfirmation out of waitForfundingConfirmation This commit moves the handling of a funding confirmation out of waitForFundingConfirmation, and instead let the caller handle marking the channel opened. --- fundingmanager.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 5ef93a384..6d46db4cb 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -1792,6 +1792,14 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { } } + err = f.handleFundingConfirmation(completeChan, *shortChanID) + if err != nil { + fndgLog.Errorf("unable to handle funding confirmation "+ + "for ChannelPoint(%v): %v", + completeChan.FundingOutpoint, err) + return + } + fndgLog.Debugf("Channel with ShortChanID %v now confirmed", shortChanID.ToUint64()) @@ -1873,6 +1881,16 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC return } + err = f.handleFundingConfirmation( + completeChan, *shortChanID, + ) + if err != nil { + fndgLog.Errorf("unable to handle funding "+ + "confirmation for ChannelPoint(%v): %v", + completeChan.FundingOutpoint, err) + return + } + select { case confChan <- shortChanID: case <-f.quit: @@ -1990,12 +2008,6 @@ func (f *fundingManager) waitForFundingConfirmation( TxPosition: uint16(fundingPoint.Index), } - err = f.handleFundingConfirmation(completeChan, shortChanID) - if err != nil { - fndgLog.Errorf("Unable to handle funding confirmation: %v", err) - return - } - select { case confChan <- &shortChanID: case <-f.quit: From 76857dbcdc76f9c95d89bdb85623508564b9d3b3 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:10 +0200 Subject: [PATCH 11/21] fundingmanager: move handleFundingConfirmation out of waitForFundingWithTimeout Similarly to what we did in the previous commit, we move the responsibility of marking the channel open by calling handleFundingConfirmation out from waitForFundingWithTimeout to the caller. --- fundingmanager.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 6d46db4cb..36450f3e2 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -571,6 +571,14 @@ func (f *fundingManager) start() error { fndgLog.Debugf("ChannelID(%v) is now fully confirmed! "+ "(shortChanID=%v)", chanID, shortChanID) + err = f.handleFundingConfirmation(ch, *shortChanID) + if err != nil { + fndgLog.Errorf("unable to handle funding "+ + "confirmation for ChannelPoint(%v): %v", + ch.FundingOutpoint, err) + return + } + f.wg.Add(1) go f.advanceFundingState(ch, nil) }(channel) @@ -1635,6 +1643,14 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { fndgLog.Debugf("ChannelID(%v) is now fully confirmed! "+ "(shortChanID=%v)", channelID, shortChanID) + err = f.handleFundingConfirmation(completeChan, *shortChanID) + if err != nil { + fndgLog.Errorf("unable to handle funding "+ + "confirmation for ChannelPoint(%v): %v", + completeChan.FundingOutpoint, err) + return + } + f.wg.Add(1) go f.advanceFundingState(completeChan, nil) @@ -1881,16 +1897,6 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC return } - err = f.handleFundingConfirmation( - completeChan, *shortChanID, - ) - if err != nil { - fndgLog.Errorf("unable to handle funding "+ - "confirmation for ChannelPoint(%v): %v", - completeChan.FundingOutpoint, err) - return - } - select { case confChan <- shortChanID: case <-f.quit: From ea196f6e8f127760d1d5ffa517bc78e54d3d51e1 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:10 +0200 Subject: [PATCH 12/21] fundingmanager make waitForFundingConfirmation decrement waitgroup Since waitForFundingConfirmation is always called in a goroutine, we make this explicit by requireing the caller to always increment the waitgroup before calling it. --- fundingmanager.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 36450f3e2..bbc284622 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -1789,11 +1789,8 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { // this process to finish (either successfully or with some // error), before the fundingManager can be shut down. f.wg.Add(1) - go func() { - defer f.wg.Done() - f.waitForFundingConfirmation(completeChan, cancelChan, - confChan) - }() + go f.waitForFundingConfirmation(completeChan, cancelChan, + confChan) var shortChanID *lnwire.ShortChannelID var ok bool @@ -1849,11 +1846,8 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC // Add this goroutine to wait group so we can be sure that it is // properly stopped before the funding manager can be shut down. f.wg.Add(1) - go func() { - defer f.wg.Done() - f.waitForFundingConfirmation(completeChan, cancelChan, - waitingConfChan) - }() + go f.waitForFundingConfirmation(completeChan, cancelChan, + waitingConfChan) // On block maxHeight we will cancel the funding confirmation wait. maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf @@ -1927,10 +1921,13 @@ func makeFundingScript(channel *channeldb.OpenChannel) ([]byte, error) { // when a channel has become active for lightning transactions. // The wait can be canceled by closing the cancelChan. In case of success, // a *lnwire.ShortChannelID will be passed to confChan. +// +// NOTE: This MUST be run as a goroutine. func (f *fundingManager) waitForFundingConfirmation( completeChan *channeldb.OpenChannel, cancelChan <-chan struct{}, confChan chan<- *lnwire.ShortChannelID) { + defer f.wg.Done() defer close(confChan) // Register with the ChainNotifier for a notification once the funding From 47fae26dc40e192602487607dcdd08787755dd54 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:10 +0200 Subject: [PATCH 13/21] fundingmanager: define method waitForTimeout This commit defines a new method waitForTimeout, that will be used to listen for channels timing out. It handles a subset of what is already handled by waitForFundingWithTimeout, but we want to break that one up in smaller parts, and waitForTimeout is the first of these. --- fundingmanager.go | 59 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/fundingmanager.go b/fundingmanager.go index bbc284622..cbe9a72b7 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -2018,6 +2018,65 @@ func (f *fundingManager) waitForFundingConfirmation( } } +// waitForTimeout will close the timeout channel if maxWaitNumBlocksFundingConf +// has passed from the broadcast height of the given channel. In case of error, +// the error is sent on timeoutChan. The wait can be canceled by closing the +// cancelChan. +// +// NOTE: timeoutChan MUST be buffered. +// NOTE: This MUST be run as a goroutine. +func (f *fundingManager) waitForTimeout(completeChan *channeldb.OpenChannel, + cancelChan <-chan struct{}, timeoutChan chan<- error) { + defer f.wg.Done() + + epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + timeoutChan <- fmt.Errorf("unable to register for epoch "+ + "notification: %v", err) + return + } + + defer epochClient.Cancel() + + // On block maxHeight we will cancel the funding confirmation wait. + maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf + for { + select { + case epoch, ok := <-epochClient.Epochs: + if !ok { + timeoutChan <- fmt.Errorf("epoch client " + + "shutting down") + return + } + + // Close the timeout channel and exit if the block is + // aboce the max height. + if uint32(epoch.Height) >= maxHeight { + fndgLog.Warnf("Waited for %v blocks without "+ + "seeing funding transaction confirmed,"+ + " cancelling.", + maxWaitNumBlocksFundingConf) + + // Notify the caller of the timeout. + close(timeoutChan) + return + } + + // TODO: If we are the channel initiator implement + // a method for recovering the funds from the funding + // transaction + + case <-cancelChan: + return + + case <-f.quit: + // The fundingManager is shutting down, will resume + // waiting for the funding transaction on startup. + return + } + } +} + // handleFundingConfirmation marks a channel as open in the database, and set // the channelOpeningState markedOpen. In addition it will report the now // decided short channel ID to the switch, and close the local discovery signal From 893c6cbc5935f0d67179bff8a559894bb06546b7 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:10 +0200 Subject: [PATCH 14/21] fundingmanager: make waitForFundingTimeout sync This commit makes the waitForFundingTimeout method synchronous, and return ErrConfirmationTimeout in case the timeout is reached. We also simplify the internals by using waitForTimout defined earlier. --- fundingmanager.go | 201 ++++++++++++++++------------------------------ 1 file changed, 70 insertions(+), 131 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index cbe9a72b7..14a883429 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -85,6 +85,12 @@ var ( // been signaled to shut down. ErrFundingManagerShuttingDown = errors.New("funding manager shutting " + "down") + + // ErrConfirmationTimeout is an error returned when we as a responder + // are waiting for a funding transaction to confirm, but too many + // blocks pass without confirmation. + ErrConfirmationTimeout = errors.New("timeout waiting for funding " + + "confirmation") ) // reservationWithCtx encapsulates a pending channel reservation. This wrapper @@ -519,20 +525,20 @@ func (f *fundingManager) start() error { } } - confChan := make(chan *lnwire.ShortChannelID) - timeoutChan := make(chan struct{}) - + f.wg.Add(1) go func(ch *channeldb.OpenChannel) { - go f.waitForFundingWithTimeout(ch, confChan, timeoutChan) + defer f.wg.Done() - var shortChanID *lnwire.ShortChannelID - var ok bool - select { - case <-timeoutChan: - // Timeout channel will be triggered if the number of blocks - // mined since the channel was initiated reaches - // maxWaitNumBlocksFundingConf and we are not the channel - // initiator. + shortChanID, err := f.waitForFundingWithTimeout(ch) + if err == ErrConfirmationTimeout { + fndgLog.Warnf("Timeout waiting for funding "+ + "confirmation of ChannelPoint(%v)", + ch.FundingOutpoint) + + // We'll get a timeout if the number of blocks + // mined since the channel was initiated + // reaches maxWaitNumBlocksFundingConf and we + // are not the channel initiator. localBalance := ch.LocalCommitment.LocalBalance.ToSatoshis() closeInfo := &channeldb.ChannelCloseSummary{ ChainHash: ch.ChainHash, @@ -552,19 +558,11 @@ func (f *fundingManager) start() error { return } return - - case <-f.quit: - // The fundingManager is shutting down, and will - // resume wait on startup. + } else if err != nil { + fndgLog.Errorf("Failed waiting for funding "+ + "confirmation for ChannelPoint(%v): %v", + ch.FundingOutpoint, err) return - - case shortChanID, ok = <-confChan: - if !ok { - fndgLog.Errorf("Waiting for funding" + - "confirmation failed") - return - } - // Fallthrough. } // Success, funding transaction was confirmed. @@ -1609,15 +1607,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { f.wg.Add(1) go func() { defer f.wg.Done() - confChan := make(chan *lnwire.ShortChannelID) - timeoutChan := make(chan struct{}) - go f.waitForFundingWithTimeout(completeChan, confChan, - timeoutChan) - var shortChanID *lnwire.ShortChannelID - var ok bool - select { - case <-timeoutChan: + shortChanID, err := f.waitForFundingWithTimeout(completeChan) + if err == ErrConfirmationTimeout { // We did not see the funding confirmation before // timeout, so we forget the channel. err := fmt.Errorf("timeout waiting for funding tx "+ @@ -1626,17 +1618,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { f.failFundingFlow(fmsg.peer, pendingChanID, err) deleteFromDatabase() return - case <-f.quit: - // The fundingManager is shutting down, will resume - // wait for funding transaction on startup. + } else if err != nil { + fndgLog.Errorf(err.Error()) return - case shortChanID, ok = <-confChan: - if !ok { - fndgLog.Errorf("waiting for funding confirmation" + - " failed") - return - } - // Fallthrough. } // Success, funding transaction was confirmed. @@ -1781,28 +1765,15 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { f.wg.Add(1) go func() { defer f.wg.Done() - confChan := make(chan *lnwire.ShortChannelID) - cancelChan := make(chan struct{}) - // In case the fundingManager is stopped at some point during - // the remaining part of the opening process, we must wait for - // this process to finish (either successfully or with some - // error), before the fundingManager can be shut down. - f.wg.Add(1) - go f.waitForFundingConfirmation(completeChan, cancelChan, - confChan) - - var shortChanID *lnwire.ShortChannelID - var ok bool - select { - case <-f.quit: + shortChanID, err := f.waitForFundingWithTimeout(completeChan) + if err != nil { + // Since we are the channel initiator, we don't expect + // to get ErrConfirmationTimeout. + fndgLog.Errorf("Failed waiting for funding "+ + "confirmation for ChannelPoint(%v): %v", + completeChan.FundingOutpoint, err) return - case shortChanID, ok = <-confChan: - if !ok { - fndgLog.Errorf("waiting for funding " + - "confirmation failed") - return - } } err = f.handleFundingConfirmation(completeChan, *shortChanID) @@ -1821,82 +1792,50 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { }() } -// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that -// will cancel the wait for confirmation if we are not the channel initiator and -// the maxWaitNumBlocksFundingConf has passed from bestHeight. -// In the case of timeout, the timeoutChan will be closed. In case of error, -// confChan will be closed. In case of success, a *lnwire.ShortChannelID will be -// passed to confChan. -func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel, - confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) { +// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation and +// waitForTimeout that will return ErrConfirmationTimeout if we are not the +// channel initiator and the maxWaitNumBlocksFundingConf has passed from the +// funding broadcast height. In case of confirmation, the short channel ID of +// the channel will be returned. +func (f *fundingManager) waitForFundingWithTimeout( + ch *channeldb.OpenChannel) (*lnwire.ShortChannelID, error) { - epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil) - if err != nil { - fndgLog.Errorf("unable to register for epoch notification: %v", - err) - close(confChan) - return - } - - defer epochClient.Cancel() - - waitingConfChan := make(chan *lnwire.ShortChannelID) + confChan := make(chan *lnwire.ShortChannelID) + timeoutChan := make(chan error, 1) cancelChan := make(chan struct{}) - // Add this goroutine to wait group so we can be sure that it is - // properly stopped before the funding manager can be shut down. f.wg.Add(1) - go f.waitForFundingConfirmation(completeChan, cancelChan, - waitingConfChan) + go f.waitForFundingConfirmation(ch, cancelChan, confChan) - // On block maxHeight we will cancel the funding confirmation wait. - maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf - for { - select { - case epoch, ok := <-epochClient.Epochs: - if !ok { - fndgLog.Warnf("Epoch client shutting down") - return - } + // If we are not the initiator, we have no money at stake and will + // timeout waiting for the funding transaction to confirm after a + // while. + if !ch.IsInitiator { + f.wg.Add(1) + go f.waitForTimeout(ch, cancelChan, timeoutChan) + } + defer close(cancelChan) - // If we are not the channel initiator it's safe - // to timeout the channel - if uint32(epoch.Height) >= maxHeight && !completeChan.IsInitiator { - fndgLog.Warnf("waited for %v blocks without "+ - "seeing funding transaction confirmed,"+ - " cancelling.", maxWaitNumBlocksFundingConf) - - // Cancel the waitForFundingConfirmation - // goroutine. - close(cancelChan) - - // Notify the caller of the timeout. - close(timeoutChan) - return - } - - // TODO: If we are the channel initiator implement - // a method for recovering the funds from the funding - // transaction - - case <-f.quit: - // The fundingManager is shutting down, will resume - // waiting for the funding transaction on startup. - return - case shortChanID, ok := <-waitingConfChan: - if !ok { - // Failed waiting for confirmation, close - // confChan to indicate failure. - close(confChan) - return - } - - select { - case confChan <- shortChanID: - case <-f.quit: - return - } + var shortChanID *lnwire.ShortChannelID + var ok bool + select { + case err := <-timeoutChan: + if err != nil { + return nil, err } + return nil, ErrConfirmationTimeout + + case <-f.quit: + // The fundingManager is shutting down, and will resume wait on + // startup. + return nil, ErrFundingManagerShuttingDown + + case shortChanID, ok = <-confChan: + if !ok { + return nil, fmt.Errorf("waiting for funding" + + "confirmation failed") + } + return shortChanID, nil } } From 30614d384092d1365d1639d6de021560a0e9be5d Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:10 +0200 Subject: [PATCH 15/21] funding tests: increase epoch channel buffer Since the initiator no longer registers for block epochs, we increase the buffer on the mocked channel to not block during tests. --- fundingmanager_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 90779918c..6164e9559 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -243,7 +243,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, chainNotifier := &mockNotifier{ oneConfChannel: make(chan *chainntnfs.TxConfirmation, 1), sixConfChannel: make(chan *chainntnfs.TxConfirmation, 1), - epochChan: make(chan *chainntnfs.BlockEpoch, 1), + epochChan: make(chan *chainntnfs.BlockEpoch, 2), } sentMessages := make(chan lnwire.Message) @@ -1741,7 +1741,7 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) { t.Fatalf("alice did not publish funding tx") } - // Increase the height to 1 minus the maxWaitNumBlocksFundingConf height + // Increase the height to 1 minus the maxWaitNumBlocksFundingConf height. alice.mockNotifier.epochChan <- &chainntnfs.BlockEpoch{ Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf - 1, } @@ -1750,12 +1750,12 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) { Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf - 1, } - // Assert both and Alice and Bob still have 1 pending channels + // Assert both and Alice and Bob still have 1 pending channels. assertNumPendingChannelsRemains(t, alice, 1) assertNumPendingChannelsRemains(t, bob, 1) - // Increase both Alice and Bob to maxWaitNumBlocksFundingConf height + // Increase both Alice and Bob to maxWaitNumBlocksFundingConf height. alice.mockNotifier.epochChan <- &chainntnfs.BlockEpoch{ Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf, } @@ -1764,13 +1764,13 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) { Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf, } - // Since Alice was the initiator, the channel should not have timed out + // Since Alice was the initiator, the channel should not have timed out. assertNumPendingChannelsRemains(t, alice, 1) // Bob should have sent an Error message to Alice. assertErrorSent(t, bob.msgChan) - // Since Bob was not the initiator, the channel should timeout + // Since Bob was not the initiator, the channel should timeout. assertNumPendingChannelsBecomes(t, bob, 0) } From 8a61af6a55b1d77b0f5285932505bbf161c2a63e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:10 +0200 Subject: [PATCH 16/21] fundingmanager: make advanceFundingState handle pending channels This commit makes advanceFundingState check whether a channel is still pending before checking the channel opening state. This lets us call it directly, without checking whether a channel has confirmed first. --- fundingmanager.go | 226 +++++++++++++++++++++------------------------- 1 file changed, 104 insertions(+), 122 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 14a883429..0f037ec06 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -526,60 +526,7 @@ func (f *fundingManager) start() error { } f.wg.Add(1) - go func(ch *channeldb.OpenChannel) { - defer f.wg.Done() - - shortChanID, err := f.waitForFundingWithTimeout(ch) - if err == ErrConfirmationTimeout { - fndgLog.Warnf("Timeout waiting for funding "+ - "confirmation of ChannelPoint(%v)", - ch.FundingOutpoint) - - // We'll get a timeout if the number of blocks - // mined since the channel was initiated - // reaches maxWaitNumBlocksFundingConf and we - // are not the channel initiator. - localBalance := ch.LocalCommitment.LocalBalance.ToSatoshis() - closeInfo := &channeldb.ChannelCloseSummary{ - ChainHash: ch.ChainHash, - ChanPoint: ch.FundingOutpoint, - RemotePub: ch.IdentityPub, - Capacity: ch.Capacity, - SettledBalance: localBalance, - CloseType: channeldb.FundingCanceled, - RemoteCurrentRevocation: ch.RemoteCurrentRevocation, - RemoteNextRevocation: ch.RemoteNextRevocation, - LocalChanConfig: ch.LocalChanCfg, - } - - if err := ch.CloseChannel(closeInfo); err != nil { - fndgLog.Errorf("Failed closing channel "+ - "%v: %v", ch.FundingOutpoint, err) - return - } - return - } else if err != nil { - fndgLog.Errorf("Failed waiting for funding "+ - "confirmation for ChannelPoint(%v): %v", - ch.FundingOutpoint, err) - return - } - - // Success, funding transaction was confirmed. - fndgLog.Debugf("ChannelID(%v) is now fully confirmed! "+ - "(shortChanID=%v)", chanID, shortChanID) - - err = f.handleFundingConfirmation(ch, *shortChanID) - if err != nil { - fndgLog.Errorf("unable to handle funding "+ - "confirmation for ChannelPoint(%v): %v", - ch.FundingOutpoint, err) - return - } - - f.wg.Add(1) - go f.advanceFundingState(ch, nil) - }(channel) + go f.advanceFundingState(channel, chanID, nil) } // Fetch all our open channels, and make sure they all finalized the @@ -593,8 +540,10 @@ func (f *fundingManager) start() error { } for _, channel := range openChannels { + chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) + f.wg.Add(1) - go f.advanceFundingState(channel, nil) + go f.advanceFundingState(channel, chanID, nil) } f.wg.Add(1) // TODO(roasbeef): tune @@ -819,18 +768,31 @@ func (f *fundingManager) reservationCoordinator() { } } -// advanceFundingState will advance the channel fromt the markedOpen state to -// the point where the channel is ready for operation. This includes sending -// funding locked to the peer, adding the channel to the router graph, and -// announcing the channel. The updateChan can be set non-nil to get -// OpenStatusUpdates. +// advanceFundingState will advance the channel through the steps after the +// funding transaction is broadcasted, up until the point where the channel is +// ready for operation. This includes waiting for the funding transaction to +// confirm, sending funding locked to the peer, adding the channel to the +// router graph, and announcing the channel. The updateChan can be set non-nil +// to get OpenStatusUpdates. // // NOTE: This MUST be run as a goroutine. func (f *fundingManager) advanceFundingState(channel *channeldb.OpenChannel, - updateChan chan<- *lnrpc.OpenStatusUpdate) { + pendingChanID [32]byte, updateChan chan<- *lnrpc.OpenStatusUpdate) { defer f.wg.Done() + // If the channel is still pending we must wait for the funding + // transaction to confirm. + if channel.IsPending { + err := f.advancePendingChannelState(channel, pendingChanID) + if err != nil { + fndgLog.Errorf("Unable to advance pending state of "+ + "ChannelPoint(%v): %v", + channel.FundingOutpoint, err) + return + } + } + // We create the state-machine object which wraps the database state. lnChannel, err := lnwallet.NewLightningChannel( nil, channel, nil, @@ -1014,6 +976,85 @@ func (f *fundingManager) stateStep(channel *channeldb.OpenChannel, return fmt.Errorf("undefined channelState: %v", channelState) } +// advancePendingChannelState waits for a pending channel's funding tx to +// confirm, and marks it open in the database when that happens. +func (f *fundingManager) advancePendingChannelState( + channel *channeldb.OpenChannel, pendingChanID [32]byte) error { + + shortChanID, err := f.waitForFundingWithTimeout(channel) + if err == ErrConfirmationTimeout { + // We'll get a timeout if the number of blocks mined + // since the channel was initiated reaches + // maxWaitNumBlocksFundingConf and we are not the + // channel initiator. + ch := channel + localBalance := ch.LocalCommitment.LocalBalance.ToSatoshis() + closeInfo := &channeldb.ChannelCloseSummary{ + ChainHash: ch.ChainHash, + ChanPoint: ch.FundingOutpoint, + RemotePub: ch.IdentityPub, + Capacity: ch.Capacity, + SettledBalance: localBalance, + CloseType: channeldb.FundingCanceled, + RemoteCurrentRevocation: ch.RemoteCurrentRevocation, + RemoteNextRevocation: ch.RemoteNextRevocation, + LocalChanConfig: ch.LocalChanCfg, + } + + if err := ch.CloseChannel(closeInfo); err != nil { + return fmt.Errorf("failed closing channel "+ + "%v: %v", ch.FundingOutpoint, err) + } + + timeoutErr := fmt.Errorf("timeout waiting for funding tx "+ + "(%v) to confirm", channel.FundingOutpoint) + + // When the peer comes online, we'll notify it that we + // are now considering the channel flow canceled. + f.wg.Add(1) + go func() { + defer f.wg.Done() + + peerChan := make(chan lnpeer.Peer, 1) + var peerKey [33]byte + copy(peerKey[:], ch.IdentityPub.SerializeCompressed()) + + f.cfg.NotifyWhenOnline(peerKey, peerChan) + + var peer lnpeer.Peer + select { + case peer = <-peerChan: + case <-f.quit: + return + } + // TODO(halseth): should this send be made + // reliable? + f.failFundingFlow(peer, pendingChanID, timeoutErr) + }() + + return timeoutErr + + } else if err != nil { + return fmt.Errorf("error waiting for funding "+ + "confirmation for ChannelPoint(%v): %v", + channel.FundingOutpoint, err) + } + + // Success, funding transaction was confirmed. + chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) + fndgLog.Debugf("ChannelID(%v) is now fully confirmed! "+ + "(shortChanID=%v)", chanID, shortChanID) + + err = f.handleFundingConfirmation(channel, *shortChanID) + if err != nil { + return fmt.Errorf("unable to handle funding "+ + "confirmation for ChannelPoint(%v): %v", + channel.FundingOutpoint, err) + } + + return nil +} + // handlePendingChannels responds to a request for details concerning all // currently pending channels waiting for the final phase of the funding // workflow (funding txn confirmation). @@ -1605,40 +1646,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // transaction in 288 blocks (~ 48 hrs), by canceling the reservation // and canceling the wait for the funding confirmation. f.wg.Add(1) - go func() { - defer f.wg.Done() - - shortChanID, err := f.waitForFundingWithTimeout(completeChan) - if err == ErrConfirmationTimeout { - // We did not see the funding confirmation before - // timeout, so we forget the channel. - err := fmt.Errorf("timeout waiting for funding tx "+ - "(%v) to confirm", completeChan.FundingOutpoint) - fndgLog.Warnf(err.Error()) - f.failFundingFlow(fmsg.peer, pendingChanID, err) - deleteFromDatabase() - return - } else if err != nil { - fndgLog.Errorf(err.Error()) - return - } - - // Success, funding transaction was confirmed. - fndgLog.Debugf("ChannelID(%v) is now fully confirmed! "+ - "(shortChanID=%v)", channelID, shortChanID) - - err = f.handleFundingConfirmation(completeChan, *shortChanID) - if err != nil { - fndgLog.Errorf("unable to handle funding "+ - "confirmation for ChannelPoint(%v): %v", - completeChan.FundingOutpoint, err) - return - } - - f.wg.Add(1) - go f.advanceFundingState(completeChan, nil) - - }() + go f.advanceFundingState(completeChan, pendingChanID, nil) } // processFundingSigned sends a single funding sign complete message along with @@ -1763,33 +1771,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { // At this point we have broadcast the funding transaction and done all // necessary processing. f.wg.Add(1) - go func() { - defer f.wg.Done() - - shortChanID, err := f.waitForFundingWithTimeout(completeChan) - if err != nil { - // Since we are the channel initiator, we don't expect - // to get ErrConfirmationTimeout. - fndgLog.Errorf("Failed waiting for funding "+ - "confirmation for ChannelPoint(%v): %v", - completeChan.FundingOutpoint, err) - return - } - - err = f.handleFundingConfirmation(completeChan, *shortChanID) - if err != nil { - fndgLog.Errorf("unable to handle funding confirmation "+ - "for ChannelPoint(%v): %v", - completeChan.FundingOutpoint, err) - return - } - - fndgLog.Debugf("Channel with ShortChanID %v now confirmed", - shortChanID.ToUint64()) - - f.wg.Add(1) - go f.advanceFundingState(completeChan, resCtx.updates) - }() + go f.advanceFundingState(completeChan, pendingChanID, resCtx.updates) } // waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation and From 88f5e06427ef390135a78bb8ee1e97987dc76735 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:11 +0200 Subject: [PATCH 17/21] fundingmanager: unify handling of pending and non-pending channels at startup Since the advanceFundingState now can handle pending channels, we'll call it for both pending and non-pending channels, just making sure that we re-initialize the channel barriers and re-publish the funding tx fro pending channels. --- fundingmanager.go | 80 ++++++++++++++++++++++------------------------- 1 file changed, 38 insertions(+), 42 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 0f037ec06..c19dbe265 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -489,59 +489,55 @@ func (f *fundingManager) start() error { // down. // TODO(roasbeef): store height that funding finished? // * would then replace call below - pendingChannels, err := f.cfg.Wallet.Cfg.Database.FetchPendingChannels() + allChannels, err := f.cfg.Wallet.Cfg.Database.FetchAllChannels() if err != nil { return err } - // For any channels that were in a pending state when the daemon was - // last connected, the Funding Manager will re-initialize the channel - // barriers and will also launch waitForFundingConfirmation to wait for - // the channel's funding transaction to be confirmed on the blockchain. - for _, channel := range pendingChannels { - f.barrierMtx.Lock() - fndgLog.Tracef("Loading pending ChannelPoint(%v), creating chan "+ - "barrier", channel.FundingOutpoint) + for _, channel := range allChannels { chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) - f.newChanBarriers[chanID] = make(chan struct{}) - f.barrierMtx.Unlock() - f.localDiscoverySignals[chanID] = make(chan struct{}) + // For any channels that were in a pending state when the + // daemon was last connected, the Funding Manager will + // re-initialize the channel barriers, and republish the + // funding transaction if we're the initiator. + if channel.IsPending { + f.barrierMtx.Lock() + fndgLog.Tracef("Loading pending ChannelPoint(%v), "+ + "creating chan barrier", + channel.FundingOutpoint) - // Rebroadcast the funding transaction for any pending channel - // that we initiated. If this operation fails due to a reported - // double spend, we treat this as an indicator that we have - // already broadcast this transaction. Otherwise, we simply log - // the error as there isn't anything we can currently do to - // recover. - if channel.ChanType == channeldb.SingleFunder && - channel.IsInitiator { + f.newChanBarriers[chanID] = make(chan struct{}) + f.barrierMtx.Unlock() - err := f.cfg.PublishTransaction(channel.FundingTxn) - if err != nil { - fndgLog.Errorf("Unable to rebroadcast funding "+ - "tx for ChannelPoint(%v): %v", - channel.FundingOutpoint, err) + f.localDiscoverySignals[chanID] = make(chan struct{}) + + // Rebroadcast the funding transaction for any pending + // channel that we initiated. No error will be returned + // if the transaction already has been broadcasted. + if channel.ChanType == channeldb.SingleFunder && + channel.IsInitiator { + + err := f.cfg.PublishTransaction( + channel.FundingTxn, + ) + if err != nil { + fndgLog.Errorf("Unable to rebroadcast "+ + "funding tx for "+ + "ChannelPoint(%v): %v", + channel.FundingOutpoint, err) + } } } - f.wg.Add(1) - go f.advanceFundingState(channel, chanID, nil) - } - - // Fetch all our open channels, and make sure they all finalized the - // opening process. - // TODO(halseth): this check is only done on restart atm, but should - // also be done if a peer that disappeared during the opening process - // reconnects. - openChannels, err := f.cfg.Wallet.Cfg.Database.FetchAllChannels() - if err != nil { - return err - } - - for _, channel := range openChannels { - chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) - + // We will restart the funding state machine for all channels, + // which will wait for the channel's funding transaction to be + // confirmed on the blockchain, and retransmit the messages + // necessary for the channel to be operational. + // TODO(halseth): retransmission of messages to make the + // channel operational is only done on restart atm, but should + // also be done if a peer that disappeared during the opening + // process reconnects. f.wg.Add(1) go f.advanceFundingState(channel, chanID, nil) } From bda0e40dadaf0934006734bd14914ec66cc78591 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Sun, 16 Sep 2018 10:40:11 +0200 Subject: [PATCH 18/21] fundingmanager: save markedOpen before marking the channel open This commit fixes a potential issue within the fundingmanager, where failing to write the channel opening state could cause the channel being marked open in the DB, but the opening state not being set. On startup this would cause the channel state machine to not be able to resume. We fix this by saving the channel opening state _first_. This works because saving the opening state is idempotent, and in case a channel is found pending at startup, it will re-register for confirmation notifications and re-do the process. --- fundingmanager.go | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index c19dbe265..37f5563d7 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -2005,8 +2005,23 @@ func (f *fundingManager) handleFundingConfirmation( fundingPoint := completeChan.FundingOutpoint chanID := lnwire.NewChanIDFromOutPoint(&fundingPoint) - // Now that the channel has been fully confirmed, we'll mark it as open - // within the database. + // TODO(roasbeef): ideally persistent state update for chan above + // should be abstracted + + // The funding transaction now being confirmed, we add this channel to + // the fundingManager's internal persistent state machine that we use + // to track the remaining process of the channel opening. This is + // useful to resume the opening process in case of restarts. We set the + // opening state before we mark the channel opened in the database, + // such that we can receover from one of the db writes failing. + err := f.saveChannelOpeningState(&fundingPoint, markedOpen, &shortChanID) + if err != nil { + return fmt.Errorf("error setting channel state to markedOpen: %v", + err) + } + + // Now that the channel has been fully confirmed and we successfully + // saved the opening state, we'll mark it as open within the database. if err := completeChan.MarkAsOpen(shortChanID); err != nil { return fmt.Errorf("error setting channel pending flag to false: "+ "%v", err) @@ -2016,23 +2031,6 @@ func (f *fundingManager) handleFundingConfirmation( // pending open to open. f.cfg.NotifyOpenChannelEvent(completeChan.FundingOutpoint) - // TODO(roasbeef): ideally persistent state update for chan above - // should be abstracted - - // The funding transaction now being confirmed, we add this channel to - // the fundingManager's internal persistent state machine that we use - // to track the remaining process of the channel opening. This is - // useful to resume the opening process in case of restarts. - // - // TODO(halseth): make the two db transactions (MarkChannelAsOpen and - // saveChannelOpeningState) atomic by doing them in the same transaction. - // Needed to be properly fault-tolerant. - err := f.saveChannelOpeningState(&fundingPoint, markedOpen, &shortChanID) - if err != nil { - return fmt.Errorf("error setting channel state to markedOpen: %v", - err) - } - // As there might already be an active link in the switch with an // outdated short chan ID, we'll instruct the switch to load the updated // short chan id from disk. From 9ff6de5be1e11a397c8d3e60d8d5ad656f611b09 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 7 Aug 2019 15:38:44 +0200 Subject: [PATCH 19/21] funding: add TODO for consistency check --- fundingmanager.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fundingmanager.go b/fundingmanager.go index 37f5563d7..33c04eb39 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -807,6 +807,12 @@ func (f *fundingManager) advanceFundingState(channel *channeldb.OpenChannel, // Channel not in fundingManager's opening database, // meaning it was successfully announced to the // network. + // TODO(halseth): could do graph consistency check + // here, and re-add the edge if missing. + fndgLog.Debugf("ChannlPoint(%v) with chanID=%v not "+ + "found in opening database, assuming already "+ + "announced to the network", + channel.FundingOutpoint, pendingChanID) return } else if err != nil { fndgLog.Errorf("Unable to query database for "+ From 5b8e97da293762591da0e534be13cc59a0472066 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 12 Sep 2019 15:05:52 +0200 Subject: [PATCH 20/21] funding: update comment aboute offline peers Since fundingLocked now are being sent reliably, the only message left to be sent reliably is the node announcement for private channels. --- fundingmanager.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 33c04eb39..1f9ada2db 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -532,12 +532,8 @@ func (f *fundingManager) start() error { // We will restart the funding state machine for all channels, // which will wait for the channel's funding transaction to be - // confirmed on the blockchain, and retransmit the messages + // confirmed on the blockchain, and transmit the messages // necessary for the channel to be operational. - // TODO(halseth): retransmission of messages to make the - // channel operational is only done on restart atm, but should - // also be done if a peer that disappeared during the opening - // process reconnects. f.wg.Add(1) go f.advanceFundingState(channel, chanID, nil) } @@ -2244,6 +2240,9 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, fndgLog.Debugf("Sending our NodeAnnouncement for "+ "ChannelID(%v) to %x", chanID, pubKey) + // TODO(halseth): make reliable. If the peer is not online this + // will fail, and the opening process will stop. Should instead + // block here, waiting for the peer to come online. if err := peer.SendMessage(true, &nodeAnn); err != nil { return fmt.Errorf("unable to send node announcement "+ "to peer %x: %v", pubKey, err) From 9f3fbda71f576cc05f04fca6a9987b2cebc16476 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 18 Sep 2019 11:27:34 +0200 Subject: [PATCH 21/21] fundingmanager: remove unnecessary Peer param to sendFundingLocked --- fundingmanager.go | 48 ++++++++++++++--------------------------------- 1 file changed, 14 insertions(+), 34 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 1f9ada2db..0c858213b 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -851,23 +851,7 @@ func (f *fundingManager) stateStep(channel *channeldb.OpenChannel, // The funding transaction was confirmed, but we did not successfully // send the fundingLocked message to the peer, so let's do that now. case markedOpen: - peerChan := make(chan lnpeer.Peer, 1) - - var peerKey [33]byte - copy(peerKey[:], channel.IdentityPub.SerializeCompressed()) - - f.cfg.NotifyWhenOnline(peerKey, peerChan) - - var peer lnpeer.Peer - select { - case peer = <-peerChan: - case <-f.quit: - return ErrFundingManagerShuttingDown - } - - err := f.sendFundingLocked( - peer, channel, lnChannel, shortChanID, - ) + err := f.sendFundingLocked(channel, lnChannel, shortChanID) if err != nil { return fmt.Errorf("failed sending fundingLocked: %v", err) @@ -2057,7 +2041,7 @@ func (f *fundingManager) handleFundingConfirmation( // sendFundingLocked creates and sends the fundingLocked message. // This should be called after the funding transaction has been confirmed, // and the channelState is 'markedOpen'. -func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, +func (f *fundingManager) sendFundingLocked( completeChan *channeldb.OpenChannel, channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) error { @@ -2088,8 +2072,18 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, // send fundingLocked until we succeed, or the fundingManager is shut // down. for { - fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+ - "peer %x", chanID, peerKey) + connected := make(chan lnpeer.Peer, 1) + f.cfg.NotifyWhenOnline(peerKey, connected) + + var peer lnpeer.Peer + select { + case peer = <-connected: + case <-f.quit: + return ErrFundingManagerShuttingDown + } + + fndgLog.Infof("Peer(%x) is online, sending FundingLocked "+ + "for ChannelID(%v)", peerKey, chanID) if err := peer.SendMessage(false, fundingLockedMsg); err == nil { // Sending succeeded, we can break out and continue the @@ -2099,20 +2093,6 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+ "Will retry when online", peerKey, err) - - connected := make(chan lnpeer.Peer, 1) - f.cfg.NotifyWhenOnline(peerKey, connected) - - select { - case <-connected: - fndgLog.Infof("Peer(%x) came back online, will retry "+ - "sending FundingLocked for ChannelID(%v)", - peerKey, chanID) - - // Retry sending. - case <-f.quit: - return ErrFundingManagerShuttingDown - } } return nil