diff --git a/graph/builder.go b/graph/builder.go index c379fe93f..3e1115553 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -123,10 +123,10 @@ type Builder struct { // of our currently known best chain are sent over. staleBlocks <-chan *chainview.FilteredBlock - // networkUpdates is a channel that carries new topology updates + // topologyUpdates is a channel that carries new topology updates // messages from outside the Builder to be processed by the // networkHandler. - networkUpdates chan *routingMsg + topologyUpdates chan any // topologyClients maps a client's unique notification ID to a // topologyClient client that contains its notification dispatch @@ -164,7 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil) func NewBuilder(cfg *Config) (*Builder, error) { return &Builder{ cfg: cfg, - networkUpdates: make(chan *routingMsg), + topologyUpdates: make(chan any), topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, ntfnClientUpdates: make(chan *topologyClientUpdate), channelEdgeMtx: multimutex.NewMutex[uint64](), @@ -656,59 +656,26 @@ func (b *Builder) pruneZombieChans() error { return nil } -// handleNetworkUpdate is responsible for processing the update message and -// notifies topology changes, if any. +// handleTopologyUpdate is responsible for sending any topology changes +// notifications to registered clients. // // NOTE: must be run inside goroutine. -func (b *Builder) handleNetworkUpdate(update *routingMsg) { +func (b *Builder) handleTopologyUpdate(update any) { defer b.wg.Done() - // Process the routing update to determine if this is either a new - // update from our PoV or an update to a prior vertex/edge we - // previously accepted. - var err error - switch msg := update.msg.(type) { - case *models.LightningNode: - err = b.addNode(msg, update.op...) - - case *models.ChannelEdgeInfo: - err = b.addEdge(msg, update.op...) - - case *models.ChannelEdgePolicy: - err = b.updateEdge(msg, update.op...) - - default: - err = errors.Errorf("wrong routing update message type") - } - update.err <- err - - // If the error is not nil here, there's no need to send topology - // change. - if err != nil { - // Log as a debug message if this is not an error we need to be - // concerned about. - if IsError(err, ErrIgnored, ErrOutdated) { - log.Debugf("process network updates got: %v", err) - } else { - log.Errorf("process network updates got: %v", err) - } - - return - } - - // Otherwise, we'll send off a new notification for the newly accepted - // update, if any. topChange := &TopologyChange{} - err = addToTopologyChange(b.cfg.Graph, topChange, update.msg) + err := addToTopologyChange(b.cfg.Graph, topChange, update) if err != nil { log.Errorf("unable to update topology change notification: %v", err) return } - if !topChange.isEmpty() { - b.notifyTopologyChange(topChange) + if topChange.isEmpty() { + return } + + b.notifyTopologyChange(topChange) } // networkHandler is the primary goroutine for the Builder. The roles of @@ -734,12 +701,11 @@ func (b *Builder) networkHandler() { } select { - // A new fully validated network update has just arrived. As a - // result we'll modify the channel graph accordingly depending - // on the exact type of the message. - case update := <-b.networkUpdates: + // A new fully validated topology update has just arrived. + // We'll notify any registered clients. + case update := <-b.topologyUpdates: b.wg.Add(1) - go b.handleNetworkUpdate(update) + go b.handleTopologyUpdate(update) // TODO(roasbeef): remove all unconnected vertexes // after N blocks pass with no corresponding @@ -1033,14 +999,6 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error { return nil } -// routingMsg couples a routing related routing topology update to the -// error channel. -type routingMsg struct { - msg interface{} - op []batch.SchedulerOption - err chan error -} - // ApplyChannelUpdate validates a channel update and if valid, applies it to the // database. It returns a bool indicating whether the updates were successful. func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { @@ -1102,23 +1060,20 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { func (b *Builder) AddNode(node *models.LightningNode, op ...batch.SchedulerOption) error { - rMsg := &routingMsg{ - msg: node, - op: op, - err: make(chan error, 1), + err := b.addNode(node, op...) + if err != nil { + logNetworkMsgProcessError(err) + + return err } select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case b.topologyUpdates <- node: case <-b.quit: return ErrGraphBuilderShuttingDown } + + return nil } // addNode does some basic checks on the given LightningNode against what we @@ -1155,23 +1110,20 @@ func (b *Builder) addNode(node *models.LightningNode, func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error { - rMsg := &routingMsg{ - msg: edge, - op: op, - err: make(chan error, 1), + err := b.addEdge(edge, op...) + if err != nil { + logNetworkMsgProcessError(err) + + return err } select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case b.topologyUpdates <- edge: case <-b.quit: return ErrGraphBuilderShuttingDown } + + return nil } // addEdge does some validation on the new channel edge against what we @@ -1265,23 +1217,20 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo, func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { - rMsg := &routingMsg{ - msg: update, - op: op, - err: make(chan error, 1), + err := b.updateEdge(update, op...) + if err != nil { + logNetworkMsgProcessError(err) + + return err } select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case b.topologyUpdates <- update: case <-b.quit: return ErrGraphBuilderShuttingDown } + + return nil } // updateEdge validates the new edge policy against what we currently have @@ -1375,6 +1324,18 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy, return nil } +// logNetworkMsgProcessError logs the error received from processing a network +// message. It logs as a debug message if the error is not critical. +func logNetworkMsgProcessError(err error) { + if IsError(err, ErrIgnored, ErrOutdated) { + log.Debugf("process network updates got: %v", err) + + return + } + + log.Errorf("process network updates got: %v", err) +} + // CurrentBlockHeight returns the block height from POV of the router subsystem. // // NOTE: This method is part of the ChannelGraphSource interface.