From d306c03369dca6e277615f6f746076db0063d7d6 Mon Sep 17 00:00:00 2001 From: Steve Perkins Date: Mon, 6 Feb 2023 13:30:29 -0500 Subject: [PATCH 1/5] basic elasticsearch storage example --- go.mod | 3 + go.sum | 6 + search/docker-compose.yml | 43 +++++ search/main.go | 65 +++++++ storage/elasticsearch/elasticsearch.go | 250 +++++++++++++++++++++++++ storage/elasticsearch/query_test.go | 46 +++++ 6 files changed, 413 insertions(+) create mode 100644 search/docker-compose.yml create mode 100644 search/main.go create mode 100644 storage/elasticsearch/elasticsearch.go create mode 100644 storage/elasticsearch/query_test.go diff --git a/go.mod b/go.mod index 08d23ee..afc7fd4 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.18 require ( github.com/PuerkitoBio/goquery v1.8.0 github.com/cockroachdb/pebble v0.0.0-20220723153705-3fc374e4dc66 + github.com/elastic/go-elasticsearch v0.0.0 + github.com/elastic/go-elasticsearch/v8 v8.6.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 github.com/grokify/html-strip-tags-go v0.0.1 @@ -50,6 +52,7 @@ require ( github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/decred/dcrd/lru v1.0.0 // indirect + github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c // indirect github.com/go-errors/errors v1.0.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/go.sum b/go.sum index a2f502e..262025e 100644 --- a/go.sum +++ b/go.sum @@ -123,6 +123,12 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dvyukov/go-fuzz v0.0.0-20210602112143-b1f3d6f4ef4e h1:qTP1telKJHlToHlwPQNmVg4yfMDMHe4Z3SYmzkrvA2M= github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= +github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs= +github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= +github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= +github.com/elastic/go-elasticsearch/v8 v8.6.0 h1:xMaSe8jIh7NHzmNo9YBkewmaD2Pr+tX+zLkXxhieny4= +github.com/elastic/go-elasticsearch/v8 v8.6.0/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= diff --git a/search/docker-compose.yml b/search/docker-compose.yml new file mode 100644 index 0000000..40479cb --- /dev/null +++ b/search/docker-compose.yml @@ -0,0 +1,43 @@ +version: "3.8" +services: + + # relay: + # build: + # context: ../ + # dockerfile: ./basic/Dockerfile + # environment: + # PORT: 2700 + # POSTGRESQL_DATABASE: postgres://nostr:nostr@postgres:5432/nostr?sslmode=disable + # depends_on: + # postgres: + # condition: service_healthy + # ports: + # - 2700:2700 + # command: "./basic/relayer-basic" + + elasticsearch: + container_name: elasticsearch + image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0 + restart: always + environment: + - network.host=0.0.0.0 + - discovery.type=single-node + - cluster.name=docker-cluster + - node.name=cluster1-node1 + - xpack.license.self_generated.type=basic + - xpack.security.enabled=false + - "ES_JAVA_OPTS=-Xms${ES_MEM:-4g} -Xmx${ES_MEM:-4g}" + ports: + - '127.0.0.1:9200:9200' + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: + ["CMD-SHELL", "curl --silent --fail elasticsearch:9200/_cluster/health || exit 1"] + interval: 10s + timeout: 5s + retries: 5 + + diff --git a/search/main.go b/search/main.go new file mode 100644 index 0000000..25947d2 --- /dev/null +++ b/search/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + + "github.com/fiatjaf/relayer" + "github.com/fiatjaf/relayer/storage/elasticsearch" + "github.com/kelseyhightower/envconfig" + "github.com/nbd-wtf/go-nostr" +) + +type Relay struct { + storage *elasticsearch.ElasticsearchStorage +} + +func (r *Relay) Name() string { + return "BasicRelay" +} + +func (r *Relay) Storage() relayer.Storage { + return r.storage +} + +func (r *Relay) OnInitialized(*relayer.Server) {} + +func (r *Relay) Init() error { + err := envconfig.Process("", r) + if err != nil { + return fmt.Errorf("couldn't process envconfig: %w", err) + } + + return nil +} + +func (r *Relay) AcceptEvent(evt *nostr.Event) bool { + // block events that are too large + jsonb, _ := json.Marshal(evt) + if len(jsonb) > 10000 { + return false + } + + return true +} + +func (r *Relay) BeforeSave(evt *nostr.Event) { + // do nothing +} + +func (r *Relay) AfterSave(evt *nostr.Event) { + +} + +func main() { + r := Relay{} + if err := envconfig.Process("", &r); err != nil { + log.Fatalf("failed to read from env: %v", err) + return + } + r.storage = &elasticsearch.ElasticsearchStorage{} + if err := relayer.Start(&r); err != nil { + log.Fatalf("server terminated: %v", err) + } +} diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go new file mode 100644 index 0000000..acec779 --- /dev/null +++ b/storage/elasticsearch/elasticsearch.go @@ -0,0 +1,250 @@ +package elasticsearch + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "strings" + + "github.com/elastic/go-elasticsearch/esapi" + "github.com/elastic/go-elasticsearch/v8" + "github.com/nbd-wtf/go-nostr" +) + +/* +1. create index with mapping in Init +2. implement delete +3. build query in QueryEvents +4. implement replaceable events +*/ + +var indexMapping = ` +{ + "settings": { + "number_of_shards": 1, + "number_of_replicas": 0 + }, + "mappings": { + "dynamic": false, + "properties": { + "id": {"type": "keyword"}, + "pubkey": {"type": "keyword"}, + "kind": {"type": "integer"}, + "tags": {"type": "keyword"}, + "created_at": {"type": "date"} + } + } +} +` + +type ElasticsearchStorage struct { + es *elasticsearch.Client + indexName string +} + +func (ess *ElasticsearchStorage) Init() error { + es, err := elasticsearch.NewDefaultClient() + if err != nil { + return err + } + // log.Println(elasticsearch.Version) + // log.Println(es.Info()) + + // todo: config + ess.indexName = "test" + + // todo: don't delete index every time + // es.Indices.Delete([]string{ess.indexName}) + + res, err := es.Indices.Create(ess.indexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping))) + if err != nil { + return err + } + if res.IsError() { + body, _ := io.ReadAll(res.Body) + txt := string(body) + if !strings.Contains(txt, "resource_already_exists_exception") { + return fmt.Errorf("%s", txt) + } + } + + ess.es = es + + return nil +} + +type EsSearchResult struct { + Took int + TimedOut bool `json:"timed_out"` + Hits struct { + Total struct { + Value int + Relation string + } + Hits []struct { + Source nostr.Event `json:"_source"` + } + } +} + +func buildDsl(filter *nostr.Filter) string { + b := &strings.Builder{} + b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`) + + prefixFilter := func(fieldName string, values []string) { + b.WriteString(`{"bool": {"should": [`) + for idx, val := range values { + if idx > 0 { + b.WriteRune(',') + } + op := "term" + if len(val) < 64 { + op = "prefix" + } + b.WriteString(fmt.Sprintf(`{"%s": {"%s": %q}}`, op, fieldName, val)) + } + b.WriteString(`]}},`) + } + + // ids + prefixFilter("id", filter.IDs) + + // authors + prefixFilter("pubkey", filter.Authors) + + // kinds + if len(filter.Kinds) > 0 { + k, _ := json.Marshal(filter.Kinds) + b.WriteString(fmt.Sprintf(`{"terms": {"kind": %s}},`, k)) + } + + // tags + { + b.WriteString(`{"bool": {"should": [`) + commaIdx := 0 + for char, terms := range filter.Tags { + if len(terms) == 0 { + continue + } + if commaIdx > 0 { + b.WriteRune(',') + } + commaIdx++ + b.WriteString(`{"bool": {"must": [`) + for _, t := range terms { + b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}},`, t)) + } + // add the tag type at the end + b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}}`, char)) + b.WriteString(`]}}`) + } + b.WriteString(`]}},`) + } + + // since + if filter.Since != nil { + b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"gt": %d}}},`, filter.Since.Unix())) + } + + // until + if filter.Until != nil { + b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"lt": %d}}},`, filter.Until.Unix())) + } + + // all blocks have a trailing comma... + // add a match_all "noop" at the end + // so json is valid + b.WriteString(`{"match_all": {}}`) + b.WriteString(`]}}}}}`) + return b.String() +} + +func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { + // Perform the search request... + // need to build up query body... + if filter == nil { + return nil, errors.New("filter cannot be null") + } + + dsl := buildDsl(filter) + // pprint([]byte(dsl)) + + es := ess.es + res, err := es.Search( + es.Search.WithContext(context.Background()), + es.Search.WithIndex(ess.indexName), + + es.Search.WithBody(strings.NewReader(dsl)), + es.Search.WithSize(filter.Limit), + es.Search.WithSort("created_at:desc"), + + es.Search.WithTrackTotalHits(true), + es.Search.WithPretty(), + ) + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + defer res.Body.Close() + + if res.IsError() { + txt, _ := io.ReadAll(res.Body) + fmt.Println("oh no", string(txt)) + return nil, fmt.Errorf("%s", txt) + } + + var r EsSearchResult + if err := json.NewDecoder(res.Body).Decode(&r); err != nil { + return nil, err + } + + events := make([]nostr.Event, len(r.Hits.Hits)) + for i, e := range r.Hits.Hits { + events[i] = e.Source + } + + return events, nil +} + +func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error { + // todo: is pubkey match required? + res, err := ess.es.Delete(ess.indexName, id) + if err != nil { + return err + } + if res.IsError() { + txt, _ := io.ReadAll(res.Body) + return fmt.Errorf("%s", txt) + } + return nil +} + +func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error { + data, err := json.Marshal(event) + if err != nil { + return err + } + + req := esapi.IndexRequest{ + Index: ess.indexName, + DocumentID: event.ID, + Body: bytes.NewReader(data), + // Refresh: "true", + } + + _, err = req.Do(context.Background(), ess.es) + return err +} + +func pprint(j []byte) { + var dst bytes.Buffer + err := json.Indent(&dst, j, "", " ") + if err != nil { + fmt.Println("invalid JSON", err, string(j)) + } else { + fmt.Println(dst.String()) + } +} diff --git a/storage/elasticsearch/query_test.go b/storage/elasticsearch/query_test.go new file mode 100644 index 0000000..e10fc45 --- /dev/null +++ b/storage/elasticsearch/query_test.go @@ -0,0 +1,46 @@ +package elasticsearch + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/nbd-wtf/go-nostr" +) + +func TestQuery(t *testing.T) { + now := time.Now() + yesterday := now.Add(time.Hour * -24) + filter := &nostr.Filter{ + // IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"}, + Kinds: []int{0, 1}, + // Tags: nostr.TagMap{ + // "a": []string{"abc"}, + // "b": []string{"aaa", "bbb"}, + // }, + Since: &yesterday, + Until: &now, + Limit: 100, + } + + dsl := buildDsl(filter) + pprint([]byte(dsl)) + + if !json.Valid([]byte(dsl)) { + t.Fail() + } + + // "integration" test + ess := &ElasticsearchStorage{} + err := ess.Init() + if err != nil { + t.Error(err) + } + + found, err := ess.QueryEvents(filter) + if err != nil { + t.Error(err) + } + fmt.Println(found) +} From 34a21cb3740136d95a6e3c7fefb446c37d130a54 Mon Sep 17 00:00:00 2001 From: Steve Perkins Date: Tue, 7 Feb 2023 10:46:27 -0500 Subject: [PATCH 2/5] Use bulk indexer for writes. Special case get by ID. --- search/README.md | 13 ++ search/main.go | 2 +- storage/elasticsearch/elasticsearch.go | 237 ++++++++----------------- storage/elasticsearch/query.go | 213 ++++++++++++++++++++++ storage/elasticsearch/query_test.go | 21 ++- 5 files changed, 312 insertions(+), 174 deletions(-) create mode 100644 search/README.md create mode 100644 storage/elasticsearch/query.go diff --git a/search/README.md b/search/README.md new file mode 100644 index 0000000..f3ed003 --- /dev/null +++ b/search/README.md @@ -0,0 +1,13 @@ +``` +bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \ + jq -c '["EVENT", .]' | \ + awk 'length($0)<131072' | \ + websocat -n -B 200000 ws://127.0.0.1:7447 +``` + +todo: + +* index `content_search` field +* support search queries +* some kind of ranking signal (based on pubkey) +* better config for ES: adjust bulk indexer settings, use custom mapping? diff --git a/search/main.go b/search/main.go index 25947d2..16658ca 100644 --- a/search/main.go +++ b/search/main.go @@ -37,7 +37,7 @@ func (r *Relay) Init() error { func (r *Relay) AcceptEvent(evt *nostr.Event) bool { // block events that are too large jsonb, _ := json.Marshal(evt) - if len(jsonb) > 10000 { + if len(jsonb) > 100000 { return false } diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go index acec779..b443335 100644 --- a/storage/elasticsearch/elasticsearch.go +++ b/storage/elasticsearch/elasticsearch.go @@ -4,24 +4,17 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "log" "strings" + "time" - "github.com/elastic/go-elasticsearch/esapi" "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esutil" "github.com/nbd-wtf/go-nostr" ) -/* -1. create index with mapping in Init -2. implement delete -3. build query in QueryEvents -4. implement replaceable events -*/ - var indexMapping = ` { "settings": { @@ -43,6 +36,7 @@ var indexMapping = ` type ElasticsearchStorage struct { es *elasticsearch.Client + bi esutil.BulkIndexer indexName string } @@ -72,154 +66,60 @@ func (ess *ElasticsearchStorage) Init() error { } } + // bulk indexer + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Index: ess.indexName, // The default index name + Client: es, // The Elasticsearch client + NumWorkers: 2, // The number of worker goroutines + FlushInterval: 3 * time.Second, // The periodic flush interval + }) + if err != nil { + log.Fatalf("Error creating the indexer: %s", err) + } + ess.es = es + ess.bi = bi return nil } -type EsSearchResult struct { - Took int - TimedOut bool `json:"timed_out"` - Hits struct { - Total struct { - Value int - Relation string - } - Hits []struct { - Source nostr.Event `json:"_source"` - } - } -} - -func buildDsl(filter *nostr.Filter) string { - b := &strings.Builder{} - b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`) - - prefixFilter := func(fieldName string, values []string) { - b.WriteString(`{"bool": {"should": [`) - for idx, val := range values { - if idx > 0 { - b.WriteRune(',') - } - op := "term" - if len(val) < 64 { - op = "prefix" - } - b.WriteString(fmt.Sprintf(`{"%s": {"%s": %q}}`, op, fieldName, val)) - } - b.WriteString(`]}},`) - } - - // ids - prefixFilter("id", filter.IDs) - - // authors - prefixFilter("pubkey", filter.Authors) - - // kinds - if len(filter.Kinds) > 0 { - k, _ := json.Marshal(filter.Kinds) - b.WriteString(fmt.Sprintf(`{"terms": {"kind": %s}},`, k)) - } - - // tags - { - b.WriteString(`{"bool": {"should": [`) - commaIdx := 0 - for char, terms := range filter.Tags { - if len(terms) == 0 { - continue - } - if commaIdx > 0 { - b.WriteRune(',') - } - commaIdx++ - b.WriteString(`{"bool": {"must": [`) - for _, t := range terms { - b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}},`, t)) - } - // add the tag type at the end - b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}}`, char)) - b.WriteString(`]}}`) - } - b.WriteString(`]}},`) - } - - // since - if filter.Since != nil { - b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"gt": %d}}},`, filter.Since.Unix())) - } - - // until - if filter.Until != nil { - b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"lt": %d}}},`, filter.Until.Unix())) - } - - // all blocks have a trailing comma... - // add a match_all "noop" at the end - // so json is valid - b.WriteString(`{"match_all": {}}`) - b.WriteString(`]}}}}}`) - return b.String() -} - -func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { - // Perform the search request... - // need to build up query body... - if filter == nil { - return nil, errors.New("filter cannot be null") - } - - dsl := buildDsl(filter) - // pprint([]byte(dsl)) - - es := ess.es - res, err := es.Search( - es.Search.WithContext(context.Background()), - es.Search.WithIndex(ess.indexName), - - es.Search.WithBody(strings.NewReader(dsl)), - es.Search.WithSize(filter.Limit), - es.Search.WithSort("created_at:desc"), - - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - ) - if err != nil { - log.Fatalf("Error getting response: %s", err) - } - defer res.Body.Close() - - if res.IsError() { - txt, _ := io.ReadAll(res.Body) - fmt.Println("oh no", string(txt)) - return nil, fmt.Errorf("%s", txt) - } - - var r EsSearchResult - if err := json.NewDecoder(res.Body).Decode(&r); err != nil { - return nil, err - } - - events := make([]nostr.Event, len(r.Hits.Hits)) - for i, e := range r.Hits.Hits { - events[i] = e.Source - } - - return events, nil -} - func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error { // todo: is pubkey match required? - res, err := ess.es.Delete(ess.indexName, id) + + done := make(chan error) + err := ess.bi.Add( + context.Background(), + esutil.BulkIndexerItem{ + Action: "delete", + DocumentID: id, + OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { + close(done) + }, + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + if err != nil { + done <- err + } else { + // ok if deleted item not found + if res.Status == 404 { + close(done) + return + } + txt, _ := json.Marshal(res) + err := fmt.Errorf("ERROR: %s", txt) + done <- err + } + }, + }, + ) if err != nil { return err } - if res.IsError() { - txt, _ := io.ReadAll(res.Body) - return fmt.Errorf("%s", txt) + + err = <-done + if err != nil { + log.Println("DEL", err) } - return nil + return err } func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error { @@ -228,23 +128,36 @@ func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error { return err } - req := esapi.IndexRequest{ - Index: ess.indexName, - DocumentID: event.ID, - Body: bytes.NewReader(data), - // Refresh: "true", + done := make(chan error) + + // adapted from: + // https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196 + err = ess.bi.Add( + context.Background(), + esutil.BulkIndexerItem{ + Action: "index", + DocumentID: event.ID, + Body: bytes.NewReader(data), + OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { + close(done) + }, + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + if err != nil { + done <- err + } else { + err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) + done <- err + } + }, + }, + ) + if err != nil { + return err } - _, err = req.Do(context.Background(), ess.es) + err = <-done + if err != nil { + log.Println("SAVE", err) + } return err } - -func pprint(j []byte) { - var dst bytes.Buffer - err := json.Indent(&dst, j, "", " ") - if err != nil { - fmt.Println("invalid JSON", err, string(j)) - } else { - fmt.Println(dst.String()) - } -} diff --git a/storage/elasticsearch/query.go b/storage/elasticsearch/query.go new file mode 100644 index 0000000..6b3bd59 --- /dev/null +++ b/storage/elasticsearch/query.go @@ -0,0 +1,213 @@ +package elasticsearch + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "strings" + + "github.com/elastic/go-elasticsearch/v8/esutil" + "github.com/nbd-wtf/go-nostr" +) + +type EsSearchResult struct { + Took int + TimedOut bool `json:"timed_out"` + Hits struct { + Total struct { + Value int + Relation string + } + Hits []struct { + Source nostr.Event `json:"_source"` + } + } +} + +func buildDsl(filter *nostr.Filter) string { + b := &strings.Builder{} + b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`) + + prefixFilter := func(fieldName string, values []string) { + b.WriteString(`{"bool": {"should": [`) + for idx, val := range values { + if idx > 0 { + b.WriteRune(',') + } + op := "term" + if len(val) < 64 { + op = "prefix" + } + b.WriteString(fmt.Sprintf(`{"%s": {"%s": %q}}`, op, fieldName, val)) + } + b.WriteString(`]}},`) + } + + // ids + prefixFilter("id", filter.IDs) + + // authors + prefixFilter("pubkey", filter.Authors) + + // kinds + if len(filter.Kinds) > 0 { + k, _ := json.Marshal(filter.Kinds) + b.WriteString(fmt.Sprintf(`{"terms": {"kind": %s}},`, k)) + } + + // tags + { + b.WriteString(`{"bool": {"should": [`) + commaIdx := 0 + for char, terms := range filter.Tags { + if len(terms) == 0 { + continue + } + if commaIdx > 0 { + b.WriteRune(',') + } + commaIdx++ + b.WriteString(`{"bool": {"must": [`) + for _, t := range terms { + b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}},`, t)) + } + // add the tag type at the end + b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}}`, char)) + b.WriteString(`]}}`) + } + b.WriteString(`]}},`) + } + + // since + if filter.Since != nil { + b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"gt": %d}}},`, filter.Since.Unix())) + } + + // until + if filter.Until != nil { + b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"lt": %d}}},`, filter.Until.Unix())) + } + + // all blocks have a trailing comma... + // add a match_all "noop" at the end + // so json is valid + b.WriteString(`{"match_all": {}}`) + b.WriteString(`]}}}}}`) + return b.String() +} + +func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, error) { + got, err := ess.es.Mget( + esutil.NewJSONReader(filter), + ess.es.Mget.WithIndex(ess.indexName)) + if err != nil { + return nil, err + } + + var mgetResponse struct { + Docs []struct { + Found bool + Source nostr.Event `json:"_source"` + } + } + if err := json.NewDecoder(got.Body).Decode(&mgetResponse); err != nil { + return nil, err + } + + events := make([]nostr.Event, 0, len(mgetResponse.Docs)) + for _, e := range mgetResponse.Docs { + if e.Found { + events = append(events, e.Source) + } + } + + return events, nil +} + +func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { + // Perform the search request... + // need to build up query body... + if filter == nil { + return nil, errors.New("filter cannot be null") + } + + // optimization: get by id + if isGetByID(filter) { + return ess.getByID(filter) + } + + dsl := buildDsl(filter) + pprint([]byte(dsl)) + + limit := 1000 + if filter.Limit > 0 && filter.Limit < limit { + limit = filter.Limit + } + + es := ess.es + res, err := es.Search( + es.Search.WithContext(context.Background()), + es.Search.WithIndex(ess.indexName), + + es.Search.WithBody(strings.NewReader(dsl)), + es.Search.WithSize(limit), + es.Search.WithSort("created_at:desc"), + + // es.Search.WithTrackTotalHits(true), + // es.Search.WithPretty(), + ) + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + defer res.Body.Close() + + if res.IsError() { + txt, _ := io.ReadAll(res.Body) + fmt.Println("oh no", string(txt)) + return nil, fmt.Errorf("%s", txt) + } + + var r EsSearchResult + if err := json.NewDecoder(res.Body).Decode(&r); err != nil { + return nil, err + } + + events := make([]nostr.Event, len(r.Hits.Hits)) + for i, e := range r.Hits.Hits { + events[i] = e.Source + } + + return events, nil +} + +func isGetByID(filter *nostr.Filter) bool { + isGetById := len(filter.IDs) > 0 && + len(filter.Authors) == 0 && + len(filter.Kinds) == 0 && + len(filter.Tags) == 0 && + filter.Since == nil && + filter.Until == nil + + if isGetById { + for _, id := range filter.IDs { + if len(id) != 64 { + return false + } + } + } + return isGetById +} + +func pprint(j []byte) { + var dst bytes.Buffer + err := json.Indent(&dst, j, "", " ") + if err != nil { + fmt.Println("invalid JSON", err, string(j)) + } else { + fmt.Println(dst.String()) + } +} diff --git a/storage/elasticsearch/query_test.go b/storage/elasticsearch/query_test.go index e10fc45..ce21372 100644 --- a/storage/elasticsearch/query_test.go +++ b/storage/elasticsearch/query_test.go @@ -2,7 +2,6 @@ package elasticsearch import ( "encoding/json" - "fmt" "testing" "time" @@ -32,15 +31,15 @@ func TestQuery(t *testing.T) { } // "integration" test - ess := &ElasticsearchStorage{} - err := ess.Init() - if err != nil { - t.Error(err) - } + // ess := &ElasticsearchStorage{} + // err := ess.Init() + // if err != nil { + // t.Error(err) + // } - found, err := ess.QueryEvents(filter) - if err != nil { - t.Error(err) - } - fmt.Println(found) + // found, err := ess.QueryEvents(filter) + // if err != nil { + // t.Error(err) + // } + // fmt.Println(found) } From 05600231b4eede0424557c4008e03307a8752c9a Mon Sep 17 00:00:00 2001 From: Steve Perkins Date: Fri, 10 Feb 2023 14:58:01 -0500 Subject: [PATCH 3/5] support search req a-la nip-50 --- go.mod | 3 ++- go.sum | 4 --- search/README.md | 7 ++++++ search/main.go | 9 +++---- storage/elasticsearch/elasticsearch.go | 35 ++++++++++++++++++++------ storage/elasticsearch/query.go | 27 ++++++++++++-------- 6 files changed, 57 insertions(+), 28 deletions(-) diff --git a/go.mod b/go.mod index afc7fd4..2917985 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.18 require ( github.com/PuerkitoBio/goquery v1.8.0 github.com/cockroachdb/pebble v0.0.0-20220723153705-3fc374e4dc66 - github.com/elastic/go-elasticsearch v0.0.0 github.com/elastic/go-elasticsearch/v8 v8.6.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 @@ -83,3 +82,5 @@ require ( golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect golang.org/x/text v0.3.7 // indirect ) + +replace github.com/nbd-wtf/go-nostr => /Users/steve/opc/go-nostr diff --git a/go.sum b/go.sum index 262025e..e3f3e48 100644 --- a/go.sum +++ b/go.sum @@ -125,8 +125,6 @@ github.com/dvyukov/go-fuzz v0.0.0-20210602112143-b1f3d6f4ef4e h1:qTP1telKJHlToHl github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs= github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= -github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA= -github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg= github.com/elastic/go-elasticsearch/v8 v8.6.0 h1:xMaSe8jIh7NHzmNo9YBkewmaD2Pr+tX+zLkXxhieny4= github.com/elastic/go-elasticsearch/v8 v8.6.0/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -326,8 +324,6 @@ github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOA github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ3M8LwxM= github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nbd-wtf/go-nostr v0.12.0 h1:6uo6D6jhcNzrzm6Fi8nA3jfZQqoXbeTWi9dIX5MsgZc= -github.com/nbd-wtf/go-nostr v0.12.0/go.mod h1:qFFTIxh15H5GGN0WsBI/P73DteqsevnhSEW/yk8nEf4= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nwaples/rardecode v1.1.2 h1:Cj0yZY6T1Zx1R7AhTbyGSALm44/Mmq+BAPc4B/p/d3M= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/search/README.md b/search/README.md index f3ed003..e42ffb6 100644 --- a/search/README.md +++ b/search/README.md @@ -5,9 +5,16 @@ bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \ websocat -n -B 200000 ws://127.0.0.1:7447 ``` +``` +echo '["REQ", "asdf", {"search": "steve", "kinds": [0]}]' | websocat -n ws://127.0.0.1:7447 +``` + + todo: * index `content_search` field * support search queries * some kind of ranking signal (based on pubkey) * better config for ES: adjust bulk indexer settings, use custom mapping? + +* ES DSL string builder might not escape json strings correctly... diff --git a/search/main.go b/search/main.go index 16658ca..1688e3e 100644 --- a/search/main.go +++ b/search/main.go @@ -1,7 +1,6 @@ package main import ( - "encoding/json" "fmt" "log" @@ -36,10 +35,10 @@ func (r *Relay) Init() error { func (r *Relay) AcceptEvent(evt *nostr.Event) bool { // block events that are too large - jsonb, _ := json.Marshal(evt) - if len(jsonb) > 100000 { - return false - } + // jsonb, _ := json.Marshal(evt) + // if len(jsonb) > 100000 { + // return false + // } return true } diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go index b443335..9953603 100644 --- a/storage/elasticsearch/elasticsearch.go +++ b/storage/elasticsearch/elasticsearch.go @@ -15,6 +15,11 @@ import ( "github.com/nbd-wtf/go-nostr" ) +type IndexedEvent struct { + Event nostr.Event `json:"event"` + ContentSearch string `json:"content_search"` +} + var indexMapping = ` { "settings": { @@ -24,11 +29,17 @@ var indexMapping = ` "mappings": { "dynamic": false, "properties": { - "id": {"type": "keyword"}, - "pubkey": {"type": "keyword"}, - "kind": {"type": "integer"}, - "tags": {"type": "keyword"}, - "created_at": {"type": "date"} + "event": { + "dynamic": false, + "properties": { + "id": {"type": "keyword"}, + "pubkey": {"type": "keyword"}, + "kind": {"type": "integer"}, + "tags": {"type": "keyword"}, + "created_at": {"type": "date"} + } + }, + "content_search": {"type": "text"} } } } @@ -49,7 +60,7 @@ func (ess *ElasticsearchStorage) Init() error { // log.Println(es.Info()) // todo: config - ess.indexName = "test" + ess.indexName = "test3" // todo: don't delete index every time // es.Indices.Delete([]string{ess.indexName}) @@ -123,7 +134,17 @@ func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error { } func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error { - data, err := json.Marshal(event) + ie := &IndexedEvent{ + Event: *event, + } + + // post processing: index for FTS + // this could also possibly do custom indexing for kind=0. + if event.Kind != 4 { + ie.ContentSearch = event.Content + } + + data, err := json.Marshal(ie) if err != nil { return err } diff --git a/storage/elasticsearch/query.go b/storage/elasticsearch/query.go index 6b3bd59..03f99db 100644 --- a/storage/elasticsearch/query.go +++ b/storage/elasticsearch/query.go @@ -23,7 +23,7 @@ type EsSearchResult struct { Relation string } Hits []struct { - Source nostr.Event `json:"_source"` + Source IndexedEvent `json:"_source"` } } } @@ -42,7 +42,7 @@ func buildDsl(filter *nostr.Filter) string { if len(val) < 64 { op = "prefix" } - b.WriteString(fmt.Sprintf(`{"%s": {"%s": %q}}`, op, fieldName, val)) + b.WriteString(fmt.Sprintf(`{"%s": {"event.%s": %q}}`, op, fieldName, val)) } b.WriteString(`]}},`) } @@ -56,7 +56,7 @@ func buildDsl(filter *nostr.Filter) string { // kinds if len(filter.Kinds) > 0 { k, _ := json.Marshal(filter.Kinds) - b.WriteString(fmt.Sprintf(`{"terms": {"kind": %s}},`, k)) + b.WriteString(fmt.Sprintf(`{"terms": {"event.kind": %s}},`, k)) } // tags @@ -73,10 +73,10 @@ func buildDsl(filter *nostr.Filter) string { commaIdx++ b.WriteString(`{"bool": {"must": [`) for _, t := range terms { - b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}},`, t)) + b.WriteString(fmt.Sprintf(`{"term": {"event.tags": %q}},`, t)) } // add the tag type at the end - b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}}`, char)) + b.WriteString(fmt.Sprintf(`{"term": {"event.tags": %q}}`, char)) b.WriteString(`]}}`) } b.WriteString(`]}},`) @@ -84,12 +84,17 @@ func buildDsl(filter *nostr.Filter) string { // since if filter.Since != nil { - b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"gt": %d}}},`, filter.Since.Unix())) + b.WriteString(fmt.Sprintf(`{"range": {"event.created_at": {"gt": %d}}},`, filter.Since.Unix())) } // until if filter.Until != nil { - b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"lt": %d}}},`, filter.Until.Unix())) + b.WriteString(fmt.Sprintf(`{"range": {"event.created_at": {"lt": %d}}},`, filter.Until.Unix())) + } + + // search + if filter.Search != "" { + b.WriteString(fmt.Sprintf(`{"match": {"content_search": {"query": %s}}},`, filter.Search)) } // all blocks have a trailing comma... @@ -111,7 +116,7 @@ func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, e var mgetResponse struct { Docs []struct { Found bool - Source nostr.Event `json:"_source"` + Source IndexedEvent `json:"_source"` } } if err := json.NewDecoder(got.Body).Decode(&mgetResponse); err != nil { @@ -121,7 +126,7 @@ func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, e events := make([]nostr.Event, 0, len(mgetResponse.Docs)) for _, e := range mgetResponse.Docs { if e.Found { - events = append(events, e.Source) + events = append(events, e.Source.Event) } } @@ -155,7 +160,7 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even es.Search.WithBody(strings.NewReader(dsl)), es.Search.WithSize(limit), - es.Search.WithSort("created_at:desc"), + es.Search.WithSort("event.created_at:desc"), // es.Search.WithTrackTotalHits(true), // es.Search.WithPretty(), @@ -178,7 +183,7 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even events := make([]nostr.Event, len(r.Hits.Hits)) for i, e := range r.Hits.Hits { - events[i] = e.Source + events[i] = e.Source.Event } return events, nil From 0e18a498610f826068ad2adf7308585c07f08657 Mon Sep 17 00:00:00 2001 From: Steve Perkins Date: Mon, 13 Feb 2023 20:44:06 -0500 Subject: [PATCH 4/5] wip docker-compose setup --- go.mod | 2 +- go.sum | 1 + search/docker-compose.yml | 33 +++++++++++++------------- search/main.go | 2 +- storage/elasticsearch/elasticsearch.go | 21 +++++++++------- 5 files changed, 33 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index 2917985..3ff8e5a 100644 --- a/go.mod +++ b/go.mod @@ -83,4 +83,4 @@ require ( golang.org/x/text v0.3.7 // indirect ) -replace github.com/nbd-wtf/go-nostr => /Users/steve/opc/go-nostr +replace github.com/nbd-wtf/go-nostr => ../go-nostr diff --git a/go.sum b/go.sum index e3f3e48..90bd150 100644 --- a/go.sum +++ b/go.sum @@ -371,6 +371,7 @@ github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb6 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/stereosteve/go-nostr v0.13.1/go.mod h1:qFFTIxh15H5GGN0WsBI/P73DteqsevnhSEW/yk8nEf4= github.com/stevelacy/daz v0.1.4 h1:ugmff/D7D764wZjXSgSryEINE/bi+Xddllw3JQQGbWk= github.com/stevelacy/daz v0.1.4/go.mod h1:AbK6DzjiIL15r4bQtcFvOBAvDGMXoh+uIG26NRUugt0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/search/docker-compose.yml b/search/docker-compose.yml index 40479cb..66f20a8 100644 --- a/search/docker-compose.yml +++ b/search/docker-compose.yml @@ -1,20 +1,23 @@ version: "3.8" services: - # relay: - # build: - # context: ../ - # dockerfile: ./basic/Dockerfile - # environment: - # PORT: 2700 - # POSTGRESQL_DATABASE: postgres://nostr:nostr@postgres:5432/nostr?sslmode=disable - # depends_on: - # postgres: - # condition: service_healthy - # ports: - # - 2700:2700 - # command: "./basic/relayer-basic" - + relay: + image: golang + # build: + # context: ../ + # dockerfile: ./basic/Dockerfile + environment: + PORT: 2700 + ES_URL: http://elasticsearch:9200 + depends_on: + elasticsearch: + condition: service_healthy + ports: + - 2700:2700 + volumes: + - ./nostres:/bin/relay + command: "/bin/relay" + elasticsearch: container_name: elasticsearch image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0 @@ -39,5 +42,3 @@ services: interval: 10s timeout: 5s retries: 5 - - diff --git a/search/main.go b/search/main.go index 1688e3e..14dce89 100644 --- a/search/main.go +++ b/search/main.go @@ -15,7 +15,7 @@ type Relay struct { } func (r *Relay) Name() string { - return "BasicRelay" + return "SearchRelay" } func (r *Relay) Storage() relayer.Storage { diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go index 9953603..3d8a4d1 100644 --- a/storage/elasticsearch/elasticsearch.go +++ b/storage/elasticsearch/elasticsearch.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log" + "os" "strings" "time" @@ -52,19 +53,18 @@ type ElasticsearchStorage struct { } func (ess *ElasticsearchStorage) Init() error { - es, err := elasticsearch.NewDefaultClient() + cfg := elasticsearch.Config{} + if x := os.Getenv("ES_URL"); x != "" { + cfg.Addresses = strings.Split(x, ",") + } + es, err := elasticsearch.NewClient(cfg) if err != nil { return err } - // log.Println(elasticsearch.Version) - // log.Println(es.Info()) - // todo: config + // todo: config + mapping settings ess.indexName = "test3" - // todo: don't delete index every time - // es.Indices.Delete([]string{ess.indexName}) - res, err := es.Indices.Create(ess.indexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping))) if err != nil { return err @@ -139,7 +139,12 @@ func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error { } // post processing: index for FTS - // this could also possibly do custom indexing for kind=0. + // some ideas: + // - index kind=0 fields a set of dedicated mapped fields + // (or use a separate index for profiles with a dedicated mapping) + // - if it's valid JSON just index the "values" and not the keys + // - more content introspection: language detection + // - denormalization... attach profile + ranking signals to events if event.Kind != 4 { ie.ContentSearch = event.Content } From a7a0bb66820fd063ddbc27f552e2297c6cbd0d6f Mon Sep 17 00:00:00 2001 From: Steve Perkins Date: Wed, 15 Feb 2023 16:28:39 -0500 Subject: [PATCH 5/5] Use dsl builder for es query --- go.mod | 3 + go.sum | 7 ++ search/README.md | 18 ++-- storage/elasticsearch/elasticsearch.go | 34 ++++---- storage/elasticsearch/query.go | 115 +++++++++++-------------- storage/elasticsearch/query_test.go | 50 +++++------ 6 files changed, 114 insertions(+), 113 deletions(-) diff --git a/go.mod b/go.mod index 3ff8e5a..45484e9 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/SaveTheRbtz/generic-sync-map-go v0.0.0-20220414055132-a37292614db8 // indirect github.com/aead/siphash v1.0.1 // indirect github.com/andybalholm/cascadia v1.3.1 // indirect + github.com/aquasecurity/esquery v0.2.0 // indirect github.com/btcsuite/btcd v0.23.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect github.com/btcsuite/btcd/btcutil v1.1.1 // indirect @@ -52,6 +53,8 @@ require ( github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/decred/dcrd/lru v1.0.0 // indirect github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c // indirect + github.com/elastic/go-elasticsearch/v7 v7.6.0 // indirect + github.com/fatih/structs v1.1.0 // indirect github.com/go-errors/errors v1.0.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/go.sum b/go.sum index 90bd150..200de7f 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,8 @@ github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0 github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c= github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA= +github.com/aquasecurity/esquery v0.2.0 h1:9WWXve95TE8hbm3736WB7nS6Owl8UGDeu+0jiyE9ttA= +github.com/aquasecurity/esquery v0.2.0/go.mod h1:VU+CIFR6C+H142HHZf9RUkp4Eedpo9UrEKeCQHWf9ao= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -125,6 +127,8 @@ github.com/dvyukov/go-fuzz v0.0.0-20210602112143-b1f3d6f4ef4e h1:qTP1telKJHlToHl github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs= github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI= +github.com/elastic/go-elasticsearch/v7 v7.6.0 h1:sYpGLpEFHgLUKLsZUBfuaVI9QgHjS3JdH9fX4/z8QI8= +github.com/elastic/go-elasticsearch/v7 v7.6.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elastic/go-elasticsearch/v8 v8.6.0 h1:xMaSe8jIh7NHzmNo9YBkewmaD2Pr+tX+zLkXxhieny4= github.com/elastic/go-elasticsearch/v8 v8.6.0/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -132,6 +136,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw= github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= +github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fergusstrange/embedded-postgres v1.10.0 h1:YnwF6xAQYmKLAXXrrRx4rHDLih47YJwVPvg8jeKfdNg= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= @@ -223,6 +228,8 @@ github.com/jb55/lnsocket/go v0.0.0-20220725174341-b98b5cd37bb6/go.mod h1:atFK/q4 github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jgroeneveld/schema v1.0.0/go.mod h1:M14lv7sNMtGvo3ops1MwslaSYgDYxrSmbzWIQ0Mr5rs= +github.com/jgroeneveld/trial v2.0.0+incompatible/go.mod h1:I6INLW96EN8WysNBXUFI3M4RIC8ePg9ntAc/Wy+U/+M= github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE= github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= diff --git a/search/README.md b/search/README.md index e42ffb6..631f152 100644 --- a/search/README.md +++ b/search/README.md @@ -1,3 +1,9 @@ +# Search Relay + +Uses ElasticSearch storage backend for all queries, with some basic full text search support. + +Index some events: + ``` bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \ jq -c '["EVENT", .]' | \ @@ -5,16 +11,16 @@ bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \ websocat -n -B 200000 ws://127.0.0.1:7447 ``` +Do a search: + ``` echo '["REQ", "asdf", {"search": "steve", "kinds": [0]}]' | websocat -n ws://127.0.0.1:7447 ``` -todo: +## Customize -* index `content_search` field -* support search queries -* some kind of ranking signal (based on pubkey) -* better config for ES: adjust bulk indexer settings, use custom mapping? +Currently the indexing is very basic: It will index the `contents` field for all events where kind != 4. +Some additional mapping and pre-processing could add better support for different content types. +See comments in `storage/elasticsearch/elasticsearch.go`. -* ES DSL string builder might not escape json strings correctly... diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go index 3d8a4d1..12103a8 100644 --- a/storage/elasticsearch/elasticsearch.go +++ b/storage/elasticsearch/elasticsearch.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "log" "os" "strings" "time" @@ -47,12 +46,18 @@ var indexMapping = ` ` type ElasticsearchStorage struct { - es *elasticsearch.Client - bi esutil.BulkIndexer - indexName string + IndexName string + + es *elasticsearch.Client + bi esutil.BulkIndexer } func (ess *ElasticsearchStorage) Init() error { + + if ess.IndexName == "" { + ess.IndexName = "events" + } + cfg := elasticsearch.Config{} if x := os.Getenv("ES_URL"); x != "" { cfg.Addresses = strings.Split(x, ",") @@ -62,10 +67,7 @@ func (ess *ElasticsearchStorage) Init() error { return err } - // todo: config + mapping settings - ess.indexName = "test3" - - res, err := es.Indices.Create(ess.indexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping))) + res, err := es.Indices.Create(ess.IndexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping))) if err != nil { return err } @@ -79,13 +81,13 @@ func (ess *ElasticsearchStorage) Init() error { // bulk indexer bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Index: ess.indexName, // The default index name - Client: es, // The Elasticsearch client - NumWorkers: 2, // The number of worker goroutines - FlushInterval: 3 * time.Second, // The periodic flush interval + Index: ess.IndexName, + Client: es, + NumWorkers: 2, + FlushInterval: 3 * time.Second, }) if err != nil { - log.Fatalf("Error creating the indexer: %s", err) + return fmt.Errorf("error creating the indexer: %s", err) } ess.es = es @@ -127,9 +129,6 @@ func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error { } err = <-done - if err != nil { - log.Println("DEL", err) - } return err } @@ -182,8 +181,5 @@ func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error { } err = <-done - if err != nil { - log.Println("SAVE", err) - } return err } diff --git a/storage/elasticsearch/query.go b/storage/elasticsearch/query.go index 03f99db..00df2e1 100644 --- a/storage/elasticsearch/query.go +++ b/storage/elasticsearch/query.go @@ -8,8 +8,9 @@ import ( "fmt" "io" "log" - "strings" + "reflect" + "github.com/aquasecurity/esquery" "github.com/elastic/go-elasticsearch/v8/esutil" "github.com/nbd-wtf/go-nostr" ) @@ -28,87 +29,67 @@ type EsSearchResult struct { } } -func buildDsl(filter *nostr.Filter) string { - b := &strings.Builder{} - b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`) +func buildDsl(filter *nostr.Filter) ([]byte, error) { + dsl := esquery.Bool() prefixFilter := func(fieldName string, values []string) { - b.WriteString(`{"bool": {"should": [`) - for idx, val := range values { - if idx > 0 { - b.WriteRune(',') - } - op := "term" - if len(val) < 64 { - op = "prefix" - } - b.WriteString(fmt.Sprintf(`{"%s": {"event.%s": %q}}`, op, fieldName, val)) + if len(values) == 0 { + return } - b.WriteString(`]}},`) + prefixQ := esquery.Bool() + for _, v := range values { + if len(v) < 64 { + prefixQ.Should(esquery.Prefix(fieldName, v)) + } else { + prefixQ.Should(esquery.Term(fieldName, v)) + } + } + dsl.Must(prefixQ) } // ids - prefixFilter("id", filter.IDs) + prefixFilter("event.id", filter.IDs) // authors - prefixFilter("pubkey", filter.Authors) + prefixFilter("event.pubkey", filter.Authors) // kinds if len(filter.Kinds) > 0 { - k, _ := json.Marshal(filter.Kinds) - b.WriteString(fmt.Sprintf(`{"terms": {"event.kind": %s}},`, k)) + dsl.Must(esquery.Terms("event.kind", toInterfaceSlice(filter.Kinds)...)) } // tags - { - b.WriteString(`{"bool": {"should": [`) - commaIdx := 0 + if len(filter.Tags) > 0 { + tagQ := esquery.Bool() for char, terms := range filter.Tags { - if len(terms) == 0 { - continue - } - if commaIdx > 0 { - b.WriteRune(',') - } - commaIdx++ - b.WriteString(`{"bool": {"must": [`) - for _, t := range terms { - b.WriteString(fmt.Sprintf(`{"term": {"event.tags": %q}},`, t)) - } - // add the tag type at the end - b.WriteString(fmt.Sprintf(`{"term": {"event.tags": %q}}`, char)) - b.WriteString(`]}}`) + vs := toInterfaceSlice(append(terms, char)) + tagQ.Should(esquery.Terms("event.tags", vs...)) } - b.WriteString(`]}},`) + dsl.Must(tagQ) } // since if filter.Since != nil { - b.WriteString(fmt.Sprintf(`{"range": {"event.created_at": {"gt": %d}}},`, filter.Since.Unix())) + dsl.Must(esquery.Range("event.created_at").Gt(filter.Since.Unix())) } // until if filter.Until != nil { - b.WriteString(fmt.Sprintf(`{"range": {"event.created_at": {"lt": %d}}},`, filter.Until.Unix())) + dsl.Must(esquery.Range("event.created_at").Lt(filter.Until.Unix())) } // search if filter.Search != "" { - b.WriteString(fmt.Sprintf(`{"match": {"content_search": {"query": %s}}},`, filter.Search)) + dsl.Must(esquery.Match("content_search", filter.Search)) } - // all blocks have a trailing comma... - // add a match_all "noop" at the end - // so json is valid - b.WriteString(`{"match_all": {}}`) - b.WriteString(`]}}}}}`) - return b.String() + return json.Marshal(esquery.Query(dsl)) } func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, error) { got, err := ess.es.Mget( esutil.NewJSONReader(filter), - ess.es.Mget.WithIndex(ess.indexName)) + ess.es.Mget.WithIndex(ess.IndexName)) if err != nil { return nil, err } @@ -134,8 +115,6 @@ func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, e } func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { - // Perform the search request... - // need to build up query body... if filter == nil { return nil, errors.New("filter cannot be null") } @@ -145,8 +124,10 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even return ess.getByID(filter) } - dsl := buildDsl(filter) - pprint([]byte(dsl)) + dsl, err := buildDsl(filter) + if err != nil { + return nil, err + } limit := 1000 if filter.Limit > 0 && filter.Limit < limit { @@ -156,14 +137,11 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even es := ess.es res, err := es.Search( es.Search.WithContext(context.Background()), - es.Search.WithIndex(ess.indexName), + es.Search.WithIndex(ess.IndexName), - es.Search.WithBody(strings.NewReader(dsl)), + es.Search.WithBody(bytes.NewReader(dsl)), es.Search.WithSize(limit), es.Search.WithSort("event.created_at:desc"), - - // es.Search.WithTrackTotalHits(true), - // es.Search.WithPretty(), ) if err != nil { log.Fatalf("Error getting response: %s", err) @@ -207,12 +185,23 @@ func isGetByID(filter *nostr.Filter) bool { return isGetById } -func pprint(j []byte) { - var dst bytes.Buffer - err := json.Indent(&dst, j, "", " ") - if err != nil { - fmt.Println("invalid JSON", err, string(j)) - } else { - fmt.Println(dst.String()) +// from: https://stackoverflow.com/a/12754757 +func toInterfaceSlice(slice interface{}) []interface{} { + s := reflect.ValueOf(slice) + if s.Kind() != reflect.Slice { + panic("InterfaceSlice() given a non-slice type") } + + // Keep the distinction between nil and empty slice input + if s.IsNil() { + return nil + } + + ret := make([]interface{}, s.Len()) + + for i := 0; i < s.Len(); i++ { + ret[i] = s.Index(i).Interface() + } + + return ret } diff --git a/storage/elasticsearch/query_test.go b/storage/elasticsearch/query_test.go index ce21372..f30d390 100644 --- a/storage/elasticsearch/query_test.go +++ b/storage/elasticsearch/query_test.go @@ -1,7 +1,9 @@ package elasticsearch import ( + "bytes" "encoding/json" + "fmt" "testing" "time" @@ -12,34 +14,32 @@ func TestQuery(t *testing.T) { now := time.Now() yesterday := now.Add(time.Hour * -24) filter := &nostr.Filter{ - // IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"}, + IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"}, Kinds: []int{0, 1}, - // Tags: nostr.TagMap{ - // "a": []string{"abc"}, - // "b": []string{"aaa", "bbb"}, - // }, - Since: &yesterday, - Until: &now, - Limit: 100, + Tags: nostr.TagMap{ + "e": []string{"abc"}, + "p": []string{"aaa", "bbb"}, + }, + Since: &yesterday, + Until: &now, + Limit: 100, + Search: "other stuff", } - dsl := buildDsl(filter) - pprint([]byte(dsl)) - - if !json.Valid([]byte(dsl)) { - t.Fail() + dsl, err := buildDsl(filter) + if err != nil { + t.Fatal(err) } + pprint(dsl) - // "integration" test - // ess := &ElasticsearchStorage{} - // err := ess.Init() - // if err != nil { - // t.Error(err) - // } - - // found, err := ess.QueryEvents(filter) - // if err != nil { - // t.Error(err) - // } - // fmt.Println(found) +} + +func pprint(j []byte) { + var dst bytes.Buffer + err := json.Indent(&dst, j, "", " ") + if err != nil { + fmt.Println("invalid JSON", err, string(j)) + } else { + fmt.Println(dst.String()) + } }