sqldb: cleanup scope state reset by adding reset closure to ExecTx

For SQL transactions, we often accumulate results in variables declared
outside the closure's scope. To eliminate the need for manually clearing
these containers, we introduce a reset function to ExecTx, mirroring the
approach already adopted in kvdb.
This commit is contained in:
Andras Banki-Horvath 2024-04-02 16:46:31 +02:00
parent c6073a14ca
commit 478ae1e9b0
No known key found for this signature in database
GPG Key ID: 80E5375C094198D8
2 changed files with 17 additions and 11 deletions

View File

@ -259,7 +259,7 @@ func (i *SQLStore) AddInvoice(ctx context.Context,
AddedAt: newInvoice.CreationDate.UTC(), AddedAt: newInvoice.CreationDate.UTC(),
InvoiceID: invoiceID, InvoiceID: invoiceID,
}) })
}) }, func() {})
if err != nil { if err != nil {
mappedSQLErr := sqldb.MapSQLError(err) mappedSQLErr := sqldb.MapSQLError(err)
var uniqueConstraintErr *sqldb.ErrSQLUniqueConstraintViolation var uniqueConstraintErr *sqldb.ErrSQLUniqueConstraintViolation
@ -599,7 +599,7 @@ func (i *SQLStore) LookupInvoice(ctx context.Context,
invoice, err = i.fetchInvoice(ctx, db, ref) invoice, err = i.fetchInvoice(ctx, db, ref)
return err return err
}) }, func() {})
if txErr != nil { if txErr != nil {
return Invoice{}, txErr return Invoice{}, txErr
} }
@ -617,7 +617,6 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) (
readTxOpt := NewSQLInvoiceQueryReadTx() readTxOpt := NewSQLInvoiceQueryReadTx()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error {
invoices = make(map[lntypes.Hash]Invoice)
limit := queryPaginationLimit limit := queryPaginationLimit
return queryWithLimit(func(offset int) (int, error) { return queryWithLimit(func(offset int) (int, error) {
@ -647,6 +646,8 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) (
return len(rows), nil return len(rows), nil
}, limit) }, limit)
}, func() {
invoices = make(map[lntypes.Hash]Invoice)
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to fetch pending invoices: %w", return nil, fmt.Errorf("unable to fetch pending invoices: %w",
@ -674,7 +675,6 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
readTxOpt := NewSQLInvoiceQueryReadTx() readTxOpt := NewSQLInvoiceQueryReadTx()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error {
invoices = nil
settleIdx := idx settleIdx := idx
limit := queryPaginationLimit limit := queryPaginationLimit
@ -762,6 +762,8 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
} }
return nil return nil
}, func() {
invoices = nil
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to get invoices settled since "+ return nil, fmt.Errorf("unable to get invoices settled since "+
@ -788,7 +790,6 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (
readTxOpt := NewSQLInvoiceQueryReadTx() readTxOpt := NewSQLInvoiceQueryReadTx()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error {
result = nil
addIdx := idx addIdx := idx
limit := queryPaginationLimit limit := queryPaginationLimit
@ -821,6 +822,8 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (
return len(rows), nil return len(rows), nil
}, limit) }, limit)
}, func() {
result = nil
}) })
if err != nil { if err != nil {
@ -845,7 +848,6 @@ func (i *SQLStore) QueryInvoices(ctx context.Context,
readTxOpt := NewSQLInvoiceQueryReadTx() readTxOpt := NewSQLInvoiceQueryReadTx()
err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error { err := i.db.ExecTx(ctx, &readTxOpt, func(db SQLInvoiceQueries) error {
invoices = nil
limit := queryPaginationLimit limit := queryPaginationLimit
return queryWithLimit(func(offset int) (int, error) { return queryWithLimit(func(offset int) (int, error) {
@ -919,6 +921,8 @@ func (i *SQLStore) QueryInvoices(ctx context.Context,
return len(rows), nil return len(rows), nil
}, limit) }, limit)
}, func() {
invoices = nil
}) })
if err != nil { if err != nil {
return InvoiceSlice{}, fmt.Errorf("unable to query "+ return InvoiceSlice{}, fmt.Errorf("unable to query "+
@ -1306,7 +1310,7 @@ func (i *SQLStore) UpdateInvoice(ctx context.Context, ref InvoiceRef,
) )
return err return err
}) }, func() {})
if txErr != nil { if txErr != nil {
// If the invoice is already settled, we'll return the // If the invoice is already settled, we'll return the
// (unchanged) invoice and the ErrInvoiceAlreadySettled error. // (unchanged) invoice and the ErrInvoiceAlreadySettled error.
@ -1370,7 +1374,7 @@ func (i *SQLStore) DeleteInvoice(ctx context.Context,
} }
return nil return nil
}) }, func() {})
if err != nil { if err != nil {
return fmt.Errorf("unable to delete invoices: %w", err) return fmt.Errorf("unable to delete invoices: %w", err)
@ -1390,7 +1394,7 @@ func (i *SQLStore) DeleteCanceledInvoices(ctx context.Context) error {
} }
return nil return nil
}) }, func() {})
if err != nil { if err != nil {
return fmt.Errorf("unable to delete invoices: %w", err) return fmt.Errorf("unable to delete invoices: %w", err)
} }

View File

@ -52,7 +52,7 @@ type BatchedTx[Q any] interface {
// specify if a transaction should be read-only and optionally what // specify if a transaction should be read-only and optionally what
// type of concurrency control should be used. // type of concurrency control should be used.
ExecTx(ctx context.Context, txOptions TxOptions, ExecTx(ctx context.Context, txOptions TxOptions,
txBody func(Q) error) error txBody func(Q) error, reset func()) error
} }
// Tx represents a database transaction that can be committed or rolled back. // Tx represents a database transaction that can be committed or rolled back.
@ -317,7 +317,7 @@ func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx,
// type of query and options run, in order to have access to batched operations // type of query and options run, in order to have access to batched operations
// related to a storage object. // related to a storage object.
func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context, func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
txOptions TxOptions, txBody func(Q) error) error { txOptions TxOptions, txBody func(Q) error, reset func()) error {
makeTx := func() (Tx, error) { makeTx := func() (Tx, error) {
return t.BatchedQuerier.BeginTx(ctx, txOptions) return t.BatchedQuerier.BeginTx(ctx, txOptions)
@ -328,6 +328,8 @@ func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
if !ok { if !ok {
return fmt.Errorf("expected *sql.Tx, got %T", tx) return fmt.Errorf("expected *sql.Tx, got %T", tx)
} }
reset()
return txBody(t.createQuery(sqlTx)) return txBody(t.createQuery(sqlTx))
} }