peer+lnd: fix peer blocking on node shutdown

This commit fixes a case where the peer blocks the shutdown process.
During the shutdown, the server will call `s.DisconnectPeer`, which
calls `peer.Disconnect`. Because the peer never enters `peer.Start` via
`s.peerInitializer`, the `startReady` chan will not be closed, causing
`peer.Disconnect` to hang forever. We now fix it by only block on
`startReady` when the peer is started.
This commit is contained in:
yyforyongyu 2024-11-16 10:54:22 +08:00
parent 95b248a1ef
commit fe03aa0201
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 34 additions and 10 deletions

View File

@ -1467,10 +1467,18 @@ func (p *Brontide) Disconnect(reason error) {
// Make sure initialization has completed before we try to tear things
// down.
select {
case <-p.startReady:
case <-p.quit:
return
//
// NOTE: We only read the `startReady` chan if the peer has been
// started, otherwise we will skip reading it as this chan won't be
// closed, hence blocks forever.
if atomic.LoadInt32(&p.started) == 1 {
p.log.Debugf("Started, waiting on startReady signal")
select {
case <-p.startReady:
case <-p.quit:
return
}
}
err := fmt.Errorf("disconnecting %s, reason: %v", p, reason)

View File

@ -196,7 +196,15 @@ type server struct {
// to handle dynamic IP changes.
lastDetectedIP net.IP
mu sync.RWMutex
mu sync.RWMutex
// peersByPub is a map of the active peers.
//
// NOTE: The key used here is the raw bytes of the peer's public key to
// string conversion, which means it cannot be printed using `%s` as it
// will just print the binary.
//
// TODO(yy): Use the hex string instead.
peersByPub map[string]*peer.Brontide
inboundPeers map[string]*peer.Brontide
@ -4215,9 +4223,14 @@ func (s *server) addPeer(p *peer.Brontide) {
return
}
pubBytes := p.IdentityKey().SerializeCompressed()
// Ignore new peers if we're shutting down.
if s.Stopped() {
srvrLog.Infof("Server stopped, skipped adding peer=%x",
pubBytes)
p.Disconnect(ErrServerShuttingDown)
return
}
@ -4226,8 +4239,9 @@ func (s *server) addPeer(p *peer.Brontide) {
// TODO(roasbeef): pipe all requests through to the
// queryHandler/peerManager
pubSer := p.IdentityKey().SerializeCompressed()
pubStr := string(pubSer)
// NOTE: This pubStr is a raw bytes to string conversion and will NOT
// be human-readable.
pubStr := string(pubBytes)
s.peersByPub[pubStr] = p
@ -4240,7 +4254,7 @@ func (s *server) addPeer(p *peer.Brontide) {
// Inform the peer notifier of a peer online event so that it can be reported
// to clients listening for peer events.
var pubKey [33]byte
copy(pubKey[:], pubSer)
copy(pubKey[:], pubBytes)
s.peerNotifier.NotifyPeerOnline(pubKey)
}
@ -4257,8 +4271,12 @@ func (s *server) addPeer(p *peer.Brontide) {
func (s *server) peerInitializer(p *peer.Brontide) {
defer s.wg.Done()
pubBytes := p.IdentityKey().SerializeCompressed()
// Avoid initializing peers while the server is exiting.
if s.Stopped() {
srvrLog.Infof("Server stopped, skipped initializing peer=%x",
pubBytes)
return
}
@ -4276,8 +4294,6 @@ func (s *server) peerInitializer(p *peer.Brontide) {
s.wg.Add(1)
go s.peerTerminationWatcher(p, ready)
pubBytes := p.IdentityKey().SerializeCompressed()
// Start the peer! If an error occurs, we Disconnect the peer, which
// will unblock the peerTerminationWatcher.
if err := p.Start(); err != nil {