diff --git a/go.mod b/go.mod index 167631e..7277259 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/fiatjaf/relayer +module github.com/fiatjaf/relayer/v2 go 1.18 @@ -7,7 +7,7 @@ require ( github.com/aquasecurity/esquery v0.2.0 github.com/cockroachdb/pebble v0.0.0-20220723153705-3fc374e4dc66 github.com/elastic/go-elasticsearch/v8 v8.6.0 - github.com/gorilla/mux v1.8.0 + github.com/fiatjaf/relayer v1.7.3 github.com/gorilla/websocket v1.5.0 github.com/grokify/html-strip-tags-go v0.0.1 github.com/jb55/lnsocket/go v0.0.0-20220725174341-b98b5cd37bb6 @@ -18,7 +18,6 @@ require ( github.com/mmcdole/gofeed v1.1.3 github.com/nbd-wtf/go-nostr v0.17.1 github.com/rif/cache2go v1.0.0 - github.com/rs/cors v1.7.0 github.com/stevelacy/daz v0.1.4 github.com/tidwall/gjson v1.14.1 golang.org/x/exp v0.0.0-20230224173230-c95f2b4c22f2 @@ -59,6 +58,7 @@ require ( github.com/go-errors/errors v1.0.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/gorilla/mux v1.8.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.11 // indirect github.com/kkdai/bstream v1.0.0 // indirect @@ -79,6 +79,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/rs/cors v1.7.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect diff --git a/go.sum b/go.sum index 2fe363f..86cbde5 100644 --- a/go.sum +++ b/go.sum @@ -142,6 +142,8 @@ github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fergusstrange/embedded-postgres v1.10.0 h1:YnwF6xAQYmKLAXXrrRx4rHDLih47YJwVPvg8jeKfdNg= +github.com/fiatjaf/relayer v1.7.3 h1:HbE67EFsabLO4bvbuB3uMRfRizsvDxseCIaeQHQstv8= +github.com/fiatjaf/relayer v1.7.3/go.mod h1:cQGM8YSoU/7I79Mg9ULlLQWYm/U54/B/4k60fRXEY2o= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c= diff --git a/handlers.go b/handlers.go index a636f74..10372d9 100644 --- a/handlers.go +++ b/handlers.go @@ -241,6 +241,10 @@ func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { break } } + + // exhaust the channel (in case we broke out of it early) so it is closed by the storage + for range events { + } } ws.WriteJSON([]interface{}{"EOSE", id}) diff --git a/interface.go b/interface.go index 1b4f336..dbe65fd 100644 --- a/interface.go +++ b/interface.go @@ -70,6 +70,8 @@ type Storage interface { Init() error // QueryEvents is invoked upon a client's REQ as described in NIP-01. + // it should return a channel with the events as they're recovered from a database. + // the channel should be closed after the events are all delivered. QueryEvents(ctx context.Context, filter *nostr.Filter) (chan *nostr.Event, error) // DeleteEvent is used to handle deletion events, as per NIP-09. DeleteEvent(ctx context.Context, id string, pubkey string) error diff --git a/storage/elasticsearch/elasticsearch.go b/storage/elasticsearch/elasticsearch.go index 74e2185..589fb2d 100644 --- a/storage/elasticsearch/elasticsearch.go +++ b/storage/elasticsearch/elasticsearch.go @@ -12,9 +12,12 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esutil" + "github.com/fiatjaf/relayer/v2" "github.com/nbd-wtf/go-nostr" ) +var _ relayer.Storage = (*ElasticsearchStorage)(nil) + type IndexedEvent struct { Event nostr.Event `json:"event"` ContentSearch string `json:"content_search"` @@ -53,7 +56,6 @@ type ElasticsearchStorage struct { } func (ess *ElasticsearchStorage) Init() error { - if ess.IndexName == "" { ess.IndexName = "events" } @@ -96,7 +98,7 @@ func (ess *ElasticsearchStorage) Init() error { return nil } -func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error { +func (ess *ElasticsearchStorage) DeleteEvent(ctx context.Context, id string, pubkey string) error { // first do get by ID and check that pubkeys match // this is cheaper than doing delete by query, which also doesn't work with bulk indexer. found, _ := ess.getByID(&nostr.Filter{IDs: []string{id}}) @@ -106,7 +108,7 @@ func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error { done := make(chan error) err := ess.bi.Add( - context.Background(), + ctx, esutil.BulkIndexerItem{ Action: "delete", DocumentID: id, @@ -137,7 +139,7 @@ func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error { return err } -func (ess *ElasticsearchStorage) SaveEvent(evt *nostr.Event) error { +func (ess *ElasticsearchStorage) SaveEvent(ctx context.Context, evt *nostr.Event) error { ie := &IndexedEvent{ Event: *evt, } @@ -163,8 +165,8 @@ func (ess *ElasticsearchStorage) SaveEvent(evt *nostr.Event) error { // delete replaceable events deleteIDs := []string{} queryForDelete := func(filter *nostr.Filter) { - toDelete, _ := ess.QueryEvents(filter) - for _, e := range toDelete { + toDelete, _ := ess.QueryEvents(ctx, filter) + for e := range toDelete { // KindRecommendServer: we can't query ES for exact content match // so query by kind and loop over results to compare content if evt.Kind == nostr.KindRecommendServer { @@ -197,7 +199,7 @@ func (ess *ElasticsearchStorage) SaveEvent(evt *nostr.Event) error { } for _, id := range deleteIDs { ess.bi.Add( - context.Background(), + ctx, esutil.BulkIndexerItem{ Action: "delete", DocumentID: id, @@ -207,7 +209,7 @@ func (ess *ElasticsearchStorage) SaveEvent(evt *nostr.Event) error { // adapted from: // https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196 err = ess.bi.Add( - context.Background(), + ctx, esutil.BulkIndexerItem{ Action: "index", DocumentID: evt.ID, diff --git a/storage/elasticsearch/query.go b/storage/elasticsearch/query.go index 2d3788b..56b1336 100644 --- a/storage/elasticsearch/query.go +++ b/storage/elasticsearch/query.go @@ -70,12 +70,12 @@ func buildDsl(filter *nostr.Filter) ([]byte, error) { // since if filter.Since != nil { - dsl.Must(esquery.Range("event.created_at").Gt(filter.Since.Unix())) + dsl.Must(esquery.Range("event.created_at").Gt(filter.Since)) } // until if filter.Until != nil { - dsl.Must(esquery.Range("event.created_at").Lt(filter.Until.Unix())) + dsl.Must(esquery.Range("event.created_at").Lt(filter.Until)) } // search @@ -86,7 +86,7 @@ func buildDsl(filter *nostr.Filter) ([]byte, error) { return json.Marshal(esquery.Query(dsl)) } -func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, error) { +func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]*nostr.Event, error) { got, err := ess.es.Mget( esutil.NewJSONReader(filter), ess.es.Mget.WithIndex(ess.IndexName)) @@ -104,24 +104,33 @@ func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, e return nil, err } - events := make([]nostr.Event, 0, len(mgetResponse.Docs)) + events := make([]*nostr.Event, 0, len(mgetResponse.Docs)) for _, e := range mgetResponse.Docs { if e.Found { - events = append(events, e.Source.Event) + events = append(events, &e.Source.Event) } } return events, nil } -func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { +func (ess *ElasticsearchStorage) QueryEvents(ctx context.Context, filter *nostr.Filter) (chan *nostr.Event, error) { + ch := make(chan *nostr.Event) + if filter == nil { return nil, errors.New("filter cannot be null") } // optimization: get by id if isGetByID(filter) { - return ess.getByID(filter) + if evts, err := ess.getByID(filter); err == nil { + for _, evt := range evts { + ch <- evt + } + close(ch) + } else { + return nil, fmt.Errorf("error getting by id: %w", err) + } } dsl, err := buildDsl(filter) @@ -136,7 +145,7 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even es := ess.es res, err := es.Search( - es.Search.WithContext(context.Background()), + es.Search.WithContext(ctx), es.Search.WithIndex(ess.IndexName), es.Search.WithBody(bytes.NewReader(dsl)), @@ -159,12 +168,14 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even return nil, err } - events := make([]nostr.Event, len(r.Hits.Hits)) - for i, e := range r.Hits.Hits { - events[i] = e.Source.Event - } + go func() { + for _, e := range r.Hits.Hits { + ch <- &e.Source.Event + } + close(ch) + }() - return events, nil + return ch, nil } func isGetByID(filter *nostr.Filter) bool { diff --git a/storage/elasticsearch/query_test.go b/storage/elasticsearch/query_test.go index f30d390..4f58801 100644 --- a/storage/elasticsearch/query_test.go +++ b/storage/elasticsearch/query_test.go @@ -5,14 +5,13 @@ import ( "encoding/json" "fmt" "testing" - "time" "github.com/nbd-wtf/go-nostr" ) func TestQuery(t *testing.T) { - now := time.Now() - yesterday := now.Add(time.Hour * -24) + now := nostr.Now() + yesterday := now - 60*60*24 filter := &nostr.Filter{ IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"}, Kinds: []int{0, 1}, @@ -31,7 +30,6 @@ func TestQuery(t *testing.T) { t.Fatal(err) } pprint(dsl) - } func pprint(j []byte) { diff --git a/storage/postgresql/delete.go b/storage/postgresql/delete.go index f71902e..b164045 100644 --- a/storage/postgresql/delete.go +++ b/storage/postgresql/delete.go @@ -1,6 +1,8 @@ package postgresql -func (b PostgresBackend) DeleteEvent(id string, pubkey string) error { - _, err := b.DB.Exec("DELETE FROM event WHERE id = $1 AND pubkey = $2", id, pubkey) +import "context" + +func (b PostgresBackend) DeleteEvent(ctx context.Context, id string, pubkey string) error { + _, err := b.DB.ExecContext(ctx, "DELETE FROM event WHERE id = $1 AND pubkey = $2", id, pubkey) return err } diff --git a/storage/postgresql/init.go b/storage/postgresql/init.go index 15f03f7..edc8c89 100644 --- a/storage/postgresql/init.go +++ b/storage/postgresql/init.go @@ -1,11 +1,14 @@ package postgresql import ( + "github.com/fiatjaf/relayer/v2" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" _ "github.com/lib/pq" ) +var _ relayer.Storage = (*PostgresBackend)(nil) + func (b *PostgresBackend) Init() error { db, err := sqlx.Connect("postgres", b.DatabaseURL) if err != nil { diff --git a/storage/postgresql/query.go b/storage/postgresql/query.go index 7c250ed..54226e9 100644 --- a/storage/postgresql/query.go +++ b/storage/postgresql/query.go @@ -1,18 +1,20 @@ package postgresql import ( + "context" "database/sql" "encoding/hex" "errors" "fmt" "strconv" "strings" - "time" "github.com/nbd-wtf/go-nostr" ) -func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error) { +func (b PostgresBackend) QueryEvents(ctx context.Context, filter *nostr.Filter) (ch chan *nostr.Event, err error) { + ch = make(chan *nostr.Event) + var conditions []string var params []any @@ -116,11 +118,11 @@ func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event if filter.Since != nil { conditions = append(conditions, "created_at > ?") - params = append(params, filter.Since.Unix()) + params = append(params, filter.Since) } if filter.Until != nil { conditions = append(conditions, "created_at < ?") - params = append(params, filter.Until.Unix()) + params = append(params, filter.Until) } if len(conditions) == 0 { @@ -145,19 +147,21 @@ func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err) } - defer rows.Close() - - for rows.Next() { - var evt nostr.Event - var timestamp int64 - err := rows.Scan(&evt.ID, &evt.PubKey, ×tamp, - &evt.Kind, &evt.Tags, &evt.Content, &evt.Sig) - if err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) + go func() { + defer rows.Close() + defer close(ch) + for rows.Next() { + var evt nostr.Event + var timestamp int64 + err := rows.Scan(&evt.ID, &evt.PubKey, ×tamp, + &evt.Kind, &evt.Tags, &evt.Content, &evt.Sig) + if err != nil { + return + } + evt.CreatedAt = nostr.Timestamp(timestamp) + ch <- &evt } - evt.CreatedAt = time.Unix(timestamp, 0) - events = append(events, evt) - } + }() - return events, nil + return ch, nil } diff --git a/storage/postgresql/save.go b/storage/postgresql/save.go index e160ebf..6f57f60 100644 --- a/storage/postgresql/save.go +++ b/storage/postgresql/save.go @@ -1,20 +1,21 @@ package postgresql import ( + "context" "encoding/json" - "github.com/fiatjaf/relayer/storage" + "github.com/fiatjaf/relayer/v2/storage" "github.com/nbd-wtf/go-nostr" ) -func (b *PostgresBackend) SaveEvent(evt *nostr.Event) error { +func (b *PostgresBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error { // react to different kinds of events if evt.Kind == nostr.KindSetMetadata || evt.Kind == nostr.KindContactList || (10000 <= evt.Kind && evt.Kind < 20000) { // delete past events from this user - b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2`, evt.PubKey, evt.Kind) + b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2`, evt.PubKey, evt.Kind) } else if evt.Kind == nostr.KindRecommendServer { // delete past recommend_server events equal to this one - b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND content = $3`, + b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND content = $3`, evt.PubKey, evt.Kind, evt.Content) } else if evt.Kind >= 30000 && evt.Kind < 40000 { // NIP-33 @@ -27,11 +28,11 @@ func (b *PostgresBackend) SaveEvent(evt *nostr.Event) error { // insert tagsj, _ := json.Marshal(evt.Tags) - res, err := b.DB.Exec(` + res, err := b.DB.ExecContext(ctx, ` INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO NOTHING - `, evt.ID, evt.PubKey, evt.CreatedAt.Unix(), evt.Kind, tagsj, evt.Content, evt.Sig) + `, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig) if err != nil { return err } @@ -48,7 +49,7 @@ func (b *PostgresBackend) SaveEvent(evt *nostr.Event) error { return nil } -func (b *PostgresBackend) BeforeSave(evt *nostr.Event) { +func (b *PostgresBackend) BeforeSave(ctx context.Context, evt *nostr.Event) { // do nothing } diff --git a/storage/sqlite3/delete.go b/storage/sqlite3/delete.go index 06183f2..ac55485 100644 --- a/storage/sqlite3/delete.go +++ b/storage/sqlite3/delete.go @@ -1,6 +1,8 @@ package sqlite3 -func (b SQLite3Backend) DeleteEvent(id string, pubkey string) error { - _, err := b.DB.Exec("DELETE FROM event WHERE id = $1 AND pubkey = $2", id, pubkey) +import "context" + +func (b SQLite3Backend) DeleteEvent(ctx context.Context, id string, pubkey string) error { + _, err := b.DB.ExecContext(ctx, "DELETE FROM event WHERE id = $1 AND pubkey = $2", id, pubkey) return err } diff --git a/storage/sqlite3/init.go b/storage/sqlite3/init.go index 4a17aa7..cb7fe0d 100644 --- a/storage/sqlite3/init.go +++ b/storage/sqlite3/init.go @@ -1,11 +1,14 @@ package sqlite3 import ( + "github.com/fiatjaf/relayer/v2" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" _ "github.com/mattn/go-sqlite3" ) +var _ relayer.Storage = (*SQLite3Backend)(nil) + func (b *SQLite3Backend) Init() error { db, err := sqlx.Connect("sqlite3", b.DatabaseURL) if err != nil { diff --git a/storage/sqlite3/query.go b/storage/sqlite3/query.go index 9287c18..1a84a35 100644 --- a/storage/sqlite3/query.go +++ b/storage/sqlite3/query.go @@ -1,18 +1,20 @@ package sqlite3 import ( + "context" "database/sql" "encoding/hex" "errors" "fmt" "strconv" "strings" - "time" "github.com/nbd-wtf/go-nostr" ) -func (b SQLite3Backend) QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error) { +func (b SQLite3Backend) QueryEvents(ctx context.Context, filter *nostr.Filter) (ch chan *nostr.Event, err error) { + ch = make(chan *nostr.Event) + var conditions []string var params []any @@ -110,11 +112,11 @@ func (b SQLite3Backend) QueryEvents(filter *nostr.Filter) (events []nostr.Event, if filter.Since != nil { conditions = append(conditions, "created_at > ?") - params = append(params, filter.Since.Unix()) + params = append(params, filter.Since) } if filter.Until != nil { conditions = append(conditions, "created_at < ?") - params = append(params, filter.Until.Unix()) + params = append(params, filter.Until) } if filter.Search != "" { conditions = append(conditions, "content LIKE ?") @@ -143,19 +145,21 @@ func (b SQLite3Backend) QueryEvents(filter *nostr.Filter) (events []nostr.Event, return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err) } - defer rows.Close() - - for rows.Next() { - var evt nostr.Event - var timestamp int64 - err := rows.Scan(&evt.ID, &evt.PubKey, ×tamp, - &evt.Kind, &evt.Tags, &evt.Content, &evt.Sig) - if err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) + go func() { + defer rows.Close() + defer close(ch) + for rows.Next() { + var evt nostr.Event + var timestamp int64 + err := rows.Scan(&evt.ID, &evt.PubKey, ×tamp, + &evt.Kind, &evt.Tags, &evt.Content, &evt.Sig) + if err != nil { + return + } + evt.CreatedAt = nostr.Timestamp(timestamp) + ch <- &evt } - evt.CreatedAt = time.Unix(timestamp, 0) - events = append(events, evt) - } + }() - return events, nil + return ch, nil } diff --git a/storage/sqlite3/save.go b/storage/sqlite3/save.go index 78df76f..b40d5b2 100644 --- a/storage/sqlite3/save.go +++ b/storage/sqlite3/save.go @@ -1,6 +1,7 @@ package sqlite3 import ( + "context" "encoding/json" "fmt" @@ -8,30 +9,30 @@ import ( "github.com/nbd-wtf/go-nostr" ) -func (b *SQLite3Backend) SaveEvent(evt *nostr.Event) error { +func (b *SQLite3Backend) SaveEvent(ctx context.Context, evt *nostr.Event) error { // react to different kinds of events if evt.Kind == nostr.KindSetMetadata || evt.Kind == nostr.KindContactList || (10000 <= evt.Kind && evt.Kind < 20000) { // delete past events from this user - b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2`, evt.PubKey, evt.Kind) + b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2`, evt.PubKey, evt.Kind) } else if evt.Kind == nostr.KindRecommendServer { // delete past recommend_server events equal to this one - b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND content = $3`, + b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND content = $3`, evt.PubKey, evt.Kind, evt.Content) } else if evt.Kind >= 30000 && evt.Kind < 40000 { // NIP-33 d := evt.Tags.GetFirst([]string{"d"}) if d != nil { tagsLike := fmt.Sprintf(`%%"d","%s"%%`, d.Value()) - b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND tags LIKE $3`, evt.PubKey, evt.Kind, tagsLike) + b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND tags LIKE $3`, evt.PubKey, evt.Kind, tagsLike) } } // insert tagsj, _ := json.Marshal(evt.Tags) - res, err := b.DB.Exec(` + res, err := b.DB.ExecContext(ctx, ` INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig) VALUES ($1, $2, $3, $4, $5, $6, $7) - `, evt.ID, evt.PubKey, evt.CreatedAt.Unix(), evt.Kind, tagsj, evt.Content, evt.Sig) + `, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig) if err != nil { return err } @@ -48,7 +49,7 @@ func (b *SQLite3Backend) SaveEvent(evt *nostr.Event) error { return nil } -func (b *SQLite3Backend) BeforeSave(evt *nostr.Event) { +func (b *SQLite3Backend) BeforeSave(ctx context.Context, evt *nostr.Event) { // do nothing }