IMG-52: rename metrics to monitoring (#1498)

* Removed bufpool

* metrics -> monitoring

* metricsMeta -> monitoringMeta

* monitoring.Meta -> Filter
This commit is contained in:
Victor Sokolov
2025-08-20 17:17:18 +02:00
committed by GitHub
parent 08d20d46e6
commit 697c2ddcd6
17 changed files with 97 additions and 402 deletions

View File

@@ -1,302 +0,0 @@
package bufpool
// Based on https://github.com/valyala/bytebufferpool ideas
import (
"bytes"
"runtime"
"sync"
"sync/atomic"
"github.com/imgproxy/imgproxy/v3/config"
"github.com/imgproxy/imgproxy/v3/metrics"
)
const (
minBitSize = 6 // 2**6=64 is min bytes.Buffer capacity
steps = 20
minSize = 1 << minBitSize
)
var entriesPool = sync.Pool{
New: func() any {
return new(entry)
},
}
type entry struct {
buf *bytes.Buffer
prev, next *entry
}
type Pool struct {
name string
defaultSize int
maxSize uint64
root *entry
maxLen int
calls [steps]uint64
tmpCalls [steps]uint64
callsNum uint64
storeMu sync.Mutex
calibratingMu sync.Mutex
}
func New(name string, n int, defaultSize int) *Pool {
pool := Pool{
name: name,
defaultSize: defaultSize,
root: &entry{},
maxLen: n,
}
return &pool
}
func (p *Pool) insert(buf *bytes.Buffer) {
e := entriesPool.Get().(*entry)
e.buf = buf
e.next = p.root.next
e.prev = p.root
p.root.next = e
}
func (p *Pool) remove(e *entry) {
if e.next != nil {
e.next.prev = e.prev
}
e.prev.next = e.next
saveEntry(e)
}
func (p *Pool) calibrateAndClean() {
if !p.calibratingMu.TryLock() {
return
}
defer p.calibratingMu.Unlock()
var callsSum uint64
for i := 0; i < steps; i++ {
calls := atomic.SwapUint64(&p.calls[i], 0)
callsSum += calls
p.tmpCalls[i] = calls
}
if callsSum < uint64(config.BufferPoolCalibrationThreshold) {
return
}
atomic.StoreUint64(&p.callsNum, 0)
defSum := uint64(float64(callsSum) * 0.5)
maxSum := uint64(float64(callsSum) * 0.95)
defStep := -1
maxStep := -1
callsSum = 0
for i := 0; i < steps; i++ {
callsSum += p.tmpCalls[i]
if defStep < 0 && callsSum > defSum {
defStep = i
}
if callsSum > maxSum {
maxStep = i
break
}
}
p.defaultSize = minSize << defStep
p.maxSize = minSize << maxStep
maxSize := int(p.maxSize)
metrics.SetBufferDefaultSize(p.name, p.defaultSize)
metrics.SetBufferMaxSize(p.name, maxSize)
p.storeMu.Lock()
storeUnlocked := false
defer func() {
if !storeUnlocked {
p.storeMu.Unlock()
}
}()
cleaned := false
last := p.root
poolLen := 0
for entry := p.root.next; entry != nil; entry = last.next {
if poolLen >= p.maxLen || entry.buf.Cap() > maxSize {
last.next = entry.next
saveEntry(entry)
cleaned = true
} else {
last.next = entry
entry.prev = last
last = entry
poolLen++
}
}
// early unlock
p.storeMu.Unlock()
storeUnlocked = true
if cleaned {
runtime.GC()
}
}
func (p *Pool) Get(size int, grow bool) *bytes.Buffer {
p.storeMu.Lock()
storeUnlocked := false
defer func() {
if !storeUnlocked {
p.storeMu.Unlock()
}
}()
best := (*entry)(nil)
bestCap := -1
min := (*entry)(nil)
minCap := -1
for entry := p.root.next; entry != nil; entry = entry.next {
cap := entry.buf.Cap()
if size > 0 {
// If we know the required size, pick a buffer with the smallest size
// that is larger than the requested size
if cap >= size && (bestCap > cap || best == nil) {
best = entry
bestCap = cap
}
if cap < minCap || minCap == -1 {
min = entry
minCap = cap
}
} else if cap > bestCap {
// If we don't know the requested size, pick a largest buffer
best = entry
bestCap = cap
}
}
var buf *bytes.Buffer
switch {
case best != nil:
buf = best.buf
p.remove(best)
case min != nil:
buf = min.buf
p.remove(min)
default:
buf = new(bytes.Buffer)
}
// early unlock
p.storeMu.Unlock()
storeUnlocked = true
buf.Reset()
growSize := p.defaultSize
if grow {
growSize = max(p.normalizeCap(size), growSize)
}
// Grow the buffer only if we know the requested size and it is smaller than
// or equal to the grow size. Otherwise we'll grow the buffer twice
if size > 0 && size <= growSize && growSize > buf.Cap() {
buf.Grow(growSize)
}
return buf
}
func (p *Pool) Put(buf *bytes.Buffer) {
bufLen := buf.Len()
bufCap := buf.Cap()
if bufLen > 0 {
ind := index(bufLen)
atomic.AddUint64(&p.calls[ind], 1)
if atomic.AddUint64(&p.callsNum, 1) >= uint64(config.BufferPoolCalibrationThreshold) {
p.calibrateAndClean()
}
}
size := buf.Cap()
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize > 0 && size > maxSize {
return
}
if bufLen > 0 {
metrics.ObserveBufferSize(p.name, bufCap)
}
p.storeMu.Lock()
defer p.storeMu.Unlock()
p.insert(buf)
}
// GrowBuffer growth capacity of the buffer to the normalized provided value
func (p *Pool) GrowBuffer(buf *bytes.Buffer, cap int) {
cap = p.normalizeCap(cap)
if buf.Cap() < cap {
buf.Grow(cap - buf.Len())
}
}
func (p *Pool) normalizeCap(cap int) int {
// Don't normalize cap if it's larger than maxSize
// since we'll throw this buf out anyway
maxSize := int(atomic.LoadUint64(&p.maxSize))
if maxSize > 0 && cap > maxSize {
return cap
}
ind := index(cap)
return max(cap, minSize<<ind)
}
func saveEntry(e *entry) {
e.buf = nil
e.next = nil
e.prev = nil
entriesPool.Put(e)
}
func index(n int) int {
n--
n >>= minBitSize
idx := 0
for n > 0 {
n >>= 1
idx++
}
if idx >= steps {
idx = steps - 1
}
return idx
}

View File

@@ -1,48 +0,0 @@
package bufpool
import (
"math/rand"
"sync"
"testing"
"github.com/imgproxy/imgproxy/v3/config"
)
var (
testData [][]byte
testDataOnce sync.Once
testMu sync.Mutex
)
func initTestData() {
testData = make([][]byte, 1000)
for i := 6; i < 1000; i++ {
testData[i] = make([]byte, i*1271)
}
rand.Shuffle(len(testData), func(i, j int) { testData[i], testData[j] = testData[j], testData[i] })
}
func BenchmarkBufpool(b *testing.B) {
testMu.Lock()
defer testMu.Unlock()
config.Reset()
testDataOnce.Do(initTestData)
pool := New("test", 16, 0)
b.ResetTimer()
b.SetParallelism(16)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for _, bb := range testData {
buf := pool.Get(len(bb), false)
buf.Write(bb)
pool.Put(buf)
}
}
})
}

