multi: add a re-usable TxOptions type

Add a re-usable implementation of the sqldb.TxOptions interface and make
use of this in the various spots (invoices, batch and graph/db) where we
had previously defined individual implementations that were all doing
the same thing.
This commit is contained in:
Elle Mouton
2025-05-27 18:53:31 +02:00
parent 8e96bd0308
commit c4e6f23c5b
9 changed files with 79 additions and 134 deletions

View File

@@ -14,19 +14,6 @@ var errSolo = errors.New(
"batch function returned an error and should be re-run solo", "batch function returned an error and should be re-run solo",
) )
// txOpts implements the sqldb.TxOptions interface. It is used to indicate that
// the transaction can be read-only or not transaction.
type txOpts struct {
readOnly bool
}
// ReadOnly returns true if the transaction should be read only.
//
// NOTE: This is part of the sqldb.TxOptions interface.
func (t *txOpts) ReadOnly() bool {
return t.readOnly
}
type request[Q any] struct { type request[Q any] struct {
*Request[Q] *Request[Q]
errChan chan error errChan chan error
@@ -38,7 +25,7 @@ type batch[Q any] struct {
reqs []*request[Q] reqs []*request[Q]
clear func(b *batch[Q]) clear func(b *batch[Q])
locker sync.Locker locker sync.Locker
txOpts txOpts txOpts sqldb.TxOptions
} }
// trigger is the entry point for the batch and ensures that run is started at // trigger is the entry point for the batch and ensures that run is started at
@@ -68,7 +55,7 @@ func (b *batch[Q]) run(ctx context.Context) {
// that fail will be retried individually. // that fail will be retried individually.
for len(b.reqs) > 0 { for len(b.reqs) > 0 {
var failIdx = -1 var failIdx = -1
err := b.db.ExecTx(ctx, &b.txOpts, func(tx Q) error { err := b.db.ExecTx(ctx, b.txOpts, func(tx Q) error {
for i, req := range b.reqs { for i, req := range b.reqs {
err := req.Do(tx) err := req.Do(tx)
if err != nil { if err != nil {

View File

@@ -550,7 +550,7 @@ func benchmarkSQLBatching(b *testing.B, sqlite bool) {
} }
ctx := context.Background() ctx := context.Background()
var opts txOpts opts := sqldb.WriteTxOpt()
// writeRecord is a helper that adds a single new invoice to the // writeRecord is a helper that adds a single new invoice to the
// database. It uses the 'i' argument to create a unique hash for the // database. It uses the 'i' argument to create a unique hash for the
@@ -578,13 +578,12 @@ func benchmarkSQLBatching(b *testing.B, sqlite bool) {
var hash [8]byte var hash [8]byte
binary.BigEndian.PutUint64(hash[:], uint64(N-1)) binary.BigEndian.PutUint64(hash[:], uint64(N-1))
err := tx.ExecTx( err := tx.ExecTx(ctx, opts, func(queries *sqlc.Queries) error {
ctx, &txOpts{}, func(queries *sqlc.Queries) error { _, err := queries.GetInvoiceByHash(ctx, hash[:])
_, err := queries.GetInvoiceByHash(ctx, hash[:]) require.NoError(b, err)
require.NoError(b, err)
return nil return nil
}, func() {}, }, func() {},
) )
require.NoError(b, err) require.NoError(b, err)
} }
@@ -602,7 +601,7 @@ func benchmarkSQLBatching(b *testing.B, sqlite bool) {
defer wg.Done() defer wg.Done()
err := db.ExecTx( err := db.ExecTx(
ctx, &opts, ctx, opts,
func(tx *sqlc.Queries) error { func(tx *sqlc.Queries) error {
writeRecord(b, tx, int64(j)) writeRecord(b, tx, int64(j))
return nil return nil
@@ -624,7 +623,7 @@ func benchmarkSQLBatching(b *testing.B, sqlite bool) {
b.ResetTimer() b.ResetTimer()
err := db.ExecTx( err := db.ExecTx(
ctx, &opts, ctx, opts,
func(tx *sqlc.Queries) error { func(tx *sqlc.Queries) error {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
writeRecord(b, tx, int64(i)) writeRecord(b, tx, int64(i))

View File

@@ -65,9 +65,7 @@ func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error {
// By default, we assume that the batch is read-only, // By default, we assume that the batch is read-only,
// and we only upgrade it to read-write if a request // and we only upgrade it to read-write if a request
// is added that is not read-only. // is added that is not read-only.
txOpts: txOpts{ txOpts: sqldb.ReadTxOpt(),
readOnly: true,
},
} }
trigger := s.b.trigger trigger := s.b.trigger
time.AfterFunc(s.duration, func() { time.AfterFunc(s.duration, func() {
@@ -78,8 +76,8 @@ func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error {
// We only upgrade the batch to read-write if the new request is not // We only upgrade the batch to read-write if the new request is not
// read-only. If it is already read-write, we don't need to do anything. // read-only. If it is already read-write, we don't need to do anything.
if s.b.txOpts.readOnly && !r.Opts.ReadOnly { if s.b.txOpts.ReadOnly() && !r.Opts.ReadOnly {
s.b.txOpts.readOnly = false s.b.txOpts = sqldb.WriteTxOpt()
} }
// If this is a non-lazy request, we'll execute the batch immediately. // If this is a non-lazy request, we'll execute the batch immediately.
@@ -109,7 +107,7 @@ func (s *TimeScheduler[Q]) Execute(ctx context.Context, r *Request[Q]) error {
} }
// Otherwise, run the request on its own. // Otherwise, run the request on its own.
commitErr := s.db.ExecTx(ctx, &txOpts, func(tx Q) error { commitErr := s.db.ExecTx(ctx, txOpts, func(tx Q) error {
return req.Do(tx) return req.Do(tx)
}, func() { }, func() {
if req.Reset != nil { if req.Reset != nil {

View File

@@ -139,27 +139,6 @@ func NewSQLStore(db BatchedSQLQueries, kvStore *KVStore,
return s, nil return s, nil
} }
// TxOptions defines the set of db txn options the SQLQueries
// understands.
type TxOptions struct {
// readOnly governs if a read only transaction is needed or not.
readOnly bool
}
// ReadOnly returns true if the transaction should be read only.
//
// NOTE: This implements the TxOptions.
func (a *TxOptions) ReadOnly() bool {
return a.readOnly
}
// NewReadTx creates a new read transaction option set.
func NewReadTx() *TxOptions {
return &TxOptions{
readOnly: true,
}
}
// AddLightningNode adds a vertex/node to the graph database. If the node is not // AddLightningNode adds a vertex/node to the graph database. If the node is not
// in the database from before, this will add a new, unconnected one to the // in the database from before, this will add a new, unconnected one to the
// graph. If it is present from before, this will update that node's // graph. If it is present from before, this will update that node's
@@ -192,11 +171,8 @@ func (s *SQLStore) FetchLightningNode(pubKey route.Vertex) (
ctx := context.TODO() ctx := context.TODO()
var ( var node *models.LightningNode
readTx = NewReadTx() err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
node *models.LightningNode
)
err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
var err error var err error
_, node, err = getNodeByPubKey(ctx, db, pubKey) _, node, err = getNodeByPubKey(ctx, db, pubKey)
@@ -222,11 +198,10 @@ func (s *SQLStore) HasLightningNode(pubKey [33]byte) (time.Time, bool,
ctx := context.TODO() ctx := context.TODO()
var ( var (
readTx = NewReadTx()
exists bool exists bool
lastUpdate time.Time lastUpdate time.Time
) )
err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error { err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
dbNode, err := db.GetNodeByPubKey( dbNode, err := db.GetNodeByPubKey(
ctx, sqlc.GetNodeByPubKeyParams{ ctx, sqlc.GetNodeByPubKeyParams{
Version: int16(ProtocolV1), Version: int16(ProtocolV1),
@@ -266,11 +241,10 @@ func (s *SQLStore) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
ctx := context.TODO() ctx := context.TODO()
var ( var (
readTx = NewReadTx()
addresses []net.Addr addresses []net.Addr
known bool known bool
) )
err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error { err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
var err error var err error
known, addresses, err = getNodeAddresses( known, addresses, err = getNodeAddresses(
ctx, db, nodePub.SerializeCompressed(), ctx, db, nodePub.SerializeCompressed(),
@@ -297,8 +271,7 @@ func (s *SQLStore) AddrsForNode(nodePub *btcec.PublicKey) (bool, []net.Addr,
func (s *SQLStore) DeleteLightningNode(pubKey route.Vertex) error { func (s *SQLStore) DeleteLightningNode(pubKey route.Vertex) error {
ctx := context.TODO() ctx := context.TODO()
var writeTxOpts TxOptions err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
err := s.db.ExecTx(ctx, &writeTxOpts, func(db SQLQueries) error {
res, err := db.DeleteNodeByPubKey( res, err := db.DeleteNodeByPubKey(
ctx, sqlc.DeleteNodeByPubKeyParams{ ctx, sqlc.DeleteNodeByPubKeyParams{
Version: int16(ProtocolV1), Version: int16(ProtocolV1),
@@ -346,11 +319,10 @@ func (s *SQLStore) FetchNodeFeatures(nodePub route.Vertex) (
// NOTE: part of the V1Store interface. // NOTE: part of the V1Store interface.
func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) { func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) {
var ( var (
ctx = context.TODO() ctx = context.TODO()
readTx = NewReadTx() alias string
alias string
) )
err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error { err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
dbNode, err := db.GetNodeByPubKey( dbNode, err := db.GetNodeByPubKey(
ctx, sqlc.GetNodeByPubKeyParams{ ctx, sqlc.GetNodeByPubKeyParams{
Version: int16(ProtocolV1), Version: int16(ProtocolV1),
@@ -387,11 +359,8 @@ func (s *SQLStore) LookupAlias(pub *btcec.PublicKey) (string, error) {
func (s *SQLStore) SourceNode() (*models.LightningNode, error) { func (s *SQLStore) SourceNode() (*models.LightningNode, error) {
ctx := context.TODO() ctx := context.TODO()
var ( var node *models.LightningNode
readTx = NewReadTx() err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
node *models.LightningNode
)
err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
_, nodePub, err := getSourceNode(ctx, db, ProtocolV1) _, nodePub, err := getSourceNode(ctx, db, ProtocolV1)
if err != nil { if err != nil {
return fmt.Errorf("unable to fetch V1 source node: %w", return fmt.Errorf("unable to fetch V1 source node: %w",
@@ -416,9 +385,8 @@ func (s *SQLStore) SourceNode() (*models.LightningNode, error) {
// NOTE: part of the V1Store interface. // NOTE: part of the V1Store interface.
func (s *SQLStore) SetSourceNode(node *models.LightningNode) error { func (s *SQLStore) SetSourceNode(node *models.LightningNode) error {
ctx := context.TODO() ctx := context.TODO()
var writeTxOpts TxOptions
return s.db.ExecTx(ctx, &writeTxOpts, func(db SQLQueries) error { return s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
id, err := upsertNode(ctx, db, node) id, err := upsertNode(ctx, db, node)
if err != nil { if err != nil {
return fmt.Errorf("unable to upsert source node: %w", return fmt.Errorf("unable to upsert source node: %w",
@@ -456,11 +424,8 @@ func (s *SQLStore) NodeUpdatesInHorizon(startTime,
ctx := context.TODO() ctx := context.TODO()
var ( var nodes []models.LightningNode
readTx = NewReadTx() err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
nodes []models.LightningNode
)
err := s.db.ExecTx(ctx, readTx, func(db SQLQueries) error {
dbNodes, err := db.GetNodesByLastUpdateRange( dbNodes, err := db.GetNodesByLastUpdateRange(
ctx, sqlc.GetNodesByLastUpdateRangeParams{ ctx, sqlc.GetNodesByLastUpdateRangeParams{
StartTime: sqldb.SQLInt64(startTime.Unix()), StartTime: sqldb.SQLInt64(startTime.Unix()),

View File

@@ -70,9 +70,8 @@ func TestMigrationWithChannelDB(t *testing.T) {
ctxb := context.Background() ctxb := context.Background()
const batchSize = 11 const batchSize = 11
var opts sqldb.MigrationTxOptions
err := sqlStore.ExecTx( err := sqlStore.ExecTx(
ctxb, &opts, func(tx *sqlc.Queries) error { ctxb, sqldb.WriteTxOpt(), func(tx *sqlc.Queries) error {
return invpkg.MigrateInvoicesToSQL( return invpkg.MigrateInvoicesToSQL(
ctxb, kvStore.Backend, kvStore, tx, ctxb, kvStore.Backend, kvStore, tx,
batchSize, batchSize,

View File

@@ -317,8 +317,8 @@ func testMigrateSingleInvoiceRapid(t *rapid.T, store *SQLStore, mpp bool,
invoices[hash] = invoice invoices[hash] = invoice
} }
var ops SQLInvoiceQueriesTxOptions ops := sqldb.WriteTxOpt()
err := store.db.ExecTx(ctxb, &ops, func(tx SQLInvoiceQueries) error { err := store.db.ExecTx(ctxb, ops, func(tx SQLInvoiceQueries) error {
for hash, invoice := range invoices { for hash, invoice := range invoices {
err := MigrateSingleInvoice(ctxb, tx, invoice, hash) err := MigrateSingleInvoice(ctxb, tx, invoice, hash)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -151,27 +151,6 @@ type SQLInvoiceQueries interface { //nolint:interfacebloat
var _ InvoiceDB = (*SQLStore)(nil) var _ InvoiceDB = (*SQLStore)(nil)
// SQLInvoiceQueriesTxOptions defines the set of db txn options the
// SQLInvoiceQueries understands.
type SQLInvoiceQueriesTxOptions struct {
// readOnly governs if a read only transaction is needed or not.
readOnly bool
}
// ReadOnly returns true if the transaction should be read only.
//
// NOTE: This implements the TxOptions.
func (a *SQLInvoiceQueriesTxOptions) ReadOnly() bool {
return a.readOnly
}
// NewSQLInvoiceQueryReadTx creates a new read transaction option set.
func NewSQLInvoiceQueryReadTx() SQLInvoiceQueriesTxOptions {
return SQLInvoiceQueriesTxOptions{
readOnly: true,
}
}
// BatchedSQLInvoiceQueries is a version of the SQLInvoiceQueries that's capable // BatchedSQLInvoiceQueries is a version of the SQLInvoiceQueries that's capable
// of batched database operations. // of batched database operations.
type BatchedSQLInvoiceQueries interface { type BatchedSQLInvoiceQueries interface {
@@ -304,7 +283,7 @@ func (i *SQLStore) AddInvoice(ctx context.Context,
} }
var ( var (
writeTxOpts SQLInvoiceQueriesTxOptions writeTxOpts = sqldb.WriteTxOpt()
invoiceID int64 invoiceID int64
) )
@@ -315,7 +294,7 @@ func (i *SQLStore) AddInvoice(ctx context.Context,
return 0, err return 0, err
} }
err = i.db.ExecTx(ctx, &writeTxOpts, func(db SQLInvoiceQueries) error { err = i.db.ExecTx(ctx, writeTxOpts, func(db SQLInvoiceQueries) error {
var err error var err error
invoiceID, err = db.InsertInvoice(ctx, insertInvoiceParams) invoiceID, err = db.InsertInvoice(ctx, insertInvoiceParams)
if err != nil { if err != nil {
@@ -718,8 +697,8 @@ func (i *SQLStore) LookupInvoice(ctx context.Context,
err error err error
) )
readTxOpt := NewSQLInvoiceQueryReadTx() readTxOpt := sqldb.ReadTxOpt()
txErr := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { txErr := i.db.ExecTx(ctx, readTxOpt, func(db SQLInvoiceQueries) error {
invoice, err = fetchInvoice(ctx, db, ref) invoice, err = fetchInvoice(ctx, db, ref)
return err return err
@@ -739,8 +718,8 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) (
var invoices map[lntypes.Hash]Invoice var invoices map[lntypes.Hash]Invoice
readTxOpt := NewSQLInvoiceQueryReadTx() readTxOpt := sqldb.ReadTxOpt()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, readTxOpt, func(db SQLInvoiceQueries) error {
return queryWithLimit(func(offset int) (int, error) { return queryWithLimit(func(offset int) (int, error) {
params := sqlc.FilterInvoicesParams{ params := sqlc.FilterInvoicesParams{
PendingOnly: true, PendingOnly: true,
@@ -800,8 +779,8 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
return invoices, nil return invoices, nil
} }
readTxOpt := NewSQLInvoiceQueryReadTx() readTxOpt := sqldb.ReadTxOpt()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, readTxOpt, func(db SQLInvoiceQueries) error {
err := queryWithLimit(func(offset int) (int, error) { err := queryWithLimit(func(offset int) (int, error) {
params := sqlc.FilterInvoicesParams{ params := sqlc.FilterInvoicesParams{
SettleIndexGet: sqldb.SQLInt64(idx + 1), SettleIndexGet: sqldb.SQLInt64(idx + 1),
@@ -946,8 +925,8 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (
return result, nil return result, nil
} }
readTxOpt := NewSQLInvoiceQueryReadTx() readTxOpt := sqldb.ReadTxOpt()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, readTxOpt, func(db SQLInvoiceQueries) error {
return queryWithLimit(func(offset int) (int, error) { return queryWithLimit(func(offset int) (int, error) {
params := sqlc.FilterInvoicesParams{ params := sqlc.FilterInvoicesParams{
AddIndexGet: sqldb.SQLInt64(idx + 1), AddIndexGet: sqldb.SQLInt64(idx + 1),
@@ -1017,8 +996,8 @@ func (i *SQLStore) QueryInvoices(ctx context.Context,
"be non-zero") "be non-zero")
} }
readTxOpt := NewSQLInvoiceQueryReadTx() readTxOpt := sqldb.ReadTxOpt()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, readTxOpt, func(db SQLInvoiceQueries) error {
return queryWithLimit(func(offset int) (int, error) { return queryWithLimit(func(offset int) (int, error) {
params := sqlc.FilterInvoicesParams{ params := sqlc.FilterInvoicesParams{
NumOffset: int32(offset), NumOffset: int32(offset),
@@ -1465,8 +1444,8 @@ func (i *SQLStore) UpdateInvoice(ctx context.Context, ref InvoiceRef,
var updatedInvoice *Invoice var updatedInvoice *Invoice
txOpt := SQLInvoiceQueriesTxOptions{readOnly: false} txOpt := sqldb.WriteTxOpt()
txErr := i.db.ExecTx(ctx, &txOpt, func(db SQLInvoiceQueries) error { txErr := i.db.ExecTx(ctx, txOpt, func(db SQLInvoiceQueries) error {
switch { switch {
// For the default case we fetch all HTLCs. // For the default case we fetch all HTLCs.
case setID == nil: case setID == nil:
@@ -1539,8 +1518,8 @@ func (i *SQLStore) DeleteInvoice(ctx context.Context,
} }
} }
var writeTxOpt SQLInvoiceQueriesTxOptions writeTxOpt := sqldb.WriteTxOpt()
err := i.db.ExecTx(ctx, &writeTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, writeTxOpt, func(db SQLInvoiceQueries) error {
for _, ref := range invoicesToDelete { for _, ref := range invoicesToDelete {
params := sqlc.DeleteInvoiceParams{ params := sqlc.DeleteInvoiceParams{
AddIndex: sqldb.SQLInt64(ref.AddIndex), AddIndex: sqldb.SQLInt64(ref.AddIndex),
@@ -1584,8 +1563,8 @@ func (i *SQLStore) DeleteInvoice(ctx context.Context,
// DeleteCanceledInvoices removes all canceled invoices from the database. // DeleteCanceledInvoices removes all canceled invoices from the database.
func (i *SQLStore) DeleteCanceledInvoices(ctx context.Context) error { func (i *SQLStore) DeleteCanceledInvoices(ctx context.Context) error {
var writeTxOpt SQLInvoiceQueriesTxOptions writeTxOpt := sqldb.WriteTxOpt()
err := i.db.ExecTx(ctx, &writeTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, writeTxOpt, func(db SQLInvoiceQueries) error {
_, err := db.DeleteCanceledInvoices(ctx) _, err := db.DeleteCanceledInvoices(ctx)
if err != nil { if err != nil {
return fmt.Errorf("unable to delete canceled "+ return fmt.Errorf("unable to delete canceled "+

View File

@@ -39,6 +39,35 @@ type TxOptions interface {
ReadOnly() bool ReadOnly() bool
} }
// txOptions is a concrete implementation of the TxOptions interface.
type txOptions struct {
// readOnly indicates if the transaction should be read-only.
readOnly bool
}
// ReadOnly returns true if the transaction should be read only.
//
// NOTE: This is part of the TxOptions interface.
func (t *txOptions) ReadOnly() bool {
return t.readOnly
}
// WriteTxOpt returns a TxOptions that indicates that the transaction
// should be a write transaction.
func WriteTxOpt() TxOptions {
return &txOptions{
readOnly: false,
}
}
// ReadTxOpt returns a TxOptions that indicates that the transaction
// should be a read-only transaction.
func ReadTxOpt() TxOptions {
return &txOptions{
readOnly: true,
}
}
// BatchedTx is a generic interface that represents the ability to execute // BatchedTx is a generic interface that represents the ability to execute
// several operations to a given storage interface in a single atomic // several operations to a given storage interface in a single atomic
// transaction. Typically, Q here will be some subset of the main sqlc.Querier // transaction. Typically, Q here will be some subset of the main sqlc.Querier

View File

@@ -330,17 +330,6 @@ func (t *replacerFile) Close() error {
return nil return nil
} }
// MigrationTxOptions is the implementation of the TxOptions interface for
// migration transactions.
type MigrationTxOptions struct {
}
// ReadOnly returns false to indicate that migration transactions are not read
// only.
func (m *MigrationTxOptions) ReadOnly() bool {
return false
}
// ApplyMigrations applies the provided migrations to the database in sequence. // ApplyMigrations applies the provided migrations to the database in sequence.
// It ensures migrations are executed in the correct order, applying both custom // It ensures migrations are executed in the correct order, applying both custom
// migration functions and SQL migrations as needed. // migration functions and SQL migrations as needed.
@@ -433,12 +422,12 @@ func ApplyMigrations(ctx context.Context, db *BaseDB,
migration.SchemaVersion, err) migration.SchemaVersion, err)
} }
var opts MigrationTxOptions opts := WriteTxOpt()
// Run the custom migration as a transaction to ensure // Run the custom migration as a transaction to ensure
// atomicity. If successful, mark the migration as complete in // atomicity. If successful, mark the migration as complete in
// the migration tracker table. // the migration tracker table.
err = executor.ExecTx(ctx, &opts, func(tx *sqlc.Queries) error { err = executor.ExecTx(ctx, opts, func(tx *sqlc.Queries) error {
// Apply the migration function if one is provided. // Apply the migration function if one is provided.
if migration.MigrationFn != nil { if migration.MigrationFn != nil {
log.Infof("Applying custom migration '%v' "+ log.Infof("Applying custom migration '%v' "+