mirror of
https://github.com/imgproxy/imgproxy.git
synced 2025-09-28 12:37:47 +02:00
Wrap asyncbuffer and imageDataAsyncBuffer errors
This commit is contained in:
committed by
Sergei Aleksandrovich
parent
4706416d5b
commit
d38720a0b8
@@ -22,6 +22,7 @@ import (
|
|||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/imgproxy/imgproxy/v3/ierrors"
|
||||||
"github.com/imgproxy/imgproxy/v3/ioutil"
|
"github.com/imgproxy/imgproxy/v3/ioutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -108,6 +109,17 @@ func (ab *AsyncBuffer) callFinishFn() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ab *AsyncBuffer) setErr(err error) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the error is already set, we do not overwrite it
|
||||||
|
if ab.err.Load() == nil {
|
||||||
|
ab.err.Store(ierrors.Wrap(err, 1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// addChunk adds a new chunk to the AsyncBuffer, increments bytesRead
|
// addChunk adds a new chunk to the AsyncBuffer, increments bytesRead
|
||||||
// and signals that a chunk is ready
|
// and signals that a chunk is ready
|
||||||
func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
|
func (ab *AsyncBuffer) addChunk(chunk *byteChunk) {
|
||||||
@@ -139,10 +151,10 @@ func (ab *AsyncBuffer) readChunks() {
|
|||||||
logrus.WithField("source", "asyncbuffer.AsyncBuffer.readChunks").Warningf("error closing upstream reader: %s", err)
|
logrus.WithField("source", "asyncbuffer.AsyncBuffer.readChunks").Warningf("error closing upstream reader: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ab.bytesRead.Load() < int64(ab.dataLen) && ab.err.Load() == nil {
|
if ab.bytesRead.Load() < int64(ab.dataLen) {
|
||||||
// If the reader has finished reading and we have not read enough data,
|
// If the reader has finished reading and we have not read enough data,
|
||||||
// set err to io.ErrUnexpectedEOF
|
// set err to io.ErrUnexpectedEOF
|
||||||
ab.err.Store(io.ErrUnexpectedEOF)
|
ab.setErr(io.ErrUnexpectedEOF)
|
||||||
}
|
}
|
||||||
|
|
||||||
ab.callFinishFn()
|
ab.callFinishFn()
|
||||||
@@ -171,7 +183,7 @@ func (ab *AsyncBuffer) readChunks() {
|
|||||||
// If the pool is empty, it will create a new byteChunk with ChunkSize
|
// If the pool is empty, it will create a new byteChunk with ChunkSize
|
||||||
chunk, ok := chunkPool.Get().(*byteChunk)
|
chunk, ok := chunkPool.Get().(*byteChunk)
|
||||||
if !ok {
|
if !ok {
|
||||||
ab.err.Store(errors.New("asyncbuffer.AsyncBuffer.readChunks: failed to get chunk from pool"))
|
ab.setErr(errors.New("asyncbuffer.AsyncBuffer.readChunks: failed to get chunk from pool"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -182,7 +194,7 @@ func (ab *AsyncBuffer) readChunks() {
|
|||||||
|
|
||||||
// If it's not the EOF, we need to store the error
|
// If it's not the EOF, we need to store the error
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
ab.err.Store(err)
|
ab.setErr(err)
|
||||||
chunkPool.Put(chunk)
|
chunkPool.Put(chunk)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
14
imagedata/errors.go
Normal file
14
imagedata/errors.go
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
package imagedata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/imgproxy/imgproxy/v3/ierrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func wrapDownloadError(err error, desc string) error {
|
||||||
|
return ierrors.Wrap(
|
||||||
|
err, 0,
|
||||||
|
ierrors.WithPrefix(fmt.Sprintf("can't download %s", desc)),
|
||||||
|
)
|
||||||
|
}
|
@@ -4,13 +4,11 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/imgproxy/imgproxy/v3/asyncbuffer"
|
"github.com/imgproxy/imgproxy/v3/asyncbuffer"
|
||||||
"github.com/imgproxy/imgproxy/v3/ierrors"
|
|
||||||
"github.com/imgproxy/imgproxy/v3/imagefetcher"
|
"github.com/imgproxy/imgproxy/v3/imagefetcher"
|
||||||
"github.com/imgproxy/imgproxy/v3/imagetype"
|
"github.com/imgproxy/imgproxy/v3/imagetype"
|
||||||
"github.com/imgproxy/imgproxy/v3/security"
|
"github.com/imgproxy/imgproxy/v3/security"
|
||||||
@@ -105,7 +103,7 @@ func sendRequest(ctx context.Context, url string, opts DownloadOptions) (*imagef
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DownloadSync downloads the image synchronously and returns the ImageData and HTTP headers.
|
// DownloadSync downloads the image synchronously and returns the ImageData and HTTP headers.
|
||||||
func downloadSync(ctx context.Context, imageURL string, opts DownloadOptions) (ImageData, http.Header, error) {
|
func DownloadSync(ctx context.Context, imageURL, desc string, opts DownloadOptions) (ImageData, http.Header, error) {
|
||||||
if opts.DownloadFinished != nil {
|
if opts.DownloadFinished != nil {
|
||||||
defer opts.DownloadFinished()
|
defer opts.DownloadFinished()
|
||||||
}
|
}
|
||||||
@@ -120,26 +118,26 @@ func downloadSync(ctx context.Context, imageURL string, opts DownloadOptions) (I
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, h, err
|
return nil, h, wrapDownloadError(err, desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
b, err := io.ReadAll(res.Body)
|
b, err := io.ReadAll(res.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, h, err
|
return nil, h, wrapDownloadError(err, desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
format, err := imagetype.Detect(bytes.NewReader(b))
|
format, err := imagetype.Detect(bytes.NewReader(b))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, h, err
|
return nil, h, wrapDownloadError(err, desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
d := NewFromBytesWithFormat(format, b)
|
d := NewFromBytesWithFormat(format, b)
|
||||||
return d, h, err
|
return d, h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// downloadAsync downloads the image asynchronously and returns the ImageData
|
// DownloadAsync downloads the image asynchronously and returns the ImageData
|
||||||
// backed by AsyncBuffer and HTTP headers.
|
// backed by AsyncBuffer and HTTP headers.
|
||||||
func downloadAsync(ctx context.Context, imageURL string, opts DownloadOptions) (ImageData, http.Header, error) {
|
func DownloadAsync(ctx context.Context, imageURL, desc string, opts DownloadOptions) (ImageData, http.Header, error) {
|
||||||
// We pass this responsibility to AsyncBuffer
|
// We pass this responsibility to AsyncBuffer
|
||||||
//nolint:bodyclose
|
//nolint:bodyclose
|
||||||
req, res, h, err := sendRequest(ctx, imageURL, opts)
|
req, res, h, err := sendRequest(ctx, imageURL, opts)
|
||||||
@@ -147,7 +145,7 @@ func downloadAsync(ctx context.Context, imageURL string, opts DownloadOptions) (
|
|||||||
if opts.DownloadFinished != nil {
|
if opts.DownloadFinished != nil {
|
||||||
defer opts.DownloadFinished()
|
defer opts.DownloadFinished()
|
||||||
}
|
}
|
||||||
return nil, h, err
|
return nil, h, wrapDownloadError(err, desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
b := asyncbuffer.New(res.Body, int(res.ContentLength), opts.DownloadFinished)
|
b := asyncbuffer.New(res.Body, int(res.ContentLength), opts.DownloadFinished)
|
||||||
@@ -156,43 +154,16 @@ func downloadAsync(ctx context.Context, imageURL string, opts DownloadOptions) (
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
b.Close()
|
b.Close()
|
||||||
req.Cancel()
|
req.Cancel()
|
||||||
return nil, h, err
|
return nil, h, wrapDownloadError(err, desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
d := &imageDataAsyncBuffer{
|
d := &imageDataAsyncBuffer{
|
||||||
b: b,
|
b: b,
|
||||||
format: format,
|
format: format,
|
||||||
|
desc: desc,
|
||||||
cancel: nil,
|
cancel: nil,
|
||||||
}
|
}
|
||||||
d.AddCancel(req.Cancel) // request will be closed when the image data is consumed
|
d.AddCancel(req.Cancel) // request will be closed when the image data is consumed
|
||||||
|
|
||||||
return d, h, err
|
return d, h, nil
|
||||||
}
|
|
||||||
|
|
||||||
// DownloadSyncWithDesc downloads the image synchronously and returns the ImageData, but
|
|
||||||
// wraps errors with desc.
|
|
||||||
func DownloadSync(ctx context.Context, imageURL, desc string, opts DownloadOptions) (ImageData, http.Header, error) {
|
|
||||||
imgdata, h, err := downloadSync(ctx, imageURL, opts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, h, ierrors.Wrap(
|
|
||||||
err, 0,
|
|
||||||
ierrors.WithPrefix(fmt.Sprintf("can't download %s", desc)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return imgdata, h, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// DownloadSyncWithDesc downloads the image synchronously and returns the ImageData, but
|
|
||||||
// wraps errors with desc.
|
|
||||||
func DownloadAsync(ctx context.Context, imageURL, desc string, opts DownloadOptions) (ImageData, http.Header, error) {
|
|
||||||
imgdata, h, err := downloadAsync(ctx, imageURL, opts)
|
|
||||||
if err != nil {
|
|
||||||
return nil, h, ierrors.Wrap(
|
|
||||||
err, 0,
|
|
||||||
ierrors.WithPrefix(fmt.Sprintf("can't download %s", desc)),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return imgdata, h, nil
|
|
||||||
}
|
}
|
||||||
|
@@ -47,6 +47,7 @@ type imageDataBytes struct {
|
|||||||
type imageDataAsyncBuffer struct {
|
type imageDataAsyncBuffer struct {
|
||||||
b *asyncbuffer.AsyncBuffer
|
b *asyncbuffer.AsyncBuffer
|
||||||
format imagetype.Type
|
format imagetype.Type
|
||||||
|
desc string
|
||||||
cancel []context.CancelFunc
|
cancel []context.CancelFunc
|
||||||
cancelOnce sync.Once
|
cancelOnce sync.Once
|
||||||
}
|
}
|
||||||
@@ -123,7 +124,10 @@ func (d *imageDataAsyncBuffer) AddCancel(cancel context.CancelFunc) {
|
|||||||
// Error returns any error that occurred during reading data from
|
// Error returns any error that occurred during reading data from
|
||||||
// async buffer or the underlying source.
|
// async buffer or the underlying source.
|
||||||
func (d *imageDataAsyncBuffer) Error() error {
|
func (d *imageDataAsyncBuffer) Error() error {
|
||||||
return d.b.Error()
|
if err := d.b.Error(); err != nil {
|
||||||
|
return wrapDownloadError(err, d.desc)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Init() error {
|
func Init() error {
|
||||||
|
@@ -462,8 +462,8 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
// First, check if the processing error wasn't caused by an image data error
|
// First, check if the processing error wasn't caused by an image data error
|
||||||
if originData.Error() != nil {
|
if derr := originData.Error(); derr != nil {
|
||||||
return ierrors.Wrap(originData.Error(), 0, ierrors.WithCategory(categoryDownload))
|
return ierrors.Wrap(derr, 0, ierrors.WithCategory(categoryDownload))
|
||||||
}
|
}
|
||||||
|
|
||||||
// If it wasn't, than it was a processing error
|
// If it wasn't, than it was a processing error
|
||||||
|
Reference in New Issue
Block a user