graph/db: refactor to use sqldb.QueryConfig value

This commit is a pure refactor. Here all we are doing is removing the
old `pageSize` constant used in the SQLStore code and instead using the
value provided in the QueryConfig struct.
This commit is contained in:
Elle Mouton
2025-07-31 11:31:32 +02:00
parent e276f1ec3e
commit 0f7c0ae9e7

View File

@@ -31,13 +31,6 @@ import (
"github.com/lightningnetwork/lnd/tor"
)
// pageSize is the limit for the number of records that can be returned
// in a paginated query. This can be tuned after some benchmarks.
//
// TODO(elle): make this configurable & have different defaults for SQLite and
// Postgres.
const pageSize = 10000
// ProtocolVersion is an enum that defines the gossip protocol version of a
// message.
type ProtocolVersion uint8
@@ -834,7 +827,7 @@ func (s *SQLStore) ForEachNode(ctx context.Context,
ctx, sqlc.ListNodesPaginatedParams{
Version: int16(ProtocolV1),
ID: lastID,
Limit: pageSize,
Limit: s.cfg.QueryCfg.MaxPageSize,
},
)
if err != nil {
@@ -948,17 +941,20 @@ func (s *SQLStore) ForEachNodeCacheable(ctx context.Context,
reset func()) error {
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
return forEachNodeCacheable(ctx, db, func(nodeID int64,
nodePub route.Vertex) error {
return forEachNodeCacheable(
ctx, s.cfg.QueryCfg, db,
func(nodeID int64, nodePub route.Vertex) error {
features, err := getNodeFeatures(
ctx, db, nodeID,
)
if err != nil {
return fmt.Errorf("unable to fetch "+
"node features: %w", err)
}
features, err := getNodeFeatures(ctx, db, nodeID)
if err != nil {
return fmt.Errorf("unable to fetch node "+
"features: %w", err)
}
return cb(nodePub, features)
})
return cb(nodePub, features)
},
)
}, reset)
if err != nil {
return fmt.Errorf("unable to fetch nodes: %w", err)
@@ -1128,118 +1124,117 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context,
cb func(node route.Vertex, chans map[uint64]*DirectedChannel) error,
reset func()) error {
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
return forEachNodeCacheable(ctx, db, func(nodeID int64,
nodePub route.Vertex) error {
handleNode := func(db SQLQueries, nodeID int64,
nodePub route.Vertex) error {
features, err := getNodeFeatures(ctx, db, nodeID)
if err != nil {
return fmt.Errorf("unable to fetch "+
"node(id=%d) features: %w", nodeID, err)
}
features, err := getNodeFeatures(ctx, db, nodeID)
if err != nil {
return fmt.Errorf("unable to fetch node(id=%d) "+
"features: %w", nodeID, err)
}
toNodeCallback := func() route.Vertex {
return nodePub
}
toNodeCallback := func() route.Vertex {
return nodePub
}
rows, err := db.ListChannelsByNodeID(
ctx, sqlc.ListChannelsByNodeIDParams{
Version: int16(ProtocolV1),
NodeID1: nodeID,
},
rows, err := db.ListChannelsByNodeID(
ctx, sqlc.ListChannelsByNodeIDParams{
Version: int16(ProtocolV1),
NodeID1: nodeID,
},
)
if err != nil {
return fmt.Errorf("unable to fetch channels of "+
"node(id=%d): %w", nodeID, err)
}
channels := make(map[uint64]*DirectedChannel, len(rows))
for _, row := range rows {
node1, node2, err := buildNodeVertices(
row.Node1Pubkey, row.Node2Pubkey,
)
if err != nil {
return fmt.Errorf("unable to fetch channels "+
"of node(id=%d): %w", nodeID, err)
return err
}
channels := make(map[uint64]*DirectedChannel, len(rows))
for _, row := range rows {
node1, node2, err := buildNodeVertices(
row.Node1Pubkey, row.Node2Pubkey,
)
if err != nil {
return err
}
e, err := getAndBuildEdgeInfo(
ctx, db, s.cfg.ChainHash,
row.GraphChannel, node1, node2,
)
if err != nil {
return fmt.Errorf("unable to build "+
"channel info: %w", err)
}
dbPol1, dbPol2, err := extractChannelPolicies(
row,
)
if err != nil {
return fmt.Errorf("unable to "+
"extract channel "+
"policies: %w", err)
}
p1, p2, err := getAndBuildChanPolicies(
ctx, db, dbPol1, dbPol2, e.ChannelID,
node1, node2,
)
if err != nil {
return fmt.Errorf("unable to "+
"build channel policies: %w",
err)
}
// Determine the outgoing and incoming policy
// for this channel and node combo.
outPolicy, inPolicy := p1, p2
if p1 != nil && p1.ToNode == nodePub {
outPolicy, inPolicy = p2, p1
} else if p2 != nil && p2.ToNode != nodePub {
outPolicy, inPolicy = p2, p1
}
var cachedInPolicy *models.CachedEdgePolicy
if inPolicy != nil {
cachedInPolicy = models.NewCachedPolicy(
inPolicy,
)
cachedInPolicy.ToNodePubKey =
toNodeCallback
cachedInPolicy.ToNodeFeatures =
features
}
var inboundFee lnwire.Fee
if outPolicy != nil {
outPolicy.InboundFee.WhenSome(
func(fee lnwire.Fee) {
inboundFee = fee
},
)
}
directedChannel := &DirectedChannel{
ChannelID: e.ChannelID,
IsNode1: nodePub ==
e.NodeKey1Bytes,
OtherNode: e.NodeKey2Bytes,
Capacity: e.Capacity,
OutPolicySet: outPolicy != nil,
InPolicy: cachedInPolicy,
InboundFee: inboundFee,
}
if nodePub == e.NodeKey2Bytes {
directedChannel.OtherNode =
e.NodeKey1Bytes
}
channels[e.ChannelID] = directedChannel
e, err := getAndBuildEdgeInfo(
ctx, db, s.cfg.ChainHash, row.GraphChannel,
node1, node2,
)
if err != nil {
return fmt.Errorf("unable to build channel "+
"info: %w", err)
}
return cb(nodePub, channels)
})
dbPol1, dbPol2, err := extractChannelPolicies(row)
if err != nil {
return fmt.Errorf("unable to extract channel "+
"policies: %w", err)
}
p1, p2, err := getAndBuildChanPolicies(
ctx, db, dbPol1, dbPol2, e.ChannelID, node1,
node2,
)
if err != nil {
return fmt.Errorf("unable to build channel "+
"policies: %w", err)
}
// Determine the outgoing and incoming policy
// for this channel and node combo.
outPolicy, inPolicy := p1, p2
if p1 != nil && p1.ToNode == nodePub {
outPolicy, inPolicy = p2, p1
} else if p2 != nil && p2.ToNode != nodePub {
outPolicy, inPolicy = p2, p1
}
var cachedInPolicy *models.CachedEdgePolicy
if inPolicy != nil {
cachedInPolicy = models.NewCachedPolicy(
inPolicy,
)
cachedInPolicy.ToNodePubKey = toNodeCallback
cachedInPolicy.ToNodeFeatures = features
}
var inboundFee lnwire.Fee
if outPolicy != nil {
outPolicy.InboundFee.WhenSome(
func(fee lnwire.Fee) {
inboundFee = fee
},
)
}
directedChannel := &DirectedChannel{
ChannelID: e.ChannelID,
IsNode1: nodePub == e.NodeKey1Bytes,
OtherNode: e.NodeKey2Bytes,
Capacity: e.Capacity,
OutPolicySet: outPolicy != nil,
InPolicy: cachedInPolicy,
InboundFee: inboundFee,
}
if nodePub == e.NodeKey2Bytes {
directedChannel.OtherNode = e.NodeKey1Bytes
}
channels[e.ChannelID] = directedChannel
}
return cb(nodePub, channels)
}
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
return forEachNodeCacheable(
ctx, s.cfg.QueryCfg, db,
func(nodeID int64, nodePub route.Vertex) error {
return handleNode(db, nodeID, nodePub)
},
)
}, reset)
}
@@ -1317,7 +1312,7 @@ func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
ctx, sqlc.ListChannelsWithPoliciesForCachePaginatedParams{
Version: int16(ProtocolV1),
ID: lastID,
Limit: pageSize,
Limit: s.cfg.QueryCfg.MaxPageSize,
},
)
if err != nil {
@@ -1408,7 +1403,7 @@ func (s *SQLStore) ForEachChannel(ctx context.Context,
ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{
Version: int16(ProtocolV1),
ID: lastID,
Limit: pageSize,
Limit: s.cfg.QueryCfg.MaxPageSize,
},
)
if err != nil {
@@ -2681,7 +2676,7 @@ func (s *SQLStore) ChannelView() ([]EdgePoint, error) {
ctx, sqlc.ListChannelsPaginatedParams{
Version: int16(ProtocolV1),
ID: lastID,
Limit: pageSize,
Limit: s.cfg.QueryCfg.MaxPageSize,
},
)
if err != nil {
@@ -3142,7 +3137,8 @@ func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries,
// forEachNodeCacheable fetches all V1 node IDs and pub keys from the database,
// and executes the provided callback for each node.
func forEachNodeCacheable(ctx context.Context, db SQLQueries,
func forEachNodeCacheable(ctx context.Context, cfg *sqldb.QueryConfig,
db SQLQueries,
cb func(nodeID int64, nodePub route.Vertex) error) error {
lastID := int64(-1)
@@ -3152,7 +3148,7 @@ func forEachNodeCacheable(ctx context.Context, db SQLQueries,
ctx, sqlc.ListNodeIDsAndPubKeysParams{
Version: int16(ProtocolV1),
ID: lastID,
Limit: pageSize,
Limit: cfg.MaxPageSize,
},
)
if err != nil {