From 10929d80cc2531d3ee4fe39bb14fbe7e85744b8c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 23 Feb 2023 17:59:06 -0800 Subject: [PATCH 1/5] lnwallet: add new rebroadcaster interface, use for background tx publish In this commit, we add a new Rebroadcaster interface to be used for publishing transactions passively in the background until they've been confirmed on chain. This is useful if a tx drops out of the mempool, but then the pool clears down and has more space available to accept the tx at the current fee level. --- lnwallet/config.go | 5 +++ lnwallet/rebroadcaster.go | 27 +++++++++++++++ lnwallet/wallet.go | 72 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+) create mode 100644 lnwallet/rebroadcaster.go diff --git a/lnwallet/config.go b/lnwallet/config.go index cf7f3f4b8..f18d6e529 100644 --- a/lnwallet/config.go +++ b/lnwallet/config.go @@ -56,4 +56,9 @@ type Config struct { // NetParams is the set of parameters that tells the wallet which chain // it will be operating on. NetParams chaincfg.Params + + // Rebroadcaster is an optional config param that can be used to + // passively rebroadcast transactions in the background until they're + // detected as being confirmed. + Rebroadcaster Rebroadcaster } diff --git a/lnwallet/rebroadcaster.go b/lnwallet/rebroadcaster.go new file mode 100644 index 000000000..bb139f0a6 --- /dev/null +++ b/lnwallet/rebroadcaster.go @@ -0,0 +1,27 @@ +package lnwallet + +import ( + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" +) + +// Rebroadcaster is an abstract rebroadcaster instance that'll continually +// rebroadcast transactions in the background until they're confirmed. +type Rebroadcaster interface { + // Start launches all goroutines the rebroadcaster needs to operate. + Start() error + + // Started returns true if the broadcaster is already active. + Started() bool + + // Stop terminates the rebroadcaster and all goroutines it spawned. + Stop() + + // Broadcast enqueues a transaction to be rebroadcast until it's been + // confirmed. + Broadcast(tx *wire.MsgTx) error + + // MarkAsConfirmed marks a transaction as confirmed, so it won't be + // rebroadcast. + MarkAsConfirmed(txid chainhash.Hash) +} diff --git a/lnwallet/wallet.go b/lnwallet/wallet.go index a76c706dd..091affa34 100644 --- a/lnwallet/wallet.go +++ b/lnwallet/wallet.go @@ -407,6 +407,15 @@ func (l *LightningWallet) Startup() error { return err } + if l.Cfg.Rebroadcaster != nil { + go func() { + if err := l.Cfg.Rebroadcaster.Start(); err != nil { + walletLog.Errorf("unable to start "+ + "rebroadcaster: %v", err) + } + }() + } + l.wg.Add(1) // TODO(roasbeef): multiple request handlers? go l.requestHandler() @@ -426,11 +435,74 @@ func (l *LightningWallet) Shutdown() error { return err } + if l.Cfg.Rebroadcaster != nil && l.Cfg.Rebroadcaster.Started() { + l.Cfg.Rebroadcaster.Stop() + } + close(l.quit) l.wg.Wait() return nil } +// PublishTransaction wraps the wallet controller tx publish method with an +// extra rebroadcaster layer if the sub-system is configured. +func (l *LightningWallet) PublishTransaction(tx *wire.MsgTx, + label string) error { + + sendTxToWallet := func() error { + return l.WalletController.PublishTransaction(tx, label) + } + + // If we don't have rebroadcaster then we can exit early (and send only + // to the wallet). + if l.Cfg.Rebroadcaster == nil || !l.Cfg.Rebroadcaster.Started() { + return sendTxToWallet() + } + + // We pass this into the rebroadcaster first, so the initial attempt + // will succeed if the transaction isn't yet in the mempool. However we + // ignore the error here as this might be resent on start up and the + // transaction already exists. + _ = l.Cfg.Rebroadcaster.Broadcast(tx) + + // Then we pass things into the wallet as normal, which'll add the + // transaction label on disk. + if err := sendTxToWallet(); err != nil { + return err + } + + // TODO(roasbeef): want diff height actually? no context though + _, bestHeight, err := l.Cfg.ChainIO.GetBestBlock() + if err != nil { + return err + } + + txHash := tx.TxHash() + go func() { + const numConfs = 6 + + txConf, err := l.Cfg.Notifier.RegisterConfirmationsNtfn( + &txHash, tx.TxOut[0].PkScript, numConfs, uint32(bestHeight), + ) + if err != nil { + return + } + + select { + case <-txConf.Confirmed: + // TODO(roasbeef): also want to remove from + // rebroadcaster if conflict happens...deeper wallet + // integration? + l.Cfg.Rebroadcaster.MarkAsConfirmed(tx.TxHash()) + + case <-l.quit: + return + } + }() + + return nil +} + // ConfirmedBalance returns the current confirmed balance of a wallet account. // This methods wraps the internal WalletController method so we're able to // properly hold the coin select mutex while we compute the balance. From 42343184f49f362c843b9001bf7655a5923a8111 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 23 Feb 2023 17:59:44 -0800 Subject: [PATCH 2/5] lnd: hook up neutrino's rebroadcaster for full node backends Neutrino already runs this rebroadcaster in the background, so we don't need to create it again if we're running in that operating mode. --- config_builder.go | 87 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/config_builder.go b/config_builder.go index 225c82b96..08b134270 100644 --- a/config_builder.go +++ b/config_builder.go @@ -11,18 +11,23 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "time" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog" "github.com/btcsuite/btcwallet/waddrmgr" "github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/walletdb" proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/lightninglabs/neutrino" + "github.com/lightninglabs/neutrino/blockntfns" "github.com/lightninglabs/neutrino/headerfs" + "github.com/lightninglabs/neutrino/pushtx" "github.com/lightningnetwork/lnd/blockcache" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/invoices" @@ -606,6 +611,65 @@ func (d *DefaultWalletImpl) BuildWalletConfig(ctx context.Context, return partialChainControl, walletConfig, cleanUp, nil } +// proxyBlockEpoch proxies a block epoch subsections to the underlying neutrino +// rebroadcaster client. +func proxyBlockEpoch(notifier chainntnfs.ChainNotifier, +) func() (*blockntfns.Subscription, error) { + + return func() (*blockntfns.Subscription, error) { + blockEpoch, err := notifier.RegisterBlockEpochNtfn( + nil, + ) + if err != nil { + return nil, err + } + + sub := blockntfns.Subscription{ + Notifications: make(chan blockntfns.BlockNtfn, 6), + Cancel: blockEpoch.Cancel, + } + go func() { + for blk := range blockEpoch.Epochs { + ntfn := blockntfns.NewBlockConnected( + *blk.BlockHeader, + uint32(blk.Height), + ) + + sub.Notifications <- ntfn + } + }() + + return &sub, nil + } +} + +// walletReBroadcaster is a simple wrapper around the pushtx.Broadcaster +// interface to adhere to the expanded lnwallet.Rebraodcaster interface. +type walletReBroadcaster struct { + started atomic.Bool + + *pushtx.Broadcaster +} + +// newWalletReBroadcaster creates a new instance of the walletReBroadcaster. +func newWalletReBroadcaster(broadcaster *pushtx.Broadcaster) *walletReBroadcaster { + return &walletReBroadcaster{ + Broadcaster: broadcaster, + } +} + +// Start launches all goroutines the rebroadcaster needs to operate. +func (w *walletReBroadcaster) Start() error { + defer w.started.Store(true) + + return w.Broadcaster.Start() +} + +// Started returns true if the broadcaster is already active. +func (w *walletReBroadcaster) Started() bool { + return w.started.Load() +} + // BuildChainControl is responsible for creating a fully populated chain // control instance from a wallet. // @@ -641,6 +705,29 @@ func (d *DefaultWalletImpl) BuildChainControl( NetParams: *walletConfig.NetParams, } + // The broadcast is already always active for neutrino nodes, so we + // don't want to create a rebroadcast loop. + if partialChainControl.Cfg.NeutrinoCS == nil { + broadcastCfg := pushtx.Config{ + Broadcast: func(tx *wire.MsgTx) error { + cs := partialChainControl.ChainSource + _, err := cs.SendRawTransaction( + tx, true, + ) + + return err + }, + SubscribeBlocks: proxyBlockEpoch( + partialChainControl.ChainNotifier, + ), + RebroadcastInterval: pushtx.DefaultRebroadcastInterval, + } + + lnWalletConfig.Rebroadcaster = newWalletReBroadcaster( + pushtx.NewBroadcaster(&broadcastCfg), + ) + } + // We've created the wallet configuration now, so we can finish // initializing the main chain control. activeChainControl, cleanUp, err := chainreg.NewChainControl( From 6b8a1f1d67645c71280f3f2f59e8e4f31fb1c542 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 7 Mar 2023 17:07:22 -0800 Subject: [PATCH 3/5] docs/release-notes: add note for new tx rebroadcast --- docs/release-notes/release-notes-0.16.0.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/release-notes/release-notes-0.16.0.md b/docs/release-notes/release-notes-0.16.0.md index 57f9f647a..4b033687c 100644 --- a/docs/release-notes/release-notes-0.16.0.md +++ b/docs/release-notes/release-notes-0.16.0.md @@ -12,6 +12,12 @@ that might lead to channel updates being missed, causing channel graph being incomplete. Aside from that, a potential announcement messages being sent out of order is also [fixed](https://github.com/lightningnetwork/lnd/pull/7264). +`lnd` will now attempt to [rebroadcast unconfirmed +transactions](https://github.com/lightningnetwork/lnd/pull/7448) with each +passing block the transaction hasn't been confirmed. This was already the +default for the neutrino backend. This complements the existing behavior where +all unconfirmed transactions are rebroadcast on start up. + ## BOLT Specs * Warning messages from peers are now recognized and From e007125f78963f7436cd1f7b4f019ef690b3208e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 9 Mar 2023 15:34:13 -0800 Subject: [PATCH 4/5] lnutils: add RecvOrTimeout This abstracts out a common pattern where we wait for a send on a channel, and timeout otherwise. --- lnutils/chan.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 lnutils/chan.go diff --git a/lnutils/chan.go b/lnutils/chan.go new file mode 100644 index 000000000..b85c0de96 --- /dev/null +++ b/lnutils/chan.go @@ -0,0 +1,18 @@ +package lnutils + +import ( + "fmt" + "time" +) + +// RecvOrTimeout attempts to recv over chan c, returning the value. If the +// timeout passes before the recv succeeds, an error is returned +func RecvOrTimeout[T any](c <-chan T, timeout time.Duration) (*T, error) { + select { + case m := <-c: + return &m, nil + + case <-time.After(timeout): + return nil, fmt.Errorf("timeout hit") + } +} From 8754547dede44fde9f3f530864fee8c8b404edd3 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 9 Mar 2023 15:36:21 -0800 Subject: [PATCH 5/5] lnwallet: add tests for the new rebroadcaster logic We needed to copy some mocks from elsewhere in the codebase, as otherwise we'd run into an import cycle. --- lnwallet/mock.go | 351 +++++++++++++++++++++++++++++++++ lnwallet/rebroadcaster_test.go | 197 ++++++++++++++++++ 2 files changed, 548 insertions(+) create mode 100644 lnwallet/mock.go create mode 100644 lnwallet/rebroadcaster_test.go diff --git a/lnwallet/mock.go b/lnwallet/mock.go new file mode 100644 index 000000000..9fd9891c7 --- /dev/null +++ b/lnwallet/mock.go @@ -0,0 +1,351 @@ +package lnwallet + +import ( + "encoding/hex" + "sync/atomic" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/btcutil/hdkeychain" + "github.com/btcsuite/btcd/btcutil/psbt" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcwallet/waddrmgr" + base "github.com/btcsuite/btcwallet/wallet" + "github.com/btcsuite/btcwallet/wallet/txauthor" + "github.com/btcsuite/btcwallet/wtxmgr" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/lnwallet/chainfee" +) + +var ( + CoinPkScript, _ = hex.DecodeString("001431df1bde03c074d0cf21ea2529427e1499b8f1de") +) + +// mockWalletController is a mock implementation of the WalletController +// interface. It let's us mock the interaction with the bitcoin network. +type mockWalletController struct { + RootKey *btcec.PrivateKey + PublishedTransactions chan *wire.MsgTx + index uint32 + Utxos []*Utxo +} + +// BackEnd returns "mock" to signify a mock wallet controller. +func (w *mockWalletController) BackEnd() string { + return "mock" +} + +// FetchInputInfo will be called to get info about the inputs to the funding +// transaction. +func (w *mockWalletController) FetchInputInfo( + prevOut *wire.OutPoint) (*Utxo, error) { + + utxo := &Utxo{ + AddressType: WitnessPubKey, + Value: 10 * btcutil.SatoshiPerBitcoin, + PkScript: []byte("dummy"), + Confirmations: 1, + OutPoint: *prevOut, + } + return utxo, nil +} + +// ScriptForOutput returns the address, witness program and redeem script for a +// given UTXO. An error is returned if the UTXO does not belong to our wallet or +// it is not a managed pubKey address. +func (w *mockWalletController) ScriptForOutput(*wire.TxOut) ( + waddrmgr.ManagedPubKeyAddress, []byte, []byte, error) { + + return nil, nil, nil, nil +} + +// ConfirmedBalance currently returns dummy values. +func (w *mockWalletController) ConfirmedBalance(int32, string) (btcutil.Amount, + error) { + + return 0, nil +} + +// NewAddress is called to get new addresses for delivery, change etc. +func (w *mockWalletController) NewAddress(AddressType, bool, + string) (btcutil.Address, error) { + + addr, _ := btcutil.NewAddressPubKey( + w.RootKey.PubKey().SerializeCompressed(), &chaincfg.MainNetParams, + ) + return addr, nil +} + +// LastUnusedAddress currently returns dummy values. +func (w *mockWalletController) LastUnusedAddress(AddressType, + string) (btcutil.Address, error) { + + return nil, nil +} + +// IsOurAddress currently returns a dummy value. +func (w *mockWalletController) IsOurAddress(btcutil.Address) bool { + return false +} + +// AddressInfo currently returns a dummy value. +func (w *mockWalletController) AddressInfo( + btcutil.Address) (waddrmgr.ManagedAddress, error) { + + return nil, nil +} + +// ListAccounts currently returns a dummy value. +func (w *mockWalletController) ListAccounts(string, + *waddrmgr.KeyScope) ([]*waddrmgr.AccountProperties, error) { + + return nil, nil +} + +// RequiredReserve currently returns a dummy value. +func (w *mockWalletController) RequiredReserve(uint32) btcutil.Amount { + return 0 +} + +// ListAddresses currently returns a dummy value. +func (w *mockWalletController) ListAddresses(string, + bool) (AccountAddressMap, error) { + + return nil, nil +} + +// ImportAccount currently returns a dummy value. +func (w *mockWalletController) ImportAccount(string, *hdkeychain.ExtendedKey, + uint32, *waddrmgr.AddressType, bool) (*waddrmgr.AccountProperties, + []btcutil.Address, []btcutil.Address, error) { + + return nil, nil, nil, nil +} + +// ImportPublicKey currently returns a dummy value. +func (w *mockWalletController) ImportPublicKey(*btcec.PublicKey, + waddrmgr.AddressType) error { + + return nil +} + +// ImportTaprootScript currently returns a dummy value. +func (w *mockWalletController) ImportTaprootScript(waddrmgr.KeyScope, + *waddrmgr.Tapscript) (waddrmgr.ManagedAddress, error) { + + return nil, nil +} + +// SendOutputs currently returns dummy values. +func (w *mockWalletController) SendOutputs([]*wire.TxOut, + chainfee.SatPerKWeight, int32, string) (*wire.MsgTx, error) { + + return nil, nil +} + +// CreateSimpleTx currently returns dummy values. +func (w *mockWalletController) CreateSimpleTx([]*wire.TxOut, + chainfee.SatPerKWeight, int32, bool) (*txauthor.AuthoredTx, error) { + + return nil, nil +} + +// ListUnspentWitness is called by the wallet when doing coin selection. We just +// need one unspent for the funding transaction. +func (w *mockWalletController) ListUnspentWitness(int32, int32, + string) ([]*Utxo, error) { + + // If the mock already has a list of utxos, return it. + if w.Utxos != nil { + return w.Utxos, nil + } + + // Otherwise create one to return. + utxo := &Utxo{ + AddressType: WitnessPubKey, + Value: btcutil.Amount(10 * btcutil.SatoshiPerBitcoin), + PkScript: CoinPkScript, + OutPoint: wire.OutPoint{ + Hash: chainhash.Hash{}, + Index: w.index, + }, + } + atomic.AddUint32(&w.index, 1) + var ret []*Utxo + ret = append(ret, utxo) + return ret, nil +} + +// ListTransactionDetails currently returns dummy values. +func (w *mockWalletController) ListTransactionDetails(int32, int32, + string) ([]*TransactionDetail, error) { + + return nil, nil +} + +// LockOutpoint currently does nothing. +func (w *mockWalletController) LockOutpoint(o wire.OutPoint) {} + +// UnlockOutpoint currently does nothing. +func (w *mockWalletController) UnlockOutpoint(o wire.OutPoint) {} + +// LeaseOutput returns the current time and a nil error. +func (w *mockWalletController) LeaseOutput(wtxmgr.LockID, wire.OutPoint, + time.Duration) (time.Time, []byte, btcutil.Amount, error) { + + return time.Now(), nil, 0, nil +} + +// ReleaseOutput currently does nothing. +func (w *mockWalletController) ReleaseOutput(wtxmgr.LockID, wire.OutPoint) error { + return nil +} + +func (w *mockWalletController) ListLeasedOutputs() ([]*base.ListLeasedOutputResult, + error) { + + return nil, nil +} + +// FundPsbt currently does nothing. +func (w *mockWalletController) FundPsbt(*psbt.Packet, int32, chainfee.SatPerKWeight, + string, *waddrmgr.KeyScope) (int32, error) { + + return 0, nil +} + +// SignPsbt currently does nothing. +func (w *mockWalletController) SignPsbt(*psbt.Packet) ([]uint32, error) { + return nil, nil +} + +// FinalizePsbt currently does nothing. +func (w *mockWalletController) FinalizePsbt(_ *psbt.Packet, _ string) error { + return nil +} + +// PublishTransaction sends a transaction to the PublishedTransactions chan. +func (w *mockWalletController) PublishTransaction(tx *wire.MsgTx, _ string) error { + w.PublishedTransactions <- tx + return nil +} + +// LabelTransaction currently does nothing. +func (w *mockWalletController) LabelTransaction(chainhash.Hash, string, + bool) error { + + return nil +} + +// SubscribeTransactions currently does nothing. +func (w *mockWalletController) SubscribeTransactions() (TransactionSubscription, + error) { + + return nil, nil +} + +// IsSynced currently returns dummy values. +func (w *mockWalletController) IsSynced() (bool, int64, error) { + return true, int64(0), nil +} + +// GetRecoveryInfo currently returns dummy values. +func (w *mockWalletController) GetRecoveryInfo() (bool, float64, error) { + return true, float64(1), nil +} + +// Start currently does nothing. +func (w *mockWalletController) Start() error { + return nil +} + +// Stop currently does nothing. +func (w *mockWalletController) Stop() error { + return nil +} + +func (w *mockWalletController) FetchTx(chainhash.Hash) (*wire.MsgTx, error) { + return nil, nil +} + +func (w *mockWalletController) RemoveDescendants(*wire.MsgTx) error { + return nil +} + +// mockChainNotifier is a mock implementation of the ChainNotifier interface. +type mockChainNotifier struct { + SpendChan chan *chainntnfs.SpendDetail + EpochChan chan *chainntnfs.BlockEpoch + ConfChan chan *chainntnfs.TxConfirmation +} + +// RegisterConfirmationsNtfn returns a ConfirmationEvent that contains a channel +// that the tx confirmation will go over. +func (c *mockChainNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, + pkScript []byte, numConfs, heightHint uint32, + opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) { + + return &chainntnfs.ConfirmationEvent{ + Confirmed: c.ConfChan, + Cancel: func() {}, + }, nil +} + +// RegisterSpendNtfn returns a SpendEvent that contains a channel that the spend +// details will go over. +func (c *mockChainNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, + pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { + + return &chainntnfs.SpendEvent{ + Spend: c.SpendChan, + Cancel: func() {}, + }, nil +} + +// RegisterBlockEpochNtfn returns a BlockEpochEvent that contains a channel that +// block epochs will go over. +func (c *mockChainNotifier) RegisterBlockEpochNtfn(blockEpoch *chainntnfs.BlockEpoch) ( + *chainntnfs.BlockEpochEvent, error) { + + return &chainntnfs.BlockEpochEvent{ + Epochs: c.EpochChan, + Cancel: func() {}, + }, nil +} + +// Start currently returns a dummy value. +func (c *mockChainNotifier) Start() error { + return nil +} + +// Started currently returns a dummy value. +func (c *mockChainNotifier) Started() bool { + return true +} + +// Stop currently returns a dummy value. +func (c *mockChainNotifier) Stop() error { + return nil +} + +type mockChainIO struct{} + +func (*mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) { + return nil, 0, nil +} + +func (*mockChainIO) GetUtxo(op *wire.OutPoint, _ []byte, + heightHint uint32, _ <-chan struct{}) (*wire.TxOut, error) { + return nil, nil +} + +func (*mockChainIO) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) { + return nil, nil +} + +func (*mockChainIO) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) { + return nil, nil +} diff --git a/lnwallet/rebroadcaster_test.go b/lnwallet/rebroadcaster_test.go new file mode 100644 index 000000000..599d7d64e --- /dev/null +++ b/lnwallet/rebroadcaster_test.go @@ -0,0 +1,197 @@ +package lnwallet + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/lnutils" + "github.com/stretchr/testify/require" +) + +type mockRebroadcaster struct { + started atomic.Bool + + rebroadcastAttempt chan struct{} + + falseStart bool + + confSignal chan struct{} + + startSignal chan struct{} +} + +func newMockRebroadcaster(falseStart bool) *mockRebroadcaster { + return &mockRebroadcaster{ + rebroadcastAttempt: make(chan struct{}, 1), + falseStart: falseStart, + confSignal: make(chan struct{}, 1), + startSignal: make(chan struct{}), + } +} + +func (m *mockRebroadcaster) Start() error { + if !m.falseStart { + defer m.started.Store(true) + defer close(m.startSignal) + } + + return nil +} + +func (m *mockRebroadcaster) Started() bool { + return m.started.Load() +} + +func (m *mockRebroadcaster) Stop() { +} + +// Broadcast enqueues a transaction to be rebroadcast until it's been +// confirmed. +func (m *mockRebroadcaster) Broadcast(tx *wire.MsgTx) error { + m.rebroadcastAttempt <- struct{}{} + return nil +} + +func (m *mockRebroadcaster) MarkAsConfirmed(txid chainhash.Hash) { + m.confSignal <- struct{}{} +} + +func assertBroadcasterBypass(t *testing.T, wallet *LightningWallet, + rebroadcaster *mockRebroadcaster, + walletController *mockWalletController) { + + testTx := wire.NewMsgTx(2) + timeout := time.Second * 1 + + require.NoError(t, wallet.PublishTransaction(testTx, "")) + + // The tx should go to the backend. + _, err := lnutils.RecvOrTimeout( + walletController.PublishedTransactions, timeout, + ) + require.NoError(t, err) + + // It shouldn't go to the rebroadcaster. + select { + case <-time.After(timeout): + case <-rebroadcaster.rebroadcastAttempt: + t.Fatal("tx sent to rebroadcaster") + } +} + +func assertBroadcasterSend(t *testing.T, wallet *LightningWallet, + rebroadcaster *mockRebroadcaster, + walletController *mockWalletController) { + + testTx := wire.NewMsgTx(2) + testTx.AddTxOut(&wire.TxOut{}) + + timeout := time.Second * 1 + + require.NoError(t, wallet.PublishTransaction(testTx, "")) + + // The tx should go to the backend. + _, err := lnutils.RecvOrTimeout( + walletController.PublishedTransactions, timeout, + ) + require.NoError(t, err) + + // It should also go to the rebroadcaster. + select { + case <-time.After(timeout): + t.Fatal("tx not sent to rebroadcaster") + case <-rebroadcaster.rebroadcastAttempt: + } +} + +// TestWalletRebroadcaster tests that the wallet properly manages the existence +// or lack of existence of the rebroadcaster, and also properly marks the +// transaction as confirmed. +func TestWalletRebroadcaster(t *testing.T) { + t.Parallel() + + rebroadcaster := newMockRebroadcaster(false) + walletController := &mockWalletController{ + PublishedTransactions: make(chan *wire.MsgTx, 1), + } + chainIO := &mockChainIO{} + notifier := &mockChainNotifier{ + SpendChan: make(chan *chainntnfs.SpendDetail, 1), + EpochChan: make(chan *chainntnfs.BlockEpoch, 1), + ConfChan: make(chan *chainntnfs.TxConfirmation, 1), + } + cfg := &Config{ + Rebroadcaster: rebroadcaster, + WalletController: walletController, + Notifier: notifier, + ChainIO: chainIO, + } + + t.Run("rebroadcast bypass", func(t *testing.T) { + // We'll make a copy of the config, but without the + // broadcaster. + testCfg := *cfg + testCfg.Rebroadcaster = nil + + wallet, err := NewLightningWallet(testCfg) + require.NoError(t, err) + require.NoError(t, wallet.Startup()) + + // If we try to broadcast, it should go straight to the wallet + // backend and skip the broadcaster. + assertBroadcasterBypass( + t, wallet, rebroadcaster, walletController, + ) + + wallet.Shutdown() + + // If we make a new wallet, that has the broadcaster, but + // hasn't started yet, we should see the same behavior. + testCfg.Rebroadcaster = newMockRebroadcaster(true) + + wallet, err = NewLightningWallet(testCfg) + require.NoError(t, err) + require.NoError(t, wallet.Startup()) + + assertBroadcasterBypass( + t, wallet, rebroadcaster, walletController, + ) + + wallet.Shutdown() + }) + + t.Run("rebroadcast normal", func(t *testing.T) { + wallet, err := NewLightningWallet(*cfg) + require.NoError(t, err) + require.NoError(t, wallet.Startup()) + + defer wallet.Shutdown() + + // Wait for the broadcaster to start. + _, err = lnutils.RecvOrTimeout( + rebroadcaster.startSignal, time.Second, + ) + require.NoError(t, err) + + // We'll now broadcast a new test transaction, asserting that + // it goes to both the backend and the rebroadcaster. + assertBroadcasterSend( + t, wallet, rebroadcaster, walletController, + ) + + // We'll now mark the transaction as confirmed, and assert that + // the rebroadcaster was notified. + notifier.ConfChan <- &chainntnfs.TxConfirmation{} + + _, err = lnutils.RecvOrTimeout( + rebroadcaster.confSignal, time.Second, + ) + require.NoError(t, err) + + }) +}