Merge pull request #9534 from ellemouton/graph13

graph: refactor Builder network message handling
This commit is contained in:
Oliver Gugger
2025-02-21 08:38:35 -06:00
committed by GitHub

View File

@@ -123,10 +123,10 @@ type Builder struct {
// of our currently known best chain are sent over. // of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock staleBlocks <-chan *chainview.FilteredBlock
// networkUpdates is a channel that carries new topology updates // topologyUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the // messages from outside the Builder to be processed by the
// networkHandler. // networkHandler.
networkUpdates chan *routingMsg topologyUpdates chan any
// topologyClients maps a client's unique notification ID to a // topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch // topologyClient client that contains its notification dispatch
@@ -164,7 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil)
func NewBuilder(cfg *Config) (*Builder, error) { func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{ return &Builder{
cfg: cfg, cfg: cfg,
networkUpdates: make(chan *routingMsg), topologyUpdates: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate), ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](), channelEdgeMtx: multimutex.NewMutex[uint64](),
@@ -656,59 +656,26 @@ func (b *Builder) pruneZombieChans() error {
return nil return nil
} }
// handleNetworkUpdate is responsible for processing the update message and // handleTopologyUpdate is responsible for sending any topology changes
// notifies topology changes, if any. // notifications to registered clients.
// //
// NOTE: must be run inside goroutine. // NOTE: must be run inside goroutine.
func (b *Builder) handleNetworkUpdate(update *routingMsg) { func (b *Builder) handleTopologyUpdate(update any) {
defer b.wg.Done() defer b.wg.Done()
// Process the routing update to determine if this is either a new
// update from our PoV or an update to a prior vertex/edge we
// previously accepted.
var err error
switch msg := update.msg.(type) {
case *models.LightningNode:
err = b.addNode(msg, update.op...)
case *models.ChannelEdgeInfo:
err = b.addEdge(msg, update.op...)
case *models.ChannelEdgePolicy:
err = b.updateEdge(msg, update.op...)
default:
err = errors.Errorf("wrong routing update message type")
}
update.err <- err
// If the error is not nil here, there's no need to send topology
// change.
if err != nil {
// Log as a debug message if this is not an error we need to be
// concerned about.
if IsError(err, ErrIgnored, ErrOutdated) {
log.Debugf("process network updates got: %v", err)
} else {
log.Errorf("process network updates got: %v", err)
}
return
}
// Otherwise, we'll send off a new notification for the newly accepted
// update, if any.
topChange := &TopologyChange{} topChange := &TopologyChange{}
err = addToTopologyChange(b.cfg.Graph, topChange, update.msg) err := addToTopologyChange(b.cfg.Graph, topChange, update)
if err != nil { if err != nil {
log.Errorf("unable to update topology change notification: %v", log.Errorf("unable to update topology change notification: %v",
err) err)
return return
} }
if !topChange.isEmpty() { if topChange.isEmpty() {
b.notifyTopologyChange(topChange) return
} }
b.notifyTopologyChange(topChange)
} }
// networkHandler is the primary goroutine for the Builder. The roles of // networkHandler is the primary goroutine for the Builder. The roles of
@@ -734,12 +701,11 @@ func (b *Builder) networkHandler() {
} }
select { select {
// A new fully validated network update has just arrived. As a // A new fully validated topology update has just arrived.
// result we'll modify the channel graph accordingly depending // We'll notify any registered clients.
// on the exact type of the message. case update := <-b.topologyUpdates:
case update := <-b.networkUpdates:
b.wg.Add(1) b.wg.Add(1)
go b.handleNetworkUpdate(update) go b.handleTopologyUpdate(update)
// TODO(roasbeef): remove all unconnected vertexes // TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding // after N blocks pass with no corresponding
@@ -1033,14 +999,6 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error {
return nil return nil
} }
// routingMsg couples a routing related routing topology update to the
// error channel.
type routingMsg struct {
msg interface{}
op []batch.SchedulerOption
err chan error
}
// ApplyChannelUpdate validates a channel update and if valid, applies it to the // ApplyChannelUpdate validates a channel update and if valid, applies it to the
// database. It returns a bool indicating whether the updates were successful. // database. It returns a bool indicating whether the updates were successful.
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
@@ -1102,23 +1060,20 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
func (b *Builder) AddNode(node *models.LightningNode, func (b *Builder) AddNode(node *models.LightningNode,
op ...batch.SchedulerOption) error { op ...batch.SchedulerOption) error {
rMsg := &routingMsg{ err := b.addNode(node, op...)
msg: node, if err != nil {
op: op, logNetworkMsgProcessError(err)
err: make(chan error, 1),
return err
} }
select { select {
case b.networkUpdates <- rMsg: case b.topologyUpdates <- node:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case <-b.quit: case <-b.quit:
return ErrGraphBuilderShuttingDown return ErrGraphBuilderShuttingDown
} }
return nil
} }
// addNode does some basic checks on the given LightningNode against what we // addNode does some basic checks on the given LightningNode against what we
@@ -1155,23 +1110,20 @@ func (b *Builder) addNode(node *models.LightningNode,
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
op ...batch.SchedulerOption) error { op ...batch.SchedulerOption) error {
rMsg := &routingMsg{ err := b.addEdge(edge, op...)
msg: edge, if err != nil {
op: op, logNetworkMsgProcessError(err)
err: make(chan error, 1),
return err
} }
select { select {
case b.networkUpdates <- rMsg: case b.topologyUpdates <- edge:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case <-b.quit: case <-b.quit:
return ErrGraphBuilderShuttingDown return ErrGraphBuilderShuttingDown
} }
return nil
} }
// addEdge does some validation on the new channel edge against what we // addEdge does some validation on the new channel edge against what we
@@ -1265,23 +1217,20 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
op ...batch.SchedulerOption) error { op ...batch.SchedulerOption) error {
rMsg := &routingMsg{ err := b.updateEdge(update, op...)
msg: update, if err != nil {
op: op, logNetworkMsgProcessError(err)
err: make(chan error, 1),
return err
} }
select { select {
case b.networkUpdates <- rMsg: case b.topologyUpdates <- update:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case <-b.quit: case <-b.quit:
return ErrGraphBuilderShuttingDown return ErrGraphBuilderShuttingDown
} }
return nil
} }
// updateEdge validates the new edge policy against what we currently have // updateEdge validates the new edge policy against what we currently have
@@ -1375,6 +1324,18 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
return nil return nil
} }
// logNetworkMsgProcessError logs the error received from processing a network
// message. It logs as a debug message if the error is not critical.
func logNetworkMsgProcessError(err error) {
if IsError(err, ErrIgnored, ErrOutdated) {
log.Debugf("process network updates got: %v", err)
return
}
log.Errorf("process network updates got: %v", err)
}
// CurrentBlockHeight returns the block height from POV of the router subsystem. // CurrentBlockHeight returns the block height from POV of the router subsystem.
// //
// NOTE: This method is part of the ChannelGraphSource interface. // NOTE: This method is part of the ChannelGraphSource interface.