graph/db: thread through reset call-backs

In preparation for handling retries on the source DB side, we thread
through the `reset` call-backs properly so that we can reset appropriate
variables.
This commit is contained in:
Elle Mouton
2025-08-15 09:05:57 +02:00
parent aefc9118a4
commit 2c8ac0c92c
2 changed files with 30 additions and 9 deletions

View File

@@ -923,6 +923,14 @@ func migratePruneLog(ctx context.Context, cfg *sqldb.QueryConfig,
return nil
},
func() {
count = 0
chunk = 0
t0 = time.Now()
batch = make(
map[uint32]chainhash.Hash, cfg.MaxBatchSize,
)
},
)
if err != nil {
return fmt.Errorf("could not migrate prune log: %w", err)
@@ -975,7 +983,7 @@ func migratePruneLog(ctx context.Context, cfg *sqldb.QueryConfig,
// forEachPruneLogEntry iterates over each prune log entry in the KV
// backend and calls the provided callback function for each entry.
func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
hash *chainhash.Hash) error) error {
hash *chainhash.Hash) error, reset func()) error {
return kvdb.View(db, func(tx kvdb.RTx) error {
metaBucket := tx.ReadBucket(graphMetaBucket)
@@ -997,7 +1005,7 @@ func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
return cb(blockHeight, &blockHash)
})
}, func() {})
}, reset)
}
// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
@@ -1091,7 +1099,14 @@ func migrateClosedSCIDIndex(ctx context.Context, cfg *sqldb.QueryConfig,
return nil
}
err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
err := forEachClosedSCID(
kvBackend, migrateSingleClosedSCID, func() {
count = 0
chunk = 0
t0 = time.Now()
batch = make([][]byte, 0, cfg.MaxBatchSize)
},
)
if err != nil {
return fmt.Errorf("could not migrate closed SCID index: %w",
err)
@@ -1270,6 +1285,11 @@ func migrateZombieIndex(ctx context.Context, cfg *sqldb.QueryConfig,
})
return nil
}, func() {
count = 0
chunk = 0
t0 = time.Now()
batch = make(map[uint64]*zombieEntry, cfg.MaxBatchSize)
})
if err != nil {
return fmt.Errorf("could not migrate zombie index: %w", err)
@@ -1293,7 +1313,7 @@ func migrateZombieIndex(ctx context.Context, cfg *sqldb.QueryConfig,
// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
pubKey2 [33]byte) error) error {
pubKey2 [33]byte) error, reset func()) error {
return kvdb.View(db, func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
@@ -1312,13 +1332,13 @@ func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
})
}, func() {})
}, reset)
}
// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,
cb func(lnwire.ShortChannelID) error) error {
cb func(lnwire.ShortChannelID) error, reset func()) error {
return kvdb.View(db, func(tx kvdb.RTx) error {
closedScids := tx.ReadBucket(closedScidBucket)
@@ -1331,5 +1351,5 @@ func forEachClosedSCID(db kvdb.Backend,
byteOrder.Uint64(k),
))
})
}, func() {})
}, reset)
}

View File

@@ -600,6 +600,7 @@ func checkKVPruneLogEntries(t *testing.T, kv *KVStore, sql *SQLStore,
return nil
},
func() {},
)
require.NoError(t, err)
@@ -631,7 +632,7 @@ func checkClosedSCIDIndex(t *testing.T, kv kvdb.Backend, sql *SQLStore) {
require.True(t, closed)
return nil
})
}, func() {})
require.NoError(t, err)
}
@@ -663,7 +664,7 @@ func checkZombieIndex(t *testing.T, kv kvdb.Backend, sql *SQLStore) {
}
return nil
})
}, func() {})
require.NoError(t, err)
}