Use bulk indexer for writes. Special-case get by ID.

Steve Perkins 2023-02-07 10:46:27 -05:00
parent d306c03369
commit 34a21cb374
5 changed files with 312 additions and 174 deletions

search/README.md

@ -0,0 +1,13 @@
Backfill the index by replaying a nostr events dump into the relay's websocket; `jq` wraps each event in an `["EVENT", ...]` message and `awk` drops oversized lines first:

```
bzip2 -cd nostr-wellorder-early-1m-v1.jsonl.bz2 | \
jq -c '["EVENT", .]' | \
awk 'length($0)<131072' | \
websocat -n -B 200000 ws://127.0.0.1:7447
```
todo:
* index `content_search` field
* support search queries
* some kind of ranking signal (based on pubkey)
* better config for ES: adjust bulk indexer settings, use custom mapping?


@ -37,7 +37,7 @@ func (r *Relay) Init() error {
func (r *Relay) AcceptEvent(evt *nostr.Event) bool {
// block events that are too large
jsonb, _ := json.Marshal(evt)
if len(jsonb) > 10000 {
if len(jsonb) > 100000 {
return false
}


@ -4,24 +4,17 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"strings"
"time"
"github.com/elastic/go-elasticsearch/esapi"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/nbd-wtf/go-nostr"
)
/*
1. create index with mapping in Init
2. implement delete
3. build query in QueryEvents
4. implement replaceable events
*/
var indexMapping = `
{
"settings": {
@ -43,6 +36,7 @@ var indexMapping = `
type ElasticsearchStorage struct {
es *elasticsearch.Client
bi esutil.BulkIndexer
indexName string
}
@ -72,154 +66,60 @@ func (ess *ElasticsearchStorage) Init() error {
}
}
// bulk indexer
bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
Index: ess.indexName, // The default index name
Client: es, // The Elasticsearch client
NumWorkers: 2, // The number of worker goroutines
FlushInterval: 3 * time.Second, // The periodic flush interval
})
if err != nil {
log.Fatalf("Error creating the indexer: %s", err)
}
ess.es = es
ess.bi = bi
return nil
}
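One operational note, not part of this commit: the `BulkIndexer` buffers writes and only flushes on the interval or when a batch fills, so events accepted right before shutdown can be lost unless the indexer is drained. A minimal sketch of a shutdown hook, assuming the caller wires it into the relay's exit path:
```
// Hypothetical, not in this diff: Close flushes buffered bulk operations
// and waits for the indexer's worker goroutines to finish.
func (ess *ElasticsearchStorage) Close() error {
	return ess.bi.Close(context.Background())
}
```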
type EsSearchResult struct {
Took int
TimedOut bool `json:"timed_out"`
Hits struct {
Total struct {
Value int
Relation string
}
Hits []struct {
Source nostr.Event `json:"_source"`
}
}
}
func buildDsl(filter *nostr.Filter) string {
b := &strings.Builder{}
b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`)
prefixFilter := func(fieldName string, values []string) {
b.WriteString(`{"bool": {"should": [`)
for idx, val := range values {
if idx > 0 {
b.WriteRune(',')
}
op := "term"
if len(val) < 64 {
op = "prefix"
}
b.WriteString(fmt.Sprintf(`{"%s": {"%s": %q}}`, op, fieldName, val))
}
b.WriteString(`]}},`)
}
// ids
prefixFilter("id", filter.IDs)
// authors
prefixFilter("pubkey", filter.Authors)
// kinds
if len(filter.Kinds) > 0 {
k, _ := json.Marshal(filter.Kinds)
b.WriteString(fmt.Sprintf(`{"terms": {"kind": %s}},`, k))
}
// tags
{
b.WriteString(`{"bool": {"should": [`)
commaIdx := 0
for char, terms := range filter.Tags {
if len(terms) == 0 {
continue
}
if commaIdx > 0 {
b.WriteRune(',')
}
commaIdx++
b.WriteString(`{"bool": {"must": [`)
for _, t := range terms {
b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}},`, t))
}
// add the tag type at the end
b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}}`, char))
b.WriteString(`]}}`)
}
b.WriteString(`]}},`)
}
// since
if filter.Since != nil {
b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"gt": %d}}},`, filter.Since.Unix()))
}
// until
if filter.Until != nil {
b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"lt": %d}}},`, filter.Until.Unix()))
}
// all blocks have a trailing comma...
// add a match_all "noop" at the end
// so json is valid
b.WriteString(`{"match_all": {}}`)
b.WriteString(`]}}}}}`)
return b.String()
}
func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) {
// Perform the search request...
// need to build up query body...
if filter == nil {
return nil, errors.New("filter cannot be null")
}
dsl := buildDsl(filter)
// pprint([]byte(dsl))
es := ess.es
res, err := es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex(ess.indexName),
es.Search.WithBody(strings.NewReader(dsl)),
es.Search.WithSize(filter.Limit),
es.Search.WithSort("created_at:desc"),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
txt, _ := io.ReadAll(res.Body)
fmt.Println("oh no", string(txt))
return nil, fmt.Errorf("%s", txt)
}
var r EsSearchResult
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return nil, err
}
events := make([]nostr.Event, len(r.Hits.Hits))
for i, e := range r.Hits.Hits {
events[i] = e.Source
}
return events, nil
}
func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error {
// todo: is pubkey match required?
res, err := ess.es.Delete(ess.indexName, id)
done := make(chan error)
err := ess.bi.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "delete",
DocumentID: id,
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
close(done)
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
done <- err
} else {
// ok if deleted item not found
if res.Status == 404 {
close(done)
return
}
txt, _ := json.Marshal(res)
err := fmt.Errorf("ERROR: %s", txt)
done <- err
}
},
},
)
if err != nil {
return err
}
if res.IsError() {
txt, _ := io.ReadAll(res.Body)
return fmt.Errorf("%s", txt)
err = <-done
if err != nil {
log.Println("DEL", err)
}
return nil
return err
}
func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error {
@ -228,23 +128,36 @@ func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error {
return err
}
req := esapi.IndexRequest{
Index: ess.indexName,
DocumentID: event.ID,
Body: bytes.NewReader(data),
// Refresh: "true",
done := make(chan error)
// adapted from:
// https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196
err = ess.bi.Add(
context.Background(),
esutil.BulkIndexerItem{
Action: "index",
DocumentID: event.ID,
Body: bytes.NewReader(data),
OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
close(done)
},
OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
if err != nil {
done <- err
} else {
err := fmt.Errorf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
done <- err
}
},
},
)
if err != nil {
return err
}
_, err = req.Do(context.Background(), ess.es)
err = <-done
if err != nil {
log.Println("SAVE", err)
}
return err
}
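Both `DeleteEvent` and `SaveEvent` above bridge the indexer's asynchronous `OnSuccess`/`OnFailure` callbacks back to a blocking call through a `done` channel, while the `BulkIndexer` keeps batching items across events behind the scenes. A stripped-down sketch of the pattern, with a hypothetical helper name:
```
// syncAdd enqueues one item on the bulk indexer and blocks until its
// callbacks report the outcome. Illustrative helper, not in the diff.
func syncAdd(ctx context.Context, bi esutil.BulkIndexer, item esutil.BulkIndexerItem) error {
	done := make(chan error, 1)
	item.OnSuccess = func(context.Context, esutil.BulkIndexerItem, esutil.BulkIndexerResponseItem) {
		done <- nil
	}
	item.OnFailure = func(_ context.Context, _ esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
		if err == nil {
			err = fmt.Errorf("%s: %s", res.Error.Type, res.Error.Reason)
		}
		done <- err
	}
	if err := bi.Add(ctx, item); err != nil {
		return err // item was never enqueued
	}
	return <-done
}
```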
func pprint(j []byte) {
var dst bytes.Buffer
err := json.Indent(&dst, j, "", " ")
if err != nil {
fmt.Println("invalid JSON", err, string(j))
} else {
fmt.Println(dst.String())
}
}


@ -0,0 +1,213 @@
package elasticsearch
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"strings"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/nbd-wtf/go-nostr"
)
type EsSearchResult struct {
Took int
TimedOut bool `json:"timed_out"`
Hits struct {
Total struct {
Value int
Relation string
}
Hits []struct {
Source nostr.Event `json:"_source"`
}
}
}
func buildDsl(filter *nostr.Filter) string {
b := &strings.Builder{}
b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`)
prefixFilter := func(fieldName string, values []string) {
b.WriteString(`{"bool": {"should": [`)
for idx, val := range values {
if idx > 0 {
b.WriteRune(',')
}
op := "term"
if len(val) < 64 {
op = "prefix"
}
b.WriteString(fmt.Sprintf(`{"%s": {"%s": %q}}`, op, fieldName, val))
}
b.WriteString(`]}},`)
}
// ids
prefixFilter("id", filter.IDs)
// authors
prefixFilter("pubkey", filter.Authors)
// kinds
if len(filter.Kinds) > 0 {
k, _ := json.Marshal(filter.Kinds)
b.WriteString(fmt.Sprintf(`{"terms": {"kind": %s}},`, k))
}
// tags
{
b.WriteString(`{"bool": {"should": [`)
commaIdx := 0
for char, terms := range filter.Tags {
if len(terms) == 0 {
continue
}
if commaIdx > 0 {
b.WriteRune(',')
}
commaIdx++
b.WriteString(`{"bool": {"must": [`)
for _, t := range terms {
b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}},`, t))
}
// add the tag type at the end
b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}}`, char))
b.WriteString(`]}}`)
}
b.WriteString(`]}},`)
}
// since
if filter.Since != nil {
b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"gt": %d}}},`, filter.Since.Unix()))
}
// until
if filter.Until != nil {
b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"lt": %d}}},`, filter.Until.Unix()))
}
// all blocks have a trailing comma...
// add a match_all "noop" at the end
// so json is valid
b.WriteString(`{"match_all": {}}`)
b.WriteString(`]}}}}}`)
return b.String()
}
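To make the trailing-comma trick concrete, here is approximately what `buildDsl` produces for a simple filter, traced by hand from the code above; the empty `should` arrays come from the unconditional ids/authors/tags blocks, and the final `match_all` noop absorbs the last comma so the JSON stays valid:
```
// Illustrative trace, not verified output.
since := time.Unix(1675000000, 0)
dsl := buildDsl(&nostr.Filter{Kinds: []int{1}, Since: &since})
// dsl, with whitespace added:
// {"query": {"bool": {"filter": {"bool": {"must": [
//   {"bool": {"should": []}},                      // ids (none given)
//   {"bool": {"should": []}},                      // authors (none given)
//   {"terms": {"kind": [1]}},                      // kinds
//   {"bool": {"should": []}},                      // tags (none given)
//   {"range": {"created_at": {"gt": 1675000000}}}, // since
//   {"match_all": {}}                              // noop terminator
// ]}}}}}
```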
func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, error) {
got, err := ess.es.Mget(
esutil.NewJSONReader(filter),
ess.es.Mget.WithIndex(ess.indexName))
if err != nil {
return nil, err
}
var mgetResponse struct {
Docs []struct {
Found bool
Source nostr.Event `json:"_source"`
}
}
if err := json.NewDecoder(got.Body).Decode(&mgetResponse); err != nil {
return nil, err
}
events := make([]nostr.Event, 0, len(mgetResponse.Docs))
for _, e := range mgetResponse.Docs {
if e.Found {
events = append(events, e.Source)
}
}
return events, nil
}
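Passing the whole filter as the Mget body works because `isGetByID` guarantees every field other than `IDs` is unset, and `nostr.Filter` marshals with `omitempty` (assuming go-nostr's standard NIP-01 JSON tags), so `esutil.NewJSONReader(filter)` serializes to exactly the `{"ids": [...]}` shape the Mget API expects.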
func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) {
// Perform the search request...
// need to build up query body...
if filter == nil {
return nil, errors.New("filter cannot be null")
}
// optimization: get by id
if isGetByID(filter) {
return ess.getByID(filter)
}
dsl := buildDsl(filter)
pprint([]byte(dsl))
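// cap the page size at 1000; a smaller explicit filter limit wins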
limit := 1000
if filter.Limit > 0 && filter.Limit < limit {
limit = filter.Limit
}
es := ess.es
res, err := es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex(ess.indexName),
es.Search.WithBody(strings.NewReader(dsl)),
es.Search.WithSize(limit),
es.Search.WithSort("created_at:desc"),
// es.Search.WithTrackTotalHits(true),
// es.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
txt, _ := io.ReadAll(res.Body)
fmt.Println("oh no", string(txt))
return nil, fmt.Errorf("%s", txt)
}
var r EsSearchResult
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return nil, err
}
events := make([]nostr.Event, len(r.Hits.Hits))
for i, e := range r.Hits.Hits {
events[i] = e.Source
}
return events, nil
}
func isGetByID(filter *nostr.Filter) bool {
isGetById := len(filter.IDs) > 0 &&
len(filter.Authors) == 0 &&
len(filter.Kinds) == 0 &&
len(filter.Tags) == 0 &&
filter.Since == nil &&
filter.Until == nil
if isGetById {
for _, id := range filter.IDs {
if len(id) != 64 {
return false
}
}
}
return isGetById
}
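For reference, only a filter made up solely of full-length IDs takes the Mget fast path; an ID prefix (or any extra constraint) still needs the search DSL. Illustrative values:
```
// fast path: one full 64-hex ID, no other constraints
fast := &nostr.Filter{IDs: []string{strings.Repeat("a1", 32)}}

// search path: a short prefix requires a prefix query
slow := &nostr.Filter{IDs: []string{"a1b2"}}
```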
func pprint(j []byte) {
var dst bytes.Buffer
err := json.Indent(&dst, j, "", " ")
if err != nil {
fmt.Println("invalid JSON", err, string(j))
} else {
fmt.Println(dst.String())
}
}


@ -2,7 +2,6 @@ package elasticsearch
import (
"encoding/json"
"fmt"
"testing"
"time"
@ -32,15 +31,15 @@ func TestQuery(t *testing.T) {
}
// "integration" test
ess := &ElasticsearchStorage{}
err := ess.Init()
if err != nil {
t.Error(err)
}
// ess := &ElasticsearchStorage{}
// err := ess.Init()
// if err != nil {
// t.Error(err)
// }
found, err := ess.QueryEvents(filter)
if err != nil {
t.Error(err)
}
fmt.Println(found)
// found, err := ess.QueryEvents(filter)
// if err != nil {
// t.Error(err)
// }
// fmt.Println(found)
}