diff --git a/config_builder.go b/config_builder.go index 35a44e2c6..28e8dd426 100644 --- a/config_builder.go +++ b/config_builder.go @@ -1130,9 +1130,8 @@ func (d *DefaultDatabaseBuilder) BuildDatabase( continue } - migFn, ok := getSQLMigration( + migFn, ok := d.getSQLMigration( ctx, version, dbs.ChanStateDB.Backend, - *d.cfg.ActiveNetParams.GenesisHash, ) if !ok { continue diff --git a/config_prod.go b/config_prod.go index f593340c4..e9f320256 100644 --- a/config_prod.go +++ b/config_prod.go @@ -5,7 +5,6 @@ package lnd import ( "context" - "github.com/btcsuite/btcd/chaincfg/chainhash" graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/sqldb" @@ -30,9 +29,9 @@ func (d *DefaultDatabaseBuilder) getGraphStore(_ *sqldb.BaseDB, // NOTE: this is a no-op for the production build since all migrations that are // in production will also be in development builds, and so they are not // defined behind a build tag. -func getSQLMigration(ctx context.Context, version int, - kvBackend kvdb.Backend, - chain chainhash.Hash) (func(tx *sqlc.Queries) error, bool) { +func (d *DefaultDatabaseBuilder) getSQLMigration(ctx context.Context, + version int, kvBackend kvdb.Backend) (func(tx *sqlc.Queries) error, + bool) { return nil, false } diff --git a/config_test_native_sql.go b/config_test_native_sql.go index 8ee51d4e7..01baacfae 100644 --- a/config_test_native_sql.go +++ b/config_test_native_sql.go @@ -7,7 +7,6 @@ import ( "database/sql" "fmt" - "github.com/btcsuite/btcd/chaincfg/chainhash" graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lncfg" @@ -50,15 +49,23 @@ func (d *DefaultDatabaseBuilder) getGraphStore(baseDB *sqldb.BaseDB, const graphSQLMigration = 9 // getSQLMigration returns a migration function for the given version. -func getSQLMigration(ctx context.Context, version int, - kvBackend kvdb.Backend, - chain chainhash.Hash) (func(tx *sqlc.Queries) error, bool) { +func (d *DefaultDatabaseBuilder) getSQLMigration(ctx context.Context, + version int, kvBackend kvdb.Backend) (func(tx *sqlc.Queries) error, + bool) { + + cfg := &graphdb.SQLStoreConfig{ + ChainHash: *d.cfg.ActiveNetParams.GenesisHash, + QueryCfg: &d.cfg.DB.Sqlite.QueryConfig, + } + if d.cfg.DB.Backend == lncfg.PostgresBackend { + cfg.QueryCfg = &d.cfg.DB.Postgres.QueryConfig + } switch version { case graphSQLMigration: return func(tx *sqlc.Queries) error { err := graphdb.MigrateGraphToSQL( - ctx, kvBackend, tx, chain, + ctx, cfg, kvBackend, tx, ) if err != nil { return fmt.Errorf("failed to migrate graph "+ diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index 783a2ce08..74cf68c84 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -449,10 +449,12 @@ func TestPopulateViaMigration(t *testing.T) { log.SetLevel(btclog.LevelDebug) var ( + cfg = sqldb.DefaultSQLiteConfig() srcKVDB = kvdbSqlite(t, kvdbSqlitePath, kvdbSqliteFile) dstSQL = sqlSQLite(t, nativeSQLSqlitePath, nativeSQLSqliteFile) ) if testPostgres { + cfg = sqldb.DefaultPostgresConfig() srcKVDB = kvdbPostgres(t, kvdbPostgresDNS) dstSQL = sqlPostgres(t, nativeSQLPostgresDNS) } @@ -462,7 +464,10 @@ func TestPopulateViaMigration(t *testing.T) { err := dstSQL.ExecTx( ctx, sqldb.WriteTxOpt(), func(queries SQLQueries) error { return MigrateGraphToSQL( - ctx, srcKVDB, queries, dbTestChain, + ctx, &SQLStoreConfig{ + QueryCfg: cfg, + ChainHash: dbTestChain, + }, srcKVDB, queries, ) }, func() {}, ) diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index 4d337ad94..9b22fbf03 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -26,8 +26,8 @@ import ( // NOTE: this is currently not called from any code path. It is called via tests // only for now and will be called from the main lnd binary once the // migration is fully implemented and tested. -func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend, - sqlDB SQLQueries, chain chainhash.Hash) error { +func MigrateGraphToSQL(ctx context.Context, cfg *SQLStoreConfig, + kvBackend kvdb.Backend, sqlDB SQLQueries) error { log.Infof("Starting migration of the graph store from KV to SQL") t0 := time.Now() @@ -43,7 +43,8 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend, } // 1) Migrate all the nodes. - if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil { + err = migrateNodes(ctx, cfg.QueryCfg, kvBackend, sqlDB) + if err != nil { return fmt.Errorf("could not migrate nodes: %w", err) } @@ -53,7 +54,7 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend, } // 3) Migrate all the channels and channel policies. - err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain) + err = migrateChannelsAndPolicies(ctx, cfg, kvBackend, sqlDB) if err != nil { return fmt.Errorf("could not migrate channels and policies: %w", err) @@ -108,8 +109,8 @@ func checkGraphExists(db kvdb.Backend) (bool, error) { // migrateNodes migrates all nodes from the KV backend to the SQL database. // This includes doing a sanity check after each migration to ensure that the // migrated node matches the original node. -func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, - sqlDB SQLQueries) error { +func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig, + kvBackend kvdb.Backend, sqlDB SQLQueries) error { // Keep track of the number of nodes migrated and the number of // nodes skipped due to errors. @@ -192,7 +193,7 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend, pub, id, dbNode.ID) } - migratedNode, err := buildNode(ctx, sqlDB, dbNode) + migratedNode, err := buildNode(ctx, cfg, sqlDB, dbNode) if err != nil { return fmt.Errorf("could not build migrated node "+ "from dbNode(db id: %d, node pub: %x): %w", @@ -320,8 +321,8 @@ func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend, // migrateChannelsAndPolicies migrates all channels and their policies // from the KV backend to the SQL database. -func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend, - sqlDB SQLQueries, chain chainhash.Hash) error { +func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig, + kvBackend kvdb.Backend, sqlDB SQLQueries) error { var ( channelCount uint64 @@ -373,9 +374,10 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend, // info, but rather rely on the chain hash LND is running with. // So this is our way of ensuring that LND is running on the // correct network at migration time. - if channel.ChainHash != chain { + if channel.ChainHash != cfg.ChainHash { return fmt.Errorf("channel %d has chain hash %s, "+ - "expected %s", scid, channel.ChainHash, chain) + "expected %s", scid, channel.ChainHash, + cfg.ChainHash) } // Sanity check to ensure that the channel has valid extra @@ -413,7 +415,8 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend, chunk++ err = migrateSingleChannel( - ctx, sqlDB, channel, policy1, policy2, migChanPolicy, + ctx, cfg, sqlDB, channel, policy1, policy2, + migChanPolicy, ) if err != nil { return fmt.Errorf("could not migrate channel %d: %w", @@ -448,8 +451,8 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend, return nil } -func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries, - channel *models.ChannelEdgeInfo, +func migrateSingleChannel(ctx context.Context, cfg *SQLStoreConfig, + sqlDB SQLQueries, channel *models.ChannelEdgeInfo, policy1, policy2 *models.ChannelEdgePolicy, migChanPolicy func(*models.ChannelEdgePolicy) error) error { @@ -508,7 +511,7 @@ func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries, } migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies( - ctx, sqlDB, row, channel.ChainHash, + ctx, cfg, sqlDB, row, ) if err != nil { return fmt.Errorf("could not build migrated channel and "+ @@ -703,9 +706,9 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend, // getAndBuildChanAndPolicies is a helper that builds the channel edge info // and policies from the given row returned by the SQL query // GetChannelBySCIDWithPolicies. -func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries, - row sqlc.GetChannelBySCIDWithPoliciesRow, - chain chainhash.Hash) (*models.ChannelEdgeInfo, +func getAndBuildChanAndPolicies(ctx context.Context, cfg *SQLStoreConfig, + db SQLQueries, + row sqlc.GetChannelBySCIDWithPoliciesRow) (*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { node1, node2, err := buildNodeVertices( @@ -716,7 +719,7 @@ func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries, } edge, err := getAndBuildEdgeInfo( - ctx, db, chain, row.GraphChannel, node1, node2, + ctx, cfg, db, row.GraphChannel, node1, node2, ) if err != nil { return nil, nil, nil, fmt.Errorf("unable to build channel "+ @@ -730,7 +733,8 @@ func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries, } policy1, policy2, err := getAndBuildChanPolicies( - ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2, + ctx, cfg.QueryCfg, db, dbPol1, dbPol2, edge.ChannelID, node1, + node2, ) if err != nil { return nil, nil, nil, fmt.Errorf("unable to build channel "+ diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index d134b2fb1..c555e433d 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -333,9 +333,7 @@ func TestMigrateGraphToSQL(t *testing.T) { require.True(t, ok) // Run the migration. - err := MigrateGraphToSQL( - ctx, kvDB.db, sql.db, testChain, - ) + err := MigrateGraphToSQL(ctx, sql.cfg, kvDB.db, sql.db) require.NoError(t, err) // Validate that the two databases are now in sync. @@ -753,6 +751,18 @@ func TestMigrationWithChannelDB(t *testing.T) { // bbolt channel.db file, set this to "channel.db". const fileName = "channel.sqlite" + cfg := &SQLStoreConfig{ + ChainHash: chain, + QueryCfg: sqldb.DefaultPostgresConfig(), + } + + // Determine if we are using a SQLite file or a Bolt DB file. + var isSqlite bool + if strings.HasSuffix(fileName, ".sqlite") { + isSqlite = true + cfg.QueryCfg = sqldb.DefaultSQLiteConfig() + } + // Set up logging for the test. UseLogger(btclog.NewSLogger(btclog.NewDefaultHandler(os.Stdout))) @@ -763,7 +773,7 @@ func TestMigrationWithChannelDB(t *testing.T) { err := graphStore.ExecTx( ctx, sqldb.WriteTxOpt(), func(tx SQLQueries) error { return MigrateGraphToSQL( - ctx, kvBackend, tx, chain, + ctx, cfg, kvBackend, tx, ) }, sqldb.NoOpReset, ) @@ -825,12 +835,6 @@ func TestMigrationWithChannelDB(t *testing.T) { }, } - // Determine if we are using a SQLite file or a Bolt DB file. - var isSqlite bool - if strings.HasSuffix(fileName, ".sqlite") { - isSqlite = true - } - for _, test := range tests { t.Run(test.name, func(t *testing.T) { chanDBPath := path.Join(test.dbPath, fileName) @@ -1125,7 +1129,7 @@ func runTestMigration(t *testing.T, populateKV func(t *testing.T, db *KVStore), // Run the migration. err := MigrateGraphToSQL( - ctx, kvDB.db, sql.db, testChain, + ctx, sql.cfg, kvDB.db, sql.db, ) require.NoError(t, err) @@ -1312,7 +1316,7 @@ func testMigrateGraphToSQLRapidOnce(t *testing.T, rt *rapid.T, } // Run the migration. - err := MigrateGraphToSQL(ctx, kvDB.db, sql.db, testChain) + err := MigrateGraphToSQL(ctx, sql.cfg, kvDB.db, sql.db) require.NoError(t, err) // Create a slice of all nodes. diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 3276f67d7..1c913b78c 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -255,7 +255,7 @@ func (s *SQLStore) FetchLightningNode(ctx context.Context, var node *models.LightningNode err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { var err error - _, node, err = getNodeByPubKey(ctx, db, pubKey) + _, node, err = getNodeByPubKey(ctx, s.cfg.QueryCfg, db, pubKey) return err }, sqldb.NoOpReset) @@ -483,7 +483,7 @@ func (s *SQLStore) SourceNode(ctx context.Context) (*models.LightningNode, err) } - _, node, err = getNodeByPubKey(ctx, db, nodePub) + _, node, err = getNodeByPubKey(ctx, s.cfg.QueryCfg, db, nodePub) return err }, sqldb.NoOpReset) @@ -779,7 +779,7 @@ func (s *SQLStore) ForEachSourceNodeChannel(ctx context.Context, } _, otherNode, err := getNodeByPubKey( - ctx, db, otherNodePub, + ctx, s.cfg.QueryCfg, db, otherNodePub, ) if err != nil { return fmt.Errorf("unable to fetch "+ @@ -1761,8 +1761,7 @@ func (s *SQLStore) FetchChannelEdgesByID(chanID uint64) ( } edge, err = getAndBuildEdgeInfo( - ctx, db, s.cfg.ChainHash, row.GraphChannel, node1, - node2, + ctx, s.cfg, db, row.GraphChannel, node1, node2, ) if err != nil { return fmt.Errorf("unable to build channel info: %w", @@ -1776,7 +1775,8 @@ func (s *SQLStore) FetchChannelEdgesByID(chanID uint64) ( } policy1, policy2, err = getAndBuildChanPolicies( - ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2, + ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, edge.ChannelID, + node1, node2, ) if err != nil { return fmt.Errorf("unable to build channel "+ @@ -1833,8 +1833,7 @@ func (s *SQLStore) FetchChannelEdgesByOutpoint(op *wire.OutPoint) ( } edge, err = getAndBuildEdgeInfo( - ctx, db, s.cfg.ChainHash, row.GraphChannel, node1, - node2, + ctx, s.cfg, db, row.GraphChannel, node1, node2, ) if err != nil { return fmt.Errorf("unable to build channel info: %w", @@ -1848,7 +1847,8 @@ func (s *SQLStore) FetchChannelEdgesByOutpoint(op *wire.OutPoint) ( } policy1, policy2, err = getAndBuildChanPolicies( - ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2, + ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, edge.ChannelID, + node1, node2, ) if err != nil { return fmt.Errorf("unable to build channel "+ @@ -3171,7 +3171,7 @@ func updateChanEdgePolicy(ctx context.Context, tx SQLQueries, } // getNodeByPubKey attempts to look up a target node by its public key. -func getNodeByPubKey(ctx context.Context, db SQLQueries, +func getNodeByPubKey(ctx context.Context, cfg *sqldb.QueryConfig, db SQLQueries, pubKey route.Vertex) (int64, *models.LightningNode, error) { dbNode, err := db.GetNodeByPubKey( @@ -3186,7 +3186,7 @@ func getNodeByPubKey(ctx context.Context, db SQLQueries, return 0, nil, fmt.Errorf("unable to fetch node: %w", err) } - node, err := buildNode(ctx, db, dbNode) + node, err := buildNode(ctx, cfg, db, dbNode) if err != nil { return 0, nil, fmt.Errorf("unable to build node: %w", err) } @@ -3210,14 +3210,9 @@ func buildCacheableChannelInfo(scid []byte, capacity int64, node1Pub, // buildNode constructs a LightningNode instance from the given database node // record. The node's features, addresses and extra signed fields are also // fetched from the database and set on the node. -func buildNode(ctx context.Context, db SQLQueries, +func buildNode(ctx context.Context, cfg *sqldb.QueryConfig, db SQLQueries, dbNode sqlc.GraphNode) (*models.LightningNode, error) { - // NOTE: buildNode is only used to load the data for a single node, and - // so no paged queries will be performed. This means that it's ok to - // used pass in default config values here. - cfg := sqldb.DefaultQueryConfig() - data, err := batchLoadNodeData(ctx, cfg, db, []int64{dbNode.ID}) if err != nil { return nil, fmt.Errorf("unable to batch load node data: %w", @@ -3945,22 +3940,21 @@ func upsertChanPolicyExtraSignedFields(ctx context.Context, db SQLQueries, // getAndBuildEdgeInfo builds a models.ChannelEdgeInfo instance from the // provided dbChanRow and also fetches any other required information // to construct the edge info. -func getAndBuildEdgeInfo(ctx context.Context, db SQLQueries, - chain chainhash.Hash, dbChan sqlc.GraphChannel, node1, +func getAndBuildEdgeInfo(ctx context.Context, cfg *SQLStoreConfig, + db SQLQueries, dbChan sqlc.GraphChannel, node1, node2 route.Vertex) (*models.ChannelEdgeInfo, error) { - // NOTE: getAndBuildEdgeInfo is only used to load the data for a single - // edge, and so no paged queries will be performed. This means that - // it's ok to used pass in default config values here. - cfg := sqldb.DefaultQueryConfig() - - data, err := batchLoadChannelData(ctx, cfg, db, []int64{dbChan.ID}, nil) + data, err := batchLoadChannelData( + ctx, cfg.QueryCfg, db, []int64{dbChan.ID}, nil, + ) if err != nil { return nil, fmt.Errorf("unable to batch load channel data: %w", err) } - return buildEdgeInfoWithBatchData(chain, dbChan, node1, node2, data) + return buildEdgeInfoWithBatchData( + cfg.ChainHash, dbChan, node1, node2, data, + ) } // buildEdgeInfoWithBatchData builds edge info using pre-loaded batch data. @@ -4059,9 +4053,9 @@ func buildNodeVertices(node1Pub, node2Pub []byte) (route.Vertex, // retrieves all the extra info required to build the complete // models.ChannelEdgePolicy types. It returns two policies, which may be nil if // the provided sqlc.GraphChannelPolicy records are nil. -func getAndBuildChanPolicies(ctx context.Context, db SQLQueries, - dbPol1, dbPol2 *sqlc.GraphChannelPolicy, channelID uint64, node1, - node2 route.Vertex) (*models.ChannelEdgePolicy, +func getAndBuildChanPolicies(ctx context.Context, cfg *sqldb.QueryConfig, + db SQLQueries, dbPol1, dbPol2 *sqlc.GraphChannelPolicy, + channelID uint64, node1, node2 route.Vertex) (*models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { if dbPol1 == nil && dbPol2 == nil { @@ -4076,12 +4070,6 @@ func getAndBuildChanPolicies(ctx context.Context, db SQLQueries, policyIDs = append(policyIDs, dbPol2.ID) } - // NOTE: getAndBuildChanPolicies is only used to load the data for - // a maximum of two policies, and so no paged queries will be - // performed (unless the page size is one). So it's ok to use - // the default config values here. - cfg := sqldb.DefaultQueryConfig() - batchData, err := batchLoadChannelData(ctx, cfg, db, nil, policyIDs) if err != nil { return nil, nil, fmt.Errorf("unable to batch load channel "+ diff --git a/itest/lnd_graph_migration_test.go b/itest/lnd_graph_migration_test.go index 07abc3bb0..652b1e652 100644 --- a/itest/lnd_graph_migration_test.go +++ b/itest/lnd_graph_migration_test.go @@ -142,10 +142,15 @@ func openNativeSQLGraphDB(ht *lntest.HarnessTest, }, ) + queryCfg := sqldb.DefaultSQLiteConfig() + if hn.Cfg.DBBackend != node.BackendSqlite { + queryCfg = sqldb.DefaultPostgresConfig() + } + store, err := graphdb.NewSQLStore( &graphdb.SQLStoreConfig{ ChainHash: *ht.Miner().ActiveNet.GenesisHash, - QueryCfg: sqldb.DefaultQueryConfig(), + QueryCfg: queryCfg, }, executor, ) diff --git a/sqldb/paginate.go b/sqldb/paginate.go index 279893add..17e2fd40e 100644 --- a/sqldb/paginate.go +++ b/sqldb/paginate.go @@ -72,14 +72,6 @@ func (c *QueryConfig) Validate(sqlite bool) error { return nil } -// DefaultQueryConfig returns a default configuration for SQL queries. -func DefaultQueryConfig() *QueryConfig { - return &QueryConfig{ - MaxBatchSize: 250, - MaxPageSize: 10000, - } -} - // DefaultSQLiteConfig returns a default configuration for SQL queries to a // SQLite backend. func DefaultSQLiteConfig() *QueryConfig { diff --git a/sqldb/paginate_test.go b/sqldb/paginate_test.go index 5b0683a06..f62bf65b4 100644 --- a/sqldb/paginate_test.go +++ b/sqldb/paginate_test.go @@ -22,7 +22,7 @@ func TestExecuteBatchQuery(t *testing.T) { t.Run("empty input returns nil", func(t *testing.T) { var ( - cfg = DefaultQueryConfig() + cfg = DefaultSQLiteConfig() inputItems []int ) @@ -144,7 +144,7 @@ func TestExecuteBatchQuery(t *testing.T) { t.Run("query function error is propagated", func(t *testing.T) { var ( - cfg = DefaultQueryConfig() + cfg = DefaultSQLiteConfig() inputItems = []int{1, 2, 3} ) @@ -174,7 +174,7 @@ func TestExecuteBatchQuery(t *testing.T) { t.Run("callback error is propagated", func(t *testing.T) { var ( - cfg = DefaultQueryConfig() + cfg = DefaultSQLiteConfig() inputItems = []int{1, 2, 3} ) @@ -307,7 +307,7 @@ func TestSQLSliceQueries(t *testing.T) { err := ExecuteBatchQuery( ctx, - DefaultQueryConfig(), + DefaultSQLiteConfig(), queryParams, func(s string) string { return s