Merge pull request #9515 from ellemouton/graphFixFlake

graph: ensure topology subscriber handling and network msg handling is synchronous
This commit is contained in:
Yong 2025-02-17 20:48:05 +08:00 committed by GitHub
commit 65a18c5b35
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -123,6 +123,11 @@ type Builder struct {
// of our currently known best chain are sent over. // of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock staleBlocks <-chan *chainview.FilteredBlock
// networkUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
networkUpdates chan *routingMsg
// topologyClients maps a client's unique notification ID to a // topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch // topologyClient client that contains its notification dispatch
// channel. // channel.
@ -159,6 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil)
func NewBuilder(cfg *Config) (*Builder, error) { func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{ return &Builder{
cfg: cfg, cfg: cfg,
networkUpdates: make(chan *routingMsg),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate), ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](), channelEdgeMtx: multimutex.NewMutex[uint64](),
@ -707,8 +713,8 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) {
// networkHandler is the primary goroutine for the Builder. The roles of // networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the // this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification and registering new // network, pruning the graph on new block notification, applying network
// topology clients. // updates, and registering new topology clients.
// //
// NOTE: This MUST be run as a goroutine. // NOTE: This MUST be run as a goroutine.
func (b *Builder) networkHandler() { func (b *Builder) networkHandler() {
@ -728,6 +734,17 @@ func (b *Builder) networkHandler() {
} }
select { select {
// A new fully validated network update has just arrived. As a
// result we'll modify the channel graph accordingly depending
// on the exact type of the message.
case update := <-b.networkUpdates:
b.wg.Add(1)
go b.handleNetworkUpdate(update)
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.
case chainUpdate, ok := <-b.staleBlocks: case chainUpdate, ok := <-b.staleBlocks:
// If the channel has been closed, then this indicates // If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves. // the daemon is shutting down, so we exit ourselves.
@ -1091,15 +1108,17 @@ func (b *Builder) AddNode(node *models.LightningNode,
err: make(chan error, 1), err: make(chan error, 1),
} }
b.wg.Add(1) select {
go b.handleNetworkUpdate(rMsg) case b.networkUpdates <- rMsg:
select { select {
case err := <-rMsg.err: case err := <-rMsg.err:
return err return err
case <-b.quit: case <-b.quit:
return ErrGraphBuilderShuttingDown return ErrGraphBuilderShuttingDown
} }
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
} }
// addNode does some basic checks on the given LightningNode against what we // addNode does some basic checks on the given LightningNode against what we
@ -1142,15 +1161,17 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
err: make(chan error, 1), err: make(chan error, 1),
} }
b.wg.Add(1) select {
go b.handleNetworkUpdate(rMsg) case b.networkUpdates <- rMsg:
select { select {
case err := <-rMsg.err: case err := <-rMsg.err:
return err return err
case <-b.quit: case <-b.quit:
return ErrGraphBuilderShuttingDown return ErrGraphBuilderShuttingDown
} }
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
} }
// addEdge does some validation on the new channel edge against what we // addEdge does some validation on the new channel edge against what we
@ -1250,15 +1271,17 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
err: make(chan error, 1), err: make(chan error, 1),
} }
b.wg.Add(1) select {
go b.handleNetworkUpdate(rMsg) case b.networkUpdates <- rMsg:
select { select {
case err := <-rMsg.err: case err := <-rMsg.err:
return err return err
case <-b.quit: case <-b.quit:
return ErrGraphBuilderShuttingDown return ErrGraphBuilderShuttingDown
} }
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
} }
// updateEdge validates the new edge policy against what we currently have // updateEdge validates the new edge policy against what we currently have