Files
imgproxy/handlers/stream/handler.go
Victor Sokolov c6a95facbb headerwriter in processing_handler.go (#1507)
* headerwriter in processing_handler.go

* Remove not required etag tests

* ETagEnabled, LastModifiedEnabled true by default

* Changed Passthrough signature

* Removed etag package

* Merge writeDebugHeaders*
2025-08-27 15:20:10 +02:00

197 lines
5.1 KiB
Go

package stream
import (
"context"
"io"
"net/http"
"sync"
"github.com/imgproxy/imgproxy/v3/cookies"
"github.com/imgproxy/imgproxy/v3/headerwriter"
"github.com/imgproxy/imgproxy/v3/httpheaders"
"github.com/imgproxy/imgproxy/v3/ierrors"
"github.com/imgproxy/imgproxy/v3/imagefetcher"
"github.com/imgproxy/imgproxy/v3/monitoring"
"github.com/imgproxy/imgproxy/v3/monitoring/stats"
"github.com/imgproxy/imgproxy/v3/options"
"github.com/imgproxy/imgproxy/v3/server"
log "github.com/sirupsen/logrus"
)
const (
	// streamBufferSize is the length, in bytes, of every pooled copy buffer.
	streamBufferSize = 4096

	// categoryStreaming is the error category attached to streaming failures.
	categoryStreaming = "streaming"
)

// streamBufPool recycles the scratch buffers used while copying a response
// body. Each entry is a pointer to a byte slice of streamBufferSize bytes;
// callers dereference it for use and return the same pointer afterwards.
var streamBufPool = sync.Pool{
	New: func() any {
		scratch := make([]byte, streamBufferSize)
		return &scratch
	},
}
// Handler handles image passthrough requests, allowing images to be streamed
// directly from the upstream response to the client's response writer.
// Use New to construct one; each call to Execute serves a single request.
type Handler struct {
config *Config // Streamer configuration (passthrough header lists, cookie passthrough flag)
fetcher *imagefetcher.Fetcher // Fetcher used to build the upstream image request
hw *headerwriter.Writer // Configured header writer; a per-request writer is derived from it via NewRequest
}
// request holds the parameters and state for a single streaming request.
// It is created by Handler.Execute and lives only for that call.
type request struct {
// handler is the Handler that created this request; its config, fetcher
// and header writer are used while streaming.
handler *Handler
// imageRequest is the incoming user request. Passthrough headers and
// cookies are taken from it, and it is passed to the response logger.
imageRequest *http.Request
// imageURL is the URL of the image to fetch and stream.
imageURL string
// reqID is the request identifier passed to the response logger.
reqID string
// po supplies Expires, Filename and ReturnAttachment for the response
// headers and is attached to the log entry.
po *options.ProcessingOptions
// rw is the response writer that receives headers, status code and body.
rw http.ResponseWriter
}
// New builds a streaming Handler from the given configuration, header writer
// and image fetcher.
//
// The configuration is validated first; if validation fails, that error is
// returned unchanged together with a nil Handler.
func New(config *Config, hw *headerwriter.Writer, fetcher *imagefetcher.Fetcher) (*Handler, error) {
	err := config.Validate()
	if err != nil {
		return nil, err
	}

	h := new(Handler)
	h.config = config
	h.fetcher = fetcher
	h.hw = hw

	return h, nil
}
// Execute handles an image passthrough request, streaming the image at
// imageURL directly to rw.
//
// userRequest is the incoming client request, reqID identifies it in the
// response log, and po carries the processing options consulted for the
// response headers. The arguments are bundled into a per-call request value
// whose execute method does the actual work; its error is returned as-is.
func (s *Handler) Execute(
	ctx context.Context,
	userRequest *http.Request,
	imageURL string,
	reqID string,
	po *options.ProcessingOptions,
	rw http.ResponseWriter,
) error {
	req := new(request)
	req.handler = s
	req.imageRequest = userRequest
	req.imageURL = imageURL
	req.reqID = reqID
	req.po = po
	req.rw = rw

	return req.execute(ctx)
}
// execute fetches the image from upstream and relays the response to s.rw.
//
// For the duration of the call the in-progress image counter is incremented
// and a streaming monitoring segment is open. Failures while preparing the
// cookie jar, building the upstream request or sending it are returned
// wrapped in the streaming error category. Once the upstream response is
// available, headers, status code and body are written to s.rw and nil is
// returned; body copy failures are handled inside streamData.
func (s *request) execute(ctx context.Context) error {
	stats.IncImagesInProgress()
	defer stats.DecImagesInProgress()

	endSegment := monitoring.StartStreamingSegment(ctx)
	defer endSegment()

	// Headers of the user request that are allowed to reach the upstream.
	passHeaders := s.getImageRequestHeaders()

	jar, err := s.getCookieJar()
	if err != nil {
		return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
	}

	// Prepare the upstream request. A non-nil request is cancelled on exit
	// even when BuildRequest also reported an error.
	fetchReq, err := s.handler.fetcher.BuildRequest(ctx, s.imageURL, passHeaders, jar)
	if fetchReq != nil {
		defer fetchReq.Cancel()
	}
	if err != nil {
		return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
	}

	// Perform the upstream request. A non-nil response body is closed on
	// exit even when Send also reported an error.
	upstream, err := fetchReq.Send()
	if upstream != nil {
		defer upstream.Body.Close()
	}
	if err != nil {
		return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
	}

	// Assemble the response headers from the upstream response.
	// NOTE(review): the original author left the priority of the passthrough
	// headers relative to the setters below as an open question.
	headers := s.handler.hw.NewRequest(upstream.Header, s.imageURL)
	headers.Passthrough(s.handler.config.PassthroughResponseHeaders...)
	headers.SetContentLength(int(upstream.ContentLength))
	headers.SetCanonical()
	headers.SetExpires(s.po.Expires)

	// Content-Disposition is derived from the path of the fetched URL.
	s.setContentDisposition(fetchReq.URL().Path, upstream, headers)

	// Flush the headers, mirror the upstream status code, then send the body.
	headers.Write(s.rw)
	s.rw.WriteHeader(upstream.StatusCode)
	s.streamData(upstream)

	return nil
}
// getCookieJar builds a cookie jar from the user request when cookie
// passthrough is enabled in the handler configuration. With passthrough
// disabled it returns a nil jar and a nil error.
func (s *request) getCookieJar() (http.CookieJar, error) {
	if s.handler.config.CookiePassthrough {
		return cookies.JarFromRequest(s.imageRequest)
	}

	return nil, nil
}
// getImageRequestHeaders returns a freshly allocated http.Header filled from
// the user request, limited to the header names listed in the handler's
// PassthroughRequestHeaders configuration.
func (s *request) getImageRequestHeaders() http.Header {
	allowed := s.handler.config.PassthroughRequestHeaders

	out := http.Header{}
	httpheaders.CopyFromRequest(s.imageRequest, out, allowed)

	return out
}
// setContentDisposition sets Content-Disposition on the header writer hw,
// but only when the upstream response has a 2xx status code; for any other
// status it does nothing.
//
// The disposition is built from imagePath, the Filename and ReturnAttachment
// processing options, and the upstream Content-Type header.
func (s *request) setContentDisposition(imagePath string, serverResponse *http.Response, hw *headerwriter.Request) {
	status := serverResponse.StatusCode
	if success := status >= 200 && status < 300; !success {
		return
	}

	hw.SetContentDisposition(
		imagePath,
		s.po.Filename,
		"", // NOTE(review): meaning of this empty argument is defined by headerwriter — confirm
		serverResponse.Header.Get(httpheaders.ContentType),
		s.po.ReturnAttachment,
	)
}
// streamData copies the upstream response body to the response writer using
// a pooled buffer, then logs the response with the upstream status code.
//
// The log entry is always written with a nil error, even if the copy failed.
// A copy failure is signalled afterwards by panicking with
// http.ErrAbortHandler, which aborts the handler.
func (s *request) streamData(res *http.Response) {
	scratch := streamBufPool.Get().(*[]byte)
	defer streamBufPool.Put(scratch)

	_, copyErr := io.CopyBuffer(s.rw, res.Body, *scratch)

	fields := log.Fields{
		"image_url":          s.imageURL,
		"processing_options": s.po,
	}
	server.LogResponse(s.reqID, s.imageRequest, res.StatusCode, nil, fields)

	if copyErr == nil {
		return
	}

	// The response has already been logged above, so abort the handler
	// instead of returning an error.
	panic(http.ErrAbortHandler)
}