fundingmanager: define stateStep, make advanceFundingState sync

This commit make the advanceFundingStateMethod synchronous. It will now
query the database for a channel's opening state, and call the method
stateStep until the channel has finished the opening procedure.
This commit is contained in:
Johan T. Halseth
2018-09-16 10:40:09 +02:00
parent 08cb313934
commit cbf1fe6bb1

View File

@ -598,10 +598,8 @@ func (f *fundingManager) start() error {
} }
for _, channel := range openChannels { for _, channel := range openChannels {
err = f.advanceFundingState(channel) f.wg.Add(1)
if err != nil { go f.advanceFundingState(channel, nil)
return err
}
} }
f.wg.Add(1) // TODO(roasbeef): tune f.wg.Add(1) // TODO(roasbeef): tune
@ -829,43 +827,79 @@ func (f *fundingManager) reservationCoordinator() {
// advanceFundingState will advance the channel fromt the markedOpen state to // advanceFundingState will advance the channel fromt the markedOpen state to
// the point where the channel is ready for operation. This includes sending // the point where the channel is ready for operation. This includes sending
// funding locked to the peer, adding the channel to the router graph, and // funding locked to the peer, adding the channel to the router graph, and
// announcing the channel. // announcing the channel. The updateChan can be set non-nil to get
func (f *fundingManager) advanceFundingState( // OpenStatusUpdates.
channel *channeldb.OpenChannel) error { //
// NOTE: This MUST be run as a goroutine.
func (f *fundingManager) advanceFundingState(channel *channeldb.OpenChannel,
updateChan chan<- *lnrpc.OpenStatusUpdate) {
defer f.wg.Done()
// We create the state-machine object which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(
nil, channel, nil,
)
if err != nil {
fndgLog.Errorf("Unable to create LightningChannel(%v): %v",
channel.FundingOutpoint, err)
return
}
for {
channelState, shortChanID, err := f.getChannelOpeningState( channelState, shortChanID, err := f.getChannelOpeningState(
&channel.FundingOutpoint) &channel.FundingOutpoint,
)
if err == ErrChannelNotFound { if err == ErrChannelNotFound {
// Channel not in fundingManager's opening database, // Channel not in fundingManager's opening database,
// meaning it was successfully announced to the // meaning it was successfully announced to the
// network. // network.
return nil return
} else if err != nil { } else if err != nil {
return err fndgLog.Errorf("Unable to query database for "+
"channel opening state(%v): %v",
channel.FundingOutpoint, err)
return
} }
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) // If we did find the channel in the opening state database, we
fndgLog.Debugf("channel (%v) with opening state %v found", // have seen the funding transaction being confirmed, but there
chanID, channelState) // are still steps left of the setup procedure. We continue the
// procedure where we left off.
err = f.stateStep(
channel, lnChannel, shortChanID, channelState,
updateChan,
)
if err != nil {
fndgLog.Errorf("Unable to advance state(%v): %v",
channel.FundingOutpoint, err)
return
}
}
}
// stateStep advances the confirmed channel one step in the funding state
// machine. This method is synchronous and the new channel opening state will
// have been written to the database when it successfully returns. The
// updateChan can be set non-nil to get OpenStatusUpdates.
func (f *fundingManager) stateStep(channel *channeldb.OpenChannel,
lnChannel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID, channelState channelOpeningState,
updateChan chan<- *lnrpc.OpenStatusUpdate) error {
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
fndgLog.Debugf("Channel(%v) with ShortChanID %v has opening state %v",
chanID, shortChanID, channelState)
// If we did find the channel in the opening state database, we have
// seen the funding transaction being confirmed, but we did not finish
// the rest of the setup procedure before we shut down. We handle the
// remaining steps of this setup by continuing the procedure where we
// left off.
switch channelState { switch channelState {
// The funding transaction was confirmed, but we did not successfully // The funding transaction was confirmed, but we did not successfully
// send the fundingLocked message to the peer, so let's do that now. // send the fundingLocked message to the peer, so let's do that now.
case markedOpen: case markedOpen:
f.wg.Add(1)
go func(dbChan *channeldb.OpenChannel) {
defer f.wg.Done()
peerChan := make(chan lnpeer.Peer, 1) peerChan := make(chan lnpeer.Peer, 1)
var peerKey [33]byte var peerKey [33]byte
copy(peerKey[:], dbChan.IdentityPub.SerializeCompressed()) copy(peerKey[:], channel.IdentityPub.SerializeCompressed())
f.cfg.NotifyWhenOnline(peerKey, peerChan) f.cfg.NotifyWhenOnline(peerKey, peerChan)
@ -873,67 +907,52 @@ func (f *fundingManager) advanceFundingState(
select { select {
case peer = <-peerChan: case peer = <-peerChan:
case <-f.quit: case <-f.quit:
return return ErrFundingManagerShuttingDown
} }
err := f.handleFundingConfirmation(
peer, dbChan, shortChanID, err := f.sendFundingLocked(
peer, channel, lnChannel, shortChanID,
) )
if err != nil { if err != nil {
fndgLog.Errorf("Failed to handle "+ return fmt.Errorf("failed sending fundingLocked: %v",
"funding confirmation: %v", err) err)
return
} }
}(channel)
fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+
"sent FundingLocked", chanID, shortChanID)
return nil
// fundingLocked was sent to peer, but the channel was not added to the // fundingLocked was sent to peer, but the channel was not added to the
// router graph and the channel announcement was not sent. // router graph and the channel announcement was not sent.
case fundingLockedSent: case fundingLockedSent:
f.wg.Add(1) err := f.addToRouterGraph(channel, shortChanID)
go func(dbChan *channeldb.OpenChannel) {
defer f.wg.Done()
err = f.addToRouterGraph(dbChan, shortChanID)
if err != nil { if err != nil {
fndgLog.Errorf("failed adding to "+ return fmt.Errorf("failed adding to "+
"router graph: %v", err) "router graph: %v", err)
return
} }
// TODO(halseth): should create a state machine fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+
// that can more easily be resumed from "added to router graph", chanID, shortChanID)
// different states, to avoid this code
// duplication. return nil
err = f.annAfterSixConfs(dbChan, shortChanID)
if err != nil {
fndgLog.Errorf("error sending channel "+
"announcements: %v", err)
return
}
}(channel)
// The channel was added to the Router's topology, but the channel // The channel was added to the Router's topology, but the channel
// announcement was not sent. // announcement was not sent.
case addedToRouterGraph: case addedToRouterGraph:
f.wg.Add(1) err := f.annAfterSixConfs(channel, shortChanID)
go func(dbChan *channeldb.OpenChannel) {
defer f.wg.Done()
err = f.annAfterSixConfs(dbChan, shortChanID)
if err != nil { if err != nil {
fndgLog.Errorf("error sending channel "+ return fmt.Errorf("error sending channel "+
"announcement: %v", err) "announcement: %v", err)
return
}
}(channel)
default:
return fmt.Errorf("undefined channelState: %v",
channelState)
} }
fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+
"announced", chanID, shortChanID)
return nil return nil
} }
return fmt.Errorf("undefined channelState: %v", channelState)
}
// handlePendingChannels responds to a request for details concerning all // handlePendingChannels responds to a request for details concerning all
// currently pending channels waiting for the final phase of the funding // currently pending channels waiting for the final phase of the funding
// workflow (funding txn confirmation). // workflow (funding txn confirmation).