Mirror of https://github.com/lightningnetwork/lnd.git, synced 2025-09-12 14:42:38 +02:00
multi: rename local and remote chan DBs
In preparation for no longer having a separate local and remote version of the database, we rename the variables to reflect their actual function. In the case of the RPC server we even use the channel graph directly instead of going through the DB instance. This should allow us to extract the channel graph into its own, separate database (perhaps with better access characteristics) in the future.
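The diff below is mechanical, but the intent is easier to see on the server struct itself. The following Go sketch is illustrative only and is not part of this commit: the struct names serverBefore/serverAfter are invented, and the field types are assumptions inferred from how the fields are used in rpcserver.go (s.graphDB is used directly as a channel graph, s.chanStateDB replaces the old remoteChanDB handle).

package sketch

import "github.com/lightningnetwork/lnd/channeldb"

// Illustrative sketch, not part of this commit: field types are inferred
// from usage in rpcserver.go.

// Before: the fields were named after where the data was expected to live
// (a "local" and a "remote" channel DB), even though callers mostly cared
// about what the data was.
type serverBefore struct {
	localChanDB  *channeldb.DB // held the channel graph, accessed via ChannelGraph()
	remoteChanDB *channeldb.DB // held channel state, invoices, payments, forwarding log
}

// After: the fields are named for their function, and the graph is held
// directly so callers no longer go through the DB instance.
type serverAfter struct {
	graphDB     *channeldb.ChannelGraph // was s.localChanDB.ChannelGraph()
	chanStateDB *channeldb.DB           // was s.remoteChanDB
}

Call sites then change from graph := s.localChanDB.ChannelGraph() to graph := s.graphDB, and from r.server.remoteChanDB.FetchAllOpenChannels() to r.server.chanStateDB.FetchAllOpenChannels(), which is exactly the pattern repeated throughout the hunks below.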
rpcserver.go (83 lines changed)
@@ -614,12 +614,11 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
 chanPredicate *chanacceptor.ChainedAcceptor) error {

 // Set up router rpc backend.
-channelGraph := s.localChanDB.ChannelGraph()
-selfNode, err := channelGraph.SourceNode()
+selfNode, err := s.graphDB.SourceNode()
 if err != nil {
 return err
 }
-graph := s.localChanDB.ChannelGraph()
+graph := s.graphDB
 routerBackend := &routerrpc.RouterBackend{
 SelfNode: selfNode.PubKeyBytes,
 FetchChannelCapacity: func(chanID uint64) (btcutil.Amount,
@@ -683,7 +682,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
 err = subServerCgs.PopulateDependencies(
 r.cfg, s.cc, r.cfg.networkDir, macService, atpl, invoiceRegistry,
 s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter,
-routerBackend, s.nodeSigner, s.localChanDB, s.remoteChanDB,
+routerBackend, s.nodeSigner, s.graphDB, s.chanStateDB,
 s.sweeper, tower, s.towerClient, s.anchorTowerClient,
 r.cfg.net.ResolveTCPAddr, genInvoiceFeatures,
 genAmpInvoiceFeatures, rpcsLog,
@@ -1509,7 +1508,7 @@ func (r *rpcServer) VerifyMessage(ctx context.Context,
 // channels signed the message.
 //
 // TODO(phlip9): Require valid nodes to have capital in active channels.
-graph := r.server.localChanDB.ChannelGraph()
+graph := r.server.graphDB
 _, active, err := graph.HasLightningNode(pub)
 if err != nil {
 return nil, fmt.Errorf("failed to query graph: %v", err)
@@ -1615,7 +1614,7 @@ func (r *rpcServer) DisconnectPeer(ctx context.Context,

 // Next, we'll fetch the pending/active channels we have with a
 // particular peer.
-nodeChannels, err := r.server.remoteChanDB.FetchOpenChannels(peerPubKey)
+nodeChannels, err := r.server.chanStateDB.FetchOpenChannels(peerPubKey)
 if err != nil {
 return nil, fmt.Errorf("unable to fetch channels for peer: %v", err)
 }
@@ -2125,7 +2124,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,

 // First, we'll fetch the channel as is, as we'll need to examine it
 // regardless of if this is a force close or not.
-channel, err := r.server.remoteChanDB.FetchChannel(*chanPoint)
+channel, err := r.server.chanStateDB.FetchChannel(*chanPoint)
 if err != nil {
 return err
 }
@@ -2403,7 +2402,7 @@ func (r *rpcServer) AbandonChannel(_ context.Context,
 return nil, err
 }

-dbChan, err := r.server.remoteChanDB.FetchChannel(*chanPoint)
+dbChan, err := r.server.chanStateDB.FetchChannel(*chanPoint)
 switch {
 // If the channel isn't found in the set of open channels, then we can
 // continue on as it can't be loaded into the link/peer.
@@ -2450,13 +2449,11 @@ func (r *rpcServer) AbandonChannel(_ context.Context,
 // court. Between any step it's possible that the users restarts the
 // process all over again. As a result, each of the steps below are
 // intended to be idempotent.
-err = r.server.remoteChanDB.AbandonChannel(chanPoint, uint32(bestHeight))
+err = r.server.chanStateDB.AbandonChannel(chanPoint, uint32(bestHeight))
 if err != nil {
 return nil, err
 }
-err = abandonChanFromGraph(
-r.server.localChanDB.ChannelGraph(), chanPoint,
-)
+err = abandonChanFromGraph(r.server.graphDB, chanPoint)
 if err != nil {
 return nil, err
 }
@@ -2489,7 +2486,7 @@ func (r *rpcServer) GetInfo(_ context.Context,

 serverPeers := r.server.Peers()

-openChannels, err := r.server.remoteChanDB.FetchAllOpenChannels()
+openChannels, err := r.server.chanStateDB.FetchAllOpenChannels()
 if err != nil {
 return nil, err
 }
@@ -2504,7 +2501,7 @@ func (r *rpcServer) GetInfo(_ context.Context,

 inactiveChannels := uint32(len(openChannels)) - activeChannels

-pendingChannels, err := r.server.remoteChanDB.FetchPendingChannels()
+pendingChannels, err := r.server.chanStateDB.FetchPendingChannels()
 if err != nil {
 return nil, fmt.Errorf("unable to get retrieve pending "+
 "channels: %v", err)
@@ -2905,7 +2902,7 @@ func (r *rpcServer) ChannelBalance(ctx context.Context,
 pendingOpenRemoteBalance lnwire.MilliSatoshi
 )

-openChannels, err := r.server.remoteChanDB.FetchAllOpenChannels()
+openChannels, err := r.server.chanStateDB.FetchAllOpenChannels()
 if err != nil {
 return nil, err
 }
@@ -2925,7 +2922,7 @@ func (r *rpcServer) ChannelBalance(ctx context.Context,
 }
 }

-pendingChannels, err := r.server.remoteChanDB.FetchPendingChannels()
+pendingChannels, err := r.server.chanStateDB.FetchPendingChannels()
 if err != nil {
 return nil, err
 }
@@ -2999,7 +2996,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
 // First, we'll populate the response with all the channels that are
 // soon to be opened. We can easily fetch this data from the database
 // and map the db struct to the proto response.
-pendingOpenChannels, err := r.server.remoteChanDB.FetchPendingChannels()
+pendingOpenChannels, err := r.server.chanStateDB.FetchPendingChannels()
 if err != nil {
 rpcsLog.Errorf("unable to fetch pending channels: %v", err)
 return nil, err
@@ -3047,7 +3044,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,

 // Next, we'll examine the channels that are soon to be closed so we
 // can populate these fields within the response.
-pendingCloseChannels, err := r.server.remoteChanDB.FetchClosedChannels(true)
+pendingCloseChannels, err := r.server.chanStateDB.FetchClosedChannels(true)
 if err != nil {
 rpcsLog.Errorf("unable to fetch closed channels: %v", err)
 return nil, err
@@ -3076,7 +3073,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
 // not found, or the channel itself, this channel was closed
 // in a version before we started persisting historical
 // channels, so we silence the error.
-historical, err := r.server.remoteChanDB.FetchHistoricalChannel(
+historical, err := r.server.chanStateDB.FetchHistoricalChannel(
 &pendingClose.ChanPoint,
 )
 switch err {
@@ -3151,7 +3148,7 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
 // We'll also fetch all channels that are open, but have had their
 // commitment broadcasted, meaning they are waiting for the closing
 // transaction to confirm.
-waitingCloseChans, err := r.server.remoteChanDB.FetchWaitingCloseChannels()
+waitingCloseChans, err := r.server.chanStateDB.FetchWaitingCloseChannels()
 if err != nil {
 rpcsLog.Errorf("unable to fetch channels waiting close: %v",
 err)
@@ -3386,7 +3383,7 @@ func (r *rpcServer) ClosedChannels(ctx context.Context,

 resp := &lnrpc.ClosedChannelsResponse{}

-dbChannels, err := r.server.remoteChanDB.FetchClosedChannels(false)
+dbChannels, err := r.server.chanStateDB.FetchClosedChannels(false)
 if err != nil {
 return nil, err
 }
@@ -3463,9 +3460,9 @@ func (r *rpcServer) ListChannels(ctx context.Context,

 resp := &lnrpc.ListChannelsResponse{}

-graph := r.server.localChanDB.ChannelGraph()
+graph := r.server.graphDB

-dbChannels, err := r.server.remoteChanDB.FetchAllOpenChannels()
+dbChannels, err := r.server.chanStateDB.FetchAllOpenChannels()
 if err != nil {
 return nil, err
 }
@@ -3816,7 +3813,7 @@ func (r *rpcServer) createRPCClosedChannel(
 CloseInitiator: closeInitiator,
 }

-reports, err := r.server.remoteChanDB.FetchChannelReports(
+reports, err := r.server.chanStateDB.FetchChannelReports(
 *r.cfg.ActiveNetParams.GenesisHash, &dbChannel.ChanPoint,
 )
 switch err {
@@ -3921,7 +3918,7 @@ func (r *rpcServer) getInitiators(chanPoint *wire.OutPoint) (

 // To get the close initiator for cooperative closes, we need
 // to get the channel status from the historical channel bucket.
-histChan, err := r.server.remoteChanDB.FetchHistoricalChannel(chanPoint)
+histChan, err := r.server.chanStateDB.FetchHistoricalChannel(chanPoint)
 switch {
 // The node has upgraded from a version where we did not store
 // historical channels, and has not closed a channel since. Do
@@ -3985,7 +3982,7 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
 // the server, or client exits.
 defer channelEventSub.Cancel()

-graph := r.server.localChanDB.ChannelGraph()
+graph := r.server.graphDB

 for {
 select {
@@ -4823,8 +4820,8 @@ func (r *rpcServer) AddInvoice(ctx context.Context,
 ChainParams: r.cfg.ActiveNetParams.Params,
 NodeSigner: r.server.nodeSigner,
 DefaultCLTVExpiry: defaultDelta,
-ChanDB: r.server.remoteChanDB,
-Graph: r.server.localChanDB.ChannelGraph(),
+ChanDB: r.server.chanStateDB,
+Graph: r.server.graphDB,
 GenInvoiceFeatures: func() *lnwire.FeatureVector {
 return r.server.featureMgr.Get(feature.SetInvoice)
 },
@@ -4949,7 +4946,7 @@ func (r *rpcServer) ListInvoices(ctx context.Context,
 PendingOnly: req.PendingOnly,
 Reversed: req.Reversed,
 }
-invoiceSlice, err := r.server.remoteChanDB.QueryInvoices(q)
+invoiceSlice, err := r.server.chanStateDB.QueryInvoices(q)
 if err != nil {
 return nil, fmt.Errorf("unable to query invoices: %v", err)
 }
@@ -5124,7 +5121,7 @@ func (r *rpcServer) DescribeGraph(ctx context.Context,
 // Obtain the pointer to the global singleton channel graph, this will
 // provide a consistent view of the graph due to bolt db's
 // transactional model.
-graph := r.server.localChanDB.ChannelGraph()
+graph := r.server.graphDB

 // First iterate through all the known nodes (connected or unconnected
 // within the graph), collating their current state into the RPC
@@ -5263,7 +5260,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context,
 // Obtain the pointer to the global singleton channel graph, this will
 // provide a consistent view of the graph due to bolt db's
 // transactional model.
-graph := r.server.localChanDB.ChannelGraph()
+graph := r.server.graphDB

 // Calculate betweenness centrality if requested. Note that depending on the
 // graph size, this may take up to a few minutes.
@@ -5302,7 +5299,7 @@ func (r *rpcServer) GetNodeMetrics(ctx context.Context,
 func (r *rpcServer) GetChanInfo(ctx context.Context,
 in *lnrpc.ChanInfoRequest) (*lnrpc.ChannelEdge, error) {

-graph := r.server.localChanDB.ChannelGraph()
+graph := r.server.graphDB

 edgeInfo, edge1, edge2, err := graph.FetchChannelEdgesByID(in.ChanId)
 if err != nil {
@@ -5322,7 +5319,7 @@ func (r *rpcServer) GetChanInfo(ctx context.Context,
 func (r *rpcServer) GetNodeInfo(ctx context.Context,
 in *lnrpc.NodeInfoRequest) (*lnrpc.NodeInfo, error) {

-graph := r.server.localChanDB.ChannelGraph()
+graph := r.server.graphDB

 // First, parse the hex-encoded public key into a full in-memory public
 // key object we can work with for querying.
@@ -5423,7 +5420,7 @@ func (r *rpcServer) QueryRoutes(ctx context.Context,
 func (r *rpcServer) GetNetworkInfo(ctx context.Context,
 _ *lnrpc.NetworkInfoRequest) (*lnrpc.NetworkInfo, error) {

-graph := r.server.localChanDB.ChannelGraph()
+graph := r.server.graphDB

 var (
 numNodes uint32
@@ -5735,7 +5732,7 @@ func (r *rpcServer) ListPayments(ctx context.Context,
 query.MaxPayments = math.MaxUint64
 }

-paymentsQuerySlice, err := r.server.remoteChanDB.QueryPayments(query)
+paymentsQuerySlice, err := r.server.chanStateDB.QueryPayments(query)
 if err != nil {
 return nil, err
 }
@@ -5770,7 +5767,7 @@ func (r *rpcServer) DeleteAllPayments(ctx context.Context,
 "failed_htlcs_only=%v", req.FailedPaymentsOnly,
 req.FailedHtlcsOnly)

-err := r.server.remoteChanDB.DeletePayments(
+err := r.server.chanStateDB.DeletePayments(
 req.FailedPaymentsOnly, req.FailedHtlcsOnly,
 )
 if err != nil {
@@ -5893,7 +5890,7 @@ func (r *rpcServer) FeeReport(ctx context.Context,

 rpcsLog.Debugf("[feereport]")

-channelGraph := r.server.localChanDB.ChannelGraph()
+channelGraph := r.server.graphDB
 selfNode, err := channelGraph.SourceNode()
 if err != nil {
 return nil, err
@@ -5932,7 +5929,7 @@ func (r *rpcServer) FeeReport(ctx context.Context,
 return nil, err
 }

-fwdEventLog := r.server.remoteChanDB.ForwardingLog()
+fwdEventLog := r.server.chanStateDB.ForwardingLog()

 // computeFeeSum is a helper function that computes the total fees for
 // a particular time slice described by a forwarding event query.
@@ -6170,7 +6167,7 @@ func (r *rpcServer) ForwardingHistory(ctx context.Context,
 IndexOffset: req.IndexOffset,
 NumMaxEvents: numEvents,
 }
-timeSlice, err := r.server.remoteChanDB.ForwardingLog().Query(eventQuery)
+timeSlice, err := r.server.chanStateDB.ForwardingLog().Query(eventQuery)
 if err != nil {
 return nil, fmt.Errorf("unable to query forwarding log: %v", err)
 }
@@ -6232,7 +6229,7 @@ func (r *rpcServer) ExportChannelBackup(ctx context.Context,
 // the database. If this channel has been closed, or the outpoint is
 // unknown, then we'll return an error
 unpackedBackup, err := chanbackup.FetchBackupForChan(
-chanPoint, r.server.remoteChanDB,
+chanPoint, r.server.chanStateDB,
 )
 if err != nil {
 return nil, err
@@ -6402,7 +6399,7 @@ func (r *rpcServer) ExportAllChannelBackups(ctx context.Context,
 // First, we'll attempt to read back ups for ALL currently opened
 // channels from disk.
 allUnpackedBackups, err := chanbackup.FetchStaticChanBackups(
-r.server.remoteChanDB,
+r.server.chanStateDB,
 )
 if err != nil {
 return nil, fmt.Errorf("unable to fetch all static chan "+
@@ -6425,7 +6422,7 @@ func (r *rpcServer) RestoreChannelBackups(ctx context.Context,
 // restore either a set of chanbackup.Single or chanbackup.Multi
 // backups.
 chanRestorer := &chanDBRestorer{
-db: r.server.remoteChanDB,
+db: r.server.chanStateDB,
 secretKeys: r.server.cc.KeyRing,
 chainArb: r.server.chainArb,
 }
@@ -6523,7 +6520,7 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
 // we'll obtains the current set of single channel
 // backups from disk.
 chanBackups, err := chanbackup.FetchStaticChanBackups(
-r.server.remoteChanDB,
+r.server.chanStateDB,
 )
 if err != nil {
 return fmt.Errorf("unable to fetch all "+