diff --git a/lncfg/workers.go b/lncfg/workers.go index fec57ddda..6bbacc5f9 100644 --- a/lncfg/workers.go +++ b/lncfg/workers.go @@ -5,11 +5,11 @@ import "fmt" const ( // DefaultReadWorkers is the default maximum number of concurrent // workers used by the daemon's read pool. - DefaultReadWorkers = 16 + DefaultReadWorkers = 100 // DefaultWriteWorkers is the default maximum number of concurrent // workers used by the daemon's write pool. - DefaultWriteWorkers = 16 + DefaultWriteWorkers = 100 // DefaultSigWorkers is the default maximum number of concurrent workers // used by the daemon's sig pool. diff --git a/peer.go b/peer.go index f1fe9490f..4cdeed579 100644 --- a/peer.go +++ b/peer.go @@ -45,7 +45,7 @@ const ( idleTimeout = 5 * time.Minute // writeMessageTimeout is the timeout used when writing a message to peer. - writeMessageTimeout = 50 * time.Second + writeMessageTimeout = 5 * time.Second // readMessageTimeout is the timeout used when reading a message from a // peer. @@ -638,7 +638,7 @@ func (p *peer) Disconnect(reason error) { // String returns the string representation of this peer. func (p *peer) String() string { - return p.conn.RemoteAddr().String() + return fmt.Sprintf("%x@%s", p.pubKeyBytes, p.conn.RemoteAddr()) } // readNextMessage reads, and returns the next message on the wire along with @@ -1005,7 +1005,12 @@ func (p *peer) readHandler() { out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() - idleTimer.Stop() + if !idleTimer.Stop() { + select { + case <-idleTimer.C: + default: + } + } if err != nil { peerLog.Infof("unable to read message from %v: %v", p, err) @@ -1427,29 +1432,100 @@ func (p *peer) writeMessage(msg lnwire.Message) error { // // NOTE: This method MUST be run as a goroutine. func (p *peer) writeHandler() { + // We'll stop the timer after a new messages is sent, and also reset it + // after we process the next message. 
+ idleTimer := time.AfterFunc(idleTimeout, func() { + err := fmt.Errorf("Peer %s no write for %s -- disconnecting", + p, idleTimeout) + p.Disconnect(err) + }) + var exitErr error + const ( + minRetryDelay = 5 * time.Second + maxRetryDelay = time.Minute + ) + out: for { select { case outMsg := <-p.sendQueue: - switch outMsg.msg.(type) { + // Record the time at which we first attempt to send the + // message. + startTime := time.Now() + + // Initialize a retry delay of zero, which will be + // increased if we encounter a write timeout on the + // send. + var retryDelay time.Duration + retryWithDelay: + if retryDelay > 0 { + select { + case <-time.After(retryDelay): + case <-p.quit: + // Inform synchronous writes that the + // peer is exiting. + if outMsg.errChan != nil { + outMsg.errChan <- ErrPeerExiting + } + exitErr = ErrPeerExiting + break out + } + } + // If we're about to send a ping message, then log the // exact time in which we send the message so we can // use the delay as a rough estimate of latency to the // remote peer. - case *lnwire.Ping: + if _, ok := outMsg.msg.(*lnwire.Ping); ok { // TODO(roasbeef): do this before the write? // possibly account for processing within func? now := time.Now().UnixNano() atomic.StoreInt64(&p.pingLastSend, now) } - // Write out the message to the socket, responding with - // error if `errChan` is non-nil. The `errChan` allows - // callers to optionally synchronize sends with the - // writeHandler. + // Write out the message to the socket. If a timeout + // error is encountered, we will catch this and retry + // after backing off in case the remote peer is just + // slow to process messages from the wire. err := p.writeMessage(outMsg.msg) + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + // Increase the retry delay in the event of a + // timeout error, this prevents us from + // disconnecting if the remote party is slow to + // pull messages off the wire. 
We back off + // exponentially up to our max delay to prevent + // blocking the write pool. + if retryDelay == 0 { + retryDelay = minRetryDelay + } else { + retryDelay *= 2 + if retryDelay > maxRetryDelay { + retryDelay = maxRetryDelay + } + } + + peerLog.Debugf("Write timeout detected for "+ + "peer %s, retrying after %v, "+ + "first attempted %v ago", p, retryDelay, + time.Since(startTime)) + + goto retryWithDelay + } + + // The write succeeded, reset the idle timer to prevent + // us from disconnecting the peer. + if !idleTimer.Stop() { + select { + case <-idleTimer.C: + default: + } + } + idleTimer.Reset(idleTimeout) + + // If the peer requested a synchronous write, respond + // with the error. if outMsg.errChan != nil { outMsg.errChan <- err } diff --git a/server.go b/server.go index 0a4d2ec74..0ac457332 100644 --- a/server.go +++ b/server.go @@ -2156,14 +2156,17 @@ func (s *server) InboundPeerConnected(conn net.Conn) { case nil: // We already have a connection with the incoming peer. If the - // connection we've already established should be kept and is not of - // the same type of the new connection (inbound), then we'll close out - // the new connection s.t there's only a single connection between us. + // connection we've already established should be kept and is + // not of the same type of the new connection (inbound), then + // we'll close out the new connection s.t there's only a single + // connection between us. 
localPub := s.identityPriv.PubKey() - if !connectedPeer.inbound && !shouldDropLocalConnection(localPub, nodePub) { - srvrLog.Warnf("Received inbound connection from peer %x, "+ - "but already have outbound connection, dropping conn", - nodePub.SerializeCompressed()) + if !connectedPeer.inbound && + !shouldDropLocalConnection(localPub, nodePub) { + + srvrLog.Warnf("Received inbound connection from "+ + "peer %v, but already have outbound "+ + "connection, dropping conn", connectedPeer) conn.Close() return } @@ -2236,7 +2239,8 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) return } - srvrLog.Infof("Established connection to: %v", conn.RemoteAddr()) + srvrLog.Infof("Established connection to: %x@%v", pubStr, + conn.RemoteAddr()) if connReq != nil { // A successful connection was returned by the connmgr. @@ -2263,14 +2267,17 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) case nil: // We already have a connection with the incoming peer. If the - // connection we've already established should be kept and is not of - // the same type of the new connection (outbound), then we'll close out - // the new connection s.t there's only a single connection between us. + // connection we've already established should be kept and is + // not of the same type of the new connection (outbound), then + // we'll close out the new connection s.t there's only a single + // connection between us. 
localPub := s.identityPriv.PubKey() - if connectedPeer.inbound && shouldDropLocalConnection(localPub, nodePub) { - srvrLog.Warnf("Established outbound connection to peer %x, "+ - "but already have inbound connection, dropping conn", - nodePub.SerializeCompressed()) + if connectedPeer.inbound && + shouldDropLocalConnection(localPub, nodePub) { + + srvrLog.Warnf("Established outbound connection to "+ + "peer %v, but already have inbound "+ + "connection, dropping conn", connectedPeer) if connReq != nil { s.connMgr.Remove(connReq.ID()) } @@ -2355,8 +2362,8 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, addr := conn.RemoteAddr() pubKey := brontideConn.RemotePub() - srvrLog.Infof("Finalizing connection to %x, inbound=%v", - pubKey.SerializeCompressed(), inbound) + srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v", + pubKey.SerializeCompressed(), addr, inbound) peerAddr := &lnwire.NetAddress{ IdentityKey: pubKey, @@ -2473,7 +2480,7 @@ func (s *server) peerInitializer(p *peer) { defer s.mu.Unlock() // Check if there are listeners waiting for this peer to come online. - srvrLog.Debugf("Notifying that peer %x is online", p.PubKey()) + srvrLog.Debugf("Notifying that peer %v is online", p) for _, peerChan := range s.peerConnectedListeners[pubStr] { select { case peerChan <- p: @@ -2527,8 +2534,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { // TODO(roasbeef): instead add a PurgeInterfaceLinks function? links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes) if err != nil && err != htlcswitch.ErrNoLinksFound { - srvrLog.Errorf("Unable to get channel links for %x: %v", - p.PubKey(), err) + srvrLog.Errorf("Unable to get channel links for %v: %v", p, err) } for _, link := range links { @@ -2540,7 +2546,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { // If there were any notification requests for when this peer // disconnected, we can trigger them now. 
- srvrLog.Debugf("Notifying that peer %x is offline", p.PubKey()) + srvrLog.Debugf("Notifying that peer %v is offline", p) pubStr := string(pubKey.SerializeCompressed()) for _, offlineChan := range s.peerDisconnectedListeners[pubStr] { close(offlineChan) } @@ -2736,13 +2742,14 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { // connection. if reqs, ok := s.persistentConnReqs[targetPub]; ok { srvrLog.Warnf("Already have %d persistent connection "+ - "requests for %v, connecting anyway.", len(reqs), addr) + "requests for %x@%v, connecting anyway.", len(reqs), + targetPub, addr) } // If there's not already a pending or active connection to this node, // then instruct the connection manager to attempt to establish a // persistent connection to the peer. - srvrLog.Debugf("Connecting to %v", addr) + srvrLog.Debugf("Connecting to %x@%v", targetPub, addr) if perm { connReq := &connmgr.ConnReq{ Addr: addr,