mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-07-04 12:31:39 +02:00
kvdb+sqldb: use the same tx retry helper
This commit is contained in:
@ -3,6 +3,9 @@ package sqldb
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
prand "math/rand"
|
||||
"time"
|
||||
|
||||
@ -24,6 +27,9 @@ const (
|
||||
// DefaultRetryDelay is the default delay between retries. This will be
|
||||
// used to generate a random delay between 0 and this value.
|
||||
DefaultRetryDelay = time.Millisecond * 50
|
||||
|
||||
// DefaultMaxRetryDelay is the default maximum delay between retries.
|
||||
DefaultMaxRetryDelay = time.Second
|
||||
)
|
||||
|
||||
// TxOptions represents a set of options one can use to control what type of
|
||||
@ -156,38 +162,98 @@ func NewTransactionExecutor[Querier any](db BatchedQuerier,
|
||||
}
|
||||
}
|
||||
|
||||
// ExecTx is a wrapper for txBody to abstract the creation and commit of a db
|
||||
// transaction. The db transaction is embedded in a `*Queries` that txBody
|
||||
// needs to use when executing each one of the queries that need to be applied
|
||||
// atomically. This can be used by other storage interfaces to parameterize the
|
||||
// type of query and options run, in order to have access to batched operations
|
||||
// related to a storage object.
|
||||
func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
|
||||
txOptions TxOptions, txBody func(Q) error) error {
|
||||
// randRetryDelay returns a random retry delay between -50% and +50% of the
|
||||
// configured delay that is doubled for each attempt and capped at a max value.
|
||||
func randRetryDelay(initialRetryDelay, maxRetryDelay time.Duration,
|
||||
attempt int) time.Duration {
|
||||
|
||||
waitBeforeRetry := func(attemptNumber int) {
|
||||
retryDelay := t.opts.randRetryDelay()
|
||||
halfDelay := initialRetryDelay / 2
|
||||
randDelay := rand.Int63n(int64(initialRetryDelay)) //nolint:gosec
|
||||
|
||||
log.Tracef("Retrying transaction due to tx serialization "+
|
||||
"error, attempt_number=%v, delay=%v", attemptNumber,
|
||||
retryDelay)
|
||||
// 50% plus 0%-100% gives us the range of 50%-150%.
|
||||
initialDelay := halfDelay + time.Duration(randDelay)
|
||||
|
||||
// Before we try again, we'll wait with a random backoff based
|
||||
// on the retry delay.
|
||||
time.Sleep(retryDelay)
|
||||
// If this is the first attempt, we just return the initial delay.
|
||||
if attempt == 0 {
|
||||
return initialDelay
|
||||
}
|
||||
|
||||
for i := 0; i < t.opts.numRetries; i++ {
|
||||
// Create the db transaction.
|
||||
tx, err := t.BatchedQuerier.BeginTx(ctx, txOptions)
|
||||
// For each subsequent delay, we double the initial delay. This still
|
||||
// gives us a somewhat random delay, but it still increases with each
|
||||
// attempt. If we double something n times, that's the same as
|
||||
// multiplying the value with 2^n. We limit the power to 32 to avoid
|
||||
// overflows.
|
||||
factor := time.Duration(math.Pow(2, math.Min(float64(attempt), 32)))
|
||||
actualDelay := initialDelay * factor
|
||||
|
||||
// Cap the delay at the maximum configured value.
|
||||
if actualDelay > maxRetryDelay {
|
||||
return maxRetryDelay
|
||||
}
|
||||
|
||||
return actualDelay
|
||||
}
|
||||
|
||||
// MakeTx is a function that creates a new transaction. It returns a Tx and an
|
||||
// error if the transaction cannot be created. This is used to abstract the
|
||||
// creation of a transaction from the actual transaction logic in order to be
|
||||
// able to reuse the transaction retry logic in other packages.
|
||||
type MakeTx func() (Tx, error)
|
||||
|
||||
// TxBody represents the function type for transactions. It returns an
|
||||
// error to indicate success or failure.
|
||||
type TxBody func(tx Tx) error
|
||||
|
||||
// RollbackTx is a function that is called when a transaction needs to be rolled
|
||||
// back due to a serialization error. By using this intermediate function, we
|
||||
// can avoid having to return rollback errors that are not actionable by the
|
||||
// caller.
|
||||
type RollbackTx func(tx Tx) error
|
||||
|
||||
// OnBackoff is a function that is called when a transaction is retried due to a
|
||||
// serialization error. The function is called with the retry attempt number and
|
||||
// the delay before the next retry.
|
||||
type OnBackoff func(retry int, delay time.Duration)
|
||||
|
||||
// ExecuteSQLTransactionWithRetry is a helper function that executes a
|
||||
// transaction with retry logic. It will retry the transaction if it fails with
|
||||
// a serialization error. The function will return an error if the transaction
|
||||
// fails with a non-retryable error, the context is cancelled or the number of
|
||||
// retries is exceeded.
|
||||
func ExecuteSQLTransactionWithRetry(ctx context.Context, makeTx MakeTx,
|
||||
rollbackTx RollbackTx, txBody TxBody, onBackoff OnBackoff,
|
||||
numRetries int) error {
|
||||
|
||||
waitBeforeRetry := func(attemptNumber int) bool {
|
||||
retryDelay := randRetryDelay(
|
||||
DefaultRetryDelay, DefaultMaxRetryDelay, attemptNumber,
|
||||
)
|
||||
|
||||
onBackoff(attemptNumber, retryDelay)
|
||||
|
||||
select {
|
||||
// Before we try again, we'll wait with a random backoff based
|
||||
// on the retry delay.
|
||||
case <-time.After(retryDelay):
|
||||
return true
|
||||
|
||||
// If the daemon is shutting down, then we'll exit early.
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < numRetries; i++ {
|
||||
tx, err := makeTx()
|
||||
if err != nil {
|
||||
dbErr := MapSQLError(err)
|
||||
if IsSerializationError(dbErr) {
|
||||
// Nothing to roll back here, since we didn't
|
||||
// even get a transaction yet.
|
||||
waitBeforeRetry(i)
|
||||
|
||||
continue
|
||||
// Nothing to roll back here, since we haven't
|
||||
// even get a transaction yet. We'll just wait
|
||||
// and try again.
|
||||
if waitBeforeRetry(i) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return dbErr
|
||||
@ -199,32 +265,38 @@ func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
|
||||
_ = tx.Rollback()
|
||||
}()
|
||||
|
||||
if err := txBody(t.createQuery(tx)); err != nil {
|
||||
dbErr := MapSQLError(err)
|
||||
if bodyErr := txBody(tx); bodyErr != nil {
|
||||
// Roll back the transaction, then attempt a random
|
||||
// backoff and try again if the error was a
|
||||
// serialization error.
|
||||
if err := rollbackTx(tx); err != nil {
|
||||
return MapSQLError(err)
|
||||
}
|
||||
|
||||
dbErr := MapSQLError(bodyErr)
|
||||
if IsSerializationError(dbErr) {
|
||||
// Roll back the transaction, then pop back up
|
||||
// to try once again.
|
||||
_ = tx.Rollback()
|
||||
|
||||
waitBeforeRetry(i)
|
||||
|
||||
continue
|
||||
if waitBeforeRetry(i) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return dbErr
|
||||
}
|
||||
|
||||
// Commit transaction.
|
||||
if err = tx.Commit(); err != nil {
|
||||
dbErr := MapSQLError(err)
|
||||
if commitErr := tx.Commit(); commitErr != nil {
|
||||
// Roll back the transaction, then attempt a random
|
||||
// backoff and try again if the error was a
|
||||
// serialization error.
|
||||
if err := rollbackTx(tx); err != nil {
|
||||
return MapSQLError(err)
|
||||
}
|
||||
|
||||
dbErr := MapSQLError(commitErr)
|
||||
if IsSerializationError(dbErr) {
|
||||
// Roll back the transaction, then pop back up
|
||||
// to try once again.
|
||||
_ = tx.Rollback()
|
||||
|
||||
waitBeforeRetry(i)
|
||||
|
||||
continue
|
||||
if waitBeforeRetry(i) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return dbErr
|
||||
@ -238,6 +310,49 @@ func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
|
||||
return ErrRetriesExceeded
|
||||
}
|
||||
|
||||
// ExecTx is a wrapper for txBody to abstract the creation and commit of a db
|
||||
// transaction. The db transaction is embedded in a `*Queries` that txBody
|
||||
// needs to use when executing each one of the queries that need to be applied
|
||||
// atomically. This can be used by other storage interfaces to parameterize the
|
||||
// type of query and options run, in order to have access to batched operations
|
||||
// related to a storage object.
|
||||
func (t *TransactionExecutor[Q]) ExecTx(ctx context.Context,
|
||||
txOptions TxOptions, txBody func(Q) error) error {
|
||||
|
||||
makeTx := func() (Tx, error) {
|
||||
return t.BatchedQuerier.BeginTx(ctx, txOptions)
|
||||
}
|
||||
|
||||
execTxBody := func(tx Tx) error {
|
||||
sqlTx, ok := tx.(*sql.Tx)
|
||||
if !ok {
|
||||
return fmt.Errorf("expected *sql.Tx, got %T", tx)
|
||||
}
|
||||
return txBody(t.createQuery(sqlTx))
|
||||
}
|
||||
|
||||
onBackoff := func(retry int, delay time.Duration) {
|
||||
log.Tracef("Retrying transaction due to tx serialization "+
|
||||
"error, attempt_number=%v, delay=%v", retry, delay)
|
||||
}
|
||||
|
||||
rollbackTx := func(tx Tx) error {
|
||||
sqlTx, ok := tx.(*sql.Tx)
|
||||
if !ok {
|
||||
return fmt.Errorf("expected *sql.Tx, got %T", tx)
|
||||
}
|
||||
|
||||
_ = sqlTx.Rollback()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return ExecuteSQLTransactionWithRetry(
|
||||
ctx, makeTx, rollbackTx, execTxBody, onBackoff,
|
||||
t.opts.numRetries,
|
||||
)
|
||||
}
|
||||
|
||||
// BaseDB is the base database struct that each implementation can embed to
|
||||
// gain some common functionality.
|
||||
type BaseDB struct {
|
||||
|
Reference in New Issue
Block a user