mirror of
https://github.com/fiatjaf/khatru.git
synced 2025-03-17 21:32:55 +01:00
Merge pull request #37 from stereosteve/elasticsearch
This commit is contained in:
commit
50ad9de70b
7
go.mod
7
go.mod
@ -5,6 +5,7 @@ go 1.18
|
||||
require (
|
||||
github.com/PuerkitoBio/goquery v1.8.0
|
||||
github.com/cockroachdb/pebble v0.0.0-20220723153705-3fc374e4dc66
|
||||
github.com/elastic/go-elasticsearch/v8 v8.6.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/grokify/html-strip-tags-go v0.0.1
|
||||
@ -27,6 +28,7 @@ require (
|
||||
github.com/SaveTheRbtz/generic-sync-map-go v0.0.0-20220414055132-a37292614db8 // indirect
|
||||
github.com/aead/siphash v1.0.1 // indirect
|
||||
github.com/andybalholm/cascadia v1.3.1 // indirect
|
||||
github.com/aquasecurity/esquery v0.2.0 // indirect
|
||||
github.com/btcsuite/btcd v0.23.1 // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
|
||||
github.com/btcsuite/btcd/btcutil v1.1.1 // indirect
|
||||
@ -50,6 +52,9 @@ require (
|
||||
github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
|
||||
github.com/decred/dcrd/lru v1.0.0 // indirect
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c // indirect
|
||||
github.com/elastic/go-elasticsearch/v7 v7.6.0 // indirect
|
||||
github.com/fatih/structs v1.1.0 // indirect
|
||||
github.com/go-errors/errors v1.0.1 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
@ -80,3 +85,5 @@ require (
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
)
|
||||
|
||||
replace github.com/nbd-wtf/go-nostr => ../go-nostr
|
||||
|
14
go.sum
14
go.sum
@ -26,6 +26,8 @@ github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0
|
||||
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
|
||||
github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
|
||||
github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
|
||||
github.com/aquasecurity/esquery v0.2.0 h1:9WWXve95TE8hbm3736WB7nS6Owl8UGDeu+0jiyE9ttA=
|
||||
github.com/aquasecurity/esquery v0.2.0/go.mod h1:VU+CIFR6C+H142HHZf9RUkp4Eedpo9UrEKeCQHWf9ao=
|
||||
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
||||
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
@ -123,11 +125,18 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/dvyukov/go-fuzz v0.0.0-20210602112143-b1f3d6f4ef4e h1:qTP1telKJHlToHlwPQNmVg4yfMDMHe4Z3SYmzkrvA2M=
|
||||
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
|
||||
github.com/elastic/go-elasticsearch/v7 v7.6.0 h1:sYpGLpEFHgLUKLsZUBfuaVI9QgHjS3JdH9fX4/z8QI8=
|
||||
github.com/elastic/go-elasticsearch/v7 v7.6.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.6.0 h1:xMaSe8jIh7NHzmNo9YBkewmaD2Pr+tX+zLkXxhieny4=
|
||||
github.com/elastic/go-elasticsearch/v8 v8.6.0/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k=
|
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
|
||||
github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8=
|
||||
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
|
||||
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
|
||||
github.com/fergusstrange/embedded-postgres v1.10.0 h1:YnwF6xAQYmKLAXXrrRx4rHDLih47YJwVPvg8jeKfdNg=
|
||||
github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA=
|
||||
@ -219,6 +228,8 @@ github.com/jb55/lnsocket/go v0.0.0-20220725174341-b98b5cd37bb6/go.mod h1:atFK/q4
|
||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
|
||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/jgroeneveld/schema v1.0.0/go.mod h1:M14lv7sNMtGvo3ops1MwslaSYgDYxrSmbzWIQ0Mr5rs=
|
||||
github.com/jgroeneveld/trial v2.0.0+incompatible/go.mod h1:I6INLW96EN8WysNBXUFI3M4RIC8ePg9ntAc/Wy+U/+M=
|
||||
github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE=
|
||||
github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
|
||||
github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ=
|
||||
@ -320,8 +331,6 @@ github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOA
|
||||
github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ3M8LwxM=
|
||||
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/nbd-wtf/go-nostr v0.12.0 h1:6uo6D6jhcNzrzm6Fi8nA3jfZQqoXbeTWi9dIX5MsgZc=
|
||||
github.com/nbd-wtf/go-nostr v0.12.0/go.mod h1:qFFTIxh15H5GGN0WsBI/P73DteqsevnhSEW/yk8nEf4=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
github.com/nwaples/rardecode v1.1.2 h1:Cj0yZY6T1Zx1R7AhTbyGSALm44/Mmq+BAPc4B/p/d3M=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
@ -369,6 +378,7 @@ github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb6
|
||||
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
|
||||
github.com/stereosteve/go-nostr v0.13.1/go.mod h1:qFFTIxh15H5GGN0WsBI/P73DteqsevnhSEW/yk8nEf4=
|
||||
github.com/stevelacy/daz v0.1.4 h1:ugmff/D7D764wZjXSgSryEINE/bi+Xddllw3JQQGbWk=
|
||||
github.com/stevelacy/daz v0.1.4/go.mod h1:AbK6DzjiIL15r4bQtcFvOBAvDGMXoh+uIG26NRUugt0=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
|
26
search/README.md
Normal file
26
search/README.md
Normal file
@ -0,0 +1,26 @@
|
||||
# Search Relay
|
||||
|
||||
Uses ElasticSearch storage backend for all queries, with some basic full text search support.
|
||||
|
||||
Index some events:
|
||||
|
||||
```
|
||||
bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \
|
||||
jq -c '["EVENT", .]' | \
|
||||
awk 'length($0)<131072' | \
|
||||
websocat -n -B 200000 ws://127.0.0.1:7447
|
||||
```
|
||||
|
||||
Do a search:
|
||||
|
||||
```
|
||||
echo '["REQ", "asdf", {"search": "steve", "kinds": [0]}]' | websocat -n ws://127.0.0.1:7447
|
||||
```
|
||||
|
||||
|
||||
## Customize
|
||||
|
||||
Currently the indexing is very basic: It will index the `contents` field for all events where kind != 4.
|
||||
Some additional mapping and pre-processing could add better support for different content types.
|
||||
See comments in `storage/elasticsearch/elasticsearch.go`.
|
||||
|
44
search/docker-compose.yml
Normal file
44
search/docker-compose.yml
Normal file
@ -0,0 +1,44 @@
|
||||
version: "3.8"
|
||||
services:
|
||||
|
||||
relay:
|
||||
image: golang
|
||||
# build:
|
||||
# context: ../
|
||||
# dockerfile: ./basic/Dockerfile
|
||||
environment:
|
||||
PORT: 2700
|
||||
ES_URL: http://elasticsearch:9200
|
||||
depends_on:
|
||||
elasticsearch:
|
||||
condition: service_healthy
|
||||
ports:
|
||||
- 2700:2700
|
||||
volumes:
|
||||
- ./nostres:/bin/relay
|
||||
command: "/bin/relay"
|
||||
|
||||
elasticsearch:
|
||||
container_name: elasticsearch
|
||||
image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0
|
||||
restart: always
|
||||
environment:
|
||||
- network.host=0.0.0.0
|
||||
- discovery.type=single-node
|
||||
- cluster.name=docker-cluster
|
||||
- node.name=cluster1-node1
|
||||
- xpack.license.self_generated.type=basic
|
||||
- xpack.security.enabled=false
|
||||
- "ES_JAVA_OPTS=-Xms${ES_MEM:-4g} -Xmx${ES_MEM:-4g}"
|
||||
ports:
|
||||
- '127.0.0.1:9200:9200'
|
||||
ulimits:
|
||||
memlock:
|
||||
soft: -1
|
||||
hard: -1
|
||||
healthcheck:
|
||||
test:
|
||||
["CMD-SHELL", "curl --silent --fail elasticsearch:9200/_cluster/health || exit 1"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
64
search/main.go
Normal file
64
search/main.go
Normal file
@ -0,0 +1,64 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/fiatjaf/relayer"
|
||||
"github.com/fiatjaf/relayer/storage/elasticsearch"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
type Relay struct {
|
||||
storage *elasticsearch.ElasticsearchStorage
|
||||
}
|
||||
|
||||
func (r *Relay) Name() string {
|
||||
return "SearchRelay"
|
||||
}
|
||||
|
||||
func (r *Relay) Storage() relayer.Storage {
|
||||
return r.storage
|
||||
}
|
||||
|
||||
func (r *Relay) OnInitialized(*relayer.Server) {}
|
||||
|
||||
func (r *Relay) Init() error {
|
||||
err := envconfig.Process("", r)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't process envconfig: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Relay) AcceptEvent(evt *nostr.Event) bool {
|
||||
// block events that are too large
|
||||
// jsonb, _ := json.Marshal(evt)
|
||||
// if len(jsonb) > 100000 {
|
||||
// return false
|
||||
// }
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *Relay) BeforeSave(evt *nostr.Event) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
func (r *Relay) AfterSave(evt *nostr.Event) {
|
||||
|
||||
}
|
||||
|
||||
func main() {
|
||||
r := Relay{}
|
||||
if err := envconfig.Process("", &r); err != nil {
|
||||
log.Fatalf("failed to read from env: %v", err)
|
||||
return
|
||||
}
|
||||
r.storage = &elasticsearch.ElasticsearchStorage{}
|
||||
if err := relayer.Start(&r); err != nil {
|
||||
log.Fatalf("server terminated: %v", err)
|
||||
}
|
||||
}
|
185
storage/elasticsearch/elasticsearch.go
Normal file
185
storage/elasticsearch/elasticsearch.go
Normal file
@ -0,0 +1,185 @@
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v8"
|
||||
"github.com/elastic/go-elasticsearch/v8/esutil"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
type IndexedEvent struct {
|
||||
Event nostr.Event `json:"event"`
|
||||
ContentSearch string `json:"content_search"`
|
||||
}
|
||||
|
||||
var indexMapping = `
|
||||
{
|
||||
"settings": {
|
||||
"number_of_shards": 1,
|
||||
"number_of_replicas": 0
|
||||
},
|
||||
"mappings": {
|
||||
"dynamic": false,
|
||||
"properties": {
|
||||
"event": {
|
||||
"dynamic": false,
|
||||
"properties": {
|
||||
"id": {"type": "keyword"},
|
||||
"pubkey": {"type": "keyword"},
|
||||
"kind": {"type": "integer"},
|
||||
"tags": {"type": "keyword"},
|
||||
"created_at": {"type": "date"}
|
||||
}
|
||||
},
|
||||
"content_search": {"type": "text"}
|
||||
}
|
||||
}
|
||||
}
|
||||
`
|
||||
|
||||
type ElasticsearchStorage struct {
|
||||
IndexName string
|
||||
|
||||
es *elasticsearch.Client
|
||||
bi esutil.BulkIndexer
|
||||
}
|
||||
|
||||
func (ess *ElasticsearchStorage) Init() error {
|
||||
|
||||
if ess.IndexName == "" {
|
||||
ess.IndexName = "events"
|
||||
}
|
||||
|
||||
cfg := elasticsearch.Config{}
|
||||
if x := os.Getenv("ES_URL"); x != "" {
|
||||
cfg.Addresses = strings.Split(x, ",")
|
||||
}
|
||||
es, err := elasticsearch.NewClient(cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := es.Indices.Create(ess.IndexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res.IsError() {
|
||||
body, _ := io.ReadAll(res.Body)
|
||||
txt := string(body)
|
||||
if !strings.Contains(txt, "resource_already_exists_exception") {
|
||||
return fmt.Errorf("%s", txt)
|
||||
}
|
||||
}
|
||||
|
||||
// bulk indexer
|
||||
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
|
||||
Index: ess.IndexName,
|
||||
Client: es,
|
||||
NumWorkers: 2,
|
||||
FlushInterval: 3 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating the indexer: %s", err)
|
||||
}
|
||||
|
||||
ess.es = es
|
||||
ess.bi = bi
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error {
|
||||
// todo: is pubkey match required?
|
||||
|
||||
done := make(chan error)
|
||||
err := ess.bi.Add(
|
||||
context.Background(),
|
||||
esutil.BulkIndexerItem{
|
||||
Action: "delete",
|
||||
DocumentID: id,
|
||||
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
|
||||
close(done)
|
||||
},
|
||||
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
|
||||
if err != nil {
|
||||
done <- err
|
||||
} else {
|
||||
// ok if deleted item not found
|
||||
if res.Status == 404 {
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
txt, _ := json.Marshal(res)
|
||||
err := fmt.Errorf("ERROR: %s", txt)
|
||||
done <- err
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = <-done
|
||||
return err
|
||||
}
|
||||
|
||||
func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error {
|
||||
ie := &IndexedEvent{
|
||||
Event: *event,
|
||||
}
|
||||
|
||||
// post processing: index for FTS
|
||||
// some ideas:
|
||||
// - index kind=0 fields a set of dedicated mapped fields
|
||||
// (or use a separate index for profiles with a dedicated mapping)
|
||||
// - if it's valid JSON just index the "values" and not the keys
|
||||
// - more content introspection: language detection
|
||||
// - denormalization... attach profile + ranking signals to events
|
||||
if event.Kind != 4 {
|
||||
ie.ContentSearch = event.Content
|
||||
}
|
||||
|
||||
data, err := json.Marshal(ie)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
done := make(chan error)
|
||||
|
||||
// adapted from:
|
||||
// https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196
|
||||
err = ess.bi.Add(
|
||||
context.Background(),
|
||||
esutil.BulkIndexerItem{
|
||||
Action: "index",
|
||||
DocumentID: event.ID,
|
||||
Body: bytes.NewReader(data),
|
||||
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
|
||||
close(done)
|
||||
},
|
||||
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
|
||||
if err != nil {
|
||||
done <- err
|
||||
} else {
|
||||
err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
|
||||
done <- err
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = <-done
|
||||
return err
|
||||
}
|
207
storage/elasticsearch/query.go
Normal file
207
storage/elasticsearch/query.go
Normal file
@ -0,0 +1,207 @@
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"reflect"
|
||||
|
||||
"github.com/aquasecurity/esquery"
|
||||
"github.com/elastic/go-elasticsearch/v8/esutil"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
type EsSearchResult struct {
|
||||
Took int
|
||||
TimedOut bool `json:"timed_out"`
|
||||
Hits struct {
|
||||
Total struct {
|
||||
Value int
|
||||
Relation string
|
||||
}
|
||||
Hits []struct {
|
||||
Source IndexedEvent `json:"_source"`
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func buildDsl(filter *nostr.Filter) ([]byte, error) {
|
||||
dsl := esquery.Bool()
|
||||
|
||||
prefixFilter := func(fieldName string, values []string) {
|
||||
if len(values) == 0 {
|
||||
return
|
||||
}
|
||||
prefixQ := esquery.Bool()
|
||||
for _, v := range values {
|
||||
if len(v) < 64 {
|
||||
prefixQ.Should(esquery.Prefix(fieldName, v))
|
||||
} else {
|
||||
prefixQ.Should(esquery.Term(fieldName, v))
|
||||
}
|
||||
}
|
||||
dsl.Must(prefixQ)
|
||||
}
|
||||
|
||||
// ids
|
||||
prefixFilter("event.id", filter.IDs)
|
||||
|
||||
// authors
|
||||
prefixFilter("event.pubkey", filter.Authors)
|
||||
|
||||
// kinds
|
||||
if len(filter.Kinds) > 0 {
|
||||
dsl.Must(esquery.Terms("event.kind", toInterfaceSlice(filter.Kinds)...))
|
||||
}
|
||||
|
||||
// tags
|
||||
if len(filter.Tags) > 0 {
|
||||
tagQ := esquery.Bool()
|
||||
for char, terms := range filter.Tags {
|
||||
vs := toInterfaceSlice(append(terms, char))
|
||||
tagQ.Should(esquery.Terms("event.tags", vs...))
|
||||
}
|
||||
dsl.Must(tagQ)
|
||||
}
|
||||
|
||||
// since
|
||||
if filter.Since != nil {
|
||||
dsl.Must(esquery.Range("event.created_at").Gt(filter.Since.Unix()))
|
||||
}
|
||||
|
||||
// until
|
||||
if filter.Until != nil {
|
||||
dsl.Must(esquery.Range("event.created_at").Lt(filter.Until.Unix()))
|
||||
}
|
||||
|
||||
// search
|
||||
if filter.Search != "" {
|
||||
dsl.Must(esquery.Match("content_search", filter.Search))
|
||||
}
|
||||
|
||||
return json.Marshal(esquery.Query(dsl))
|
||||
}
|
||||
|
||||
func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, error) {
|
||||
got, err := ess.es.Mget(
|
||||
esutil.NewJSONReader(filter),
|
||||
ess.es.Mget.WithIndex(ess.IndexName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var mgetResponse struct {
|
||||
Docs []struct {
|
||||
Found bool
|
||||
Source IndexedEvent `json:"_source"`
|
||||
}
|
||||
}
|
||||
if err := json.NewDecoder(got.Body).Decode(&mgetResponse); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
events := make([]nostr.Event, 0, len(mgetResponse.Docs))
|
||||
for _, e := range mgetResponse.Docs {
|
||||
if e.Found {
|
||||
events = append(events, e.Source.Event)
|
||||
}
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) {
|
||||
if filter == nil {
|
||||
return nil, errors.New("filter cannot be null")
|
||||
}
|
||||
|
||||
// optimization: get by id
|
||||
if isGetByID(filter) {
|
||||
return ess.getByID(filter)
|
||||
}
|
||||
|
||||
dsl, err := buildDsl(filter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
limit := 1000
|
||||
if filter.Limit > 0 && filter.Limit < limit {
|
||||
limit = filter.Limit
|
||||
}
|
||||
|
||||
es := ess.es
|
||||
res, err := es.Search(
|
||||
es.Search.WithContext(context.Background()),
|
||||
es.Search.WithIndex(ess.IndexName),
|
||||
|
||||
es.Search.WithBody(bytes.NewReader(dsl)),
|
||||
es.Search.WithSize(limit),
|
||||
es.Search.WithSort("event.created_at:desc"),
|
||||
)
|
||||
if err != nil {
|
||||
log.Fatalf("Error getting response: %s", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
txt, _ := io.ReadAll(res.Body)
|
||||
fmt.Println("oh no", string(txt))
|
||||
return nil, fmt.Errorf("%s", txt)
|
||||
}
|
||||
|
||||
var r EsSearchResult
|
||||
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
events := make([]nostr.Event, len(r.Hits.Hits))
|
||||
for i, e := range r.Hits.Hits {
|
||||
events[i] = e.Source.Event
|
||||
}
|
||||
|
||||
return events, nil
|
||||
}
|
||||
|
||||
func isGetByID(filter *nostr.Filter) bool {
|
||||
isGetById := len(filter.IDs) > 0 &&
|
||||
len(filter.Authors) == 0 &&
|
||||
len(filter.Kinds) == 0 &&
|
||||
len(filter.Tags) == 0 &&
|
||||
filter.Since == nil &&
|
||||
filter.Until == nil
|
||||
|
||||
if isGetById {
|
||||
for _, id := range filter.IDs {
|
||||
if len(id) != 64 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return isGetById
|
||||
}
|
||||
|
||||
// from: https://stackoverflow.com/a/12754757
|
||||
func toInterfaceSlice(slice interface{}) []interface{} {
|
||||
s := reflect.ValueOf(slice)
|
||||
if s.Kind() != reflect.Slice {
|
||||
panic("InterfaceSlice() given a non-slice type")
|
||||
}
|
||||
|
||||
// Keep the distinction between nil and empty slice input
|
||||
if s.IsNil() {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := make([]interface{}, s.Len())
|
||||
|
||||
for i := 0; i < s.Len(); i++ {
|
||||
ret[i] = s.Index(i).Interface()
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
45
storage/elasticsearch/query_test.go
Normal file
45
storage/elasticsearch/query_test.go
Normal file
@ -0,0 +1,45 @@
|
||||
package elasticsearch
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
func TestQuery(t *testing.T) {
|
||||
now := time.Now()
|
||||
yesterday := now.Add(time.Hour * -24)
|
||||
filter := &nostr.Filter{
|
||||
IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"},
|
||||
Kinds: []int{0, 1},
|
||||
Tags: nostr.TagMap{
|
||||
"e": []string{"abc"},
|
||||
"p": []string{"aaa", "bbb"},
|
||||
},
|
||||
Since: &yesterday,
|
||||
Until: &now,
|
||||
Limit: 100,
|
||||
Search: "other stuff",
|
||||
}
|
||||
|
||||
dsl, err := buildDsl(filter)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
pprint(dsl)
|
||||
|
||||
}
|
||||
|
||||
func pprint(j []byte) {
|
||||
var dst bytes.Buffer
|
||||
err := json.Indent(&dst, j, "", " ")
|
||||
if err != nil {
|
||||
fmt.Println("invalid JSON", err, string(j))
|
||||
} else {
|
||||
fmt.Println(dst.String())
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user