Revert "graph: refactor Builder network message handling"

This reverts commit d757b3bcfc.
This commit is contained in:
Elle Mouton
2025-02-13 11:19:07 +02:00
parent 9c2c95d46f
commit f39a004662

View File

@@ -123,6 +123,11 @@ type Builder struct {
// of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock
// networkUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
networkUpdates chan *routingMsg
// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
@@ -159,6 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil)
func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{
cfg: cfg,
networkUpdates: make(chan *routingMsg),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](),
@@ -707,8 +713,8 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) {
// networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification and registering new
// topology clients.
// network, pruning the graph on new block notification, applying network
// updates, and registering new topology clients.
//
// NOTE: This MUST be run as a goroutine.
func (b *Builder) networkHandler() {
@@ -728,6 +734,17 @@ func (b *Builder) networkHandler() {
}
select {
// A new fully validated network update has just arrived. As a
// result we'll modify the channel graph accordingly depending
// on the exact type of the message.
case update := <-b.networkUpdates:
b.wg.Add(1)
go b.handleNetworkUpdate(update)
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.
case chainUpdate, ok := <-b.staleBlocks:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
@@ -1091,12 +1108,14 @@ func (b *Builder) AddNode(node *models.LightningNode,
err: make(chan error, 1),
}
b.wg.Add(1)
go b.handleNetworkUpdate(rMsg)
select {
case err := <-rMsg.err:
return err
case b.networkUpdates <- rMsg:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
@@ -1142,12 +1161,14 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
err: make(chan error, 1),
}
b.wg.Add(1)
go b.handleNetworkUpdate(rMsg)
select {
case err := <-rMsg.err:
return err
case b.networkUpdates <- rMsg:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
@@ -1250,12 +1271,14 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
err: make(chan error, 1),
}
b.wg.Add(1)
go b.handleNetworkUpdate(rMsg)
select {
case err := <-rMsg.err:
return err
case b.networkUpdates <- rMsg:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case <-b.quit:
return ErrGraphBuilderShuttingDown
}