diff --git a/server.go b/server.go
index 4503dbce2..bf7700920 100644
--- a/server.go
+++ b/server.go
@@ -1917,155 +1917,6 @@ func (s *server) findPeerByPubStr(pubStr string) (*peer, error) {
 	return peer, nil
 }
 
-// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
-// and then cleans up all resources allocated to the peer, notifies relevant
-// sub-systems of its demise, and finally handles re-connecting to the peer if
-// it's persistent. If the server intentionally disconnects a peer, it should
-// have a corresponding entry in the ignorePeerTermination map which will cause
-// the cleanup routine to exit early. The passed `ready` chan is used to
-// synchronize when WaitForDisconnect should begin watching on the peer's
-// waitgroup. The ready chan should only be signaled if the peer starts
-// successfully, otherwise the peer should be disconnected instead.
-//
-// NOTE: This MUST be launched as a goroutine.
-func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
-	defer s.wg.Done()
-
-	p.WaitForDisconnect(ready)
-
-	srvrLog.Debugf("Peer %v has been disconnected", p)
-
-	// If the server is exiting then we can bail out early ourselves as all
-	// the other sub-systems will already be shutting down.
-	if s.Stopped() {
-		return
-	}
-
-	// Next, we'll cancel all pending funding reservations with this node.
-	// If we tried to initiate any funding flows that haven't yet finished,
-	// then we need to unlock those committed outputs so they're still
-	// available for use.
-	s.fundingMgr.CancelPeerReservations(p.PubKey())
-
-	pubKey := p.addr.IdentityKey
-
-	// We'll also inform the gossiper that this peer is no longer active,
-	// so we don't need to maintain sync state for it any longer.
-	s.authGossiper.PruneSyncState(pubKey)
-
-	// Tell the switch to remove all links associated with this peer.
-	// Passing nil as the target link indicates that all links associated
-	// with this interface should be closed.
-	//
-	// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
-	links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
-	if err != nil {
-		srvrLog.Errorf("unable to get channel links: %v", err)
-	}
-
-	for _, link := range links {
-		p.server.htlcSwitch.RemoveLink(link.ChanID())
-	}
-
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	// If the server has already removed this peer, we can short circuit the
-	// peer termination watcher and skip cleanup.
-	if _, ok := s.ignorePeerTermination[p]; ok {
-		delete(s.ignorePeerTermination, p)
-
-		pubKey := p.PubKey()
-		pubStr := string(pubKey[:])
-
-		// If a connection callback is present, we'll go ahead and
-		// execute it now that previous peer has fully disconnected. If
-		// the callback is not present, this likely implies the peer was
-		// purposefully disconnected via RPC, and that no reconnect
-		// should be attempted.
-		connCallback, ok := s.scheduledPeerConnection[pubStr]
-		if ok {
-			delete(s.scheduledPeerConnection, pubStr)
-			connCallback()
-		}
-		return
-	}
-
-	// First, cleanup any remaining state the server has regarding the peer
-	// in question.
-	s.removePeer(p)
-
-	// Next, check to see if this is a persistent peer or not.
-	pubStr := string(pubKey.SerializeCompressed())
-	_, ok := s.persistentPeers[pubStr]
-	if ok {
-		// We'll only need to re-launch a connection request if one
-		// isn't already currently pending.
-		if _, ok := s.persistentConnReqs[pubStr]; ok {
-			return
-		}
-
-		// We'll ensure that we locate an advertised address to use
-		// within the peer's address for reconnection purposes.
-		//
-		// TODO(roasbeef): use them all?
-		if p.inbound {
-			advertisedAddr, err := s.fetchNodeAdvertisedAddr(
-				pubKey,
-			)
-			if err != nil {
-				srvrLog.Errorf("Unable to retrieve advertised "+
-					"address for node %x: %v",
-					pubKey.SerializeCompressed(), err)
-			} else {
-				p.addr.Address = advertisedAddr
-			}
-		}
-
-		// Otherwise, we'll launch a new connection request in order to
-		// attempt to maintain a persistent connection with this peer.
-		connReq := &connmgr.ConnReq{
-			Addr:      p.addr,
-			Permanent: true,
-		}
-		s.persistentConnReqs[pubStr] = append(
-			s.persistentConnReqs[pubStr], connReq)
-
-		// Record the computed backoff in the backoff map.
-		backoff := s.nextPeerBackoff(pubStr, p.StartTime())
-		s.persistentPeersBackoff[pubStr] = backoff
-
-		// Initialize a retry canceller for this peer if one does not
-		// exist.
-		cancelChan, ok := s.persistentRetryCancels[pubStr]
-		if !ok {
-			cancelChan = make(chan struct{})
-			s.persistentRetryCancels[pubStr] = cancelChan
-		}
-
-		// We choose not to wait group this go routine since the Connect
-		// call can stall for arbitrarily long if we shutdown while an
-		// outbound connection attempt is being made.
-		go func() {
-			srvrLog.Debugf("Scheduling connection re-establishment to "+
-				"persistent peer %v in %s", p, backoff)
-
-			select {
-			case <-time.After(backoff):
-			case <-cancelChan:
-				return
-			case <-s.quit:
-				return
-			}
-
-			srvrLog.Debugf("Attempting to re-establish persistent "+
-				"connection to peer %v", p)
-
-			s.connMgr.Connect(connReq)
-		}()
-	}
-}
-
 // nextPeerBackoff computes the next backoff duration for a peer's pubkey using
 // exponential backoff. If no previous backoff was known, the default is
 // returned.
@@ -2113,63 +1964,6 @@ func (s *server) shouldRequestGraphSync() bool {
 	return len(s.peersByPub) <= 2
 }
 
-// peerConnected is a function that handles initialization a newly connected
-// peer by adding it to the server's global list of all active peers, and
-// starting all the goroutines the peer needs to function properly. The inbound
-// boolean should be true if the peer initiated the connection to us.
-func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
-	inbound bool) {
-
-	brontideConn := conn.(*brontide.Conn)
-	addr := conn.RemoteAddr()
-	pubKey := brontideConn.RemotePub()
-
-	srvrLog.Infof("Finalizing connection to %x, inbound=%v",
-		pubKey.SerializeCompressed(), inbound)
-
-	peerAddr := &lnwire.NetAddress{
-		IdentityKey: pubKey,
-		Address:     addr,
-		ChainNet:    activeNetParams.Net,
-	}
-
-	// With the brontide connection established, we'll now craft the local
-	// feature vector to advertise to the remote node.
-	localFeatures := lnwire.NewRawFeatureVector()
-
-	// We'll signal that we understand the data loss protection feature,
-	// and also that we support the new gossip query features.
-	localFeatures.Set(lnwire.DataLossProtectOptional)
-	localFeatures.Set(lnwire.GossipQueriesOptional)
-
-	// We'll only request a full channel graph sync if we detect that that
-	// we aren't fully synced yet.
-	if s.shouldRequestGraphSync() {
-		// TODO(roasbeef): only do so if gossiper doesn't have active
-		// peers?
-		localFeatures.Set(lnwire.InitialRoutingSync)
-	}
-
-	// Now that we've established a connection, create a peer, and it to
-	// the set of currently active peers.
-	p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
-	if err != nil {
-		srvrLog.Errorf("unable to create peer %v", err)
-		return
-	}
-
-	// TODO(roasbeef): update IP address for link-node
-	// * also mark last-seen, do it one single transaction?
-
-	s.addPeer(p)
-
-	// Dispatch a goroutine to asynchronously start the peer. This process
-	// includes sending and receiving Init messages, which would be a DOS
-	// vector if we held the server's mutex throughout the procedure.
-	s.wg.Add(1)
-	go s.peerInitializer(p)
-}
-
 // shouldDropConnection determines if our local connection to a remote peer
 // should be dropped in the case of concurrent connection establishment. In
 // order to deterministically decide which connection should be dropped, we'll
@@ -2428,6 +2222,63 @@ func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
 	delete(s.persistentConnReqs, pubStr)
 }
 
+// peerConnected is a function that handles initializing a newly connected
+// peer by adding it to the server's global list of all active peers, and
+// starting all the goroutines the peer needs to function properly. The inbound
+// boolean should be true if the peer initiated the connection to us.
+func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
+	inbound bool) {
+
+	brontideConn := conn.(*brontide.Conn)
+	addr := conn.RemoteAddr()
+	pubKey := brontideConn.RemotePub()
+
+	srvrLog.Infof("Finalizing connection to %x, inbound=%v",
+		pubKey.SerializeCompressed(), inbound)
+
+	peerAddr := &lnwire.NetAddress{
+		IdentityKey: pubKey,
+		Address:     addr,
+		ChainNet:    activeNetParams.Net,
+	}
+
+	// With the brontide connection established, we'll now craft the local
+	// feature vector to advertise to the remote node.
+	localFeatures := lnwire.NewRawFeatureVector()
+
+	// We'll signal that we understand the data loss protection feature,
+	// and also that we support the new gossip query features.
+	localFeatures.Set(lnwire.DataLossProtectOptional)
+	localFeatures.Set(lnwire.GossipQueriesOptional)
+
+	// We'll only request a full channel graph sync if we detect that
+	// we aren't fully synced yet.
+	if s.shouldRequestGraphSync() {
+		// TODO(roasbeef): only do so if gossiper doesn't have active
+		// peers?
+		localFeatures.Set(lnwire.InitialRoutingSync)
+	}
+
+	// Now that we've established a connection, create a peer, and add it
+	// to the set of currently active peers.
+	p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
+	if err != nil {
+		srvrLog.Errorf("unable to create peer %v", err)
+		return
+	}
+
+	// TODO(roasbeef): update IP address for link-node
+	// * also mark last-seen, do it one single transaction?
+
+	s.addPeer(p)
+
+	// Dispatch a goroutine to asynchronously start the peer. This process
+	// includes sending and receiving Init messages, which would be a DoS
+	// vector if we held the server's mutex throughout the procedure.
+	s.wg.Add(1)
+	go s.peerInitializer(p)
+}
+
 // addPeer adds the passed peer to the server's global state of all active
 // peers.
 func (s *server) addPeer(p *peer) {
@@ -2542,6 +2393,155 @@ func (s *server) peerInitializer(p *peer) {
 	delete(s.peerConnectedListeners, pubStr)
 }
 
+// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
+// and then cleans up all resources allocated to the peer, notifies relevant
+// sub-systems of its demise, and finally handles re-connecting to the peer if
+// it's persistent. If the server intentionally disconnects a peer, it should
+// have a corresponding entry in the ignorePeerTermination map which will cause
+// the cleanup routine to exit early. The passed `ready` chan is used to
+// synchronize when WaitForDisconnect should begin watching on the peer's
+// waitgroup. The ready chan should only be signaled if the peer starts
+// successfully, otherwise the peer should be disconnected instead.
+//
+// NOTE: This MUST be launched as a goroutine.
+func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
+	defer s.wg.Done()
+
+	p.WaitForDisconnect(ready)
+
+	srvrLog.Debugf("Peer %v has been disconnected", p)
+
+	// If the server is exiting then we can bail out early ourselves as all
+	// the other sub-systems will already be shutting down.
+	if s.Stopped() {
+		return
+	}
+
+	// Next, we'll cancel all pending funding reservations with this node.
+	// If we tried to initiate any funding flows that haven't yet finished,
+	// then we need to unlock those committed outputs so they're still
+	// available for use.
+	s.fundingMgr.CancelPeerReservations(p.PubKey())
+
+	pubKey := p.addr.IdentityKey
+
+	// We'll also inform the gossiper that this peer is no longer active,
+	// so we don't need to maintain sync state for it any longer.
+	s.authGossiper.PruneSyncState(pubKey)
+
+	// Tell the switch to remove all links associated with this peer.
+	// Passing nil as the target link indicates that all links associated
+	// with this interface should be closed.
+	//
+	// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
+	links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
+	if err != nil {
+		srvrLog.Errorf("unable to get channel links: %v", err)
+	}
+
+	for _, link := range links {
+		p.server.htlcSwitch.RemoveLink(link.ChanID())
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// If the server has already removed this peer, we can short circuit the
+	// peer termination watcher and skip cleanup.
+	if _, ok := s.ignorePeerTermination[p]; ok {
+		delete(s.ignorePeerTermination, p)
+
+		pubKey := p.PubKey()
+		pubStr := string(pubKey[:])
+
+		// If a connection callback is present, we'll go ahead and
+		// execute it now that the previous peer has fully disconnected.
+		// If the callback is not present, this likely implies the peer
+		// was purposefully disconnected via RPC, and that no reconnect
+		// should be attempted.
+		connCallback, ok := s.scheduledPeerConnection[pubStr]
+		if ok {
+			delete(s.scheduledPeerConnection, pubStr)
+			connCallback()
+		}
+		return
+	}
+
+	// First, clean up any remaining state the server has regarding the
+	// peer in question.
+	s.removePeer(p)
+
+	// Next, check to see if this is a persistent peer or not.
+	pubStr := string(pubKey.SerializeCompressed())
+	_, ok := s.persistentPeers[pubStr]
+	if ok {
+		// We'll only need to re-launch a connection request if one
+		// isn't already currently pending.
+		if _, ok := s.persistentConnReqs[pubStr]; ok {
+			return
+		}
+
+		// We'll ensure that we locate an advertised address to use
+		// within the peer's address for reconnection purposes.
+		//
+		// TODO(roasbeef): use them all?
+		if p.inbound {
+			advertisedAddr, err := s.fetchNodeAdvertisedAddr(
+				pubKey,
+			)
+			if err != nil {
+				srvrLog.Errorf("Unable to retrieve advertised "+
+					"address for node %x: %v",
+					pubKey.SerializeCompressed(), err)
+			} else {
+				p.addr.Address = advertisedAddr
+			}
+		}
+
+		// Otherwise, we'll launch a new connection request in order to
+		// attempt to maintain a persistent connection with this peer.
+		connReq := &connmgr.ConnReq{
+			Addr:      p.addr,
+			Permanent: true,
+		}
+		s.persistentConnReqs[pubStr] = append(
+			s.persistentConnReqs[pubStr], connReq)
+
+		// Record the computed backoff in the backoff map.
+		backoff := s.nextPeerBackoff(pubStr, p.StartTime())
+		s.persistentPeersBackoff[pubStr] = backoff
+
+		// Initialize a retry canceller for this peer if one does not
+		// exist.
+		cancelChan, ok := s.persistentRetryCancels[pubStr]
+		if !ok {
+			cancelChan = make(chan struct{})
+			s.persistentRetryCancels[pubStr] = cancelChan
+		}
+
+		// We choose not to wait group this goroutine since the Connect
+		// call can stall for arbitrarily long if we shut down while an
+		// outbound connection attempt is being made.
+		go func() {
+			srvrLog.Debugf("Scheduling connection re-establishment to "+
+				"persistent peer %v in %s", p, backoff)
+
+			select {
+			case <-time.After(backoff):
+			case <-cancelChan:
+				return
+			case <-s.quit:
+				return
+			}
+
+			srvrLog.Debugf("Attempting to re-establish persistent "+
+				"connection to peer %v", p)
+
+			s.connMgr.Connect(connReq)
+		}()
+	}
+}
+
 // removePeer removes the passed peer from the server's state of all active
 // peers.
 func (s *server) removePeer(p *peer) {
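
The reconnection path added above races the computed backoff against a per-peer retry canceller and the server's quit channel before calling Connect. The following standalone sketch is not part of this diff; it only isolates that cancellable-backoff pattern, and the scheduleReconnect name and its parameters are illustrative stand-ins for the server's fields (cancelChan, s.quit, s.connMgr.Connect).

package main

import (
	"fmt"
	"time"
)

// scheduleReconnect waits out the backoff and then runs connect, unless the
// attempt is cancelled or the process begins shutting down first. This mirrors
// the un-waitgrouped goroutine launched for persistent peers above.
func scheduleReconnect(backoff time.Duration, cancel, quit <-chan struct{},
	connect func()) {

	go func() {
		select {
		case <-time.After(backoff):
		case <-cancel:
			return
		case <-quit:
			return
		}

		connect()
	}()
}

func main() {
	cancel := make(chan struct{})
	quit := make(chan struct{})
	done := make(chan struct{})

	scheduleReconnect(50*time.Millisecond, cancel, quit, func() {
		fmt.Println("re-establishing persistent connection")
		close(done)
	})

	<-done
}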