10
main.go
View File

@@ -20,8 +20,8 @@ import (
"github.com/imgproxy/imgproxy/v3/imagedata" "github.com/imgproxy/imgproxy/v3/imagedata"
"github.com/imgproxy/imgproxy/v3/logger" "github.com/imgproxy/imgproxy/v3/logger"
"github.com/imgproxy/imgproxy/v3/memory" "github.com/imgproxy/imgproxy/v3/memory"
"github.com/imgproxy/imgproxy/v3/metrics" "github.com/imgproxy/imgproxy/v3/monitoring"
"github.com/imgproxy/imgproxy/v3/metrics/prometheus" "github.com/imgproxy/imgproxy/v3/monitoring/prometheus"
"github.com/imgproxy/imgproxy/v3/options" "github.com/imgproxy/imgproxy/v3/options"
"github.com/imgproxy/imgproxy/v3/processing" "github.com/imgproxy/imgproxy/v3/processing"
"github.com/imgproxy/imgproxy/v3/server" "github.com/imgproxy/imgproxy/v3/server"
@@ -40,7 +40,7 @@ func buildRouter(r *server.Router) *server.Router {
r.GET( r.GET(
"/", false, handleProcessing, "/", false, handleProcessing,
r.WithSecret, r.WithCORS, r.WithPanic, r.WithReportError, r.WithMetrics, r.WithSecret, r.WithCORS, r.WithPanic, r.WithReportError, r.WithMonitoring,
) )
r.HEAD("/", false, r.OkHandler, r.WithCORS) r.HEAD("/", false, r.OkHandler, r.WithCORS)
@@ -72,7 +72,7 @@ func initialize() error {
return err return err
} }
if err := metrics.Init(); err != nil { if err := monitoring.Init(); err != nil {
return err return err
} }
@@ -108,7 +108,7 @@ func initialize() error {
func shutdown() { func shutdown() {
vips.Shutdown() vips.Shutdown()
metrics.Stop() monitoring.Stop()
errorreport.Close() errorreport.Close()
} }

View File

@@ -14,7 +14,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/imgproxy/imgproxy/v3/config" "github.com/imgproxy/imgproxy/v3/config"
"github.com/imgproxy/imgproxy/v3/metrics/stats" "github.com/imgproxy/imgproxy/v3/monitoring/stats"
) )
type GaugeFunc func() float64 type GaugeFunc func() float64

