diff --git a/go.mod b/go.mod index 3ff8e5a..45484e9 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/SaveTheRbtz/generic-sync-map-go v0.0.0-20220414055132-a37292614db8 // indirect github.com/aead/siphash v1.0.1 // indirect github.com/andybalholm/cascadia v1.3.1 // indirect + github.com/aquasecurity/esquery v0.2.0 // indirect github.com/btcsuite/btcd v0.23.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/btcsuite/btcd/btcutil v1.1.1 // indirect @@ -52,6 +53,8 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/decred/dcrd/lru v1.0.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c // indirect + github.com/elastic/go-elasticsearch/v7 v7.6.0 // indirect + github.com/fatih/structs v1.1.0 // indirect github.com/go-errors/errors v1.0.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/go.sum b/go.sum index 90bd150..200de7f 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,8 @@ github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0 github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c= github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA= +github.com/aquasecurity/esquery v0.2.0 h1:9WWXve95TE8hbm3736WB7nS6Owl8UGDeu+0jiyE9ttA= +github.com/aquasecurity/esquery v0.2.0/go.mod h1:VU+CIFR6C+H142HHZf9RUkp4Eedpo9UrEKeCQHWf9ao= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -125,6 +127,8 @@ github.com/dvyukov/go-fuzz v0.0.0-20210602112143-b1f3d6f4ef4e h1:qTP1telKJHlToHl github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs= github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/go-elasticsearch/v7 v7.6.0 h1:sYpGLpEFHgLUKLsZUBfuaVI9QgHjS3JdH9fX4/z8QI8= +github.com/elastic/go-elasticsearch/v7 v7.6.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/go-elasticsearch/v8 v8.6.0 h1:xMaSe8jIh7NHzmNo9YBkewmaD2Pr+tX+zLkXxhieny4= github.com/elastic/go-elasticsearch/v8 v8.6.0/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -132,6 +136,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= +github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fergusstrange/embedded-postgres v1.10.0 h1:YnwF6xAQYmKLAXXrrRx4rHDLih47YJwVPvg8jeKfdNg= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= @@ -223,6 +228,8 @@ github.com/jb55/lnsocket/go v0.0.0-20220725174341-b98b5cd37bb6/go.mod h1:atFK/q4 github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jgroeneveld/schema v1.0.0/go.mod h1:M14lv7sNMtGvo3ops1MwslaSYgDYxrSmbzWIQ0Mr5rs= +github.com/jgroeneveld/trial v2.0.0+incompatible/go.mod h1:I6INLW96EN8WysNBXUFI3M4RIC8ePg9ntAc/Wy+U/+M= github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE= github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= diff --git a/search/README.md b/search/README.md index e42ffb6..631f152 100644 --- a/search/README.md +++ b/search/README.md @@ -1,3 +1,9 @@ +# Search Relay + +Uses ElasticSearch storage backend for all queries, with some basic full text search support. + +Index some events: + ``` bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \ jq -c '["EVENT", .]' | \ @@ -5,16 +11,16 @@ bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \ websocat -n -B 200000 ws://127.0.0.1:7447 ``` +Do a search: + ``` echo '["REQ", "asdf", {"search": "steve", "kinds": [0]}]' | websocat -n ws://127.0.0.1:7447 ``` -todo: +## Customize -* index `content_search` field -* support search queries -* some kind of ranking signal (based on pubkey) -* better config for ES: adjust bulk indexer settings, use custom mapping? +Currently the indexing is very basic: It will index the `contents` field for all events where kind != 4. +Some additional mapping and pre-processing could add better support for different content types. +See comments in `storage/elasticsearch/elasticsearch.go`. -* ES DSL string builder might not escape json strings correctly... diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go index 3d8a4d1..12103a8 100644 --- a/storage/elasticsearch/elasticsearch.go +++ b/storage/elasticsearch/elasticsearch.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "log" "os" "strings" "time" @@ -47,12 +46,18 @@ var indexMapping = ` ` type ElasticsearchStorage struct { - es *elasticsearch.Client - bi esutil.BulkIndexer - indexName string + IndexName string + + es *elasticsearch.Client + bi esutil.BulkIndexer } func (ess *ElasticsearchStorage) Init() error { + + if ess.IndexName == "" { + ess.IndexName = "events" + } + cfg := elasticsearch.Config{} if x := os.Getenv("ES_URL"); x != "" { cfg.Addresses = strings.Split(x, ",") @@ -62,10 +67,7 @@ func (ess *ElasticsearchStorage) Init() error { return err } - // todo: config + mapping settings - ess.indexName = "test3" - - res, err := es.Indices.Create(ess.indexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping))) + res, err := es.Indices.Create(ess.IndexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping))) if err != nil { return err } @@ -79,13 +81,13 @@ func (ess *ElasticsearchStorage) Init() error { // bulk indexer bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Index: ess.indexName, // The default index name - Client: es, // The Elasticsearch client - NumWorkers: 2, // The number of worker goroutines - FlushInterval: 3 * time.Second, // The periodic flush interval + Index: ess.IndexName, + Client: es, + NumWorkers: 2, + FlushInterval: 3 * time.Second, }) if err != nil { - log.Fatalf("Error creating the indexer: %s", err) + return fmt.Errorf("error creating the indexer: %s", err) } ess.es = es @@ -127,9 +129,6 @@ func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error { } err = <-done - if err != nil { - log.Println("DEL", err) - } return err } @@ -182,8 +181,5 @@ func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error { } err = <-done - if err != nil { - log.Println("SAVE", err) - } return err } diff --git a/storage/elasticsearch/query.go b/storage/elasticsearch/query.go index 03f99db..00df2e1 100644 --- a/storage/elasticsearch/query.go +++ b/storage/elasticsearch/query.go @@ -8,8 +8,9 @@ import ( "fmt" "io" "log" - "strings" + "reflect" + "github.com/aquasecurity/esquery" "github.com/elastic/go-elasticsearch/v8/esutil" "github.com/nbd-wtf/go-nostr" ) @@ -28,87 +29,67 @@ type EsSearchResult struct { } } -func buildDsl(filter *nostr.Filter) string { - b := &strings.Builder{} - b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`) +func buildDsl(filter *nostr.Filter) ([]byte, error) { + dsl := esquery.Bool() prefixFilter := func(fieldName string, values []string) { - b.WriteString(`{"bool": {"should": [`) - for idx, val := range values { - if idx > 0 { - b.WriteRune(',') - } - op := "term" - if len(val) < 64 { - op = "prefix" - } - b.WriteString(fmt.Sprintf(`{"%s": {"event.%s": %q}}`, op, fieldName, val)) + if len(values) == 0 { + return } - b.WriteString(`]}},`) + prefixQ := esquery.Bool() + for _, v := range values { + if len(v) < 64 { + prefixQ.Should(esquery.Prefix(fieldName, v)) + } else { + prefixQ.Should(esquery.Term(fieldName, v)) + } + } + dsl.Must(prefixQ) } // ids - prefixFilter("id", filter.IDs) + prefixFilter("event.id", filter.IDs) // authors - prefixFilter("pubkey", filter.Authors) + prefixFilter("event.pubkey", filter.Authors) // kinds if len(filter.Kinds) > 0 { - k, _ := json.Marshal(filter.Kinds) - b.WriteString(fmt.Sprintf(`{"terms": {"event.kind": %s}},`, k)) + dsl.Must(esquery.Terms("event.kind", toInterfaceSlice(filter.Kinds)...)) } // tags - { - b.WriteString(`{"bool": {"should": [`) - commaIdx := 0 + if len(filter.Tags) > 0 { + tagQ := esquery.Bool() for char, terms := range filter.Tags { - if len(terms) == 0 { - continue - } - if commaIdx > 0 { - b.WriteRune(',') - } - commaIdx++ - b.WriteString(`{"bool": {"must": [`) - for _, t := range terms { - b.WriteString(fmt.Sprintf(`{"term": {"event.tags": %q}},`, t)) - } - // add the tag type at the end - b.WriteString(fmt.Sprintf(`{"term": {"event.tags": %q}}`, char)) - b.WriteString(`]}}`) + vs := toInterfaceSlice(append(terms, char)) + tagQ.Should(esquery.Terms("event.tags", vs...)) } - b.WriteString(`]}},`) + dsl.Must(tagQ) } // since if filter.Since != nil { - b.WriteString(fmt.Sprintf(`{"range": {"event.created_at": {"gt": %d}}},`, filter.Since.Unix())) + dsl.Must(esquery.Range("event.created_at").Gt(filter.Since.Unix())) } // until if filter.Until != nil { - b.WriteString(fmt.Sprintf(`{"range": {"event.created_at": {"lt": %d}}},`, filter.Until.Unix())) + dsl.Must(esquery.Range("event.created_at").Lt(filter.Until.Unix())) } // search if filter.Search != "" { - b.WriteString(fmt.Sprintf(`{"match": {"content_search": {"query": %s}}},`, filter.Search)) + dsl.Must(esquery.Match("content_search", filter.Search)) } - // all blocks have a trailing comma... - // add a match_all "noop" at the end - // so json is valid - b.WriteString(`{"match_all": {}}`) - b.WriteString(`]}}}}}`) - return b.String() + return json.Marshal(esquery.Query(dsl)) } func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, error) { got, err := ess.es.Mget( esutil.NewJSONReader(filter), - ess.es.Mget.WithIndex(ess.indexName)) + ess.es.Mget.WithIndex(ess.IndexName)) if err != nil { return nil, err } @@ -134,8 +115,6 @@ func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, e } func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { - // Perform the search request... - // need to build up query body... if filter == nil { return nil, errors.New("filter cannot be null") } @@ -145,8 +124,10 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even return ess.getByID(filter) } - dsl := buildDsl(filter) - pprint([]byte(dsl)) + dsl, err := buildDsl(filter) + if err != nil { + return nil, err + } limit := 1000 if filter.Limit > 0 && filter.Limit < limit { @@ -156,14 +137,11 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even es := ess.es res, err := es.Search( es.Search.WithContext(context.Background()), - es.Search.WithIndex(ess.indexName), + es.Search.WithIndex(ess.IndexName), - es.Search.WithBody(strings.NewReader(dsl)), + es.Search.WithBody(bytes.NewReader(dsl)), es.Search.WithSize(limit), es.Search.WithSort("event.created_at:desc"), - - // es.Search.WithTrackTotalHits(true), - // es.Search.WithPretty(), ) if err != nil { log.Fatalf("Error getting response: %s", err) @@ -207,12 +185,23 @@ func isGetByID(filter *nostr.Filter) bool { return isGetById } -func pprint(j []byte) { - var dst bytes.Buffer - err := json.Indent(&dst, j, "", " ") - if err != nil { - fmt.Println("invalid JSON", err, string(j)) - } else { - fmt.Println(dst.String()) +// from: https://stackoverflow.com/a/12754757 +func toInterfaceSlice(slice interface{}) []interface{} { + s := reflect.ValueOf(slice) + if s.Kind() != reflect.Slice { + panic("InterfaceSlice() given a non-slice type") } + + // Keep the distinction between nil and empty slice input + if s.IsNil() { + return nil + } + + ret := make([]interface{}, s.Len()) + + for i := 0; i < s.Len(); i++ { + ret[i] = s.Index(i).Interface() + } + + return ret } diff --git a/storage/elasticsearch/query_test.go b/storage/elasticsearch/query_test.go index ce21372..f30d390 100644 --- a/storage/elasticsearch/query_test.go +++ b/storage/elasticsearch/query_test.go @@ -1,7 +1,9 @@ package elasticsearch import ( + "bytes" "encoding/json" + "fmt" "testing" "time" @@ -12,34 +14,32 @@ func TestQuery(t *testing.T) { now := time.Now() yesterday := now.Add(time.Hour * -24) filter := &nostr.Filter{ - // IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"}, + IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"}, Kinds: []int{0, 1}, - // Tags: nostr.TagMap{ - // "a": []string{"abc"}, - // "b": []string{"aaa", "bbb"}, - // }, - Since: &yesterday, - Until: &now, - Limit: 100, + Tags: nostr.TagMap{ + "e": []string{"abc"}, + "p": []string{"aaa", "bbb"}, + }, + Since: &yesterday, + Until: &now, + Limit: 100, + Search: "other stuff", } - dsl := buildDsl(filter) - pprint([]byte(dsl)) - - if !json.Valid([]byte(dsl)) { - t.Fail() + dsl, err := buildDsl(filter) + if err != nil { + t.Fatal(err) } + pprint(dsl) - // "integration" test - // ess := &ElasticsearchStorage{} - // err := ess.Init() - // if err != nil { - // t.Error(err) - // } - - // found, err := ess.QueryEvents(filter) - // if err != nil { - // t.Error(err) - // } - // fmt.Println(found) +} + +func pprint(j []byte) { + var dst bytes.Buffer + err := json.Indent(&dst, j, "", " ") + if err != nil { + fmt.Println("invalid JSON", err, string(j)) + } else { + fmt.Println(dst.String()) + } }