From d306c03369dca6e277615f6f746076db0063d7d6 Mon Sep 17 00:00:00 2001
From: Steve Perkins
Date: Mon, 6 Feb 2023 13:30:29 -0500
Subject: [PATCH] basic elasticsearch storage example

---
 go.mod                                 |   3 +
 go.sum                                 |   6 +
 search/docker-compose.yml              |  43 +++++
 search/main.go                         |  65 +++++++
 storage/elasticsearch/elasticsearch.go | 268 +++++++++++++++++++++++++
 storage/elasticsearch/query_test.go    |  46 +++++
 6 files changed, 431 insertions(+)
 create mode 100644 search/docker-compose.yml
 create mode 100644 search/main.go
 create mode 100644 storage/elasticsearch/elasticsearch.go
 create mode 100644 storage/elasticsearch/query_test.go

diff --git a/go.mod b/go.mod
index 08d23ee..afc7fd4 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,8 @@ go 1.18
 require (
 	github.com/PuerkitoBio/goquery v1.8.0
 	github.com/cockroachdb/pebble v0.0.0-20220723153705-3fc374e4dc66
+	github.com/elastic/go-elasticsearch v0.0.0
+	github.com/elastic/go-elasticsearch/v8 v8.6.0
 	github.com/gorilla/mux v1.8.0
 	github.com/gorilla/websocket v1.4.2
 	github.com/grokify/html-strip-tags-go v0.0.1
@@ -50,6 +52,7 @@ require (
 	github.com/decred/dcrd/crypto/blake256 v1.0.0 // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
 	github.com/decred/dcrd/lru v1.0.0 // indirect
+	github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c // indirect
 	github.com/go-errors/errors v1.0.1 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
diff --git a/go.sum b/go.sum
index a2f502e..262025e 100644
--- a/go.sum
+++ b/go.sum
@@ -123,6 +123,12 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/dvyukov/go-fuzz v0.0.0-20210602112143-b1f3d6f4ef4e h1:qTP1telKJHlToHlwPQNmVg4yfMDMHe4Z3SYmzkrvA2M=
 github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM=
+github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c h1:onA2RpIyeCPvYAj1LFYiiMTrSpqVINWMfYFRS7lofJs=
+github.com/elastic/elastic-transport-go/v8 v8.0.0-20211216131617-bbee439d559c/go.mod h1:87Tcz8IVNe6rVSLdBux1o/PEItLtyabHU3naC7IoqKI=
+github.com/elastic/go-elasticsearch v0.0.0 h1:Pd5fqOuBxKxv83b0+xOAJDAkziWYwFinWnBO0y+TZaA=
+github.com/elastic/go-elasticsearch v0.0.0/go.mod h1:TkBSJBuTyFdBnrNqoPc54FN0vKf5c04IdM4zuStJ7xg=
+github.com/elastic/go-elasticsearch/v8 v8.6.0 h1:xMaSe8jIh7NHzmNo9YBkewmaD2Pr+tX+zLkXxhieny4=
+github.com/elastic/go-elasticsearch/v8 v8.6.0/go.mod h1:Usvydt+x0dv9a1TzEUaovqbJor8rmOHy5dSmPeMAE2k=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
 github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
diff --git a/search/docker-compose.yml b/search/docker-compose.yml
new file mode 100644
index 0000000..40479cb
--- /dev/null
+++ b/search/docker-compose.yml
@@ -0,0 +1,43 @@
+version: "3.8"
+services:
+
+  # relay:
+  #   build:
+  #     context: ../
+  #     dockerfile: ./basic/Dockerfile
+  #   environment:
+  #     PORT: 2700
+  #     POSTGRESQL_DATABASE: postgres://nostr:nostr@postgres:5432/nostr?sslmode=disable
+  #   depends_on:
+  #     postgres:
+  #       condition: service_healthy
+  #   ports:
+  #     - 2700:2700
+  #   command: "./basic/relayer-basic"
+
+  elasticsearch:
+    container_name: elasticsearch
+    image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0
+    restart: always
+    environment:
+      - network.host=0.0.0.0
+      - discovery.type=single-node
+      - cluster.name=docker-cluster
+      - node.name=cluster1-node1
+      - xpack.license.self_generated.type=basic
+      - xpack.security.enabled=false
+      - "ES_JAVA_OPTS=-Xms${ES_MEM:-4g} -Xmx${ES_MEM:-4g}"
+    ports:
+      - '127.0.0.1:9200:9200'
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+    healthcheck:
+      test:
+        ["CMD-SHELL", "curl --silent --fail elasticsearch:9200/_cluster/health || exit 1"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+
diff --git a/search/main.go b/search/main.go
new file mode 100644
index 0000000..25947d2
--- /dev/null
+++ b/search/main.go
@@ -0,0 +1,65 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+
+	"github.com/fiatjaf/relayer"
+	"github.com/fiatjaf/relayer/storage/elasticsearch"
+	"github.com/kelseyhightower/envconfig"
+	"github.com/nbd-wtf/go-nostr"
+)
+
+// Relay implements the relayer.Relay interface on top of Elasticsearch storage.
+type Relay struct {
+	storage *elasticsearch.ElasticsearchStorage
+}
+
+func (r *Relay) Name() string {
+	return "BasicRelay"
+}
+
+func (r *Relay) Storage() relayer.Storage {
+	return r.storage
+}
+
+func (r *Relay) OnInitialized(*relayer.Server) {}
+
+func (r *Relay) Init() error {
+	err := envconfig.Process("", r)
+	if err != nil {
+		return fmt.Errorf("couldn't process envconfig: %w", err)
+	}
+
+	return nil
+}
+
+func (r *Relay) AcceptEvent(evt *nostr.Event) bool {
+	// block events that are too large
+	jsonb, _ := json.Marshal(evt)
+	if len(jsonb) > 10000 {
+		return false
+	}
+
+	return true
+}
+
+func (r *Relay) BeforeSave(evt *nostr.Event) {
+	// do nothing
+}
+
+func (r *Relay) AfterSave(evt *nostr.Event) {
+
+}
+
+func main() {
+	r := Relay{}
+	if err := envconfig.Process("", &r); err != nil {
+		log.Fatalf("failed to read from env: %v", err)
+	}
+	r.storage = &elasticsearch.ElasticsearchStorage{}
+	if err := relayer.Start(&r); err != nil {
+		log.Fatalf("server terminated: %v", err)
+	}
+}
diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go
new file mode 100644
index 0000000..acec779
--- /dev/null
+++ b/storage/elasticsearch/elasticsearch.go
@@ -0,0 +1,268 @@
+package elasticsearch
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"strings"
+
+	"github.com/elastic/go-elasticsearch/v8"
+	"github.com/elastic/go-elasticsearch/v8/esapi"
+	"github.com/nbd-wtf/go-nostr"
+)
+
+/*
+1. create index with mapping in Init
+2. implement delete
+3. build query in QueryEvents
+4. implement replaceable events
+*/
+
+var indexMapping = `
+{
+	"settings": {
+		"number_of_shards": 1,
+		"number_of_replicas": 0
+	},
+	"mappings": {
+		"dynamic": false,
+		"properties": {
+			"id": {"type": "keyword"},
+			"pubkey": {"type": "keyword"},
+			"kind": {"type": "integer"},
+			"tags": {"type": "keyword"},
+			"created_at": {"type": "date"}
+		}
+	}
+}
+`
+
+// ElasticsearchStorage stores and queries nostr events in an Elasticsearch index.
+type ElasticsearchStorage struct {
+	es        *elasticsearch.Client
+	indexName string
+}
+
+// Init connects to Elasticsearch and ensures the index exists with the expected mapping.
+func (ess *ElasticsearchStorage) Init() error {
+	es, err := elasticsearch.NewDefaultClient()
+	if err != nil {
+		return err
+	}
+	// log.Println(elasticsearch.Version)
+	// log.Println(es.Info())
+
+	// todo: config
+	ess.indexName = "test"
+
+	// todo: don't delete index every time
+	// es.Indices.Delete([]string{ess.indexName})
+
+	res, err := es.Indices.Create(ess.indexName, es.Indices.Create.WithBody(strings.NewReader(indexMapping)))
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+	if res.IsError() {
+		body, _ := io.ReadAll(res.Body)
+		txt := string(body)
+		if !strings.Contains(txt, "resource_already_exists_exception") {
+			return fmt.Errorf("%s", txt)
+		}
+	}
+
+	ess.es = es
+
+	return nil
+}
+
+// EsSearchResult mirrors the subset of the Elasticsearch search response we read.
+type EsSearchResult struct {
+	Took     int
+	TimedOut bool `json:"timed_out"`
+	Hits     struct {
+		Total struct {
+			Value    int
+			Relation string
+		}
+		Hits []struct {
+			Source nostr.Event `json:"_source"`
+		}
+	}
+}
+
+// buildDsl translates a nostr filter into an Elasticsearch query DSL JSON string.
+func buildDsl(filter *nostr.Filter) string {
+	b := &strings.Builder{}
+	b.WriteString(`{"query": {"bool": {"filter": {"bool": {"must": [`)
+
+	prefixFilter := func(fieldName string, values []string) {
+		b.WriteString(`{"bool": {"should": [`)
+		for idx, val := range values {
+			if idx > 0 {
+				b.WriteRune(',')
+			}
+			op := "term"
+			if len(val) < 64 {
+				op = "prefix"
+			}
+			b.WriteString(fmt.Sprintf(`{"%s": {"%s": %q}}`, op, fieldName, val))
+		}
+		b.WriteString(`]}},`)
+	}
+
+	// ids
+	prefixFilter("id", filter.IDs)
+
+	// authors
+	prefixFilter("pubkey", filter.Authors)
+
+	// kinds
+	if len(filter.Kinds) > 0 {
+		k, _ := json.Marshal(filter.Kinds)
+		b.WriteString(fmt.Sprintf(`{"terms": {"kind": %s}},`, k))
+	}
+
+	// tags
+	{
+		b.WriteString(`{"bool": {"should": [`)
+		commaIdx := 0
+		for char, terms := range filter.Tags {
+			if len(terms) == 0 {
+				continue
+			}
+			if commaIdx > 0 {
+				b.WriteRune(',')
+			}
+			commaIdx++
+			b.WriteString(`{"bool": {"must": [`)
+			for _, t := range terms {
+				b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}},`, t))
+			}
+			// add the tag type at the end
+			b.WriteString(fmt.Sprintf(`{"term": {"tags": %q}}`, char))
+			b.WriteString(`]}}`)
+		}
+		b.WriteString(`]}},`)
+	}
+
+	// since
+	if filter.Since != nil {
+		b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"gt": %d}}},`, filter.Since.Unix()))
+	}
+
+	// until
+	if filter.Until != nil {
+		b.WriteString(fmt.Sprintf(`{"range": {"created_at": {"lt": %d}}},`, filter.Until.Unix()))
+	}
+
+	// all blocks have a trailing comma...
+	// add a match_all "noop" at the end
+	// so json is valid
+	b.WriteString(`{"match_all": {}}`)
+	b.WriteString(`]}}}}}`)
+	return b.String()
+}
+
+// QueryEvents runs the filter against the index, newest events first.
+func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) {
+	// Perform the search request...
+	// need to build up query body...
+	if filter == nil {
+		return nil, errors.New("filter cannot be null")
+	}
+
+	dsl := buildDsl(filter)
+	// pprint([]byte(dsl))
+
+	es := ess.es
+	res, err := es.Search(
+		es.Search.WithContext(context.Background()),
+		es.Search.WithIndex(ess.indexName),
+
+		es.Search.WithBody(strings.NewReader(dsl)),
+		es.Search.WithSize(filter.Limit),
+		es.Search.WithSort("created_at:desc"),
+
+		es.Search.WithTrackTotalHits(true),
+		es.Search.WithPretty(),
+	)
+	if err != nil {
+		// don't kill the whole relay on a failed search; report it to the caller
+		return nil, err
+	}
+	defer res.Body.Close()
+
+	if res.IsError() {
+		txt, _ := io.ReadAll(res.Body)
+		fmt.Println("oh no", string(txt))
+		return nil, fmt.Errorf("%s", txt)
+	}
+
+	var r EsSearchResult
+	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
+		return nil, err
+	}
+
+	events := make([]nostr.Event, len(r.Hits.Hits))
+	for i, e := range r.Hits.Hits {
+		events[i] = e.Source
+	}
+
+	return events, nil
+}
+
+// DeleteEvent removes the document with the given id from the index.
+func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error {
+	// todo: is pubkey match required?
+	res, err := ess.es.Delete(ess.indexName, id)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+	if res.IsError() {
+		txt, _ := io.ReadAll(res.Body)
+		return fmt.Errorf("%s", txt)
+	}
+	return nil
+}
+
+// SaveEvent indexes the event document, keyed by the event id.
+func (ess *ElasticsearchStorage) SaveEvent(event *nostr.Event) error {
+	data, err := json.Marshal(event)
+	if err != nil {
+		return err
+	}
+
+	req := esapi.IndexRequest{
+		Index:      ess.indexName,
+		DocumentID: event.ID,
+		Body:       bytes.NewReader(data),
+		// Refresh: "true",
+	}
+
+	res, err := req.Do(context.Background(), ess.es)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+	if res.IsError() {
+		txt, _ := io.ReadAll(res.Body)
+		return fmt.Errorf("%s", txt)
+	}
+	return nil
+}
+
+// pprint pretty-prints JSON for debugging; prints the raw input if it is invalid.
+func pprint(j []byte) {
+	var dst bytes.Buffer
+	err := json.Indent(&dst, j, "", " ")
+	if err != nil {
+		fmt.Println("invalid JSON", err, string(j))
+	} else {
+		fmt.Println(dst.String())
+	}
+}
diff --git a/storage/elasticsearch/query_test.go b/storage/elasticsearch/query_test.go
new file mode 100644
index 0000000..e10fc45
--- /dev/null
+++ b/storage/elasticsearch/query_test.go
@@ -0,0 +1,46 @@
+package elasticsearch
+
+import (
+	"encoding/json"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/nbd-wtf/go-nostr"
+)
+
+func TestQuery(t *testing.T) {
+	now := time.Now()
+	yesterday := now.Add(time.Hour * -24)
+	filter := &nostr.Filter{
+		// IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"},
+		Kinds: []int{0, 1},
+		// Tags: nostr.TagMap{
+		// 	"a": []string{"abc"},
+		// 	"b": []string{"aaa", "bbb"},
+		// },
+		Since: &yesterday,
+		Until: &now,
+		Limit: 100,
+	}
+
+	dsl := buildDsl(filter)
+	pprint([]byte(dsl))
+
+	if !json.Valid([]byte(dsl)) {
+		t.Fatalf("buildDsl produced invalid JSON: %s", dsl)
+	}
+
+	// "integration" test
+	ess := &ElasticsearchStorage{}
+	err := ess.Init()
+	if err != nil {
+		t.Error(err)
+	}
+
+	found, err := ess.QueryEvents(filter)
+	if err != nil {
+		t.Error(err)
+	}
+	fmt.Println(found)
+}