From 1ee4bb8c519a69edb721a480edb81b8223a4a147 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 18 Feb 2025 13:19:50 -0300 Subject: [PATCH] graph/db: rename ChannelGraph and introduce the new ChannelGraph layer In this commit, we rename the existing ChannelGraph struct to KVStore to better reflect its responsibilities as the CRUD layer. We then introduce a new ChannelGraph struct which will eventually be the layer above the CRUD layer in which we will handle caching and topology subscriptions. For now, however, it houses only the KVStore. This means that all calls to the KVStore will now go through this layer of indirection first. This will allow us to slowly move the graph Cache management out of the KVStore and into the new ChannelGraph layer. We introduce the new ChannelGraph and rename the old one in the same commit so that all existing call-sites don't need to change at all :) --- graph/db/graph.go | 26 ++++++++ graph/db/kv_store.go | 150 +++++++++++++++++++++---------------------- 2 files changed, 101 insertions(+), 75 deletions(-) create mode 100644 graph/db/graph.go diff --git a/graph/db/graph.go b/graph/db/graph.go new file mode 100644 index 000000000..217302ca9 --- /dev/null +++ b/graph/db/graph.go @@ -0,0 +1,26 @@ +package graphdb + +import "github.com/lightningnetwork/lnd/kvdb" + +// ChannelGraph is a layer above the graph's CRUD layer. +// +// NOTE: currently, this is purely a pass-through layer directly to the backing +// KVStore. Upcoming commits will move the graph cache out of the KVStore and +// into this layer so that the KVStore is only responsible for CRUD operations. +type ChannelGraph struct { + *KVStore +} + +// NewChannelGraph creates a new ChannelGraph instance with the given backend. +func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph, + error) { + + store, err := NewKVStore(db, options...) 
+ if err != nil { + return nil, err + } + + return &ChannelGraph{ + KVStore: store, + }, nil +} diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 0a643144e..dfe856419 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -171,7 +171,7 @@ const ( MaxAllowedExtraOpaqueBytes = 10000 ) -// ChannelGraph is a persistent, on-disk graph representation of the Lightning +// KVStore is a persistent, on-disk graph representation of the Lightning // Network. This struct can be used to implement path finding algorithms on top // of, and also to update a node's view based on information received from the // p2p network. Internally, the graph is stored using a modified adjacency list @@ -181,7 +181,7 @@ const ( // Nodes, edges, and edge information can all be added to the graph // independently. Edge removal results in the deletion of all edge information // for that edge. -type ChannelGraph struct { +type KVStore struct { db kvdb.Backend // cacheMu guards all caches (rejectCache, chanCache, graphCache). If @@ -196,9 +196,9 @@ type ChannelGraph struct { nodeScheduler batch.Scheduler } -// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The +// NewKVStore allocates a new KVStore backed by a DB instance. The // returned instance has its own unique reject cache and channel cache. 
-func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph, +func NewKVStore(db kvdb.Backend, options ...OptionModifier) (*KVStore, error) { opts := DefaultOptions() @@ -207,12 +207,12 @@ func NewChannelGraph(db kvdb.Backend, options ...OptionModifier) (*ChannelGraph, } if !opts.NoMigration { - if err := initChannelGraph(db); err != nil { + if err := initKVStore(db); err != nil { return nil, err } } - g := &ChannelGraph{ + g := &KVStore{ db: db, rejectCache: newRejectCache(opts.RejectCacheSize), chanCache: newChannelCache(opts.ChannelCacheSize), @@ -269,7 +269,7 @@ type channelMapKey struct { // getChannelMap loads all channel edge policies from the database and stores // them in a map. -func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) ( +func (c *KVStore) getChannelMap(edges kvdb.RBucket) ( map[channelMapKey]*models.ChannelEdgePolicy, error) { // Create a map to store all channel edge policies. @@ -336,7 +336,7 @@ var graphTopLevelBuckets = [][]byte{ // Wipe completely deletes all saved state within all used buckets within the // database. The deletion is done in a single transaction, therefore this // operation is fully atomic. -func (c *ChannelGraph) Wipe() error { +func (c *KVStore) Wipe() error { err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { for _, tlb := range graphTopLevelBuckets { err := tx.DeleteTopLevelBucket(tlb) @@ -350,14 +350,14 @@ func (c *ChannelGraph) Wipe() error { return err } - return initChannelGraph(c.db) + return initKVStore(c.db) } // createChannelDB creates and initializes a fresh version of In // the case that the target path has not yet been created or doesn't yet exist, // then the path is created. Additionally, all required top-level buckets used // within the database are created. 
-func initChannelGraph(db kvdb.Backend) error { +func initKVStore(db kvdb.Backend) error { err := kvdb.Update(db, func(tx kvdb.RwTx) error { for _, tlb := range graphTopLevelBuckets { if _, err := tx.CreateTopLevelBucket(tlb); err != nil { @@ -409,7 +409,7 @@ func initChannelGraph(db kvdb.Backend) error { // unknown to the graph DB or not. // // NOTE: this is part of the channeldb.AddrSource interface. -func (c *ChannelGraph) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr, +func (c *KVStore) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr, error) { pubKey, err := route.NewVertexFromBytes(nodePub.SerializeCompressed()) @@ -439,7 +439,7 @@ func (c *ChannelGraph) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr, // NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer // for that particular channel edge routing policy will be passed into the // callback. -func (c *ChannelGraph) ForEachChannel(cb func(*models.ChannelEdgeInfo, +func (c *KVStore) ForEachChannel(cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { return c.db.View(func(tx kvdb.RTx) error { @@ -498,7 +498,7 @@ func (c *ChannelGraph) ForEachChannel(cb func(*models.ChannelEdgeInfo, // transaction may be provided. If none is provided, a new one will be created. // // Unknown policies are passed into the callback as nil values. -func (c *ChannelGraph) forEachNodeDirectedChannel(tx kvdb.RTx, +func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx, node route.Vertex, cb func(channel *DirectedChannel) error) error { if c.graphCache != nil { @@ -556,7 +556,7 @@ func (c *ChannelGraph) forEachNodeDirectedChannel(tx kvdb.RTx, // fetchNodeFeatures returns the features of a given node. If no features are // known for the node, an empty feature vector is returned. An optional read // transaction may be provided. If none is provided, a new one will be created. 
-func (c *ChannelGraph) fetchNodeFeatures(tx kvdb.RTx, +func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx, node route.Vertex) (*lnwire.FeatureVector, error) { if c.graphCache != nil { @@ -591,7 +591,7 @@ func (c *ChannelGraph) fetchNodeFeatures(tx kvdb.RTx, // Unknown policies are passed into the callback as nil values. // // NOTE: this is part of the graphdb.NodeTraverser interface. -func (c *ChannelGraph) ForEachNodeDirectedChannel(nodePub route.Vertex, +func (c *KVStore) ForEachNodeDirectedChannel(nodePub route.Vertex, cb func(channel *DirectedChannel) error) error { return c.forEachNodeDirectedChannel(nil, nodePub, cb) @@ -603,7 +603,7 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(nodePub route.Vertex, // features instead of the database. // // NOTE: this is part of the graphdb.NodeTraverser interface. -func (c *ChannelGraph) FetchNodeFeatures(nodePub route.Vertex) ( +func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) ( *lnwire.FeatureVector, error) { return c.fetchNodeFeatures(nil, nodePub) @@ -614,7 +614,7 @@ func (c *ChannelGraph) FetchNodeFeatures(nodePub route.Vertex) ( // regular forEachNode method does. // // NOTE: The callback contents MUST not be modified. -func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex, +func (c *KVStore) ForEachNodeCached(cb func(node route.Vertex, chans map[uint64]*DirectedChannel) error) error { if c.graphCache != nil { @@ -685,7 +685,7 @@ func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex, // DisabledChannelIDs returns the channel ids of disabled channels. // A channel is disabled when two of the associated ChanelEdgePolicies // have their disabled bit on. -func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) { +func (c *KVStore) DisabledChannelIDs() ([]uint64, error) { var disabledChanIDs []uint64 var chanEdgeFound map[uint64]struct{} @@ -740,7 +740,7 @@ func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) { // early. 
Any operations performed on the NodeTx passed to the call-back are // executed under the same read transaction and so, methods on the NodeTx object // _MUST_ only be called from within the call-back. -func (c *ChannelGraph) ForEachNode(cb func(tx NodeRTx) error) error { +func (c *KVStore) ForEachNode(cb func(tx NodeRTx) error) error { return c.forEachNode(func(tx kvdb.RTx, node *models.LightningNode) error { @@ -755,7 +755,7 @@ func (c *ChannelGraph) ForEachNode(cb func(tx NodeRTx) error) error { // // TODO(roasbeef): add iterator interface to allow for memory efficient graph // traversal when graph gets mega. -func (c *ChannelGraph) forEachNode( +func (c *KVStore) forEachNode( cb func(kvdb.RTx, *models.LightningNode) error) error { traversal := func(tx kvdb.RTx) error { @@ -793,7 +793,7 @@ func (c *ChannelGraph) forEachNode( // graph, executing the passed callback with each node encountered. If the // callback returns an error, then the transaction is aborted and the iteration // stops early. -func (c *ChannelGraph) ForEachNodeCacheable(cb func(route.Vertex, +func (c *KVStore) ForEachNodeCacheable(cb func(route.Vertex, *lnwire.FeatureVector) error) error { traversal := func(tx kvdb.RTx) error { @@ -833,7 +833,7 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(route.Vertex, // as the center node within a star-graph. This method may be used to kick off // a path finding algorithm in order to explore the reachability of another // node based off the source node. -func (c *ChannelGraph) SourceNode() (*models.LightningNode, error) { +func (c *KVStore) SourceNode() (*models.LightningNode, error) { var source *models.LightningNode err := kvdb.View(c.db, func(tx kvdb.RTx) error { // First grab the nodes bucket which stores the mapping from @@ -864,7 +864,7 @@ func (c *ChannelGraph) SourceNode() (*models.LightningNode, error) { // of the graph. The source node is treated as the center node within a // star-graph. 
This method may be used to kick off a path finding algorithm in // order to explore the reachability of another node based off the source node. -func (c *ChannelGraph) sourceNode(nodes kvdb.RBucket) (*models.LightningNode, +func (c *KVStore) sourceNode(nodes kvdb.RBucket) (*models.LightningNode, error) { selfPub := nodes.Get(sourceKey) @@ -885,7 +885,7 @@ func (c *ChannelGraph) sourceNode(nodes kvdb.RBucket) (*models.LightningNode, // SetSourceNode sets the source node within the graph database. The source // node is to be used as the center of a star-graph within path finding // algorithms. -func (c *ChannelGraph) SetSourceNode(node *models.LightningNode) error { +func (c *KVStore) SetSourceNode(node *models.LightningNode) error { nodePubBytes := node.PubKeyBytes[:] return kvdb.Update(c.db, func(tx kvdb.RwTx) error { @@ -916,7 +916,7 @@ func (c *ChannelGraph) SetSourceNode(node *models.LightningNode) error { // channel update. // // TODO(roasbeef): also need sig of announcement -func (c *ChannelGraph) AddLightningNode(node *models.LightningNode, +func (c *KVStore) AddLightningNode(node *models.LightningNode, op ...batch.SchedulerOption) error { r := &batch.Request{ @@ -961,7 +961,7 @@ func addLightningNode(tx kvdb.RwTx, node *models.LightningNode) error { // LookupAlias attempts to return the alias as advertised by the target node. // TODO(roasbeef): currently assumes that aliases are unique... -func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) { +func (c *KVStore) LookupAlias(pub *btcec.PublicKey) (string, error) { var alias string err := kvdb.View(c.db, func(tx kvdb.RTx) error { @@ -997,7 +997,7 @@ func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) { // DeleteLightningNode starts a new database transaction to remove a vertex/node // from the database according to the node's public key. 
-func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error { +func (c *KVStore) DeleteLightningNode(nodePub route.Vertex) error { // TODO(roasbeef): ensure dangling edges are removed... return kvdb.Update(c.db, func(tx kvdb.RwTx) error { nodes := tx.ReadWriteBucket(nodeBucket) @@ -1015,7 +1015,7 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error { // deleteLightningNode uses an existing database transaction to remove a // vertex/node from the database according to the node's public key. -func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket, +func (c *KVStore) deleteLightningNode(nodes kvdb.RwBucket, compressedPubKey []byte) error { aliases := nodes.NestedReadWriteBucket(aliasIndexBucket) @@ -1063,7 +1063,7 @@ func (c *ChannelGraph) deleteLightningNode(nodes kvdb.RwBucket, // involved in creation of the channel, and the set of features that the channel // supports. The chanPoint and chanID are used to uniquely identify the edge // globally within the database. -func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo, +func (c *KVStore) AddChannelEdge(edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error { var alreadyExists bool @@ -1110,7 +1110,7 @@ func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo, // addChannelEdge is the private form of AddChannelEdge that allows callers to // utilize an existing db transaction. -func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx, +func (c *KVStore) addChannelEdge(tx kvdb.RwTx, edge *models.ChannelEdgeInfo) error { // Construct the channel's primary key which is the 8-byte channel ID. @@ -1215,7 +1215,7 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx, // was updated for both directed edges are returned along with the boolean. If // it is not found, then the zombie index is checked and its result is returned // as the second boolean. 
-func (c *ChannelGraph) HasChannelEdge( +func (c *KVStore) HasChannelEdge( chanID uint64) (time.Time, time.Time, bool, bool, error) { var ( @@ -1319,7 +1319,7 @@ func (c *ChannelGraph) HasChannelEdge( } // AddEdgeProof sets the proof of an existing edge in the graph database. -func (c *ChannelGraph) AddEdgeProof(chanID lnwire.ShortChannelID, +func (c *KVStore) AddEdgeProof(chanID lnwire.ShortChannelID, proof *models.ChannelAuthProof) error { // Construct the channel's primary key which is the 8-byte channel ID. @@ -1364,7 +1364,7 @@ const ( // prune the graph is stored so callers can ensure the graph is fully in sync // with the current UTXO state. A slice of channels that have been closed by // the target block are returned if the function succeeds without error. -func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, +func (c *KVStore) PruneGraph(spentOutputs []*wire.OutPoint, blockHash *chainhash.Hash, blockHeight uint32) ( []*models.ChannelEdgeInfo, error) { @@ -1499,7 +1499,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, // any nodes from the channel graph that are currently unconnected. This ensure // that we only maintain a graph of reachable nodes. In the event that a pruned // node gains more channels, it will be re-added back to the graph. -func (c *ChannelGraph) PruneGraphNodes() error { +func (c *KVStore) PruneGraphNodes() error { return kvdb.Update(c.db, func(tx kvdb.RwTx) error { nodes := tx.ReadWriteBucket(nodeBucket) if nodes == nil { @@ -1521,7 +1521,7 @@ func (c *ChannelGraph) PruneGraphNodes() error { // pruneGraphNodes attempts to remove any nodes from the graph who have had a // channel closed within the current block. If the node still has existing // channels in the graph, this will act as a no-op. 
-func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket, +func (c *KVStore) pruneGraphNodes(nodes kvdb.RwBucket, edgeIndex kvdb.RwBucket) error { log.Trace("Pruning nodes from graph with no open channels") @@ -1632,7 +1632,7 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket, // set to the last prune height valid for the remaining chain. // Channels that were removed from the graph resulting from the // disconnected block are returned. -func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ( +func (c *KVStore) DisconnectBlockAtHeight(height uint32) ( []*models.ChannelEdgeInfo, error) { // Every channel having a ShortChannelID starting at 'height' @@ -1764,7 +1764,7 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ( // used to prune channels in the graph. Knowing the "prune tip" allows callers // to tell if the graph is currently in sync with the current best known UTXO // state. -func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) { +func (c *KVStore) PruneTip() (*chainhash.Hash, uint32, error) { var ( tipHash chainhash.Hash tipHeight uint32 @@ -1811,7 +1811,7 @@ func (c *ChannelGraph) PruneTip() (*chainhash.Hash, uint32, error) { // that we require the node that failed to send the fresh update to be the one // that resurrects the channel from its zombie state. The markZombie bool // denotes whether or not to mark the channel as a zombie. -func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool, +func (c *KVStore) DeleteChannelEdges(strictZombiePruning, markZombie bool, chanIDs ...uint64) error { // TODO(roasbeef): possibly delete from node bucket if node has no more @@ -1872,7 +1872,7 @@ func (c *ChannelGraph) DeleteChannelEdges(strictZombiePruning, markZombie bool, // ChannelID attempt to lookup the 8-byte compact channel ID which maps to the // passed channel point (outpoint). If the passed channel doesn't exist within // the database, then ErrEdgeNotFound is returned. 
-func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) { +func (c *KVStore) ChannelID(chanPoint *wire.OutPoint) (uint64, error) { var chanID uint64 if err := kvdb.View(c.db, func(tx kvdb.RTx) error { var err error @@ -1918,7 +1918,7 @@ func getChanID(tx kvdb.RTx, chanPoint *wire.OutPoint) (uint64, error) { // HighestChanID returns the "highest" known channel ID in the channel graph. // This represents the "newest" channel from the PoV of the chain. This method // can be used by peers to quickly determine if they're graphs are in sync. -func (c *ChannelGraph) HighestChanID() (uint64, error) { +func (c *KVStore) HighestChanID() (uint64, error) { var cid uint64 err := kvdb.View(c.db, func(tx kvdb.RTx) error { @@ -1983,7 +1983,7 @@ type ChannelEdge struct { // ChanUpdatesInHorizon returns all the known channel edges which have at least // one edge that has an update timestamp within the specified horizon. -func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, +func (c *KVStore) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]ChannelEdge, error) { // To ensure we don't return duplicate ChannelEdges, we'll use an @@ -2135,7 +2135,7 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, // update timestamp within the passed range. This method can be used by two // nodes to quickly determine if they have the same set of up to date node // announcements. -func (c *ChannelGraph) NodeUpdatesInHorizon(startTime, +func (c *KVStore) NodeUpdatesInHorizon(startTime, endTime time.Time) ([]models.LightningNode, error) { var nodesInHorizon []models.LightningNode @@ -2202,7 +2202,7 @@ func (c *ChannelGraph) NodeUpdatesInHorizon(startTime, // words, we perform a set difference of our set of chan ID's and the ones // passed in. This method can be used by callers to determine the set of // channels another peer knows of that we don't. 
-func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo, +func (c *KVStore) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo, isZombieChan func(time.Time, time.Time) bool) ([]uint64, error) { var newChanIDs []uint64 @@ -2369,7 +2369,7 @@ type BlockChannelRange struct { // up after a period of time offline. If withTimestamps is true then the // timestamp info of the latest received channel update messages of the channel // will be included in the response. -func (c *ChannelGraph) FilterChannelRange(startHeight, +func (c *KVStore) FilterChannelRange(startHeight, endHeight uint32, withTimestamps bool) ([]BlockChannelRange, error) { startChanID := &lnwire.ShortChannelID{ @@ -2514,7 +2514,7 @@ func (c *ChannelGraph) FilterChannelRange(startHeight, // skipped and the result will contain only those edges that exist at the time // of the query. This can be used to respond to peer queries that are seeking to // fill in gaps in their view of the channel graph. -func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) { +func (c *KVStore) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) { return c.fetchChanInfos(nil, chanIDs) } @@ -2526,7 +2526,7 @@ func (c *ChannelGraph) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) { // // NOTE: An optional transaction may be provided. If none is provided, then a // new one will be created. -func (c *ChannelGraph) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) ( +func (c *KVStore) fetchChanInfos(tx kvdb.RTx, chanIDs []uint64) ( []ChannelEdge, error) { // TODO(roasbeef): sort cids? @@ -2667,7 +2667,7 @@ func delEdgeUpdateIndexEntry(edgesBucket kvdb.RwBucket, chanID uint64, // // NOTE: this method MUST only be called if the cacheMu has already been // acquired. 
-func (c *ChannelGraph) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex, +func (c *KVStore) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex, zombieIndex kvdb.RwBucket, chanID []byte, isZombie, strictZombie bool) error { @@ -2806,7 +2806,7 @@ func makeZombiePubkeys(info *models.ChannelEdgeInfo, // updated, otherwise it's the second node's information. The node ordering is // determined by the lexicographical ordering of the identity public keys of the // nodes on either side of the channel. -func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, +func (c *KVStore) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { var ( @@ -2858,7 +2858,7 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, return c.chanScheduler.Execute(r) } -func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy, +func (c *KVStore) updateEdgeCache(e *models.ChannelEdgePolicy, isUpdate1 bool) { // If an entry for this channel is found in reject cache, we'll modify @@ -2956,7 +2956,7 @@ func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy, // isPublic determines whether the node is seen as public within the graph from // the source node's point of view. An existing database transaction can also be // specified. -func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex, +func (c *KVStore) isPublic(tx kvdb.RTx, nodePub route.Vertex, sourcePubKey []byte) (bool, error) { // In order to determine whether this node is publicly advertised within @@ -3001,7 +3001,7 @@ func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex, // public key. If the node isn't found in the database, then // ErrGraphNodeNotFound is returned. An optional transaction may be provided. // If none is provided, then a new one will be created. 
-func (c *ChannelGraph) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) ( +func (c *KVStore) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) ( *models.LightningNode, error) { return c.fetchLightningNode(tx, nodePub) @@ -3010,7 +3010,7 @@ func (c *ChannelGraph) FetchLightningNodeTx(tx kvdb.RTx, nodePub route.Vertex) ( // FetchLightningNode attempts to look up a target node by its identity public // key. If the node isn't found in the database, then ErrGraphNodeNotFound is // returned. -func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) ( +func (c *KVStore) FetchLightningNode(nodePub route.Vertex) ( *models.LightningNode, error) { return c.fetchLightningNode(nil, nodePub) @@ -3020,7 +3020,7 @@ func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) ( // key. If the node isn't found in the database, then ErrGraphNodeNotFound is // returned. An optional transaction may be provided. If none is provided, then // a new one will be created. -func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx, +func (c *KVStore) fetchLightningNode(tx kvdb.RTx, nodePub route.Vertex) (*models.LightningNode, error) { var node *models.LightningNode @@ -3078,7 +3078,7 @@ func (c *ChannelGraph) fetchLightningNode(tx kvdb.RTx, // timestamp of when the data for the node was lasted updated is returned along // with a true boolean. Otherwise, an empty time.Time is returned with a false // boolean. -func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool, +func (c *KVStore) HasLightningNode(nodePub [33]byte) (time.Time, bool, error) { var ( @@ -3216,7 +3216,7 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend, // halted with the error propagated back up to the caller. // // Unknown policies are passed into the callback as nil values. 
-func (c *ChannelGraph) ForEachNodeChannel(nodePub route.Vertex, +func (c *KVStore) ForEachNodeChannel(nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { @@ -3236,7 +3236,7 @@ func (c *ChannelGraph) ForEachNodeChannel(nodePub route.Vertex, // should be passed as the first argument. Otherwise, the first argument should // be nil and a fresh transaction will be created to execute the graph // traversal. -func (c *ChannelGraph) ForEachNodeChannelTx(tx kvdb.RTx, +func (c *KVStore) ForEachNodeChannelTx(tx kvdb.RTx, nodePub route.Vertex, cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { @@ -3248,7 +3248,7 @@ func (c *ChannelGraph) ForEachNodeChannelTx(tx kvdb.RTx, // the target node in the channel. This is useful when one knows the pubkey of // one of the nodes, and wishes to obtain the full LightningNode for the other // end of the channel. -func (c *ChannelGraph) FetchOtherNode(tx kvdb.RTx, +func (c *KVStore) FetchOtherNode(tx kvdb.RTx, channel *models.ChannelEdgeInfo, thisNodeKey []byte) ( *models.LightningNode, error) { @@ -3319,7 +3319,7 @@ func computeEdgePolicyKeys(info *models.ChannelEdgeInfo) ([]byte, []byte) { // found, then ErrEdgeNotFound is returned. A struct which houses the general // information for the channel itself is returned as well as two structs that // contain the routing policies for the channel in either direction. -func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) ( +func (c *KVStore) FetchChannelEdgesByOutpoint(op *wire.OutPoint) ( *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { @@ -3404,7 +3404,7 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) ( // ErrZombieEdge an be returned if the edge is currently marked as a zombie // within the database. 
In this case, the ChannelEdgePolicy's will be nil, and // the ChannelEdgeInfo will only include the public keys of each node. -func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) ( +func (c *KVStore) FetchChannelEdgesByID(chanID uint64) ( *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { @@ -3506,7 +3506,7 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) ( // IsPublicNode is a helper method that determines whether the node with the // given public key is seen as a public node in the graph from the graph's // source node's point of view. -func (c *ChannelGraph) IsPublicNode(pubKey [33]byte) (bool, error) { +func (c *KVStore) IsPublicNode(pubKey [33]byte) (bool, error) { var nodeIsPublic bool err := kvdb.View(c.db, func(tx kvdb.RTx) error { nodes := tx.ReadBucket(nodeBucket) @@ -3576,7 +3576,7 @@ func (e *EdgePoint) String() string { // within the known channel graph. The set of UTXO's (along with their scripts) // returned are the ones that need to be watched on chain to detect channel // closes on the resident blockchain. -func (c *ChannelGraph) ChannelView() ([]EdgePoint, error) { +func (c *KVStore) ChannelView() ([]EdgePoint, error) { var edgePoints []EdgePoint if err := kvdb.View(c.db, func(tx kvdb.RTx) error { // We're going to iterate over the entire channel index, so @@ -3645,7 +3645,7 @@ func (c *ChannelGraph) ChannelView() ([]EdgePoint, error) { // MarkEdgeZombie attempts to mark a channel identified by its channel ID as a // zombie. This method is used on an ad-hoc basis, when channels need to be // marked as zombies outside the normal pruning cycle. -func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, +func (c *KVStore) MarkEdgeZombie(chanID uint64, pubKey1, pubKey2 [33]byte) error { c.cacheMu.Lock() @@ -3695,7 +3695,7 @@ func markEdgeZombie(zombieIndex kvdb.RwBucket, chanID uint64, pubKey1, } // MarkEdgeLive clears an edge from our zombie index, deeming it as live. 
-func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error { +func (c *KVStore) MarkEdgeLive(chanID uint64) error { c.cacheMu.Lock() defer c.cacheMu.Unlock() @@ -3708,7 +3708,7 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error { // // NOTE: this method MUST only be called if the cacheMu has already been // acquired. -func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error { +func (c *KVStore) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error { dbFn := func(tx kvdb.RwTx) error { edges := tx.ReadWriteBucket(edgeBucket) if edges == nil { @@ -3766,7 +3766,7 @@ func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error { // IsZombieEdge returns whether the edge is considered zombie. If it is a // zombie, then the two node public keys corresponding to this edge are also // returned. -func (c *ChannelGraph) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) { +func (c *KVStore) IsZombieEdge(chanID uint64) (bool, [33]byte, [33]byte) { var ( isZombie bool pubKey1, pubKey2 [33]byte @@ -3818,7 +3818,7 @@ func isZombieEdge(zombieIndex kvdb.RBucket, } // NumZombies returns the current number of zombie channels in the graph. -func (c *ChannelGraph) NumZombies() (uint64, error) { +func (c *KVStore) NumZombies() (uint64, error) { var numZombies uint64 err := kvdb.View(c.db, func(tx kvdb.RTx) error { edges := tx.ReadBucket(edgeBucket) @@ -3847,7 +3847,7 @@ func (c *ChannelGraph) NumZombies() (uint64, error) { // PutClosedScid stores a SCID for a closed channel in the database. This is so // that we can ignore channel announcements that we know to be closed without // having to validate them and fetch a block. 
-func (c *ChannelGraph) PutClosedScid(scid lnwire.ShortChannelID) error { +func (c *KVStore) PutClosedScid(scid lnwire.ShortChannelID) error { return kvdb.Update(c.db, func(tx kvdb.RwTx) error { closedScids, err := tx.CreateTopLevelBucket(closedScidBucket) if err != nil { @@ -3864,7 +3864,7 @@ func (c *ChannelGraph) PutClosedScid(scid lnwire.ShortChannelID) error { // IsClosedScid checks whether a channel identified by the passed in scid is // closed. This helps avoid having to perform expensive validation checks. // TODO: Add an LRU cache to cut down on disc reads. -func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) { +func (c *KVStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) { var isClosed bool err := kvdb.View(c.db, func(tx kvdb.RTx) error { closedScids := tx.ReadBucket(closedScidBucket) @@ -3895,7 +3895,7 @@ func (c *ChannelGraph) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) { // instance which can be used to perform queries against the channel graph. If // the graph cache is not enabled, then the call-back will be provided with // access to the graph via a consistent read-only transaction. -func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error { +func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error { if c.graphCache != nil { return cb(&nodeTraverserSession{db: c}) } @@ -3913,7 +3913,7 @@ func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error { // where the graph Cache has not been enabled. type nodeTraverserSession struct { tx kvdb.RTx - db *ChannelGraph + db *KVStore } // ForEachNodeDirectedChannel calls the callback for every channel of the given @@ -4746,10 +4746,10 @@ func deserializeChanEdgePolicyRaw(r io.Reader) (*models.ChannelEdgePolicy, } // chanGraphNodeTx is an implementation of the NodeRTx interface backed by the -// ChannelGraph and a kvdb.RTx. +// KVStore and a kvdb.RTx. 
type chanGraphNodeTx struct { tx kvdb.RTx - db *ChannelGraph + db *KVStore node *models.LightningNode } @@ -4757,7 +4757,7 @@ type chanGraphNodeTx struct { // interface. var _ NodeRTx = (*chanGraphNodeTx)(nil) -func newChanGraphNodeTx(tx kvdb.RTx, db *ChannelGraph, +func newChanGraphNodeTx(tx kvdb.RTx, db *KVStore, node *models.LightningNode) *chanGraphNodeTx { return &chanGraphNodeTx{ @@ -4804,7 +4804,7 @@ func (c *chanGraphNodeTx) ForEachChannel(f func(*models.ChannelEdgeInfo, ) } -// MakeTestGraph creates a new instance of the ChannelGraph for testing +// MakeTestGraph creates a new instance of the KVStore for testing // purposes. func MakeTestGraph(t testing.TB, modifiers ...OptionModifier) (*ChannelGraph, error) { @@ -4814,7 +4814,7 @@ func MakeTestGraph(t testing.TB, modifiers ...OptionModifier) (*ChannelGraph, modifier(opts) } - // Next, create channelgraph for the first time. + // Next, create KVStore for the first time. backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr") if err != nil { backendCleanup()