View File

@@ -17,8 +17,8 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/imgproxy/imgproxy/v3/config" "github.com/imgproxy/imgproxy/v3/config"
"github.com/imgproxy/imgproxy/v3/metrics/errformat" "github.com/imgproxy/imgproxy/v3/monitoring/errformat"
"github.com/imgproxy/imgproxy/v3/metrics/stats" "github.com/imgproxy/imgproxy/v3/monitoring/stats"
"github.com/imgproxy/imgproxy/v3/version" "github.com/imgproxy/imgproxy/v3/version"
) )

View File

@@ -1,14 +1,14 @@
package metrics package monitoring
import ( import (
"context" "context"
"net/http" "net/http"
"github.com/imgproxy/imgproxy/v3/metrics/cloudwatch" "github.com/imgproxy/imgproxy/v3/monitoring/cloudwatch"
"github.com/imgproxy/imgproxy/v3/metrics/datadog" "github.com/imgproxy/imgproxy/v3/monitoring/datadog"
"github.com/imgproxy/imgproxy/v3/metrics/newrelic" "github.com/imgproxy/imgproxy/v3/monitoring/newrelic"
"github.com/imgproxy/imgproxy/v3/metrics/otel" "github.com/imgproxy/imgproxy/v3/monitoring/otel"
"github.com/imgproxy/imgproxy/v3/metrics/prometheus" "github.com/imgproxy/imgproxy/v3/monitoring/prometheus"
) )
const ( const (
@@ -19,6 +19,17 @@ const (
type Meta map[string]any type Meta map[string]any
// Filter creates a copy of Meta with only the specified keys.
func (m Meta) Filter(only ...string) Meta {
filtered := make(Meta)
for _, key := range only {
if value, ok := m[key]; ok {
filtered[key] = value
}
}
return filtered
}
func Init() error { func Init() error {
prometheus.Init() prometheus.Init()

View File

@@ -0,0 +1,37 @@
package monitoring
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMetaFilter(t *testing.T) {
// Create a Meta with some test data
meta := Meta{
"key1": "value1",
"key2": "value2",
"key3": "value3",
"key4": 42,
}
// Test filtering with existing keys
filtered := meta.Filter("key1", "key3")
// Check that filtered meta has the correct keys
require.Len(t, filtered, 2)
require.Equal(t, "value1", filtered["key1"])
require.Equal(t, "value3", filtered["key3"])
// Check that non-requested keys are not present
require.NotContains(t, filtered, "key2")
require.NotContains(t, filtered, "key4")
// Test filtering with non-existing keys
filtered2 := meta.Filter("nonexistent")
require.Empty(t, filtered2)
// Test filtering with empty parameters
filtered3 := meta.Filter()
require.Empty(t, filtered3)
}

View File

@@ -15,8 +15,8 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/imgproxy/imgproxy/v3/config" "github.com/imgproxy/imgproxy/v3/config"
"github.com/imgproxy/imgproxy/v3/metrics/errformat" "github.com/imgproxy/imgproxy/v3/monitoring/errformat"
"github.com/imgproxy/imgproxy/v3/metrics/stats" "github.com/imgproxy/imgproxy/v3/monitoring/stats"
) )
type transactionCtxKey struct{} type transactionCtxKey struct{}

View File

@@ -44,8 +44,8 @@ import (
"github.com/imgproxy/imgproxy/v3/config" "github.com/imgproxy/imgproxy/v3/config"
"github.com/imgproxy/imgproxy/v3/config/configurators" "github.com/imgproxy/imgproxy/v3/config/configurators"
"github.com/imgproxy/imgproxy/v3/ierrors" "github.com/imgproxy/imgproxy/v3/ierrors"
"github.com/imgproxy/imgproxy/v3/metrics/errformat" "github.com/imgproxy/imgproxy/v3/monitoring/errformat"
"github.com/imgproxy/imgproxy/v3/metrics/stats" "github.com/imgproxy/imgproxy/v3/monitoring/stats"
"github.com/imgproxy/imgproxy/v3/version" "github.com/imgproxy/imgproxy/v3/version"
) )

View File

@@ -13,7 +13,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/imgproxy/imgproxy/v3/config" "github.com/imgproxy/imgproxy/v3/config"
"github.com/imgproxy/imgproxy/v3/metrics/stats" "github.com/imgproxy/imgproxy/v3/monitoring/stats"
"github.com/imgproxy/imgproxy/v3/reuseport" "github.com/imgproxy/imgproxy/v3/reuseport"
) )

