mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 11:48:42 +02:00
Compare commits
2 Commits
v0.2.31
...
agent/gpt-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
04c02d4121 | ||
|
|
a418877856 |
@@ -22,6 +22,11 @@ DATABASE_URL=postgres://multica:multica@localhost:5432/multica?sslmode=disable
|
||||
# See SELF_HOSTING.md for the full login setup.
|
||||
APP_ENV=
|
||||
PORT=8080
|
||||
# Prometheus metrics are disabled by default. When enabled, bind to loopback
|
||||
# unless you protect the listener with private networking, allowlists, or
|
||||
# proxy auth. Do not expose this endpoint through the public app/API ingress.
|
||||
# HTTP request metrics start accumulating only when this listener is enabled.
|
||||
# METRICS_ADDR=127.0.0.1:9090
|
||||
JWT_SECRET=change-me-in-production
|
||||
MULTICA_SERVER_URL=ws://localhost:8080/ws
|
||||
MULTICA_APP_URL=http://localhost:3000
|
||||
|
||||
@@ -15,7 +15,7 @@ COPY server/ ./server/
|
||||
# Build binaries
|
||||
ARG VERSION=dev
|
||||
ARG COMMIT=unknown
|
||||
RUN cd server && CGO_ENABLED=0 go build -ldflags "-s -w" -o bin/server ./cmd/server
|
||||
RUN cd server && CGO_ENABLED=0 go build -ldflags "-s -w -X main.version=${VERSION} -X main.commit=${COMMIT}" -o bin/server ./cmd/server
|
||||
RUN cd server && CGO_ENABLED=0 go build -ldflags "-s -w -X main.version=${VERSION} -X main.commit=${COMMIT}" -o bin/multica ./cmd/multica
|
||||
RUN cd server && CGO_ENABLED=0 go build -ldflags "-s -w" -o bin/migrate ./cmd/migrate
|
||||
|
||||
|
||||
2
Makefile
2
Makefile
@@ -277,7 +277,7 @@ COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
|
||||
DATE ?= $(shell date -u '+%Y-%m-%dT%H:%M:%SZ')
|
||||
|
||||
build: ## Build the server, CLI, and migrate binaries into server/bin
|
||||
cd server && go build -o bin/server ./cmd/server
|
||||
cd server && go build -ldflags "-X main.version=$(VERSION) -X main.commit=$(COMMIT)" -o bin/server ./cmd/server
|
||||
cd server && go build -ldflags "-X main.version=$(VERSION) -X main.commit=$(COMMIT) -X main.date=$(DATE)" -o bin/multica ./cmd/multica
|
||||
cd server && go build -o bin/migrate ./cmd/migrate
|
||||
|
||||
|
||||
@@ -79,6 +79,7 @@ The `Secure` flag on session cookies is derived automatically from the scheme of
|
||||
| Variable | Default | Description |
|
||||
|----------|---------|-------------|
|
||||
| `PORT` | `8080` | Backend server port |
|
||||
| `METRICS_ADDR` | empty | Optional Prometheus metrics listener, for example `127.0.0.1:9090` |
|
||||
| `FRONTEND_PORT` | `3000` | Frontend port |
|
||||
| `CORS_ALLOWED_ORIGINS` | Value of `FRONTEND_ORIGIN` | Comma-separated list of allowed origins |
|
||||
| `LOG_LEVEL` | `info` | Log level: `debug`, `info`, `warn`, `error` |
|
||||
@@ -308,6 +309,28 @@ dependency-aware readiness probes and external monitoring that should fail when
|
||||
the database is unavailable or migrations are not fully applied. `/healthz` is
|
||||
kept as an alias for operator familiarity.
|
||||
|
||||
## Prometheus Metrics
|
||||
|
||||
The backend can expose Prometheus metrics on a separate management listener:
|
||||
|
||||
```bash
|
||||
METRICS_ADDR=127.0.0.1:9090 ./server/bin/server
|
||||
curl http://127.0.0.1:9090/metrics
|
||||
```
|
||||
|
||||
`METRICS_ADDR` is empty by default, so no metrics listener is started. The
|
||||
public API port does not serve `/metrics`; keep it that way for internet-facing
|
||||
deployments. HTTP request metrics start accumulating only after the metrics
|
||||
listener is enabled. Metrics can reveal internal routes, traffic volume,
|
||||
dependency state, and runtime health.
|
||||
|
||||
For Docker or Kubernetes deployments, prefer a private scrape path: bind the
|
||||
metrics listener to an internal interface and protect it with private
|
||||
networking, allowlists, NetworkPolicy, or proxy authentication. If you bind
|
||||
`METRICS_ADDR=0.0.0.0:9090` inside a container, only publish that port to a
|
||||
trusted network, for example a host-local mapping such as
|
||||
`127.0.0.1:9090:9090`.
|
||||
|
||||
## Upgrading
|
||||
|
||||
```bash
|
||||
|
||||
@@ -40,6 +40,7 @@ services:
|
||||
environment:
|
||||
DATABASE_URL: postgres://${POSTGRES_USER:-multica}:${POSTGRES_PASSWORD:-multica}@postgres:5432/${POSTGRES_DB:-multica}?sslmode=disable
|
||||
PORT: "8080"
|
||||
METRICS_ADDR: ${METRICS_ADDR:-}
|
||||
JWT_SECRET: ${JWT_SECRET:-change-me-in-production}
|
||||
FRONTEND_ORIGIN: ${FRONTEND_ORIGIN:-http://localhost:3000}
|
||||
CORS_ALLOWED_ORIGINS: ${CORS_ALLOWED_ORIGINS:-}
|
||||
|
||||
@@ -14,12 +14,18 @@ import (
|
||||
"github.com/multica-ai/multica/server/internal/analytics"
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/logger"
|
||||
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
"github.com/multica-ai/multica/server/internal/service"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var (
|
||||
version = "dev"
|
||||
commit = "unknown"
|
||||
)
|
||||
|
||||
func newNamedRedisClient(base *redis.Options, suffix string) *redis.Client {
|
||||
opts := *base
|
||||
opts.ClientName = redisClientName(opts.ClientName, suffix)
|
||||
@@ -233,7 +239,29 @@ func main() {
|
||||
registerActivityListeners(bus, queries)
|
||||
registerNotificationListeners(bus, queries)
|
||||
|
||||
r := NewRouter(pool, hub, bus, analyticsClient, storeRedis)
|
||||
metricsConfig := obsmetrics.ConfigFromEnv()
|
||||
var metricsServer *http.Server
|
||||
var httpMetrics *obsmetrics.HTTPMetrics
|
||||
if metricsConfig.Enabled() {
|
||||
metricsRegistry := obsmetrics.NewRegistry(obsmetrics.RegistryOptions{
|
||||
Pool: pool,
|
||||
Realtime: realtime.M,
|
||||
Version: version,
|
||||
Commit: commit,
|
||||
})
|
||||
httpMetrics = metricsRegistry.HTTP
|
||||
metricsServer = obsmetrics.NewServer(metricsConfig.Addr, metricsRegistry.Gatherer)
|
||||
if !obsmetrics.IsLoopbackAddr(metricsConfig.Addr) {
|
||||
slog.Warn(
|
||||
"metrics listener is not loopback-only; restrict access with private networking, allowlists, or proxy auth",
|
||||
"addr", metricsConfig.Addr,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
r := NewRouterWithOptions(pool, hub, bus, analyticsClient, storeRedis, RouterOptions{
|
||||
HTTPMetrics: httpMetrics,
|
||||
})
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: ":" + port,
|
||||
@@ -252,7 +280,15 @@ func main() {
|
||||
go runAutopilotScheduler(autopilotCtx, queries, autopilotSvc)
|
||||
go runDBStatsLogger(sweepCtx, pool)
|
||||
|
||||
// Graceful shutdown
|
||||
if metricsServer != nil {
|
||||
go func() {
|
||||
slog.Info("metrics server starting", "addr", metricsConfig.Addr)
|
||||
if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
slog.Error("metrics server disabled after startup error", "error", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
slog.Info("server starting", "port", port)
|
||||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
@@ -268,12 +304,21 @@ func main() {
|
||||
slog.Info("shutting down server")
|
||||
sweepCancel()
|
||||
autopilotCancel()
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := srv.Shutdown(shutdownCtx); err != nil {
|
||||
apiShutdownCtx, apiShutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
if err := srv.Shutdown(apiShutdownCtx); err != nil {
|
||||
apiShutdownCancel()
|
||||
slog.Error("server forced to shutdown", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
apiShutdownCancel()
|
||||
|
||||
if metricsServer != nil {
|
||||
metricsShutdownCtx, metricsShutdownCancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
if err := metricsServer.Shutdown(metricsShutdownCtx); err != nil {
|
||||
slog.Error("metrics server forced to shutdown", "error", err)
|
||||
}
|
||||
metricsShutdownCancel()
|
||||
}
|
||||
slog.Info("server stopped")
|
||||
}
|
||||
|
||||
23
server/cmd/server/metrics_test.go
Normal file
23
server/cmd/server/metrics_test.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/analytics"
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
)
|
||||
|
||||
func TestMainRouterDoesNotExposePrometheusMetrics(t *testing.T) {
|
||||
router := NewRouter(nil, realtime.NewHub(), events.New(), analytics.NoopClient{}, nil)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
|
||||
router.ServeHTTP(rec, req)
|
||||
|
||||
if rec.Code != http.StatusNotFound {
|
||||
t.Fatalf("main API /metrics status = %d, want %d", rec.Code, http.StatusNotFound)
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/multica-ai/multica/server/internal/auth"
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/handler"
|
||||
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
|
||||
"github.com/multica-ai/multica/server/internal/middleware"
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
"github.com/multica-ai/multica/server/internal/service"
|
||||
@@ -62,6 +63,14 @@ func allowedOrigins() []string {
|
||||
// keeps the default in-memory stores which are fine for single-node dev and
|
||||
// tests.
|
||||
func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analyticsClient analytics.Client, rdb *redis.Client) chi.Router {
|
||||
return NewRouterWithOptions(pool, hub, bus, analyticsClient, rdb, RouterOptions{})
|
||||
}
|
||||
|
||||
type RouterOptions struct {
|
||||
HTTPMetrics *obsmetrics.HTTPMetrics
|
||||
}
|
||||
|
||||
func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analyticsClient analytics.Client, rdb *redis.Client, opts RouterOptions) chi.Router {
|
||||
queries := db.New(pool)
|
||||
emailSvc := service.NewEmailService()
|
||||
|
||||
@@ -97,6 +106,9 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analytics
|
||||
r.Use(chimw.RequestID)
|
||||
r.Use(middleware.ClientMetadata)
|
||||
r.Use(middleware.RequestLogger)
|
||||
if opts.HTTPMetrics != nil {
|
||||
r.Use(opts.HTTPMetrics.Middleware)
|
||||
}
|
||||
r.Use(chimw.Recoverer)
|
||||
r.Use(middleware.ContentSecurityPolicy)
|
||||
origins := allowedOrigins()
|
||||
|
||||
@@ -16,6 +16,7 @@ require (
|
||||
github.com/jackc/pgx/v5 v5.8.0
|
||||
github.com/lmittmann/tint v1.1.3
|
||||
github.com/oklog/ulid/v2 v2.1.1
|
||||
github.com/prometheus/client_golang v1.23.2
|
||||
github.com/redis/go-redis/v9 v9.18.0
|
||||
github.com/resend/resend-go/v2 v2.28.0
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
@@ -38,14 +39,23 @@ require (
|
||||
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.18 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.41.10 // indirect
|
||||
github.com/aws/smithy-go v1.24.2 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/prometheus/client_model v0.6.2 // indirect
|
||||
github.com/prometheus/common v0.66.1 // indirect
|
||||
github.com/prometheus/procfs v0.16.1 // indirect
|
||||
github.com/spf13/pflag v1.0.9 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
google.golang.org/protobuf v1.36.8 // indirect
|
||||
)
|
||||
|
||||
@@ -38,6 +38,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.41.10 h1:p8ogvvLugcR/zLBXTXrTkj0RYBU
|
||||
github.com/aws/aws-sdk-go-v2/service/sts v1.41.10/go.mod h1:60dv0eZJfeVXfbT1tFJinbHrDfSJ2GZl4Q//OSSNAVw=
|
||||
github.com/aws/smithy-go v1.24.2 h1:FzA3bu/nt/vDvmnkg+R8Xl46gmzEDam6mZ1hzmwXFng=
|
||||
github.com/aws/smithy-go v1.24.2/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
@@ -45,6 +47,7 @@ github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
@@ -56,6 +59,8 @@ github.com/go-chi/cors v1.2.2 h1:Jmey33TE+b+rB7fT8MUy1u0I4L+NARQlK6LhzKPSyQE=
|
||||
github.com/go-chi/cors v1.2.2/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
|
||||
github.com/golang-jwt/jwt/v5 v5.3.1 h1:kYf81DTWFe7t+1VvL7eS+jKFVWaUnK9cB1qbwn63YCY=
|
||||
github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
@@ -70,21 +75,41 @@ github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo=
|
||||
github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
|
||||
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
|
||||
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
|
||||
github.com/lmittmann/tint v1.1.3 h1:Hv4EaHWXQr+GTFnOU4VKf8UvAtZgn0VuKT+G0wFlO3I=
|
||||
github.com/lmittmann/tint v1.1.3/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
|
||||
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
|
||||
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o=
|
||||
github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg=
|
||||
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
|
||||
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
|
||||
github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs=
|
||||
github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA=
|
||||
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
|
||||
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
|
||||
github.com/redis/go-redis/v9 v9.18.0 h1:pMkxYPkEbMPwRdenAzUNyFNrDgHx9U+DrBabWNfSRQs=
|
||||
github.com/redis/go-redis/v9 v9.18.0/go.mod h1:k3ufPphLU5YXwNTUcCRXGxUoF1fqxnhFQmscfkCoDA0=
|
||||
github.com/resend/resend-go/v2 v2.28.0 h1:ttM1/VZR4fApBv3xI1TneSKi1pbfFsVrq7fXFlHKtj4=
|
||||
github.com/resend/resend-go/v2 v2.28.0/go.mod h1:3YCb8c8+pLiqhtRFXTyFwlLvfjQtluxOr9HEh2BwCkQ=
|
||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
|
||||
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU=
|
||||
github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4=
|
||||
@@ -99,12 +124,22 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
|
||||
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
|
||||
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
|
||||
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
|
||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
|
||||
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
|
||||
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
|
||||
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
35
server/internal/metrics/config.go
Normal file
35
server/internal/metrics/config.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Addr string
|
||||
}
|
||||
|
||||
func ConfigFromEnv() Config {
|
||||
return Config{Addr: strings.TrimSpace(os.Getenv("METRICS_ADDR"))}
|
||||
}
|
||||
|
||||
func (c Config) Enabled() bool {
|
||||
return strings.TrimSpace(c.Addr) != ""
|
||||
}
|
||||
|
||||
func IsLoopbackAddr(addr string) bool {
|
||||
host, _, err := net.SplitHostPort(strings.TrimSpace(addr))
|
||||
if err != nil {
|
||||
host = strings.TrimSpace(addr)
|
||||
}
|
||||
host = strings.Trim(host, "[]")
|
||||
if host == "" {
|
||||
return false
|
||||
}
|
||||
if strings.EqualFold(host, "localhost") {
|
||||
return true
|
||||
}
|
||||
ip := net.ParseIP(host)
|
||||
return ip != nil && ip.IsLoopback()
|
||||
}
|
||||
27
server/internal/metrics/config_test.go
Normal file
27
server/internal/metrics/config_test.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package metrics
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestIsLoopbackAddr(t *testing.T) {
|
||||
tests := []struct {
|
||||
addr string
|
||||
want bool
|
||||
}{
|
||||
{"127.0.0.1:9090", true},
|
||||
{"localhost:9090", true},
|
||||
{"[::1]:9090", true},
|
||||
{":9090", false},
|
||||
{"0.0.0.0:9090", false},
|
||||
{"10.0.0.5:9090", false},
|
||||
{"metrics.example.com:9090", false},
|
||||
{"", false},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.addr, func(t *testing.T) {
|
||||
if got := IsLoopbackAddr(tt.addr); got != tt.want {
|
||||
t.Fatalf("IsLoopbackAddr(%q) = %v, want %v", tt.addr, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
88
server/internal/metrics/db.go
Normal file
88
server/internal/metrics/db.go
Normal file
@@ -0,0 +1,88 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type DBCollector struct {
|
||||
pool *pgxpool.Pool
|
||||
|
||||
acquiredConns *prometheus.Desc
|
||||
idleConns *prometheus.Desc
|
||||
maxConns *prometheus.Desc
|
||||
totalConns *prometheus.Desc
|
||||
constructingConns *prometheus.Desc
|
||||
acquireCount *prometheus.Desc
|
||||
acquireDuration *prometheus.Desc
|
||||
emptyAcquireCount *prometheus.Desc
|
||||
emptyAcquireWaitTime *prometheus.Desc
|
||||
canceledAcquireCount *prometheus.Desc
|
||||
newConnsCount *prometheus.Desc
|
||||
maxIdleDestroyCount *prometheus.Desc
|
||||
maxLifetimeDestroyCnt *prometheus.Desc
|
||||
}
|
||||
|
||||
func NewDBCollector(pool *pgxpool.Pool) *DBCollector {
|
||||
return &DBCollector{
|
||||
pool: pool,
|
||||
|
||||
acquiredConns: newDBDesc("acquired_conns", "Currently acquired PostgreSQL connections."),
|
||||
idleConns: newDBDesc("idle_conns", "Currently idle PostgreSQL connections."),
|
||||
maxConns: newDBDesc("max_conns", "Maximum PostgreSQL connections allowed by the pool."),
|
||||
totalConns: newDBDesc("total_conns", "Total PostgreSQL connections currently in the pool."),
|
||||
constructingConns: newDBDesc("constructing_conns", "PostgreSQL connections currently being established."),
|
||||
acquireCount: newDBDesc("acquire_count", "Total successful PostgreSQL connection acquires."),
|
||||
acquireDuration: newDBDesc("acquire_duration_seconds_total", "Total time spent acquiring PostgreSQL connections."),
|
||||
emptyAcquireCount: newDBDesc("empty_acquire_count", "Total acquires that waited because the PostgreSQL pool was empty."),
|
||||
emptyAcquireWaitTime: newDBDesc("empty_acquire_wait_seconds_total", "Total time spent waiting for PostgreSQL connections when the pool was empty."),
|
||||
canceledAcquireCount: newDBDesc("canceled_acquire_count", "Total canceled PostgreSQL connection acquires."),
|
||||
newConnsCount: newDBDesc("new_conns_count", "Total PostgreSQL connections created by the pool."),
|
||||
maxIdleDestroyCount: newDBDesc("max_idle_destroy_count", "Total PostgreSQL connections destroyed due to idle limits."),
|
||||
maxLifetimeDestroyCnt: newDBDesc("max_lifetime_destroy_count", "Total PostgreSQL connections destroyed due to max lifetime."),
|
||||
}
|
||||
}
|
||||
|
||||
func newDBDesc(name, help string) *prometheus.Desc {
|
||||
return prometheus.NewDesc("multica_db_pool_"+name, help, nil, nil)
|
||||
}
|
||||
|
||||
func (c *DBCollector) Describe(ch chan<- *prometheus.Desc) {
|
||||
for _, desc := range []*prometheus.Desc{
|
||||
c.acquiredConns,
|
||||
c.idleConns,
|
||||
c.maxConns,
|
||||
c.totalConns,
|
||||
c.constructingConns,
|
||||
c.acquireCount,
|
||||
c.acquireDuration,
|
||||
c.emptyAcquireCount,
|
||||
c.emptyAcquireWaitTime,
|
||||
c.canceledAcquireCount,
|
||||
c.newConnsCount,
|
||||
c.maxIdleDestroyCount,
|
||||
c.maxLifetimeDestroyCnt,
|
||||
} {
|
||||
ch <- desc
|
||||
}
|
||||
}
|
||||
|
||||
func (c *DBCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
if c.pool == nil {
|
||||
return
|
||||
}
|
||||
stat := c.pool.Stat()
|
||||
ch <- prometheus.MustNewConstMetric(c.acquiredConns, prometheus.GaugeValue, float64(stat.AcquiredConns()))
|
||||
ch <- prometheus.MustNewConstMetric(c.idleConns, prometheus.GaugeValue, float64(stat.IdleConns()))
|
||||
ch <- prometheus.MustNewConstMetric(c.maxConns, prometheus.GaugeValue, float64(stat.MaxConns()))
|
||||
ch <- prometheus.MustNewConstMetric(c.totalConns, prometheus.GaugeValue, float64(stat.TotalConns()))
|
||||
ch <- prometheus.MustNewConstMetric(c.constructingConns, prometheus.GaugeValue, float64(stat.ConstructingConns()))
|
||||
ch <- prometheus.MustNewConstMetric(c.acquireCount, prometheus.CounterValue, float64(stat.AcquireCount()))
|
||||
ch <- prometheus.MustNewConstMetric(c.acquireDuration, prometheus.CounterValue, stat.AcquireDuration().Seconds())
|
||||
ch <- prometheus.MustNewConstMetric(c.emptyAcquireCount, prometheus.CounterValue, float64(stat.EmptyAcquireCount()))
|
||||
ch <- prometheus.MustNewConstMetric(c.emptyAcquireWaitTime, prometheus.CounterValue, stat.EmptyAcquireWaitTime().Seconds())
|
||||
ch <- prometheus.MustNewConstMetric(c.canceledAcquireCount, prometheus.CounterValue, float64(stat.CanceledAcquireCount()))
|
||||
ch <- prometheus.MustNewConstMetric(c.newConnsCount, prometheus.CounterValue, float64(stat.NewConnsCount()))
|
||||
ch <- prometheus.MustNewConstMetric(c.maxIdleDestroyCount, prometheus.CounterValue, float64(stat.MaxIdleDestroyCount()))
|
||||
ch <- prometheus.MustNewConstMetric(c.maxLifetimeDestroyCnt, prometheus.CounterValue, float64(stat.MaxLifetimeDestroyCount()))
|
||||
}
|
||||
35
server/internal/metrics/db_test.go
Normal file
35
server/internal/metrics/db_test.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
func TestDBCollectorExposesPoolStats(t *testing.T) {
|
||||
pool, err := pgxpool.New(context.Background(), "postgres://multica:multica@127.0.0.1:1/multica?sslmode=disable")
|
||||
if err != nil {
|
||||
t.Fatalf("create pool: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
||||
registry := NewRegistry(RegistryOptions{Pool: pool})
|
||||
rec := httptest.NewRecorder()
|
||||
NewHandler(registry.Gatherer).ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
|
||||
body := rec.Body.String()
|
||||
|
||||
for _, want := range []string{
|
||||
"multica_db_pool_acquired_conns",
|
||||
"multica_db_pool_idle_conns",
|
||||
"multica_db_pool_max_conns",
|
||||
"multica_db_pool_acquire_duration_seconds_total",
|
||||
} {
|
||||
if !strings.Contains(body, want) {
|
||||
t.Fatalf("metrics body missing %q\n%s", want, body)
|
||||
}
|
||||
}
|
||||
}
|
||||
94
server/internal/metrics/http.go
Normal file
94
server/internal/metrics/http.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
chimw "github.com/go-chi/chi/v5/middleware"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type HTTPMetrics struct {
|
||||
requests *prometheus.CounterVec
|
||||
duration *prometheus.HistogramVec
|
||||
inFlight prometheus.Gauge
|
||||
}
|
||||
|
||||
func NewHTTPMetrics() *HTTPMetrics {
|
||||
return &HTTPMetrics{
|
||||
requests: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "http",
|
||||
Name: "requests_total",
|
||||
Help: "Total HTTP requests served by the API server.",
|
||||
}, []string{"method", "route", "status"}),
|
||||
duration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "http",
|
||||
Name: "request_duration_seconds",
|
||||
Help: "HTTP request duration observed by the API server.",
|
||||
Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
|
||||
}, []string{"method", "route", "status"}),
|
||||
inFlight: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "http",
|
||||
Name: "in_flight_requests",
|
||||
Help: "Current number of in-flight HTTP requests served by the API server.",
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *HTTPMetrics) Collectors() []prometheus.Collector {
|
||||
return []prometheus.Collector{m.requests, m.duration, m.inFlight}
|
||||
}
|
||||
|
||||
func (m *HTTPMetrics) Middleware(next http.Handler) http.Handler {
|
||||
if m == nil {
|
||||
return next
|
||||
}
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if isHealthProbePath(r.URL.Path) {
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
m.inFlight.Inc()
|
||||
defer m.inFlight.Dec()
|
||||
|
||||
start := time.Now()
|
||||
ww := chimw.NewWrapResponseWriter(w, r.ProtoMajor)
|
||||
next.ServeHTTP(ww, r)
|
||||
|
||||
status := ww.Status()
|
||||
if status == 0 {
|
||||
status = http.StatusOK
|
||||
}
|
||||
labels := prometheus.Labels{
|
||||
"method": r.Method,
|
||||
"route": routePattern(r),
|
||||
"status": strconv.Itoa(status),
|
||||
}
|
||||
m.requests.With(labels).Inc()
|
||||
m.duration.With(labels).Observe(time.Since(start).Seconds())
|
||||
})
|
||||
}
|
||||
|
||||
func routePattern(r *http.Request) string {
|
||||
if rctx := chi.RouteContext(r.Context()); rctx != nil {
|
||||
if pattern := rctx.RoutePattern(); pattern != "" {
|
||||
return pattern
|
||||
}
|
||||
}
|
||||
return "unmatched"
|
||||
}
|
||||
|
||||
func isHealthProbePath(path string) bool {
|
||||
switch path {
|
||||
case "/health", "/healthz", "/readyz":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
100
server/internal/metrics/http_test.go
Normal file
100
server/internal/metrics/http_test.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
)
|
||||
|
||||
func TestHTTPMiddlewareUsesRoutePatternLabels(t *testing.T) {
|
||||
registry := NewRegistry(RegistryOptions{
|
||||
Version: "v-test",
|
||||
Commit: "abc123",
|
||||
})
|
||||
|
||||
r := chi.NewRouter()
|
||||
r.Use(registry.HTTP.Middleware)
|
||||
r.Get("/api/issues/{id}", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
_, _ = w.Write([]byte("ok"))
|
||||
})
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/api/issues/secret-issue-id?token=secret-token", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
r.ServeHTTP(rec, req)
|
||||
if rec.Code != http.StatusCreated {
|
||||
t.Fatalf("request status = %d, want %d", rec.Code, http.StatusCreated)
|
||||
}
|
||||
|
||||
metricsRec := httptest.NewRecorder()
|
||||
NewHandler(registry.Gatherer).ServeHTTP(metricsRec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
|
||||
body := metricsRec.Body.String()
|
||||
|
||||
for _, want := range []string{
|
||||
`multica_http_requests_total{method="GET",route="/api/issues/{id}",status="201"} 1`,
|
||||
`multica_build_info{commit="abc123",version="v-test"} 1`,
|
||||
} {
|
||||
if !strings.Contains(body, want) {
|
||||
t.Fatalf("metrics body missing %q\n%s", want, body)
|
||||
}
|
||||
}
|
||||
for _, leaked := range []string{"secret-issue-id", "secret-token"} {
|
||||
if strings.Contains(body, leaked) {
|
||||
t.Fatalf("metrics body leaked %q\n%s", leaked, body)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetricsHandlerOnlyServesMetricsPath(t *testing.T) {
|
||||
registry := NewRegistry(RegistryOptions{})
|
||||
handler := NewHandler(registry.Gatherer)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
handler.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("/metrics status = %d, want %d", rec.Code, http.StatusOK)
|
||||
}
|
||||
if body, _ := io.ReadAll(rec.Body); !strings.Contains(string(body), "multica_build_info") {
|
||||
t.Fatalf("/metrics body missing build info: %s", body)
|
||||
}
|
||||
|
||||
rec = httptest.NewRecorder()
|
||||
handler.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/health", nil))
|
||||
if rec.Code != http.StatusNotFound {
|
||||
t.Fatalf("/health status = %d, want %d", rec.Code, http.StatusNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPMiddlewareSkipsHealthProbePaths(t *testing.T) {
|
||||
registry := NewRegistry(RegistryOptions{})
|
||||
|
||||
r := chi.NewRouter()
|
||||
r.Use(registry.HTTP.Middleware)
|
||||
r.Get("/health", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
r.Get("/readyz", func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
|
||||
for _, path := range []string{"/health", "/readyz"} {
|
||||
rec := httptest.NewRecorder()
|
||||
r.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
|
||||
if rec.Code != http.StatusOK {
|
||||
t.Fatalf("%s status = %d, want %d", path, rec.Code, http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
metricsRec := httptest.NewRecorder()
|
||||
NewHandler(registry.Gatherer).ServeHTTP(metricsRec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
|
||||
body := metricsRec.Body.String()
|
||||
for _, skippedRoute := range []string{`route="/health"`, `route="/readyz"`} {
|
||||
if strings.Contains(body, skippedRoute) {
|
||||
t.Fatalf("metrics body contains skipped health route %q\n%s", skippedRoute, body)
|
||||
}
|
||||
}
|
||||
}
|
||||
101
server/internal/metrics/realtime.go
Normal file
101
server/internal/metrics/realtime.go
Normal file
@@ -0,0 +1,101 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
)
|
||||
|
||||
type RealtimeCollector struct {
|
||||
metrics *realtime.Metrics
|
||||
|
||||
connectsTotal *prometheus.Desc
|
||||
disconnectsTotal *prometheus.Desc
|
||||
activeConnections *prometheus.Desc
|
||||
slowEvictionsTotal *prometheus.Desc
|
||||
messagesSentTotal *prometheus.Desc
|
||||
messagesDropped *prometheus.Desc
|
||||
redisConnected *prometheus.Desc
|
||||
redisXAddTotal *prometheus.Desc
|
||||
redisXAddErrors *prometheus.Desc
|
||||
redisXReadTotal *prometheus.Desc
|
||||
redisXReadErrors *prometheus.Desc
|
||||
redisAckTotal *prometheus.Desc
|
||||
redisMirrorErrors *prometheus.Desc
|
||||
redisMirrorDiverged *prometheus.Desc
|
||||
}
|
||||
|
||||
func NewRealtimeCollector(m *realtime.Metrics) *RealtimeCollector {
|
||||
return &RealtimeCollector{
|
||||
metrics: m,
|
||||
|
||||
connectsTotal: newRealtimeDesc("connects_total", "Total realtime WebSocket connections opened."),
|
||||
disconnectsTotal: newRealtimeDesc("disconnects_total", "Total realtime WebSocket connections closed."),
|
||||
activeConnections: newRealtimeDesc("active_connections", "Current realtime WebSocket connections."),
|
||||
slowEvictionsTotal: newRealtimeDesc("slow_evictions_total", "Total realtime clients evicted for slow consumption."),
|
||||
messagesSentTotal: newRealtimeDesc("messages_sent_total", "Total realtime messages sent."),
|
||||
messagesDropped: newRealtimeDesc("messages_dropped_total", "Total realtime messages dropped."),
|
||||
redisConnected: newRealtimeDesc("redis_connected", "Whether the realtime Redis relay is connected."),
|
||||
redisXAddTotal: newRealtimeDesc("redis_xadd_total", "Total Redis XADD operations by the realtime relay."),
|
||||
redisXAddErrors: newRealtimeDesc("redis_xadd_errors_total", "Total Redis XADD errors by the realtime relay."),
|
||||
redisXReadTotal: newRealtimeDesc("redis_xread_total", "Total Redis XREAD operations by the realtime relay."),
|
||||
redisXReadErrors: newRealtimeDesc("redis_xread_errors_total", "Total Redis XREAD errors by the realtime relay."),
|
||||
redisAckTotal: newRealtimeDesc("redis_ack_total", "Total Redis stream acknowledgements by the realtime relay."),
|
||||
redisMirrorErrors: prometheus.NewDesc("multica_realtime_redis_mirror_errors_total", "Total Redis mirror write errors by the realtime relay.", []string{"target"}, nil),
|
||||
redisMirrorDiverged: newRealtimeDesc("redis_mirror_divergence_total", "Total Redis mirror divergence events by the realtime relay."),
|
||||
}
|
||||
}
|
||||
|
||||
func newRealtimeDesc(name, help string) *prometheus.Desc {
|
||||
return prometheus.NewDesc("multica_realtime_"+name, help, nil, nil)
|
||||
}
|
||||
|
||||
func (c *RealtimeCollector) Describe(ch chan<- *prometheus.Desc) {
|
||||
for _, desc := range []*prometheus.Desc{
|
||||
c.connectsTotal,
|
||||
c.disconnectsTotal,
|
||||
c.activeConnections,
|
||||
c.slowEvictionsTotal,
|
||||
c.messagesSentTotal,
|
||||
c.messagesDropped,
|
||||
c.redisConnected,
|
||||
c.redisXAddTotal,
|
||||
c.redisXAddErrors,
|
||||
c.redisXReadTotal,
|
||||
c.redisXReadErrors,
|
||||
c.redisAckTotal,
|
||||
c.redisMirrorErrors,
|
||||
c.redisMirrorDiverged,
|
||||
} {
|
||||
ch <- desc
|
||||
}
|
||||
}
|
||||
|
||||
func (c *RealtimeCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
if c.metrics == nil {
|
||||
return
|
||||
}
|
||||
m := c.metrics
|
||||
ch <- prometheus.MustNewConstMetric(c.connectsTotal, prometheus.CounterValue, float64(m.ConnectsTotal.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.disconnectsTotal, prometheus.CounterValue, float64(m.DisconnectsTotal.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.activeConnections, prometheus.GaugeValue, float64(m.ActiveConnections.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.slowEvictionsTotal, prometheus.CounterValue, float64(m.SlowEvictionsTotal.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.messagesSentTotal, prometheus.CounterValue, float64(m.MessagesSentTotal.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.messagesDropped, prometheus.CounterValue, float64(m.MessagesDroppedTotal.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.redisConnected, prometheus.GaugeValue, boolFloat(m.RedisConnected.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.redisXAddTotal, prometheus.CounterValue, float64(m.RedisXAddTotal.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.redisXAddErrors, prometheus.CounterValue, float64(m.RedisXAddErrors.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.redisXReadTotal, prometheus.CounterValue, float64(m.RedisXReadTotal.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.redisXReadErrors, prometheus.CounterValue, float64(m.RedisXReadErrors.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.redisAckTotal, prometheus.CounterValue, float64(m.RedisAckTotal.Load()))
|
||||
ch <- prometheus.MustNewConstMetric(c.redisMirrorErrors, prometheus.CounterValue, float64(m.RedisMirrorPrimaryErrors.Load()), "primary")
|
||||
ch <- prometheus.MustNewConstMetric(c.redisMirrorErrors, prometheus.CounterValue, float64(m.RedisMirrorSecondaryErrors.Load()), "secondary")
|
||||
ch <- prometheus.MustNewConstMetric(c.redisMirrorDiverged, prometheus.CounterValue, float64(m.RedisMirrorDivergenceTotal.Load()))
|
||||
}
|
||||
|
||||
func boolFloat(v bool) float64 {
|
||||
if v {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
}
|
||||
36
server/internal/metrics/realtime_test.go
Normal file
36
server/internal/metrics/realtime_test.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
)
|
||||
|
||||
func TestRealtimeCollectorExposesCounters(t *testing.T) {
|
||||
m := &realtime.Metrics{}
|
||||
m.ActiveConnections.Store(3)
|
||||
m.MessagesSentTotal.Store(11)
|
||||
m.RedisConnected.Store(true)
|
||||
m.RedisMirrorPrimaryErrors.Store(2)
|
||||
m.RedisMirrorSecondaryErrors.Store(5)
|
||||
|
||||
registry := NewRegistry(RegistryOptions{Realtime: m})
|
||||
rec := httptest.NewRecorder()
|
||||
NewHandler(registry.Gatherer).ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil))
|
||||
body := rec.Body.String()
|
||||
|
||||
for _, want := range []string{
|
||||
"multica_realtime_active_connections 3",
|
||||
"multica_realtime_messages_sent_total 11",
|
||||
"multica_realtime_redis_connected 1",
|
||||
`multica_realtime_redis_mirror_errors_total{target="primary"} 2`,
|
||||
`multica_realtime_redis_mirror_errors_total{target="secondary"} 5`,
|
||||
} {
|
||||
if !strings.Contains(body, want) {
|
||||
t.Fatalf("metrics body missing %q\n%s", want, body)
|
||||
}
|
||||
}
|
||||
}
|
||||
59
server/internal/metrics/registry.go
Normal file
59
server/internal/metrics/registry.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/collectors"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
)
|
||||
|
||||
type RegistryOptions struct {
|
||||
Pool *pgxpool.Pool
|
||||
Realtime *realtime.Metrics
|
||||
Version string
|
||||
Commit string
|
||||
}
|
||||
|
||||
type Registry struct {
|
||||
Gatherer prometheus.Gatherer
|
||||
HTTP *HTTPMetrics
|
||||
}
|
||||
|
||||
func NewRegistry(opts RegistryOptions) *Registry {
|
||||
reg := prometheus.NewRegistry()
|
||||
reg.MustRegister(collectors.NewGoCollector())
|
||||
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
|
||||
|
||||
buildInfo := prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Name: "multica_build_info",
|
||||
Help: "Build information for the Multica server binary.",
|
||||
}, []string{"version", "commit"})
|
||||
buildInfo.WithLabelValues(defaultLabel(opts.Version, "dev"), defaultLabel(opts.Commit, "unknown")).Set(1)
|
||||
reg.MustRegister(buildInfo)
|
||||
|
||||
httpMetrics := NewHTTPMetrics()
|
||||
reg.MustRegister(httpMetrics.Collectors()...)
|
||||
|
||||
if opts.Pool != nil {
|
||||
reg.MustRegister(NewDBCollector(opts.Pool))
|
||||
}
|
||||
if opts.Realtime != nil {
|
||||
reg.MustRegister(NewRealtimeCollector(opts.Realtime))
|
||||
}
|
||||
|
||||
return &Registry{
|
||||
Gatherer: reg,
|
||||
HTTP: httpMetrics,
|
||||
}
|
||||
}
|
||||
|
||||
func defaultLabel(value, fallback string) string {
|
||||
value = strings.TrimSpace(value)
|
||||
if value == "" {
|
||||
return fallback
|
||||
}
|
||||
return value
|
||||
}
|
||||
29
server/internal/metrics/server.go
Normal file
29
server/internal/metrics/server.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
)
|
||||
|
||||
func NewHandler(gatherer prometheus.Gatherer) http.Handler {
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{
|
||||
EnableOpenMetrics: true,
|
||||
ErrorHandling: promhttp.HTTPErrorOnError,
|
||||
}))
|
||||
return mux
|
||||
}
|
||||
|
||||
func NewServer(addr string, gatherer prometheus.Gatherer) *http.Server {
|
||||
return &http.Server{
|
||||
Addr: addr,
|
||||
Handler: NewHandler(gatherer),
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
ReadTimeout: 10 * time.Second,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
IdleTimeout: 30 * time.Second,
|
||||
}
|
||||
}
|
||||
46
server/internal/metrics/server_test.go
Normal file
46
server/internal/metrics/server_test.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestMetricsServerCanBindLoopback(t *testing.T) {
|
||||
registry := NewRegistry(RegistryOptions{})
|
||||
server := NewServer("127.0.0.1:0", registry.Gatherer)
|
||||
ln, err := net.Listen("tcp", server.Addr)
|
||||
if err != nil {
|
||||
t.Fatalf("listen: %v", err)
|
||||
}
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- server.Serve(ln)
|
||||
}()
|
||||
t.Cleanup(func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
_ = server.Shutdown(ctx)
|
||||
if err := <-errCh; err != nil && err != http.ErrServerClosed {
|
||||
t.Fatalf("serve: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
resp, err := http.Get("http://" + ln.Addr().String() + "/metrics")
|
||||
if err != nil {
|
||||
t.Fatalf("get /metrics: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("/metrics status = %d, want %d", resp.StatusCode, http.StatusOK)
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
if !strings.Contains(string(body), "multica_build_info") {
|
||||
t.Fatalf("/metrics body missing build info: %s", body)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user