contractcourt: deadlock fix via temporary unmerged map

The main idea is that NotifyContractUpdate adds the ContractUpdate to
a map called unmerged. The map is populated in NewChannelArbitrator by
shallow-copying the activeHTLCs map values (htlcSet). The htlcSets'
underlying maps are not copied, so unmerged just holds references to
them. This should be fine since nothing modifies them through
unmerged; its entries are only ever replaced. Before activeHTLCs is
read at its call-sites, it is updated to include the unmerged sets.
This happens under a mutex and should not cause any data race, even
though it copies the underlying map references. No persistence should
be necessary since on restart, activeHTLCs and unmerged will just be
populated again.
This commit is contained in:
eugene 2022-03-08 14:27:27 -05:00
parent 1e04f191a1
commit ef83370dcf
No known key found for this signature in database
GPG Key ID: 118759E83439A9B1
3 changed files with 89 additions and 75 deletions

View File

@ -1010,7 +1010,8 @@ func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint,
return fmt.Errorf("can't find arbitrator for %v", chanPoint)
}
return arbitrator.notifyContractUpdate(update)
arbitrator.notifyContractUpdate(update)
return nil
}
// GetChannelArbitrator safely returns the channel arbitrator for a given

View File

@ -29,12 +29,6 @@ var (
// close a channel that's already in the process of doing so.
errAlreadyForceClosed = errors.New("channel is already in the " +
"process of being force closed")
// errChanArbShuttingDown is an error returned when the channel arb is
// shutting down during the hand-off in notifyContractUpdate. This is
// mainly used to be able to notify the original caller (the link) that
// an error occurred.
errChanArbShuttingDown = errors.New("channel arb shutting down")
)
const (
@ -335,6 +329,13 @@ type ChannelArbitrator struct {
// currently valid commitment transactions.
activeHTLCs map[HtlcSetKey]htlcSet
// unmergedSet is used to update the activeHTLCs map in two callsites:
// checkLocalChainActions and sweepAnchors. It contains the latest
// updates from the link. Entries are never deleted from it, but they
// may be replaced on subsequent calls to notifyContractUpdate.
unmergedSet map[HtlcSetKey]htlcSet
unmergedMtx sync.RWMutex
// cfg contains all the functionality that the ChannelArbitrator requires
// to do its duty.
cfg ChannelArbitratorConfig
@ -348,11 +349,6 @@ type ChannelArbitrator struct {
// we're watching over will be sent.
signalUpdates chan *signalUpdateMsg
// htlcUpdates is a channel that is sent upon with new updates from the
// active channel. Each time a new commitment state is accepted, the
// set of HTLC's on the new state should be sent across this channel.
htlcUpdates chan *contractUpdateSignal
// activeResolvers is a slice of any active resolvers. This is used to
// be able to signal them for shutdown in the case that we shutdown.
activeResolvers []ContractResolver
@ -383,14 +379,27 @@ type ChannelArbitrator struct {
func NewChannelArbitrator(cfg ChannelArbitratorConfig,
htlcSets map[HtlcSetKey]htlcSet, log ArbitratorLog) *ChannelArbitrator {
// Create a new map for unmerged HTLCs, since its values will be
// overwritten and we want to avoid modifying activeHTLCs directly.
// Seeding it with a shallow copy of the existing sets ensures that
// activeHTLCs isn't reset to empty sets when the unmerged sets are
// merged back in later on.
unmerged := make(map[HtlcSetKey]htlcSet)
unmerged[LocalHtlcSet] = htlcSets[LocalHtlcSet]
unmerged[RemoteHtlcSet] = htlcSets[RemoteHtlcSet]
// If the pending set exists, write that as well.
if _, ok := htlcSets[RemotePendingHtlcSet]; ok {
unmerged[RemotePendingHtlcSet] = htlcSets[RemotePendingHtlcSet]
}
return &ChannelArbitrator{
log: log,
blocks: make(chan int32, arbitratorBlockBufferSize),
signalUpdates: make(chan *signalUpdateMsg),
htlcUpdates: make(chan *contractUpdateSignal),
resolutionSignal: make(chan struct{}),
forceCloseReqs: make(chan *forceCloseReq),
activeHTLCs: htlcSets,
unmergedSet: unmerged,
cfg: cfg,
quit: make(chan struct{}),
}
@ -819,6 +828,10 @@ func (c *ChannelArbitrator) stateStep(
if confCommitSet != nil {
htlcs = confCommitSet.toActiveHTLCSets()
} else {
// Update the set of activeHTLCs so
// checkLocalChainActions has an up-to-date view of the
// commitments.
c.updateActiveHTLCs()
htlcs = c.activeHTLCs
}
chainActions, err := c.checkLocalChainActions(
@ -1217,6 +1230,10 @@ func (c *ChannelArbitrator) sweepAnchors(anchors *lnwallet.AnchorResolutions,
return nil
}
// Update the set of activeHTLCs so that the sweeping routine has an
// up-to-date view of the set of commitments.
c.updateActiveHTLCs()
// Sweep anchors based on different HTLC sets. Notice the HTLC sets may
// differ across commitments, thus their deadline values could vary.
for htlcSet, htlcs := range c.activeHTLCs {
@ -2396,39 +2413,40 @@ func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) {
}
}
// contractUpdateSignal is a struct that carries the latest set of
// ContractUpdate for a particular key. It also carries a done chan that should
// be closed by the recipient.
type contractUpdateSignal struct {
// newUpdate contains the latest ContractUpdate for a key.
newUpdate *ContractUpdate
// notifyContractUpdate updates the ChannelArbitrator's unmerged mappings such
// that it can later be merged with activeHTLCs when calling
// checkLocalChainActions or sweepAnchors. These are the only two places that
// activeHTLCs is used.
func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) {
c.unmergedMtx.Lock()
defer c.unmergedMtx.Unlock()
// doneChan is an acknowledgement channel.
doneChan chan struct{}
// Update the mapping.
c.unmergedSet[upd.HtlcKey] = newHtlcSet(upd.Htlcs)
log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(upd)
}),
)
}
// notifyContractUpdate notifies the ChannelArbitrator that a new
// ContractUpdate is available from the link. The link will be paused until
// this function returns.
func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) error {
done := make(chan struct{})
// updateActiveHTLCs merges the unmerged set of HTLCs from the link with
// activeHTLCs.
func (c *ChannelArbitrator) updateActiveHTLCs() {
c.unmergedMtx.RLock()
defer c.unmergedMtx.RUnlock()
select {
case c.htlcUpdates <- &contractUpdateSignal{
newUpdate: upd,
doneChan: done,
}:
case <-c.quit:
return errChanArbShuttingDown
// Update the mapping.
c.activeHTLCs[LocalHtlcSet] = c.unmergedSet[LocalHtlcSet]
c.activeHTLCs[RemoteHtlcSet] = c.unmergedSet[RemoteHtlcSet]
// If the pending set exists, update that as well.
if _, ok := c.unmergedSet[RemotePendingHtlcSet]; ok {
pendingSet := c.unmergedSet[RemotePendingHtlcSet]
c.activeHTLCs[RemotePendingHtlcSet] = pendingSet
}
select {
case <-done:
case <-c.quit:
return errChanArbShuttingDown
}
return nil
}
// channelAttendant is the primary goroutine that acts at the judicial
@ -2500,30 +2518,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
// registered the new ShortChannelID.
close(signalUpdate.doneChan)
// A new set of HTLC's has been added or removed from the
// commitment transaction. So we'll update our activeHTLCs map
// accordingly.
case htlcUpdate := <-c.htlcUpdates:
// We'll wipe out our old set of HTLC's for each
// htlcSetKey type included in this update in order to
// only monitor the HTLCs that are still active on this
// target commitment.
htlcKey := htlcUpdate.newUpdate.HtlcKey
c.activeHTLCs[htlcKey] = newHtlcSet(
htlcUpdate.newUpdate.Htlcs,
)
// Now that the activeHTLCs have been updated, we'll
// close the done channel.
close(htlcUpdate.doneChan)
log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v",
c.cfg.ChanPoint,
newLogClosure(func() string {
return spew.Sdump(htlcUpdate.newUpdate)
}),
)
// We've cooperatively closed the channel, so we're no longer
// needed. We'll mark the channel as resolved and exit.
case closeInfo := <-c.cfg.ChainEvents.CooperativeClosure:

View File

@ -864,8 +864,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
HtlcKey: LocalHtlcSet,
Htlcs: htlcSet,
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)
errChan := make(chan error, 1)
respChan := make(chan *wire.MsgTx, 1)
@ -1872,8 +1871,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
HtlcKey: htlcKey,
Htlcs: []channeldb.HTLC{danglingHTLC},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)
// At this point, we now have a split commitment state
// from the PoV of the channel arb. There's now an HTLC
@ -2061,8 +2059,7 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
HtlcKey: RemoteHtlcSet,
Htlcs: []channeldb.HTLC{pendingHTLC},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)
// We will advance the uptime to 10 seconds which should be still within
// the grace period and should not trigger going to chain.
@ -2408,6 +2405,14 @@ func TestSweepAnchors(t *testing.T) {
htlcDust.HtlcIndex: htlcDust,
},
}
chanArb.unmergedSet[LocalHtlcSet] = htlcSet{
incomingHTLCs: map[uint64]channeldb.HTLC{
htlcWithPreimage.HtlcIndex: htlcWithPreimage,
},
outgoingHTLCs: map[uint64]channeldb.HTLC{
htlcDust.HtlcIndex: htlcDust,
},
}
// Setup our remote HTLC set such that no valid HTLCs can be used, thus
// we default to anchorSweepConfTarget.
@ -2420,6 +2425,14 @@ func TestSweepAnchors(t *testing.T) {
htlcDust.HtlcIndex: htlcDust,
},
}
chanArb.unmergedSet[RemoteHtlcSet] = htlcSet{
incomingHTLCs: map[uint64]channeldb.HTLC{
htlcSmallExipry.HtlcIndex: htlcSmallExipry,
},
outgoingHTLCs: map[uint64]channeldb.HTLC{
htlcDust.HtlcIndex: htlcDust,
},
}
// Set up our pending remote HTLC set such that we will use the HTLC's
// CLTV from the outgoing HTLC set.
@ -2432,6 +2445,14 @@ func TestSweepAnchors(t *testing.T) {
htlcSmallExipry.HtlcIndex: htlcSmallExipry,
},
}
chanArb.unmergedSet[RemotePendingHtlcSet] = htlcSet{
incomingHTLCs: map[uint64]channeldb.HTLC{
htlcDust.HtlcIndex: htlcDust,
},
outgoingHTLCs: map[uint64]channeldb.HTLC{
htlcSmallExipry.HtlcIndex: htlcSmallExipry,
},
}
// Create AnchorResolutions.
anchors := &lnwallet.AnchorResolutions{
@ -2561,8 +2582,7 @@ func TestChannelArbitratorAnchors(t *testing.T) {
// preimage available.
Htlcs: []channeldb.HTLC{htlc, htlcWithPreimage},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)
newUpdate = &ContractUpdate{
HtlcKey: RemoteHtlcSet,
@ -2571,8 +2591,7 @@ func TestChannelArbitratorAnchors(t *testing.T) {
// incoming HTLC (toRemoteHTLCs) has a lower CLTV.
Htlcs: []channeldb.HTLC{htlc, htlcWithPreimage},
}
err = chanArb.notifyContractUpdate(newUpdate)
require.NoError(t, err)
chanArb.notifyContractUpdate(newUpdate)
errChan := make(chan error, 1)
respChan := make(chan *wire.MsgTx, 1)