diff --git a/fn/goroutine_manager.go b/fn/goroutine_manager.go index 8c9ad8b2d..81c538ea0 100644 --- a/fn/goroutine_manager.go +++ b/fn/goroutine_manager.go @@ -2,13 +2,9 @@ package fn import ( "context" - "errors" "sync" ) -// ErrStopping is returned when trying to add a new goroutine while stopping. -var ErrStopping = errors.New("can not add goroutine, stopping") - // GoroutineManager is used to launch goroutines until context expires or the // manager is stopped. The Stop method blocks until all started goroutines stop. type GoroutineManager struct { @@ -29,8 +25,10 @@ func NewGoroutineManager(ctx context.Context) *GoroutineManager { } } -// Go starts a new goroutine if the manager is not stopping. -func (g *GoroutineManager) Go(f func(ctx context.Context)) error { +// Go tries to start a new goroutine and returns a boolean indicating its +// success. It fails iff the goroutine manager is stopping or its context passed +// to NewGoroutineManager has expired. +func (g *GoroutineManager) Go(f func(ctx context.Context)) bool { // Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race // condition, since it is not clear should Wait() block or not. This // kind of race condition is detected by Go runtime and results in a @@ -43,7 +41,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error { defer g.mu.Unlock() if g.ctx.Err() != nil { - return ErrStopping + return false } g.wg.Add(1) @@ -52,7 +50,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error { f(g.ctx) }() - return nil + return true } // Stop prevents new goroutines from being added and waits for all running @@ -66,7 +64,7 @@ func (g *GoroutineManager) Stop() { // safe, since it can't run in parallel with wg.Add(1) call in Go, since // we just cancelled the context and even if Go call starts running here // after acquiring the mutex, it would see that the context has expired - // and return ErrStopping instead of calling wg.Add(1). 
+ // and return false instead of calling wg.Add(1). g.wg.Wait() } diff --git a/fn/goroutine_manager_test.go b/fn/goroutine_manager_test.go index d06a62b4a..1fc945b97 100644 --- a/fn/goroutine_manager_test.go +++ b/fn/goroutine_manager_test.go @@ -2,6 +2,7 @@ package fn import ( "context" + "sync" "testing" "time" @@ -19,7 +20,7 @@ func TestGoroutineManager(t *testing.T) { taskChan := make(chan struct{}) - require.NoError(t, m.Go(func(ctx context.Context) { + require.True(t, m.Go(func(ctx context.Context) { <-taskChan })) @@ -37,7 +38,7 @@ func TestGoroutineManager(t *testing.T) { require.Greater(t, stopDelay, time.Second) // Make sure new goroutines do not start after Stop. - require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping) + require.False(t, m.Go(func(ctx context.Context) {})) // When Stop() is called, the internal context expires and m.Done() is // closed. Test this. @@ -56,7 +57,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) { m := NewGoroutineManager(ctx) - require.NoError(t, m.Go(func(ctx context.Context) { + require.True(t, m.Go(func(ctx context.Context) { <-ctx.Done() })) @@ -79,7 +80,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) { } // Make sure new goroutines do not start after context expiry. - require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping) + require.False(t, m.Go(func(ctx context.Context) {})) // Stop will wait for all goroutines to stop. m.Stop() @@ -107,11 +108,11 @@ func TestGoroutineManagerStress(t *testing.T) { // implementation, this test crashes under `-race`. for i := 0; i < 100; i++ { taskChan := make(chan struct{}) - err := m.Go(func(ctx context.Context) { + ok := m.Go(func(ctx context.Context) { close(taskChan) }) // If goroutine was started, wait for its completion. - if err == nil { + if ok { <-taskChan } } @@ -119,3 +120,38 @@ func TestGoroutineManagerStress(t *testing.T) { // Wait for Stop to complete. 
<-stopChan } + +// TestGoroutineManagerStopsStress launches many Stop() calls in parallel with a +// task exiting. It attempts to catch a race condition between wg.Done() and +// wg.Wait() calls. According to documentation of wg.Wait() this is acceptable, +// therefore this test passes even with -race. +func TestGoroutineManagerStopsStress(t *testing.T) { + t.Parallel() + + m := NewGoroutineManager(context.Background()) + + // jobChan is used to make the task finish. + jobChan := make(chan struct{}) + + // Start a task and wait inside it until we start calling Stop() method. + ok := m.Go(func(ctx context.Context) { + <-jobChan + }) + require.True(t, ok) + + // Now launch many goroutines calling Stop() method in parallel. + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + m.Stop() + }() + } + + // Exit the task in parallel with Stop() calls. + close(jobChan) + + // Wait until all the Stop() calls complete. + wg.Wait() +}