support for EOSE and OK messages on relay/subscription.

This commit is contained in:
fiatjaf 2022-11-12 21:49:57 -03:00
parent 50e47392a9
commit 2641327c28
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
2 changed files with 36 additions and 3 deletions

View File

@ -39,7 +39,8 @@ type Relay struct {
Connection *Connection
subscriptions s.MapOf[string, *Subscription]
Notices chan string
Notices chan string
statusChans s.MapOf[string, chan Status]
}
func NewRelay(url string) *Relay {
@ -124,6 +125,33 @@ func (r *Relay) Connect() error {
subscription.Events <- event
}
}
case "EOSE":
if len(jsonMessage) < 2 {
continue
}
var channel string
json.Unmarshal(jsonMessage[1], &channel)
if subscription, ok := r.subscriptions.Load(channel); ok {
subscription.EndOfStoredEvents <- struct{}{}
}
case "OK":
if len(jsonMessage) < 3 {
continue
}
var (
eventId string
ok bool
)
json.Unmarshal(jsonMessage[1], &eventId)
json.Unmarshal(jsonMessage[2], &ok)
if statusChan, ok := r.statusChans.Load(eventId); ok {
if ok {
statusChan <- PublishStatusSucceeded
} else {
statusChan <- PublishStatusFailed
}
}
}
}
}
@ -132,6 +160,10 @@ func (r Relay) Publish(event Event) chan Status {
statusChan := make(chan Status)
go func() {
// we keep track of this so the OK message can be used to close it
r.statusChans.Store(event.ID, statusChan)
defer r.statusChans.Delete(event.ID)
err := r.Connection.WriteJSON([]interface{}{"EVENT", event})
if err != nil {
statusChan <- PublishStatusFailed

View File

@ -4,8 +4,9 @@ type Subscription struct {
id string
conn *Connection
filters Filters
Events chan Event
filters Filters
Events chan Event
EndOfStoredEvents chan struct{}
stopped bool
}