mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-30 07:35:07 +02:00
Merge branch '0-19-2-branch-rc1-10016' into 0-19-2-branch-rc1
This commit is contained in:
@@ -611,8 +611,7 @@ func (a *accessMan) addPeerAccess(remotePub *btcec.PublicKey,
|
||||
|
||||
// removePeerAccess removes the peer's access from the maps. This should be
|
||||
// called when the peer has been disconnected.
|
||||
func (a *accessMan) removePeerAccess(peerPubKey string) {
|
||||
ctx := btclog.WithCtx(context.TODO(), "peer", peerPubKey)
|
||||
func (a *accessMan) removePeerAccess(ctx context.Context, peerPubKey string) {
|
||||
acsmLog.DebugS(ctx, "Removing access:")
|
||||
|
||||
a.banScoreMtx.Lock()
|
||||
|
@@ -708,6 +708,7 @@ func TestAddPeerAccessOutbound(t *testing.T) {
|
||||
// accessman's internal state based on the peer's status.
|
||||
func TestRemovePeerAccess(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a testing accessMan.
|
||||
a := &accessMan{
|
||||
@@ -758,7 +759,7 @@ func TestRemovePeerAccess(t *testing.T) {
|
||||
// We now assert `removePeerAccess` behaves as expected.
|
||||
//
|
||||
// Remove peer1 should change nothing.
|
||||
a.removePeerAccess(peer1)
|
||||
a.removePeerAccess(ctx, peer1)
|
||||
|
||||
// peer1 should be removed from peerScores but not peerChanInfo.
|
||||
_, found := a.peerScores[peer1]
|
||||
@@ -767,7 +768,7 @@ func TestRemovePeerAccess(t *testing.T) {
|
||||
require.True(t, found)
|
||||
|
||||
// Remove peer2 should change nothing.
|
||||
a.removePeerAccess(peer2)
|
||||
a.removePeerAccess(ctx, peer2)
|
||||
|
||||
// peer2 should be removed from peerScores but not peerChanInfo.
|
||||
_, found = a.peerScores[peer2]
|
||||
@@ -776,7 +777,7 @@ func TestRemovePeerAccess(t *testing.T) {
|
||||
require.True(t, found)
|
||||
|
||||
// Remove peer3 should remove it from the maps.
|
||||
a.removePeerAccess(peer3)
|
||||
a.removePeerAccess(ctx, peer3)
|
||||
|
||||
// peer3 should be removed from peerScores and peerChanInfo.
|
||||
_, found = a.peerScores[peer3]
|
||||
@@ -785,7 +786,7 @@ func TestRemovePeerAccess(t *testing.T) {
|
||||
require.False(t, found)
|
||||
|
||||
// Remove peer4 should remove it from the maps.
|
||||
a.removePeerAccess(peer4)
|
||||
a.removePeerAccess(ctx, peer4)
|
||||
|
||||
// peer4 should be removed from peerScores and NOT be found in
|
||||
// peerChanInfo.
|
||||
@@ -795,7 +796,7 @@ func TestRemovePeerAccess(t *testing.T) {
|
||||
require.False(t, found)
|
||||
|
||||
// Remove peer5 should be NOOP.
|
||||
a.removePeerAccess(peer5)
|
||||
a.removePeerAccess(ctx, peer5)
|
||||
|
||||
// peer5 should NOT be found.
|
||||
_, found = a.peerScores[peer5]
|
||||
|
75
server.go
75
server.go
@@ -24,6 +24,7 @@ import (
|
||||
"github.com/btcsuite/btcd/connmgr"
|
||||
"github.com/btcsuite/btcd/txscript"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/btcsuite/btclog/v2"
|
||||
sphinx "github.com/lightningnetwork/lightning-onion"
|
||||
"github.com/lightningnetwork/lnd/aliasmgr"
|
||||
"github.com/lightningnetwork/lnd/autopilot"
|
||||
@@ -56,6 +57,7 @@ import (
|
||||
"github.com/lightningnetwork/lnd/lnpeer"
|
||||
"github.com/lightningnetwork/lnd/lnrpc"
|
||||
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
|
||||
"github.com/lightningnetwork/lnd/lnutils"
|
||||
"github.com/lightningnetwork/lnd/lnwallet"
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
|
||||
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
|
||||
@@ -4074,6 +4076,11 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
|
||||
s.peerConnected(conn, nil, true)
|
||||
|
||||
case nil:
|
||||
ctx := btclog.WithCtx(
|
||||
context.TODO(),
|
||||
lnutils.LogPubKey("peer", connectedPeer.IdentityKey()),
|
||||
)
|
||||
|
||||
// We already have a connection with the incoming peer. If the
|
||||
// connection we've already established should be kept and is
|
||||
// not of the same type of the new connection (inbound), then
|
||||
@@ -4083,24 +4090,24 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
|
||||
if !connectedPeer.Inbound() &&
|
||||
!shouldDropLocalConnection(localPub, nodePub) {
|
||||
|
||||
srvrLog.Warnf("Received inbound connection from "+
|
||||
"peer %v, but already have outbound "+
|
||||
"connection, dropping conn", connectedPeer)
|
||||
srvrLog.WarnS(ctx, "Received inbound connection from "+
|
||||
"peer, but already have outbound "+
|
||||
"connection, dropping conn",
|
||||
fmt.Errorf("already have outbound conn"))
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise, if we should drop the connection, then we'll
|
||||
// disconnect our already connected peer.
|
||||
srvrLog.Debugf("Disconnecting stale connection to %v",
|
||||
connectedPeer)
|
||||
srvrLog.DebugS(ctx, "Disconnecting stale connection")
|
||||
|
||||
s.cancelConnReqs(pubStr, nil)
|
||||
|
||||
// Remove the current peer from the server's internal state and
|
||||
// signal that the peer termination watcher does not need to
|
||||
// execute for this peer.
|
||||
s.removePeerUnsafe(connectedPeer)
|
||||
s.removePeerUnsafe(ctx, connectedPeer)
|
||||
s.ignorePeerTermination[connectedPeer] = struct{}{}
|
||||
s.scheduledPeerConnection[pubStr] = func() {
|
||||
s.peerConnected(conn, nil, true)
|
||||
@@ -4189,6 +4196,11 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
||||
s.peerConnected(conn, connReq, false)
|
||||
|
||||
case nil:
|
||||
ctx := btclog.WithCtx(
|
||||
context.TODO(),
|
||||
lnutils.LogPubKey("peer", connectedPeer.IdentityKey()),
|
||||
)
|
||||
|
||||
// We already have a connection with the incoming peer. If the
|
||||
// connection we've already established should be kept and is
|
||||
// not of the same type of the new connection (outbound), then
|
||||
@@ -4198,9 +4210,10 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
||||
if connectedPeer.Inbound() &&
|
||||
shouldDropLocalConnection(localPub, nodePub) {
|
||||
|
||||
srvrLog.Warnf("Established outbound connection to "+
|
||||
"peer %v, but already have inbound "+
|
||||
"connection, dropping conn", connectedPeer)
|
||||
srvrLog.WarnS(ctx, "Established outbound connection "+
|
||||
"to peer, but already have inbound "+
|
||||
"connection, dropping conn",
|
||||
fmt.Errorf("already have inbound conn"))
|
||||
if connReq != nil {
|
||||
s.connMgr.Remove(connReq.ID())
|
||||
}
|
||||
@@ -4211,13 +4224,12 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
||||
// Otherwise, _their_ connection should be dropped. So we'll
|
||||
// disconnect the peer and send the now obsolete peer to the
|
||||
// server for garbage collection.
|
||||
srvrLog.Debugf("Disconnecting stale connection to %v",
|
||||
connectedPeer)
|
||||
srvrLog.DebugS(ctx, "Disconnecting stale connection")
|
||||
|
||||
// Remove the current peer from the server's internal state and
|
||||
// signal that the peer termination watcher does not need to
|
||||
// execute for this peer.
|
||||
s.removePeerUnsafe(connectedPeer)
|
||||
s.removePeerUnsafe(ctx, connectedPeer)
|
||||
s.ignorePeerTermination[connectedPeer] = struct{}{}
|
||||
s.scheduledPeerConnection[pubStr] = func() {
|
||||
s.peerConnected(conn, connReq, false)
|
||||
@@ -4665,14 +4677,18 @@ func (s *server) peerInitializer(p *peer.Brontide) {
|
||||
func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
|
||||
defer s.wg.Done()
|
||||
|
||||
ctx := btclog.WithCtx(
|
||||
context.TODO(), lnutils.LogPubKey("peer", p.IdentityKey()),
|
||||
)
|
||||
|
||||
p.WaitForDisconnect(ready)
|
||||
|
||||
srvrLog.Debugf("Peer %v has been disconnected", p)
|
||||
srvrLog.DebugS(ctx, "Peer has been disconnected")
|
||||
|
||||
// If the server is exiting then we can bail out early ourselves as all
|
||||
// the other sub-systems will already be shutting down.
|
||||
if s.Stopped() {
|
||||
srvrLog.Debugf("Server quitting, exit early for peer %v", p)
|
||||
srvrLog.DebugS(ctx, "Server quitting, exit early for peer")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -4707,7 +4723,7 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
|
||||
|
||||
// If there were any notification requests for when this peer
|
||||
// disconnected, we can trigger them now.
|
||||
srvrLog.Debugf("Notifying that peer %v is offline", p)
|
||||
srvrLog.DebugS(ctx, "Notifying that peer is offline")
|
||||
pubStr := string(pubKey.SerializeCompressed())
|
||||
for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
|
||||
close(offlineChan)
|
||||
@@ -4737,7 +4753,7 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
|
||||
|
||||
// First, cleanup any remaining state the server has regarding the peer
|
||||
// in question.
|
||||
s.removePeerUnsafe(p)
|
||||
s.removePeerUnsafe(ctx, p)
|
||||
|
||||
// Next, check to see if this is a persistent peer or not.
|
||||
if _, ok := s.persistentPeers[pubStr]; !ok {
|
||||
@@ -4776,18 +4792,16 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
|
||||
// the address used by our onion service to dial
|
||||
// to lnd), so we don't have enough information
|
||||
// to attempt a reconnect.
|
||||
srvrLog.Debugf("Ignoring reconnection attempt "+
|
||||
"to inbound peer %v without "+
|
||||
"advertised address", p)
|
||||
srvrLog.DebugS(ctx, "Ignoring reconnection attempt "+
|
||||
"to inbound peer without advertised address")
|
||||
return
|
||||
|
||||
// We came across an error retrieving an advertised
|
||||
// address, log it, and fall back to the existing peer
|
||||
// address.
|
||||
default:
|
||||
srvrLog.Errorf("Unable to retrieve advertised "+
|
||||
"address for node %x: %v", p.PubKey(),
|
||||
err)
|
||||
srvrLog.ErrorS(ctx, "Unable to retrieve advertised "+
|
||||
"address for peer", err)
|
||||
}
|
||||
|
||||
// Make an easy lookup map so that we can check if an address
|
||||
@@ -4829,9 +4843,9 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
|
||||
// call can stall for arbitrarily long if we shutdown while an
|
||||
// outbound connection attempt is being made.
|
||||
go func() {
|
||||
srvrLog.Debugf("Scheduling connection re-establishment to "+
|
||||
"persistent peer %x in %s",
|
||||
p.IdentityKey().SerializeCompressed(), backoff)
|
||||
srvrLog.DebugS(ctx, "Scheduling connection "+
|
||||
"re-establishment to persistent peer",
|
||||
"reconnecting_in", backoff)
|
||||
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
@@ -4841,9 +4855,8 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
|
||||
return
|
||||
}
|
||||
|
||||
srvrLog.Debugf("Attempting to re-establish persistent "+
|
||||
"connection to peer %x",
|
||||
p.IdentityKey().SerializeCompressed())
|
||||
srvrLog.DebugS(ctx, "Attempting to re-establish persistent "+
|
||||
"connection")
|
||||
|
||||
s.connectToPersistentPeer(pubStr)
|
||||
}()
|
||||
@@ -4950,12 +4963,12 @@ func (s *server) connectToPersistentPeer(pubKeyStr string) {
|
||||
// active peers.
|
||||
//
|
||||
// NOTE: Server mutex must be held when calling this function.
|
||||
func (s *server) removePeerUnsafe(p *peer.Brontide) {
|
||||
func (s *server) removePeerUnsafe(ctx context.Context, p *peer.Brontide) {
|
||||
if p == nil {
|
||||
return
|
||||
}
|
||||
|
||||
srvrLog.Debugf("Removing peer %v", p)
|
||||
srvrLog.DebugS(ctx, "Removing peer")
|
||||
|
||||
// Exit early if we have already been instructed to shutdown, the peers
|
||||
// will be disconnected in the server shutdown process.
|
||||
@@ -4995,7 +5008,7 @@ func (s *server) removePeerUnsafe(p *peer.Brontide) {
|
||||
|
||||
// Remove the peer's access permission from the access manager.
|
||||
peerPubStr := string(p.IdentityKey().SerializeCompressed())
|
||||
s.peerAccessMan.removePeerAccess(peerPubStr)
|
||||
s.peerAccessMan.removePeerAccess(ctx, peerPubStr)
|
||||
|
||||
// Copy the peer's error buffer across to the server if it has
|
||||
// any items in it so that we can restore peer errors across
|
||||
|
Reference in New Issue
Block a user