From 8b82a5c8fe666817b32b05c01a34200ecd3337ed Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 4 Jul 2025 11:24:08 +0200 Subject: [PATCH] graph/db: migrate zombie index to SQL This commit expands `MigrateGraphToSQL` to include migration of the zombie index. NOTE: we take this opportunity to clean up the zombie index a bit by first checking for each channel in the zombie index if it has been marked as closed in the closed-scid index. If it has, then there is no need to include it in the zombie index. A special case test for this is added too. --- graph/db/sql_migration.go | 108 +++++++++++++++++++++++++++ graph/db/sql_migration_test.go | 131 ++++++++++++++++++++++++++++++++- 2 files changed, 237 insertions(+), 2 deletions(-) diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index 5a0aa1422..dfe5eb985 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -70,6 +70,11 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend, err) } + // 6) Migrate the zombie index. + if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil { + return fmt.Errorf("could not migrate zombie index: %w", err) + } + log.Infof("Finished migration of the graph store from KV to SQL in %v", time.Since(t0)) @@ -741,6 +746,109 @@ func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend, return nil } +// migrateZombieIndex migrates the zombie index from the KV backend to +// the SQL database. It iterates over each zombie channel in the KV store, +// inserts it into the SQL database, and then verifies that the channel is +// indeed marked as a zombie channel in the SQL database. +// +// NOTE: before inserting an entry into the zombie index, the function checks +// if the channel is already marked as closed in the SQL store. If it is, +// the entry is skipped. This means that the resulting zombie index count in +// the SQL store may well be less than the count of zombie channels in the KV +// store. +func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend, + sqlDB SQLQueries) error { + + var count uint64 + err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1, + pubKey2 [33]byte) error { + + chanIDB := channelIDToBytes(chanID) + + // If it is in the closed SCID index, we don't need to + // add it to the zombie index. + // + // NOTE: this means that the resulting zombie index count in + // the SQL store may well be less than the count of zombie + // channels in the KV store. + isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB) + if err != nil { + return fmt.Errorf("could not check closed "+ + "channel: %w", err) + } + if isClosed { + return nil + } + + count++ + + err = sqlDB.UpsertZombieChannel( + ctx, sqlc.UpsertZombieChannelParams{ + Version: int16(ProtocolV1), + Scid: chanIDB, + NodeKey1: pubKey1[:], + NodeKey2: pubKey2[:], + }, + ) + if err != nil { + return fmt.Errorf("could not upsert zombie "+ + "channel %d: %w", chanID, err) + } + + // Finally, verify that the channel is indeed marked as a + // zombie channel. + isZombie, err := sqlDB.IsZombieChannel( + ctx, sqlc.IsZombieChannelParams{ + Version: int16(ProtocolV1), + Scid: chanIDB, + }, + ) + if err != nil { + return fmt.Errorf("could not check if "+ + "channel %d is zombie: %w", chanID, err) + } + + if !isZombie { + return fmt.Errorf("channel %d should be "+ + "a zombie, but is not", chanID) + } + + return nil + }) + if err != nil { + return fmt.Errorf("could not migrate zombie index: %w", err) + } + + log.Infof("Migrated %d zombie channels from KV to SQL", count) + + return nil +} + +// forEachZombieEntry iterates over each zombie channel entry in the +// KV backend and calls the provided callback function for each entry. +func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1, + pubKey2 [33]byte) error) error { + + return kvdb.View(db, func(tx kvdb.RTx) error { + edges := tx.ReadBucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + zombieIndex := edges.NestedReadBucket(zombieBucket) + if zombieIndex == nil { + return nil + } + + return zombieIndex.ForEach(func(k, v []byte) error { + var pubKey1, pubKey2 [33]byte + copy(pubKey1[:], v[:33]) + copy(pubKey2[:], v[33:]) + + return cb(byteOrder.Uint64(k), pubKey1, pubKey2) + }) + }, func() {}) +} + // forEachClosedSCID iterates over each closed SCID in the KV backend and calls // the provided callback function for each SCID. func forEachClosedSCID(db kvdb.Backend, diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index eb081b772..886055fb2 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -83,6 +83,12 @@ func TestMigrateGraphToSQL(t *testing.T) { node2 = genPubKey(t) ) + type zombieIndexObject struct { + scid uint64 + pubKey1 route.Vertex + pubKey2 route.Vertex + } + tests := []struct { name string write func(t *testing.T, db *KVStore, object any) @@ -274,6 +280,35 @@ func TestMigrateGraphToSQL(t *testing.T) { lnwire.NewShortChanIDFromInt(4), }, }, + { + name: "zombie index", + write: func(t *testing.T, db *KVStore, object any) { + obj, ok := object.(*zombieIndexObject) + require.True(t, ok) + + err := db.MarkEdgeZombie( + obj.scid, obj.pubKey1, obj.pubKey2, + ) + require.NoError(t, err) + }, + objects: []any{ + &zombieIndexObject{ + scid: prand.Uint64(), + pubKey1: genPubKey(t), + pubKey2: genPubKey(t), + }, + &zombieIndexObject{ + scid: prand.Uint64(), + pubKey1: genPubKey(t), + pubKey2: genPubKey(t), + }, + &zombieIndexObject{ + scid: prand.Uint64(), + pubKey1: genPubKey(t), + pubKey2: genPubKey(t), + }, + }, + }, } for _, test := range tests { @@ -345,6 +380,9 @@ func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore, // log we iterate through the kvdb store and check that the entries // match the entries in the SQL store. checkClosedSCIDIndex(t, kvDB.db, sqlDB) + + // 6) Finally, check that the zombie index is also in sync. + checkZombieIndex(t, kvDB.db, sqlDB) } // fetchAllNodes retrieves all nodes from the given store and returns them @@ -501,6 +539,38 @@ func checkClosedSCIDIndex(t *testing.T, kv kvdb.Backend, sql *SQLStore) { require.NoError(t, err) } +// checkZombieIndex iterates through the zombie index in the +// KVStore and checks that each SCID is marked as a zombie in the SQLStore. +func checkZombieIndex(t *testing.T, kv kvdb.Backend, sql *SQLStore) { + err := forEachZombieEntry(kv, func(chanID uint64, pubKey1, + pubKey2 [33]byte) error { + + scid := lnwire.NewShortChanIDFromInt(chanID) + + // The migration logic skips zombie entries if they are already + // present in the closed SCID index in the SQL DB. We need to + // replicate that check here. + isClosed, err := sql.IsClosedScid(scid) + require.NoError(t, err) + + isZombie, _, _, err := sql.IsZombieEdge(chanID) + require.NoError(t, err) + + if isClosed { + // If it's in the closed index, it should NOT be in the + // zombie index. + require.False(t, isZombie) + } else { + // If it's not in the closed index, it SHOULD be in the + // zombie index. + require.True(t, isZombie) + } + + return nil + }) + require.NoError(t, err) +} + // setUpKVStore initializes a new KVStore for testing. func setUpKVStore(t *testing.T) *KVStore { kvDB, cleanup, err := kvdb.GetTestBackend(t.TempDir(), "graph") @@ -989,6 +1059,39 @@ func TestSQLMigrationEdgeCases(t *testing.T) { }}, }) }) + + // This test covers the case where the KV store contains zombie entries + // that it also has entries for in the closed SCID index. In this case, + // the SQL store will only insert zombie entries for channels that + // are not yet closed. + t.Run("zombies and closed scids", func(t *testing.T) { + var ( + n1, n2 route.Vertex + cID1 = uint64(1) + cID2 = uint64(2) + ) + + populateKV := func(t *testing.T, db *KVStore) { + // Mark both channels as zombies. + err := db.MarkEdgeZombie(cID1, n1, n2) + require.NoError(t, err) + + err = db.MarkEdgeZombie(cID2, n1, n2) + require.NoError(t, err) + + // Mark channel 1 as closed. + err = db.PutClosedScid( + lnwire.NewShortChanIDFromInt(cID1), + ) + require.NoError(t, err) + } + + runTestMigration(t, populateKV, dbState{ + chans: make(chanSet, 0), + closed: []uint64{1}, + zombies: []uint64{2}, + }) + }) } // runTestMigration is a helper function that sets up the KVStore and SQLStore, @@ -1020,8 +1123,10 @@ func runTestMigration(t *testing.T, populateKV func(t *testing.T, db *KVStore), // dbState describes the expected state of the SQLStore after a migration. type dbState struct { - nodes []*models.LightningNode - chans chanSet + nodes []*models.LightningNode + chans chanSet + closed []uint64 + zombies []uint64 } // assertResultState asserts that the SQLStore contains the expected @@ -1032,4 +1137,26 @@ func assertResultState(t *testing.T, sql *SQLStore, expState dbState) { require.ElementsMatch( t, expState.chans, fetchAllChannelsAndPolicies(t, sql), ) + + for _, closed := range expState.closed { + isClosed, err := sql.IsClosedScid( + lnwire.NewShortChanIDFromInt(closed), + ) + require.NoError(t, err) + require.True(t, isClosed) + + // Any closed SCID should NOT be in the zombie + // index. + isZombie, _, _, err := sql.IsZombieEdge(closed) + require.NoError(t, err) + require.False(t, isZombie) + } + + for _, zombie := range expState.zombies { + isZombie, _, _, err := sql.IsZombieEdge( + zombie, + ) + require.NoError(t, err) + require.True(t, isZombie) + } }