View File

@@ -22,8 +22,8 @@ import (
"github.com/imgproxy/imgproxy/v3/imagedata" "github.com/imgproxy/imgproxy/v3/imagedata"
"github.com/imgproxy/imgproxy/v3/imagefetcher" "github.com/imgproxy/imgproxy/v3/imagefetcher"
"github.com/imgproxy/imgproxy/v3/imagetype" "github.com/imgproxy/imgproxy/v3/imagetype"
"github.com/imgproxy/imgproxy/v3/metrics" "github.com/imgproxy/imgproxy/v3/monitoring"
"github.com/imgproxy/imgproxy/v3/metrics/stats" "github.com/imgproxy/imgproxy/v3/monitoring/stats"
"github.com/imgproxy/imgproxy/v3/options" "github.com/imgproxy/imgproxy/v3/options"
"github.com/imgproxy/imgproxy/v3/processing" "github.com/imgproxy/imgproxy/v3/processing"
"github.com/imgproxy/imgproxy/v3/security" "github.com/imgproxy/imgproxy/v3/security"
@@ -261,13 +261,13 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) err
errorreport.SetMetadata(r, "Source Image Origin", imageOrigin) errorreport.SetMetadata(r, "Source Image Origin", imageOrigin)
errorreport.SetMetadata(r, "Processing Options", po) errorreport.SetMetadata(r, "Processing Options", po)
metricsMeta := metrics.Meta{ monitoringMeta := monitoring.Meta{
metrics.MetaSourceImageURL: imageURL, monitoring.MetaSourceImageURL: imageURL,
metrics.MetaSourceImageOrigin: imageOrigin, monitoring.MetaSourceImageOrigin: imageOrigin,
metrics.MetaProcessingOptions: po.Diff().Flatten(), monitoring.MetaProcessingOptions: po.Diff().Flatten(),
} }
metrics.SetMetadata(ctx, metricsMeta) monitoring.SetMetadata(ctx, monitoringMeta)
err = security.VerifySourceURL(imageURL) err = security.VerifySourceURL(imageURL)
if err != nil { if err != nil {
@@ -316,7 +316,7 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) err
// The heavy part starts here, so we need to restrict worker number // The heavy part starts here, so we need to restrict worker number
err = func() error { err = func() error {
defer metrics.StartQueueSegment(ctx)() defer monitoring.StartQueueSegment(ctx)()
err = processingSem.Acquire(ctx, 1) err = processingSem.Acquire(ctx, 1)
if err != nil { if err != nil {
@@ -346,10 +346,10 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) err
statusCode := http.StatusOK statusCode := http.StatusOK
originData, originHeaders, err := func() (imagedata.ImageData, http.Header, error) { originData, originHeaders, err := func() (imagedata.ImageData, http.Header, error) {
downloadFinished := metrics.StartDownloadingSegment(ctx, metrics.Meta{ downloadFinished := monitoring.StartDownloadingSegment(ctx, monitoringMeta.Filter(
metrics.MetaSourceImageURL: metricsMeta[metrics.MetaSourceImageURL], monitoring.MetaSourceImageURL,
metrics.MetaSourceImageOrigin: metricsMeta[metrics.MetaSourceImageOrigin], monitoring.MetaSourceImageOrigin,
}) ))
downloadOpts := imagedata.DownloadOptions{ downloadOpts := imagedata.DownloadOptions{
Header: imgRequestHeader, Header: imgRequestHeader,
@@ -399,7 +399,7 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) err
} }
// Just send error // Just send error
metrics.SendError(ctx, categoryDownload, ierr) monitoring.SendError(ctx, categoryDownload, ierr)
// We didn't return, so we have to report error // We didn't return, so we have to report error
if ierr.ShouldReport() { if ierr.ShouldReport() {
@@ -452,9 +452,7 @@ func handleProcessing(reqID string, rw http.ResponseWriter, r *http.Request) err
} }
result, err := func() (*processing.Result, error) { result, err := func() (*processing.Result, error) {
defer metrics.StartProcessingSegment(ctx, metrics.Meta{ defer monitoring.StartProcessingSegment(ctx, monitoringMeta.Filter(monitoring.MetaProcessingOptions))()
metrics.MetaProcessingOptions: metricsMeta[metrics.MetaProcessingOptions],
})()
return processing.ProcessImage(ctx, originData, po) return processing.ProcessImage(ctx, originData, po)
}() }()

View File

@@ -10,22 +10,22 @@ import (
"github.com/imgproxy/imgproxy/v3/errorreport" "github.com/imgproxy/imgproxy/v3/errorreport"
"github.com/imgproxy/imgproxy/v3/httpheaders" "github.com/imgproxy/imgproxy/v3/httpheaders"
"github.com/imgproxy/imgproxy/v3/ierrors" "github.com/imgproxy/imgproxy/v3/ierrors"
"github.com/imgproxy/imgproxy/v3/metrics" "github.com/imgproxy/imgproxy/v3/monitoring"
) )
const ( const (
categoryTimeout = "timeout" categoryTimeout = "timeout"
) )
// WithMetrics wraps RouteHandler with metrics handling. // WithMonitoring wraps RouteHandler with monitoring handling.
func (r *Router) WithMetrics(h RouteHandler) RouteHandler { func (r *Router) WithMonitoring(h RouteHandler) RouteHandler {
if !metrics.Enabled() { if !monitoring.Enabled() {
return h return h
} }
return func(reqID string, rw http.ResponseWriter, req *http.Request) error { return func(reqID string, rw http.ResponseWriter, req *http.Request) error {
ctx, metricsCancel, rw := metrics.StartRequest(req.Context(), rw, req) ctx, cancel, rw := monitoring.StartRequest(req.Context(), rw, req)
defer metricsCancel() defer cancel()
return h(reqID, rw, req.WithContext(ctx)) return h(reqID, rw, req.WithContext(ctx))
} }
@@ -92,7 +92,7 @@ func (r *Router) WithPanic(h RouteHandler) RouteHandler {
} }
// WithReportError handles error reporting. // WithReportError handles error reporting.
// It should be placed after `WithMetrics`, but before `WithPanic`. // It should be placed after `WithMonitoring`, but before `WithPanic`.
func (r *Router) WithReportError(h RouteHandler) RouteHandler { func (r *Router) WithReportError(h RouteHandler) RouteHandler {
return func(reqID string, rw http.ResponseWriter, req *http.Request) error { return func(reqID string, rw http.ResponseWriter, req *http.Request) error {
// Open the error context // Open the error context
@@ -119,7 +119,7 @@ func (r *Router) WithReportError(h RouteHandler) RouteHandler {
// We do not need to send any canceled context // We do not need to send any canceled context
if !errors.Is(ierr, context.Canceled) { if !errors.Is(ierr, context.Canceled) {
metrics.SendError(ctx, errCat, err) monitoring.SendError(ctx, errCat, err)
} }
// Report error to error collectors // Report error to error collectors

View File

@@ -14,8 +14,8 @@ import (
"github.com/imgproxy/imgproxy/v3/httpheaders" "github.com/imgproxy/imgproxy/v3/httpheaders"
"github.com/imgproxy/imgproxy/v3/ierrors" "github.com/imgproxy/imgproxy/v3/ierrors"
"github.com/imgproxy/imgproxy/v3/imagedata" "github.com/imgproxy/imgproxy/v3/imagedata"
"github.com/imgproxy/imgproxy/v3/metrics" "github.com/imgproxy/imgproxy/v3/monitoring"
"github.com/imgproxy/imgproxy/v3/metrics/stats" "github.com/imgproxy/imgproxy/v3/monitoring/stats"
"github.com/imgproxy/imgproxy/v3/options" "github.com/imgproxy/imgproxy/v3/options"
"github.com/imgproxy/imgproxy/v3/server" "github.com/imgproxy/imgproxy/v3/server"
) )
@@ -48,8 +48,7 @@ var (
func streamOriginImage(ctx context.Context, reqID string, r *http.Request, rw http.ResponseWriter, po *options.ProcessingOptions, imageURL string) error { func streamOriginImage(ctx context.Context, reqID string, r *http.Request, rw http.ResponseWriter, po *options.ProcessingOptions, imageURL string) error {
stats.IncImagesInProgress() stats.IncImagesInProgress()
defer stats.DecImagesInProgress() defer stats.DecImagesInProgress()
defer monitoring.StartStreamingSegment(ctx)()
defer metrics.StartStreamingSegment(ctx)()
var ( var (
cookieJar http.CookieJar cookieJar http.CookieJar

View File

@@ -27,11 +27,11 @@ import (
"github.com/imgproxy/imgproxy/v3/ierrors" "github.com/imgproxy/imgproxy/v3/ierrors"
"github.com/imgproxy/imgproxy/v3/imagedata" "github.com/imgproxy/imgproxy/v3/imagedata"
"github.com/imgproxy/imgproxy/v3/imagetype" "github.com/imgproxy/imgproxy/v3/imagetype"
"github.com/imgproxy/imgproxy/v3/metrics/cloudwatch" "github.com/imgproxy/imgproxy/v3/monitoring/cloudwatch"
"github.com/imgproxy/imgproxy/v3/metrics/datadog" "github.com/imgproxy/imgproxy/v3/monitoring/datadog"
"github.com/imgproxy/imgproxy/v3/metrics/newrelic" "github.com/imgproxy/imgproxy/v3/monitoring/newrelic"
"github.com/imgproxy/imgproxy/v3/metrics/otel" "github.com/imgproxy/imgproxy/v3/monitoring/otel"
"github.com/imgproxy/imgproxy/v3/metrics/prometheus" "github.com/imgproxy/imgproxy/v3/monitoring/prometheus"
) )
type Image struct { type Image struct {