Merge pull request #7564 from yyforyongyu/mempool-watch

contractcourt: add mempool watcher to notify mempool events
Olaoluwa Osuntokun 2023-04-18 14:22:23 -07:00 committed by GitHub
commit ec3cb6939a
22 changed files with 1746 additions and 120 deletions


@ -69,6 +69,9 @@ type BitcoindNotifier struct {
// which the transaction could have confirmed within the chain.
confirmHintCache chainntnfs.ConfirmHintCache
// memNotifier notifies clients of events related to the mempool.
memNotifier *chainntnfs.MempoolNotifier
wg sync.WaitGroup
quit chan struct{}
}
@ -77,6 +80,10 @@ type BitcoindNotifier struct {
// time.
var _ chainntnfs.ChainNotifier = (*BitcoindNotifier)(nil)
// Ensure BitcoindNotifier implements the MempoolWatcher interface at compile
// time.
var _ chainntnfs.MempoolWatcher = (*BitcoindNotifier)(nil)
// New returns a new BitcoindNotifier instance. This function assumes the
// bitcoind node detailed in the passed configuration is already running, and
// willing to accept RPC requests and new zmq clients.
@ -96,7 +103,8 @@ func New(chainConn *chain.BitcoindConn, chainParams *chaincfg.Params,
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
blockCache: blockCache,
blockCache: blockCache,
memNotifier: chainntnfs.NewMempoolNotifier(),
quit: make(chan struct{}),
}
@ -113,6 +121,7 @@ func (b *BitcoindNotifier) Start() error {
b.start.Do(func() {
startErr = b.startNotifier()
})
return startErr
}
@ -142,6 +151,9 @@ func (b *BitcoindNotifier) Stop() error {
}
b.txNotifier.TearDown()
// Stop the mempool notifier.
b.memNotifier.TearDown()
return nil
}
@ -441,23 +453,21 @@ out:
b.bestBlock = newBestBlock
case chain.RelevantTx:
// We only care about notifying on confirmed
// spends, so if this is a mempool spend, we can
// ignore it and wait for the spend to appear in
// on-chain.
tx := btcutil.NewTx(&item.TxRecord.MsgTx)
// Init values.
isMempool := false
height := uint32(0)
// Unwrap values.
if item.Block == nil {
continue
isMempool = true
} else {
height = uint32(item.Block.Height)
}
tx := btcutil.NewTx(&item.TxRecord.MsgTx)
err := b.txNotifier.ProcessRelevantSpendTx(
tx, uint32(item.Block.Height),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to "+
"process transaction %v: %v",
tx.Hash(), err)
}
// Handle the transaction.
b.handleRelevantTx(tx, isMempool, height)
}
case <-b.quit:
@ -466,6 +476,39 @@ out:
}
}
// handleRelevantTx handles a new transaction that has been seen either in a
// block or in the mempool. If in mempool, it will ask the mempool notifier to
// handle it. If in a block, it will ask the txNotifier to handle it, and
// cancel any relevant subscriptions made in the mempool.
func (b *BitcoindNotifier) handleRelevantTx(tx *btcutil.Tx,
mempool bool, height uint32) {
// If this is a mempool spend, we'll ask the mempool notifier to handle
// it.
if mempool {
b.memNotifier.ProcessRelevantSpendTx(tx)
return
}
// Otherwise this is a confirmed spend, and we'll ask the tx notifier
// to handle it.
err := b.txNotifier.ProcessRelevantSpendTx(tx, height)
if err != nil {
chainntnfs.Log.Errorf("Unable to process transaction %v: %v",
tx.Hash(), err)
return
}
// Once the tx is processed, we will ask the memNotifier to unsubscribe
// the input.
//
// NOTE(yy): we could build it into txNotifier.ProcessRelevantSpendTx,
// but choose to implement it here so we can easily decouple the two
// notifiers in the future.
b.memNotifier.UnsubsribeConfirmedSpentTx(tx)
}
// historicalConfDetails looks up whether a confirmation request (txid/output
// script) has already been included in a block in the active chain and, if so,
// returns details about said block.
@ -993,3 +1036,28 @@ func (b *BitcoindNotifier) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock,
return b.blockCache.GetBlock(hash, b.chainConn.GetBlock)
}
// SubscribeMempoolSpent allows the caller to register a subscription to watch
// for a spend of an outpoint in the mempool. The event will be dispatched once
// the outpoint is spent in the mempool.
//
// NOTE: part of the MempoolWatcher interface.
func (b *BitcoindNotifier) SubscribeMempoolSpent(
outpoint wire.OutPoint) (*chainntnfs.MempoolSpendEvent, error) {
event := b.memNotifier.SubscribeInput(outpoint)
ops := []*wire.OutPoint{&outpoint}
return event, b.chainConn.NotifySpent(ops)
}
// CancelMempoolSpendEvent allows the caller to cancel a subscription to watch
// for a spend of an outpoint in the mempool.
//
// NOTE: part of the MempoolWatcher interface.
func (b *BitcoindNotifier) CancelMempoolSpendEvent(
sub *chainntnfs.MempoolSpendEvent) {
b.memNotifier.UnsubscribeEvent(sub)
}


@ -86,6 +86,9 @@ type BtcdNotifier struct {
// which the transaction could have confirmed within the chain.
confirmHintCache chainntnfs.ConfirmHintCache
// memNotifier notifies clients of events related to the mempool.
memNotifier *chainntnfs.MempoolNotifier
wg sync.WaitGroup
quit chan struct{}
}
@ -93,6 +96,9 @@ type BtcdNotifier struct {
// Ensure BtcdNotifier implements the ChainNotifier interface at compile time.
var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
// Ensure BtcdNotifier implements the MempoolWatcher interface at compile time.
var _ chainntnfs.MempoolWatcher = (*BtcdNotifier)(nil)
// New returns a new BtcdNotifier instance. This function assumes the btcd node
// detailed in the passed configuration is already running, and willing to
// accept new websockets clients.
@ -115,7 +121,8 @@ func New(config *rpcclient.ConnConfig, chainParams *chaincfg.Params,
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
blockCache: blockCache,
blockCache: blockCache,
memNotifier: chainntnfs.NewMempoolNotifier(),
quit: make(chan struct{}),
}
@ -146,6 +153,7 @@ func (b *BtcdNotifier) Start() error {
b.start.Do(func() {
startErr = b.startNotifier()
})
return startErr
}
@ -183,6 +191,9 @@ func (b *BtcdNotifier) Stop() error {
}
b.txNotifier.TearDown()
// Stop the mempool notifier.
b.memNotifier.TearDown()
return nil
}
@ -492,22 +503,21 @@ out:
case item := <-b.txUpdates.ChanOut():
newSpend := item.(*txUpdate)
tx := newSpend.tx
// We only care about notifying on confirmed spends, so
// if this is a mempool spend, we can ignore it and wait
// for the spend to appear in on-chain.
// Init values.
isMempool := false
height := uint32(0)
// Unwrap values.
if newSpend.details == nil {
continue
isMempool = true
} else {
height = uint32(newSpend.details.Height)
}
err := b.txNotifier.ProcessRelevantSpendTx(
newSpend.tx, uint32(newSpend.details.Height),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to process "+
"transaction %v: %v",
newSpend.tx.Hash(), err)
}
// Handle the transaction.
b.handleRelevantTx(tx, isMempool, height)
case <-b.quit:
break out
@ -515,6 +525,39 @@ out:
}
}
// handleRelevantTx handles a new transaction that has been seen either in a
// block or in the mempool. If in mempool, it will ask the mempool notifier to
// handle it. If in a block, it will ask the txNotifier to handle it, and
// cancel any relevant subscriptions made in the mempool.
func (b *BtcdNotifier) handleRelevantTx(tx *btcutil.Tx,
mempool bool, height uint32) {
// If this is a mempool spend, we'll ask the mempool notifier to handle
// it.
if mempool {
b.memNotifier.ProcessRelevantSpendTx(tx)
return
}
// Otherwise this is a confirmed spend, and we'll ask the tx notifier
// to handle it.
err := b.txNotifier.ProcessRelevantSpendTx(tx, height)
if err != nil {
chainntnfs.Log.Errorf("Unable to process transaction %v: %v",
tx.Hash(), err)
return
}
// Once the tx is processed, we will ask the memNotifier to unsubscribe
// the input.
//
// NOTE(yy): we could build it into txNotifier.ProcessRelevantSpendTx,
// but choose to implement it here so we can easily decouple the two
// notifiers in the future.
b.memNotifier.UnsubsribeConfirmedSpentTx(tx)
}
// historicalConfDetails looks up whether a confirmation request (txid/output
// script) has already been included in a block in the active chain and, if so,
// returns details about said block.
@ -1051,3 +1094,28 @@ func (b *BtcdNotifier) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock,
return b.blockCache.GetBlock(hash, b.chainConn.GetBlock)
}
// SubscribeMempoolSpent allows the caller to register a subscription to watch
// for a spend of an outpoint in the mempool. The event will be dispatched once
// the outpoint is spent in the mempool.
//
// NOTE: part of the MempoolWatcher interface.
func (b *BtcdNotifier) SubscribeMempoolSpent(
outpoint wire.OutPoint) (*chainntnfs.MempoolSpendEvent, error) {
event := b.memNotifier.SubscribeInput(outpoint)
ops := []*wire.OutPoint{&outpoint}
return event, b.chainConn.NotifySpent(ops)
}
// CancelMempoolSpendEvent allows the caller to cancel a subscription to watch
// for a spend of an outpoint in the mempool.
//
// NOTE: part of the MempoolWatcher interface.
func (b *BtcdNotifier) CancelMempoolSpendEvent(
sub *chainntnfs.MempoolSpendEvent) {
b.memNotifier.UnsubscribeEvent(sub)
}


@ -808,3 +808,16 @@ type ConfirmHintCache interface {
// the cache.
PurgeConfirmHint(confRequests ...ConfRequest) error
}
// MempoolWatcher defines an interface that allows the caller to query
// information in the mempool.
type MempoolWatcher interface {
// SubscribeMempoolSpent allows the caller to register a subscription
// to watch for a spend of an outpoint in the mempool. The event will be
// dispatched once the outpoint is spent in the mempool.
SubscribeMempoolSpent(op wire.OutPoint) (*MempoolSpendEvent, error)
// CancelMempoolSpendEvent allows the caller to cancel a subscription to
// watch for a spend of an outpoint in the mempool.
CancelMempoolSpendEvent(sub *MempoolSpendEvent)
}
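For illustration, the following is a minimal usage sketch of this interface (not part of the PR): subscribe to an outpoint, block until the spend details arrive on the read-only Spend channel or the caller shuts down, and always cancel the subscription before returning. The package name, the helper name waitForMempoolSpend, and the quit channel are assumptions added here; everything else uses only the API shown above.

package example

import (
	"errors"

	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/chainntnfs"
)

// waitForMempoolSpend is a hypothetical helper: it works against any
// MempoolWatcher implementation, e.g. the BitcoindNotifier or BtcdNotifier
// shown earlier in this PR.
func waitForMempoolSpend(watcher chainntnfs.MempoolWatcher, op wire.OutPoint,
	quit <-chan struct{}) (*chainntnfs.SpendDetail, error) {

	sub, err := watcher.SubscribeMempoolSpent(op)
	if err != nil {
		return nil, err
	}

	// Make sure the subscription is torn down once we no longer need it.
	defer watcher.CancelMempoolSpendEvent(sub)

	select {
	// The Spend channel delivers the details of the mempool transaction
	// that spends the watched outpoint.
	case spend := <-sub.Spend:
		return spend, nil

	case <-quit:
		return nil, errors.New("shutting down")
	}
}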

chainntnfs/mempool.go (new file, 277 lines)

@ -0,0 +1,277 @@
package chainntnfs
import (
"sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/lnutils"
)
// inputsWithTx is a map of outpoints to the tx that spends them.
type inputsWithTx map[wire.OutPoint]*SpendDetail
// MempoolNotifier defines an internal mempool notifier that's used to notify
// the spending of given inputs. This is mounted to either `BitcoindNotifier`
// or `BtcdNotifier` depending on the chain backend.
type MempoolNotifier struct {
wg sync.WaitGroup
// subscribedInputs stores the inputs whose spending events we want to
// watch for.
subscribedInputs *lnutils.SyncMap[wire.OutPoint,
*lnutils.SyncMap[uint64, *MempoolSpendEvent]]
// sCounter is used to generate unique subscription IDs.
sCounter atomic.Uint64
// quit is closed when the notifier is torn down.
quit chan struct{}
}
// MempoolSpendEvent is returned to the subscriber to watch for the spending
// event for the requested input.
type MempoolSpendEvent struct {
// Spend is a receive only channel which will be sent upon once the
// target outpoint has been spent.
//
// NOTE: This channel must be buffered.
Spend <-chan *SpendDetail
// id is the unique identifier of this subscription.
id uint64
// outpoint is the subscribed outpoint.
outpoint wire.OutPoint
// event is the channel that will be sent upon once the target outpoint
// is spent.
event chan *SpendDetail
// cancel cancels the subscription.
cancel chan struct{}
}
// newMempoolSpendEvent returns a new instance of MempoolSpendEvent.
func newMempoolSpendEvent(id uint64, op wire.OutPoint) *MempoolSpendEvent {
sub := &MempoolSpendEvent{
id: id,
outpoint: op,
event: make(chan *SpendDetail, 1),
cancel: make(chan struct{}),
}
// Mount the receive only channel to the event channel.
sub.Spend = (<-chan *SpendDetail)(sub.event)
return sub
}
// NewMempoolNotifier takes a chain connection and returns a new mempool
// notifier.
func NewMempoolNotifier() *MempoolNotifier {
return &MempoolNotifier{
subscribedInputs: &lnutils.SyncMap[
wire.OutPoint, *lnutils.SyncMap[
uint64, *MempoolSpendEvent,
]]{},
quit: make(chan struct{}),
}
}
// SubscribeInput takes an outpoint of the input and returns a channel that the
// subscriber can listen to for the spending event.
func (m *MempoolNotifier) SubscribeInput(
outpoint wire.OutPoint) *MempoolSpendEvent {
// Get the current subscribers for this input or create a new one.
clients := &lnutils.SyncMap[uint64, *MempoolSpendEvent]{}
clients, _ = m.subscribedInputs.LoadOrStore(outpoint, clients)
// Increment the subscription counter and return the new value.
subscriptionID := m.sCounter.Add(1)
// Create a new subscription.
sub := newMempoolSpendEvent(subscriptionID, outpoint)
// Add the subscriber with a unique id.
clients.Store(subscriptionID, sub)
// Update the subscribed inputs.
m.subscribedInputs.Store(outpoint, clients)
Log.Debugf("Subscribed(id=%v) mempool event for input=%s",
subscriptionID, outpoint)
return sub
}
// UnsubscribeInput removes all the subscriptions for the given outpoint.
func (m *MempoolNotifier) UnsubscribeInput(outpoint wire.OutPoint) {
Log.Debugf("Unsubscribing MempoolSpendEvent for input %s", outpoint)
m.subscribedInputs.Delete(outpoint)
}
// UnsubscribeEvent removes a given subscriber for the given MempoolSpendEvent.
func (m *MempoolNotifier) UnsubscribeEvent(sub *MempoolSpendEvent) {
Log.Debugf("Unsubscribing(id=%v) MempoolSpendEvent for input=%s",
sub.id, sub.outpoint)
// Load all the subscribers for this input.
clients, loaded := m.subscribedInputs.Load(sub.outpoint)
if !loaded {
Log.Debugf("No subscribers for input %s", sub.outpoint)
return
}
// Load the subscriber.
subscriber, loaded := clients.Load(sub.id)
if !loaded {
Log.Debugf("No subscribers for input %s with id %v",
sub.outpoint, sub.id)
return
}
// Close the cancel channel in case it's been used in a goroutine.
close(subscriber.cancel)
// Remove the subscriber.
clients.Delete(sub.id)
}
// UnsubsribeConfirmedSpentTx takes a transaction and removes the subscriptions
// identified using its inputs.
func (m *MempoolNotifier) UnsubsribeConfirmedSpentTx(tx *btcutil.Tx) {
Log.Tracef("Unsubscribe confirmed tx %s", tx.Hash())
// Get the spent inputs of interest.
spentInputs := m.findRelevantInputs(tx)
// Unsubscribe the subscribers.
for outpoint := range spentInputs {
m.UnsubscribeInput(outpoint)
}
Log.Tracef("Finished unsubscribing confirmed tx %s, found %d inputs",
tx.Hash(), len(spentInputs))
}
// ProcessRelevantSpendTx takes a transaction and checks whether it spends any
// of the subscribed inputs. If so, spend notifications are sent to the
// relevant subscribers.
func (m *MempoolNotifier) ProcessRelevantSpendTx(tx *btcutil.Tx) {
Log.Tracef("Processing mempool tx %s", tx.Hash())
defer Log.Tracef("Finished processing mempool tx %s", tx.Hash())
// Get the spent inputs of interest.
spentInputs := m.findRelevantInputs(tx)
// Notify the subscribers.
m.notifySpent(spentInputs)
}
// TearDown stops the notifier and cleans up resources.
func (m *MempoolNotifier) TearDown() {
Log.Infof("Stopping mempool notifier")
close(m.quit)
m.wg.Wait()
}
// findRelevantInputs takes a transaction to find the subscribed inputs and
// returns them.
func (m *MempoolNotifier) findRelevantInputs(tx *btcutil.Tx) inputsWithTx {
txid := tx.Hash()
watchedInputs := make(inputsWithTx)
// NOTE: we may have found multiple targeted inputs in the same tx.
for i, input := range tx.MsgTx().TxIn {
op := &input.PreviousOutPoint
// Check whether this input is subscribed.
_, loaded := m.subscribedInputs.Load(*op)
if !loaded {
continue
}
// If found, save it to watchedInputs to notify the
// subscriber later.
Log.Infof("Found input %s, spent in %s", op, txid)
// Construct the spend details.
details := &SpendDetail{
SpentOutPoint: op,
SpenderTxHash: txid,
SpendingTx: tx.MsgTx().Copy(),
SpenderInputIndex: uint32(i),
SpendingHeight: 0,
}
watchedInputs[*op] = details
}
return watchedInputs
}
// notifySpent iterates all the spentInputs and notifies the subscribers about
// the spent details.
func (m *MempoolNotifier) notifySpent(spentInputs inputsWithTx) {
// notifySingle sends a notification to a single subscriber about the
// spending event.
//
// NOTE: must be used inside a goroutine.
notifySingle := func(id uint64, sub *MempoolSpendEvent,
op wire.OutPoint, detail *SpendDetail) {
defer m.wg.Done()
// Send the spend details to the subscriber.
select {
case sub.event <- detail:
Log.Debugf("Notified(id=%v) mempool spent for input %s",
sub.id, op)
case <-sub.cancel:
Log.Debugf("Subscription(id=%v) canceled, skipped "+
"notifying spent for input %s", sub.id, op)
case <-m.quit:
Log.Debugf("Mempool notifier quit, skipped notifying "+
"mempool spent for input %s", op)
}
}
// notifyAll is a helper closure that constructs a spend detail and
// sends it to all the subscribers of that particular input.
//
// NOTE: must be used inside a goroutine.
notifyAll := func(detail *SpendDetail, op wire.OutPoint) {
defer m.wg.Done()
txid := detail.SpendingTx.TxHash()
Log.Debugf("Notifying all clients for the spend of %s in tx %s",
op, txid)
// Load the subscriber.
subs, loaded := m.subscribedInputs.Load(op)
if !loaded {
Log.Errorf("Sub not found for %s", op)
return
}
// Iterate all the subscribers for this input and notify them.
subs.ForEach(func(id uint64, sub *MempoolSpendEvent) error {
m.wg.Add(1)
go notifySingle(id, sub, op, detail)
return nil
})
}
// Iterate the spent inputs to notify the subscribers concurrently.
for op, tx := range spentInputs {
op, tx := op, tx
m.wg.Add(1)
go notifyAll(tx, op)
}
}
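Before the unit tests below, here is a short sketch of the end-to-end flow inside the notifier (not part of the PR, assumed to live in package chainntnfs alongside the code above; the ExampleMempoolNotifier name and the transaction values are illustrative): a backend notifier subscribes an input, feeds a relevant mempool transaction through ProcessRelevantSpendTx, and the subscriber reads the spend details from the buffered Spend channel.

// ExampleMempoolNotifier is an illustrative sketch of the flow exercised by
// the tests below.
func ExampleMempoolNotifier() {
	notifier := NewMempoolNotifier()
	defer notifier.TearDown()

	// Watch an outpoint for a mempool spend.
	op := wire.OutPoint{Hash: [32]byte{1}, Index: 0}
	sub := notifier.SubscribeInput(op)

	// A transaction spending the watched outpoint shows up in the
	// mempool; the backend notifier hands it over for processing.
	spendingTx := btcutil.NewTx(&wire.MsgTx{
		TxIn: []*wire.TxIn{{PreviousOutPoint: op}},
	})
	notifier.ProcessRelevantSpendTx(spendingTx)

	// The subscriber receives the spend details asynchronously on the
	// buffered Spend channel.
	detail := <-sub.Spend
	Log.Infof("Outpoint %v spent by %v", detail.SpentOutPoint,
		detail.SpenderTxHash)
}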

chainntnfs/mempool_test.go (new file, 387 lines)

@ -0,0 +1,387 @@
package chainntnfs
import (
"testing"
"time"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/wire"
"github.com/stretchr/testify/require"
)
const testTimeout = 5 * time.Second
// TestMempoolSubscribeInput tests that we can successfully subscribe an input.
func TestMempoolSubscribeInput(t *testing.T) {
t.Parallel()
// Create a new mempool notifier instance.
notifier := NewMempoolNotifier()
// Create a test input.
input := wire.OutPoint{Hash: [32]byte{1}}
// Create the expected subscription.
expectedSub := newMempoolSpendEvent(1, input)
// Subscribe to the input.
sub := notifier.SubscribeInput(input)
// Verify the subscription is returned.
require.Equal(t, expectedSub.id, sub.id)
require.Equal(t, expectedSub.outpoint, sub.outpoint)
// Verify that the subscription was added to the notifier.
subs, loaded := notifier.subscribedInputs.Load(input)
require.True(t, loaded)
// Verify the saved subscription is the same as the expected one.
sub, loaded = subs.Load(sub.id)
require.True(t, loaded)
require.Equal(t, expectedSub.id, sub.id)
require.Equal(t, expectedSub.outpoint, sub.outpoint)
}
// TestMempoolUnsubscribeInput tests that we can successfully unsubscribe an
// input.
func TestMempoolUnsubscribeInput(t *testing.T) {
t.Parallel()
// Create a new mempool notifier instance.
notifier := NewMempoolNotifier()
// Register a spend notification for an outpoint.
input := wire.OutPoint{Hash: [32]byte{1}}
notifier.SubscribeInput(input)
// Verify that the subscription was added to the notifier.
_, loaded := notifier.subscribedInputs.Load(input)
require.True(t, loaded)
// Unsubscribe the input.
notifier.UnsubscribeInput(input)
// Verify that the input is gone.
_, loaded = notifier.subscribedInputs.Load(input)
require.False(t, loaded)
}
// TestMempoolUnsubscribeEvent tests that when a given input has multiple
// subscribers, removing one of them won't affect the others.
func TestMempoolUnsubscribeEvent(t *testing.T) {
t.Parallel()
// Create a new mempool notifier instance.
notifier := NewMempoolNotifier()
// Register a spend notification for an outpoint.
input := wire.OutPoint{Hash: [32]byte{1}}
sub1 := notifier.SubscribeInput(input)
sub2 := notifier.SubscribeInput(input)
// Verify that the subscription was added to the notifier.
subs, loaded := notifier.subscribedInputs.Load(input)
require.True(t, loaded)
// sub1 should be found.
_, loaded = subs.Load(sub1.id)
require.True(t, loaded)
// sub2 should be found.
_, loaded = subs.Load(sub2.id)
require.True(t, loaded)
// Unsubscribe sub1.
notifier.UnsubscribeEvent(sub1)
// Verify that the subscription was removed from the notifier.
subs, loaded = notifier.subscribedInputs.Load(input)
require.True(t, loaded)
// sub1 should be gone.
_, loaded = subs.Load(sub1.id)
require.False(t, loaded)
// sub2 should still be found.
_, loaded = subs.Load(sub2.id)
require.True(t, loaded)
}
// TestMempoolFindRelevantInputs tests that the mempool notifier can find the
// spend of subscribed inputs from a given transaction.
func TestMempoolFindRelevantInputs(t *testing.T) {
t.Parallel()
// Create a new mempool notifier instance.
notifier := NewMempoolNotifier()
// Create two inputs and subscribe to the second one.
input1 := wire.OutPoint{Hash: [32]byte{1}}
input2 := wire.OutPoint{Hash: [32]byte{2}}
// Make input2 the subscribed input.
notifier.SubscribeInput(input2)
// Create a transaction that spends the above two inputs.
msgTx := &wire.MsgTx{
TxIn: []*wire.TxIn{
{PreviousOutPoint: input1},
{PreviousOutPoint: input2},
},
TxOut: []*wire.TxOut{},
}
tx := btcutil.NewTx(msgTx)
// Create the expected spend detail.
detailExp := &SpendDetail{
SpentOutPoint: &input2,
SpenderTxHash: tx.Hash(),
SpendingTx: msgTx,
SpenderInputIndex: 1,
}
// Call the method.
result := notifier.findRelevantInputs(tx)
// Verify that the result is as expected.
require.Contains(t, result, input2)
// Verify the returned spend details is as expected.
detail := result[input2]
require.Equal(t, detailExp, detail)
}
// TestMempoolNotifySpentSameInputs tests that the mempool notifier sends
// notifications to all subscribers of the same input.
func TestMempoolNotifySpentSameInputs(t *testing.T) {
t.Parallel()
// Create a new mempool notifier instance.
notifier := NewMempoolNotifier()
// Register a spend notification for an outpoint.
input := wire.OutPoint{Hash: [32]byte{1}}
sub1 := notifier.SubscribeInput(input)
sub2 := notifier.SubscribeInput(input)
// Create a transaction that spends input.
msgTx := &wire.MsgTx{
TxIn: []*wire.TxIn{
{PreviousOutPoint: input},
},
}
tx := btcutil.NewTx(msgTx)
// Notify the subscribers about the spent input.
spendDetail := &SpendDetail{
SpentOutPoint: &input,
SpenderTxHash: tx.Hash(),
SpendingTx: msgTx,
SpenderInputIndex: 0,
}
notifier.notifySpent(inputsWithTx{input: spendDetail})
// Verify that sub1 received the spend notification for input1.
select {
case spend := <-sub1.Spend:
require.Equal(t, tx.Hash(), spend.SpenderTxHash)
case <-time.After(testTimeout):
require.Fail(t, "timeout for sub1 to receive")
}
// Verify that sub2 received the spend notification for input1.
select {
case spend := <-sub2.Spend:
require.Equal(t, tx.Hash(), spend.SpenderTxHash)
case <-time.After(testTimeout):
require.Fail(t, "timeout for sub2 to receive")
}
}
// TestMempoolNotifySpentDifferentInputs tests that the mempool notifier sends
// notifications to different subscribers of different inputs.
func TestMempoolNotifySpentDifferentInputs(t *testing.T) {
t.Parallel()
// Create a new mempool notifier instance.
notifier := NewMempoolNotifier()
// Create two inputs and subscribe to them.
input1 := wire.OutPoint{Hash: [32]byte{1}, Index: 0}
input2 := wire.OutPoint{Hash: [32]byte{2}, Index: 0}
sub1 := notifier.SubscribeInput(input1)
sub2 := notifier.SubscribeInput(input2)
// Create a transaction that spends input1.
msgTx := &wire.MsgTx{
TxIn: []*wire.TxIn{
{PreviousOutPoint: input1},
},
}
tx := btcutil.NewTx(msgTx)
spendDetail1 := &SpendDetail{
SpentOutPoint: &input1,
SpenderTxHash: tx.Hash(),
SpendingTx: msgTx,
SpenderInputIndex: 0,
}
// Notify the subscribers about the spent input.
notifier.notifySpent(inputsWithTx{input1: spendDetail1})
// Verify that sub1 received the spend notification for input1.
select {
case spend := <-sub1.Spend:
require.Equal(t, tx.Hash(), spend.SpenderTxHash)
case <-time.After(testTimeout):
require.Fail(t, "timeout for sub1 to receive")
}
// Verify that sub2 did not receive any spend notifications.
select {
case <-sub2.Spend:
require.Fail(t, "Expected sub2 to not receive")
// Give it one second to NOT receive a spend notification.
case <-time.After(1 * time.Second):
}
// Create another transaction that spends input1 and input2.
msgTx2 := &wire.MsgTx{
TxIn: []*wire.TxIn{
{PreviousOutPoint: input1},
{PreviousOutPoint: input2},
},
}
tx2 := btcutil.NewTx(msgTx)
spendDetail2 := &SpendDetail{
SpentOutPoint: &input2,
SpenderTxHash: tx2.Hash(),
SpendingTx: msgTx2,
SpenderInputIndex: 1,
}
// Notify the subscribers about the spent inputs.
notifier.notifySpent(inputsWithTx{
input1: spendDetail1, input2: spendDetail2,
})
// Verify that sub1 received the spend notification for input1.
select {
case spend := <-sub1.Spend:
require.Equal(t, tx.Hash(), spend.SpenderTxHash)
case <-time.After(testTimeout):
require.Fail(t, "timeout for sub1 to receive")
}
// Verify that sub2 received the spend notification for input2.
select {
case spend := <-sub2.Spend:
require.Equal(t, tx2.Hash(), spend.SpenderTxHash)
case <-time.After(testTimeout):
require.Fail(t, "timeout for sub2 to receive")
}
}
// TestMempoolNotifySpentCancel tests that once a subscription is canceled, it
// won't get notified and won't affect other subscriptions.
func TestMempoolNotifySpentCancel(t *testing.T) {
t.Parallel()
// Create a new mempool notifier instance.
notifier := NewMempoolNotifier()
// Register a spend notification for an outpoint.
input := wire.OutPoint{Hash: [32]byte{1}}
sub1 := notifier.SubscribeInput(input)
sub2 := notifier.SubscribeInput(input)
// Create a transaction that spends input.
msgTx := &wire.MsgTx{
TxIn: []*wire.TxIn{
{PreviousOutPoint: input},
},
}
tx := btcutil.NewTx(msgTx)
// Cancel the second subscription before notify.
notifier.UnsubscribeEvent(sub2)
// Notify the subscribers about the spent input.
spendDetail := &SpendDetail{
SpentOutPoint: &input,
SpenderTxHash: tx.Hash(),
SpendingTx: msgTx,
SpenderInputIndex: 0,
}
notifier.notifySpent(inputsWithTx{input: spendDetail})
// Verify that sub1 received the spend notification for input1.
select {
case spend := <-sub1.Spend:
require.Equal(t, tx.Hash(), spend.SpenderTxHash)
case <-time.After(testTimeout):
require.Fail(t, "timeout for sub1 to receive")
}
// Verify that sub2 did not receive any spend notifications.
select {
case <-sub2.Spend:
require.Fail(t, "expected sub2 to not receive")
// Give it one second to NOT receive a spend notification.
case <-time.After(1 * time.Second):
// Expected
}
}
// TestMempoolUnsubscribeConfirmedSpentTx tests that the subscriptions for a
// confirmed tx are removed when calling the method.
func TestMempoolUnsubscribeConfirmedSpentTx(t *testing.T) {
t.Parallel()
// Create a new mempool notifier instance.
notifier := NewMempoolNotifier()
// Create two inputs and subscribe to them.
input1 := wire.OutPoint{Hash: [32]byte{1}, Index: 0}
input2 := wire.OutPoint{Hash: [32]byte{2}, Index: 0}
// sub1 and sub2 are subscribed to the same input.
notifier.SubscribeInput(input1)
notifier.SubscribeInput(input1)
// sub3 is subscribed to a different input.
sub3 := notifier.SubscribeInput(input2)
// Create a transaction that spends input1.
msgTx := &wire.MsgTx{
TxIn: []*wire.TxIn{
{PreviousOutPoint: input1},
},
}
tx := btcutil.NewTx(msgTx)
// Unsubscribe the relevant transaction.
notifier.UnsubsribeConfirmedSpentTx(tx)
// Verify that the sub1 and sub2 are removed from the notifier.
_, loaded := notifier.subscribedInputs.Load(input1)
require.False(t, loaded)
// Verify that the sub3 is not affected.
subs, loaded := notifier.subscribedInputs.Load(input2)
require.True(t, loaded)
// sub3 should still be found.
_, loaded = subs.Load(sub3.id)
require.True(t, loaded)
}


@ -182,6 +182,10 @@ type PartialChainControl struct {
// interested in.
ChainNotifier chainntnfs.ChainNotifier
// MempoolNotifier is used to watch for spending events happening in the
// mempool.
MempoolNotifier chainntnfs.MempoolWatcher
// ChainView is used in the router for maintaining an up-to-date graph.
ChainView chainview.FilteredChainView
@ -249,6 +253,8 @@ func GenDefaultBtcConstraints() channeldb.ChannelConstraints {
// NewPartialChainControl creates a new partial chain control that contains all
// the parts that can be purely constructed from the passed in global
// configuration and doesn't need any wallet instance yet.
//
//nolint:lll
func NewPartialChainControl(cfg *Config) (*PartialChainControl, func(), error) {
// Set the RPC config from the "home" chain. Multi-chain isn't yet
// active, so we'll restrict usage to a particular chain for now.
@ -410,14 +416,17 @@ func NewPartialChainControl(cfg *Config) (*PartialChainControl, func(), error) {
if bitcoindMode.RPCPolling {
bitcoindCfg.PollingConfig = &chain.PollingConfig{
BlockPollingInterval: bitcoindMode.BlockPollingInterval,
TxPollingInterval: bitcoindMode.TxPollingInterval,
BlockPollingInterval: bitcoindMode.BlockPollingInterval,
TxPollingInterval: bitcoindMode.TxPollingInterval,
TxPollingIntervalJitter: lncfg.DefaultTxPollingJitter,
}
} else {
bitcoindCfg.ZMQConfig = &chain.ZMQConfig{
ZMQBlockHost: bitcoindMode.ZMQPubRawBlock,
ZMQTxHost: bitcoindMode.ZMQPubRawTx,
ZMQReadDeadline: bitcoindMode.ZMQReadDeadline,
ZMQBlockHost: bitcoindMode.ZMQPubRawBlock,
ZMQTxHost: bitcoindMode.ZMQPubRawTx,
ZMQReadDeadline: bitcoindMode.ZMQReadDeadline,
MempoolPollingInterval: bitcoindMode.TxPollingInterval,
PollingIntervalJitter: lncfg.DefaultTxPollingJitter,
}
}
@ -433,10 +442,14 @@ func NewPartialChainControl(cfg *Config) (*PartialChainControl, func(), error) {
"bitcoind: %v", err)
}
cc.ChainNotifier = bitcoindnotify.New(
chainNotifier := bitcoindnotify.New(
bitcoindConn, cfg.ActiveNetParams.Params, hintCache,
hintCache, cfg.BlockCache,
)
cc.ChainNotifier = chainNotifier
cc.MempoolNotifier = chainNotifier
cc.ChainView = chainview.NewBitcoindFilteredChainView(
bitcoindConn, cfg.BlockCache,
)
@ -655,7 +668,8 @@ func NewPartialChainControl(cfg *Config) (*PartialChainControl, func(), error) {
DisableConnectOnNew: true,
DisableAutoReconnect: false,
}
cc.ChainNotifier, err = btcdnotify.New(
chainNotifier, err := btcdnotify.New(
rpcConfig, cfg.ActiveNetParams.Params, hintCache,
hintCache, cfg.BlockCache,
)
@ -663,6 +677,9 @@ func NewPartialChainControl(cfg *Config) (*PartialChainControl, func(), error) {
return nil, nil, err
}
cc.ChainNotifier = chainNotifier
cc.MempoolNotifier = chainNotifier
// Finally, we'll create an instance of the default chain view
// to be used within the routing layer.
cc.ChainView, err = chainview.NewBtcdFilteredChainView(


@ -130,6 +130,10 @@ type ChainArbitratorConfig struct {
// certain on-chain events.
Notifier chainntnfs.ChainNotifier
// Mempool is a mempool watcher that allows us to watch for events
// happening in the mempool.
Mempool chainntnfs.MempoolWatcher
// Signer is a signer backed by the active lnd node. This should be
// capable of producing a signature as specified by a valid
// SignDescriptor.
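The construction site for this config is not shown in this excerpt. Purely as a hypothetical sketch of how the new field is expected to be populated from the PartialChainControl changes above (field names taken from the diffs in this PR, everything else assumed):

// Hypothetical wiring, assuming cc is the *chainreg.PartialChainControl
// populated earlier in this PR; all other required fields are elided.
arbCfg := contractcourt.ChainArbitratorConfig{
	Notifier: cc.ChainNotifier,
	Mempool:  cc.MempoolNotifier,
	// ...
}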


@ -1589,11 +1589,15 @@ func (c *ChannelArbitrator) checkCommitChainActions(height uint32,
)
if toChain {
// Convert to int64 in case of overflow.
remainingBlocks := int64(htlc.RefundTimeout) -
int64(height)
log.Infof("ChannelArbitrator(%v): go to chain for "+
"outgoing htlc %x: timeout=%v, "+
"blocks_until_expiry=%v, broadcast_delta=%v",
c.cfg.ChanPoint, htlc.RHash[:],
htlc.RefundTimeout, htlc.RefundTimeout-height,
htlc.RefundTimeout, remainingBlocks,
c.cfg.OutgoingBroadcastDelta,
)
}
@ -1620,11 +1624,15 @@ func (c *ChannelArbitrator) checkCommitChainActions(height uint32,
)
if toChain {
// Convert to int64 in case of overflow.
remainingBlocks := int64(htlc.RefundTimeout) -
int64(height)
log.Infof("ChannelArbitrator(%v): go to chain for "+
"incoming htlc %x: timeout=%v, "+
"blocks_until_expiry=%v, broadcast_delta=%v",
c.cfg.ChanPoint, htlc.RHash[:],
htlc.RefundTimeout, htlc.RefundTimeout-height,
htlc.RefundTimeout, remainingBlocks,
c.cfg.IncomingBroadcastDelta,
)
}
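For context, the int64 conversion matters because both RefundTimeout and the block height are uint32: once the HTLC expiry is already in the past, unsigned subtraction wraps around instead of going negative, so the old log line printed a huge bogus blocks_until_expiry value. A small standalone illustration with hypothetical values:

package main

import "fmt"

func main() {
	// Hypothetical values: the HTLC expired 10 blocks ago.
	var refundTimeout uint32 = 800_000
	var height uint32 = 800_010

	// uint32 subtraction wraps around rather than going negative.
	fmt.Println(refundTimeout - height) // 4294967286

	// Converting to int64 first, as the new code does, yields the
	// expected negative value.
	fmt.Println(int64(refundTimeout) - int64(height)) // -10
}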


@ -140,6 +140,9 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
return nil, errResolverShuttingDown
}
log.Debugf("%T(%v): Resolving incoming HTLC(expiry=%v, height=%v)", h,
h.htlcResolution.ClaimOutpoint, h.htlcExpiry, currentHeight)
// We'll first check if this HTLC has been timed out, if so, we can
// return now and mark ourselves as resolved. If we're past the point of
// expiry of the HTLC, then at this point the sender can sweep it, so


@ -192,13 +192,10 @@ func (h *htlcTimeoutResolver) claimCleanUp(
// chainDetailsToWatch returns the output and script which we use to watch for
// spends from the direct HTLC output on the commitment transaction.
//
// TODO(joostjager): output already set properly in
// lnwallet.newOutgoingHtlcResolution? And script too?
func (h *htlcTimeoutResolver) chainDetailsToWatch() (*wire.OutPoint, []byte, error) {
// If there's no timeout transaction, then the claim output is the
// output directly on the commitment transaction, so we'll just use
// that.
// If there's no timeout transaction, it means we are spending from a
// remote commit, then the claim output is the output directly on the
// commitment transaction, so we'll just use that.
if h.htlcResolution.SignedTimeoutTx == nil {
outPointToWatch := h.htlcResolution.ClaimOutpoint
scriptToWatch := h.htlcResolution.SweepSignDesc.Output.PkScript
@ -206,11 +203,11 @@ func (h *htlcTimeoutResolver) chainDetailsToWatch() (*wire.OutPoint, []byte, err
return &outPointToWatch, scriptToWatch, nil
}
// If this is the remote party's commitment, then we'll need to grab
// watch the output that our timeout transaction points to. We can
// directly grab the outpoint, then also extract the witness script
// (the last element of the witness stack) to re-construct the pkScript
// we need to watch.
// If SignedTimeoutTx is not nil, this is the local party's commitment,
// and we'll need to watch the output that our timeout transaction
// points to. We can directly grab the outpoint, then also extract the
// witness script (the last element of the witness stack) to
// re-construct the pkScript we need to watch.
outPointToWatch := h.htlcResolution.SignedTimeoutTx.TxIn[0].PreviousOutPoint
witness := h.htlcResolution.SignedTimeoutTx.TxIn[0].Witness
scriptToWatch, err := input.WitnessScriptHash(witness[len(witness)-1])
@ -221,9 +218,9 @@ func (h *htlcTimeoutResolver) chainDetailsToWatch() (*wire.OutPoint, []byte, err
return &outPointToWatch, scriptToWatch, nil
}
// isSuccessSpend returns true if the passed spend on the specified commitment
// isPreimageSpend returns true if the passed spend on the specified commitment
// is a success spend that reveals the pre-image or not.
func isSuccessSpend(spend *chainntnfs.SpendDetail, localCommit bool) bool {
func isPreimageSpend(spend *chainntnfs.SpendDetail, localCommit bool) bool {
// Based on the spending input index and transaction, obtain the
// witness that tells us what type of spend this is.
spenderIndex := spend.SpenderInputIndex
@ -281,7 +278,10 @@ func (h *htlcTimeoutResolver) Resolve() (ContractResolver, error) {
// If the spend reveals the pre-image, then we'll enter the clean up
// workflow to pass the pre-image back to the incoming link, add it to
// the witness cache, and exit.
if isSuccessSpend(commitSpend, h.htlcResolution.SignedTimeoutTx != nil) {
if isPreimageSpend(
commitSpend, h.htlcResolution.SignedTimeoutTx != nil,
) {
log.Infof("%T(%v): HTLC has been swept with pre-image by "+
"remote party during timeout flow! Adding pre-image to "+
"witness cache", h.htlcResolution.ClaimOutpoint)
@ -310,6 +310,49 @@ func (h *htlcTimeoutResolver) Resolve() (ContractResolver, error) {
return h.handleCommitSpend(commitSpend)
}
// sweepSecondLevelTx sends a second level timeout transaction to the sweeper.
// This transaction uses the SINGLE|ANYONECANPAY flag.
func (h *htlcTimeoutResolver) sweepSecondLevelTx() error {
log.Infof("%T(%x): offering second-layer timeout tx to sweeper: %v",
h, h.htlc.RHash[:],
spew.Sdump(h.htlcResolution.SignedTimeoutTx))
inp := input.MakeHtlcSecondLevelTimeoutAnchorInput(
h.htlcResolution.SignedTimeoutTx,
h.htlcResolution.SignDetails,
h.broadcastHeight,
)
_, err := h.Sweeper.SweepInput(
&inp, sweep.Params{
Fee: sweep.FeePreference{
ConfTarget: secondLevelConfTarget,
},
},
)
// TODO(yy): checkpoint here?
return err
}
// sendSecondLevelTxLegacy sends a second level timeout transaction to the utxo
// nursery. This transaction uses the legacy SIGHASH_ALL flag.
func (h *htlcTimeoutResolver) sendSecondLevelTxLegacy() error {
log.Debugf("%T(%v): incubating htlc output", h,
h.htlcResolution.ClaimOutpoint)
err := h.IncubateOutputs(
h.ChanPoint, &h.htlcResolution, nil,
h.broadcastHeight,
)
if err != nil {
return err
}
h.outputIncubating = true
return h.Checkpoint(h)
}
// spendHtlcOutput handles the initial spend of an HTLC output via the timeout
// clause. If this is our local commitment, the second-level timeout TX will be
// used to spend the output into the next stage. If this is the remote
@ -322,45 +365,16 @@ func (h *htlcTimeoutResolver) spendHtlcOutput() (*chainntnfs.SpendDetail, error)
// (the case for anchor type channels). In this case we can re-sign it
// and attach fees at will. We let the sweeper handle this job.
case h.htlcResolution.SignDetails != nil && !h.outputIncubating:
log.Infof("%T(%x): offering second-layer timeout tx to "+
"sweeper: %v", h, h.htlc.RHash[:],
spew.Sdump(h.htlcResolution.SignedTimeoutTx))
inp := input.MakeHtlcSecondLevelTimeoutAnchorInput(
h.htlcResolution.SignedTimeoutTx,
h.htlcResolution.SignDetails,
h.broadcastHeight,
)
_, err := h.Sweeper.SweepInput(
&inp,
sweep.Params{
Fee: sweep.FeePreference{
ConfTarget: secondLevelConfTarget,
},
},
)
if err != nil {
if err := h.sweepSecondLevelTx(); err != nil {
log.Errorf("Sending timeout tx to sweeper: %v", err)
return nil, err
}
// If we have no SignDetails, and we haven't already sent the output to
// the utxo nursery, then we'll do so now.
case h.htlcResolution.SignDetails == nil && !h.outputIncubating:
log.Debugf("%T(%v): incubating htlc output", h,
h.htlcResolution.ClaimOutpoint)
err := h.IncubateOutputs(
h.ChanPoint, &h.htlcResolution, nil,
h.broadcastHeight,
)
if err != nil {
return nil, err
}
h.outputIncubating = true
if err := h.Checkpoint(h); err != nil {
log.Errorf("unable to Checkpoint: %v", err)
if err := h.sendSecondLevelTxLegacy(); err != nil {
log.Errorf("Sending timeout tx to nursery: %v", err)
return nil, err
}
}
@ -369,36 +383,71 @@ func (h *htlcTimeoutResolver) spendHtlcOutput() (*chainntnfs.SpendDetail, error)
// watch for a spend of the output, and make our next move off of that.
// Depending on if this is our commitment, or the remote party's
// commitment, we'll be watching a different outpoint and script.
return h.watchHtlcSpend()
}
// watchHtlcSpend watches for a spend of the HTLC output. For neutrino backend,
// it will check blocks for the confirmed spend. For btcd and bitcoind, it will
// check both the mempool and the blocks.
func (h *htlcTimeoutResolver) watchHtlcSpend() (*chainntnfs.SpendDetail,
error) {
// TODO(yy): outpointToWatch is always h.HtlcOutpoint(), can refactor
// to remove the redundancy.
outpointToWatch, scriptToWatch, err := h.chainDetailsToWatch()
if err != nil {
return nil, err
}
// If there's no mempool configured, which is the case for an SPV node
// such as neutrino, then we will watch for confirmed spend only.
if h.Mempool == nil {
return h.waitForConfirmedSpend(outpointToWatch, scriptToWatch)
}
// Watch for a spend of the HTLC output in both the mempool and blocks.
return h.waitForMempoolOrBlockSpend(*outpointToWatch, scriptToWatch)
}
// waitForConfirmedSpend waits for the HTLC output to be spent and confirmed in
// a block, returns the spend details.
func (h *htlcTimeoutResolver) waitForConfirmedSpend(op *wire.OutPoint,
pkScript []byte) (*chainntnfs.SpendDetail, error) {
log.Infof("%T(%v): waiting for spent of HTLC output %v to be "+
"fully confirmed", h, h.htlcResolution.ClaimOutpoint,
outpointToWatch)
"fully confirmed", h, h.htlcResolution.ClaimOutpoint, op)
// We'll block here until either we exit, or the HTLC output on the
// commitment transaction has been spent.
spend, err := waitForSpend(
outpointToWatch, scriptToWatch, h.broadcastHeight,
h.Notifier, h.quit,
op, pkScript, h.broadcastHeight, h.Notifier, h.quit,
)
if err != nil {
return nil, err
}
// Once confirmed, persist the state on disk.
if err := h.checkPointSecondLevelTx(); err != nil {
return nil, err
}
return spend, err
}
// checkPointSecondLevelTx persists the state of a second level HTLC tx to disk
// if it's published by the sweeper.
func (h *htlcTimeoutResolver) checkPointSecondLevelTx() error {
// If this was the second level transaction published by the sweeper,
// we can checkpoint the resolver now that it's confirmed.
if h.htlcResolution.SignDetails != nil && !h.outputIncubating {
h.outputIncubating = true
if err := h.Checkpoint(h); err != nil {
log.Errorf("unable to Checkpoint: %v", err)
return nil, err
return err
}
}
return spend, err
return nil
}
// handleCommitSpend handles the spend of the HTLC output on the commitment
@ -696,3 +745,163 @@ func (h *htlcTimeoutResolver) HtlcPoint() wire.OutPoint {
// A compile time assertion to ensure htlcTimeoutResolver meets the
// ContractResolver interface.
var _ htlcContractResolver = (*htlcTimeoutResolver)(nil)
// spendResult is used to hold the result of a spend event from either a
// mempool spend or a block spend.
type spendResult struct {
// spend contains the details of the spend.
spend *chainntnfs.SpendDetail
// err is the error that occurred during the spend notification.
err error
}
// waitForMempoolOrBlockSpend waits for the htlc output to be spent by a
// transaction that's found either in the mempool or in a block.
func (h *htlcTimeoutResolver) waitForMempoolOrBlockSpend(op wire.OutPoint,
pkScript []byte) (*chainntnfs.SpendDetail, error) {
log.Infof("%T(%v): waiting for spent of HTLC output %v to be found "+
"in mempool or block", h, h.htlcResolution.ClaimOutpoint, op)
// Subscribe for the block spend (confirmed).
blockSpent, err := h.Notifier.RegisterSpendNtfn(
&op, pkScript, h.broadcastHeight,
)
if err != nil {
return nil, fmt.Errorf("register spend: %w", err)
}
// Subscribe for the mempool spend (unconfirmed).
mempoolSpent, err := h.Mempool.SubscribeMempoolSpent(op)
if err != nil {
return nil, fmt.Errorf("register mempool spend: %w", err)
}
// Create a result chan that will be used to receive the spending
// events.
result := make(chan *spendResult, 2)
// Create a goroutine that will wait for either a mempool spend or a
// block spend.
//
// NOTE: no need to use waitgroup here as when the resolver exits, the
// goroutine will return on the quit channel.
go h.consumeSpendEvents(result, blockSpent.Spend, mempoolSpent.Spend)
// Wait for the spend event to be received.
select {
case event := <-result:
// Cancel the mempool subscription as we don't need it anymore.
h.Mempool.CancelMempoolSpendEvent(mempoolSpent)
return event.spend, event.err
case <-h.quit:
return nil, errResolverShuttingDown
}
}
// consumeSpendEvents consumes the spend events from the block and mempool
// subscriptions. It exits when a spend event is received from the block, or
// the resolver itself quits. When a spend event is received from the mempool,
// however, it won't exit but continues to wait for a spend event from the
// block subscription.
//
// NOTE: there could be a case where we found the preimage in the mempool,
// which will be added to our preimage beacon and settle the incoming link,
// meanwhile the timeout sweep tx confirms. This outgoing HTLC is "free" money
// and is not swept here.
//
// TODO(yy): sweep the outgoing htlc if it's confirmed.
func (h *htlcTimeoutResolver) consumeSpendEvents(resultChan chan *spendResult,
blockSpent, mempoolSpent <-chan *chainntnfs.SpendDetail) {
op := h.HtlcPoint()
// Create a result chan to hold the results.
result := &spendResult{}
// Wait for a spend event to arrive.
for {
select {
// If a spend event is received from the block, this outgoing
// htlc is spent either by the remote via the preimage or by us
// via the timeout. We can exit the loop and `claimCleanUp`
// will feed the preimage to the beacon if found. This treats
// the block as the final judge and the preimage spent won't
// appear in the mempool afterwards.
//
// NOTE: if a reorg happens, the preimage spend can appear in
// the mempool again. Though a rare case, we should handle it
// in a dedicated reorg system.
case spendDetail, ok := <-blockSpent:
if !ok {
result.err = fmt.Errorf("block spent err: %w",
errResolverShuttingDown)
} else {
log.Debugf("Found confirmed spend of HTLC "+
"output %s in tx=%s", op,
spendDetail.SpenderTxHash)
result.spend = spendDetail
// Once confirmed, persist the state on disk.
result.err = h.checkPointSecondLevelTx()
}
// Send the result and exit the loop.
resultChan <- result
return
// If a spend event is received from the mempool, this can be
// either the 2nd stage timeout tx or a preimage spend from the
// remote. We will further check whether the spend reveals the
// preimage and add it to the preimage beacon to settle the
// incoming link.
//
// NOTE: we won't exit the loop here so we can continue to
// watch for the block spend to check point the resolution.
case spendDetail, ok := <-mempoolSpent:
if !ok {
result.err = fmt.Errorf("mempool spent err: %w",
errResolverShuttingDown)
// This is an internal error so we exit.
resultChan <- result
return
}
log.Debugf("Found mempool spend of HTLC output %s "+
"in tx=%s", op, spendDetail.SpenderTxHash)
// Check whether the spend reveals the preimage, if not
// continue the loop.
hasPreimage := isPreimageSpend(
spendDetail,
h.htlcResolution.SignedTimeoutTx != nil,
)
if !hasPreimage {
log.Debugf("HTLC output %s spent doesn't "+
"reveal preimage", op)
continue
}
// Found the preimage spend, send the result and
// continue the loop.
result.spend = spendDetail
resultChan <- result
continue
// If the resolver exits, we exit the goroutine.
case <-h.quit:
result.err = errResolverShuttingDown
resultChan <- result
return
}
}
}


@ -50,6 +50,10 @@ https://github.com/lightningnetwork/lnd/pull/7359)
* Optimize script allocation size in order to save
[memory](https://github.com/lightningnetwork/lnd/pull/7464).
* When resolving outgoing HTLCs onchain, the HTLC timeout resolver will now
[monitor mempool](https://github.com/lightningnetwork/lnd/pull/7564) for
faster preimage extraction.
## Spec
* [Add test vectors for

go.mod (18 changed lines)

@ -9,7 +9,7 @@ require (
github.com/btcsuite/btcd/btcutil/psbt v1.1.8
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f
github.com/btcsuite/btcwallet v0.16.8-0.20230324081040-520f650ce045
github.com/btcsuite/btcwallet v0.16.8
github.com/btcsuite/btcwallet/wallet/txauthor v1.3.2
github.com/btcsuite/btcwallet/wallet/txrules v1.2.0
github.com/btcsuite/btcwallet/walletdb v1.4.0
@ -50,12 +50,12 @@ require (
github.com/urfave/cli v1.22.9
go.etcd.io/etcd/client/pkg/v3 v3.5.7
go.etcd.io/etcd/client/v3 v3.5.7
golang.org/x/crypto v0.1.0
golang.org/x/crypto v0.7.0
golang.org/x/exp v0.0.0-20221111094246-ab4555d3164f
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028
golang.org/x/net v0.7.0
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde
golang.org/x/term v0.5.0
golang.org/x/net v0.8.0
golang.org/x/sync v0.1.0
golang.org/x/term v0.6.0
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
google.golang.org/grpc v1.41.0
google.golang.org/protobuf v1.27.1
@ -149,10 +149,10 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/mod v0.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.2.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/errgo.v1 v1.0.1 // indirect

go.sum (36 changed lines)

@ -89,8 +89,8 @@ github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2/go.mod h1:7SFka0XMvUgj3hfZtyd
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/btcwallet v0.16.8-0.20230324081040-520f650ce045 h1:302lmdYbONzrLACkm9IlCobMYuYOjF8to+tUhaQsTzc=
github.com/btcsuite/btcwallet v0.16.8-0.20230324081040-520f650ce045/go.mod h1:WBcb0CNEgzjF2EdatcERzG7cd/lhHXtNJ4VjIePOdXM=
github.com/btcsuite/btcwallet v0.16.8 h1:ADMormFSOBMANB/ZWvU00VDnFelZXY9Uyc9wW0dNFzM=
github.com/btcsuite/btcwallet v0.16.8/go.mod h1:ynC16HgMU03dYyUit6b0+IZjDg9KVnr2VNLie9ZXG1I=
github.com/btcsuite/btcwallet/wallet/txauthor v1.3.2 h1:etuLgGEojecsDOYTII8rYiGHjGyV5xTqsXi+ZQ715UU=
github.com/btcsuite/btcwallet/wallet/txauthor v1.3.2/go.mod h1:Zpk/LOb2sKqwP2lmHjaZT9AdaKsHPSbNLm2Uql5IQ/0=
github.com/btcsuite/btcwallet/wallet/txrules v1.2.0 h1:BtEN5Empw62/RVnZ0VcJaVtVlBijnLlJY+dwjAye2Bg=
@ -626,8 +626,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -663,8 +663,8 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I=
golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -703,8 +703,8 @@ golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -723,8 +723,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc=
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180816055513-1c9583448a9c/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -782,12 +782,12 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -796,8 +796,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@ -851,8 +851,8 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -523,4 +523,12 @@ var allTestCases = []*lntest.TestCase{
Name: "channel fundmax",
TestFunc: testChannelFundMax,
},
{
Name: "htlc timeout resolver extract preimage remote",
TestFunc: testHtlcTimeoutResolverExtractPreimageRemote,
},
{
Name: "htlc timeout resolver extract preimage local",
TestFunc: testHtlcTimeoutResolverExtractPreimageLocal,
},
}

View File

@ -1838,3 +1838,291 @@ func createThreeHopNetwork(ht *lntest.HarnessTest,
return aliceChanPoint, bobChanPoint, carol
}
// testHtlcTimeoutResolverExtractPreimageRemote tests that in the multi-hop
// setting, Alice->Bob->Carol, when Bob's outgoing HTLC is swept by Carol using
// the 2nd level success tx, Bob's timeout resolver will extract the preimage
// from the sweep tx found in the mempool or blocks (for neutrino). The 2nd
// level success tx is broadcast by Carol and spends the outpoint on her
// commit tx.
func testHtlcTimeoutResolverExtractPreimageRemote(ht *lntest.HarnessTest) {
runMultiHopHtlcClaimTest(ht, runExtraPreimageFromRemoteCommit)
}
// runExtraPreimageFromRemoteCommit checks that Bob's htlc timeout resolver
// will extract the preimage from the 2nd level success tx broadcast by Carol
// which spends the htlc output on her commitment tx.
func runExtraPreimageFromRemoteCommit(ht *lntest.HarnessTest,
alice, bob *node.HarnessNode, c lnrpc.CommitmentType, zeroConf bool) {
// First, we'll create a three hop network: Alice -> Bob -> Carol, with
// Carol refusing to actually settle or directly cancel any HTLCs
// herself.
aliceChanPoint, bobChanPoint, carol := createThreeHopNetwork(
ht, alice, bob, false, c, zeroConf,
)
// With the network active, we'll now add a new hodl invoice at Carol's
// end. Make sure the cltv expiry delta is large enough, otherwise Bob
// won't send out the outgoing htlc.
preimage := ht.RandomPreimage()
payHash := preimage.Hash()
invoiceReq := &invoicesrpc.AddHoldInvoiceRequest{
Value: 100_000,
CltvExpiry: finalCltvDelta,
Hash: payHash[:],
}
eveInvoice := carol.RPC.AddHoldInvoice(invoiceReq)
// Subscribe the invoice.
stream := carol.RPC.SubscribeSingleInvoice(payHash[:])
// Now that we've created the invoice, we'll send a single payment from
// Alice to Carol. We won't wait for the response however, as Carol
// will not immediately settle the payment.
req := &routerrpc.SendPaymentRequest{
PaymentRequest: eveInvoice.PaymentRequest,
TimeoutSeconds: 60,
FeeLimitMsat: noFeeLimitMsat,
}
alice.RPC.SendPayment(req)
// Once the payment is sent, Alice should have one outgoing HTLC active.
ht.AssertOutgoingHTLCActive(alice, aliceChanPoint, payHash[:])
// Bob should have two HTLCs active. One incoming HTLC from Alice, and
// one outgoing to Carol.
ht.AssertIncomingHTLCActive(bob, aliceChanPoint, payHash[:])
htlc := ht.AssertOutgoingHTLCActive(bob, bobChanPoint, payHash[:])
// Carol should have one incoming HTLC from Bob.
ht.AssertIncomingHTLCActive(carol, bobChanPoint, payHash[:])
// Wait for Carol to mark invoice as accepted. There is a small gap to
// bridge between adding the htlc to the channel and executing the exit
// hop logic.
ht.AssertInvoiceState(stream, lnrpc.Invoice_ACCEPTED)
// Bob now goes offline so the link between Bob and Carol is broken.
restartBob := ht.SuspendNode(bob)
// Carol now settles the invoice. Since her link with Bob is broken,
// Bob won't know the preimage.
carol.RPC.SettleInvoice(preimage[:])
// We'll now mine enough blocks to trigger Carol's broadcast of her
// commitment transaction due to the fact that the HTLC is about to
// timeout. With the default incoming broadcast delta of 10, this
// will be the htlc expiry height minus 10.
numBlocks := padCLTV(uint32(
invoiceReq.CltvExpiry - lncfg.DefaultIncomingBroadcastDelta,
))
ht.MineBlocks(numBlocks)
// Carol's force close transaction should now be found in the mempool.
// If there are anchors, we also expect Carol's anchor sweep. We now
// mine a block to confirm Carol's closing transaction.
ht.MineClosingTx(bobChanPoint, c)
// With the closing transaction confirmed, we should expect Carol's
// HTLC success transaction to be broadcast.
ht.Miner.AssertNumTxsInMempool(1)
// Restart Bob. Once he finishes syncing the channel state, he should
// notice the force close from Carol.
require.NoError(ht, restartBob())
// Get the current height to compute number of blocks to mine to
// trigger the htlc timeout resolver from Bob.
_, height := ht.Miner.GetBestBlock()
// We'll now mine enough blocks to trigger Bob's timeout resolver.
numBlocks = htlc.ExpirationHeight - uint32(height) -
lncfg.DefaultOutgoingBroadcastDelta
// Mine empty blocks so Carol's htlc success tx stays in mempool. Once
// the height is reached, Bob's timeout resolver will resolve the htlc
// by extracting the preimage from the mempool.
ht.MineEmptyBlocks(int(numBlocks))
// For neutrino backend, the timeout resolver needs to extract the
// preimage from the blocks.
if ht.IsNeutrinoBackend() {
// Mine a block to confirm Carol's 2nd level success tx.
ht.MineBlocksAndAssertNumTxes(1, 1)
}
// Finally, check that Alice's payment is marked as succeeded as
// Bob has settled the htlc using the preimage extracted from Carol's
// 2nd level success tx.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_SUCCEEDED)
// NOTE: for non-standby nodes there's no need to clean up the force
// close as long as the mempool is cleaned.
ht.CleanShutDown()
}
// testHtlcTimeoutResolverExtractPreimageLocal tests that in the multi-hop
// setting, Alice->Bob->Carol, when Bob's outgoing HTLC is swept by Carol using
// the direct preimage spend, Bob's timeout resolver will extract the preimage
// from the sweep tx found in the mempool or blocks (for neutrino). The direct
// spend tx is broadcast by Carol and spends the outpoint on Bob's commit tx.
func testHtlcTimeoutResolverExtractPreimageLocal(ht *lntest.HarnessTest) {
runMultiHopHtlcClaimTest(ht, runExtraPreimageFromLocalCommit)
}
// runExtraPreimageFromLocalCommit checks that Bob's htlc timeout resolver will
// extract the preimage from the direct spend broadcast by Carol which spends
// the htlc output on Bob's commitment tx.
func runExtraPreimageFromLocalCommit(ht *lntest.HarnessTest,
alice, bob *node.HarnessNode, c lnrpc.CommitmentType, zeroConf bool) {
// First, we'll create a three hop network: Alice -> Bob -> Carol, with
// Carol refusing to actually settle or directly cancel any HTLCs
// herself.
aliceChanPoint, bobChanPoint, carol := createThreeHopNetwork(
ht, alice, bob, false, c, zeroConf,
)
// With the network active, we'll now add a new hodl invoice at Carol's
// end. Make sure the cltv expiry delta is large enough, otherwise Bob
// won't send out the outgoing htlc.
preimage := ht.RandomPreimage()
payHash := preimage.Hash()
invoiceReq := &invoicesrpc.AddHoldInvoiceRequest{
Value: 100_000,
CltvExpiry: finalCltvDelta,
Hash: payHash[:],
}
eveInvoice := carol.RPC.AddHoldInvoice(invoiceReq)
// Subscribe the invoice.
stream := carol.RPC.SubscribeSingleInvoice(payHash[:])
// Now that we've created the invoice, we'll send a single payment from
// Alice to Carol. We won't wait for the response however, as Carol
// will not immediately settle the payment.
req := &routerrpc.SendPaymentRequest{
PaymentRequest: eveInvoice.PaymentRequest,
TimeoutSeconds: 60,
FeeLimitMsat: noFeeLimitMsat,
}
alice.RPC.SendPayment(req)
// Once the payment is sent, Alice should have one outgoing HTLC active.
ht.AssertOutgoingHTLCActive(alice, aliceChanPoint, payHash[:])
// Bob should have two HTLCs active. One incoming HTLC from Alice, and
// one outgoing to Carol.
ht.AssertIncomingHTLCActive(bob, aliceChanPoint, payHash[:])
htlc := ht.AssertOutgoingHTLCActive(bob, bobChanPoint, payHash[:])
// Carol should have one incoming HTLC from Bob.
ht.AssertIncomingHTLCActive(carol, bobChanPoint, payHash[:])
// Wait for Carol to mark invoice as accepted. There is a small gap to
// bridge between adding the htlc to the channel and executing the exit
// hop logic.
ht.AssertInvoiceState(stream, lnrpc.Invoice_ACCEPTED)
// Bob now goes offline so the link between Bob and Carol is broken.
restartBob := ht.SuspendNode(bob)
// Carol now settles the invoice. Since her link with Bob is broken,
// Bob won't know the preimage.
carol.RPC.SettleInvoice(preimage[:])
// Stop Carol so it's easier to check the mempool's state since she
// will broadcast the anchor sweeping once Bob force closes.
restartCarol := ht.SuspendNode(carol)
// Restart Bob to force close the channel.
require.NoError(ht, restartBob())
// Bob force closes the channel, which gets his commitment tx into the
// mempool.
ht.CloseChannelAssertPending(bob, bobChanPoint, true)
// Mine Bob's force close tx.
closeTx := ht.MineClosingTx(bobChanPoint, c)
// We'll now mine enough blocks to trigger Carol's sweeping of the htlc
// via the direct spend. With the default incoming broadcast delta of
// 10, this will be the htlc expiry height minus 10.
//
// NOTE: we need to mine 1 fewer block as we've already mined one to
// confirm Bob's force close tx.
numBlocks := padCLTV(uint32(
invoiceReq.CltvExpiry - lncfg.DefaultIncomingBroadcastDelta - 1,
))
// Mine empty blocks so it's easier to check Bob's sweeping txes below.
ht.MineEmptyBlocks(int(numBlocks))
// Increase the fee rate used by the sweeper so Carol's direct spend tx
// won't be replaced by Bob's timeout tx.
ht.SetFeeEstimate(30000)
// Restart Carol to sweep the htlc output.
require.NoError(ht, restartCarol())
// Construct the htlc output on Bob's commitment tx, and decide its
// index based on the commit type below.
htlcOutpoint := wire.OutPoint{Hash: closeTx.TxHash()}
// Check the current mempool state and we should see,
// - Carol's direct spend tx.
// - Bob's local output sweep tx, if this is NOT script enforced lease.
// - Carol's anchor sweep tx, if the commitment type is anchor.
switch c {
case lnrpc.CommitmentType_LEGACY:
htlcOutpoint.Index = 0
ht.Miner.AssertNumTxsInMempool(2)
case lnrpc.CommitmentType_ANCHORS:
htlcOutpoint.Index = 2
ht.Miner.AssertNumTxsInMempool(3)
case lnrpc.CommitmentType_SCRIPT_ENFORCED_LEASE:
htlcOutpoint.Index = 2
ht.Miner.AssertNumTxsInMempool(2)
}
// Get the current height to compute number of blocks to mine to
// trigger the timeout resolver from Bob.
_, height := ht.Miner.GetBestBlock()
// We'll now mine enough blocks to trigger Bob's htlc timeout resolver
// to act. Once his timeout resolver starts, it will extract the
// preimage from Carol's direct spend tx found in the mempool.
numBlocks = htlc.ExpirationHeight - uint32(height) -
lncfg.DefaultOutgoingBroadcastDelta
// Decrease the fee rate used by the sweeper so Bob's timeout tx will
// not replace Carol's direct spend tx.
ht.SetFeeEstimate(1000)
// Mine empty blocks so Carol's direct spend tx stays in mempool. Once
// the height is reached, Bob's timeout resolver will resolve the htlc
// by extracting the preimage from the mempool.
ht.MineEmptyBlocks(int(numBlocks))
// For neutrino backend, the timeout resolver needs to extract the
// preimage from the blocks.
if ht.IsNeutrinoBackend() {
// Make sure the direct spend tx is still in the mempool.
ht.Miner.AssertOutpointInMempool(htlcOutpoint)
// Mine a block to confirm Carol's direct spend tx.
ht.MineBlocks(1)
}
// Finally, check that Alice's payment is marked as succeeded as
// Bob has settled the htlc using the preimage extracted from Carol's
// direct spend tx.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_SUCCEEDED)
// NOTE: for non-standby nodes there's no need to clean up the force
// close as long as the mempool is cleaned.
ht.CleanShutDown()
}

View File

@ -2,6 +2,12 @@ package lncfg
import "time"
const (
// DefaultTxPollingJitter defines the default TxPollingIntervalJitter
// to be used for bitcoind backend.
DefaultTxPollingJitter = 0.5
)
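For context, a fractional jitter like this is normally applied by stretching a base polling interval with a random offset so that subsystems polling bitcoind don't all fire in lockstep. The sketch below only illustrates that idea under stated assumptions: the helper name applyJitter and the exact formula are hypothetical, not lnd's actual wiring of DefaultTxPollingJitter.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// applyJitter shows how a jitter factor such as DefaultTxPollingJitter (0.5)
// could randomize a base polling interval: with base = 100ms the result lands
// somewhere in [100ms, 150ms). Hypothetical helper, for illustration only.
func applyJitter(base time.Duration, jitter float64) time.Duration {
	offset := time.Duration(float64(base) * jitter * rand.Float64())
	return base + offset
}

func main() {
	fmt.Println(applyJitter(100*time.Millisecond, 0.5))
}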
// Bitcoind holds the configuration options for the daemon's connection to
// bitcoind.
//

View File

@ -1610,6 +1610,57 @@ func (h *HarnessTest) MineBlocksAndAssertNumTxes(num uint32,
return blocks
}
// cleanMempool mines blocks till the mempool is empty and asserts all active
// nodes have synced to the chain.
func (h *HarnessTest) cleanMempool() {
_, startHeight := h.Miner.GetBestBlock()
// Mine the blocks slowly to give `lnd` more time to sync.
var bestBlock *wire.MsgBlock
err := wait.NoError(func() error {
// If mempool is empty, exit.
mem := h.Miner.GetRawMempool()
if len(mem) == 0 {
_, height := h.Miner.GetBestBlock()
h.Logf("Mined %d blocks when cleanup the mempool",
height-startHeight)
return nil
}
// Otherwise mine a block.
blocks := h.Miner.MineBlocksSlow(1)
bestBlock = blocks[len(blocks)-1]
return fmt.Errorf("still have %d txes in mempool", len(mem))
}, wait.MinerMempoolTimeout)
require.NoError(h, err, "timeout cleaning up mempool")
// Exit early if the best block is nil, which means we haven't mined
// any blocks during the cleanup.
if bestBlock == nil {
return
}
// Make sure all the active nodes are synced.
h.AssertActiveNodesSyncedTo(bestBlock)
}
// CleanShutDown is used to quickly end a test by shutting down all non-standby
// nodes and mining blocks to empty the mempool.
//
// NOTE: this method provides a faster exit for a test that involves force
// closures as the caller doesn't need to mine all the blocks to make sure the
// mempool is empty.
func (h *HarnessTest) CleanShutDown() {
// First, shutdown all non-standby nodes to prevent new transactions
// being created and fed into the mempool.
h.shutdownNonStandbyNodes()
// Now mine blocks till the mempool is empty.
h.cleanMempool()
}
// MineEmptyBlocks mines a given number of empty blocks.
//
// NOTE: this differs from miner's `MineEmptyBlocks` as it requires the nodes

View File

@ -887,6 +887,15 @@ func (h *HarnessTest) Random32Bytes() []byte {
return randBuf
}
// RandomPreimage generates a random preimage which can be used as a payment
// preimage.
func (h *HarnessTest) RandomPreimage() lntypes.Preimage {
var preimage lntypes.Preimage
copy(preimage[:], h.Random32Bytes())
return preimage
}
// DecodeAddress decodes a given address and asserts there's no error.
func (h *HarnessTest) DecodeAddress(addr string) btcutil.Address {
resp, err := btcutil.DecodeAddress(addr, harnessNetParams)
@ -1256,6 +1265,111 @@ func (h *HarnessTest) AssertActiveHtlcs(hn *node.HarnessNode,
require.NoError(h, err, "timeout checking active HTLCs")
}
// AssertIncomingHTLCActive asserts the node has a pending incoming HTLC in the
// given channel. Returns the HTLC if found and active.
func (h *HarnessTest) AssertIncomingHTLCActive(hn *node.HarnessNode,
cp *lnrpc.ChannelPoint, payHash []byte) *lnrpc.HTLC {
return h.assertHLTCActive(hn, cp, payHash, true)
}
// AssertOutgoingHTLCActive asserts the node has a pending outgoing HTLC in the
// given channel. Returns the HTLC if found and active.
func (h *HarnessTest) AssertOutgoingHTLCActive(hn *node.HarnessNode,
cp *lnrpc.ChannelPoint, payHash []byte) *lnrpc.HTLC {
return h.assertHLTCActive(hn, cp, payHash, false)
}
// assertHLTCActive asserts the node has a pending HTLC in the given channel.
// Returns the HTLC if found and active.
func (h *HarnessTest) assertHLTCActive(hn *node.HarnessNode,
cp *lnrpc.ChannelPoint, payHash []byte, incoming bool) *lnrpc.HTLC {
var result *lnrpc.HTLC
target := hex.EncodeToString(payHash)
err := wait.NoError(func() error {
// We require the RPC call to succeed and won't wait for it, as
// a failure here is unexpected behavior.
ch := h.GetChannelByChanPoint(hn, cp)
// Check all payment hashes active for this channel.
for _, htlc := range ch.PendingHtlcs {
h := hex.EncodeToString(htlc.HashLock)
if h != target {
continue
}
// If the payment hash is found, check the incoming
// field.
if htlc.Incoming == incoming {
// Found it and return.
result = htlc
return nil
}
// Otherwise we do have the HTLC but its direction is
// not right.
have, want := "outgoing", "incoming"
if htlc.Incoming {
have, want = "incoming", "outgoing"
}
return fmt.Errorf("node[%s] have htlc(%v), want: %s, "+
"have: %s", hn.Name(), payHash, want, have)
}
return fmt.Errorf("node [%s:%x] didn't have: the payHash %v",
hn.Name(), hn.PubKey[:], payHash)
}, DefaultTimeout)
require.NoError(h, err, "timeout checking pending HTLC")
return result
}
// AssertHLTCNotActive asserts the node doesn't have a pending HTLC in the
// given channel, which means either the HTLC never existed, or it was pending
// and is now settled. Returns the HTLC if found and active.
//
// NOTE: to check a pending HTLC becoming settled, first use AssertHLTCActive
// then follow this check.
func (h *HarnessTest) AssertHLTCNotActive(hn *node.HarnessNode,
cp *lnrpc.ChannelPoint, payHash []byte) *lnrpc.HTLC {
var result *lnrpc.HTLC
target := hex.EncodeToString(payHash)
err := wait.NoError(func() error {
// We require the RPC call to succeed and won't wait for it, as
// a failure here is unexpected behavior.
ch := h.GetChannelByChanPoint(hn, cp)
// Check all payment hashes active for this channel.
for _, htlc := range ch.PendingHtlcs {
h := hex.EncodeToString(htlc.HashLock)
// Break if found the htlc.
if h == target {
result = htlc
break
}
}
// If we've found nothing, we're done.
if result == nil {
return nil
}
// Otherwise return an error.
return fmt.Errorf("node [%s:%x] still has: the payHash %x",
hn.Name(), hn.PubKey[:], payHash)
}, DefaultTimeout)
require.NoError(h, err, "timeout checking pending HTLC")
return result
}
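Per the NOTE above, the settled-HTLC check is only meaningful after the HTLC has first been observed as pending. Below is a minimal sketch of that two-step pattern, assuming it would live alongside these helpers in the lntest package; the helper name assertHtlcSettled and the settle callback are hypothetical and only illustrate the intended ordering.

// assertHtlcSettled is a hypothetical helper illustrating the pattern from
// the NOTE above: confirm the HTLC is pending, run the settle action, then
// confirm the HTLC has been removed from the channel.
func assertHtlcSettled(h *HarnessTest, hn *node.HarnessNode,
	cp *lnrpc.ChannelPoint, payHash []byte, settle func()) {

	// The HTLC must be pending on the channel before it can settle.
	h.AssertOutgoingHTLCActive(hn, cp, payHash)

	// Run the caller-supplied settle action, e.g. the receiver calling
	// SettleInvoice with the preimage.
	settle()

	// The HTLC should now be gone from the channel's pending set.
	h.AssertHLTCNotActive(hn, cp, payHash)
}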
// ReceiveSingleInvoice waits until a message is received on the subscribe
// single invoice stream or the timeout is reached.
func (h *HarnessTest) ReceiveSingleInvoice(
@ -1450,16 +1564,17 @@ func (h *HarnessTest) AssertPaymentStatus(hn *node.HarnessNode,
status lnrpc.Payment_PaymentStatus) *lnrpc.Payment {
var target *lnrpc.Payment
payHash := preimage.Hash()
err := wait.NoError(func() error {
p := h.findPayment(hn, preimage.Hash().String())
p := h.findPayment(hn, payHash.String())
if status == p.Status {
target = p
return nil
}
return fmt.Errorf("payment: %v status not match, want %s "+
"got %s", preimage, status, p.Status)
"got %s", payHash, status, p.Status)
}, DefaultTimeout)
require.NoError(h, err, "timeout checking payment status")
@ -2349,3 +2464,53 @@ func (h *HarnessTest) AssertWalletAccountBalance(hn *node.HarnessNode,
}, DefaultTimeout)
require.NoError(h, err, "timeout checking wallet account balance")
}
// AssertClosingTxInMempool asserts that the closing transaction of the given
// channel point can be found in the mempool. If the channel has anchors, it
// will assert the anchor sweep tx is also in the mempool.
func (h *HarnessTest) AssertClosingTxInMempool(cp *lnrpc.ChannelPoint,
c lnrpc.CommitmentType) *wire.MsgTx {
// Get expected number of txes to be found in the mempool.
expectedTxes := 1
hasAnchors := CommitTypeHasAnchors(c)
if hasAnchors {
expectedTxes = 2
}
// Wait for the expected txes to be found in the mempool.
h.Miner.AssertNumTxsInMempool(expectedTxes)
// Get the closing tx from the mempool.
op := h.OutPointFromChannelPoint(cp)
closeTx := h.Miner.AssertOutpointInMempool(op)
return closeTx
}
// MineClosingTx asserts that the closing transaction of the given channel
// point can be found in the mempool, mines a block to confirm it (along with
// the anchor sweep if the channel has anchors), and returns the closing tx.
func (h *HarnessTest) MineClosingTx(cp *lnrpc.ChannelPoint,
c lnrpc.CommitmentType) *wire.MsgTx {
// Get expected number of txes to be found in the mempool.
expectedTxes := 1
hasAnchors := CommitTypeHasAnchors(c)
if hasAnchors {
expectedTxes = 2
}
// Wait for the expected txes to be found in the mempool.
h.Miner.AssertNumTxsInMempool(expectedTxes)
// Get the closing tx from the mempool.
op := h.OutPointFromChannelPoint(cp)
closeTx := h.Miner.AssertOutpointInMempool(op)
// Mine a block to confirm the closing transaction and potential anchor
// sweep.
h.MineBlocksAndAssertNumTxes(1, expectedTxes)
return closeTx
}

View File

@ -301,6 +301,40 @@ func (h *HarnessMiner) AssertTxInMempool(txid *chainhash.Hash) *wire.MsgTx {
return msgTx
}
// AssertTxNotInMempool asserts a given transaction cannot be found in the
// mempool. It assumes the mempool is not empty.
//
// NOTE: this should be used after `AssertTxInMempool` to ensure the tx has
// entered the mempool before. Otherwise it might give a false positive, as
// the tx may enter the mempool after the check.
func (h *HarnessMiner) AssertTxNotInMempool(txid chainhash.Hash) *wire.MsgTx {
var msgTx *wire.MsgTx
err := wait.NoError(func() error {
// We require the RPC call to succeed and won't wait for it, as
// a failure here is unexpected behavior.
mempool := h.GetRawMempool()
if len(mempool) == 0 {
return fmt.Errorf("empty mempool")
}
for _, memTx := range mempool {
// Check the values are equal.
if txid.IsEqual(memTx) {
return fmt.Errorf("expect txid %v to be NOT "+
"found in mempool", txid)
}
}
return nil
}, wait.MinerMempoolTimeout)
require.NoError(h, err, "timeout checking tx not in mempool")
return msgTx
}
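To make the NOTE concrete, this assertion is typically used only after the tx has already been seen in the mempool, and in a situation where the mempool stays non-empty, e.g. checking that an RBF-style replacement evicted the original. The sketch below is a hypothetical helper under those assumptions (assertTxReplaced is not part of this change); AssertTxInMempool and AssertTxNotInMempool are the miner methods shown in this file.

// assertTxReplaced is a hypothetical helper showing the ordering the NOTE
// above recommends: the original tx must first be observed in the mempool,
// otherwise its later absence could simply mean it never arrived. The
// mempool stays non-empty because the replacement tx remains in it.
func assertTxReplaced(m *HarnessMiner, original, replacement chainhash.Hash) {
	// The original tx must be seen in the mempool first.
	m.AssertTxInMempool(&original)

	// ... the node broadcasts the replacement here ...

	// The replacement should now be in the mempool, and the original
	// should have been evicted.
	m.AssertTxInMempool(&replacement)
	m.AssertTxNotInMempool(original)
}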
// SendOutputsWithoutChange uses the miner to send the given outputs using the
// specified fee rate and returns the txid.
func (h *HarnessMiner) SendOutputsWithoutChange(outputs []*wire.TxOut,

View File

@ -236,6 +236,10 @@ func (cfg *BaseNodeConfig) GenArgs() []string {
// Use a small cache duration so the `DescribeGraph` can be
// updated quicker.
"--caches.rpc-graph-cache-duration=100ms",
// Speed up the tests for bitcoind backend.
"--bitcoind.blockpollinginterval=100ms",
"--bitcoind.txpollinginterval=100ms",
}
args = append(args, nodeArgs...)

View File

@ -1148,6 +1148,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
},
PreimageDB: s.witnessBeacon,
Notifier: cc.ChainNotifier,
Mempool: cc.MempoolNotifier,
Signer: cc.Wallet.Cfg.Signer,
FeeEstimator: cc.FeeEstimator,
ChainIO: cc.ChainIO,

View File

@ -1,6 +1,7 @@
package lnd
import (
"errors"
"sync"
"github.com/lightningnetwork/lnd/channeldb"
@ -127,6 +128,11 @@ func (p *preimageBeacon) LookupPreimage(
// Otherwise, we'll perform a final check using the witness cache.
preimage, err := p.wCache.LookupSha256Witness(payHash)
if errors.Is(err, channeldb.ErrNoWitnesses) {
ltndLog.Debugf("No witness for payment %v", payHash)
return lntypes.Preimage{}, false
}
if err != nil {
ltndLog.Errorf("Unable to lookup witness: %v", err)
return lntypes.Preimage{}, false
@ -147,7 +153,9 @@ func (p *preimageBeacon) AddPreimages(preimages ...lntypes.Preimage) error {
// the caller when delivering notifications.
preimageCopies := make([]lntypes.Preimage, 0, len(preimages))
for _, preimage := range preimages {
srvrLog.Infof("Adding preimage=%v to witness cache", preimage)
srvrLog.Infof("Adding preimage=%v to witness cache for %v",
preimage, preimage.Hash())
preimageCopies = append(preimageCopies, preimage)
}
@ -174,6 +182,9 @@ func (p *preimageBeacon) AddPreimages(preimages ...lntypes.Preimage) error {
}(client)
}
srvrLog.Debugf("Added %d preimage(s) to witness cache",
len(preimageCopies))
return nil
}