diff --git a/go.mod b/go.mod index b1dcaded..806a9d0e 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( go.uber.org/automaxprocs v1.5.3 golang.org/x/image v0.15.0 golang.org/x/net v0.24.0 + golang.org/x/sync v0.7.0 golang.org/x/sys v0.19.0 google.golang.org/api v0.176.1 google.golang.org/grpc v1.63.2 @@ -167,7 +168,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.22.0 // indirect golang.org/x/oauth2 v0.19.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect diff --git a/processing_handler.go b/processing_handler.go index eb2f9a22..ab901201 100644 --- a/processing_handler.go +++ b/processing_handler.go @@ -10,6 +10,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" "github.com/imgproxy/imgproxy/v3/config" "github.com/imgproxy/imgproxy/v3/cookies" @@ -25,24 +26,23 @@ import ( "github.com/imgproxy/imgproxy/v3/processing" "github.com/imgproxy/imgproxy/v3/router" "github.com/imgproxy/imgproxy/v3/security" - "github.com/imgproxy/imgproxy/v3/semaphore" "github.com/imgproxy/imgproxy/v3/svg" "github.com/imgproxy/imgproxy/v3/vips" ) var ( - queueSem *semaphore.Semaphore - processingSem *semaphore.Semaphore + queueSem *semaphore.Weighted + processingSem *semaphore.Weighted headerVaryValue string ) func initProcessingHandler() { if config.RequestsQueueSize > 0 { - queueSem = semaphore.New(config.RequestsQueueSize + config.Workers) + queueSem = semaphore.NewWeighted(int64(config.RequestsQueueSize + config.Workers)) } - processingSem = semaphore.New(config.Workers) + processingSem = semaphore.NewWeighted(int64(config.Workers)) vary := make([]string, 0) @@ -205,11 +205,11 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) { ctx := r.Context() if queueSem != nil { - token, acquired := queueSem.TryAcquire() + acquired := queueSem.TryAcquire(1) if !acquired { panic(ierrors.New(429, "Too many requests", "Too many requests")) } - defer token.Release() + defer queueSem.Release(1) } path := r.RequestURI @@ -282,21 +282,22 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) { } } - // The heavy part start here, so we need to restrict worker number - var processingSemToken *semaphore.Token + // The heavy part starts here, so we need to restrict worker number func() { defer metrics.StartQueueSegment(ctx)() - var acquired bool - processingSemToken, acquired = processingSem.Acquire(ctx) - if !acquired { + err = processingSem.Acquire(ctx, 1) + if err != nil { // We don't actually need to check timeout here, // but it's an easy way to check if this is an actual timeout // or the request was canceled checkErr(ctx, "queue", router.CheckTimeout(ctx)) + // We should never reach this line as err could be only ctx.Err() + // and we've already checked for it. But beter safe than sorry + sendErrAndPanic(ctx, "queue", err) } }() - defer processingSemToken.Release() + defer processingSem.Release(1) stats.IncImagesInProgress() defer stats.DecImagesInProgress() diff --git a/semaphore/semaphore.go b/semaphore/semaphore.go deleted file mode 100644 index 3a33db4d..00000000 --- a/semaphore/semaphore.go +++ /dev/null @@ -1,47 +0,0 @@ -package semaphore - -import ( - "context" - "sync" -) - -type Semaphore struct { - sem chan struct{} -} - -func New(n int) *Semaphore { - return &Semaphore{ - sem: make(chan struct{}, n), - } -} - -func (s *Semaphore) Acquire(ctx context.Context) (*Token, bool) { - select { - case s.sem <- struct{}{}: - return &Token{release: s.release}, true - case <-ctx.Done(): - return &Token{release: func() {}}, false - } -} - -func (s *Semaphore) TryAcquire() (*Token, bool) { - select { - case s.sem <- struct{}{}: - return &Token{release: s.release}, true - default: - return &Token{release: func() {}}, false - } -} - -func (s *Semaphore) release() { - <-s.sem -} - -type Token struct { - release func() - releaseOnce sync.Once -} - -func (t *Token) Release() { - t.releaseOnce.Do(t.release) -}