contractcourt: remove block subscription in channel arbitrator

This commit removes the block subscriptions used in `ChannelArbitrator`,
replaced them with the blockbeat managed by `BlockbeatDispatcher`.
This commit is contained in:
yyforyongyu 2024-10-29 21:48:58 +08:00
parent 045f8432b7
commit 71295534bb
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 68 additions and 33 deletions

View File

@ -357,11 +357,6 @@ type ChannelArbitrator struct {
// to do its duty.
cfg ChannelArbitratorConfig
// blocks is a channel that the arbitrator will receive new blocks on.
// This channel should be buffered by so that it does not block the
// sender.
blocks chan int32
// signalUpdates is a channel that any new live signals for the channel
// we're watching over will be sent.
signalUpdates chan *signalUpdateMsg
@ -411,7 +406,6 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
c := &ChannelArbitrator{
log: log,
blocks: make(chan int32, arbitratorBlockBufferSize),
signalUpdates: make(chan *signalUpdateMsg),
resolutionSignal: make(chan struct{}),
forceCloseReqs: make(chan *forceCloseReq),
@ -2769,31 +2763,21 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
// A new block has arrived, we'll examine all the active HTLC's
// to see if any of them have expired, and also update our
// track of the best current height.
case blockHeight, ok := <-c.blocks:
if !ok {
return
}
bestHeight = blockHeight
case beat := <-c.BlockbeatChan:
bestHeight = beat.Height()
// If we're not in the default state, then we can
// ignore this signal as we're waiting for contract
// resolution.
if c.state != StateDefault {
continue
}
log.Debugf("ChannelArbitrator(%v): new block height=%v",
c.cfg.ChanPoint, bestHeight)
// Now that a new block has arrived, we'll attempt to
// advance our state forward.
nextState, _, err := c.advanceState(
uint32(bestHeight), chainTrigger, nil,
)
err := c.handleBlockbeat(beat)
if err != nil {
log.Errorf("Unable to advance state: %v", err)
log.Errorf("Handle block=%v got err: %v",
bestHeight, err)
}
// If as a result of this trigger, the contract is
// fully resolved, then well exit.
if nextState == StateFullyResolved {
if c.state == StateFullyResolved {
return
}
@ -3144,6 +3128,27 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32,
}
}
// handleBlockbeat processes a newly received blockbeat by advancing the
// arbitrator's internal state using the received block height.
func (c *ChannelArbitrator) handleBlockbeat(beat chainio.Blockbeat) error {
// Notify we've processed the block.
defer c.NotifyBlockProcessed(beat, nil)
// Try to advance the state if we are in StateDefault.
if c.state == StateDefault {
// Now that a new block has arrived, we'll attempt to advance
// our state forward.
_, _, err := c.advanceState(
uint32(beat.Height()), chainTrigger, nil,
)
if err != nil {
return fmt.Errorf("unable to advance state: %w", err)
}
}
return nil
}
// Name returns a human-readable string for this subsystem.
//
// NOTE: Part of chainio.Consumer interface.

View File

@ -13,6 +13,7 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainio"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
@ -227,6 +228,15 @@ func (c *chanArbTestCtx) CleanUp() {
}
}
// receiveBlockbeat mocks the behavior of a blockbeat being sent by the
// BlockbeatDispatcher, which essentially mocks the method `ProcessBlock`.
func (c *chanArbTestCtx) receiveBlockbeat(height int) {
go func() {
beat := newBeatFromHeight(int32(height))
c.chanArb.BlockbeatChan <- beat
}()
}
// AssertStateTransitions asserts that the state machine steps through the
// passed states in order.
func (c *chanArbTestCtx) AssertStateTransitions(expectedStates ...ArbitratorState) {
@ -1037,7 +1047,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
}
require.Equal(t, expectedFinalHtlcs, chanArbCtx.finalHtlcs)
// We'll no re-create the resolver, notice that we use the existing
// We'll now re-create the resolver, notice that we use the existing
// arbLog so it carries over the same on-disk state.
chanArbCtxNew, err := chanArbCtx.Restart(nil)
require.NoError(t, err, "unable to create ChannelArbitrator")
@ -1084,7 +1094,12 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) {
}
// Send a notification that the expiry height has been reached.
//
// TODO(yy): remove the EpochChan and use the blockbeat below once
// resolvers are hooked with the blockbeat.
oldNotifier.EpochChan <- &chainntnfs.BlockEpoch{Height: 10}
// beat := chainio.NewBlockbeatFromHeight(10)
// chanArb.BlockbeatChan <- beat
// htlcOutgoingContestResolver is now transforming into a
// htlcTimeoutResolver and should send the contract off for incubation.
@ -1924,7 +1939,8 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
// now mine a block (height 5), which is 5 blocks away
// (our grace delta) from the expiry of that HTLC.
case testCase.htlcExpired:
chanArbCtx.chanArb.blocks <- 5
beat := newBeatFromHeight(5)
chanArbCtx.chanArb.BlockbeatChan <- beat
// Otherwise, we'll just trigger a regular force close
// request.
@ -2036,8 +2052,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
// so instead, we'll mine another block which'll cause
// it to re-examine its state and realize there're no
// more HTLCs.
chanArbCtx.chanArb.blocks <- 6
chanArbCtx.AssertStateTransitions(StateFullyResolved)
chanArbCtx.receiveBlockbeat(6)
})
}
}
@ -2108,13 +2123,15 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
// We will advance the uptime to 10 seconds which should be still within
// the grace period and should not trigger going to chain.
testClock.SetTime(startTime.Add(time.Second * 10))
chanArbCtx.chanArb.blocks <- 5
beat := newBeatFromHeight(5)
chanArbCtx.chanArb.BlockbeatChan <- beat
chanArbCtx.AssertState(StateDefault)
// We will advance the uptime to 16 seconds which should trigger going
// to chain.
testClock.SetTime(startTime.Add(time.Second * 16))
chanArbCtx.chanArb.blocks <- 6
beat = newBeatFromHeight(6)
chanArbCtx.chanArb.BlockbeatChan <- beat
chanArbCtx.AssertStateTransitions(
StateBroadcastCommit,
StateCommitmentBroadcasted,
@ -2482,7 +2499,7 @@ func TestSweepAnchors(t *testing.T) {
// Set current block height.
heightHint := uint32(1000)
chanArbCtx.chanArb.blocks <- int32(heightHint)
chanArbCtx.receiveBlockbeat(int(heightHint))
htlcIndexBase := uint64(99)
deadlineDelta := uint32(10)
@ -2645,7 +2662,7 @@ func TestSweepLocalAnchor(t *testing.T) {
// Set current block height.
heightHint := uint32(1000)
chanArbCtx.chanArb.blocks <- int32(heightHint)
chanArbCtx.receiveBlockbeat(int(heightHint))
htlcIndex := uint64(99)
deadlineDelta := uint32(10)
@ -2793,7 +2810,8 @@ func TestChannelArbitratorAnchors(t *testing.T) {
// Set current block height.
heightHint := uint32(1000)
chanArbCtx.chanArb.blocks <- int32(heightHint)
beat := newBeatFromHeight(int32(heightHint))
chanArbCtx.chanArb.BlockbeatChan <- beat
htlcAmt := lnwire.MilliSatoshi(1_000_000)
@ -2960,10 +2978,14 @@ func TestChannelArbitratorAnchors(t *testing.T) {
// to htlcWithPreimage's CLTV.
require.Equal(t, 2, len(chanArbCtx.sweeper.deadlines))
require.EqualValues(t,
heightHint+deadlinePreimageDelta/2,
chanArbCtx.sweeper.deadlines[0], "want %d, got %d",
heightHint+deadlinePreimageDelta/2,
chanArbCtx.sweeper.deadlines[0],
)
require.EqualValues(t,
heightHint+deadlinePreimageDelta/2,
chanArbCtx.sweeper.deadlines[1], "want %d, got %d",
heightHint+deadlinePreimageDelta/2,
chanArbCtx.sweeper.deadlines[1],
)
@ -3146,3 +3168,11 @@ func (m *mockChannel) ForceCloseChan() (*wire.MsgTx, error) {
return &wire.MsgTx{}, nil
}
func newBeatFromHeight(height int32) *chainio.Beat {
epoch := chainntnfs.BlockEpoch{
Height: height,
}
return chainio.NewBeat(epoch)
}