graph: refactor Builder network message handling

The exposed AddNode, AddEdge and UpdateEdge methods of the Builder are
currently synchronous: even though they pass messages to the network
handler, which spins off the handling in a goroutine, the public
methods still wait for a response from that handling before returning.
The only part that is actually done asynchronously is the topology
notifications.

We previously tried to simplify things in [this
commit](d757b3bcfc)
but we soon realised that there was a reason for sending the messages to
the central/synchronous network handler first: it was to ensure
consistency for topology clients, i.e., the registration of a new
topology client, or the cancellation of an existing one, needs to be
ordered consistently with, and handled synchronously alongside, new
network updates. So for example, if a new
update comes in right after a topology client cancels its subscription,
then it should _not_ be notified. Similarly for new subscriptions. So
this commit was reverted soon after.

We can, however, still simplify things as is done in this commit by
noting that _only topology subscriptions and notifications_ need to be
handled separately. The actual network updates do not need to. So that
is what is done here.

This refactor will make moving the topology subscription logic to a new
subsystem later on much easier.
This commit is contained in:
Elle Mouton 2025-02-20 10:40:15 -03:00
parent 27440e8957
commit c89b616e7d
No known key found for this signature in database
GPG Key ID: D7D916376026F177

View File

@ -123,10 +123,10 @@ type Builder struct {
// of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock
// networkUpdates is a channel that carries new topology updates
// topologyUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
networkUpdates chan *routingMsg
topologyUpdates chan any
// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
@ -164,7 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil)
func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{
cfg: cfg,
networkUpdates: make(chan *routingMsg),
topologyUpdates: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](),
@ -656,59 +656,26 @@ func (b *Builder) pruneZombieChans() error {
return nil
}
// handleNetworkUpdate is responsible for processing the update message and
// notifies topology changes, if any.
// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleNetworkUpdate(update *routingMsg) {
func (b *Builder) handleTopologyUpdate(update any) {
defer b.wg.Done()
// Process the routing update to determine if this is either a new
// update from our PoV or an update to a prior vertex/edge we
// previously accepted.
var err error
switch msg := update.msg.(type) {
case *models.LightningNode:
err = b.addNode(msg, update.op...)
case *models.ChannelEdgeInfo:
err = b.addEdge(msg, update.op...)
case *models.ChannelEdgePolicy:
err = b.updateEdge(msg, update.op...)
default:
err = errors.Errorf("wrong routing update message type")
}
update.err <- err
// If the error is not nil here, there's no need to send topology
// change.
if err != nil {
// Log as a debug message if this is not an error we need to be
// concerned about.
if IsError(err, ErrIgnored, ErrOutdated) {
log.Debugf("process network updates got: %v", err)
} else {
log.Errorf("process network updates got: %v", err)
}
return
}
// Otherwise, we'll send off a new notification for the newly accepted
// update, if any.
topChange := &TopologyChange{}
err = addToTopologyChange(b.cfg.Graph, topChange, update.msg)
err := addToTopologyChange(b.cfg.Graph, topChange, update)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}
if !topChange.isEmpty() {
b.notifyTopologyChange(topChange)
if topChange.isEmpty() {
return
}
b.notifyTopologyChange(topChange)
}
// networkHandler is the primary goroutine for the Builder. The roles of
@ -734,12 +701,11 @@ func (b *Builder) networkHandler() {
}
select {
// A new fully validated network update has just arrived. As a
// result we'll modify the channel graph accordingly depending
// on the exact type of the message.
case update := <-b.networkUpdates:
// A new fully validated topology update has just arrived.
// We'll notify any registered clients.
case update := <-b.topologyUpdates:
b.wg.Add(1)
go b.handleNetworkUpdate(update)
go b.handleTopologyUpdate(update)
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
@ -1033,14 +999,6 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error {
return nil
}
// routingMsg bundles a routing-related topology update with the batch
// scheduler options to use when applying it and the channel on which the
// resulting processing error is delivered.
type routingMsg struct {
// msg is the update itself; the handler type-switches on it.
msg interface{}
// op holds the scheduler options forwarded to the add/update call.
op []batch.SchedulerOption
// err receives the result of processing msg.
err chan error
}
// ApplyChannelUpdate validates a channel update and if valid, applies it to the
// database. It returns a bool indicating whether the updates were successful.
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
@ -1102,23 +1060,20 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
func (b *Builder) AddNode(node *models.LightningNode,
op ...batch.SchedulerOption) error {
rMsg := &routingMsg{
msg: node,
op: op,
err: make(chan error, 1),
err := b.addNode(node, op...)
if err != nil {
logNetworkMsgProcessError(err)
return err
}
select {
case b.networkUpdates <- rMsg:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case b.topologyUpdates <- node:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
return nil
}
// addNode does some basic checks on the given LightningNode against what we
@ -1155,23 +1110,20 @@ func (b *Builder) addNode(node *models.LightningNode,
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
op ...batch.SchedulerOption) error {
rMsg := &routingMsg{
msg: edge,
op: op,
err: make(chan error, 1),
err := b.addEdge(edge, op...)
if err != nil {
logNetworkMsgProcessError(err)
return err
}
select {
case b.networkUpdates <- rMsg:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case b.topologyUpdates <- edge:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
return nil
}
// addEdge does some validation on the new channel edge against what we
@ -1265,23 +1217,20 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
op ...batch.SchedulerOption) error {
rMsg := &routingMsg{
msg: update,
op: op,
err: make(chan error, 1),
err := b.updateEdge(update, op...)
if err != nil {
logNetworkMsgProcessError(err)
return err
}
select {
case b.networkUpdates <- rMsg:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case b.topologyUpdates <- update:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
return nil
}
// updateEdge validates the new edge policy against what we currently have
@ -1375,6 +1324,18 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
return nil
}
// logNetworkMsgProcessError reports an error that was returned while
// processing a network message. If IsError matches the error against
// ErrIgnored or ErrOutdated it is only logged at debug level; any other error
// is logged at error level.
func logNetworkMsgProcessError(err error) {
	// Anything that is not one of the benign error kinds is surfaced loudly.
	if !IsError(err, ErrIgnored, ErrOutdated) {
		log.Errorf("process network updates got: %v", err)
		return
	}

	// Benign (ignored/outdated) errors are expected, so keep them quiet.
	log.Debugf("process network updates got: %v", err)
}
// CurrentBlockHeight returns the block height from POV of the router subsystem.
//
// NOTE: This method is part of the ChannelGraphSource interface.