From d757b3bcfc77b0fbb543ef5db800e6c6b058930f Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 5 Feb 2025 09:49:54 +0200 Subject: [PATCH] graph: refactor Builder network message handling The point of the `graph.Builder`'s `networkHandler` goroutine is to ensure that certain requests are handled in a synchronous fashion. However, any requests received on the `networkUpdates` channel, are currently immediately handled in a goroutine which calls `handleNetworkUpdate` which calls `processUpdate` before doing topology notifications. In other words, there is no reason for these `networkUpdates` to be handled in the `networkHandler` since they are always handled asynchronously anyways. This design is most likely due to the fact that originally the gossiper and graph builder code lived in the same system and so the pattern was copied across. So in this commit, we just remove the complexity. The only part we need to spin off in a goroutine is the topology notifications. --- graph/builder.go | 57 +++++++++++++++--------------------------------- 1 file changed, 17 insertions(+), 40 deletions(-) diff --git a/graph/builder.go b/graph/builder.go index 3b88cf947..dbe36b943 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -131,11 +131,6 @@ type Builder struct { // of our currently known best chain are sent over. staleBlocks <-chan *chainview.FilteredBlock - // networkUpdates is a channel that carries new topology updates - // messages from outside the Builder to be processed by the - // networkHandler. - networkUpdates chan *routingMsg - // topologyClients maps a client's unique notification ID to a // topologyClient client that contains its notification dispatch // channel. @@ -172,7 +167,6 @@ var _ ChannelGraphSource = (*Builder)(nil) func NewBuilder(cfg *Config) (*Builder, error) { return &Builder{ cfg: cfg, - networkUpdates: make(chan *routingMsg), topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, ntfnClientUpdates: make(chan *topologyClientUpdate), channelEdgeMtx: multimutex.NewMutex[uint64](), @@ -721,8 +715,8 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) { // networkHandler is the primary goroutine for the Builder. The roles of // this goroutine include answering queries related to the state of the -// network, pruning the graph on new block notification, applying network -// updates, and registering new topology clients. +// network, pruning the graph on new block notification and registering new +// topology clients. // // NOTE: This MUST be run as a goroutine. func (b *Builder) networkHandler() { @@ -742,17 +736,6 @@ func (b *Builder) networkHandler() { } select { - // A new fully validated network update has just arrived. As a - // result we'll modify the channel graph accordingly depending - // on the exact type of the message. - case update := <-b.networkUpdates: - b.wg.Add(1) - go b.handleNetworkUpdate(update) - - // TODO(roasbeef): remove all unconnected vertexes - // after N blocks pass with no corresponding - // announcements. - case chainUpdate, ok := <-b.staleBlocks: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. @@ -1182,14 +1165,12 @@ func (b *Builder) AddNode(node *models.LightningNode, err: make(chan error, 1), } + b.wg.Add(1) + go b.handleNetworkUpdate(rMsg) + select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case err := <-rMsg.err: + return err case <-b.quit: return ErrGraphBuilderShuttingDown } @@ -1235,14 +1216,12 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, err: make(chan error, 1), } + b.wg.Add(1) + go b.handleNetworkUpdate(rMsg) + select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case err := <-rMsg.err: + return err case <-b.quit: return ErrGraphBuilderShuttingDown } @@ -1437,14 +1416,12 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, err: make(chan error, 1), } + b.wg.Add(1) + go b.handleNetworkUpdate(rMsg) + select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case err := <-rMsg.err: + return err case <-b.quit: return ErrGraphBuilderShuttingDown }