mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-06 17:47:01 +02:00
In this commit, we add a new CLI option to control whether we disconnect (D/C) on slow pongs. Due to the existence of head-of-line blocking at various levels of abstraction (app buffer, slow processing, TCP kernel buffers, etc.), if there's a flurry of gossip messages (e.g., 1K channel updates), then even with a reasonable processing latency, a peer may still not read our ping in time. To give users another option, we add a flag that allows them to disable this behavior. The default behavior remains unchanged.
311 lines
7.9 KiB
Go
311 lines
7.9 KiB
Go
package peer
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/lightningnetwork/lnd/fn/v2"
|
|
"github.com/lightningnetwork/lnd/lnwire"
|
|
)
|
|
|
|
// PingManagerConfig is a structure containing various parameters that govern
|
|
// how the PingManager behaves.
|
|
type PingManagerConfig struct {
|
|
// NewPingPayload is a closure that returns the payload to be packaged
|
|
// in the Ping message.
|
|
NewPingPayload func() []byte
|
|
|
|
// NewPongSize is a closure that returns a random value between
|
|
// [0, lnwire.MaxPongBytes]. This random value helps to more effectively
|
|
// pair Pong messages with Ping.
|
|
NewPongSize func() uint16
|
|
|
|
// IntervalDuration is the Duration between attempted pings.
|
|
IntervalDuration time.Duration
|
|
|
|
// TimeoutDuration is the Duration we wait before declaring a ping
|
|
// attempt failed.
|
|
TimeoutDuration time.Duration
|
|
|
|
// SendPing is a closure that is responsible for sending the Ping
|
|
// message out to our peer
|
|
SendPing func(ping *lnwire.Ping)
|
|
|
|
// OnPongFailure is a closure that is responsible for executing the
|
|
// logic when a Pong message is either late or does not match our
|
|
// expectations for that Pong
|
|
OnPongFailure func(failureReason error, timeWaitedForPong time.Duration,
|
|
lastKnownRTT time.Duration)
|
|
}
|
|
|
|
// PingManager is a structure that is designed to manage the internal state
|
|
// of the ping pong lifecycle with the remote peer. We assume there is only one
|
|
// ping outstanding at once.
|
|
//
|
|
// NOTE: This structure MUST be initialized with NewPingManager.
|
|
type PingManager struct {
|
|
cfg *PingManagerConfig
|
|
|
|
// pingTime is a rough estimate of the RTT (round-trip-time) between us
|
|
// and the connected peer.
|
|
// To be used atomically.
|
|
// TODO(roasbeef): also use a WMA or EMA?
|
|
pingTime atomic.Pointer[time.Duration]
|
|
|
|
// pingLastSend is the time when we sent our last ping message.
|
|
// To be used atomically.
|
|
pingLastSend *time.Time
|
|
|
|
// outstandingPongSize is the current size of the requested pong
|
|
// payload. This value can only validly range from [0,65531]. Any
|
|
// value < 0 is interpreted as if there is no outstanding ping message.
|
|
outstandingPongSize int32
|
|
|
|
// pingTicker is a pointer to a Ticker that fires on every ping
|
|
// interval.
|
|
pingTicker *time.Ticker
|
|
|
|
// pingTimeout is a Timer that will fire when we want to time out a
|
|
// ping
|
|
pingTimeout *time.Timer
|
|
|
|
// pongChan is the channel on which the pingManager will write Pong
|
|
// messages it is evaluating
|
|
pongChan chan *lnwire.Pong
|
|
|
|
started sync.Once
|
|
stopped sync.Once
|
|
|
|
quit chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// NewPingManager constructs a pingManager in a valid state. It must be started
|
|
// before it does anything useful, though.
|
|
func NewPingManager(cfg *PingManagerConfig) *PingManager {
|
|
m := PingManager{
|
|
cfg: cfg,
|
|
outstandingPongSize: -1,
|
|
pongChan: make(chan *lnwire.Pong, 1),
|
|
quit: make(chan struct{}),
|
|
}
|
|
|
|
return &m
|
|
}
|
|
|
|
// Start launches the primary goroutine that is owned by the pingManager.
|
|
func (m *PingManager) Start() error {
|
|
var err error
|
|
m.started.Do(func() {
|
|
m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
|
|
m.pingTimeout = time.NewTimer(0)
|
|
|
|
m.wg.Add(1)
|
|
go m.pingHandler()
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// getLastRTT safely retrieves the last known RTT, returning 0 if none exists.
|
|
func (m *PingManager) getLastRTT() time.Duration {
|
|
rttPtr := m.pingTime.Load()
|
|
if rttPtr == nil {
|
|
return 0
|
|
}
|
|
|
|
return *rttPtr
|
|
}
|
|
|
|
// pendingPingWait calculates the time waited since the last ping was sent. If
|
|
// no ping time is reported, None is returned. defaultDuration.
|
|
func (m *PingManager) pendingPingWait() fn.Option[time.Duration] {
|
|
if m.pingLastSend != nil {
|
|
return fn.Some(time.Since(*m.pingLastSend))
|
|
}
|
|
|
|
return fn.None[time.Duration]()
|
|
}
|
|
|
|
// pingHandler is the main goroutine responsible for enforcing the ping/pong
|
|
// protocol.
|
|
func (m *PingManager) pingHandler() {
|
|
defer m.wg.Done()
|
|
defer m.pingTimeout.Stop()
|
|
|
|
// Ensure that the pingTimeout channel is empty.
|
|
if !m.pingTimeout.Stop() {
|
|
<-m.pingTimeout.C
|
|
}
|
|
|
|
// Because we don't know if the OnPingFailure callback actually
|
|
// disconnects a peer (dependent on user config), we should never return
|
|
// from this loop unless the ping manager is stopped explicitly (which
|
|
// happens on disconnect).
|
|
for {
|
|
select {
|
|
case <-m.pingTicker.C:
|
|
// If this occurs it means that the new ping cycle has
|
|
// begun while there is still an outstanding ping
|
|
// awaiting a pong response. This should never occur,
|
|
// but if it does, it implies a timeout.
|
|
if m.outstandingPongSize >= 0 {
|
|
// Ping was outstanding, meaning it timed out by
|
|
// the arrival of the next ping interval.
|
|
timeWaited := m.pendingPingWait().UnwrapOr(
|
|
m.cfg.IntervalDuration,
|
|
)
|
|
lastRTT := m.getLastRTT()
|
|
|
|
m.cfg.OnPongFailure(
|
|
errors.New("ping timed "+
|
|
"out by next interval"),
|
|
timeWaited, lastRTT,
|
|
)
|
|
|
|
m.resetPingState()
|
|
}
|
|
|
|
pongSize := m.cfg.NewPongSize()
|
|
ping := &lnwire.Ping{
|
|
NumPongBytes: pongSize,
|
|
PaddingBytes: m.cfg.NewPingPayload(),
|
|
}
|
|
|
|
// Set up our bookkeeping for the new Ping.
|
|
if err := m.setPingState(pongSize); err != nil {
|
|
// This is an internal error related to timer
|
|
// reset. Pass it to OnPongFailure as it's
|
|
// critical. Current and last RTT are not
|
|
// directly applicable here.
|
|
m.cfg.OnPongFailure(err, 0, 0)
|
|
|
|
m.resetPingState()
|
|
|
|
continue
|
|
}
|
|
|
|
m.cfg.SendPing(ping)
|
|
|
|
case <-m.pingTimeout.C:
|
|
timeWaited := m.pendingPingWait().UnwrapOr(
|
|
m.cfg.TimeoutDuration,
|
|
)
|
|
lastRTT := m.getLastRTT()
|
|
|
|
m.cfg.OnPongFailure(
|
|
errors.New("timeout while waiting for "+
|
|
"pong response"),
|
|
timeWaited, lastRTT,
|
|
)
|
|
|
|
m.resetPingState()
|
|
|
|
case pong := <-m.pongChan:
|
|
pongSize := int32(len(pong.PongBytes))
|
|
|
|
// Save off values we are about to override when we call
|
|
// resetPingState.
|
|
expected := m.outstandingPongSize
|
|
lastPingTime := m.pingLastSend
|
|
|
|
// This is an unexpected pong, we'll continue.
|
|
if lastPingTime == nil {
|
|
continue
|
|
}
|
|
|
|
actualRTT := time.Since(*lastPingTime)
|
|
|
|
// If the pong we receive doesn't match the ping we sent
|
|
// out, then we fail out.
|
|
if pongSize != expected {
|
|
e := fmt.Errorf("pong response does not match "+
|
|
"expected size. Expected: %d, Got: %d",
|
|
expected, pongSize)
|
|
|
|
lastRTT := m.getLastRTT()
|
|
m.cfg.OnPongFailure(e, actualRTT, lastRTT)
|
|
|
|
m.resetPingState()
|
|
|
|
continue
|
|
}
|
|
|
|
// Pong is good, update RTT and reset state.
|
|
m.pingTime.Store(&actualRTT)
|
|
m.resetPingState()
|
|
|
|
case <-m.quit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop interrupts the goroutines that the PingManager owns.
|
|
func (m *PingManager) Stop() {
|
|
if m.pingTicker == nil {
|
|
return
|
|
}
|
|
|
|
m.stopped.Do(func() {
|
|
close(m.quit)
|
|
m.wg.Wait()
|
|
|
|
m.pingTicker.Stop()
|
|
m.pingTimeout.Stop()
|
|
})
|
|
}
|
|
|
|
// setPingState is a private method to keep track of all of the fields we need
|
|
// to set when we send out a Ping.
|
|
func (m *PingManager) setPingState(pongSize uint16) error {
|
|
t := time.Now()
|
|
m.pingLastSend = &t
|
|
m.outstandingPongSize = int32(pongSize)
|
|
if m.pingTimeout.Reset(m.cfg.TimeoutDuration) {
|
|
return fmt.Errorf(
|
|
"impossible: ping timeout reset when already active",
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// resetPingState is a private method that resets all of the bookkeeping that
|
|
// is tracking a currently outstanding Ping.
|
|
func (m *PingManager) resetPingState() {
|
|
m.pingLastSend = nil
|
|
m.outstandingPongSize = -1
|
|
|
|
if !m.pingTimeout.Stop() {
|
|
select {
|
|
case <-m.pingTimeout.C:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetPingTimeMicroSeconds reports back the RTT calculated by the pingManager.
|
|
func (m *PingManager) GetPingTimeMicroSeconds() int64 {
|
|
rtt := m.pingTime.Load()
|
|
|
|
if rtt == nil {
|
|
return -1
|
|
}
|
|
|
|
return rtt.Microseconds()
|
|
}
|
|
|
|
// ReceivedPong is called to evaluate a Pong message against the expectations
|
|
// we have for it. It will cause the PingManager to invoke the supplied
|
|
// OnPongFailure function if the Pong argument supplied violates expectations.
|
|
func (m *PingManager) ReceivedPong(msg *lnwire.Pong) {
|
|
select {
|
|
case m.pongChan <- msg:
|
|
case <-m.quit:
|
|
}
|
|
}
|