fasthttp; Optimized memory allocation

This commit is contained in:
DarthSim
2018-10-05 21:17:36 +06:00
parent 6e8933198f
commit 34a61d287f
95 changed files with 23591 additions and 179 deletions

208
server.go
View File

@@ -2,63 +2,61 @@ package main
import (
"bytes"
"compress/gzip"
"context"
"crypto/subtle"
"fmt"
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
nanoid "github.com/matoous/go-nanoid"
"golang.org/x/net/netutil"
"github.com/valyala/fasthttp"
)
var mimes = map[imageType]string{
imageTypeJPEG: "image/jpeg",
imageTypePNG: "image/png",
imageTypeWEBP: "image/webp",
}
type httpHandler struct {
sem chan struct{}
}
func newHTTPHandler() *httpHandler {
return &httpHandler{make(chan struct{}, conf.Concurrency)}
}
func startServer() *http.Server {
l, err := net.Listen("tcp", conf.Bind)
if err != nil {
log.Fatal(err)
var (
mimes = map[imageType]string{
imageTypeJPEG: "image/jpeg",
imageTypePNG: "image/png",
imageTypeWEBP: "image/webp",
}
s := &http.Server{
Handler: newHTTPHandler(),
ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second,
MaxHeaderBytes: 1 << 20,
responseBufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
authHeaderMust []byte
healthRequestURI = []byte("/health")
serverMutex mutex
)
func startServer() *fasthttp.Server {
serverMutex = newMutex(conf.Concurrency)
s := &fasthttp.Server{
Name: "imgproxy",
Handler: serveHTTP,
Concurrency: conf.MaxClients,
ReadTimeout: time.Duration(conf.ReadTimeout) * time.Second,
}
go func() {
log.Printf("Starting server at %s\n", conf.Bind)
log.Fatal(s.Serve(netutil.LimitListener(l, conf.MaxClients)))
if err := s.ListenAndServe(conf.Bind); err != nil {
log.Fatalln(err)
}
}()
return s
}
func shutdownServer(s *http.Server) {
func shutdownServer(s *fasthttp.Server) {
log.Println("Shutting down the server...")
ctx, close := context.WithTimeout(context.Background(), 5*time.Second)
defer close()
s.Shutdown(ctx)
s.Shutdown()
}
func logResponse(status int, msg string) {
@@ -75,143 +73,137 @@ func logResponse(status int, msg string) {
log.Printf("|\033[7;%dm %d \033[0m| %s\n", color, status, msg)
}
func writeCORS(rw http.ResponseWriter) {
func writeCORS(rctx *fasthttp.RequestCtx) {
if len(conf.AllowOrigin) > 0 {
rw.Header().Set("Access-Control-Allow-Origin", conf.AllowOrigin)
rw.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONs")
rctx.Request.Header.Set("Access-Control-Allow-Origin", conf.AllowOrigin)
rctx.Request.Header.Set("Access-Control-Allow-Methods", "GET, OPTIONs")
}
}
func respondWithImage(reqID string, r *http.Request, rw http.ResponseWriter, data []byte, imgURL string, po processingOptions, duration time.Duration) {
gzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && conf.GZipCompression > 0
func respondWithImage(ctx context.Context, reqID string, rctx *fasthttp.RequestCtx, data []byte) {
rctx.SetStatusCode(200)
rw.Header().Set("Expires", time.Now().Add(time.Second*time.Duration(conf.TTL)).Format(http.TimeFormat))
rw.Header().Set("Cache-Control", fmt.Sprintf("max-age=%d, public", conf.TTL))
rw.Header().Set("Content-Type", mimes[po.Format])
po := getprocessingOptions(ctx)
dataToRespond := data
rctx.SetContentType(mimes[po.Format])
rctx.Response.Header.Set("Cache-Control", fmt.Sprintf("max-age=%d, public", conf.TTL))
rctx.Response.Header.Set("Expires", time.Now().Add(time.Second*time.Duration(conf.TTL)).Format(http.TimeFormat))
if gzipped {
var buf bytes.Buffer
gz, _ := gzip.NewWriterLevel(&buf, conf.GZipCompression)
gz.Write(data)
gz.Close()
dataToRespond = buf.Bytes()
rw.Header().Set("Content-Encoding", "gzip")
if conf.GZipCompression > 0 && rctx.Request.Header.HasAcceptEncodingBytes([]byte("gzip")) {
rctx.Response.Header.Set("Content-Encoding", "gzip")
gzipData(data, rctx)
} else {
rctx.SetBody(data)
}
rw.Header().Set("Content-Length", strconv.Itoa(len(dataToRespond)))
rw.WriteHeader(200)
rw.Write(dataToRespond)
logResponse(200, fmt.Sprintf("[%s] Processed in %s: %s; %+v", reqID, duration, imgURL, po))
logResponse(200, fmt.Sprintf("[%s] Processed in %s: %s; %+v", reqID, getTimerSince(ctx), getImageURL(ctx), po))
}
func respondWithError(reqID string, rw http.ResponseWriter, err imgproxyError) {
func respondWithError(reqID string, rctx *fasthttp.RequestCtx, err imgproxyError) {
logResponse(err.StatusCode, fmt.Sprintf("[%s] %s", reqID, err.Message))
rw.WriteHeader(err.StatusCode)
rw.Write([]byte(err.PublicMessage))
rctx.SetStatusCode(err.StatusCode)
rctx.SetBodyString(err.PublicMessage)
}
func respondWithOptions(reqID string, rw http.ResponseWriter) {
func respondWithOptions(reqID string, rctx *fasthttp.RequestCtx) {
logResponse(200, fmt.Sprintf("[%s] Respond with options", reqID))
rw.WriteHeader(200)
rctx.SetStatusCode(200)
}
func checkSecret(s string) bool {
func prepareAuthHeaderMust() []byte {
if len(authHeaderMust) == 0 {
buf := bytes.NewBufferString("Bearer ")
buf.WriteString(conf.Secret)
authHeaderMust = []byte(conf.Secret)
}
return authHeaderMust
}
func checkSecret(rctx *fasthttp.RequestCtx) bool {
if len(conf.Secret) == 0 {
return true
}
return strings.HasPrefix(s, "Bearer ") && subtle.ConstantTimeCompare([]byte(strings.TrimPrefix(s, "Bearer ")), []byte(conf.Secret)) == 1
return subtle.ConstantTimeCompare(
rctx.Request.Header.Peek("Authorization"),
prepareAuthHeaderMust(),
) == 1
}
func (h *httpHandler) lock() {
h.sem <- struct{}{}
}
func (h *httpHandler) unlock() {
<-h.sem
}
func (h *httpHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
func serveHTTP(rctx *fasthttp.RequestCtx) {
reqID, _ := nanoid.Nanoid()
defer func() {
if r := recover(); r != nil {
if err, ok := r.(imgproxyError); ok {
respondWithError(reqID, rw, err)
respondWithError(reqID, rctx, err)
} else {
respondWithError(reqID, rw, newUnexpectedError(r.(error), 4))
respondWithError(reqID, rctx, newUnexpectedError(r.(error), 4))
}
}
}()
log.Printf("[%s] %s: %s\n", reqID, r.Method, r.URL.RequestURI())
log.Printf("[%s] %s: %s\n", reqID, rctx.Method(), rctx.RequestURI())
writeCORS(rw)
writeCORS(rctx)
if r.Method == http.MethodOptions {
respondWithOptions(reqID, rw)
if rctx.Request.Header.IsOptions() {
respondWithOptions(reqID, rctx)
return
}
if r.Method != http.MethodGet {
if !rctx.IsGet() {
panic(invalidMethodErr)
}
if !checkSecret(r.Header.Get("Authorization")) {
if !checkSecret(rctx) {
panic(invalidSecretErr)
}
h.lock()
defer h.unlock()
serverMutex.Lock()
defer serverMutex.Unock()
if r.URL.Path == "/health" {
rw.WriteHeader(200)
rw.Write([]byte("imgproxy is running"))
if bytes.Equal(rctx.RequestURI(), healthRequestURI) {
rctx.SetStatusCode(200)
rctx.SetBodyString("imgproxy is running")
return
}
t := startTimer(time.Duration(conf.WriteTimeout)*time.Second, "Processing")
ctx, timeoutCancel := startTimer(time.Duration(conf.WriteTimeout) * time.Second)
defer timeoutCancel()
imgURL, procOpt, err := parsePath(r)
ctx, err := parsePath(ctx, rctx)
if err != nil {
panic(newError(404, err.Error(), "Invalid image url"))
}
if _, err = url.ParseRequestURI(imgURL); err != nil {
panic(newError(404, err.Error(), "Invalid image url"))
}
b, imgtype, err := downloadImage(imgURL)
ctx, downloadcancel, err := downloadImage(ctx)
defer downloadcancel()
if err != nil {
panic(newError(404, err.Error(), "Image is unreachable"))
}
t.Check()
checkTimeout(ctx)
if conf.ETagEnabled {
eTag := calcETag(b, &procOpt)
rw.Header().Set("ETag", eTag)
// if conf.ETagEnabled {
// eTag := calcETag(b, &procOpt)
// rw.Header().Set("ETag", eTag)
if eTag == r.Header.Get("If-None-Match") {
panic(notModifiedErr)
}
}
// if eTag == r.Header.Get("If-None-Match") {
// panic(notModifiedErr)
// }
// }
t.Check()
checkTimeout(ctx)
b, err = processImage(b, imgtype, procOpt, t)
imageData, err := processImage(ctx)
if err != nil {
panic(newError(500, err.Error(), "Error occurred while processing image"))
}
t.Check()
checkTimeout(ctx)
respondWithImage(reqID, r, rw, b, imgURL, procOpt, t.Since())
respondWithImage(ctx, reqID, rctx, imageData)
}