diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index 5aea3d335..45656e64e 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -923,6 +923,14 @@ func migratePruneLog(ctx context.Context, cfg *sqldb.QueryConfig, return nil }, + func() { + count = 0 + chunk = 0 + t0 = time.Now() + batch = make( + map[uint32]chainhash.Hash, cfg.MaxBatchSize, + ) + }, ) if err != nil { return fmt.Errorf("could not migrate prune log: %w", err) @@ -975,7 +983,7 @@ func migratePruneLog(ctx context.Context, cfg *sqldb.QueryConfig, // forEachPruneLogEntry iterates over each prune log entry in the KV // backend and calls the provided callback function for each entry. func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32, - hash *chainhash.Hash) error) error { + hash *chainhash.Hash) error, reset func()) error { return kvdb.View(db, func(tx kvdb.RTx) error { metaBucket := tx.ReadBucket(graphMetaBucket) @@ -997,7 +1005,7 @@ func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32, return cb(blockHeight, &blockHash) }) - }, func() {}) + }, reset) } // migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to @@ -1091,7 +1099,14 @@ func migrateClosedSCIDIndex(ctx context.Context, cfg *sqldb.QueryConfig, return nil } - err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID) + err := forEachClosedSCID( + kvBackend, migrateSingleClosedSCID, func() { + count = 0 + chunk = 0 + t0 = time.Now() + batch = make([][]byte, 0, cfg.MaxBatchSize) + }, + ) if err != nil { return fmt.Errorf("could not migrate closed SCID index: %w", err) @@ -1270,6 +1285,11 @@ func migrateZombieIndex(ctx context.Context, cfg *sqldb.QueryConfig, }) return nil + }, func() { + count = 0 + chunk = 0 + t0 = time.Now() + batch = make(map[uint64]*zombieEntry, cfg.MaxBatchSize) }) if err != nil { return fmt.Errorf("could not migrate zombie index: %w", err) @@ -1293,7 +1313,7 @@ func migrateZombieIndex(ctx context.Context, cfg *sqldb.QueryConfig, // forEachZombieEntry iterates over each zombie channel entry in the // KV backend and calls the provided callback function for each entry. func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1, - pubKey2 [33]byte) error) error { + pubKey2 [33]byte) error, reset func()) error { return kvdb.View(db, func(tx kvdb.RTx) error { edges := tx.ReadBucket(edgeBucket) @@ -1312,13 +1332,13 @@ func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1, return cb(byteOrder.Uint64(k), pubKey1, pubKey2) }) - }, func() {}) + }, reset) } // forEachClosedSCID iterates over each closed SCID in the KV backend and calls // the provided callback function for each SCID. func forEachClosedSCID(db kvdb.Backend, - cb func(lnwire.ShortChannelID) error) error { + cb func(lnwire.ShortChannelID) error, reset func()) error { return kvdb.View(db, func(tx kvdb.RTx) error { closedScids := tx.ReadBucket(closedScidBucket) @@ -1331,5 +1351,5 @@ func forEachClosedSCID(db kvdb.Backend, byteOrder.Uint64(k), )) }) - }, func() {}) + }, reset) } diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index 13e3bff80..d4bba602d 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -600,6 +600,7 @@ func checkKVPruneLogEntries(t *testing.T, kv *KVStore, sql *SQLStore, return nil }, + func() {}, ) require.NoError(t, err) @@ -631,7 +632,7 @@ func checkClosedSCIDIndex(t *testing.T, kv kvdb.Backend, sql *SQLStore) { require.True(t, closed) return nil - }) + }, func() {}) require.NoError(t, err) } @@ -663,7 +664,7 @@ func checkZombieIndex(t *testing.T, kv kvdb.Backend, sql *SQLStore) { } return nil - }) + }, func() {}) require.NoError(t, err) }