diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index d1b37ea1f..004577731 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -4,20 +4,31 @@ import ( "context" "errors" "fmt" + "reflect" + "sort" "time" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/sqldb/sqlc" + "github.com/pmezard/go-difflib/difflib" ) +// ErrMigrationMismatch is returned when a migrated graph record does not match +// the original record. +var ErrMigrationMismatch = fmt.Errorf("migrated graph record does not match " + + "original record") + // MigrateGraphToSQL migrates the graph store from a KV backend to a SQL // backend. // // NOTE: this is currently not called from any code path. It is called via tests // only for now and will be called from the main lnd binary once the // migration is fully implemented and tested. -func MigrateGraphToSQL(_ context.Context, kvBackend kvdb.Backend, - _ SQLQueries, _ chainhash.Hash) error { +func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend, + sqlDB SQLQueries, _ chainhash.Hash) error { log.Infof("Starting migration of the graph store from KV to SQL") t0 := time.Now() @@ -32,6 +43,11 @@ func MigrateGraphToSQL(_ context.Context, kvBackend kvdb.Backend, return nil } + // 1) Migrate all the nodes. + if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil { + return fmt.Errorf("could not migrate nodes: %w", err) + } + log.Infof("Finished migration of the graph store from KV to SQL in %v", time.Since(t0)) @@ -61,3 +77,127 @@ func checkGraphExists(db kvdb.Backend) (bool, error) { return true, nil } + +// migrateNodes migrates all nodes from the KV backend to the SQL database. +// This includes doing a sanity check after each migration to ensure that the +// migrated node matches the original node. +func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, + sqlDB SQLQueries) error { + + // Keep track of the number of nodes migrated and the number of + // nodes skipped due to errors. + var ( + count uint64 + skipped uint64 + ) + + // Loop through each node in the KV store and insert it into the SQL + // database. + err := forEachNode(kvBackend, func(_ kvdb.RTx, + node *models.LightningNode) error { + + pub := node.PubKeyBytes + + // Sanity check to ensure that the node has valid extra opaque + // data. If it does not, we'll skip it. We need to do this + // because previously we would just persist any TLV bytes that + // we received without validating them. Now, however, we + // normalise the storage of extra opaque data, so we need to + // ensure that the data is valid. We don't want to abort the + // migration if we encounter a node with invalid extra opaque + // data, so we'll just skip it and log a warning. + _, err := marshalExtraOpaqueData(node.ExtraOpaqueData) + if errors.Is(err, ErrParsingExtraTLVBytes) { + skipped++ + log.Warnf("Skipping migration of node %x with invalid "+ + "extra opaque data: %v", pub, + node.ExtraOpaqueData) + + return nil + } else if err != nil { + return fmt.Errorf("unable to marshal extra "+ + "opaque data for node %x: %w", pub, err) + } + + count++ + + // Write the node to the SQL database. + id, err := upsertNode(ctx, sqlDB, node) + if err != nil { + return fmt.Errorf("could not persist node(%x): %w", pub, + err) + } + + // Fetch it from the SQL store and compare it against the + // original node object to ensure the migration was successful. + dbNode, err := sqlDB.GetNodeByPubKey( + ctx, sqlc.GetNodeByPubKeyParams{ + PubKey: node.PubKeyBytes[:], + Version: int16(ProtocolV1), + }, + ) + if err != nil { + return fmt.Errorf("could not get node by pubkey (%x)"+ + "after migration: %w", pub, err) + } + + // Sanity check: ensure the migrated node ID matches the one we + // just inserted. + if dbNode.ID != id { + return fmt.Errorf("node ID mismatch for node (%x) "+ + "after migration: expected %d, got %d", + pub, id, dbNode.ID) + } + + migratedNode, err := buildNode(ctx, sqlDB, &dbNode) + if err != nil { + return fmt.Errorf("could not build migrated node "+ + "from dbNode(db id: %d, node pub: %x): %w", + dbNode.ID, pub, err) + } + + // Make sure that the node addresses are sorted before + // comparing them to ensure that the order of addresses does + // not affect the comparison. + sort.Slice(node.Addresses, func(i, j int) bool { + return node.Addresses[i].String() < + node.Addresses[j].String() + }) + sort.Slice(migratedNode.Addresses, func(i, j int) bool { + return migratedNode.Addresses[i].String() < + migratedNode.Addresses[j].String() + }) + + return compare(node, migratedNode, fmt.Sprintf("node %x", pub)) + }) + if err != nil { + return fmt.Errorf("could not migrate nodes: %w", err) + } + + log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+ + "invalid TLV streams)", count, skipped) + + return nil +} + +// compare checks if the original and migrated objects are equal. If they +// are not, it returns an error with a unified diff of the two objects. +func compare(original, migrated any, identifier string) error { + if reflect.DeepEqual(original, migrated) { + return nil + } + + diff := difflib.UnifiedDiff{ + A: difflib.SplitLines(spew.Sdump(original)), + B: difflib.SplitLines(spew.Sdump(migrated)), + FromFile: "Expected", + FromDate: "", + ToFile: "Actual", + ToDate: "", + Context: 3, + } + diffText, _ := difflib.GetUnifiedDiffString(diff) + + return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier, + diffText) +} diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index 47373b7b3..ff6f3913d 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -3,23 +3,39 @@ package graphdb import ( + "bytes" "context" + "fmt" + "image/color" + "net" "os" "path" + "sort" "strings" "testing" "time" + "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btclog/v2" + "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb/sqlbase" "github.com/lightningnetwork/lnd/kvdb/sqlite" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/sqldb" "github.com/stretchr/testify/require" ) -var testChain = *chaincfg.MainNetParams.GenesisHash +var ( + testChain = *chaincfg.MainNetParams.GenesisHash + testColor = color.RGBA{R: 1, G: 2, B: 3} + testTime = time.Unix(11111, 0) + testSigBytes = testSig.Serialize() + testExtraData = []byte{1, 1, 1, 2, 2, 2, 2} + testEmptyFeatures = lnwire.EmptyFeatureVector() +) // TestMigrateGraphToSQL tests various deterministic cases that we want to test // for to ensure that our migration from a graph store backed by a KV DB to a @@ -29,6 +45,19 @@ func TestMigrateGraphToSQL(t *testing.T) { t.Parallel() ctx := context.Background() + writeUpdate := func(t *testing.T, db *KVStore, object any) { + t.Helper() + + var err error + switch obj := object.(type) { + case *models.LightningNode: + err = db.AddLightningNode(ctx, obj) + default: + err = fmt.Errorf("unhandled object type: %T", obj) + } + require.NoError(t, err) + } + tests := []struct { name string write func(t *testing.T, db *KVStore, object any) @@ -38,6 +67,44 @@ func TestMigrateGraphToSQL(t *testing.T) { { name: "empty", }, + { + name: "nodes", + write: writeUpdate, + //nolint:ll + objects: []any{ + // Normal node with all fields. + makeTestNode(t), + // A node with no node announcement. + makeTestShellNode(t), + // A node with an announcement but no addresses. + makeTestNode(t, func(n *models.LightningNode) { + n.Addresses = nil + }), + // A node with all types of addresses. + makeTestNode(t, func(n *models.LightningNode) { + n.Addresses = []net.Addr{ + testAddr, + testIPV4Addr, + testIPV6Addr, + anotherAddr, + testOnionV2Addr, + testOnionV3Addr, + testOpaqueAddr, + } + }), + // No extra opaque data. + makeTestNode(t, func(n *models.LightningNode) { + n.ExtraOpaqueData = nil + }), + // A node with no features. + makeTestNode(t, func(n *models.LightningNode) { + n.Features = lnwire.EmptyFeatureVector() + }), + }, + expGraphStats: graphStats{ + numNodes: 6, + }, + }, } for _, test := range tests { @@ -70,11 +137,48 @@ func TestMigrateGraphToSQL(t *testing.T) { // graphStats holds expected statistics about the graph after migration. type graphStats struct { + numNodes int } // assertInSync checks that the KVStore and SQLStore both contain the same // graph data after migration. -func assertInSync(_ *testing.T, _ *KVStore, _ *SQLStore, stats graphStats) { +func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore, + stats graphStats) { + + // 1) Compare the nodes in the two stores. + sqlNodes := fetchAllNodes(t, sqlDB) + require.Len(t, sqlNodes, stats.numNodes) + require.Equal(t, fetchAllNodes(t, kvDB), sqlNodes) +} + +// fetchAllNodes retrieves all nodes from the given store and returns them +// sorted by their public key. +func fetchAllNodes(t *testing.T, store V1Store) []*models.LightningNode { + nodes := make([]*models.LightningNode, 0) + + err := store.ForEachNode(func(tx NodeRTx) error { + node := tx.Node() + + // Call PubKey to ensure the objects cached pubkey is set so that + // the objects can be compared as a whole. + _, err := node.PubKey() + require.NoError(t, err) + + nodes = append(nodes, node) + + return nil + }) + require.NoError(t, err) + + // Sort the nodes by their public key to ensure a consistent order. + sort.Slice(nodes, func(i, j int) bool { + return bytes.Compare( + nodes[i].PubKeyBytes[:], + nodes[j].PubKeyBytes[:], + ) < 0 + }) + + return nodes } // setUpKVStore initializes a new KVStore for testing. @@ -89,6 +193,72 @@ func setUpKVStore(t *testing.T) *KVStore { return kvStore } +// genPubKey generates a new public key for testing purposes. +func genPubKey(t *testing.T) route.Vertex { + key, err := btcec.NewPrivateKey() + require.NoError(t, err) + + var pub route.Vertex + copy(pub[:], key.PubKey().SerializeCompressed()) + + return pub +} + +// testNodeOpt defines a functional option type that can be used to +// modify the attributes of a models.LightningNode crated by makeTestNode. +type testNodeOpt func(*models.LightningNode) + +// makeTestNode can be used to create a test models.LightningNode. The +// functional options can be used to modify the node's attributes. +func makeTestNode(t *testing.T, opts ...testNodeOpt) *models.LightningNode { + n := &models.LightningNode{ + HaveNodeAnnouncement: true, + AuthSigBytes: testSigBytes, + LastUpdate: testTime, + Color: testColor, + Alias: "kek", + Features: testFeatures, + Addresses: testAddrs, + ExtraOpaqueData: testExtraData, + PubKeyBytes: genPubKey(t), + } + + for _, opt := range opts { + opt(n) + } + + // We call this method so that the internal pubkey field is populated + // which then lets us to proper struct comparison later on. + _, err := n.PubKey() + require.NoError(t, err) + + return n +} + +// makeTestShellNode creates a minimal models.LightningNode +// that only contains the public key and no other attributes. +func makeTestShellNode(t *testing.T, + opts ...testNodeOpt) *models.LightningNode { + + n := &models.LightningNode{ + HaveNodeAnnouncement: false, + PubKeyBytes: genPubKey(t), + Features: testEmptyFeatures, + LastUpdate: time.Unix(0, 0), + } + + for _, opt := range opts { + opt(n) + } + + // We call this method so that the internal pubkey field is populated + // which then lets us to proper struct comparison later on. + _, err := n.PubKey() + require.NoError(t, err) + + return n +} + // TestMigrationWithChannelDB tests the migration of the graph store from a // bolt backed channel.db or a kvdb channel.sqlite to a SQL database. Note that // this test does not attempt to be a complete migration test for all graph @@ -207,3 +377,75 @@ func TestMigrationWithChannelDB(t *testing.T) { }) } } + +// TestSQLMigrationEdgeCases tests various edge cases where the migration will +// still be successful but the final states of the KVStore and SQLStore +// will differ slightly. +func TestSQLMigrationEdgeCases(t *testing.T) { + t.Parallel() + ctx := context.Background() + + var invalidTLVData = []byte{0x01, 0x02, 0x03} + + // Here, we test that in the case where the KV store contains a node + // with invalid TLV data, the migration will still succeed, but the + // node will not end up in the SQL store. + t.Run("node with bad tlv data", func(t *testing.T) { + // Make one valid node and one node with invalid TLV data. + n1 := makeTestNode(t) + n2 := makeTestNode(t, func(n *models.LightningNode) { + n.ExtraOpaqueData = invalidTLVData + }) + + populateKV := func(t *testing.T, db *KVStore) { + // Insert both nodes into the KV store. + require.NoError(t, db.AddLightningNode(ctx, n1)) + require.NoError(t, db.AddLightningNode(ctx, n2)) + } + + runTestMigration(t, populateKV, dbState{ + // We expect only the valid node to be present in the + // SQL db. + nodes: []*models.LightningNode{n1}, + }) + }) +} + +// runTestMigration is a helper function that sets up the KVStore and SQLStore, +// populates the KVStore with the provided call-back, runs the migration, and +// asserts that the SQLStore contains the expected state. +func runTestMigration(t *testing.T, populateKV func(t *testing.T, db *KVStore), + expState dbState) { + + ctx := context.Background() + + // Set up our source kvdb DB. + kvDB := setUpKVStore(t) + + // Set up our destination SQL DB. + sql, ok := NewTestDB(t).(*SQLStore) + require.True(t, ok) + + // Populate the kvdb store with the test data. + populateKV(t, kvDB) + + // Run the migration. + err := MigrateGraphToSQL( + ctx, kvDB.db, sql.db, testChain, + ) + require.NoError(t, err) + + assertResultState(t, sql, expState) +} + +// dbState describes the expected state of the SQLStore after a migration. +type dbState struct { + nodes []*models.LightningNode +} + +// assertResultState asserts that the SQLStore contains the expected +// state after a migration. +func assertResultState(t *testing.T, sql *SQLStore, expState dbState) { + // Assert that the sql store contains the expected nodes. + require.ElementsMatch(t, expState.nodes, fetchAllNodes(t, sql)) +}