mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-05-11 12:20:20 +02:00
discovery: thread context through to gossiper
Pass the parent LND context to the gossiper, let it derive a child context that gets cancelled on Stop. Pass the context through to any methods that will eventually thread it through to any graph DB calls. One `context.TODO()` is added here - this will be removed in the next commit. NOTE: for any internal methods that the context gets passed to, if those methods already listen on the gossiper's `quit` channel, then then don't need to also listen on the passed context's Done() channel because the quit channel is closed at the same time that the context is cancelled.
This commit is contained in:
parent
62db6e2a98
commit
9f6740e638
@ -2,6 +2,7 @@ package discovery
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
@ -475,9 +476,6 @@ type AuthenticatedGossiper struct {
|
|||||||
// held.
|
// held.
|
||||||
bestHeight uint32
|
bestHeight uint32
|
||||||
|
|
||||||
quit chan struct{}
|
|
||||||
wg sync.WaitGroup
|
|
||||||
|
|
||||||
// cfg is a copy of the configuration struct that the gossiper service
|
// cfg is a copy of the configuration struct that the gossiper service
|
||||||
// was initialized with.
|
// was initialized with.
|
||||||
cfg *Config
|
cfg *Config
|
||||||
@ -555,6 +553,10 @@ type AuthenticatedGossiper struct {
|
|||||||
vb *ValidationBarrier
|
vb *ValidationBarrier
|
||||||
|
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
|
|
||||||
|
cancel fn.Option[context.CancelFunc]
|
||||||
|
quit chan struct{}
|
||||||
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new AuthenticatedGossiper instance, initialized with the
|
// New creates a new AuthenticatedGossiper instance, initialized with the
|
||||||
@ -600,7 +602,11 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
|
|||||||
NotifyWhenOnline: cfg.NotifyWhenOnline,
|
NotifyWhenOnline: cfg.NotifyWhenOnline,
|
||||||
NotifyWhenOffline: cfg.NotifyWhenOffline,
|
NotifyWhenOffline: cfg.NotifyWhenOffline,
|
||||||
MessageStore: cfg.MessageStore,
|
MessageStore: cfg.MessageStore,
|
||||||
IsMsgStale: gossiper.isMsgStale,
|
IsMsgStale: func(message lnwire.Message) bool {
|
||||||
|
ctx := context.TODO()
|
||||||
|
|
||||||
|
return gossiper.isMsgStale(ctx, message)
|
||||||
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
return gossiper
|
return gossiper
|
||||||
@ -641,16 +647,19 @@ func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
|
|||||||
|
|
||||||
// Start spawns network messages handler goroutine and registers on new block
|
// Start spawns network messages handler goroutine and registers on new block
|
||||||
// notifications in order to properly handle the premature announcements.
|
// notifications in order to properly handle the premature announcements.
|
||||||
func (d *AuthenticatedGossiper) Start() error {
|
func (d *AuthenticatedGossiper) Start(ctx context.Context) error {
|
||||||
var err error
|
var err error
|
||||||
d.started.Do(func() {
|
d.started.Do(func() {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
d.cancel = fn.Some(cancel)
|
||||||
|
|
||||||
log.Info("Authenticated Gossiper starting")
|
log.Info("Authenticated Gossiper starting")
|
||||||
err = d.start()
|
err = d.start(ctx)
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *AuthenticatedGossiper) start() error {
|
func (d *AuthenticatedGossiper) start(ctx context.Context) error {
|
||||||
// First we register for new notifications of newly discovered blocks.
|
// First we register for new notifications of newly discovered blocks.
|
||||||
// We do this immediately so we'll later be able to consume any/all
|
// We do this immediately so we'll later be able to consume any/all
|
||||||
// blocks which were discovered.
|
// blocks which were discovered.
|
||||||
@ -680,7 +689,7 @@ func (d *AuthenticatedGossiper) start() error {
|
|||||||
// Start receiving blocks in its dedicated goroutine.
|
// Start receiving blocks in its dedicated goroutine.
|
||||||
d.wg.Add(2)
|
d.wg.Add(2)
|
||||||
go d.syncBlockHeight()
|
go d.syncBlockHeight()
|
||||||
go d.networkHandler()
|
go d.networkHandler(ctx)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -835,6 +844,7 @@ func (d *AuthenticatedGossiper) stop() {
|
|||||||
|
|
||||||
d.banman.stop()
|
d.banman.stop()
|
||||||
|
|
||||||
|
d.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
|
||||||
close(d.quit)
|
close(d.quit)
|
||||||
d.wg.Wait()
|
d.wg.Wait()
|
||||||
|
|
||||||
@ -1337,7 +1347,7 @@ func (d *AuthenticatedGossiper) splitAnnouncementBatches(
|
|||||||
// split size, and then sends out all items to the set of target peers. Locally
|
// split size, and then sends out all items to the set of target peers. Locally
|
||||||
// generated announcements are always sent before remotely generated
|
// generated announcements are always sent before remotely generated
|
||||||
// announcements.
|
// announcements.
|
||||||
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
|
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(ctx context.Context,
|
||||||
annBatch msgsToBroadcast) {
|
annBatch msgsToBroadcast) {
|
||||||
|
|
||||||
// delayNextBatch is a helper closure that blocks for `SubBatchDelay`
|
// delayNextBatch is a helper closure that blocks for `SubBatchDelay`
|
||||||
@ -1374,7 +1384,7 @@ func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
|
|||||||
|
|
||||||
// Now send the remote announcements.
|
// Now send the remote announcements.
|
||||||
for _, annBatch := range remoteBatches {
|
for _, annBatch := range remoteBatches {
|
||||||
d.sendRemoteBatch(annBatch)
|
d.sendRemoteBatch(ctx, annBatch)
|
||||||
delayNextBatch()
|
delayNextBatch()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -1398,7 +1408,9 @@ func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
|
|||||||
|
|
||||||
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
|
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
|
||||||
// peers.
|
// peers.
|
||||||
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
|
func (d *AuthenticatedGossiper) sendRemoteBatch(_ context.Context,
|
||||||
|
annBatch []msgWithSenders) {
|
||||||
|
|
||||||
syncerPeers := d.syncMgr.GossipSyncers()
|
syncerPeers := d.syncMgr.GossipSyncers()
|
||||||
|
|
||||||
// We'll first attempt to filter out this new message for all peers
|
// We'll first attempt to filter out this new message for all peers
|
||||||
@ -1431,7 +1443,7 @@ func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
|
|||||||
// broadcasting our latest topology state to all connected peers.
|
// broadcasting our latest topology state to all connected peers.
|
||||||
//
|
//
|
||||||
// NOTE: This MUST be run as a goroutine.
|
// NOTE: This MUST be run as a goroutine.
|
||||||
func (d *AuthenticatedGossiper) networkHandler() {
|
func (d *AuthenticatedGossiper) networkHandler(ctx context.Context) {
|
||||||
defer d.wg.Done()
|
defer d.wg.Done()
|
||||||
|
|
||||||
// Initialize empty deDupedAnnouncements to store announcement batch.
|
// Initialize empty deDupedAnnouncements to store announcement batch.
|
||||||
@ -1446,7 +1458,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
|
|
||||||
// To start, we'll first check to see if there are any stale channel or
|
// To start, we'll first check to see if there are any stale channel or
|
||||||
// node announcements that we need to re-transmit.
|
// node announcements that we need to re-transmit.
|
||||||
if err := d.retransmitStaleAnns(time.Now()); err != nil {
|
if err := d.retransmitStaleAnns(ctx, time.Now()); err != nil {
|
||||||
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
|
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1463,7 +1475,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
// the affected channels and also update the underlying
|
// the affected channels and also update the underlying
|
||||||
// graph with the new state.
|
// graph with the new state.
|
||||||
newChanUpdates, err := d.processChanPolicyUpdate(
|
newChanUpdates, err := d.processChanPolicyUpdate(
|
||||||
policyUpdate.edgesToUpdate,
|
ctx, policyUpdate.edgesToUpdate,
|
||||||
)
|
)
|
||||||
policyUpdate.errChan <- err
|
policyUpdate.errChan <- err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1488,7 +1500,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
// messages that we'll process serially.
|
// messages that we'll process serially.
|
||||||
case *lnwire.AnnounceSignatures1:
|
case *lnwire.AnnounceSignatures1:
|
||||||
emittedAnnouncements, _ := d.processNetworkAnnouncement(
|
emittedAnnouncements, _ := d.processNetworkAnnouncement(
|
||||||
announcement,
|
ctx, announcement,
|
||||||
)
|
)
|
||||||
log.Debugf("Processed network message %s, "+
|
log.Debugf("Processed network message %s, "+
|
||||||
"returned len(announcements)=%v",
|
"returned len(announcements)=%v",
|
||||||
@ -1528,7 +1540,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
|
|
||||||
d.wg.Add(1)
|
d.wg.Add(1)
|
||||||
go d.handleNetworkMessages(
|
go d.handleNetworkMessages(
|
||||||
announcement, &announcements, annJobID,
|
ctx, announcement, &announcements, annJobID,
|
||||||
)
|
)
|
||||||
|
|
||||||
// The trickle timer has ticked, which indicates we should
|
// The trickle timer has ticked, which indicates we should
|
||||||
@ -1551,7 +1563,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
// announcements, we'll blast them out w/o regard for
|
// announcements, we'll blast them out w/o regard for
|
||||||
// our peer's policies so we ensure they propagate
|
// our peer's policies so we ensure they propagate
|
||||||
// properly.
|
// properly.
|
||||||
d.splitAndSendAnnBatch(announcementBatch)
|
d.splitAndSendAnnBatch(ctx, announcementBatch)
|
||||||
|
|
||||||
// The retransmission timer has ticked which indicates that we
|
// The retransmission timer has ticked which indicates that we
|
||||||
// should check if we need to prune or re-broadcast any of our
|
// should check if we need to prune or re-broadcast any of our
|
||||||
@ -1560,7 +1572,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
// have been dropped, or not properly propagated through the
|
// have been dropped, or not properly propagated through the
|
||||||
// network.
|
// network.
|
||||||
case tick := <-d.cfg.RetransmitTicker.Ticks():
|
case tick := <-d.cfg.RetransmitTicker.Ticks():
|
||||||
if err := d.retransmitStaleAnns(tick); err != nil {
|
if err := d.retransmitStaleAnns(ctx, tick); err != nil {
|
||||||
log.Errorf("unable to rebroadcast stale "+
|
log.Errorf("unable to rebroadcast stale "+
|
||||||
"announcements: %v", err)
|
"announcements: %v", err)
|
||||||
}
|
}
|
||||||
@ -1578,8 +1590,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
|||||||
// signal its dependants and add the new announcements to the announce batch.
|
// signal its dependants and add the new announcements to the announce batch.
|
||||||
//
|
//
|
||||||
// NOTE: must be run as a goroutine.
|
// NOTE: must be run as a goroutine.
|
||||||
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
|
func (d *AuthenticatedGossiper) handleNetworkMessages(ctx context.Context,
|
||||||
deDuped *deDupedAnnouncements, jobID JobID) {
|
nMsg *networkMsg, deDuped *deDupedAnnouncements, jobID JobID) {
|
||||||
|
|
||||||
defer d.wg.Done()
|
defer d.wg.Done()
|
||||||
defer d.vb.CompleteJob()
|
defer d.vb.CompleteJob()
|
||||||
@ -1607,7 +1619,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
|
|||||||
// Process the network announcement to determine if this is either a
|
// Process the network announcement to determine if this is either a
|
||||||
// new announcement from our PoV or an edges to a prior vertex/edge we
|
// new announcement from our PoV or an edges to a prior vertex/edge we
|
||||||
// previously proceeded.
|
// previously proceeded.
|
||||||
newAnns, allow := d.processNetworkAnnouncement(nMsg)
|
newAnns, allow := d.processNetworkAnnouncement(ctx, nMsg)
|
||||||
|
|
||||||
log.Tracef("Processed network message %s, returned "+
|
log.Tracef("Processed network message %s, returned "+
|
||||||
"len(announcements)=%v, allowDependents=%v",
|
"len(announcements)=%v, allowDependents=%v",
|
||||||
@ -1681,7 +1693,9 @@ func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
|
|||||||
// stale iff, the last timestamp of its rebroadcast is older than the
|
// stale iff, the last timestamp of its rebroadcast is older than the
|
||||||
// RebroadcastInterval. We also check if a refreshed node announcement should
|
// RebroadcastInterval. We also check if a refreshed node announcement should
|
||||||
// be resent.
|
// be resent.
|
||||||
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
|
func (d *AuthenticatedGossiper) retransmitStaleAnns(ctx context.Context,
|
||||||
|
now time.Time) error {
|
||||||
|
|
||||||
// Iterate over all of our channels and check if any of them fall
|
// Iterate over all of our channels and check if any of them fall
|
||||||
// within the prune interval or re-broadcast interval.
|
// within the prune interval or re-broadcast interval.
|
||||||
type updateTuple struct {
|
type updateTuple struct {
|
||||||
@ -1753,7 +1767,7 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
|
|||||||
// Re-sign and update the channel on disk and retrieve our
|
// Re-sign and update the channel on disk and retrieve our
|
||||||
// ChannelUpdate to broadcast.
|
// ChannelUpdate to broadcast.
|
||||||
chanAnn, chanUpdate, err := d.updateChannel(
|
chanAnn, chanUpdate, err := d.updateChannel(
|
||||||
chanToUpdate.info, chanToUpdate.edge,
|
ctx, chanToUpdate.info, chanToUpdate.edge,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to update channel: %w", err)
|
return fmt.Errorf("unable to update channel: %w", err)
|
||||||
@ -1794,7 +1808,7 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
|
|||||||
|
|
||||||
// Before broadcasting the refreshed node announcement, add it
|
// Before broadcasting the refreshed node announcement, add it
|
||||||
// to our own graph.
|
// to our own graph.
|
||||||
if err := d.addNode(&newNodeAnn); err != nil {
|
if err := d.addNode(ctx, &newNodeAnn); err != nil {
|
||||||
log.Errorf("Unable to add refreshed node announcement "+
|
log.Errorf("Unable to add refreshed node announcement "+
|
||||||
"to graph: %v", err)
|
"to graph: %v", err)
|
||||||
}
|
}
|
||||||
@ -1820,7 +1834,7 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
|
|||||||
|
|
||||||
// processChanPolicyUpdate generates a new set of channel updates for the
|
// processChanPolicyUpdate generates a new set of channel updates for the
|
||||||
// provided list of edges and updates the backing ChannelGraphSource.
|
// provided list of edges and updates the backing ChannelGraphSource.
|
||||||
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
|
func (d *AuthenticatedGossiper) processChanPolicyUpdate(ctx context.Context,
|
||||||
edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
|
edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
|
||||||
|
|
||||||
var chanUpdates []networkMsg
|
var chanUpdates []networkMsg
|
||||||
@ -1829,7 +1843,7 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
|
|||||||
// we'll re-sign and update the backing ChannelGraphSource, and
|
// we'll re-sign and update the backing ChannelGraphSource, and
|
||||||
// retrieve our ChannelUpdate to broadcast.
|
// retrieve our ChannelUpdate to broadcast.
|
||||||
_, chanUpdate, err := d.updateChannel(
|
_, chanUpdate, err := d.updateChannel(
|
||||||
edgeInfo.Info, edgeInfo.Edge,
|
ctx, edgeInfo.Info, edgeInfo.Edge,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -1922,7 +1936,7 @@ func remotePubFromChanInfo(chanInfo *models.ChannelEdgeInfo,
|
|||||||
// situation in the case where we create a channel, but for some reason fail
|
// situation in the case where we create a channel, but for some reason fail
|
||||||
// to receive the remote peer's proof, while the remote peer is able to fully
|
// to receive the remote peer's proof, while the remote peer is able to fully
|
||||||
// assemble the proof and craft the ChannelAnnouncement.
|
// assemble the proof and craft the ChannelAnnouncement.
|
||||||
func (d *AuthenticatedGossiper) processRejectedEdge(
|
func (d *AuthenticatedGossiper) processRejectedEdge(_ context.Context,
|
||||||
chanAnnMsg *lnwire.ChannelAnnouncement1,
|
chanAnnMsg *lnwire.ChannelAnnouncement1,
|
||||||
proof *models.ChannelAuthProof) ([]networkMsg, error) {
|
proof *models.ChannelAuthProof) ([]networkMsg, error) {
|
||||||
|
|
||||||
@ -2010,8 +2024,8 @@ func (d *AuthenticatedGossiper) fetchPKScript(chanID *lnwire.ShortChannelID) (
|
|||||||
|
|
||||||
// addNode processes the given node announcement, and adds it to our channel
|
// addNode processes the given node announcement, and adds it to our channel
|
||||||
// graph.
|
// graph.
|
||||||
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
|
func (d *AuthenticatedGossiper) addNode(_ context.Context,
|
||||||
op ...batch.SchedulerOption) error {
|
msg *lnwire.NodeAnnouncement, op ...batch.SchedulerOption) error {
|
||||||
|
|
||||||
if err := netann.ValidateNodeAnn(msg); err != nil {
|
if err := netann.ValidateNodeAnn(msg); err != nil {
|
||||||
return fmt.Errorf("unable to validate node announcement: %w",
|
return fmt.Errorf("unable to validate node announcement: %w",
|
||||||
@ -2086,7 +2100,7 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
|
|||||||
// be returned which should be broadcasted to the rest of the network. The
|
// be returned which should be broadcasted to the rest of the network. The
|
||||||
// boolean returned indicates whether any dependents of the announcement should
|
// boolean returned indicates whether any dependents of the announcement should
|
||||||
// attempt to be processed as well.
|
// attempt to be processed as well.
|
||||||
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
func (d *AuthenticatedGossiper) processNetworkAnnouncement(ctx context.Context,
|
||||||
nMsg *networkMsg) ([]networkMsg, bool) {
|
nMsg *networkMsg) ([]networkMsg, bool) {
|
||||||
|
|
||||||
// If this is a remote update, we set the scheduler option to lazily
|
// If this is a remote update, we set the scheduler option to lazily
|
||||||
@ -2101,26 +2115,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
|||||||
// information about a node in one of the channels we know about, or a
|
// information about a node in one of the channels we know about, or a
|
||||||
// updating previously advertised information.
|
// updating previously advertised information.
|
||||||
case *lnwire.NodeAnnouncement:
|
case *lnwire.NodeAnnouncement:
|
||||||
return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)
|
return d.handleNodeAnnouncement(ctx, nMsg, msg, schedulerOp)
|
||||||
|
|
||||||
// A new channel announcement has arrived, this indicates the
|
// A new channel announcement has arrived, this indicates the
|
||||||
// *creation* of a new channel within the network. This only advertises
|
// *creation* of a new channel within the network. This only advertises
|
||||||
// the existence of a channel and not yet the routing policies in
|
// the existence of a channel and not yet the routing policies in
|
||||||
// either direction of the channel.
|
// either direction of the channel.
|
||||||
case *lnwire.ChannelAnnouncement1:
|
case *lnwire.ChannelAnnouncement1:
|
||||||
return d.handleChanAnnouncement(nMsg, msg, schedulerOp...)
|
return d.handleChanAnnouncement(ctx, nMsg, msg, schedulerOp...)
|
||||||
|
|
||||||
// A new authenticated channel edge update has arrived. This indicates
|
// A new authenticated channel edge update has arrived. This indicates
|
||||||
// that the directional information for an already known channel has
|
// that the directional information for an already known channel has
|
||||||
// been updated.
|
// been updated.
|
||||||
case *lnwire.ChannelUpdate1:
|
case *lnwire.ChannelUpdate1:
|
||||||
return d.handleChanUpdate(nMsg, msg, schedulerOp)
|
return d.handleChanUpdate(ctx, nMsg, msg, schedulerOp)
|
||||||
|
|
||||||
// A new signature announcement has been received. This indicates
|
// A new signature announcement has been received. This indicates
|
||||||
// willingness of nodes involved in the funding of a channel to
|
// willingness of nodes involved in the funding of a channel to
|
||||||
// announce this new channel to the rest of the world.
|
// announce this new channel to the rest of the world.
|
||||||
case *lnwire.AnnounceSignatures1:
|
case *lnwire.AnnounceSignatures1:
|
||||||
return d.handleAnnSig(nMsg, msg)
|
return d.handleAnnSig(ctx, nMsg, msg)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
err := errors.New("wrong type of the announcement")
|
err := errors.New("wrong type of the announcement")
|
||||||
@ -2134,7 +2148,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
|||||||
//
|
//
|
||||||
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
|
// NOTE: only the NodeKey1Bytes and NodeKey2Bytes members of the ChannelEdgeInfo
|
||||||
// should be inspected.
|
// should be inspected.
|
||||||
func (d *AuthenticatedGossiper) processZombieUpdate(
|
func (d *AuthenticatedGossiper) processZombieUpdate(_ context.Context,
|
||||||
chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
|
chanInfo *models.ChannelEdgeInfo, scid lnwire.ShortChannelID,
|
||||||
msg *lnwire.ChannelUpdate1) error {
|
msg *lnwire.ChannelUpdate1) error {
|
||||||
|
|
||||||
@ -2192,7 +2206,7 @@ func (d *AuthenticatedGossiper) processZombieUpdate(
|
|||||||
|
|
||||||
// fetchNodeAnn fetches the latest signed node announcement from our point of
|
// fetchNodeAnn fetches the latest signed node announcement from our point of
|
||||||
// view for the node with the given public key.
|
// view for the node with the given public key.
|
||||||
func (d *AuthenticatedGossiper) fetchNodeAnn(
|
func (d *AuthenticatedGossiper) fetchNodeAnn(_ context.Context,
|
||||||
pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
|
pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
|
||||||
|
|
||||||
node, err := d.cfg.Graph.FetchLightningNode(pubKey)
|
node, err := d.cfg.Graph.FetchLightningNode(pubKey)
|
||||||
@ -2205,7 +2219,9 @@ func (d *AuthenticatedGossiper) fetchNodeAnn(
|
|||||||
|
|
||||||
// isMsgStale determines whether a message retrieved from the backing
|
// isMsgStale determines whether a message retrieved from the backing
|
||||||
// MessageStore is seen as stale by the current graph.
|
// MessageStore is seen as stale by the current graph.
|
||||||
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
|
func (d *AuthenticatedGossiper) isMsgStale(_ context.Context,
|
||||||
|
msg lnwire.Message) bool {
|
||||||
|
|
||||||
switch msg := msg.(type) {
|
switch msg := msg.(type) {
|
||||||
case *lnwire.AnnounceSignatures1:
|
case *lnwire.AnnounceSignatures1:
|
||||||
chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
|
chanInfo, _, _, err := d.cfg.Graph.GetChannelByID(
|
||||||
@ -2272,7 +2288,8 @@ func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
|
|||||||
|
|
||||||
// updateChannel creates a new fully signed update for the channel, and updates
|
// updateChannel creates a new fully signed update for the channel, and updates
|
||||||
// the underlying graph with the new state.
|
// the underlying graph with the new state.
|
||||||
func (d *AuthenticatedGossiper) updateChannel(info *models.ChannelEdgeInfo,
|
func (d *AuthenticatedGossiper) updateChannel(_ context.Context,
|
||||||
|
info *models.ChannelEdgeInfo,
|
||||||
edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
|
edge *models.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement1,
|
||||||
*lnwire.ChannelUpdate1, error) {
|
*lnwire.ChannelUpdate1, error) {
|
||||||
|
|
||||||
@ -2414,8 +2431,8 @@ func (d *AuthenticatedGossiper) latestHeight() uint32 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handleNodeAnnouncement processes a new node announcement.
|
// handleNodeAnnouncement processes a new node announcement.
|
||||||
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
|
func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context,
|
||||||
nodeAnn *lnwire.NodeAnnouncement,
|
nMsg *networkMsg, nodeAnn *lnwire.NodeAnnouncement,
|
||||||
ops []batch.SchedulerOption) ([]networkMsg, bool) {
|
ops []batch.SchedulerOption) ([]networkMsg, bool) {
|
||||||
|
|
||||||
timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
|
timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
|
||||||
@ -2432,7 +2449,7 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
|
|||||||
return nil, true
|
return nil, true
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := d.addNode(nodeAnn, ops...); err != nil {
|
if err := d.addNode(ctx, nodeAnn, ops...); err != nil {
|
||||||
log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
|
log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
|
||||||
err)
|
err)
|
||||||
|
|
||||||
@ -2487,8 +2504,10 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handleChanAnnouncement processes a new channel announcement.
|
// handleChanAnnouncement processes a new channel announcement.
|
||||||
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
|
//
|
||||||
ann *lnwire.ChannelAnnouncement1,
|
//nolint:funlen
|
||||||
|
func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
|
||||||
|
nMsg *networkMsg, ann *lnwire.ChannelAnnouncement1,
|
||||||
ops ...batch.SchedulerOption) ([]networkMsg, bool) {
|
ops ...batch.SchedulerOption) ([]networkMsg, bool) {
|
||||||
|
|
||||||
scid := ann.ShortChannelID
|
scid := ann.ShortChannelID
|
||||||
@ -2680,7 +2699,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
|
|||||||
// add an alias ChannelAnnouncement from the gossiper.
|
// add an alias ChannelAnnouncement from the gossiper.
|
||||||
if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) { //nolint:nestif
|
if !(d.cfg.AssumeChannelValid || d.cfg.IsAlias(scid)) { //nolint:nestif
|
||||||
op, capacity, script, err := d.validateFundingTransaction(
|
op, capacity, script, err := d.validateFundingTransaction(
|
||||||
ann, tapscriptRoot,
|
ctx, ann, tapscriptRoot,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
defer d.channelMtx.Unlock(scid.ToUint64())
|
defer d.channelMtx.Unlock(scid.ToUint64())
|
||||||
@ -2802,7 +2821,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
|
|||||||
if graph.IsError(err, graph.ErrIgnored) {
|
if graph.IsError(err, graph.ErrIgnored) {
|
||||||
// Attempt to process the rejected message to see if we
|
// Attempt to process the rejected message to see if we
|
||||||
// get any new announcements.
|
// get any new announcements.
|
||||||
anns, rErr := d.processRejectedEdge(ann, proof)
|
anns, rErr := d.processRejectedEdge(ctx, ann, proof)
|
||||||
if rErr != nil {
|
if rErr != nil {
|
||||||
key := newRejectCacheKey(
|
key := newRejectCacheKey(
|
||||||
scid.ToUint64(),
|
scid.ToUint64(),
|
||||||
@ -2945,8 +2964,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handleChanUpdate processes a new channel update.
|
// handleChanUpdate processes a new channel update.
|
||||||
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
|
//
|
||||||
upd *lnwire.ChannelUpdate1,
|
//nolint:funlen
|
||||||
|
func (d *AuthenticatedGossiper) handleChanUpdate(ctx context.Context,
|
||||||
|
nMsg *networkMsg, upd *lnwire.ChannelUpdate1,
|
||||||
ops []batch.SchedulerOption) ([]networkMsg, bool) {
|
ops []batch.SchedulerOption) ([]networkMsg, bool) {
|
||||||
|
|
||||||
log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
|
log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
|
||||||
@ -3052,7 +3073,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
|
|||||||
break
|
break
|
||||||
|
|
||||||
case errors.Is(err, graphdb.ErrZombieEdge):
|
case errors.Is(err, graphdb.ErrZombieEdge):
|
||||||
err = d.processZombieUpdate(chanInfo, graphScid, upd)
|
err = d.processZombieUpdate(ctx, chanInfo, graphScid, upd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug(err)
|
log.Debug(err)
|
||||||
nMsg.err <- err
|
nMsg.err <- err
|
||||||
@ -3346,8 +3367,11 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handleAnnSig processes a new announcement signatures message.
|
// handleAnnSig processes a new announcement signatures message.
|
||||||
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
|
//
|
||||||
ann *lnwire.AnnounceSignatures1) ([]networkMsg, bool) {
|
//nolint:funlen
|
||||||
|
func (d *AuthenticatedGossiper) handleAnnSig(ctx context.Context,
|
||||||
|
nMsg *networkMsg, ann *lnwire.AnnounceSignatures1) ([]networkMsg,
|
||||||
|
bool) {
|
||||||
|
|
||||||
needBlockHeight := ann.ShortChannelID.BlockHeight +
|
needBlockHeight := ann.ShortChannelID.BlockHeight +
|
||||||
d.cfg.ProofMatureDelta
|
d.cfg.ProofMatureDelta
|
||||||
@ -3631,7 +3655,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
|
|||||||
// it since the source gets skipped. This isn't necessary for channel
|
// it since the source gets skipped. This isn't necessary for channel
|
||||||
// updates and announcement signatures since we send those directly to
|
// updates and announcement signatures since we send those directly to
|
||||||
// our channel counterparty through the gossiper's reliable sender.
|
// our channel counterparty through the gossiper's reliable sender.
|
||||||
node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
|
node1Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey1Bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("Unable to fetch node announcement for %x: %v",
|
log.Debugf("Unable to fetch node announcement for %x: %v",
|
||||||
chanInfo.NodeKey1Bytes, err)
|
chanInfo.NodeKey1Bytes, err)
|
||||||
@ -3645,7 +3669,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
|
node2Ann, err := d.fetchNodeAnn(ctx, chanInfo.NodeKey2Bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("Unable to fetch node announcement for %x: %v",
|
log.Debugf("Unable to fetch node announcement for %x: %v",
|
||||||
chanInfo.NodeKey2Bytes, err)
|
chanInfo.NodeKey2Bytes, err)
|
||||||
@ -3700,7 +3724,7 @@ func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
|
|||||||
// transaction from chain to ensure that it exists, is not spent and matches
|
// transaction from chain to ensure that it exists, is not spent and matches
|
||||||
// the channel announcement proof. The transaction's outpoint and value are
|
// the channel announcement proof. The transaction's outpoint and value are
|
||||||
// returned if we can glean them from the work done in this method.
|
// returned if we can glean them from the work done in this method.
|
||||||
func (d *AuthenticatedGossiper) validateFundingTransaction(
|
func (d *AuthenticatedGossiper) validateFundingTransaction(_ context.Context,
|
||||||
ann *lnwire.ChannelAnnouncement1,
|
ann *lnwire.ChannelAnnouncement1,
|
||||||
tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
|
tapscriptRoot fn.Option[chainhash.Hash]) (wire.OutPoint, btcutil.Amount,
|
||||||
[]byte, error) {
|
[]byte, error) {
|
||||||
|
@ -2,6 +2,7 @@ package discovery
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
prand "math/rand"
|
prand "math/rand"
|
||||||
@ -993,7 +994,7 @@ func createTestCtx(t *testing.T, startHeight uint32, isChanPeer bool) (
|
|||||||
ScidCloser: newMockScidCloser(isChanPeer),
|
ScidCloser: newMockScidCloser(isChanPeer),
|
||||||
}, selfKeyDesc)
|
}, selfKeyDesc)
|
||||||
|
|
||||||
if err := gossiper.Start(); err != nil {
|
if err := gossiper.Start(context.Background()); err != nil {
|
||||||
return nil, fmt.Errorf("unable to start router: %w", err)
|
return nil, fmt.Errorf("unable to start router: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1680,7 +1681,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
|
|||||||
KeyLocator: ctx.gossiper.selfKeyLoc,
|
KeyLocator: ctx.gossiper.selfKeyLoc,
|
||||||
})
|
})
|
||||||
require.NoError(t, err, "unable to recreate gossiper")
|
require.NoError(t, err, "unable to recreate gossiper")
|
||||||
if err := gossiper.Start(); err != nil {
|
if err := gossiper.Start(context.Background()); err != nil {
|
||||||
t.Fatalf("unable to start recreated gossiper: %v", err)
|
t.Fatalf("unable to start recreated gossiper: %v", err)
|
||||||
}
|
}
|
||||||
defer gossiper.Stop()
|
defer gossiper.Stop()
|
||||||
@ -4756,7 +4757,9 @@ func assertChanChainRejection(t *testing.T, ctx *testCtx,
|
|||||||
err: errChan,
|
err: errChan,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, added := ctx.gossiper.handleChanAnnouncement(nMsg, edge)
|
_, added := ctx.gossiper.handleChanAnnouncement(
|
||||||
|
context.Background(), nMsg, edge,
|
||||||
|
)
|
||||||
require.False(t, added)
|
require.False(t, added)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -2219,7 +2219,7 @@ func (s *server) startLowLevelServices() error {
|
|||||||
// NOTE: This function is safe for concurrent access.
|
// NOTE: This function is safe for concurrent access.
|
||||||
//
|
//
|
||||||
//nolint:funlen
|
//nolint:funlen
|
||||||
func (s *server) Start(_ context.Context) error {
|
func (s *server) Start(ctx context.Context) error {
|
||||||
// Get the current blockbeat.
|
// Get the current blockbeat.
|
||||||
beat, err := s.getStartingBeat()
|
beat, err := s.getStartingBeat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -2390,7 +2390,7 @@ func (s *server) Start(_ context.Context) error {
|
|||||||
// The authGossiper depends on the chanRouter and therefore
|
// The authGossiper depends on the chanRouter and therefore
|
||||||
// should be started after it.
|
// should be started after it.
|
||||||
cleanup = cleanup.add(s.authGossiper.Stop)
|
cleanup = cleanup.add(s.authGossiper.Stop)
|
||||||
if err := s.authGossiper.Start(); err != nil {
|
if err := s.authGossiper.Start(ctx); err != nil {
|
||||||
startErr = err
|
startErr = err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user