mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-29 21:42:49 +02:00
Merge pull request #10109 from ellemouton/graphPerf1
graph/db: helper tests and some benchmarks for SQL
This commit is contained in:
693
graph/db/benchmark_test.go
Normal file
693
graph/db/benchmark_test.go
Normal file
@@ -0,0 +1,693 @@
|
||||
package graphdb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/btcsuite/btclog/v2"
|
||||
"github.com/lightningnetwork/lnd/batch"
|
||||
"github.com/lightningnetwork/lnd/graph/db/models"
|
||||
"github.com/lightningnetwork/lnd/kvdb"
|
||||
"github.com/lightningnetwork/lnd/kvdb/postgres"
|
||||
"github.com/lightningnetwork/lnd/kvdb/sqlbase"
|
||||
"github.com/lightningnetwork/lnd/kvdb/sqlite"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// Here we define various database paths, connection strings and file names that
// we will use to open the database connections. These should be changed to
// point to your actual local test databases.
const (
	// On-disk locations of the local test databases.
	bboltDBPath         = "testdata/kvdb"
	kvdbSqlitePath      = "testdata/kvdb"
	nativeSQLSqlitePath = "testdata"

	// Postgres connection strings for the kvdb and native-SQL variants.
	// NOTE(review): the "DNS" suffix presumably means DSN (data source
	// name) — confirm before renaming, since these names are referenced
	// throughout this file.
	kvdbPostgresDNS      = "postgres://test@localhost/graphbenchmark_kvdb"
	nativeSQLPostgresDNS = "postgres://test@localhost/graphbenchmark"

	// File names of the sqlite/bbolt databases inside the paths above.
	kvdbSqliteFile      = "channel.sqlite"
	kvdbBBoltFile       = "channel.db"
	nativeSQLSqliteFile = "lnd.sqlite"

	// Connection-pool limits and the busy timeout applied to the SQL
	// backed stores opened below.
	testMaxSQLiteConnections   = 2
	testMaxPostgresConnections = 50
	testSQLBusyTimeout         = 5 * time.Second
)
|
||||
|
||||
// Here we define some variables that will be used to configure the graph stores
// we open for testing. These can be modified to suit your testing needs.
var (
	// dbTestChain is the chain hash used for initialising the test
	// databases. This should be changed to match the chain hash of the
	// database you are testing against.
	dbTestChain = *chaincfg.MainNetParams.GenesisHash

	// testStoreOptions is used to configure the graph stores we open for
	// testing.
	testStoreOptions = []StoreOptionModifier{
		WithBatchCommitInterval(500 * time.Millisecond),
	}

	// testSQLPaginationCfg is used to configure the pagination settings for
	// the SQL stores we open for testing.
	testSQLPaginationCfg = sqldb.DefaultPagedQueryConfig()

	// testSqlitePragmaOpts is a set of SQLite pragma options that we apply
	// to the SQLite databases we open for testing.
	testSqlitePragmaOpts = []string{
		"synchronous=full",
		"auto_vacuum=incremental",
		"fullfsync=true",
	}
)
|
||||
|
||||
// dbConnection is a struct that holds the name of the database connection
// and a function to open the connection.
type dbConnection struct {
	// name identifies the backend and is used as the sub-test/benchmark
	// name.
	name string

	// open constructs a V1Store backed by this database type. Any
	// required cleanup is registered on the given testing.TB.
	open func(testing.TB) V1Store
}
|
||||
|
||||
// This var block defines the various database connections that we will use
// for testing. Each connection is defined as a dbConnection struct that
// contains a name and an open function. The open function is used to create
// a new V1Store instance for the given database type.
var (
	// kvdbBBoltConn is a connection to a kvdb-bbolt database called
	// channel.db.
	kvdbBBoltConn = dbConnection{
		name: "kvdb-bbolt",
		open: func(b testing.TB) V1Store {
			return connectBBoltDB(b, bboltDBPath, kvdbBBoltFile)
		},
	}

	// kvdbSqliteConn is a connection to a kvdb-sqlite database called
	// channel.sqlite.
	kvdbSqliteConn = dbConnection{
		name: "kvdb-sqlite",
		open: func(b testing.TB) V1Store {
			return connectKVDBSqlite(
				b, kvdbSqlitePath, kvdbSqliteFile,
			)
		},
	}

	// nativeSQLSqliteConn is a connection to a native SQL sqlite database
	// called lnd.sqlite.
	nativeSQLSqliteConn = dbConnection{
		name: "native-sqlite",
		open: func(b testing.TB) V1Store {
			return connectNativeSQLite(
				b, nativeSQLSqlitePath, nativeSQLSqliteFile,
			)
		},
	}

	// kvdbPostgresConn is a connection to a kvdb-postgres database
	// using a postgres connection string.
	kvdbPostgresConn = dbConnection{
		name: "kvdb-postgres",
		open: func(b testing.TB) V1Store {
			return connectKVDBPostgres(b, kvdbPostgresDNS)
		},
	}

	// nativeSQLPostgresConn is a connection to a native SQL postgres
	// database using a postgres connection string.
	nativeSQLPostgresConn = dbConnection{
		name: "native-postgres",
		open: func(b testing.TB) V1Store {
			return connectNativePostgres(b, nativeSQLPostgresDNS)
		},
	}
)
|
||||
|
||||
// connectNativePostgres creates a V1Store instance backed by a native Postgres
|
||||
// database for testing purposes.
|
||||
func connectNativePostgres(t testing.TB, dsn string) V1Store {
|
||||
return newSQLStore(t, sqlPostgres(t, dsn))
|
||||
}
|
||||
|
||||
// sqlPostgres creates a sqldb.DB instance backed by a native Postgres database
|
||||
// for testing purposes.
|
||||
func sqlPostgres(t testing.TB, dsn string) BatchedSQLQueries {
|
||||
store, err := sqldb.NewPostgresStore(&sqldb.PostgresConfig{
|
||||
Dsn: dsn,
|
||||
MaxConnections: testMaxPostgresConnections,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, store.Close())
|
||||
})
|
||||
|
||||
return newSQLExecutor(t, store)
|
||||
}
|
||||
|
||||
// connectNativeSQLite creates a V1Store instance backed by a native SQLite
|
||||
// database for testing purposes.
|
||||
func connectNativeSQLite(t testing.TB, dbPath, file string) V1Store {
|
||||
return newSQLStore(t, sqlSQLite(t, dbPath, file))
|
||||
}
|
||||
|
||||
// sqlSQLite creates a sqldb.DB instance backed by a native SQLite database for
|
||||
// testing purposes.
|
||||
func sqlSQLite(t testing.TB, dbPath, file string) BatchedSQLQueries {
|
||||
store, err := sqldb.NewSqliteStore(
|
||||
&sqldb.SqliteConfig{
|
||||
MaxConnections: testMaxSQLiteConnections,
|
||||
BusyTimeout: testSQLBusyTimeout,
|
||||
PragmaOptions: testSqlitePragmaOpts,
|
||||
},
|
||||
path.Join(dbPath, file),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, store.Close())
|
||||
})
|
||||
|
||||
return newSQLExecutor(t, store)
|
||||
}
|
||||
|
||||
// kvdbPostgres creates a kvdb.Backend instance backed by a kvdb-postgres
|
||||
// database for testing purposes.
|
||||
func kvdbPostgres(t testing.TB, dsn string) kvdb.Backend {
|
||||
kvStore, err := kvdb.Open(
|
||||
kvdb.PostgresBackendName, context.Background(),
|
||||
&postgres.Config{
|
||||
Dsn: dsn,
|
||||
MaxConnections: testMaxPostgresConnections,
|
||||
},
|
||||
// NOTE: we use the raw string here else we get an
|
||||
// import cycle if we try to import lncfg.NSChannelDB.
|
||||
"channeldb",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, kvStore.Close())
|
||||
})
|
||||
|
||||
return kvStore
|
||||
}
|
||||
|
||||
// connectKVDBPostgres creates a V1Store instance backed by a kvdb-postgres
|
||||
// database for testing purposes.
|
||||
func connectKVDBPostgres(t testing.TB, dsn string) V1Store {
|
||||
return newKVStore(t, kvdbPostgres(t, dsn))
|
||||
}
|
||||
|
||||
// kvdbSqlite creates a kvdb.Backend instance backed by a kvdb-sqlite
|
||||
// database for testing purposes.
|
||||
func kvdbSqlite(t testing.TB, dbPath, fileName string) kvdb.Backend {
|
||||
sqlbase.Init(testMaxSQLiteConnections)
|
||||
kvStore, err := kvdb.Open(
|
||||
kvdb.SqliteBackendName, context.Background(),
|
||||
&sqlite.Config{
|
||||
BusyTimeout: testSQLBusyTimeout,
|
||||
MaxConnections: testMaxSQLiteConnections,
|
||||
PragmaOptions: testSqlitePragmaOpts,
|
||||
}, dbPath, fileName,
|
||||
// NOTE: we use the raw string here else we get an
|
||||
// import cycle if we try to import lncfg.NSChannelDB.
|
||||
"channeldb",
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
return kvStore
|
||||
}
|
||||
|
||||
// connectKVDBSqlite creates a V1Store instance backed by a kvdb-sqlite
|
||||
// database for testing purposes.
|
||||
func connectKVDBSqlite(t testing.TB, dbPath, fileName string) V1Store {
|
||||
return newKVStore(t, kvdbSqlite(t, dbPath, fileName))
|
||||
}
|
||||
|
||||
// connectBBoltDB creates a new BBolt database connection for testing.
|
||||
func connectBBoltDB(t testing.TB, dbPath, fileName string) V1Store {
|
||||
cfg := &kvdb.BoltBackendConfig{
|
||||
DBPath: dbPath,
|
||||
DBFileName: fileName,
|
||||
NoFreelistSync: true,
|
||||
AutoCompact: false,
|
||||
AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
|
||||
DBTimeout: kvdb.DefaultDBTimeout,
|
||||
}
|
||||
|
||||
kvStore, err := kvdb.GetBoltBackend(cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
return newKVStore(t, kvStore)
|
||||
}
|
||||
|
||||
// newKVStore creates a new KVStore instance for testing using a provided
|
||||
// kvdb.Backend instance.
|
||||
func newKVStore(t testing.TB, backend kvdb.Backend) V1Store {
|
||||
store, err := NewKVStore(backend, testStoreOptions...)
|
||||
require.NoError(t, err)
|
||||
|
||||
return store
|
||||
}
|
||||
|
||||
// newSQLExecutor creates a new BatchedSQLQueries instance for testing using a
|
||||
// provided sqldb.DB instance.
|
||||
func newSQLExecutor(t testing.TB, db sqldb.DB) BatchedSQLQueries {
|
||||
err := db.ApplyAllMigrations(
|
||||
context.Background(), sqldb.GetMigrations(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
return sqldb.NewTransactionExecutor(
|
||||
db.GetBaseDB(), func(tx *sql.Tx) SQLQueries {
|
||||
return db.GetBaseDB().WithTx(tx)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// newSQLStore creates a new SQLStore instance for testing using a provided
|
||||
// sqldb.DB instance.
|
||||
func newSQLStore(t testing.TB, db BatchedSQLQueries) V1Store {
|
||||
store, err := NewSQLStore(
|
||||
&SQLStoreConfig{
|
||||
ChainHash: dbTestChain,
|
||||
PaginationCfg: testSQLPaginationCfg,
|
||||
},
|
||||
db, testStoreOptions...,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
return store
|
||||
}
|
||||
|
||||
// TestPopulateDBs is a helper test that can be used to populate various local
// graph DBs from some source graph DB. This can then be used to run the
// various benchmark tests against the same graph data.
//
// TODO(elle): this test reveals that the batching logic we use might be
// problematic for postgres backends. This needs some investigation & it might
// make sense to only use LazyAdd for sqlite backends & it may make sense to
// also add a maximum batch size to avoid grouping too many updates at once.
// Observations:
//   - the LazyAdd options need to be turned off for channel/policy update calls
//     for both the native SQL postgres & kvdb postgres backend for this test to
//     succeed.
//   - The LazyAdd option must be added for the sqlite backends, else it takes
//     very long to sync the graph.
func TestPopulateDBs(t *testing.T) {
	t.Parallel()
	ctx := context.Background()

	// NOTE: uncomment the line below to run this test locally, then provide
	// the desired source database (and make sure the destination Postgres
	// databases exist and are running).
	t.Skipf("Skipping local helper test")

	// Set your desired source database here. For kvdbSqliteConn, a file
	// called testdata/kvdb/channel.sqlite must exist and be populated with
	// a KVDB based channel graph.
	sourceDB := kvdbSqliteConn

	// Populate this list with the desired destination databases.
	destinations := []dbConnection{
		kvdbBBoltConn,
		nativeSQLSqliteConn,
		kvdbPostgresConn,
		nativeSQLPostgresConn,
	}

	// Open and start the source graph.
	src, err := NewChannelGraph(sourceDB.open(t))
	require.NoError(t, err)
	require.NoError(t, src.Start())
	t.Cleanup(func() {
		require.NoError(t, src.Stop())
	})

	// countNodes is a helper function to count the number of nodes in the
	// graph.
	countNodes := func(graph *ChannelGraph) int {
		numNodes := 0
		err := graph.ForEachNode(ctx, func(tx NodeRTx) error {
			numNodes++

			return nil
		}, func() {
			// Reset callback: the count is zeroed so that a
			// retried iteration starts from scratch.
			numNodes = 0
		})
		require.NoError(t, err)

		return numNodes
	}

	// countChannels is a helper function to count the number of channels
	// in the graph.
	countChannels := func(graph *ChannelGraph) (int, int) {
		var (
			numChans    = 0
			numPolicies = 0
		)
		err := graph.ForEachChannel(
			ctx, func(info *models.ChannelEdgeInfo,
				policy,
				policy2 *models.ChannelEdgePolicy) error {

				numChans++
				if policy != nil {
					numPolicies++
				}
				if policy2 != nil {
					numPolicies++
				}

				return nil
			}, func() {
				// Reset callback: zero both counters so that
				// a retried iteration starts from scratch.
				numChans = 0
				numPolicies = 0
			})
		require.NoError(t, err)

		return numChans, numPolicies
	}

	t.Logf("Number of nodes in source graph (%s): %d", sourceDB.name,
		countNodes(src))
	numChan, numPol := countChannels(src)
	t.Logf("Number of channels & policies in source graph (%s): %d "+
		"channels, %d policies", sourceDB.name, numChan, numPol)

	// Sync the source graph into each destination in parallel sub-tests.
	for _, destDB := range destinations {
		t.Run(destDB.name, func(t *testing.T) {
			t.Parallel()

			// Open and start the destination graph.
			dest, err := NewChannelGraph(destDB.open(t))
			require.NoError(t, err)
			require.NoError(t, dest.Start())
			t.Cleanup(func() {
				require.NoError(t, dest.Stop())
			})

			// Log the pre-sync state of the destination.
			t.Logf("Number of nodes in %s graph: %d", destDB.name,
				countNodes(dest))
			numChan, numPol := countChannels(dest)
			t.Logf("Number of channels in %s graph: %d, %d",
				destDB.name, numChan, numPol)

			// Sync the source graph to the destination graph.
			syncGraph(t, src, dest)

			// Log the post-sync state so the result can be
			// compared against the source counts above.
			t.Logf("Number of nodes in %s graph after sync: %d",
				destDB.name, countNodes(dest))
			numChan, numPol = countChannels(dest)
			t.Logf("Number of channels in %s graph after sync: "+
				"%d, %d", destDB.name, numChan, numPol)
		})
	}
}
|
||||
|
||||
// TestPopulateViaMigration is a helper test that can be used to populate a
// local native SQL graph from a kvdb-sql graph using the migration logic.
//
// NOTE: the testPostgres variable can be set to true to test with a
// postgres backend instead of the kvdb-sqlite backend.
//
// NOTE: this is a helper test and is not run by default.
//
// TODO(elle): this test reveals that there may be an issue with the postgres
// migration as it is super slow.
func TestPopulateViaMigration(t *testing.T) {
	// Skipped by default: comment this out to run the helper locally.
	t.Skipf("Skipping local helper test")

	// Set this to true if you want to test with a postgres backend.
	// By default, we use a kvdb-sqlite backend.
	testPostgres := false

	ctx := context.Background()

	// Set up a logger so we can see the migration progress.
	logger := btclog.NewDefaultHandler(os.Stdout)
	UseLogger(btclog.NewSLogger(logger))
	log.SetLevel(btclog.LevelDebug)

	// Default to the sqlite-based source and destination databases.
	var (
		srcKVDB = kvdbSqlite(t, kvdbSqlitePath, kvdbSqliteFile)
		dstSQL  = sqlSQLite(t, nativeSQLSqlitePath, nativeSQLSqliteFile)
	)
	if testPostgres {
		srcKVDB = kvdbPostgres(t, kvdbPostgresDNS)
		dstSQL = sqlPostgres(t, nativeSQLPostgresDNS)
	}

	// Use the graph migration to populate the SQL graph from the
	// kvdb graph. The whole migration runs inside a single write
	// transaction on the destination.
	err := dstSQL.ExecTx(
		ctx, sqldb.WriteTxOpt(), func(queries SQLQueries) error {
			return MigrateGraphToSQL(
				ctx, srcKVDB, queries, dbTestChain,
			)
		}, func() {},
	)
	require.NoError(t, err)
}
|
||||
|
||||
// syncGraph synchronizes the source graph with the destination graph by
// copying all nodes and channels from the source to the destination.
//
// Nodes are copied first, then channels together with their policies. Each
// insert runs in its own goroutine so that the destination store's batch
// scheduler can group the writes; progress is logged at most once per the
// rate.Sometimes interval.
func syncGraph(t *testing.T, src, dest *ChannelGraph) {
	ctx := context.Background()

	var (
		// s rate-limits the progress logging below.
		s = rate.Sometimes{
			Interval: 10 * time.Second,
		}
		t0 = time.Now()

		// chunk counts items since the last progress report, total
		// counts all items synced so far. Both (and t0) are guarded
		// by mu while the insert goroutines are running.
		chunk = 0
		total = 0
		mu    sync.Mutex
	)

	// reportNodeStats logs node sync throughput since the last report and
	// resets the report timer.
	reportNodeStats := func() {
		elapsed := time.Since(t0).Seconds()
		ratePerSec := float64(chunk) / elapsed
		t.Logf("Synced %d nodes (last chunk: %d) "+
			"(%.2f nodes/second)",
			total, chunk, ratePerSec)

		t0 = time.Now()
	}

	var wgNodes sync.WaitGroup
	err := src.ForEachNode(ctx, func(tx NodeRTx) error {
		// Insert each node in its own goroutine so the destination's
		// batching can group the writes.
		wgNodes.Add(1)
		go func() {
			defer wgNodes.Done()

			// NOTE: even though the transaction (tx) may have
			// already been aborted, it is still ok to use the
			// Node() result since that is a static object that
			// is not affected by the transaction state.
			err := dest.AddLightningNode(
				ctx, tx.Node(), batch.LazyAdd(),
			)
			require.NoError(t, err)

			mu.Lock()
			total++
			chunk++
			s.Do(func() {
				reportNodeStats()
				chunk = 0
			})
			mu.Unlock()
		}()

		return nil
	}, func() {})
	require.NoError(t, err)

	wgNodes.Wait()
	reportNodeStats()
	t.Logf("Done syncing %d nodes", total)

	// Reset the counters and timer for the channel phase.
	total = 0
	chunk = 0
	t0 = time.Now()

	// reportChanStats logs channel sync throughput since the last report
	// and resets the report timer.
	reportChanStats := func() {
		elapsed := time.Since(t0).Seconds()
		ratePerSec := float64(chunk) / elapsed
		t.Logf("Synced %d channels (and its "+
			"policies) (last chunk: %d) "+
			"(%.2f channels/second)",
			total, chunk, ratePerSec)

		t0 = time.Now()
	}

	var wgChans sync.WaitGroup
	err = src.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo,
		policy1, policy2 *models.ChannelEdgePolicy) error {

		// Add each channel & policy. We do this in a goroutine to
		// take advantage of batch processing.
		wgChans.Add(1)
		go func() {
			defer wgChans.Done()

			err := dest.AddChannelEdge(
				ctx, info, batch.LazyAdd(),
			)
			// An already-present edge is fine (e.g. on re-run);
			// any other error fails the test.
			if !errors.Is(err, ErrEdgeAlreadyExist) {
				require.NoError(t, err)
			}

			if policy1 != nil {
				err = dest.UpdateEdgePolicy(
					ctx, policy1, batch.LazyAdd(),
				)
				require.NoError(t, err)
			}

			if policy2 != nil {
				err = dest.UpdateEdgePolicy(
					ctx, policy2, batch.LazyAdd(),
				)
				require.NoError(t, err)
			}

			mu.Lock()
			total++
			chunk++
			s.Do(func() {
				reportChanStats()
				chunk = 0
			})
			mu.Unlock()
		}()

		return nil
	}, func() {})
	require.NoError(t, err)

	wgChans.Wait()
	reportChanStats()

	t.Logf("Done syncing %d channels", total)
}
|
||||
|
||||
// BenchmarkCacheLoading benchmarks how long it takes to load the in-memory
|
||||
// graph cache from a populated database.
|
||||
//
|
||||
// NOTE: this is to be run against a local graph database. It can be run
|
||||
// either against a kvdb-bbolt channel.db file, or a kvdb-sqlite channel.sqlite
|
||||
// file or a postgres connection containing the channel graph in kvdb format and
|
||||
// finally, it can be run against a native SQL sqlite or postgres database.
|
||||
//
|
||||
// NOTE: the TestPopulateDBs test helper can be used to populate a set of test
|
||||
// DBs from a single source db.
|
||||
func BenchmarkCacheLoading(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
|
||||
tests := []dbConnection{
|
||||
kvdbBBoltConn,
|
||||
kvdbSqliteConn,
|
||||
nativeSQLSqliteConn,
|
||||
kvdbPostgresConn,
|
||||
nativeSQLPostgresConn,
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
b.Run(test.name, func(b *testing.B) {
|
||||
store := test.open(b)
|
||||
|
||||
// Reset timer to exclude setup time.
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StopTimer()
|
||||
graph, err := NewChannelGraph(store)
|
||||
require.NoError(b, err)
|
||||
b.StartTimer()
|
||||
|
||||
require.NoError(b, graph.populateCache(ctx))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkGraphReadMethods benchmarks various read calls of various V1Store
|
||||
// implementations.
|
||||
//
|
||||
// NOTE: this is to be run against a local graph database. It can be run
|
||||
// either against a kvdb-bbolt channel.db file, or a kvdb-sqlite channel.sqlite
|
||||
// file or a postgres connection containing the channel graph in kvdb format and
|
||||
// finally, it can be run against a native SQL sqlite or postgres database.
|
||||
//
|
||||
// NOTE: the TestPopulateDBs test helper can be used to populate a set of test
|
||||
// DBs from a single source db.
|
||||
func BenchmarkGraphReadMethods(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
|
||||
backends := []dbConnection{
|
||||
kvdbBBoltConn,
|
||||
kvdbSqliteConn,
|
||||
nativeSQLSqliteConn,
|
||||
kvdbPostgresConn,
|
||||
nativeSQLPostgresConn,
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
fn func(b testing.TB, store V1Store)
|
||||
}{
|
||||
{
|
||||
name: "ForEachNode",
|
||||
fn: func(b testing.TB, store V1Store) {
|
||||
err := store.ForEachNode(
|
||||
ctx, func(_ NodeRTx) error {
|
||||
return nil
|
||||
}, func() {},
|
||||
)
|
||||
require.NoError(b, err)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ForEachChannel",
|
||||
fn: func(b testing.TB, store V1Store) {
|
||||
//nolint:ll
|
||||
err := store.ForEachChannel(
|
||||
ctx, func(_ *models.ChannelEdgeInfo,
|
||||
_ *models.ChannelEdgePolicy,
|
||||
_ *models.ChannelEdgePolicy) error {
|
||||
|
||||
return nil
|
||||
}, func() {},
|
||||
)
|
||||
require.NoError(b, err)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
for _, db := range backends {
|
||||
name := fmt.Sprintf("%s-%s", test.name, db.name)
|
||||
b.Run(name, func(b *testing.B) {
|
||||
store := db.open(b)
|
||||
|
||||
// Reset timer to exclude setup time.
|
||||
b.ResetTimer()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
test.fn(b, store)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,7 +1,7 @@
|
||||
package graphdb
|
||||
|
||||
import (
|
||||
"github.com/btcsuite/btclog"
|
||||
"github.com/btcsuite/btclog/v2"
|
||||
"github.com/lightningnetwork/lnd/build"
|
||||
)
|
||||
|
||||
|
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/sqldb"
|
||||
"github.com/lightningnetwork/lnd/sqldb/sqlc"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// MigrateGraphToSQL migrates the graph store from a KV backend to a SQL
|
||||
@@ -115,6 +116,12 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
|
||||
var (
|
||||
count uint64
|
||||
skipped uint64
|
||||
|
||||
t0 = time.Now()
|
||||
chunk uint64
|
||||
s = rate.Sometimes{
|
||||
Interval: 10 * time.Second,
|
||||
}
|
||||
)
|
||||
|
||||
// Loop through each node in the KV store and insert it into the SQL
|
||||
@@ -146,6 +153,7 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
|
||||
}
|
||||
|
||||
count++
|
||||
chunk++
|
||||
|
||||
// TODO(elle): At this point, we should check the loaded node
|
||||
// to see if we should extract any DNS addresses from its
|
||||
@@ -203,9 +211,25 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
|
||||
},
|
||||
)
|
||||
|
||||
return sqldb.CompareRecords(
|
||||
err = sqldb.CompareRecords(
|
||||
node, migratedNode, fmt.Sprintf("node %x", pub),
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("node mismatch after migration "+
|
||||
"for node %x: %w", pub, err)
|
||||
}
|
||||
|
||||
s.Do(func() {
|
||||
elapsed := time.Since(t0).Seconds()
|
||||
ratePerSec := float64(chunk) / elapsed
|
||||
log.Debugf("Migrated %d nodes (%.2f nodes/sec)",
|
||||
count, ratePerSec)
|
||||
|
||||
t0 = time.Now()
|
||||
chunk = 0
|
||||
})
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
// No reset is needed since if a retry occurs, the entire
|
||||
// migration will be retried from the start.
|
||||
@@ -225,6 +249,8 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
|
||||
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
|
||||
sqlDB SQLQueries) error {
|
||||
|
||||
log.Debugf("Migrating source node from KV to SQL")
|
||||
|
||||
sourceNode, err := sourceNode(kvdb)
|
||||
if errors.Is(err, ErrSourceNodeNotSet) {
|
||||
// If the source node has not been set yet, we can skip this
|
||||
@@ -302,6 +328,12 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
|
||||
skippedChanCount uint64
|
||||
policyCount uint64
|
||||
skippedPolicyCount uint64
|
||||
|
||||
t0 = time.Now()
|
||||
chunk uint64
|
||||
s = rate.Sometimes{
|
||||
Interval: 10 * time.Second,
|
||||
}
|
||||
)
|
||||
migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
|
||||
// If the policy is nil, we can skip it.
|
||||
@@ -386,6 +418,16 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
|
||||
scid, err)
|
||||
}
|
||||
|
||||
s.Do(func() {
|
||||
elapsed := time.Since(t0).Seconds()
|
||||
ratePerSec := float64(chunk) / elapsed
|
||||
log.Debugf("Migrated %d channels (%.2f channels/sec)",
|
||||
channelCount, ratePerSec)
|
||||
|
||||
t0 = time.Now()
|
||||
chunk = 0
|
||||
})
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
// No reset is needed since if a retry occurs, the entire
|
||||
@@ -544,6 +586,12 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
|
||||
count uint64
|
||||
pruneTipHeight uint32
|
||||
pruneTipHash chainhash.Hash
|
||||
|
||||
t0 = time.Now()
|
||||
chunk uint64
|
||||
s = rate.Sometimes{
|
||||
Interval: 10 * time.Second,
|
||||
}
|
||||
)
|
||||
|
||||
// migrateSinglePruneEntry is a helper function that inserts a single
|
||||
@@ -596,6 +644,17 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
|
||||
height, err)
|
||||
}
|
||||
|
||||
s.Do(func() {
|
||||
elapsed := time.Since(t0).Seconds()
|
||||
ratePerSec := float64(chunk) / elapsed
|
||||
log.Debugf("Migrated %d prune log "+
|
||||
"entries (%.2f entries/sec)",
|
||||
count, ratePerSec)
|
||||
|
||||
t0 = time.Now()
|
||||
chunk = 0
|
||||
})
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
@@ -715,7 +774,15 @@ func forEachPruneLogEntry(db kvdb.Backend, cb func(height uint32,
|
||||
func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
|
||||
sqlDB SQLQueries) error {
|
||||
|
||||
var count uint64
|
||||
var (
|
||||
count uint64
|
||||
|
||||
t0 = time.Now()
|
||||
chunk uint64
|
||||
s = rate.Sometimes{
|
||||
Interval: 10 * time.Second,
|
||||
}
|
||||
)
|
||||
migrateSingleClosedSCID := func(scid lnwire.ShortChannelID) error {
|
||||
count++
|
||||
|
||||
@@ -739,6 +806,16 @@ func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
|
||||
"but is not", scid)
|
||||
}
|
||||
|
||||
s.Do(func() {
|
||||
elapsed := time.Since(t0).Seconds()
|
||||
ratePerSec := float64(chunk) / elapsed
|
||||
log.Debugf("Migrated %d closed scids "+
|
||||
"(%.2f entries/sec)", count, ratePerSec)
|
||||
|
||||
t0 = time.Now()
|
||||
chunk = 0
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -766,7 +843,15 @@ func migrateClosedSCIDIndex(ctx context.Context, kvBackend kvdb.Backend,
|
||||
func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
|
||||
sqlDB SQLQueries) error {
|
||||
|
||||
var count uint64
|
||||
var (
|
||||
count uint64
|
||||
|
||||
t0 = time.Now()
|
||||
chunk uint64
|
||||
s = rate.Sometimes{
|
||||
Interval: 10 * time.Second,
|
||||
}
|
||||
)
|
||||
err := forEachZombieEntry(kvBackend, func(chanID uint64, pubKey1,
|
||||
pubKey2 [33]byte) error {
|
||||
|
||||
@@ -820,6 +905,16 @@ func migrateZombieIndex(ctx context.Context, kvBackend kvdb.Backend,
|
||||
"a zombie, but is not", chanID)
|
||||
}
|
||||
|
||||
s.Do(func() {
|
||||
elapsed := time.Since(t0).Seconds()
|
||||
ratePerSec := float64(chunk) / elapsed
|
||||
log.Debugf("Migrated %d zombie index entries "+
|
||||
"(%.2f entries/sec)", count, ratePerSec)
|
||||
|
||||
t0 = time.Now()
|
||||
chunk = 0
|
||||
})
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
2
log.go
2
log.go
@@ -203,7 +203,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor)
|
||||
AddSubLogger(
|
||||
root, blindedpath.Subsystem, interceptor, blindedpath.UseLogger,
|
||||
)
|
||||
AddV1SubLogger(root, graphdb.Subsystem, interceptor, graphdb.UseLogger)
|
||||
AddSubLogger(root, graphdb.Subsystem, interceptor, graphdb.UseLogger)
|
||||
AddSubLogger(root, chainio.Subsystem, interceptor, chainio.UseLogger)
|
||||
AddSubLogger(root, msgmux.Subsystem, interceptor, msgmux.UseLogger)
|
||||
AddSubLogger(root, sqldb.Subsystem, interceptor, sqldb.UseLogger)
|
||||
|
Reference in New Issue
Block a user