From 34a21cb3740136d95a6e3c7fefb446c37d130a54 Mon Sep 17 00:00:00 2001 From: Steve Perkins Date: Tue, 7 Feb 2023 10:46:27 -0500 Subject: [PATCH] Use bulk indexer for writes. Special case get by ID. --- search/README.md | 13 ++ search/main.go | 2 +- storage/elasticsearch/elasticsearch.go | 237 ++++++++----------------- storage/elasticsearch/query.go | 213 ++++++++++++++++++++++ storage/elasticsearch/query_test.go | 21 ++- 5 files changed, 312 insertions(+), 174 deletions(-) create mode 100644 search/README.md create mode 100644 storage/elasticsearch/query.go diff --git a/search/README.md b/search/README.md new file mode 100644 index 0000000..f3ed003 --- /dev/null +++ b/search/README.md @@ -0,0 +1,13 @@ +``` +bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \ + jq -c '["EVENT", .]' | \ + awk 'length($0)<131072' | \ + websocat -n -B 200000 ws://127.0.0.1:7447 +``` + +todo: + +* index `content_search` field +* support search queries +* some kind of ranking signal (based on pubkey) +* better config for ES: adjust bulk indexer settings, use custom mapping? diff --git a/search/main.go b/search/main.go index 25947d2..16658ca 100644 --- a/search/main.go +++ b/search/main.go @@ -37,7 +37,7 @@ func (r *Relay) Init() error { func (r *Relay) AcceptEvent(evt *nostr.Event) bool { // block events that are too large jsonb, _ := json.Marshal(evt) - if len(jsonb) > 10000 { + if len(jsonb) > 100000 { return false } diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go index acec779..b443335 100644 --- a/storage/elasticsearch/elasticsearch.go +++ b/storage/elasticsearch/elasticsearch.go @@ -4,24 +4,17 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "io" "log" "strings" + "time" - "github.com/elastic/go-elasticsearch/esapi" "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esutil" "github.com/nbd-wtf/go-nostr" ) -/* -1. create index with mapping in Init -2. implement delete -3. build query in QueryEvents -4. implement replaceable events -*/ - var indexMapping = ` { "settings": { @@ -43,6 +36,7 @@ var indexMapping = ` type ElasticsearchStorage struct { es *elasticsearch.Client + bi esutil.BulkIndexer indexName string } @@ -72,154 +66,60 @@ func (ess *ElasticsearchStorage) Init() error { } } + // bulk indexer + bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Index: ess.indexName, // The default index name + Client: es, // The Elasticsearch client + NumWorkers: 2, // The number of worker goroutines + FlushInterval: 3 * time.Second, // The periodic flush interval + }) + if err != nil { + log.Fatalf("Error creating the indexer: %s", err) + } + ess.es = es + ess.bi = bi return nil } -type EsSearchResult struct { - Took int - TimedOut bool `json:"timed_out"` - Hits struct { - Total struct { - Value int - Relation string - } - Hits []struct { - Source nostr.Event `json:"_source"` - } - } -} - -func buildDsl(filter *nostr.Filter) string { - b := &strings.Builder{} - b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`) - - prefixFilter := func(fieldName string, values []string) { - b.WriteString(`{"bool": {"should": [`) - for idx, val := range values { - if idx > 0 { - b.WriteRune(',') - } - op := "term" - if len(val) < 64 { - op = "prefix" - } - b.WriteString(fmt.Sprintf(`{"%s": {"%s": %q}}`, op, fieldName, val)) - } - b.WriteString(`]}},`) - } - - // ids - prefixFilter("id", filter.IDs) - - // authors - prefixFilter("pubkey", filter.Authors) - - // kinds - if len(filter.Kinds) > 0 { - k, _ := json.Marshal(filter.Kinds) - b.WriteString(fmt.Sprintf(`{"terms": {"kind": %s}},`, k)) - } - - // tags - { - b.WriteString(`{"bool": {"should": [`) - commaIdx := 0 - for char, terms := range filter.Tags { - if len(terms) == 0 { - continue - } - if commaIdx > 0 { - b.WriteRune(',') - } - commaIdx++ - b.WriteString(`{"bool": {"must": [`) - for _, t := range terms { - b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}},`, t)) - } - // add the tag type at the end - b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}}`, char)) - b.WriteString(`]}}`) - } - b.WriteString(`]}},`) - } - - // since - if filter.Since != nil { - b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"gt": %d}}},`, filter.Since.Unix())) - } - - // until - if filter.Until != nil { - b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"lt": %d}}},`, filter.Until.Unix())) - } - - // all blocks have a trailing comma... - // add a match_all "noop" at the end - // so json is valid - b.WriteString(`{"match_all": {}}`) - b.WriteString(`]}}}}}`) - return b.String() -} - -func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { - // Perform the search request... - // need to build up query body... - if filter == nil { - return nil, errors.New("filter cannot be null") - } - - dsl := buildDsl(filter) - // pprint([]byte(dsl)) - - es := ess.es - res, err := es.Search( - es.Search.WithContext(context.Background()), - es.Search.WithIndex(ess.indexName), - - es.Search.WithBody(strings.NewReader(dsl)), - es.Search.WithSize(filter.Limit), - es.Search.WithSort("created_at:desc"), - - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - ) - if err != nil { - log.Fatalf("Error getting response: %s", err) - } - defer res.Body.Close() - - if res.IsError() { - txt, _ := io.ReadAll(res.Body) - fmt.Println("oh no", string(txt)) - return nil, fmt.Errorf("%s", txt) - } - - var r EsSearchResult - if err := json.NewDecoder(res.Body).Decode(&r); err != nil { - return nil, err - } - - events := make([]nostr.Event, len(r.Hits.Hits)) - for i, e := range r.Hits.Hits { - events[i] = e.Source - } - - return events, nil -} - func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error { // todo: is pubkey match required? - res, err := ess.es.Delete(ess.indexName, id) + + done := make(chan error) + err := ess.bi.Add( + context.Background(), + esutil.BulkIndexerItem{ + Action: "delete", + DocumentID: id, + OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { + close(done) + }, + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + if err != nil { + done <- err + } else { + // ok if deleted item not found + if res.Status == 404 { + close(done) + return + } + txt, _ := json.Marshal(res) + err := fmt.Errorf("ERROR: %s", txt) + done <- err + } + }, + }, + ) if err != nil { return err } - if res.IsError() { - txt, _ := io.ReadAll(res.Body) - return fmt.Errorf("%s", txt) + + err = <-done + if err != nil { + log.Println("DEL", err) } - return nil + return err } func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error { @@ -228,23 +128,36 @@ func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error { return err } - req := esapi.IndexRequest{ - Index: ess.indexName, - DocumentID: event.ID, - Body: bytes.NewReader(data), - // Refresh: "true", + done := make(chan error) + + // adapted from: + // https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196 + err = ess.bi.Add( + context.Background(), + esutil.BulkIndexerItem{ + Action: "index", + DocumentID: event.ID, + Body: bytes.NewReader(data), + OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { + close(done) + }, + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + if err != nil { + done <- err + } else { + err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) + done <- err + } + }, + }, + ) + if err != nil { + return err } - _, err = req.Do(context.Background(), ess.es) + err = <-done + if err != nil { + log.Println("SAVE", err) + } return err } - -func pprint(j []byte) { - var dst bytes.Buffer - err := json.Indent(&dst, j, "", " ") - if err != nil { - fmt.Println("invalid JSON", err, string(j)) - } else { - fmt.Println(dst.String()) - } -} diff --git a/storage/elasticsearch/query.go b/storage/elasticsearch/query.go new file mode 100644 index 0000000..6b3bd59 --- /dev/null +++ b/storage/elasticsearch/query.go @@ -0,0 +1,213 @@ +package elasticsearch + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "strings" + + "github.com/elastic/go-elasticsearch/v8/esutil" + "github.com/nbd-wtf/go-nostr" +) + +type EsSearchResult struct { + Took int + TimedOut bool `json:"timed_out"` + Hits struct { + Total struct { + Value int + Relation string + } + Hits []struct { + Source nostr.Event `json:"_source"` + } + } +} + +func buildDsl(filter *nostr.Filter) string { + b := &strings.Builder{} + b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`) + + prefixFilter := func(fieldName string, values []string) { + b.WriteString(`{"bool": {"should": [`) + for idx, val := range values { + if idx > 0 { + b.WriteRune(',') + } + op := "term" + if len(val) < 64 { + op = "prefix" + } + b.WriteString(fmt.Sprintf(`{"%s": {"%s": %q}}`, op, fieldName, val)) + } + b.WriteString(`]}},`) + } + + // ids + prefixFilter("id", filter.IDs) + + // authors + prefixFilter("pubkey", filter.Authors) + + // kinds + if len(filter.Kinds) > 0 { + k, _ := json.Marshal(filter.Kinds) + b.WriteString(fmt.Sprintf(`{"terms": {"kind": %s}},`, k)) + } + + // tags + { + b.WriteString(`{"bool": {"should": [`) + commaIdx := 0 + for char, terms := range filter.Tags { + if len(terms) == 0 { + continue + } + if commaIdx > 0 { + b.WriteRune(',') + } + commaIdx++ + b.WriteString(`{"bool": {"must": [`) + for _, t := range terms { + b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}},`, t)) + } + // add the tag type at the end + b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}}`, char)) + b.WriteString(`]}}`) + } + b.WriteString(`]}},`) + } + + // since + if filter.Since != nil { + b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"gt": %d}}},`, filter.Since.Unix())) + } + + // until + if filter.Until != nil { + b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"lt": %d}}},`, filter.Until.Unix())) + } + + // all blocks have a trailing comma... + // add a match_all "noop" at the end + // so json is valid + b.WriteString(`{"match_all": {}}`) + b.WriteString(`]}}}}}`) + return b.String() +} + +func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, error) { + got, err := ess.es.Mget( + esutil.NewJSONReader(filter), + ess.es.Mget.WithIndex(ess.indexName)) + if err != nil { + return nil, err + } + + var mgetResponse struct { + Docs []struct { + Found bool + Source nostr.Event `json:"_source"` + } + } + if err := json.NewDecoder(got.Body).Decode(&mgetResponse); err != nil { + return nil, err + } + + events := make([]nostr.Event, 0, len(mgetResponse.Docs)) + for _, e := range mgetResponse.Docs { + if e.Found { + events = append(events, e.Source) + } + } + + return events, nil +} + +func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { + // Perform the search request... + // need to build up query body... + if filter == nil { + return nil, errors.New("filter cannot be null") + } + + // optimization: get by id + if isGetByID(filter) { + return ess.getByID(filter) + } + + dsl := buildDsl(filter) + pprint([]byte(dsl)) + + limit := 1000 + if filter.Limit > 0 && filter.Limit < limit { + limit = filter.Limit + } + + es := ess.es + res, err := es.Search( + es.Search.WithContext(context.Background()), + es.Search.WithIndex(ess.indexName), + + es.Search.WithBody(strings.NewReader(dsl)), + es.Search.WithSize(limit), + es.Search.WithSort("created_at:desc"), + + // es.Search.WithTrackTotalHits(true), + // es.Search.WithPretty(), + ) + if err != nil { + log.Fatalf("Error getting response: %s", err) + } + defer res.Body.Close() + + if res.IsError() { + txt, _ := io.ReadAll(res.Body) + fmt.Println("oh no", string(txt)) + return nil, fmt.Errorf("%s", txt) + } + + var r EsSearchResult + if err := json.NewDecoder(res.Body).Decode(&r); err != nil { + return nil, err + } + + events := make([]nostr.Event, len(r.Hits.Hits)) + for i, e := range r.Hits.Hits { + events[i] = e.Source + } + + return events, nil +} + +func isGetByID(filter *nostr.Filter) bool { + isGetById := len(filter.IDs) > 0 && + len(filter.Authors) == 0 && + len(filter.Kinds) == 0 && + len(filter.Tags) == 0 && + filter.Since == nil && + filter.Until == nil + + if isGetById { + for _, id := range filter.IDs { + if len(id) != 64 { + return false + } + } + } + return isGetById +} + +func pprint(j []byte) { + var dst bytes.Buffer + err := json.Indent(&dst, j, "", " ") + if err != nil { + fmt.Println("invalid JSON", err, string(j)) + } else { + fmt.Println(dst.String()) + } +} diff --git a/storage/elasticsearch/query_test.go b/storage/elasticsearch/query_test.go index e10fc45..ce21372 100644 --- a/storage/elasticsearch/query_test.go +++ b/storage/elasticsearch/query_test.go @@ -2,7 +2,6 @@ package elasticsearch import ( "encoding/json" - "fmt" "testing" "time" @@ -32,15 +31,15 @@ func TestQuery(t *testing.T) { } // "integration" test - ess := &ElasticsearchStorage{} - err := ess.Init() - if err != nil { - t.Error(err) - } + // ess := &ElasticsearchStorage{} + // err := ess.Init() + // if err != nil { + // t.Error(err) + // } - found, err := ess.QueryEvents(filter) - if err != nil { - t.Error(err) - } - fmt.Println(found) + // found, err := ess.QueryEvents(filter) + // if err != nil { + // t.Error(err) + // } + // fmt.Println(found) }