diff --git a/accessman.go b/accessman.go
index 9667fbc2e..66ec71646 100644
--- a/accessman.go
+++ b/accessman.go
@@ -21,20 +21,24 @@ type accessMan struct {
 	// the server mutex.
 	banScoreMtx sync.RWMutex
 
-	// peerCounts is a mapping from remote public key to {bool, uint64}
-	// where the bool indicates that we have an open/closed channel with
-	// the peer and where the uint64 indicates the number of pending-open
+	// peerChanInfo is a mapping from remote public key to {bool, uint64}
+	// where the bool indicates that we have an open/closed channel with the
+	// peer and where the uint64 indicates the number of pending-open
 	// channels we currently have with them. This mapping will be used to
 	// determine access permissions for the peer. The map key is the
 	// string-version of the serialized public key.
 	//
 	// NOTE: This MUST be accessed with the banScoreMtx held.
-	peerCounts map[string]channeldb.ChanCount
+	peerChanInfo map[string]channeldb.ChanCount
 
 	// peerScores stores each connected peer's access status. The map key
 	// is the string-version of the serialized public key.
 	//
 	// NOTE: This MUST be accessed with the banScoreMtx held.
+	//
+	// TODO(yy): unify `peerScores` and `peerChanInfo` - there's no need to
+	// create two maps tracking essentially the same info. `numRestricted`
+	// can also be derived from `peerChanInfo`.
	peerScores map[string]peerSlotStatus
 
 	// numRestricted tracks the number of peers with restricted access in
@@ -44,7 +48,7 @@ type accessMan struct {
 
 type accessManConfig struct {
 	// initAccessPerms checks the channeldb for initial access permissions
-	// and then populates the peerCounts and peerScores maps.
+	// and then populates the peerChanInfo and peerScores maps.
 	initAccessPerms func() (map[string]channeldb.ChanCount, error)
 
 	// shouldDisconnect determines whether we should disconnect a peer or
@@ -57,9 +61,9 @@ type accessManConfig struct {
 
 func newAccessMan(cfg *accessManConfig) (*accessMan, error) {
 	a := &accessMan{
-		cfg:        cfg,
-		peerCounts: make(map[string]channeldb.ChanCount),
-		peerScores: make(map[string]peerSlotStatus),
+		cfg:          cfg,
+		peerChanInfo: make(map[string]channeldb.ChanCount),
+		peerScores:   make(map[string]peerSlotStatus),
 	}
 
 	counts, err := a.cfg.initAccessPerms()
@@ -67,16 +71,58 @@ func newAccessMan(cfg *accessManConfig) (*accessMan, error) {
 		return nil, err
 	}
 
-	// We'll populate the server's peerCounts map with the counts fetched
+	// We'll populate the server's peerChanInfo map with the counts fetched
 	// via initAccessPerms. Also note that we haven't yet connected to the
 	// peers.
-	maps.Copy(a.peerCounts, counts)
+	maps.Copy(a.peerChanInfo, counts)
 
 	acsmLog.Info("Access Manager initialized")
 
 	return a, nil
 }
 
+// hasPeer checks whether a given peer already exists in the internal maps,
+// returning its access status and whether it was found.
+func (a *accessMan) hasPeer(ctx context.Context,
+	pub string) (peerAccessStatus, bool) {
+
+	// Lock banScoreMtx for reading so that we can read the banning maps
+	// below.
+	a.banScoreMtx.RLock()
+	defer a.banScoreMtx.RUnlock()
+
+	count, found := a.peerChanInfo[pub]
+	if found {
+		if count.HasOpenOrClosedChan {
+			acsmLog.DebugS(ctx, "Peer has open/closed channel, "+
+				"assigning protected access")
+
+			// Exit early since the peer is not restricted.
+			return peerStatusProtected, true
+		}
+
+		if count.PendingOpenCount != 0 {
+			acsmLog.DebugS(ctx, "Peer has pending channel(s), "+
+				"assigning temporary access")
+
+			// Exit early since the peer is not restricted.
+			return peerStatusTemporary, true
+		}
+
+		return peerStatusRestricted, true
+	}
+
+	// Check if the peer is found in the scores map.
+	status, found := a.peerScores[pub]
+	if found {
+		acsmLog.DebugS(ctx, "Peer already has access", "access",
+			status.state)
+
+		return status.state, true
+	}
+
+	return peerStatusRestricted, false
+}
+
 // assignPeerPerms assigns a new peer its permissions. This does not track the
 // access in the maps. This is intentional.
 func (a *accessMan) assignPeerPerms(remotePub *btcec.PublicKey) (
@@ -91,31 +137,15 @@ func (a *accessMan) assignPeerPerms(remotePub *btcec.PublicKey) (
 	acsmLog.DebugS(ctx, "Assigning permissions")
 
 	// Default is restricted unless the below filters say otherwise.
-	access := peerStatusRestricted
+	access, peerExist := a.hasPeer(ctx, peerMapKey)
 
-	// Lock banScoreMtx for reading so that we can update the banning maps
-	// below.
-	a.banScoreMtx.RLock()
-	if count, found := a.peerCounts[peerMapKey]; found {
-		if count.HasOpenOrClosedChan {
-			acsmLog.DebugS(ctx, "Peer has open/closed channel, "+
-				"assigning protected access")
-
-			access = peerStatusProtected
-		} else if count.PendingOpenCount != 0 {
-			acsmLog.DebugS(ctx, "Peer has pending channel(s), "+
-				"assigning temporary access")
-
-			access = peerStatusTemporary
-		}
-	}
-	a.banScoreMtx.RUnlock()
-
-	// Exit early if the peer status is no longer restricted.
+	// Exit early if the peer is not restricted.
 	if access != peerStatusRestricted {
 		return access, nil
 	}
 
+	// If we are here, it means the peer has peerStatusRestricted.
+	//
 	// Check whether this peer is banned.
 	shouldDisconnect, err := a.cfg.shouldDisconnect(remotePub)
 	if err != nil {
@@ -138,6 +168,12 @@ func (a *accessMan) assignPeerPerms(remotePub *btcec.PublicKey) (
 	// peer.
 	acsmLog.DebugS(ctx, "Peer has no channels, assigning restricted access")
 
+	// If this is an existing peer, there's no need to check for slot limit.
+	if peerExist {
+		acsmLog.DebugS(ctx, "Skipped slot check for existing peer")
+		return access, nil
+	}
+
 	a.banScoreMtx.RLock()
 	defer a.banScoreMtx.RUnlock()
 
@@ -187,11 +223,11 @@ func (a *accessMan) newPendingOpenChan(remotePub *btcec.PublicKey) error {
 
 	case peerStatusTemporary:
 		// If this peer's access status is temporary, we'll need to
-		// update the peerCounts map. The peer's access status will stay
-		// temporary.
-		peerCount, found := a.peerCounts[peerMapKey]
+		// update the peerChanInfo map. The peer's access status will
+		// stay temporary.
+		peerCount, found := a.peerChanInfo[peerMapKey]
 		if !found {
-			// Error if we did not find any info in peerCounts.
+			// Error if we did not find any info in peerChanInfo.
 			acsmLog.ErrorS(ctx, "Pending peer info not found",
 				ErrNoPendingPeerInfo)
 
@@ -200,7 +236,7 @@ func (a *accessMan) newPendingOpenChan(remotePub *btcec.PublicKey) error {
 
 		// Increment the pending channel amount.
 		peerCount.PendingOpenCount += 1
-		a.peerCounts[peerMapKey] = peerCount
+		a.peerChanInfo[peerMapKey] = peerCount
 
 		acsmLog.DebugS(ctx, "Peer is temporary, incremented "+
 			"pending count",
@@ -210,13 +246,13 @@ func (a *accessMan) newPendingOpenChan(remotePub *btcec.PublicKey) error {
 		// If the peer's access status is restricted, then we can
 		// transition it to a temporary-access peer. We'll need to
 		// update numRestricted and also peerScores. We'll also need to
-		// update peerCounts.
+		// update peerChanInfo.
 		peerCount := channeldb.ChanCount{
 			HasOpenOrClosedChan: false,
 			PendingOpenCount:    1,
 		}
-		a.peerCounts[peerMapKey] = peerCount
+		a.peerChanInfo[peerMapKey] = peerCount
 
 		// A restricted-access slot has opened up.
 		oldRestricted := a.numRestricted
@@ -277,12 +313,12 @@ func (a *accessMan) newPendingCloseChan(remotePub *btcec.PublicKey) error {
 	case peerStatusTemporary:
 		// If this peer is temporary, we need to check if it will
 		// revert to a restricted-access peer.
-		peerCount, found := a.peerCounts[peerMapKey]
+		peerCount, found := a.peerChanInfo[peerMapKey]
 		if !found {
 			acsmLog.ErrorS(ctx, "Pending peer info not found",
 				ErrNoPendingPeerInfo)
 
-			// Error if we did not find any info in peerCounts.
+			// Error if we did not find any info in peerChanInfo.
 			return ErrNoPendingPeerInfo
 		}
 
@@ -293,8 +329,8 @@ func (a *accessMan) newPendingCloseChan(remotePub *btcec.PublicKey) error {
 			"pending_count", currentNumPending)
 
 		if currentNumPending == 0 {
-			// Remove the entry from peerCounts.
-			delete(a.peerCounts, peerMapKey)
+			// Remove the entry from peerChanInfo.
+			delete(a.peerChanInfo, peerMapKey)
 
 			// If this is the only pending-open channel for this
 			// peer and it's getting removed, attempt to demote
@@ -334,7 +370,7 @@ func (a *accessMan) newPendingCloseChan(remotePub *btcec.PublicKey) error {
 		// Else, we don't need to demote this peer since it has other
 		// pending-open channels with us.
 		peerCount.PendingOpenCount = currentNumPending
-		a.peerCounts[peerMapKey] = peerCount
+		a.peerChanInfo[peerMapKey] = peerCount
 
 		acsmLog.DebugS(ctx, "Peer still has other pending channels",
 			"pending_count", currentNumPending)
@@ -394,9 +430,9 @@ func (a *accessMan) newOpenChan(remotePub *btcec.PublicKey) error {
 	case peerStatusTemporary:
 		// If the peer's state is temporary, we'll upgrade the peer to
 		// a protected peer.
-		peerCount, found := a.peerCounts[peerMapKey]
+		peerCount, found := a.peerChanInfo[peerMapKey]
 		if !found {
-			// Error if we did not find any info in peerCounts.
+			// Error if we did not find any info in peerChanInfo.
 			acsmLog.ErrorS(ctx, "Pending peer info not found",
 				ErrNoPendingPeerInfo)
 
@@ -404,7 +440,9 @@ func (a *accessMan) newOpenChan(remotePub *btcec.PublicKey) error {
 		}
 
 		peerCount.HasOpenOrClosedChan = true
-		a.peerCounts[peerMapKey] = peerCount
+		peerCount.PendingOpenCount -= 1
+
+		a.peerChanInfo[peerMapKey] = peerCount
 
 		newStatus := peerSlotStatus{
 			state: peerStatusProtected,
@@ -443,11 +481,15 @@ func (a *accessMan) newOpenChan(remotePub *btcec.PublicKey) error {
 	}
 }
 
-// checkIncomingConnBanScore checks whether, given the remote's public hex-
+// checkAcceptIncomingConn checks whether, given the remote's public hex-
 // encoded key, we should not accept this incoming connection or immediately
 // disconnect. This does not assign to the server's peerScores maps. This is
 // just an inbound filter that the brontide listeners use.
-func (a *accessMan) checkIncomingConnBanScore(remotePub *btcec.PublicKey) (
+//
+// TODO(yy): We should also consider removing this `checkAcceptIncomingConn`
+// check as a) it doesn't check for ban score; and b) we should, and already
+// do, perform this check when we handle incoming connections in
+// `InboundPeerConnected`.
+func (a *accessMan) checkAcceptIncomingConn(remotePub *btcec.PublicKey) (
 	bool, error) {
 
 	ctx := btclog.WithCtx(
@@ -461,42 +503,51 @@ func (a *accessMan) checkIncomingConnBanScore(remotePub *btcec.PublicKey) (
 	a.banScoreMtx.RLock()
 	defer a.banScoreMtx.RUnlock()
 
-	if _, found := a.peerCounts[peerMapKey]; !found {
-		acsmLog.DebugS(ctx, "Peer not found in counts, "+
-			"checking restricted slots")
+	_, found := a.peerChanInfo[peerMapKey]
 
-		// Check numRestricted to see if there is an available slot. In
-		// the future, it's possible to add better heuristics.
-		if a.numRestricted < a.cfg.maxRestrictedSlots {
-			// There is an available slot.
-			acsmLog.DebugS(ctx, "Restricted slot available, "+
-				"accepting",
-				"num_restricted", a.numRestricted,
-				"max_restricted", a.cfg.maxRestrictedSlots)
+	// Exit early if found.
+	if found {
+		acsmLog.DebugS(ctx, "Peer found (protected/temporary), "+
+			"accepting")
 
-			return true, nil
-		}
-
-		// If there are no slots left, then we reject this connection.
-		acsmLog.WarnS(ctx, "No restricted slots available, "+
-			"rejecting",
-			ErrNoMoreRestrictedAccessSlots,
-			"num_restricted", a.numRestricted,
-			"max_restricted", a.cfg.maxRestrictedSlots)
-
-		return false, ErrNoMoreRestrictedAccessSlots
+		return true, nil
 	}
 
-	// Else, the peer is either protected or temporary.
-	acsmLog.DebugS(ctx, "Peer found (protected/temporary), accepting")
+	_, found = a.peerScores[peerMapKey]
 
-	return true, nil
+	// Exit early if found.
+	if found {
+		acsmLog.DebugS(ctx, "Found existing peer, accepting")
+
+		return true, nil
+	}
+
+	acsmLog.DebugS(ctx, "Peer not found in counts, checking restricted "+
+		"slots")
+
+	// Check numRestricted to see if there is an available slot. In
+	// the future, it's possible to add better heuristics.
+	if a.numRestricted < a.cfg.maxRestrictedSlots {
+		// There is an available slot.
+		acsmLog.DebugS(ctx, "Restricted slot available, accepting",
+			"num_restricted", a.numRestricted, "max_restricted",
+			a.cfg.maxRestrictedSlots)
+
+		return true, nil
+	}
+
+	// If there are no slots left, then we reject this connection.
+	acsmLog.WarnS(ctx, "No restricted slots available, rejecting",
+		ErrNoMoreRestrictedAccessSlots, "num_restricted",
+		a.numRestricted, "max_restricted", a.cfg.maxRestrictedSlots)
+
+	return false, ErrNoMoreRestrictedAccessSlots
 }
 
 // addPeerAccess tracks a peer's access in the maps. This should be called when
 // the peer has fully connected.
 func (a *accessMan) addPeerAccess(remotePub *btcec.PublicKey,
-	access peerAccessStatus) {
+	access peerAccessStatus, inbound bool) {
 
 	ctx := btclog.WithCtx(
 		context.TODO(), lnutils.LogPubKey("peer", remotePub),
@@ -510,34 +561,64 @@ func (a *accessMan) addPeerAccess(remotePub *btcec.PublicKey,
 
 	peerMapKey := string(remotePub.SerializeCompressed())
 
+	// Exit early if this is an existing peer, which means it won't take
+	// another slot.
+	_, found := a.peerScores[peerMapKey]
+	if found {
+		acsmLog.DebugS(ctx, "Skipped taking restricted slot for "+
+			"existing peer")
+
+		return
+	}
+
 	a.peerScores[peerMapKey] = peerSlotStatus{state: access}
 
-	// Increment numRestricted.
-	if access == peerStatusRestricted {
+	// Exit early if this is not a restricted peer.
+	if access != peerStatusRestricted {
+		acsmLog.DebugS(ctx, "Skipped taking restricted slot as peer "+
+			"already has access", "access", access)
+
+		return
+	}
+
+	// Increment numRestricted if this is an inbound connection.
+	if inbound {
 		oldRestricted := a.numRestricted
 		a.numRestricted++
 
 		acsmLog.DebugS(ctx, "Incremented restricted slots",
 			"old_restricted", oldRestricted,
 			"new_restricted", a.numRestricted)
+
+		return
 	}
+
+	// Otherwise, this is a newly created outbound connection. We won't
+	// place any restriction on it, instead, we will do a hot upgrade here
+	// to move it from restricted to temporary.
+	peerCount := channeldb.ChanCount{
+		HasOpenOrClosedChan: false,
+		PendingOpenCount:    0,
+	}
+
+	a.peerChanInfo[peerMapKey] = peerCount
+	a.peerScores[peerMapKey] = peerSlotStatus{
+		state: peerStatusTemporary,
+	}
+
+	acsmLog.InfoS(ctx, "Upgraded outbound peer: restricted -> temporary")
 }
 
 // removePeerAccess removes the peer's access from the maps. This should be
 // called when the peer has been disconnected.
-func (a *accessMan) removePeerAccess(remotePub *btcec.PublicKey) {
+func (a *accessMan) removePeerAccess(peerPubKey string) {
+	ctx := btclog.WithCtx(context.TODO(), "peer", peerPubKey)
+	acsmLog.DebugS(ctx, "Removing access")
+
 	a.banScoreMtx.Lock()
 	defer a.banScoreMtx.Unlock()
 
-	ctx := btclog.WithCtx(
-		context.TODO(), lnutils.LogPubKey("peer", remotePub),
-	)
-
-	acsmLog.DebugS(ctx, "Removing peer access")
-
-	peerMapKey := string(remotePub.SerializeCompressed())
-
-	status, found := a.peerScores[peerMapKey]
+	status, found := a.peerScores[peerPubKey]
 	if !found {
 		acsmLog.InfoS(ctx, "Peer score not found during removal")
 		return
@@ -554,7 +635,31 @@ func (a *accessMan) removePeerAccess(remotePub *btcec.PublicKey) {
 			"new_restricted", a.numRestricted)
 	}
 
-	acsmLog.TraceS(ctx, "Deleting peer from peerScores")
+	acsmLog.TraceS(ctx, "Deleting from peerScores")
 
-	delete(a.peerScores, peerMapKey)
+	delete(a.peerScores, peerPubKey)
+
+	// We now check whether this peer has channels with us or not.
+	info, found := a.peerChanInfo[peerPubKey]
+	if !found {
+		acsmLog.DebugS(ctx, "Chan info not found during removal")
+		return
+	}
+
+	// Exit early if the peer has channel(s) with us.
+	if info.HasOpenOrClosedChan {
+		acsmLog.DebugS(ctx, "Skipped removing peer with channels")
+		return
+	}
+
+	// Skip removing the peer if it has pending open/close with us.
+	if info.PendingOpenCount != 0 {
+		acsmLog.DebugS(ctx, "Skipped removing peer with pending "+
+			"channels")
+		return
+	}
+
+	// Given this peer has no channels with us, we can now remove it.
+	delete(a.peerChanInfo, peerPubKey)
+	acsmLog.TraceS(ctx, "Removed peer from peerChanInfo")
 }
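To summarize the behaviour the accessman changes above implement: inbound peers without any channel history compete for `num-restricted-slots`, while peers with pending/open/closed channels - and all outbound peers - bypass the limit. The following self-contained Go sketch is illustrative only (the `slotGate` type and its methods are invented, not lnd code); it condenses how checkAcceptIncomingConn, addPeerAccess and removePeerAccess cooperate around that budget.

package main

import (
	"errors"
	"fmt"
)

// slotGate is a simplified stand-in for accessMan: it only tracks how many
// restricted (inbound, channel-less) peers currently hold a slot.
type slotGate struct {
	maxRestricted int
	numRestricted int
	hasChannels   map[string]bool // mirrors peerChanInfo
	connected     map[string]bool // mirrors peerScores
}

var errNoSlots = errors.New("no restricted slots available")

// accept mirrors checkAcceptIncomingConn + addPeerAccess for an inbound peer.
func (g *slotGate) accept(pub string) error {
	// Known peers (with channel history or already connected) bypass the
	// restricted-slot limit.
	if g.hasChannels[pub] || g.connected[pub] {
		g.connected[pub] = true
		return nil
	}
	if g.numRestricted >= g.maxRestricted {
		return errNoSlots
	}
	g.numRestricted++
	g.connected[pub] = true
	return nil
}

// disconnect mirrors removePeerAccess: a restricted peer frees its slot.
func (g *slotGate) disconnect(pub string) {
	if g.connected[pub] && !g.hasChannels[pub] {
		g.numRestricted--
	}
	delete(g.connected, pub)
}

func main() {
	g := &slotGate{
		maxRestricted: 1,
		hasChannels:   map[string]bool{"chanPeer": true},
		connected:     make(map[string]bool),
	}

	fmt.Println(g.accept("newPeer"))  // <nil>: takes the only slot
	fmt.Println(g.accept("another"))  // no restricted slots available
	fmt.Println(g.accept("chanPeer")) // <nil>: channel peers bypass limit
	g.disconnect("newPeer")
	fmt.Println(g.accept("another")) // <nil>: the slot was freed
}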
diff --git a/accessman_test.go b/accessman_test.go
index 0663a9b4e..30a315eec 100644
--- a/accessman_test.go
+++ b/accessman_test.go
@@ -1,6 +1,7 @@
 package lnd
 
 import (
+	"context"
 	"testing"
 
 	"github.com/btcsuite/btcd/btcec/v2"
@@ -15,7 +16,7 @@ func assertInboundConnection(t *testing.T, a *accessMan,
 
 	remotePubSer := string(remotePub.SerializeCompressed())
 
-	isSlotAvailable, err := a.checkIncomingConnBanScore(remotePub)
+	isSlotAvailable, err := a.checkAcceptIncomingConn(remotePub)
 	require.NoError(t, err)
 	require.True(t, isSlotAvailable)
 
@@ -23,7 +24,7 @@ func assertInboundConnection(t *testing.T, a *accessMan,
 	require.NoError(t, err)
 	require.Equal(t, status, peerAccess)
 
-	a.addPeerAccess(remotePub, peerAccess)
+	a.addPeerAccess(remotePub, peerAccess, true)
 	peerScore, ok := a.peerScores[remotePubSer]
 	require.True(t, ok)
 	require.Equal(t, status, peerScore.state)
@@ -63,18 +64,25 @@ func TestAccessManRestrictedSlots(t *testing.T) {
 	peerKey3 := peerPriv3.PubKey()
 	peerKeySer3 := string(peerKey3.SerializeCompressed())
 
+	var (
+		peer1PendingCount = 0
+		peer2PendingCount = 1
+		peer3PendingCount = 1
+	)
+
 	initPerms := func() (map[string]channeldb.ChanCount, error) {
 		return map[string]channeldb.ChanCount{
 			peerKeySer1: {
 				HasOpenOrClosedChan: true,
+				PendingOpenCount:    uint64(peer1PendingCount),
 			},
 			peerKeySer2: {
 				HasOpenOrClosedChan: true,
-				PendingOpenCount:    1,
+				PendingOpenCount:    uint64(peer2PendingCount),
 			},
 			peerKeySer3: {
 				HasOpenOrClosedChan: false,
-				PendingOpenCount:    1,
+				PendingOpenCount:    uint64(peer3PendingCount),
 			},
 		}, nil
 	}
@@ -92,25 +100,25 @@ func TestAccessManRestrictedSlots(t *testing.T) {
 	a, err := newAccessMan(cfg)
 	require.NoError(t, err)
 
-	// Check that the peerCounts map is correctly populated with three
+	// Check that the peerChanInfo map is correctly populated with three
 	// peers.
 	require.Equal(t, 0, int(a.numRestricted))
-	require.Equal(t, 3, len(a.peerCounts))
+	require.Equal(t, 3, len(a.peerChanInfo))
 
-	peerCount1, ok := a.peerCounts[peerKeySer1]
+	peerCount1, ok := a.peerChanInfo[peerKeySer1]
 	require.True(t, ok)
 	require.True(t, peerCount1.HasOpenOrClosedChan)
-	require.Equal(t, 0, int(peerCount1.PendingOpenCount))
+	require.Equal(t, peer1PendingCount, int(peerCount1.PendingOpenCount))
 
-	peerCount2, ok := a.peerCounts[peerKeySer2]
+	peerCount2, ok := a.peerChanInfo[peerKeySer2]
 	require.True(t, ok)
 	require.True(t, peerCount2.HasOpenOrClosedChan)
-	require.Equal(t, 1, int(peerCount2.PendingOpenCount))
+	require.Equal(t, peer2PendingCount, int(peerCount2.PendingOpenCount))
 
-	peerCount3, ok := a.peerCounts[peerKeySer3]
+	peerCount3, ok := a.peerChanInfo[peerKeySer3]
 	require.True(t, ok)
 	require.False(t, peerCount3.HasOpenOrClosedChan)
-	require.Equal(t, 1, int(peerCount3.PendingOpenCount))
+	require.Equal(t, peer3PendingCount, int(peerCount3.PendingOpenCount))
 
 	// We'll now start to connect the peers. We'll add a new fourth peer
 	// that will take up the restricted slot. The first three peers should
@@ -120,7 +128,7 @@ func TestAccessManRestrictedSlots(t *testing.T) {
 	peerKey4 := peerPriv4.PubKey()
 
 	// Follow the normal process of an incoming connection. We check if we
-	// can accommodate this peer in checkIncomingConnBanScore and then we
+	// can accommodate this peer in checkAcceptIncomingConn and then we
 	// assign its access permissions and then insert into the map.
 	assertInboundConnection(t, a, peerKey4, peerStatusRestricted)
 
@@ -134,11 +142,26 @@ func TestAccessManRestrictedSlots(t *testing.T) {
 	require.NoError(t, err)
 	assertAccessState(t, a, peerKey4, peerStatusTemporary)
 
+	// Assert that accessman's internal state is updated with peer4. We
+	// expect this new peer to have 1 pending open count.
+	peerCount4, ok := a.peerChanInfo[string(peerKey4.SerializeCompressed())]
+	require.True(t, ok)
+	require.False(t, peerCount4.HasOpenOrClosedChan)
+	require.Equal(t, 1, int(peerCount4.PendingOpenCount))
+
 	// Check that an open channel promotes the temporary peer.
 	err = a.newOpenChan(peerKey3)
 	require.NoError(t, err)
 	assertAccessState(t, a, peerKey3, peerStatusProtected)
 
+	// Assert that accessman's internal state is updated with peer3. We
+	// expect this existing peer's pending open count to be decremented and
+	// its `HasOpenOrClosedChan` flag to be true.
+	peerCount3, ok = a.peerChanInfo[peerKeySer3]
+	require.True(t, ok)
+	require.True(t, peerCount3.HasOpenOrClosedChan)
+	require.Equal(t, peer3PendingCount-1, int(peerCount3.PendingOpenCount))
+
 	// We should be able to accommodate a new peer.
 	peerPriv5, err := btcec.NewPrivateKey()
 	require.NoError(t, err)
@@ -150,6 +173,10 @@ func TestAccessManRestrictedSlots(t *testing.T) {
 	// peer.
 	err = a.newPendingCloseChan(peerKey4)
 	require.ErrorIs(t, err, ErrNoMoreRestrictedAccessSlots)
+
+	// Assert that peer4 is removed.
+	_, ok = a.peerChanInfo[string(peerKey4.SerializeCompressed())]
+	require.False(t, ok)
 }
 
 // TestAssignPeerPerms asserts that the peer's access status is correctly
@@ -250,9 +277,8 @@ func TestAssignPeerPerms(t *testing.T) {
 			expectedErr: ErrGossiperBan,
 		},
 		// peer6 has no channel with us, and we expect it to have a
-		// restricted status. We also expect the error
-		// `ErrNoMoreRestrictedAccessSlots` to be returned given
-		// we only allow 1 restricted peer in this test.
+		// restricted status. Since this peer is already known, we don't
+		// expect the error `ErrNoMoreRestrictedAccessSlots`.
 		{
 			name:    "peer with no channels and restricted",
 			peerPub: genPeerPub(),
@@ -264,7 +290,7 @@ func TestAssignPeerPerms(t *testing.T) {
 			numRestricted: 1,
 
 			expectedStatus: peerStatusRestricted,
-			expectedErr:    ErrNoMoreRestrictedAccessSlots,
+			expectedErr:    nil,
 		},
 	}
 
@@ -394,3 +420,394 @@ func TestAssignPeerPermsBypassRestriction(t *testing.T) {
 		})
 	}
 }
+
+// TestAssignPeerPermsBypassExisting asserts that when the peer is a
+// pre-existing peer, it won't be restricted.
+func TestAssignPeerPermsBypassExisting(t *testing.T) {
+	t.Parallel()
+
+	// genPeerPub is a helper closure that generates a random public key.
+	genPeerPub := func() *btcec.PublicKey {
+		peerPriv, err := btcec.NewPrivateKey()
+		require.NoError(t, err)
+
+		return peerPriv.PubKey()
+	}
+
+	// peer1 exists in the `peerChanInfo` map.
+	peer1 := genPeerPub()
+	peer1Str := string(peer1.SerializeCompressed())
+
+	// peer2 exists in the `peerScores` map.
+	peer2 := genPeerPub()
+	peer2Str := string(peer2.SerializeCompressed())
+
+	// peer3 is a new peer.
+	peer3 := genPeerPub()
+
+	// Create params to init the accessman.
+	initPerms := func() (map[string]channeldb.ChanCount, error) {
+		return map[string]channeldb.ChanCount{
+			peer1Str: {},
+		}, nil
+	}
+
+	disconnect := func(*btcec.PublicKey) (bool, error) {
+		return false, nil
+	}
+
+	cfg := &accessManConfig{
+		initAccessPerms:    initPerms,
+		shouldDisconnect:   disconnect,
+		maxRestrictedSlots: 0,
+	}
+
+	a, err := newAccessMan(cfg)
+	require.NoError(t, err)
+
+	// Add peer2 to the `peerScores`.
+	a.peerScores[peer2Str] = peerSlotStatus{
+		state: peerStatusTemporary,
+	}
+
+	// Assigning to peer1 should not return an error.
+	status, err := a.assignPeerPerms(peer1)
+	require.NoError(t, err)
+	require.Equal(t, peerStatusRestricted, status)
+
+	// Assigning to peer2 should not return an error.
+	status, err = a.assignPeerPerms(peer2)
+	require.NoError(t, err)
+	require.Equal(t, peerStatusTemporary, status)
+
+	// Assigning to peer3 should return an error.
+	status, err = a.assignPeerPerms(peer3)
+	require.ErrorIs(t, err, ErrNoMoreRestrictedAccessSlots)
+	require.Equal(t, peerStatusRestricted, status)
+}
+
+// TestHasPeer asserts `hasPeer` returns the correct results.
+func TestHasPeer(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+
+	// Create a testing accessMan.
+	a := &accessMan{
+		peerChanInfo: make(map[string]channeldb.ChanCount),
+		peerScores:   make(map[string]peerSlotStatus),
+	}
+
+	// peer1 exists with an open channel.
+	peer1 := "peer1"
+	a.peerChanInfo[peer1] = channeldb.ChanCount{
+		HasOpenOrClosedChan: true,
+	}
+	peer1Access := peerStatusProtected
+
+	// peer2 exists with a pending channel.
+	peer2 := "peer2"
+	a.peerChanInfo[peer2] = channeldb.ChanCount{
+		PendingOpenCount: 1,
+	}
+	peer2Access := peerStatusTemporary
+
+	// peer3 exists without any channels.
+	peer3 := "peer3"
+	a.peerChanInfo[peer3] = channeldb.ChanCount{}
+	peer3Access := peerStatusRestricted
+
+	// peer4 exists with a score.
+	peer4 := "peer4"
+	peer4Access := peerStatusTemporary
+	a.peerScores[peer4] = peerSlotStatus{state: peer4Access}
+
+	// peer5 doesn't exist.
+	peer5 := "peer5"
+
+	// We now assert `hasPeer` returns the correct results.
+	//
+	// peer1 should be found with peerStatusProtected.
+	access, found := a.hasPeer(ctx, peer1)
+	require.True(t, found)
+	require.Equal(t, peer1Access, access)
+
+	// peer2 should be found with peerStatusTemporary.
+	access, found = a.hasPeer(ctx, peer2)
+	require.True(t, found)
+	require.Equal(t, peer2Access, access)
+
+	// peer3 should be found with peerStatusRestricted.
+	access, found = a.hasPeer(ctx, peer3)
+	require.True(t, found)
+	require.Equal(t, peer3Access, access)
+
+	// peer4 should be found with peerStatusTemporary.
+	access, found = a.hasPeer(ctx, peer4)
+	require.True(t, found)
+	require.Equal(t, peer4Access, access)
+
+	// peer5 should NOT be found.
+	access, found = a.hasPeer(ctx, peer5)
+	require.False(t, found)
+	require.Equal(t, peerStatusRestricted, access)
+}
+
+// TestAddPeerAccessInbound asserts the number of slots is correctly
+// incremented only for a new inbound peer with restricted access.
+func TestAddPeerAccessInbound(t *testing.T) {
+	t.Parallel()
+
+	// Create a testing accessMan.
+	a := &accessMan{
+		peerChanInfo: make(map[string]channeldb.ChanCount),
+		peerScores:   make(map[string]peerSlotStatus),
+	}
+
+	// Create a testing key.
+	priv, err := btcec.NewPrivateKey()
+	require.NoError(t, err)
+	pub := priv.PubKey()
+	pubStr := string(pub.SerializeCompressed())
+
+	// Add this peer as an inbound peer with peerStatusRestricted.
+	a.addPeerAccess(pub, peerStatusRestricted, true)
+
+	// Assert the accessMan's internal state.
+	//
+	// We expect to see one peer found in the score map, and one slot is
+	// taken, and this peer is not found in the counts map.
+	require.Len(t, a.peerScores, 1)
+	require.Equal(t, int64(1), a.numRestricted)
+	require.NotContains(t, a.peerChanInfo, pubStr)
+
+	// The peer should be found in the score map.
+	score, ok := a.peerScores[pubStr]
+	require.True(t, ok)
+
+	expectedScore := peerSlotStatus{state: peerStatusRestricted}
+	require.Equal(t, expectedScore, score)
+
+	// Add this peer again, we expect the available slots to stay
+	// unchanged.
+	a.addPeerAccess(pub, peerStatusRestricted, true)
+
+	// Assert the internal state is not changed.
+	require.Len(t, a.peerScores, 1)
+	require.Equal(t, int64(1), a.numRestricted)
+	require.NotContains(t, a.peerChanInfo, pubStr)
+
+	// Reset the accessMan.
+	a = &accessMan{
+		peerChanInfo: make(map[string]channeldb.ChanCount),
+		peerScores:   make(map[string]peerSlotStatus),
+	}
+
+	// Add this peer as an inbound peer with peerStatusTemporary.
+	a.addPeerAccess(pub, peerStatusTemporary, true)
+
+	// Assert the accessMan's internal state.
+	//
+	// We expect to see one peer found in the score map, and no slot is
+	// taken since this peer is not restricted.
+	require.Len(t, a.peerScores, 1)
+	require.Equal(t, int64(0), a.numRestricted)
+
+	// NOTE: in reality this is not possible as the peer must have been put
+	// into the map `peerChanInfo` before its perm can be upgraded.
+	require.NotContains(t, a.peerChanInfo, pubStr)
+
+	// The peer should be found in the score map.
+	score, ok = a.peerScores[pubStr]
+	require.True(t, ok)
+
+	expectedScore = peerSlotStatus{state: peerStatusTemporary}
+	require.Equal(t, expectedScore, score)
+}
+
+// TestAddPeerAccessOutbound asserts that an outbound peer is not restricted
+// and its perm is upgraded when it has peerStatusRestricted.
+func TestAddPeerAccessOutbound(t *testing.T) {
+	t.Parallel()
+
+	// Create a testing accessMan.
+	a := &accessMan{
+		peerChanInfo: make(map[string]channeldb.ChanCount),
+		peerScores:   make(map[string]peerSlotStatus),
+	}
+
+	// Create a testing key.
+	priv, err := btcec.NewPrivateKey()
+	require.NoError(t, err)
+	pub := priv.PubKey()
+	pubStr := string(pub.SerializeCompressed())
+
+	// Add this peer as an outbound peer with peerStatusRestricted.
+	a.addPeerAccess(pub, peerStatusRestricted, false)
+
+	// Assert the accessMan's internal state.
+	//
+	// We expect to see one peer found in the score map, and no slot is
+	// taken, and this peer is found in the counts map.
+	require.Len(t, a.peerScores, 1)
+	require.Equal(t, int64(0), a.numRestricted)
+	require.Contains(t, a.peerChanInfo, pubStr)
+
+	// The peer should be found in the score map.
+	score, ok := a.peerScores[pubStr]
+	require.True(t, ok)
+
+	// Its perm should be upgraded to temporary.
+	expectedScore := peerSlotStatus{state: peerStatusTemporary}
+	require.Equal(t, expectedScore, score)
+
+	// The peer should be found in the peer counts map.
+	count, ok := a.peerChanInfo[pubStr]
+	require.True(t, ok)
+
+	// The peer's count should be initialized correctly.
+	require.Zero(t, count.PendingOpenCount)
+	require.False(t, count.HasOpenOrClosedChan)
+
+	// Add this peer again, we expect the available slots to stay
+	// unchanged.
+	a.addPeerAccess(pub, peerStatusRestricted, true)
+
+	// Assert the internal state is not changed.
+	require.Len(t, a.peerScores, 1)
+	require.Equal(t, int64(0), a.numRestricted)
+	require.Contains(t, a.peerChanInfo, pubStr)
+
+	// Reset the accessMan.
+	a = &accessMan{
+		peerChanInfo: make(map[string]channeldb.ChanCount),
+		peerScores:   make(map[string]peerSlotStatus),
+	}
+
+	// Add this peer as an inbound peer with peerStatusTemporary.
+	a.addPeerAccess(pub, peerStatusTemporary, true)
+
+	// Assert the accessMan's internal state.
+	//
+	// We expect to see one peer found in the score map, and no slot is
+	// taken since this peer is not restricted.
+	require.Len(t, a.peerScores, 1)
+	require.Equal(t, int64(0), a.numRestricted)
+
+	// NOTE: in reality this is not possible as the peer must have been put
+	// into the map `peerChanInfo` before its perm can be upgraded.
+	require.NotContains(t, a.peerChanInfo, pubStr)
+
+	// The peer should be found in the score map.
+	score, ok = a.peerScores[pubStr]
+	require.True(t, ok)
+
+	expectedScore = peerSlotStatus{state: peerStatusTemporary}
+	require.Equal(t, expectedScore, score)
+}
+
+// TestRemovePeerAccess asserts `removePeerAccess` correctly updates the
+// accessman's internal state based on the peer's status.
+func TestRemovePeerAccess(t *testing.T) {
+	t.Parallel()
+
+	// Create a testing accessMan.
+	a := &accessMan{
+		peerChanInfo: make(map[string]channeldb.ChanCount),
+		peerScores:   make(map[string]peerSlotStatus),
+	}
+
+	// numRestrictedExpected specifies the final value to expect once the
+	// test finishes.
+	var numRestrictedExpected int
+
+	// peer1 exists with an open channel, which should not be removed.
+	// Since it has protected status, the numRestricted should stay
+	// unchanged.
+	peer1 := "peer1"
+	a.peerChanInfo[peer1] = channeldb.ChanCount{
+		HasOpenOrClosedChan: true,
+	}
+	peer1Access := peerStatusProtected
+	a.peerScores[peer1] = peerSlotStatus{state: peer1Access}
+
+	// peer2 exists with a pending channel, which should not be removed.
+	// Since it has temporary status, the numRestricted should stay
+	// unchanged.
+	peer2 := "peer2"
+	a.peerChanInfo[peer2] = channeldb.ChanCount{
+		PendingOpenCount: 1,
+	}
+	peer2Access := peerStatusTemporary
+	a.peerScores[peer2] = peerSlotStatus{state: peer2Access}
+
+	// peer3 exists without any channels, which will be removed. Since it
+	// has restricted status, the numRestricted should be decremented.
+	peer3 := "peer3"
+	a.peerChanInfo[peer3] = channeldb.ChanCount{}
+	peer3Access := peerStatusRestricted
+	a.peerScores[peer3] = peerSlotStatus{state: peer3Access}
+	numRestrictedExpected--
+
+	// peer4 exists with a score and a temporary status, which will be
+	// removed.
+	peer4 := "peer4"
+	peer4Access := peerStatusTemporary
+	a.peerScores[peer4] = peerSlotStatus{state: peer4Access}
+
+	// peer5 doesn't exist, removing it will be a NOOP.
+	peer5 := "peer5"
+
+	// We now assert `removePeerAccess` behaves as expected.
+	//
+	// Removing peer1 should not change numRestricted.
+	a.removePeerAccess(peer1)
+
+	// peer1 should be removed from peerScores but not peerChanInfo.
+	_, found := a.peerScores[peer1]
+	require.False(t, found)
+	_, found = a.peerChanInfo[peer1]
+	require.True(t, found)
+
+	// Removing peer2 should not change numRestricted.
+	a.removePeerAccess(peer2)
+
+	// peer2 should be removed from peerScores but not peerChanInfo.
+	_, found = a.peerScores[peer2]
+	require.False(t, found)
+	_, found = a.peerChanInfo[peer2]
+	require.True(t, found)
+
+	// Removing peer3 should remove it from the maps.
+	a.removePeerAccess(peer3)
+
+	// peer3 should be removed from peerScores and peerChanInfo.
+	_, found = a.peerScores[peer3]
+	require.False(t, found)
+	_, found = a.peerChanInfo[peer3]
+	require.False(t, found)
+
+	// Removing peer4 should remove it from the maps.
+	a.removePeerAccess(peer4)
+
+	// peer4 should be removed from peerScores and NOT be found in
+	// peerChanInfo.
+	_, found = a.peerScores[peer4]
+	require.False(t, found)
+	_, found = a.peerChanInfo[peer4]
+	require.False(t, found)
+
+	// Removing peer5 should be a NOOP.
+	a.removePeerAccess(peer5)
+
+	// peer5 should NOT be found.
+	_, found = a.peerScores[peer5]
+	require.False(t, found)
+	_, found = a.peerChanInfo[peer5]
+	require.False(t, found)
+
+	// Finally, assert the numRestricted is decremented as expected. Given
+	// we only have peer3 which has restricted access, it should decrement
+	// once.
+	//
+	// NOTE: The value is actually negative here, which is allowed in
+	// accessman.
+	require.EqualValues(t, numRestrictedExpected, a.numRestricted)
+}
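The unit tests above pivot on the `inbound` flag added to addPeerAccess. A minimal sketch of that decision table, using simplified stand-in types (`admit` and the `status` enum are hypothetical, not lnd code):

package main

import "fmt"

type status int

const (
	restricted status = iota
	temporary
	protected
)

func (s status) String() string {
	return [...]string{"restricted", "temporary", "protected"}[s]
}

// admit mirrors the branch structure of addPeerAccess: only a new, restricted,
// inbound peer consumes a slot, while a new restricted outbound peer is
// hot-upgraded to temporary instead of being counted.
func admit(access status, inbound, known bool, numRestricted *int64) status {
	switch {
	case known:
		// An existing peer never takes another slot.
		return access
	case access != restricted:
		// Temporary/protected peers are not slot-limited.
		return access
	case inbound:
		// A new inbound restricted peer uses up one slot.
		*numRestricted++
		return restricted
	default:
		// A new outbound restricted peer bypasses the limit entirely.
		return temporary
	}
}

func main() {
	var n int64
	fmt.Println(admit(restricted, true, false, &n), n)  // restricted 1
	fmt.Println(admit(restricted, false, false, &n), n) // temporary 1
	fmt.Println(admit(protected, true, false, &n), n)   // protected 1
	fmt.Println(admit(restricted, true, true, &n), n)   // restricted 1
}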
diff --git a/channeldb/db.go b/channeldb/db.go
index 90530459e..63915bc1b 100644
--- a/channeldb/db.go
+++ b/channeldb/db.go
@@ -743,7 +743,7 @@ type ChanCount struct {
 func (c *ChannelStateDB) FetchPermAndTempPeers(
 	chainHash []byte) (map[string]ChanCount, error) {
 
-	peerCounts := make(map[string]ChanCount)
+	peerChanInfo := make(map[string]ChanCount)
 
 	err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
 		openChanBucket := tx.ReadBucket(openChannelBucket)
@@ -829,7 +829,7 @@ func (c *ChannelStateDB) FetchPermAndTempPeers(
 			HasOpenOrClosedChan: isPermPeer,
 			PendingOpenCount:    pendingOpenCount,
 		}
-		peerCounts[string(nodePub)] = peerCount
+		peerChanInfo[string(nodePub)] = peerCount
 
 		return nil
 	})
@@ -893,15 +893,15 @@ func (c *ChannelStateDB) FetchPermAndTempPeers(
 			remoteSer := remotePub.SerializeCompressed()
 			remoteKey := string(remoteSer)
 
-			count, exists := peerCounts[remoteKey]
+			count, exists := peerChanInfo[remoteKey]
 			if exists {
 				count.HasOpenOrClosedChan = true
-				peerCounts[remoteKey] = count
+				peerChanInfo[remoteKey] = count
 			} else {
 				peerCount := ChanCount{
 					HasOpenOrClosedChan: true,
 				}
-				peerCounts[remoteKey] = peerCount
+				peerChanInfo[remoteKey] = peerCount
 			}
 		}
 
@@ -913,10 +913,10 @@ func (c *ChannelStateDB) FetchPermAndTempPeers(
 
 		return nil
 	}, func() {
-		clear(peerCounts)
+		clear(peerChanInfo)
 	})
 
-	return peerCounts, err
+	return peerChanInfo, err
 }
 
 // channelSelector describes a function that takes a chain-hash bucket from
diff --git a/channeldb/db_test.go b/channeldb/db_test.go
index b010d1b65..5cc1e16f6 100644
--- a/channeldb/db_test.go
+++ b/channeldb/db_test.go
@@ -770,16 +770,16 @@ func TestFetchPermTempPeer(t *testing.T) {
 	)
 
 	// Fetch the ChanCount for our peers.
-	peerCounts, err := cdb.FetchPermAndTempPeers(key[:])
+	peerChanInfo, err := cdb.FetchPermAndTempPeers(key[:])
 	require.NoError(t, err, "unable to fetch perm and temp peers")
 
 	// There should only be three entries.
-	require.Len(t, peerCounts, 3)
+	require.Len(t, peerChanInfo, 3)
 
 	// The first entry should have OpenClosed set to true and Pending set
 	// to 0.
-	count1, found := peerCounts[string(pubKey1.SerializeCompressed())]
-	require.True(t, found, "unable to find peer 1 in peerCounts")
+	count1, found := peerChanInfo[string(pubKey1.SerializeCompressed())]
+	require.True(t, found, "unable to find peer 1 in peerChanInfo")
 	require.True(
 		t, count1.HasOpenOrClosedChan,
 		"couldn't find peer 1's channels",
 	)
 	require.Equal(
 		t, uint64(0), count1.PendingOpenCount,
 		"peer 1 doesn't have 0 pending-open",
 	)
 
-	count2, found := peerCounts[string(pubKey2.SerializeCompressed())]
-	require.True(t, found, "unable to find peer 2 in peerCounts")
+	count2, found := peerChanInfo[string(pubKey2.SerializeCompressed())]
+	require.True(t, found, "unable to find peer 2 in peerChanInfo")
 	require.False(
 		t, count2.HasOpenOrClosedChan, "found erroneous channels",
 	)
 	require.Equal(t, uint64(1), count2.PendingOpenCount)
 
-	count3, found := peerCounts[string(pubKey3.SerializeCompressed())]
-	require.True(t, found, "unable to find peer 3 in peerCounts")
+	count3, found := peerChanInfo[string(pubKey3.SerializeCompressed())]
+	require.True(t, found, "unable to find peer 3 in peerChanInfo")
 	require.True(
 		t, count3.HasOpenOrClosedChan,
 		"couldn't find peer 3's channels",
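FetchPermAndTempPeers seeds the accessman with one ChanCount per peer, and hasPeer later collapses each entry into an access level. A small illustrative mapping of that precedence, using a stand-in struct rather than the real channeldb.ChanCount:

package main

import "fmt"

// chanCount mirrors the shape of channeldb.ChanCount.
type chanCount struct {
	hasOpenOrClosedChan bool
	pendingOpenCount    uint64
}

// level reproduces the precedence hasPeer applies to a ChanCount entry: any
// open/closed channel wins, then pending-open channels, and a zero-value
// entry falls back to restricted.
func level(c chanCount) string {
	switch {
	case c.hasOpenOrClosedChan:
		return "protected"
	case c.pendingOpenCount != 0:
		return "temporary"
	default:
		return "restricted"
	}
}

func main() {
	fmt.Println(level(chanCount{hasOpenOrClosedChan: true})) // protected
	fmt.Println(level(chanCount{pendingOpenCount: 1}))       // temporary
	fmt.Println(level(chanCount{}))                          // restricted
}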
diff --git a/config.go b/config.go
index c66c7c919..53f8f36e5 100644
--- a/config.go
+++ b/config.go
@@ -238,8 +238,9 @@ const (
 	// defaultHTTPHeaderTimeout is the default timeout for HTTP requests.
 	DefaultHTTPHeaderTimeout = 5 * time.Second
 
-	// DefaultNumRestrictedSlots is the default number of restricted slots
-	// we'll allocate in the server.
+	// DefaultNumRestrictedSlots is the default max number of incoming
+	// connections allowed in the server. Outbound connections are not
+	// restricted.
	DefaultNumRestrictedSlots = 100
 
 	// BitcoinChainName is a string that represents the Bitcoin blockchain.
@@ -529,9 +530,9 @@ type Config struct {
 	// before timing out reading the headers of an HTTP request.
 	HTTPHeaderTimeout time.Duration `long:"http-header-timeout" description:"The maximum duration that the server will wait before timing out reading the headers of an HTTP request."`
 
-	// NumRestrictedSlots is the number of restricted slots we'll allocate
-	// in the server.
-	NumRestrictedSlots uint64 `long:"num-restricted-slots" description:"The number of restricted slots we'll allocate in the server."`
+	// NumRestrictedSlots is the max number of incoming connections allowed
+	// in the server. Outbound connections are not restricted.
+	NumRestrictedSlots uint64 `long:"num-restricted-slots" description:"The max number of incoming connections allowed in the server. Outbound connections are not restricted."`
 
 	// NoDisconnectOnPongFailure controls if we'll disconnect if a peer
 	// doesn't respond to a pong in time.
diff --git a/docs/release-notes/release-notes-0.19.2.md b/docs/release-notes/release-notes-0.19.2.md
index 7dd5820dc..cdbfb2384 100644
--- a/docs/release-notes/release-notes-0.19.2.md
+++ b/docs/release-notes/release-notes-0.19.2.md
@@ -41,6 +41,10 @@
 
 ## Functional Updates
 
+- [Improved](https://github.com/lightningnetwork/lnd/pull/9880) the connection
+  restriction logic enforced by `accessman`. In addition, the restriction
+  placed on outbound connections is now lifted.
+
 ## RPC Updates
 
 ## lncli Updates
@@ -71,5 +75,4 @@ much more slowly.
 ## Tooling and Documentation
 
 # Contributors (Alphabetical Order)
-
 * Yong Yu
diff --git a/funding/manager.go b/funding/manager.go
index fb3b599d1..068773a19 100644
--- a/funding/manager.go
+++ b/funding/manager.go
@@ -511,7 +511,7 @@ type Config struct {
 
 	// NotifyOpenChannelEvent informs the ChannelNotifier when channels
 	// transition from pending open to open.
-	NotifyOpenChannelEvent func(wire.OutPoint, *btcec.PublicKey) error
+	NotifyOpenChannelEvent func(wire.OutPoint, *btcec.PublicKey)
 
 	// OpenChannelPredicate is a predicate on the lnwire.OpenChannel message
 	// and on the requesting node's public key that returns a bool which
@@ -521,13 +521,13 @@ type Config struct {
 	// NotifyPendingOpenChannelEvent informs the ChannelNotifier when
 	// channels enter a pending state.
 	NotifyPendingOpenChannelEvent func(wire.OutPoint,
-		*channeldb.OpenChannel, *btcec.PublicKey) error
+		*channeldb.OpenChannel, *btcec.PublicKey)
 
 	// NotifyFundingTimeout informs the ChannelNotifier when a pending-open
 	// channel times out because the funding transaction hasn't confirmed.
 	// This is only called for the fundee and only if the channel is
 	// zero-conf.
-	NotifyFundingTimeout func(wire.OutPoint, *btcec.PublicKey) error
+	NotifyFundingTimeout func(wire.OutPoint, *btcec.PublicKey)
 
 	// EnableUpfrontShutdown specifies whether the upfront shutdown script
 	// is enabled.
@@ -1319,13 +1319,9 @@ func (f *Manager) advancePendingChannelState(channel *channeldb.OpenChannel,
 
 		// Inform the ChannelNotifier that the channel has transitioned
 		// from pending open to open.
-		if err := f.cfg.NotifyOpenChannelEvent(
+		f.cfg.NotifyOpenChannelEvent(
 			channel.FundingOutpoint, channel.IdentityPub,
-		); err != nil {
-			log.Errorf("Unable to notify open channel event for "+
-				"ChannelPoint(%v): %v",
-				channel.FundingOutpoint, err)
-		}
+		)
 
 		// Find and close the discoverySignal for this channel such
 		// that ChannelReady messages will be processed.
@@ -2666,12 +2662,9 @@ func (f *Manager) fundeeProcessFundingCreated(peer lnpeer.Peer,
 
 	// Inform the ChannelNotifier that the channel has entered
 	// pending open state.
-	if err := f.cfg.NotifyPendingOpenChannelEvent(
+	f.cfg.NotifyPendingOpenChannelEvent(
 		fundingOut, completeChan, completeChan.IdentityPub,
-	); err != nil {
-		log.Errorf("Unable to send pending-open channel event for "+
-			"ChannelPoint(%v) %v", fundingOut, err)
-	}
+	)
 
 	// At this point we have sent our last funding message to the
 	// initiating peer before the funding transaction will be broadcast.
@@ -2891,13 +2884,9 @@ func (f *Manager) funderProcessFundingSigned(peer lnpeer.Peer,
 	case resCtx.updates <- upd:
 		// Inform the ChannelNotifier that the channel has entered
 		// pending open state.
-		if err := f.cfg.NotifyPendingOpenChannelEvent(
+		f.cfg.NotifyPendingOpenChannelEvent(
 			*fundingPoint, completeChan, completeChan.IdentityPub,
-		); err != nil {
-			log.Errorf("Unable to send pending-open channel "+
-				"event for ChannelPoint(%v) %v", fundingPoint,
-				err)
-		}
+		)
 
 	case <-f.quit:
 		return
@@ -2955,11 +2944,7 @@ func (f *Manager) fundingTimeout(c *channeldb.OpenChannel,
 	}
 
 	// Notify other subsystems about the funding timeout.
-	err := f.cfg.NotifyFundingTimeout(c.FundingOutpoint, c.IdentityPub)
-	if err != nil {
-		log.Errorf("failed to notify of funding timeout for "+
-			"ChanPoint(%v): %v", c.FundingOutpoint, err)
-	}
+	f.cfg.NotifyFundingTimeout(c.FundingOutpoint, c.IdentityPub)
 
 	timeoutErr := fmt.Errorf("timeout waiting for funding tx (%v) to "+
 		"confirm", c.FundingOutpoint)
@@ -3341,13 +3326,9 @@ func (f *Manager) handleFundingConfirmation(
 
 	// Inform the ChannelNotifier that the channel has transitioned from
 	// pending open to open.
-	if err := f.cfg.NotifyOpenChannelEvent(
+	f.cfg.NotifyOpenChannelEvent(
 		completeChan.FundingOutpoint, completeChan.IdentityPub,
-	); err != nil {
-		log.Errorf("Unable to notify open channel event for "+
-			"ChannelPoint(%v): %v", completeChan.FundingOutpoint,
-			err)
-	}
+	)
 
 	// Close the discoverySignal channel, indicating to a separate
 	// goroutine that the channel now is marked as open in the database
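The funding manager hunks above drop the error return from the three notifier callbacks, since no call site could do anything with the error beyond logging it. A hedged sketch of the before/after shape of such a callback (all names here are invented for illustration):

package main

import "fmt"

// Before: the callback returned an error that every call site had to wrap in
// an `if err := ...; err != nil { log... }` block and then swallow.
type notifyOpenWithErr func(chanPoint string) error

// After: the callback is fire-and-forget, so call sites reduce to one line
// and any failure handling lives inside the notifier itself.
type notifyOpen func(chanPoint string)

func main() {
	var notify notifyOpen = func(cp string) {
		fmt.Println("open channel event:", cp)
	}

	// No error plumbing is needed at the call site anymore.
	notify("deadbeef:0")
}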
diff --git a/funding/manager_test.go b/funding/manager_test.go
index 6a1b35ed2..72a60e02a 100644
--- a/funding/manager_test.go
+++ b/funding/manager_test.go
@@ -232,29 +232,23 @@ type mockChanEvent struct {
 }
 
 func (m *mockChanEvent) NotifyOpenChannelEvent(outpoint wire.OutPoint,
-	remotePub *btcec.PublicKey) error {
+	remotePub *btcec.PublicKey) {
 
 	m.openEvent <- outpoint
-
-	return nil
 }
 
 func (m *mockChanEvent) NotifyPendingOpenChannelEvent(outpoint wire.OutPoint,
 	pendingChannel *channeldb.OpenChannel,
-	remotePub *btcec.PublicKey) error {
+	remotePub *btcec.PublicKey) {
 
 	m.pendingOpenEvent <- channelnotifier.PendingOpenChannelEvent{
 		ChannelPoint:   &outpoint,
 		PendingChannel: pendingChannel,
 	}
-
-	return nil
 }
 
 func (m *mockChanEvent) NotifyFundingTimeout(outpoint wire.OutPoint,
-	remotePub *btcec.PublicKey) error {
-
-	return nil
+	remotePub *btcec.PublicKey) {
 }
 
 // mockZeroConfAcceptor always accepts the channel open request for zero-conf
diff --git a/itest/list_on_test.go b/itest/list_on_test.go
index 87275aff4..ec7f672e1 100644
--- a/itest/list_on_test.go
+++ b/itest/list_on_test.go
@@ -691,10 +691,6 @@ var allTestCases = []*lntest.TestCase{
 		Name:     "funding manager funding timeout",
 		TestFunc: testFundingManagerFundingTimeout,
 	},
-	{
-		Name:     "access perm",
-		TestFunc: testAccessPerm,
-	},
 	{
 		Name:     "rbf coop close",
 		TestFunc: testCoopCloseRbf,
@@ -782,6 +778,9 @@ func init() {
 		"coop close with external delivery",
 		allTestCases, coopCloseWithExternalTestCases,
 	)
+	allTestCases = appendPrefixed(
+		"peer conn", allTestCases, peerConnTestCases,
+	)
 
 	// Prepare the test cases for windows to exclude some of the flaky
 	// ones.
diff --git a/itest/lnd_access_perm_test.go b/itest/lnd_access_perm_test.go
index 6ff0fbb24..956bdaed0 100644
--- a/itest/lnd_access_perm_test.go
+++ b/itest/lnd_access_perm_test.go
@@ -1,93 +1,341 @@
 package itest
 
 import (
-	"strconv"
-
-	"github.com/lightningnetwork/lnd/lnrpc"
 	"github.com/lightningnetwork/lnd/lntest"
+	"github.com/stretchr/testify/require"
 )
 
-// testAccessPerm tests that the number of restricted slots is upheld when
-// connecting to the server from a restrictedd peer.
-func testAccessPerm(ht *lntest.HarnessTest) {
-	args := []string{
-		"--minbackoff=5m",
-		"--maxbackoff=5m",
-		"--num-restricted-slots=5",
-	}
+var peerConnTestCases = []*lntest.TestCase{
+	{
+		Name:     "restricted on inbound",
+		TestFunc: testPeerConnRestrictedOnInbound,
+	},
+	{
+		Name:     "already connected",
+		TestFunc: testPeerConnAlreadyConnected,
+	},
+	{
+		Name:     "unlimited outbound",
+		TestFunc: testPeerConnUnlimitedOutbound,
+	},
+	{
+		Name:     "upgrade access perm",
+		TestFunc: testPeerConnUpgradeAccessPerm,
+	},
+	{
+		Name:     "node restart",
+		TestFunc: testPeerConnNodeRestart,
+	},
+	{
+		Name:     "peer reconnect",
+		TestFunc: testPeerConnPeerReconnect,
+	},
+}
 
-	alice := ht.NewNodeWithCoins("Alice", args)
-	bob := ht.NewNodeWithCoins("Bob", args)
-	ht.ConnectNodes(alice, bob)
+// testPeerConnRestrictedOnInbound checks that when the `num-restricted-slots`
+// limit is reached, no more inbound connections are allowed. In addition, when
+// a slot becomes available, a new inbound connection should be allowed.
+func testPeerConnRestrictedOnInbound(ht *lntest.HarnessTest) {
+	args := []string{"--num-restricted-slots=1"}
 
-	// Open a confirmed channel to Bob. Bob will have protected access.
-	chanPoint1 := ht.OpenChannel(
-		alice, bob, lntest.OpenChannelParams{
-			Amt: chanAmt,
-		},
-	)
-	defer ht.CloseChannel(alice, chanPoint1)
+	// Create a new node with only one slot available.
+	alice := ht.NewNode("Alice", args)
 
-	// Open and close channel to Carol. Carol will have protected access.
-	carol := ht.NewNodeWithCoins("Carol", args)
-	ht.ConnectNodes(alice, carol)
+	// Create two nodes Bob and Carol. Bob will connect to Alice first,
+	// taking up Alice's only slot, and we expect the connection from Carol
+	// to Alice to fail.
+	bob := ht.NewNode("Bob", nil)
+	carol := ht.NewNode("Carol", nil)
 
-	chanPoint2 := ht.OpenChannel(
-		alice, carol, lntest.OpenChannelParams{
-			Amt: chanAmt,
-		},
-	)
-
-	ht.CloseChannel(alice, chanPoint2)
-
-	// Make a pending channel with Dave.
-	dave := ht.NewNodeWithCoins("Dave", args)
-	ht.ConnectNodes(alice, dave)
-
-	ht.OpenChannelAssertStream(
-		dave, alice, lntest.OpenChannelParams{
-			Amt: chanAmt,
-		},
-	)
-
-	// Disconnect Bob, Carol, and Dave.
-	ht.DisconnectNodes(alice, bob)
-	ht.AssertNotConnected(alice, bob)
-
-	ht.DisconnectNodes(alice, carol)
-	ht.AssertNotConnected(alice, carol)
-
-	ht.DisconnectNodes(alice, dave)
-	ht.AssertNotConnected(alice, dave)
-
-	// Connect 5 times to Alice. All of these connections should be
-	// successful.
-	for i := 0; i < 5; i++ {
-		peer := ht.NewNode("Peer"+strconv.Itoa(i), args)
-		ht.ConnectNodes(peer, alice)
-		ht.AssertConnected(peer, alice)
-	}
-
-	// Connect an additional time to Alice. This should fail.
-	failedPeer := ht.NewNode("FailedPeer", args)
-	req := &lnrpc.ConnectPeerRequest{
-		Addr: &lnrpc.LightningAddress{
-			Pubkey: alice.RPC.GetInfo().IdentityPubkey,
-			Host:   alice.Cfg.P2PAddr(),
-		},
-	}
-	_ = failedPeer.RPC.ConnectPeer(req)
-	ht.AssertNotConnected(failedPeer, alice)
-
-	// Connect nodes and assert access status.
-	ht.ConnectNodes(alice, bob)
+	// Bob now connects to Alice - from Alice's PoV, this is her inbound
+	// connection from Bob. We expect Bob to connect to Alice successfully.
+	_, err := ht.ConnectNodesNoAssert(bob, alice)
+	require.NoError(ht, err)
 	ht.AssertConnected(alice, bob)
 
-	ht.ConnectNodes(alice, carol)
+	// Carol now connects to Alice - from Alice's PoV, this is her inbound
+	// connection from Carol. Carol's connection should be rejected.
+	_, err = ht.ConnectNodesNoAssert(carol, alice)
+	require.NoError(ht, err)
+	ht.AssertNotConnected(alice, carol)
+
+	// Bob now disconnects, which will free up Alice's slot.
+	ht.DisconnectNodes(bob, alice)
+
+	// Carol connects again - since Alice's slot is freed, this connection
+	// should succeed.
+	_, err = ht.ConnectNodesNoAssert(carol, alice)
+	require.NoError(ht, err)
+
+	ht.AssertConnected(alice, carol)
+	ht.AssertNotConnected(alice, bob)
+}
+
+// testPeerConnAlreadyConnected checks that when there's already a connection
+// alive, another attempt to connect doesn't take up a slot or kill the
+// existing connection, specifically:
+//   - When Alice has an inbound connection from Bob, another inbound
+//     connection from Bob will be a noop.
+//   - When Bob has an outbound connection to Alice, another outbound
+//     connection to Alice will be a noop.
+//
+// In this test we will create nodes using the dev flag `unsafeconnect` to
+// mimic the behaviour of persistent/permanent peer connections, where it's
+// likely that inbound and outbound connections happen at the same time.
+func testPeerConnAlreadyConnected(ht *lntest.HarnessTest) {
+	args := []string{
+		"--num-restricted-slots=1",
+	}
+
+	// Create a new node with one slot available.
+	alice := ht.NewNode("Alice", args)
+	bob := ht.NewNode("Bob", []string{"--dev.unsafeconnect"})
+	carol := ht.NewNode("Carol", nil)
+
+	// Bob now connects to Alice - from Alice's PoV, this is her inbound
+	// connection from Bob. We expect Bob to connect to Alice successfully.
+	_, err := ht.ConnectNodesNoAssert(bob, alice)
+	require.NoError(ht, err)
+	ht.AssertConnected(alice, bob)
+
+	// Assert Alice's slot has been filled up by connecting Carol to Alice.
+	_, err = ht.ConnectNodesNoAssert(carol, alice)
+	require.NoError(ht, err)
+	ht.AssertNotConnected(alice, carol)
+
+	// Bob connects to Alice again - from Alice's PoV, she already has an
+	// inbound connection from Bob so this connection attempt should be a
+	// noop. We expect Alice and Bob to stay connected.
+	//
+	// TODO(yy): There's no way to assert the connection stays the same
+	// atm, need to update the RPC to return the socket port. As for now we
+	// need to visit the logs to check the connection is the same or not.
+	_, err = ht.ConnectNodesNoAssert(bob, alice)
+	require.NoError(ht, err)
+	ht.AssertConnected(alice, bob)
+}
+
+// testPeerConnUnlimitedOutbound checks that outbound connections are not
+// restricted by the `num-restricted-slots` config.
+func testPeerConnUnlimitedOutbound(ht *lntest.HarnessTest) {
+	args := []string{"--num-restricted-slots=1"}
+
+	// Create a new node with one slot available.
+	alice := ht.NewNode("Alice", args)
+
+	// Create three nodes. Alice will have an inbound connection with Bob
+	// and outbound connections with Carol and Dave.
+	bob := ht.NewNode("Bob", args)
+	carol := ht.NewNode("Carol", args)
+	dave := ht.NewNode("Dave", args)
+
+	// Bob now connects to Alice - from Alice's PoV, this is her inbound
+	// connection from Bob. We expect Bob to connect to Alice successfully.
+	_, err := ht.ConnectNodesNoAssert(bob, alice)
+	require.NoError(ht, err)
+	ht.AssertConnected(alice, bob)
+
+	// Assert Alice's slot has been filled up by connecting Carol to Alice.
+	_, err = ht.ConnectNodesNoAssert(carol, alice)
+	require.NoError(ht, err)
+	ht.AssertNotConnected(alice, carol)
+
+	// Now let Alice make an outbound connection to Carol and assert that
+	// it succeeds.
+	_, err = ht.ConnectNodesNoAssert(alice, carol)
+	require.NoError(ht, err)
 	ht.AssertConnected(alice, carol)
 
-	ht.ConnectNodes(alice, dave)
+	// Alice can also make an outbound connection to Dave and assert that
+	// it succeeds since outbound connections are not restricted.
+	_, err = ht.ConnectNodesNoAssert(alice, dave)
+	require.NoError(ht, err)
 	ht.AssertConnected(alice, dave)
-
-	ht.MineBlocksAndAssertNumTxes(1, 1)
+}
+
+// testPeerConnUpgradeAccessPerm checks that when a peer has, or used to have,
+// a channel with Alice, it won't use her restricted slots.
+func testPeerConnUpgradeAccessPerm(ht *lntest.HarnessTest) {
+	args := []string{"--num-restricted-slots=1"}
+
+	// Create a new node with one slot available.
+	alice := ht.NewNodeWithCoins("Alice", args)
+	bob := ht.NewNode("Bob", nil)
+	carol := ht.NewNodeWithCoins("Carol", nil)
+	dave := ht.NewNode("Dave", nil)
+	eve := ht.NewNode("Eve", nil)
+
+	// Connect Bob to Alice, which will use Alice's available slot.
+	ht.ConnectNodes(bob, alice)
+
+	// Assert Alice's slot has been filled up by connecting Carol to Alice.
+	_, err := ht.ConnectNodesNoAssert(carol, alice)
+	require.NoError(ht, err)
+	ht.AssertNotConnected(alice, carol)
+
+	// Open a channel from Alice to Bob and let it stay pending.
+	p := lntest.OpenChannelParams{
+		Amt: chanAmt,
+	}
+	pendingUpdate := ht.OpenChannelAssertPending(alice, bob, p)
+	cpAB := lntest.ChanPointFromPendingUpdate(pendingUpdate)
+
+	// Connect Carol to Alice - since Bob now has a pending channel with
+	// Alice, there's a slot available for Carol to connect.
+	ht.ConnectNodes(carol, alice)
+
+	// Open a channel from Carol to Alice and let it stay pending.
+	pendingUpdate = ht.OpenChannelAssertPending(carol, alice, p)
+	cpCA := lntest.ChanPointFromPendingUpdate(pendingUpdate)
+
+	// Mine the funding txns.
+	ht.MineBlocksAndAssertNumTxes(1, 2)
+
+	// Dave should be able to connect to Alice.
+	ht.ConnectNodes(dave, alice)
+
+	// Open a channel from Alice to Dave, which will free up Alice's slot.
+	cpAD := ht.OpenChannel(alice, dave, p)
+
+	// Close the channels.
+	ht.CloseChannel(bob, cpAB)
+	ht.CloseChannel(carol, cpCA)
+	ht.CloseChannel(dave, cpAD)
+
+	// Alice should have one slot available, connect Eve to Alice now.
+	ht.ConnectNodes(eve, alice)
+}
+
+// testPeerConnNodeRestart checks that a peer which has, or used to have, a
+// channel with Alice doesn't take up a restricted slot after Alice restarts,
+// leaving the slot available for more inbound connections.
+func testPeerConnNodeRestart(ht *lntest.HarnessTest) {
+	args := []string{"--num-restricted-slots=1"}
+
+	// Create a new node with one slot available.
+	alice := ht.NewNodeWithCoins("Alice", args)
+	bob := ht.NewNode("Bob", []string{"--dev.unsafeconnect"})
+	carol := ht.NewNode("Carol", nil)
+
+	// Connect Bob to Alice, which will use Alice's available slot.
+	ht.ConnectNodes(bob, alice)
+
+	// Assert Alice's slot has been filled up by connecting Carol to Alice.
+	_, err := ht.ConnectNodesNoAssert(carol, alice)
+	require.NoError(ht, err)
+	ht.AssertNotConnected(alice, carol)
+
+	// Restart Alice, which will reset her current slots, allowing Bob to
+	// connect to her again.
+	ht.RestartNode(alice)
+	ht.ConnectNodes(bob, alice)
+
+	// Open a channel from Alice to Bob and let it stay pending.
+	p := lntest.OpenChannelParams{
+		Amt: chanAmt,
+	}
+	pendingUpdate := ht.OpenChannelAssertPending(alice, bob, p)
+	cp := lntest.ChanPointFromPendingUpdate(pendingUpdate)
+
+	// Restart Alice and let Bob connect to her - since Bob has a pending
+	// channel, it shouldn't take any slot.
+	ht.RestartNode(alice)
+	ht.ConnectNodes(bob, alice)
+
+	// Connect Carol to Alice since Alice has a free slot.
+	ht.ConnectNodes(carol, alice)
+
+	// Mine the funding tx.
+	ht.MineBlocksAndAssertNumTxes(1, 1)
+
+	// Restart Alice and let Bob connect to her - since Bob has an open
+	// channel, it shouldn't take any slot.
+	ht.RestartNode(alice)
+	ht.ConnectNodes(bob, alice)
+
+	// Connect Carol to Alice since Alice has a free slot.
+	ht.ConnectNodes(carol, alice)
+
+	// Close the channel.
+	ht.CloseChannel(alice, cp)
+
+	// Restart Alice and let Bob connect to her - since Bob has a closed
+	// channel, it shouldn't take any slot.
+	ht.RestartNode(alice)
+	ht.ConnectNodes(bob, alice)
+
+	// Connect Carol to Alice since Alice has a free slot.
+	ht.ConnectNodes(carol, alice)
+}
+
+// testPeerConnPeerReconnect checks that a peer which has, or used to have, a
+// channel with Alice doesn't take up a restricted slot when it reconnects.
+func testPeerConnPeerReconnect(ht *lntest.HarnessTest) {
+	args := []string{"--num-restricted-slots=1"}
+
+	// Create a new node with one slot available.
+	alice := ht.NewNodeWithCoins("Alice", args)
+	bob := ht.NewNode("Bob", []string{"--dev.unsafeconnect"})
+	carol := ht.NewNode("Carol", nil)
+
+	// Connect Bob to Alice, which will use Alice's available slot.
+	ht.ConnectNodes(bob, alice)
+
+	// Let Bob connect to Alice again, which puts Bob in Alice's
+	// `scheduledPeerConnection` map.
+	ht.ConnectNodes(bob, alice)
+
+	// Assert Alice's slot has been filled up by connecting Carol to Alice.
+	_, err := ht.ConnectNodesNoAssert(carol, alice)
+	require.NoError(ht, err)
+	ht.AssertNotConnected(alice, carol)
+
+	// Open a channel from Alice to Bob and let it stay pending.
+	p := lntest.OpenChannelParams{
+		Amt: chanAmt,
+	}
+	pendingUpdate := ht.OpenChannelAssertPending(alice, bob, p)
+	cp := lntest.ChanPointFromPendingUpdate(pendingUpdate)
+
+	// Bob now performs a reconnection - since Bob has a pending channel,
+	// it shouldn't take any slot.
+	ht.DisconnectNodes(bob, alice)
+	ht.AssertNotConnected(alice, bob)
+	ht.ConnectNodes(bob, alice)
+
+	// Connect Carol to Alice since Alice has a free slot.
+	ht.ConnectNodes(carol, alice)
+
+	// Once the above connection succeeded we let Carol disconnect to free
+	// the slot.
+	ht.DisconnectNodes(carol, alice)
+
+	// Mine the funding tx.
+	ht.MineBlocksAndAssertNumTxes(1, 1)
+
+	// Bob now performs a reconnection - since Bob has an open channel, it
+	// shouldn't take any slot.
+	ht.DisconnectNodes(bob, alice)
+	ht.AssertNotConnected(alice, bob)
+	ht.ConnectNodes(bob, alice)
+
+	// Connect Carol to Alice since Alice has a free slot.
+	ht.ConnectNodes(carol, alice)
+
+	// Once the above connection succeeded we let Carol disconnect to free
+	// the slot.
+	ht.DisconnectNodes(carol, alice)
+
+	// Close the channel.
+	ht.CloseChannel(alice, cp)
+
+	// Bob now performs a reconnection - since Bob has a closed channel, it
+	// shouldn't take any slot.
+	ht.DisconnectNodes(bob, alice)
+	ht.AssertNotConnected(alice, bob)
+	ht.ConnectNodes(bob, alice)
+
+	// Connect Carol to Alice since Alice has a free slot.
+	ht.ConnectNodes(carol, alice)
 }
+	ht.ConnectNodes(carol, alice)
 }
diff --git a/lncfg/dev.go b/lncfg/dev.go
index 6b2471024..f048d69b7 100644
--- a/lncfg/dev.go
+++ b/lncfg/dev.go
@@ -52,3 +52,9 @@ func (d *DevConfig) GetZombieSweeperInterval() time.Duration {
 func (d *DevConfig) GetMaxWaitNumBlocksFundingConf() uint32 {
 	return DefaultMaxWaitNumBlocksFundingConf
 }
+
+// GetUnsafeConnect returns the config value `UnsafeConnect`, which is always
+// false for production builds.
+func (d *DevConfig) GetUnsafeConnect() bool {
+	return false
+}
diff --git a/lncfg/dev_integration.go b/lncfg/dev_integration.go
index c6467af2b..8ac85f5d9 100644
--- a/lncfg/dev_integration.go
+++ b/lncfg/dev_integration.go
@@ -26,6 +26,7 @@ type DevConfig struct {
 	ZombieSweeperInterval time.Duration `long:"zombiesweeperinterval" description:"The time interval at which channel opening flows are evaluated for zombie status."`
 	UnsafeDisconnect bool `long:"unsafedisconnect" description:"Allows the rpcserver to intentionally disconnect from peers with open channels."`
 	MaxWaitNumBlocksFundingConf uint32 `long:"maxwaitnumblocksfundingconf" description:"Maximum blocks to wait for funding confirmation before discarding non-initiated channels."`
+	UnsafeConnect bool `long:"unsafeconnect" description:"Allows the rpcserver to connect to a peer even if there's already a connection."`
 }
 
 // ChannelReadyWait returns the config value `ProcessChannelReadyWait`.
@@ -51,7 +52,7 @@ func (d *DevConfig) GetZombieSweeperInterval() time.Duration {
 	return d.ZombieSweeperInterval
 }
 
-// ChannelReadyWait returns the config value `UnsafeDisconnect`.
+// GetUnsafeDisconnect returns the config value `UnsafeDisconnect`.
 func (d *DevConfig) GetUnsafeDisconnect() bool {
 	return d.UnsafeDisconnect
 }
@@ -65,3 +66,8 @@ func (d *DevConfig) GetMaxWaitNumBlocksFundingConf() uint32 {
 	return d.MaxWaitNumBlocksFundingConf
 }
+
+// GetUnsafeConnect returns the config value `UnsafeConnect`.
+func (d *DevConfig) GetUnsafeConnect() bool {
+	return d.UnsafeConnect
+}
diff --git a/lntest/harness_assertion.go b/lntest/harness_assertion.go
index e8082bdba..2540ef6b0 100644
--- a/lntest/harness_assertion.go
+++ b/lntest/harness_assertion.go
@@ -119,7 +119,7 @@ func (h *HarnessTest) ConnectNodes(a, b *node.HarnessNode) {
 		},
 	}
 	a.RPC.ConnectPeer(req)
-	h.AssertPeerConnected(a, b)
+	h.AssertConnected(a, b)
 }
 
 // ConnectNodesPerm creates a persistent connection between the two nodes and
@@ -240,6 +240,24 @@ func (h *HarnessTest) EnsureConnected(a, b *node.HarnessNode) {
 	h.AssertPeerConnected(b, a)
 }
 
+// ConnectNodesNoAssert creates a connection from node A to node B without
+// asserting that the connection succeeds.
+func (h *HarnessTest) ConnectNodesNoAssert(a, b *node.HarnessNode) (
+	*lnrpc.ConnectPeerResponse, error) {
+
+	bobInfo := b.RPC.GetInfo()
+
+	req := &lnrpc.ConnectPeerRequest{
+		Addr: &lnrpc.LightningAddress{
+			Pubkey: bobInfo.IdentityPubkey,
+			Host:   b.Cfg.P2PAddr(),
+		},
+	}
+	ctxt, cancel := context.WithTimeout(h.runCtx, DefaultTimeout)
+	defer cancel()
+
+	return a.RPC.LN.ConnectPeer(ctxt, req)
+}
+
 // AssertNumEdges checks that an expected number of edges can be found in the
 // node specified.
 func (h *HarnessTest) AssertNumEdges(hn *node.HarnessNode,
@@ -1669,6 +1687,11 @@ func (h *HarnessTest) AssertPeerNotConnected(a, b *node.HarnessNode) {
 
 // AssertNotConnected asserts that two peers are not connected.
 func (h *HarnessTest) AssertNotConnected(a, b *node.HarnessNode) {
+	// Sleep one second before the assertion to make sure that any
+	// in-flight RPC call to connect has finished before the assertion is
+	// made.
+	time.Sleep(1 * time.Second)
+
 	h.AssertPeerNotConnected(a, b)
 	h.AssertPeerNotConnected(b, a)
 }
diff --git a/sample-lnd.conf b/sample-lnd.conf
index f4c622ed7..12758f5be 100644
--- a/sample-lnd.conf
+++ b/sample-lnd.conf
@@ -566,7 +566,8 @@
 ; the headers of an HTTP request.
 ; http-header-timeout=5s
 
-; The number of restricted slots the server will allocate for peers.
+; The max number of inbound connections the server will accept. Outbound
+; connections are not restricted.
 ; num-restricted-slots=100
 
 ; If true, a peer will *not* be disconnected if a pong is not received in time
diff --git a/server.go b/server.go
index 677e978ae..47d1eaaf4 100644
--- a/server.go
+++ b/server.go
@@ -193,6 +193,23 @@ const (
 	peerStatusProtected
 )
 
+// String returns a human-readable representation of the status code.
+func (p peerAccessStatus) String() string {
+	switch p {
+	case peerStatusRestricted:
+		return "restricted"
+
+	case peerStatusTemporary:
+		return "temporary"
+
+	case peerStatusProtected:
+		return "protected"
+
+	default:
+		return "unknown"
+	}
+}
+
 // peerSlotStatus determines whether a peer gets access to one of our free
 // slots or gets to bypass this safety mechanism.
 type peerSlotStatus struct {
@@ -1891,7 +1908,9 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr,
 		// connection requests when we call NewListener.
 		listeners[i], err = brontide.NewListener(
 			nodeKeyECDH, listenAddr.String(),
-			s.peerAccessMan.checkIncomingConnBanScore,
+			// TODO(yy): remove this check and unify the inbound
+			// connection check inside `InboundPeerConnected`.
+			s.peerAccessMan.checkAcceptIncomingConn,
 		)
 		if err != nil {
 			return nil, err
@@ -4009,22 +4028,6 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	// If the remote node's public key is banned, drop the connection.
-	access, err := s.peerAccessMan.assignPeerPerms(nodePub)
-	if err != nil {
-		// Clean up the persistent peer maps if we're dropping this
-		// connection.
-		s.bannedPersistentPeerConnection(pubStr)
-
-		srvrLog.Debugf("Dropping connection for %x since we are out "+
-			"of restricted-access connection slots: %v.", pubSer,
-			err)
-
-		conn.Close()
-
-		return
-	}
-
 	// If we already have an outbound connection to this peer, then ignore
 	// this new connection.
 	if p, ok := s.outboundPeers[pubStr]; ok {
@@ -4059,7 +4062,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 		// We were unable to locate an existing connection with the
 		// target peer, proceed to connect.
 		s.cancelConnReqs(pubStr, nil)
-		s.peerConnected(conn, nil, true, access)
+		s.peerConnected(conn, nil, true)
 
 	case nil:
 		// We already have a connection with the incoming peer. If the
@@ -4091,7 +4094,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 		s.removePeer(connectedPeer)
 		s.ignorePeerTermination[connectedPeer] = struct{}{}
 		s.scheduledPeerConnection[pubStr] = func() {
-			s.peerConnected(conn, nil, true, access)
+			s.peerConnected(conn, nil, true)
 		}
 	}
 }
@@ -4116,25 +4119,6 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	access, err := s.peerAccessMan.assignPeerPerms(nodePub)
-	if err != nil {
-		// Clean up the persistent peer maps if we're dropping this
-		// connection.
-		s.bannedPersistentPeerConnection(pubStr)
-
-		srvrLog.Debugf("Dropping connection for %x since we are out "+
-			"of restricted-access connection slots: %v.", pubSer,
-			err)
-
-		if connReq != nil {
-			s.connMgr.Remove(connReq.ID())
-		}
-
-		conn.Close()
-
-		return
-	}
-
 	// If we already have an inbound connection to this peer, then ignore
 	// this new connection.
 	if p, ok := s.inboundPeers[pubStr]; ok {
@@ -4169,7 +4153,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 		return
 	}
 
-	srvrLog.Infof("Established connection to: %x@%v", pubStr,
+	srvrLog.Infof("Established outbound connection to: %x@%v", pubStr,
 		conn.RemoteAddr())
 
 	if connReq != nil {
@@ -4193,7 +4177,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	case ErrPeerNotConnected:
 		// We were unable to locate an existing connection with the
 		// target peer, proceed to connect.
-		s.peerConnected(conn, connReq, false, access)
+		s.peerConnected(conn, connReq, false)
 
 	case nil:
 		// We already have a connection with the incoming peer. If the
@@ -4227,7 +4211,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 		s.removePeer(connectedPeer)
 		s.ignorePeerTermination[connectedPeer] = struct{}{}
 		s.scheduledPeerConnection[pubStr] = func() {
-			s.peerConnected(conn, connReq, false, access)
+			s.peerConnected(conn, connReq, false)
 		}
 	}
 }
@@ -4304,43 +4288,48 @@ func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
 // notifyOpenChannelPeerEvent updates the access manager's maps and then calls
 // the channelNotifier's NotifyOpenChannelEvent.
 func (s *server) notifyOpenChannelPeerEvent(op wire.OutPoint,
-	remotePub *btcec.PublicKey) error {
+	remotePub *btcec.PublicKey) {
 
 	// Call newOpenChan to update the access manager's maps for this peer.
 	if err := s.peerAccessMan.newOpenChan(remotePub); err != nil {
-		return err
+		srvrLog.Errorf("Failed to update peer[%x] access status after "+
+			"channel[%v] open: %v",
+			remotePub.SerializeCompressed(), op, err)
 	}
 
 	// Notify subscribers about this open channel event.
 	s.channelNotifier.NotifyOpenChannelEvent(op)
-
-	return nil
 }
 
 // notifyPendingOpenChannelPeerEvent updates the access manager's maps and then
 // calls the channelNotifier's NotifyPendingOpenChannelEvent.
 func (s *server) notifyPendingOpenChannelPeerEvent(op wire.OutPoint,
-	pendingChan *channeldb.OpenChannel, remotePub *btcec.PublicKey) error {
+	pendingChan *channeldb.OpenChannel, remotePub *btcec.PublicKey) {
 
 	// Call newPendingOpenChan to update the access manager's maps for this
 	// peer.
 	if err := s.peerAccessMan.newPendingOpenChan(remotePub); err != nil {
-		return err
+		srvrLog.Errorf("Failed to update peer[%x] access status after "+
+			"channel[%v] pending open: %v",
+			remotePub.SerializeCompressed(), op, err)
 	}
 
 	// Notify subscribers about this event.
 	s.channelNotifier.NotifyPendingOpenChannelEvent(op, pendingChan)
-
-	return nil
 }
 
 // notifyFundingTimeoutPeerEvent updates the access manager's maps and then
 // calls the channelNotifier's NotifyFundingTimeout.
 func (s *server) notifyFundingTimeoutPeerEvent(op wire.OutPoint,
-	remotePub *btcec.PublicKey) error {
+	remotePub *btcec.PublicKey) {
 
 	// Call newPendingCloseChan to potentially demote the peer.
 	err := s.peerAccessMan.newPendingCloseChan(remotePub)
+	if err != nil {
+		srvrLog.Errorf("Failed to update peer[%x] access status after "+
+			"channel[%v] pending close: %v",
+			remotePub.SerializeCompressed(), op, err)
+	}
+
 	if errors.Is(err, ErrNoMoreRestrictedAccessSlots) {
 		// If we encounter an error while attempting to disconnect the
 		// peer, log the error.
@@ -4351,8 +4340,6 @@ func (s *server) notifyFundingTimeoutPeerEvent(op wire.OutPoint,
 
 	// Notify subscribers about this event.
 	s.channelNotifier.NotifyFundingTimeout(op)
-
-	return nil
 }
 
 // peerConnected is a function that handles initialization a newly connected
@@ -4360,12 +4347,35 @@ func (s *server) notifyFundingTimeoutPeerEvent(op wire.OutPoint,
 // starting all the goroutines the peer needs to function properly. The inbound
 // boolean should be true if the peer initiated the connection to us.
 func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
-	inbound bool, access peerAccessStatus) {
+	inbound bool) {
 
 	brontideConn := conn.(*brontide.Conn)
 	addr := conn.RemoteAddr()
 	pubKey := brontideConn.RemotePub()
 
+	// Only restrict access for inbound connections, which means that if
+	// the remote node's public key is banned or all the restricted slots
+	// are used up, we will drop the connection.
+	//
+	// TODO(yy): Consider performing this check in
+	// `peerAccessMan.addPeerAccess`.
+	access, err := s.peerAccessMan.assignPeerPerms(pubKey)
+	if inbound && err != nil {
+		pubSer := pubKey.SerializeCompressed()
+
+		// Clean up the persistent peer maps if we're dropping this
+		// connection.
+		s.bannedPersistentPeerConnection(string(pubSer))
+
+		srvrLog.Debugf("Dropping connection for %x since we are out "+
+			"of restricted-access connection slots: %v.", pubSer,
+			err)
+
+		conn.Close()
+
+		return
+	}
+
 	srvrLog.Infof("Finalizing connection to %x@%s, inbound=%v",
 		pubKey.SerializeCompressed(), addr, inbound)
@@ -4503,7 +4513,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 	p := peer.NewBrontide(pCfg)
 
 	// Update the access manager with the access permission for this peer.
-	s.peerAccessMan.addPeerAccess(pubKey, access)
+	s.peerAccessMan.addPeerAccess(pubKey, access, inbound)
 
 	// TODO(roasbeef): update IP address for link-node
 	// * also mark last-seen, do it one single transaction?
@@ -4965,7 +4975,8 @@ func (s *server) removePeer(p *peer.Brontide) {
 	}
 
 	// Remove the peer's access permission from the access manager.
-	s.peerAccessMan.removePeerAccess(p.IdentityKey())
+	peerPubStr := string(p.IdentityKey().SerializeCompressed())
+	s.peerAccessMan.removePeerAccess(peerPubStr)
 
 	// Copy the peer's error buffer across to the server if it has any items
 	// in it so that we can restore peer errors across connections.
@@ -4999,7 +5010,11 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
 
 	// Ensure we're not already connected to this peer.
 	peer, err := s.findPeerByPubStr(targetPub)
-	if err == nil {
+
+	// When there's no error, it means we already have a connection with
+	// this peer. If this is a dev environment with the `--unsafeconnect`
+	// flag set, we will ignore the existing connection and continue.
+	if err == nil && !s.cfg.Dev.GetUnsafeConnect() {
 		s.mu.Unlock()
 		return &errPeerAlreadyConnected{peer: peer}
 	}
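
Reviewer note: the slot accounting that the itests above exercise can be summarized with a small, self-contained model. The sketch below is illustrative only - `slotTracker`, `onInbound` and `onUpgrade` are hypothetical names, not lnd APIs - but it mirrors the behavior behind `--num-restricted-slots=1`: an inbound peer with `peerStatusRestricted` consumes a slot, temporary/protected peers bypass the limit, and gaining a pending, open or closed channel upgrades the peer and frees its slot.

package main

import "fmt"

// status mirrors peerStatusRestricted/Temporary/Protected.
type status int

const (
	restricted status = iota
	temporary
	protected
)

// slotTracker is a hypothetical stand-in for the access manager's
// numRestricted bookkeeping.
type slotTracker struct {
	max, used int
}

// onInbound reports whether an inbound peer with the given status may
// connect, consuming a restricted slot when needed.
func (t *slotTracker) onInbound(s status) bool {
	// Temporary and protected peers bypass the slot limit entirely.
	if s != restricted {
		return true
	}
	if t.used >= t.max {
		return false
	}
	t.used++

	return true
}

// onUpgrade frees a slot when a restricted peer gains a pending, open or
// closed channel and is upgraded out of restricted status.
func (t *slotTracker) onUpgrade() {
	t.used--
}

func main() {
	t := &slotTracker{max: 1}

	fmt.Println(t.onInbound(restricted)) // true: Bob takes the only slot.
	fmt.Println(t.onInbound(restricted)) // false: Carol is rejected.

	t.onUpgrade() // Alice opens a (pending) channel to Bob.

	fmt.Println(t.onInbound(restricted)) // true: Carol can connect now.
}

The same model explains why testPeerConnPeerReconnect disconnects Carol after each successful connection: with a single slot, Carol herself occupies it once connected, so it must be freed before the next round of assertions.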