From 8607e8c28aad22049db398b27d8fa9c8c2dd45cc Mon Sep 17 00:00:00 2001 From: eugene Date: Tue, 4 Jan 2022 16:21:36 -0500 Subject: [PATCH] multi: reliable hand-off from htlcswitch to contractcourt This is achieved by changing the 1-way handoff to a 2-way handoff with a done channel. --- contractcourt/chain_arbitrator.go | 30 +++++++---- contractcourt/channel_arbitrator.go | 63 ++++++++++++++++++---- contractcourt/channel_arbitrator_test.go | 33 ++++++------ htlcswitch/link.go | 68 +++++++++++++++--------- htlcswitch/link_test.go | 54 ++++++++++++++----- htlcswitch/test_utils.go | 15 +++++- peer/brontide.go | 5 ++ 7 files changed, 193 insertions(+), 75 deletions(-) diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index ba007ea6b..48a1b18be 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -964,16 +964,9 @@ type ContractUpdate struct { Htlcs []channeldb.HTLC } -// ContractSignals wraps the two signals that affect the state of a channel -// being watched by an arbitrator. The two signals we care about are: the -// channel has a new set of HTLC's, and the remote party has just broadcast -// their version of the commitment transaction. +// ContractSignals is used by outside subsystems to notify a channel arbitrator +// of its ShortChannelID. type ContractSignals struct { - // HtlcUpdates is a channel that the link will use to update the - // designated channel arbitrator when the set of HTLCs on any valid - // commitment changes. - HtlcUpdates chan *ContractUpdate - // ShortChanID is the up to date short channel ID for a contract. This // can change either if when the contract was added it didn't yet have // a stable identifier, or in the case of a reorg. @@ -1001,6 +994,25 @@ func (c *ChainArbitrator) UpdateContractSignals(chanPoint wire.OutPoint, return nil } +// NotifyContractUpdate lets a channel arbitrator know that a new +// ContractUpdate is available. This calls the ChannelArbitrator's internal +// method NotifyContractUpdate which waits for a response on a done chan before +// returning. This method will return an error if the ChannelArbitrator is not +// in the activeChannels map. However, this only happens if the arbitrator is +// resolved and the related link would already be shut down. +func (c *ChainArbitrator) NotifyContractUpdate(chanPoint wire.OutPoint, + update *ContractUpdate) error { + + c.Lock() + arbitrator, ok := c.activeChannels[chanPoint] + c.Unlock() + if !ok { + return fmt.Errorf("can't find arbitrator for %v", chanPoint) + } + + return arbitrator.notifyContractUpdate(update) +} + // GetChannelArbitrator safely returns the channel arbitrator for a given // channel outpoint. func (c *ChainArbitrator) GetChannelArbitrator(chanPoint wire.OutPoint) ( diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index a67c5e1e1..1d2c0c859 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -28,6 +28,12 @@ var ( // close a channel that's already in the process of doing so. errAlreadyForceClosed = errors.New("channel is already in the " + "process of being force closed") + + // errChanArbShuttingDown is an error returned when the channel arb is + // shutting down during the hand-off in notifyContractUpdate. This is + // mainly used to be able to notify the original caller (the link) that + // an error occurred. + errChanArbShuttingDown = errors.New("channel arb shutting down") ) const ( @@ -342,7 +348,7 @@ type ChannelArbitrator struct { // htlcUpdates is a channel that is sent upon with new updates from the // active channel. Each time a new commitment state is accepted, the // set of HTLC's on the new state should be sent across this channel. - htlcUpdates <-chan *ContractUpdate + htlcUpdates chan *contractUpdateSignal // activeResolvers is a slice of any active resolvers. This is used to // be able to signal them for shutdown in the case that we shutdown. @@ -378,7 +384,7 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig, log: log, blocks: make(chan int32, arbitratorBlockBufferSize), signalUpdates: make(chan *signalUpdateMsg), - htlcUpdates: make(<-chan *ContractUpdate), + htlcUpdates: make(chan *contractUpdateSignal), resolutionSignal: make(chan struct{}), forceCloseReqs: make(chan *forceCloseReq), activeHTLCs: htlcSets, @@ -2387,6 +2393,41 @@ func (c *ChannelArbitrator) UpdateContractSignals(newSignals *ContractSignals) { } } +// contractUpdateSignal is a struct that carries the latest set of +// ContractUpdate for a particular key. It also carries a done chan that should +// be closed by the recipient. +type contractUpdateSignal struct { + // newUpdate contains the latest ContractUpdate for a key. + newUpdate *ContractUpdate + + // doneChan is an acknowledgement channel. + doneChan chan struct{} +} + +// notifyContractUpdate notifies the ChannelArbitrator that a new +// ContractUpdate is available from the link. The link will be paused until +// this function returns. +func (c *ChannelArbitrator) notifyContractUpdate(upd *ContractUpdate) error { + done := make(chan struct{}) + + select { + case c.htlcUpdates <- &contractUpdateSignal{ + newUpdate: upd, + doneChan: done, + }: + case <-c.quit: + return errChanArbShuttingDown + } + + select { + case <-done: + case <-c.quit: + return errChanArbShuttingDown + } + + return nil +} + // channelAttendant is the primary goroutine that acts at the judicial // arbitrator between our channel state, the remote channel peer, and the // blockchain (Our judge). This goroutine will ensure that we faithfully execute @@ -2448,13 +2489,12 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) { log.Tracef("ChannelArbitrator(%v) got new signal "+ "update!", c.cfg.ChanPoint) - // First, we'll update our set of signals. - c.htlcUpdates = signalUpdate.newSignals.HtlcUpdates + // We'll update the ShortChannelID. c.cfg.ShortChanID = signalUpdate.newSignals.ShortChanID - // Now that the signals have been updated, we'll now + // Now that the signal has been updated, we'll now // close the done channel to signal to the caller we've - // registered the new contracts. + // registered the new ShortChannelID. close(signalUpdate.doneChan) // A new set of HTLC's has been added or removed from the @@ -2465,14 +2505,19 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) { // htlcSetKey type included in this update in order to // only monitor the HTLCs that are still active on this // target commitment. - c.activeHTLCs[htlcUpdate.HtlcKey] = newHtlcSet( - htlcUpdate.Htlcs, + htlcKey := htlcUpdate.newUpdate.HtlcKey + c.activeHTLCs[htlcKey] = newHtlcSet( + htlcUpdate.newUpdate.Htlcs, ) + // Now that the activeHTLCs have been updated, we'll + // close the done channel. + close(htlcUpdate.doneChan) + log.Tracef("ChannelArbitrator(%v): fresh set of htlcs=%v", c.cfg.ChanPoint, newLogClosure(func() string { - return spew.Sdump(htlcUpdate) + return spew.Sdump(htlcUpdate.newUpdate) }), ) diff --git a/contractcourt/channel_arbitrator_test.go b/contractcourt/channel_arbitrator_test.go index bd9461739..03141ee91 100644 --- a/contractcourt/channel_arbitrator_test.go +++ b/contractcourt/channel_arbitrator_test.go @@ -829,11 +829,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { } defer chanArb.Stop() - // Create htlcUpdates channel. - htlcUpdates := make(chan *ContractUpdate) - signals := &ContractSignals{ - HtlcUpdates: htlcUpdates, ShortChanID: lnwire.ShortChannelID{}, } chanArb.UpdateContractSignals(signals) @@ -864,10 +860,12 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { htlc, outgoingDustHtlc, incomingDustHtlc, } - htlcUpdates <- &ContractUpdate{ + newUpdate := &ContractUpdate{ HtlcKey: LocalHtlcSet, Htlcs: htlcSet, } + err = chanArb.notifyContractUpdate(newUpdate) + require.NoError(t, err) errChan := make(chan error, 1) respChan := make(chan *wire.MsgTx, 1) @@ -1849,9 +1847,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) { // Now that our channel arb has started, we'll set up // its contract signals channel so we can send it // various HTLC updates for this test. - htlcUpdates := make(chan *ContractUpdate) signals := &ContractSignals{ - HtlcUpdates: htlcUpdates, ShortChanID: lnwire.ShortChannelID{}, } chanArb.UpdateContractSignals(signals) @@ -1872,10 +1868,12 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) { HtlcIndex: htlcIndex, RefundTimeout: htlcExpiry, } - htlcUpdates <- &ContractUpdate{ + newUpdate := &ContractUpdate{ HtlcKey: htlcKey, Htlcs: []channeldb.HTLC{danglingHTLC}, } + err = chanArb.notifyContractUpdate(newUpdate) + require.NoError(t, err) // At this point, we now have a split commitment state // from the PoV of the channel arb. There's now an HTLC @@ -2044,9 +2042,7 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) { // Now that our channel arb has started, we'll set up // its contract signals channel so we can send it // various HTLC updates for this test. - htlcUpdates := make(chan *ContractUpdate) signals := &ContractSignals{ - HtlcUpdates: htlcUpdates, ShortChanID: lnwire.ShortChannelID{}, } chanArb.UpdateContractSignals(signals) @@ -2061,10 +2057,12 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) { HtlcIndex: htlcIndex, RefundTimeout: htlcExpiry, } - htlcUpdates <- &ContractUpdate{ + newUpdate := &ContractUpdate{ HtlcKey: RemoteHtlcSet, Htlcs: []channeldb.HTLC{pendingHTLC}, } + err = chanArb.notifyContractUpdate(newUpdate) + require.NoError(t, err) // We will advance the uptime to 10 seconds which should be still within // the grace period and should not trigger going to chain. @@ -2531,11 +2529,7 @@ func TestChannelArbitratorAnchors(t *testing.T) { } }() - // Create htlcUpdates channel. - htlcUpdates := make(chan *ContractUpdate) - signals := &ContractSignals{ - HtlcUpdates: htlcUpdates, ShortChanID: lnwire.ShortChannelID{}, } chanArb.UpdateContractSignals(signals) @@ -2559,7 +2553,7 @@ func TestChannelArbitratorAnchors(t *testing.T) { // We now send two HTLC updates, one for local HTLC set and the other // for remote HTLC set. - htlcUpdates <- &ContractUpdate{ + newUpdate := &ContractUpdate{ HtlcKey: LocalHtlcSet, // This will make the deadline of the local anchor resolution // to be htlcWithPreimage's CLTV minus heightHint since the @@ -2567,13 +2561,18 @@ func TestChannelArbitratorAnchors(t *testing.T) { // preimage available. Htlcs: []channeldb.HTLC{htlc, htlcWithPreimage}, } - htlcUpdates <- &ContractUpdate{ + err = chanArb.notifyContractUpdate(newUpdate) + require.NoError(t, err) + + newUpdate = &ContractUpdate{ HtlcKey: RemoteHtlcSet, // This will make the deadline of the remote anchor resolution // to be htlcWithPreimage's CLTV minus heightHint because the // incoming HTLC (toRemoteHTLCs) has a lower CLTV. Htlcs: []channeldb.HTLC{htlc, htlcWithPreimage}, } + err = chanArb.notifyContractUpdate(newUpdate) + require.NoError(t, err) errChan := make(chan error, 1) respChan := make(chan *wire.MsgTx, 1) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 38e1bbd76..f4714803d 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -187,11 +187,14 @@ type ChannelLinkConfig struct { LinkFailureError) // UpdateContractSignals is a function closure that we'll use to update - // outside sub-systems with the latest signals for our inner Lightning - // channel. These signals will notify the caller when the channel has - // been closed, or when the set of active HTLC's is updated. + // outside sub-systems with this channel's latest ShortChannelID. UpdateContractSignals func(*contractcourt.ContractSignals) error + // NotifyContractUpdate is a function closure that we'll use to update + // the contractcourt and more specifically the ChannelArbitrator of the + // latest channel state. + NotifyContractUpdate func(*contractcourt.ContractUpdate) error + // ChainEvents is an active subscription to the chain watcher for this // channel to be notified of any on-chain activity related to this // channel. @@ -372,10 +375,6 @@ type channelLink struct { // sent across. localUpdateAdd chan *localUpdateAddMsg - // htlcUpdates is a channel that we'll use to update outside - // sub-systems with the latest set of active HTLC's on our channel. - htlcUpdates chan *contractcourt.ContractUpdate - // shutdownRequest is a channel that the channelLink will listen on to // service shutdown requests from ShutdownIfChannelClean calls. shutdownRequest chan *shutdownReq @@ -421,11 +420,9 @@ func NewChannelLink(cfg ChannelLinkConfig, logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint()) return &channelLink{ - cfg: cfg, - channel: channel, - shortChanID: channel.ShortChanID(), - // TODO(roasbeef): just do reserve here? - htlcUpdates: make(chan *contractcourt.ContractUpdate), + cfg: cfg, + channel: channel, + shortChanID: channel.ShortChanID(), shutdownRequest: make(chan *shutdownReq), hodlMap: make(map[channeldb.CircuitKey]hodlHtlc), hodlQueue: queue.NewConcurrentQueue(10), @@ -496,7 +493,6 @@ func (l *channelLink) Start() error { // TODO(roasbeef): split goroutines within channel arb to avoid go func() { signals := &contractcourt.ContractSignals{ - HtlcUpdates: l.htlcUpdates, ShortChanID: l.channel.ShortChanID(), } @@ -1837,15 +1833,23 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { l.cfg.Peer.SendMessage(false, nextRevocation) // Since we just revoked our commitment, we may have a new set - // of HTLC's on our commitment, so we'll send them over our - // HTLC update channel so any callers can be notified. - select { - case l.htlcUpdates <- &contractcourt.ContractUpdate{ + // of HTLC's on our commitment, so we'll send them using our + // function closure NotifyContractUpdate. + newUpdate := &contractcourt.ContractUpdate{ HtlcKey: contractcourt.LocalHtlcSet, Htlcs: currentHtlcs, - }: + } + err = l.cfg.NotifyContractUpdate(newUpdate) + if err != nil { + l.log.Errorf("unable to notify contract update: %v", + err) + return + } + + select { case <-l.quit: return + default: } // If both commitment chains are fully synced from our PoV, @@ -1879,13 +1883,21 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // The remote party now has a new primary commitment, so we'll // update the contract court to be aware of this new set (the // prior old remote pending). - select { - case l.htlcUpdates <- &contractcourt.ContractUpdate{ + newUpdate := &contractcourt.ContractUpdate{ HtlcKey: contractcourt.RemoteHtlcSet, Htlcs: remoteHTLCs, - }: + } + err = l.cfg.NotifyContractUpdate(newUpdate) + if err != nil { + l.log.Errorf("unable to notify contract update: %v", + err) + return + } + + select { case <-l.quit: return + default: } // If we have a tower client for this channel type, we'll @@ -2093,13 +2105,20 @@ func (l *channelLink) updateCommitTx() error { // The remote party now has a new pending commitment, so we'll update // the contract court to be aware of this new set (the prior old remote // pending). - select { - case l.htlcUpdates <- &contractcourt.ContractUpdate{ + newUpdate := &contractcourt.ContractUpdate{ HtlcKey: contractcourt.RemotePendingHtlcSet, Htlcs: pendingHTLCs, - }: + } + err = l.cfg.NotifyContractUpdate(newUpdate) + if err != nil { + l.log.Errorf("unable to notify contract update: %v", err) + return err + } + + select { case <-l.quit: return ErrLinkShuttingDown + default: } commitSig := &lnwire.CommitSig{ @@ -2167,7 +2186,6 @@ func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) { go func() { err := l.cfg.UpdateContractSignals(&contractcourt.ContractSignals{ - HtlcUpdates: l.htlcUpdates, ShortChanID: sid, }) if err != nil { diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 8def75aad..39d215005 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1944,6 +1944,17 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( return nil, nil, nil, nil, nil, nil, err } + notifyUpdateChan := make(chan *contractcourt.ContractUpdate) + doneChan := make(chan struct{}) + notifyContractUpdate := func(u *contractcourt.ContractUpdate) error { + select { + case notifyUpdateChan <- u: + case <-doneChan: + } + + return nil + } + // Instantiate with a long interval, so that we can precisely control // the firing via force feeding. bticker := ticker.NewForce(time.Hour) @@ -1967,12 +1978,13 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - Registry: invoiceRegistry, - FeeEstimator: newMockFeeEstimator(), - ChainEvents: &contractcourt.ChainEventSubscription{}, - BatchTicker: bticker, - FwdPkgGCTicker: ticker.NewForce(15 * time.Second), - PendingCommitTicker: ticker.New(time.Minute), + NotifyContractUpdate: notifyContractUpdate, + Registry: invoiceRegistry, + FeeEstimator: newMockFeeEstimator(), + ChainEvents: &contractcourt.ChainEventSubscription{}, + BatchTicker: bticker, + FwdPkgGCTicker: ticker.NewForce(15 * time.Second), + PendingCommitTicker: ticker.New(time.Minute), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // to not trigger commit updates automatically during tests. BatchSize: 10000, @@ -1993,8 +2005,9 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( go func() { for { select { - case <-aliceLink.(*channelLink).htlcUpdates: + case <-notifyUpdateChan: case <-aliceLink.(*channelLink).quit: + close(doneChan) return } } @@ -4482,6 +4495,17 @@ func (h *persistentLinkHarness) restartLink( } } + notifyUpdateChan := make(chan *contractcourt.ContractUpdate) + doneChan := make(chan struct{}) + notifyContractUpdate := func(u *contractcourt.ContractUpdate) error { + select { + case notifyUpdateChan <- u: + case <-doneChan: + } + + return nil + } + // Instantiate with a long interval, so that we can precisely control // the firing via force feeding. bticker := ticker.NewForce(time.Hour) @@ -4505,12 +4529,13 @@ func (h *persistentLinkHarness) restartLink( UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - Registry: h.coreLink.cfg.Registry, - FeeEstimator: newMockFeeEstimator(), - ChainEvents: &contractcourt.ChainEventSubscription{}, - BatchTicker: bticker, - FwdPkgGCTicker: ticker.New(5 * time.Second), - PendingCommitTicker: ticker.New(time.Minute), + NotifyContractUpdate: notifyContractUpdate, + Registry: h.coreLink.cfg.Registry, + FeeEstimator: newMockFeeEstimator(), + ChainEvents: &contractcourt.ChainEventSubscription{}, + BatchTicker: bticker, + FwdPkgGCTicker: ticker.New(5 * time.Second), + PendingCommitTicker: ticker.New(time.Minute), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // to not trigger commit updates automatically during tests. BatchSize: 10000, @@ -4534,8 +4559,9 @@ func (h *persistentLinkHarness) restartLink( go func() { for { select { - case <-aliceLink.(*channelLink).htlcUpdates: + case <-notifyUpdateChan: case <-aliceLink.(*channelLink).quit: + close(doneChan) return } } diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 3fa57f601..94c301be6 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -1122,6 +1122,17 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, maxFeeUpdateTimeout = 40 * time.Minute ) + notifyUpdateChan := make(chan *contractcourt.ContractUpdate) + doneChan := make(chan struct{}) + notifyContractUpdate := func(u *contractcourt.ContractUpdate) error { + select { + case notifyUpdateChan <- u: + case <-doneChan: + } + + return nil + } + link := NewChannelLink( ChannelLinkConfig{ Switch: server.htlcSwitch, @@ -1142,6 +1153,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, + NotifyContractUpdate: notifyContractUpdate, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, BatchSize: 10, @@ -1169,8 +1181,9 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, go func() { for { select { - case <-link.(*channelLink).htlcUpdates: + case <-notifyUpdateChan: case <-link.(*channelLink).quit: + close(doneChan) return } } diff --git a/peer/brontide.go b/peer/brontide.go index 576143824..b5571e596 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -815,6 +815,10 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, return p.cfg.ChainArb.UpdateContractSignals(*chanPoint, signals) } + notifyContractUpdate := func(update *contractcourt.ContractUpdate) error { + return p.cfg.ChainArb.NotifyContractUpdate(*chanPoint, update) + } + chanType := lnChan.State().ChanType // Select the appropriate tower client based on the channel type. It's @@ -842,6 +846,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, PreimageCache: p.cfg.WitnessBeacon, ChainEvents: chainEvents, UpdateContractSignals: updateContractSignals, + NotifyContractUpdate: notifyContractUpdate, OnChannelFailure: onChannelFailure, SyncStates: syncStates, BatchTicker: ticker.New(p.cfg.ChannelCommitInterval),