mirror of
https://github.com/imgproxy/imgproxy.git
synced 2025-09-28 20:43:54 +02:00
IMG-46: Call finishFn on download finished (metrics) (#1487)
* Skip goes to processing * Call finishFn on download finished * defer DownloadFinished in DownloadSync
This commit is contained in:
@@ -14,6 +14,7 @@
|
|||||||
package asyncbuffer
|
package asyncbuffer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -68,15 +69,19 @@ type AsyncBuffer struct {
|
|||||||
|
|
||||||
paused *Latch // Paused buffer does not read data beyond threshold
|
paused *Latch // Paused buffer does not read data beyond threshold
|
||||||
chunkCond *Cond // Ticker that signals when a new chunk is ready
|
chunkCond *Cond // Ticker that signals when a new chunk is ready
|
||||||
|
|
||||||
|
finishOnce sync.Once
|
||||||
|
finishFn []context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new AsyncBuffer that reads from the given io.ReadCloser in background
|
// New creates a new AsyncBuffer that reads from the given io.ReadCloser in background
|
||||||
// and closes it when finished.
|
// and closes it when finished.
|
||||||
func New(r io.ReadCloser) *AsyncBuffer {
|
func New(r io.ReadCloser, finishFn ...context.CancelFunc) *AsyncBuffer {
|
||||||
ab := &AsyncBuffer{
|
ab := &AsyncBuffer{
|
||||||
r: r,
|
r: r,
|
||||||
paused: NewLatch(),
|
paused: NewLatch(),
|
||||||
chunkCond: NewCond(),
|
chunkCond: NewCond(),
|
||||||
|
finishFn: finishFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
go ab.readChunks()
|
go ab.readChunks()
|
||||||
@@ -113,6 +118,12 @@ func (ab *AsyncBuffer) readChunks() {
|
|||||||
if err := ab.r.Close(); err != nil {
|
if err := ab.r.Close(); err != nil {
|
||||||
logrus.WithField("source", "asyncbuffer.AsyncBuffer.readChunks").Warningf("error closing upstream reader: %s", err)
|
logrus.WithField("source", "asyncbuffer.AsyncBuffer.readChunks").Warningf("error closing upstream reader: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ab.finishOnce.Do(func() {
|
||||||
|
for _, fn := range ab.finishFn {
|
||||||
|
fn()
|
||||||
|
}
|
||||||
|
})
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Stop reading if the reader is closed
|
// Stop reading if the reader is closed
|
||||||
@@ -391,6 +402,13 @@ func (ab *AsyncBuffer) Close() error {
|
|||||||
// Release the paused latch so that no goroutines are waiting for it
|
// Release the paused latch so that no goroutines are waiting for it
|
||||||
ab.paused.Release()
|
ab.paused.Release()
|
||||||
|
|
||||||
|
// Finish downloading
|
||||||
|
ab.finishOnce.Do(func() {
|
||||||
|
for _, fn := range ab.finishFn {
|
||||||
|
fn()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package imagedata
|
package imagedata
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/imgproxy/imgproxy/v3/config"
|
"github.com/imgproxy/imgproxy/v3/config"
|
||||||
@@ -18,16 +19,18 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type DownloadOptions struct {
|
type DownloadOptions struct {
|
||||||
Header http.Header
|
Header http.Header
|
||||||
CookieJar http.CookieJar
|
CookieJar http.CookieJar
|
||||||
MaxSrcFileSize int
|
MaxSrcFileSize int
|
||||||
|
DownloadFinished context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultDownloadOptions() DownloadOptions {
|
func DefaultDownloadOptions() DownloadOptions {
|
||||||
return DownloadOptions{
|
return DownloadOptions{
|
||||||
Header: nil,
|
Header: nil,
|
||||||
CookieJar: nil,
|
CookieJar: nil,
|
||||||
MaxSrcFileSize: config.MaxSrcFileSize,
|
MaxSrcFileSize: config.MaxSrcFileSize,
|
||||||
|
DownloadFinished: nil,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -106,6 +106,10 @@ func sendRequest(ctx context.Context, url string, opts DownloadOptions) (*imagef
|
|||||||
|
|
||||||
// DownloadSync downloads the image synchronously and returns the ImageData and HTTP headers.
|
// DownloadSync downloads the image synchronously and returns the ImageData and HTTP headers.
|
||||||
func downloadSync(ctx context.Context, imageURL string, opts DownloadOptions) (ImageData, http.Header, error) {
|
func downloadSync(ctx context.Context, imageURL string, opts DownloadOptions) (ImageData, http.Header, error) {
|
||||||
|
if opts.DownloadFinished != nil {
|
||||||
|
defer opts.DownloadFinished()
|
||||||
|
}
|
||||||
|
|
||||||
req, res, h, err := sendRequest(ctx, imageURL, opts)
|
req, res, h, err := sendRequest(ctx, imageURL, opts)
|
||||||
if res != nil {
|
if res != nil {
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
@@ -140,10 +144,13 @@ func downloadAsync(ctx context.Context, imageURL string, opts DownloadOptions) (
|
|||||||
//nolint:bodyclose
|
//nolint:bodyclose
|
||||||
req, res, h, err := sendRequest(ctx, imageURL, opts)
|
req, res, h, err := sendRequest(ctx, imageURL, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if opts.DownloadFinished != nil {
|
||||||
|
defer opts.DownloadFinished()
|
||||||
|
}
|
||||||
return nil, h, err
|
return nil, h, err
|
||||||
}
|
}
|
||||||
|
|
||||||
b := asyncbuffer.New(res.Body)
|
b := asyncbuffer.New(res.Body, opts.DownloadFinished)
|
||||||
|
|
||||||
format, err := imagetype.Detect(b.Reader())
|
format, err := imagetype.Detect(b.Reader())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/imgproxy/imgproxy/v3/processing"
|
"github.com/imgproxy/imgproxy/v3/processing"
|
||||||
"github.com/imgproxy/imgproxy/v3/router"
|
"github.com/imgproxy/imgproxy/v3/router"
|
||||||
"github.com/imgproxy/imgproxy/v3/security"
|
"github.com/imgproxy/imgproxy/v3/security"
|
||||||
|
"github.com/imgproxy/imgproxy/v3/svg"
|
||||||
"github.com/imgproxy/imgproxy/v3/vips"
|
"github.com/imgproxy/imgproxy/v3/vips"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -361,15 +362,16 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) {
|
|||||||
statusCode := http.StatusOK
|
statusCode := http.StatusOK
|
||||||
|
|
||||||
originData, originHeaders, err := func() (imagedata.ImageData, http.Header, error) {
|
originData, originHeaders, err := func() (imagedata.ImageData, http.Header, error) {
|
||||||
defer metrics.StartDownloadingSegment(ctx, metrics.Meta{
|
downloadFinished := metrics.StartDownloadingSegment(ctx, metrics.Meta{
|
||||||
metrics.MetaSourceImageURL: metricsMeta[metrics.MetaSourceImageURL],
|
metrics.MetaSourceImageURL: metricsMeta[metrics.MetaSourceImageURL],
|
||||||
metrics.MetaSourceImageOrigin: metricsMeta[metrics.MetaSourceImageOrigin],
|
metrics.MetaSourceImageOrigin: metricsMeta[metrics.MetaSourceImageOrigin],
|
||||||
})()
|
})
|
||||||
|
|
||||||
downloadOpts := imagedata.DownloadOptions{
|
downloadOpts := imagedata.DownloadOptions{
|
||||||
Header: imgRequestHeader,
|
Header: imgRequestHeader,
|
||||||
CookieJar: nil,
|
CookieJar: nil,
|
||||||
MaxSrcFileSize: po.SecurityOptions.MaxSrcFileSize,
|
MaxSrcFileSize: po.SecurityOptions.MaxSrcFileSize,
|
||||||
|
DownloadFinished: downloadFinished,
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.CookiePassthrough {
|
if config.CookiePassthrough {
|
||||||
@@ -468,6 +470,12 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// svg_processing is an exception. We use As here because Is does not work with types.
|
||||||
|
var e svg.SanitizeError
|
||||||
|
if errors.As(err, &e) {
|
||||||
|
checkErr(ctx, "svg_processing", e)
|
||||||
|
}
|
||||||
|
|
||||||
// First, check if the processing error wasn't caused by an image data error
|
// First, check if the processing error wasn't caused by an image data error
|
||||||
checkErr(ctx, "download", originData.Error())
|
checkErr(ctx, "download", originData.Error())
|
||||||
|
|
||||||
|
@@ -341,6 +341,7 @@ func (s *ProcessingHandlerTestSuite) TestErrorSavingToSVG() {
|
|||||||
res := rw.Result()
|
res := rw.Result()
|
||||||
|
|
||||||
s.Require().Equal(422, res.StatusCode)
|
s.Require().Equal(422, res.StatusCode)
|
||||||
|
fmt.Println(io.ReadAll(res.Body)) // Read the body to avoid resource leak
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ProcessingHandlerTestSuite) TestCacheControlPassthroughCacheControl() {
|
func (s *ProcessingHandlerTestSuite) TestCacheControlPassthroughCacheControl() {
|
||||||
|
Reference in New Issue
Block a user