package graphdb import ( "context" "database/sql" "errors" "fmt" "os" "path" "sync" "testing" "time" "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btclog/v2" "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb/postgres" "github.com/lightningnetwork/lnd/kvdb/sqlbase" "github.com/lightningnetwork/lnd/kvdb/sqlite" "github.com/lightningnetwork/lnd/sqldb" "github.com/stretchr/testify/require" "golang.org/x/time/rate" ) // Here we define various database paths, connection strings and file names that // we will use to open the database connections. These should be changed to // point to your actual local test databases. const ( bboltDBPath = "testdata/kvdb" kvdbSqlitePath = "testdata/kvdb" nativeSQLSqlitePath = "testdata" kvdbPostgresDNS = "postgres://test@localhost/graphbenchmark_kvdb" nativeSQLPostgresDNS = "postgres://test@localhost/graphbenchmark" kvdbSqliteFile = "channel.sqlite" kvdbBBoltFile = "channel.db" nativeSQLSqliteFile = "lnd.sqlite" testMaxSQLiteConnections = 2 testMaxPostgresConnections = 50 testSQLBusyTimeout = 5 * time.Second ) // Here we define some variables that will be used to configure the graph stores // we open for testing. These can be modified to suit your testing needs. var ( // dbTestChain is the chain hash used for initialising the test // databases. This should be changed to match the chain hash of the // database you are testing against. dbTestChain = *chaincfg.MainNetParams.GenesisHash // testStoreOptions is used to configure the graph stores we open for // testing. testStoreOptions = []StoreOptionModifier{ WithBatchCommitInterval(500 * time.Millisecond), } // testSQLPaginationCfg is used to configure the pagination settings for // the SQL stores we open for testing. testSQLPaginationCfg = sqldb.DefaultPagedQueryConfig() // testSqlitePragmaOpts is a set of SQLite pragma options that we apply // to the SQLite databases we open for testing. testSqlitePragmaOpts = []string{ "synchronous=full", "auto_vacuum=incremental", "fullfsync=true", } ) // dbConnection is a struct that holds the name of the database connection // and a function to open the connection. type dbConnection struct { name string open func(testing.TB) V1Store } // This var block defines the various database connections that we will use // for testing. Each connection is defined as a dbConnection struct that // contains a name and an open function. The open function is used to create // a new V1Store instance for the given database type. var ( // kvdbBBoltConn is a connection to a kvdb-bbolt database called // channel.db. kvdbBBoltConn = dbConnection{ name: "kvdb-bbolt", open: func(b testing.TB) V1Store { return connectBBoltDB(b, bboltDBPath, kvdbBBoltFile) }, } // kvdbSqliteConn is a connection to a kvdb-sqlite database called // channel.sqlite. kvdbSqliteConn = dbConnection{ name: "kvdb-sqlite", open: func(b testing.TB) V1Store { return connectKVDBSqlite( b, kvdbSqlitePath, kvdbSqliteFile, ) }, } // nativeSQLSqliteConn is a connection to a native SQL sqlite database // called lnd.sqlite. nativeSQLSqliteConn = dbConnection{ name: "native-sqlite", open: func(b testing.TB) V1Store { return connectNativeSQLite( b, nativeSQLSqlitePath, nativeSQLSqliteFile, ) }, } // kvdbPostgresConn is a connection to a kvdb-postgres database // using a postgres connection string. kvdbPostgresConn = dbConnection{ name: "kvdb-postgres", open: func(b testing.TB) V1Store { return connectKVDBPostgres(b, kvdbPostgresDNS) }, } // nativeSQLPostgresConn is a connection to a native SQL postgres // database using a postgres connection string. nativeSQLPostgresConn = dbConnection{ name: "native-postgres", open: func(b testing.TB) V1Store { return connectNativePostgres(b, nativeSQLPostgresDNS) }, } ) // connectNativePostgres creates a V1Store instance backed by a native Postgres // database for testing purposes. func connectNativePostgres(t testing.TB, dsn string) V1Store { return newSQLStore(t, sqlPostgres(t, dsn)) } // sqlPostgres creates a sqldb.DB instance backed by a native Postgres database // for testing purposes. func sqlPostgres(t testing.TB, dsn string) BatchedSQLQueries { store, err := sqldb.NewPostgresStore(&sqldb.PostgresConfig{ Dsn: dsn, MaxConnections: testMaxPostgresConnections, }) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) }) return newSQLExecutor(t, store) } // connectNativeSQLite creates a V1Store instance backed by a native SQLite // database for testing purposes. func connectNativeSQLite(t testing.TB, dbPath, file string) V1Store { return newSQLStore(t, sqlSQLite(t, dbPath, file)) } // sqlSQLite creates a sqldb.DB instance backed by a native SQLite database for // testing purposes. func sqlSQLite(t testing.TB, dbPath, file string) BatchedSQLQueries { store, err := sqldb.NewSqliteStore( &sqldb.SqliteConfig{ MaxConnections: testMaxSQLiteConnections, BusyTimeout: testSQLBusyTimeout, PragmaOptions: testSqlitePragmaOpts, }, path.Join(dbPath, file), ) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, store.Close()) }) return newSQLExecutor(t, store) } // kvdbPostgres creates a kvdb.Backend instance backed by a kvdb-postgres // database for testing purposes. func kvdbPostgres(t testing.TB, dsn string) kvdb.Backend { kvStore, err := kvdb.Open( kvdb.PostgresBackendName, context.Background(), &postgres.Config{ Dsn: dsn, MaxConnections: testMaxPostgresConnections, }, // NOTE: we use the raw string here else we get an // import cycle if we try to import lncfg.NSChannelDB. "channeldb", ) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, kvStore.Close()) }) return kvStore } // connectKVDBPostgres creates a V1Store instance backed by a kvdb-postgres // database for testing purposes. func connectKVDBPostgres(t testing.TB, dsn string) V1Store { return newKVStore(t, kvdbPostgres(t, dsn)) } // kvdbSqlite creates a kvdb.Backend instance backed by a kvdb-sqlite // database for testing purposes. func kvdbSqlite(t testing.TB, dbPath, fileName string) kvdb.Backend { sqlbase.Init(testMaxSQLiteConnections) kvStore, err := kvdb.Open( kvdb.SqliteBackendName, context.Background(), &sqlite.Config{ BusyTimeout: testSQLBusyTimeout, MaxConnections: testMaxSQLiteConnections, PragmaOptions: testSqlitePragmaOpts, }, dbPath, fileName, // NOTE: we use the raw string here else we get an // import cycle if we try to import lncfg.NSChannelDB. "channeldb", ) require.NoError(t, err) return kvStore } // connectKVDBSqlite creates a V1Store instance backed by a kvdb-sqlite // database for testing purposes. func connectKVDBSqlite(t testing.TB, dbPath, fileName string) V1Store { return newKVStore(t, kvdbSqlite(t, dbPath, fileName)) } // connectBBoltDB creates a new BBolt database connection for testing. func connectBBoltDB(t testing.TB, dbPath, fileName string) V1Store { cfg := &kvdb.BoltBackendConfig{ DBPath: dbPath, DBFileName: fileName, NoFreelistSync: true, AutoCompact: false, AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge, DBTimeout: kvdb.DefaultDBTimeout, } kvStore, err := kvdb.GetBoltBackend(cfg) require.NoError(t, err) return newKVStore(t, kvStore) } // newKVStore creates a new KVStore instance for testing using a provided // kvdb.Backend instance. func newKVStore(t testing.TB, backend kvdb.Backend) V1Store { store, err := NewKVStore(backend, testStoreOptions...) require.NoError(t, err) return store } // newSQLExecutor creates a new BatchedSQLQueries instance for testing using a // provided sqldb.DB instance. func newSQLExecutor(t testing.TB, db sqldb.DB) BatchedSQLQueries { err := db.ApplyAllMigrations( context.Background(), sqldb.GetMigrations(), ) require.NoError(t, err) return sqldb.NewTransactionExecutor( db.GetBaseDB(), func(tx *sql.Tx) SQLQueries { return db.GetBaseDB().WithTx(tx) }, ) } // newSQLStore creates a new SQLStore instance for testing using a provided // sqldb.DB instance. func newSQLStore(t testing.TB, db BatchedSQLQueries) V1Store { store, err := NewSQLStore( &SQLStoreConfig{ ChainHash: dbTestChain, PaginationCfg: testSQLPaginationCfg, }, db, testStoreOptions..., ) require.NoError(t, err) return store } // TestPopulateDBs is a helper test that can be used to populate various local // graph DBs from some source graph DB. This can then be used to run the // various benchmark tests against the same graph data. // // TODO(elle): this test reveals that the batching logic we use might be // problematic for postgres backends. This needs some investigation & it might // make sense to only use LazyAdd for sqlite backends & it may make sense to // also add a maximum batch size to avoid grouping too many updates at once. // Observations: // - the LazyAdd options need to be turned off for channel/policy update calls // for both the native SQL postgres & kvdb postgres backend for this test to // succeed. // - The LazyAdd option must be added for the sqlite backends, else it takes // very long to sync the graph. func TestPopulateDBs(t *testing.T) { t.Parallel() ctx := context.Background() // NOTE: uncomment the line below to run this test locally, then provide // the desired source database (and make sure the destination Postgres // databases exist and are running). t.Skipf("Skipping local helper test") // Set your desired source database here. For kvdbSqliteConn, a file // called testdata/kvdb/channel.sqlite must exist and be populated with // a KVDB based channel graph. sourceDB := kvdbSqliteConn // Populate this list with the desired destination databases. destinations := []dbConnection{ kvdbBBoltConn, nativeSQLSqliteConn, kvdbPostgresConn, nativeSQLPostgresConn, } // Open and start the source graph. src, err := NewChannelGraph(sourceDB.open(t)) require.NoError(t, err) require.NoError(t, src.Start()) t.Cleanup(func() { require.NoError(t, src.Stop()) }) // countNodes is a helper function to count the number of nodes in the // graph. countNodes := func(graph *ChannelGraph) int { numNodes := 0 err := graph.ForEachNode(ctx, func(tx NodeRTx) error { numNodes++ return nil }, func() { numNodes = 0 }) require.NoError(t, err) return numNodes } // countChannels is a helper function to count the number of channels // in the graph. countChannels := func(graph *ChannelGraph) (int, int) { var ( numChans = 0 numPolicies = 0 ) err := graph.ForEachChannel( ctx, func(info *models.ChannelEdgeInfo, policy, policy2 *models.ChannelEdgePolicy) error { numChans++ if policy != nil { numPolicies++ } if policy2 != nil { numPolicies++ } return nil }, func() { numChans = 0 numPolicies = 0 }) require.NoError(t, err) return numChans, numPolicies } t.Logf("Number of nodes in source graph (%s): %d", sourceDB.name, countNodes(src)) numChan, numPol := countChannels(src) t.Logf("Number of channels & policies in source graph (%s): %d "+ "channels, %d policies", sourceDB.name, numChan, numPol) for _, destDB := range destinations { t.Run(destDB.name, func(t *testing.T) { t.Parallel() // Open and start the destination graph. dest, err := NewChannelGraph(destDB.open(t)) require.NoError(t, err) require.NoError(t, dest.Start()) t.Cleanup(func() { require.NoError(t, dest.Stop()) }) t.Logf("Number of nodes in %s graph: %d", destDB.name, countNodes(dest)) numChan, numPol := countChannels(dest) t.Logf("Number of channels in %s graph: %d, %d", destDB.name, numChan, numPol) // Sync the source graph to the destination graph. syncGraph(t, src, dest) t.Logf("Number of nodes in %s graph after sync: %d", destDB.name, countNodes(dest)) numChan, numPol = countChannels(dest) t.Logf("Number of channels in %s graph after sync: "+ "%d, %d", destDB.name, numChan, numPol) }) } } // TestPopulateViaMigration is a helper test that can be used to populate a // local native SQL graph from a kvdb-sql graph using the migration logic. // // NOTE: the testPostgres variable can be set to true to test with a // postgres backend instead of the kvdb-sqlite backend. // // NOTE: this is a helper test and is not run by default. // // TODO(elle): this test reveals tht there may be an issue with the postgres // migration as it is super slow. func TestPopulateViaMigration(t *testing.T) { t.Skipf("Skipping local helper test") // Set this to true if you want to test with a postgres backend. // By default, we use a kvdb-sqlite backend. testPostgres := false ctx := context.Background() // Set up a logger so we can see the migration progress. logger := btclog.NewDefaultHandler(os.Stdout) UseLogger(btclog.NewSLogger(logger)) log.SetLevel(btclog.LevelDebug) var ( srcKVDB = kvdbSqlite(t, kvdbSqlitePath, kvdbSqliteFile) dstSQL = sqlSQLite(t, nativeSQLSqlitePath, nativeSQLSqliteFile) ) if testPostgres { srcKVDB = kvdbPostgres(t, kvdbPostgresDNS) dstSQL = sqlPostgres(t, nativeSQLPostgresDNS) } // Use the graph migration to populate the SQL graph from the // kvdb graph. err := dstSQL.ExecTx( ctx, sqldb.WriteTxOpt(), func(queries SQLQueries) error { return MigrateGraphToSQL( ctx, srcKVDB, queries, dbTestChain, ) }, func() {}, ) require.NoError(t, err) } // syncGraph synchronizes the source graph with the destination graph by // copying all nodes and channels from the source to the destination. func syncGraph(t *testing.T, src, dest *ChannelGraph) { ctx := context.Background() var ( s = rate.Sometimes{ Interval: 10 * time.Second, } t0 = time.Now() chunk = 0 total = 0 mu sync.Mutex ) reportNodeStats := func() { elapsed := time.Since(t0).Seconds() ratePerSec := float64(chunk) / elapsed t.Logf("Synced %d nodes (last chunk: %d) "+ "(%.2f nodes/second)", total, chunk, ratePerSec) t0 = time.Now() } var wgNodes sync.WaitGroup err := src.ForEachNode(ctx, func(tx NodeRTx) error { wgNodes.Add(1) go func() { defer wgNodes.Done() // NOTE: even though the transaction (tx) may have // already been aborted, it is still ok to use the // Node() result since that is a static object that // is not affected by the transaction state. err := dest.AddLightningNode( ctx, tx.Node(), batch.LazyAdd(), ) require.NoError(t, err) mu.Lock() total++ chunk++ s.Do(func() { reportNodeStats() chunk = 0 }) mu.Unlock() }() return nil }, func() {}) require.NoError(t, err) wgNodes.Wait() reportNodeStats() t.Logf("Done syncing %d nodes", total) total = 0 chunk = 0 t0 = time.Now() reportChanStats := func() { elapsed := time.Since(t0).Seconds() ratePerSec := float64(chunk) / elapsed t.Logf("Synced %d channels (and its "+ "policies) (last chunk: %d) "+ "(%.2f channels/second)", total, chunk, ratePerSec) t0 = time.Now() } var wgChans sync.WaitGroup err = src.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo, policy1, policy2 *models.ChannelEdgePolicy) error { // Add each channel & policy. We do this in a goroutine to // take advantage of batch processing. wgChans.Add(1) go func() { defer wgChans.Done() err := dest.AddChannelEdge( ctx, info, batch.LazyAdd(), ) if !errors.Is(err, ErrEdgeAlreadyExist) { require.NoError(t, err) } if policy1 != nil { err = dest.UpdateEdgePolicy( ctx, policy1, batch.LazyAdd(), ) require.NoError(t, err) } if policy2 != nil { err = dest.UpdateEdgePolicy( ctx, policy2, batch.LazyAdd(), ) require.NoError(t, err) } mu.Lock() total++ chunk++ s.Do(func() { reportChanStats() chunk = 0 }) mu.Unlock() }() return nil }, func() {}) require.NoError(t, err) wgChans.Wait() reportChanStats() t.Logf("Done syncing %d channels", total) } // BenchmarkCacheLoading benchmarks how long it takes to load the in-memory // graph cache from a populated database. // // NOTE: this is to be run against a local graph database. It can be run // either against a kvdb-bbolt channel.db file, or a kvdb-sqlite channel.sqlite // file or a postgres connection containing the channel graph in kvdb format and // finally, it can be run against a native SQL sqlite or postgres database. // // NOTE: the TestPopulateDBs test helper can be used to populate a set of test // DBs from a single source db. func BenchmarkCacheLoading(b *testing.B) { ctx := context.Background() tests := []dbConnection{ kvdbBBoltConn, kvdbSqliteConn, nativeSQLSqliteConn, kvdbPostgresConn, nativeSQLPostgresConn, } for _, test := range tests { b.Run(test.name, func(b *testing.B) { store := test.open(b) // Reset timer to exclude setup time. b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer() graph, err := NewChannelGraph(store) require.NoError(b, err) b.StartTimer() require.NoError(b, graph.populateCache(ctx)) } }) } } // BenchmarkGraphReadMethods benchmarks various read calls of various V1Store // implementations. // // NOTE: this is to be run against a local graph database. It can be run // either against a kvdb-bbolt channel.db file, or a kvdb-sqlite channel.sqlite // file or a postgres connection containing the channel graph in kvdb format and // finally, it can be run against a native SQL sqlite or postgres database. // // NOTE: the TestPopulateDBs test helper can be used to populate a set of test // DBs from a single source db. func BenchmarkGraphReadMethods(b *testing.B) { ctx := context.Background() backends := []dbConnection{ kvdbBBoltConn, kvdbSqliteConn, nativeSQLSqliteConn, kvdbPostgresConn, nativeSQLPostgresConn, } tests := []struct { name string fn func(b testing.TB, store V1Store) }{ { name: "ForEachNode", fn: func(b testing.TB, store V1Store) { err := store.ForEachNode( ctx, func(_ NodeRTx) error { return nil }, func() {}, ) require.NoError(b, err) }, }, { name: "ForEachChannel", fn: func(b testing.TB, store V1Store) { //nolint:ll err := store.ForEachChannel( ctx, func(_ *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy, _ *models.ChannelEdgePolicy) error { return nil }, func() {}, ) require.NoError(b, err) }, }, { name: "NodeUpdatesInHorizon", fn: func(b testing.TB, store V1Store) { _, err := store.NodeUpdatesInHorizon( time.Unix(0, 0), time.Now(), ) require.NoError(b, err) }, }, } for _, test := range tests { for _, db := range backends { name := fmt.Sprintf("%s-%s", test.name, db.name) b.Run(name, func(b *testing.B) { store := db.open(b) // Reset timer to exclude setup time. b.ResetTimer() for i := 0; i < b.N; i++ { test.fn(b, store) } }) } } }