graph/db: move Topology client management to ChannelGraph

We plan to later add an option for a remote graph source which will be
managed by the ChannelGraph. In such a set-up, a node would rely on the
remote graph source for graph updates instead of on gossip sync. In
this scenario, however, our topology subscription logic should still
notify clients of all updates, and so it makes more sense to have the
logic live in the ChannelGraph so that we can also send out the updates
we receive from the remote graph.
Author: Elle Mouton
Date: 2025-02-19 08:48:30 -03:00
parent 4131b3fc7e
commit 2221aaa889
9 changed files with 209 additions and 158 deletions


@@ -12,10 +12,15 @@ import (
 	"github.com/lightningnetwork/lnd/batch"
 	"github.com/lightningnetwork/lnd/graph/db/models"
 	"github.com/lightningnetwork/lnd/kvdb"
+	"github.com/lightningnetwork/lnd/lnutils"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/routing/route"
 )
 
+// ErrChanGraphShuttingDown indicates that the ChannelGraph has shut down or
+// is busy shutting down.
+var ErrChanGraphShuttingDown = fmt.Errorf("ChannelGraph shutting down")
+
 // Config is a struct that holds all the necessary dependencies for a
 // ChannelGraph.
 type Config struct {
@@ -46,6 +51,26 @@ type ChannelGraph struct {
 	*KVStore
 
+	// ntfnClientCounter is an atomic counter that's used to assign unique
+	// notification client IDs to new clients.
+	ntfnClientCounter atomic.Uint64
+
+	// topologyUpdate is a channel that carries new topology update
+	// messages from outside the ChannelGraph to be processed by the
+	// handleTopologySubscriptions goroutine.
+	topologyUpdate chan any
+
+	// topologyClients maps a client's unique notification ID to a
+	// topologyClient struct that contains its notification dispatch
+	// channel.
+	topologyClients *lnutils.SyncMap[uint64, *topologyClient]
+
+	// ntfnClientUpdates is a channel used to send topology notification
+	// client updates to the ChannelGraph. An update either adds a new
+	// notification client, or cancels notifications for an existing
+	// client.
+	ntfnClientUpdates chan *topologyClientUpdate
+
 	quit chan struct{}
 	wg   sync.WaitGroup
 }
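The topologyClient and topologyClientUpdate types referenced by these new
fields are defined elsewhere, presumably moved alongside the rest of the
topology logic in this commit. A rough sketch of their likely shape,
inferred from how this diff uses them; the exact field set and comments
are assumptions:

// topologyClientUpdate asks the handler goroutine to either register a
// new notification client or cancel an existing one.
type topologyClientUpdate struct {
	// cancel, if true, indicates that the identified client should be
	// torn down rather than registered.
	cancel bool

	// clientID is the unique ID of the client to add or remove.
	clientID uint64

	// ntfnChan is the channel on which the client will receive topology
	// change notifications (unused for cancellations).
	ntfnChan chan *TopologyChange
}

// topologyClient tracks a single active subscriber.
type topologyClient struct {
	// ntfnChan carries notifications to the subscriber.
	ntfnChan chan *TopologyChange

	// exit is closed to abort any in-flight notification dispatches.
	exit chan struct{}

	// wg tracks notification goroutines spawned for this client.
	wg sync.WaitGroup
}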
@@ -65,8 +90,11 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
 	}
 
 	g := &ChannelGraph{
-		KVStore: store,
-		quit:    make(chan struct{}),
+		KVStore:           store,
+		topologyUpdate:    make(chan any),
+		topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
+		ntfnClientUpdates: make(chan *topologyClientUpdate),
+		quit:              make(chan struct{}),
 	}
 
 	// The graph cache can be turned off (e.g. for mobile users) for a
@@ -95,6 +123,9 @@ func (c *ChannelGraph) Start() error {
 		}
 	}
 
+	c.wg.Add(1)
+	go c.handleTopologySubscriptions()
+
 	return nil
 }
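Putting NewChannelGraph, Start and Stop together, the lifecycle from a
caller's perspective looks roughly like this (a hedged usage sketch; the
Config contents are elided and graphdb is an assumed import alias):

graph, err := graphdb.NewChannelGraph(&graphdb.Config{ /* deps elided */ })
if err != nil {
	return err
}

// Start spins up the new handleTopologySubscriptions goroutine.
if err := graph.Start(); err != nil {
	return err
}

// Stop closes the quit channel, which unblocks any pending
// topologyUpdate sends and terminates the handler loop.
defer graph.Stop()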
@@ -113,6 +144,60 @@ func (c *ChannelGraph) Stop() error {
 	return nil
 }
 
+// handleTopologySubscriptions ensures that topology client subscriptions,
+// subscription cancellations and topology notifications are handled
+// synchronously.
+//
+// NOTE: this MUST be run in a goroutine.
+func (c *ChannelGraph) handleTopologySubscriptions() {
+	defer c.wg.Done()
+
+	for {
+		select {
+		// A new fully validated topology update has just arrived.
+		// We'll notify any registered clients.
+		case update := <-c.topologyUpdate:
+			// TODO(elle): change topology handling to be handled
+			// synchronously so that we can guarantee the order of
+			// notification delivery.
+			c.wg.Add(1)
+			go c.handleTopologyUpdate(update)
+
+			// TODO(roasbeef): remove all unconnected vertexes
+			// after N blocks pass with no corresponding
+			// announcements.
+
+		// A new notification client update has arrived. We're either
+		// gaining a new client, or cancelling notifications for an
+		// existing client.
+		case ntfnUpdate := <-c.ntfnClientUpdates:
+			clientID := ntfnUpdate.clientID
+
+			if ntfnUpdate.cancel {
+				client, ok := c.topologyClients.LoadAndDelete(
+					clientID,
+				)
+				if ok {
+					close(client.exit)
+					client.wg.Wait()
+
+					close(client.ntfnChan)
+				}
+
+				continue
+			}
+
+			c.topologyClients.Store(clientID, &topologyClient{
+				ntfnChan: ntfnUpdate.ntfnChan,
+				exit:     make(chan struct{}),
+			})
+
+		case <-c.quit:
+			return
+		}
+	}
+}
+
 // populateCache loads the entire channel graph into the in-memory graph cache.
 //
 // NOTE: This should only be called if the graphCache has been constructed.
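handleTopologyUpdate is called above but not shown in this diff. A minimal
sketch of the shape it plausibly has, where addToTopologyChange and
isEmpty are assumed helpers that populate a TopologyChange from the raw
update and check whether anything was filled in:

// handleTopologyUpdate converts a raw update (node announcement, channel
// edge or channel policy) into a TopologyChange and fans it out to all
// registered clients.
func (c *ChannelGraph) handleTopologyUpdate(update any) {
	defer c.wg.Done()

	topChange := &TopologyChange{}

	// addToTopologyChange (assumed helper) type-switches on the update
	// and populates the corresponding TopologyChange fields.
	err := addToTopologyChange(c, topChange, update)
	if err != nil {
		log.Errorf("unable to update topology change notification: %v",
			err)
		return
	}

	if topChange.isEmpty() {
		return
	}

	c.notifyTopologyChange(topChange)
}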
@@ -234,6 +319,12 @@ func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
 		)
 	}
 
+	select {
+	case c.topologyUpdate <- node:
+	case <-c.quit:
+		return ErrChanGraphShuttingDown
+	}
+
 	return nil
 }
@@ -276,6 +367,12 @@ func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
 		c.graphCache.AddChannel(edge, nil, nil)
 	}
 
+	select {
+	case c.topologyUpdate <- edge:
+	case <-c.quit:
+		return ErrChanGraphShuttingDown
+	}
+
 	return nil
 }
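The registration path that feeds ntfnClientUpdates is also outside this
diff. A sketch of how a SubscribeTopology-style method could hand a new
client to the handler loop; the signature is an assumption, simplified to
return only the notification channel:

func (c *ChannelGraph) SubscribeTopology() (chan *TopologyChange, error) {
	// Assign the next free client ID and build the dispatch channel
	// that the handler will store for this subscriber.
	clientID := c.ntfnClientCounter.Add(1)
	ntfnChan := make(chan *TopologyChange, 10)

	select {
	case c.ntfnClientUpdates <- &topologyClientUpdate{
		cancel:   false,
		clientID: clientID,
		ntfnChan: ntfnChan,
	}:
	case <-c.quit:
		return nil, ErrChanGraphShuttingDown
	}

	return ntfnChan, nil
}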
@@ -411,6 +508,17 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
 			c.graphCache.Stats())
 	}
 
+	if len(edges) != 0 {
+		// Notify all currently registered clients of the newly closed
+		// channels.
+		closeSummaries := createCloseSummaries(
+			blockHeight, edges...,
+		)
+		c.notifyTopologyChange(&TopologyChange{
+			ClosedChannels: closeSummaries,
+		})
+	}
+
 	return edges, nil
 }
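createCloseSummaries is defined elsewhere in the package. Given the call
site above, it presumably maps each pruned edge to a closed-channel
summary along these lines (the ClosedChanSummary field names are
assumptions):

func createCloseSummaries(blockHeight uint32,
	edges ...*models.ChannelEdgeInfo) []*ClosedChanSummary {

	closeSummaries := make([]*ClosedChanSummary, len(edges))
	for i, edge := range edges {
		closeSummaries[i] = &ClosedChanSummary{
			ChanID:       edge.ChannelID,
			Capacity:     edge.Capacity,
			ClosedHeight: blockHeight,
			ChanPoint:    edge.ChannelPoint,
		}
	}

	return closeSummaries
}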
@@ -527,16 +635,20 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
 		return err
 	}
 
-	if c.graphCache == nil {
-		return nil
+	if c.graphCache != nil {
+		var isUpdate1 bool
+		if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
+			isUpdate1 = true
+		}
+
+		c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
 	}
 
-	var isUpdate1 bool
-	if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
-		isUpdate1 = true
+	select {
+	case c.topologyUpdate <- edge:
+	case <-c.quit:
+		return ErrChanGraphShuttingDown
 	}
 
-	c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
-
 	return nil
 }
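For completeness, a consumer registered via the SubscribeTopology sketch
above would simply drain its channel until the handler closes it on
cancellation; for example:

updates, err := graph.SubscribeTopology()
if err != nil {
	return err
}

for topChange := range updates {
	// The handler closes ntfnChan on cancellation, ending this loop.
	for _, closedChan := range topChange.ClosedChannels {
		log.Infof("channel %v closed at height %d",
			closedChan.ChanID, closedChan.ClosedHeight)
	}
}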