diff --git a/accessman.go b/accessman.go
new file mode 100644
index 000000000..e1ccc5c34
--- /dev/null
+++ b/accessman.go
@@ -0,0 +1,384 @@
+package lnd
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/btcsuite/btcd/btcec/v2"
+	"github.com/lightningnetwork/lnd/channeldb"
+)
+
+// accessMan is responsible for managing the server's access permissions.
+type accessMan struct {
+	cfg *accessManConfig
+
+	// banScoreMtx is used for the server's ban tracking. If the server
+	// mutex is also going to be locked, ensure that this is locked after
+	// the server mutex.
+	banScoreMtx sync.RWMutex
+
+	// peerCounts is a mapping from remote public key to {bool, uint64}
+	// where the bool indicates that we have an open/closed channel with
+	// the peer and where the uint64 indicates the number of pending-open
+	// channels we currently have with them. This mapping will be used to
+	// determine access permissions for the peer. The map key is the
+	// string-version of the serialized public key.
+	//
+	// NOTE: This MUST be accessed with the banScoreMtx held.
+	peerCounts map[string]channeldb.ChanCount
+
+	// peerScores stores each connected peer's access status. The map key
+	// is the string-version of the serialized public key.
+	//
+	// NOTE: This MUST be accessed with the banScoreMtx held.
+	peerScores map[string]peerSlotStatus
+
+	// numRestricted tracks the number of peers with restricted access in
+	// peerScores. This MUST be accessed with the banScoreMtx held.
+	numRestricted int64
+}
+
+type accessManConfig struct {
+	// initAccessPerms fetches the initial channel counts from the
+	// channeldb. newAccessMan uses the result to populate the peerCounts
+	// map.
+	initAccessPerms func() (map[string]channeldb.ChanCount, error)
+
+	// shouldDisconnect determines whether we should disconnect a peer.
+	shouldDisconnect func(*btcec.PublicKey) (bool, error)
+
+	// maxRestrictedSlots is the number of restricted slots we'll allocate.
+	maxRestrictedSlots int64
+}
+
+func newAccessMan(cfg *accessManConfig) (*accessMan, error) {
+	a := &accessMan{
+		cfg:        cfg,
+		peerCounts: make(map[string]channeldb.ChanCount),
+		peerScores: make(map[string]peerSlotStatus),
+	}
+
+	counts, err := a.cfg.initAccessPerms()
+	if err != nil {
+		return nil, err
+	}
+
+	// We'll populate the server's peerCounts map with the counts fetched
+	// via initAccessPerms. Also note that we haven't yet connected to the
+	// peers.
+	for peerPub, count := range counts {
+		a.peerCounts[peerPub] = count
+	}
+
+	return a, nil
+}
+
+// assignPeerPerms assigns a new peer its permissions. It intentionally does
+// not record the peer in the tracking maps; addPeerAccess does that once the
+// connection has been fully established.
+func (a *accessMan) assignPeerPerms(remotePub *btcec.PublicKey) (
+	peerAccessStatus, error) {
+
+	// Default is restricted unless the below filters say otherwise.
+	access := peerStatusRestricted
+
+	shouldDisconnect, err := a.cfg.shouldDisconnect(remotePub)
+	if err != nil {
+		// Access is restricted here.
+		return access, err
+	}
+
+	if shouldDisconnect {
+		// Access is restricted here.
+		return access, ErrGossiperBan
+	}
+
+	peerMapKey := string(remotePub.SerializeCompressed())
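The decision ladder above is the heart of the access model, so it is worth seeing in isolation. Below is a minimal sketch (the helper name is hypothetical, not part of this PR) of the classification that assignPeerPerms applies once the gossiper-ban check has passed:

```go
// describeAccess is a hypothetical helper that mirrors assignPeerPerms'
// precedence: an open or closed channel on record wins, then pending-open
// channels, and unknown peers fall through to restricted.
func describeAccess(count channeldb.ChanCount, found bool) peerAccessStatus {
	switch {
	case found && count.HasOpenOrClosedChan:
		return peerStatusProtected
	case found && count.PendingOpenCount != 0:
		return peerStatusTemporary
	default:
		return peerStatusRestricted
	}
}
```

+
+	// Lock banScoreMtx for reading so that we can read the access maps
+	// below.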
+	a.banScoreMtx.RLock()
+	defer a.banScoreMtx.RUnlock()
+
+	if count, found := a.peerCounts[peerMapKey]; found {
+		if count.HasOpenOrClosedChan {
+			access = peerStatusProtected
+		} else if count.PendingOpenCount != 0 {
+			access = peerStatusTemporary
+		}
+	}
+
+	// If we've reached this point and access hasn't changed from
+	// restricted, then we need to check if we even have a slot for this
+	// peer.
+	if a.numRestricted >= a.cfg.maxRestrictedSlots &&
+		access == peerStatusRestricted {
+
+		return access, ErrNoMoreRestrictedAccessSlots
+	}
+
+	return access, nil
+}
+
+// newPendingOpenChan is called after the pending-open channel has been
+// committed to the database. This may transition a restricted-access peer to
+// a temporary-access peer.
+func (a *accessMan) newPendingOpenChan(remotePub *btcec.PublicKey) error {
+	a.banScoreMtx.Lock()
+	defer a.banScoreMtx.Unlock()
+
+	peerMapKey := string(remotePub.SerializeCompressed())
+
+	// Fetch the peer's access status from peerScores.
+	status, found := a.peerScores[peerMapKey]
+	if !found {
+		// If we didn't find the peer, we'll return an error.
+		return ErrNoPeerScore
+	}
+
+	switch status.state {
+	case peerStatusProtected:
+		// If this peer's access status is protected, we don't need to
+		// do anything.
+		return nil
+
+	case peerStatusTemporary:
+		// If this peer's access status is temporary, we'll need to
+		// update the peerCounts map. The peer's access status will
+		// stay temporary.
+		peerCount, found := a.peerCounts[peerMapKey]
+		if !found {
+			// Error if we did not find any info in peerCounts.
+			return ErrNoPendingPeerInfo
+		}
+
+		// Increment the pending channel count.
+		peerCount.PendingOpenCount += 1
+		a.peerCounts[peerMapKey] = peerCount
+
+	case peerStatusRestricted:
+		// If the peer's access status is restricted, then we can
+		// transition it to a temporary-access peer. We'll need to
+		// update numRestricted and also peerScores. We'll also need to
+		// update peerCounts.
+		peerCount := channeldb.ChanCount{
+			HasOpenOrClosedChan: false,
+			PendingOpenCount:    1,
+		}
+
+		a.peerCounts[peerMapKey] = peerCount
+
+		// A restricted-access slot has opened up.
+		a.numRestricted -= 1
+
+		a.peerScores[peerMapKey] = peerSlotStatus{
+			state: peerStatusTemporary,
+		}
+
+	default:
+		// This should not be possible.
+		return fmt.Errorf("invalid peer access status")
+	}
+
+	return nil
+}
+
+// newPendingCloseChan is called when a pending-open channel prematurely
+// closes before the funding transaction has confirmed. This potentially
+// demotes a temporary-access peer to a restricted-access peer. If no
+// restricted-access slots are available, the peer will be disconnected.
+func (a *accessMan) newPendingCloseChan(remotePub *btcec.PublicKey) error {
+	a.banScoreMtx.Lock()
+	defer a.banScoreMtx.Unlock()
+
+	peerMapKey := string(remotePub.SerializeCompressed())
+
+	// Fetch the peer's access status from peerScores.
+	status, found := a.peerScores[peerMapKey]
+	if !found {
+		return ErrNoPeerScore
+	}
+
+	switch status.state {
+	case peerStatusProtected:
+		// If this peer is protected, we don't do anything.
+		return nil
+
+	case peerStatusTemporary:
+		// If this peer is temporary, we need to check if it will
+		// revert to a restricted-access peer.
+		peerCount, found := a.peerCounts[peerMapKey]
+		if !found {
+			// Error if we did not find any info in peerCounts.
+			return ErrNoPendingPeerInfo
+		}
+
+		currentNumPending := peerCount.PendingOpenCount - 1
+		if currentNumPending == 0 {
+			// Remove the entry from peerCounts.
+			delete(a.peerCounts, peerMapKey)
+
+			// If this is the only pending-open channel for this
+			// peer and it's getting removed, attempt to demote
+			// this peer to a restricted peer.
+			if a.numRestricted == a.cfg.maxRestrictedSlots {
+				// There are no available restricted slots, so
+				// we need to disconnect this peer. We leave
+				// this up to the caller.
+				return ErrNoMoreRestrictedAccessSlots
+			}
+
+			// Otherwise, there is an available restricted-access
+			// slot, so we can demote this peer.
+			a.peerScores[peerMapKey] = peerSlotStatus{
+				state: peerStatusRestricted,
+			}
+
+			// Update numRestricted.
+			a.numRestricted++
+
+			return nil
+		}
+
+		// Else, we don't need to demote this peer since it has other
+		// pending-open channels with us.
+		peerCount.PendingOpenCount = currentNumPending
+		a.peerCounts[peerMapKey] = peerCount
+
+		return nil
+
+	case peerStatusRestricted:
+		// This should not be possible. This indicates an error.
+		return fmt.Errorf("invalid peer access state transition")
+
+	default:
+		// This should not be possible.
+		return fmt.Errorf("invalid peer access status")
+	}
+}
+
+// newOpenChan is called when a pending-open channel becomes an open channel
+// (i.e. the funding transaction has confirmed). If the remote peer is a
+// temporary-access peer, it will be promoted to a protected-access peer.
+func (a *accessMan) newOpenChan(remotePub *btcec.PublicKey) error {
+	a.banScoreMtx.Lock()
+	defer a.banScoreMtx.Unlock()
+
+	peerMapKey := string(remotePub.SerializeCompressed())
+
+	// Fetch the peer's access status from peerScores.
+	status, found := a.peerScores[peerMapKey]
+	if !found {
+		// If we didn't find the peer, we'll return an error.
+		return ErrNoPeerScore
+	}
+
+	switch status.state {
+	case peerStatusProtected:
+		// If the peer's state is already protected, we don't need to
+		// do anything more.
+		return nil
+
+	case peerStatusTemporary:
+		// If the peer's state is temporary, we'll upgrade the peer to
+		// a protected peer.
+		peerCount, found := a.peerCounts[peerMapKey]
+		if !found {
+			// Error if we did not find any info in peerCounts.
+			return ErrNoPendingPeerInfo
+		}
+
+		peerCount.HasOpenOrClosedChan = true
+		a.peerCounts[peerMapKey] = peerCount
+
+		newStatus := peerSlotStatus{
+			state: peerStatusProtected,
+		}
+		a.peerScores[peerMapKey] = newStatus
+
+		return nil
+
+	case peerStatusRestricted:
+		// This should not be possible. For the server to receive a
+		// state-transition event via newOpenChan, the server must have
+		// previously granted this peer "temporary" access. This
+		// temporary access would not have been revoked or downgraded
+		// without `CloseChannel` being called with the pending
+		// argument set to true. This means that an open-channel state
+		// transition would be impossible. Therefore, we can return an
+		// error.
+		return fmt.Errorf("invalid peer access status")
+
+	default:
+		// This should not be possible.
+		return fmt.Errorf("invalid peer access status")
+	}
+}
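Taken together, newPendingOpenChan, newOpenChan, and newPendingCloseChan implement a small per-peer state machine driven by channel events. A compressed sketch of the happy path, assuming the peer starts out in a restricted slot and every call succeeds (illustrative only, errors elided):

```go
// lifecycleSketch is illustrative only; it is not part of this PR.
func lifecycleSketch(a *accessMan, pub *btcec.PublicKey) {
	a.addPeerAccess(pub, peerStatusRestricted) // connection established
	_ = a.newPendingOpenChan(pub)              // restricted -> temporary
	_ = a.newOpenChan(pub)                     // temporary -> protected
	a.removePeerAccess(pub)                    // disconnect: drop tracking
}
```

+
+// checkIncomingConnBanScore checks, given the remote's public key, whether we
+// should accept this incoming connection or reject it outright. This does not
+// assign to the server's peerScores map; it is just an inbound filter that
+// the brontide listeners use.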
+func (a *accessMan) checkIncomingConnBanScore(remotePub *btcec.PublicKey) (
+	bool, error) {
+
+	a.banScoreMtx.RLock()
+	defer a.banScoreMtx.RUnlock()
+
+	peerMapKey := string(remotePub.SerializeCompressed())
+
+	if _, found := a.peerCounts[peerMapKey]; !found {
+		// Check numRestricted to see if there is an available slot. In
+		// the future, it's possible to add better heuristics.
+		if a.numRestricted < a.cfg.maxRestrictedSlots {
+			// There is an available slot.
+			return true, nil
+		}
+
+		// If there are no slots left, then we reject this connection.
+		return false, ErrNoMoreRestrictedAccessSlots
+	}
+
+	// Else, the peer is either protected or temporary.
+	return true, nil
+}
+
+// addPeerAccess tracks a peer's access in the maps. This should be called
+// when the peer has fully connected.
+func (a *accessMan) addPeerAccess(remotePub *btcec.PublicKey,
+	access peerAccessStatus) {
+
+	// Add the remote public key to peerScores.
+	a.banScoreMtx.Lock()
+	defer a.banScoreMtx.Unlock()
+
+	peerMapKey := string(remotePub.SerializeCompressed())
+
+	a.peerScores[peerMapKey] = peerSlotStatus{state: access}
+
+	// Increment numRestricted.
+	if access == peerStatusRestricted {
+		a.numRestricted++
+	}
+}
+
+// removePeerAccess removes the peer's access from the maps. This should be
+// called when the peer has been disconnected.
+func (a *accessMan) removePeerAccess(remotePub *btcec.PublicKey) {
+	a.banScoreMtx.Lock()
+	defer a.banScoreMtx.Unlock()
+
+	peerMapKey := string(remotePub.SerializeCompressed())
+
+	status, found := a.peerScores[peerMapKey]
+	if !found {
+		return
+	}
+
+	if status.state == peerStatusRestricted {
+		// If the status is restricted, then we decrement
+		// numRestricted.
+		a.numRestricted--
+	}
+
+	delete(a.peerScores, peerMapKey)
+}
diff --git a/accessman_test.go b/accessman_test.go
new file mode 100644
index 000000000..ee638cf13
--- /dev/null
+++ b/accessman_test.go
@@ -0,0 +1,153 @@
+package lnd
+
+import (
+	"testing"
+
+	"github.com/btcsuite/btcd/btcec/v2"
+	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/stretchr/testify/require"
+)
+
+// assertInboundConnection asserts that we're able to accept an inbound
+// connection successfully without any access permissions being violated.
+func assertInboundConnection(t *testing.T, a *accessMan,
+	remotePub *btcec.PublicKey, status peerAccessStatus) {
+
+	remotePubSer := string(remotePub.SerializeCompressed())
+
+	isSlotAvailable, err := a.checkIncomingConnBanScore(remotePub)
+	require.NoError(t, err)
+	require.True(t, isSlotAvailable)
+
+	peerAccess, err := a.assignPeerPerms(remotePub)
+	require.NoError(t, err)
+	require.Equal(t, status, peerAccess)
+
+	a.addPeerAccess(remotePub, peerAccess)
+	peerScore, ok := a.peerScores[remotePubSer]
+	require.True(t, ok)
+	require.Equal(t, status, peerScore.state)
+}
+
+func assertAccessState(t *testing.T, a *accessMan, remotePub *btcec.PublicKey,
+	expectedStatus peerAccessStatus) {
+
+	remotePubSer := string(remotePub.SerializeCompressed())
+	peerScore, ok := a.peerScores[remotePubSer]
+	require.True(t, ok)
+	require.Equal(t, expectedStatus, peerScore.state)
+}
+
+// TestAccessManRestrictedSlots tests that the configurable number of
+// restricted slots is properly allocated. It also tests that certain peers
+// with access permissions are allowed to bypass the slot mechanism.
+func TestAccessManRestrictedSlots(t *testing.T) {
+	t.Parallel()
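Several paths update numRestricted (addPeerAccess, removePeerAccess, and the promotion and demotion transitions above), so a cheap sanity check is useful. A hypothetical helper, not part of this PR, that a test like the one below could call between steps:

```go
// restrictedInvariantHolds reports whether numRestricted agrees with the
// number of tracked peers currently in the restricted state. Hypothetical
// test helper, not part of this PR.
func restrictedInvariantHolds(a *accessMan) bool {
	a.banScoreMtx.RLock()
	defer a.banScoreMtx.RUnlock()

	var restricted int64
	for _, status := range a.peerScores {
		if status.state == peerStatusRestricted {
			restricted++
		}
	}

	return restricted == a.numRestricted
}
```

+
+	// We'll pre-populate the map to mock the database fetch. We'll make
+	// three peers. One has an open/closed channel.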
+	// One has both an open/closed channel and a pending channel. The
+	// last one has only a pending channel.
+	peerPriv1, err := btcec.NewPrivateKey()
+	require.NoError(t, err)
+	peerKey1 := peerPriv1.PubKey()
+	peerKeySer1 := string(peerKey1.SerializeCompressed())
+
+	peerPriv2, err := btcec.NewPrivateKey()
+	require.NoError(t, err)
+	peerKey2 := peerPriv2.PubKey()
+	peerKeySer2 := string(peerKey2.SerializeCompressed())
+
+	peerPriv3, err := btcec.NewPrivateKey()
+	require.NoError(t, err)
+	peerKey3 := peerPriv3.PubKey()
+	peerKeySer3 := string(peerKey3.SerializeCompressed())
+
+	initPerms := func() (map[string]channeldb.ChanCount, error) {
+		return map[string]channeldb.ChanCount{
+			peerKeySer1: {
+				HasOpenOrClosedChan: true,
+			},
+			peerKeySer2: {
+				HasOpenOrClosedChan: true,
+				PendingOpenCount:    1,
+			},
+			peerKeySer3: {
+				HasOpenOrClosedChan: false,
+				PendingOpenCount:    1,
+			},
+		}, nil
+	}
+
+	disconnect := func(*btcec.PublicKey) (bool, error) {
+		return false, nil
+	}
+
+	cfg := &accessManConfig{
+		initAccessPerms:    initPerms,
+		shouldDisconnect:   disconnect,
+		maxRestrictedSlots: 1,
+	}
+
+	a, err := newAccessMan(cfg)
+	require.NoError(t, err)
+
+	// Check that the peerCounts map is correctly populated with three
+	// peers.
+	require.Equal(t, 0, int(a.numRestricted))
+	require.Equal(t, 3, len(a.peerCounts))
+
+	peerCount1, ok := a.peerCounts[peerKeySer1]
+	require.True(t, ok)
+	require.True(t, peerCount1.HasOpenOrClosedChan)
+	require.Equal(t, 0, int(peerCount1.PendingOpenCount))
+
+	peerCount2, ok := a.peerCounts[peerKeySer2]
+	require.True(t, ok)
+	require.True(t, peerCount2.HasOpenOrClosedChan)
+	require.Equal(t, 1, int(peerCount2.PendingOpenCount))
+
+	peerCount3, ok := a.peerCounts[peerKeySer3]
+	require.True(t, ok)
+	require.False(t, peerCount3.HasOpenOrClosedChan)
+	require.Equal(t, 1, int(peerCount3.PendingOpenCount))
+
+	// We'll now start to connect the peers. We'll add a new fourth peer
+	// that will take up the restricted slot. The first three peers should
+	// be able to bypass this restricted slot mechanism.
+	peerPriv4, err := btcec.NewPrivateKey()
+	require.NoError(t, err)
+	peerKey4 := peerPriv4.PubKey()
+
+	// Follow the normal process of an incoming connection. We check
+	// whether we can accommodate this peer in checkIncomingConnBanScore,
+	// then assign its access permissions and insert them into the map.
+	assertInboundConnection(t, a, peerKey4, peerStatusRestricted)
+
+	// Connect the three peers. This should happen without any issue.
+	assertInboundConnection(t, a, peerKey1, peerStatusProtected)
+	assertInboundConnection(t, a, peerKey2, peerStatusProtected)
+	assertInboundConnection(t, a, peerKey3, peerStatusTemporary)
+
+	// Check that a pending-open channel promotes the restricted peer.
+	err = a.newPendingOpenChan(peerKey4)
+	require.NoError(t, err)
+	assertAccessState(t, a, peerKey4, peerStatusTemporary)
+
+	// Check that an open channel promotes the temporary peer.
+	err = a.newOpenChan(peerKey3)
+	require.NoError(t, err)
+	assertAccessState(t, a, peerKey3, peerStatusProtected)
+
+	// We should be able to accommodate a new peer.
+	peerPriv5, err := btcec.NewPrivateKey()
+	require.NoError(t, err)
+	peerKey5 := peerPriv5.PubKey()
+
+	assertInboundConnection(t, a, peerKey5, peerStatusRestricted)
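For contrast, a sketch of the demotion path succeeding (this is not in the test): if peer 5's restricted slot were freed first, the same pending-close event would demote peer 4 rather than error.

```go
// Hypothetical continuation: free the slot, then demote peer 4.
a.removePeerAccess(peerKey5) // the restricted slot opens up
require.NoError(t, a.newPendingCloseChan(peerKey4))
assertAccessState(t, a, peerKey4, peerStatusRestricted)
```

+
+	// A pending-close event for peer 4 should now fail to demote it,
+	// since peer 5 occupies the only restricted slot.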
+	err = a.newPendingCloseChan(peerKey4)
+	require.ErrorIs(t, err, ErrNoMoreRestrictedAccessSlots)
+}
diff --git a/brontide/listener.go b/brontide/listener.go
index ba210853c..4d386c1c5 100644
--- a/brontide/listener.go
+++ b/brontide/listener.go
@@ -7,12 +7,13 @@ import (
 	"net"
 	"time"
 
+	"github.com/btcsuite/btcd/btcec/v2"
 	"github.com/lightningnetwork/lnd/keychain"
 )
 
 // defaultHandshakes is the maximum number of handshakes that can be done in
 // parallel.
-const defaultHandshakes = 1000
+const defaultHandshakes = 50
 
 // Listener is an implementation of a net.Conn which executes an authenticated
 // key exchange and message encryption protocol dubbed "Machine" after
@@ -24,6 +25,10 @@ type Listener struct {
 
 	tcp *net.TCPListener
 
+	// shouldAccept is a closure that determines if we should accept the
+	// incoming connection or not based on its public key.
+	shouldAccept func(*btcec.PublicKey) (bool, error)
+
 	handshakeSema chan struct{}
 	conns         chan maybeConn
 	quit          chan struct{}
@@ -34,8 +39,8 @@ var _ net.Listener = (*Listener)(nil)
 
 // NewListener returns a new net.Listener which enforces the Brontide scheme
 // during both initial connection establishment and data transfer.
-func NewListener(localStatic keychain.SingleKeyECDH,
-	listenAddr string) (*Listener, error) {
+func NewListener(localStatic keychain.SingleKeyECDH, listenAddr string,
+	shouldAccept func(*btcec.PublicKey) (bool, error)) (*Listener, error) {
 
 	addr, err := net.ResolveTCPAddr("tcp", listenAddr)
 	if err != nil {
@@ -50,6 +55,7 @@ func NewListener(localStatic keychain.SingleKeyECDH,
 	brontideListener := &Listener{
 		localStatic:   localStatic,
 		tcp:           l,
+		shouldAccept:  shouldAccept,
 		handshakeSema: make(chan struct{}, defaultHandshakes),
 		conns:         make(chan maybeConn),
 		quit:          make(chan struct{}),
@@ -193,6 +199,28 @@ func (l *Listener) doHandshake(conn net.Conn) {
 		return
 	}
 
+	// Call the shouldAccept closure to see if the remote node's public key
+	// is allowed according to our banning heuristic. This is here because
+	// we do not learn the remote node's public static key until we've
+	// received and validated Act 3.
+	remoteKey := brontideConn.RemotePub()
+	if remoteKey == nil {
+		connErr := fmt.Errorf("no remote pubkey")
+		brontideConn.conn.Close()
+		l.rejectConn(rejectedConnErr(connErr, remoteAddr))
+
+		return
+	}
+
+	accepted, acceptErr := l.shouldAccept(remoteKey)
+	if !accepted {
+		// Reject the connection.
+		brontideConn.conn.Close()
+		l.rejectConn(rejectedConnErr(acceptErr, remoteAddr))
+
+		return
+	}
+
 	l.acceptConn(brontideConn)
 }
 
@@ -255,3 +283,9 @@ func (l *Listener) Close() error {
 func (l *Listener) Addr() net.Addr {
 	return l.tcp.Addr()
 }
+
+// DisabledBanClosure is passed to NewListener by callers that want to bypass
+// the ban scoring.
+func DisabledBanClosure(p *btcec.PublicKey) (bool, error) {
+	return true, nil
+}
diff --git a/brontide/noise_test.go b/brontide/noise_test.go
index fc4ff99e1..cd4dc4cd9 100644
--- a/brontide/noise_test.go
+++ b/brontide/noise_test.go
@@ -35,7 +35,7 @@ func makeListener() (*Listener, *lnwire.NetAddress, error) {
 	addr := "localhost:0"
 
 	// Our listener will be local, and the connection remote.
-	listener, err := NewListener(localKeyECDH, addr)
+	listener, err := NewListener(localKeyECDH, addr, DisabledBanClosure)
 	if err != nil {
 		return nil, nil, err
 	}
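The extra parameter changes every NewListener call site. As a sketch (the function name and addresses are placeholders, not part of this PR), callers either wire in the access manager's filter or pass the no-op closure:

```go
// listenExamples shows how a caller picks a ban filter for NewListener.
func listenExamples(nodeKeyECDH keychain.SingleKeyECDH,
	am *accessMan) (net.Listener, net.Listener, error) {

	// With ban scoring, as the server does below.
	guarded, err := brontide.NewListener(
		nodeKeyECDH, "localhost:9735",
		am.checkIncomingConnBanScore,
	)
	if err != nil {
		return nil, nil, err
	}

	// Without ban scoring, as in the tests and the watchtower.
	open, err := brontide.NewListener(
		nodeKeyECDH, "localhost:9736", brontide.DisabledBanClosure,
	)
	if err != nil {
		return nil, nil, err
	}

	return guarded, open, nil
}
```

diff --git a/config.go b/config.go
index 6418d9a68..1a06ed765 100644
--- a/config.go
+++ b/config.go
@@ -238,6 +238,10 @@ const (
 	// DefaultHTTPHeaderTimeout is the default timeout for HTTP requests.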
 	DefaultHTTPHeaderTimeout = 5 * time.Second
 
+	// DefaultNumRestrictedSlots is the default number of restricted slots
+	// we'll allocate in the server.
+	DefaultNumRestrictedSlots = 30
+
 	// BitcoinChainName is a string that represents the Bitcoin blockchain.
 	BitcoinChainName = "bitcoin"
 
@@ -518,6 +522,10 @@ type Config struct {
 	// HTTPHeaderTimeout is the maximum duration that the server will wait
 	// before timing out reading the headers of an HTTP request.
 	HTTPHeaderTimeout time.Duration `long:"http-header-timeout" description:"The maximum duration that the server will wait before timing out reading the headers of an HTTP request."`
+
+	// NumRestrictedSlots is the number of restricted slots we'll allocate
+	// in the server.
+	NumRestrictedSlots uint64 `long:"num-restricted-slots" description:"The number of restricted slots we'll allocate in the server."`
 }
 
 // GRPCConfig holds the configuration options for the gRPC server.
@@ -735,9 +743,10 @@ func DefaultConfig() Config {
 			ServerPingTimeout: defaultGrpcServerPingTimeout,
 			ClientPingMinWait: defaultGrpcClientPingMinWait,
 		},
-		LogConfig:         build.DefaultLogConfig(),
-		WtClient:          lncfg.DefaultWtClientCfg(),
-		HTTPHeaderTimeout: DefaultHTTPHeaderTimeout,
+		LogConfig:          build.DefaultLogConfig(),
+		WtClient:           lncfg.DefaultWtClientCfg(),
+		HTTPHeaderTimeout:  DefaultHTTPHeaderTimeout,
+		NumRestrictedSlots: DefaultNumRestrictedSlots,
 	}
 }
diff --git a/discovery/syncer.go b/discovery/syncer.go
index 6b19908d5..991bd75d3 100644
--- a/discovery/syncer.go
+++ b/discovery/syncer.go
@@ -181,6 +181,9 @@ const (
 	// requestBatchSize is the maximum number of channels we will query the
 	// remote peer for in a QueryShortChanIDs message.
 	requestBatchSize = 500
+
+	// syncerBufferSize is the size of the syncer's buffers.
+	syncerBufferSize = 5
 )
 
 var (
@@ -436,8 +439,8 @@ func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
 		rateLimiter:        rateLimiter,
 		syncTransitionReqs: make(chan *syncTransitionReq),
 		historicalSyncReqs: make(chan *historicalSyncReq),
-		gossipMsgs:         make(chan lnwire.Message, 100),
-		queryMsgs:          make(chan lnwire.Message, 100),
+		gossipMsgs:         make(chan lnwire.Message, syncerBufferSize),
+		queryMsgs:          make(chan lnwire.Message, syncerBufferSize),
 		syncerSema:         sema,
 		quit:               make(chan struct{}),
 	}
diff --git a/funding/manager.go b/funding/manager.go
index 6e81c6f8b..86f49e0eb 100644
--- a/funding/manager.go
+++ b/funding/manager.go
@@ -105,7 +105,7 @@ const (
 	// pendingChansLimit is the maximum number of pending channels that we
 	// can have. After this point, pending channel opens will start to be
 	// rejected.
-	pendingChansLimit = 1_000
+	pendingChansLimit = 50
 )
 
 var (
diff --git a/peer/brontide.go b/peer/brontide.go
index f0399b4e8..2e09aeaad 100644
--- a/peer/brontide.go
+++ b/peer/brontide.go
@@ -90,6 +90,9 @@ const (
 	// torTimeoutMultiplier is the scaling factor we use on network timeouts
 	// for Tor peers.
 	torTimeoutMultiplier = 3
+
+	// msgStreamSize is the size of the message streams.
+	msgStreamSize = 5
 )
 
 var (
@@ -1856,7 +1859,7 @@ func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream {
 	return newMsgStream(p,
 		fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]),
 		fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]),
-		1000,
+		msgStreamSize,
 		apply,
 	)
 }
@@ -1875,7 +1878,7 @@ func newDiscMsgStream(p *Brontide) *msgStream {
 		p,
 		"Update stream for gossiper created",
 		"Update stream for gossiper exited",
-		1000,
+		msgStreamSize,
 		apply,
 	)
 }
diff --git a/server.go b/server.go
index 01019583f..65672c9e1 100644
--- a/server.go
+++ b/server.go
@@ -140,6 +140,25 @@ var (
 	//
 	// Per blip04: January 1, 2026 12:00:00 AM UTC in unix seconds.
 	EndorsementExperimentEnd = time.Unix(1767225600, 0)
+
+	// ErrGossiperBan is one of the errors that can be returned when we
+	// attempt to finalize a connection to a remote peer.
+	ErrGossiperBan = errors.New("gossiper has banned remote's key")
+
+	// ErrNoMoreRestrictedAccessSlots is one of the errors that can be
+	// returned when we attempt to finalize a connection. It means that
+	// this peer has no pending-open, open, or closed channels with us and
+	// that we are already at our connection ceiling for peers with this
+	// access status.
+	ErrNoMoreRestrictedAccessSlots = errors.New("no more restricted slots")
+
+	// ErrNoPeerScore is returned when we expect to find a score in
+	// peerScores, but one does not exist.
+	ErrNoPeerScore = errors.New("peer score not found")
+
+	// ErrNoPendingPeerInfo is returned when we couldn't find any pending
+	// peer info.
+	ErrNoPendingPeerInfo = errors.New("no pending peer info")
 )
 
 // errPeerAlreadyConnected is an error returned by the server when we're
@@ -155,6 +174,33 @@ func (e *errPeerAlreadyConnected) Error() string {
 	return fmt.Sprintf("already connected to peer: %v", e.peer)
 }
 
+// peerAccessStatus denotes the p2p access status of a given peer. This is
+// used to determine the action the server will take for the peer's
+// connection.
+type peerAccessStatus int
+
+const (
+	// peerStatusRestricted indicates that the peer only has access to the
+	// limited number of "free" reserved slots.
+	peerStatusRestricted peerAccessStatus = iota
+
+	// peerStatusTemporary indicates that the peer only has temporary p2p
+	// access to the server.
+	peerStatusTemporary
+
+	// peerStatusProtected indicates that the peer has been granted
+	// permanent p2p access to the server. The peer can still have its
+	// access revoked.
+	peerStatusProtected
+)
+
+// peerSlotStatus tracks whether a peer occupies one of our free slots or gets
+// to bypass this safety mechanism.
+type peerSlotStatus struct {
+	// state determines which privileges the peer has with our server.
+	state peerAccessStatus
+}
+
 // server is the main server of the Lightning Network Daemon. The server houses
 // global state pertaining to the wallet, database, and the rpcserver.
 // Additionally, the server is also used as a central messaging bus to interact
@@ -360,6 +406,9 @@ type server struct {
 	// of new blocks.
 	blockbeatDispatcher *chainio.BlockbeatDispatcher
 
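These states surface in connection-handling decisions and, through the errors above, in logs, so a readable rendering is handy when debugging. A hypothetical String method, not part of this PR:

```go
// String renders a peerAccessStatus for logging. Hypothetical helper.
func (p peerAccessStatus) String() string {
	switch p {
	case peerStatusRestricted:
		return "restricted"
	case peerStatusTemporary:
		return "temporary"
	case peerStatusProtected:
		return "protected"
	default:
		return "unknown"
	}
}
```

+	// peerAccessMan implements peer access controls.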
+	peerAccessMan *accessMan
+
 	quit chan struct{}
 
 	wg sync.WaitGroup
@@ -525,19 +574,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 		)
 	)
 
-	listeners := make([]net.Listener, len(listenAddrs))
-	for i, listenAddr := range listenAddrs {
-		// Note: though brontide.NewListener uses ResolveTCPAddr, it
-		// doesn't need to call the general lndResolveTCP function
-		// since we are resolving a local address.
-		listeners[i], err = brontide.NewListener(
-			nodeKeyECDH, listenAddr.String(),
-		)
-		if err != nil {
-			return nil, err
-		}
-	}
-
 	var serializedPubKey [33]byte
 	copy(serializedPubKey[:], nodeKeyDesc.PubKey.SerializeCompressed())
 
@@ -1142,6 +1178,26 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 		AssumeChannelValid: cfg.Routing.AssumeChannelValid,
 	}, nodeKeyDesc)
 
+	accessCfg := &accessManConfig{
+		initAccessPerms: func() (map[string]channeldb.ChanCount,
+			error) {
+
+			genesisHash := *s.cfg.ActiveNetParams.GenesisHash
+			return s.chanStateDB.FetchPermAndTempPeers(
+				genesisHash[:],
+			)
+		},
+		shouldDisconnect:   s.authGossiper.ShouldDisconnect,
+		maxRestrictedSlots: int64(s.cfg.NumRestrictedSlots),
+	}
+
+	peerAccessMan, err := newAccessMan(accessCfg)
+	if err != nil {
+		return nil, err
+	}
+
+	s.peerAccessMan = peerAccessMan
+
 	selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed())
 	//nolint:ll
 	s.localChanMgr = &localchans.Manager{
@@ -1816,6 +1872,23 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 	// Create liveness monitor.
 	s.createLivenessMonitor(cfg, cc, leaderElector)
 
+	listeners := make([]net.Listener, len(listenAddrs))
+	for i, listenAddr := range listenAddrs {
+		// Note: though brontide.NewListener uses ResolveTCPAddr, it
+		// doesn't need to call the general lndResolveTCP function
+		// since we are resolving a local address.
+
+		// RESOLVE: We are actually partially accepting inbound
+		// connection requests when we call NewListener.
+		listeners[i], err = brontide.NewListener(
+			nodeKeyECDH, listenAddr.String(),
+			s.peerAccessMan.checkIncomingConnBanScore,
+		)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	// Create the connection manager which will be responsible for
 	// maintaining persistent outbound connections and also accepting new
 	// incoming connections
@@ -2397,6 +2470,11 @@ func (s *server) Start() error {
 			s.connMgr.Stop()
 			return nil
 		})
+
+		// RESOLVE: s.connMgr.Start() is called here, but
+		// brontide.NewListener() is called in newServer. This means
+		// that we are actually listening and partially accepting
+		// inbound connections even before the connMgr starts.
 		s.connMgr.Start()
 
 		// If peers are specified as a config option, we'll add those
@@ -3622,6 +3700,23 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
 	s.mu.Unlock()
 }
 
+// bannedPersistentPeerConnection does not actually "ban" a persistent peer. It
+// is instead used to remove persistent peer state for a peer that has been
+// disconnected for good cause by the server. Currently, a gossip ban from
+// sending garbage and the server running out of restricted-access
+// (i.e. "free") connection slots are the only ways this logic gets hit. In
+// the future, this function may expand as more ban criteria are added.
+//
+// NOTE: The server's write lock MUST be held when this is called.
+func (s *server) bannedPersistentPeerConnection(remotePub string) {
+	if perm, ok := s.persistentPeers[remotePub]; ok && !perm {
+		delete(s.persistentPeers, remotePub)
+		delete(s.persistentPeersBackoff, remotePub)
+		delete(s.persistentPeerAddrs, remotePub)
+		s.cancelConnReqs(remotePub, nil)
+	}
+}
+
 // BroadcastMessage sends a request to the server to broadcast a set of
 // messages to all peers other than the one specified by the `skips` parameter.
 // All messages sent via BroadcastMessage will be queued for lazy delivery to
@@ -3875,18 +3970,15 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 	defer s.mu.Unlock()
 
 	// If the remote node's public key is banned, drop the connection.
-	shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
-	if dcErr != nil {
-		srvrLog.Errorf("Unable to check if we should disconnect "+
-			"peer: %v", dcErr)
-		conn.Close()
+	access, err := s.peerAccessMan.assignPeerPerms(nodePub)
+	if err != nil {
+		// Clean up the persistent peer maps if we're dropping this
+		// connection.
+		s.bannedPersistentPeerConnection(pubStr)
 
-		return
-	}
-
-	if shouldDc {
-		srvrLog.Debugf("Dropping connection for %v since they are "+
-			"banned.", pubSer)
+		srvrLog.Debugf("Dropping connection for %x since we could "+
+			"not assign its access permissions: %v.", pubSer, err)
 
 		conn.Close()
 
@@ -3927,7 +4019,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 		// We were unable to locate an existing connection with the
 		// target peer, proceed to connect.
 		s.cancelConnReqs(pubStr, nil)
-		s.peerConnected(conn, nil, true)
+		s.peerConnected(conn, nil, true, access)
 
 	case nil:
 		// We already have a connection with the incoming peer. If the
@@ -3959,7 +4051,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 		s.removePeer(connectedPeer)
 		s.ignorePeerTermination[connectedPeer] = struct{}{}
 		s.scheduledPeerConnection[pubStr] = func() {
-			s.peerConnected(conn, nil, true)
+			s.peerConnected(conn, nil, true, access)
 		}
 	}
 }
@@ -3984,19 +4076,15 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	// If the remote node's public key is banned, drop the connection.
-	shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
-	if dcErr != nil {
-		srvrLog.Errorf("Unable to check if we should disconnect "+
-			"peer: %v", dcErr)
-		conn.Close()
+	access, err := s.peerAccessMan.assignPeerPerms(nodePub)
+	if err != nil {
+		// Clean up the persistent peer maps if we're dropping this
+		// connection.
+		s.bannedPersistentPeerConnection(pubStr)
 
-		return
-	}
-
-	if shouldDc {
-		srvrLog.Debugf("Dropping connection for %v since they are "+
-			"banned.", pubSer)
+		srvrLog.Debugf("Dropping connection for %x since we could "+
+			"not assign its access permissions: %v.", pubSer, err)
 
 		if connReq != nil {
 			s.connMgr.Remove(connReq.ID())
 		}
@@ -4065,7 +4153,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	case ErrPeerNotConnected:
 		// We were unable to locate an existing connection with the
 		// target peer, proceed to connect.
-		s.peerConnected(conn, connReq, false)
+		s.peerConnected(conn, connReq, false, access)
 
 	case nil:
 		// We already have a connection with the incoming peer. If the
@@ -4099,7 +4187,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 		s.removePeer(connectedPeer)
 		s.ignorePeerTermination[connectedPeer] = struct{}{}
 		s.scheduledPeerConnection[pubStr] = func() {
-			s.peerConnected(conn, connReq, false)
+			s.peerConnected(conn, connReq, false, access)
 		}
 	}
 }
@@ -4173,12 +4261,66 @@ func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
 	return s.customMessageServer.Subscribe()
 }
 
+// notifyOpenChannelPeerEvent updates the access manager's maps and then calls
+// the channelNotifier's NotifyOpenChannelEvent.
+func (s *server) notifyOpenChannelPeerEvent(op wire.OutPoint,
+	remotePub *btcec.PublicKey) error {
+
+	// Call newOpenChan to update the access manager's maps for this peer.
+	if err := s.peerAccessMan.newOpenChan(remotePub); err != nil {
+		return err
+	}
+
+	// Notify subscribers about this open channel event.
+	s.channelNotifier.NotifyOpenChannelEvent(op)
+
+	return nil
+}
+
+// notifyPendingOpenChannelPeerEvent updates the access manager's maps and then
+// calls the channelNotifier's NotifyPendingOpenChannelEvent.
+func (s *server) notifyPendingOpenChannelPeerEvent(op wire.OutPoint,
+	pendingChan *channeldb.OpenChannel, remotePub *btcec.PublicKey) error {
+
+	// Call newPendingOpenChan to update the access manager's maps for this
+	// peer.
+	if err := s.peerAccessMan.newPendingOpenChan(remotePub); err != nil {
+		return err
+	}
+
+	// Notify subscribers about this event.
+	s.channelNotifier.NotifyPendingOpenChannelEvent(op, pendingChan)
+
+	return nil
+}
+
+// notifyFundingTimeoutPeerEvent updates the access manager's maps and then
+// calls the channelNotifier's NotifyFundingTimeout.
+func (s *server) notifyFundingTimeoutPeerEvent(op wire.OutPoint,
+	remotePub *btcec.PublicKey) error {
+
+	// Call newPendingCloseChan to potentially demote the peer.
+	err := s.peerAccessMan.newPendingCloseChan(remotePub)
+	if errors.Is(err, ErrNoMoreRestrictedAccessSlots) {
+		// If we encounter an error while attempting to disconnect the
+		// peer, log the error.
+		if dcErr := s.DisconnectPeer(remotePub); dcErr != nil {
+			srvrLog.Errorf("Unable to disconnect peer: %v", dcErr)
+		}
+	}
+
+	// Notify subscribers about this event.
+	s.channelNotifier.NotifyFundingTimeout(op)
+
+	return nil
+}
+
 // peerConnected is a function that handles initialization a newly connected
 // peer by adding it to the server's global list of all active peers, and
 // starting all the goroutines the peer needs to function properly. The inbound
 // boolean should be true if the peer initiated the connection to us.
 func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
-	inbound bool) {
+	inbound bool, access peerAccessStatus) {
 
 	brontideConn := conn.(*brontide.Conn)
 	addr := conn.RemoteAddr()
@@ -4319,6 +4461,9 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 
 	p := peer.NewBrontide(pCfg)
 
+	// Update the access manager with the access permission for this peer.
+	s.peerAccessMan.addPeerAccess(pubKey, access)
+
 	// TODO(roasbeef): update IP address for link-node
 	// * also mark last-seen, do it one single transaction?
 
@@ -4776,6 +4921,9 @@ func (s *server) removePeer(p *peer.Brontide) {
 		delete(s.outboundPeers, pubStr)
 	}
 
+	// Remove the peer's access permission from the access manager.
+	s.peerAccessMan.removePeerAccess(p.IdentityKey())
+
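The three wrappers above exist so that channel events update the access manager before subscribers hear about them. The funding-side changes are not part of this excerpt, but the intended call pattern is presumably along these lines (variable names here are illustrative):

```go
// Hypothetical call site: once a funding transaction confirms, route the
// event through the server wrapper so the peer is promoted before
// subscribers are notified.
if err := s.notifyOpenChannelPeerEvent(chanPoint, remotePub); err != nil {
	srvrLog.Errorf("Unable to update peer access status: %v", err)
}
```

 	// Copy the peer's error buffer across to the server if it has any items
 	// in it so that we can restore peer errors across connections.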
 	if p.ErrorBuffer().Total() > 0 {
diff --git a/watchtower/standalone.go b/watchtower/standalone.go
index b973c9ce4..552001209 100644
--- a/watchtower/standalone.go
+++ b/watchtower/standalone.go
@@ -78,6 +78,7 @@ func New(cfg *Config) (*Standalone, error) {
 	for _, listenAddr := range cfg.ListenAddrs {
 		listener, err := brontide.NewListener(
 			cfg.NodeKeyECDH, listenAddr.String(),
+			brontide.DisabledBanClosure,
 		)
 		if err != nil {
 			return nil, err
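Finally, the new ceiling is operator-tunable. Based on the `long:"num-restricted-slots"` struct tag added to config.go above, an lnd.conf entry raising the compiled-in default of 30 would look like this (the value 100 is illustrative):

```ini
[Application Options]
num-restricted-slots=100
```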