From 2641327c287a0c79aef07152dbf0e2de91382ce9 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Sat, 12 Nov 2022 21:49:57 -0300 Subject: [PATCH] support for EOSE and OK messages on relay/subscription. --- relay.go | 34 +++++++++++++++++++++++++++++++++- subscription.go | 5 +++-- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/relay.go b/relay.go index 361950d..3c88bd6 100644 --- a/relay.go +++ b/relay.go @@ -39,7 +39,8 @@ type Relay struct { Connection *Connection subscriptions s.MapOf[string, *Subscription] - Notices chan string + Notices chan string + statusChans s.MapOf[string, chan Status] } func NewRelay(url string) *Relay { @@ -124,6 +125,33 @@ func (r *Relay) Connect() error { subscription.Events <- event } } + case "EOSE": + if len(jsonMessage) < 2 { + continue + } + var channel string + json.Unmarshal(jsonMessage[1], &channel) + if subscription, ok := r.subscriptions.Load(channel); ok { + subscription.EndOfStoredEvents <- struct{}{} + } + case "OK": + if len(jsonMessage) < 3 { + continue + } + var ( + eventId string + ok bool + ) + json.Unmarshal(jsonMessage[1], &eventId) + json.Unmarshal(jsonMessage[2], &ok) + + if statusChan, ok := r.statusChans.Load(eventId); ok { + if ok { + statusChan <- PublishStatusSucceeded + } else { + statusChan <- PublishStatusFailed + } + } } } } @@ -132,6 +160,10 @@ func (r Relay) Publish(event Event) chan Status { statusChan := make(chan Status) go func() { + // we keep track of this so the OK message can be used to close it + r.statusChans.Store(event.ID, statusChan) + defer r.statusChans.Delete(event.ID) + err := r.Connection.WriteJSON([]interface{}{"EVENT", event}) if err != nil { statusChan <- PublishStatusFailed diff --git a/subscription.go b/subscription.go index bd0eee5..3c0a0f4 100644 --- a/subscription.go +++ b/subscription.go @@ -4,8 +4,9 @@ type Subscription struct { id string conn *Connection - filters Filters - Events chan Event + filters Filters + Events chan Event + EndOfStoredEvents chan struct{} stopped bool }