peer: make activeChanCloses a SyncMap

This fixes some existing race conditions, as the `finalizeChanClosure`
function was being called from outside the main event loop.
This commit is contained in:
Olaoluwa Osuntokun 2024-03-07 20:12:21 -08:00
parent 413cc3da35
commit 029fbd22fd

View File

@ -563,7 +563,7 @@ type Brontide struct {
// cooperative channel closures. Any channel closing messages are directed
// to one of these active state machines. Once the channel has been closed,
// the state machine will be deleted from the map.
activeChanCloses map[lnwire.ChannelID]chanCloserFsm
activeChanCloses *lnutils.SyncMap[lnwire.ChannelID, chanCloserFsm]
// localCloseChanReqs is a channel in which any local requests to close
// a particular channel are sent over.
@ -649,8 +649,10 @@ func NewBrontide(cfg Config) *Brontide {
newPendingChannel: make(chan *newChannelMsg, 1),
removePendingChannel: make(chan *newChannelMsg),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: make(map[lnwire.ChannelID]chanCloserFsm),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: &lnutils.SyncMap[
lnwire.ChannelID, chanCloserFsm,
]{},
localCloseChanReqs: make(chan *htlcswitch.ChanClose),
linkFailures: make(chan linkFailureReport),
chanCloseMsgs: make(chan *closeMsg),
@ -1260,16 +1262,14 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
lnChan.State().FundingOutpoint,
)
chanCloser := makeNegotiateCloser(
p.activeChanCloses.Store(chanID, makeNegotiateCloser(
negotiateChanCloser,
)
p.activeChanCloses[chanID] = chanCloser
))
// Create the Shutdown message.
shutdown, err := negotiateChanCloser.ShutdownChan()
if err != nil {
delete(p.activeChanCloses, chanID)
p.activeChanCloses.Delete(chanID)
shutdownInfoErr = err
return
@ -1313,7 +1313,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
// Creating this here ensures that any shutdown messages sent
// will be automatically routed by the msg router.
if _, err := p.initRbfChanCloser(lnChan); err != nil {
delete(p.activeChanCloses, chanID)
p.activeChanCloses.Delete(chanID)
return nil, fmt.Errorf("unable to init RBF chan "+
"closer during peer connect: %w", err)
@ -3014,7 +3014,7 @@ func (p *Brontide) reenableActiveChannels() {
func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
*chanCloserFsm, error) {
chanCloser, found := p.activeChanCloses[chanID]
chanCloser, found := p.activeChanCloses.Load(chanID)
if found {
// An entry will only be found if the closer has already been
// created for a non-pending channel or for a channel that had
@ -3076,7 +3076,7 @@ func (p *Brontide) fetchActiveChanCloser(chanID lnwire.ChannelID) (
chanCloser = makeNegotiateCloser(negotiateChanCloser)
p.activeChanCloses[chanID] = chanCloser
p.activeChanCloses.Store(chanID, chanCloser)
return &chanCloser, nil
}
@ -3355,13 +3355,13 @@ func (p *Brontide) restartCoopClose(lnChan *lnwallet.LightningChannel) (
// This does not need a mutex even though it is in a different
// goroutine since this is done before the channelManager goroutine is
// created.
p.activeChanCloses[chanID] = makeNegotiateCloser(chanCloser)
p.activeChanCloses.Store(chanID, makeNegotiateCloser(chanCloser))
// Create the Shutdown message.
shutdownMsg, err := chanCloser.ShutdownChan()
if err != nil {
p.log.Errorf("unable to create shutdown message: %v", err)
delete(p.activeChanCloses, chanID)
p.activeChanCloses.Delete(chanID)
return nil, err
}
@ -3454,8 +3454,8 @@ func (p *Brontide) initNegotiateChanCloser(req *htlcswitch.ChanClose,
return fmt.Errorf("unable to make chan closer: %w", err)
}
chanID := lnwire.NewChanIDFromOutPoint(*req.ChanPoint)
p.activeChanCloses[chanID] = makeNegotiateCloser(chanCloser)
chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint())
p.activeChanCloses.Store(chanID, makeNegotiateCloser(chanCloser))
// Finally, we'll initiate the channel shutdown within the
// chanCloser, and send the shutdown message to the remote
@ -3466,7 +3466,7 @@ func (p *Brontide) initNegotiateChanCloser(req *htlcswitch.ChanClose,
// back to its normal state.
defer channel.ResetState()
delete(p.activeChanCloses, chanID)
p.activeChanCloses.Delete(chanID)
return fmt.Errorf("unable to shutdown channel: %w", err)
}
@ -3585,10 +3585,13 @@ func (p *Brontide) observeRbfCloseUpdates(chanCloser *chancloser.RbfChanCloser,
}
// TODO(roasbeef): race, make to sync map?
// other clean up?
chanID := lnwire.NewChanIDFromOutPoint(
*closeReq.ChanPoint,
)
delete(p.activeChanCloses, chanID)
p.activeChanCloses.Delete(chanID)
return
}
case <-closeReq.Ctx.Done():
@ -3657,7 +3660,7 @@ func (c *chanErrorReporter) ReportError(chanErr error) {
}
if _, err := c.peer.initRbfChanCloser(lnChan); err != nil {
delete(c.peer.activeChanCloses, c.chanID)
c.peer.activeChanCloses.Delete(c.chanID)
c.peer.log.Errorf("unable to init RBF chan closer after "+
"error case: %v", err)
@ -3823,13 +3826,12 @@ func (p *Brontide) initRbfChanCloser(
})
if err != nil {
chanCloser.Stop()
delete(p.activeChanCloses, chanID)
return nil, fmt.Errorf("unable to register endpoint for co-op "+
"close: %w", err)
}
p.activeChanCloses[chanID] = makeRbfCloser(&chanCloser)
p.activeChanCloses.Store(chanID, makeRbfCloser(&chanCloser))
// Now that we've created the rbf closer state machine, we'll launch a
// new goroutine to eventually send in the ChannelFlushed event once
@ -3923,7 +3925,7 @@ func (p *Brontide) startRbfChanCloser(shutdown shutdownInit,
// Unlike the old negotiate chan closer, we'll always create the RBF
// chan closer on startup, so we can skip init here.
chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint())
chanCloser, found := p.activeChanCloses[chanID]
chanCloser, found := p.activeChanCloses.Load(chanID)
if !found {
return fmt.Errorf("rbf can closer not found for channel %v",
channel.ChannelPoint())
@ -4156,7 +4158,7 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) {
// Also clear the activeChanCloses map of this channel.
cid := lnwire.NewChanIDFromOutPoint(chanPoint)
delete(p.activeChanCloses, cid) // TODO(roasbeef): existing race
p.activeChanCloses.Delete(cid) // TODO(roasbeef): existing race
// Next, we'll launch a goroutine which will request to be notified by
// the ChainNotifier once the closure transaction obtains a single
@ -4646,7 +4648,7 @@ func (p *Brontide) handleCloseMsg(msg *closeMsg) {
chanCloser.CloseRequest().Err <- err
}
delete(p.activeChanCloses, msg.cid)
p.activeChanCloses.Delete(msg.cid)
p.Disconnect(err)
}
@ -4973,7 +4975,7 @@ func (p *Brontide) addActiveChannel(c *lnpeer.NewChannel) error {
// Creating this here ensures that any shutdown messages sent will be
// automatically routed by the msg router.
if _, err := p.initRbfChanCloser(lnChan); err != nil {
delete(p.activeChanCloses, chanID)
p.activeChanCloses.Delete(chanID)
return fmt.Errorf("unable to init RBF chan closer for new "+
"chan: %w", err)