lnd+server: use struct for database instances

As a preparation to initialize more than just the channel database on
startup we introduce a new struct that holds a reference to each of our
database instances.
This commit is contained in:
Oliver Gugger 2021-08-03 09:57:24 +02:00
parent ad061b73e3
commit abf3942228
No known key found for this signature in database
GPG Key ID: 8E4256593F177720
2 changed files with 54 additions and 45 deletions

53
lnd.go
View File

@ -449,7 +449,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
ltndLog.Infof("Elected as leader (%v)", cfg.Cluster.ID) ltndLog.Infof("Elected as leader (%v)", cfg.Cluster.ID)
} }
localChanDB, remoteChanDB, cleanUp, err := initializeDatabases(ctx, cfg) dbs, cleanUp, err := initializeDatabases(ctx, cfg)
switch { switch {
case err == channeldb.ErrDryRunMigrationOK: case err == channeldb.ErrDryRunMigrationOK:
ltndLog.Infof("%v, exiting", err) ltndLog.Infof("%v, exiting", err)
@ -467,7 +467,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
// environment. This will ensure that all members of the cluster // environment. This will ensure that all members of the cluster
// have access to the same wallet state. // have access to the same wallet state.
loaderOpt = btcwallet.LoaderWithExternalWalletDB( loaderOpt = btcwallet.LoaderWithExternalWalletDB(
remoteChanDB.Backend, dbs.remoteChanDB.Backend,
) )
} else { } else {
// When "running locally", LND will use the bbolt wallet.db to // When "running locally", LND will use the bbolt wallet.db to
@ -706,8 +706,8 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
LitecoindMode: cfg.LitecoindMode, LitecoindMode: cfg.LitecoindMode,
BtcdMode: cfg.BtcdMode, BtcdMode: cfg.BtcdMode,
LtcdMode: cfg.LtcdMode, LtcdMode: cfg.LtcdMode,
LocalChanDB: localChanDB, LocalChanDB: dbs.localChanDB,
RemoteChanDB: remoteChanDB, RemoteChanDB: dbs.remoteChanDB,
PrivateWalletPw: privateWalletPw, PrivateWalletPw: privateWalletPw,
PublicWalletPw: publicWalletPw, PublicWalletPw: publicWalletPw,
Birthday: walletInitParams.Birthday, Birthday: walletInitParams.Birthday,
@ -900,7 +900,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
// Set up the core server which will listen for incoming peer // Set up the core server which will listen for incoming peer
// connections. // connections.
server, err := newServer( server, err := newServer(
cfg, cfg.Listeners, localChanDB, remoteChanDB, towerClientDB, cfg, cfg.Listeners, dbs, towerClientDB,
activeChainControl, &idKeyDesc, walletInitParams.ChansToRestore, activeChainControl, &idKeyDesc, walletInitParams.ChansToRestore,
chainedAcceptor, torController, chainedAcceptor, torController,
) )
@ -1621,6 +1621,13 @@ func waitForWalletPassword(cfg *Config,
} }
} }
// databaseInstances is a struct that holds all instances to the actual
// databases that are used in lnd.
type databaseInstances struct {
localChanDB *channeldb.DB
remoteChanDB *channeldb.DB
}
// initializeDatabases extracts the current databases that we'll use for normal // initializeDatabases extracts the current databases that we'll use for normal
// operation in the daemon. Two databases are returned: one remote and one // operation in the daemon. Two databases are returned: one remote and one
// local. However, only if the replicated database is active will the remote // local. However, only if the replicated database is active will the remote
@ -1628,7 +1635,7 @@ func waitForWalletPassword(cfg *Config,
// both point to the same local database. A function closure that closes all // both point to the same local database. A function closure that closes all
// opened databases is also returned. // opened databases is also returned.
func initializeDatabases(ctx context.Context, func initializeDatabases(ctx context.Context,
cfg *Config) (*channeldb.DB, *channeldb.DB, func(), error) { cfg *Config) (*databaseInstances, func(), error) {
ltndLog.Infof("Opening the main database, this might take a few " + ltndLog.Infof("Opening the main database, this might take a few " +
"minutes...") "minutes...")
@ -1643,20 +1650,20 @@ func initializeDatabases(ctx context.Context,
databaseBackends, err := cfg.DB.GetBackends(ctx, cfg.localDatabaseDir()) databaseBackends, err := cfg.DB.GetBackends(ctx, cfg.localDatabaseDir())
if err != nil { if err != nil {
return nil, nil, nil, fmt.Errorf("unable to obtain database "+ return nil, nil, fmt.Errorf("unable to obtain database "+
"backends: %v", err) "backends: %v", err)
} }
// If the remoteDB is nil, then we'll just open a local DB as normal, // If the remoteDB is nil, then we'll just open a local DB as normal,
// having the remote and local pointer be the exact same instance. // having the remote and local pointer be the exact same instance.
var ( var (
localChanDB, remoteChanDB *channeldb.DB dbs = &databaseInstances{}
closeFuncs []func() closeFuncs []func()
) )
if databaseBackends.RemoteDB == nil { if databaseBackends.RemoteDB == nil {
// Open the channeldb, which is dedicated to storing channel, // Open the channeldb, which is dedicated to storing channel,
// and network related metadata. // and network related metadata.
localChanDB, err = channeldb.CreateWithBackend( dbs.localChanDB, err = channeldb.CreateWithBackend(
databaseBackends.LocalDB, databaseBackends.LocalDB,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
@ -1665,19 +1672,19 @@ func initializeDatabases(ctx context.Context,
) )
switch { switch {
case err == channeldb.ErrDryRunMigrationOK: case err == channeldb.ErrDryRunMigrationOK:
return nil, nil, nil, err return nil, nil, err
case err != nil: case err != nil:
err := fmt.Errorf("unable to open local channeldb: %v", err) err := fmt.Errorf("unable to open local channeldb: %v", err)
ltndLog.Error(err) ltndLog.Error(err)
return nil, nil, nil, err return nil, nil, err
} }
closeFuncs = append(closeFuncs, func() { closeFuncs = append(closeFuncs, func() {
localChanDB.Close() dbs.localChanDB.Close()
}) })
remoteChanDB = localChanDB dbs.remoteChanDB = dbs.localChanDB
} else { } else {
ltndLog.Infof("Database replication is available! Creating " + ltndLog.Infof("Database replication is available! Creating " +
"local and remote channeldb instances") "local and remote channeldb instances")
@ -1685,7 +1692,7 @@ func initializeDatabases(ctx context.Context,
// Otherwise, we'll open two instances, one for the state we // Otherwise, we'll open two instances, one for the state we
// only need locally, and the other for things we want to // only need locally, and the other for things we want to
// ensure are replicated. // ensure are replicated.
localChanDB, err = channeldb.CreateWithBackend( dbs.localChanDB, err = channeldb.CreateWithBackend(
databaseBackends.LocalDB, databaseBackends.LocalDB,
channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize),
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
@ -1702,34 +1709,34 @@ func initializeDatabases(ctx context.Context,
case err != nil: case err != nil:
err := fmt.Errorf("unable to open local channeldb: %v", err) err := fmt.Errorf("unable to open local channeldb: %v", err)
ltndLog.Error(err) ltndLog.Error(err)
return nil, nil, nil, err return nil, nil, err
} }
closeFuncs = append(closeFuncs, func() { closeFuncs = append(closeFuncs, func() {
localChanDB.Close() dbs.localChanDB.Close()
}) })
ltndLog.Infof("Opening replicated database instance...") ltndLog.Infof("Opening replicated database instance...")
remoteChanDB, err = channeldb.CreateWithBackend( dbs.remoteChanDB, err = channeldb.CreateWithBackend(
databaseBackends.RemoteDB, databaseBackends.RemoteDB,
channeldb.OptionDryRunMigration(cfg.DryRunMigration), channeldb.OptionDryRunMigration(cfg.DryRunMigration),
channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval), channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
) )
switch { switch {
case err == channeldb.ErrDryRunMigrationOK: case err == channeldb.ErrDryRunMigrationOK:
return nil, nil, nil, err return nil, nil, err
case err != nil: case err != nil:
localChanDB.Close() dbs.localChanDB.Close()
err := fmt.Errorf("unable to open remote channeldb: %v", err) err := fmt.Errorf("unable to open remote channeldb: %v", err)
ltndLog.Error(err) ltndLog.Error(err)
return nil, nil, nil, err return nil, nil, err
} }
closeFuncs = append(closeFuncs, func() { closeFuncs = append(closeFuncs, func() {
remoteChanDB.Close() dbs.remoteChanDB.Close()
}) })
} }
@ -1742,7 +1749,7 @@ func initializeDatabases(ctx context.Context,
} }
} }
return localChanDB, remoteChanDB, cleanUp, nil return dbs, cleanUp, nil
} }
// initNeutrinoBackend inits a new instance of the neutrino light client // initNeutrinoBackend inits a new instance of the neutrino light client

