get rid of WriteJSON() and replace calls with manually marshaled envelopes.

This commit is contained in:
fiatjaf 2023-05-09 17:02:22 -03:00
parent d36fbb95b9
commit ccbb44989f
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
4 changed files with 46 additions and 50 deletions

View File

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"compress/flate" "compress/flate"
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -105,34 +104,6 @@ func NewConnection(ctx context.Context, url string, requestHeader http.Header) (
}, nil }, nil
} }
func (c *Connection) WriteJSON(v any) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.enableCompression && c.msgState.IsCompressed() {
c.flateWriter.Reset(c.writer)
if err := json.NewEncoder(c.flateWriter).Encode(v); err != nil {
return fmt.Errorf("failed to encode json: %w", err)
}
err := c.flateWriter.Close()
if err != nil {
return fmt.Errorf("failed to close flate writer: %w", err)
}
} else {
if err := json.NewEncoder(c.writer).Encode(v); err != nil {
return fmt.Errorf("failed to encode json: %w", err)
}
}
err := c.writer.Flush()
if err != nil {
return fmt.Errorf("failed to flush writer: %w", err)
}
return nil
}
func (c *Connection) Ping() error { func (c *Connection) Ping() error {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()

View File

@ -32,6 +32,9 @@ func ParseMessage(message []byte) Envelope {
v = &OKEnvelope{} v = &OKEnvelope{}
case bytes.Contains(label, []byte("AUTH")): case bytes.Contains(label, []byte("AUTH")):
v = &AuthEnvelope{} v = &AuthEnvelope{}
case bytes.Contains(label, []byte("CLOSE")):
x := CloseEnvelope("")
v = &x
} }
if err := v.UnmarshalJSON(message); err != nil { if err := v.UnmarshalJSON(message); err != nil {
@ -56,6 +59,7 @@ var (
_ Envelope = (*ReqEnvelope)(nil) _ Envelope = (*ReqEnvelope)(nil)
_ Envelope = (*NoticeEnvelope)(nil) _ Envelope = (*NoticeEnvelope)(nil)
_ Envelope = (*EOSEEnvelope)(nil) _ Envelope = (*EOSEEnvelope)(nil)
_ Envelope = (*CloseEnvelope)(nil)
_ Envelope = (*OKEnvelope)(nil) _ Envelope = (*OKEnvelope)(nil)
_ Envelope = (*AuthEnvelope)(nil) _ Envelope = (*AuthEnvelope)(nil)
) )
@ -173,6 +177,30 @@ func (v EOSEEnvelope) MarshalJSON() ([]byte, error) {
return w.BuildBytes() return w.BuildBytes()
} }
type CloseEnvelope string
func (_ CloseEnvelope) Label() string { return "CLOSE" }
func (v *CloseEnvelope) UnmarshalJSON(data []byte) error {
r := gjson.ParseBytes(data)
arr := r.Array()
switch len(arr) {
case 2:
*v = CloseEnvelope(arr[1].Str)
return nil
default:
return fmt.Errorf("failed to decode CLOSE envelope")
}
}
func (v CloseEnvelope) MarshalJSON() ([]byte, error) {
w := jwriter.Writer{}
w.RawString(`["CLOSE",`)
w.Raw(json.Marshal(string(v)))
w.RawString(`]`)
return w.BuildBytes()
}
type OKEnvelope struct { type OKEnvelope struct {
EventID string EventID string
OK bool OK bool

View File

@ -141,7 +141,7 @@ func (r *Relay) Connect(ctx context.Context) error {
switch env := envelope.(type) { switch env := envelope.(type) {
case *NoticeEnvelope: case *NoticeEnvelope:
debugLog("{%s} %v\n", r.URL, string(message)) debugLog("{%s} %v\n", r.URL, message)
// TODO: improve this, otherwise if the application doesn't read the notices // TODO: improve this, otherwise if the application doesn't read the notices
// we'll consume ever more memory with each new notice // we'll consume ever more memory with each new notice
go func() { go func() {
@ -152,7 +152,7 @@ func (r *Relay) Connect(ctx context.Context) error {
r.mutex.RUnlock() r.mutex.RUnlock()
}() }()
case *AuthEnvelope: case *AuthEnvelope:
debugLog("{%s} %v\n", r.URL, string(message)) debugLog("{%s} %v\n", r.URL, message)
if env.Challenge == nil { if env.Challenge == nil {
continue continue
} }
@ -165,7 +165,7 @@ func (r *Relay) Connect(ctx context.Context) error {
r.mutex.RUnlock() r.mutex.RUnlock()
}() }()
case *EventEnvelope: case *EventEnvelope:
debugLog("{%s} %v\n", r.URL, string(message)) debugLog("{%s} %v\n", r.URL, message)
if env.SubscriptionID == nil { if env.SubscriptionID == nil {
continue continue
} }
@ -202,7 +202,7 @@ func (r *Relay) Connect(ctx context.Context) error {
}() }()
} }
case *EOSEEnvelope: case *EOSEEnvelope:
debugLog("{%s} %v\n", r.URL, string(message)) debugLog("{%s} %v\n", r.URL, message)
if subscription, ok := r.subscriptions.Load(string(*env)); ok { if subscription, ok := r.subscriptions.Load(string(*env)); ok {
subscription.emitEose.Do(func() { subscription.emitEose.Do(func() {
subscription.EndOfStoredEvents <- struct{}{} subscription.EndOfStoredEvents <- struct{}{}
@ -256,10 +256,10 @@ func (r *Relay) Publish(ctx context.Context, event Event) (Status, error) {
defer r.okCallbacks.Delete(event.ID) defer r.okCallbacks.Delete(event.ID)
// publish event // publish event
message := []any{"EVENT", event} envb, _ := EventEnvelope{Event: event}.MarshalJSON()
debugLog("{%s} sending %v\n", r.URL, message) debugLog("{%s} sending %v\n", r.URL, envb)
status = PublishStatusSent status = PublishStatusSent
if err := r.Connection.WriteJSON(message); err != nil { if err := r.Connection.WriteMessage(envb); err != nil {
status = PublishStatusFailed status = PublishStatusFailed
return status, err return status, err
} }
@ -338,9 +338,9 @@ func (r *Relay) Auth(ctx context.Context, event Event) (Status, error) {
defer r.okCallbacks.Delete(event.ID) defer r.okCallbacks.Delete(event.ID)
// send AUTH // send AUTH
authResponse := []any{"AUTH", event} authResponse, _ := AuthEnvelope{Event: event}.MarshalJSON()
debugLog("{%s} sending %v\n", r.URL, authResponse) debugLog("{%s} sending %v\n", r.URL, authResponse)
if err := r.Connection.WriteJSON(authResponse); err != nil { if err := r.Connection.WriteMessage(authResponse); err != nil {
// status will be "failed" // status will be "failed"
return status, err return status, err
} }

View File

@ -30,7 +30,8 @@ type EventMessage struct {
} }
// SetLabel puts a label on the subscription that is prepended to the id that is sent to relays, // SetLabel puts a label on the subscription that is prepended to the id that is sent to relays,
// it's only useful for debugging and sanity purposes. //
// it's only useful for debugging and sanity purposes.
func (sub *Subscription) SetLabel(label string) { func (sub *Subscription) SetLabel(label string) {
sub.label = label sub.label = label
} }
@ -46,9 +47,10 @@ func (sub *Subscription) Unsub() {
sub.mutex.Lock() sub.mutex.Lock()
defer sub.mutex.Unlock() defer sub.mutex.Unlock()
message := []any{"CLOSE", sub.GetID()} closeMsg := CloseEnvelope(sub.GetID())
debugLog("{%s} sending %v", sub.Relay.URL, message) closeb, _ := (&closeMsg).MarshalJSON()
sub.conn.WriteJSON(message) debugLog("{%s} sending %v", sub.Relay.URL, closeb)
sub.conn.WriteMessage(closeb)
if sub.stopped == false && sub.Events != nil { if sub.stopped == false && sub.Events != nil {
close(sub.Events) close(sub.Events)
@ -66,14 +68,9 @@ func (sub *Subscription) Sub(ctx context.Context, filters Filters) {
func (sub *Subscription) Fire() error { func (sub *Subscription) Fire() error {
sub.Relay.subscriptions.Store(sub.GetID(), sub) sub.Relay.subscriptions.Store(sub.GetID(), sub)
message := []any{"REQ", sub.GetID()} reqb, _ := ReqEnvelope{sub.GetID(), sub.Filters}.MarshalJSON()
for _, filter := range sub.Filters { debugLog("{%s} sending %v", sub.Relay.URL, reqb)
message = append(message, filter) err := sub.conn.WriteMessage(reqb)
}
debugLog("{%s} sending %v", sub.Relay.URL, message)
err := sub.conn.WriteJSON(message)
if err != nil { if err != nil {
sub.cancel() sub.cancel()
return fmt.Errorf("failed to write: %w", err) return fmt.Errorf("failed to write: %w", err)