lnd+chanbackup: thread contexts through

Remove four context.TODO()s
This commit is contained in:
Elle Mouton
2024-11-13 10:12:27 +02:00
parent f36fbd0e45
commit 2192bf4155
13 changed files with 138 additions and 106 deletions

View File

@@ -506,7 +506,7 @@ func noiseDial(idKey keychain.SingleKeyECDH,
// newServer creates a new instance of the server which is to listen using the
// passed listener address.
func newServer(cfg *Config, listenAddrs []net.Addr,
func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr,
dbs *DatabaseInstances, cc *chainreg.ChainControl,
nodeKeyDesc *keychain.KeyDescriptor,
chansToRestore walletunlocker.ChannelsToRecover,
@@ -1637,13 +1637,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
}
backupFile := chanbackup.NewMultiFile(cfg.BackupFilePath)
startingChans, err := chanbackup.FetchStaticChanBackups(
s.chanStateDB, s.addrSource,
ctx, s.chanStateDB, s.addrSource,
)
if err != nil {
return nil, err
}
s.chanSubSwapper, err = chanbackup.NewSubSwapper(
startingChans, chanNotifier, s.cc.KeyRing, backupFile,
ctx, startingChans, chanNotifier, s.cc.KeyRing, backupFile,
)
if err != nil {
return nil, err
@@ -1805,14 +1805,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
// maintaining persistent outbound connections and also accepting new
// incoming connections
cmgr, err := connmgr.New(&connmgr.Config{
Listeners: listeners,
OnAccept: s.InboundPeerConnected,
Listeners: listeners,
OnAccept: func(conn net.Conn) {
s.InboundPeerConnected(ctx, conn)
},
RetryDuration: time.Second * 5,
TargetOutbound: 100,
Dial: noiseDial(
nodeKeyECDH, s.cfg.net, s.cfg.ConnectionTimeout,
),
OnConnection: s.OutboundPeerConnected,
OnConnection: func(req *connmgr.ConnReq, conn net.Conn) {
s.OutboundPeerConnected(ctx, req, conn)
},
})
if err != nil {
return nil, err
@@ -2078,7 +2082,7 @@ func (c cleaner) run() {
// NOTE: This function is safe for concurrent access.
//
//nolint:funlen
func (s *server) Start() error {
func (s *server) Start(ctx context.Context) error {
var startErr error
// If one sub system fails to start, the following code ensures that the
@@ -2289,7 +2293,7 @@ func (s *server) Start() error {
}
if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
_, err := chanbackup.UnpackAndRecoverSingles(
s.chansToRestore.PackedSingleChanBackups,
ctx, s.chansToRestore.PackedSingleChanBackups,
s.cc.KeyRing, chanRestorer, s,
)
if err != nil {
@@ -2300,7 +2304,7 @@ func (s *server) Start() error {
}
if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
_, err := chanbackup.UnpackAndRecoverMulti(
s.chansToRestore.PackedMultiChanBackup,
ctx, s.chansToRestore.PackedMultiChanBackup,
s.cc.KeyRing, chanRestorer, s,
)
if err != nil {
@@ -2365,8 +2369,7 @@ func (s *server) Start() error {
}
err = s.ConnectToPeer(
peerAddr, true,
s.cfg.ConnectionTimeout,
ctx, peerAddr, true, s.cfg.ConnectionTimeout,
)
if err != nil {
startErr = fmt.Errorf("unable to connect to "+
@@ -2453,14 +2456,16 @@ func (s *server) Start() error {
// dedicated goroutine to maintain a set of persistent
// connections.
if shouldPeerBootstrap(s.cfg) {
bootstrappers, err := initNetworkBootstrappers(s)
bootstrappers, err := initNetworkBootstrappers(ctx, s)
if err != nil {
startErr = err
return
}
s.wg.Add(1)
go s.peerBootstrapper(defaultMinPeers, bootstrappers)
go s.peerBootstrapper(
ctx, defaultMinPeers, bootstrappers,
)
} else {
srvrLog.Infof("Auto peer bootstrapping is disabled")
}
@@ -2482,6 +2487,7 @@ func (s *server) Start() error {
// NOTE: This function is safe for concurrent access.
func (s *server) Stop() error {
s.stop.Do(func() {
ctx := context.Background()
atomic.StoreInt32(&s.stopping, 1)
close(s.quit)
@@ -2551,7 +2557,7 @@ func (s *server) Stop() error {
// Update channel.backup file. Make sure to do it before
// stopping chanSubSwapper.
singles, err := chanbackup.FetchStaticChanBackups(
s.chanStateDB, s.addrSource,
ctx, s.chanStateDB, s.addrSource,
)
if err != nil {
srvrLog.Warnf("failed to fetch channel states: %v",
@@ -2816,8 +2822,9 @@ out:
// initNetworkBootstrappers initializes a set of network peer bootstrappers
// based on the server, and currently active bootstrap mechanisms as defined
// within the current configuration.
func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) {
ctx := context.TODO()
func initNetworkBootstrappers(ctx context.Context,
s *server) ([]discovery.NetworkPeerBootstrapper, error) {
srvrLog.Infof("Initializing peer network bootstrappers!")
var bootStrappers []discovery.NetworkPeerBootstrapper
@@ -2890,7 +2897,7 @@ func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} {
// invariant, we ensure that our node is connected to a diverse set of peers
// and that nodes newly joining the network receive an up to date network view
// as soon as possible.
func (s *server) peerBootstrapper(numTargetPeers uint32,
func (s *server) peerBootstrapper(ctx context.Context, numTargetPeers uint32,
bootstrappers []discovery.NetworkPeerBootstrapper) {
defer s.wg.Done()
@@ -2900,7 +2907,7 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
// We'll start off by aggressively attempting connections to peers in
// order to be a part of the network as soon as possible.
s.initialPeerBootstrap(ignoreList, numTargetPeers, bootstrappers)
s.initialPeerBootstrap(ctx, ignoreList, numTargetPeers, bootstrappers)
// Once done, we'll attempt to maintain our target minimum number of
// peers.
@@ -2978,7 +2985,7 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
ignoreList = s.createBootstrapIgnorePeers()
peerAddrs, err := discovery.MultiSourceBootstrap(
ignoreList, numNeeded*2, bootstrappers...,
ctx, ignoreList, numNeeded*2, bootstrappers...,
)
if err != nil {
srvrLog.Errorf("Unable to retrieve bootstrap "+
@@ -2996,7 +3003,7 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
// country diversity, etc
errChan := make(chan error, 1)
s.connectToPeer(
a, errChan,
ctx, a, errChan,
s.cfg.ConnectionTimeout,
)
select {
@@ -3027,8 +3034,8 @@ const bootstrapBackOffCeiling = time.Minute * 5
// initialPeerBootstrap attempts to continuously connect to peers on startup
// until the target number of peers has been reached. This ensures that nodes
// receive an up to date network view as soon as possible.
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
numTargetPeers uint32,
func (s *server) initialPeerBootstrap(ctx context.Context,
ignore map[autopilot.NodeID]struct{}, numTargetPeers uint32,
bootstrappers []discovery.NetworkPeerBootstrapper) {
srvrLog.Debugf("Init bootstrap with targetPeers=%v, bootstrappers=%v, "+
@@ -3087,7 +3094,7 @@ func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
// in order to reach our target.
peersNeeded := numTargetPeers - numActivePeers
bootstrapAddrs, err := discovery.MultiSourceBootstrap(
ignore, peersNeeded, bootstrappers...,
ctx, ignore, peersNeeded, bootstrappers...,
)
if err != nil {
srvrLog.Errorf("Unable to retrieve initial bootstrap "+
@@ -3105,7 +3112,8 @@ func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
errChan := make(chan error, 1)
go s.connectToPeer(
addr, errChan, s.cfg.ConnectionTimeout,
ctx, addr, errChan,
s.cfg.ConnectionTimeout,
)
// We'll only allow this connection attempt to
@@ -3783,7 +3791,7 @@ func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
// connection.
//
// NOTE: This function is safe for concurrent access.
func (s *server) InboundPeerConnected(conn net.Conn) {
func (s *server) InboundPeerConnected(ctx context.Context, conn net.Conn) {
// Exit early if we have already been instructed to shutdown, this
// prevents any delayed callbacks from accidentally registering peers.
if s.Stopped() {
@@ -3853,7 +3861,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
// We were unable to locate an existing connection with the
// target peer, proceed to connect.
s.cancelConnReqs(pubStr, nil)
s.peerConnected(conn, nil, true)
s.peerConnected(ctx, conn, nil, true)
case nil:
// We already have a connection with the incoming peer. If the
@@ -3885,7 +3893,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
s.removePeer(connectedPeer)
s.ignorePeerTermination[connectedPeer] = struct{}{}
s.scheduledPeerConnection[pubStr] = func() {
s.peerConnected(conn, nil, true)
s.peerConnected(ctx, conn, nil, true)
}
}
}
@@ -3893,7 +3901,9 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
// OutboundPeerConnected initializes a new peer in response to a new outbound
// connection.
// NOTE: This function is safe for concurrent access.
func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
func (s *server) OutboundPeerConnected(ctx context.Context,
connReq *connmgr.ConnReq, conn net.Conn) {
// Exit early if we have already been instructed to shutdown, this
// prevents any delayed callbacks from accidentally registering peers.
if s.Stopped() {
@@ -3991,7 +4001,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
case ErrPeerNotConnected:
// We were unable to locate an existing connection with the
// target peer, proceed to connect.
s.peerConnected(conn, connReq, false)
s.peerConnected(ctx, conn, connReq, false)
case nil:
// We already have a connection with the incoming peer. If the
@@ -4025,7 +4035,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
s.removePeer(connectedPeer)
s.ignorePeerTermination[connectedPeer] = struct{}{}
s.scheduledPeerConnection[pubStr] = func() {
s.peerConnected(conn, connReq, false)
s.peerConnected(ctx, conn, connReq, false)
}
}
}
@@ -4103,8 +4113,8 @@ func (s *server) SubscribeCustomMessages() (*subscribe.Client, error) {
// peer by adding it to the server's global list of all active peers, and
// starting all the goroutines the peer needs to function properly. The inbound
// boolean should be true if the peer initiated the connection to us.
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
inbound bool) {
func (s *server) peerConnected(ctx context.Context, conn net.Conn,
connReq *connmgr.ConnReq, inbound bool) {
brontideConn := conn.(*brontide.Conn)
addr := conn.RemoteAddr()
@@ -4258,7 +4268,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
// includes sending and receiving Init messages, which would be a DOS
// vector if we held the server's mutex throughout the procedure.
s.wg.Add(1)
go s.peerInitializer(p)
go s.peerInitializer(ctx, p)
}
// addPeer adds the passed peer to the server's global state of all active
@@ -4313,7 +4323,7 @@ func (s *server) addPeer(p *peer.Brontide) {
// be signaled of the new peer once the method returns.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerInitializer(p *peer.Brontide) {
func (s *server) peerInitializer(ctx context.Context, p *peer.Brontide) {
defer s.wg.Done()
pubBytes := p.IdentityKey().SerializeCompressed()
@@ -4337,7 +4347,7 @@ func (s *server) peerInitializer(p *peer.Brontide) {
// the peer is ever added to the ignorePeerTermination map, indicating
// that the server has already handled the removal of this peer.
s.wg.Add(1)
go s.peerTerminationWatcher(p, ready)
go s.peerTerminationWatcher(ctx, p, ready)
// Start the peer! If an error occurs, we Disconnect the peer, which
// will unblock the peerTerminationWatcher.
@@ -4382,7 +4392,9 @@ func (s *server) peerInitializer(p *peer.Brontide) {
// successfully, otherwise the peer should be disconnected instead.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
func (s *server) peerTerminationWatcher(ctx context.Context, p *peer.Brontide,
ready chan struct{}) {
defer s.wg.Done()
p.WaitForDisconnect(ready)
@@ -4471,7 +4483,7 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
// We'll ensure that we locate all the peers advertised addresses for
// reconnection purposes.
advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(pubKey)
advertisedAddrs, err := s.fetchNodeAdvertisedAddrs(ctx, pubKey)
switch {
// We found advertised addresses, so use them.
case err == nil:
@@ -4720,7 +4732,7 @@ func (s *server) removePeer(p *peer.Brontide) {
// connection is established, or the initial handshake process fails.
//
// NOTE: This function is safe for concurrent access.
func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
func (s *server) ConnectToPeer(ctx context.Context, addr *lnwire.NetAddress,
perm bool, timeout time.Duration) error {
targetPub := string(addr.IdentityKey.SerializeCompressed())
@@ -4782,7 +4794,7 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
// the crypto negotiation breaks down, then return an error to the
// caller.
errChan := make(chan error, 1)
s.connectToPeer(addr, errChan, timeout)
s.connectToPeer(ctx, addr, errChan, timeout)
select {
case err := <-errChan:
@@ -4795,7 +4807,7 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
// connectToPeer establishes a connection to a remote peer. errChan is used to
// notify the caller if the connection attempt has failed. Otherwise, it will be
// closed.
func (s *server) connectToPeer(addr *lnwire.NetAddress,
func (s *server) connectToPeer(ctx context.Context, addr *lnwire.NetAddress,
errChan chan<- error, timeout time.Duration) {
conn, err := brontide.Dial(
@@ -4815,7 +4827,7 @@ func (s *server) connectToPeer(addr *lnwire.NetAddress,
srvrLog.Tracef("Brontide dialer made local=%v, remote=%v",
conn.LocalAddr(), conn.RemoteAddr())
s.OutboundPeerConnected(nil, conn)
s.OutboundPeerConnected(ctx, nil, conn)
}
// DisconnectPeer sends the request to server to close the connection with peer
@@ -4961,8 +4973,8 @@ func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration {
var errNoAdvertisedAddr = errors.New("no advertised address found")
// fetchNodeAdvertisedAddrs attempts to fetch the advertised addresses of a node.
func (s *server) fetchNodeAdvertisedAddrs(pub *btcec.PublicKey) ([]net.Addr, error) {
ctx := context.TODO()
func (s *server) fetchNodeAdvertisedAddrs(ctx context.Context,
pub *btcec.PublicKey) ([]net.Addr, error) {
vertex, err := route.NewVertexFromBytes(pub.SerializeCompressed())
if err != nil {