diff --git a/fundingmanager.go b/fundingmanager.go index f2a321eff..05acb70a7 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -86,6 +86,12 @@ var ( // been signaled to shut down. ErrFundingManagerShuttingDown = errors.New("funding manager shutting " + "down") + + // ErrConfirmationTimeout is an error returned when we as a responder + // are waiting for a funding transaction to confirm, but too many + // blocks pass without confirmation. + ErrConfirmationTimeout = errors.New("timeout waiting for funding " + + "confirmation") ) // reservationWithCtx encapsulates a pending channel reservation. This wrapper @@ -489,230 +495,53 @@ func (f *fundingManager) start() error { // down. // TODO(roasbeef): store height that funding finished? // * would then replace call below - pendingChannels, err := f.cfg.Wallet.Cfg.Database.FetchPendingChannels() + allChannels, err := f.cfg.Wallet.Cfg.Database.FetchAllChannels() if err != nil { return err } - // For any channels that were in a pending state when the daemon was - // last connected, the Funding Manager will re-initialize the channel - // barriers and will also launch waitForFundingConfirmation to wait for - // the channel's funding transaction to be confirmed on the blockchain. - for _, channel := range pendingChannels { - f.barrierMtx.Lock() - fndgLog.Tracef("Loading pending ChannelPoint(%v), creating chan "+ - "barrier", channel.FundingOutpoint) + for _, channel := range allChannels { chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) - f.newChanBarriers[chanID] = make(chan struct{}) - f.barrierMtx.Unlock() - - f.localDiscoverySignals[chanID] = make(chan struct{}) - - // Rebroadcast the funding transaction for any pending channel - // that we initiated. If this operation fails due to a reported - // double spend, we treat this as an indicator that we have - // already broadcast this transaction. Otherwise, we simply log - // the error as there isn't anything we can currently do to - // recover. - if channel.ChanType == channeldb.SingleFunder && - channel.IsInitiator { - - err := f.cfg.PublishTransaction(channel.FundingTxn) - if err != nil { - fndgLog.Errorf("Unable to rebroadcast funding "+ - "tx for ChannelPoint(%v): %v", - channel.FundingOutpoint, err) - } - } - - confChan := make(chan *lnwire.ShortChannelID) - timeoutChan := make(chan struct{}) - - go func(ch *channeldb.OpenChannel) { - go f.waitForFundingWithTimeout(ch, confChan, timeoutChan) - - select { - case <-timeoutChan: - // Timeout channel will be triggered if the number of blocks - // mined since the channel was initiated reaches - // maxWaitNumBlocksFundingConf and we are not the channel - // initiator. - localBalance := ch.LocalCommitment.LocalBalance.ToSatoshis() - closeInfo := &channeldb.ChannelCloseSummary{ - ChainHash: ch.ChainHash, - ChanPoint: ch.FundingOutpoint, - RemotePub: ch.IdentityPub, - Capacity: ch.Capacity, - SettledBalance: localBalance, - CloseType: channeldb.FundingCanceled, - RemoteCurrentRevocation: ch.RemoteCurrentRevocation, - RemoteNextRevocation: ch.RemoteNextRevocation, - LocalChanConfig: ch.LocalChanCfg, - } - - if err := ch.CloseChannel(closeInfo); err != nil { - fndgLog.Errorf("Failed closing channel "+ - "%v: %v", ch.FundingOutpoint, err) - } - - case <-f.quit: - // The fundingManager is shutting down, and will - // resume wait on startup. - case shortChanID, ok := <-confChan: - if !ok { - fndgLog.Errorf("Waiting for funding" + - "confirmation failed") - return - } - - // The funding transaction has confirmed, so - // we'll attempt to retrieve the remote peer - // to complete the rest of the funding flow. - peerChan := make(chan lnpeer.Peer, 1) - - var peerKey [33]byte - copy(peerKey[:], ch.IdentityPub.SerializeCompressed()) - - f.cfg.NotifyWhenOnline(peerKey, peerChan) - - var peer lnpeer.Peer - select { - case peer = <-peerChan: - case <-f.quit: - return - } - err := f.handleFundingConfirmation( - peer, ch, shortChanID, - ) - if err != nil { - fndgLog.Errorf("Failed to handle "+ - "funding confirmation: %v", err) - return - } - } - }(channel) - } - - // Fetch all our open channels, and make sure they all finalized the - // opening process. - // TODO(halseth): this check is only done on restart atm, but should - // also be done if a peer that disappeared during the opening process - // reconnects. - openChannels, err := f.cfg.Wallet.Cfg.Database.FetchAllChannels() - if err != nil { - return err - } - - for _, channel := range openChannels { - channelState, shortChanID, err := f.getChannelOpeningState( - &channel.FundingOutpoint) - if err == ErrChannelNotFound { - // Channel not in fundingManager's opening database, - // meaning it was successfully announced to the - // network. - continue - } else if err != nil { - return err - } - - chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) - fndgLog.Debugf("channel (%v) with opening state %v found", - chanID, channelState) + // For any channels that were in a pending state when the + // daemon was last connected, the Funding Manager will + // re-initialize the channel barriers, and republish the + // funding transaction if we're the initiator. if channel.IsPending { - // Set up the channel barriers again, to make sure - // waitUntilChannelOpen correctly waits until the - // opening process is completely over. f.barrierMtx.Lock() fndgLog.Tracef("Loading pending ChannelPoint(%v), "+ - "creating chan barrier", channel.FundingOutpoint) + "creating chan barrier", + channel.FundingOutpoint) + f.newChanBarriers[chanID] = make(chan struct{}) f.barrierMtx.Unlock() - } - // If we did find the channel in the opening state database, we - // have seen the funding transaction being confirmed, but we - // did not finish the rest of the setup procedure before we shut - // down. We handle the remaining steps of this setup by - // continuing the procedure where we left off. - switch channelState { - case markedOpen: - // The funding transaction was confirmed, but we did not - // successfully send the fundingLocked message to the - // peer, so let's do that now. - f.wg.Add(1) - go func(dbChan *channeldb.OpenChannel) { - defer f.wg.Done() + f.localDiscoverySignals[chanID] = make(chan struct{}) - peerChan := make(chan lnpeer.Peer, 1) + // Rebroadcast the funding transaction for any pending + // channel that we initiated. No error will be returned + // if the transaction already has been broadcasted. + if channel.ChanType == channeldb.SingleFunder && + channel.IsInitiator { - var peerKey [33]byte - copy(peerKey[:], dbChan.IdentityPub.SerializeCompressed()) - - f.cfg.NotifyWhenOnline(peerKey, peerChan) - - var peer lnpeer.Peer - select { - case peer = <-peerChan: - case <-f.quit: - return - } - err := f.handleFundingConfirmation( - peer, dbChan, shortChanID, + err := f.cfg.PublishTransaction( + channel.FundingTxn, ) if err != nil { - fndgLog.Errorf("Failed to handle "+ - "funding confirmation: %v", err) - return + fndgLog.Errorf("Unable to rebroadcast "+ + "funding tx for "+ + "ChannelPoint(%v): %v", + channel.FundingOutpoint, err) } - }(channel) - - case fundingLockedSent: - // fundingLocked was sent to peer, but the channel - // was not added to the router graph and the channel - // announcement was not sent. - f.wg.Add(1) - go func(dbChan *channeldb.OpenChannel) { - defer f.wg.Done() - - err = f.addToRouterGraph(dbChan, shortChanID) - if err != nil { - fndgLog.Errorf("failed adding to "+ - "router graph: %v", err) - return - } - - // TODO(halseth): should create a state machine - // that can more easily be resumed from - // different states, to avoid this code - // duplication. - err = f.annAfterSixConfs(dbChan, shortChanID) - if err != nil { - fndgLog.Errorf("error sending channel "+ - "announcements: %v", err) - return - } - }(channel) - - case addedToRouterGraph: - // The channel was added to the Router's topology, but - // the channel announcement was not sent. - f.wg.Add(1) - go func(dbChan *channeldb.OpenChannel) { - defer f.wg.Done() - - err = f.annAfterSixConfs(dbChan, shortChanID) - if err != nil { - fndgLog.Errorf("error sending channel "+ - "announcement: %v", err) - return - } - }(channel) - - default: - fndgLog.Errorf("undefined channelState: %v", - channelState) + } } + + // We will restart the funding state machine for all channels, + // which will wait for the channel's funding transaction to be + // confirmed on the blockchain, and transmit the messages + // necessary for the channel to be operational. + f.wg.Add(1) + go f.advanceFundingState(channel, chanID, nil) } f.wg.Add(1) // TODO(roasbeef): tune @@ -937,6 +766,283 @@ func (f *fundingManager) reservationCoordinator() { } } +// advanceFundingState will advance the channel through the steps after the +// funding transaction is broadcasted, up until the point where the channel is +// ready for operation. This includes waiting for the funding transaction to +// confirm, sending funding locked to the peer, adding the channel to the +// router graph, and announcing the channel. The updateChan can be set non-nil +// to get OpenStatusUpdates. +// +// NOTE: This MUST be run as a goroutine. +func (f *fundingManager) advanceFundingState(channel *channeldb.OpenChannel, + pendingChanID [32]byte, updateChan chan<- *lnrpc.OpenStatusUpdate) { + + defer f.wg.Done() + + // If the channel is still pending we must wait for the funding + // transaction to confirm. + if channel.IsPending { + err := f.advancePendingChannelState(channel, pendingChanID) + if err != nil { + fndgLog.Errorf("Unable to advance pending state of "+ + "ChannelPoint(%v): %v", + channel.FundingOutpoint, err) + return + } + } + + // We create the state-machine object which wraps the database state. + lnChannel, err := lnwallet.NewLightningChannel( + nil, channel, nil, + ) + if err != nil { + fndgLog.Errorf("Unable to create LightningChannel(%v): %v", + channel.FundingOutpoint, err) + return + } + + for { + channelState, shortChanID, err := f.getChannelOpeningState( + &channel.FundingOutpoint, + ) + if err == ErrChannelNotFound { + // Channel not in fundingManager's opening database, + // meaning it was successfully announced to the + // network. + // TODO(halseth): could do graph consistency check + // here, and re-add the edge if missing. + fndgLog.Debugf("ChannlPoint(%v) with chanID=%v not "+ + "found in opening database, assuming already "+ + "announced to the network", + channel.FundingOutpoint, pendingChanID) + return + } else if err != nil { + fndgLog.Errorf("Unable to query database for "+ + "channel opening state(%v): %v", + channel.FundingOutpoint, err) + return + } + + // If we did find the channel in the opening state database, we + // have seen the funding transaction being confirmed, but there + // are still steps left of the setup procedure. We continue the + // procedure where we left off. + err = f.stateStep( + channel, lnChannel, shortChanID, channelState, + updateChan, + ) + if err != nil { + fndgLog.Errorf("Unable to advance state(%v): %v", + channel.FundingOutpoint, err) + return + } + } +} + +// stateStep advances the confirmed channel one step in the funding state +// machine. This method is synchronous and the new channel opening state will +// have been written to the database when it successfully returns. The +// updateChan can be set non-nil to get OpenStatusUpdates. +func (f *fundingManager) stateStep(channel *channeldb.OpenChannel, + lnChannel *lnwallet.LightningChannel, + shortChanID *lnwire.ShortChannelID, channelState channelOpeningState, + updateChan chan<- *lnrpc.OpenStatusUpdate) error { + + chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) + fndgLog.Debugf("Channel(%v) with ShortChanID %v has opening state %v", + chanID, shortChanID, channelState) + + switch channelState { + + // The funding transaction was confirmed, but we did not successfully + // send the fundingLocked message to the peer, so let's do that now. + case markedOpen: + err := f.sendFundingLocked(channel, lnChannel, shortChanID) + if err != nil { + return fmt.Errorf("failed sending fundingLocked: %v", + err) + } + + // As the fundingLocked message is now sent to the peer, the + // channel is moved to the next state of the state machine. It + // will be moved to the last state (actually deleted from the + // database) after the channel is finally announced. + err = f.saveChannelOpeningState( + &channel.FundingOutpoint, fundingLockedSent, + shortChanID, + ) + if err != nil { + return fmt.Errorf("error setting channel state to"+ + " fundingLockedSent: %v", err) + } + + fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ + "sent FundingLocked", chanID, shortChanID) + + return nil + + // fundingLocked was sent to peer, but the channel was not added to the + // router graph and the channel announcement was not sent. + case fundingLockedSent: + err := f.addToRouterGraph(channel, shortChanID) + if err != nil { + return fmt.Errorf("failed adding to "+ + "router graph: %v", err) + } + + // As the channel is now added to the ChannelRouter's topology, + // the channel is moved to the next state of the state machine. + // It will be moved to the last state (actually deleted from + // the database) after the channel is finally announced. + err = f.saveChannelOpeningState( + &channel.FundingOutpoint, addedToRouterGraph, + shortChanID, + ) + if err != nil { + return fmt.Errorf("error setting channel state to"+ + " addedToRouterGraph: %v", err) + } + + fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ + "added to router graph", chanID, shortChanID) + + // Give the caller a final update notifying them that + // the channel is now open. + // TODO(roasbeef): only notify after recv of funding locked? + fundingPoint := channel.FundingOutpoint + cp := &lnrpc.ChannelPoint{ + FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{ + FundingTxidBytes: fundingPoint.Hash[:], + }, + OutputIndex: fundingPoint.Index, + } + + if updateChan != nil { + upd := &lnrpc.OpenStatusUpdate{ + Update: &lnrpc.OpenStatusUpdate_ChanOpen{ + ChanOpen: &lnrpc.ChannelOpenUpdate{ + ChannelPoint: cp, + }, + }, + } + + select { + case updateChan <- upd: + case <-f.quit: + return ErrFundingManagerShuttingDown + } + } + + return nil + + // The channel was added to the Router's topology, but the channel + // announcement was not sent. + case addedToRouterGraph: + err := f.annAfterSixConfs(channel, shortChanID) + if err != nil { + return fmt.Errorf("error sending channel "+ + "announcement: %v", err) + } + + // We delete the channel opening state from our internal + // database as the opening process has succeeded. We can do + // this because we assume the AuthenticatedGossiper queues the + // announcement messages, and persists them in case of a daemon + // shutdown. + err = f.deleteChannelOpeningState(&channel.FundingOutpoint) + if err != nil { + return fmt.Errorf("error deleting channel state: %v", + err) + } + + fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+ + "announced", chanID, shortChanID) + + return nil + } + + return fmt.Errorf("undefined channelState: %v", channelState) +} + +// advancePendingChannelState waits for a pending channel's funding tx to +// confirm, and marks it open in the database when that happens. +func (f *fundingManager) advancePendingChannelState( + channel *channeldb.OpenChannel, pendingChanID [32]byte) error { + + shortChanID, err := f.waitForFundingWithTimeout(channel) + if err == ErrConfirmationTimeout { + // We'll get a timeout if the number of blocks mined + // since the channel was initiated reaches + // maxWaitNumBlocksFundingConf and we are not the + // channel initiator. + ch := channel + localBalance := ch.LocalCommitment.LocalBalance.ToSatoshis() + closeInfo := &channeldb.ChannelCloseSummary{ + ChainHash: ch.ChainHash, + ChanPoint: ch.FundingOutpoint, + RemotePub: ch.IdentityPub, + Capacity: ch.Capacity, + SettledBalance: localBalance, + CloseType: channeldb.FundingCanceled, + RemoteCurrentRevocation: ch.RemoteCurrentRevocation, + RemoteNextRevocation: ch.RemoteNextRevocation, + LocalChanConfig: ch.LocalChanCfg, + } + + if err := ch.CloseChannel(closeInfo); err != nil { + return fmt.Errorf("failed closing channel "+ + "%v: %v", ch.FundingOutpoint, err) + } + + timeoutErr := fmt.Errorf("timeout waiting for funding tx "+ + "(%v) to confirm", channel.FundingOutpoint) + + // When the peer comes online, we'll notify it that we + // are now considering the channel flow canceled. + f.wg.Add(1) + go func() { + defer f.wg.Done() + + peerChan := make(chan lnpeer.Peer, 1) + var peerKey [33]byte + copy(peerKey[:], ch.IdentityPub.SerializeCompressed()) + + f.cfg.NotifyWhenOnline(peerKey, peerChan) + + var peer lnpeer.Peer + select { + case peer = <-peerChan: + case <-f.quit: + return + } + // TODO(halseth): should this send be made + // reliable? + f.failFundingFlow(peer, pendingChanID, timeoutErr) + }() + + return timeoutErr + + } else if err != nil { + return fmt.Errorf("error waiting for funding "+ + "confirmation for ChannelPoint(%v): %v", + channel.FundingOutpoint, err) + } + + // Success, funding transaction was confirmed. + chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) + fndgLog.Debugf("ChannelID(%v) is now fully confirmed! "+ + "(shortChanID=%v)", chanID, shortChanID) + + err = f.handleFundingConfirmation(channel, *shortChanID) + if err != nil { + return fmt.Errorf("unable to handle funding "+ + "confirmation for ChannelPoint(%v): %v", + channel.FundingOutpoint, err) + } + + return nil +} + // handlePendingChannels responds to a request for details concerning all // currently pending channels waiting for the final phase of the funding // workflow (funding txn confirmation). @@ -1544,48 +1650,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // transaction in 288 blocks (~ 48 hrs), by canceling the reservation // and canceling the wait for the funding confirmation. f.wg.Add(1) - go func() { - defer f.wg.Done() - confChan := make(chan *lnwire.ShortChannelID) - timeoutChan := make(chan struct{}) - go f.waitForFundingWithTimeout(completeChan, confChan, - timeoutChan) - - var shortChanID *lnwire.ShortChannelID - var ok bool - select { - case <-timeoutChan: - // We did not see the funding confirmation before - // timeout, so we forget the channel. - err := fmt.Errorf("timeout waiting for funding tx "+ - "(%v) to confirm", completeChan.FundingOutpoint) - fndgLog.Warnf(err.Error()) - f.failFundingFlow(fmsg.peer, pendingChanID, err) - deleteFromDatabase() - return - case <-f.quit: - // The fundingManager is shutting down, will resume - // wait for funding transaction on startup. - return - case shortChanID, ok = <-confChan: - if !ok { - fndgLog.Errorf("waiting for funding confirmation" + - " failed") - return - } - // Fallthrough. - } - - // Success, funding transaction was confirmed. - err := f.handleFundingConfirmation( - fmsg.peer, completeChan, shortChanID, - ) - if err != nil { - fndgLog.Errorf("failed to handle funding"+ - "confirmation: %v", err) - return - } - }() + go f.advanceFundingState(completeChan, pendingChanID, nil) } // processFundingSigned sends a single funding sign complete message along with @@ -1710,176 +1775,53 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { // At this point we have broadcast the funding transaction and done all // necessary processing. f.wg.Add(1) - go func() { - defer f.wg.Done() - confChan := make(chan *lnwire.ShortChannelID) - cancelChan := make(chan struct{}) - - // In case the fundingManager is stopped at some point during - // the remaining part of the opening process, we must wait for - // this process to finish (either successfully or with some - // error), before the fundingManager can be shut down. - f.wg.Add(1) - go func() { - defer f.wg.Done() - f.waitForFundingConfirmation(completeChan, cancelChan, - confChan) - }() - - var shortChanID *lnwire.ShortChannelID - var ok bool - select { - case <-f.quit: - return - case shortChanID, ok = <-confChan: - if !ok { - fndgLog.Errorf("waiting for funding " + - "confirmation failed") - return - } - } - - fndgLog.Debugf("Channel with ShortChanID %v now confirmed", - shortChanID.ToUint64()) - - // Go on adding the channel to the channel graph, and crafting - // channel announcements. - lnChannel, err := lnwallet.NewLightningChannel( - nil, completeChan, nil, - ) - if err != nil { - fndgLog.Errorf("failed creating lnChannel: %v", err) - return - } - - err = f.sendFundingLocked( - fmsg.peer, completeChan, lnChannel, shortChanID, - ) - if err != nil { - fndgLog.Errorf("failed sending fundingLocked: %v", err) - return - } - fndgLog.Debugf("FundingLocked for channel with ShortChanID "+ - "%v sent", shortChanID.ToUint64()) - - err = f.addToRouterGraph(completeChan, shortChanID) - if err != nil { - fndgLog.Errorf("failed adding to router graph: %v", err) - return - } - fndgLog.Debugf("Channel with ShortChanID %v added to "+ - "router graph", shortChanID.ToUint64()) - - // Give the caller a final update notifying them that - // the channel is now open. - // TODO(roasbeef): only notify after recv of funding locked? - upd := &lnrpc.OpenStatusUpdate{ - Update: &lnrpc.OpenStatusUpdate_ChanOpen{ - ChanOpen: &lnrpc.ChannelOpenUpdate{ - ChannelPoint: &lnrpc.ChannelPoint{ - FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{ - FundingTxidBytes: fundingPoint.Hash[:], - }, - OutputIndex: fundingPoint.Index, - }, - }, - }, - } - - select { - case resCtx.updates <- upd: - case <-f.quit: - return - } - - err = f.annAfterSixConfs(completeChan, shortChanID) - if err != nil { - fndgLog.Errorf("failed sending channel announcement: %v", - err) - return - } - }() + go f.advanceFundingState(completeChan, pendingChanID, resCtx.updates) } -// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that -// will cancel the wait for confirmation if we are not the channel initiator and -// the maxWaitNumBlocksFundingConf has passed from bestHeight. -// In the case of timeout, the timeoutChan will be closed. In case of error, -// confChan will be closed. In case of success, a *lnwire.ShortChannelID will be -// passed to confChan. -func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel, - confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) { +// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation and +// waitForTimeout that will return ErrConfirmationTimeout if we are not the +// channel initiator and the maxWaitNumBlocksFundingConf has passed from the +// funding broadcast height. In case of confirmation, the short channel ID of +// the channel will be returned. +func (f *fundingManager) waitForFundingWithTimeout( + ch *channeldb.OpenChannel) (*lnwire.ShortChannelID, error) { - epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil) - if err != nil { - fndgLog.Errorf("unable to register for epoch notification: %v", - err) - close(confChan) - return - } - - defer epochClient.Cancel() - - waitingConfChan := make(chan *lnwire.ShortChannelID) + confChan := make(chan *lnwire.ShortChannelID) + timeoutChan := make(chan error, 1) cancelChan := make(chan struct{}) - // Add this goroutine to wait group so we can be sure that it is - // properly stopped before the funding manager can be shut down. f.wg.Add(1) - go func() { - defer f.wg.Done() - f.waitForFundingConfirmation(completeChan, cancelChan, - waitingConfChan) - }() + go f.waitForFundingConfirmation(ch, cancelChan, confChan) - // On block maxHeight we will cancel the funding confirmation wait. - maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf - for { - select { - case epoch, ok := <-epochClient.Epochs: - if !ok { - fndgLog.Warnf("Epoch client shutting down") - return - } + // If we are not the initiator, we have no money at stake and will + // timeout waiting for the funding transaction to confirm after a + // while. + if !ch.IsInitiator { + f.wg.Add(1) + go f.waitForTimeout(ch, cancelChan, timeoutChan) + } + defer close(cancelChan) - // If we are not the channel initiator it's safe - // to timeout the channel - if uint32(epoch.Height) >= maxHeight && !completeChan.IsInitiator { - fndgLog.Warnf("waited for %v blocks without "+ - "seeing funding transaction confirmed,"+ - " cancelling.", maxWaitNumBlocksFundingConf) - - // Cancel the waitForFundingConfirmation - // goroutine. - close(cancelChan) - - // Notify the caller of the timeout. - close(timeoutChan) - return - } - - // TODO: If we are the channel initiator implement - // a method for recovering the funds from the funding - // transaction - - case <-f.quit: - // The fundingManager is shutting down, will resume - // waiting for the funding transaction on startup. - return - case shortChanID, ok := <-waitingConfChan: - if !ok { - // Failed waiting for confirmation, close - // confChan to indicate failure. - close(confChan) - return - } - - select { - case confChan <- shortChanID: - case <-f.quit: - return - } + var shortChanID *lnwire.ShortChannelID + var ok bool + select { + case err := <-timeoutChan: + if err != nil { + return nil, err } + return nil, ErrConfirmationTimeout + + case <-f.quit: + // The fundingManager is shutting down, and will resume wait on + // startup. + return nil, ErrFundingManagerShuttingDown + + case shortChanID, ok = <-confChan: + if !ok { + return nil, fmt.Errorf("waiting for funding" + + "confirmation failed") + } + return shortChanID, nil } } @@ -1904,10 +1846,13 @@ func makeFundingScript(channel *channeldb.OpenChannel) ([]byte, error) { // when a channel has become active for lightning transactions. // The wait can be canceled by closing the cancelChan. In case of success, // a *lnwire.ShortChannelID will be passed to confChan. +// +// NOTE: This MUST be run as a goroutine. func (f *fundingManager) waitForFundingConfirmation( completeChan *channeldb.OpenChannel, cancelChan <-chan struct{}, confChan chan<- *lnwire.ShortChannelID) { + defer f.wg.Done() defer close(confChan) // Register with the ChainNotifier for a notification once the funding @@ -1991,17 +1936,82 @@ func (f *fundingManager) waitForFundingConfirmation( TxPosition: uint16(fundingPoint.Index), } - // Now that the channel has been fully confirmed, we'll mark it as open - // within the database. - if err := completeChan.MarkAsOpen(shortChanID); err != nil { - fndgLog.Errorf("error setting channel pending flag to false: "+ - "%v", err) + select { + case confChan <- &shortChanID: + case <-f.quit: + return + } +} + +// waitForTimeout will close the timeout channel if maxWaitNumBlocksFundingConf +// has passed from the broadcast height of the given channel. In case of error, +// the error is sent on timeoutChan. The wait can be canceled by closing the +// cancelChan. +// +// NOTE: timeoutChan MUST be buffered. +// NOTE: This MUST be run as a goroutine. +func (f *fundingManager) waitForTimeout(completeChan *channeldb.OpenChannel, + cancelChan <-chan struct{}, timeoutChan chan<- error) { + defer f.wg.Done() + + epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + timeoutChan <- fmt.Errorf("unable to register for epoch "+ + "notification: %v", err) return } - // Inform the ChannelNotifier that the channel has transitioned from - // pending open to open. - f.cfg.NotifyOpenChannelEvent(completeChan.FundingOutpoint) + defer epochClient.Cancel() + + // On block maxHeight we will cancel the funding confirmation wait. + maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf + for { + select { + case epoch, ok := <-epochClient.Epochs: + if !ok { + timeoutChan <- fmt.Errorf("epoch client " + + "shutting down") + return + } + + // Close the timeout channel and exit if the block is + // aboce the max height. + if uint32(epoch.Height) >= maxHeight { + fndgLog.Warnf("Waited for %v blocks without "+ + "seeing funding transaction confirmed,"+ + " cancelling.", + maxWaitNumBlocksFundingConf) + + // Notify the caller of the timeout. + close(timeoutChan) + return + } + + // TODO: If we are the channel initiator implement + // a method for recovering the funds from the funding + // transaction + + case <-cancelChan: + return + + case <-f.quit: + // The fundingManager is shutting down, will resume + // waiting for the funding transaction on startup. + return + } + } +} + +// handleFundingConfirmation marks a channel as open in the database, and set +// the channelOpeningState markedOpen. In addition it will report the now +// decided short channel ID to the switch, and close the local discovery signal +// for this channel. +func (f *fundingManager) handleFundingConfirmation( + completeChan *channeldb.OpenChannel, + shortChanID lnwire.ShortChannelID) error { + + fundingPoint := completeChan.FundingOutpoint + chanID := lnwire.NewChanIDFromOutPoint(&fundingPoint) // TODO(roasbeef): ideally persistent state update for chan above // should be abstracted @@ -2009,19 +2019,26 @@ func (f *fundingManager) waitForFundingConfirmation( // The funding transaction now being confirmed, we add this channel to // the fundingManager's internal persistent state machine that we use // to track the remaining process of the channel opening. This is - // useful to resume the opening process in case of restarts. - // - // TODO(halseth): make the two db transactions (MarkChannelAsOpen and - // saveChannelOpeningState) atomic by doing them in the same transaction. - // Needed to be properly fault-tolerant. - err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, markedOpen, - &shortChanID) + // useful to resume the opening process in case of restarts. We set the + // opening state before we mark the channel opened in the database, + // such that we can receover from one of the db writes failing. + err := f.saveChannelOpeningState(&fundingPoint, markedOpen, &shortChanID) if err != nil { - fndgLog.Errorf("error setting channel state to markedOpen: %v", + return fmt.Errorf("error setting channel state to markedOpen: %v", err) - return } + // Now that the channel has been fully confirmed and we successfully + // saved the opening state, we'll mark it as open within the database. + if err := completeChan.MarkAsOpen(shortChanID); err != nil { + return fmt.Errorf("error setting channel pending flag to false: "+ + "%v", err) + } + + // Inform the ChannelNotifier that the channel has transitioned from + // pending open to open. + f.cfg.NotifyOpenChannelEvent(completeChan.FundingOutpoint) + // As there might already be an active link in the switch with an // outdated short chan ID, we'll instruct the switch to load the updated // short chan id from disk. @@ -2030,12 +2047,6 @@ func (f *fundingManager) waitForFundingConfirmation( fndgLog.Errorf("unable to report short chan id: %v", err) } - select { - case confChan <- &shortChanID: - case <-f.quit: - return - } - // Close the discoverySignal channel, indicating to a separate // goroutine that the channel now is marked as open in the database // and that it is acceptable to process funding locked messages @@ -2045,41 +2056,6 @@ func (f *fundingManager) waitForFundingConfirmation( close(discoverySignal) } f.localDiscoveryMtx.Unlock() -} - -// handleFundingConfirmation is a wrapper method for creating a new -// lnwallet.LightningChannel object, calling sendFundingLocked, -// addToRouterGraph, and annAfterSixConfs. This is called after the funding -// transaction is confirmed. -func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer, - completeChan *channeldb.OpenChannel, - shortChanID *lnwire.ShortChannelID) error { - - // We create the state-machine object which wraps the database state. - lnChannel, err := lnwallet.NewLightningChannel( - nil, completeChan, nil, - ) - if err != nil { - return err - } - - chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) - - fndgLog.Debugf("ChannelID(%v) is now fully confirmed!", chanID) - - err = f.sendFundingLocked(peer, completeChan, lnChannel, shortChanID) - if err != nil { - return fmt.Errorf("failed sending fundingLocked: %v", err) - } - err = f.addToRouterGraph(completeChan, shortChanID) - if err != nil { - return fmt.Errorf("failed adding to router graph: %v", err) - } - err = f.annAfterSixConfs(completeChan, shortChanID) - if err != nil { - return fmt.Errorf("failed sending channel announcement: %v", - err) - } return nil } @@ -2087,7 +2063,7 @@ func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer, // sendFundingLocked creates and sends the fundingLocked message. // This should be called after the funding transaction has been confirmed, // and the channelState is 'markedOpen'. -func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, +func (f *fundingManager) sendFundingLocked( completeChan *channeldb.OpenChannel, channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) error { @@ -2118,8 +2094,18 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, // send fundingLocked until we succeed, or the fundingManager is shut // down. for { - fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+ - "peer %x", chanID, peerKey) + connected := make(chan lnpeer.Peer, 1) + f.cfg.NotifyWhenOnline(peerKey, connected) + + var peer lnpeer.Peer + select { + case peer = <-connected: + case <-f.quit: + return ErrFundingManagerShuttingDown + } + + fndgLog.Infof("Peer(%x) is online, sending FundingLocked "+ + "for ChannelID(%v)", peerKey, chanID) if err := peer.SendMessage(false, fundingLockedMsg); err == nil { // Sending succeeded, we can break out and continue the @@ -2129,31 +2115,6 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer, fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+ "Will retry when online", peerKey, err) - - connected := make(chan lnpeer.Peer, 1) - f.cfg.NotifyWhenOnline(peerKey, connected) - - select { - case <-connected: - fndgLog.Infof("Peer(%x) came back online, will retry "+ - "sending FundingLocked for ChannelID(%v)", - peerKey, chanID) - - // Retry sending. - case <-f.quit: - return ErrFundingManagerShuttingDown - } - } - - // As the fundingLocked message is now sent to the peer, the channel is - // moved to the next state of the state machine. It will be moved to the - // last state (actually deleted from the database) after the channel is - // finally announced. - err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, - fundingLockedSent, shortChanID) - if err != nil { - return fmt.Errorf("error setting channel state to"+ - " fundingLockedSent: %v", err) } return nil @@ -2234,17 +2195,6 @@ func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel, return ErrFundingManagerShuttingDown } - // As the channel is now added to the ChannelRouter's topology, the - // channel is moved to the next state of the state machine. It will be - // moved to the last state (actually deleted from the database) after - // the channel is finally announced. - err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, - addedToRouterGraph, shortChanID) - if err != nil { - return fmt.Errorf("error setting channel state to"+ - " addedToRouterGraph: %v", err) - } - return nil } @@ -2292,6 +2242,9 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, fndgLog.Debugf("Sending our NodeAnnouncement for "+ "ChannelID(%v) to %x", chanID, pubKey) + // TODO(halseth): make reliable. If the peer is not online this + // will fail, and the opening process will stop. Should instead + // block here, waiting for the peer to come online. if err := peer.SendMessage(true, &nodeAnn); err != nil { return fmt.Errorf("unable to send node announcement "+ "to peer %x: %v", pubKey, err) @@ -2386,15 +2339,6 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, "announced", &fundingPoint, shortChanID) } - // We delete the channel opening state from our internal database - // as the opening process has succeeded. We can do this because we - // assume the AuthenticatedGossiper queues the announcement messages, - // and persists them in case of a daemon shutdown. - err := f.deleteChannelOpeningState(&completeChan.FundingOutpoint) - if err != nil { - return fmt.Errorf("error deleting channel state: %v", err) - } - return nil } diff --git a/fundingmanager_test.go b/fundingmanager_test.go index f5e3fa5bc..05c0f41c3 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -244,7 +244,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, chainNotifier := &mockNotifier{ oneConfChannel: make(chan *chainntnfs.TxConfirmation, 1), sixConfChannel: make(chan *chainntnfs.TxConfirmation, 1), - epochChan: make(chan *chainntnfs.BlockEpoch, 1), + epochChan: make(chan *chainntnfs.BlockEpoch, 2), } sentMessages := make(chan lnwire.Message) @@ -398,7 +398,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, f.cfg.NotifyWhenOnline = func(peer [33]byte, connectedChan chan<- lnpeer.Peer) { - connectedChan <- testNode + connectedChan <- testNode.remotePeer } return testNode, nil @@ -679,6 +679,8 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt, } func assertErrorNotSent(t *testing.T, msgChan chan lnwire.Message) { + t.Helper() + select { case <-msgChan: t.Fatalf("error sent unexpectedly") @@ -688,6 +690,8 @@ func assertErrorNotSent(t *testing.T, msgChan chan lnwire.Message) { } func assertErrorSent(t *testing.T, msgChan chan lnwire.Message) { + t.Helper() + var msg lnwire.Message select { case msg = <-msgChan: @@ -748,6 +752,8 @@ func assertFundingMsgSent(t *testing.T, msgChan chan lnwire.Message, func assertNumPendingReservations(t *testing.T, node *testNode, peerPubKey *btcec.PublicKey, expectedNum int) { + t.Helper() + serializedPubKey := newSerializedKey(peerPubKey) actualNum := len(node.fundingMgr.activeReservations[serializedPubKey]) if actualNum == expectedNum { @@ -760,6 +766,8 @@ func assertNumPendingReservations(t *testing.T, node *testNode, } func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum int) { + t.Helper() + var numPendingChans int for i := 0; i < testPollNumTries; i++ { // If this is not the first try, sleep before retrying. @@ -784,6 +792,8 @@ func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum i } func assertNumPendingChannelsRemains(t *testing.T, node *testNode, expectedNum int) { + t.Helper() + var numPendingChans int for i := 0; i < 5; i++ { // If this is not the first try, sleep before retrying. @@ -807,6 +817,7 @@ func assertNumPendingChannelsRemains(t *testing.T, node *testNode, expectedNum i func assertDatabaseState(t *testing.T, node *testNode, fundingOutPoint *wire.OutPoint, expectedState channelOpeningState) { + t.Helper() var state channelOpeningState var err error @@ -839,18 +850,24 @@ func assertDatabaseState(t *testing.T, node *testNode, func assertMarkedOpen(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() + assertDatabaseState(t, alice, fundingOutPoint, markedOpen) assertDatabaseState(t, bob, fundingOutPoint, markedOpen) } func assertFundingLockedSent(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() + assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent) assertDatabaseState(t, bob, fundingOutPoint, fundingLockedSent) } func assertAddedToRouterGraph(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() + assertDatabaseState(t, alice, fundingOutPoint, addedToRouterGraph) assertDatabaseState(t, bob, fundingOutPoint, addedToRouterGraph) } @@ -957,6 +974,8 @@ func assertChannelAnnouncements(t *testing.T, alice, bob *testNode, } func assertAnnouncementSignatures(t *testing.T, alice, bob *testNode) { + t.Helper() + // After the FundingLocked message is sent and six confirmations have // been reached, the channel will be announced to the greater network // by having the nodes exchange announcement signatures. @@ -1013,6 +1032,7 @@ func waitForOpenUpdate(t *testing.T, updateChan chan *lnrpc.OpenStatusUpdate) { func assertNoChannelState(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() assertErrChannelNotFound(t, alice, fundingOutPoint) assertErrChannelNotFound(t, bob, fundingOutPoint) @@ -1020,6 +1040,7 @@ func assertNoChannelState(t *testing.T, alice, bob *testNode, func assertErrChannelNotFound(t *testing.T, node *testNode, fundingOutPoint *wire.OutPoint) { + t.Helper() var state channelOpeningState var err error @@ -1043,6 +1064,8 @@ func assertErrChannelNotFound(t *testing.T, node *testNode, } func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) { + t.Helper() + // They should both send the new channel state to their peer. select { case c := <-alice.newChannels: @@ -1725,7 +1748,7 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) { t.Fatalf("alice did not publish funding tx") } - // Increase the height to 1 minus the maxWaitNumBlocksFundingConf height + // Increase the height to 1 minus the maxWaitNumBlocksFundingConf height. alice.mockNotifier.epochChan <- &chainntnfs.BlockEpoch{ Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf - 1, } @@ -1734,12 +1757,12 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) { Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf - 1, } - // Assert both and Alice and Bob still have 1 pending channels + // Assert both and Alice and Bob still have 1 pending channels. assertNumPendingChannelsRemains(t, alice, 1) assertNumPendingChannelsRemains(t, bob, 1) - // Increase both Alice and Bob to maxWaitNumBlocksFundingConf height + // Increase both Alice and Bob to maxWaitNumBlocksFundingConf height. alice.mockNotifier.epochChan <- &chainntnfs.BlockEpoch{ Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf, } @@ -1748,13 +1771,13 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) { Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf, } - // Since Alice was the initiator, the channel should not have timed out + // Since Alice was the initiator, the channel should not have timed out. assertNumPendingChannelsRemains(t, alice, 1) // Bob should have sent an Error message to Alice. assertErrorSent(t, bob.msgChan) - // Since Bob was not the initiator, the channel should timeout + // Since Bob was not the initiator, the channel should timeout. assertNumPendingChannelsBecomes(t, bob, 0) }