mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-27 06:01:48 +02:00
Merge pull request #10036 from ellemouton/graphMig1-nodes
[graph mig 1]: graph/db: migrate graph nodes from kvdb to SQL
This commit is contained in:
@@ -92,6 +92,8 @@ circuit. The indices are only available for forwarding events saved after v0.20.
|
||||
* [9](https://github.com/lightningnetwork/lnd/pull/9939)
|
||||
* [10](https://github.com/lightningnetwork/lnd/pull/9971)
|
||||
* [11](https://github.com/lightningnetwork/lnd/pull/9972)
|
||||
* Add graph SQL migration logic:
|
||||
* [1](https://github.com/lightningnetwork/lnd/pull/10036)
|
||||
|
||||
## RPC Updates
|
||||
* Previously the `RoutingPolicy` would return the inbound fee record in its
|
||||
|
2
go.mod
2
go.mod
@@ -140,7 +140,7 @@ require (
|
||||
github.com/opencontainers/image-spec v1.0.2 // indirect
|
||||
github.com/opencontainers/runc v1.1.12 // indirect
|
||||
github.com/ory/dockertest/v3 v3.10.0 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
github.com/prometheus/common v0.26.0 // indirect
|
||||
github.com/prometheus/procfs v0.6.0 // indirect
|
||||
|
263
graph/db/sql_migration.go
Normal file
263
graph/db/sql_migration.go
Normal file
@@ -0,0 +1,263 @@
|
||||
package graphdb
|
||||
|
||||
import (
|
||||
"cmp"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/lightningnetwork/lnd/graph/db/models"
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/lightningnetwork/lnd/sqldb/sqlc"
|
||||
)
|
||||
|
||||
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
|
||||
// backend.
|
||||
//
|
||||
// NOTE: this is currently not called from any code path. It is called via tests
|
||||
// only for now and will be called from the main lnd binary once the
|
||||
// migration is fully implemented and tested.
|
||||
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
|
||||
sqlDB SQLQueries, _ chainhash.Hash) error {
|
||||
|
||||
log.Infof("Starting migration of the graph store from KV to SQL")
|
||||
t0 := time.Now()
|
||||
|
||||
// Check if there is a graph to migrate.
|
||||
graphExists, err := checkGraphExists(kvBackend)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check graph existence: %w", err)
|
||||
}
|
||||
if !graphExists {
|
||||
log.Infof("No graph found in KV store, skipping the migration")
|
||||
return nil
|
||||
}
|
||||
|
||||
// 1) Migrate all the nodes.
|
||||
if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
|
||||
return fmt.Errorf("could not migrate nodes: %w", err)
|
||||
}
|
||||
|
||||
// 2) Migrate the source node.
|
||||
if err := migrateSourceNode(ctx, kvBackend, sqlDB); err != nil {
|
||||
return fmt.Errorf("could not migrate source node: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("Finished migration of the graph store from KV to SQL in %v",
|
||||
time.Since(t0))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkGraphExists checks if the graph exists in the KV backend.
|
||||
func checkGraphExists(db kvdb.Backend) (bool, error) {
|
||||
// Check if there is even a graph to migrate.
|
||||
err := db.View(func(tx kvdb.RTx) error {
|
||||
// Check for the existence of the node bucket which is a top
|
||||
// level bucket that would have been created on the initial
|
||||
// creation of the graph store.
|
||||
nodes := tx.ReadBucket(nodeBucket)
|
||||
if nodes == nil {
|
||||
return ErrGraphNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func() {})
|
||||
if errors.Is(err, ErrGraphNotFound) {
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// migrateNodes migrates all nodes from the KV backend to the SQL database.
|
||||
// This includes doing a sanity check after each migration to ensure that the
|
||||
// migrated node matches the original node.
|
||||
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
|
||||
sqlDB SQLQueries) error {
|
||||
|
||||
// Keep track of the number of nodes migrated and the number of
|
||||
// nodes skipped due to errors.
|
||||
var (
|
||||
count uint64
|
||||
skipped uint64
|
||||
)
|
||||
|
||||
// Loop through each node in the KV store and insert it into the SQL
|
||||
// database.
|
||||
err := forEachNode(kvBackend, func(_ kvdb.RTx,
|
||||
node *models.LightningNode) error {
|
||||
|
||||
pub := node.PubKeyBytes
|
||||
|
||||
// Sanity check to ensure that the node has valid extra opaque
|
||||
// data. If it does not, we'll skip it. We need to do this
|
||||
// because previously we would just persist any TLV bytes that
|
||||
// we received without validating them. Now, however, we
|
||||
// normalise the storage of extra opaque data, so we need to
|
||||
// ensure that the data is valid. We don't want to abort the
|
||||
// migration if we encounter a node with invalid extra opaque
|
||||
// data, so we'll just skip it and log a warning.
|
||||
_, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
|
||||
if errors.Is(err, ErrParsingExtraTLVBytes) {
|
||||
skipped++
|
||||
log.Warnf("Skipping migration of node %x with invalid "+
|
||||
"extra opaque data: %v", pub,
|
||||
node.ExtraOpaqueData)
|
||||
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("unable to marshal extra "+
|
||||
"opaque data for node %x: %w", pub, err)
|
||||
}
|
||||
|
||||
count++
|
||||
|
||||
// TODO(elle): At this point, we should check the loaded node
|
||||
// to see if we should extract any DNS addresses from its
|
||||
// opaque type addresses. This is expected to be done in:
|
||||
// https://github.com/lightningnetwork/lnd/pull/9455.
|
||||
// This TODO is being tracked in
|
||||
// https://github.com/lightningnetwork/lnd/issues/9795 as this
|
||||
// must be addressed before making this code path active in
|
||||
// production.
|
||||
|
||||
// Write the node to the SQL database.
|
||||
id, err := upsertNode(ctx, sqlDB, node)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not persist node(%x): %w", pub,
|
||||
err)
|
||||
}
|
||||
|
||||
// Fetch it from the SQL store and compare it against the
|
||||
// original node object to ensure the migration was successful.
|
||||
dbNode, err := sqlDB.GetNodeByPubKey(
|
||||
ctx, sqlc.GetNodeByPubKeyParams{
|
||||
PubKey: node.PubKeyBytes[:],
|
||||
Version: int16(ProtocolV1),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get node by pubkey (%x)"+
|
||||
"after migration: %w", pub, err)
|
||||
}
|
||||
|
||||
// Sanity check: ensure the migrated node ID matches the one we
|
||||
// just inserted.
|
||||
if dbNode.ID != id {
|
||||
return fmt.Errorf("node ID mismatch for node (%x) "+
|
||||
"after migration: expected %d, got %d",
|
||||
pub, id, dbNode.ID)
|
||||
}
|
||||
|
||||
migratedNode, err := buildNode(ctx, sqlDB, &dbNode)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not build migrated node "+
|
||||
"from dbNode(db id: %d, node pub: %x): %w",
|
||||
dbNode.ID, pub, err)
|
||||
}
|
||||
|
||||
// Make sure that the node addresses are sorted before
|
||||
// comparing them to ensure that the order of addresses does
|
||||
// not affect the comparison.
|
||||
slices.SortFunc(node.Addresses, func(i, j net.Addr) int {
|
||||
return cmp.Compare(i.String(), j.String())
|
||||
})
|
||||
slices.SortFunc(
|
||||
migratedNode.Addresses, func(i, j net.Addr) int {
|
||||
return cmp.Compare(i.String(), j.String())
|
||||
},
|
||||
)
|
||||
|
||||
return sqldb.CompareRecords(
|
||||
node, migratedNode, fmt.Sprintf("node %x", pub),
|
||||
)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not migrate nodes: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("Migrated %d nodes from KV to SQL (skipped %d nodes due to "+
|
||||
"invalid TLV streams)", count, skipped)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// migrateSourceNode migrates the source node from the KV backend to the
|
||||
// SQL database.
|
||||
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
|
||||
sqlDB SQLQueries) error {
|
||||
|
||||
sourceNode, err := sourceNode(kvdb)
|
||||
if errors.Is(err, ErrSourceNodeNotSet) {
|
||||
// If the source node has not been set yet, we can skip this
|
||||
// migration step.
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("could not get source node from kv "+
|
||||
"store: %w", err)
|
||||
}
|
||||
|
||||
pub := sourceNode.PubKeyBytes
|
||||
|
||||
// Get the DB ID of the source node by its public key. This node must
|
||||
// already exist in the SQL database, as it should have been migrated
|
||||
// in the previous node-migration step.
|
||||
id, err := sqlDB.GetNodeIDByPubKey(
|
||||
ctx, sqlc.GetNodeIDByPubKeyParams{
|
||||
PubKey: pub[:],
|
||||
Version: int16(ProtocolV1),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get source node ID: %w", err)
|
||||
}
|
||||
|
||||
// Now we can add the source node to the SQL database.
|
||||
err = sqlDB.AddSourceNode(ctx, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not add source node to SQL store: %w",
|
||||
err)
|
||||
}
|
||||
|
||||
// Verify that the source node was added correctly by fetching it back
|
||||
// from the SQL database and checking that the expected DB ID and
|
||||
// pub key are returned. We don't need to do a whole node comparison
|
||||
// here, as this was already done in the previous migration step.
|
||||
srcNodes, err := sqlDB.GetSourceNodesByVersion(ctx, int16(ProtocolV1))
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not get source nodes from SQL "+
|
||||
"store: %w", err)
|
||||
}
|
||||
|
||||
// The SQL store has support for multiple source nodes (for future
|
||||
// protocol versions) but this migration is purely aimed at the V1
|
||||
// store, and so we expect exactly one source node to be present.
|
||||
if len(srcNodes) != 1 {
|
||||
return fmt.Errorf("expected exactly one source node, "+
|
||||
"got %d", len(srcNodes))
|
||||
}
|
||||
|
||||
// Check that the source node ID and pub key match the original
|
||||
// source node.
|
||||
if srcNodes[0].NodeID != id {
|
||||
return fmt.Errorf("source node ID mismatch after migration: "+
|
||||
"expected %d, got %d", id, srcNodes[0].NodeID)
|
||||
}
|
||||
err = sqldb.CompareRecords(pub[:], srcNodes[0].PubKey, "source node")
|
||||
if err != nil {
|
||||
return fmt.Errorf("source node pubkey mismatch after "+
|
||||
"migration: %w", err)
|
||||
}
|
||||
|
||||
log.Infof("Migrated source node with pubkey %x to SQL", pub[:])
|
||||
|
||||
return nil
|
||||
}
|
485
graph/db/sql_migration_test.go
Normal file
485
graph/db/sql_migration_test.go
Normal file
@@ -0,0 +1,485 @@
|
||||
//go:build test_db_postgres || test_db_sqlite
|
||||
|
||||
package graphdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"image/color"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec/v2"
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/btcsuite/btclog/v2"
|
||||
"github.com/lightningnetwork/lnd/graph/db/models"
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/kvdb/sqlbase"
|
||||
"github.com/lightningnetwork/lnd/kvdb/sqlite"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var (
|
||||
testChain = *chaincfg.MainNetParams.GenesisHash
|
||||
testColor = color.RGBA{R: 1, G: 2, B: 3}
|
||||
testTime = time.Unix(11111, 0)
|
||||
testSigBytes = testSig.Serialize()
|
||||
testExtraData = []byte{1, 1, 1, 2, 2, 2, 2}
|
||||
testEmptyFeatures = lnwire.EmptyFeatureVector()
|
||||
)
|
||||
|
||||
// TestMigrateGraphToSQL tests various deterministic cases that we want to test
|
||||
// for to ensure that our migration from a graph store backed by a KV DB to a
|
||||
// SQL database works as expected. At the end of each test, the DBs are compared
|
||||
// and expected to have the exact same data in them.
|
||||
func TestMigrateGraphToSQL(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := context.Background()
|
||||
|
||||
writeUpdate := func(t *testing.T, db *KVStore, object any) {
|
||||
t.Helper()
|
||||
|
||||
var err error
|
||||
switch obj := object.(type) {
|
||||
case *models.LightningNode:
|
||||
err = db.AddLightningNode(ctx, obj)
|
||||
default:
|
||||
err = fmt.Errorf("unhandled object type: %T", obj)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
write func(t *testing.T, db *KVStore, object any)
|
||||
objects []any
|
||||
expGraphStats graphStats
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
},
|
||||
{
|
||||
name: "nodes",
|
||||
write: writeUpdate,
|
||||
//nolint:ll
|
||||
objects: []any{
|
||||
// Normal node with all fields.
|
||||
makeTestNode(t),
|
||||
// A node with no node announcement.
|
||||
makeTestShellNode(t),
|
||||
// A node with an announcement but no addresses.
|
||||
makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.Addresses = nil
|
||||
}),
|
||||
// A node with all types of addresses.
|
||||
makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.Addresses = []net.Addr{
|
||||
testAddr,
|
||||
testIPV4Addr,
|
||||
testIPV6Addr,
|
||||
anotherAddr,
|
||||
testOnionV2Addr,
|
||||
testOnionV3Addr,
|
||||
testOpaqueAddr,
|
||||
}
|
||||
}),
|
||||
// No extra opaque data.
|
||||
makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.ExtraOpaqueData = nil
|
||||
}),
|
||||
// A node with no features.
|
||||
makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.Features = lnwire.EmptyFeatureVector()
|
||||
}),
|
||||
},
|
||||
expGraphStats: graphStats{
|
||||
numNodes: 6,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "source node",
|
||||
write: func(t *testing.T, db *KVStore, object any) {
|
||||
node, ok := object.(*models.LightningNode)
|
||||
require.True(t, ok)
|
||||
|
||||
err := db.SetSourceNode(ctx, node)
|
||||
require.NoError(t, err)
|
||||
},
|
||||
objects: []any{
|
||||
makeTestNode(t),
|
||||
},
|
||||
expGraphStats: graphStats{
|
||||
numNodes: 1,
|
||||
srcNodeSet: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Set up our source kvdb DB.
|
||||
kvDB := setUpKVStore(t)
|
||||
|
||||
// Write the test objects to the kvdb store.
|
||||
for _, object := range test.objects {
|
||||
test.write(t, kvDB, object)
|
||||
}
|
||||
|
||||
// Set up our destination SQL DB.
|
||||
sql, ok := NewTestDB(t).(*SQLStore)
|
||||
require.True(t, ok)
|
||||
|
||||
// Run the migration.
|
||||
err := MigrateGraphToSQL(
|
||||
ctx, kvDB.db, sql.db, testChain,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Validate that the two databases are now in sync.
|
||||
assertInSync(t, kvDB, sql, test.expGraphStats)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// graphStats holds expected statistics about the graph after migration.
|
||||
type graphStats struct {
|
||||
numNodes int
|
||||
srcNodeSet bool
|
||||
}
|
||||
|
||||
// assertInSync checks that the KVStore and SQLStore both contain the same
|
||||
// graph data after migration.
|
||||
func assertInSync(t *testing.T, kvDB *KVStore, sqlDB *SQLStore,
|
||||
stats graphStats) {
|
||||
|
||||
// 1) Compare the nodes in the two stores.
|
||||
sqlNodes := fetchAllNodes(t, sqlDB)
|
||||
require.Len(t, sqlNodes, stats.numNodes)
|
||||
require.Equal(t, fetchAllNodes(t, kvDB), sqlNodes)
|
||||
|
||||
// 2) Check that the source nodes match (if indeed source nodes have
|
||||
// been set).
|
||||
sqlSourceNode := fetchSourceNode(t, sqlDB)
|
||||
require.Equal(t, stats.srcNodeSet, sqlSourceNode != nil)
|
||||
require.Equal(t, fetchSourceNode(t, kvDB), sqlSourceNode)
|
||||
}
|
||||
|
||||
// fetchAllNodes retrieves all nodes from the given store and returns them
|
||||
// sorted by their public key.
|
||||
func fetchAllNodes(t *testing.T, store V1Store) []*models.LightningNode {
|
||||
nodes := make([]*models.LightningNode, 0)
|
||||
|
||||
err := store.ForEachNode(func(tx NodeRTx) error {
|
||||
node := tx.Node()
|
||||
|
||||
// Call PubKey to ensure the objects cached pubkey is set so that
|
||||
// the objects can be compared as a whole.
|
||||
_, err := node.PubKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
nodes = append(nodes, node)
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Sort the nodes by their public key to ensure a consistent order.
|
||||
slices.SortFunc(nodes, func(i, j *models.LightningNode) int {
|
||||
return bytes.Compare(i.PubKeyBytes[:], j.PubKeyBytes[:])
|
||||
})
|
||||
|
||||
return nodes
|
||||
}
|
||||
|
||||
// fetchSourceNode retrieves the source node from the given store.
|
||||
func fetchSourceNode(t *testing.T, store V1Store) *models.LightningNode {
|
||||
node, err := store.SourceNode(context.Background())
|
||||
if errors.Is(err, ErrSourceNodeNotSet) {
|
||||
return nil
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
return node
|
||||
}
|
||||
|
||||
// setUpKVStore initializes a new KVStore for testing.
|
||||
func setUpKVStore(t *testing.T) *KVStore {
|
||||
kvDB, cleanup, err := kvdb.GetTestBackend(t.TempDir(), "graph")
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(cleanup)
|
||||
|
||||
kvStore, err := NewKVStore(kvDB)
|
||||
require.NoError(t, err)
|
||||
|
||||
return kvStore
|
||||
}
|
||||
|
||||
// genPubKey generates a new public key for testing purposes.
|
||||
func genPubKey(t *testing.T) route.Vertex {
|
||||
key, err := btcec.NewPrivateKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
var pub route.Vertex
|
||||
copy(pub[:], key.PubKey().SerializeCompressed())
|
||||
|
||||
return pub
|
||||
}
|
||||
|
||||
// testNodeOpt defines a functional option type that can be used to
|
||||
// modify the attributes of a models.LightningNode crated by makeTestNode.
|
||||
type testNodeOpt func(*models.LightningNode)
|
||||
|
||||
// makeTestNode can be used to create a test models.LightningNode. The
|
||||
// functional options can be used to modify the node's attributes.
|
||||
func makeTestNode(t *testing.T, opts ...testNodeOpt) *models.LightningNode {
|
||||
n := &models.LightningNode{
|
||||
HaveNodeAnnouncement: true,
|
||||
AuthSigBytes: testSigBytes,
|
||||
LastUpdate: testTime,
|
||||
Color: testColor,
|
||||
Alias: "kek",
|
||||
Features: testFeatures,
|
||||
Addresses: testAddrs,
|
||||
ExtraOpaqueData: testExtraData,
|
||||
PubKeyBytes: genPubKey(t),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(n)
|
||||
}
|
||||
|
||||
// We call this method so that the internal pubkey field is populated
|
||||
// which then lets us to proper struct comparison later on.
|
||||
_, err := n.PubKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
// makeTestShellNode creates a minimal models.LightningNode
|
||||
// that only contains the public key and no other attributes.
|
||||
func makeTestShellNode(t *testing.T,
|
||||
opts ...testNodeOpt) *models.LightningNode {
|
||||
|
||||
n := &models.LightningNode{
|
||||
HaveNodeAnnouncement: false,
|
||||
PubKeyBytes: genPubKey(t),
|
||||
Features: testEmptyFeatures,
|
||||
LastUpdate: time.Unix(0, 0),
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(n)
|
||||
}
|
||||
|
||||
// We call this method so that the internal pubkey field is populated
|
||||
// which then lets us to proper struct comparison later on.
|
||||
_, err := n.PubKey()
|
||||
require.NoError(t, err)
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
// TestMigrationWithChannelDB tests the migration of the graph store from a
|
||||
// bolt backed channel.db or a kvdb channel.sqlite to a SQL database. Note that
|
||||
// this test does not attempt to be a complete migration test for all graph
|
||||
// store types but rather is added as a tool for developers and users to debug
|
||||
// graph migration issues with an actual channel.db/channel.sqlite file.
|
||||
//
|
||||
// NOTE: To use this test, place either of those files in the graph/db/testdata
|
||||
// directory, uncomment the "Skipf" line, and set "chain" variable appropriately
|
||||
// and set the "fileName" variable to the name of the channel database file you
|
||||
// want to use for the migration test.
|
||||
func TestMigrationWithChannelDB(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// NOTE: comment this line out to run the test.
|
||||
t.Skipf("skipping test meant for local debugging only")
|
||||
|
||||
// NOTE: set this to the genesis hash of the chain that the store
|
||||
// was created on.
|
||||
chain := *chaincfg.MainNetParams.GenesisHash
|
||||
|
||||
// NOTE: set this to the name of the channel database file you want
|
||||
// to use for the migration test. This may be either a bbolt ".db" file
|
||||
// or a SQLite ".sqlite" file. If you want to migrate from a
|
||||
// bbolt channel.db file, set this to "channel.db".
|
||||
const fileName = "channel.sqlite"
|
||||
|
||||
// Set up logging for the test.
|
||||
UseLogger(btclog.NewSLogger(btclog.NewDefaultHandler(os.Stdout)))
|
||||
|
||||
// migrate runs the migration from the kvdb store to the SQL store.
|
||||
migrate := func(t *testing.T, kvBackend kvdb.Backend) {
|
||||
graphStore := newBatchQuerier(t)
|
||||
|
||||
err := graphStore.ExecTx(
|
||||
ctx, sqldb.WriteTxOpt(), func(tx SQLQueries) error {
|
||||
return MigrateGraphToSQL(
|
||||
ctx, kvBackend, tx, chain,
|
||||
)
|
||||
}, sqldb.NoOpReset,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
connectBBolt := func(t *testing.T, dbPath string) kvdb.Backend {
|
||||
cfg := &kvdb.BoltBackendConfig{
|
||||
DBPath: dbPath,
|
||||
DBFileName: fileName,
|
||||
NoFreelistSync: true,
|
||||
AutoCompact: false,
|
||||
AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
|
||||
DBTimeout: kvdb.DefaultDBTimeout,
|
||||
}
|
||||
|
||||
kvStore, err := kvdb.GetBoltBackend(cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
return kvStore
|
||||
}
|
||||
|
||||
connectSQLite := func(t *testing.T, dbPath string) kvdb.Backend {
|
||||
const (
|
||||
timeout = 10 * time.Second
|
||||
maxConns = 50
|
||||
)
|
||||
sqlbase.Init(maxConns)
|
||||
|
||||
cfg := &sqlite.Config{
|
||||
Timeout: timeout,
|
||||
BusyTimeout: timeout,
|
||||
MaxConnections: maxConns,
|
||||
}
|
||||
|
||||
kvStore, err := kvdb.Open(
|
||||
kvdb.SqliteBackendName, ctx, cfg,
|
||||
dbPath, fileName,
|
||||
// NOTE: we use the raw string here else we get an
|
||||
// import cycle if we try to import lncfg.NSChannelDB.
|
||||
"channeldb",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
return kvStore
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
dbPath string
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
dbPath: t.TempDir(),
|
||||
},
|
||||
{
|
||||
name: "testdata",
|
||||
dbPath: "testdata",
|
||||
},
|
||||
}
|
||||
|
||||
// Determine if we are using a SQLite file or a Bolt DB file.
|
||||
var isSqlite bool
|
||||
if strings.HasSuffix(fileName, ".sqlite") {
|
||||
isSqlite = true
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
chanDBPath := path.Join(test.dbPath, fileName)
|
||||
t.Logf("Connecting to channel DB at: %s", chanDBPath)
|
||||
|
||||
connectDB := connectBBolt
|
||||
if isSqlite {
|
||||
connectDB = connectSQLite
|
||||
}
|
||||
|
||||
migrate(t, connectDB(t, test.dbPath))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestSQLMigrationEdgeCases tests various edge cases where the migration will
|
||||
// still be successful but the final states of the KVStore and SQLStore
|
||||
// will differ slightly.
|
||||
func TestSQLMigrationEdgeCases(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := context.Background()
|
||||
|
||||
var invalidTLVData = []byte{0x01, 0x02, 0x03}
|
||||
|
||||
// Here, we test that in the case where the KV store contains a node
|
||||
// with invalid TLV data, the migration will still succeed, but the
|
||||
// node will not end up in the SQL store.
|
||||
t.Run("node with bad tlv data", func(t *testing.T) {
|
||||
// Make one valid node and one node with invalid TLV data.
|
||||
n1 := makeTestNode(t)
|
||||
n2 := makeTestNode(t, func(n *models.LightningNode) {
|
||||
n.ExtraOpaqueData = invalidTLVData
|
||||
})
|
||||
|
||||
populateKV := func(t *testing.T, db *KVStore) {
|
||||
// Insert both nodes into the KV store.
|
||||
require.NoError(t, db.AddLightningNode(ctx, n1))
|
||||
require.NoError(t, db.AddLightningNode(ctx, n2))
|
||||
}
|
||||
|
||||
runTestMigration(t, populateKV, dbState{
|
||||
// We expect only the valid node to be present in the
|
||||
// SQL db.
|
||||
nodes: []*models.LightningNode{n1},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// runTestMigration is a helper function that sets up the KVStore and SQLStore,
|
||||
// populates the KVStore with the provided call-back, runs the migration, and
|
||||
// asserts that the SQLStore contains the expected state.
|
||||
func runTestMigration(t *testing.T, populateKV func(t *testing.T, db *KVStore),
|
||||
expState dbState) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Set up our source kvdb DB.
|
||||
kvDB := setUpKVStore(t)
|
||||
|
||||
// Set up our destination SQL DB.
|
||||
sql, ok := NewTestDB(t).(*SQLStore)
|
||||
require.True(t, ok)
|
||||
|
||||
// Populate the kvdb store with the test data.
|
||||
populateKV(t, kvDB)
|
||||
|
||||
// Run the migration.
|
||||
err := MigrateGraphToSQL(
|
||||
ctx, kvDB.db, sql.db, testChain,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
assertResultState(t, sql, expState)
|
||||
}
|
||||
|
||||
// dbState describes the expected state of the SQLStore after a migration.
|
||||
type dbState struct {
|
||||
nodes []*models.LightningNode
|
||||
}
|
||||
|
||||
// assertResultState asserts that the SQLStore contains the expected
|
||||
// state after a migration.
|
||||
func assertResultState(t *testing.T, sql *SQLStore, expState dbState) {
|
||||
// Assert that the sql store contains the expected nodes.
|
||||
require.ElementsMatch(t, expState.nodes, fetchAllNodes(t, sql))
|
||||
}
|
@@ -6,14 +6,12 @@ import (
|
||||
"database/sql"
|
||||
"testing"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// NewTestDB is a helper function that creates a SQLStore backed by a postgres
|
||||
// database for testing.
|
||||
func NewTestDB(t testing.TB) V1Store {
|
||||
// newBatchQuerier creates a new BatchedSQLQueries instance for testing
|
||||
// using a PostgreSQL database fixture.
|
||||
func newBatchQuerier(t testing.TB) BatchedSQLQueries {
|
||||
pgFixture := sqldb.NewTestPgFixture(
|
||||
t, sqldb.DefaultPostgresFixtureLifetime,
|
||||
)
|
||||
@@ -23,18 +21,9 @@ func NewTestDB(t testing.TB) V1Store {
|
||||
|
||||
db := sqldb.NewTestPostgresDB(t, pgFixture).BaseDB
|
||||
|
||||
executor := sqldb.NewTransactionExecutor(
|
||||
return sqldb.NewTransactionExecutor(
|
||||
db, func(tx *sql.Tx) SQLQueries {
|
||||
return db.WithTx(tx)
|
||||
},
|
||||
)
|
||||
|
||||
store, err := NewSQLStore(
|
||||
&SQLStoreConfig{
|
||||
ChainHash: *chaincfg.MainNetParams.GenesisHash,
|
||||
}, executor,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
return store
|
||||
}
|
||||
|
23
graph/db/test_sql.go
Normal file
23
graph/db/test_sql.go
Normal file
@@ -0,0 +1,23 @@
|
||||
//go:build test_db_postgres || test_db_sqlite
|
||||
|
||||
package graphdb
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// NewTestDB is a helper function that creates a SQLStore backed by a SQL
|
||||
// database for testing.
|
||||
func NewTestDB(t testing.TB) V1Store {
|
||||
store, err := NewSQLStore(
|
||||
&SQLStoreConfig{
|
||||
ChainHash: *chaincfg.MainNetParams.GenesisHash,
|
||||
}, newBatchQuerier(t),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
return store
|
||||
}
|
@@ -1,4 +1,4 @@
|
||||
//go:build !test_db_posgres && test_db_sqlite
|
||||
//go:build !test_db_postgres && test_db_sqlite
|
||||
|
||||
package graphdb
|
||||
|
||||
@@ -6,28 +6,17 @@ import (
|
||||
"database/sql"
|
||||
"testing"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// NewTestDB is a helper function that creates a SQLStore backed by a sqlite
|
||||
// database for testing.
|
||||
func NewTestDB(t testing.TB) V1Store {
|
||||
// newBatchQuerier creates a new BatchedSQLQueries instance for testing
|
||||
// using a SQLite database.
|
||||
func newBatchQuerier(t testing.TB) BatchedSQLQueries {
|
||||
db := sqldb.NewTestSqliteDB(t).BaseDB
|
||||
|
||||
executor := sqldb.NewTransactionExecutor(
|
||||
return sqldb.NewTransactionExecutor(
|
||||
db, func(tx *sql.Tx) SQLQueries {
|
||||
return db.WithTx(tx)
|
||||
},
|
||||
)
|
||||
|
||||
store, err := NewSQLStore(
|
||||
&SQLStoreConfig{
|
||||
ChainHash: *chaincfg.MainNetParams.GenesisHash,
|
||||
}, executor,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
return store
|
||||
}
|
||||
|
@@ -6,17 +6,14 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/lightningnetwork/lnd/graph/db/models"
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/lntypes"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/lightningnetwork/lnd/sqldb/sqlc"
|
||||
"github.com/pmezard/go-difflib/difflib"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
@@ -53,11 +50,6 @@ var (
|
||||
//
|
||||
// addIndexNo => invoiceKey
|
||||
addIndexBucket = []byte("invoice-add-index")
|
||||
|
||||
// ErrMigrationMismatch is returned when the migrated invoice does not
|
||||
// match the original invoice.
|
||||
ErrMigrationMismatch = fmt.Errorf("migrated invoice does not match " +
|
||||
"original invoice")
|
||||
)
|
||||
|
||||
// createInvoiceHashIndex generates a hash index that contains payment hashes
|
||||
@@ -548,24 +540,9 @@ func migrateInvoices(ctx context.Context, tx *sqlc.Queries,
|
||||
// Override the add index before checking for equality.
|
||||
migratedInvoice.AddIndex = invoice.AddIndex
|
||||
|
||||
if !reflect.DeepEqual(invoice, *migratedInvoice) {
|
||||
diff := difflib.UnifiedDiff{
|
||||
A: difflib.SplitLines(
|
||||
spew.Sdump(invoice),
|
||||
),
|
||||
B: difflib.SplitLines(
|
||||
spew.Sdump(migratedInvoice),
|
||||
),
|
||||
FromFile: "Expected",
|
||||
FromDate: "",
|
||||
ToFile: "Actual",
|
||||
ToDate: "",
|
||||
Context: 3,
|
||||
}
|
||||
diffText, _ := difflib.GetUnifiedDiffString(diff)
|
||||
|
||||
return fmt.Errorf("%w: %v.\n%v", ErrMigrationMismatch,
|
||||
paymentHash, diffText)
|
||||
err = sqldb.CompareRecords(invoice, *migratedInvoice, "invoice")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -2,11 +2,13 @@ module github.com/lightningnetwork/lnd/sqldb
|
||||
|
||||
require (
|
||||
github.com/btcsuite/btclog/v2 v2.0.1-0.20250602222548-9967d19bb084
|
||||
github.com/davecgh/go-spew v1.1.1
|
||||
github.com/golang-migrate/migrate/v4 v4.17.0
|
||||
github.com/jackc/pgconn v1.14.3
|
||||
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
|
||||
github.com/jackc/pgx/v5 v5.3.1
|
||||
github.com/ory/dockertest/v3 v3.10.0
|
||||
github.com/pmezard/go-difflib v1.0.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8
|
||||
modernc.org/sqlite v1.29.10
|
||||
@@ -19,7 +21,6 @@ require (
|
||||
github.com/btcsuite/btclog v0.0.0-20241003133417-09c4e92e319c // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
|
||||
github.com/containerd/continuity v0.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/docker/cli v20.10.17+incompatible // indirect
|
||||
github.com/docker/docker v24.0.7+incompatible // indirect
|
||||
github.com/docker/go-connections v0.4.0 // indirect
|
||||
@@ -45,7 +46,6 @@ require (
|
||||
github.com/opencontainers/image-spec v1.0.2 // indirect
|
||||
github.com/opencontainers/runc v1.1.5 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/rogpeppe/go-internal v1.12.0 // indirect
|
||||
github.com/sirupsen/logrus v1.9.2 // indirect
|
||||
|
@@ -9,14 +9,17 @@ import (
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btclog/v2"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/golang-migrate/migrate/v4"
|
||||
"github.com/golang-migrate/migrate/v4/database"
|
||||
"github.com/golang-migrate/migrate/v4/source/httpfs"
|
||||
"github.com/lightningnetwork/lnd/sqldb/sqlc"
|
||||
"github.com/pmezard/go-difflib/difflib"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -71,6 +74,11 @@ var (
|
||||
// user if necessary.
|
||||
},
|
||||
}, migrationAdditions...)
|
||||
|
||||
// ErrMigrationMismatch is returned when a migrated record does not
|
||||
// match the original record.
|
||||
ErrMigrationMismatch = fmt.Errorf("migrated record does not match " +
|
||||
"original record")
|
||||
)
|
||||
|
||||
// MigrationConfig is a configuration struct that describes SQL migrations. Each
|
||||
@@ -472,3 +480,25 @@ func ApplyMigrations(ctx context.Context, db *BaseDB,
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CompareRecords checks if the original and migrated objects are equal. If
|
||||
// they are not, it returns an error with a unified diff of the two objects.
|
||||
func CompareRecords(original, migrated any, identifier string) error {
|
||||
if reflect.DeepEqual(original, migrated) {
|
||||
return nil
|
||||
}
|
||||
|
||||
diff := difflib.UnifiedDiff{
|
||||
A: difflib.SplitLines(spew.Sdump(original)),
|
||||
B: difflib.SplitLines(spew.Sdump(migrated)),
|
||||
FromFile: "Expected",
|
||||
FromDate: "",
|
||||
ToFile: "Actual",
|
||||
ToDate: "",
|
||||
Context: 3,
|
||||
}
|
||||
diffText, _ := difflib.GetUnifiedDiffString(diff)
|
||||
|
||||
return fmt.Errorf("%w: %s.\n%v", ErrMigrationMismatch, identifier,
|
||||
diffText)
|
||||
}
|
||||
|
Reference in New Issue
Block a user