discovery: use ChannelUpdate interface in handleChanUpdate

This commit is contained in:
Elle Mouton
2023-11-07 14:10:38 +02:00
parent 58d45188fe
commit fc4e28f9fa

View File

@@ -2098,7 +2098,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// A new authenticated channel edge update has arrived. This indicates // A new authenticated channel edge update has arrived. This indicates
// that the directional information for an already known channel has // that the directional information for an already known channel has
// been updated. // been updated.
case *lnwire.ChannelUpdate1: case lnwire.ChannelUpdate:
return d.handleChanUpdate(nMsg, msg, schedulerOp) return d.handleChanUpdate(nMsg, msg, schedulerOp)
// A new signature announcement has been received. This indicates // A new signature announcement has been received. This indicates
@@ -2973,22 +2973,28 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
// handleChanUpdate processes a new channel update. // handleChanUpdate processes a new channel update.
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
upd *lnwire.ChannelUpdate1, upd lnwire.ChannelUpdate, ops []batch.SchedulerOption) ([]networkMsg,
ops []batch.SchedulerOption) ([]networkMsg, bool) { bool) {
var (
scid = upd.SCID()
chainHash = upd.GetChainHash()
)
log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ", log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
nMsg.peer, upd.ShortChannelID.ToUint64()) nMsg.peer, scid)
// We'll ignore any channel updates that target any chain other than // We'll ignore any channel updates that target any chain other than
// the set of chains we know of. // the set of chains we know of.
if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) { if !bytes.Equal(chainHash[:], d.cfg.ChainHash[:]) {
err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+ err := fmt.Errorf("ignoring %s from chain=%v, "+
"gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash) "gossiper on chain=%v", upd.MsgType(), chainHash,
d.cfg.ChainHash)
log.Errorf(err.Error()) log.Errorf(err.Error())
key := newRejectCacheKey( key := newRejectCacheKey(
upd.ShortChannelID.ToUint64(), scid.ToUint64(), sourceToPub(nMsg.source),
sourceToPub(nMsg.source),
) )
_, _ = d.recentRejects.Put(key, &cachedReject{}) _, _ = d.recentRejects.Put(key, &cachedReject{})
@@ -2996,8 +3002,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
return nil, false return nil, false
} }
blockHeight := upd.ShortChannelID.BlockHeight blockHeight := upd.SCID().BlockHeight
shortChanID := upd.ShortChannelID.ToUint64() shortChanID := upd.SCID().ToUint64()
// If the advertised inclusionary block is beyond our knowledge of the // If the advertised inclusionary block is beyond our knowledge of the
// chain tip, then we'll put the announcement in limbo to be fully // chain tip, then we'll put the announcement in limbo to be fully
@@ -3005,8 +3011,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// alias SCID, we'll skip the isPremature check. This is necessary // alias SCID, we'll skip the isPremature check. This is necessary
// since aliases start at block height 16_000_000. // since aliases start at block height 16_000_000.
d.Lock() d.Lock()
if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) && if nMsg.isRemote && !d.cfg.IsAlias(scid) &&
d.isPremature(upd.ShortChannelID, 0, nMsg) { d.isPremature(scid, 0, nMsg) {
log.Warnf("Update announcement for short_chan_id(%v), is "+ log.Warnf("Update announcement for short_chan_id(%v), is "+
"premature: advertises height %v, only height %v is "+ "premature: advertises height %v, only height %v is "+
@@ -3017,23 +3023,21 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
} }
d.Unlock() d.Unlock()
// Before we perform any of the expensive checks below, we'll check
// whether this update is stale or is for a zombie channel in order to
// quickly reject it.
timestamp := time.Unix(int64(upd.Timestamp), 0)
// Fetch the SCID we should be using to lock the channelMtx and make // Fetch the SCID we should be using to lock the channelMtx and make
// graph queries with. // graph queries with.
graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID) graphScid, err := d.cfg.FindBaseByAlias(scid)
if err != nil { if err != nil {
// Fallback and set the graphScid to the peer-provided SCID. // Fallback and set the graphScid to the peer-provided SCID.
// This will occur for non-option-scid-alias channels and for // This will occur for non-option-scid-alias channels and for
// public option-scid-alias channels after 6 confirmations. // public option-scid-alias channels after 6 confirmations.
// Once public option-scid-alias channels have 6 confs, we'll // Once public option-scid-alias channels have 6 confs, we'll
// ignore ChannelUpdates with one of their aliases. // ignore ChannelUpdates with one of their aliases.
graphScid = upd.ShortChannelID graphScid = scid
} }
// Before we perform any of the expensive checks below, we'll check
// whether this update is stale or is for a zombie channel in order to
// quickly reject it.
if d.cfg.Graph.IsStaleEdgePolicy(graphScid, upd) { if d.cfg.Graph.IsStaleEdgePolicy(graphScid, upd) {
log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+ log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
"peer=%v, msg=%s, is_remote=%v", shortChanID, "peer=%v, msg=%s, is_remote=%v", shortChanID,
@@ -3041,24 +3045,46 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
) )
nMsg.err <- nil nMsg.err <- nil
return nil, true return nil, true
} }
// Check that the ChanUpdate is not too far into the future, this could // Check that the ChanUpdate is not too far into the future, this could
// reveal some faulty implementation therefore we log an error. // reveal some faulty implementation therefore we log an error.
if time.Until(timestamp) > graph.DefaultChannelPruneExpiry { // TODO(elle): abstract this check
log.Errorf("Skewed timestamp (%v) for edge policy of "+ switch u := upd.(type) {
"short_chan_id(%v), timestamp too far in the future: "+ case *lnwire.ChannelUpdate1:
"peer=%v, msg=%s, is_remote=%v", timestamp.Unix(), timestamp := time.Unix(int64(u.Timestamp), 0)
shortChanID, nMsg.peer, nMsg.msg.MsgType(),
nMsg.isRemote,
)
nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+ if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
"timestamp too far in the future: %v", timestamp.Unix()) log.Errorf("Skewed timestamp (%v) for edge policy of "+
"short_chan_id(%v), timestamp too far in the future: "+
"peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
shortChanID, nMsg.peer, nMsg.msg.MsgType(),
nMsg.isRemote,
)
return nil, false nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
"timestamp too far in the future: %v", timestamp.Unix())
return nil, false
}
case *lnwire.ChannelUpdate2:
if int64(u.BlockHeight.Val)-int64(d.latestHeight()) >
int64(graph.DefaultChannelPruneExpiry.Hours()*6) {
log.Errorf("Skewed blockheight (%v) for edge policy "+
"of short_chan_id(%v), blockheight too far "+
"in the future: peer=%v, msg=%s, is_remote=%v",
u.BlockHeight.Val, shortChanID, nMsg.peer,
nMsg.msg.MsgType(), nMsg.isRemote,
)
nMsg.err <- fmt.Errorf("skewed blockheight of edge policy, "+
"timestamp too far in the future: %v", u.BlockHeight)
return nil, false
}
} }
// Get the node pub key as far since we don't have it in the channel // Get the node pub key as far since we don't have it in the channel
@@ -3099,7 +3125,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// If the edge corresponding to this ChannelUpdate was not // If the edge corresponding to this ChannelUpdate was not
// found in the graph, this might be a channel in the process // found in the graph, this might be a channel in the process
// of being opened, and we haven't processed our own // of being opened, and we haven't processed our own
// ChannelAnnouncement yet, hence it is not not found in the // ChannelAnnouncement yet, hence it is not found in the
// graph. This usually gets resolved after the channel proofs // graph. This usually gets resolved after the channel proofs
// are exchanged and the channel is broadcasted to the rest of // are exchanged and the channel is broadcasted to the rest of
// the network, but in case this is a private channel this // the network, but in case this is a private channel this
@@ -3152,7 +3178,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
nMsg.err <- err nMsg.err <- err
key := newRejectCacheKey( key := newRejectCacheKey(
upd.ShortChannelID.ToUint64(), scid.ToUint64(),
sourceToPub(nMsg.source), sourceToPub(nMsg.source),
) )
_, _ = d.recentRejects.Put(key, &cachedReject{}) _, _ = d.recentRejects.Put(key, &cachedReject{})
@@ -3166,15 +3192,16 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
var ( var (
pubKey *btcec.PublicKey pubKey *btcec.PublicKey
edgeToUpdate models.ChannelEdgePolicy edgeToUpdate models.ChannelEdgePolicy
direction int
) )
direction := upd.ChannelFlags & lnwire.ChanUpdateDirection if upd.IsNode1() {
switch direction {
case 0:
pubKey, _ = chanInfo.NodeKey1() pubKey, _ = chanInfo.NodeKey1()
edgeToUpdate = e1 edgeToUpdate = e1
case 1: direction = 0
} else {
pubKey, _ = chanInfo.NodeKey2() pubKey, _ = chanInfo.NodeKey2()
edgeToUpdate = e2 edgeToUpdate = e2
direction = 1
} }
var chanID = chanInfo.GetChanID() var chanID = chanInfo.GetChanID()
@@ -3192,38 +3219,21 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
if err != nil { if err != nil {
rErr := fmt.Errorf("unable to validate channel update "+ rErr := fmt.Errorf("unable to validate channel update "+
"announcement for short_chan_id=%v: %v", "announcement for short_chan_id=%v: %v",
spew.Sdump(upd.ShortChannelID), err) spew.Sdump(scid), err)
log.Error(rErr) log.Error(rErr)
nMsg.err <- rErr nMsg.err <- rErr
return nil, false return nil, false
} }
var edge *models.ChannelEdgePolicy1
if edgeToUpdate != nil {
var ok bool
edge, ok = edgeToUpdate.(*models.ChannelEdgePolicy1)
if !ok {
rErr := fmt.Errorf("expected "+
"*models.ChannelEdgePolicy1, got: %T",
edgeToUpdate)
log.Error(rErr)
nMsg.err <- rErr
return nil, false
}
}
// If we have a previous version of the edge being updated, we'll want // If we have a previous version of the edge being updated, we'll want
// to rate limit its updates to prevent spam throughout the network. // to rate limit its updates to prevent spam throughout the network.
if nMsg.isRemote && edge != nil { if nMsg.isRemote && edgeToUpdate != nil {
// If it's a keep-alive update, we'll only propagate one if // If it's a keep-alive update, we'll only propagate one if
// it's been a day since the previous. This follows our own // it's been a day since the previous. This follows our own
// heuristic of sending keep-alive updates after the same // heuristic of sending keep-alive updates after the same
// duration (see retransmitStaleAnns). // duration (see retransmitStaleAnns).
timeSinceLastUpdate := timestamp.Sub(edge.LastUpdate) isKeepAlive, err := IsKeepAliveUpdate(upd, edgeToUpdate)
isKeepAlive, err := IsKeepAliveUpdate(upd, edge)
if err != nil { if err != nil {
log.Errorf("Could not determine if update is "+ log.Errorf("Could not determine if update is "+
"keepalive: %v", err) "keepalive: %v", err)
@@ -3233,7 +3243,18 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
} }
if isKeepAlive { if isKeepAlive {
if timeSinceLastUpdate < d.cfg.RebroadcastInterval { within, err := d.updateWithinRebroadcastInterval(
upd, edgeToUpdate,
)
if err != nil {
log.Errorf("Could not determine if update is "+
"within rebroadcast interval: %v", err)
nMsg.err <- err
return nil, false
}
if !within {
log.Debugf("Ignoring keep alive update not "+ log.Debugf("Ignoring keep alive update not "+
"within %v period for channel %v", "within %v period for channel %v",
d.cfg.RebroadcastInterval, shortChanID) d.cfg.RebroadcastInterval, shortChanID)
@@ -3252,7 +3273,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// multiple aliases for a channel and we may otherwise // multiple aliases for a channel and we may otherwise
// rate-limit only a single alias of the channel, // rate-limit only a single alias of the channel,
// instead of the whole channel. // instead of the whole channel.
baseScid := chanID baseScid := chanInfo.GetChanID()
d.Lock() d.Lock()
rls, ok := d.chanUpdateRateLimiter[baseScid] rls, ok := d.chanUpdateRateLimiter[baseScid]
if !ok { if !ok {
@@ -3283,18 +3304,23 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// different alias. This might mean that SigBytes is incorrect as it // different alias. This might mean that SigBytes is incorrect as it
// signs a different SCID than the database SCID, but since there will // signs a different SCID than the database SCID, but since there will
// only be a difference if AuthProof == nil, this is fine. // only be a difference if AuthProof == nil, this is fine.
update := &models.ChannelEdgePolicy1{ update, err := models.EdgePolicyFromUpdate(upd)
SigBytes: upd.Signature.ToSignatureBytes(), if err != nil {
ChannelID: chanID, rErr := fmt.Errorf("unable to convert update to policy for "+
LastUpdate: timestamp, "short_chan_id=%v: %v", spew.Sdump(scid), err)
MessageFlags: upd.MessageFlags,
ChannelFlags: upd.ChannelFlags, log.Error(rErr)
TimeLockDelta: upd.TimeLockDelta, nMsg.err <- rErr
MinHTLC: upd.HtlcMinimumMsat,
MaxHTLC: upd.HtlcMaximumMsat, return nil, false
FeeBaseMSat: lnwire.MilliSatoshi(upd.BaseFee), }
FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate), switch upd := update.(type) {
ExtraOpaqueData: upd.ExtraOpaqueData, case *models.ChannelEdgePolicy1:
upd.ChannelID = chanInfo.GetChanID()
case *models.ChannelEdgePolicy2:
upd.ShortChannelID.Val = lnwire.NewShortChanIDFromInt(
chanInfo.GetChanID(),
)
} }
if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil { if err := d.cfg.Graph.UpdateEdge(update, ops...); err != nil {
@@ -3310,7 +3336,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// Since we know the stored SCID in the graph, we'll // Since we know the stored SCID in the graph, we'll
// cache that SCID. // cache that SCID.
key := newRejectCacheKey( key := newRejectCacheKey(
chanID, sourceToPub(nMsg.source), chanInfo.GetChanID(),
sourceToPub(nMsg.source),
) )
_, _ = d.recentRejects.Put(key, &cachedReject{}) _, _ = d.recentRejects.Put(key, &cachedReject{})
@@ -3319,32 +3346,33 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
} }
nMsg.err <- err nMsg.err <- err
return nil, false return nil, false
} }
// If this is a local ChannelUpdate without an AuthProof, it means it // If this is a local ChannelUpdate without an AuthProof, it means it
// is an update to a channel that is not (yet) supposed to be announced // is an update to a channel that is not (yet) supposed to be announced
// to the greater network. However, our channel counter party will need // to the greater network. However, our channel counterparty will need
// to be given the update, so we'll try sending the update directly to // to be given the update, so we'll try sending the update directly to
// the remote peer. // the remote peer.
if !nMsg.isRemote && chanInfo.GetAuthProof() == nil { if !nMsg.isRemote && chanInfo.GetAuthProof() == nil {
if nMsg.optionalMsgFields != nil { if nMsg.optionalMsgFields != nil &&
remoteAlias := nMsg.optionalMsgFields.remoteAlias nMsg.optionalMsgFields.remoteAlias != nil {
if remoteAlias != nil {
// The remoteAlias field was specified, meaning
// that we should replace the SCID in the
// update with the remote's alias. We'll also
// need to re-sign the channel update. This is
// required for option-scid-alias feature-bit
// negotiated channels.
upd.ShortChannelID = *remoteAlias
err := d.cfg.SignAliasUpdate(upd) // The remoteAlias field was specified, meaning
if err != nil { // that we should replace the SCID in the
log.Error(err) // update with the remote's alias. We'll also
nMsg.err <- err // need to re-sign the channel update. This is
return nil, false // required for option-scid-alias feature-bit
} // negotiated channels.
remoteAlias := nMsg.optionalMsgFields.remoteAlias
upd.SetSCID(*remoteAlias)
err := d.cfg.SignAliasUpdate(upd)
if err != nil {
log.Error(err)
nMsg.err <- err
return nil, false
} }
} }
@@ -3361,7 +3389,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
if err != nil { if err != nil {
err := fmt.Errorf("unable to reliably send %v for "+ err := fmt.Errorf("unable to reliably send %v for "+
"channel=%v to peer=%x: %v", upd.MsgType(), "channel=%v to peer=%x: %v", upd.MsgType(),
upd.ShortChannelID, remotePubKey, err) scid, remotePubKey, err)
nMsg.err <- err nMsg.err <- err
return nil, false return nil, false
} }
@@ -3374,7 +3402,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// contains an alias because the network would reject this. // contains an alias because the network would reject this.
var announcements []networkMsg var announcements []networkMsg
if chanInfo.GetAuthProof() != nil && if chanInfo.GetAuthProof() != nil &&
!d.cfg.IsAlias(upd.ShortChannelID) { !d.cfg.IsAlias(scid) {
announcements = append(announcements, networkMsg{ announcements = append(announcements, networkMsg{
peer: nMsg.peer, peer: nMsg.peer,
@@ -3386,9 +3414,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
nMsg.err <- nil nMsg.err <- nil
log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+ log.Debugf("Processed %s: peer=%v, short_chan_id=%v, ", upd.MsgType(),
"timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(), nMsg.peer, scid.ToUint64())
timestamp)
return announcements, true return announcements, true
} }
@@ -3848,6 +3876,39 @@ func (d *AuthenticatedGossiper) ShouldDisconnect(pubkey *btcec.PublicKey) (
return false, nil return false, nil
} }
func (d *AuthenticatedGossiper) updateWithinRebroadcastInterval(
upd lnwire.ChannelUpdate, policy models.ChannelEdgePolicy) (bool,
error) {
switch update := upd.(type) {
case *lnwire.ChannelUpdate1:
pol, ok := policy.(*models.ChannelEdgePolicy1)
if !ok {
return false, fmt.Errorf("expected chan edge policy 1")
}
timestamp := time.Unix(int64(update.Timestamp), 0)
timeSinceLastUpdate := timestamp.Sub(pol.LastUpdate)
return timeSinceLastUpdate >= d.cfg.RebroadcastInterval, nil
case *lnwire.ChannelUpdate2:
pol, ok := policy.(*models.ChannelEdgePolicy2)
if !ok {
return false, fmt.Errorf("expected chan edge policy 2")
}
blocksSinceLastUpdate := update.BlockHeight.Val -
pol.BlockHeight.Val
return blocksSinceLastUpdate >=
uint32(d.cfg.RebroadcastInterval.Hours()*6), nil
default:
return false, fmt.Errorf("unhandled impl of Chan Update")
}
}
func buildChanProof(ann lnwire.ChannelAnnouncement) ( func buildChanProof(ann lnwire.ChannelAnnouncement) (
models.ChannelAuthProof, error) { models.ChannelAuthProof, error) {