Merge pull request #9544 from lightningnetwork/elle-graphCacheBase

graph: move graph cache out of CRUD layer
This commit is contained in:
Oliver Gugger 2025-03-25 07:43:06 -06:00 committed by GitHub
commit af2f11edff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 5435 additions and 4788 deletions

View File

@ -6,7 +6,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/graph"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
)
@ -36,7 +36,7 @@ type ManagerCfg struct {
// SubscribeTopology is used to get a subscription for topology changes
// on the network.
SubscribeTopology func() (*graph.TopologyClient, error)
SubscribeTopology func() (*graphdb.TopologyClient, error)
}
// Manager is struct that manages an autopilot agent, making it possible to

View File

@ -46,9 +46,14 @@ func newDiskChanGraph(t *testing.T) (testGraph, error) {
})
require.NoError(t, err)
graphDB, err := graphdb.NewChannelGraph(backend)
graphDB, err := graphdb.NewChannelGraph(&graphdb.Config{KVDB: backend})
require.NoError(t, err)
require.NoError(t, graphDB.Start())
t.Cleanup(func() {
require.NoError(t, graphDB.Stop())
})
return &testDBGraph{
db: graphDB,
databaseChannelGraph: databaseChannelGraph{

View File

@ -1026,26 +1026,30 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
"instances")
}
graphDBOptions := []graphdb.OptionModifier{
graphDBOptions := []graphdb.KVStoreOptionModifier{
graphdb.WithRejectCacheSize(cfg.Caches.RejectCacheSize),
graphdb.WithChannelCacheSize(cfg.Caches.ChannelCacheSize),
graphdb.WithBatchCommitInterval(cfg.DB.BatchCommitInterval),
}
chanGraphOpts := []graphdb.ChanGraphOption{
graphdb.WithUseGraphCache(!cfg.DB.NoGraphCache),
}
// We want to pre-allocate the channel graph cache according to what we
// expect for mainnet to speed up memory allocation.
if cfg.ActiveNetParams.Name == chaincfg.MainNetParams.Name {
graphDBOptions = append(
graphDBOptions, graphdb.WithPreAllocCacheNumNodes(
chanGraphOpts = append(
chanGraphOpts, graphdb.WithPreAllocCacheNumNodes(
graphdb.DefaultPreAllocCacheNumNodes,
),
)
}
dbs.GraphDB, err = graphdb.NewChannelGraph(
databaseBackends.GraphDB, graphDBOptions...,
)
dbs.GraphDB, err = graphdb.NewChannelGraph(&graphdb.Config{
KVDB: databaseBackends.GraphDB,
KVStoreOpts: graphDBOptions,
}, chanGraphOpts...)
if err != nil {
cleanUp()

View File

@ -339,6 +339,11 @@ The underlying functionality between those two options remain the same.
- [Abstract autopilot access](https://github.com/lightningnetwork/lnd/pull/9480)
- [Abstract invoicerpc server access](https://github.com/lightningnetwork/lnd/pull/9516)
- [Refactor to hide DB transactions](https://github.com/lightningnetwork/lnd/pull/9513)
- Move the [graph cache out of the graph
CRUD](https://github.com/lightningnetwork/lnd/pull/9544) layer.
- Move [topology
subscription](https://github.com/lightningnetwork/lnd/pull/9577) and
notification handling from the graph.Builder to the ChannelGraph.
* [Golang was updated to
`v1.22.11`](https://github.com/lightningnetwork/lnd/pull/9462).

View File

@ -109,8 +109,7 @@ type Builder struct {
started atomic.Bool
stopped atomic.Bool
ntfnClientCounter atomic.Uint64
bestHeight atomic.Uint32
bestHeight atomic.Uint32
cfg *Config
@ -123,22 +122,6 @@ type Builder struct {
// of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock
// topologyUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
topologyUpdates chan any
// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
topologyClients *lnutils.SyncMap[uint64, *topologyClient]
// ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the Builder. Updates either
// add a new notification client, or cancel notifications for an
// existing client.
ntfnClientUpdates chan *topologyClientUpdate
// channelEdgeMtx is a mutex we use to make sure we process only one
// ChannelEdgePolicy at a time for a given channelID, to ensure
// consistency between the various database accesses.
@ -163,14 +146,11 @@ var _ ChannelGraphSource = (*Builder)(nil)
// NewBuilder constructs a new Builder.
func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{
cfg: cfg,
topologyUpdates: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](),
statTicker: ticker.New(defaultStatInterval),
stats: new(builderStats),
quit: make(chan struct{}),
cfg: cfg,
channelEdgeMtx: multimutex.NewMutex[uint64](),
statTicker: ticker.New(defaultStatInterval),
stats: new(builderStats),
quit: make(chan struct{}),
}, nil
}
@ -656,28 +636,6 @@ func (b *Builder) pruneZombieChans() error {
return nil
}
// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleTopologyUpdate(update any) {
defer b.wg.Done()
topChange := &TopologyChange{}
err := addToTopologyChange(b.cfg.Graph, topChange, update)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}
if topChange.isEmpty() {
return
}
b.notifyTopologyChange(topChange)
}
// networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
@ -701,16 +659,6 @@ func (b *Builder) networkHandler() {
}
select {
// A new fully validated topology update has just arrived.
// We'll notify any registered clients.
case update := <-b.topologyUpdates:
b.wg.Add(1)
go b.handleTopologyUpdate(update)
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.
case chainUpdate, ok := <-b.staleBlocks:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
@ -783,31 +731,6 @@ func (b *Builder) networkHandler() {
" processed.", chainUpdate.Height)
}
// A new notification client update has arrived. We're either
// gaining a new client, or cancelling notifications for an
// existing client.
case ntfnUpdate := <-b.ntfnClientUpdates:
clientID := ntfnUpdate.clientID
if ntfnUpdate.cancel {
client, ok := b.topologyClients.LoadAndDelete(
clientID,
)
if ok {
close(client.exit)
client.wg.Wait()
close(client.ntfnChan)
}
continue
}
b.topologyClients.Store(clientID, &topologyClient{
ntfnChan: ntfnUpdate.ntfnChan,
exit: make(chan struct{}),
})
// The graph prune ticker has ticked, so we'll examine the
// state of the known graph to filter out any zombie channels
// for pruning.
@ -934,16 +857,6 @@ func (b *Builder) updateGraphWithClosedChannels(
log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
blockHeight, len(chansClosed))
if len(chansClosed) == 0 {
return err
}
// Notify all currently registered clients of the newly closed channels.
closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
b.notifyTopologyChange(&TopologyChange{
ClosedChannels: closeSummaries,
})
return nil
}
@ -1067,12 +980,6 @@ func (b *Builder) AddNode(node *models.LightningNode,
return err
}
select {
case b.topologyUpdates <- node:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
return nil
}
@ -1117,12 +1024,6 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
return err
}
select {
case b.topologyUpdates <- edge:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
return nil
}
@ -1224,12 +1125,6 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
return err
}
select {
case b.topologyUpdates <- update:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@ -155,62 +155,70 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
}
// TestPartialNode checks that we can add and retrieve a LightningNode where
// where only the pubkey is known to the database.
// only the pubkey is known to the database.
func TestPartialNode(t *testing.T) {
t.Parallel()
graph, err := MakeTestGraph(t)
require.NoError(t, err, "unable to make test database")
// We want to be able to insert nodes into the graph that only has the
// PubKey set.
node := &models.LightningNode{
HaveNodeAnnouncement: false,
PubKeyBytes: testPub,
}
// To insert a partial node, we need to add a channel edge that has
// node keys for nodes we are not yet aware
var node1, node2 models.LightningNode
copy(node1.PubKeyBytes[:], pubKey1Bytes)
copy(node2.PubKeyBytes[:], pubKey2Bytes)
if err := graph.AddLightningNode(node); err != nil {
t.Fatalf("unable to add node: %v", err)
}
assertNodeInCache(t, graph, node, nil)
// Create an edge attached to these nodes and add it to the graph.
edgeInfo, _ := createEdge(140, 0, 0, 0, &node1, &node2)
require.NoError(t, graph.AddChannelEdge(&edgeInfo))
// Next, fetch the node from the database to ensure everything was
// Both of the nodes should now be in both the graph (as partial/shell)
// nodes _and_ the cache should also have an awareness of both nodes.
assertNodeInCache(t, graph, &node1, nil)
assertNodeInCache(t, graph, &node2, nil)
// Next, fetch the node2 from the database to ensure everything was
// serialized properly.
dbNode, err := graph.FetchLightningNode(testPub)
require.NoError(t, err, "unable to locate node")
dbNode1, err := graph.FetchLightningNode(pubKey1)
require.NoError(t, err)
dbNode2, err := graph.FetchLightningNode(pubKey2)
require.NoError(t, err)
_, exists, err := graph.HasLightningNode(dbNode.PubKeyBytes)
if err != nil {
t.Fatalf("unable to query for node: %v", err)
} else if !exists {
t.Fatalf("node should be found but wasn't")
}
_, exists, err := graph.HasLightningNode(dbNode1.PubKeyBytes)
require.NoError(t, err)
require.True(t, exists)
// The two nodes should match exactly! (with default values for
// LastUpdate and db set to satisfy compareNodes())
node = &models.LightningNode{
expectedNode1 := &models.LightningNode{
HaveNodeAnnouncement: false,
LastUpdate: time.Unix(0, 0),
PubKeyBytes: testPub,
PubKeyBytes: pubKey1,
}
require.NoError(t, compareNodes(dbNode1, expectedNode1))
if err := compareNodes(node, dbNode); err != nil {
t.Fatalf("nodes don't match: %v", err)
_, exists, err = graph.HasLightningNode(dbNode2.PubKeyBytes)
require.NoError(t, err)
require.True(t, exists)
// The two nodes should match exactly! (with default values for
// LastUpdate and db set to satisfy compareNodes())
expectedNode2 := &models.LightningNode{
HaveNodeAnnouncement: false,
LastUpdate: time.Unix(0, 0),
PubKeyBytes: pubKey2,
}
require.NoError(t, compareNodes(dbNode2, expectedNode2))
// Next, delete the node from the graph, this should purge all data
// related to the node.
if err := graph.DeleteLightningNode(testPub); err != nil {
t.Fatalf("unable to delete node: %v", err)
}
require.NoError(t, graph.DeleteLightningNode(pubKey1))
assertNodeNotInCache(t, graph, testPub)
// Finally, attempt to fetch the node again. This should fail as the
// node should have been deleted from the database.
_, err = graph.FetchLightningNode(testPub)
if err != ErrGraphNodeNotFound {
t.Fatalf("fetch after delete should fail!")
}
require.ErrorIs(t, err, ErrGraphNodeNotFound)
}
func TestAliasLookup(t *testing.T) {
@ -964,6 +972,23 @@ func randEdgePolicy(chanID uint64, db kvdb.Backend) *models.ChannelEdgePolicy {
return newEdgePolicy(chanID, db, update)
}
func copyEdgePolicy(p *models.ChannelEdgePolicy) *models.ChannelEdgePolicy {
return &models.ChannelEdgePolicy{
SigBytes: p.SigBytes,
ChannelID: p.ChannelID,
LastUpdate: p.LastUpdate,
MessageFlags: p.MessageFlags,
ChannelFlags: p.ChannelFlags,
TimeLockDelta: p.TimeLockDelta,
MinHTLC: p.MinHTLC,
MaxHTLC: p.MaxHTLC,
FeeBaseMSat: p.FeeBaseMSat,
FeeProportionalMillionths: p.FeeProportionalMillionths,
ToNode: p.ToNode,
ExtraOpaqueData: p.ExtraOpaqueData,
}
}
func newEdgePolicy(chanID uint64, db kvdb.Backend,
updateTime int64) *models.ChannelEdgePolicy {
@ -1919,6 +1944,76 @@ func TestNodeUpdatesInHorizon(t *testing.T) {
}
}
// TestFilterKnownChanIDsZombieRevival tests that if a ChannelUpdateInfo is
// passed to FilterKnownChanIDs that contains a channel that we have marked as
// a zombie, then we will mark it as live again if the new ChannelUpdate has
// timestamps that would make the channel be considered live again.
//
// NOTE: this tests focuses on zombie revival. The main logic of
// FilterKnownChanIDs is tested in TestFilterKnownChanIDs.
func TestFilterKnownChanIDsZombieRevival(t *testing.T) {
t.Parallel()
graph, err := MakeTestGraph(t)
require.NoError(t, err)
var (
scid1 = lnwire.ShortChannelID{BlockHeight: 1}
scid2 = lnwire.ShortChannelID{BlockHeight: 2}
scid3 = lnwire.ShortChannelID{BlockHeight: 3}
)
isZombie := func(scid lnwire.ShortChannelID) bool {
zombie, _, _ := graph.IsZombieEdge(scid.ToUint64())
return zombie
}
// Mark channel 1 and 2 as zombies.
err = graph.MarkEdgeZombie(scid1.ToUint64(), [33]byte{}, [33]byte{})
require.NoError(t, err)
err = graph.MarkEdgeZombie(scid2.ToUint64(), [33]byte{}, [33]byte{})
require.NoError(t, err)
require.True(t, isZombie(scid1))
require.True(t, isZombie(scid2))
require.False(t, isZombie(scid3))
// Call FilterKnownChanIDs with an isStillZombie call-back that would
// result in the current zombies still be considered as zombies.
_, err = graph.FilterKnownChanIDs([]ChannelUpdateInfo{
{ShortChannelID: scid1},
{ShortChannelID: scid2},
{ShortChannelID: scid3},
}, func(_ time.Time, _ time.Time) bool {
return true
})
require.NoError(t, err)
require.True(t, isZombie(scid1))
require.True(t, isZombie(scid2))
require.False(t, isZombie(scid3))
// Now call it again but this time with a isStillZombie call-back that
// would result in channel with SCID 2 no longer being considered a
// zombie.
_, err = graph.FilterKnownChanIDs([]ChannelUpdateInfo{
{ShortChannelID: scid1},
{
ShortChannelID: scid2,
Node1UpdateTimestamp: time.Unix(1000, 0),
},
{ShortChannelID: scid3},
}, func(t1 time.Time, _ time.Time) bool {
return !t1.Equal(time.Unix(1000, 0))
})
require.NoError(t, err)
// Show that SCID 2 has been marked as live.
require.True(t, isZombie(scid1))
require.False(t, isZombie(scid2))
require.False(t, isZombie(scid3))
}
// TestFilterKnownChanIDs tests that we're able to properly perform the set
// differences of an incoming set of channel ID's, and those that we already
// know of on disk.
@ -2859,6 +2954,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
if err := graph.UpdateEdgePolicy(edge1); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions.
edge2 := randEdgePolicy(chanID.ToUint64(), graph.db)
edge2.ChannelFlags = 1
@ -2867,6 +2963,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
if err := graph.UpdateEdgePolicy(edge2); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
edge2 = copyEdgePolicy(edge2) // Avoid read/write race conditions.
// checkIndexTimestamps is a helper function that checks the edge update
// index only includes the given timestamps.
@ -3952,7 +4049,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {
getSingleChannel := func() *DirectedChannel {
var ch *DirectedChannel
err = graph.forEachNodeDirectedChannel(nil, node1.PubKeyBytes,
err = graph.ForEachNodeDirectedChannel(node1.PubKeyBytes,
func(c *DirectedChannel) error {
require.Nil(t, ch)
ch = c
@ -3974,6 +4071,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {
253, 217, 3, 8, 0, 0, 0, 10, 0, 0, 0, 20,
}
require.NoError(t, graph.UpdateEdgePolicy(edge1))
edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions.
directedChan := getSingleChannel()
require.NotNil(t, directedChan)
@ -4005,8 +4103,12 @@ func TestGraphLoading(t *testing.T) {
defer backend.Close()
defer backendCleanup()
graph, err := NewChannelGraph(backend)
graph, err := NewChannelGraph(&Config{KVDB: backend})
require.NoError(t, err)
require.NoError(t, graph.Start())
t.Cleanup(func() {
require.NoError(t, graph.Stop())
})
// Populate the graph with test data.
const numNodes = 100
@ -4015,8 +4117,12 @@ func TestGraphLoading(t *testing.T) {
// Recreate the graph. This should cause the graph cache to be
// populated.
graphReloaded, err := NewChannelGraph(backend)
graphReloaded, err := NewChannelGraph(&Config{KVDB: backend})
require.NoError(t, err)
require.NoError(t, graphReloaded.Start())
t.Cleanup(func() {
require.NoError(t, graphReloaded.Stop())
})
// Assert that the cache content is identical.
require.Equal(

4735
graph/db/kv_store.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,19 +1,54 @@
package graph
package graphdb
import (
"fmt"
"image/color"
"net"
"sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/wire"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwire"
)
// topologyManager holds all the fields required to manage the network topology
// subscriptions and notifications.
type topologyManager struct {
// ntfnClientCounter is an atomic counter that's used to assign unique
// notification client IDs to new clients.
ntfnClientCounter atomic.Uint64
// topologyUpdate is a channel that carries new topology updates
// messages from outside the ChannelGraph to be processed by the
// networkHandler.
topologyUpdate chan any
// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
topologyClients *lnutils.SyncMap[uint64, *topologyClient]
// ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the ChannelGraph. Updates either
// add a new notification client, or cancel notifications for an
// existing client.
ntfnClientUpdates chan *topologyClientUpdate
}
// newTopologyManager creates a new instance of the topologyManager.
func newTopologyManager() *topologyManager {
return &topologyManager{
topologyUpdate: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
}
}
// TopologyClient represents an intent to receive notifications from the
// channel router regarding changes to the topology of the channel graph. The
// TopologyChanges channel will be sent upon with new updates to the channel
@ -54,16 +89,16 @@ type topologyClientUpdate struct {
// topology occurs. Changes that will be sent at notifications include: new
// nodes appearing, node updating their attributes, new channels, channels
// closing, and updates in the routing policies of a channel's directed edges.
func (b *Builder) SubscribeTopology() (*TopologyClient, error) {
func (c *ChannelGraph) SubscribeTopology() (*TopologyClient, error) {
// If the router is not yet started, return an error to avoid a
// deadlock waiting for it to handle the subscription request.
if !b.started.Load() {
if !c.started.Load() {
return nil, fmt.Errorf("router not started")
}
// We'll first atomically obtain the next ID for this client from the
// incrementing client ID counter.
clientID := b.ntfnClientCounter.Add(1)
clientID := c.ntfnClientCounter.Add(1)
log.Debugf("New graph topology client subscription, client %v",
clientID)
@ -71,12 +106,12 @@ func (b *Builder) SubscribeTopology() (*TopologyClient, error) {
ntfnChan := make(chan *TopologyChange, 10)
select {
case b.ntfnClientUpdates <- &topologyClientUpdate{
case c.ntfnClientUpdates <- &topologyClientUpdate{
cancel: false,
clientID: clientID,
ntfnChan: ntfnChan,
}:
case <-b.quit:
case <-c.quit:
return nil, errors.New("ChannelRouter shutting down")
}
@ -84,11 +119,11 @@ func (b *Builder) SubscribeTopology() (*TopologyClient, error) {
TopologyChanges: ntfnChan,
Cancel: func() {
select {
case b.ntfnClientUpdates <- &topologyClientUpdate{
case c.ntfnClientUpdates <- &topologyClientUpdate{
cancel: true,
clientID: clientID,
}:
case <-b.quit:
case <-c.quit:
return
}
},
@ -114,7 +149,7 @@ type topologyClient struct {
// notifyTopologyChange notifies all registered clients of a new change in
// graph topology in a non-blocking.
func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) {
func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
// notifyClient is a helper closure that will send topology updates to
// the given client.
notifyClient := func(clientID uint64, client *topologyClient) bool {
@ -127,23 +162,22 @@ func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) {
len(topologyDiff.ChannelEdgeUpdates),
len(topologyDiff.ClosedChannels))
go func(c *topologyClient) {
defer c.wg.Done()
go func(t *topologyClient) {
defer t.wg.Done()
select {
// In this case we'll try to send the notification
// directly to the upstream client consumer.
case c.ntfnChan <- topologyDiff:
case t.ntfnChan <- topologyDiff:
// If the client cancels the notifications, then we'll
// exit early.
case <-c.exit:
case <-t.exit:
// Similarly, if the ChannelRouter itself exists early,
// then we'll also exit ourselves.
case <-b.quit:
case <-c.quit:
}
}(client)
@ -154,7 +188,29 @@ func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) {
// Range over the set of active clients, and attempt to send the
// topology updates.
b.topologyClients.Range(notifyClient)
c.topologyClients.Range(notifyClient)
}
// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
func (c *ChannelGraph) handleTopologyUpdate(update any) {
defer c.wg.Done()
topChange := &TopologyChange{}
err := c.addToTopologyChange(topChange, update)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}
if topChange.isEmpty() {
return
}
c.notifyTopologyChange(topChange)
}
// TopologyChange represents a new set of modifications to the channel graph.
@ -310,8 +366,8 @@ type ChannelEdgeUpdate struct {
// constitutes. This function will also fetch any required auxiliary
// information required to create the topology change update from the graph
// database.
func addToTopologyChange(graph DB, update *TopologyChange,
msg interface{}) error {
func (c *ChannelGraph) addToTopologyChange(update *TopologyChange,
msg any) error {
switch m := msg.(type) {
@ -345,7 +401,7 @@ func addToTopologyChange(graph DB, update *TopologyChange,
// We'll need to fetch the edge's information from the database
// in order to get the information concerning which nodes are
// being connected.
edgeInfo, _, _, err := graph.FetchChannelEdgesByID(m.ChannelID)
edgeInfo, _, _, err := c.FetchChannelEdgesByID(m.ChannelID)
if err != nil {
return errors.Errorf("unable fetch channel edge: %v",
err)

View File

@ -20,8 +20,49 @@ const (
DefaultPreAllocCacheNumNodes = 15000
)
// Options holds parameters for tuning and customizing a graph.DB.
type Options struct {
// chanGraphOptions holds parameters for tuning and customizing the
// ChannelGraph.
type chanGraphOptions struct {
// useGraphCache denotes whether the in-memory graph cache should be
// used or a fallback version that uses the underlying database for
// path finding.
useGraphCache bool
// preAllocCacheNumNodes is the number of nodes we expect to be in the
// graph cache, so we can pre-allocate the map accordingly.
preAllocCacheNumNodes int
}
// defaultChanGraphOptions returns a new chanGraphOptions instance populated
// with default values.
func defaultChanGraphOptions() *chanGraphOptions {
return &chanGraphOptions{
useGraphCache: true,
preAllocCacheNumNodes: DefaultPreAllocCacheNumNodes,
}
}
// ChanGraphOption describes the signature of a functional option that can be
// used to customize a ChannelGraph instance.
type ChanGraphOption func(*chanGraphOptions)
// WithUseGraphCache sets whether the in-memory graph cache should be used.
func WithUseGraphCache(use bool) ChanGraphOption {
return func(o *chanGraphOptions) {
o.useGraphCache = use
}
}
// WithPreAllocCacheNumNodes sets the number of nodes we expect to be in the
// graph cache, so we can pre-allocate the map accordingly.
func WithPreAllocCacheNumNodes(n int) ChanGraphOption {
return func(o *chanGraphOptions) {
o.preAllocCacheNumNodes = n
}
}
// KVStoreOptions holds parameters for tuning and customizing a graph.DB.
type KVStoreOptions struct {
// RejectCacheSize is the maximum number of rejectCacheEntries to hold
// in the rejection cache.
RejectCacheSize int
@ -34,67 +75,43 @@ type Options struct {
// wait before attempting to commit a pending set of updates.
BatchCommitInterval time.Duration
// PreAllocCacheNumNodes is the number of nodes we expect to be in the
// graph cache, so we can pre-allocate the map accordingly.
PreAllocCacheNumNodes int
// UseGraphCache denotes whether the in-memory graph cache should be
// used or a fallback version that uses the underlying database for
// path finding.
UseGraphCache bool
// NoMigration specifies that underlying backend was opened in read-only
// mode and migrations shouldn't be performed. This can be useful for
// applications that use the channeldb package as a library.
NoMigration bool
}
// DefaultOptions returns an Options populated with default values.
func DefaultOptions() *Options {
return &Options{
RejectCacheSize: DefaultRejectCacheSize,
ChannelCacheSize: DefaultChannelCacheSize,
PreAllocCacheNumNodes: DefaultPreAllocCacheNumNodes,
UseGraphCache: true,
NoMigration: false,
// DefaultOptions returns a KVStoreOptions populated with default values.
func DefaultOptions() *KVStoreOptions {
return &KVStoreOptions{
RejectCacheSize: DefaultRejectCacheSize,
ChannelCacheSize: DefaultChannelCacheSize,
NoMigration: false,
}
}
// OptionModifier is a function signature for modifying the default Options.
type OptionModifier func(*Options)
// KVStoreOptionModifier is a function signature for modifying the default
// KVStoreOptions.
type KVStoreOptionModifier func(*KVStoreOptions)
// WithRejectCacheSize sets the RejectCacheSize to n.
func WithRejectCacheSize(n int) OptionModifier {
return func(o *Options) {
func WithRejectCacheSize(n int) KVStoreOptionModifier {
return func(o *KVStoreOptions) {
o.RejectCacheSize = n
}
}
// WithChannelCacheSize sets the ChannelCacheSize to n.
func WithChannelCacheSize(n int) OptionModifier {
return func(o *Options) {
func WithChannelCacheSize(n int) KVStoreOptionModifier {
return func(o *KVStoreOptions) {
o.ChannelCacheSize = n
}
}
// WithPreAllocCacheNumNodes sets the PreAllocCacheNumNodes to n.
func WithPreAllocCacheNumNodes(n int) OptionModifier {
return func(o *Options) {
o.PreAllocCacheNumNodes = n
}
}
// WithBatchCommitInterval sets the batch commit interval for the interval batch
// schedulers.
func WithBatchCommitInterval(interval time.Duration) OptionModifier {
return func(o *Options) {
func WithBatchCommitInterval(interval time.Duration) KVStoreOptionModifier {
return func(o *KVStoreOptions) {
o.BatchCommitInterval = interval
}
}
// WithUseGraphCache sets the UseGraphCache option to the given value.
func WithUseGraphCache(use bool) OptionModifier {
return func(o *Options) {
o.UseGraphCache = use
}
}

View File

@ -469,7 +469,7 @@ func TestEdgeUpdateNotification(t *testing.T) {
// With the channel edge now in place, we'll subscribe for topology
// notifications.
ntfnClient, err := ctx.builder.SubscribeTopology()
ntfnClient, err := ctx.graph.SubscribeTopology()
require.NoError(t, err, "unable to subscribe for channel notifications")
// Create random policy edges that are stemmed to the channel id
@ -489,7 +489,8 @@ func TestEdgeUpdateNotification(t *testing.T) {
t.Fatalf("unable to add edge update: %v", err)
}
assertEdgeCorrect := func(t *testing.T, edgeUpdate *ChannelEdgeUpdate,
assertEdgeCorrect := func(t *testing.T,
edgeUpdate *graphdb.ChannelEdgeUpdate,
edgeAnn *models.ChannelEdgePolicy) {
if edgeUpdate.ChanID != edgeAnn.ChannelID {
@ -659,7 +660,7 @@ func TestNodeUpdateNotification(t *testing.T) {
}
// Create a new client to receive notifications.
ntfnClient, err := ctx.builder.SubscribeTopology()
ntfnClient, err := ctx.graph.SubscribeTopology()
require.NoError(t, err, "unable to subscribe for channel notifications")
// Change network topology by adding the updated info for the two nodes
@ -672,7 +673,7 @@ func TestNodeUpdateNotification(t *testing.T) {
}
assertNodeNtfnCorrect := func(t *testing.T, ann *models.LightningNode,
nodeUpdate *NetworkNodeUpdate) {
nodeUpdate *graphdb.NetworkNodeUpdate) {
nodeKey, _ := ann.PubKey()
@ -699,9 +700,10 @@ func TestNodeUpdateNotification(t *testing.T) {
t.Fatalf("node alias doesn't match: expected %v, got %v",
ann.Alias, nodeUpdate.Alias)
}
if nodeUpdate.Color != EncodeHexColor(ann.Color) {
t.Fatalf("node color doesn't match: expected %v, got %v",
EncodeHexColor(ann.Color), nodeUpdate.Color)
if nodeUpdate.Color != graphdb.EncodeHexColor(ann.Color) {
t.Fatalf("node color doesn't match: expected %v, "+
"got %v", graphdb.EncodeHexColor(ann.Color),
nodeUpdate.Color)
}
}
@ -793,7 +795,7 @@ func TestNotificationCancellation(t *testing.T) {
ctx := createTestCtxSingleNode(t, startingBlockHeight)
// Create a new client to receive notifications.
ntfnClient, err := ctx.builder.SubscribeTopology()
ntfnClient, err := ctx.graph.SubscribeTopology()
require.NoError(t, err, "unable to subscribe for channel notifications")
// We'll create the utxo for a new channel.
@ -919,7 +921,7 @@ func TestChannelCloseNotification(t *testing.T) {
// With the channel edge now in place, we'll subscribe for topology
// notifications.
ntfnClient, err := ctx.builder.SubscribeTopology()
ntfnClient, err := ctx.graph.SubscribeTopology()
require.NoError(t, err, "unable to subscribe for channel notifications")
// Next, we'll simulate the closure of our channel by generating a new
@ -1002,7 +1004,9 @@ func TestEncodeHexColor(t *testing.T) {
}
for _, tc := range colorTestCases {
encoded := EncodeHexColor(color.RGBA{tc.R, tc.G, tc.B, 0})
encoded := graphdb.EncodeHexColor(
color.RGBA{tc.R, tc.G, tc.B, 0},
)
if (encoded == tc.encoded) != tc.isValid {
t.Fatalf("incorrect color encoding, "+
"want: %v, got: %v", tc.encoded, encoded)
@ -1094,11 +1098,16 @@ func makeTestGraph(t *testing.T, useCache bool) (*graphdb.ChannelGraph,
t.Cleanup(backendCleanup)
graph, err := graphdb.NewChannelGraph(
backend, graphdb.WithUseGraphCache(useCache),
&graphdb.Config{KVDB: backend},
graphdb.WithUseGraphCache(useCache),
)
if err != nil {
return nil, nil, err
}
require.NoError(t, graph.Start())
t.Cleanup(func() {
require.NoError(t, graph.Stop())
})
return graph, backend, nil
}

View File

@ -341,10 +341,10 @@ func testUpdateChannelPolicy(ht *lntest.HarnessTest) {
// but not the second, as she only allows two updates per day and a day
// has yet to elapse from the previous update.
// assertAliceAndBob is a helper closure which updates Alice's policy
// and asserts that both Alice and Bob have heard and updated the
// updateAndAssertAliceAndBob is a helper closure which updates Alice's
// policy and asserts that both Alice and Bob have heard and updated the
// policy in their graph.
assertAliceAndBob := func(req *lnrpc.PolicyUpdateRequest,
updateAndAssertAliceAndBob := func(req *lnrpc.PolicyUpdateRequest,
expectedPolicy *lnrpc.RoutingPolicy) {
alice.RPC.UpdateChannelPolicy(req)
@ -384,7 +384,7 @@ func testUpdateChannelPolicy(ht *lntest.HarnessTest) {
expectedPolicy.FeeBaseMsat = baseFee1
req.BaseFeeMsat = baseFee1
req.InboundFee = nil
assertAliceAndBob(req, expectedPolicy)
updateAndAssertAliceAndBob(req, expectedPolicy)
// Check that Carol has both heard the policy and updated it in her
// graph.
@ -407,7 +407,7 @@ func testUpdateChannelPolicy(ht *lntest.HarnessTest) {
baseFee2 := baseFee1 * 2
expectedPolicy.FeeBaseMsat = baseFee2
req.BaseFeeMsat = baseFee2
assertAliceAndBob(req, expectedPolicy)
updateAndAssertAliceAndBob(req, expectedPolicy)
// Since Carol didn't receive the last update, she still has Alice's
// old policy. We validate this by checking the base fee is the older

View File

@ -615,8 +615,14 @@ func createTestPeer(t *testing.T) *peerTestCtx {
})
require.NoError(t, err)
dbAliceGraph, err := graphdb.NewChannelGraph(graphBackend)
dbAliceGraph, err := graphdb.NewChannelGraph(&graphdb.Config{
KVDB: graphBackend,
})
require.NoError(t, err)
require.NoError(t, dbAliceGraph.Start())
t.Cleanup(func() {
require.NoError(t, dbAliceGraph.Stop())
})
dbAliceChannel := channeldb.OpenForTesting(t, dbPath)

View File

@ -295,6 +295,6 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
}, nil
},
SubscribeTransactions: svr.cc.Wallet.SubscribeTransactions,
SubscribeTopology: svr.graphBuilder.SubscribeTopology,
SubscribeTopology: svr.graphDB.SubscribeTopology,
}, nil
}

View File

@ -167,11 +167,16 @@ func makeTestGraph(t *testing.T, useCache bool) (*graphdb.ChannelGraph,
t.Cleanup(backendCleanup)
graph, err := graphdb.NewChannelGraph(
backend, graphdb.WithUseGraphCache(useCache),
&graphdb.Config{KVDB: backend},
graphdb.WithUseGraphCache(useCache),
)
if err != nil {
return nil, nil, err
}
require.NoError(t, graph.Start())
t.Cleanup(func() {
require.NoError(t, graph.Stop())
})
return graph, backend, nil
}

View File

@ -48,7 +48,6 @@ import (
"github.com/lightningnetwork/lnd/feature"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/funding"
"github.com/lightningnetwork/lnd/graph"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/htlcswitch"
@ -3347,7 +3346,7 @@ func (r *rpcServer) GetInfo(_ context.Context,
// TODO(roasbeef): add synced height n stuff
isTestNet := chainreg.IsTestnet(&r.cfg.ActiveNetParams)
nodeColor := graph.EncodeHexColor(nodeAnn.RGBColor)
nodeColor := graphdb.EncodeHexColor(nodeAnn.RGBColor)
version := build.Version() + " commit=" + build.Commit
return &lnrpc.GetInfoResponse{
@ -6960,7 +6959,7 @@ func marshalNode(node *models.LightningNode) *lnrpc.LightningNode {
PubKey: hex.EncodeToString(node.PubKeyBytes[:]),
Addresses: nodeAddrs,
Alias: node.Alias,
Color: graph.EncodeHexColor(node.Color),
Color: graphdb.EncodeHexColor(node.Color),
Features: features,
CustomRecords: customRecords,
}
@ -7158,7 +7157,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
// First, we start by subscribing to a new intent to receive
// notifications from the channel router.
client, err := r.server.graphBuilder.SubscribeTopology()
client, err := r.server.graphDB.SubscribeTopology()
if err != nil {
return err
}
@ -7211,7 +7210,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
// returned by the router to the form of notifications expected by the current
// gRPC service.
func marshallTopologyChange(
topChange *graph.TopologyChange) *lnrpc.GraphTopologyUpdate {
topChange *graphdb.TopologyChange) *lnrpc.GraphTopologyUpdate {
// encodeKey is a simple helper function that converts a live public
// key into a hex-encoded version of the compressed serialization for

View File

@ -417,7 +417,7 @@ type server struct {
// updatePersistentPeerAddrs subscribes to topology changes and stores
// advertised addresses for any NodeAnnouncements from our persisted peers.
func (s *server) updatePersistentPeerAddrs() error {
graphSub, err := s.graphBuilder.SubscribeTopology()
graphSub, err := s.graphDB.SubscribeTopology()
if err != nil {
return err
}
@ -2379,6 +2379,12 @@ func (s *server) Start() error {
return
}
cleanup = cleanup.add(s.graphDB.Stop)
if err := s.graphDB.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.graphBuilder.Stop)
if err := s.graphBuilder.Start(); err != nil {
startErr = err
@ -2691,6 +2697,9 @@ func (s *server) Stop() error {
if err := s.graphBuilder.Stop(); err != nil {
srvrLog.Warnf("failed to stop graphBuilder %v", err)
}
if err := s.graphDB.Stop(); err != nil {
srvrLog.Warnf("failed to stop graphDB %v", err)
}
if err := s.chainArb.Stop(); err != nil {
srvrLog.Warnf("failed to stop chainArb: %v", err)
}