Merge pull request #8385 from Roasbeef/ping-async-dc

peer: make PingManager disconnect call async
This commit is contained in:
Elle 2024-01-23 09:24:11 +02:00 committed by GitHub
commit 51de320d69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 83 additions and 76 deletions

View File

@ -29,6 +29,11 @@
FilterKnownChanIDs](https://github.com/lightningnetwork/lnd/pull/8400) by
ensuring the `cacheMu` mutex is acquired before the main database lock.
* [Prevent](https://github.com/lightningnetwork/lnd/pull/8385) ping failures
from [deadlocking](https://github.com/lightningnetwork/lnd/issues/8379)
the peer connection.
# New Features
## Functional Enhancements
## RPC Additions
@ -50,5 +55,8 @@
## Tooling and Documentation
# Contributors (Alphabetical Order)
* Elle Mouton
* Keagan McClelland
* Olaoluwa Osuntokun
* ziggie1984

View File

@ -577,7 +577,7 @@ func NewBrontide(cfg Config) *Brontide {
eStr := "pong response failure for %s: %v " +
"-- disconnecting"
p.log.Warnf(eStr, p, err)
p.Disconnect(fmt.Errorf(eStr, p, err))
go p.Disconnect(fmt.Errorf(eStr, p, err))
},
})

View File

@ -13,7 +13,6 @@ import (
// PingManagerConfig is a structure containing various parameters that govern
// how the PingManager behaves.
type PingManagerConfig struct {
// NewPingPayload is a closure that returns the payload to be packaged
// in the Ping message.
NewPingPayload func() []byte
@ -99,16 +98,20 @@ func NewPingManager(cfg *PingManagerConfig) *PingManager {
func (m *PingManager) Start() error {
var err error
m.started.Do(func() {
err = m.start()
m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
m.pingTimeout = time.NewTimer(0)
m.wg.Add(1)
go m.pingHandler()
})
return err
}
func (m *PingManager) start() error {
m.pingTicker = time.NewTicker(m.cfg.IntervalDuration)
m.pingTimeout = time.NewTimer(0)
// pingHandler is the main goroutine responsible for enforcing the ping/pong
// protocol.
func (m *PingManager) pingHandler() {
defer m.wg.Done()
defer m.pingTimeout.Stop()
// Ensure that the pingTimeout channel is empty.
@ -116,85 +119,81 @@ func (m *PingManager) start() error {
<-m.pingTimeout.C
}
m.wg.Add(1)
go func() {
defer m.wg.Done()
for {
select {
case <-m.pingTicker.C:
// If this occurs it means that the new ping
// cycle has begun while there is still an
// outstanding ping awaiting a pong response.
// This should never occur, but if it does, it
// implies a timeout.
if m.outstandingPongSize >= 0 {
e := errors.New("impossible: new ping" +
"in unclean state",
)
m.cfg.OnPongFailure(e)
return
}
pongSize := m.cfg.NewPongSize()
ping := &lnwire.Ping{
NumPongBytes: pongSize,
PaddingBytes: m.cfg.NewPingPayload(),
}
// Set up our bookkeeping for the new Ping.
if err := m.setPingState(pongSize); err != nil {
m.cfg.OnPongFailure(err)
return
}
m.cfg.SendPing(ping)
case <-m.pingTimeout.C:
m.resetPingState()
e := errors.New("timeout while waiting for " +
"pong response",
for {
select {
case <-m.pingTicker.C:
// If this occurs it means that the new ping cycle has
// begun while there is still an outstanding ping
// awaiting a pong response. This should never occur,
// but if it does, it implies a timeout.
if m.outstandingPongSize >= 0 {
e := errors.New("impossible: new ping" +
"in unclean state",
)
m.cfg.OnPongFailure(e)
return
}
case pong := <-m.pongChan:
pongSize := int32(len(pong.PongBytes))
pongSize := m.cfg.NewPongSize()
ping := &lnwire.Ping{
NumPongBytes: pongSize,
PaddingBytes: m.cfg.NewPingPayload(),
}
// Save off values we are about to override
// when we call resetPingState.
expected := m.outstandingPongSize
lastPing := m.pingLastSend
// Set up our bookkeeping for the new Ping.
if err := m.setPingState(pongSize); err != nil {
m.cfg.OnPongFailure(err)
m.resetPingState()
// If the pong we receive doesn't match the
// ping we sent out, then we fail out.
if pongSize != expected {
e := errors.New("pong response does " +
"not match expected size",
)
m.cfg.OnPongFailure(e)
return
}
// Compute RTT of ping and save that for future
// querying.
if lastPing != nil {
rtt := time.Since(*lastPing)
m.pingTime.Store(&rtt)
}
case <-m.quit:
return
}
}
}()
return nil
m.cfg.SendPing(ping)
case <-m.pingTimeout.C:
m.resetPingState()
e := errors.New("timeout while waiting for " +
"pong response",
)
m.cfg.OnPongFailure(e)
return
case pong := <-m.pongChan:
pongSize := int32(len(pong.PongBytes))
// Save off values we are about to override when we
// call resetPingState.
expected := m.outstandingPongSize
lastPing := m.pingLastSend
m.resetPingState()
// If the pong we receive doesn't match the ping we
// sent out, then we fail out.
if pongSize != expected {
e := errors.New("pong response does " +
"not match expected size",
)
m.cfg.OnPongFailure(e)
return
}
// Compute RTT of ping and save that for future
// querying.
if lastPing != nil {
rtt := time.Since(*lastPing)
m.pingTime.Store(&rtt)
}
case <-m.quit:
return
}
}
}
// Stop interrupts the goroutines that the PingManager owns. Can only be called