graph/db: prep SQLStore for reset param

This commit just surfaces the locations in the SQLStore where we will
later pass reset params through.
This commit is contained in:
Elle Mouton
2025-07-11 10:11:30 +02:00
parent 57d4ce0f9b
commit e00f0a03fc

View File

@@ -729,6 +729,8 @@ func (s *SQLStore) ForEachSourceNodeChannel(ctx context.Context,
cb func(chanPoint wire.OutPoint, havePolicy bool, cb func(chanPoint wire.OutPoint, havePolicy bool,
otherNode *models.LightningNode) error) error { otherNode *models.LightningNode) error) error {
reset := func() {}
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
nodeID, nodePub, err := s.getSourceNode(ctx, db, ProtocolV1) nodeID, nodePub, err := s.getSourceNode(ctx, db, ProtocolV1)
if err != nil { if err != nil {
@@ -773,7 +775,7 @@ func (s *SQLStore) ForEachSourceNodeChannel(ctx context.Context,
) )
}, },
) )
}, sqldb.NoOpReset) }, reset)
} }
// ForEachNode iterates through all the stored vertices/nodes in the graph, // ForEachNode iterates through all the stored vertices/nodes in the graph,
@@ -787,6 +789,8 @@ func (s *SQLStore) ForEachSourceNodeChannel(ctx context.Context,
func (s *SQLStore) ForEachNode(ctx context.Context, func (s *SQLStore) ForEachNode(ctx context.Context,
cb func(tx NodeRTx) error) error { cb func(tx NodeRTx) error) error {
reset := func() {}
var lastID int64 = 0 var lastID int64 = 0
handleNode := func(db SQLQueries, dbNode sqlc.Node) error { handleNode := func(db SQLQueries, dbNode sqlc.Node) error {
node, err := buildNode(ctx, db, &dbNode) node, err := buildNode(ctx, db, &dbNode)
@@ -835,7 +839,7 @@ func (s *SQLStore) ForEachNode(ctx context.Context,
} }
return nil return nil
}, sqldb.NoOpReset) }, reset)
} }
// sqlGraphNodeTx is an implementation of the NodeRTx interface backed by the // sqlGraphNodeTx is an implementation of the NodeRTx interface backed by the
@@ -909,11 +913,12 @@ func (s *sqlGraphNodeTx) FetchNode(nodePub route.Vertex) (NodeRTx, error) {
func (s *SQLStore) ForEachNodeDirectedChannel(nodePub route.Vertex, func (s *SQLStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
cb func(channel *DirectedChannel) error) error { cb func(channel *DirectedChannel) error) error {
reset := func() {}
var ctx = context.TODO() var ctx = context.TODO()
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
return forEachNodeDirectedChannel(ctx, db, nodePub, cb) return forEachNodeDirectedChannel(ctx, db, nodePub, cb)
}, sqldb.NoOpReset) }, reset)
} }
// ForEachNodeCacheable iterates through all the stored vertices/nodes in the // ForEachNodeCacheable iterates through all the stored vertices/nodes in the
@@ -925,6 +930,8 @@ func (s *SQLStore) ForEachNodeDirectedChannel(nodePub route.Vertex,
func (s *SQLStore) ForEachNodeCacheable(ctx context.Context, func (s *SQLStore) ForEachNodeCacheable(ctx context.Context,
cb func(route.Vertex, *lnwire.FeatureVector) error) error { cb func(route.Vertex, *lnwire.FeatureVector) error) error {
reset := func() {}
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
return forEachNodeCacheable(ctx, db, func(nodeID int64, return forEachNodeCacheable(ctx, db, func(nodeID int64,
nodePub route.Vertex) error { nodePub route.Vertex) error {
@@ -937,7 +944,7 @@ func (s *SQLStore) ForEachNodeCacheable(ctx context.Context,
return cb(nodePub, features) return cb(nodePub, features)
}) })
}, sqldb.NoOpReset) }, reset)
if err != nil { if err != nil {
return fmt.Errorf("unable to fetch nodes: %w", err) return fmt.Errorf("unable to fetch nodes: %w", err)
} }
@@ -959,6 +966,8 @@ func (s *SQLStore) ForEachNodeChannel(ctx context.Context, nodePub route.Vertex,
cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error { *models.ChannelEdgePolicy) error) error {
reset := func() {}
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
dbNode, err := db.GetNodeByPubKey( dbNode, err := db.GetNodeByPubKey(
ctx, sqlc.GetNodeByPubKeyParams{ ctx, sqlc.GetNodeByPubKeyParams{
@@ -975,7 +984,7 @@ func (s *SQLStore) ForEachNodeChannel(ctx context.Context, nodePub route.Vertex,
return forEachNodeChannel( return forEachNodeChannel(
ctx, db, s.cfg.ChainHash, dbNode.ID, cb, ctx, db, s.cfg.ChainHash, dbNode.ID, cb,
) )
}, sqldb.NoOpReset) }, reset)
} }
// ChanUpdatesInHorizon returns all the known channel edges which have at least // ChanUpdatesInHorizon returns all the known channel edges which have at least
@@ -1107,6 +1116,8 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context,
cb func(node route.Vertex, cb func(node route.Vertex,
chans map[uint64]*DirectedChannel) error) error { chans map[uint64]*DirectedChannel) error) error {
reset := func() {}
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
return forEachNodeCacheable(ctx, db, func(nodeID int64, return forEachNodeCacheable(ctx, db, func(nodeID int64,
nodePub route.Vertex) error { nodePub route.Vertex) error {
@@ -1218,7 +1229,7 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context,
return cb(nodePub, channels) return cb(nodePub, channels)
}) })
}, sqldb.NoOpReset) }, reset)
} }
// ForEachChannelCacheable iterates through all the channel edges stored // ForEachChannelCacheable iterates through all the channel edges stored
@@ -1237,6 +1248,7 @@ func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
*models.CachedEdgePolicy, *models.CachedEdgePolicy,
*models.CachedEdgePolicy) error) error { *models.CachedEdgePolicy) error) error {
reset := func() {}
ctx := context.TODO() ctx := context.TODO()
handleChannel := func(db SQLQueries, handleChannel := func(db SQLQueries,
@@ -1315,7 +1327,7 @@ func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo,
} }
return nil return nil
}, sqldb.NoOpReset) }, reset)
} }
// ForEachChannel iterates through all the channel edges stored within the // ForEachChannel iterates through all the channel edges stored within the
@@ -1333,6 +1345,8 @@ func (s *SQLStore) ForEachChannel(ctx context.Context,
cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy,
*models.ChannelEdgePolicy) error) error { *models.ChannelEdgePolicy) error) error {
reset := func() {}
handleChannel := func(db SQLQueries, handleChannel := func(db SQLQueries,
row sqlc.ListChannelsWithPoliciesPaginatedRow) error { row sqlc.ListChannelsWithPoliciesPaginatedRow) error {
@@ -1406,7 +1420,7 @@ func (s *SQLStore) ForEachChannel(ctx context.Context,
} }
return nil return nil
}, sqldb.NoOpReset) }, reset)
} }
// FilterChannelRange returns the channel ID's of all known channels which were // FilterChannelRange returns the channel ID's of all known channels which were
@@ -2757,9 +2771,11 @@ func (s *SQLStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
func (s *SQLStore) GraphSession(cb func(graph NodeTraverser) error) error { func (s *SQLStore) GraphSession(cb func(graph NodeTraverser) error) error {
var ctx = context.TODO() var ctx = context.TODO()
reset := func() {}
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
return cb(newSQLNodeTraverser(db, s.cfg.ChainHash)) return cb(newSQLNodeTraverser(db, s.cfg.ChainHash))
}, sqldb.NoOpReset) }, reset)
} }
// sqlNodeTraverser implements the NodeTraverser interface but with a backing // sqlNodeTraverser implements the NodeTraverser interface but with a backing