diff --git a/discovery/chan_series.go b/discovery/chan_series.go
index cf502694c..1fcb13d55 100644
--- a/discovery/chan_series.go
+++ b/discovery/chan_series.go
@@ -201,7 +201,7 @@ func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
 		return nil, err
 	}
 
-	for _, nodeAnn := range nodeAnnsInHorizon {
+	for nodeAnn := range nodeAnnsInHorizon {
 		// If this node has not been seen in the above channels, we can
 		// skip sending its NodeAnnouncement.
 		if _, seen := nodesFromChan[nodeAnn.PubKeyBytes]; !seen {
diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go
index 6975c9d48..9a25f2292 100644
--- a/graph/db/graph_test.go
+++ b/graph/db/graph_test.go
@@ -2263,9 +2263,11 @@ func TestNodeUpdatesInHorizon(t *testing.T) {
 		},
 	}
 	for _, queryCase := range queryCases {
-		resp, err := graph.NodeUpdatesInHorizon(
+		respIter, err := graph.NodeUpdatesInHorizon(
 			queryCase.start, queryCase.end,
 		)
 		require.NoError(t, err)
+
+		resp := fn.Collect(respIter)
 
 		require.Len(t, resp, len(queryCase.resp))
@@ -3507,11 +3509,12 @@ func TestNodePruningUpdateIndexDeletion(t *testing.T) {
 	// update time of our test node.
 	startTime := time.Unix(9, 0)
 	endTime := node1.LastUpdate.Add(time.Minute)
-	nodesInHorizon, err := graph.NodeUpdatesInHorizon(startTime, endTime)
+	nodesInHorizonIter, err := graph.NodeUpdatesInHorizon(startTime, endTime)
 	require.NoError(t, err, "unable to fetch nodes in horizon")
 
 	// We should only have a single node, and that node should exactly
 	// match the node we just inserted.
+	nodesInHorizon := fn.Collect(nodesInHorizonIter)
 	if len(nodesInHorizon) != 1 {
 		t.Fatalf("should have 1 nodes instead have: %v",
 			len(nodesInHorizon))
@@ -3525,10 +3528,11 @@ func TestNodePruningUpdateIndexDeletion(t *testing.T) {
 
 	// Now that the node has been deleted, we'll again query the nodes in
 	// the horizon. This time we should have no nodes at all.
-	nodesInHorizon, err = graph.NodeUpdatesInHorizon(startTime, endTime)
+	nodesInHorizonIter, err = graph.NodeUpdatesInHorizon(startTime, endTime)
 	require.NoError(t, err, "unable to fetch nodes in horizon")
 
-	if len(nodesInHorizon) != 0 {
+	nodesInHorizon = fn.Collect(nodesInHorizonIter)
+	if len(nodesInHorizon) != 0 {
 		t.Fatalf("should have zero nodes instead have: %v",
 			len(nodesInHorizon))
 	}
diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go
index f90c2da8a..fb77120e1 100644
--- a/graph/db/interfaces.go
+++ b/graph/db/interfaces.go
@@ -2,6 +2,7 @@ package graphdb
 
 import (
 	"context"
+	"iter"
 	"net"
 	"time"
 
@@ -109,8 +110,8 @@ type V1Store interface { //nolint:interfacebloat
 	// an update timestamp within the passed range. This method can be used
 	// by two nodes to quickly determine if they have the same set of up to
 	// date node announcements.
-	NodeUpdatesInHorizon(startTime,
-		endTime time.Time) ([]models.Node, error)
+	NodeUpdatesInHorizon(startTime, endTime time.Time,
+		opts ...IteratorOption) (iter.Seq[models.Node], error)
 
 	// FetchNode attempts to look up a target node by its identity
 	// public key. If the node isn't found in the database, then
diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go
index 165f51987..080fd3812 100644
--- a/graph/db/kv_store.go
+++ b/graph/db/kv_store.go
@@ -8,6 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"iter"
 	"math"
 	"net"
 	"sort"
@@ -2194,57 +2195,175 @@ func (c *KVStore) ChanUpdatesInHorizon(startTime,
 	return edgesInHorizon, nil
 }
 
-// NodeUpdatesInHorizon returns all the known lightning node which have an
-// update timestamp within the passed range. This method can be used by two
-// nodes to quickly determine if they have the same set of up to date node
-// announcements.
-func (c *KVStore) NodeUpdatesInHorizon(startTime,
-	endTime time.Time) ([]models.Node, error) {
+// nodeUpdatesIterator maintains state for iterating through node updates.
+//
+// Iterator Lifecycle:
+// 1. Initialize state with start/end time, batch size, and filtering options
+// 2. Fetch batch using pagination cursor (lastSeenKey)
+// 3. Filter nodes if publicNodesOnly is set
+// 4. Update lastSeenKey to the last processed node's index key
+// 5. Repeat until we exceed endTime or no more nodes exist
+type nodeUpdatesIterator struct {
+	// batchSize is the amount of node updates to read at a single time.
+	batchSize int
 
-	var nodesInHorizon []models.Node
+	// startTime is the start time of the iteration request.
+	startTime time.Time
+
+	// endTime is the end time of the iteration request.
+	endTime time.Time
+
+	// lastSeenKey is the last index key seen. This is used to resume
+	// iteration.
+	lastSeenKey []byte
+
+	// publicNodesOnly filters to only return public nodes if true.
+	publicNodesOnly bool
+
+	// total tracks total nodes processed.
+	total int
+}
+
+// newNodeUpdatesIterator makes a new node updates iterator.
+func newNodeUpdatesIterator(batchSize int, startTime, endTime time.Time,
+	publicNodesOnly bool) *nodeUpdatesIterator {
+
+	return &nodeUpdatesIterator{
+		batchSize:       batchSize,
+		startTime:       startTime,
+		endTime:         endTime,
+		lastSeenKey:     nil,
+		publicNodesOnly: publicNodesOnly,
+	}
+}
+
+// fetchNextNodeBatch fetches the next batch of node announcements using the
+// iterator state.
+func (c *KVStore) fetchNextNodeBatch(
+	state *nodeUpdatesIterator) ([]models.Node, bool, error) {
+
+	var (
+		nodeBatch []models.Node
+		hasMore   bool
+	)
 
 	err := kvdb.View(c.db, func(tx kvdb.RTx) error {
 		nodes := tx.ReadBucket(nodeBucket)
 		if nodes == nil {
 			return ErrGraphNodesNotFound
 		}
-
+		ourPubKey := nodes.Get(sourceKey)
+		if ourPubKey == nil && state.publicNodesOnly {
+			// If we're filtering for public nodes only but don't
+			// have a source node set, we can't determine if nodes
+			// are public. A node is considered public if it has at
+			// least one channel with our node (the source node).
+			return ErrSourceNodeNotSet
+		}
 		nodeUpdateIndex := nodes.NestedReadBucket(nodeUpdateIndexBucket)
 		if nodeUpdateIndex == nil {
 			return ErrGraphNodesNotFound
 		}
 
-		// We'll now obtain a cursor to perform a range query within
-		// the index to find all node announcements within the horizon.
+		// We'll now obtain a cursor to perform a range query within the
+		// index to find all node announcements within the horizon. The
+		// nodeUpdateIndex key format is: [8 bytes timestamp][33 bytes
+		// node pubkey]. This allows efficient range queries by time
+		// while maintaining a stable sort order for nodes with the same
+		// timestamp.
 		updateCursor := nodeUpdateIndex.ReadCursor()
 
 		var startTimeBytes, endTimeBytes [8 + 33]byte
 		byteOrder.PutUint64(
-			startTimeBytes[:8], uint64(startTime.Unix()),
+			startTimeBytes[:8], uint64(state.startTime.Unix()),
 		)
 		byteOrder.PutUint64(
-			endTimeBytes[:8], uint64(endTime.Unix()),
+			endTimeBytes[:8], uint64(state.endTime.Unix()),
 		)
 
-		// With our start and end times constructed, we'll step through
-		// the index collecting info for each node within the time
-		// range.
-		//
-		//nolint:ll
-		for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
-			bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
+		// If we have a last seen key (existing iteration), then that'll
+		// be our starting point. Otherwise, we'll seek to the start
+		// time.
+		var indexKey []byte
+		if state.lastSeenKey != nil {
+			indexKey, _ = updateCursor.Seek(state.lastSeenKey)
+
+			if bytes.Equal(indexKey, state.lastSeenKey) {
+				indexKey, _ = updateCursor.Next()
+			}
+		} else {
+			indexKey, _ = updateCursor.Seek(startTimeBytes[:])
+		}
+
+		// Now we'll read items up to the batch size, exiting early if
+		// we exceed the ending time.
+		var lastProcessedKey []byte
+		for len(nodeBatch) < state.batchSize && indexKey != nil {
+			// Extract the timestamp from the index key (first 8
+			// bytes). Only compare timestamps, not the full key
+			// with pubkey.
+			keyTimestamp := byteOrder.Uint64(indexKey[:8])
+			endTimestamp := uint64(state.endTime.Unix())
+			if keyTimestamp > endTimestamp {
+				break
+			}
+
 			nodePub := indexKey[8:]
 			node, err := fetchLightningNode(nodes, nodePub)
 			if err != nil {
 				return err
 			}
 
-			nodesInHorizon = append(nodesInHorizon, node)
+			if state.publicNodesOnly {
+				nodeIsPublic, err := c.isPublic(
+					tx, node.PubKeyBytes, ourPubKey,
+				)
+				if err != nil {
+					return err
+				}
+				if !nodeIsPublic {
+					indexKey, _ = updateCursor.Next()
+					continue
+				}
+			}
+
+			nodeBatch = append(nodeBatch, node)
+			state.total++
+
+			// Remember the last key we actually processed. We'll
+			// use this to update the last seen key below.
+			if lastProcessedKey == nil {
+				lastProcessedKey = make([]byte, len(indexKey))
+			}
+			copy(lastProcessedKey, indexKey)
+
+			// Advance the iterator to the next entry.
+			indexKey, _ = updateCursor.Next()
+		}
+
+		// If we haven't yet crossed the endTime, then we still
+		// have more entries to deliver.
+		if indexKey != nil {
+			keyTimestamp := byteOrder.Uint64(indexKey[:8])
+			endTimestamp := uint64(state.endTime.Unix())
+			if keyTimestamp <= endTimestamp {
+				hasMore = true
+			}
+		}
+
+		// Update the cursor to the last key we actually processed.
+		if lastProcessedKey != nil {
+			if state.lastSeenKey == nil {
+				state.lastSeenKey = make(
+					[]byte, len(lastProcessedKey),
+				)
+			}
+			copy(state.lastSeenKey, lastProcessedKey)
 		}
 
 		return nil
 	}, func() {
-		nodesInHorizon = nil
+		nodeBatch = nil
 	})
 	switch {
 	case errors.Is(err, ErrGraphNoEdgesFound):
@@ -2253,10 +2372,53 @@ func (c *KVStore) NodeUpdatesInHorizon(startTime,
 		break
 
 	case err != nil:
-		return nil, err
+		return nil, false, err
 	}
 
-	return nodesInHorizon, nil
+	return nodeBatch, hasMore, nil
+}
+
+// NodeUpdatesInHorizon returns all the known lightning nodes which have an
+// update timestamp within the passed range.
+func (c *KVStore) NodeUpdatesInHorizon(startTime,
+	endTime time.Time,
+	opts ...IteratorOption) (iter.Seq[models.Node], error) {
+
+	cfg := defaultIteratorConfig()
+	for _, opt := range opts {
+		opt(cfg)
+	}
+
+	return func(yield func(models.Node) bool) {
+		// Initialize iterator state.
+		state := newNodeUpdatesIterator(
+			cfg.nodeUpdateIterBatchSize,
+			startTime, endTime,
+			cfg.iterPublicNodes,
+		)
+
+		for {
+			nodeAnns, hasMore, err := c.fetchNextNodeBatch(state)
+			if err != nil {
+				log.Errorf("unable to read node updates in "+
+					"horizon: %v", err)
+
+				return
+			}
+
+			for _, node := range nodeAnns {
+				if !yield(node) {
+					return
+				}
+			}
+
+			// If we're done, then we can just break out of the
+			// loop now.
+			if !hasMore || len(nodeAnns) == 0 {
+				break
+			}
+		}
+	}, nil
 }
 
 // FilterKnownChanIDs takes a set of channel IDs and return the subset of chan
diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go
index 23fd50482..b617aa3b8 100644
--- a/graph/db/sql_store.go
+++ b/graph/db/sql_store.go
@@ -7,6 +7,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"iter"
 	"maps"
 	"math"
 	"net"
@@ -553,7 +554,8 @@ func (s *SQLStore) SetSourceNode(ctx context.Context,
 //
 // NOTE: This is part of the V1Store interface.
 func (s *SQLStore) NodeUpdatesInHorizon(startTime,
-	endTime time.Time) ([]models.Node, error) {
+	endTime time.Time,
+	opts ...IteratorOption) (iter.Seq[models.Node], error) {
 
 	ctx := context.TODO()
 
@@ -587,7 +589,13 @@ func (s *SQLStore) NodeUpdatesInHorizon(startTime,
 		return nil, fmt.Errorf("unable to fetch nodes: %w", err)
 	}
 
-	return nodes, nil
+	return func(yield func(models.Node) bool) {
+		for _, node := range nodes {
+			if !yield(node) {
+				return
+			}
+		}
+	}, nil
 }
 
 // AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
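For readers less familiar with Go 1.23 range-over-func iterators, the sketch below illustrates the batched `iter.Seq` pattern the KVStore changes adopt. It is a minimal, self-contained approximation: `node`, `batchedNodes`, and the slice-backed store are invented stand-ins rather than lnd types, and the real implementation pulls each batch inside a bolt read transaction via `fetchNextNodeBatch` and resumes from `lastSeenKey`.

```go
package main

import (
	"fmt"
	"iter"
	"time"
)

// node is a stand-in for models.Node: just enough state to show the pattern.
type node struct {
	alias      string
	lastUpdate time.Time
}

// batchedNodes mirrors the shape of the new NodeUpdatesInHorizon: it returns
// a single-use iter.Seq that pulls fixed-size batches from a backing store
// (here a plain slice) and yields nodes one at a time, stopping early when
// the consumer breaks out of its range loop.
func batchedNodes(store []node, batchSize int) iter.Seq[node] {
	return func(yield func(node) bool) {
		for offset := 0; offset < len(store); offset += batchSize {
			end := min(offset+batchSize, len(store))

			// In the real store this step is one bolt read
			// transaction per batch (fetchNextNodeBatch); here it
			// is just a slice window.
			for _, n := range store[offset:end] {
				if !yield(n) {
					return
				}
			}
		}
	}
}

func main() {
	store := []node{
		{alias: "alice", lastUpdate: time.Unix(10, 0)},
		{alias: "bob", lastUpdate: time.Unix(20, 0)},
		{alias: "carol", lastUpdate: time.Unix(30, 0)},
	}

	// Streaming consumption, matching the updated chan_series.go loop
	// (`for nodeAnn := range nodeAnnsInHorizon`).
	for n := range batchedNodes(store, 2) {
		fmt.Println(n.alias)
	}

	// Materializing the whole sequence, which is what the tests do via
	// fn.Collect.
	var all []node
	for n := range batchedNodes(store, 2) {
		all = append(all, n)
	}
	fmt.Println(len(all))
}
```

The reason for the batch boundary is that each database transaction stays short and memory stays bounded, while callers such as `UpdatesInHorizon` can still range over the sequence as if it were a plain slice, or collect it into one.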