multi: remove DefaultQueryConfig

Always use either the new DefaultSQLiteConfig or
DefaultPostgresConfig instead.
This commit is contained in:
Elle Mouton
2025-08-13 09:30:09 +02:00
parent 1082eaaeb3
commit b1deddec44
10 changed files with 95 additions and 92 deletions

View File

@@ -1130,9 +1130,8 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
continue
}
migFn, ok := getSQLMigration(
migFn, ok := d.getSQLMigration(
ctx, version, dbs.ChanStateDB.Backend,
*d.cfg.ActiveNetParams.GenesisHash,
)
if !ok {
continue

View File

@@ -5,7 +5,6 @@ package lnd
import (
"context"
"github.com/btcsuite/btcd/chaincfg/chainhash"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/sqldb"
@@ -30,9 +29,9 @@ func (d *DefaultDatabaseBuilder) getGraphStore(_ *sqldb.BaseDB,
// NOTE: this is a no-op for the production build since all migrations that are
// in production will also be in development builds, and so they are not
// defined behind a build tag.
func getSQLMigration(ctx context.Context, version int,
kvBackend kvdb.Backend,
chain chainhash.Hash) (func(tx *sqlc.Queries) error, bool) {
func (d *DefaultDatabaseBuilder) getSQLMigration(ctx context.Context,
version int, kvBackend kvdb.Backend) (func(tx *sqlc.Queries) error,
bool) {
return nil, false
}

View File

@@ -7,7 +7,6 @@ import (
"database/sql"
"fmt"
"github.com/btcsuite/btcd/chaincfg/chainhash"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lncfg"
@@ -50,15 +49,23 @@ func (d *DefaultDatabaseBuilder) getGraphStore(baseDB *sqldb.BaseDB,
const graphSQLMigration = 9
// getSQLMigration returns a migration function for the given version.
func getSQLMigration(ctx context.Context, version int,
kvBackend kvdb.Backend,
chain chainhash.Hash) (func(tx *sqlc.Queries) error, bool) {
func (d *DefaultDatabaseBuilder) getSQLMigration(ctx context.Context,
version int, kvBackend kvdb.Backend) (func(tx *sqlc.Queries) error,
bool) {
cfg := &graphdb.SQLStoreConfig{
ChainHash: *d.cfg.ActiveNetParams.GenesisHash,
QueryCfg: &d.cfg.DB.Sqlite.QueryConfig,
}
if d.cfg.DB.Backend == lncfg.PostgresBackend {
cfg.QueryCfg = &d.cfg.DB.Postgres.QueryConfig
}
switch version {
case graphSQLMigration:
return func(tx *sqlc.Queries) error {
err := graphdb.MigrateGraphToSQL(
ctx, kvBackend, tx, chain,
ctx, cfg, kvBackend, tx,
)
if err != nil {
return fmt.Errorf("failed to migrate graph "+

View File

@@ -449,10 +449,12 @@ func TestPopulateViaMigration(t *testing.T) {
log.SetLevel(btclog.LevelDebug)
var (
cfg = sqldb.DefaultSQLiteConfig()
srcKVDB = kvdbSqlite(t, kvdbSqlitePath, kvdbSqliteFile)
dstSQL = sqlSQLite(t, nativeSQLSqlitePath, nativeSQLSqliteFile)
)
if testPostgres {
cfg = sqldb.DefaultPostgresConfig()
srcKVDB = kvdbPostgres(t, kvdbPostgresDNS)
dstSQL = sqlPostgres(t, nativeSQLPostgresDNS)
}
@@ -462,7 +464,10 @@ func TestPopulateViaMigration(t *testing.T) {
err := dstSQL.ExecTx(
ctx, sqldb.WriteTxOpt(), func(queries SQLQueries) error {
return MigrateGraphToSQL(
ctx, srcKVDB, queries, dbTestChain,
ctx, &SQLStoreConfig{
QueryCfg: cfg,
ChainHash: dbTestChain,
}, srcKVDB, queries,
)
}, func() {},
)

View File

@@ -26,8 +26,8 @@ import (
// NOTE: this is currently not called from any code path. It is called via tests
// only for now and will be called from the main lnd binary once the
// migration is fully implemented and tested.
func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
sqlDB SQLQueries, chain chainhash.Hash) error {
func MigrateGraphToSQL(ctx context.Context, cfg *SQLStoreConfig,
kvBackend kvdb.Backend, sqlDB SQLQueries) error {
log.Infof("Starting migration of the graph store from KV to SQL")
t0 := time.Now()
@@ -43,7 +43,8 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
}
// 1) Migrate all the nodes.
if err := migrateNodes(ctx, kvBackend, sqlDB); err != nil {
err = migrateNodes(ctx, cfg.QueryCfg, kvBackend, sqlDB)
if err != nil {
return fmt.Errorf("could not migrate nodes: %w", err)
}
@@ -53,7 +54,7 @@ func MigrateGraphToSQL(ctx context.Context, kvBackend kvdb.Backend,
}
// 3) Migrate all the channels and channel policies.
err = migrateChannelsAndPolicies(ctx, kvBackend, sqlDB, chain)
err = migrateChannelsAndPolicies(ctx, cfg, kvBackend, sqlDB)
if err != nil {
return fmt.Errorf("could not migrate channels and policies: %w",
err)
@@ -108,8 +109,8 @@ func checkGraphExists(db kvdb.Backend) (bool, error) {
// migrateNodes migrates all nodes from the KV backend to the SQL database.
// This includes doing a sanity check after each migration to ensure that the
// migrated node matches the original node.
func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
sqlDB SQLQueries) error {
func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
kvBackend kvdb.Backend, sqlDB SQLQueries) error {
// Keep track of the number of nodes migrated and the number of
// nodes skipped due to errors.
@@ -192,7 +193,7 @@ func migrateNodes(ctx context.Context, kvBackend kvdb.Backend,
pub, id, dbNode.ID)
}
migratedNode, err := buildNode(ctx, sqlDB, dbNode)
migratedNode, err := buildNode(ctx, cfg, sqlDB, dbNode)
if err != nil {
return fmt.Errorf("could not build migrated node "+
"from dbNode(db id: %d, node pub: %x): %w",
@@ -320,8 +321,8 @@ func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
// migrateChannelsAndPolicies migrates all channels and their policies
// from the KV backend to the SQL database.
func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
sqlDB SQLQueries, chain chainhash.Hash) error {
func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
kvBackend kvdb.Backend, sqlDB SQLQueries) error {
var (
channelCount uint64
@@ -373,9 +374,10 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
// info, but rather rely on the chain hash LND is running with.
// So this is our way of ensuring that LND is running on the
// correct network at migration time.
if channel.ChainHash != chain {
if channel.ChainHash != cfg.ChainHash {
return fmt.Errorf("channel %d has chain hash %s, "+
"expected %s", scid, channel.ChainHash, chain)
"expected %s", scid, channel.ChainHash,
cfg.ChainHash)
}
// Sanity check to ensure that the channel has valid extra
@@ -413,7 +415,8 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
chunk++
err = migrateSingleChannel(
ctx, sqlDB, channel, policy1, policy2, migChanPolicy,
ctx, cfg, sqlDB, channel, policy1, policy2,
migChanPolicy,
)
if err != nil {
return fmt.Errorf("could not migrate channel %d: %w",
@@ -448,8 +451,8 @@ func migrateChannelsAndPolicies(ctx context.Context, kvBackend kvdb.Backend,
return nil
}
func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
channel *models.ChannelEdgeInfo,
func migrateSingleChannel(ctx context.Context, cfg *SQLStoreConfig,
sqlDB SQLQueries, channel *models.ChannelEdgeInfo,
policy1, policy2 *models.ChannelEdgePolicy,
migChanPolicy func(*models.ChannelEdgePolicy) error) error {
@@ -508,7 +511,7 @@ func migrateSingleChannel(ctx context.Context, sqlDB SQLQueries,
}
migChan, migPol1, migPol2, err := getAndBuildChanAndPolicies(
ctx, sqlDB, row, channel.ChainHash,
ctx, cfg, sqlDB, row,
)
if err != nil {
return fmt.Errorf("could not build migrated channel and "+
@@ -703,9 +706,9 @@ func migratePruneLog(ctx context.Context, kvBackend kvdb.Backend,
// getAndBuildChanAndPolicies is a helper that builds the channel edge info
// and policies from the given row returned by the SQL query
// GetChannelBySCIDWithPolicies.
func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
row sqlc.GetChannelBySCIDWithPoliciesRow,
chain chainhash.Hash) (*models.ChannelEdgeInfo,
func getAndBuildChanAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
db SQLQueries,
row sqlc.GetChannelBySCIDWithPoliciesRow) (*models.ChannelEdgeInfo,
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) {
node1, node2, err := buildNodeVertices(
@@ -716,7 +719,7 @@ func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
}
edge, err := getAndBuildEdgeInfo(
ctx, db, chain, row.GraphChannel, node1, node2,
ctx, cfg, db, row.GraphChannel, node1, node2,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to build channel "+
@@ -730,7 +733,8 @@ func getAndBuildChanAndPolicies(ctx context.Context, db SQLQueries,
}
policy1, policy2, err := getAndBuildChanPolicies(
ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
ctx, cfg.QueryCfg, db, dbPol1, dbPol2, edge.ChannelID, node1,
node2,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to build channel "+

View File

@@ -333,9 +333,7 @@ func TestMigrateGraphToSQL(t *testing.T) {
require.True(t, ok)
// Run the migration.
err := MigrateGraphToSQL(
ctx, kvDB.db, sql.db, testChain,
)
err := MigrateGraphToSQL(ctx, sql.cfg, kvDB.db, sql.db)
require.NoError(t, err)
// Validate that the two databases are now in sync.
@@ -753,6 +751,18 @@ func TestMigrationWithChannelDB(t *testing.T) {
// bbolt channel.db file, set this to "channel.db".
const fileName = "channel.sqlite"
cfg := &SQLStoreConfig{
ChainHash: chain,
QueryCfg: sqldb.DefaultPostgresConfig(),
}
// Determine if we are using a SQLite file or a Bolt DB file.
var isSqlite bool
if strings.HasSuffix(fileName, ".sqlite") {
isSqlite = true
cfg.QueryCfg = sqldb.DefaultSQLiteConfig()
}
// Set up logging for the test.
UseLogger(btclog.NewSLogger(btclog.NewDefaultHandler(os.Stdout)))
@@ -763,7 +773,7 @@ func TestMigrationWithChannelDB(t *testing.T) {
err := graphStore.ExecTx(
ctx, sqldb.WriteTxOpt(), func(tx SQLQueries) error {
return MigrateGraphToSQL(
ctx, kvBackend, tx, chain,
ctx, cfg, kvBackend, tx,
)
}, sqldb.NoOpReset,
)
@@ -825,12 +835,6 @@ func TestMigrationWithChannelDB(t *testing.T) {
},
}
// Determine if we are using a SQLite file or a Bolt DB file.
var isSqlite bool
if strings.HasSuffix(fileName, ".sqlite") {
isSqlite = true
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
chanDBPath := path.Join(test.dbPath, fileName)
@@ -1125,7 +1129,7 @@ func runTestMigration(t *testing.T, populateKV func(t *testing.T, db *KVStore),
// Run the migration.
err := MigrateGraphToSQL(
ctx, kvDB.db, sql.db, testChain,
ctx, sql.cfg, kvDB.db, sql.db,
)
require.NoError(t, err)
@@ -1312,7 +1316,7 @@ func testMigrateGraphToSQLRapidOnce(t *testing.T, rt *rapid.T,
}
// Run the migration.
err := MigrateGraphToSQL(ctx, kvDB.db, sql.db, testChain)
err := MigrateGraphToSQL(ctx, sql.cfg, kvDB.db, sql.db)
require.NoError(t, err)
// Create a slice of all nodes.

View File

@@ -255,7 +255,7 @@ func (s *SQLStore) FetchLightningNode(ctx context.Context,
var node *models.LightningNode
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
var err error
_, node, err = getNodeByPubKey(ctx, db, pubKey)
_, node, err = getNodeByPubKey(ctx, s.cfg.QueryCfg, db, pubKey)
return err
}, sqldb.NoOpReset)
@@ -483,7 +483,7 @@ func (s *SQLStore) SourceNode(ctx context.Context) (*models.LightningNode,
err)
}
_, node, err = getNodeByPubKey(ctx, db, nodePub)
_, node, err = getNodeByPubKey(ctx, s.cfg.QueryCfg, db, nodePub)
return err
}, sqldb.NoOpReset)
@@ -779,7 +779,7 @@ func (s *SQLStore) ForEachSourceNodeChannel(ctx context.Context,
}
_, otherNode, err := getNodeByPubKey(
ctx, db, otherNodePub,
ctx, s.cfg.QueryCfg, db, otherNodePub,
)
if err != nil {
return fmt.Errorf("unable to fetch "+
@@ -1761,8 +1761,7 @@ func (s *SQLStore) FetchChannelEdgesByID(chanID uint64) (
}
edge, err = getAndBuildEdgeInfo(
ctx, db, s.cfg.ChainHash, row.GraphChannel, node1,
node2,
ctx, s.cfg, db, row.GraphChannel, node1, node2,
)
if err != nil {
return fmt.Errorf("unable to build channel info: %w",
@@ -1776,7 +1775,8 @@ func (s *SQLStore) FetchChannelEdgesByID(chanID uint64) (
}
policy1, policy2, err = getAndBuildChanPolicies(
ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, edge.ChannelID,
node1, node2,
)
if err != nil {
return fmt.Errorf("unable to build channel "+
@@ -1833,8 +1833,7 @@ func (s *SQLStore) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
}
edge, err = getAndBuildEdgeInfo(
ctx, db, s.cfg.ChainHash, row.GraphChannel, node1,
node2,
ctx, s.cfg, db, row.GraphChannel, node1, node2,
)
if err != nil {
return fmt.Errorf("unable to build channel info: %w",
@@ -1848,7 +1847,8 @@ func (s *SQLStore) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (
}
policy1, policy2, err = getAndBuildChanPolicies(
ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, edge.ChannelID,
node1, node2,
)
if err != nil {
return fmt.Errorf("unable to build channel "+
@@ -3171,7 +3171,7 @@ func updateChanEdgePolicy(ctx context.Context, tx SQLQueries,
}
// getNodeByPubKey attempts to look up a target node by its public key.
func getNodeByPubKey(ctx context.Context, db SQLQueries,
func getNodeByPubKey(ctx context.Context, cfg *sqldb.QueryConfig, db SQLQueries,
pubKey route.Vertex) (int64, *models.LightningNode, error) {
dbNode, err := db.GetNodeByPubKey(
@@ -3186,7 +3186,7 @@ func getNodeByPubKey(ctx context.Context, db SQLQueries,
return 0, nil, fmt.Errorf("unable to fetch node: %w", err)
}
node, err := buildNode(ctx, db, dbNode)
node, err := buildNode(ctx, cfg, db, dbNode)
if err != nil {
return 0, nil, fmt.Errorf("unable to build node: %w", err)
}
@@ -3210,14 +3210,9 @@ func buildCacheableChannelInfo(scid []byte, capacity int64, node1Pub,
// buildNode constructs a LightningNode instance from the given database node
// record. The node's features, addresses and extra signed fields are also
// fetched from the database and set on the node.
func buildNode(ctx context.Context, db SQLQueries,
func buildNode(ctx context.Context, cfg *sqldb.QueryConfig, db SQLQueries,
dbNode sqlc.GraphNode) (*models.LightningNode, error) {
// NOTE: buildNode is only used to load the data for a single node, and
// so no paged queries will be performed. This means that it's ok to
// used pass in default config values here.
cfg := sqldb.DefaultQueryConfig()
data, err := batchLoadNodeData(ctx, cfg, db, []int64{dbNode.ID})
if err != nil {
return nil, fmt.Errorf("unable to batch load node data: %w",
@@ -3945,22 +3940,21 @@ func upsertChanPolicyExtraSignedFields(ctx context.Context, db SQLQueries,
// getAndBuildEdgeInfo builds a models.ChannelEdgeInfo instance from the
// provided dbChanRow and also fetches any other required information
// to construct the edge info.
func getAndBuildEdgeInfo(ctx context.Context, db SQLQueries,
chain chainhash.Hash, dbChan sqlc.GraphChannel, node1,
func getAndBuildEdgeInfo(ctx context.Context, cfg *SQLStoreConfig,
db SQLQueries, dbChan sqlc.GraphChannel, node1,
node2 route.Vertex) (*models.ChannelEdgeInfo, error) {
// NOTE: getAndBuildEdgeInfo is only used to load the data for a single
// edge, and so no paged queries will be performed. This means that
// it's ok to used pass in default config values here.
cfg := sqldb.DefaultQueryConfig()
data, err := batchLoadChannelData(ctx, cfg, db, []int64{dbChan.ID}, nil)
data, err := batchLoadChannelData(
ctx, cfg.QueryCfg, db, []int64{dbChan.ID}, nil,
)
if err != nil {
return nil, fmt.Errorf("unable to batch load channel data: %w",
err)
}
return buildEdgeInfoWithBatchData(chain, dbChan, node1, node2, data)
return buildEdgeInfoWithBatchData(
cfg.ChainHash, dbChan, node1, node2, data,
)
}
// buildEdgeInfoWithBatchData builds edge info using pre-loaded batch data.
@@ -4059,9 +4053,9 @@ func buildNodeVertices(node1Pub, node2Pub []byte) (route.Vertex,
// retrieves all the extra info required to build the complete
// models.ChannelEdgePolicy types. It returns two policies, which may be nil if
// the provided sqlc.GraphChannelPolicy records are nil.
func getAndBuildChanPolicies(ctx context.Context, db SQLQueries,
dbPol1, dbPol2 *sqlc.GraphChannelPolicy, channelID uint64, node1,
node2 route.Vertex) (*models.ChannelEdgePolicy,
func getAndBuildChanPolicies(ctx context.Context, cfg *sqldb.QueryConfig,
db SQLQueries, dbPol1, dbPol2 *sqlc.GraphChannelPolicy,
channelID uint64, node1, node2 route.Vertex) (*models.ChannelEdgePolicy,
*models.ChannelEdgePolicy, error) {
if dbPol1 == nil && dbPol2 == nil {
@@ -4076,12 +4070,6 @@ func getAndBuildChanPolicies(ctx context.Context, db SQLQueries,
policyIDs = append(policyIDs, dbPol2.ID)
}
// NOTE: getAndBuildChanPolicies is only used to load the data for
// a maximum of two policies, and so no paged queries will be
// performed (unless the page size is one). So it's ok to use
// the default config values here.
cfg := sqldb.DefaultQueryConfig()
batchData, err := batchLoadChannelData(ctx, cfg, db, nil, policyIDs)
if err != nil {
return nil, nil, fmt.Errorf("unable to batch load channel "+

View File

@@ -142,10 +142,15 @@ func openNativeSQLGraphDB(ht *lntest.HarnessTest,
},
)
queryCfg := sqldb.DefaultSQLiteConfig()
if hn.Cfg.DBBackend != node.BackendSqlite {
queryCfg = sqldb.DefaultPostgresConfig()
}
store, err := graphdb.NewSQLStore(
&graphdb.SQLStoreConfig{
ChainHash: *ht.Miner().ActiveNet.GenesisHash,
QueryCfg: sqldb.DefaultQueryConfig(),
QueryCfg: queryCfg,
},
executor,
)

View File

@@ -72,14 +72,6 @@ func (c *QueryConfig) Validate(sqlite bool) error {
return nil
}
// DefaultQueryConfig returns a default configuration for SQL queries.
func DefaultQueryConfig() *QueryConfig {
return &QueryConfig{
MaxBatchSize: 250,
MaxPageSize: 10000,
}
}
// DefaultSQLiteConfig returns a default configuration for SQL queries to a
// SQLite backend.
func DefaultSQLiteConfig() *QueryConfig {

View File

@@ -22,7 +22,7 @@ func TestExecuteBatchQuery(t *testing.T) {
t.Run("empty input returns nil", func(t *testing.T) {
var (
cfg = DefaultQueryConfig()
cfg = DefaultSQLiteConfig()
inputItems []int
)
@@ -144,7 +144,7 @@ func TestExecuteBatchQuery(t *testing.T) {
t.Run("query function error is propagated", func(t *testing.T) {
var (
cfg = DefaultQueryConfig()
cfg = DefaultSQLiteConfig()
inputItems = []int{1, 2, 3}
)
@@ -174,7 +174,7 @@ func TestExecuteBatchQuery(t *testing.T) {
t.Run("callback error is propagated", func(t *testing.T) {
var (
cfg = DefaultQueryConfig()
cfg = DefaultSQLiteConfig()
inputItems = []int{1, 2, 3}
)
@@ -307,7 +307,7 @@ func TestSQLSliceQueries(t *testing.T) {
err := ExecuteBatchQuery(
ctx,
DefaultQueryConfig(),
DefaultSQLiteConfig(),
queryParams,
func(s string) string {
return s