graph/db: migrate zombie index to SQL

This commit expands `MigrateGraphToSQL` to include migration of the
zombie index.

NOTE: we take this opportunity to clean up the zombie index a bit by
first checking for each channel in the zombie index if it has been
marked as closed in the closed-scid index. If it has, then there is no
need to include it in the zombie index. A special case test for this
is added too.
This commit is contained in:
Elle Mouton
2025-07-04 11:24:08 +02:00
parent 0cb17bbbe5
commit 8b82a5c8fe
2 changed files with 237 additions and 2 deletions

View File

@@ -70,6 +70,11 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
err)
}
// 6) Migrate the zombie index.
if err := migrateZombieIndex(ctx, kvBackend, sqlDB); err != nil {
return fmt.Errorf("could not migrate zombie index: %w", err)
}
log.Infof("Finished migration of the graph store from KV to SQL in %v",
time.Since(t0))
@@ -741,6 +746,109 @@ func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
return nil
}
// migrateZombieIndex migrates the zombie index from the KV backend to
// the SQL database. It iterates over each zombie channel in the KV store,
// inserts it into the SQL database, and then verifies that the channel is
// indeed marked as a zombie channel in the SQL database.
//
// NOTE: before inserting an entry into the zombie index, the function checks
// if the channel is already marked as closed in the SQL store. If it is,
// the entry is skipped. This means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie channels in the KV
// store.
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
sqlDB SQLQueries) error {
var count uint64
err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
pubKey2 [33]byte) error {
chanIDB := channelIDToBytes(chanID)
// If it is in the closed SCID index, we don't need to
// add it to the zombie index.
//
// NOTE: this means that the resulting zombie index count in
// the SQL store may well be less than the count of zombie
// channels in the KV store.
isClosed, err := sqlDB.IsClosedChannel(ctx, chanIDB)
if err != nil {
return fmt.Errorf("could not check closed "+
"channel: %w", err)
}
if isClosed {
return nil
}
count++
err = sqlDB.UpsertZombieChannel(
ctx, sqlc.UpsertZombieChannelParams{
Version: int16(ProtocolV1),
Scid: chanIDB,
NodeKey1: pubKey1[:],
NodeKey2: pubKey2[:],
},
)
if err != nil {
return fmt.Errorf("could not upsert zombie "+
"channel %d: %w", chanID, err)
}
// Finally, verify that the channel is indeed marked as a
// zombie channel.
isZombie, err := sqlDB.IsZombieChannel(
ctx, sqlc.IsZombieChannelParams{
Version: int16(ProtocolV1),
Scid: chanIDB,
},
)
if err != nil {
return fmt.Errorf("could not check if "+
"channel %d is zombie: %w", chanID, err)
}
if !isZombie {
return fmt.Errorf("channel %d should be "+
"a zombie, but is not", chanID)
}
return nil
})
if err != nil {
return fmt.Errorf("could not migrate zombie index: %w", err)
}
log.Infof("Migrated %d zombie channels from KV to SQL", count)
return nil
}
// forEachZombieEntry iterates over each zombie channel entry in the
// KV backend and calls the provided callback function for each entry.
func forEachZombieEntry(db kvdb.Backend, cb func(chanID uint64, pubKey1,
pubKey2 [33]byte) error) error {
return kvdb.View(db, func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
zombieIndex := edges.NestedReadBucket(zombieBucket)
if zombieIndex == nil {
return nil
}
return zombieIndex.ForEach(func(k, v []byte) error {
var pubKey1, pubKey2 [33]byte
copy(pubKey1[:], v[:33])
copy(pubKey2[:], v[33:])
return cb(byteOrder.Uint64(k), pubKey1, pubKey2)
})
}, func() {})
}
// forEachClosedSCID iterates over each closed SCID in the KV backend and calls
// the provided callback function for each SCID.
func forEachClosedSCID(db kvdb.Backend,

View File

@@ -83,6 +83,12 @@ func TestMigrateGraphToSQL(t *testing.T) {
node2 = genPubKey(t)
)
type zombieIndexObject struct {
scid uint64
pubKey1 route.Vertex
pubKey2 route.Vertex
}
tests := []struct {
name string
write func(t *testing.T, db *KVStore, object any)
@@ -274,6 +280,35 @@ func TestMigrateGraphToSQL(t *testing.T) {
lnwire.NewShortChanIDFromInt(4),
},
},
{
name: "zombie index",
write: func(t *testing.T, db *KVStore, object any) {
obj, ok := object.(*zombieIndexObject)
require.True(t, ok)
err := db.MarkEdgeZombie(
obj.scid, obj.pubKey1, obj.pubKey2,
)
require.NoError(t, err)
},
objects: []any{
&zombieIndexObject{
scid: prand.Uint64(),
pubKey1: genPubKey(t),
pubKey2: genPubKey(t),
},
&zombieIndexObject{
scid: prand.Uint64(),
pubKey1: genPubKey(t),
pubKey2: genPubKey(t),
},
&zombieIndexObject{
scid: prand.Uint64(),
pubKey1: genPubKey(t),
pubKey2: genPubKey(t),
},
},
},
}
for _, test := range tests {
@@ -345,6 +380,9 @@ func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore,
// log we iterate through the kvdb store and check that the entries
// match the entries in the SQL store.
checkClosedSCIDIndex(t, kvDB.db, sqlDB)
// 6) Finally, check that the zombie index is also in sync.
checkZombieIndex(t, kvDB.db, sqlDB)
}
// fetchAllNodes retrieves all nodes from the given store and returns them
@@ -501,6 +539,38 @@ func checkClosedSCIDIndex(t *testing.T, kv kvdb.Backend, sql *SQLStore) {
require.NoError(t, err)
}
// checkZombieIndex iterates through the zombie index in the
// KVStore and checks that each SCID is marked as a zombie in the SQLStore.
func checkZombieIndex(t *testing.T, kv kvdb.Backend, sql *SQLStore) {
err := forEachZombieEntry(kv, func(chanID uint64, pubKey1,
pubKey2 [33]byte) error {
scid := lnwire.NewShortChanIDFromInt(chanID)
// The migration logic skips zombie entries if they are already
// present in the closed SCID index in the SQL DB. We need to
// replicate that check here.
isClosed, err := sql.IsClosedScid(scid)
require.NoError(t, err)
isZombie, _, _, err := sql.IsZombieEdge(chanID)
require.NoError(t, err)
if isClosed {
// If it's in the closed index, it should NOT be in the
// zombie index.
require.False(t, isZombie)
} else {
// If it's not in the closed index, it SHOULD be in the
// zombie index.
require.True(t, isZombie)
}
return nil
})
require.NoError(t, err)
}
// setUpKVStore initializes a new KVStore for testing.
func setUpKVStore(t *testing.T) *KVStore {
kvDB, cleanup, err := kvdb.GetTestBackend(t.TempDir(), "graph")
@@ -989,6 +1059,39 @@ func TestSQLMigrationEdgeCases(t *testing.T) {
}},
})
})
// This test covers the case where the KV store contains zombie entries
// that it also has entries for in the closed SCID index. In this case,
// the SQL store will only insert zombie entries for channels that
// are not yet closed.
t.Run("zombies and closed scids", func(t *testing.T) {
var (
n1, n2 route.Vertex
cID1 = uint64(1)
cID2 = uint64(2)
)
populateKV := func(t *testing.T, db *KVStore) {
// Mark both channels as zombies.
err := db.MarkEdgeZombie(cID1, n1, n2)
require.NoError(t, err)
err = db.MarkEdgeZombie(cID2, n1, n2)
require.NoError(t, err)
// Mark channel 1 as closed.
err = db.PutClosedScid(
lnwire.NewShortChanIDFromInt(cID1),
)
require.NoError(t, err)
}
runTestMigration(t, populateKV, dbState{
chans: make(chanSet, 0),
closed: []uint64{1},
zombies: []uint64{2},
})
})
}
// runTestMigration is a helper function that sets up the KVStore and SQLStore,
@@ -1020,8 +1123,10 @@ func runTestMigration(t *testing.T, populateKV func(t *testing.T, db *KVStore),
// dbState describes the expected state of the SQLStore after a migration.
type dbState struct {
nodes []*models.LightningNode
chans chanSet
nodes []*models.LightningNode
chans chanSet
closed []uint64
zombies []uint64
}
// assertResultState asserts that the SQLStore contains the expected
@@ -1032,4 +1137,26 @@ func assertResultState(t *testing.T, sql *SQLStore, expState dbState) {
require.ElementsMatch(
t, expState.chans, fetchAllChannelsAndPolicies(t, sql),
)
for _, closed := range expState.closed {
isClosed, err := sql.IsClosedScid(
lnwire.NewShortChanIDFromInt(closed),
)
require.NoError(t, err)
require.True(t, isClosed)
// Any closed SCID should NOT be in the zombie
// index.
isZombie, _, _, err := sql.IsZombieEdge(closed)
require.NoError(t, err)
require.False(t, isZombie)
}
for _, zombie := range expState.zombies {
isZombie, _, _, err := sql.IsZombieEdge(
zombie,
)
require.NoError(t, err)
require.True(t, isZombie)
}
}