diff --git a/discovery/chan_series.go b/discovery/chan_series.go
index 1fcb13d55..6c75635be 100644
--- a/discovery/chan_series.go
+++ b/discovery/chan_series.go
@@ -5,6 +5,7 @@ import (
 	"time"
 
 	"github.com/btcsuite/btcd/chaincfg/chainhash"
+	"github.com/lightningnetwork/lnd/fn/v2"
 	graphdb "github.com/lightningnetwork/lnd/graph/db"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/netann"
@@ -114,12 +115,13 @@ func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
 
 	// First, we'll query for all the set of channels that have an update
 	// that falls within the specified horizon.
-	chansInHorizon, err := c.graph.ChanUpdatesInHorizon(
+	chansInHorizonIter, err := c.graph.ChanUpdatesInHorizon(
 		startTime, endTime,
 	)
 	if err != nil {
 		return nil, err
 	}
+	chansInHorizon := fn.Collect(chansInHorizonIter)
 
 	// nodesFromChan records the nodes seen from the channels.
 	nodesFromChan := make(map[[33]byte]struct{}, len(chansInHorizon)*2)
diff --git a/graph/builder.go b/graph/builder.go
index 594093697..c77d0fea2 100644
--- a/graph/builder.go
+++ b/graph/builder.go
@@ -593,13 +593,13 @@ func (b *Builder) pruneZombieChans() error {
 	startTime := time.Unix(0, 0)
 	endTime := time.Now().Add(-1 * chanExpiry)
-	oldEdges, err := b.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
+	oldEdgesIter, err := b.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
 	if err != nil {
 		return fmt.Errorf("unable to fetch expired channel updates "+
 			"chans: %v", err)
 	}
 
-	for _, u := range oldEdges {
+	for u := range oldEdgesIter {
 		err = filterPruneChans(u.Info, u.Policy1, u.Policy2)
 		if err != nil {
 			return fmt.Errorf("error filtering channels to "+
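The two call sites above show the two migration patterns for the new API: graph/builder.go streams edges directly with range, while discovery/chan_series.go materializes the iterator with fn.Collect because it still needs a slice. A minimal standalone sketch of both patterns, using a stand-in edge type and the stdlib slices helpers in place of lnd's fn package:

```go
package main

import (
	"fmt"
	"iter"
	"slices"
)

// channelEdge is a stand-in for graphdb.ChannelEdge in this sketch.
type channelEdge struct {
	chanID uint64
}

// updatesInHorizon mimics the new iterator-returning signature.
func updatesInHorizon() (iter.Seq[channelEdge], error) {
	edges := []channelEdge{{chanID: 1}, {chanID: 2}, {chanID: 3}}
	return slices.Values(edges), nil
}

func main() {
	// Streaming style, as in pruneZombieChans: edges are processed one
	// at a time and never materialized as a slice.
	edgesIter, err := updatesInHorizon()
	if err != nil {
		panic(err)
	}
	for e := range edgesIter {
		fmt.Println("streaming edge:", e.chanID)
	}

	// Materializing style, as in ChanSeries.UpdatesInHorizon, which
	// still needs len() and indexing. The diff uses lnd's fn.Collect;
	// slices.Collect is the stdlib equivalent shown here.
	edgesIter, err = updatesInHorizon()
	if err != nil {
		panic(err)
	}
	edges := slices.Collect(edgesIter)
	fmt.Println("collected edges:", len(edges))
}
```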
diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go
index 9a25f2292..730fd8363 100644
--- a/graph/db/graph_test.go
+++ b/graph/db/graph_test.go
@@ -2028,10 +2028,11 @@ func TestChanUpdatesInHorizon(t *testing.T) {
 
 	// If we issue an arbitrary query before any channel updates are
 	// inserted in the database, we should get zero results.
-	chanUpdates, err := graph.ChanUpdatesInHorizon(
+	chanUpdatesIter, err := graph.ChanUpdatesInHorizon(
 		time.Unix(999, 0), time.Unix(9999, 0),
 	)
 	require.NoError(t, err, "unable to updates for updates")
+	chanUpdates := fn.Collect(chanUpdatesIter)
 	if len(chanUpdates) != 0 {
 		t.Fatalf("expected 0 chan updates, instead got %v",
 			len(chanUpdates))
@@ -2144,12 +2145,13 @@ func TestChanUpdatesInHorizon(t *testing.T) {
 		},
 	}
 	for _, queryCase := range queryCases {
-		resp, err := graph.ChanUpdatesInHorizon(
+		respIter, err := graph.ChanUpdatesInHorizon(
 			queryCase.start, queryCase.end,
 		)
 		if err != nil {
 			t.Fatalf("unable to query for updates: %v", err)
 		}
+		resp := fn.Collect(respIter)
 
 		if len(resp) != len(queryCase.resp) {
 			t.Fatalf("expected %v chans, got %v chans",
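The interface change that follows relies on Go 1.23 range-over-func iterators. As a quick primer for reviewers: an iter.Seq[T] is just a function taking a yield callback; the body of a `for x := range seq` loop becomes that callback, and breaking out of the loop makes yield return false. A self-contained illustration (not lnd code):

```go
package main

import (
	"fmt"
	"iter"
)

// countTo returns an iter.Seq[int] that yields 0..n-1. The loop body of a
// `for x := range seq` statement is passed in as the yield callback;
// returning false from yield corresponds to the caller breaking out.
func countTo(n int) iter.Seq[int] {
	return func(yield func(int) bool) {
		for i := 0; i < n; i++ {
			if !yield(i) {
				return // caller broke out early
			}
		}
	}
}

func main() {
	for x := range countTo(5) {
		if x == 3 {
			break // makes yield return false inside countTo
		}
		fmt.Println(x)
	}
}
```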
diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go
index fb77120e1..c97b26c5c 100644
--- a/graph/db/interfaces.go
+++ b/graph/db/interfaces.go
@@ -220,8 +220,8 @@ type V1Store interface { //nolint:interfacebloat
 	// ChanUpdatesInHorizon returns all the known channel edges which have
 	// at least one edge that has an update timestamp within the specified
 	// horizon.
-	ChanUpdatesInHorizon(startTime, endTime time.Time) ([]ChannelEdge,
-		error)
+	ChanUpdatesInHorizon(startTime, endTime time.Time,
+		opts ...IteratorOption) (iter.Seq[ChannelEdge], error)
 
 	// FilterKnownChanIDs takes a set of channel IDs and return the subset
 	// of chan ID's that we don't know and are not known zombies of the
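The new variadic opts ...IteratorOption parameter suggests the usual functional-options pattern: the diff shows defaultIteratorConfig() and a chanUpdateIterBatchSize field, but not the option constructors themselves. A hedged sketch of how such options typically look; WithBatchSize and the default value of 1000 here are hypothetical, not taken from the diff:

```go
package main

import "fmt"

// iteratorConfig mirrors the shape implied by defaultIteratorConfig() in
// kv_store.go. The field name appears in the diff, but the default value
// used here is an assumption for illustration.
type iteratorConfig struct {
	chanUpdateIterBatchSize int
}

// IteratorOption is a functional option that mutates the config.
type IteratorOption func(*iteratorConfig)

// WithBatchSize is a hypothetical option constructor; the real option
// names are not shown in this diff.
func WithBatchSize(n int) IteratorOption {
	return func(c *iteratorConfig) { c.chanUpdateIterBatchSize = n }
}

func defaultIteratorConfig() *iteratorConfig {
	return &iteratorConfig{chanUpdateIterBatchSize: 1000}
}

func main() {
	cfg := defaultIteratorConfig()
	for _, opt := range []IteratorOption{WithBatchSize(250)} {
		opt(cfg)
	}
	fmt.Println(cfg.chanUpdateIterBatchSize) // 250
}
```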
diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go
index 080fd3812..7bd958861 100644
--- a/graph/db/kv_store.go
+++ b/graph/db/kv_store.go
@@ -2040,107 +2040,202 @@ type ChannelEdge struct {
 	Node2 *models.Node
 }
 
-// ChanUpdatesInHorizon returns all the known channel edges which have at least
-// one edge that has an update timestamp within the specified horizon.
-func (c *KVStore) ChanUpdatesInHorizon(startTime,
-	endTime time.Time) ([]ChannelEdge, error) {
-
-	// To ensure we don't return duplicate ChannelEdges, we'll use an
-	// additional map to keep track of the edges already seen to prevent
-	// re-adding it.
-	var edgesSeen map[uint64]struct{}
-	var edgesToCache map[uint64]ChannelEdge
-	var edgesInHorizon []ChannelEdge
+// updateChanCacheBatch updates the channel cache with multiple edges at once.
+// This method acquires the cache lock only once for the entire batch.
+func (c *KVStore) updateChanCacheBatch(edgesToCache map[uint64]ChannelEdge) {
+	if len(edgesToCache) == 0 {
+		return
+	}
 
 	c.cacheMu.Lock()
 	defer c.cacheMu.Unlock()
 
-	var hits int
+	for cid, edge := range edgesToCache {
+		c.chanCache.insert(cid, edge)
+	}
+}
+
+// isEmptyGraphError returns true if the error indicates the graph database
+// is empty (no edges or nodes exist). These errors are expected when the
+// graph is first created or has no data.
+func isEmptyGraphError(err error) bool {
+	return errors.Is(err, ErrGraphNoEdgesFound) ||
+		errors.Is(err, ErrGraphNodesNotFound)
+}
+
+// chanUpdatesIterator holds the state for chunked channel update iteration.
+type chanUpdatesIterator struct {
+	// batchSize is the number of channel updates to read at a time.
+	batchSize int
+
+	// startTime is the start time of the iteration request.
+	startTime time.Time
+
+	// endTime is the end time of the iteration request.
+	endTime time.Time
+
+	// edgesSeen is used to de-duplicate edges across batches.
+	edgesSeen map[uint64]struct{}
+
+	// edgesToCache houses all the edges that we read from disk which
+	// aren't yet cached. This is used to update the cache after a batch
+	// chunk.
+	edgesToCache map[uint64]ChannelEdge
+
+	// lastSeenKey is the last index key seen. This is used to resume
+	// iteration.
+	lastSeenKey []byte
+
+	// hits is the number of cache hits.
+	hits int
+
+	// total is the total number of edges returned so far.
+	total int
+}
+
+// newChanUpdatesIterator makes a new chan updates iterator.
+func newChanUpdatesIterator(batchSize int,
+	startTime, endTime time.Time) *chanUpdatesIterator {
+
+	return &chanUpdatesIterator{
+		batchSize:    batchSize,
+		startTime:    startTime,
+		endTime:      endTime,
+		edgesSeen:    make(map[uint64]struct{}),
+		edgesToCache: make(map[uint64]ChannelEdge),
+		lastSeenKey:  nil,
+	}
+}
+
+// fetchNextChanUpdateBatch retrieves the next batch of channel edges within
+// the horizon. It returns the batch, whether there are more edges to read,
+// and any error encountered.
+func (c *KVStore) fetchNextChanUpdateBatch(
+	state *chanUpdatesIterator) ([]ChannelEdge, bool, error) {
+
+	var (
+		batch   []ChannelEdge
+		hasMore bool
+	)
 
 	err := kvdb.View(c.db, func(tx kvdb.RTx) error {
 		edges := tx.ReadBucket(edgeBucket)
 		if edges == nil {
 			return ErrGraphNoEdgesFound
 		}
+
 		edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
 		if edgeIndex == nil {
 			return ErrGraphNoEdgesFound
 		}
+
 		edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
 		if edgeUpdateIndex == nil {
 			return ErrGraphNoEdgesFound
 		}
-
 		nodes := tx.ReadBucket(nodeBucket)
 		if nodes == nil {
 			return ErrGraphNodesNotFound
 		}
 
-		// We'll now obtain a cursor to perform a range query within
-		// the index to find all channels within the horizon.
+		// With all the relevant buckets read, we'll now create a fresh
+		// read cursor.
 		updateCursor := edgeUpdateIndex.ReadCursor()
 
+		// We'll now use the start and end time to create the keys that
+		// we'll use to seek.
 		var startTimeBytes, endTimeBytes [8 + 8]byte
 		byteOrder.PutUint64(
-			startTimeBytes[:8], uint64(startTime.Unix()),
+			startTimeBytes[:8], uint64(state.startTime.Unix()),
 		)
 		byteOrder.PutUint64(
-			endTimeBytes[:8], uint64(endTime.Unix()),
+			endTimeBytes[:8], uint64(state.endTime.Unix()),
 		)
 
-		// With our start and end times constructed, we'll step through
-		// the index collecting the info and policy of each update of
-		// each channel that has a last update within the time range.
-		//
-		//nolint:ll
-		for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
-			bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
-			// We have a new eligible entry, so we'll slice of the
-			// chan ID so we can query it in the DB.
+		var indexKey []byte
+
+		// If we left off earlier, then we'll use that key as the
+		// starting point.
+		switch {
+		case state.lastSeenKey != nil:
+			// Seek to the last seen key, moving to the key right
+			// after it.
+			indexKey, _ = updateCursor.Seek(state.lastSeenKey)
+
+			if bytes.Equal(indexKey, state.lastSeenKey) {
+				indexKey, _ = updateCursor.Next()
+			}
+
+		// Otherwise, we'll move to the very start of the time range.
+		default:
+			indexKey, _ = updateCursor.Seek(startTimeBytes[:])
+		}
+
+		// TODO(roasbeef): iterate the channel graph cache instead w/ a
+		// treap ordering?
+
+		// Now we'll read items up to the batch size, exiting early if
+		// we exceed the ending time.
+		for len(batch) < state.batchSize && indexKey != nil {
+			// If we're at the end, then we'll break out now.
+			if bytes.Compare(indexKey, endTimeBytes[:]) > 0 {
+				break
+			}
+
 			chanID := indexKey[8:]
-
-			// If we've already retrieved the info and policies for
-			// this edge, then we can skip it as we don't need to do
-			// so again.
 			chanIDInt := byteOrder.Uint64(chanID)
-			if _, ok := edgesSeen[chanIDInt]; ok {
+
+			if state.lastSeenKey == nil {
+				state.lastSeenKey = make([]byte, len(indexKey))
+			}
+			copy(state.lastSeenKey, indexKey)
+
+			// If we've seen this channel ID already, then we'll
+			// skip it.
+			if _, ok := state.edgesSeen[chanIDInt]; ok {
+				indexKey, _ = updateCursor.Next()
 				continue
 			}
 
+			// Before we read the edge info, we'll see if this
+			// element is already in the cache or not.
+			c.cacheMu.RLock()
 			if channel, ok := c.chanCache.get(chanIDInt); ok {
-				hits++
-				edgesSeen[chanIDInt] = struct{}{}
-				edgesInHorizon = append(edgesInHorizon, channel)
+				state.edgesSeen[chanIDInt] = struct{}{}
+
+				batch = append(batch, channel)
+
+				state.hits++
+				state.total++
+
+				indexKey, _ = updateCursor.Next()
+
+				c.cacheMu.RUnlock()
 				continue
 			}
+			c.cacheMu.RUnlock()
 
-			// First, we'll fetch the static edge information.
+			// The edge wasn't in the cache, so we'll fetch it along
+			// w/ the edge policies and nodes.
 			edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
 			if err != nil {
-				chanID := byteOrder.Uint64(chanID)
-				return fmt.Errorf("unable to fetch info for "+
-					"edge with chan_id=%v: %v", chanID, err)
+				return fmt.Errorf("unable to fetch info "+
+					"for edge with chan_id=%v: %v",
+					chanIDInt, err)
 			}
-
-			// With the static information obtained, we'll now
-			// fetch the dynamic policy info.
 			edge1, edge2, err := fetchChanEdgePolicies(
 				edgeIndex, edges, chanID,
 			)
 			if err != nil {
-				chanID := byteOrder.Uint64(chanID)
-				return fmt.Errorf("unable to fetch policies "+
-					"for edge with chan_id=%v: %v", chanID,
-					err)
+				return fmt.Errorf("unable to fetch "+
+					"policies for edge with chan_id=%v: %v",
+					chanIDInt, err)
 			}
-
 			node1, err := fetchLightningNode(
 				nodes, edgeInfo.NodeKey1Bytes[:],
 			)
 			if err != nil {
 				return err
 			}
-
 			node2, err := fetchLightningNode(
 				nodes, edgeInfo.NodeKey2Bytes[:],
 			)
@@ -2148,9 +2243,8 @@ func (c *KVStore) ChanUpdatesInHorizon(startTime,
 				return err
 			}
 
-			// Finally, we'll collate this edge with the rest of
-			// edges to be returned.
-			edgesSeen[chanIDInt] = struct{}{}
+			// Now we have all the information we need to build the
+			// channel edge.
 			channel := ChannelEdge{
 				Info:    &edgeInfo,
 				Policy1: edge1,
@@ -2158,41 +2252,102 @@ func (c *KVStore) ChanUpdatesInHorizon(startTime,
 				Node1:   &node1,
 				Node2:   &node2,
 			}
-			edgesInHorizon = append(edgesInHorizon, channel)
-			edgesToCache[chanIDInt] = channel
+
+			state.edgesSeen[chanIDInt] = struct{}{}
+			state.edgesToCache[chanIDInt] = channel
+
+			batch = append(batch, channel)
+
+			state.total++
+
+			// Advance the iterator to the next entry.
+			indexKey, _ = updateCursor.Next()
+		}
+
+		// If we haven't yet crossed the endTimeBytes, then we still
+		// have more entries to deliver.
+		if indexKey != nil &&
+			bytes.Compare(indexKey, endTimeBytes[:]) <= 0 {
+
+			hasMore = true
 		}
 
 		return nil
 	}, func() {
-		edgesSeen = make(map[uint64]struct{})
-		edgesToCache = make(map[uint64]ChannelEdge)
-		edgesInHorizon = nil
+		batch = nil
+		hasMore = false
 	})
-	switch {
-	case errors.Is(err, ErrGraphNoEdgesFound):
-		fallthrough
-	case errors.Is(err, ErrGraphNodesNotFound):
-		break
-
-	case err != nil:
-		return nil, err
+	if err != nil {
+		return nil, false, err
 	}
 
-	// Insert any edges loaded from disk into the cache.
-	for chanid, channel := range edgesToCache {
-		c.chanCache.insert(chanid, channel)
+	return batch, hasMore, nil
+}
+
+// ChanUpdatesInHorizon returns all the known channel edges which have at
+// least one edge that has an update timestamp within the specified horizon.
+func (c *KVStore) ChanUpdatesInHorizon(startTime, endTime time.Time,
+	opts ...IteratorOption) (iter.Seq[ChannelEdge], error) {
+
+	cfg := defaultIteratorConfig()
+	for _, opt := range opts {
+		opt(cfg)
 	}
 
-	if len(edgesInHorizon) > 0 {
-		log.Debugf("ChanUpdatesInHorizon hit percentage: %.2f (%d/%d)",
-			float64(hits)*100/float64(len(edgesInHorizon)), hits,
-			len(edgesInHorizon))
-	} else {
-		log.Debugf("ChanUpdatesInHorizon returned no edges in "+
-			"horizon (%s, %s)", startTime, endTime)
-	}
+	return func(yield func(ChannelEdge) bool) {
+		iterState := newChanUpdatesIterator(
+			cfg.chanUpdateIterBatchSize, startTime, endTime,
+		)
 
-	return edgesInHorizon, nil
+		for {
+			// At the top of the loop, we'll read the next batch
+			// chunk from disk. We'll also determine if we have any
+			// more entries after this or not.
+			batch, hasMore, err := c.fetchNextChanUpdateBatch(
+				iterState,
+			)
+			// TODO(roasbeef): yield error here?
+			if err != nil {
+				// Anything other than an expected empty-graph
+				// error is fatal to the iteration.
+				if !isEmptyGraphError(err) {
+					log.Errorf("ChanUpdatesInHorizon "+
+						"batch error: %v", err)
+					return
+				}
+
+				// Otherwise, the graph is simply empty, so
+				// we'll continue with an empty batch.
+			}
+
+			// We'll now yield each edge that we just read. If yield
+			// returns false, then that means that we'll exit early.
+			for _, edge := range batch {
+				if !yield(edge) {
+					return
+				}
+			}
+
+			// Update the cache after a successful batch yield.
+			c.updateChanCacheBatch(iterState.edgesToCache)
+			iterState.edgesToCache = make(map[uint64]ChannelEdge)
+
+			// If we're done, then we can just break out here now.
+			if !hasMore || len(batch) == 0 {
+				break
+			}
+		}
+
+		if iterState.total > 0 {
+			log.Tracef("ChanUpdatesInHorizon hit percentage: "+
+				"%.2f (%d/%d)", float64(iterState.hits)*100/
+				float64(iterState.total), iterState.hits,
+				iterState.total)
+		} else {
+			log.Tracef("ChanUpdatesInHorizon returned no edges "+
+				"in horizon (%s, %s)", startTime, endTime)
+		}
+	}, nil
 }
 
 // nodeUpdatesIterator maintains state for iterating through node updates.
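The trickiest part of the KVStore change is resuming the cursor between batches: each call to fetchNextChanUpdateBatch opens a fresh read transaction, seeks back to lastSeenKey, and advances one entry only if the seek lands exactly on it (if that key was deleted in the meantime, Seek already lands on the next key). A toy model of that resume logic, with a sorted slice standing in for the bbolt cursor:

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// keys simulates the sorted edgeUpdateIndex; each key would really be
// <timestamp><chanID>.
var keys = [][]byte{
	[]byte("0001-a"), []byte("0002-b"), []byte("0003-c"),
	[]byte("0004-d"), []byte("0005-e"),
}

// seek returns the index of the first key >= target, like Cursor.Seek.
func seek(target []byte) int {
	return sort.Search(len(keys), func(i int) bool {
		return bytes.Compare(keys[i], target) >= 0
	})
}

// nextBatch mirrors fetchNextChanUpdateBatch's resume logic: seek to the
// last key seen and, if we land exactly on it, advance one entry so that
// it isn't delivered twice.
func nextBatch(lastSeen []byte, batchSize int) (batch [][]byte,
	last []byte) {

	i := 0
	if lastSeen != nil {
		i = seek(lastSeen)
		if i < len(keys) && bytes.Equal(keys[i], lastSeen) {
			i++
		}
	}
	for ; i < len(keys) && len(batch) < batchSize; i++ {
		batch = append(batch, keys[i])
		last = keys[i]
	}
	return batch, last
}

func main() {
	var last []byte
	for {
		batch, newLast := nextBatch(last, 2)
		if len(batch) == 0 {
			break
		}
		fmt.Printf("batch: %q\n", batch)
		last = newLast
	}
}
```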
diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go
index b617aa3b8..38485f6c1 100644
--- a/graph/db/sql_store.go
+++ b/graph/db/sql_store.go
@@ -933,7 +933,8 @@ func (s *SQLStore) ForEachNodeChannel(ctx context.Context, nodePub route.Vertex,
 //
 // NOTE: This is part of the V1Store interface.
 func (s *SQLStore) ChanUpdatesInHorizon(startTime,
-	endTime time.Time) ([]ChannelEdge, error) {
+	endTime time.Time,
+	opts ...IteratorOption) (iter.Seq[ChannelEdge], error) {
 
 	s.cacheMu.Lock()
 	defer s.cacheMu.Unlock()
@@ -1033,7 +1034,13 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime,
 			"horizon (%s, %s)", startTime, endTime)
 	}
 
-	return edges, nil
+	return func(yield func(ChannelEdge) bool) {
+		for _, edge := range edges {
+			if !yield(edge) {
+				return
+			}
+		}
+	}, nil
 }
 
 // ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel
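Note the asymmetry here: the SQL backend still runs the full query eagerly and only wraps the resulting slice in an iter.Seq, so the laziness is in the hand-off, not the read. Wrapping a slice this way is a standard adapter, equivalent to the stdlib's slices.Values; a generic version for reference:

```go
package main

import (
	"fmt"
	"iter"
)

// sliceSeq wraps an already-materialized slice in an iter.Seq, matching
// the SQLStore approach above: the query still runs eagerly, only the
// delivery to the caller becomes lazy.
func sliceSeq[T any](items []T) iter.Seq[T] {
	return func(yield func(T) bool) {
		for _, item := range items {
			if !yield(item) {
				return
			}
		}
	}
}

func main() {
	for s := range sliceSeq([]string{"edge1", "edge2"}) {
		fmt.Println(s)
	}
}
```

Presumably this keeps the V1Store interface uniform across backends while leaving room for the SQL path to adopt true keyset-paginated batching later, mirroring the KV implementation.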
diff --git a/graph/interfaces.go b/graph/interfaces.go
index ec4f42b79..cf660ec50 100644
--- a/graph/interfaces.go
+++ b/graph/interfaces.go
@@ -2,6 +2,7 @@ package graph
 
 import (
 	"context"
+	"iter"
 	"time"
 
 	"github.com/btcsuite/btcd/chaincfg/chainhash"
@@ -152,8 +153,9 @@ type DB interface {
 	// ChanUpdatesInHorizon returns all the known channel edges which have
 	// at least one edge that has an update timestamp within the specified
 	// horizon.
-	ChanUpdatesInHorizon(startTime, endTime time.Time) (
-		[]graphdb.ChannelEdge, error)
+	ChanUpdatesInHorizon(startTime, endTime time.Time,
+		opts ...graphdb.IteratorOption) (
+		iter.Seq[graphdb.ChannelEdge], error)
 
 	// DeleteChannelEdges removes edges with the given channel IDs from the
 	// database and marks them as zombies. This ensures that we're unable to
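One practical consequence of the iterator API, worth noting for callers of either backend: breaking out of the range loop stops further work. In the KVStore case that means no more batch reads from disk, and the in-flight batch's cache update is skipped, since updateChanCacheBatch runs only after a full batch is yielded. A small combinator illustrating bounded consumption; take is not part of the diff:

```go
package main

import (
	"fmt"
	"iter"
)

// take returns a Seq that yields at most n items from src. When the limit
// is hit, the enclosing function returns, which terminates the source
// iterator as well: for ChanUpdatesInHorizon, that stops batch reads.
func take[T any](src iter.Seq[T], n int) iter.Seq[T] {
	return func(yield func(T) bool) {
		count := 0
		for v := range src {
			if count >= n || !yield(v) {
				return
			}
			count++
		}
	}
}

func main() {
	// naturals yields 0, 1, 2, ... until the consumer stops.
	naturals := iter.Seq[int](func(yield func(int) bool) {
		for i := 0; ; i++ {
			if !yield(i) {
				return
			}
		}
	})

	for v := range take(naturals, 3) {
		fmt.Println(v) // prints 0, 1, 2
	}
}
```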