From c68a19c0baa7130c5ecc25eeb387b62fd5e2dfe3 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 11 Apr 2025 09:54:28 +0200 Subject: [PATCH] discovery: revert passing ctx through to Start methods --- discovery/gossiper.go | 8 ++++---- discovery/gossiper_test.go | 4 ++-- discovery/reliable_sender.go | 4 ++-- discovery/sync_manager.go | 21 ++++++--------------- discovery/sync_manager_test.go | 19 +++++++++---------- discovery/syncer.go | 4 ++-- discovery/syncer_test.go | 24 +++++++++--------------- server.go | 2 +- 8 files changed, 35 insertions(+), 51 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index ab12524fb..4b2f4f220 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -643,10 +643,10 @@ func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate( // Start spawns network messages handler goroutine and registers on new block // notifications in order to properly handle the premature announcements. -func (d *AuthenticatedGossiper) Start(ctx context.Context) error { +func (d *AuthenticatedGossiper) Start() error { var err error d.started.Do(func() { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(context.Background()) d.cancel = fn.Some(cancel) log.Info("Authenticated Gossiper starting") @@ -674,11 +674,11 @@ func (d *AuthenticatedGossiper) start(ctx context.Context) error { // Start the reliable sender. In case we had any pending messages ready // to be sent when the gossiper was last shut down, we must continue on // our quest to deliver them to their respective peers. - if err := d.reliableSender.Start(ctx); err != nil { + if err := d.reliableSender.Start(); err != nil { return err } - d.syncMgr.Start(ctx) + d.syncMgr.Start() d.banman.start() diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index ef7f2f21f..399010991 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -994,7 +994,7 @@ func createTestCtx(t *testing.T, startHeight uint32, isChanPeer bool) ( ScidCloser: newMockScidCloser(isChanPeer), }, selfKeyDesc) - if err := gossiper.Start(context.Background()); err != nil { + if err := gossiper.Start(); err != nil { return nil, fmt.Errorf("unable to start router: %w", err) } @@ -1692,7 +1692,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { KeyLocator: tCtx.gossiper.selfKeyLoc, }) require.NoError(t, err, "unable to recreate gossiper") - if err := gossiper.Start(context.Background()); err != nil { + if err := gossiper.Start(); err != nil { t.Fatalf("unable to start recreated gossiper: %v", err) } defer gossiper.Stop() diff --git a/discovery/reliable_sender.go b/discovery/reliable_sender.go index 57f2f28ff..357654a65 100644 --- a/discovery/reliable_sender.go +++ b/discovery/reliable_sender.go @@ -76,10 +76,10 @@ func newReliableSender(cfg *reliableSenderCfg) *reliableSender { } // Start spawns message handlers for any peers with pending messages. -func (s *reliableSender) Start(ctx context.Context) error { +func (s *reliableSender) Start() error { var err error s.start.Do(func() { - ctx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(context.Background()) s.cancel = fn.Some(cancel) err = s.resendPendingMsgs(ctx) diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 807bd99b9..65e0774a4 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -8,7 +8,6 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -201,9 +200,8 @@ type SyncManager struct { // number of queries. rateLimiter *rate.Limiter - wg sync.WaitGroup - quit chan struct{} - cancel fn.Option[context.CancelFunc] + wg sync.WaitGroup + quit chan struct{} } // newSyncManager constructs a new SyncManager backed by the given config. @@ -248,13 +246,10 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager { } // Start starts the SyncManager in order to properly carry out its duties. -func (m *SyncManager) Start(ctx context.Context) { +func (m *SyncManager) Start() { m.start.Do(func() { - ctx, cancel := context.WithCancel(ctx) - m.cancel = fn.Some(cancel) - m.wg.Add(1) - go m.syncerHandler(ctx) + go m.syncerHandler() }) } @@ -264,7 +259,6 @@ func (m *SyncManager) Stop() { log.Debugf("SyncManager is stopping") defer log.Debugf("SyncManager stopped") - m.cancel.WhenSome(func(fn context.CancelFunc) { fn() }) close(m.quit) m.wg.Wait() @@ -288,7 +282,7 @@ func (m *SyncManager) Stop() { // much of the public network as possible. // // NOTE: This must be run as a goroutine. -func (m *SyncManager) syncerHandler(ctx context.Context) { +func (m *SyncManager) syncerHandler() { defer m.wg.Done() m.cfg.RotateTicker.Resume() @@ -386,7 +380,7 @@ func (m *SyncManager) syncerHandler(ctx context.Context) { } m.syncersMu.Unlock() - s.Start(ctx) + s.Start() // Once we create the GossipSyncer, we'll signal to the // caller that they can proceed since the SyncManager's @@ -538,9 +532,6 @@ func (m *SyncManager) syncerHandler(ctx context.Context) { case <-m.quit: return - - case <-ctx.Done(): - return } } } diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index b8ef93197..4aff5b631 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -2,7 +2,6 @@ package discovery import ( "bytes" - "context" "fmt" "io" "reflect" @@ -83,7 +82,7 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { } syncMgr := newPinnedTestSyncManager(numActiveSyncers, pinnedSyncers) - syncMgr.Start(context.Background()) + syncMgr.Start() defer syncMgr.Stop() // First we'll start by adding the pinned syncers. These should @@ -135,7 +134,7 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { // We'll create our test sync manager to have two active syncers. syncMgr := newTestSyncManager(2) - syncMgr.Start(context.Background()) + syncMgr.Start() defer syncMgr.Stop() // The first will be an active syncer that performs a historical sync @@ -188,7 +187,7 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { // We'll create our sync manager with three active syncers. syncMgr := newTestSyncManager(1) - syncMgr.Start(context.Background()) + syncMgr.Start() defer syncMgr.Stop() // The first syncer registered always performs a historical sync. @@ -236,7 +235,7 @@ func TestSyncManagerNoInitialHistoricalSync(t *testing.T) { t.Parallel() syncMgr := newTestSyncManager(0) - syncMgr.Start(context.Background()) + syncMgr.Start() defer syncMgr.Stop() // We should not expect any messages from the peer. @@ -270,7 +269,7 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) { t.Fatal("expected graph to not be considered as synced") } - syncMgr.Start(context.Background()) + syncMgr.Start() defer syncMgr.Stop() // We should expect to see a QueryChannelRange message with a @@ -339,7 +338,7 @@ func TestSyncManagerHistoricalSyncOnReconnect(t *testing.T) { t.Parallel() syncMgr := newTestSyncManager(2) - syncMgr.Start(context.Background()) + syncMgr.Start() defer syncMgr.Stop() // We should expect to see a QueryChannelRange message with a @@ -373,7 +372,7 @@ func TestSyncManagerForceHistoricalSync(t *testing.T) { t.Parallel() syncMgr := newTestSyncManager(1) - syncMgr.Start(context.Background()) + syncMgr.Start() defer syncMgr.Stop() // We should expect to see a QueryChannelRange message with a @@ -411,7 +410,7 @@ func TestSyncManagerGraphSyncedAfterHistoricalSyncReplacement(t *testing.T) { t.Parallel() syncMgr := newTestSyncManager(1) - syncMgr.Start(context.Background()) + syncMgr.Start() defer syncMgr.Stop() // We should expect to see a QueryChannelRange message with a @@ -469,7 +468,7 @@ func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) { // We'll start by creating our test sync manager which will hold up to // 2 active syncers. syncMgr := newTestSyncManager(numActiveSyncers) - syncMgr.Start(context.Background()) + syncMgr.Start() defer syncMgr.Stop() // We'll go ahead and create our syncers. diff --git a/discovery/syncer.go b/discovery/syncer.go index 0b4e7030b..b27cc8c71 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -405,11 +405,11 @@ func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer { // Start starts the GossipSyncer and any goroutines that it needs to carry out // its duties. -func (g *GossipSyncer) Start(ctx context.Context) { +func (g *GossipSyncer) Start() { g.started.Do(func() { log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:]) - ctx, _ := g.cg.Create(ctx) + ctx, _ := g.cg.Create(context.Background()) // TODO(conner): only spawn channelGraphSyncer if remote // supports gossip queries, and only spawn replyHandler if we diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 13071d4b0..44e8d6d70 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -1703,7 +1703,6 @@ func queryBatch(t *testing.T, // them. func TestGossipSyncerRoutineSync(t *testing.T) { t.Parallel() - ctx := context.Background() // We'll modify the chunk size to be a smaller value, so we can ensure // our chunk parsing works properly. With this value we should get 3 @@ -1718,13 +1717,13 @@ func TestGossipSyncerRoutineSync(t *testing.T) { msgChan1, syncer1, chanSeries1 := newTestSyncer( highestID, defaultEncoding, chunkSize, true, false, ) - syncer1.Start(ctx) + syncer1.Start() defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( highestID, defaultEncoding, chunkSize, false, true, ) - syncer2.Start(ctx) + syncer2.Start() defer syncer2.Stop() // Although both nodes are at the same height, syncer will have 3 chan @@ -1851,7 +1850,6 @@ func TestGossipSyncerRoutineSync(t *testing.T) { // final state and not perform any channel queries. func TestGossipSyncerAlreadySynced(t *testing.T) { t.Parallel() - ctx := context.Background() // We'll modify the chunk size to be a smaller value, so we can ensure // our chunk parsing works properly. With this value we should get 3 @@ -1867,13 +1865,13 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { msgChan1, syncer1, chanSeries1 := newTestSyncer( highestID, defaultEncoding, chunkSize, ) - syncer1.Start(ctx) + syncer1.Start() defer syncer1.Stop() msgChan2, syncer2, chanSeries2 := newTestSyncer( highestID, defaultEncoding, chunkSize, ) - syncer2.Start(ctx) + syncer2.Start() defer syncer2.Stop() // The channel state of both syncers will be identical. They should @@ -2073,7 +2071,6 @@ func TestGossipSyncerAlreadySynced(t *testing.T) { // carries out its duties when accepting a new sync transition request. func TestGossipSyncerSyncTransitions(t *testing.T) { t.Parallel() - ctx := context.Background() assertMsgSent := func(t *testing.T, msgChan chan []lnwire.Message, msg lnwire.Message) { @@ -2194,7 +2191,7 @@ func TestGossipSyncerSyncTransitions(t *testing.T) { // We'll then start the syncer in order to process the // request. - syncer.Start(ctx) + syncer.Start() defer syncer.Stop() syncer.ProcessSyncTransition(test.finalSyncType) @@ -2219,7 +2216,6 @@ func TestGossipSyncerSyncTransitions(t *testing.T) { // historical sync with the remote peer. func TestGossipSyncerHistoricalSync(t *testing.T) { t.Parallel() - ctx := context.Background() // We'll create a new gossip syncer and manually override its state to // chansSynced. This is necessary as the syncer can only process @@ -2231,7 +2227,7 @@ func TestGossipSyncerHistoricalSync(t *testing.T) { syncer.setSyncType(PassiveSync) syncer.setSyncState(chansSynced) - syncer.Start(ctx) + syncer.Start() defer syncer.Stop() syncer.historicalSync() @@ -2264,7 +2260,6 @@ func TestGossipSyncerHistoricalSync(t *testing.T) { // syncer reaches its terminal chansSynced state. func TestGossipSyncerSyncedSignal(t *testing.T) { t.Parallel() - ctx := context.Background() // We'll create a new gossip syncer and manually override its state to // chansSynced. @@ -2279,7 +2274,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) { signalChan := syncer.ResetSyncedSignal() // Starting the gossip syncer should cause the signal to be delivered. - syncer.Start(ctx) + syncer.Start() select { case <-signalChan: @@ -2298,7 +2293,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) { syncer.setSyncState(chansSynced) - syncer.Start(ctx) + syncer.Start() defer syncer.Stop() signalChan = syncer.ResetSyncedSignal() @@ -2317,7 +2312,6 @@ func TestGossipSyncerSyncedSignal(t *testing.T) { // said limit are not processed. func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) { t.Parallel() - ctx := context.Background() msgChan, syncer, chanSeries := newTestSyncer( lnwire.ShortChannelID{BlockHeight: latestKnownHeight}, @@ -2328,7 +2322,7 @@ func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) { // the sake of testing. syncer.cfg.maxQueryChanRangeReplies = 100 - syncer.Start(ctx) + syncer.Start() defer syncer.Stop() // Upon initialization, the syncer should submit a QueryChannelRange diff --git a/server.go b/server.go index 3c352db90..81cffee3e 100644 --- a/server.go +++ b/server.go @@ -2389,7 +2389,7 @@ func (s *server) Start(ctx context.Context) error { // The authGossiper depends on the chanRouter and therefore // should be started after it. cleanup = cleanup.add(s.authGossiper.Stop) - if err := s.authGossiper.Start(ctx); err != nil { + if err := s.authGossiper.Start(); err != nil { startErr = err return }