discovery+routing: update to use ChanUpdate interface

In the IsKeepAlive and IsStaleEdgePolicy functions
This commit is contained in:
Elle Mouton 2023-11-07 12:41:58 +02:00
parent cdcf0ac16b
commit 58d45188fe
No known key found for this signature in database
GPG Key ID: D7D916376026F177
5 changed files with 238 additions and 98 deletions

View File

@ -2403,48 +2403,114 @@ func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
// IsKeepAliveUpdate determines whether this channel update is considered a
// keep-alive update based on the previous channel update processed for the same
// direction.
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate1,
prev *models.ChannelEdgePolicy1) bool {
func IsKeepAliveUpdate(update lnwire.ChannelUpdate,
prevPolicy models.ChannelEdgePolicy) (bool, error) {
// Both updates should be from the same direction.
if update.ChannelFlags&lnwire.ChanUpdateDirection !=
prev.ChannelFlags&lnwire.ChanUpdateDirection {
switch upd := update.(type) {
case *lnwire.ChannelUpdate1:
prev, ok := prevPolicy.(*models.ChannelEdgePolicy1)
if !ok {
return false, fmt.Errorf("expected chan edge policy 1")
}
return false
}
// Both updates should be from the same direction.
if upd.ChannelFlags&lnwire.ChanUpdateDirection !=
prev.ChannelFlags&lnwire.ChanUpdateDirection {
// The timestamp should always increase for a keep-alive update.
timestamp := time.Unix(int64(update.Timestamp), 0)
if !timestamp.After(prev.LastUpdate) {
return false
}
return false, nil
}
// None of the remaining fields should change for a keep-alive update.
if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
return false
// The timestamp should always increase for a keep-alive update.
timestamp := time.Unix(int64(upd.Timestamp), 0)
if !timestamp.After(prev.LastUpdate) {
return false, nil
}
// None of the remaining fields should change for a keep-alive
// update.
if upd.ChannelFlags.IsDisabled() !=
prev.ChannelFlags.IsDisabled() {
return false, nil
}
if lnwire.MilliSatoshi(upd.BaseFee) != prev.FeeBaseMSat {
return false, nil
}
if lnwire.MilliSatoshi(upd.FeeRate) !=
prev.FeeProportionalMillionths {
return false, nil
}
if upd.TimeLockDelta != prev.TimeLockDelta {
return false, nil
}
if upd.HtlcMinimumMsat != prev.MinHTLC {
return false, nil
}
if upd.MessageFlags.HasMaxHtlc() &&
!prev.MessageFlags.HasMaxHtlc() {
return false, nil
}
if upd.HtlcMaximumMsat != prev.MaxHTLC {
return false, nil
}
if !bytes.Equal(upd.ExtraOpaqueData, prev.ExtraOpaqueData) {
return false, nil
}
return true, nil
case *lnwire.ChannelUpdate2:
prev, ok := prevPolicy.(*models.ChannelEdgePolicy2)
if !ok {
return false, fmt.Errorf("expected chan edge policy 2")
}
// Both updates should be from the same direction.
if upd.IsNode1() != prev.IsNode1() {
return false, nil
}
// The block-height should always increase for a keep-alive
// update.
if upd.BlockHeight.Val <= prev.BlockHeight.Val {
return false, nil
}
// None of the remaining fields should change for a keep-alive
// update.
if upd.IsDisabled() != prev.IsDisabled() {
return false, nil
}
fwd := upd.ForwardingPolicy()
prevFwd := upd.ForwardingPolicy()
if fwd.BaseFee != prevFwd.BaseFee {
return false, nil
}
if fwd.FeeRate != prevFwd.FeeRate {
return false, nil
}
if fwd.TimeLockDelta != prevFwd.TimeLockDelta {
return false, nil
}
if fwd.MinHTLC != prevFwd.MinHTLC {
return false, nil
}
if fwd.MaxHTLC != prevFwd.MinHTLC {
return false, nil
}
if !bytes.Equal(upd.ExtraOpaqueData, prev.ExtraOpaqueData) {
return false, nil
}
return true, nil
default:
return false, fmt.Errorf("unhandled implementation of "+
"ChannelUpdate: %T", update)
}
if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
return false
}
if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
return false
}
if update.TimeLockDelta != prev.TimeLockDelta {
return false
}
if update.HtlcMinimumMsat != prev.MinHTLC {
return false
}
if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
return false
}
if update.HtlcMaximumMsat != prev.MaxHTLC {
return false
}
if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
return false
}
return true
}
// latestHeight returns the gossiper's latest height known of the chain.
@ -2968,16 +3034,14 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
graphScid = upd.ShortChannelID
}
if d.cfg.Graph.IsStaleEdgePolicy(
graphScid, timestamp, upd.ChannelFlags,
) {
if d.cfg.Graph.IsStaleEdgePolicy(graphScid, upd) {
log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
"peer=%v, msg=%s, is_remote=%v", shortChanID,
nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
)
nMsg.err <- nil
return nil, true
}
@ -3159,7 +3223,16 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// heuristic of sending keep-alive updates after the same
// duration (see retransmitStaleAnns).
timeSinceLastUpdate := timestamp.Sub(edge.LastUpdate)
if IsKeepAliveUpdate(upd, edge) {
isKeepAlive, err := IsKeepAliveUpdate(upd, edge)
if err != nil {
log.Errorf("Could not determine if update is "+
"keepalive: %v", err)
nMsg.err <- err
return nil, false
}
if isKeepAlive {
if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
log.Debugf("Ignoring keep alive update not "+
"within %v period for channel %v",

View File

@ -359,11 +359,18 @@ func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
// IsStaleEdgePolicy returns true if the graph source has a channel edge for
// the passed channel ID (and flags) that have a more recent timestamp.
func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {
policy lnwire.ChannelUpdate) bool {
r.mu.Lock()
defer r.mu.Unlock()
pol, ok := policy.(*lnwire.ChannelUpdate1)
if !ok {
panic("expected chan update 1")
}
timestamp := time.Unix(int64(pol.Timestamp), 0)
chanIDInt := chanID.ToUint64()
edges, ok := r.edges[chanIDInt]
if !ok {
@ -373,7 +380,6 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
if !isZombie {
return false
}
// Since it exists within our zombie index, we'll check that it
// respects the router's live edge horizon to determine whether
// it is stale or not.
@ -381,7 +387,7 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
}
switch {
case flags&lnwire.ChanUpdateDirection == 0 && edges[0] != nil:
case policy.IsNode1() && edges[0] != nil:
switch edge := edges[0].(type) {
case *models.ChannelEdgePolicy1:
return !timestamp.After(edge.LastUpdate)
@ -389,7 +395,7 @@ func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
panic(fmt.Sprintf("unhandled: %T", edges[0]))
}
case flags&lnwire.ChanUpdateDirection == 1 && edges[1] != nil:
case !policy.IsNode1() && edges[1] != nil:
switch edge := edges[1].(type) {
case *models.ChannelEdgePolicy1:
return !timestamp.After(edge.LastUpdate)

View File

@ -1761,53 +1761,108 @@ func (b *Builder) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (b *Builder) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {
update lnwire.ChannelUpdate) bool {
edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
b.cfg.Graph.HasChannelEdge1(chanID.ToUint64())
if err != nil {
log.Debugf("Check stale edge policy got error: %v", err)
return false
}
var (
disabled = update.IsDisabled()
isNode1 = update.IsNode1()
)
// If we know of the edge as a zombie, then we'll make some additional
// checks to determine if the new policy is fresh.
if isZombie {
// When running with AssumeChannelValid, we also prune channels
// if both of their edges are disabled. We'll mark the new
// policy as stale if it remains disabled.
if b.cfg.AssumeChannelValid {
isDisabled := flags&lnwire.ChanUpdateDisabled ==
lnwire.ChanUpdateDisabled
if isDisabled {
return true
}
switch upd := update.(type) {
case *lnwire.ChannelUpdate1:
timestamp := time.Unix(int64(upd.Timestamp), 0)
edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
b.cfg.Graph.HasChannelEdge1(chanID.ToUint64())
if err != nil {
log.Debugf("Check stale edge policy got error: %v", err)
return false
}
// Otherwise, we'll fall back to our usual ChannelPruneExpiry.
return time.Since(timestamp) > b.cfg.ChannelPruneExpiry
}
// If we know of the edge as a zombie, then we'll make some
// additional checks to determine if the new policy is fresh.
if isZombie {
// When running with AssumeChannelValid, we also prune
// channels if both of their edges are disabled. We'll
// mark the new policy as stale if it remains disabled.
if b.cfg.AssumeChannelValid {
if disabled {
return true
}
}
// If we don't know of the edge, then it means it's fresh (thus not
// stale).
if !exists {
return false
}
// Otherwise, we'll fall back to our usual
// ChannelPruneExpiry.
return time.Since(timestamp) > b.cfg.ChannelPruneExpiry
}
// As edges are directional edge node has a unique policy for the
// direction of the edge they control. Therefore, we first check if we
// already have the most up-to-date information for that edge. If so,
// then we can exit early.
switch {
// A flag set of 0 indicates this is an announcement for the "first"
// node in the channel.
case flags&lnwire.ChanUpdateDirection == 0:
return !edge1Timestamp.Before(timestamp)
// If we don't know of the edge, then it means it's fresh (thus
// not stale).
if !exists {
return false
}
// Similarly, a flag set of 1 indicates this is an announcement for the
// "second" node in the channel.
case flags&lnwire.ChanUpdateDirection == 1:
return !edge2Timestamp.Before(timestamp)
// As edges are directional edge node has a unique policy for
// the direction of the edge they control. Therefore we first
// check if we already have the most up to date information for
// that edge. If so, then we can exit early.
switch {
case isNode1:
return !edge1Timestamp.Before(timestamp)
case !isNode1:
return !edge2Timestamp.Before(timestamp)
}
case *lnwire.ChannelUpdate2:
height := upd.BlockHeight
edge1Height, edge2Height, exists, isZombie, err :=
b.cfg.Graph.HasChannelEdge2(chanID.ToUint64())
if err != nil {
log.Debugf("Check stale edge policy got error: %v", err)
return false
}
// If we know of the edge as a zombie, then we'll make some
// additional checks to determine if the new policy is fresh.
if isZombie {
// When running with AssumeChannelValid, we also prune
// channels if both of their edges are disabled. We'll
// mark the new policy as stale if it remains disabled.
if b.cfg.AssumeChannelValid {
if disabled {
return true
}
}
// Otherwise, we'll fall back to our usual
// ChannelPruneExpiry.
blocksSince := b.SyncedHeight() - height.Val
return blocksSince >
uint32(b.cfg.ChannelPruneExpiry.Hours()*6)
}
// If we don't know of the edge, then it means it's fresh (thus
// not stale).
if !exists {
return false
}
// As edges are directional edge node has a unique policy for
// the direction of the edge they control. Therefore we first
// check if we already have the most up to date information for
// that edge. If so, then we can exit early.
switch {
case isNode1:
return edge1Height >= height.Val
case !isNode1:
return edge2Height >= height.Val
}
}
return false

View File

@ -1142,13 +1142,17 @@ func TestIsStaleEdgePolicy(t *testing.T) {
// If we query for staleness before adding the edge, we should get
// false.
updateTimeStamp := time.Unix(123, 0)
if ctx.builder.IsStaleEdgePolicy(*chanID, updateTimeStamp, 0) {
t.Fatalf("router failed to detect fresh edge policy")
time1 := 123
updateTimeStamp := time.Unix(int64(time1), 0)
update1 := &lnwire.ChannelUpdate1{
Timestamp: uint32(time1),
}
if ctx.builder.IsStaleEdgePolicy(*chanID, updateTimeStamp, 1) {
t.Fatalf("router failed to detect fresh edge policy")
update2 := &lnwire.ChannelUpdate1{
Timestamp: uint32(time1),
ChannelFlags: lnwire.ChanUpdateDirection,
}
require.False(t, ctx.builder.IsStaleEdgePolicy(*chanID, update1))
require.False(t, ctx.builder.IsStaleEdgePolicy(*chanID, update2))
edge := &models.ChannelEdgeInfo1{
ChannelID: chanID.ToUint64(),
@ -1193,20 +1197,22 @@ func TestIsStaleEdgePolicy(t *testing.T) {
// Now that the edges have been added, an identical (chanID, flag,
// timestamp) tuple for each edge should be detected as a stale edge.
if !ctx.builder.IsStaleEdgePolicy(*chanID, updateTimeStamp, 0) {
if !ctx.builder.IsStaleEdgePolicy(*chanID, update1) {
t.Fatalf("router failed to detect stale edge policy")
}
if !ctx.builder.IsStaleEdgePolicy(*chanID, updateTimeStamp, 1) {
if !ctx.builder.IsStaleEdgePolicy(*chanID, update2) {
t.Fatalf("router failed to detect stale edge policy")
}
// If we now update the timestamp for both edges, the router should
// detect that this tuple represents a fresh edge.
updateTimeStamp = time.Unix(9999, 0)
if ctx.builder.IsStaleEdgePolicy(*chanID, updateTimeStamp, 0) {
time2 := 9999
update1.Timestamp = uint32(time2)
update2.Timestamp = uint32(time2)
if ctx.builder.IsStaleEdgePolicy(*chanID, update1) {
t.Fatalf("router failed to detect fresh edge policy")
}
if ctx.builder.IsStaleEdgePolicy(*chanID, updateTimeStamp, 1) {
if ctx.builder.IsStaleEdgePolicy(*chanID, update2) {
t.Fatalf("router failed to detect fresh edge policy")
}
}

View File

@ -59,8 +59,8 @@ type ChannelGraphSource interface {
// IsStaleEdgePolicy returns true if the graph source has a channel
// edge for the passed channel ID (and flags) that have a more recent
// timestamp.
IsStaleEdgePolicy(chanID lnwire.ShortChannelID, timestamp time.Time,
flags lnwire.ChanUpdateChanFlags) bool
IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
policy lnwire.ChannelUpdate) bool
// MarkEdgeLive clears an edge from our zombie index, deeming it as
// live.