mirror of
https://github.com/nbd-wtf/go-nostr.git
synced 2025-11-19 10:36:42 +01:00
update to go1.18 and use generic sync maps instead of normal maps.
fixes https://github.com/fiatjaf/go-nostr/issues/12
This commit is contained in:
61
relaypool.go
61
relaypool.go
@@ -9,6 +9,7 @@ import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
s "github.com/SaveTheRbtz/generic-sync-map-go"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
@@ -41,9 +42,9 @@ type PublishStatus struct {
|
||||
type RelayPool struct {
|
||||
SecretKey *string
|
||||
|
||||
Relays map[string]RelayPoolPolicy
|
||||
websockets map[string]*Connection
|
||||
subscriptions map[string]*Subscription
|
||||
Relays s.MapOf[string, RelayPoolPolicy]
|
||||
websockets s.MapOf[string, *Connection]
|
||||
subscriptions s.MapOf[string, *Subscription]
|
||||
|
||||
Notices chan *NoticeMessage
|
||||
}
|
||||
@@ -74,9 +75,9 @@ type NoticeMessage struct {
|
||||
// New creates a new RelayPool with no relays in it
|
||||
func NewRelayPool() *RelayPool {
|
||||
return &RelayPool{
|
||||
Relays: make(map[string]RelayPoolPolicy),
|
||||
websockets: make(map[string]*Connection),
|
||||
subscriptions: make(map[string]*Subscription),
|
||||
Relays: s.MapOf[string, RelayPoolPolicy]{},
|
||||
websockets: s.MapOf[string, *Connection]{},
|
||||
subscriptions: s.MapOf[string, *Subscription]{},
|
||||
|
||||
Notices: make(chan *NoticeMessage),
|
||||
}
|
||||
@@ -101,12 +102,13 @@ func (r *RelayPool) Add(url string, policy RelayPoolPolicy) error {
|
||||
|
||||
conn := NewConnection(socket)
|
||||
|
||||
r.Relays[nm] = policy
|
||||
r.websockets[nm] = conn
|
||||
r.Relays.Store(nm, policy)
|
||||
r.websockets.Store(nm, conn)
|
||||
|
||||
for _, sub := range r.subscriptions {
|
||||
r.subscriptions.Range(func(_ string, sub *Subscription) bool {
|
||||
sub.addRelay(nm, conn)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
go func() {
|
||||
for {
|
||||
@@ -152,7 +154,7 @@ func (r *RelayPool) Add(url string, policy RelayPoolPolicy) error {
|
||||
|
||||
var channel string
|
||||
json.Unmarshal(jsonMessage[1], &channel)
|
||||
if subscription, ok := r.subscriptions[channel]; ok {
|
||||
if subscription, ok := r.subscriptions.Load(channel); ok {
|
||||
var event Event
|
||||
json.Unmarshal(jsonMessage[2], &event)
|
||||
|
||||
@@ -190,15 +192,17 @@ func (r *RelayPool) Add(url string, policy RelayPoolPolicy) error {
|
||||
func (r *RelayPool) Remove(url string) {
|
||||
nm := NormalizeURL(url)
|
||||
|
||||
for _, sub := range r.subscriptions {
|
||||
r.subscriptions.Range(func(_ string, sub *Subscription) bool {
|
||||
sub.removeRelay(nm)
|
||||
}
|
||||
if conn, ok := r.websockets[nm]; ok {
|
||||
return true
|
||||
})
|
||||
|
||||
if conn, ok := r.websockets.Load(nm); ok {
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
delete(r.Relays, nm)
|
||||
delete(r.websockets, nm)
|
||||
r.Relays.Delete(nm)
|
||||
r.websockets.Delete(nm)
|
||||
}
|
||||
|
||||
func (r *RelayPool) Sub(filters Filters) *Subscription {
|
||||
@@ -207,16 +211,19 @@ func (r *RelayPool) Sub(filters Filters) *Subscription {
|
||||
|
||||
subscription := Subscription{}
|
||||
subscription.channel = hex.EncodeToString(random)
|
||||
subscription.relays = make(map[string]*Connection)
|
||||
for relay, policy := range r.Relays {
|
||||
subscription.relays = s.MapOf[string, *Connection]{}
|
||||
|
||||
r.Relays.Range(func(relay string, policy RelayPoolPolicy) bool {
|
||||
if policy.ShouldRead(filters) {
|
||||
ws := r.websockets[relay]
|
||||
subscription.relays[relay] = ws
|
||||
if ws, ok := r.websockets.Load(relay); ok {
|
||||
subscription.relays.Store(relay, ws)
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
subscription.Events = make(chan EventMessage)
|
||||
subscription.UniqueEvents = make(chan Event)
|
||||
r.subscriptions[subscription.channel] = &subscription
|
||||
r.subscriptions.Store(subscription.channel, &subscription)
|
||||
|
||||
subscription.Sub(filters)
|
||||
return &subscription
|
||||
@@ -244,9 +251,9 @@ func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error)
|
||||
}
|
||||
}
|
||||
|
||||
for relay, conn := range r.websockets {
|
||||
if !r.Relays[relay].ShouldWrite(evt) {
|
||||
continue
|
||||
r.websockets.Range(func(relay string, conn *Connection) bool {
|
||||
if r, ok := r.Relays.Load(relay); !ok || !r.ShouldWrite(evt) {
|
||||
return true
|
||||
}
|
||||
|
||||
go func(relay string, conn *Connection) {
|
||||
@@ -273,7 +280,9 @@ func (r *RelayPool) PublishEvent(evt *Event) (*Event, chan PublishStatus, error)
|
||||
break
|
||||
}
|
||||
}(relay, conn)
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
return evt, status, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user