From 44413868c7de6eddb26f502d465650bc8e5ef87e Mon Sep 17 00:00:00 2001 From: ziggie Date: Wed, 14 Aug 2024 21:43:50 +0200 Subject: [PATCH] multi: fix time.Time initialization. ChanUpdate timestamps are now restircted so that they cannot be more than two weeks into the future. Moreover channels with both timestamps in the ReplyChannelRange msg either too far in the past or too far in the future are not queried. Moreover fix unitests. --- channeldb/graph.go | 29 +++++++++++-- channeldb/graph_test.go | 68 +++++++++++++++++------------- discovery/gossiper.go | 16 +++++++ discovery/syncer.go | 43 +++++++++++++++++-- discovery/syncer_test.go | 90 ++++++++++++++++++++++++++++++++++++++-- 5 files changed, 208 insertions(+), 38 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index 414672166..daa7a4230 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -2211,6 +2211,29 @@ type ChannelUpdateInfo struct { Node2UpdateTimestamp time.Time } +// NewChannelUpdateInfo is a constructor which makes sure we initialize the +// timestamps with zero seconds unix timestamp which equals +// `January 1, 1970, 00:00:00 UTC` in case the value is `time.Time{}`. +func NewChannelUpdateInfo(scid lnwire.ShortChannelID, node1Timestamp, + node2Timestamp time.Time) ChannelUpdateInfo { + + chanInfo := ChannelUpdateInfo{ + ShortChannelID: scid, + Node1UpdateTimestamp: node1Timestamp, + Node2UpdateTimestamp: node2Timestamp, + } + + if node1Timestamp.IsZero() { + chanInfo.Node1UpdateTimestamp = time.Unix(0, 0) + } + + if node2Timestamp.IsZero() { + chanInfo.Node2UpdateTimestamp = time.Unix(0, 0) + } + + return chanInfo +} + // BlockChannelRange represents a range of channels for a given block height. type BlockChannelRange struct { // Height is the height of the block all of the channels below were @@ -2284,9 +2307,9 @@ func (c *ChannelGraph) FilterChannelRange(startHeight, rawCid := byteOrder.Uint64(k) cid := lnwire.NewShortChanIDFromInt(rawCid) - chanInfo := ChannelUpdateInfo{ - ShortChannelID: cid, - } + chanInfo := NewChannelUpdateInfo( + cid, time.Time{}, time.Time{}, + ) if !withTimestamps { channelsPerBlock[cid.BlockHeight] = append( diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 00e30d5b2..6380ed29c 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -1980,9 +1980,9 @@ func TestFilterKnownChanIDs(t *testing.T) { t.Fatalf("unable to create channel edge: %v", err) } - chanIDs = append(chanIDs, ChannelUpdateInfo{ - ShortChannelID: chanID, - }) + chanIDs = append(chanIDs, NewChannelUpdateInfo( + chanID, time.Time{}, time.Time{}, + )) } const numZombies = 5 @@ -2024,20 +2024,28 @@ func TestFilterKnownChanIDs(t *testing.T) { // should get the same set back. { queryIDs: []ChannelUpdateInfo{ - {ShortChannelID: lnwire.ShortChannelID{ - BlockHeight: 99, - }}, - {ShortChannelID: lnwire.ShortChannelID{ - BlockHeight: 100, - }}, + { + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: 99, + }, + }, + { + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: 100, + }, + }, }, resp: []ChannelUpdateInfo{ - {ShortChannelID: lnwire.ShortChannelID{ - BlockHeight: 99, - }}, - {ShortChannelID: lnwire.ShortChannelID{ - BlockHeight: 100, - }}, + { + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: 99, + }, + }, + { + ShortChannelID: lnwire.ShortChannelID{ + BlockHeight: 100, + }, + }, }, }, @@ -2419,7 +2427,7 @@ func TestFilterChannelRange(t *testing.T) { ) ) - updateTimeSeed := int64(1) + updateTimeSeed := time.Now().Unix() maybeAddPolicy := func(chanID uint64, node *LightningNode, node2 bool) time.Time { @@ -2428,7 +2436,7 @@ func TestFilterChannelRange(t *testing.T) { chanFlags = lnwire.ChanUpdateDirection } - var updateTime time.Time + var updateTime = time.Unix(0, 0) if rand.Int31n(2) == 0 { updateTime = time.Unix(updateTimeSeed, 0) err = graph.UpdateEdgePolicy(&models.ChannelEdgePolicy{ @@ -2456,11 +2464,16 @@ func TestFilterChannelRange(t *testing.T) { ) require.NoError(t, graph.AddChannelEdge(&channel2)) + chanInfo1 := NewChannelUpdateInfo( + chanID1, time.Time{}, time.Time{}, + ) + chanInfo2 := NewChannelUpdateInfo( + chanID2, time.Time{}, time.Time{}, + ) channelRanges = append(channelRanges, BlockChannelRange{ Height: chanHeight, Channels: []ChannelUpdateInfo{ - {ShortChannelID: chanID1}, - {ShortChannelID: chanID2}, + chanInfo1, chanInfo2, }, }) @@ -2471,20 +2484,17 @@ func TestFilterChannelRange(t *testing.T) { time4 = maybeAddPolicy(channel2.ChannelID, node2, true) ) + chanInfo1 = NewChannelUpdateInfo( + chanID1, time1, time2, + ) + chanInfo2 = NewChannelUpdateInfo( + chanID2, time3, time4, + ) channelRangesWithTimestamps = append( channelRangesWithTimestamps, BlockChannelRange{ Height: chanHeight, Channels: []ChannelUpdateInfo{ - { - ShortChannelID: chanID1, - Node1UpdateTimestamp: time1, - Node2UpdateTimestamp: time2, - }, - { - ShortChannelID: chanID2, - Node1UpdateTimestamp: time3, - Node2UpdateTimestamp: time4, - }, + chanInfo1, chanInfo2, }, }, ) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 0dcd7a6c9..80fd576a2 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -2745,6 +2745,22 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, return nil, true } + // Check that the ChanUpdate is not too far into the future, this could + // reveal some faulty implementation therefore we log an error. + if time.Until(timestamp) > graph.DefaultChannelPruneExpiry { + log.Errorf("Skewed timestamp (%v) for edge policy of "+ + "short_chan_id(%v), timestamp too far in the future: "+ + "peer=%v, msg=%s, is_remote=%v", timestamp.Unix(), + shortChanID, nMsg.peer, nMsg.msg.MsgType(), + nMsg.isRemote, + ) + + nMsg.err <- fmt.Errorf("skewed timestamp of edge policy, "+ + "timestamp too far in the future: %v", timestamp.Unix()) + + return nil, false + } + // Get the node pub key as far since we don't have it in the channel // update announcement message. We'll need this to properly verify the // message's signature. diff --git a/discovery/syncer.go b/discovery/syncer.go index b910151cb..512c9f631 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -12,6 +12,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/graph" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwire" "golang.org/x/time/rate" @@ -787,6 +788,16 @@ func isLegacyReplyChannelRange(query *lnwire.QueryChannelRange, // reply to the initial range query to discover new channels that it didn't // previously know of. func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) error { + // isStale returns whether the timestamp is too far into the past. + isStale := func(timestamp time.Time) bool { + return time.Since(timestamp) > graph.DefaultChannelPruneExpiry + } + + // isSkewed returns whether the timestamp is too far into the future. + isSkewed := func(timestamp time.Time) bool { + return time.Until(timestamp) > graph.DefaultChannelPruneExpiry + } + // If we're not communicating with a legacy node, we'll apply some // further constraints on their reply to ensure it satisfies our query. if !isLegacyReplyChannelRange(g.curQueryRangeMsg, msg) { @@ -838,9 +849,9 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro } for i, scid := range msg.ShortChanIDs { - info := channeldb.ChannelUpdateInfo{ - ShortChannelID: scid, - } + info := channeldb.NewChannelUpdateInfo( + scid, time.Time{}, time.Time{}, + ) if len(msg.Timestamps) != 0 { t1 := time.Unix(int64(msg.Timestamps[i].Timestamp1), 0) @@ -848,6 +859,32 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro t2 := time.Unix(int64(msg.Timestamps[i].Timestamp2), 0) info.Node2UpdateTimestamp = t2 + + // Sort out all channels with outdated or skewed + // timestamps. Both timestamps need to be out of + // boundaries for us to skip the channel and not query + // it later on. + switch { + case isStale(info.Node1UpdateTimestamp) && + isStale(info.Node2UpdateTimestamp): + + continue + + case isSkewed(info.Node1UpdateTimestamp) && + isSkewed(info.Node2UpdateTimestamp): + + continue + + case isStale(info.Node1UpdateTimestamp) && + isSkewed(info.Node2UpdateTimestamp): + + continue + + case isStale(info.Node2UpdateTimestamp) && + isSkewed(info.Node1UpdateTimestamp): + + continue + } } g.bufferedChanRangeReplies = append( diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 7a649f466..15e2442e1 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -1229,6 +1229,12 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { query, err := syncer.genChanRangeQuery(true) require.NoError(t, err, "unable to generate channel range query") + currentTimestamp := time.Now().Unix() + // Timestamp more than 2 weeks in the past hence expired. + expiredTimestamp := time.Unix(0, 0).Unix() + // Timestamp three weeks in the future. + skewedTimestamp := time.Now().Add(time.Hour * 24 * 18).Unix() + // When interpreting block ranges, the first reply should start from // our requested first block, and the last should end at our requested // last block. @@ -1253,14 +1259,78 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { }, { FirstBlockHeight: 12, - NumBlocks: query.NumBlocks - 12, - Complete: 1, + NumBlocks: 1, ShortChanIDs: []lnwire.ShortChannelID{ { BlockHeight: 12, }, }, }, + { + FirstBlockHeight: 13, + NumBlocks: query.NumBlocks - 13, + Complete: 1, + ShortChanIDs: []lnwire.ShortChannelID{ + { + BlockHeight: 13, + TxIndex: 1, + }, + { + BlockHeight: 13, + TxIndex: 2, + }, + { + BlockHeight: 13, + TxIndex: 3, + }, + { + BlockHeight: 13, + TxIndex: 4, + }, + { + BlockHeight: 13, + TxIndex: 5, + }, + { + BlockHeight: 13, + TxIndex: 6, + }, + }, + Timestamps: []lnwire.ChanUpdateTimestamps{ + { + // Both timestamps are valid. + Timestamp1: uint32(currentTimestamp), + Timestamp2: uint32(currentTimestamp), + }, + { + // One of the timestamps is valid. + Timestamp1: uint32(currentTimestamp), + Timestamp2: uint32(expiredTimestamp), + }, + { + // Both timestamps are expired. + Timestamp1: uint32(expiredTimestamp), + Timestamp2: uint32(expiredTimestamp), + }, + { + // Both timestamps are skewed. + Timestamp1: uint32(skewedTimestamp), + Timestamp2: uint32(skewedTimestamp), + }, + { + // One timestamp is skewed the other + // expired. + Timestamp1: uint32(expiredTimestamp), + Timestamp2: uint32(skewedTimestamp), + }, + { + // One timestamp is skewed the other + // expired. + Timestamp1: uint32(skewedTimestamp), + Timestamp2: uint32(expiredTimestamp), + }, + }, + }, } // Each reply query is the same as the original query in the legacy @@ -1274,6 +1344,9 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { replies[2].FirstBlockHeight = query.FirstBlockHeight replies[2].NumBlocks = query.NumBlocks + + replies[3].FirstBlockHeight = query.FirstBlockHeight + replies[3].NumBlocks = query.NumBlocks } // We'll begin by sending the syncer a set of non-complete channel @@ -1284,6 +1357,9 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { if err := syncer.processChanRangeReply(replies[1]); err != nil { t.Fatalf("unable to process reply: %v", err) } + if err := syncer.processChanRangeReply(replies[2]); err != nil { + t.Fatalf("unable to process reply: %v", err) + } // At this point, we should still be in our starting state as the query // hasn't finished. @@ -1301,6 +1377,14 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { { BlockHeight: 12, }, + { + BlockHeight: 13, + TxIndex: 1, + }, + { + BlockHeight: 13, + TxIndex: 2, + }, } // As we're about to send the final response, we'll launch a goroutine @@ -1335,7 +1419,7 @@ func testGossipSyncerProcessChanRangeReply(t *testing.T, legacy bool) { // If we send the final message, then we should transition to // queryNewChannels as we've sent a non-empty set of new channels. - if err := syncer.processChanRangeReply(replies[2]); err != nil { + if err := syncer.processChanRangeReply(replies[3]); err != nil { t.Fatalf("unable to process reply: %v", err) }