server.go+accessman.go: introduce caches for access permissions
Here we introduce the access manager, which maintains caches that determine the access-control status of our peers. Peers that have had a funding transaction confirm with us are protected. Peers that only have pending-open channels with us have temporary access, which can be revoked. The rest of the peers are granted restricted access.
server.go | 230 lines changed
@@ -140,6 +140,25 @@ var (
 	//
 	// Per blip04: January 1, 2026 12:00:00 AM UTC in unix seconds.
 	EndorsementExperimentEnd = time.Unix(1767225600, 0)
+
+	// ErrGossiperBan is one of the errors that can be returned when we
+	// attempt to finalize a connection to a remote peer.
+	ErrGossiperBan = errors.New("gossiper has banned remote's key")
+
+	// ErrNoMoreRestrictedAccessSlots is one of the errors that can be
+	// returned when we attempt to finalize a connection. It means that
+	// this peer has no pending-open, open, or closed channels with us and
+	// is already at our connection ceiling for a peer with this access
+	// status.
+	ErrNoMoreRestrictedAccessSlots = errors.New("no more restricted slots")
+
+	// ErrNoPeerScore is returned when we expect to find a score in
+	// peerScores, but one does not exist.
+	ErrNoPeerScore = errors.New("peer score not found")
+
+	// ErrNoPendingPeerInfo is returned when we couldn't find any pending
+	// peer info.
+	ErrNoPendingPeerInfo = errors.New("no pending peer info")
 )
 
 // errPeerAlreadyConnected is an error returned by the server when we're
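These are sentinel errors, meant to be matched with errors.Is by the connection-handling code; the funding-timeout path later in this diff does exactly that before demoting and disconnecting a peer. A minimal sketch of the matching pattern (the handleConnError helper is illustrative only, not part of this commit):

// Sketch: how callers can react to the sentinel errors above.
func handleConnError(err error) {
	switch {
	case errors.Is(err, ErrGossiperBan):
		// The gossiper banned this key; drop the connection.

	case errors.Is(err, ErrNoMoreRestrictedAccessSlots):
		// No free slots remain for channel-less peers; drop the
		// connection and clean up any persistent-peer state.
	}
}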
@@ -155,6 +174,33 @@ func (e *errPeerAlreadyConnected) Error() string {
 	return fmt.Sprintf("already connected to peer: %v", e.peer)
 }
 
+// peerAccessStatus denotes the p2p access status of a given peer. This will be
+// used to assign peer ban scores that determine an action the server will
+// take.
+type peerAccessStatus int
+
+const (
+	// peerStatusRestricted indicates that the peer only has access to the
+	// limited number of "free" reserved slots.
+	peerStatusRestricted peerAccessStatus = iota
+
+	// peerStatusTemporary indicates that the peer only has temporary p2p
+	// access to the server.
+	peerStatusTemporary
+
+	// peerStatusProtected indicates that the peer has been granted
+	// permanent p2p access to the server. The peer can still have its
+	// access revoked.
+	peerStatusProtected
+)
+
+// peerSlotStatus determines whether a peer gets access to one of our free
+// slots or gets to bypass this safety mechanism.
+type peerSlotStatus struct {
+	// state determines which privileges the peer has with our server.
+	state peerAccessStatus
+}
+
 // server is the main server of the Lightning Network Daemon. The server houses
 // global state pertaining to the wallet, database, and the rpcserver.
 // Additionally, the server is also used as a central messaging bus to interact
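Read together with the commit message, the three statuses encode a peer's channel relationship with us. A small illustrative mapping of that policy (the ChanInfo type here is made up for the sketch; lnd derives the real counts from channeldb):

// Illustrative only: map a peer's channel state onto an access status,
// following the policy described in the commit message.
type ChanInfo struct {
	HasConfirmedChan bool   // a funding transaction confirmed with us
	PendingOpenCount uint64 // channels still awaiting confirmation
}

func statusFor(c ChanInfo) peerAccessStatus {
	switch {
	case c.HasConfirmedChan:
		// Confirmed channel: protected.
		return peerStatusProtected

	case c.PendingOpenCount > 0:
		// Only pending-open channels: temporary, revocable access.
		return peerStatusTemporary

	default:
		// No channel relationship: competes for a restricted slot.
		return peerStatusRestricted
	}
}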
@@ -360,6 +406,9 @@ type server struct {
 	// of new blocks.
 	blockbeatDispatcher *chainio.BlockbeatDispatcher
 
+	// peerAccessMan implements peer access controls.
+	peerAccessMan *accessMan
+
 	quit chan struct{}
 
 	wg sync.WaitGroup
@@ -525,19 +574,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 		)
 	)
 
-	listeners := make([]net.Listener, len(listenAddrs))
-	for i, listenAddr := range listenAddrs {
-		// Note: though brontide.NewListener uses ResolveTCPAddr, it
-		// doesn't need to call the general lndResolveTCP function
-		// since we are resolving a local address.
-		listeners[i], err = brontide.NewListener(
-			nodeKeyECDH, listenAddr.String(),
-		)
-		if err != nil {
-			return nil, err
-		}
-	}
-
 	var serializedPubKey [33]byte
 	copy(serializedPubKey[:], nodeKeyDesc.PubKey.SerializeCompressed())
 
@@ -1142,6 +1178,26 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 		AssumeChannelValid: cfg.Routing.AssumeChannelValid,
 	}, nodeKeyDesc)
 
+	accessCfg := &accessManConfig{
+		initAccessPerms: func() (map[string]channeldb.ChanCount,
+			error) {
+
+			genesisHash := *s.cfg.ActiveNetParams.GenesisHash
+			return s.chanStateDB.FetchPermAndTempPeers(
+				genesisHash[:],
+			)
+		},
+		shouldDisconnect:   s.authGossiper.ShouldDisconnect,
+		maxRestrictedSlots: int64(s.cfg.NumRestrictedSlots),
+	}
+
+	peerAccessMan, err := newAccessMan(accessCfg)
+	if err != nil {
+		return nil, err
+	}
+
+	s.peerAccessMan = peerAccessMan
+
 	selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed())
 	//nolint:ll
 	s.localChanMgr = &localchans.Manager{
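The config literal above implies the rough shape of accessManConfig, which lives in the new accessman.go and is not shown on this page. A sketch consistent with the three fields used here; the exact definition is an assumption (shouldDisconnect's signature is inferred from the old s.authGossiper.ShouldDisconnect call removed later in this diff):

// Assumed sketch of accessManConfig, inferred from the literal above.
type accessManConfig struct {
	// initAccessPerms seeds the caches with the channel counts of all
	// known peers, keyed by serialized public key.
	initAccessPerms func() (map[string]channeldb.ChanCount, error)

	// shouldDisconnect asks the gossiper whether a peer's key is banned.
	shouldDisconnect func(*btcec.PublicKey) (bool, error)

	// maxRestrictedSlots caps concurrent connections from peers that
	// have no channels with us.
	maxRestrictedSlots int64
}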
@@ -1816,6 +1872,23 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 	// Create liveness monitor.
 	s.createLivenessMonitor(cfg, cc, leaderElector)
 
+	listeners := make([]net.Listener, len(listenAddrs))
+	for i, listenAddr := range listenAddrs {
+		// Note: though brontide.NewListener uses ResolveTCPAddr, it
+		// doesn't need to call the general lndResolveTCP function
+		// since we are resolving a local address.
+
+		// RESOLVE: We are actually partially accepting inbound
+		// connection requests when we call NewListener.
+		listeners[i], err = brontide.NewListener(
+			nodeKeyECDH, listenAddr.String(),
+			s.peerAccessMan.checkIncomingConnBanScore,
+		)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	// Create the connection manager which will be responsible for
 	// maintaining persistent outbound connections and also accepting new
 	// incoming connections
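brontide.NewListener now takes an extra argument so the access manager can veto a connection before the noise handshake completes (cf. the RESOLVE note above). Since no public key is known at that point, such a check can only be keyed on the remote network address. A hypothetical, self-contained sketch of what a pre-handshake check of this kind could look like; the real checkIncomingConnBanScore lives in accessman.go and its signature and body are not shown on this page:

// Hypothetical sketch only; names and fields are illustrative.
type banList struct {
	mtx         sync.RWMutex
	bannedAddrs map[string]struct{}
}

// allowIncoming reports whether a raw inbound connection may proceed to the
// noise handshake, consulting an address-keyed ban list.
func (b *banList) allowIncoming(remoteAddr net.Addr) (bool, error) {
	b.mtx.RLock()
	defer b.mtx.RUnlock()

	_, banned := b.bannedAddrs[remoteAddr.String()]
	return !banned, nil
}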
@@ -2397,6 +2470,11 @@ func (s *server) Start() error {
 			s.connMgr.Stop()
 			return nil
 		})
+
+		// RESOLVE: s.connMgr.Start() is called here, but
+		// brontide.NewListener() is called in newServer. This means
+		// that we are actually listening and partially accepting
+		// inbound connections even before the connMgr starts.
 		s.connMgr.Start()
 
 		// If peers are specified as a config option, we'll add those
@@ -3622,6 +3700,23 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
 	s.mu.Unlock()
 }
 
+// bannedPersistentPeerConnection does not actually "ban" a persistent peer. It
+// is instead used to remove persistent peer state for a peer that has been
+// disconnected for good cause by the server. Currently, a gossip ban from
+// sending garbage and the server running out of restricted-access
+// (i.e. "free") connection slots are the only ways this logic gets hit. In the
+// future, this function may expand when more ban criteria are added.
+//
+// NOTE: The server's write lock MUST be held when this is called.
+func (s *server) bannedPersistentPeerConnection(remotePub string) {
+	if perm, ok := s.persistentPeers[remotePub]; ok && !perm {
+		delete(s.persistentPeers, remotePub)
+		delete(s.persistentPeersBackoff, remotePub)
+		delete(s.persistentPeerAddrs, remotePub)
+		s.cancelConnReqs(remotePub, nil)
+	}
+}
+
 // BroadcastMessage sends a request to the server to broadcast a set of
 // messages to all peers other than the one specified by the `skips` parameter.
 // All messages sent via BroadcastMessage will be queued for lazy delivery to
@@ -3875,18 +3970,15 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 	defer s.mu.Unlock()
 
-	// If the remote node's public key is banned, drop the connection.
-	shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
-	if dcErr != nil {
-		srvrLog.Errorf("Unable to check if we should disconnect "+
-			"peer: %v", dcErr)
-		conn.Close()
-
-		return
-	}
-
-	if shouldDc {
-		srvrLog.Debugf("Dropping connection for %v since they are "+
-			"banned.", pubSer)
+	access, err := s.peerAccessMan.assignPeerPerms(nodePub)
+	if err != nil {
+		// Clean up the persistent peer maps if we're dropping this
+		// connection.
+		s.bannedPersistentPeerConnection(pubStr)
+
+		srvrLog.Debugf("Dropping connection for %x since we are out "+
+			"of restricted-access connection slots: %v.", pubSer,
+			err)
 
 		conn.Close()
 
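assignPeerPerms replaces the direct gossiper check and folds the commit message's whole policy into a single decision point that also yields the peer's access status. Its body is in accessman.go rather than on this page; a plausible, self-contained sketch of the decision flow under those assumptions (the real method works on cached state rather than explicit parameters):

// Sketch only: a plausible decision flow for assignPeerPerms, reconstructed
// from the commit message and the sentinel errors above; it may differ from
// the real implementation in accessman.go.
func assignPeerPermsSketch(bannedByGossiper, hasConfirmedChan bool,
	pendingOpenCount uint64, numRestricted, maxRestricted int64) (
	peerAccessStatus, error) {

	// A gossiper ban always wins.
	if bannedByGossiper {
		return peerStatusRestricted, ErrGossiperBan
	}

	// A confirmed channel grants protected access; a pending-open
	// channel grants temporary access.
	switch {
	case hasConfirmedChan:
		return peerStatusProtected, nil
	case pendingOpenCount > 0:
		return peerStatusTemporary, nil
	}

	// Otherwise the peer competes for one of the "free" slots.
	if numRestricted >= maxRestricted {
		return peerStatusRestricted, ErrNoMoreRestrictedAccessSlots
	}

	return peerStatusRestricted, nil
}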
@@ -3927,7 +4019,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 		// We were unable to locate an existing connection with the
 		// target peer, proceed to connect.
 		s.cancelConnReqs(pubStr, nil)
-		s.peerConnected(conn, nil, true)
+		s.peerConnected(conn, nil, true, access)
 
 	case nil:
 		// We already have a connection with the incoming peer. If the
@@ -3959,7 +4051,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 		s.removePeer(connectedPeer)
 		s.ignorePeerTermination[connectedPeer] = struct{}{}
 		s.scheduledPeerConnection[pubStr] = func() {
-			s.peerConnected(conn, nil, true)
+			s.peerConnected(conn, nil, true, access)
 		}
 	}
 }
@@ -3984,19 +4076,15 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	// If the remote node's public key is banned, drop the connection.
-	shouldDc, dcErr := s.authGossiper.ShouldDisconnect(nodePub)
-	if dcErr != nil {
-		srvrLog.Errorf("Unable to check if we should disconnect "+
-			"peer: %v", dcErr)
-		conn.Close()
-
-		return
-	}
-
-	if shouldDc {
-		srvrLog.Debugf("Dropping connection for %v since they are "+
-			"banned.", pubSer)
+	access, err := s.peerAccessMan.assignPeerPerms(nodePub)
+	if err != nil {
+		// Clean up the persistent peer maps if we're dropping this
+		// connection.
+		s.bannedPersistentPeerConnection(pubStr)
+
+		srvrLog.Debugf("Dropping connection for %x since we are out "+
+			"of restricted-access connection slots: %v.", pubSer,
+			err)
 
 		if connReq != nil {
 			s.connMgr.Remove(connReq.ID())
@@ -4065,7 +4153,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	case ErrPeerNotConnected:
 		// We were unable to locate an existing connection with the
 		// target peer, proceed to connect.
-		s.peerConnected(conn, connReq, false)
+		s.peerConnected(conn, connReq, false, access)
 
 	case nil:
 		// We already have a connection with the incoming peer. If the
@@ -4099,7 +4187,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 		s.removePeer(connectedPeer)
 		s.ignorePeerTermination[connectedPeer] = struct{}{}
 		s.scheduledPeerConnection[pubStr] = func() {
-			s.peerConnected(conn, connReq, false, access)
 		}
 	}
 }
@@ -4173,12 +4261,66 @@ func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
 	return s.customMessageServer.Subscribe()
 }
 
+// notifyOpenChannelPeerEvent updates the access manager's maps and then calls
+// the channelNotifier's NotifyOpenChannelEvent.
+func (s *server) notifyOpenChannelPeerEvent(op wire.OutPoint,
+	remotePub *btcec.PublicKey) error {
+
+	// Call newOpenChan to update the access manager's maps for this peer.
+	if err := s.peerAccessMan.newOpenChan(remotePub); err != nil {
+		return err
+	}
+
+	// Notify subscribers about this open channel event.
+	s.channelNotifier.NotifyOpenChannelEvent(op)
+
+	return nil
+}
+
+// notifyPendingOpenChannelPeerEvent updates the access manager's maps and then
+// calls the channelNotifier's NotifyPendingOpenChannelEvent.
+func (s *server) notifyPendingOpenChannelPeerEvent(op wire.OutPoint,
+	pendingChan *channeldb.OpenChannel, remotePub *btcec.PublicKey) error {
+
+	// Call newPendingOpenChan to update the access manager's maps for this
+	// peer.
+	if err := s.peerAccessMan.newPendingOpenChan(remotePub); err != nil {
+		return err
+	}
+
+	// Notify subscribers about this event.
+	s.channelNotifier.NotifyPendingOpenChannelEvent(op, pendingChan)
+
+	return nil
+}
+
+// notifyFundingTimeoutPeerEvent updates the access manager's maps and then
+// calls the channelNotifier's NotifyFundingTimeout.
+func (s *server) notifyFundingTimeoutPeerEvent(op wire.OutPoint,
+	remotePub *btcec.PublicKey) error {
+
+	// Call newPendingCloseChan to potentially demote the peer.
+	err := s.peerAccessMan.newPendingCloseChan(remotePub)
+	if errors.Is(err, ErrNoMoreRestrictedAccessSlots) {
+		// If we encounter an error while attempting to disconnect the
+		// peer, log the error.
+		if dcErr := s.DisconnectPeer(remotePub); dcErr != nil {
+			srvrLog.Errorf("Unable to disconnect peer: %v", dcErr)
+		}
+	}
+
+	// Notify subscribers about this event.
+	s.channelNotifier.NotifyFundingTimeout(op)
+
+	return nil
+}
+
 // peerConnected is a function that handles the initialization of a newly
 // connected peer by adding it to the server's global list of all active peers,
 // and starting all the goroutines the peer needs to function properly. The
 // inbound boolean should be true if the peer initiated the connection to us.
 func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
-	inbound bool) {
+	inbound bool, access peerAccessStatus) {
 
 	brontideConn := conn.(*brontide.Conn)
 	addr := conn.RemoteAddr()
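Stepping back to the three notify wrappers above: they ensure the access manager's caches change in lock-step with channel lifecycle events, and only then do subscribers hear about the event. The newOpenChan, newPendingOpenChan, and newPendingCloseChan bodies live in accessman.go; under the commit message's policy, their status transitions amount to the following simplified sketch (assumed, not lnd's actual code):

// Assumed, simplified view of the transitions the accessman helpers perform;
// the real methods also maintain slot accounting.
func transition(cur peerAccessStatus, event string) peerAccessStatus {
	switch event {
	case "pending-open": // newPendingOpenChan: restricted -> temporary.
		if cur == peerStatusRestricted {
			return peerStatusTemporary
		}

	case "open": // newOpenChan: funding confirmed -> protected.
		return peerStatusProtected

	case "funding-timeout": // newPendingCloseChan: may demote again.
		if cur == peerStatusTemporary {
			return peerStatusRestricted
		}
	}

	return cur
}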
@@ -4319,6 +4461,9 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 
 	p := peer.NewBrontide(pCfg)
 
+	// Update the access manager with the access permission for this peer.
+	s.peerAccessMan.addPeerAccess(pubKey, access)
+
 	// TODO(roasbeef): update IP address for link-node
 	// * also mark last-seen, do it one single transaction?
 
@@ -4776,6 +4921,9 @@ func (s *server) removePeer(p *peer.Brontide) {
 		delete(s.outboundPeers, pubStr)
 	}
 
+	// Remove the peer's access permission from the access manager.
+	s.peerAccessMan.removePeerAccess(p.IdentityKey())
+
 	// Copy the peer's error buffer across to the server if it has any items
 	// in it so that we can restore peer errors across connections.
 	if p.ErrorBuffer().Total() > 0 {
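addPeerAccess and removePeerAccess bracket a peer's lifetime, so the restricted-slot count tracks live connections. A sketch of the bookkeeping these call sites imply; the peerScores name comes from the ErrNoPeerScore comment above, but the struct layout and slot accounting are assumptions, since accessman.go is not shown on this page:

// Assumed sketch of the accessMan internals implied by this diff.
type accessMan struct {
	sync.RWMutex

	// peerScores tracks the slot status of every connected peer, keyed
	// by serialized public key.
	peerScores map[string]peerSlotStatus

	// numRestricted counts connected peers occupying "free" slots.
	numRestricted int64
}

func (a *accessMan) addPeerAccess(pub *btcec.PublicKey,
	status peerAccessStatus) {

	a.Lock()
	defer a.Unlock()

	key := string(pub.SerializeCompressed())
	a.peerScores[key] = peerSlotStatus{state: status}
	if status == peerStatusRestricted {
		a.numRestricted++
	}
}

func (a *accessMan) removePeerAccess(pub *btcec.PublicKey) {
	a.Lock()
	defer a.Unlock()

	key := string(pub.SerializeCompressed())
	if st, ok := a.peerScores[key]; ok &&
		st.state == peerStatusRestricted {

		a.numRestricted--
	}
	delete(a.peerScores, key)
}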