discovery: revert passing ctx through to Start methods

This commit is contained in:
Elle Mouton
2025-04-11 09:54:28 +02:00
committed by ziggie
parent 2f99706fa1
commit 6202597eec
8 changed files with 35 additions and 51 deletions

View File

@@ -643,10 +643,10 @@ func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
// Start spawns network messages handler goroutine and registers on new block
// notifications in order to properly handle the premature announcements.
func (d *AuthenticatedGossiper) Start(ctx context.Context) error {
func (d *AuthenticatedGossiper) Start() error {
var err error
d.started.Do(func() {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background())
d.cancel = fn.Some(cancel)
log.Info("Authenticated Gossiper starting")
@@ -674,11 +674,11 @@ func (d *AuthenticatedGossiper) start(ctx context.Context) error {
// Start the reliable sender. In case we had any pending messages ready
// to be sent when the gossiper was last shut down, we must continue on
// our quest to deliver them to their respective peers.
if err := d.reliableSender.Start(ctx); err != nil {
if err := d.reliableSender.Start(); err != nil {
return err
}
d.syncMgr.Start(ctx)
d.syncMgr.Start()
d.banman.start()

View File

@@ -994,7 +994,7 @@ func createTestCtx(t *testing.T, startHeight uint32, isChanPeer bool) (
ScidCloser: newMockScidCloser(isChanPeer),
}, selfKeyDesc)
if err := gossiper.Start(context.Background()); err != nil {
if err := gossiper.Start(); err != nil {
return nil, fmt.Errorf("unable to start router: %w", err)
}
@@ -1692,7 +1692,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
KeyLocator: tCtx.gossiper.selfKeyLoc,
})
require.NoError(t, err, "unable to recreate gossiper")
if err := gossiper.Start(context.Background()); err != nil {
if err := gossiper.Start(); err != nil {
t.Fatalf("unable to start recreated gossiper: %v", err)
}
defer gossiper.Stop()

View File

@@ -76,10 +76,10 @@ func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
}
// Start spawns message handlers for any peers with pending messages.
func (s *reliableSender) Start(ctx context.Context) error {
func (s *reliableSender) Start() error {
var err error
s.start.Do(func() {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := context.WithCancel(context.Background())
s.cancel = fn.Some(cancel)
err = s.resendPendingMsgs(ctx)

View File

@@ -8,7 +8,6 @@ import (
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
@@ -201,9 +200,8 @@ type SyncManager struct {
// number of queries.
rateLimiter *rate.Limiter
wg sync.WaitGroup
quit chan struct{}
cancel fn.Option[context.CancelFunc]
wg sync.WaitGroup
quit chan struct{}
}
// newSyncManager constructs a new SyncManager backed by the given config.
@@ -248,13 +246,10 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
}
// Start starts the SyncManager in order to properly carry out its duties.
func (m *SyncManager) Start(ctx context.Context) {
func (m *SyncManager) Start() {
m.start.Do(func() {
ctx, cancel := context.WithCancel(ctx)
m.cancel = fn.Some(cancel)
m.wg.Add(1)
go m.syncerHandler(ctx)
go m.syncerHandler()
})
}
@@ -264,7 +259,6 @@ func (m *SyncManager) Stop() {
log.Debugf("SyncManager is stopping")
defer log.Debugf("SyncManager stopped")
m.cancel.WhenSome(func(fn context.CancelFunc) { fn() })
close(m.quit)
m.wg.Wait()
@@ -288,7 +282,7 @@ func (m *SyncManager) Stop() {
// much of the public network as possible.
//
// NOTE: This must be run as a goroutine.
func (m *SyncManager) syncerHandler(ctx context.Context) {
func (m *SyncManager) syncerHandler() {
defer m.wg.Done()
m.cfg.RotateTicker.Resume()
@@ -386,7 +380,7 @@ func (m *SyncManager) syncerHandler(ctx context.Context) {
}
m.syncersMu.Unlock()
s.Start(ctx)
s.Start()
// Once we create the GossipSyncer, we'll signal to the
// caller that they can proceed since the SyncManager's
@@ -538,9 +532,6 @@ func (m *SyncManager) syncerHandler(ctx context.Context) {
case <-m.quit:
return
case <-ctx.Done():
return
}
}
}

View File

@@ -2,7 +2,6 @@ package discovery
import (
"bytes"
"context"
"fmt"
"io"
"reflect"
@@ -83,7 +82,7 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) {
}
syncMgr := newPinnedTestSyncManager(numActiveSyncers, pinnedSyncers)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()
// First we'll start by adding the pinned syncers. These should
@@ -135,7 +134,7 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {
// We'll create our test sync manager to have two active syncers.
syncMgr := newTestSyncManager(2)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()
// The first will be an active syncer that performs a historical sync
@@ -188,7 +187,7 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) {
// We'll create our sync manager with three active syncers.
syncMgr := newTestSyncManager(1)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()
// The first syncer registered always performs a historical sync.
@@ -236,7 +235,7 @@ func TestSyncManagerNoInitialHistoricalSync(t *testing.T) {
t.Parallel()
syncMgr := newTestSyncManager(0)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()
// We should not expect any messages from the peer.
@@ -270,7 +269,7 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) {
t.Fatal("expected graph to not be considered as synced")
}
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()
// We should expect to see a QueryChannelRange message with a
@@ -339,7 +338,7 @@ func TestSyncManagerHistoricalSyncOnReconnect(t *testing.T) {
t.Parallel()
syncMgr := newTestSyncManager(2)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()
// We should expect to see a QueryChannelRange message with a
@@ -373,7 +372,7 @@ func TestSyncManagerForceHistoricalSync(t *testing.T) {
t.Parallel()
syncMgr := newTestSyncManager(1)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()
// We should expect to see a QueryChannelRange message with a
@@ -411,7 +410,7 @@ func TestSyncManagerGraphSyncedAfterHistoricalSyncReplacement(t *testing.T) {
t.Parallel()
syncMgr := newTestSyncManager(1)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()
// We should expect to see a QueryChannelRange message with a
@@ -469,7 +468,7 @@ func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) {
// We'll start by creating our test sync manager which will hold up to
// 2 active syncers.
syncMgr := newTestSyncManager(numActiveSyncers)
syncMgr.Start(context.Background())
syncMgr.Start()
defer syncMgr.Stop()
// We'll go ahead and create our syncers.

View File

@@ -405,11 +405,11 @@ func newGossipSyncer(cfg gossipSyncerCfg, sema chan struct{}) *GossipSyncer {
// Start starts the GossipSyncer and any goroutines that it needs to carry out
// its duties.
func (g *GossipSyncer) Start(ctx context.Context) {
func (g *GossipSyncer) Start() {
g.started.Do(func() {
log.Debugf("Starting GossipSyncer(%x)", g.cfg.peerPub[:])
ctx, _ := g.cg.Create(ctx)
ctx, _ := g.cg.Create(context.Background())
// TODO(conner): only spawn channelGraphSyncer if remote
// supports gossip queries, and only spawn replyHandler if we

View File

@@ -1703,7 +1703,6 @@ func queryBatch(t *testing.T,
// them.
func TestGossipSyncerRoutineSync(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll modify the chunk size to be a smaller value, so we can ensure
// our chunk parsing works properly. With this value we should get 3
@@ -1718,13 +1717,13 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
msgChan1, syncer1, chanSeries1 := newTestSyncer(
highestID, defaultEncoding, chunkSize, true, false,
)
syncer1.Start(ctx)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
highestID, defaultEncoding, chunkSize, false, true,
)
syncer2.Start(ctx)
syncer2.Start()
defer syncer2.Stop()
// Although both nodes are at the same height, syncer will have 3 chan
@@ -1851,7 +1850,6 @@ func TestGossipSyncerRoutineSync(t *testing.T) {
// final state and not perform any channel queries.
func TestGossipSyncerAlreadySynced(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll modify the chunk size to be a smaller value, so we can ensure
// our chunk parsing works properly. With this value we should get 3
@@ -1867,13 +1865,13 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
msgChan1, syncer1, chanSeries1 := newTestSyncer(
highestID, defaultEncoding, chunkSize,
)
syncer1.Start(ctx)
syncer1.Start()
defer syncer1.Stop()
msgChan2, syncer2, chanSeries2 := newTestSyncer(
highestID, defaultEncoding, chunkSize,
)
syncer2.Start(ctx)
syncer2.Start()
defer syncer2.Stop()
// The channel state of both syncers will be identical. They should
@@ -2073,7 +2071,6 @@ func TestGossipSyncerAlreadySynced(t *testing.T) {
// carries out its duties when accepting a new sync transition request.
func TestGossipSyncerSyncTransitions(t *testing.T) {
t.Parallel()
ctx := context.Background()
assertMsgSent := func(t *testing.T, msgChan chan []lnwire.Message,
msg lnwire.Message) {
@@ -2194,7 +2191,7 @@ func TestGossipSyncerSyncTransitions(t *testing.T) {
// We'll then start the syncer in order to process the
// request.
syncer.Start(ctx)
syncer.Start()
defer syncer.Stop()
syncer.ProcessSyncTransition(test.finalSyncType)
@@ -2219,7 +2216,6 @@ func TestGossipSyncerSyncTransitions(t *testing.T) {
// historical sync with the remote peer.
func TestGossipSyncerHistoricalSync(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll create a new gossip syncer and manually override its state to
// chansSynced. This is necessary as the syncer can only process
@@ -2231,7 +2227,7 @@ func TestGossipSyncerHistoricalSync(t *testing.T) {
syncer.setSyncType(PassiveSync)
syncer.setSyncState(chansSynced)
syncer.Start(ctx)
syncer.Start()
defer syncer.Stop()
syncer.historicalSync()
@@ -2264,7 +2260,6 @@ func TestGossipSyncerHistoricalSync(t *testing.T) {
// syncer reaches its terminal chansSynced state.
func TestGossipSyncerSyncedSignal(t *testing.T) {
t.Parallel()
ctx := context.Background()
// We'll create a new gossip syncer and manually override its state to
// chansSynced.
@@ -2279,7 +2274,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) {
signalChan := syncer.ResetSyncedSignal()
// Starting the gossip syncer should cause the signal to be delivered.
syncer.Start(ctx)
syncer.Start()
select {
case <-signalChan:
@@ -2298,7 +2293,7 @@ func TestGossipSyncerSyncedSignal(t *testing.T) {
syncer.setSyncState(chansSynced)
syncer.Start(ctx)
syncer.Start()
defer syncer.Stop()
signalChan = syncer.ResetSyncedSignal()
@@ -2317,7 +2312,6 @@ func TestGossipSyncerSyncedSignal(t *testing.T) {
// said limit are not processed.
func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) {
t.Parallel()
ctx := context.Background()
msgChan, syncer, chanSeries := newTestSyncer(
lnwire.ShortChannelID{BlockHeight: latestKnownHeight},
@@ -2328,7 +2322,7 @@ func TestGossipSyncerMaxChannelRangeReplies(t *testing.T) {
// the sake of testing.
syncer.cfg.maxQueryChanRangeReplies = 100
syncer.Start(ctx)
syncer.Start()
defer syncer.Stop()
// Upon initialization, the syncer should submit a QueryChannelRange