graph: refactor Builder network message handling

The point of the `graph.Builder`'s `networkHandler` goroutine is to
ensure that certain requests are handled in a synchronous fashion.
However, any requests received on the `networkUpdates` channel are
currently immediately handled in a goroutine which calls
`handleNetworkUpdate` which calls `processUpdate` before doing topology
notifications. In other words, there is no reason for these
`networkUpdates` to be handled in the `networkHandler` since they are
always handled asynchronously anyway. This design is most likely due to
the fact that originally the gossiper and graph builder code lived in
the same system and so the pattern was copied across.

So in this commit, we just remove the complexity. The only part we need
to spin off in a goroutine is the topology notifications.
This commit is contained in:
Elle Mouton
2025-02-05 09:49:54 +02:00
parent 276b335cf5
commit d757b3bcfc

View File

@ -131,11 +131,6 @@ type Builder struct {
// of our currently known best chain are sent over. // of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock staleBlocks <-chan *chainview.FilteredBlock
// networkUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
networkUpdates chan *routingMsg
// topologyClients maps a client's unique notification ID to a // topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch // topologyClient client that contains its notification dispatch
// channel. // channel.
@ -172,7 +167,6 @@ var _ ChannelGraphSource = (*Builder)(nil)
func NewBuilder(cfg *Config) (*Builder, error) { func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{ return &Builder{
cfg: cfg, cfg: cfg,
networkUpdates: make(chan *routingMsg),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate), ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](), channelEdgeMtx: multimutex.NewMutex[uint64](),
@ -721,8 +715,8 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) {
// networkHandler is the primary goroutine for the Builder. The roles of // networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the // this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network // network, pruning the graph on new block notification and registering new
// updates, and registering new topology clients. // topology clients.
// //
// NOTE: This MUST be run as a goroutine. // NOTE: This MUST be run as a goroutine.
func (b *Builder) networkHandler() { func (b *Builder) networkHandler() {
@ -742,17 +736,6 @@ func (b *Builder) networkHandler() {
} }
select { select {
// A new fully validated network update has just arrived. As a
// result we'll modify the channel graph accordingly depending
// on the exact type of the message.
case update := <-b.networkUpdates:
b.wg.Add(1)
go b.handleNetworkUpdate(update)
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.
case chainUpdate, ok := <-b.staleBlocks: case chainUpdate, ok := <-b.staleBlocks:
// If the channel has been closed, then this indicates // If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves. // the daemon is shutting down, so we exit ourselves.
@ -1182,17 +1165,15 @@ func (b *Builder) AddNode(node *models.LightningNode,
err: make(chan error, 1), err: make(chan error, 1),
} }
select { b.wg.Add(1)
case b.networkUpdates <- rMsg: go b.handleNetworkUpdate(rMsg)
select { select {
case err := <-rMsg.err: case err := <-rMsg.err:
return err return err
case <-b.quit: case <-b.quit:
return ErrGraphBuilderShuttingDown return ErrGraphBuilderShuttingDown
} }
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
} }
// addNode does some basic checks on the given LightningNode against what we // addNode does some basic checks on the given LightningNode against what we
@ -1235,17 +1216,15 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
err: make(chan error, 1), err: make(chan error, 1),
} }
select { b.wg.Add(1)
case b.networkUpdates <- rMsg: go b.handleNetworkUpdate(rMsg)
select { select {
case err := <-rMsg.err: case err := <-rMsg.err:
return err return err
case <-b.quit: case <-b.quit:
return ErrGraphBuilderShuttingDown return ErrGraphBuilderShuttingDown
} }
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
} }
// addEdge does some validation on the new channel edge against what we // addEdge does some validation on the new channel edge against what we
@ -1437,17 +1416,15 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
err: make(chan error, 1), err: make(chan error, 1),
} }
select { b.wg.Add(1)
case b.networkUpdates <- rMsg: go b.handleNetworkUpdate(rMsg)
select { select {
case err := <-rMsg.err: case err := <-rMsg.err:
return err return err
case <-b.quit: case <-b.quit:
return ErrGraphBuilderShuttingDown return ErrGraphBuilderShuttingDown
} }
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
} }
// updateEdge validates the new edge policy against what we currently have // updateEdge validates the new edge policy against what we currently have