From 99150b41d6377981a65bad48e44c9fb022f1a704 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 26 Mar 2019 16:40:07 -0700 Subject: [PATCH 1/6] peer: retry writes with delay on timeout errors This commit modifies the writeHandler to catch timeout errors, and retry writes to the socket after a small backoff, which increases exponentially from 5s to 1m. With the growing channel graph size, some lower-powered devices can be slow to pull messages off the wire during validation. The current behavior will cause us to disconnect the peer, and resend all of the messages that the remote peer is slow to validate. Catching the timeout helps in preventing such expensive reconnection cycles, especially as the network continues to grow. This is also a preliminary step to reducing the write timeout constant. This will allow concurrent usage of the write pools w/out devoting excessive amounts of time blocking the pool for slow peers. --- peer.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/peer.go b/peer.go index f1fe9490f..84cfe9ea8 100644 --- a/peer.go +++ b/peer.go @@ -1429,27 +1429,80 @@ func (p *peer) writeMessage(msg lnwire.Message) error { func (p *peer) writeHandler() { var exitErr error + const ( + minRetryDelay = 5 * time.Second + maxRetryDelay = time.Minute + ) + out: for { select { case outMsg := <-p.sendQueue: - switch outMsg.msg.(type) { + // Record the time at which we first attempt to send the + // message. + startTime := time.Now() + + // Initialize a retry delay of zero, which will be + // increased if we encounter a write timeout on the + // send. + var retryDelay time.Duration + retryWithDelay: + if retryDelay > 0 { + select { + case <-time.After(retryDelay): + case <-p.quit: + // Inform synchronous writes that the + // peer is exiting. 
+ if outMsg.errChan != nil { + outMsg.errChan <- ErrPeerExiting + } + exitErr = ErrPeerExiting + break out + } + } + // If we're about to send a ping message, then log the // exact time in which we send the message so we can // use the delay as a rough estimate of latency to the // remote peer. - case *lnwire.Ping: + if _, ok := outMsg.msg.(*lnwire.Ping); ok { // TODO(roasbeef): do this before the write? // possibly account for processing within func? now := time.Now().UnixNano() atomic.StoreInt64(&p.pingLastSend, now) } - // Write out the message to the socket, responding with - // error if `errChan` is non-nil. The `errChan` allows - // callers to optionally synchronize sends with the - // writeHandler. + // Write out the message to the socket. If a timeout + // error is encountered, we will catch this and retry + // after backing off in case the remote peer is just + // slow to process messages from the wire. err := p.writeMessage(outMsg.msg) + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + // Increase the retry delay in the event of a + // timeout error, this prevents us from + // disconnecting if the remote party is slow to + // pull messages off the wire. We back off + // exponentially up to our max delay to prevent + // blocking the write pool. + if retryDelay == 0 { + retryDelay = minRetryDelay + } else { + retryDelay *= 2 + if retryDelay > maxRetryDelay { + retryDelay = maxRetryDelay + } + } + + peerLog.Debugf("Write timeout detected for "+ + "peer %s, retrying after %v, "+ + "first attempted %v ago", p, retryDelay, + time.Since(startTime)) + + goto retryWithDelay + } + + // If the peer requested a synchronous write, respond + // with the error. if outMsg.errChan != nil { outMsg.errChan <- err } From b78e5f67428e31e950b44f1d2332787871165d1d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 26 Mar 2019 16:40:24 -0700 Subject: [PATCH 2/6] peer: reduce write timeout to 5 seconds This commit reduces the peer's write timeout to 5s. 
Now that the peer catches write timeouts and doesn't disconnect, this will ensure we spend less time blocking in the write pool in case others also need to access the workers concurrently. Slower peers will now only block for 5s, after every reattempt w/ exponential backoff. --- peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peer.go b/peer.go index 84cfe9ea8..549e94a39 100644 --- a/peer.go +++ b/peer.go @@ -45,7 +45,7 @@ const ( idleTimeout = 5 * time.Minute // writeMessageTimeout is the timeout used when writing a message to peer. - writeMessageTimeout = 50 * time.Second + writeMessageTimeout = 5 * time.Second // readMessageTimeout is the timeout used when reading a message from a // peer. From 93e56f9ee85ab553fc93f03fe28baa063b4966ff Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 26 Mar 2019 16:40:41 -0700 Subject: [PATCH 3/6] peer: add symmetric write idle timeout In this commit, we add a 5 minute idle timer to the write handler. After catching the write timeouts, it's been observed that some connections have trouble reading a message for several hours. This typically points to a deeper issue w/ the peer or, e.g. the remote peer switched networks. This now mirrors the idle timeout used in the read handler, such that we will disconnect a peer if we are unable to send or receive a message from the peer after 5 minutes. We also modify the readHandler to drain its idleTimer's channel in the event that the timer had already fired, but we successfully read the message. 
--- peer.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/peer.go b/peer.go index 549e94a39..70b409226 100644 --- a/peer.go +++ b/peer.go @@ -1005,7 +1005,12 @@ func (p *peer) readHandler() { out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() - idleTimer.Stop() + if !idleTimer.Stop() { + select { + case <-idleTimer.C: + default: + } + } if err != nil { peerLog.Infof("unable to read message from %v: %v", p, err) @@ -1427,6 +1432,14 @@ func (p *peer) writeMessage(msg lnwire.Message) error { // // NOTE: This method MUST be run as a goroutine. func (p *peer) writeHandler() { + // We'll stop the timer after a new messages is sent, and also reset it + // after we process the next message. + idleTimer := time.AfterFunc(idleTimeout, func() { + err := fmt.Errorf("Peer %s no write for %s -- disconnecting", + p, idleTimeout) + p.Disconnect(err) + }) + var exitErr error const ( @@ -1501,6 +1514,16 @@ out: goto retryWithDelay } + // The write succeeded, reset the idle timer to prevent + // us from disconnecting the peer. + if !idleTimer.Stop() { + select { + case <-idleTimer.C: + default: + } + } + idleTimer.Reset(idleTimeout) + // If the peer requested a synchronous write, respond // with the error. if outMsg.errChan != nil { From 60467bef7be557ef462932f6bb5b7b0001e927da Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 26 Mar 2019 16:40:57 -0700 Subject: [PATCH 4/6] lncfg/workers: bump default read/write workers from 16 -> 100 Bumps the default read and write handlers to be well above the average number of peers a node has. Since the worker counts specify only a maximum number of concurrent read/write workers, it is expected that the actual usage would converge to the requirements of the node anyway. However, in preparation for a major release, this is a conservative measure to ensure that the default values aren't too low and improve network stability. 
--- lncfg/workers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lncfg/workers.go b/lncfg/workers.go index fec57ddda..6bbacc5f9 100644 --- a/lncfg/workers.go +++ b/lncfg/workers.go @@ -5,11 +5,11 @@ import "fmt" const ( // DefaultReadWorkers is the default maximum number of concurrent // workers used by the daemon's read pool. - DefaultReadWorkers = 16 + DefaultReadWorkers = 100 // DefaultWriteWorkers is the default maximum number of concurrent // workers used by the daemon's write pool. - DefaultWriteWorkers = 16 + DefaultWriteWorkers = 100 // DefaultSigWorkers is the default maximum number of concurrent workers // used by the daemon's sig pool. From 7358535725de3ee33164264ae9d2036fe87b8df1 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 26 Mar 2019 16:41:13 -0700 Subject: [PATCH 5/6] peer+server: log pubkey@addr --- peer.go | 2 +- server.go | 27 ++++++++++++++------------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/peer.go b/peer.go index 70b409226..4cdeed579 100644 --- a/peer.go +++ b/peer.go @@ -638,7 +638,7 @@ func (p *peer) Disconnect(reason error) { // String returns the string representation of this peer. func (p *peer) String() string { - return p.conn.RemoteAddr().String() + return fmt.Sprintf("%x@%s", p.pubKeyBytes, p.conn.RemoteAddr()) } // readNextMessage reads, and returns the next message on the wire along with diff --git a/server.go b/server.go index 0a4d2ec74..441c43cd8 100644 --- a/server.go +++ b/server.go @@ -2161,9 +2161,9 @@ func (s *server) InboundPeerConnected(conn net.Conn) { // the new connection s.t there's only a single connection between us. 
localPub := s.identityPriv.PubKey() if !connectedPeer.inbound && !shouldDropLocalConnection(localPub, nodePub) { - srvrLog.Warnf("Received inbound connection from peer %x, "+ + srvrLog.Warnf("Received inbound connection from peer %v, "+ "but already have outbound connection, dropping conn", - nodePub.SerializeCompressed()) + connectedPeer) conn.Close() return } @@ -2236,7 +2236,8 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) return } - srvrLog.Infof("Established connection to: %v", conn.RemoteAddr()) + srvrLog.Infof("Established connection to: %x@%v", pubStr, + conn.RemoteAddr()) if connReq != nil { // A successful connection was returned by the connmgr. @@ -2268,9 +2269,9 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) // the new connection s.t there's only a single connection between us. localPub := s.identityPriv.PubKey() if connectedPeer.inbound && shouldDropLocalConnection(localPub, nodePub) { - srvrLog.Warnf("Established outbound connection to peer %x, "+ + srvrLog.Warnf("Established outbound connection to peer %v, "+ "but already have inbound connection, dropping conn", - nodePub.SerializeCompressed()) + connectedPeer) if connReq != nil { s.connMgr.Remove(connReq.ID()) } @@ -2355,8 +2356,8 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, addr := conn.RemoteAddr() pubKey := brontideConn.RemotePub() - srvrLog.Infof("Finalizing connection to %x, inbound=%v", - pubKey.SerializeCompressed(), inbound) + srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v", + pubKey.SerializeCompressed(), addr, inbound) peerAddr := &lnwire.NetAddress{ IdentityKey: pubKey, @@ -2473,7 +2474,7 @@ func (s *server) peerInitializer(p *peer) { defer s.mu.Unlock() // Check if there are listeners waiting for this peer to come online. 
- srvrLog.Debugf("Notifying that peer %x is online", p.PubKey()) + srvrLog.Debugf("Notifying that peer %v is online", p) for _, peerChan := range s.peerConnectedListeners[pubStr] { select { case peerChan <- p: @@ -2527,8 +2528,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { // TODO(roasbeef): instead add a PurgeInterfaceLinks function? links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes) if err != nil && err != htlcswitch.ErrNoLinksFound { - srvrLog.Errorf("Unable to get channel links for %x: %v", - p.PubKey(), err) + srvrLog.Errorf("Unable to get channel links for %v: %v", p, err) } for _, link := range links { @@ -2540,7 +2540,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { // If there were any notification requests for when this peer // disconnected, we can trigger them now. - srvrLog.Debugf("Notifying that peer %x is offline", p.PubKey()) + srvrLog.Debugf("Notifying that peer %x is offline", p) pubStr := string(pubKey.SerializeCompressed()) for _, offlineChan := range s.peerDisconnectedListeners[pubStr] { close(offlineChan) @@ -2736,13 +2736,14 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { // connection. if reqs, ok := s.persistentConnReqs[targetPub]; ok { srvrLog.Warnf("Already have %d persistent connection "+ - "requests for %v, connecting anyway.", len(reqs), addr) + "requests for %x@%v, connecting anyway.", len(reqs), + targetPub, addr) } // If there's not already a pending or active connection to this node, // then instruct the connection manager to attempt to establish a // persistent connection to the peer. 
- srvrLog.Debugf("Connecting to %v", addr) + srvrLog.Debugf("Connecting to %x@%v", targetPub, addr) if perm { connReq := &connmgr.ConnReq{ Addr: addr, From c5365160fbb4e6bd4a5959d94bdea052ec555f60 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 26 Mar 2019 16:41:30 -0700 Subject: [PATCH 6/6] server: wrap peer connection tiebreaking to 80 chars --- server.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/server.go b/server.go index 441c43cd8..0ac457332 100644 --- a/server.go +++ b/server.go @@ -2156,14 +2156,17 @@ func (s *server) InboundPeerConnected(conn net.Conn) { case nil: // We already have a connection with the incoming peer. If the - // connection we've already established should be kept and is not of - // the same type of the new connection (inbound), then we'll close out - // the new connection s.t there's only a single connection between us. + // connection we've already established should be kept and is + // not of the same type of the new connection (inbound), then + // we'll close out the new connection s.t there's only a single + // connection between us. localPub := s.identityPriv.PubKey() - if !connectedPeer.inbound && !shouldDropLocalConnection(localPub, nodePub) { - srvrLog.Warnf("Received inbound connection from peer %v, "+ - "but already have outbound connection, dropping conn", - connectedPeer) + if !connectedPeer.inbound && + !shouldDropLocalConnection(localPub, nodePub) { + + srvrLog.Warnf("Received inbound connection from "+ + "peer %v, but already have outbound "+ + "connection, dropping conn", connectedPeer) conn.Close() return } @@ -2264,14 +2267,17 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) case nil: // We already have a connection with the incoming peer. 
If the - // connection we've already established should be kept and is not of - // the same type of the new connection (outbound), then we'll close out - // the new connection s.t there's only a single connection between us. + // connection we've already established should be kept and is + // not of the same type of the new connection (outbound), then + // we'll close out the new connection s.t there's only a single + // connection between us. localPub := s.identityPriv.PubKey() - if connectedPeer.inbound && shouldDropLocalConnection(localPub, nodePub) { - srvrLog.Warnf("Established outbound connection to peer %v, "+ - "but already have inbound connection, dropping conn", - connectedPeer) + if connectedPeer.inbound && + shouldDropLocalConnection(localPub, nodePub) { + + srvrLog.Warnf("Established outbound connection to "+ + "peer %v, but already have inbound "+ + "connection, dropping conn", connectedPeer) if connReq != nil { s.connMgr.Remove(connReq.ID()) }