support handling CLOSED messages from relay client.

This commit is contained in:
fiatjaf 2023-11-28 18:54:24 -03:00
parent fa20f84ec7
commit 7449f254db
No known key found for this signature in database
GPG Key ID: BAD43C4BE5C1A3A1
3 changed files with 17 additions and 1 deletions

View File

@ -300,6 +300,10 @@ func (r *Relay) Connect(ctx context.Context) error {
if subscription, ok := r.Subscriptions.Load(string(*env)); ok { if subscription, ok := r.Subscriptions.Load(string(*env)); ok {
subscription.dispatchEose() subscription.dispatchEose()
} }
case *ClosedEnvelope:
if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok {
subscription.dispatchClosed(env.Reason)
}
case *CountEnvelope: case *CountEnvelope:
if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil { if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil {
subscription.countResult <- *env.Count subscription.countResult <- *env.Count
@ -478,6 +482,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts .
counter: int(current), counter: int(current),
Events: make(chan *Event), Events: make(chan *Event),
EndOfStoredEvents: make(chan struct{}), EndOfStoredEvents: make(chan struct{}),
ClosedReason: make(chan string, 1),
Filters: filters, Filters: filters,
} }

View File

@ -26,11 +26,15 @@ type Subscription struct {
// the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription // the EndOfStoredEvents channel gets closed when an EOSE comes for that subscription
EndOfStoredEvents chan struct{} EndOfStoredEvents chan struct{}
// the ClosedReason channel emits the reason when a CLOSED message is received
ClosedReason chan string
// Context will be .Done() when the subscription ends // Context will be .Done() when the subscription ends
Context context.Context Context context.Context
live atomic.Bool live atomic.Bool
eosed atomic.Bool eosed atomic.Bool
closed atomic.Bool
cancel context.CancelFunc cancel context.CancelFunc
// this keeps track of the events we've received before the EOSE that we must dispatch before // this keeps track of the events we've received before the EOSE that we must dispatch before
@ -107,6 +111,13 @@ func (sub *Subscription) dispatchEose() {
} }
} }
func (sub *Subscription) dispatchClosed(reason string) {
if sub.closed.CompareAndSwap(false, true) {
sub.ClosedReason <- reason
close(sub.ClosedReason)
}
}
// Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01. // Unsub closes the subscription, sending "CLOSE" to relay as in NIP-01.
// Unsub() also closes the channel sub.Events and makes a new one. // Unsub() also closes the channel sub.Events and makes a new one.
func (sub *Subscription) Unsub() { func (sub *Subscription) Unsub() {

View File

@ -8,7 +8,7 @@ import (
"time" "time"
) )
const RELAY = "wss://nostr.mom" const RELAY = "wss://relay.nostr.bg"
// test if we can fetch a couple of random events // test if we can fetch a couple of random events
func TestSubscribe(t *testing.T) { func TestSubscribe(t *testing.T) {