mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-07-28 13:52:55 +02:00
graph/db: migrate nodes to SQL store
This commit expands MigrateGraphToSQL so that it migrates all the graph nodes from kvdb to SQL.
This commit is contained in:
@@ -4,20 +4,31 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/lightningnetwork/lnd/graph/db/models"
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/sqldb/sqlc"
|
||||
"github.com/pmezard/go-difflib/difflib"
|
||||
)
|
||||
|
||||
// ErrMigrationMismatch is returned when a migrated graph record does not match
|
||||
// the original record.
|
||||
var ErrMigrationMismatch = fmt.Errorf("migrated graph record does not match " +
|
||||
"original record")
|
||||
|
||||
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
|
||||
// backend.
|
||||
//
|
||||
// NOTE: this is currently not called from any code path. It is called via tests
|
||||
// only for now and will be called from the main lnd binary once the
|
||||
// migration is fully implemented and tested.
|
||||
func MigrateGraphToSQL(_ context.Context, kvBackend kvdb.Backend,
|
||||
_ SQLQueries, _ chainhash.Hash) error {
|
||||
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
|
||||
sqlDB SQLQueries, _ chainhash.Hash) error {
|
||||
|
||||
log.Infof("Starting migration of the graph store from KV to SQL")
|
||||
t0 := time.Now()
|
||||
@@ -32,6 +43,11 @@ func MigrateGraphToSQL(_ context.Context, kvBackend kvdb.Backend,
|
||||
return nil
|
||||
}
|
||||
|
||||
// 1) Migrate all the nodes.
|
||||
if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
|
||||
return fmt.Errorf("could not migrate nodes: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("Finished migration of the graph store from KV to SQL in %v",
|
||||
time.Since(t0))
|
||||
|
||||
@@ -61,3 +77,127 @@ func checkGraphExists(db kvdb.Backend) (bool, error) {
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// migrateNodes migrates all nodes from the KV backend to the SQL database.
|
||||
// This includes doing a sanity check after each migration to ensure that the
|
||||
// migrated node matches the original node.
|
||||
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
|
||||
sqlDB SQLQueries) error {
|
||||
|
||||
// Keep track of the number of nodes migrated and the number of
|
||||
// nodes skipped due to errors.
|
||||
var (
|
||||
count uint64
|
||||
skipped uint64
|
||||
)
|
||||
|
||||
// Loop through each node in the KV store and insert it into the SQL
|
||||
// database.
|
||||
err := forEachNode(kvBackend, func(_ kvdb.RTx,
|
||||
node *models.LightningNode) error {
|
||||
|
||||
pub := node.PubKeyBytes
|
||||
|
||||
// Sanity check to ensure that the node has valid extra opaque
|
||||
// data. If it does not, we'll skip it. We need to do this
|
||||
// because previously we would just persist any TLV bytes that
|
||||
// we received without validating them. Now, however, we
|
||||
// normalise the storage of extra opaque data, so we need to
|
||||
// ensure that the data is valid. We don't want to abort the
|
||||
// migration if we encounter a node with invalid extra opaque
|
||||
// data, so we'll just skip it and log a warning.
|
||||
_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
|
||||
if errors.Is(err, ErrParsingExtraTLVBytes) {
|
||||
skipped++
|
||||
log.Warnf("Skipping migration of node %x with invalid "+
|
||||
"extra opaque data: %v", pub,
|
||||
node.ExtraOpaqueData)
|
||||
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("unable to marshal extra "+
|
||||
"opaque data for node %x: %w", pub, err)
|
||||
}
|
||||
|
||||
count++
|
||||
|
||||
// Write the node to the SQL database.
|
||||
id, err := upsertNode(ctx, sqlDB, node)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not persist node(%x): %w", pub,
|
||||
err)
|
||||
}
|
||||
|
||||
// Fetch it from the SQL store and compare it against the
|
||||
// original node object to ensure the migration was successful.
|
||||
dbNode, err := sqlDB.GetNodeByPubKey(
|
||||
ctx, sqlc.GetNodeByPubKeyParams{
|
||||
PubKey: node.PubKeyBytes[:],
|
||||
Version: int16(ProtocolV1),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get node by pubkey (%x)"+
|
||||
"after migration: %w", pub, err)
|
||||
}
|
||||
|
||||
// Sanity check: ensure the migrated node ID matches the one we
|
||||
// just inserted.
|
||||
if dbNode.ID != id {
|
||||
return fmt.Errorf("node ID mismatch for node (%x) "+
|
||||
"after migration: expected %d, got %d",
|
||||
pub, id, dbNode.ID)
|
||||
}
|
||||
|
||||
migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not build migrated node "+
|
||||
"from dbNode(db id: %d, node pub: %x): %w",
|
||||
dbNode.ID, pub, err)
|
||||
}
|
||||
|
||||
// Make sure that the node addresses are sorted before
|
||||
// comparing them to ensure that the order of addresses does
|
||||
// not affect the comparison.
|
||||
sort.Slice(node.Addresses, func(i, j int) bool {
|
||||
return node.Addresses[i].String() <
|
||||
node.Addresses[j].String()
|
||||
})
|
||||
sort.Slice(migratedNode.Addresses, func(i, j int) bool {
|
||||
return migratedNode.Addresses[i].String() <
|
||||
migratedNode.Addresses[j].String()
|
||||
})
|
||||
|
||||
return compare(node, migratedNode, fmt.Sprintf("node %x", pub))
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not migrate nodes: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
|
||||
"invalid TLV streams)", count, skipped)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// compare checks if the original and migrated objects are equal. If they
|
||||
// are not, it returns an error with a unified diff of the two objects.
|
||||
func compare(original, migrated any, identifier string) error {
|
||||
if reflect.DeepEqual(original, migrated) {
|
||||
return nil
|
||||
}
|
||||
|
||||
diff := difflib.UnifiedDiff{
|
||||
A: difflib.SplitLines(spew.Sdump(original)),
|
||||
B: difflib.SplitLines(spew.Sdump(migrated)),
|
||||
FromFile: "Expected",
|
||||
FromDate: "",
|
||||
ToFile: "Actual",
|
||||
ToDate: "",
|
||||
Context: 3,
|
||||
}
|
||||
diffText, _ := difflib.GetUnifiedDiffString(diff)
|
||||
|
||||
return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier,
|
||||
diffText)
|
||||
}
|
||||
|
@@ -3,23 +3,39 @@
|
||||
package graphdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"image/color"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec/v2"
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/btcsuite/btclog/v2"
|
||||
"github.com/lightningnetwork/lnd/graph/db/models"
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/kvdb/sqlbase"
|
||||
"github.com/lightningnetwork/lnd/kvdb/sqlite"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var testChain = *chaincfg.MainNetParams.GenesisHash
|
||||
var (
|
||||
testChain = *chaincfg.MainNetParams.GenesisHash
|
||||
testColor = color.RGBA{R: 1, G: 2, B: 3}
|
||||
testTime = time.Unix(11111, 0)
|
||||
testSigBytes = testSig.Serialize()
|
||||
testExtraData = []byte{1, 1, 1, 2, 2, 2, 2}
|
||||
testEmptyFeatures = lnwire.EmptyFeatureVector()
|
||||
)
|
||||
|
||||
// TestMigrateGraphToSQL tests various deterministic cases that we want to test
|
||||
// for to ensure that our migration from a graph store backed by a KV DB to a
|
||||
@@ -29,6 +45,19 @@ func TestMigrateGraphToSQL(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := context.Background()
|
||||
|
||||
writeUpdate := func(t *testing.T, db *KVStore, object any) {
|
||||
t.Helper()
|
||||
|
||||
var err error
|
||||
switch obj := object.(type) {
|
||||
case *models.LightningNode:
|
||||
err = db.AddLightningNode(ctx, obj)
|
||||
default:
|
||||
err = fmt.Errorf("unhandled object type: %T", obj)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
write func(t *testing.T, db *KVStore, object any)
|
||||
@@ -38,6 +67,44 @@ func TestMigrateGraphToSQL(t *testing.T) {
|
||||
{
|
||||
name: "empty",
|
||||
},
|
||||
{
|
||||
name: "nodes",
|
||||
write: writeUpdate,
|
||||
//nolint:ll
|
||||
objects: []any{
|
||||
// Normal node with all fields.
|
||||
makeTestNode(t),
|
||||
// A node with no node announcement.
|
||||
makeTestShellNode(t),
|
||||
// A node with an announcement but no addresses.
|
||||
makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.Addresses = nil
|
||||
}),
|
||||
// A node with all types of addresses.
|
||||
makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.Addresses = []net.Addr{
|
||||
testAddr,
|
||||
testIPV4Addr,
|
||||
testIPV6Addr,
|
||||
anotherAddr,
|
||||
testOnionV2Addr,
|
||||
testOnionV3Addr,
|
||||
testOpaqueAddr,
|
||||
}
|
||||
}),
|
||||
// No extra opaque data.
|
||||
makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.ExtraOpaqueData = nil
|
||||
}),
|
||||
// A node with no features.
|
||||
makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.Features = lnwire.EmptyFeatureVector()
|
||||
}),
|
||||
},
|
||||
expGraphStats: graphStats{
|
||||
numNodes: 6,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
@@ -70,11 +137,48 @@ func TestMigrateGraphToSQL(t *testing.T) {
|
||||
|
||||
// graphStats holds expected statistics about the graph after migration.
|
||||
type graphStats struct {
|
||||
numNodes int
|
||||
}
|
||||
|
||||
// assertInSync checks that the KVStore and SQLStore both contain the same
|
||||
// graph data after migration.
|
||||
func assertInSync(_ *testing.T, _ *KVStore, _ *SQLStore, stats graphStats) {
|
||||
func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore,
|
||||
stats graphStats) {
|
||||
|
||||
// 1) Compare the nodes in the two stores.
|
||||
sqlNodes := fetchAllNodes(t, sqlDB)
|
||||
require.Len(t, sqlNodes, stats.numNodes)
|
||||
require.Equal(t, fetchAllNodes(t, kvDB), sqlNodes)
|
||||
}
|
||||
|
||||
// fetchAllNodes retrieves all nodes from the given store and returns them
|
||||
// sorted by their public key.
|
||||
func fetchAllNodes(t *testing.T, store V1Store) []*models.LightningNode {
|
||||
nodes := make([]*models.LightningNode, 0)
|
||||
|
||||
err := store.ForEachNode(func(tx NodeRTx) error {
|
||||
node := tx.Node()
|
||||
|
||||
// Call PubKey to ensure the objects cached pubkey is set so that
|
||||
// the objects can be compared as a whole.
|
||||
_, err := node.PubKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
nodes = append(nodes, node)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Sort the nodes by their public key to ensure a consistent order.
|
||||
sort.Slice(nodes, func(i, j int) bool {
|
||||
return bytes.Compare(
|
||||
nodes[i].PubKeyBytes[:],
|
||||
nodes[j].PubKeyBytes[:],
|
||||
) < 0
|
||||
})
|
||||
|
||||
return nodes
|
||||
}
|
||||
|
||||
// setUpKVStore initializes a new KVStore for testing.
|
||||
@@ -89,6 +193,72 @@ func setUpKVStore(t *testing.T) *KVStore {
|
||||
return kvStore
|
||||
}
|
||||
|
||||
// genPubKey generates a new public key for testing purposes.
|
||||
func genPubKey(t *testing.T) route.Vertex {
|
||||
key, err := btcec.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
var pub route.Vertex
|
||||
copy(pub[:], key.PubKey().SerializeCompressed())
|
||||
|
||||
return pub
|
||||
}
|
||||
|
||||
// testNodeOpt defines a functional option type that can be used to
|
||||
// modify the attributes of a models.LightningNode crated by makeTestNode.
|
||||
type testNodeOpt func(*models.LightningNode)
|
||||
|
||||
// makeTestNode can be used to create a test models.LightningNode. The
|
||||
// functional options can be used to modify the node's attributes.
|
||||
func makeTestNode(t *testing.T, opts ...testNodeOpt) *models.LightningNode {
|
||||
n := &models.LightningNode{
|
||||
HaveNodeAnnouncement: true,
|
||||
AuthSigBytes: testSigBytes,
|
||||
LastUpdate: testTime,
|
||||
Color: testColor,
|
||||
Alias: "kek",
|
||||
Features: testFeatures,
|
||||
Addresses: testAddrs,
|
||||
ExtraOpaqueData: testExtraData,
|
||||
PubKeyBytes: genPubKey(t),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(n)
|
||||
}
|
||||
|
||||
// We call this method so that the internal pubkey field is populated
|
||||
// which then lets us to proper struct comparison later on.
|
||||
_, err := n.PubKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
// makeTestShellNode creates a minimal models.LightningNode
|
||||
// that only contains the public key and no other attributes.
|
||||
func makeTestShellNode(t *testing.T,
|
||||
opts ...testNodeOpt) *models.LightningNode {
|
||||
|
||||
n := &models.LightningNode{
|
||||
HaveNodeAnnouncement: false,
|
||||
PubKeyBytes: genPubKey(t),
|
||||
Features: testEmptyFeatures,
|
||||
LastUpdate: time.Unix(0, 0),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(n)
|
||||
}
|
||||
|
||||
// We call this method so that the internal pubkey field is populated
|
||||
// which then lets us to proper struct comparison later on.
|
||||
_, err := n.PubKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
// TestMigrationWithChannelDB tests the migration of the graph store from a
|
||||
// bolt backed channel.db or a kvdb channel.sqlite to a SQL database. Note that
|
||||
// this test does not attempt to be a complete migration test for all graph
|
||||
@@ -207,3 +377,75 @@ func TestMigrationWithChannelDB(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestSQLMigrationEdgeCases tests various edge cases where the migration will
|
||||
// still be successful but the final states of the KVStore and SQLStore
|
||||
// will differ slightly.
|
||||
func TestSQLMigrationEdgeCases(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := context.Background()
|
||||
|
||||
var invalidTLVData = []byte{0x01, 0x02, 0x03}
|
||||
|
||||
// Here, we test that in the case where the KV store contains a node
|
||||
// with invalid TLV data, the migration will still succeed, but the
|
||||
// node will not end up in the SQL store.
|
||||
t.Run("node with bad tlv data", func(t *testing.T) {
|
||||
// Make one valid node and one node with invalid TLV data.
|
||||
n1 := makeTestNode(t)
|
||||
n2 := makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.ExtraOpaqueData = invalidTLVData
|
||||
})
|
||||
|
||||
populateKV := func(t *testing.T, db *KVStore) {
|
||||
// Insert both nodes into the KV store.
|
||||
require.NoError(t, db.AddLightningNode(ctx, n1))
|
||||
require.NoError(t, db.AddLightningNode(ctx, n2))
|
||||
}
|
||||
|
||||
runTestMigration(t, populateKV, dbState{
|
||||
// We expect only the valid node to be present in the
|
||||
// SQL db.
|
||||
nodes: []*models.LightningNode{n1},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// runTestMigration is a helper function that sets up the KVStore and SQLStore,
|
||||
// populates the KVStore with the provided call-back, runs the migration, and
|
||||
// asserts that the SQLStore contains the expected state.
|
||||
func runTestMigration(t *testing.T, populateKV func(t *testing.T, db *KVStore),
|
||||
expState dbState) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Set up our source kvdb DB.
|
||||
kvDB := setUpKVStore(t)
|
||||
|
||||
// Set up our destination SQL DB.
|
||||
sql, ok := NewTestDB(t).(*SQLStore)
|
||||
require.True(t, ok)
|
||||
|
||||
// Populate the kvdb store with the test data.
|
||||
populateKV(t, kvDB)
|
||||
|
||||
// Run the migration.
|
||||
err := MigrateGraphToSQL(
|
||||
ctx, kvDB.db, sql.db, testChain,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
assertResultState(t, sql, expState)
|
||||
}
|
||||
|
||||
// dbState describes the expected state of the SQLStore after a migration.
|
||||
type dbState struct {
|
||||
nodes []*models.LightningNode
|
||||
}
|
||||
|
||||
// assertResultState asserts that the SQLStore contains the expected
|
||||
// state after a migration.
|
||||
func assertResultState(t *testing.T, sql *SQLStore, expState dbState) {
|
||||
// Assert that the sql store contains the expected nodes.
|
||||
require.ElementsMatch(t, expState.nodes, fetchAllNodes(t, sql))
|
||||
}
|
||||
|
Reference in New Issue
Block a user