From ccbb44989f321fa3219099037adf481859c58df7 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 9 May 2023 17:02:22 -0300 Subject: [PATCH] get rid of WriteJSON() and replace calls with manually marshaled envelopes. --- connection.go | 29 ----------------------------- envelopes.go | 28 ++++++++++++++++++++++++++++ relay.go | 18 +++++++++--------- subscription.go | 21 +++++++++------------ 4 files changed, 46 insertions(+), 50 deletions(-) diff --git a/connection.go b/connection.go index a54fdd7..6b525b0 100644 --- a/connection.go +++ b/connection.go @@ -4,7 +4,6 @@ import ( "bytes" "compress/flate" "context" - "encoding/json" "errors" "fmt" "io" @@ -105,34 +104,6 @@ func NewConnection(ctx context.Context, url string, requestHeader http.Header) ( }, nil } -func (c *Connection) WriteJSON(v any) error { - c.mutex.Lock() - defer c.mutex.Unlock() - - if c.enableCompression && c.msgState.IsCompressed() { - c.flateWriter.Reset(c.writer) - if err := json.NewEncoder(c.flateWriter).Encode(v); err != nil { - return fmt.Errorf("failed to encode json: %w", err) - } - - err := c.flateWriter.Close() - if err != nil { - return fmt.Errorf("failed to close flate writer: %w", err) - } - } else { - if err := json.NewEncoder(c.writer).Encode(v); err != nil { - return fmt.Errorf("failed to encode json: %w", err) - } - } - - err := c.writer.Flush() - if err != nil { - return fmt.Errorf("failed to flush writer: %w", err) - } - - return nil -} - func (c *Connection) Ping() error { c.mutex.Lock() defer c.mutex.Unlock() diff --git a/envelopes.go b/envelopes.go index 5e3e8fa..f6638f9 100644 --- a/envelopes.go +++ b/envelopes.go @@ -32,6 +32,9 @@ func ParseMessage(message []byte) Envelope { v = &OKEnvelope{} case bytes.Contains(label, []byte("AUTH")): v = &AuthEnvelope{} + case bytes.Contains(label, []byte("CLOSE")): + x := CloseEnvelope("") + v = &x } if err := v.UnmarshalJSON(message); err != nil { @@ -56,6 +59,7 @@ var ( _ Envelope = (*ReqEnvelope)(nil) _ Envelope = (*NoticeEnvelope)(nil) _ Envelope = (*EOSEEnvelope)(nil) + _ Envelope = (*CloseEnvelope)(nil) _ Envelope = (*OKEnvelope)(nil) _ Envelope = (*AuthEnvelope)(nil) ) @@ -173,6 +177,30 @@ func (v EOSEEnvelope) MarshalJSON() ([]byte, error) { return w.BuildBytes() } +type CloseEnvelope string + +func (_ CloseEnvelope) Label() string { return "CLOSE" } + +func (v *CloseEnvelope) UnmarshalJSON(data []byte) error { + r := gjson.ParseBytes(data) + arr := r.Array() + switch len(arr) { + case 2: + *v = CloseEnvelope(arr[1].Str) + return nil + default: + return fmt.Errorf("failed to decode CLOSE envelope") + } +} + +func (v CloseEnvelope) MarshalJSON() ([]byte, error) { + w := jwriter.Writer{} + w.RawString(`["CLOSE",`) + w.Raw(json.Marshal(string(v))) + w.RawString(`]`) + return w.BuildBytes() +} + type OKEnvelope struct { EventID string OK bool diff --git a/relay.go b/relay.go index 48d4aaf..5add6e5 100644 --- a/relay.go +++ b/relay.go @@ -141,7 +141,7 @@ func (r *Relay) Connect(ctx context.Context) error { switch env := envelope.(type) { case *NoticeEnvelope: - debugLog("{%s} %v\n", r.URL, string(message)) + debugLog("{%s} %v\n", r.URL, message) // TODO: improve this, otherwise if the application doesn't read the notices // we'll consume ever more memory with each new notice go func() { @@ -152,7 +152,7 @@ func (r *Relay) Connect(ctx context.Context) error { r.mutex.RUnlock() }() case *AuthEnvelope: - debugLog("{%s} %v\n", r.URL, string(message)) + debugLog("{%s} %v\n", r.URL, message) if env.Challenge == nil { continue } @@ -165,7 +165,7 @@ func (r *Relay) Connect(ctx context.Context) error { r.mutex.RUnlock() }() case *EventEnvelope: - debugLog("{%s} %v\n", r.URL, string(message)) + debugLog("{%s} %v\n", r.URL, message) if env.SubscriptionID == nil { continue } @@ -202,7 +202,7 @@ func (r *Relay) Connect(ctx context.Context) error { }() } case *EOSEEnvelope: - debugLog("{%s} %v\n", r.URL, string(message)) + debugLog("{%s} %v\n", r.URL, message) if subscription, ok := r.subscriptions.Load(string(*env)); ok { subscription.emitEose.Do(func() { subscription.EndOfStoredEvents <- struct{}{} @@ -256,10 +256,10 @@ func (r *Relay) Publish(ctx context.Context, event Event) (Status, error) { defer r.okCallbacks.Delete(event.ID) // publish event - message := []any{"EVENT", event} - debugLog("{%s} sending %v\n", r.URL, message) + envb, _ := EventEnvelope{Event: event}.MarshalJSON() + debugLog("{%s} sending %v\n", r.URL, envb) status = PublishStatusSent - if err := r.Connection.WriteJSON(message); err != nil { + if err := r.Connection.WriteMessage(envb); err != nil { status = PublishStatusFailed return status, err } @@ -338,9 +338,9 @@ func (r *Relay) Auth(ctx context.Context, event Event) (Status, error) { defer r.okCallbacks.Delete(event.ID) // send AUTH - authResponse := []any{"AUTH", event} + authResponse, _ := AuthEnvelope{Event: event}.MarshalJSON() debugLog("{%s} sending %v\n", r.URL, authResponse) - if err := r.Connection.WriteJSON(authResponse); err != nil { + if err := r.Connection.WriteMessage(authResponse); err != nil { // status will be "failed" return status, err } diff --git a/subscription.go b/subscription.go index c69cf69..978deb4 100644 --- a/subscription.go +++ b/subscription.go @@ -30,7 +30,8 @@ type EventMessage struct { } // SetLabel puts a label on the subscription that is prepended to the id that is sent to relays, -// it's only useful for debugging and sanity purposes. +// +// it's only useful for debugging and sanity purposes. func (sub *Subscription) SetLabel(label string) { sub.label = label } @@ -46,9 +47,10 @@ func (sub *Subscription) Unsub() { sub.mutex.Lock() defer sub.mutex.Unlock() - message := []any{"CLOSE", sub.GetID()} - debugLog("{%s} sending %v", sub.Relay.URL, message) - sub.conn.WriteJSON(message) + closeMsg := CloseEnvelope(sub.GetID()) + closeb, _ := (&closeMsg).MarshalJSON() + debugLog("{%s} sending %v", sub.Relay.URL, closeb) + sub.conn.WriteMessage(closeb) if sub.stopped == false && sub.Events != nil { close(sub.Events) @@ -66,14 +68,9 @@ func (sub *Subscription) Sub(ctx context.Context, filters Filters) { func (sub *Subscription) Fire() error { sub.Relay.subscriptions.Store(sub.GetID(), sub) - message := []any{"REQ", sub.GetID()} - for _, filter := range sub.Filters { - message = append(message, filter) - } - - debugLog("{%s} sending %v", sub.Relay.URL, message) - - err := sub.conn.WriteJSON(message) + reqb, _ := ReqEnvelope{sub.GetID(), sub.Filters}.MarshalJSON() + debugLog("{%s} sending %v", sub.Relay.URL, reqb) + err := sub.conn.WriteMessage(reqb) if err != nil { sub.cancel() return fmt.Errorf("failed to write: %w", err)