diff --git a/graph/builder.go b/graph/builder.go index 572bcd5d8..4c6557b93 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -123,6 +123,11 @@ type Builder struct { // of our currently known best chain are sent over. staleBlocks <-chan *chainview.FilteredBlock + // networkUpdates is a channel that carries new topology updates + // messages from outside the Builder to be processed by the + // networkHandler. + networkUpdates chan *routingMsg + // topologyClients maps a client's unique notification ID to a // topologyClient client that contains its notification dispatch // channel. @@ -159,6 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil) func NewBuilder(cfg *Config) (*Builder, error) { return &Builder{ cfg: cfg, + networkUpdates: make(chan *routingMsg), topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, ntfnClientUpdates: make(chan *topologyClientUpdate), channelEdgeMtx: multimutex.NewMutex[uint64](), @@ -707,8 +713,8 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) { // networkHandler is the primary goroutine for the Builder. The roles of // this goroutine include answering queries related to the state of the -// network, pruning the graph on new block notification and registering new -// topology clients. +// network, pruning the graph on new block notification, applying network +// updates, and registering new topology clients. // // NOTE: This MUST be run as a goroutine. func (b *Builder) networkHandler() { @@ -728,6 +734,17 @@ func (b *Builder) networkHandler() { } select { + // A new fully validated network update has just arrived. As a + // result we'll modify the channel graph accordingly depending + // on the exact type of the message. + case update := <-b.networkUpdates: + b.wg.Add(1) + go b.handleNetworkUpdate(update) + + // TODO(roasbeef): remove all unconnected vertexes + // after N blocks pass with no corresponding + // announcements. + case chainUpdate, ok := <-b.staleBlocks: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. @@ -1091,12 +1108,14 @@ func (b *Builder) AddNode(node *models.LightningNode, err: make(chan error, 1), } - b.wg.Add(1) - go b.handleNetworkUpdate(rMsg) - select { - case err := <-rMsg.err: - return err + case b.networkUpdates <- rMsg: + select { + case err := <-rMsg.err: + return err + case <-b.quit: + return ErrGraphBuilderShuttingDown + } case <-b.quit: return ErrGraphBuilderShuttingDown } @@ -1142,12 +1161,14 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, err: make(chan error, 1), } - b.wg.Add(1) - go b.handleNetworkUpdate(rMsg) - select { - case err := <-rMsg.err: - return err + case b.networkUpdates <- rMsg: + select { + case err := <-rMsg.err: + return err + case <-b.quit: + return ErrGraphBuilderShuttingDown + } case <-b.quit: return ErrGraphBuilderShuttingDown } @@ -1250,12 +1271,14 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, err: make(chan error, 1), } - b.wg.Add(1) - go b.handleNetworkUpdate(rMsg) - select { - case err := <-rMsg.err: - return err + case b.networkUpdates <- rMsg: + select { + case err := <-rMsg.err: + return err + case <-b.quit: + return ErrGraphBuilderShuttingDown + } case <-b.quit: return ErrGraphBuilderShuttingDown }