mirror of
https://github.com/fiatjaf/khatru.git
synced 2025-03-17 13:22:56 +01:00
support hyperloglog handlers on COUNT.
This commit is contained in:
parent
5b88b9b087
commit
6275f52134
7
go.mod
7
go.mod
@ -7,7 +7,7 @@ require (
|
||||
github.com/fasthttp/websocket v1.5.7
|
||||
github.com/fiatjaf/eventstore v0.13.0
|
||||
github.com/liamg/magic v0.0.1
|
||||
github.com/nbd-wtf/go-nostr v0.42.0
|
||||
github.com/nbd-wtf/go-nostr v0.43.0
|
||||
github.com/puzpuzpuz/xsync/v3 v3.4.0
|
||||
github.com/rs/cors v1.11.1
|
||||
github.com/stretchr/testify v1.9.0
|
||||
@ -20,7 +20,6 @@ require (
|
||||
github.com/aquasecurity/esquery v0.2.0 // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.3.4 // indirect
|
||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect
|
||||
github.com/cespare/xxhash v1.1.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/decred/dcrd/crypto/blake256 v1.1.0 // indirect
|
||||
@ -39,13 +38,15 @@ require (
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/google/flatbuffers v24.3.25+incompatible // indirect
|
||||
github.com/greatroar/blobloom v0.8.0 // indirect
|
||||
github.com/jmoiron/sqlx v1.3.5 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.17.10 // indirect
|
||||
github.com/lib/pq v1.10.9 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-sqlite3 v1.14.18 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
|
||||
|
22
go.sum
22
go.sum
@ -2,8 +2,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
|
||||
fiatjaf.com/lib v0.2.0 h1:TgIJESbbND6GjOgGHxF5jsO6EMjuAxIzZHPo5DXYexs=
|
||||
fiatjaf.com/lib v0.2.0/go.mod h1:Ycqq3+mJ9jAWu7XjbQI1cVr+OFgnHn79dQR5oTII47g=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
github.com/PowerDNS/lmdb-go v1.9.2 h1:Cmgerh9y3ZKBZGz1irxSShhfmFyRUh+Zdk4cZk7ZJvU=
|
||||
github.com/PowerDNS/lmdb-go v1.9.2/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU=
|
||||
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
|
||||
@ -17,8 +15,6 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.4/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY
|
||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ=
|
||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
@ -92,9 +88,8 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
||||
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/greatroar/blobloom v0.8.0 h1:I9RlEkfqK9/6f1v9mFmDYegDQ/x0mISCpiNpAm23Pt4=
|
||||
github.com/greatroar/blobloom v0.8.0/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs=
|
||||
github.com/jgroeneveld/schema v1.0.0 h1:J0E10CrOkiSEsw6dfb1IfrDJD14pf6QLVJ3tRPl/syI=
|
||||
github.com/jgroeneveld/schema v1.0.0/go.mod h1:M14lv7sNMtGvo3ops1MwslaSYgDYxrSmbzWIQ0Mr5rs=
|
||||
github.com/jgroeneveld/trial v2.0.0+incompatible h1:d59ctdgor+VqdZCAiUfVN8K13s0ALDioG5DWwZNtRuQ=
|
||||
@ -103,6 +98,8 @@ github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
|
||||
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
|
||||
@ -121,8 +118,12 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ
|
||||
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||
github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI=
|
||||
github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
||||
github.com/nbd-wtf/go-nostr v0.42.0 h1:EofWfXEhKic9AYVf4RHuXZr+kKUZE2jVyJtJByNe1rE=
|
||||
github.com/nbd-wtf/go-nostr v0.42.0/go.mod h1:FBa4FBJO7NuANvkeKSlrf0BIyxGufmrUbuelr6Q4Ick=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/nbd-wtf/go-nostr v0.43.0 h1:KJh/HXjKkhNCejRswHWVg8IgoAyLjw4iWxu/JDUZzqM=
|
||||
github.com/nbd-wtf/go-nostr v0.43.0/go.mod h1:8YfmT9tBuRT+4nWHuMBDh+xSIZqAdZC6QIOgQfBgWxU=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
@ -132,17 +133,14 @@ github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+
|
||||
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
|
||||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
|
||||
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
|
||||
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
|
||||
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
|
||||
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
|
||||
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
|
||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk=
|
||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
|
39
handlers.go
39
handlers.go
@ -14,6 +14,8 @@ import (
|
||||
"github.com/fasthttp/websocket"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"github.com/nbd-wtf/go-nostr/nip42"
|
||||
"github.com/nbd-wtf/go-nostr/nip45"
|
||||
"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
|
||||
"github.com/nbd-wtf/go-nostr/nip77"
|
||||
"github.com/nbd-wtf/go-nostr/nip77/negentropy"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
@ -228,20 +230,53 @@ func (rl *Relay) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
ws.WriteJSON(nostr.OKEnvelope{EventID: env.Event.ID, OK: ok, Reason: reason})
|
||||
case *nostr.CountEnvelope:
|
||||
if rl.CountEvents == nil {
|
||||
if rl.CountEvents == nil && rl.CountEventsHLL == nil {
|
||||
ws.WriteJSON(nostr.ClosedEnvelope{SubscriptionID: env.SubscriptionID, Reason: "unsupported: this relay does not support NIP-45"})
|
||||
return
|
||||
}
|
||||
|
||||
var total int64
|
||||
var hll *hyperloglog.HyperLogLog
|
||||
uneligibleForHLL := false
|
||||
|
||||
for _, filter := range env.Filters {
|
||||
srl := rl
|
||||
if rl.getSubRelayFromFilter != nil {
|
||||
srl = rl.getSubRelayFromFilter(filter)
|
||||
}
|
||||
|
||||
if offset := nip45.HyperLogLogEventPubkeyOffsetForFilter(filter); offset != -1 && !uneligibleForHLL {
|
||||
partial, phll := srl.handleCountRequestWithHLL(ctx, ws, filter, offset)
|
||||
if phll != nil {
|
||||
if hll == nil {
|
||||
// in the first iteration (which should be the only case of the times)
|
||||
// we optimize slightly by assigning instead of merging
|
||||
hll = phll
|
||||
} else {
|
||||
hll.Merge(phll)
|
||||
}
|
||||
} else {
|
||||
// if any of the filters is uneligible then we will discard previous HLL results
|
||||
// and refuse to do HLL at all anymore for this query
|
||||
uneligibleForHLL = true
|
||||
hll = nil
|
||||
}
|
||||
total += partial
|
||||
} else {
|
||||
total += srl.handleCountRequest(ctx, ws, filter)
|
||||
}
|
||||
ws.WriteJSON(nostr.CountEnvelope{SubscriptionID: env.SubscriptionID, Count: &total})
|
||||
}
|
||||
|
||||
resp := nostr.CountEnvelope{
|
||||
SubscriptionID: env.SubscriptionID,
|
||||
Count: &total,
|
||||
}
|
||||
if hll != nil {
|
||||
resp.HyperLogLog = hll.GetRegisters()
|
||||
}
|
||||
|
||||
ws.WriteJSON(resp)
|
||||
|
||||
case *nostr.ReqEnvelope:
|
||||
eose := sync.WaitGroup{}
|
||||
eose.Add(len(env.Filters))
|
||||
|
3
relay.go
3
relay.go
@ -11,6 +11,7 @@ import (
|
||||
"github.com/fasthttp/websocket"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"github.com/nbd-wtf/go-nostr/nip11"
|
||||
"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
|
||||
)
|
||||
|
||||
func NewRelay() *Relay {
|
||||
@ -56,9 +57,9 @@ type Relay struct {
|
||||
RejectFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
|
||||
RejectCountFilter []func(ctx context.Context, filter nostr.Filter) (reject bool, msg string)
|
||||
OverwriteFilter []func(ctx context.Context, filter *nostr.Filter)
|
||||
OverwriteCountFilter []func(ctx context.Context, filter *nostr.Filter)
|
||||
QueryEvents []func(ctx context.Context, filter nostr.Filter) (chan *nostr.Event, error)
|
||||
CountEvents []func(ctx context.Context, filter nostr.Filter) (int64, error)
|
||||
CountEventsHLL []func(ctx context.Context, filter nostr.Filter, offset int) (int64, *hyperloglog.HyperLogLog, error)
|
||||
RejectConnection []func(r *http.Request) bool
|
||||
OnConnect []func(ctx context.Context)
|
||||
OnDisconnect []func(ctx context.Context)
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
"github.com/nbd-wtf/go-nostr/nip45/hyperloglog"
|
||||
)
|
||||
|
||||
func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGroup, ws *WebSocket, filter nostr.Filter) error {
|
||||
@ -61,12 +62,7 @@ func (rl *Relay) handleRequest(ctx context.Context, id string, eose *sync.WaitGr
|
||||
}
|
||||
|
||||
func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter nostr.Filter) int64 {
|
||||
// overwrite the filter (for example, to eliminate some kinds or tags that we know we don't support)
|
||||
for _, ovw := range rl.OverwriteCountFilter {
|
||||
ovw(ctx, &filter)
|
||||
}
|
||||
|
||||
// then check if we'll reject this filter
|
||||
// check if we'll reject this filter
|
||||
for _, reject := range rl.RejectCountFilter {
|
||||
if rejecting, msg := reject(ctx, filter); rejecting {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
||||
@ -86,3 +82,38 @@ func (rl *Relay) handleCountRequest(ctx context.Context, ws *WebSocket, filter n
|
||||
|
||||
return subtotal
|
||||
}
|
||||
|
||||
func (rl *Relay) handleCountRequestWithHLL(
|
||||
ctx context.Context,
|
||||
ws *WebSocket,
|
||||
filter nostr.Filter,
|
||||
offset int,
|
||||
) (int64, *hyperloglog.HyperLogLog) {
|
||||
// check if we'll reject this filter
|
||||
for _, reject := range rl.RejectCountFilter {
|
||||
if rejecting, msg := reject(ctx, filter); rejecting {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(msg))
|
||||
return 0, nil
|
||||
}
|
||||
}
|
||||
|
||||
// run the functions to count (generally it will be just one)
|
||||
var subtotal int64 = 0
|
||||
var hll *hyperloglog.HyperLogLog
|
||||
for _, countHLL := range rl.CountEventsHLL {
|
||||
res, fhll, err := countHLL(ctx, filter, offset)
|
||||
if err != nil {
|
||||
ws.WriteJSON(nostr.NoticeEnvelope(err.Error()))
|
||||
}
|
||||
subtotal += res
|
||||
if fhll != nil {
|
||||
if hll == nil {
|
||||
hll = fhll
|
||||
} else {
|
||||
hll.Merge(fhll)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return subtotal, hll
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user