mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-08 14:57:38 +02:00
graph/db: migrate closed SCID index
This commit expands `MigrateGraphToSQL` to include the migration of the closed-scid index.
This commit is contained in:
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/lightningnetwork/lnd/graph/db/models"
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/lightningnetwork/lnd/sqldb/sqlc"
|
||||
)
|
||||
@@ -62,6 +63,13 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
|
||||
return fmt.Errorf("could not migrate prune log: %w", err)
|
||||
}
|
||||
|
||||
// 5) Migrate the closed SCID index.
|
||||
err = migrateClosedSCIDIndex(ctx, kvBackend, sqlDB)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not migrate closed SCID index: %w",
|
||||
err)
|
||||
}
|
||||
|
||||
log.Infof("Finished migration of the graph store from KV to SQL in %v",
|
||||
time.Since(t0))
|
||||
|
||||
@@ -686,3 +694,68 @@ func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
||||
// migrateClosedSCIDIndex migrates the closed SCID index from the KV backend to
|
||||
// the SQL database. It iterates over each closed SCID in the KV store, inserts
|
||||
// it into the SQL database, and then verifies that the SCID was inserted
|
||||
// correctly by checking if the channel with the given SCID is seen as closed in
|
||||
// the SQL database.
|
||||
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
|
||||
sqlDB SQLQueries) error {
|
||||
|
||||
var count uint64
|
||||
migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
|
||||
count++
|
||||
|
||||
chanIDB := channelIDToBytes(scid.ToUint64())
|
||||
err := sqlDB.InsertClosedChannel(ctx, chanIDB)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not insert closed channel "+
|
||||
"with SCID %s: %w", scid, err)
|
||||
}
|
||||
|
||||
// Now, verify that the channel with the given SCID is
|
||||
// seen as closed.
|
||||
isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not check if channel %s "+
|
||||
"is closed: %w", scid, err)
|
||||
}
|
||||
|
||||
if !isClosed {
|
||||
return fmt.Errorf("channel %s should be closed, "+
|
||||
"but is not", scid)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := forEachClosedSCID(kvBackend, migrateSingleClosedSCID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not migrate closed SCID index: %w",
|
||||
err)
|
||||
}
|
||||
|
||||
log.Infof("Migrated %d closed SCIDs from KV to SQL", count)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
|
||||
// the provided callback function for each SCID.
|
||||
func forEachClosedSCID(db kvdb.Backend,
|
||||
cb func(lnwire.ShortChannelID) error) error {
|
||||
|
||||
return kvdb.View(db, func(tx kvdb.RTx) error {
|
||||
closedScids := tx.ReadBucket(closedScidBucket)
|
||||
if closedScids == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return closedScids.ForEach(func(k, _ []byte) error {
|
||||
return cb(lnwire.NewShortChanIDFromInt(
|
||||
byteOrder.Uint64(k),
|
||||
))
|
||||
})
|
||||
}, func() {})
|
||||
}
|
||||
|
@@ -258,6 +258,22 @@ func TestMigrateGraphToSQL(t *testing.T) {
|
||||
pruneTip: 20,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "closed SCID index",
|
||||
write: func(t *testing.T, db *KVStore, object any) {
|
||||
scid, ok := object.(lnwire.ShortChannelID)
|
||||
require.True(t, ok)
|
||||
|
||||
err := db.PutClosedScid(scid)
|
||||
require.NoError(t, err)
|
||||
},
|
||||
objects: []any{
|
||||
lnwire.NewShortChanIDFromInt(1),
|
||||
lnwire.NewShortChanIDFromInt(2),
|
||||
lnwire.NewShortChanIDFromInt(3),
|
||||
lnwire.NewShortChanIDFromInt(4),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
@@ -324,6 +340,11 @@ func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore,
|
||||
// entries in the SQL store. Then we just do a final check to ensure
|
||||
// that the prune tip also matches.
|
||||
checkKVPruneLogEntries(t, kvDB, sqlDB, stats.pruneTip)
|
||||
|
||||
// 5) Assert that the closed SCID index is also in sync. Like the prune
|
||||
// log we iterate through the kvdb store and check that the entries
|
||||
// match the entries in the SQL store.
|
||||
checkClosedSCIDIndex(t, kvDB.db, sqlDB)
|
||||
}
|
||||
|
||||
// fetchAllNodes retrieves all nodes from the given store and returns them
|
||||
@@ -467,6 +488,19 @@ func checkKVPruneLogEntries(t *testing.T, kv *KVStore, sql *SQLStore,
|
||||
require.Equal(t, expTip, int(sqlPruneHeight))
|
||||
}
|
||||
|
||||
// checkClosedSCIDIndex iterates through the closed SCID index in the
|
||||
// KVStore and checks that each SCID is marked as closed in the SQLStore.
|
||||
func checkClosedSCIDIndex(t *testing.T, kv kvdb.Backend, sql *SQLStore) {
|
||||
err := forEachClosedSCID(kv, func(scid lnwire.ShortChannelID) error {
|
||||
closed, err := sql.IsClosedScid(scid)
|
||||
require.NoError(t, err)
|
||||
require.True(t, closed)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// setUpKVStore initializes a new KVStore for testing.
|
||||
func setUpKVStore(t *testing.T) *KVStore {
|
||||
kvDB, cleanup, err := kvdb.GetTestBackend(t.TempDir(), "graph")
|
||||
|
Reference in New Issue
Block a user