From 26d1a41926215238c2297af5680268bbf0ef6610 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 20 Nov 2022 07:59:16 +0800 Subject: [PATCH] discovery+routing: add more logs to reveal channel update flow --- discovery/gossiper.go | 70 ++++++++++++++++++++++++++++++++----------- routing/router.go | 28 ++++++++++++++--- 2 files changed, 76 insertions(+), 22 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index bd2d3b605..f3c497025 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1242,6 +1242,9 @@ func (d *AuthenticatedGossiper) networkHandler() { announcement.msg, ) if err != nil { + log.Debugf("Validating network message %s got err: %v", + announcement.msg.MsgType(), err) + if !routing.IsError( err, routing.ErrVBarrierShuttingDown, @@ -1824,10 +1827,6 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID, func (d *AuthenticatedGossiper) processNetworkAnnouncement( nMsg *networkMsg) ([]networkMsg, bool) { - log.Debugf("Processing network message: peer=%v, source=%x, msg=%s, "+ - "is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(), - nMsg.msg.MsgType(), nMsg.isRemote) - // If this is a remote update, we set the scheduler option to lazily // add it to the graph. var schedulerOp []batch.SchedulerOption @@ -1947,7 +1946,7 @@ func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool { } if err != nil { log.Debugf("Unable to retrieve channel=%v from graph: "+ - "%v", err) + "%v", chanInfo.ChannelID, err) return false } @@ -2145,6 +2144,9 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, timestamp := time.Unix(int64(nodeAnn.Timestamp), 0) + log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+ + "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID) + // We'll quickly ask the router if it already has a newer update for // this node so we can skip validating signatures if not required. if d.cfg.Router.IsStaleNode(nodeAnn.NodeID, timestamp) { @@ -2199,6 +2201,10 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, nMsg.err <- nil // TODO(roasbeef): get rid of the above + + log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+ + "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID) + return announcements, true } @@ -2207,6 +2213,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, ann *lnwire.ChannelAnnouncement, ops []batch.SchedulerOption) ([]networkMsg, bool) { + log.Debugf("Processing ChannelAnnouncement: peer=%v, short_chan_id=%v", + nMsg.peer, ann.ShortChannelID.ToUint64()) + // We'll ignore any channel announcements that target any chain other // than the set of chains we know of. if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) { @@ -2327,6 +2336,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, } } + log.Debugf("Adding edge for short_chan_id: %v", + ann.ShortChannelID.ToUint64()) + // We will add the edge to the channel router. If the nodes present in // this channel are not present in the database, a partial node will be // added to represent each node while we wait for a node announcement. @@ -2338,6 +2350,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, d.channelMtx.Lock(ann.ShortChannelID.ToUint64()) err := d.cfg.Router.AddEdge(edge, ops...) if err != nil { + log.Debugf("Router rejected edge for short_chan_id(%v): %v", + ann.ShortChannelID.ToUint64(), err) + defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) // If the edge was rejected due to already being known, then it @@ -2359,6 +2374,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, return nil, false } + log.Debugf("Extracted %v announcements from rejected "+ + "msgs", len(anns)) + // If while processing this rejected edge, we realized // there's a set of announcements we could extract, // then we'll return those directly. @@ -2367,11 +2385,8 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, return anns, true } - // Otherwise, this is just a regular rejected edge. - log.Debugf("Router rejected channel edge: %v", err) } else { - log.Debugf("Router rejected channel edge: %v", err) - + // Otherwise, this is just a regular rejected edge. key := newRejectCacheKey( ann.ShortChannelID.ToUint64(), sourceToPub(nMsg.source), @@ -2386,6 +2401,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, // If err is nil, release the lock immediately. d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) + log.Debugf("Finish adding edge for short_chan_id: %v", + ann.ShortChannelID.ToUint64()) + // If we earlier received any ChannelUpdates for this channel, we can // now process them, as the channel is added to the graph. shortChanID := ann.ShortChannelID.ToUint64() @@ -2456,6 +2474,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, } nMsg.err <- nil + + log.Debugf("Processed ChannelAnnouncement: peer=%v, short_chan_id=%v", + nMsg.peer, ann.ShortChannelID.ToUint64()) + return announcements, true } @@ -2464,6 +2486,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, upd *lnwire.ChannelUpdate, ops []batch.SchedulerOption) ([]networkMsg, bool) { + log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ", + nMsg.peer, upd.ShortChannelID.ToUint64()) + // We'll ignore any channel updates that target any chain other than // the set of chains we know of. if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) { @@ -2523,10 +2548,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, graphScid, timestamp, upd.ChannelFlags, ) { - log.Debugf("Ignored stale edge policy: peer=%v, source=%x, "+ - "msg=%s, is_remote=%v", nMsg.peer, - nMsg.source.SerializeCompressed(), nMsg.msg.MsgType(), - nMsg.isRemote, + log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+ + "peer=%v, source=%x, msg=%s, is_remote=%v", shortChanID, + nMsg.peer, nMsg.source.SerializeCompressed(), + nMsg.msg.MsgType(), nMsg.isRemote, ) nMsg.err <- nil @@ -2649,6 +2674,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, edgeToUpdate = e2 } + log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+ + "edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(), + edgeToUpdate != nil) + // Validate the channel announcement with the expected public key and // channel capacity. In the case of an invalid channel update, we'll // return an error to the caller and exit early. @@ -2743,7 +2772,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, routing.ErrVBarrierShuttingDown, ) { - log.Debug(err) + log.Debugf("Update edge for short_chan_id(%v) got: %v", + shortChanID, err) } else { // Since we know the stored SCID in the graph, we'll // cache that SCID. @@ -2753,7 +2783,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, ) _, _ = d.recentRejects.Put(key, &cachedReject{}) - log.Error(err) + log.Errorf("Update edge for short_chan_id(%v) got: %v", + shortChanID, err) } nMsg.err <- err @@ -2801,8 +2832,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, ) log.Debugf("The message %v has no AuthProof, sending the "+ - "update to remote peer %x", upd.MsgType(), - remotePubKey) + "update to remote peer %x", upd.MsgType(), remotePubKey) // Now we'll attempt to send the channel update message // reliably to the remote peer in the background, so that we @@ -2832,6 +2862,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, } nMsg.err <- nil + + log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+ + "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(), + timestamp) return announcements, true } @@ -2848,7 +2882,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg, prefix = "remote" } - log.Infof("Received new %v channel announcement for %v", prefix, + log.Infof("Received new %v announcement signature for %v", prefix, ann.ShortChannelID) // By the specification, channel announcement proofs should be sent diff --git a/routing/router.go b/routing/router.go index 01d4b5022..8d61a25f2 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1099,6 +1099,8 @@ func (r *ChannelRouter) networkHandler() { update.msg, allowDependents, ) if err != nil { + log.Debugf("process network updates "+ + "got: %v", err) return } @@ -1439,6 +1441,9 @@ func (r *ChannelRouter) processUpdate(msg interface{}, r.stats.incNumNodeUpdates() case *channeldb.ChannelEdgeInfo: + log.Debugf("Received ChannelEdgeInfo for channel %v", + msg.ChannelID) + // Prior to processing the announcement we first check if we // already know of this channel, if so, then we can exit early. _, _, exists, isZombie, err := r.cfg.Graph.HasChannelEdge( @@ -1584,7 +1589,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}, return errors.Errorf("unable to add edge: %v", err) } - log.Tracef("New channel discovered! Link "+ + log.Debugf("New channel discovered! Link "+ "connects %x and %x with ChannelPoint(%v): "+ "chan_id=%v, capacity=%v", msg.NodeKey1Bytes, msg.NodeKey2Bytes, @@ -1610,6 +1615,9 @@ func (r *ChannelRouter) processUpdate(msg interface{}, } case *channeldb.ChannelEdgePolicy: + log.Debugf("Received ChannelEdgePolicy for channel %v", + msg.ChannelID) + // We make sure to hold the mutex for this channel ID, // such that no other goroutine is concurrently doing // database accesses for the same channel ID. @@ -1685,7 +1693,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}, return err } - log.Tracef("New channel update applied: %v", + log.Debugf("New channel update applied: %v", newLogClosure(func() string { return spew.Sdump(msg) })) r.stats.incNumChannelUpdates() @@ -2608,10 +2616,18 @@ func (r *ChannelRouter) AddProof(chanID lnwire.ShortChannelID, // target node with a more recent timestamp. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) IsStaleNode(node route.Vertex, timestamp time.Time) bool { +func (r *ChannelRouter) IsStaleNode(node route.Vertex, + timestamp time.Time) bool { + // If our attempt to assert that the node announcement is fresh fails, // then we know that this is actually a stale announcement. - return r.assertNodeAnnFreshness(node, timestamp) != nil + err := r.assertNodeAnnFreshness(node, timestamp) + if err != nil { + log.Debugf("Checking stale node %x got %v", node, err) + return true + } + + return false } // IsPublicNode determines whether the given vertex is seen as a public node in @@ -2641,6 +2657,7 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, edge1Timestamp, edge2Timestamp, exists, isZombie, err := r.cfg.Graph.HasChannelEdge(chanID.ToUint64()) if err != nil { + log.Debugf("Check stale edge policy got error: %v", err) return false } @@ -2788,6 +2805,7 @@ func (r *ChannelRouter) BuildRoute(amt *lnwire.MilliSatoshi, // Exit if there are no channels. unifiedPolicy, ok := u.policies[fromNode] if !ok { + log.Errorf("Cannot find policy for node %v", fromNode) return nil, ErrNoChannel{ fromNode: fromNode, position: i, @@ -2806,6 +2824,8 @@ func (r *ChannelRouter) BuildRoute(amt *lnwire.MilliSatoshi, // to forward. policy := unifiedPolicy.getPolicy(runningAmt, bandwidthHints) if policy == nil { + log.Errorf("Cannot find policy with amt=%v for node %v", + runningAmt, fromNode) return nil, ErrNoChannel{ fromNode: fromNode, position: i,