mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-29 15:11:09 +02:00
multi: add leader check to the healthcheck monitor
This commit extends our healthcheck with an optional leader check. This ensures that, in the event of a network partition or other cluster-wide failure, we act as soon as possible to avoid a split-brain situation where a new leader is elected but we still hold onto our etcd client.
This commit is contained in:
@@ -99,9 +99,27 @@ func (e *etcdLeaderElector) Leader(ctx context.Context) (string, error) {
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if resp == nil || len(resp.Kvs) == 0 {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
return string(resp.Kvs[0].Value), nil
|
return string(resp.Kvs[0].Value), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsLeader returns true if the caller is the leader.
|
||||||
|
func (e *etcdLeaderElector) IsLeader(ctx context.Context) (bool, error) {
|
||||||
|
resp, err := e.election.Leader(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp == nil || len(resp.Kvs) == 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(resp.Kvs[0].Value) == e.id, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Campaign will start a new leader election campaign. Campaign will block until
|
// Campaign will start a new leader election campaign. Campaign will block until
|
||||||
// the elector context is canceled or the caller is elected as the leader.
|
// the elector context is canceled or the caller is elected as the leader.
|
||||||
func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
|
func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
|
||||||
@@ -110,6 +128,6 @@ func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
|
|||||||
|
|
||||||
// Resign resigns the leader role allowing other election members to take
|
// Resign resigns the leader role allowing other election members to take
|
||||||
// the place.
|
// the place.
|
||||||
func (e *etcdLeaderElector) Resign() error {
|
func (e *etcdLeaderElector) Resign(ctx context.Context) error {
|
||||||
return e.election.Resign(context.Background())
|
return e.election.Resign(ctx)
|
||||||
}
|
}
|
||||||
|
@@ -87,12 +87,12 @@ func TestEtcdElector(t *testing.T) {
|
|||||||
tmp := <-ch
|
tmp := <-ch
|
||||||
first, err := tmp.Leader(ctxb)
|
first, err := tmp.Leader(ctxb)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, tmp.Resign())
|
require.NoError(t, tmp.Resign(ctxb))
|
||||||
|
|
||||||
tmp = <-ch
|
tmp = <-ch
|
||||||
second, err := tmp.Leader(ctxb)
|
second, err := tmp.Leader(ctxb)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NoError(t, tmp.Resign())
|
require.NoError(t, tmp.Resign(ctxb))
|
||||||
|
|
||||||
require.Contains(t, []string{id1, id2}, first)
|
require.Contains(t, []string{id1, id2}, first)
|
||||||
require.Contains(t, []string{id1, id2}, second)
|
require.Contains(t, []string{id1, id2}, second)
|
||||||
|
@@ -19,8 +19,11 @@ type LeaderElector interface {
|
|||||||
|
|
||||||
// Resign resigns from the leader role, allowing other election members
|
// Resign resigns from the leader role, allowing other election members
|
||||||
// to take on leadership.
|
// to take on leadership.
|
||||||
Resign() error
|
Resign(ctx context.Context) error
|
||||||
|
|
||||||
// Leader returns the leader value for the current election.
|
// Leader returns the leader value for the current election.
|
||||||
Leader(ctx context.Context) (string, error)
|
Leader(ctx context.Context) (string, error)
|
||||||
|
|
||||||
|
// IsLeader returns true if the caller is the leader.
|
||||||
|
IsLeader(ctx context.Context) (bool, error)
|
||||||
}
|
}
|
||||||
|
17
config.go
17
config.go
@@ -169,6 +169,17 @@ const (
|
|||||||
defaultRSBackoff = time.Second * 30
|
defaultRSBackoff = time.Second * 30
|
||||||
defaultRSAttempts = 1
|
defaultRSAttempts = 1
|
||||||
|
|
||||||
|
// Set defaults for a health check which ensures that the leader
|
||||||
|
// election is functioning correctly. Although this check is off by
|
||||||
|
// default (as etcd leader election is only used in a clustered setup),
|
||||||
|
// we still set the default values so that the health check can be
|
||||||
|
// easily enabled with sane defaults. Note that by default we only run
|
||||||
|
// this check once, as it is critical for the node's operation.
|
||||||
|
defaultLeaderCheckInterval = time.Minute
|
||||||
|
defaultLeaderCheckTimeout = time.Second * 5
|
||||||
|
defaultLeaderCheckBackoff = time.Second * 5
|
||||||
|
defaultLeaderCheckAttempts = 1
|
||||||
|
|
||||||
// defaultRemoteMaxHtlcs specifies the default limit for maximum
|
// defaultRemoteMaxHtlcs specifies the default limit for maximum
|
||||||
// concurrent HTLCs the remote party may add to commitment transactions.
|
// concurrent HTLCs the remote party may add to commitment transactions.
|
||||||
// This value can be overridden with --default-remote-max-htlcs.
|
// This value can be overridden with --default-remote-max-htlcs.
|
||||||
@@ -672,6 +683,12 @@ func DefaultConfig() Config {
|
|||||||
Attempts: defaultRSAttempts,
|
Attempts: defaultRSAttempts,
|
||||||
Backoff: defaultRSBackoff,
|
Backoff: defaultRSBackoff,
|
||||||
},
|
},
|
||||||
|
LeaderCheck: &lncfg.CheckConfig{
|
||||||
|
Interval: defaultLeaderCheckInterval,
|
||||||
|
Timeout: defaultLeaderCheckTimeout,
|
||||||
|
Attempts: defaultLeaderCheckAttempts,
|
||||||
|
Backoff: defaultLeaderCheckBackoff,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Gossip: &lncfg.Gossip{
|
Gossip: &lncfg.Gossip{
|
||||||
MaxChannelUpdateBurst: discovery.DefaultMaxChannelUpdateBurst,
|
MaxChannelUpdateBurst: discovery.DefaultMaxChannelUpdateBurst,
|
||||||
|
@@ -34,6 +34,8 @@ type HealthCheckConfig struct {
|
|||||||
TorConnection *CheckConfig `group:"torconnection" namespace:"torconnection"`
|
TorConnection *CheckConfig `group:"torconnection" namespace:"torconnection"`
|
||||||
|
|
||||||
RemoteSigner *CheckConfig `group:"remotesigner" namespace:"remotesigner"`
|
RemoteSigner *CheckConfig `group:"remotesigner" namespace:"remotesigner"`
|
||||||
|
|
||||||
|
LeaderCheck *CheckConfig `group:"leader" namespace:"leader"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate checks the values configured for our health checks.
|
// Validate checks the values configured for our health checks.
|
||||||
|
26
lnd.go
26
lnd.go
@@ -24,6 +24,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/build"
|
"github.com/lightningnetwork/lnd/build"
|
||||||
"github.com/lightningnetwork/lnd/chanacceptor"
|
"github.com/lightningnetwork/lnd/chanacceptor"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
|
"github.com/lightningnetwork/lnd/cluster"
|
||||||
"github.com/lightningnetwork/lnd/keychain"
|
"github.com/lightningnetwork/lnd/keychain"
|
||||||
"github.com/lightningnetwork/lnd/lncfg"
|
"github.com/lightningnetwork/lnd/lncfg"
|
||||||
"github.com/lightningnetwork/lnd/lnrpc"
|
"github.com/lightningnetwork/lnd/lnrpc"
|
||||||
@@ -56,6 +57,14 @@ const (
|
|||||||
// admin macaroon unless the administrator explicitly allowed it. Thus
|
// admin macaroon unless the administrator explicitly allowed it. Thus
|
||||||
// there's no harm allowing group read.
|
// there's no harm allowing group read.
|
||||||
adminMacaroonFilePermissions = 0640
|
adminMacaroonFilePermissions = 0640
|
||||||
|
|
||||||
|
// leaderResignTimeout is the timeout used when resigning from the
|
||||||
|
// leader role. This is kept short so LND can shut down quickly in case
|
||||||
|
// of a system failure or network partition making the cluster
|
||||||
|
// unresponsive. The cluster itself should ensure that the leader is not
|
||||||
|
// elected again until the previous leader has resigned or the leader
|
||||||
|
// election timeout has passed.
|
||||||
|
leaderResignTimeout = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// AdminAuthOptions returns a list of DialOptions that can be used to
|
// AdminAuthOptions returns a list of DialOptions that can be used to
|
||||||
@@ -381,6 +390,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
|
|||||||
// blocked until this instance is elected as the current leader or
|
// blocked until this instance is elected as the current leader or
|
||||||
// shutting down.
|
// shutting down.
|
||||||
elected := false
|
elected := false
|
||||||
|
var leaderElector cluster.LeaderElector
|
||||||
if cfg.Cluster.EnableLeaderElection {
|
if cfg.Cluster.EnableLeaderElection {
|
||||||
electionCtx, cancelElection := context.WithCancel(ctx)
|
electionCtx, cancelElection := context.WithCancel(ctx)
|
||||||
|
|
||||||
@@ -392,7 +402,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
|
|||||||
ltndLog.Infof("Using %v leader elector",
|
ltndLog.Infof("Using %v leader elector",
|
||||||
cfg.Cluster.LeaderElector)
|
cfg.Cluster.LeaderElector)
|
||||||
|
|
||||||
leaderElector, err := cfg.Cluster.MakeLeaderElector(
|
leaderElector, err = cfg.Cluster.MakeLeaderElector(
|
||||||
electionCtx, cfg.DB,
|
electionCtx, cfg.DB,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -407,7 +417,17 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
|
|||||||
ltndLog.Infof("Attempting to resign from leader role "+
|
ltndLog.Infof("Attempting to resign from leader role "+
|
||||||
"(%v)", cfg.Cluster.ID)
|
"(%v)", cfg.Cluster.ID)
|
||||||
|
|
||||||
if err := leaderElector.Resign(); err != nil {
|
// Ensure that we don't block the shutdown process if
|
||||||
|
// the leader resigning process takes too long. The
|
||||||
|
// cluster will ensure that the leader is not elected
|
||||||
|
// again until the previous leader has resigned or the
|
||||||
|
// leader election timeout has passed.
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(
|
||||||
|
ctx, leaderResignTimeout,
|
||||||
|
)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := leaderElector.Resign(timeoutCtx); err != nil {
|
||||||
ltndLog.Errorf("Leader elector failed to "+
|
ltndLog.Errorf("Leader elector failed to "+
|
||||||
"resign: %v", err)
|
"resign: %v", err)
|
||||||
}
|
}
|
||||||
@@ -579,7 +599,7 @@ func Main(cfg *Config, lisCfg ListenerCfg, implCfg *ImplementationCfg,
|
|||||||
server, err := newServer(
|
server, err := newServer(
|
||||||
cfg, cfg.Listeners, dbs, activeChainControl, &idKeyDesc,
|
cfg, cfg.Listeners, dbs, activeChainControl, &idKeyDesc,
|
||||||
activeChainControl.Cfg.WalletUnlockParams.ChansToRestore,
|
activeChainControl.Cfg.WalletUnlockParams.ChansToRestore,
|
||||||
multiAcceptor, torController, tlsManager,
|
multiAcceptor, torController, tlsManager, leaderElector,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return mkErr("unable to create server: %v", err)
|
return mkErr("unable to create server: %v", err)
|
||||||
|
@@ -315,6 +315,9 @@ func ExtraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool,
|
|||||||
leaderSessionTTL),
|
leaderSessionTTL),
|
||||||
}
|
}
|
||||||
extraArgs = append(extraArgs, clusterArgs...)
|
extraArgs = append(extraArgs, clusterArgs...)
|
||||||
|
extraArgs = append(
|
||||||
|
extraArgs, "--healthcheck.leader.interval=10s",
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
return extraArgs
|
return extraArgs
|
||||||
|
54
server.go
54
server.go
@@ -36,6 +36,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/channeldb/models"
|
"github.com/lightningnetwork/lnd/channeldb/models"
|
||||||
"github.com/lightningnetwork/lnd/channelnotifier"
|
"github.com/lightningnetwork/lnd/channelnotifier"
|
||||||
"github.com/lightningnetwork/lnd/clock"
|
"github.com/lightningnetwork/lnd/clock"
|
||||||
|
"github.com/lightningnetwork/lnd/cluster"
|
||||||
"github.com/lightningnetwork/lnd/contractcourt"
|
"github.com/lightningnetwork/lnd/contractcourt"
|
||||||
"github.com/lightningnetwork/lnd/discovery"
|
"github.com/lightningnetwork/lnd/discovery"
|
||||||
"github.com/lightningnetwork/lnd/feature"
|
"github.com/lightningnetwork/lnd/feature"
|
||||||
@@ -484,8 +485,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
|
|||||||
nodeKeyDesc *keychain.KeyDescriptor,
|
nodeKeyDesc *keychain.KeyDescriptor,
|
||||||
chansToRestore walletunlocker.ChannelsToRecover,
|
chansToRestore walletunlocker.ChannelsToRecover,
|
||||||
chanPredicate chanacceptor.ChannelAcceptor,
|
chanPredicate chanacceptor.ChannelAcceptor,
|
||||||
torController *tor.Controller, tlsManager *TLSManager) (*server,
|
torController *tor.Controller, tlsManager *TLSManager,
|
||||||
error) {
|
leaderElector cluster.LeaderElector) (*server, error) {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
@@ -1674,7 +1675,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create liveness monitor.
|
// Create liveness monitor.
|
||||||
s.createLivenessMonitor(cfg, cc)
|
s.createLivenessMonitor(cfg, cc, leaderElector)
|
||||||
|
|
||||||
// Create the connection manager which will be responsible for
|
// Create the connection manager which will be responsible for
|
||||||
// maintaining persistent outbound connections and also accepting new
|
// maintaining persistent outbound connections and also accepting new
|
||||||
@@ -1721,7 +1722,9 @@ func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
|
|||||||
//
|
//
|
||||||
// If a health check has been disabled by setting attempts to 0, our monitor
|
// If a health check has been disabled by setting attempts to 0, our monitor
|
||||||
// will not run it.
|
// will not run it.
|
||||||
func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl) {
|
func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl,
|
||||||
|
leaderElector cluster.LeaderElector) {
|
||||||
|
|
||||||
chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
|
chainBackendAttempts := cfg.HealthChecks.ChainCheck.Attempts
|
||||||
if cfg.Bitcoin.Node == "nochainbackend" {
|
if cfg.Bitcoin.Node == "nochainbackend" {
|
||||||
srvrLog.Info("Disabling chain backend checks for " +
|
srvrLog.Info("Disabling chain backend checks for " +
|
||||||
@@ -1837,6 +1840,49 @@ func (s *server) createLivenessMonitor(cfg *Config, cc *chainreg.ChainControl) {
|
|||||||
checks = append(checks, remoteSignerConnectionCheck)
|
checks = append(checks, remoteSignerConnectionCheck)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we have a leader elector, we add a health check to ensure we are
|
||||||
|
// still the leader. During normal operation, we should always be the
|
||||||
|
// leader, but there are circumstances where this may change, such as
|
||||||
|
// when we lose network connectivity for long enough expiring our lease.
|
||||||
|
if leaderElector != nil {
|
||||||
|
leaderCheck := healthcheck.NewObservation(
|
||||||
|
"leader status",
|
||||||
|
func() error {
|
||||||
|
// Check if we are still the leader. The call is
|
||||||
|
// bounded by the same timeout that is passed to
|
||||||
|
// the healthcheck observer below, so it cannot
|
||||||
|
// block indefinitely.
|
||||||
|
timeoutCtx, cancel := context.WithTimeout(
|
||||||
|
context.Background(),
|
||||||
|
cfg.HealthChecks.LeaderCheck.Timeout,
|
||||||
|
)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
leader, err := leaderElector.IsLeader(
|
||||||
|
timeoutCtx,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to check if "+
|
||||||
|
"still leader: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !leader {
|
||||||
|
srvrLog.Debug("Not the current leader")
|
||||||
|
return fmt.Errorf("not the current " +
|
||||||
|
"leader")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
cfg.HealthChecks.LeaderCheck.Interval,
|
||||||
|
cfg.HealthChecks.LeaderCheck.Timeout,
|
||||||
|
cfg.HealthChecks.LeaderCheck.Backoff,
|
||||||
|
cfg.HealthChecks.LeaderCheck.Attempts,
|
||||||
|
)
|
||||||
|
|
||||||
|
checks = append(checks, leaderCheck)
|
||||||
|
}
|
||||||
|
|
||||||
// If we have not disabled all of our health checks, we create a
|
// If we have not disabled all of our health checks, we create a
|
||||||
// liveness monitor with our configured checks.
|
// liveness monitor with our configured checks.
|
||||||
s.livenessMonitor = healthcheck.NewMonitor(
|
s.livenessMonitor = healthcheck.NewMonitor(
|
||||||
|
Reference in New Issue
Block a user