Merge pull request #9011 from ziggie1984/fix-gossip-syncer

Fix TimeStamp issue in the Gossip Syncer
This commit is contained in:
Olaoluwa Osuntokun 2024-08-19 16:58:52 -07:00 committed by GitHub
commit b4693b2010
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 228 additions and 39 deletions

View File

@ -2143,6 +2143,21 @@ func (c *ChannelGraph) FilterKnownChanIDs(chansInfo []ChannelUpdateInfo,
zombieIndex, scid,
)
// TODO(ziggie): Make sure that for the strict
// pruning case we compare the pubkeys and
// whether the right timestamp is not older than
// the `ChannelPruneExpiry`.
//
// NOTE: The timestamp data has no verification
// attached to it in the `ReplyChannelRange` msg
// so we are trusting this data at this point.
// However it is not critical because we are
// just removing the channel from the db when
// the timestamps are more recent. During the
// querying of the gossip msg verification
// happens as usual.
// However we should start punishing peers when
// they don't provide us honest data ?
isStillZombie := isZombieChan(
info.Node1UpdateTimestamp,
info.Node2UpdateTimestamp,
@ -2211,6 +2226,29 @@ type ChannelUpdateInfo struct {
Node2UpdateTimestamp time.Time
}
// NewChannelUpdateInfo is a constructor which makes sure we initialize the
// timestamps with zero seconds unix timestamp which equals
// `January 1, 1970, 00:00:00 UTC` in case the value is `time.Time{}`.
func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp,
node2Timestamp time.Time) ChannelUpdateInfo {
chanInfo := ChannelUpdateInfo{
ShortChannelID: scid,
Node1UpdateTimestamp: node1Timestamp,
Node2UpdateTimestamp: node2Timestamp,
}
if node1Timestamp.IsZero() {
chanInfo.Node1UpdateTimestamp = time.Unix(0, 0)
}
if node2Timestamp.IsZero() {
chanInfo.Node2UpdateTimestamp = time.Unix(0, 0)
}
return chanInfo
}
// BlockChannelRange represents a range of channels for a given block height.
type BlockChannelRange struct {
// Height is the height of the block all of the channels below were
@ -2284,9 +2322,9 @@ func (c *ChannelGraph) FilterChannelRange(startHeight,
rawCid := byteOrder.Uint64(k)
cid := lnwire.NewShortChanIDFromInt(rawCid)
chanInfo := ChannelUpdateInfo{
ShortChannelID: cid,
}
chanInfo := NewChannelUpdateInfo(
cid, time.Time{}, time.Time{},
)
if !withTimestamps {
channelsPerBlock[cid.BlockHeight] = append(

View File

@ -1980,9 +1980,9 @@ func TestFilterKnownChanIDs(t *testing.T) {
t.Fatalf("unable to create channel edge: %v", err)
}
chanIDs = append(chanIDs, ChannelUpdateInfo{
ShortChannelID: chanID,
})
chanIDs = append(chanIDs, NewChannelUpdateInfo(
chanID, time.Time{}, time.Time{},
))
}
const numZombies = 5
@ -2024,20 +2024,28 @@ func TestFilterKnownChanIDs(t *testing.T) {
// should get the same set back.
{
queryIDs: []ChannelUpdateInfo{
{ShortChannelID: lnwire.ShortChannelID{
BlockHeight: 99,
}},
{ShortChannelID: lnwire.ShortChannelID{
BlockHeight: 100,
}},
{
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: 99,
},
},
{
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: 100,
},
},
},
resp: []ChannelUpdateInfo{
{ShortChannelID: lnwire.ShortChannelID{
BlockHeight: 99,
}},
{ShortChannelID: lnwire.ShortChannelID{
BlockHeight: 100,
}},
{
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: 99,
},
},
{
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: 100,
},
},
},
},
@ -2419,7 +2427,7 @@ func TestFilterChannelRange(t *testing.T) {
)
)
updateTimeSeed := int64(1)
updateTimeSeed := time.Now().Unix()
maybeAddPolicy := func(chanID uint64, node *LightningNode,
node2 bool) time.Time {
@ -2428,7 +2436,7 @@ func TestFilterChannelRange(t *testing.T) {
chanFlags = lnwire.ChanUpdateDirection
}
var updateTime time.Time
var updateTime = time.Unix(0, 0)
if rand.Int31n(2) == 0 {
updateTime = time.Unix(updateTimeSeed, 0)
err = graph.UpdateEdgePolicy(&models.ChannelEdgePolicy{
@ -2456,11 +2464,16 @@ func TestFilterChannelRange(t *testing.T) {
)
require.NoError(t, graph.AddChannelEdge(&channel2))
chanInfo1 := NewChannelUpdateInfo(
chanID1, time.Time{}, time.Time{},
)
chanInfo2 := NewChannelUpdateInfo(
chanID2, time.Time{}, time.Time{},
)
channelRanges = append(channelRanges, BlockChannelRange{
Height: chanHeight,
Channels: []ChannelUpdateInfo{
{ShortChannelID: chanID1},
{ShortChannelID: chanID2},
chanInfo1, chanInfo2,
},
})
@ -2471,20 +2484,17 @@ func TestFilterChannelRange(t *testing.T) {
time4 = maybeAddPolicy(channel2.ChannelID, node2, true)
)
chanInfo1 = NewChannelUpdateInfo(
chanID1, time1, time2,
)
chanInfo2 = NewChannelUpdateInfo(
chanID2, time3, time4,
)
channelRangesWithTimestamps = append(
channelRangesWithTimestamps, BlockChannelRange{
Height: chanHeight,
Channels: []ChannelUpdateInfo{
{
ShortChannelID: chanID1,
Node1UpdateTimestamp: time1,
Node2UpdateTimestamp: time2,
},
{
ShortChannelID: chanID2,
Node1UpdateTimestamp: time3,
Node2UpdateTimestamp: time4,
},
chanInfo1, chanInfo2,
},
},
)

View File

@ -2745,6 +2745,22 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
return nil, true
}
// Check that the ChanUpdate is not too far into the future, this could
// reveal some faulty implementation therefore we log an error.
if time.Until(timestamp) > graph.DefaultChannelPruneExpiry {
log.Errorf("Skewed timestamp (%v) for edge policy of "+
"short_chan_id(%v), timestamp too far in the future: "+
"peer=%v, msg=%s, is_remote=%v", timestamp.Unix(),
shortChanID, nMsg.peer, nMsg.msg.MsgType(),
nMsg.isRemote,
)
nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+
"timestamp too far in the future: %v", timestamp.Unix())
return nil, false
}
// Get the node pub key as far since we don't have it in the channel
// update announcement message. We'll need this to properly verify the
// message's signature.

View File

@ -561,7 +561,7 @@ func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
return
}
log.Debugf("Replaced active GossipSyncer(%x) with GossipSyncer(%x)",
log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)",
peer, newActiveSyncer.cfg.peerPub)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/graph"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/time/rate"
@ -787,6 +788,16 @@ func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange,
// reply to the initial range query to discover new channels that it didn't
// previously know of.
func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error {
// isStale returns whether the timestamp is too far into the past.
isStale := func(timestamp time.Time) bool {
return time.Since(timestamp) > graph.DefaultChannelPruneExpiry
}
// isSkewed returns whether the timestamp is too far into the future.
isSkewed := func(timestamp time.Time) bool {
return time.Until(timestamp) > graph.DefaultChannelPruneExpiry
}
// If we're not communicating with a legacy node, we'll apply some
// further constraints on their reply to ensure it satisfies our query.
if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) {
@ -838,9 +849,9 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
}
for i, scid := range msg.ShortChanIDs {
info := channeldb.ChannelUpdateInfo{
ShortChannelID: scid,
}
info := channeldb.NewChannelUpdateInfo(
scid, time.Time{}, time.Time{},
)
if len(msg.Timestamps) != 0 {
t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0)
@ -848,6 +859,32 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0)
info.Node2UpdateTimestamp = t2
// Sort out all channels with outdated or skewed
// timestamps. Both timestamps need to be out of
// boundaries for us to skip the channel and not query
// it later on.
switch {
case isStale(info.Node1UpdateTimestamp) &&
isStale(info.Node2UpdateTimestamp):
continue
case isSkewed(info.Node1UpdateTimestamp) &&
isSkewed(info.Node2UpdateTimestamp):
continue
case isStale(info.Node1UpdateTimestamp) &&
isSkewed(info.Node2UpdateTimestamp):
continue
case isStale(info.Node2UpdateTimestamp) &&
isSkewed(info.Node1UpdateTimestamp):
continue
}
}
g.bufferedChanRangeReplies = append(

View File

@ -1229,6 +1229,12 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
query, err := syncer.genChanRangeQuery(true)
require.NoError(t, err, "unable to generate channel range query")
currentTimestamp := time.Now().Unix()
// Timestamp more than 2 weeks in the past hence expired.
expiredTimestamp := time.Unix(0, 0).Unix()
// Timestamp three weeks in the future.
skewedTimestamp := time.Now().Add(time.Hour * 24 * 18).Unix()
// When interpreting block ranges, the first reply should start from
// our requested first block, and the last should end at our requested
// last block.
@ -1253,14 +1259,78 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
},
{
FirstBlockHeight: 12,
NumBlocks: query.NumBlocks - 12,
Complete: 1,
NumBlocks: 1,
ShortChanIDs: []lnwire.ShortChannelID{
{
BlockHeight: 12,
},
},
},
{
FirstBlockHeight: 13,
NumBlocks: query.NumBlocks - 13,
Complete: 1,
ShortChanIDs: []lnwire.ShortChannelID{
{
BlockHeight: 13,
TxIndex: 1,
},
{
BlockHeight: 13,
TxIndex: 2,
},
{
BlockHeight: 13,
TxIndex: 3,
},
{
BlockHeight: 13,
TxIndex: 4,
},
{
BlockHeight: 13,
TxIndex: 5,
},
{
BlockHeight: 13,
TxIndex: 6,
},
},
Timestamps: []lnwire.ChanUpdateTimestamps{
{
// Both timestamps are valid.
Timestamp1: uint32(currentTimestamp),
Timestamp2: uint32(currentTimestamp),
},
{
// One of the timestamps is valid.
Timestamp1: uint32(currentTimestamp),
Timestamp2: uint32(expiredTimestamp),
},
{
// Both timestamps are expired.
Timestamp1: uint32(expiredTimestamp),
Timestamp2: uint32(expiredTimestamp),
},
{
// Both timestamps are skewed.
Timestamp1: uint32(skewedTimestamp),
Timestamp2: uint32(skewedTimestamp),
},
{
// One timestamp is skewed the other
// expired.
Timestamp1: uint32(expiredTimestamp),
Timestamp2: uint32(skewedTimestamp),
},
{
// One timestamp is skewed the other
// expired.
Timestamp1: uint32(skewedTimestamp),
Timestamp2: uint32(expiredTimestamp),
},
},
},
}
// Each reply query is the same as the original query in the legacy
@ -1274,6 +1344,9 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
replies[2].FirstBlockHeight = query.FirstBlockHeight
replies[2].NumBlocks = query.NumBlocks
replies[3].FirstBlockHeight = query.FirstBlockHeight
replies[3].NumBlocks = query.NumBlocks
}
// We'll begin by sending the syncer a set of non-complete channel
@ -1284,6 +1357,9 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
if err := syncer.processChanRangeReply(replies[1]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
if err := syncer.processChanRangeReply(replies[2]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}
// At this point, we should still be in our starting state as the query
// hasn't finished.
@ -1301,6 +1377,14 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
{
BlockHeight: 12,
},
{
BlockHeight: 13,
TxIndex: 1,
},
{
BlockHeight: 13,
TxIndex: 2,
},
}
// As we're about to send the final response, we'll launch a goroutine
@ -1335,7 +1419,7 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) {
// If we send the final message, then we should transition to
// queryNewChannels as we've sent a non-empty set of new channels.
if err := syncer.processChanRangeReply(replies[2]); err != nil {
if err := syncer.processChanRangeReply(replies[3]); err != nil {
t.Fatalf("unable to process reply: %v", err)
}

View File

@ -63,6 +63,10 @@ commitment when the channel was force closed.
* [A bug has been fixed that could cause invalid channel
announcements](https://github.com/lightningnetwork/lnd/pull/9002) to be
generated if the inbound fee discount is used.
* [Fixed](https://github.com/lightningnetwork/lnd/pull/9011) a timestamp issue
in the `ReplyChannelRange` msg and introduced a check that ChanUpdates with a
timestamp too far into the future will be discarded.
# New Features
## Functional Enhancements