From 1974903fb22cbaae6bdac067ba2f7ecad76e9647 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 29 Jan 2025 11:15:37 +0200 Subject: [PATCH 1/5] multi: move node ann validation code to netann pkg The `netann` package is a more appropriate place for this code to live. Also, once the funding transaction code is moved out of the `graph.Builder`, then no `lnwire` validation will occur in the `graph` package. --- discovery/gossiper.go | 2 +- graph/ann_validation.go | 47 ------------------------------------- netann/node_announcement.go | 41 ++++++++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 48 deletions(-) delete mode 100644 graph/ann_validation.go diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 290e529bd..cb6db7a86 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1977,7 +1977,7 @@ func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) ( func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement, op ...batch.SchedulerOption) error { - if err := graph.ValidateNodeAnn(msg); err != nil { + if err := netann.ValidateNodeAnn(msg); err != nil { return fmt.Errorf("unable to validate node announcement: %w", err) } diff --git a/graph/ann_validation.go b/graph/ann_validation.go deleted file mode 100644 index 3c93d06e5..000000000 --- a/graph/ann_validation.go +++ /dev/null @@ -1,47 +0,0 @@ -package graph - -import ( - "bytes" - - "github.com/btcsuite/btcd/btcec/v2" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/go-errors/errors" - "github.com/lightningnetwork/lnd/lnwire" -) - -// ValidateNodeAnn validates the node announcement by ensuring that the -// attached signature is needed a signature of the node announcement under the -// specified node public key. -func ValidateNodeAnn(a *lnwire.NodeAnnouncement) error { - // Reconstruct the data of announcement which should be covered by the - // signature so we can verify the signature shortly below - data, err := a.DataToSign() - if err != nil { - return err - } - - nodeSig, err := a.Signature.ToSignature() - if err != nil { - return err - } - nodeKey, err := btcec.ParsePubKey(a.NodeID[:]) - if err != nil { - return err - } - - // Finally ensure that the passed signature is valid, if not we'll - // return an error so this node announcement can be rejected. - dataHash := chainhash.DoubleHashB(data) - if !nodeSig.Verify(dataHash, nodeKey) { - var msgBuf bytes.Buffer - if _, err := lnwire.WriteMessage(&msgBuf, a, 0); err != nil { - return err - } - - return errors.Errorf("signature on NodeAnnouncement(%x) is "+ - "invalid: %x", nodeKey.SerializeCompressed(), - msgBuf.Bytes()) - } - - return nil -} diff --git a/netann/node_announcement.go b/netann/node_announcement.go index 5b6f7a430..3b5114c7d 100644 --- a/netann/node_announcement.go +++ b/netann/node_announcement.go @@ -1,10 +1,14 @@ package netann import ( + "bytes" "image/color" "net" "time" + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -76,3 +80,40 @@ func SignNodeAnnouncement(signer lnwallet.MessageSigner, nodeAnn.Signature, err = lnwire.NewSigFromSignature(sig) return err } + +// ValidateNodeAnn validates the node announcement by ensuring that the +// attached signature is needed a signature of the node announcement under the +// specified node public key. +func ValidateNodeAnn(a *lnwire.NodeAnnouncement) error { + // Reconstruct the data of announcement which should be covered by the + // signature so we can verify the signature shortly below + data, err := a.DataToSign() + if err != nil { + return err + } + + nodeSig, err := a.Signature.ToSignature() + if err != nil { + return err + } + nodeKey, err := btcec.ParsePubKey(a.NodeID[:]) + if err != nil { + return err + } + + // Finally ensure that the passed signature is valid, if not we'll + // return an error so this node announcement can be rejected. + dataHash := chainhash.DoubleHashB(data) + if !nodeSig.Verify(dataHash, nodeKey) { + var msgBuf bytes.Buffer + if _, err := lnwire.WriteMessage(&msgBuf, a, 0); err != nil { + return err + } + + return errors.Errorf("signature on NodeAnnouncement(%x) is "+ + "invalid: %x", nodeKey.SerializeCompressed(), + msgBuf.Bytes()) + } + + return nil +} From 276b335cf5dae5cbf2cf4e40c168d8c96bccc843 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 5 Feb 2025 09:39:33 +0200 Subject: [PATCH 2/5] graph: refactor announcement handling logic In this commit, we remove the `processUpdate` method which handles each announement type (node, channel, channel update) in a separate switch case. Each of these cases currently has a non-trivial amount of code. This commit creates separate methods for each message type we want to handle instead. This removes a level of indentation and will make things easier to review when we start editing the code for each handler. --- graph/builder.go | 605 ++++++++++++++++++++++++----------------------- 1 file changed, 307 insertions(+), 298 deletions(-) diff --git a/graph/builder.go b/graph/builder.go index 632156282..3b88cf947 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -674,7 +674,20 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) { // Process the routing update to determine if this is either a new // update from our PoV or an update to a prior vertex/edge we // previously accepted. - err := b.processUpdate(update.msg, update.op...) + var err error + switch msg := update.msg.(type) { + case *models.LightningNode: + err = b.addNode(msg, update.op...) + + case *models.ChannelEdgeInfo: + err = b.addEdge(msg, update.op...) + + case *models.ChannelEdgePolicy: + err = b.updateEdge(msg, update.op...) + + default: + err = errors.Errorf("wrong routing update message type") + } update.err <- err // If the error is not nil here, there's no need to send topology @@ -1094,303 +1107,6 @@ func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte, chanFeatures []byte, return legacyFundingScript() } -// processUpdate processes a new relate authenticated channel/edge, node or -// channel/edge update network update. If the update didn't affect the internal -// state of the draft due to either being out of date, invalid, or redundant, -// then error is returned. -// -//nolint:funlen -func (b *Builder) processUpdate(msg interface{}, - op ...batch.SchedulerOption) error { - - switch msg := msg.(type) { - case *models.LightningNode: - // Before we add the node to the database, we'll check to see - // if the announcement is "fresh" or not. If it isn't, then - // we'll return an error. - err := b.assertNodeAnnFreshness(msg.PubKeyBytes, msg.LastUpdate) - if err != nil { - return err - } - - if err := b.cfg.Graph.AddLightningNode(msg, op...); err != nil { - return errors.Errorf("unable to add node %x to the "+ - "graph: %v", msg.PubKeyBytes, err) - } - - log.Tracef("Updated vertex data for node=%x", msg.PubKeyBytes) - b.stats.incNumNodeUpdates() - - case *models.ChannelEdgeInfo: - log.Debugf("Received ChannelEdgeInfo for channel %v", - msg.ChannelID) - - // Prior to processing the announcement we first check if we - // already know of this channel, if so, then we can exit early. - _, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge( - msg.ChannelID, - ) - if err != nil && - !errors.Is(err, graphdb.ErrGraphNoEdgesFound) { - - return errors.Errorf("unable to check for edge "+ - "existence: %v", err) - } - if isZombie { - return NewErrf(ErrIgnored, "ignoring msg for zombie "+ - "chan_id=%v", msg.ChannelID) - } - if exists { - return NewErrf(ErrIgnored, "ignoring msg for known "+ - "chan_id=%v", msg.ChannelID) - } - - // If AssumeChannelValid is present, then we are unable to - // perform any of the expensive checks below, so we'll - // short-circuit our path straight to adding the edge to our - // graph. If the passed ShortChannelID is an alias, then we'll - // skip validation as it will not map to a legitimate tx. This - // is not a DoS vector as only we can add an alias - // ChannelAnnouncement from the gossiper. - scid := lnwire.NewShortChanIDFromInt(msg.ChannelID) - if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) { - err := b.cfg.Graph.AddChannelEdge(msg, op...) - if err != nil { - return fmt.Errorf("unable to add edge: %w", err) - } - log.Tracef("New channel discovered! Link "+ - "connects %x and %x with ChannelID(%v)", - msg.NodeKey1Bytes, msg.NodeKey2Bytes, - msg.ChannelID) - b.stats.incNumEdgesDiscovered() - - break - } - - // Before we can add the channel to the channel graph, we need - // to obtain the full funding outpoint that's encoded within - // the channel ID. - channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - fundingTx, err := lnwallet.FetchFundingTxWrapper( - b.cfg.Chain, &channelID, b.quit, - ) - if err != nil { - //nolint:ll - // - // In order to ensure we don't erroneously mark a - // channel as a zombie due to an RPC failure, we'll - // attempt to string match for the relevant errors. - // - // * btcd: - // * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316 - // * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086 - // * bitcoind: - // * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770 - // * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954 - switch { - case strings.Contains(err.Error(), "not found"): - fallthrough - - case strings.Contains(err.Error(), "out of range"): - // If the funding transaction isn't found at - // all, then we'll mark the edge itself as a - // zombie so we don't continue to request it. - // We use the "zero key" for both node pubkeys - // so this edge can't be resurrected. - zErr := b.addZombieEdge(msg.ChannelID) - if zErr != nil { - return zErr - } - - default: - } - - return NewErrf(ErrNoFundingTransaction, "unable to "+ - "locate funding tx: %v", err) - } - - // Recreate witness output to be sure that declared in channel - // edge bitcoin keys and channel value corresponds to the - // reality. - fundingPkScript, err := makeFundingScript( - msg.BitcoinKey1Bytes[:], msg.BitcoinKey2Bytes[:], - msg.Features, msg.TapscriptRoot, - ) - if err != nil { - return err - } - - // Next we'll validate that this channel is actually well - // formed. If this check fails, then this channel either - // doesn't exist, or isn't the one that was meant to be created - // according to the passed channel proofs. - fundingPoint, err := chanvalidate.Validate( - &chanvalidate.Context{ - Locator: &chanvalidate.ShortChanIDChanLocator{ - ID: channelID, - }, - MultiSigPkScript: fundingPkScript, - FundingTx: fundingTx, - }, - ) - if err != nil { - // Mark the edge as a zombie so we won't try to - // re-validate it on start up. - if err := b.addZombieEdge(msg.ChannelID); err != nil { - return err - } - - return NewErrf(ErrInvalidFundingOutput, "output "+ - "failed validation: %w", err) - } - - // Now that we have the funding outpoint of the channel, ensure - // that it hasn't yet been spent. If so, then this channel has - // been closed so we'll ignore it. - chanUtxo, err := b.cfg.Chain.GetUtxo( - fundingPoint, fundingPkScript, channelID.BlockHeight, - b.quit, - ) - if err != nil { - if errors.Is(err, btcwallet.ErrOutputSpent) { - zErr := b.addZombieEdge(msg.ChannelID) - if zErr != nil { - return zErr - } - } - - return NewErrf(ErrChannelSpent, "unable to fetch utxo "+ - "for chan_id=%v, chan_point=%v: %v", - msg.ChannelID, fundingPoint, err) - } - - // TODO(roasbeef): this is a hack, needs to be removed - // after commitment fees are dynamic. - msg.Capacity = btcutil.Amount(chanUtxo.Value) - msg.ChannelPoint = *fundingPoint - if err := b.cfg.Graph.AddChannelEdge(msg, op...); err != nil { - return errors.Errorf("unable to add edge: %v", err) - } - - log.Debugf("New channel discovered! Link "+ - "connects %x and %x with ChannelPoint(%v): "+ - "chan_id=%v, capacity=%v", - msg.NodeKey1Bytes, msg.NodeKey2Bytes, - fundingPoint, msg.ChannelID, msg.Capacity) - b.stats.incNumEdgesDiscovered() - - // As a new edge has been added to the channel graph, we'll - // update the current UTXO filter within our active - // FilteredChainView so we are notified if/when this channel is - // closed. - filterUpdate := []graphdb.EdgePoint{ - { - FundingPkScript: fundingPkScript, - OutPoint: *fundingPoint, - }, - } - err = b.cfg.ChainView.UpdateFilter( - filterUpdate, b.bestHeight.Load(), - ) - if err != nil { - return errors.Errorf("unable to update chain "+ - "view: %v", err) - } - - case *models.ChannelEdgePolicy: - log.Debugf("Received ChannelEdgePolicy for channel %v", - msg.ChannelID) - - // We make sure to hold the mutex for this channel ID, - // such that no other goroutine is concurrently doing - // database accesses for the same channel ID. - b.channelEdgeMtx.Lock(msg.ChannelID) - defer b.channelEdgeMtx.Unlock(msg.ChannelID) - - edge1Timestamp, edge2Timestamp, exists, isZombie, err := - b.cfg.Graph.HasChannelEdge(msg.ChannelID) - if err != nil && !errors.Is( - err, graphdb.ErrGraphNoEdgesFound, - ) { - - return errors.Errorf("unable to check for edge "+ - "existence: %v", err) - } - - // If the channel is marked as a zombie in our database, and - // we consider this a stale update, then we should not apply the - // policy. - isStaleUpdate := time.Since(msg.LastUpdate) > - b.cfg.ChannelPruneExpiry - - if isZombie && isStaleUpdate { - return NewErrf(ErrIgnored, "ignoring stale update "+ - "(flags=%v|%v) for zombie chan_id=%v", - msg.MessageFlags, msg.ChannelFlags, - msg.ChannelID) - } - - // If the channel doesn't exist in our database, we cannot - // apply the updated policy. - if !exists { - return NewErrf(ErrIgnored, "ignoring update "+ - "(flags=%v|%v) for unknown chan_id=%v", - msg.MessageFlags, msg.ChannelFlags, - msg.ChannelID) - } - - log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v", - edge1Timestamp, edge2Timestamp) - - // As edges are directional edge node has a unique policy for - // the direction of the edge they control. Therefore, we first - // check if we already have the most up-to-date information for - // that edge. If this message has a timestamp not strictly - // newer than what we already know of we can exit early. - switch msg.ChannelFlags & lnwire.ChanUpdateDirection { - // A flag set of 0 indicates this is an announcement for the - // "first" node in the channel. - case 0: - // Ignore outdated message. - if !edge1Timestamp.Before(msg.LastUpdate) { - return NewErrf(ErrOutdated, "Ignoring "+ - "outdated update (flags=%v|%v) for "+ - "known chan_id=%v", msg.MessageFlags, - msg.ChannelFlags, msg.ChannelID) - } - - // Similarly, a flag set of 1 indicates this is an announcement - // for the "second" node in the channel. - case 1: - // Ignore outdated message. - if !edge2Timestamp.Before(msg.LastUpdate) { - return NewErrf(ErrOutdated, "Ignoring "+ - "outdated update (flags=%v|%v) for "+ - "known chan_id=%v", msg.MessageFlags, - msg.ChannelFlags, msg.ChannelID) - } - } - - // Now that we know this isn't a stale update, we'll apply the - // new edge policy to the proper directional edge within the - // channel graph. - if err = b.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil { - err := errors.Errorf("unable to add channel: %v", err) - log.Error(err) - return err - } - - log.Tracef("New channel update applied: %v", - lnutils.SpewLogClosure(msg)) - b.stats.incNumChannelUpdates() - - default: - return errors.Errorf("wrong routing update message type") - } - - return nil -} - // routingMsg couples a routing related routing topology update to the // error channel. type routingMsg struct { @@ -1479,6 +1195,32 @@ func (b *Builder) AddNode(node *models.LightningNode, } } +// addNode does some basic checks on the given LightningNode against what we +// currently have persisted in the graph, and then adds it to the graph. If we +// already know about the node, then we only update our DB if the new update +// has a newer timestamp than the last one we received. +func (b *Builder) addNode(node *models.LightningNode, + op ...batch.SchedulerOption) error { + + // Before we add the node to the database, we'll check to see if the + // announcement is "fresh" or not. If it isn't, then we'll return an + // error. + err := b.assertNodeAnnFreshness(node.PubKeyBytes, node.LastUpdate) + if err != nil { + return err + } + + if err := b.cfg.Graph.AddLightningNode(node, op...); err != nil { + return errors.Errorf("unable to add node %x to the "+ + "graph: %v", node.PubKeyBytes, err) + } + + log.Tracef("Updated vertex data for node=%x", node.PubKeyBytes) + b.stats.incNumNodeUpdates() + + return nil +} + // AddEdge is used to add edge/channel to the topology of the router, after all // information about channel will be gathered this edge/channel might be used // in construction of payment path. @@ -1506,6 +1248,182 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, } } +// addEdge does some validation on the new channel edge against what we +// currently have persisted in the graph, and then adds it to the graph. The +// Chain View is updated with the new edge if it is successfully added to the +// graph. We only persist the channel if we currently dont have it at all in +// our graph. +// +// TODO(elle): this currently also does funding-transaction validation. But this +// should be moved to the gossiper instead. +func (b *Builder) addEdge(edge *models.ChannelEdgeInfo, + op ...batch.SchedulerOption) error { + + log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID) + + // Prior to processing the announcement we first check if we + // already know of this channel, if so, then we can exit early. + _, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge( + edge.ChannelID, + ) + if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) { + return errors.Errorf("unable to check for edge existence: %v", + err) + } + if isZombie { + return NewErrf(ErrIgnored, "ignoring msg for zombie chan_id=%v", + edge.ChannelID) + } + if exists { + return NewErrf(ErrIgnored, "ignoring msg for known chan_id=%v", + edge.ChannelID) + } + + // If AssumeChannelValid is present, then we are unable to perform any + // of the expensive checks below, so we'll short-circuit our path + // straight to adding the edge to our graph. If the passed + // ShortChannelID is an alias, then we'll skip validation as it will + // not map to a legitimate tx. This is not a DoS vector as only we can + // add an alias ChannelAnnouncement from the gossiper. + scid := lnwire.NewShortChanIDFromInt(edge.ChannelID) + if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) { + err := b.cfg.Graph.AddChannelEdge(edge, op...) + if err != nil { + return fmt.Errorf("unable to add edge: %w", err) + } + log.Tracef("New channel discovered! Link connects %x and %x "+ + "with ChannelID(%v)", edge.NodeKey1Bytes, + edge.NodeKey2Bytes, edge.ChannelID) + b.stats.incNumEdgesDiscovered() + + return nil + } + + // Before we can add the channel to the channel graph, we need to obtain + // the full funding outpoint that's encoded within the channel ID. + channelID := lnwire.NewShortChanIDFromInt(edge.ChannelID) + fundingTx, err := lnwallet.FetchFundingTxWrapper( + b.cfg.Chain, &channelID, b.quit, + ) + if err != nil { + //nolint:ll + // + // In order to ensure we don't erroneously mark a channel as a + // zombie due to an RPC failure, we'll attempt to string match + // for the relevant errors. + // + // * btcd: + // * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316 + // * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086 + // * bitcoind: + // * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770 + // * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954 + switch { + case strings.Contains(err.Error(), "not found"): + fallthrough + + case strings.Contains(err.Error(), "out of range"): + // If the funding transaction isn't found at all, then + // we'll mark the edge itself as a zombie so we don't + // continue to request it. We use the "zero key" for + // both node pubkeys so this edge can't be resurrected. + zErr := b.addZombieEdge(edge.ChannelID) + if zErr != nil { + return zErr + } + + default: + } + + return NewErrf(ErrNoFundingTransaction, "unable to "+ + "locate funding tx: %v", err) + } + + // Recreate witness output to be sure that declared in channel edge + // bitcoin keys and channel value corresponds to the reality. + fundingPkScript, err := makeFundingScript( + edge.BitcoinKey1Bytes[:], edge.BitcoinKey2Bytes[:], + edge.Features, edge.TapscriptRoot, + ) + if err != nil { + return err + } + + // Next we'll validate that this channel is actually well formed. If + // this check fails, then this channel either doesn't exist, or isn't + // the one that was meant to be created according to the passed channel + // proofs. + fundingPoint, err := chanvalidate.Validate( + &chanvalidate.Context{ + Locator: &chanvalidate.ShortChanIDChanLocator{ + ID: channelID, + }, + MultiSigPkScript: fundingPkScript, + FundingTx: fundingTx, + }, + ) + if err != nil { + // Mark the edge as a zombie so we won't try to re-validate it + // on start up. + if err := b.addZombieEdge(edge.ChannelID); err != nil { + return err + } + + return NewErrf(ErrInvalidFundingOutput, "output failed "+ + "validation: %w", err) + } + + // Now that we have the funding outpoint of the channel, ensure + // that it hasn't yet been spent. If so, then this channel has + // been closed so we'll ignore it. + chanUtxo, err := b.cfg.Chain.GetUtxo( + fundingPoint, fundingPkScript, channelID.BlockHeight, b.quit, + ) + if err != nil { + if errors.Is(err, btcwallet.ErrOutputSpent) { + zErr := b.addZombieEdge(edge.ChannelID) + if zErr != nil { + return zErr + } + } + + return NewErrf(ErrChannelSpent, "unable to fetch utxo for "+ + "chan_id=%v, chan_point=%v: %v", edge.ChannelID, + fundingPoint, err) + } + + // TODO(roasbeef): this is a hack, needs to be removed after commitment + // fees are dynamic. + edge.Capacity = btcutil.Amount(chanUtxo.Value) + edge.ChannelPoint = *fundingPoint + if err := b.cfg.Graph.AddChannelEdge(edge, op...); err != nil { + return errors.Errorf("unable to add edge: %v", err) + } + + log.Debugf("New channel discovered! Link connects %x and %x with "+ + "ChannelPoint(%v): chan_id=%v, capacity=%v", edge.NodeKey1Bytes, + edge.NodeKey2Bytes, fundingPoint, edge.ChannelID, edge.Capacity) + b.stats.incNumEdgesDiscovered() + + // As a new edge has been added to the channel graph, we'll update the + // current UTXO filter within our active FilteredChainView so we are + // notified if/when this channel is closed. + filterUpdate := []graphdb.EdgePoint{ + { + FundingPkScript: fundingPkScript, + OutPoint: *fundingPoint, + }, + } + + err = b.cfg.ChainView.UpdateFilter(filterUpdate, b.bestHeight.Load()) + if err != nil { + return errors.Errorf("unable to update chain "+ + "view: %v", err) + } + + return nil +} + // UpdateEdge is used to update edge information, without this message edge // considered as not fully constructed. // @@ -1532,6 +1450,97 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, } } +// updateEdge validates the new edge policy against what we currently have +// persisted in the graph, and then applies it to the graph if the update is +// considered fresh enough and if we actually have a channel persisted for the +// given update. +func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy, + op ...batch.SchedulerOption) error { + + log.Debugf("Received ChannelEdgePolicy for channel %v", + policy.ChannelID) + + // We make sure to hold the mutex for this channel ID, such that no + // other goroutine is concurrently doing database accesses for the same + // channel ID. + b.channelEdgeMtx.Lock(policy.ChannelID) + defer b.channelEdgeMtx.Unlock(policy.ChannelID) + + edge1Timestamp, edge2Timestamp, exists, isZombie, err := + b.cfg.Graph.HasChannelEdge(policy.ChannelID) + if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) { + return errors.Errorf("unable to check for edge existence: %v", + err) + } + + // If the channel is marked as a zombie in our database, and + // we consider this a stale update, then we should not apply the + // policy. + isStaleUpdate := time.Since(policy.LastUpdate) > + b.cfg.ChannelPruneExpiry + + if isZombie && isStaleUpdate { + return NewErrf(ErrIgnored, "ignoring stale update "+ + "(flags=%v|%v) for zombie chan_id=%v", + policy.MessageFlags, policy.ChannelFlags, + policy.ChannelID) + } + + // If the channel doesn't exist in our database, we cannot apply the + // updated policy. + if !exists { + return NewErrf(ErrIgnored, "ignoring update (flags=%v|%v) for "+ + "unknown chan_id=%v", policy.MessageFlags, + policy.ChannelFlags, policy.ChannelID) + } + + log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v", + edge1Timestamp, edge2Timestamp) + + // As edges are directional edge node has a unique policy for the + // direction of the edge they control. Therefore, we first check if we + // already have the most up-to-date information for that edge. If this + // message has a timestamp not strictly newer than what we already know + // of we can exit early. + switch policy.ChannelFlags & lnwire.ChanUpdateDirection { + // A flag set of 0 indicates this is an announcement for the "first" + // node in the channel. + case 0: + // Ignore outdated message. + if !edge1Timestamp.Before(policy.LastUpdate) { + return NewErrf(ErrOutdated, "Ignoring "+ + "outdated update (flags=%v|%v) for "+ + "known chan_id=%v", policy.MessageFlags, + policy.ChannelFlags, policy.ChannelID) + } + + // Similarly, a flag set of 1 indicates this is an announcement + // for the "second" node in the channel. + case 1: + // Ignore outdated message. + if !edge2Timestamp.Before(policy.LastUpdate) { + return NewErrf(ErrOutdated, "Ignoring "+ + "outdated update (flags=%v|%v) for "+ + "known chan_id=%v", policy.MessageFlags, + policy.ChannelFlags, policy.ChannelID) + } + } + + // Now that we know this isn't a stale update, we'll apply the new edge + // policy to the proper directional edge within the channel graph. + if err = b.cfg.Graph.UpdateEdgePolicy(policy, op...); err != nil { + err := errors.Errorf("unable to add channel: %v", err) + log.Error(err) + return err + } + + log.Tracef("New channel update applied: %v", + lnutils.SpewLogClosure(policy)) + b.stats.incNumChannelUpdates() + + return nil +} + // CurrentBlockHeight returns the block height from POV of the router subsystem. // // NOTE: This method is part of the ChannelGraphSource interface. From d757b3bcfc77b0fbb543ef5db800e6c6b058930f Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 5 Feb 2025 09:49:54 +0200 Subject: [PATCH 3/5] graph: refactor Builder network message handling The point of the `graph.Builder`'s `networkHandler` goroutine is to ensure that certain requests are handled in a synchronous fashion. However, any requests received on the `networkUpdates` channel, are currently immediately handled in a goroutine which calls `handleNetworkUpdate` which calls `processUpdate` before doing topology notifications. In other words, there is no reason for these `networkUpdates` to be handled in the `networkHandler` since they are always handled asynchronously anyways. This design is most likely due to the fact that originally the gossiper and graph builder code lived in the same system and so the pattern was copied across. So in this commit, we just remove the complexity. The only part we need to spin off in a goroutine is the topology notifications. --- graph/builder.go | 57 +++++++++++++++--------------------------------- 1 file changed, 17 insertions(+), 40 deletions(-) diff --git a/graph/builder.go b/graph/builder.go index 3b88cf947..dbe36b943 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -131,11 +131,6 @@ type Builder struct { // of our currently known best chain are sent over. staleBlocks <-chan *chainview.FilteredBlock - // networkUpdates is a channel that carries new topology updates - // messages from outside the Builder to be processed by the - // networkHandler. - networkUpdates chan *routingMsg - // topologyClients maps a client's unique notification ID to a // topologyClient client that contains its notification dispatch // channel. @@ -172,7 +167,6 @@ var _ ChannelGraphSource = (*Builder)(nil) func NewBuilder(cfg *Config) (*Builder, error) { return &Builder{ cfg: cfg, - networkUpdates: make(chan *routingMsg), topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, ntfnClientUpdates: make(chan *topologyClientUpdate), channelEdgeMtx: multimutex.NewMutex[uint64](), @@ -721,8 +715,8 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) { // networkHandler is the primary goroutine for the Builder. The roles of // this goroutine include answering queries related to the state of the -// network, pruning the graph on new block notification, applying network -// updates, and registering new topology clients. +// network, pruning the graph on new block notification and registering new +// topology clients. // // NOTE: This MUST be run as a goroutine. func (b *Builder) networkHandler() { @@ -742,17 +736,6 @@ func (b *Builder) networkHandler() { } select { - // A new fully validated network update has just arrived. As a - // result we'll modify the channel graph accordingly depending - // on the exact type of the message. - case update := <-b.networkUpdates: - b.wg.Add(1) - go b.handleNetworkUpdate(update) - - // TODO(roasbeef): remove all unconnected vertexes - // after N blocks pass with no corresponding - // announcements. - case chainUpdate, ok := <-b.staleBlocks: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. @@ -1182,14 +1165,12 @@ func (b *Builder) AddNode(node *models.LightningNode, err: make(chan error, 1), } + b.wg.Add(1) + go b.handleNetworkUpdate(rMsg) + select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case err := <-rMsg.err: + return err case <-b.quit: return ErrGraphBuilderShuttingDown } @@ -1235,14 +1216,12 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, err: make(chan error, 1), } + b.wg.Add(1) + go b.handleNetworkUpdate(rMsg) + select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case err := <-rMsg.err: + return err case <-b.quit: return ErrGraphBuilderShuttingDown } @@ -1437,14 +1416,12 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, err: make(chan error, 1), } + b.wg.Add(1) + go b.handleNetworkUpdate(rMsg) + select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case err := <-rMsg.err: + return err case <-b.quit: return ErrGraphBuilderShuttingDown } From 6169b47d655c547f69fe185ba7c6cd1c4461c2ca Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 5 Feb 2025 09:54:22 +0200 Subject: [PATCH 4/5] graph: rename routerStats to builderStats This logic used to be handled by the router. Update to reflect new owner. --- graph/builder.go | 4 ++-- graph/stats.go | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/graph/builder.go b/graph/builder.go index dbe36b943..57f00c02a 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -153,7 +153,7 @@ type Builder struct { // stats tracks newly processed channels, updates, and node // announcements over a window of defaultStatInterval. - stats *routerStats + stats *builderStats quit chan struct{} wg sync.WaitGroup @@ -171,7 +171,7 @@ func NewBuilder(cfg *Config) (*Builder, error) { ntfnClientUpdates: make(chan *topologyClientUpdate), channelEdgeMtx: multimutex.NewMutex[uint64](), statTicker: ticker.New(defaultStatInterval), - stats: new(routerStats), + stats: new(builderStats), quit: make(chan struct{}), }, nil } diff --git a/graph/stats.go b/graph/stats.go index 91e897ae5..a6c19ce63 100644 --- a/graph/stats.go +++ b/graph/stats.go @@ -6,9 +6,9 @@ import ( "time" ) -// routerStats is a struct that tracks various updates to the graph and +// builderStats is a struct that tracks various updates to the graph and // facilitates aggregate logging of the statistics. -type routerStats struct { +type builderStats struct { numChannels uint32 numUpdates uint32 numNodes uint32 @@ -18,28 +18,28 @@ type routerStats struct { } // incNumEdges increments the number of discovered edges. -func (g *routerStats) incNumEdgesDiscovered() { +func (g *builderStats) incNumEdgesDiscovered() { g.mu.Lock() g.numChannels++ g.mu.Unlock() } // incNumUpdates increments the number of channel updates processed. -func (g *routerStats) incNumChannelUpdates() { +func (g *builderStats) incNumChannelUpdates() { g.mu.Lock() g.numUpdates++ g.mu.Unlock() } // incNumNodeUpdates increments the number of node updates processed. -func (g *routerStats) incNumNodeUpdates() { +func (g *builderStats) incNumNodeUpdates() { g.mu.Lock() g.numNodes++ g.mu.Unlock() } // Empty returns true if all stats are zero. -func (g *routerStats) Empty() bool { +func (g *builderStats) Empty() bool { g.mu.RLock() isEmpty := g.numChannels == 0 && g.numUpdates == 0 && @@ -48,8 +48,8 @@ func (g *routerStats) Empty() bool { return isEmpty } -// Reset clears any router stats and sets the lastReset field to now. -func (g *routerStats) Reset() { +// Reset clears any stats and sets the lastReset field to now. +func (g *builderStats) Reset() { g.mu.Lock() g.numChannels = 0 g.numUpdates = 0 @@ -58,8 +58,8 @@ func (g *routerStats) Reset() { g.mu.Unlock() } -// String returns a human-readable description of the router stats. -func (g *routerStats) String() string { +// String returns a human-readable description of the stats. +func (g *builderStats) String() string { g.mu.RLock() str := fmt.Sprintf("Processed channels=%d updates=%d nodes=%d in "+ "last %v", g.numChannels, g.numUpdates, g.numNodes, From a86a5edbd5379251fab87589ae52726a37892fe6 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 5 Feb 2025 10:07:23 +0200 Subject: [PATCH 5/5] docs: update release notes --- docs/release-notes/release-notes-0.19.0.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 6979d796b..8956526ea 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -239,6 +239,9 @@ The underlying functionality between those two options remain the same. * [Golang was updated to `v1.22.11`](https://github.com/lightningnetwork/lnd/pull/9462). +* Various refactors to simplify the + `graph.Builder` [1](https://github.com/lightningnetwork/lnd/pull/9476). + ## Breaking Changes ## Performance Improvements