From 0f7c0ae9e7c27aafdad92d16063b498c4c914677 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 31 Jul 2025 11:31:32 +0200 Subject: [PATCH] graph/db: refactor to use sqldb.QueryConfig value This commit is a pure refactor. Here all we are doing is removing the old `pageSize` constant used in the SQLStore code and instead using the value provided in the QueryConfig struct. --- graph/db/sql_store.go | 248 +++++++++++++++++++++--------------------- 1 file changed, 122 insertions(+), 126 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 006085aa5..c828cbc53 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -31,13 +31,6 @@ import ( "github.com/lightningnetwork/lnd/tor" ) -// pageSize is the limit for the number of records that can be returned -// in a paginated query. This can be tuned after some benchmarks. -// -// TODO(elle): make this configurable & have different defaults for SQLite and -// Postgres. -const pageSize = 10000 - // ProtocolVersion is an enum that defines the gossip protocol version of a // message. type ProtocolVersion uint8 @@ -834,7 +827,7 @@ func (s *SQLStore) ForEachNode(ctx context.Context, ctx, sqlc.ListNodesPaginatedParams{ Version: int16(ProtocolV1), ID: lastID, - Limit: pageSize, + Limit: s.cfg.QueryCfg.MaxPageSize, }, ) if err != nil { @@ -948,17 +941,20 @@ func (s *SQLStore) ForEachNodeCacheable(ctx context.Context, reset func()) error { err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - return forEachNodeCacheable(ctx, db, func(nodeID int64, - nodePub route.Vertex) error { + return forEachNodeCacheable( + ctx, s.cfg.QueryCfg, db, + func(nodeID int64, nodePub route.Vertex) error { + features, err := getNodeFeatures( + ctx, db, nodeID, + ) + if err != nil { + return fmt.Errorf("unable to fetch "+ + "node features: %w", err) + } - features, err := getNodeFeatures(ctx, db, nodeID) - if err != nil { - return fmt.Errorf("unable to fetch node "+ - "features: %w", err) - } - - return cb(nodePub, features) - }) + return cb(nodePub, features) + }, + ) }, reset) if err != nil { return fmt.Errorf("unable to fetch nodes: %w", err) @@ -1128,118 +1124,117 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context, cb func(node route.Vertex, chans map[uint64]*DirectedChannel) error, reset func()) error { - return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - return forEachNodeCacheable(ctx, db, func(nodeID int64, - nodePub route.Vertex) error { + handleNode := func(db SQLQueries, nodeID int64, + nodePub route.Vertex) error { - features, err := getNodeFeatures(ctx, db, nodeID) - if err != nil { - return fmt.Errorf("unable to fetch "+ - "node(id=%d) features: %w", nodeID, err) - } + features, err := getNodeFeatures(ctx, db, nodeID) + if err != nil { + return fmt.Errorf("unable to fetch node(id=%d) "+ + "features: %w", nodeID, err) + } - toNodeCallback := func() route.Vertex { - return nodePub - } + toNodeCallback := func() route.Vertex { + return nodePub + } - rows, err := db.ListChannelsByNodeID( - ctx, sqlc.ListChannelsByNodeIDParams{ - Version: int16(ProtocolV1), - NodeID1: nodeID, - }, + rows, err := db.ListChannelsByNodeID( + ctx, sqlc.ListChannelsByNodeIDParams{ + Version: int16(ProtocolV1), + NodeID1: nodeID, + }, + ) + if err != nil { + return fmt.Errorf("unable to fetch channels of "+ + "node(id=%d): %w", nodeID, err) + } + + channels := make(map[uint64]*DirectedChannel, len(rows)) + for _, row := range rows { + node1, node2, err := buildNodeVertices( + row.Node1Pubkey, row.Node2Pubkey, ) if err != nil { - return fmt.Errorf("unable to fetch channels "+ - "of node(id=%d): %w", nodeID, err) + return err } - channels := make(map[uint64]*DirectedChannel, len(rows)) - for _, row := range rows { - node1, node2, err := buildNodeVertices( - row.Node1Pubkey, row.Node2Pubkey, - ) - if err != nil { - return err - } - - e, err := getAndBuildEdgeInfo( - ctx, db, s.cfg.ChainHash, - row.GraphChannel, node1, node2, - ) - if err != nil { - return fmt.Errorf("unable to build "+ - "channel info: %w", err) - } - - dbPol1, dbPol2, err := extractChannelPolicies( - row, - ) - if err != nil { - return fmt.Errorf("unable to "+ - "extract channel "+ - "policies: %w", err) - } - - p1, p2, err := getAndBuildChanPolicies( - ctx, db, dbPol1, dbPol2, e.ChannelID, - node1, node2, - ) - if err != nil { - return fmt.Errorf("unable to "+ - "build channel policies: %w", - err) - } - - // Determine the outgoing and incoming policy - // for this channel and node combo. - outPolicy, inPolicy := p1, p2 - if p1 != nil && p1.ToNode == nodePub { - outPolicy, inPolicy = p2, p1 - } else if p2 != nil && p2.ToNode != nodePub { - outPolicy, inPolicy = p2, p1 - } - - var cachedInPolicy *models.CachedEdgePolicy - if inPolicy != nil { - cachedInPolicy = models.NewCachedPolicy( - inPolicy, - ) - cachedInPolicy.ToNodePubKey = - toNodeCallback - cachedInPolicy.ToNodeFeatures = - features - } - - var inboundFee lnwire.Fee - if outPolicy != nil { - outPolicy.InboundFee.WhenSome( - func(fee lnwire.Fee) { - inboundFee = fee - }, - ) - } - - directedChannel := &DirectedChannel{ - ChannelID: e.ChannelID, - IsNode1: nodePub == - e.NodeKey1Bytes, - OtherNode: e.NodeKey2Bytes, - Capacity: e.Capacity, - OutPolicySet: outPolicy != nil, - InPolicy: cachedInPolicy, - InboundFee: inboundFee, - } - - if nodePub == e.NodeKey2Bytes { - directedChannel.OtherNode = - e.NodeKey1Bytes - } - - channels[e.ChannelID] = directedChannel + e, err := getAndBuildEdgeInfo( + ctx, db, s.cfg.ChainHash, row.GraphChannel, + node1, node2, + ) + if err != nil { + return fmt.Errorf("unable to build channel "+ + "info: %w", err) } - return cb(nodePub, channels) - }) + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return fmt.Errorf("unable to extract channel "+ + "policies: %w", err) + } + + p1, p2, err := getAndBuildChanPolicies( + ctx, db, dbPol1, dbPol2, e.ChannelID, node1, + node2, + ) + if err != nil { + return fmt.Errorf("unable to build channel "+ + "policies: %w", err) + } + + // Determine the outgoing and incoming policy + // for this channel and node combo. + outPolicy, inPolicy := p1, p2 + if p1 != nil && p1.ToNode == nodePub { + outPolicy, inPolicy = p2, p1 + } else if p2 != nil && p2.ToNode != nodePub { + outPolicy, inPolicy = p2, p1 + } + + var cachedInPolicy *models.CachedEdgePolicy + if inPolicy != nil { + cachedInPolicy = models.NewCachedPolicy( + inPolicy, + ) + cachedInPolicy.ToNodePubKey = toNodeCallback + cachedInPolicy.ToNodeFeatures = features + } + + var inboundFee lnwire.Fee + if outPolicy != nil { + outPolicy.InboundFee.WhenSome( + func(fee lnwire.Fee) { + inboundFee = fee + }, + ) + } + + directedChannel := &DirectedChannel{ + ChannelID: e.ChannelID, + IsNode1: nodePub == e.NodeKey1Bytes, + OtherNode: e.NodeKey2Bytes, + Capacity: e.Capacity, + OutPolicySet: outPolicy != nil, + InPolicy: cachedInPolicy, + InboundFee: inboundFee, + } + + if nodePub == e.NodeKey2Bytes { + directedChannel.OtherNode = e.NodeKey1Bytes + } + + channels[e.ChannelID] = directedChannel + } + + return cb(nodePub, channels) + } + + return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + return forEachNodeCacheable( + ctx, s.cfg.QueryCfg, db, + func(nodeID int64, nodePub route.Vertex) error { + return handleNode(db, nodeID, nodePub) + }, + ) }, reset) } @@ -1317,7 +1312,7 @@ func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo, ctx, sqlc.ListChannelsWithPoliciesForCachePaginatedParams{ Version: int16(ProtocolV1), ID: lastID, - Limit: pageSize, + Limit: s.cfg.QueryCfg.MaxPageSize, }, ) if err != nil { @@ -1408,7 +1403,7 @@ func (s *SQLStore) ForEachChannel(ctx context.Context, ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{ Version: int16(ProtocolV1), ID: lastID, - Limit: pageSize, + Limit: s.cfg.QueryCfg.MaxPageSize, }, ) if err != nil { @@ -2681,7 +2676,7 @@ func (s *SQLStore) ChannelView() ([]EdgePoint, error) { ctx, sqlc.ListChannelsPaginatedParams{ Version: int16(ProtocolV1), ID: lastID, - Limit: pageSize, + Limit: s.cfg.QueryCfg.MaxPageSize, }, ) if err != nil { @@ -3142,7 +3137,8 @@ func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries, // forEachNodeCacheable fetches all V1 node IDs and pub keys from the database, // and executes the provided callback for each node. -func forEachNodeCacheable(ctx context.Context, db SQLQueries, +func forEachNodeCacheable(ctx context.Context, cfg *sqldb.QueryConfig, + db SQLQueries, cb func(nodeID int64, nodePub route.Vertex) error) error { lastID := int64(-1) @@ -3152,7 +3148,7 @@ func forEachNodeCacheable(ctx context.Context, db SQLQueries, ctx, sqlc.ListNodeIDsAndPubKeysParams{ Version: int16(ProtocolV1), ID: lastID, - Limit: pageSize, + Limit: cfg.MaxPageSize, }, ) if err != nil {