mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-26 21:51:27 +02:00
Merge pull request #10071 from ellemouton/graphSQLMigPlugIn
lnd+itest: plug in graph SQL migration under test tag & add itest
This commit is contained in:
@@ -1097,7 +1097,7 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
|
||||
// migration's version (7), it will be skipped permanently,
|
||||
// regardless of the flag.
|
||||
if !d.cfg.DB.SkipNativeSQLMigration {
|
||||
migrationFn := func(tx *sqlc.Queries) error {
|
||||
invoiceMig := func(tx *sqlc.Queries) error {
|
||||
err := invoices.MigrateInvoicesToSQL(
|
||||
ctx, dbs.ChanStateDB.Backend,
|
||||
dbs.ChanStateDB, tx,
|
||||
@@ -1119,11 +1119,22 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
|
||||
// Make sure we attach the custom migration function to
|
||||
// the correct migration version.
|
||||
for i := 0; i < len(migrations); i++ {
|
||||
if migrations[i].Version != invoiceMigration {
|
||||
version := migrations[i].Version
|
||||
if version == invoiceMigration {
|
||||
migrations[i].MigrationFn = invoiceMig
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
migrations[i].MigrationFn = migrationFn
|
||||
migFn, ok := getSQLMigration(
|
||||
ctx, version, dbs.ChanStateDB.Backend,
|
||||
*d.cfg.ActiveNetParams.GenesisHash,
|
||||
)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
migrations[i].MigrationFn = migFn
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -3,11 +3,19 @@
|
||||
package lnd
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
graphdb "github.com/lightningnetwork/lnd/graph/db"
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/lightningnetwork/lnd/sqldb/sqlc"
|
||||
)
|
||||
|
||||
// RunTestSQLMigration is a build tag that indicates whether the test_native_sql
|
||||
// build tag is set.
|
||||
var RunTestSQLMigration = false
|
||||
|
||||
// getGraphStore returns a graphdb.V1Store backed by a graphdb.KVStore
|
||||
// implementation.
|
||||
func (d *DefaultDatabaseBuilder) getGraphStore(_ *sqldb.BaseDB,
|
||||
@@ -16,3 +24,15 @@ func (d *DefaultDatabaseBuilder) getGraphStore(_ *sqldb.BaseDB,
|
||||
|
||||
return graphdb.NewKVStore(kvBackend, opts...)
|
||||
}
|
||||
|
||||
// getSQLMigration returns a migration function for the given version.
|
||||
//
|
||||
// NOTE: this is a no-op for the production build since all migrations that are
|
||||
// in production will also be in development builds, and so they are not
|
||||
// defined behind a build tag.
|
||||
func getSQLMigration(ctx context.Context, version int,
|
||||
kvBackend kvdb.Backend,
|
||||
chain chainhash.Hash) (func(tx *sqlc.Queries) error, bool) {
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
@@ -3,13 +3,21 @@
|
||||
package lnd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
graphdb "github.com/lightningnetwork/lnd/graph/db"
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/lightningnetwork/lnd/sqldb/sqlc"
|
||||
)
|
||||
|
||||
// RunTestSQLMigration is a build tag that indicates whether the test_native_sql
|
||||
// build tag is set.
|
||||
var RunTestSQLMigration = true
|
||||
|
||||
// getGraphStore returns a graphdb.V1Store backed by a graphdb.SQLStore
|
||||
// implementation.
|
||||
func (d *DefaultDatabaseBuilder) getGraphStore(baseDB *sqldb.BaseDB,
|
||||
@@ -29,3 +37,32 @@ func (d *DefaultDatabaseBuilder) getGraphStore(baseDB *sqldb.BaseDB,
|
||||
graphExecutor, opts...,
|
||||
)
|
||||
}
|
||||
|
||||
// graphSQLMigration is the version number for the graph migration
|
||||
// that migrates the KV graph to the native SQL schema.
|
||||
const graphSQLMigration = 9
|
||||
|
||||
// getSQLMigration returns a migration function for the given version.
|
||||
func getSQLMigration(ctx context.Context, version int,
|
||||
kvBackend kvdb.Backend,
|
||||
chain chainhash.Hash) (func(tx *sqlc.Queries) error, bool) {
|
||||
|
||||
switch version {
|
||||
case graphSQLMigration:
|
||||
return func(tx *sqlc.Queries) error {
|
||||
err := graphdb.MigrateGraphToSQL(
|
||||
ctx, kvBackend, tx, chain,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to migrate graph "+
|
||||
"to SQL: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, true
|
||||
}
|
||||
|
||||
// No version was matched, so we return false to indicate that no
|
||||
// migration is known for the given version.
|
||||
return nil, false
|
||||
}
|
||||
|
@@ -683,6 +683,10 @@ var allTestCases = []*lntest.TestCase{
|
||||
Name: "invoice migration",
|
||||
TestFunc: testInvoiceMigration,
|
||||
},
|
||||
{
|
||||
Name: "graph migration",
|
||||
TestFunc: testGraphMigration,
|
||||
},
|
||||
{
|
||||
Name: "payment address mismatch",
|
||||
TestFunc: testWrongPaymentAddr,
|
||||
|
151
itest/lnd_graph_migration_test.go
Normal file
151
itest/lnd_graph_migration_test.go
Normal file
@@ -0,0 +1,151 @@
|
||||
package itest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/lightningnetwork/lnd"
|
||||
graphdb "github.com/lightningnetwork/lnd/graph/db"
|
||||
"github.com/lightningnetwork/lnd/graph/db/models"
|
||||
"github.com/lightningnetwork/lnd/lntest"
|
||||
"github.com/lightningnetwork/lnd/lntest/node"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// testGraphMigration tests that the graph migration from the old KV store to
|
||||
// the new native SQL store works as expected.
|
||||
func testGraphMigration(ht *lntest.HarnessTest) {
|
||||
if !lnd.RunTestSQLMigration {
|
||||
ht.Skip("not running with test_native_sql tag")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
alice := ht.NewNodeWithCoins("Alice", nil)
|
||||
|
||||
// Make sure we run the test with SQLite or Postgres.
|
||||
if alice.Cfg.DBBackend != node.BackendSqlite &&
|
||||
alice.Cfg.DBBackend != node.BackendPostgres {
|
||||
|
||||
ht.Skip("node not running with SQLite or Postgres")
|
||||
}
|
||||
|
||||
// Skip the test if the node is already running with native SQL.
|
||||
if alice.Cfg.NativeSQL {
|
||||
ht.Skip("node already running with native SQL")
|
||||
}
|
||||
|
||||
// Spin up a mini network and then connect Alice to it.
|
||||
chans, nodes := ht.CreateSimpleNetwork(
|
||||
[][]string{nil, nil, nil, nil},
|
||||
lntest.OpenChannelParams{Amt: chanAmt},
|
||||
)
|
||||
|
||||
// The expected number of nodes in the graph will include those spun up
|
||||
// above plus Alice.
|
||||
expNumNodes := len(nodes) + 1
|
||||
expNumChans := len(chans)
|
||||
require.Equal(ht, 5, expNumNodes)
|
||||
require.Equal(ht, 3, expNumChans)
|
||||
|
||||
// Connect Alice to one of the nodes. Alice should now perform a graph
|
||||
// sync with the node.
|
||||
ht.EnsureConnected(alice, nodes[0])
|
||||
|
||||
// Wait for Alice to have a full view of the graph.
|
||||
ht.AssertNumEdges(alice, expNumChans, false)
|
||||
|
||||
// Now stop Alice so we can open the DB for examination.
|
||||
require.NoError(ht, alice.Stop())
|
||||
|
||||
// Open the KV store channel graph DB.
|
||||
db, err := graphdb.NewKVStore(openKVBackend(ht, alice))
|
||||
require.NoError(ht, err)
|
||||
|
||||
// assertDBState is a helper function that asserts the state of the
|
||||
// graph DB.
|
||||
assertDBState := func(db graphdb.V1Store) {
|
||||
var (
|
||||
numNodes int
|
||||
edges = make(map[uint64]bool)
|
||||
)
|
||||
err := db.ForEachNode(ctx, func(tx graphdb.NodeRTx) error {
|
||||
numNodes++
|
||||
|
||||
// For each node, also count the number of edges.
|
||||
return tx.ForEachChannel(
|
||||
func(info *models.ChannelEdgeInfo,
|
||||
_ *models.ChannelEdgePolicy,
|
||||
_ *models.ChannelEdgePolicy) error {
|
||||
|
||||
edges[info.ChannelID] = true
|
||||
return nil
|
||||
},
|
||||
)
|
||||
})
|
||||
require.NoError(ht, err)
|
||||
require.Equal(ht, expNumNodes, numNodes)
|
||||
require.Equal(ht, expNumChans, len(edges))
|
||||
}
|
||||
assertDBState(db)
|
||||
|
||||
alice.SetExtraArgs([]string{"--db.use-native-sql"})
|
||||
|
||||
// Now run the migration flow three times to ensure that each run is
|
||||
// idempotent.
|
||||
for i := 0; i < 3; i++ {
|
||||
// Start Alice with the native SQL flag set. This will trigger
|
||||
// the migration to run.
|
||||
require.NoError(ht, alice.Start(ht.Context()))
|
||||
|
||||
// At this point the migration should have completed and the
|
||||
// node should be running with native SQL. Now we'll stop Alice
|
||||
// again so we can safely examine the database.
|
||||
require.NoError(ht, alice.Stop())
|
||||
|
||||
// Now we'll open the database with the native SQL backend and
|
||||
// fetch the graph data again to ensure that it was migrated
|
||||
// correctly.
|
||||
sqlGraphDB := openNativeSQLGraphDB(ht, alice)
|
||||
assertDBState(sqlGraphDB)
|
||||
}
|
||||
|
||||
// Now restart Alice without the --db.use-native-sql flag so we can
|
||||
// check that the KV tombstone was set and that Alice will fail to
|
||||
// start.
|
||||
// NOTE: this is the same tombstone used for the graph migration. Only
|
||||
// one tombstone is needed since we just need one to represent the fact
|
||||
// that the switch to native SQL has been made.
|
||||
require.NoError(ht, alice.Stop())
|
||||
alice.SetExtraArgs(nil)
|
||||
|
||||
// Alice should now fail to start due to the tombstone being set.
|
||||
require.NoError(ht, alice.StartLndCmd(ht.Context()))
|
||||
require.ErrorContains(ht, alice.WaitForProcessExit(), "exit status 1")
|
||||
|
||||
// Start Alice again so the test can complete.
|
||||
alice.SetExtraArgs([]string{"--db.use-native-sql"})
|
||||
require.NoError(ht, alice.Start(ht.Context()))
|
||||
}
|
||||
|
||||
func openNativeSQLGraphDB(ht *lntest.HarnessTest,
|
||||
hn *node.HarnessNode) graphdb.V1Store {
|
||||
|
||||
db := openNativeSQLDB(ht, hn)
|
||||
|
||||
executor := sqldb.NewTransactionExecutor(
|
||||
db, func(tx *sql.Tx) graphdb.SQLQueries {
|
||||
return db.WithTx(tx)
|
||||
},
|
||||
)
|
||||
|
||||
store, err := graphdb.NewSQLStore(
|
||||
&graphdb.SQLStoreConfig{
|
||||
ChainHash: *ht.Miner().ActiveNet.GenesisHash,
|
||||
},
|
||||
executor,
|
||||
)
|
||||
require.NoError(ht, err)
|
||||
|
||||
return store
|
||||
}
|
@@ -21,8 +21,9 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func openChannelDB(ht *lntest.HarnessTest, hn *node.HarnessNode) *channeldb.DB {
|
||||
func openKVBackend(ht *lntest.HarnessTest, hn *node.HarnessNode) kvdb.Backend {
|
||||
sqlbase.Init(0)
|
||||
|
||||
var (
|
||||
backend kvdb.Backend
|
||||
err error
|
||||
@@ -53,15 +54,28 @@ func openChannelDB(ht *lntest.HarnessTest, hn *node.HarnessNode) *channeldb.DB {
|
||||
require.NoError(ht, err)
|
||||
}
|
||||
|
||||
db, err := channeldb.CreateWithBackend(backend)
|
||||
require.NoError(ht, err)
|
||||
|
||||
return db
|
||||
return backend
|
||||
}
|
||||
|
||||
func openNativeSQLInvoiceDB(ht *lntest.HarnessTest,
|
||||
hn *node.HarnessNode) invoices.InvoiceDB {
|
||||
|
||||
db := openNativeSQLDB(ht, hn)
|
||||
|
||||
executor := sqldb.NewTransactionExecutor(
|
||||
db, func(tx *sql.Tx) invoices.SQLInvoiceQueries {
|
||||
return db.WithTx(tx)
|
||||
},
|
||||
)
|
||||
|
||||
return invoices.NewSQLStore(
|
||||
executor, clock.NewDefaultClock(),
|
||||
)
|
||||
}
|
||||
|
||||
func openNativeSQLDB(ht *lntest.HarnessTest,
|
||||
hn *node.HarnessNode) *sqldb.BaseDB {
|
||||
|
||||
var db *sqldb.BaseDB
|
||||
|
||||
switch hn.Cfg.DBBackend {
|
||||
@@ -90,15 +104,7 @@ func openNativeSQLInvoiceDB(ht *lntest.HarnessTest,
|
||||
db = postgresStore.BaseDB
|
||||
}
|
||||
|
||||
executor := sqldb.NewTransactionExecutor(
|
||||
db, func(tx *sql.Tx) invoices.SQLInvoiceQueries {
|
||||
return db.WithTx(tx)
|
||||
},
|
||||
)
|
||||
|
||||
return invoices.NewSQLStore(
|
||||
executor, clock.NewDefaultClock(),
|
||||
)
|
||||
return db
|
||||
}
|
||||
|
||||
// clampTime truncates the time of the passed invoice to the microsecond level.
|
||||
@@ -238,7 +244,8 @@ func testInvoiceMigration(ht *lntest.HarnessTest) {
|
||||
require.NoError(ht, bob.Stop())
|
||||
|
||||
// Open the KV channel DB.
|
||||
db := openChannelDB(ht, bob)
|
||||
db, err := channeldb.CreateWithBackend(openKVBackend(ht, bob))
|
||||
require.NoError(ht, err)
|
||||
|
||||
query := invoices.InvoiceQuery{
|
||||
IndexOffset: 0,
|
||||
|
@@ -1,7 +1,6 @@
|
||||
package itest
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"os"
|
||||
@@ -1355,14 +1354,11 @@ func testGRPCNotFound(ht *lntest.HarnessTest) {
|
||||
notFoundErr = codes.NotFound.String()
|
||||
unknownPub = "0286098b97bc843372b4426d4b276cea9aa2f48f0428d6" +
|
||||
"f5b66ae101befc14f8b4"
|
||||
rHash = make([]byte, 32)
|
||||
rHash = ht.Random32Bytes()
|
||||
)
|
||||
unknownPubBytes, err := route.NewVertexFromStr(unknownPub)
|
||||
require.NoError(ht, err)
|
||||
|
||||
_, err = rand.Read(rHash)
|
||||
require.NoError(ht, err)
|
||||
|
||||
alice := ht.NewNode("Alice", []string{
|
||||
// We add this flag so that we can test the
|
||||
// LookupHTLCResolutionAssertErr endpoint.
|
||||
|
@@ -8,4 +8,13 @@ var migrationAdditions = []MigrationConfig{
|
||||
Version: 8,
|
||||
SchemaVersion: 7,
|
||||
},
|
||||
{
|
||||
Name: "kv_graph_migration",
|
||||
Version: 9,
|
||||
SchemaVersion: 7,
|
||||
// A migration function may be attached to this
|
||||
// migration to migrate KV graph to the native SQL
|
||||
// schema. This is optional and can be disabled by the
|
||||
// user if necessary.
|
||||
},
|
||||
}
|
||||
|
Reference in New Issue
Block a user