diff --git a/lnd.go b/lnd.go index d883c92cc..f61b12918 100644 --- a/lnd.go +++ b/lnd.go @@ -449,7 +449,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error ltndLog.Infof("Elected as leader (%v)", cfg.Cluster.ID) } - localChanDB, remoteChanDB, cleanUp, err := initializeDatabases(ctx, cfg) + dbs, cleanUp, err := initializeDatabases(ctx, cfg) switch { case err == channeldb.ErrDryRunMigrationOK: ltndLog.Infof("%v, exiting", err) @@ -467,7 +467,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error // environment. This will ensure that all members of the cluster // have access to the same wallet state. loaderOpt = btcwallet.LoaderWithExternalWalletDB( - remoteChanDB.Backend, + dbs.remoteChanDB.Backend, ) } else { // When "running locally", LND will use the bbolt wallet.db to @@ -706,8 +706,8 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error LitecoindMode: cfg.LitecoindMode, BtcdMode: cfg.BtcdMode, LtcdMode: cfg.LtcdMode, - LocalChanDB: localChanDB, - RemoteChanDB: remoteChanDB, + LocalChanDB: dbs.localChanDB, + RemoteChanDB: dbs.remoteChanDB, PrivateWalletPw: privateWalletPw, PublicWalletPw: publicWalletPw, Birthday: walletInitParams.Birthday, @@ -900,7 +900,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error // Set up the core server which will listen for incoming peer // connections. server, err := newServer( - cfg, cfg.Listeners, localChanDB, remoteChanDB, towerClientDB, + cfg, cfg.Listeners, dbs, towerClientDB, activeChainControl, &idKeyDesc, walletInitParams.ChansToRestore, chainedAcceptor, torController, ) @@ -1621,6 +1621,13 @@ func waitForWalletPassword(cfg *Config, } } +// databaseInstances is a struct that holds all instances to the actual +// databases that are used in lnd. +type databaseInstances struct { + localChanDB *channeldb.DB + remoteChanDB *channeldb.DB +} + // initializeDatabases extracts the current databases that we'll use for normal // operation in the daemon. Two databases are returned: one remote and one // local. However, only if the replicated database is active will the remote @@ -1628,7 +1635,7 @@ func waitForWalletPassword(cfg *Config, // both point to the same local database. A function closure that closes all // opened databases is also returned. func initializeDatabases(ctx context.Context, - cfg *Config) (*channeldb.DB, *channeldb.DB, func(), error) { + cfg *Config) (*databaseInstances, func(), error) { ltndLog.Infof("Opening the main database, this might take a few " + "minutes...") @@ -1643,20 +1650,20 @@ func initializeDatabases(ctx context.Context, databaseBackends, err := cfg.DB.GetBackends(ctx, cfg.localDatabaseDir()) if err != nil { - return nil, nil, nil, fmt.Errorf("unable to obtain database "+ + return nil, nil, fmt.Errorf("unable to obtain database "+ "backends: %v", err) } // If the remoteDB is nil, then we'll just open a local DB as normal, // having the remote and local pointer be the exact same instance. var ( - localChanDB, remoteChanDB *channeldb.DB - closeFuncs []func() + dbs = &databaseInstances{} + closeFuncs []func() ) if databaseBackends.RemoteDB == nil { // Open the channeldb, which is dedicated to storing channel, // and network related metadata. - localChanDB, err = channeldb.CreateWithBackend( + dbs.localChanDB, err = channeldb.CreateWithBackend( databaseBackends.LocalDB, channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), @@ -1665,19 +1672,19 @@ func initializeDatabases(ctx context.Context, ) switch { case err == channeldb.ErrDryRunMigrationOK: - return nil, nil, nil, err + return nil, nil, err case err != nil: err := fmt.Errorf("unable to open local channeldb: %v", err) ltndLog.Error(err) - return nil, nil, nil, err + return nil, nil, err } closeFuncs = append(closeFuncs, func() { - localChanDB.Close() + dbs.localChanDB.Close() }) - remoteChanDB = localChanDB + dbs.remoteChanDB = dbs.localChanDB } else { ltndLog.Infof("Database replication is available! Creating " + "local and remote channeldb instances") @@ -1685,7 +1692,7 @@ func initializeDatabases(ctx context.Context, // Otherwise, we'll open two instances, one for the state we // only need locally, and the other for things we want to // ensure are replicated. - localChanDB, err = channeldb.CreateWithBackend( + dbs.localChanDB, err = channeldb.CreateWithBackend( databaseBackends.LocalDB, channeldb.OptionSetRejectCacheSize(cfg.Caches.RejectCacheSize), channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize), @@ -1702,34 +1709,34 @@ func initializeDatabases(ctx context.Context, case err != nil: err := fmt.Errorf("unable to open local channeldb: %v", err) ltndLog.Error(err) - return nil, nil, nil, err + return nil, nil, err } closeFuncs = append(closeFuncs, func() { - localChanDB.Close() + dbs.localChanDB.Close() }) ltndLog.Infof("Opening replicated database instance...") - remoteChanDB, err = channeldb.CreateWithBackend( + dbs.remoteChanDB, err = channeldb.CreateWithBackend( databaseBackends.RemoteDB, channeldb.OptionDryRunMigration(cfg.DryRunMigration), channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval), ) switch { case err == channeldb.ErrDryRunMigrationOK: - return nil, nil, nil, err + return nil, nil, err case err != nil: - localChanDB.Close() + dbs.localChanDB.Close() err := fmt.Errorf("unable to open remote channeldb: %v", err) ltndLog.Error(err) - return nil, nil, nil, err + return nil, nil, err } closeFuncs = append(closeFuncs, func() { - remoteChanDB.Close() + dbs.remoteChanDB.Close() }) } @@ -1742,7 +1749,7 @@ func initializeDatabases(ctx context.Context, } } - return localChanDB, remoteChanDB, cleanUp, nil + return dbs, cleanUp, nil } // initNeutrinoBackend inits a new instance of the neutrino light client diff --git a/server.go b/server.go index 2b914b90a..635b19534 100644 --- a/server.go +++ b/server.go @@ -352,7 +352,7 @@ func noiseDial(idKey keychain.SingleKeyECDH, // newServer creates a new instance of the server which is to listen using the // passed listener address. func newServer(cfg *Config, listenAddrs []net.Addr, - localChanDB, remoteChanDB *channeldb.DB, + dbs *databaseInstances, towerClientDB wtclient.DB, cc *chainreg.ChainControl, nodeKeyDesc *keychain.KeyDescriptor, chansToRestore walletunlocker.ChannelsToRecover, @@ -435,15 +435,15 @@ func newServer(cfg *Config, listenAddrs []net.Addr, s := &server{ cfg: cfg, - localChanDB: localChanDB, - remoteChanDB: remoteChanDB, + localChanDB: dbs.localChanDB, + remoteChanDB: dbs.remoteChanDB, cc: cc, sigPool: lnwallet.NewSigPool(cfg.Workers.Sig, cc.Signer), writePool: writePool, readPool: readPool, chansToRestore: chansToRestore, - channelNotifier: channelnotifier.New(remoteChanDB), + channelNotifier: channelnotifier.New(dbs.remoteChanDB), identityECDH: nodeKeyECDH, nodeSigner: netann.NewNodeSigner(nodeKeySigner), @@ -475,7 +475,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } s.witnessBeacon = &preimageBeacon{ - wCache: remoteChanDB.NewWitnessCache(), + wCache: dbs.remoteChanDB.NewWitnessCache(), subscribers: make(map[uint64]*preimageSubscriber), } @@ -489,13 +489,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr, uint32(currentHeight), currentHash, cc.ChainNotifier, ) s.invoices = invoices.NewRegistry( - remoteChanDB, expiryWatcher, ®istryConfig, + dbs.remoteChanDB, expiryWatcher, ®istryConfig, ) s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now) s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{ - DB: remoteChanDB, + DB: dbs.remoteChanDB, LocalChannelClose: func(pubKey []byte, request *htlcswitch.ChanClose) { @@ -510,7 +510,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, peer.HandleLocalCloseChanReqs(request) }, - FwdingLog: remoteChanDB.ForwardingLog(), + FwdingLog: dbs.remoteChanDB.ForwardingLog(), SwitchPackager: channeldb.NewSwitchPackager(), ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter, FetchLastChannelUpdate: s.fetchLastChanUpdate(), @@ -537,8 +537,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, MessageSigner: s.nodeSigner, IsChannelActive: s.htlcSwitch.HasActiveLink, ApplyChannelUpdate: s.applyChannelUpdate, - DB: remoteChanDB, - Graph: localChanDB.ChannelGraph(), + DB: dbs.remoteChanDB, + Graph: dbs.localChanDB.ChannelGraph(), } chanStatusMgr, err := netann.NewChanStatusManager(chanStatusMgrCfg) @@ -630,7 +630,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, // As the graph can be obtained at anytime from the network, we won't // replicate it, and instead it'll only be stored locally. - chanGraph := localChanDB.ChannelGraph() + chanGraph := dbs.localChanDB.ChannelGraph() // We'll now reconstruct a node announcement based on our current // configuration so we can send it out as a sort of heart beat within @@ -697,7 +697,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, // The router will get access to the payment ID sequencer, such that it // can generate unique payment IDs. - sequencer, err := htlcswitch.NewPersistentSequencer(remoteChanDB) + sequencer, err := htlcswitch.NewPersistentSequencer(dbs.remoteChanDB) if err != nil { return nil, err } @@ -742,7 +742,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } s.missionControl, err = routing.NewMissionControl( - remoteChanDB, selfNode.PubKeyBytes, + dbs.remoteChanDB, selfNode.PubKeyBytes, &routing.MissionControlConfig{ ProbabilityEstimatorCfg: estimatorCfg, MaxMcHistory: routingConfig.MaxMcHistory, @@ -775,7 +775,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, PathFindingConfig: pathFindingConfig, } - paymentControl := channeldb.NewPaymentControl(remoteChanDB) + paymentControl := channeldb.NewPaymentControl(dbs.remoteChanDB) s.controlTower = routing.NewControlTower(paymentControl) @@ -851,7 +851,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, FetchChannel: s.remoteChanDB.FetchChannel, } - utxnStore, err := newNurseryStore(s.cfg.ActiveNetParams.GenesisHash, remoteChanDB) + utxnStore, err := newNurseryStore( + s.cfg.ActiveNetParams.GenesisHash, dbs.remoteChanDB, + ) if err != nil { srvrLog.Errorf("unable to create nursery store: %v", err) return nil, err @@ -861,7 +863,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, sweep.DefaultBatchWindowDuration) sweeperStore, err := sweep.NewSweeperStore( - remoteChanDB, s.cfg.ActiveNetParams.GenesisHash, + dbs.remoteChanDB, s.cfg.ActiveNetParams.GenesisHash, ) if err != nil { srvrLog.Errorf("unable to create sweeper store: %v", err) @@ -888,8 +890,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr, s.utxoNursery = newUtxoNursery(&NurseryConfig{ ChainIO: cc.ChainIO, ConfDepth: 1, - FetchClosedChannels: remoteChanDB.FetchClosedChannels, - FetchClosedChannel: remoteChanDB.FetchClosedChannel, + FetchClosedChannels: dbs.remoteChanDB.FetchClosedChannels, + FetchClosedChannel: dbs.remoteChanDB.FetchClosedChannel, Notifier: cc.ChainNotifier, PublishTransaction: cc.Wallet.PublishTransaction, Store: utxnStore, @@ -1010,18 +1012,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr, PaymentsExpirationGracePeriod: cfg.PaymentsExpirationGracePeriod, IsForwardedHTLC: s.htlcSwitch.IsForwardedHTLC, Clock: clock.NewDefaultClock(), - }, remoteChanDB) + }, dbs.remoteChanDB) s.breachArbiter = newBreachArbiter(&BreachConfig{ CloseLink: closeLink, - DB: remoteChanDB, + DB: dbs.remoteChanDB, Estimator: s.cc.FeeEstimator, GenSweepScript: newSweepPkScriptGen(cc.Wallet), Notifier: cc.ChainNotifier, PublishTransaction: cc.Wallet.PublishTransaction, ContractBreaches: contractBreaches, Signer: cc.Wallet.Cfg.Signer, - Store: newRetributionStore(remoteChanDB), + Store: newRetributionStore(dbs.remoteChanDB), }) // Select the configuration and furnding parameters for Bitcoin or @@ -1069,7 +1071,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, FindChannel: func(chanID lnwire.ChannelID) ( *channeldb.OpenChannel, error) { - dbChannels, err := remoteChanDB.FetchAllChannels() + dbChannels, err := dbs.remoteChanDB.FetchAllChannels() if err != nil { return nil, err }