multi: add Start and Stop methods for ChannelGraph

We do this in preparation for moving channel cache population logic out
of the constructor and into the Start method. We also will later on
(when topology subscription is moved to the ChannelGraph), have a
goroutine that will need to be kicked off and stopped.
This commit is contained in:
Elle Mouton 2025-02-19 07:51:54 -03:00
parent bb3839e422
commit b497c4694e
No known key found for this signature in database
GPG Key ID: D7D916376026F177
8 changed files with 76 additions and 1 deletions

View File

@ -49,6 +49,11 @@ func newDiskChanGraph(t *testing.T) (testGraph, error) {
graphDB, err := graphdb.NewChannelGraph(&graphdb.Config{KVDB: backend})
require.NoError(t, err)
require.NoError(t, graphDB.Start())
t.Cleanup(func() {
require.NoError(t, graphDB.Stop())
})
return &testDBGraph{
db: graphDB,
databaseChannelGraph: databaseChannelGraph{

View File

@ -3,6 +3,7 @@ package graphdb
import (
"errors"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@ -32,13 +33,20 @@ type Config struct {
// KVStore. Upcoming commits will move the graph cache out of the KVStore and
// into this layer so that the KVStore is only responsible for CRUD operations.
type ChannelGraph struct {
started atomic.Bool
stopped atomic.Bool
// cacheMu guards any writes to the graphCache. It should be held
// across the DB write call and the graphCache update to make the
// two updates as atomic as possible.
cacheMu sync.Mutex
cacheMu sync.Mutex
graphCache *GraphCache
*KVStore
quit chan struct{}
wg sync.WaitGroup
}
// NewChannelGraph creates a new ChannelGraph instance with the given backend.
@ -58,6 +66,7 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
if !opts.useGraphCache {
return &ChannelGraph{
KVStore: store,
quit: make(chan struct{}),
}, nil
}
@ -96,9 +105,38 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
return &ChannelGraph{
KVStore: store,
graphCache: graphCache,
quit: make(chan struct{}),
}, nil
}
// Start kicks off any goroutines required for the ChannelGraph to function.
// If the graph cache is enabled, then it will be populated with the contents of
// the database.
func (c *ChannelGraph) Start() error {
if !c.started.CompareAndSwap(false, true) {
return nil
}
log.Debugf("ChannelGraph starting")
defer log.Debug("ChannelGraph started")
return nil
}
// Stop signals any active goroutines for a graceful closure.
func (c *ChannelGraph) Stop() error {
if !c.stopped.CompareAndSwap(false, true) {
return nil
}
log.Debugf("ChannelGraph shutting down...")
defer log.Debug("ChannelGraph shutdown complete")
close(c.quit)
c.wg.Wait()
return nil
}
// ForEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration

View File

@ -4077,6 +4077,10 @@ func TestGraphLoading(t *testing.T) {
graph, err := NewChannelGraph(&Config{KVDB: backend})
require.NoError(t, err)
require.NoError(t, graph.Start())
t.Cleanup(func() {
require.NoError(t, graph.Stop())
})
// Populate the graph with test data.
const numNodes = 100
@ -4087,6 +4091,10 @@ func TestGraphLoading(t *testing.T) {
// populated.
graphReloaded, err := NewChannelGraph(&Config{KVDB: backend})
require.NoError(t, err)
require.NoError(t, graphReloaded.Start())
t.Cleanup(func() {
require.NoError(t, graphReloaded.Stop())
})
// Assert that the cache content is identical.
require.Equal(

View File

@ -26,6 +26,7 @@ import (
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/stretchr/testify/require"
)
var (
@ -4722,10 +4723,12 @@ func MakeTestGraph(t testing.TB, modifiers ...KVStoreOptionModifier) (
return nil, err
}
require.NoError(t, graph.Start())
t.Cleanup(func() {
_ = backend.Close()
backendCleanup()
require.NoError(t, graph.Stop())
})
return graph, nil

View File

@ -1100,6 +1100,10 @@ func makeTestGraph(t *testing.T, useCache bool) (*graphdb.ChannelGraph,
if err != nil {
return nil, nil, err
}
require.NoError(t, graph.Start())
t.Cleanup(func() {
require.NoError(t, graph.Stop())
})
return graph, backend, nil
}

View File

@ -619,6 +619,10 @@ func createTestPeer(t *testing.T) *peerTestCtx {
KVDB: graphBackend,
})
require.NoError(t, err)
require.NoError(t, dbAliceGraph.Start())
t.Cleanup(func() {
require.NoError(t, dbAliceGraph.Stop())
})
dbAliceChannel := channeldb.OpenForTesting(t, dbPath)

View File

@ -173,6 +173,10 @@ func makeTestGraph(t *testing.T, useCache bool) (*graphdb.ChannelGraph,
if err != nil {
return nil, nil, err
}
require.NoError(t, graph.Start())
t.Cleanup(func() {
require.NoError(t, graph.Stop())
})
return graph, backend, nil
}

View File

@ -2377,6 +2377,12 @@ func (s *server) Start() error {
return
}
cleanup = cleanup.add(s.graphDB.Stop)
if err := s.graphDB.Start(); err != nil {
startErr = err
return
}
cleanup = cleanup.add(s.graphBuilder.Stop)
if err := s.graphBuilder.Start(); err != nil {
startErr = err
@ -2683,6 +2689,9 @@ func (s *server) Stop() error {
if err := s.graphBuilder.Stop(); err != nil {
srvrLog.Warnf("failed to stop graphBuilder %v", err)
}
if err := s.graphDB.Stop(); err != nil {
srvrLog.Warnf("failed to stop graphDB %v", err)
}
if err := s.chainArb.Stop(); err != nil {
srvrLog.Warnf("failed to stop chainArb: %v", err)
}