mirror of
https://github.com/imgproxy/imgproxy.git
synced 2025-10-10 12:12:40 +02:00
111 lines
3.1 KiB
Go
111 lines
3.1 KiB
Go
package workers
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/imgproxy/imgproxy/v3/testutil"
|
|
"github.com/stretchr/testify/suite"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
type WorkersTestSuite struct {
|
|
testutil.LazySuite
|
|
|
|
config testutil.LazyObj[*Config]
|
|
workers testutil.LazyObj[*Workers]
|
|
}
|
|
|
|
func (s *WorkersTestSuite) SetupSuite() {
|
|
s.config, _ = testutil.NewLazySuiteObj(s, func() (*Config, error) {
|
|
return &Config{RequestsQueueSize: 0, WorkersNumber: 1}, nil
|
|
})
|
|
|
|
s.workers, _ = testutil.NewLazySuiteObj(s, func() (*Workers, error) {
|
|
return New(s.config())
|
|
})
|
|
}
|
|
|
|
func (s *WorkersTestSuite) acquire(ctx context.Context, n int, delay time.Duration) (int, error) {
|
|
errg := new(errgroup.Group)
|
|
aquired := int64(0)
|
|
|
|
// Get the workers instance before running goroutines
|
|
workers := s.workers()
|
|
|
|
for i := 0; i < n; i++ {
|
|
errg.Go(func() error {
|
|
release, err := workers.Acquire(ctx)
|
|
|
|
if err == nil {
|
|
time.Sleep(delay)
|
|
|
|
release()
|
|
atomic.AddInt64(&aquired, 1)
|
|
}
|
|
|
|
return err
|
|
})
|
|
}
|
|
|
|
err := errg.Wait()
|
|
return int(aquired), err
|
|
}
|
|
|
|
func (s *WorkersTestSuite) TestQueueDisabled() {
|
|
s.config().RequestsQueueSize = 0
|
|
s.config().WorkersNumber = 2
|
|
|
|
// Try to acquire workers that exceed allowed workers number
|
|
aquired, err := s.acquire(s.T().Context(), 4, 10*time.Millisecond)
|
|
s.Require().Equal(4, aquired, "All workers should be eventually acquired")
|
|
s.Require().NoError(err, "All workers should be acquired without error")
|
|
}
|
|
|
|
func (s *WorkersTestSuite) TestQueueEnabled() {
|
|
s.config().RequestsQueueSize = 1
|
|
s.config().WorkersNumber = 2
|
|
|
|
// Try to acquire workers that fit allowed workers number + queue size
|
|
aquired, err := s.acquire(s.T().Context(), 3, 10*time.Millisecond)
|
|
s.Require().Equal(3, aquired, "All workers should be eventually acquired")
|
|
s.Require().NoError(err, "All workers should be acquired without error")
|
|
|
|
// Try to acquire workers that exceed allowed workers number + queue size
|
|
aquired, err = s.acquire(s.T().Context(), 6, 10*time.Millisecond)
|
|
s.Require().Equal(3, aquired, "Only 4 workers should be acquired")
|
|
s.Require().ErrorAs(err, new(TooManyRequestsError))
|
|
}
|
|
|
|
func (s *WorkersTestSuite) TestContextTimeout() {
|
|
ctx, cancel := context.WithTimeout(s.T().Context(), 5*time.Millisecond)
|
|
defer cancel()
|
|
|
|
aquired, err := s.acquire(ctx, 2, 100*time.Millisecond)
|
|
s.Require().Equal(1, aquired, "Only 1 worker should be acquired")
|
|
s.Require().ErrorIs(err, context.DeadlineExceeded, "Context deadline exceeded error expected")
|
|
}
|
|
|
|
func (s *WorkersTestSuite) TestContextCanceled() {
|
|
ctx, cancel := context.WithCancel(s.T().Context())
|
|
cancel()
|
|
|
|
aquired, err := s.acquire(ctx, 2, 100*time.Millisecond)
|
|
s.Require().Equal(0, aquired, "No worker should be acquired")
|
|
s.Require().ErrorIs(err, context.Canceled, "Context canceled error expected")
|
|
}
|
|
|
|
func (s *WorkersTestSuite) TestSemaphoresInvalidConfig() {
|
|
_, err := New(&Config{RequestsQueueSize: 0, WorkersNumber: 0})
|
|
s.Require().Error(err)
|
|
|
|
_, err = New(&Config{RequestsQueueSize: -1, WorkersNumber: 1})
|
|
s.Require().Error(err)
|
|
}
|
|
|
|
func TestWorkers(t *testing.T) {
|
|
suite.Run(t, new(WorkersTestSuite))
|
|
}
|