mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-12 22:59:38 +02:00
contractcourt: handle blockbeat in chainWatcher
This commit is contained in:
@@ -536,7 +536,7 @@ func newChainSet(chanState *channeldb.OpenChannel) (*chainSet, error) {
|
||||
localCommit, remoteCommit, err := chanState.LatestCommitments()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to fetch channel state for "+
|
||||
"chan_point=%v", chanState.FundingOutpoint)
|
||||
"chan_point=%v: %v", chanState.FundingOutpoint, err)
|
||||
}
|
||||
|
||||
log.Tracef("ChannelPoint(%v): local_commit_type=%v, local_commit=%v",
|
||||
@@ -610,57 +610,43 @@ func (c *chainWatcher) closeObserver() {
|
||||
log.Infof("Close observer for ChannelPoint(%v) active",
|
||||
c.cfg.chanState.FundingOutpoint)
|
||||
|
||||
// If this is a taproot channel, before we proceed, we want to ensure
|
||||
// that the expected funding output has confirmed on chain.
|
||||
if c.cfg.chanState.ChanType.IsTaproot() {
|
||||
fundingPoint := c.cfg.chanState.FundingOutpoint
|
||||
|
||||
confNtfn, err := c.cfg.notifier.RegisterConfirmationsNtfn(
|
||||
&fundingPoint.Hash, c.fundingPkScript, 1, c.heightHint,
|
||||
)
|
||||
if err != nil {
|
||||
log.Warnf("unable to register for conf: %v", err)
|
||||
}
|
||||
|
||||
log.Infof("Waiting for taproot ChannelPoint(%v) to confirm...",
|
||||
c.cfg.chanState.FundingOutpoint)
|
||||
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-confNtfn.Confirmed:
|
||||
// A new block is received, we will check whether this block
|
||||
// contains a spending tx that we are interested in.
|
||||
case beat := <-c.BlockbeatChan:
|
||||
log.Debugf("ChainWatcher(%v) received blockbeat %v",
|
||||
c.cfg.chanState.FundingOutpoint, beat.Height())
|
||||
|
||||
// Process the block.
|
||||
c.handleBlockbeat(beat)
|
||||
|
||||
// If the funding outpoint is spent, we now go ahead and handle
|
||||
// it. Note that we cannot rely solely on the `block` event
|
||||
// above to trigger a close event, as deep down, the receiving
|
||||
// of block notifications and the receiving of spending
|
||||
// notifications are done in two different goroutines, so the
|
||||
// expected order: [receive block -> receive spend] is not
|
||||
// guaranteed .
|
||||
case spend, ok := <-c.fundingSpendNtfn.Spend:
|
||||
// If the channel was closed, then this means that the
|
||||
// notifier exited, so we will as well.
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
err := c.handleCommitSpend(spend)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to handle commit spend: %v",
|
||||
err)
|
||||
}
|
||||
|
||||
// The chainWatcher has been signalled to exit, so we'll do so
|
||||
// now.
|
||||
case <-c.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
// We've detected a spend of the channel onchain! Depending on the type
|
||||
// of spend, we'll act accordingly, so we'll examine the spending
|
||||
// transaction to determine what we should do.
|
||||
//
|
||||
// TODO(Roasbeef): need to be able to ensure this only triggers
|
||||
// on confirmation, to ensure if multiple txns are broadcast, we
|
||||
// act on the one that's timestamped
|
||||
case commitSpend, ok := <-c.fundingSpendNtfn.Spend:
|
||||
// If the channel was closed, then this means that the notifier
|
||||
// exited, so we will as well.
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
err := c.handleCommitSpend(commitSpend)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to handle commit spend: %v", err)
|
||||
}
|
||||
|
||||
// The chainWatcher has been signalled to exit, so we'll do so now.
|
||||
case <-c.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// handleKnownLocalState checks whether the passed spend is a local state that
|
||||
@@ -1452,3 +1438,102 @@ func (c *chainWatcher) handleCommitSpend(
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkFundingSpend performs a non-blocking read on the spendNtfn channel to
|
||||
// check whether there's a commit spend already. Returns the spend details if
|
||||
// found.
|
||||
func (c *chainWatcher) checkFundingSpend() *chainntnfs.SpendDetail {
|
||||
select {
|
||||
// We've detected a spend of the channel onchain! Depending on the type
|
||||
// of spend, we'll act accordingly, so we'll examine the spending
|
||||
// transaction to determine what we should do.
|
||||
//
|
||||
// TODO(Roasbeef): need to be able to ensure this only triggers
|
||||
// on confirmation, to ensure if multiple txns are broadcast, we
|
||||
// act on the one that's timestamped
|
||||
case spend, ok := <-c.fundingSpendNtfn.Spend:
|
||||
// If the channel was closed, then this means that the notifier
|
||||
// exited, so we will as well.
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debugf("Found spend details for funding output: %v",
|
||||
spend.SpenderTxHash)
|
||||
|
||||
return spend
|
||||
|
||||
default:
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// chanPointConfirmed checks whether the given channel point has confirmed.
|
||||
// This is used to ensure that the funding output has confirmed on chain before
|
||||
// we proceed with the rest of the close observer logic for taproot channels.
|
||||
func (c *chainWatcher) chanPointConfirmed() bool {
|
||||
op := c.cfg.chanState.FundingOutpoint
|
||||
confNtfn, err := c.cfg.notifier.RegisterConfirmationsNtfn(
|
||||
&op.Hash, c.fundingPkScript, 1, c.heightHint,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to register for conf: %v", err)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
select {
|
||||
case _, ok := <-confNtfn.Confirmed:
|
||||
// If the channel was closed, then this means that the notifier
|
||||
// exited, so we will as well.
|
||||
if !ok {
|
||||
// Check the docs in `fundingConfirmedNtfn` for details.
|
||||
return false
|
||||
}
|
||||
|
||||
log.Debugf("Taproot ChannelPoint(%v) confirmed", op)
|
||||
|
||||
return true
|
||||
|
||||
default:
|
||||
log.Infof("Taproot ChannelPoint(%v) not confirmed yet", op)
|
||||
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// handleBlockbeat takes a blockbeat and queries for a spending tx for the
|
||||
// funding output. If the spending tx is found, it will be handled based on the
|
||||
// closure type.
|
||||
func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat) {
|
||||
// Notify the chain watcher has processed the block.
|
||||
defer c.NotifyBlockProcessed(beat, nil)
|
||||
|
||||
// If this is a taproot channel, before we proceed, we want to ensure
|
||||
// that the expected funding output has confirmed on chain.
|
||||
if c.cfg.chanState.IsPending && c.cfg.chanState.ChanType.IsTaproot() {
|
||||
// If the funding output hasn't confirmed in this block, we
|
||||
// will check it again in the next block.
|
||||
if !c.chanPointConfirmed() {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Perform a non-blocking read to check whether the funding output was
|
||||
// spent.
|
||||
spend := c.checkFundingSpend()
|
||||
if spend == nil {
|
||||
log.Tracef("No spend found for ChannelPoint(%v) in block %v",
|
||||
c.cfg.chanState.FundingOutpoint, beat.Height())
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// The funding output was spent, we now handle it by sending a close
|
||||
// event to the channel arbitrator.
|
||||
err := c.handleCommitSpend(spend)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to handle commit spend: %v", err)
|
||||
}
|
||||
}
|
||||
|
@@ -9,10 +9,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightningnetwork/lnd/chainio"
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/input"
|
||||
"github.com/lightningnetwork/lnd/lntest/mock"
|
||||
lnmock "github.com/lightningnetwork/lnd/lntest/mock"
|
||||
"github.com/lightningnetwork/lnd/lnwallet"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -33,8 +34,8 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) {
|
||||
|
||||
// With the channels created, we'll now create a chain watcher instance
|
||||
// which will be watching for any closes of Alice's channel.
|
||||
aliceNotifier := &mock.ChainNotifier{
|
||||
SpendChan: make(chan *chainntnfs.SpendDetail),
|
||||
aliceNotifier := &lnmock.ChainNotifier{
|
||||
SpendChan: make(chan *chainntnfs.SpendDetail, 1),
|
||||
EpochChan: make(chan *chainntnfs.BlockEpoch),
|
||||
ConfChan: make(chan *chainntnfs.TxConfirmation),
|
||||
}
|
||||
@@ -49,6 +50,20 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) {
|
||||
require.NoError(t, err, "unable to start chain watcher")
|
||||
defer aliceChainWatcher.Stop()
|
||||
|
||||
// Create a mock blockbeat and send it to Alice's BlockbeatChan.
|
||||
mockBeat := &chainio.MockBlockbeat{}
|
||||
|
||||
// Mock the logger. We don't care how many times it's called as it's
|
||||
// not critical.
|
||||
mockBeat.On("logger").Return(log)
|
||||
|
||||
// Mock a fake block height - this is called based on the debuglevel.
|
||||
mockBeat.On("Height").Return(int32(1)).Maybe()
|
||||
|
||||
// Mock `NotifyBlockProcessed` to be call once.
|
||||
mockBeat.On("NotifyBlockProcessed",
|
||||
nil, aliceChainWatcher.quit).Return().Once()
|
||||
|
||||
// We'll request a new channel event subscription from Alice's chain
|
||||
// watcher.
|
||||
chanEvents := aliceChainWatcher.SubscribeChannelEvents()
|
||||
@@ -61,7 +76,19 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) {
|
||||
SpenderTxHash: &bobTxHash,
|
||||
SpendingTx: bobCommit,
|
||||
}
|
||||
aliceNotifier.SpendChan <- bobSpend
|
||||
|
||||
// Here we mock the behavior of a restart.
|
||||
select {
|
||||
case aliceNotifier.SpendChan <- bobSpend:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("unable to send spend details")
|
||||
}
|
||||
|
||||
select {
|
||||
case aliceChainWatcher.BlockbeatChan <- mockBeat:
|
||||
case <-time.After(time.Second * 1):
|
||||
t.Fatalf("unable to send blockbeat")
|
||||
}
|
||||
|
||||
// We should get a new spend event over the remote unilateral close
|
||||
// event channel.
|
||||
@@ -117,7 +144,7 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) {
|
||||
|
||||
// With the channels created, we'll now create a chain watcher instance
|
||||
// which will be watching for any closes of Alice's channel.
|
||||
aliceNotifier := &mock.ChainNotifier{
|
||||
aliceNotifier := &lnmock.ChainNotifier{
|
||||
SpendChan: make(chan *chainntnfs.SpendDetail),
|
||||
EpochChan: make(chan *chainntnfs.BlockEpoch),
|
||||
ConfChan: make(chan *chainntnfs.TxConfirmation),
|
||||
@@ -165,7 +192,32 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) {
|
||||
SpenderTxHash: &bobTxHash,
|
||||
SpendingTx: bobCommit,
|
||||
}
|
||||
aliceNotifier.SpendChan <- bobSpend
|
||||
|
||||
// Create a mock blockbeat and send it to Alice's BlockbeatChan.
|
||||
mockBeat := &chainio.MockBlockbeat{}
|
||||
|
||||
// Mock the logger. We don't care how many times it's called as it's
|
||||
// not critical.
|
||||
mockBeat.On("logger").Return(log)
|
||||
|
||||
// Mock a fake block height - this is called based on the debuglevel.
|
||||
mockBeat.On("Height").Return(int32(1)).Maybe()
|
||||
|
||||
// Mock `NotifyBlockProcessed` to be call once.
|
||||
mockBeat.On("NotifyBlockProcessed",
|
||||
nil, aliceChainWatcher.quit).Return().Once()
|
||||
|
||||
select {
|
||||
case aliceNotifier.SpendChan <- bobSpend:
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatalf("unable to send spend details")
|
||||
}
|
||||
|
||||
select {
|
||||
case aliceChainWatcher.BlockbeatChan <- mockBeat:
|
||||
case <-time.After(time.Second * 1):
|
||||
t.Fatalf("unable to send blockbeat")
|
||||
}
|
||||
|
||||
// We should get a new spend event over the remote unilateral close
|
||||
// event channel.
|
||||
@@ -279,7 +331,7 @@ func TestChainWatcherDataLossProtect(t *testing.T) {
|
||||
// With the channels created, we'll now create a chain watcher
|
||||
// instance which will be watching for any closes of Alice's
|
||||
// channel.
|
||||
aliceNotifier := &mock.ChainNotifier{
|
||||
aliceNotifier := &lnmock.ChainNotifier{
|
||||
SpendChan: make(chan *chainntnfs.SpendDetail),
|
||||
EpochChan: make(chan *chainntnfs.BlockEpoch),
|
||||
ConfChan: make(chan *chainntnfs.TxConfirmation),
|
||||
@@ -326,7 +378,34 @@ func TestChainWatcherDataLossProtect(t *testing.T) {
|
||||
SpenderTxHash: &bobTxHash,
|
||||
SpendingTx: bobCommit,
|
||||
}
|
||||
aliceNotifier.SpendChan <- bobSpend
|
||||
|
||||
// Create a mock blockbeat and send it to Alice's
|
||||
// BlockbeatChan.
|
||||
mockBeat := &chainio.MockBlockbeat{}
|
||||
|
||||
// Mock the logger. We don't care how many times it's called as
|
||||
// it's not critical.
|
||||
mockBeat.On("logger").Return(log)
|
||||
|
||||
// Mock a fake block height - this is called based on the
|
||||
// debuglevel.
|
||||
mockBeat.On("Height").Return(int32(1)).Maybe()
|
||||
|
||||
// Mock `NotifyBlockProcessed` to be call once.
|
||||
mockBeat.On("NotifyBlockProcessed",
|
||||
nil, aliceChainWatcher.quit).Return().Once()
|
||||
|
||||
select {
|
||||
case aliceNotifier.SpendChan <- bobSpend:
|
||||
case <-time.After(time.Second * 1):
|
||||
t.Fatalf("failed to send spend notification")
|
||||
}
|
||||
|
||||
select {
|
||||
case aliceChainWatcher.BlockbeatChan <- mockBeat:
|
||||
case <-time.After(time.Second * 1):
|
||||
t.Fatalf("unable to send blockbeat")
|
||||
}
|
||||
|
||||
// We should get a new uni close resolution that indicates we
|
||||
// processed the DLP scenario.
|
||||
@@ -453,7 +532,7 @@ func TestChainWatcherLocalForceCloseDetect(t *testing.T) {
|
||||
// With the channels created, we'll now create a chain watcher
|
||||
// instance which will be watching for any closes of Alice's
|
||||
// channel.
|
||||
aliceNotifier := &mock.ChainNotifier{
|
||||
aliceNotifier := &lnmock.ChainNotifier{
|
||||
SpendChan: make(chan *chainntnfs.SpendDetail),
|
||||
EpochChan: make(chan *chainntnfs.BlockEpoch),
|
||||
ConfChan: make(chan *chainntnfs.TxConfirmation),
|
||||
@@ -497,7 +576,33 @@ func TestChainWatcherLocalForceCloseDetect(t *testing.T) {
|
||||
SpenderTxHash: &aliceTxHash,
|
||||
SpendingTx: aliceCommit,
|
||||
}
|
||||
aliceNotifier.SpendChan <- aliceSpend
|
||||
// Create a mock blockbeat and send it to Alice's
|
||||
// BlockbeatChan.
|
||||
mockBeat := &chainio.MockBlockbeat{}
|
||||
|
||||
// Mock the logger. We don't care how many times it's called as
|
||||
// it's not critical.
|
||||
mockBeat.On("logger").Return(log)
|
||||
|
||||
// Mock a fake block height - this is called based on the
|
||||
// debuglevel.
|
||||
mockBeat.On("Height").Return(int32(1)).Maybe()
|
||||
|
||||
// Mock `NotifyBlockProcessed` to be call once.
|
||||
mockBeat.On("NotifyBlockProcessed",
|
||||
nil, aliceChainWatcher.quit).Return().Once()
|
||||
|
||||
select {
|
||||
case aliceNotifier.SpendChan <- aliceSpend:
|
||||
case <-time.After(time.Second * 1):
|
||||
t.Fatalf("unable to send spend notification")
|
||||
}
|
||||
|
||||
select {
|
||||
case aliceChainWatcher.BlockbeatChan <- mockBeat:
|
||||
case <-time.After(time.Second * 1):
|
||||
t.Fatalf("unable to send blockbeat")
|
||||
}
|
||||
|
||||
// We should get a local force close event from Alice as she
|
||||
// should be able to detect the close based on the commitment
|
||||
|
Reference in New Issue
Block a user