diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 79c722cc3..8be4fc0e9 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -312,18 +312,8 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)", channel.FundingOutpoint) - // We'll start by registering for a block epoch notifications so this - // channel can keep track of the current state of the main chain. - // // TODO(roasbeef): fetch best height (or pass in) so can ensure block // epoch delivers all the notifications to - // - // TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest? - // * reduces the number of goroutines - blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil) - if err != nil { - return nil, err - } chanPoint := channel.FundingOutpoint @@ -333,7 +323,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, ChanPoint: chanPoint, Channel: c.getArbChannel(channel), ShortChanID: channel.ShortChanID(), - BlockEpochs: blockEpoch, MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted, MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary, @@ -369,7 +358,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, ) if err != nil { - blockEpoch.Cancel() return nil, err } @@ -385,7 +373,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel, pendingRemoteCommitment, err := channel.RemoteCommitChainTip() if err != nil && err != channeldb.ErrNoPendingCommit { - blockEpoch.Cancel() return nil, err } if pendingRemoteCommitment != nil { @@ -545,18 +532,12 @@ func (c *ChainArbitrator) Start() error { // the chain any longer, only resolve the contracts on the confirmed // commitment. for _, closeChanInfo := range closingChannels { - blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil) - if err != nil { - return err - } - // We can leave off the CloseContract and ForceCloseChan // methods as the channel is already closed at this point. chanPoint := closeChanInfo.ChanPoint arbCfg := ChannelArbitratorConfig{ ChanPoint: chanPoint, ShortChanID: closeChanInfo.ShortChanID, - BlockEpochs: blockEpoch, ChainArbitratorConfig: c.cfg, ChainEvents: &ChainEventSubscription{}, IsPendingClose: true, @@ -574,7 +555,6 @@ func (c *ChainArbitrator) Start() error { c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint, ) if err != nil { - blockEpoch.Cancel() return err } arbCfg.MarkChannelResolved = func() error { @@ -627,8 +607,8 @@ func (c *ChainArbitrator) Start() error { } } - // Finally, we'll launch all the goroutines for each arbitrator so they - // can carry out their duties. + // Launch all the goroutines for each arbitrator so they can carry out + // their duties. for _, arbitrator := range c.activeChannels { if err := arbitrator.Start(); err != nil { c.Stop() @@ -636,11 +616,121 @@ func (c *ChainArbitrator) Start() error { } } + // Subscribe to a single stream of block epoch notifications that we + // will dispatch to all active arbitrators. + blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + return err + } + + // Start our goroutine which will dispatch blocks to each arbitrator. + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.dispatchBlocks(blockEpoch) + }() + // TODO(roasbeef): eventually move all breach watching here return nil } +// blockRecipient contains the information we need to dispatch a block to a +// channel arbitrator. +type blockRecipient struct { + // chanPoint is the funding outpoint of the channel. + chanPoint wire.OutPoint + + // blocks is the channel that new block heights are sent into. This + // channel should be sufficiently buffered as to not block the sender. + blocks chan<- int32 + + // quit is closed if the receiving entity is shutting down. + quit chan struct{} +} + +// dispatchBlocks consumes a block epoch notification stream and dispatches +// blocks to each of the chain arb's active channel arbitrators. This function +// must be run in a goroutine. +func (c *ChainArbitrator) dispatchBlocks( + blockEpoch *chainntnfs.BlockEpochEvent) { + + // getRecipients is a helper function which acquires the chain arb + // lock and returns a set of block recipients which can be used to + // dispatch blocks. + getRecipients := func() []blockRecipient { + c.Lock() + blocks := make([]blockRecipient, 0, len(c.activeChannels)) + for _, channel := range c.activeChannels { + blocks = append(blocks, blockRecipient{ + chanPoint: channel.cfg.ChanPoint, + blocks: channel.blocks, + quit: channel.quit, + }) + } + c.Unlock() + + return blocks + } + + // On exit, cancel our blocks subscription and close each block channel + // so that the arbitrators know they will no longer be receiving blocks. + defer func() { + blockEpoch.Cancel() + + recipients := getRecipients() + for _, recipient := range recipients { + close(recipient.blocks) + } + }() + + // Consume block epochs until we receive the instruction to shutdown. + for { + select { + // Consume block epochs, exiting if our subscription is + // terminated. + case block, ok := <-blockEpoch.Epochs: + if !ok { + log.Trace("dispatchBlocks block epoch " + + "cancelled") + return + } + + // Get the set of currently active channels block + // subscription channels and dispatch the block to + // each. + for _, recipient := range getRecipients() { + select { + // Deliver the block to the arbitrator. + case recipient.blocks <- block.Height: + + // If the recipient is shutting down, exit + // without delivering the block. This may be + // the case when two blocks are mined in quick + // succession, and the arbitrator resolves + // after the first block, and does not need to + // consume the second block. + case <-recipient.quit: + log.Debugf("channel: %v exit without "+ + "receiving block: %v", + recipient.chanPoint, + block.Height) + + // If the chain arb is shutting down, we don't + // need to deliver any more blocks (everything + // will be shutting down). + case <-c.quit: + return + } + } + + // Exit if the chain arbitrator is shutting down. + case <-c.quit: + return + } + } +} + // publishClosingTxs will load any stored cooperative or unilater closing // transactions and republish them. This helps ensure propagation of the // transactions in the event that prior publications failed. diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index 2d50ca1de..86ddd87d7 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -12,7 +12,6 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/input" @@ -34,6 +33,10 @@ const ( // anchorSweepConfTarget is the conf target used when sweeping // commitment anchors. anchorSweepConfTarget = 6 + + // arbitratorBlockBufferSize is the size of the buffer we give to each + // channel arbitrator. + arbitratorBlockBufferSize = 20 ) // WitnessSubscription represents an intent to be notified once new witnesses @@ -108,12 +111,6 @@ type ChannelArbitratorConfig struct { // to the switch during contract resolution. ShortChanID lnwire.ShortChannelID - // BlockEpochs is an active block epoch event stream backed by an - // active ChainNotifier instance. We will use new block notifications - // sent over this channel to decide when we should go on chain to - // reclaim/redeem the funds in an HTLC sent to/from us. - BlockEpochs *chainntnfs.BlockEpochEvent - // ChainEvents is an active subscription to the chain watcher for this // channel to be notified of any on-chain activity related to this // channel. @@ -325,6 +322,11 @@ type ChannelArbitrator struct { // to do its duty. cfg ChannelArbitratorConfig + // blocks is a channel that the arbitrator will receive new blocks on. + // This channel should be buffered by so that it does not block the + // sender. + blocks chan int32 + // signalUpdates is a channel that any new live signals for the channel // we're watching over will be sent. signalUpdates chan *signalUpdateMsg @@ -366,6 +368,7 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig, return &ChannelArbitrator{ log: log, + blocks: make(chan int32, arbitratorBlockBufferSize), signalUpdates: make(chan *signalUpdateMsg), htlcUpdates: make(<-chan *ContractUpdate), resolutionSignal: make(chan struct{}), @@ -397,13 +400,11 @@ func (c *ChannelArbitrator) Start() error { // machine can act accordingly. c.state, err = c.log.CurrentState() if err != nil { - c.cfg.BlockEpochs.Cancel() return err } _, bestHeight, err := c.cfg.ChainIO.GetBestBlock() if err != nil { - c.cfg.BlockEpochs.Cancel() return err } @@ -479,7 +480,6 @@ func (c *ChannelArbitrator) Start() error { c.cfg.ChanPoint) default: - c.cfg.BlockEpochs.Cancel() return err } } @@ -501,7 +501,6 @@ func (c *ChannelArbitrator) Start() error { // commitment has been confirmed on chain, and before we // advance our state step, we call InsertConfirmedCommitSet. if err := c.relaunchResolvers(commitSet, triggerHeight); err != nil { - c.cfg.BlockEpochs.Cancel() return err } } @@ -2111,7 +2110,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) { // TODO(roasbeef): tell top chain arb we're done defer func() { - c.cfg.BlockEpochs.Cancel() c.wg.Done() }() @@ -2121,11 +2119,11 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) { // A new block has arrived, we'll examine all the active HTLC's // to see if any of them have expired, and also update our // track of the best current height. - case blockEpoch, ok := <-c.cfg.BlockEpochs.Epochs: + case blockHeight, ok := <-c.blocks: if !ok { return } - bestHeight = blockEpoch.Height + bestHeight = blockHeight // If we're not in the default state, then we can // ignore this signal as we're waiting for contract diff --git a/contractcourt/channel_arbitrator_test.go b/contractcourt/channel_arbitrator_test.go index d3c85f263..38970b6be 100644 --- a/contractcourt/channel_arbitrator_test.go +++ b/contractcourt/channel_arbitrator_test.go @@ -197,8 +197,6 @@ type chanArbTestCtx struct { resolvedChan chan struct{} - blockEpochs chan *chainntnfs.BlockEpoch - incubationRequests chan struct{} resolutions chan []ResolutionMsg @@ -304,12 +302,6 @@ func withMarkClosed(markClosed func(*channeldb.ChannelCloseSummary, func createTestChannelArbitrator(t *testing.T, log ArbitratorLog, opts ...testChanArbOption) (*chanArbTestCtx, error) { - blockEpochs := make(chan *chainntnfs.BlockEpoch) - blockEpoch := &chainntnfs.BlockEpochEvent{ - Epochs: blockEpochs, - Cancel: func() {}, - } - chanPoint := wire.OutPoint{} shortChanID := lnwire.ShortChannelID{} chanEvents := &ChainEventSubscription{ @@ -366,7 +358,6 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog, arbCfg := &ChannelArbitratorConfig{ ChanPoint: chanPoint, ShortChanID: shortChanID, - BlockEpochs: blockEpoch, MarkChannelResolved: func() error { resolvedChan <- struct{}{} return nil @@ -433,7 +424,6 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog, cleanUp: cleanUp, resolvedChan: resolvedChan, resolutions: resolutionChan, - blockEpochs: blockEpochs, log: log, incubationRequests: incubateChan, sweeper: mockSweeper, @@ -1759,7 +1749,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) { // now mine a block (height 5), which is 5 blocks away // (our grace delta) from the expiry of that HTLC. case testCase.htlcExpired: - chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 5} + chanArbCtx.chanArb.blocks <- 5 // Otherwise, we'll just trigger a regular force close // request. @@ -1863,7 +1853,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) { // so instead, we'll mine another block which'll cause // it to re-examine its state and realize there're no // more HTLCs. - chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 6} + chanArbCtx.chanArb.blocks <- 6 chanArbCtx.AssertStateTransitions(StateFullyResolved) }) } @@ -1940,13 +1930,13 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) { // We will advance the uptime to 10 seconds which should be still within // the grace period and should not trigger going to chain. testClock.SetTime(startTime.Add(time.Second * 10)) - chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 5} + chanArbCtx.chanArb.blocks <- 5 chanArbCtx.AssertState(StateDefault) // We will advance the uptime to 16 seconds which should trigger going // to chain. testClock.SetTime(startTime.Add(time.Second * 16)) - chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 6} + chanArbCtx.chanArb.blocks <- 6 chanArbCtx.AssertStateTransitions( StateBroadcastCommit, StateCommitmentBroadcasted,