diff --git a/config_builder.go b/config_builder.go index 225c82b96..08b134270 100644 --- a/config_builder.go +++ b/config_builder.go @@ -11,18 +11,23 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "time" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog" "github.com/btcsuite/btcwallet/waddrmgr" "github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/walletdb" proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/lightninglabs/neutrino" + "github.com/lightninglabs/neutrino/blockntfns" "github.com/lightninglabs/neutrino/headerfs" + "github.com/lightninglabs/neutrino/pushtx" "github.com/lightningnetwork/lnd/blockcache" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainreg" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/invoices" @@ -606,6 +611,65 @@ func (d *DefaultWalletImpl) BuildWalletConfig(ctx context.Context, return partialChainControl, walletConfig, cleanUp, nil } +// proxyBlockEpoch proxies a block epoch subsections to the underlying neutrino +// rebroadcaster client. +func proxyBlockEpoch(notifier chainntnfs.ChainNotifier, +) func() (*blockntfns.Subscription, error) { + + return func() (*blockntfns.Subscription, error) { + blockEpoch, err := notifier.RegisterBlockEpochNtfn( + nil, + ) + if err != nil { + return nil, err + } + + sub := blockntfns.Subscription{ + Notifications: make(chan blockntfns.BlockNtfn, 6), + Cancel: blockEpoch.Cancel, + } + go func() { + for blk := range blockEpoch.Epochs { + ntfn := blockntfns.NewBlockConnected( + *blk.BlockHeader, + uint32(blk.Height), + ) + + sub.Notifications <- ntfn + } + }() + + return &sub, nil + } +} + +// walletReBroadcaster is a simple wrapper around the pushtx.Broadcaster +// interface to adhere to the expanded lnwallet.Rebraodcaster interface. +type walletReBroadcaster struct { + started atomic.Bool + + *pushtx.Broadcaster +} + +// newWalletReBroadcaster creates a new instance of the walletReBroadcaster. +func newWalletReBroadcaster(broadcaster *pushtx.Broadcaster) *walletReBroadcaster { + return &walletReBroadcaster{ + Broadcaster: broadcaster, + } +} + +// Start launches all goroutines the rebroadcaster needs to operate. +func (w *walletReBroadcaster) Start() error { + defer w.started.Store(true) + + return w.Broadcaster.Start() +} + +// Started returns true if the broadcaster is already active. +func (w *walletReBroadcaster) Started() bool { + return w.started.Load() +} + // BuildChainControl is responsible for creating a fully populated chain // control instance from a wallet. // @@ -641,6 +705,29 @@ func (d *DefaultWalletImpl) BuildChainControl( NetParams: *walletConfig.NetParams, } + // The broadcast is already always active for neutrino nodes, so we + // don't want to create a rebroadcast loop. + if partialChainControl.Cfg.NeutrinoCS == nil { + broadcastCfg := pushtx.Config{ + Broadcast: func(tx *wire.MsgTx) error { + cs := partialChainControl.ChainSource + _, err := cs.SendRawTransaction( + tx, true, + ) + + return err + }, + SubscribeBlocks: proxyBlockEpoch( + partialChainControl.ChainNotifier, + ), + RebroadcastInterval: pushtx.DefaultRebroadcastInterval, + } + + lnWalletConfig.Rebroadcaster = newWalletReBroadcaster( + pushtx.NewBroadcaster(&broadcastCfg), + ) + } + // We've created the wallet configuration now, so we can finish // initializing the main chain control. activeChainControl, cleanUp, err := chainreg.NewChainControl(