migrate all built-in storage backends.

This commit is contained in:
fiatjaf
2023-05-01 19:40:16 -03:00
parent e3d4655dba
commit 4e15120111
15 changed files with 120 additions and 80 deletions

View File

@@ -12,9 +12,12 @@ import (
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esutil"
"github.com/fiatjaf/relayer/v2"
"github.com/nbd-wtf/go-nostr"
)
var _ relayer.Storage = (*ElasticsearchStorage)(nil)
type IndexedEvent struct {
Event nostr.Event `json:"event"`
ContentSearch string `json:"content_search"`
@@ -53,7 +56,6 @@ type ElasticsearchStorage struct {
}
func (ess *ElasticsearchStorage) Init() error {
if ess.IndexName == "" {
ess.IndexName = "events"
}
@@ -96,7 +98,7 @@ func (ess *ElasticsearchStorage) Init() error {
return nil
}
func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error {
func (ess *ElasticsearchStorage) DeleteEvent(ctx context.Context, id string, pubkey string) error {
// first do get by ID and check that pubkeys match
// this is cheaper than doing delete by query, which also doesn't work with bulk indexer.
found, _ := ess.getByID(&nostr.Filter{IDs: []string{id}})
@@ -106,7 +108,7 @@ func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error {
done := make(chan error)
err := ess.bi.Add(
context.Background(),
ctx,
esutil.BulkIndexerItem{
Action: "delete",
DocumentID: id,
@@ -137,7 +139,7 @@ func (ess *ElasticsearchStorage) DeleteEvent(id string, pubkey string) error {
return err
}
func (ess *ElasticsearchStorage) SaveEvent(evt *nostr.Event) error {
func (ess *ElasticsearchStorage) SaveEvent(ctx context.Context, evt *nostr.Event) error {
ie := &IndexedEvent{
Event: *evt,
}
@@ -163,8 +165,8 @@ func (ess *ElasticsearchStorage) SaveEvent(evt *nostr.Event) error {
// delete replaceable events
deleteIDs := []string{}
queryForDelete := func(filter *nostr.Filter) {
toDelete, _ := ess.QueryEvents(filter)
for _, e := range toDelete {
toDelete, _ := ess.QueryEvents(ctx, filter)
for e := range toDelete {
// KindRecommendServer: we can't query ES for exact content match
// so query by kind and loop over results to compare content
if evt.Kind == nostr.KindRecommendServer {
@@ -197,7 +199,7 @@ func (ess *ElasticsearchStorage) SaveEvent(evt *nostr.Event) error {
}
for _, id := range deleteIDs {
ess.bi.Add(
context.Background(),
ctx,
esutil.BulkIndexerItem{
Action: "delete",
DocumentID: id,
@@ -207,7 +209,7 @@ func (ess *ElasticsearchStorage) SaveEvent(evt *nostr.Event) error {
// adapted from:
// https://github.com/elastic/go-elasticsearch/blob/main/_examples/bulk/indexer.go#L196
err = ess.bi.Add(
context.Background(),
ctx,
esutil.BulkIndexerItem{
Action: "index",
DocumentID: evt.ID,

View File

@@ -70,12 +70,12 @@ func buildDsl(filter *nostr.Filter) ([]byte, error) {
// since
if filter.Since != nil {
dsl.Must(esquery.Range("event.created_at").Gt(filter.Since.Unix()))
dsl.Must(esquery.Range("event.created_at").Gt(filter.Since))
}
// until
if filter.Until != nil {
dsl.Must(esquery.Range("event.created_at").Lt(filter.Until.Unix()))
dsl.Must(esquery.Range("event.created_at").Lt(filter.Until))
}
// search
@@ -86,7 +86,7 @@ func buildDsl(filter *nostr.Filter) ([]byte, error) {
return json.Marshal(esquery.Query(dsl))
}
func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, error) {
func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]*nostr.Event, error) {
got, err := ess.es.Mget(
esutil.NewJSONReader(filter),
ess.es.Mget.WithIndex(ess.IndexName))
@@ -104,24 +104,33 @@ func (ess *ElasticsearchStorage) getByID(filter *nostr.Filter) ([]nostr.Event, e
return nil, err
}
events := make([]nostr.Event, 0, len(mgetResponse.Docs))
events := make([]*nostr.Event, 0, len(mgetResponse.Docs))
for _, e := range mgetResponse.Docs {
if e.Found {
events = append(events, e.Source.Event)
events = append(events, &e.Source.Event)
}
}
return events, nil
}
func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) {
func (ess *ElasticsearchStorage) QueryEvents(ctx context.Context, filter *nostr.Filter) (chan *nostr.Event, error) {
ch := make(chan *nostr.Event)
if filter == nil {
return nil, errors.New("filter cannot be null")
}
// optimization: get by id
if isGetByID(filter) {
return ess.getByID(filter)
if evts, err := ess.getByID(filter); err == nil {
for _, evt := range evts {
ch <- evt
}
close(ch)
} else {
return nil, fmt.Errorf("error getting by id: %w", err)
}
}
dsl, err := buildDsl(filter)
@@ -136,7 +145,7 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even
es := ess.es
res, err := es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithContext(ctx),
es.Search.WithIndex(ess.IndexName),
es.Search.WithBody(bytes.NewReader(dsl)),
@@ -159,12 +168,14 @@ func (ess *ElasticsearchStorage) QueryEvents(filter *nostr.Filter) ([]nostr.Even
return nil, err
}
events := make([]nostr.Event, len(r.Hits.Hits))
for i, e := range r.Hits.Hits {
events[i] = e.Source.Event
}
go func() {
for _, e := range r.Hits.Hits {
ch <- &e.Source.Event
}
close(ch)
}()
return events, nil
return ch, nil
}
func isGetByID(filter *nostr.Filter) bool {

View File

@@ -5,14 +5,13 @@ import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/nbd-wtf/go-nostr"
)
func TestQuery(t *testing.T) {
now := time.Now()
yesterday := now.Add(time.Hour * -24)
now := nostr.Now()
yesterday := now - 60*60*24
filter := &nostr.Filter{
IDs: []string{"abc", "123", "971b9489b4fd4e41a85951607922b982d981fa9d55318bc304f21f390721404c"},
Kinds: []int{0, 1},
@@ -31,7 +30,6 @@ func TestQuery(t *testing.T) {
t.Fatal(err)
}
pprint(dsl)
}
func pprint(j []byte) {

View File

@@ -1,6 +1,8 @@
package postgresql
func (b PostgresBackend) DeleteEvent(id string, pubkey string) error {
_, err := b.DB.Exec("DELETE FROM event WHERE id = $1 AND pubkey = $2", id, pubkey)
import "context"
func (b PostgresBackend) DeleteEvent(ctx context.Context, id string, pubkey string) error {
_, err := b.DB.ExecContext(ctx, "DELETE FROM event WHERE id = $1 AND pubkey = $2", id, pubkey)
return err
}

View File

@@ -1,11 +1,14 @@
package postgresql
import (
"github.com/fiatjaf/relayer/v2"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
_ "github.com/lib/pq"
)
var _ relayer.Storage = (*PostgresBackend)(nil)
func (b *PostgresBackend) Init() error {
db, err := sqlx.Connect("postgres", b.DatabaseURL)
if err != nil {

View File

@@ -1,18 +1,20 @@
package postgresql
import (
"context"
"database/sql"
"encoding/hex"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/nbd-wtf/go-nostr"
)
func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error) {
func (b PostgresBackend) QueryEvents(ctx context.Context, filter *nostr.Filter) (ch chan *nostr.Event, err error) {
ch = make(chan *nostr.Event)
var conditions []string
var params []any
@@ -116,11 +118,11 @@ func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event
if filter.Since != nil {
conditions = append(conditions, "created_at > ?")
params = append(params, filter.Since.Unix())
params = append(params, filter.Since)
}
if filter.Until != nil {
conditions = append(conditions, "created_at < ?")
params = append(params, filter.Until.Unix())
params = append(params, filter.Until)
}
if len(conditions) == 0 {
@@ -145,19 +147,21 @@ func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event
return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
defer rows.Close()
for rows.Next() {
var evt nostr.Event
var timestamp int64
err := rows.Scan(&evt.ID, &evt.PubKey, &timestamp,
&evt.Kind, &evt.Tags, &evt.Content, &evt.Sig)
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
go func() {
defer rows.Close()
defer close(ch)
for rows.Next() {
var evt nostr.Event
var timestamp int64
err := rows.Scan(&evt.ID, &evt.PubKey, &timestamp,
&evt.Kind, &evt.Tags, &evt.Content, &evt.Sig)
if err != nil {
return
}
evt.CreatedAt = nostr.Timestamp(timestamp)
ch <- &evt
}
evt.CreatedAt = time.Unix(timestamp, 0)
events = append(events, evt)
}
}()
return events, nil
return ch, nil
}

View File

@@ -1,20 +1,21 @@
package postgresql
import (
"context"
"encoding/json"
"github.com/fiatjaf/relayer/storage"
"github.com/fiatjaf/relayer/v2/storage"
"github.com/nbd-wtf/go-nostr"
)
func (b *PostgresBackend) SaveEvent(evt *nostr.Event) error {
func (b *PostgresBackend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
// react to different kinds of events
if evt.Kind == nostr.KindSetMetadata || evt.Kind == nostr.KindContactList || (10000 <= evt.Kind && evt.Kind < 20000) {
// delete past events from this user
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2`, evt.PubKey, evt.Kind)
b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2`, evt.PubKey, evt.Kind)
} else if evt.Kind == nostr.KindRecommendServer {
// delete past recommend_server events equal to this one
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND content = $3`,
b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND content = $3`,
evt.PubKey, evt.Kind, evt.Content)
} else if evt.Kind >= 30000 && evt.Kind < 40000 {
// NIP-33
@@ -27,11 +28,11 @@ func (b *PostgresBackend) SaveEvent(evt *nostr.Event) error {
// insert
tagsj, _ := json.Marshal(evt.Tags)
res, err := b.DB.Exec(`
res, err := b.DB.ExecContext(ctx, `
INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id) DO NOTHING
`, evt.ID, evt.PubKey, evt.CreatedAt.Unix(), evt.Kind, tagsj, evt.Content, evt.Sig)
`, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig)
if err != nil {
return err
}
@@ -48,7 +49,7 @@ func (b *PostgresBackend) SaveEvent(evt *nostr.Event) error {
return nil
}
func (b *PostgresBackend) BeforeSave(evt *nostr.Event) {
func (b *PostgresBackend) BeforeSave(ctx context.Context, evt *nostr.Event) {
// do nothing
}

View File

@@ -1,6 +1,8 @@
package sqlite3
func (b SQLite3Backend) DeleteEvent(id string, pubkey string) error {
_, err := b.DB.Exec("DELETE FROM event WHERE id = $1 AND pubkey = $2", id, pubkey)
import "context"
func (b SQLite3Backend) DeleteEvent(ctx context.Context, id string, pubkey string) error {
_, err := b.DB.ExecContext(ctx, "DELETE FROM event WHERE id = $1 AND pubkey = $2", id, pubkey)
return err
}

View File

@@ -1,11 +1,14 @@
package sqlite3
import (
"github.com/fiatjaf/relayer/v2"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
_ "github.com/mattn/go-sqlite3"
)
var _ relayer.Storage = (*SQLite3Backend)(nil)
func (b *SQLite3Backend) Init() error {
db, err := sqlx.Connect("sqlite3", b.DatabaseURL)
if err != nil {

View File

@@ -1,18 +1,20 @@
package sqlite3
import (
"context"
"database/sql"
"encoding/hex"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/nbd-wtf/go-nostr"
)
func (b SQLite3Backend) QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error) {
func (b SQLite3Backend) QueryEvents(ctx context.Context, filter *nostr.Filter) (ch chan *nostr.Event, err error) {
ch = make(chan *nostr.Event)
var conditions []string
var params []any
@@ -110,11 +112,11 @@ func (b SQLite3Backend) QueryEvents(filter *nostr.Filter) (events []nostr.Event,
if filter.Since != nil {
conditions = append(conditions, "created_at > ?")
params = append(params, filter.Since.Unix())
params = append(params, filter.Since)
}
if filter.Until != nil {
conditions = append(conditions, "created_at < ?")
params = append(params, filter.Until.Unix())
params = append(params, filter.Until)
}
if filter.Search != "" {
conditions = append(conditions, "content LIKE ?")
@@ -143,19 +145,21 @@ func (b SQLite3Backend) QueryEvents(filter *nostr.Filter) (events []nostr.Event,
return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
defer rows.Close()
for rows.Next() {
var evt nostr.Event
var timestamp int64
err := rows.Scan(&evt.ID, &evt.PubKey, &timestamp,
&evt.Kind, &evt.Tags, &evt.Content, &evt.Sig)
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
go func() {
defer rows.Close()
defer close(ch)
for rows.Next() {
var evt nostr.Event
var timestamp int64
err := rows.Scan(&evt.ID, &evt.PubKey, &timestamp,
&evt.Kind, &evt.Tags, &evt.Content, &evt.Sig)
if err != nil {
return
}
evt.CreatedAt = nostr.Timestamp(timestamp)
ch <- &evt
}
evt.CreatedAt = time.Unix(timestamp, 0)
events = append(events, evt)
}
}()
return events, nil
return ch, nil
}

View File

@@ -1,6 +1,7 @@
package sqlite3
import (
"context"
"encoding/json"
"fmt"
@@ -8,30 +9,30 @@ import (
"github.com/nbd-wtf/go-nostr"
)
func (b *SQLite3Backend) SaveEvent(evt *nostr.Event) error {
func (b *SQLite3Backend) SaveEvent(ctx context.Context, evt *nostr.Event) error {
// react to different kinds of events
if evt.Kind == nostr.KindSetMetadata || evt.Kind == nostr.KindContactList || (10000 <= evt.Kind && evt.Kind < 20000) {
// delete past events from this user
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2`, evt.PubKey, evt.Kind)
b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2`, evt.PubKey, evt.Kind)
} else if evt.Kind == nostr.KindRecommendServer {
// delete past recommend_server events equal to this one
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND content = $3`,
b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND content = $3`,
evt.PubKey, evt.Kind, evt.Content)
} else if evt.Kind >= 30000 && evt.Kind < 40000 {
// NIP-33
d := evt.Tags.GetFirst([]string{"d"})
if d != nil {
tagsLike := fmt.Sprintf(`%%"d","%s"%%`, d.Value())
b.DB.Exec(`DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND tags LIKE $3`, evt.PubKey, evt.Kind, tagsLike)
b.DB.ExecContext(ctx, `DELETE FROM event WHERE pubkey = $1 AND kind = $2 AND tags LIKE $3`, evt.PubKey, evt.Kind, tagsLike)
}
}
// insert
tagsj, _ := json.Marshal(evt.Tags)
res, err := b.DB.Exec(`
res, err := b.DB.ExecContext(ctx, `
INSERT INTO event (id, pubkey, created_at, kind, tags, content, sig)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`, evt.ID, evt.PubKey, evt.CreatedAt.Unix(), evt.Kind, tagsj, evt.Content, evt.Sig)
`, evt.ID, evt.PubKey, evt.CreatedAt, evt.Kind, tagsj, evt.Content, evt.Sig)
if err != nil {
return err
}
@@ -48,7 +49,7 @@ func (b *SQLite3Backend) SaveEvent(evt *nostr.Event) error {
return nil
}
func (b *SQLite3Backend) BeforeSave(evt *nostr.Event) {
func (b *SQLite3Backend) BeforeSave(ctx context.Context, evt *nostr.Event) {
// do nothing
}