View File

@ -352,7 +352,7 @@ func noiseDial(idKey keychain.SingleKeyECDH,
// newServer creates a new instance of the server which is to listen using the // newServer creates a new instance of the server which is to listen using the
// passed listener address. // passed listener address.
func newServer(cfg *Config, listenAddrs []net.Addr, func newServer(cfg *Config, listenAddrs []net.Addr,
localChanDB, remoteChanDB *channeldb.DB, dbs *databaseInstances,
towerClientDB wtclient.DB, cc *chainreg.ChainControl, towerClientDB wtclient.DB, cc *chainreg.ChainControl,
nodeKeyDesc *keychain.KeyDescriptor, nodeKeyDesc *keychain.KeyDescriptor,
chansToRestore walletunlocker.ChannelsToRecover, chansToRestore walletunlocker.ChannelsToRecover,
@ -435,15 +435,15 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
s := &server{ s := &server{
cfg: cfg, cfg: cfg,
localChanDB: localChanDB, localChanDB: dbs.localChanDB,
remoteChanDB: remoteChanDB, remoteChanDB: dbs.remoteChanDB,
cc: cc, cc: cc,
sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer), sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer),
writePool: writePool, writePool: writePool,
readPool: readPool, readPool: readPool,
chansToRestore: chansToRestore, chansToRestore: chansToRestore,
channelNotifier: channelnotifier.New(remoteChanDB), channelNotifier: channelnotifier.New(dbs.remoteChanDB),
identityECDH: nodeKeyECDH, identityECDH: nodeKeyECDH,
nodeSigner: netann.NewNodeSigner(nodeKeySigner), nodeSigner: netann.NewNodeSigner(nodeKeySigner),
@ -475,7 +475,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
} }
s.witnessBeacon = &preimageBeacon{ s.witnessBeacon = &preimageBeacon{
wCache: remoteChanDB.NewWitnessCache(), wCache: dbs.remoteChanDB.NewWitnessCache(),
subscribers: make(map[uint64]*preimageSubscriber), subscribers: make(map[uint64]*preimageSubscriber),
} }
@ -489,13 +489,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
uint32(currentHeight), currentHash, cc.ChainNotifier, uint32(currentHeight), currentHash, cc.ChainNotifier,
) )
s.invoices = invoices.NewRegistry( s.invoices = invoices.NewRegistry(
remoteChanDB, expiryWatcher, &registryConfig, dbs.remoteChanDB, expiryWatcher, &registryConfig,
) )
s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now) s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)
s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{ s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{
DB: remoteChanDB, DB: dbs.remoteChanDB,
LocalChannelClose: func(pubKey []byte, LocalChannelClose: func(pubKey []byte,
request *htlcswitch.ChanClose) { request *htlcswitch.ChanClose) {
@ -510,7 +510,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
peer.HandleLocalCloseChanReqs(request) peer.HandleLocalCloseChanReqs(request)
}, },
FwdingLog: remoteChanDB.ForwardingLog(), FwdingLog: dbs.remoteChanDB.ForwardingLog(),
SwitchPackager: channeldb.NewSwitchPackager(), SwitchPackager: channeldb.NewSwitchPackager(),
ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter, ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: s.fetchLastChanUpdate(), FetchLastChannelUpdate: s.fetchLastChanUpdate(),
@ -537,8 +537,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
MessageSigner: s.nodeSigner, MessageSigner: s.nodeSigner,
IsChannelActive: s.htlcSwitch.HasActiveLink, IsChannelActive: s.htlcSwitch.HasActiveLink,
ApplyChannelUpdate: s.applyChannelUpdate, ApplyChannelUpdate: s.applyChannelUpdate,
DB: remoteChanDB, DB: dbs.remoteChanDB,
Graph: localChanDB.ChannelGraph(), Graph: dbs.localChanDB.ChannelGraph(),
} }
chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg) chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg)
@ -630,7 +630,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
// As the graph can be obtained at anytime from the network, we won't // As the graph can be obtained at anytime from the network, we won't
// replicate it, and instead it'll only be stored locally. // replicate it, and instead it'll only be stored locally.
chanGraph := localChanDB.ChannelGraph() chanGraph := dbs.localChanDB.ChannelGraph()
// We'll now reconstruct a node announcement based on our current // We'll now reconstruct a node announcement based on our current
// configuration so we can send it out as a sort of heart beat within // configuration so we can send it out as a sort of heart beat within
@ -697,7 +697,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
// The router will get access to the payment ID sequencer, such that it // The router will get access to the payment ID sequencer, such that it
// can generate unique payment IDs. // can generate unique payment IDs.
sequencer, err := htlcswitch.NewPersistentSequencer(remoteChanDB) sequencer, err := htlcswitch.NewPersistentSequencer(dbs.remoteChanDB)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -742,7 +742,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
} }
s.missionControl, err = routing.NewMissionControl( s.missionControl, err = routing.NewMissionControl(
remoteChanDB, selfNode.PubKeyBytes, dbs.remoteChanDB, selfNode.PubKeyBytes,
&routing.MissionControlConfig{ &routing.MissionControlConfig{
ProbabilityEstimatorCfg: estimatorCfg, ProbabilityEstimatorCfg: estimatorCfg,
MaxMcHistory: routingConfig.MaxMcHistory, MaxMcHistory: routingConfig.MaxMcHistory,
@ -775,7 +775,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
PathFindingConfig: pathFindingConfig, PathFindingConfig: pathFindingConfig,
} }
paymentControl := channeldb.NewPaymentControl(remoteChanDB) paymentControl := channeldb.NewPaymentControl(dbs.remoteChanDB)
s.controlTower = routing.NewControlTower(paymentControl) s.controlTower = routing.NewControlTower(paymentControl)
@ -851,7 +851,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
FetchChannel: s.remoteChanDB.FetchChannel, FetchChannel: s.remoteChanDB.FetchChannel,
} }
utxnStore, err := newNurseryStore(s.cfg.ActiveNetParams.GenesisHash, remoteChanDB) utxnStore, err := newNurseryStore(
s.cfg.ActiveNetParams.GenesisHash, dbs.remoteChanDB,
)
if err != nil { if err != nil {
srvrLog.Errorf("unable to create nursery store: %v", err) srvrLog.Errorf("unable to create nursery store: %v", err)
return nil, err return nil, err
@ -861,7 +863,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
sweep.DefaultBatchWindowDuration) sweep.DefaultBatchWindowDuration)
sweeperStore, err := sweep.NewSweeperStore( sweeperStore, err := sweep.NewSweeperStore(
remoteChanDB, s.cfg.ActiveNetParams.GenesisHash, dbs.remoteChanDB, s.cfg.ActiveNetParams.GenesisHash,
) )
if err != nil { if err != nil {
srvrLog.Errorf("unable to create sweeper store: %v", err) srvrLog.Errorf("unable to create sweeper store: %v", err)
@ -888,8 +890,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
s.utxoNursery = newUtxoNursery(&NurseryConfig{ s.utxoNursery = newUtxoNursery(&NurseryConfig{
ChainIO: cc.ChainIO, ChainIO: cc.ChainIO,
ConfDepth: 1, ConfDepth: 1,
FetchClosedChannels: remoteChanDB.FetchClosedChannels, FetchClosedChannels: dbs.remoteChanDB.FetchClosedChannels,
FetchClosedChannel: remoteChanDB.FetchClosedChannel, FetchClosedChannel: dbs.remoteChanDB.FetchClosedChannel,
Notifier: cc.ChainNotifier, Notifier: cc.ChainNotifier,
PublishTransaction: cc.Wallet.PublishTransaction, PublishTransaction: cc.Wallet.PublishTransaction,
Store: utxnStore, Store: utxnStore,
@ -1010,18 +1012,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod, PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod,
IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC, IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC,
Clock: clock.NewDefaultClock(), Clock: clock.NewDefaultClock(),
}, remoteChanDB) }, dbs.remoteChanDB)
s.breachArbiter = newBreachArbiter(&BreachConfig{ s.breachArbiter = newBreachArbiter(&BreachConfig{
CloseLink: closeLink, CloseLink: closeLink,
DB: remoteChanDB, DB: dbs.remoteChanDB,
Estimator: s.cc.FeeEstimator, Estimator: s.cc.FeeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.Wallet), GenSweepScript: newSweepPkScriptGen(cc.Wallet),
Notifier: cc.ChainNotifier, Notifier: cc.ChainNotifier,
PublishTransaction: cc.Wallet.PublishTransaction, PublishTransaction: cc.Wallet.PublishTransaction,
ContractBreaches: contractBreaches, ContractBreaches: contractBreaches,
Signer: cc.Wallet.Cfg.Signer, Signer: cc.Wallet.Cfg.Signer,
Store: newRetributionStore(remoteChanDB), Store: newRetributionStore(dbs.remoteChanDB),
}) })
// Select the configuration and furnding parameters for Bitcoin or // Select the configuration and furnding parameters for Bitcoin or
@ -1069,7 +1071,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
FindChannel: func(chanID lnwire.ChannelID) ( FindChannel: func(chanID lnwire.ChannelID) (
*channeldb.OpenChannel, error) { *channeldb.OpenChannel, error) {
dbChannels, err := remoteChanDB.FetchAllChannels() dbChannels, err := dbs.remoteChanDB.FetchAllChannels()
if err != nil { if err != nil {
return nil, err return nil, err
} }