graph/db: handle topology updates in a single location

In this commit, we ensure that any topology update is forced to go via
the `handleTopologySubscriptions` handler so that client subscriptions
and updates are handled correctly and in the correct order.

This removes a bug that could result from a client missing a
notification about a channel being closed if the client is subscribed
and shortly after, `PruneGraph` is called which would notify all
subscribed clients and possibly do so before the client subscription has
actually been persisted.
This commit is contained in:
Elle Mouton
2025-05-12 14:36:49 +02:00
parent ee25c228e9
commit 0155d5d7b0
3 changed files with 18 additions and 4 deletions

View File

@@ -492,9 +492,12 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
closeSummaries := createCloseSummaries(
blockHeight, edges...,
)
c.notifyTopologyChange(&TopologyChange{
ClosedChannels: closeSummaries,
})
select {
case c.topologyUpdate <- closeSummaries:
case <-c.quit:
return nil, ErrChanGraphShuttingDown
}
}
return edges, nil

View File

@@ -149,6 +149,9 @@ type topologyClient struct {
// notifyTopologyChange notifies all registered clients of a new change in
// graph topology in a non-blocking.
//
// NOTE: this should only ever be called from a call-stack originating from the
// handleTopologySubscriptions handler.
func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
// notifyClient is a helper closure that will send topology updates to
// the given client.
@@ -194,7 +197,8 @@ func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
// NOTE: must be run inside goroutine and must only ever be called from within
// handleTopologySubscriptions.
func (c *ChannelGraph) handleTopologyUpdate(update any) {
defer c.wg.Done()
@@ -445,6 +449,10 @@ func (c *ChannelGraph) addToTopologyChange(update *TopologyChange,
edgeUpdate)
return nil
case []*ClosedChanSummary:
update.ClosedChannels = append(update.ClosedChannels, m...)
return nil
default:
return fmt.Errorf("unable to add to topology change, "+
"unknown message type %T", msg)