diff --git a/config.go b/config.go index 824bcc55f..a0bfe02b7 100644 --- a/config.go +++ b/config.go @@ -251,6 +251,11 @@ const ( defaultPrunedNodeMaxPeers = 4 defaultNeutrinoMaxPeers = 8 + + // defaultNoDisconnectOnPongFailure is the default value for whether we + // should *not* disconnect from a peer if we don't receive a pong + // response in time after we send a ping. + defaultNoDisconnectOnPongFailure = false ) var ( @@ -527,6 +532,10 @@ type Config struct { // NumRestrictedSlots is the number of restricted slots we'll allocate // in the server. NumRestrictedSlots uint64 `long:"num-restricted-slots" description:"The number of restricted slots we'll allocate in the server."` + + // NoDisconnectOnPongFailure controls if we'll disconnect if a peer + // doesn't respond to a pong in time. + NoDisconnectOnPongFailure bool `long:"no-disconnect-on-pong-failure" description:"If true, a peer will *not* be disconnected if a pong is not received in time or is mismatched. Defaults to false, meaning peers *will* be disconnected on pong failure."` } // GRPCConfig holds the configuration options for the gRPC server. @@ -747,10 +756,11 @@ func DefaultConfig() Config { ServerPingTimeout: defaultGrpcServerPingTimeout, ClientPingMinWait: defaultGrpcClientPingMinWait, }, - LogConfig: build.DefaultLogConfig(), - WtClient: lncfg.DefaultWtClientCfg(), - HTTPHeaderTimeout: DefaultHTTPHeaderTimeout, - NumRestrictedSlots: DefaultNumRestrictedSlots, + LogConfig: build.DefaultLogConfig(), + WtClient: lncfg.DefaultWtClientCfg(), + HTTPHeaderTimeout: DefaultHTTPHeaderTimeout, + NumRestrictedSlots: DefaultNumRestrictedSlots, + NoDisconnectOnPongFailure: defaultNoDisconnectOnPongFailure, } } diff --git a/peer/brontide.go b/peer/brontide.go index bfc603ae8..da4aa610a 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -94,7 +94,7 @@ const ( torTimeoutMultiplier = 3 // msgStreamSize is the size of the message streams. - msgStreamSize = 5 + msgStreamSize = 50 ) var ( @@ -455,6 +455,10 @@ type Config struct { // experimental endorsement signals should be set. ShouldFwdExpEndorsement func() bool + // NoDisconnectOnPongFailure indicates whether the peer should *not* be + // disconnected if a pong is not received in time or is mismatched. + NoDisconnectOnPongFailure bool + // Quit is the server's quit channel. If this is closed, we halt operation. Quit chan struct{} } @@ -735,11 +739,27 @@ func NewBrontide(cfg Config) *Brontide { SendPing: func(ping *lnwire.Ping) { p.queueMsg(ping, nil) }, - OnPongFailure: func(err error) { - eStr := "pong response failure for %s: %v " + - "-- disconnecting" - p.log.Warnf(eStr, p, err) - go p.Disconnect(fmt.Errorf(eStr, p, err)) + OnPongFailure: func(reason error, + timeWaitedForPong time.Duration, + lastKnownRTT time.Duration) { + + logMsg := fmt.Sprintf("pong response "+ + "failure for %s: %v. Time waited for this "+ + "pong: %v. Last successful RTT: %v.", + p, reason, timeWaitedForPong, lastKnownRTT) + + // If NoDisconnectOnPongFailure is true, we don't + // disconnect. Otherwise (if it's false, the default), + // we disconnect. + if p.cfg.NoDisconnectOnPongFailure { + p.log.Warnf("%s -- not disconnecting "+ + "due to config", logMsg) + return + } + + p.log.Warnf("%s -- disconnecting", logMsg) + + go p.Disconnect(fmt.Errorf("pong failure: %w", reason)) }, }) diff --git a/peer/ping_manager.go b/peer/ping_manager.go index f5c6180be..7686cbe8b 100644 --- a/peer/ping_manager.go +++ b/peer/ping_manager.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/lnwire" ) @@ -36,7 +37,8 @@ type PingManagerConfig struct { // OnPongFailure is a closure that is responsible for executing the // logic when a Pong message is either late or does not match our // expectations for that Pong - OnPongFailure func(error) + OnPongFailure func(failureReason error, timeWaitedForPong time.Duration, + lastKnownRTT time.Duration) } // PingManager is a structure that is designed to manage the internal state @@ -108,6 +110,26 @@ func (m *PingManager) Start() error { return err } +// getLastRTT safely retrieves the last known RTT, returning 0 if none exists. +func (m *PingManager) getLastRTT() time.Duration { + rttPtr := m.pingTime.Load() + if rttPtr == nil { + return 0 + } + + return *rttPtr +} + +// pendingPingWait calculates the time waited since the last ping was sent. If +// no ping time is reported, None is returned. defaultDuration. +func (m *PingManager) pendingPingWait() fn.Option[time.Duration] { + if m.pingLastSend != nil { + return fn.Some(time.Since(*m.pingLastSend)) + } + + return fn.None[time.Duration]() +} + // pingHandler is the main goroutine responsible for enforcing the ping/pong // protocol. func (m *PingManager) pingHandler() { @@ -119,6 +141,10 @@ func (m *PingManager) pingHandler() { <-m.pingTimeout.C } + // Because we don't know if the OnPingFailure callback actually + // disconnects a peer (dependent on user config), we should never return + // from this loop unless the ping manager is stopped explicitly (which + // happens on disconnect). for { select { case <-m.pingTicker.C: @@ -127,12 +153,20 @@ func (m *PingManager) pingHandler() { // awaiting a pong response. This should never occur, // but if it does, it implies a timeout. if m.outstandingPongSize >= 0 { - e := errors.New("impossible: new ping" + - "in unclean state", + // Ping was outstanding, meaning it timed out by + // the arrival of the next ping interval. + timeWaited := m.pendingPingWait().UnwrapOr( + m.cfg.IntervalDuration, ) - m.cfg.OnPongFailure(e) + lastRTT := m.getLastRTT() - return + m.cfg.OnPongFailure( + errors.New("ping timed "+ + "out by next interval"), + timeWaited, lastRTT, + ) + + m.resetPingState() } pongSize := m.cfg.NewPongSize() @@ -143,52 +177,66 @@ func (m *PingManager) pingHandler() { // Set up our bookkeeping for the new Ping. if err := m.setPingState(pongSize); err != nil { - m.cfg.OnPongFailure(err) + // This is an internal error related to timer + // reset. Pass it to OnPongFailure as it's + // critical. Current and last RTT are not + // directly applicable here. + m.cfg.OnPongFailure(err, 0, 0) - return + m.resetPingState() + + continue } m.cfg.SendPing(ping) case <-m.pingTimeout.C: - m.resetPingState() + timeWaited := m.pendingPingWait().UnwrapOr( + m.cfg.TimeoutDuration, + ) + lastRTT := m.getLastRTT() - e := errors.New("timeout while waiting for " + - "pong response", + m.cfg.OnPongFailure( + errors.New("timeout while waiting for "+ + "pong response"), + timeWaited, lastRTT, ) - m.cfg.OnPongFailure(e) - - return + m.resetPingState() case pong := <-m.pongChan: pongSize := int32(len(pong.PongBytes)) - // Save off values we are about to override when we - // call resetPingState. + // Save off values we are about to override when we call + // resetPingState. expected := m.outstandingPongSize - lastPing := m.pingLastSend + lastPingTime := m.pingLastSend - m.resetPingState() + // This is an unexpected pong, we'll continue. + if lastPingTime == nil { + continue + } - // If the pong we receive doesn't match the ping we - // sent out, then we fail out. + actualRTT := time.Since(*lastPingTime) + + // If the pong we receive doesn't match the ping we sent + // out, then we fail out. if pongSize != expected { - e := errors.New("pong response does " + - "not match expected size", - ) + e := fmt.Errorf("pong response does not match "+ + "expected size. Expected: %d, Got: %d", + expected, pongSize) - m.cfg.OnPongFailure(e) + lastRTT := m.getLastRTT() + m.cfg.OnPongFailure(e, actualRTT, lastRTT) - return + m.resetPingState() + + continue } - // Compute RTT of ping and save that for future - // querying. - if lastPing != nil { - rtt := time.Since(*lastPing) - m.pingTime.Store(&rtt) - } + // Pong is good, update RTT and reset state. + m.pingTime.Store(&actualRTT) + m.resetPingState() case <-m.quit: return @@ -231,6 +279,7 @@ func (m *PingManager) setPingState(pongSize uint16) error { func (m *PingManager) resetPingState() { m.pingLastSend = nil m.outstandingPongSize = -1 + if !m.pingTimeout.Stop() { select { case <-m.pingTimeout.C: diff --git a/peer/ping_manager_test.go b/peer/ping_manager_test.go index 0f4c3be49..d0d96bb94 100644 --- a/peer/ping_manager_test.go +++ b/peer/ping_manager_test.go @@ -1,6 +1,7 @@ package peer import ( + "sync" "testing" "time" @@ -23,19 +24,19 @@ func TestPingManager(t *testing.T) { result bool }{ { - name: "Happy Path", + name: "happy Path", delay: 0, pongSize: 4, result: true, }, { - name: "Bad Pong", + name: "bad Pong", delay: 0, pongSize: 3, result: false, }, { - name: "Timeout", + name: "timeout", delay: 2, pongSize: 4, result: false, @@ -44,45 +45,56 @@ func TestPingManager(t *testing.T) { payload := make([]byte, 4) for _, test := range testCases { - // Set up PingManager. - pingSent := make(chan struct{}) - disconnected := make(chan struct{}) - mgr := NewPingManager(&PingManagerConfig{ - NewPingPayload: func() []byte { - return payload - }, - NewPongSize: func() uint16 { - return 4 - }, - IntervalDuration: time.Second * 2, - TimeoutDuration: time.Second, - SendPing: func(ping *lnwire.Ping) { - close(pingSent) - }, - OnPongFailure: func(err error) { - close(disconnected) - }, + t.Run(test.name, func(t *testing.T) { + // Set up PingManager. + var pingOnce sync.Once + pingSent := make(chan struct{}) + disconnected := make(chan struct{}) + mgr := NewPingManager(&PingManagerConfig{ + NewPingPayload: func() []byte { + return payload + }, + NewPongSize: func() uint16 { + return 4 + }, + IntervalDuration: time.Second * 2, + TimeoutDuration: time.Second, + SendPing: func(ping *lnwire.Ping) { + pingOnce.Do(func() { + close(pingSent) + }) + }, + OnPongFailure: func(err error, + _ time.Duration, _ time.Duration) { + + close(disconnected) + }, + }) + require.NoError( + t, mgr.Start(), "Could not start pingManager", + ) + + // Wait for initial Ping. + <-pingSent + + // Wait for pre-determined time before sending Pong + // response. + time.Sleep(time.Duration(test.delay) * time.Second) + + // Send Pong back. + res := lnwire.Pong{ + PongBytes: make([]byte, test.pongSize), + } + mgr.ReceivedPong(&res) + + select { + case <-time.NewTimer(time.Second / 2).C: + require.True(t, test.result) + case <-disconnected: + require.False(t, test.result) + } + + mgr.Stop() }) - require.NoError(t, mgr.Start(), "Could not start pingManager") - - // Wait for initial Ping. - <-pingSent - - // Wait for pre-determined time before sending Pong response. - time.Sleep(time.Duration(test.delay) * time.Second) - - // Send Pong back. - res := lnwire.Pong{PongBytes: make([]byte, test.pongSize)} - mgr.ReceivedPong(&res) - - // Evaluate result - select { - case <-time.NewTimer(time.Second / 2).C: - require.True(t, test.result) - case <-disconnected: - require.False(t, test.result) - } - - mgr.Stop() } } diff --git a/sample-lnd.conf b/sample-lnd.conf index af4923846..ae929d8e6 100644 --- a/sample-lnd.conf +++ b/sample-lnd.conf @@ -569,6 +569,11 @@ ; The number of restricted slots the server will allocate for peers. ; num-restricted-slots=30 +; If true, a peer will *not* be disconnected if a pong is not received in time +; or is mismatched. Defaults to false, meaning peers *will* be disconnected on +; pong failure. +; no-disconnect-on-pong-failure=false + [fee] ; Optional URL for external fee estimation. If no URL is specified, the method diff --git a/server.go b/server.go index e816c3ca4..3a077a28f 100644 --- a/server.go +++ b/server.go @@ -4493,6 +4493,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, EndorsementExperimentEnd, ) }, + NoDisconnectOnPongFailure: s.cfg.NoDisconnectOnPongFailure, } copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())