mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-06 09:34:13 +02:00
kvdb: make etcd calls timeout to ensure liveness
Previously our RPC calls to etcd would hang even in the case of properly set dial timeouts and even if there was a network partition. To ensure liveness we need to make sure that calls fail correctly in case of system failure. To fix this we add a default timeout of 30 seconds to each etcd RPC call.
This commit is contained in:
@@ -8,12 +8,25 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/google/btree"
|
"github.com/google/btree"
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
v3 "go.etcd.io/etcd/client/v3"
|
v3 "go.etcd.io/etcd/client/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// rpcTimeout is the timeout for all RPC calls to etcd. It is set to 30
|
||||||
|
// seconds to avoid blocking the server for too long but give reasonable
|
||||||
|
// time for etcd to respond. If any operations would take longer than 30
|
||||||
|
// seconds that generally means there's a problem with the etcd server
|
||||||
|
// or the network resulting in degraded performance in which case we
|
||||||
|
// want LND to fail fast. Due to the underlying gRPC implementation in
|
||||||
|
// etcd calls without a timeout can hang indefinitely even in the case
|
||||||
|
// of network partitions or other critical failures.
|
||||||
|
rpcTimeout = time.Second * 30
|
||||||
|
)
|
||||||
|
|
||||||
type CommitStats struct {
|
type CommitStats struct {
|
||||||
Rset int
|
Rset int
|
||||||
Wset int
|
Wset int
|
||||||
@@ -609,8 +622,13 @@ func (s *stm) FetchRangePaginatedRaw(prefix string, limit int64,
|
|||||||
|
|
||||||
key := prefix
|
key := prefix
|
||||||
for {
|
for {
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(
|
||||||
|
s.options.ctx, rpcTimeout,
|
||||||
|
)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
resp, err := s.client.Get(
|
resp, err := s.client.Get(
|
||||||
s.options.ctx, key, append(opts, s.getOpts...)...,
|
timeoutCtx, key, append(opts, s.getOpts...)...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return DatabaseError{
|
return DatabaseError{
|
||||||
@@ -645,8 +663,12 @@ func (s *stm) FetchRangePaginatedRaw(prefix string, limit int64,
|
|||||||
// We'll also cache the returned key/value in the read set.
|
// We'll also cache the returned key/value in the read set.
|
||||||
func (s *stm) fetch(key string, opts ...v3.OpOption) ([]KV, error) {
|
func (s *stm) fetch(key string, opts ...v3.OpOption) ([]KV, error) {
|
||||||
s.callCount++
|
s.callCount++
|
||||||
|
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
resp, err := s.client.Get(
|
resp, err := s.client.Get(
|
||||||
s.options.ctx, key, append(opts, s.getOpts...)...,
|
timeoutCtx, key, append(opts, s.getOpts...)...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, DatabaseError{
|
return nil, DatabaseError{
|
||||||
@@ -1049,7 +1071,10 @@ func (s *stm) Prefetch(keys []string, prefixes []string) {
|
|||||||
[]v3.OpOption{v3.WithPrefix()}, s.getOpts...,
|
[]v3.OpOption{v3.WithPrefix()}, s.getOpts...,
|
||||||
)
|
)
|
||||||
|
|
||||||
txn := s.client.Txn(s.options.ctx)
|
timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
txn := s.client.Txn(timeoutCtx)
|
||||||
ops := make([]v3.Op, 0, len(fetchKeys)+len(fetchPrefixes))
|
ops := make([]v3.Op, 0, len(fetchKeys)+len(fetchPrefixes))
|
||||||
|
|
||||||
for _, key := range fetchKeys {
|
for _, key := range fetchKeys {
|
||||||
@@ -1103,8 +1128,11 @@ func (s *stm) commit() (CommitStats, error) {
|
|||||||
|
|
||||||
// Create the compare set.
|
// Create the compare set.
|
||||||
cmps := append(rset, wset...)
|
cmps := append(rset, wset...)
|
||||||
|
|
||||||
// Create a transaction with the optional abort context.
|
// Create a transaction with the optional abort context.
|
||||||
txn := s.client.Txn(s.options.ctx)
|
timeoutCtx, cancel := context.WithTimeout(s.options.ctx, rpcTimeout)
|
||||||
|
defer cancel()
|
||||||
|
txn := s.client.Txn(timeoutCtx)
|
||||||
|
|
||||||
// If the compare set holds, try executing the puts.
|
// If the compare set holds, try executing the puts.
|
||||||
txn = txn.If(cmps...)
|
txn = txn.If(cmps...)
|
||||||
|
Reference in New Issue
Block a user