diff --git a/basic/main.go b/basic/main.go index 789b2ea..141b31a 100644 --- a/basic/main.go +++ b/basic/main.go @@ -3,6 +3,7 @@ package main import ( "encoding/json" "fmt" + "log" "time" "github.com/fiatjaf/relayer" @@ -25,7 +26,7 @@ func (r *Relay) Storage() relayer.Storage { return r.storage } -func (r *Relay) OnInitialized() {} +func (r *Relay) OnInitialized(*relayer.Server) {} func (r *Relay) Init() error { err := envconfig.Process("", r) @@ -71,11 +72,11 @@ func (r *Relay) AfterSave(evt *nostr.Event) { func main() { r := Relay{} if err := envconfig.Process("", &r); err != nil { - relayer.Log.Fatal().Err(err).Msg("failed to read from env") + log.Fatalf("failed to read from env: %v", err) return } r.storage = &postgresql.PostgresBackend{DatabaseURL: r.PostgresDatabase} if err := relayer.Start(&r); err != nil { - relayer.Log.Fatal().Err(err).Msg("server terminated") + log.Fatalf("server terminated: %v", err) } } diff --git a/expensive/main.go b/expensive/main.go index 0dcb75f..daacd91 100644 --- a/expensive/main.go +++ b/expensive/main.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "log" "time" "github.com/fiatjaf/relayer" @@ -45,10 +46,10 @@ func (r *Relay) Init() error { return nil } -func (r *Relay) OnInitialized() { +func (r *Relay) OnInitialized(s *relayer.Server) { // special handlers - relayer.Router.Path("/").HandlerFunc(handleWebpage) - relayer.Router.Path("/invoice").HandlerFunc(handleInvoice) + s.Router().Path("/").HandlerFunc(handleWebpage) + s.Router().Path("/invoice").HandlerFunc(handleInvoice) } func (r *Relay) AcceptEvent(evt *nostr.Event) bool { @@ -81,11 +82,11 @@ func (r *Relay) AfterSave(evt *nostr.Event) { func main() { r := Relay{} if err := envconfig.Process("", &r); err != nil { - relayer.Log.Fatal().Err(err).Msg("failed to read from env") + log.Fatalf("failed to read from env: %v", err) return } r.storage = &postgresql.PostgresBackend{DatabaseURL: r.PostgresDatabase} if err := relayer.Start(&r); err != nil { - relayer.Log.Fatal().Err(err).Msg("server terminated") + log.Fatalf("server terminated: %v", err) } } diff --git a/go.mod b/go.mod index f07735e..87de90c 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,6 @@ require ( github.com/nbd-wtf/go-nostr v0.9.0 github.com/rif/cache2go v1.0.0 github.com/rs/cors v1.7.0 - github.com/rs/zerolog v1.20.0 github.com/stevelacy/daz v0.1.4 github.com/tidwall/gjson v1.14.1 ) diff --git a/go.sum b/go.sum index 9a65215..1fc0efa 100644 --- a/go.sum +++ b/go.sum @@ -100,7 +100,6 @@ github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= -github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= @@ -353,9 +352,6 @@ github.com/rif/cache2go v1.0.0 h1:DhvZcxXvsuD9ExQ6ZO6f/sOE66OaAQIwB8Mfumap4w4= github.com/rif/cache2go v1.0.0/go.mod h1:reDqW0mGufW34CGJ1tvjMobI1BY3dCTxA0ZWdbvm06s= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= -github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= -github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs= -github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= @@ -553,7 +549,6 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190327201419-c70d86f8b7cf/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= diff --git a/handlers.go b/handlers.go index c66172d..43029d8 100644 --- a/handlers.go +++ b/handlers.go @@ -13,6 +13,7 @@ import ( "github.com/nbd-wtf/go-nostr/nip11" ) +// TODO: consdier moving these to Server as config params const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second @@ -27,237 +28,242 @@ const ( maxMessageSize = 512000 ) +// TODO: consdier moving these to Server as config params var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { return true }, } -func handleWebsocket(relay Relay) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - store := relay.Storage() - advancedDeleter, _ := store.(AdvancedDeleter) - advancedQuerier, _ := store.(AdvancedQuerier) +func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) { + store := s.relay.Storage() + advancedDeleter, _ := store.(AdvancedDeleter) + advancedQuerier, _ := store.(AdvancedQuerier) - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Warn().Err(err).Msg("failed to upgrade websocket") - return - } - ticker := time.NewTicker(pingPeriod) + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + s.Log.Errorf("failed to upgrade websocket: %v", err) + return + } + s.clientsMu.Lock() + defer s.clientsMu.Unlock() + s.clients[conn] = struct{}{} + ticker := time.NewTicker(pingPeriod) - ws := &WebSocket{conn: conn} + ws := &WebSocket{conn: conn} - // reader - go func() { - defer func() { - ticker.Stop() + // reader + go func() { + defer func() { + ticker.Stop() + s.clientsMu.Lock() + if _, ok := s.clients[conn]; ok { conn.Close() - }() + delete(s.clients, conn) + } + s.clientsMu.Unlock() + }() - conn.SetReadLimit(maxMessageSize) + conn.SetReadLimit(maxMessageSize) + conn.SetReadDeadline(time.Now().Add(pongWait)) + conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(pongWait)) - conn.SetPongHandler(func(string) error { - conn.SetReadDeadline(time.Now().Add(pongWait)) - return nil - }) + return nil + }) - for { - typ, message, err := conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError( - err, - websocket.CloseGoingAway, // 1001 - websocket.CloseNoStatusReceived, // 1005 - websocket.CloseAbnormalClosure, // 1006 - ) { - log.Warn().Err(err).Str("ip", r.Header.Get("x-forwarded-for")).Msg("unexpected close error") + for { + typ, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError( + err, + websocket.CloseGoingAway, // 1001 + websocket.CloseNoStatusReceived, // 1005 + websocket.CloseAbnormalClosure, // 1006 + ) { + s.Log.Warningf("unexpected close error from %s: %v", r.Header.Get("X-Forwarded-For"), err) + } + break + } + + if typ == websocket.PingMessage { + ws.WriteMessage(websocket.PongMessage, nil) + continue + } + + go func(message []byte) { + var notice string + defer func() { + if notice != "" { + ws.WriteJSON([]interface{}{"NOTICE", notice}) } + }() + + var request []json.RawMessage + if err := json.Unmarshal(message, &request); err != nil { + // stop silently + return + } + + if len(request) < 2 { + notice = "request has less than 2 parameters" + return + } + + var typ string + json.Unmarshal(request[0], &typ) + + switch typ { + case "EVENT": + // it's a new event + var evt nostr.Event + if err := json.Unmarshal(request[1], &evt); err != nil { + notice = "failed to decode event: " + err.Error() + return + } + + // check serialization + serialized := evt.Serialize() + + // assign ID + hash := sha256.Sum256(serialized) + evt.ID = hex.EncodeToString(hash[:]) + + // check signature (requires the ID to be set) + if ok, err := evt.CheckSignature(); err != nil { + ws.WriteJSON([]interface{}{"OK", evt.ID, false, "error: failed to verify signature"}) + return + } else if !ok { + ws.WriteJSON([]interface{}{"OK", evt.ID, false, "invalid: signature is invalid"}) + return + } + + if evt.Kind == 5 { + // event deletion -- nip09 + for _, tag := range evt.Tags { + if len(tag) >= 2 && tag[0] == "e" { + if advancedDeleter != nil { + advancedDeleter.BeforeDelete(tag[1], evt.PubKey) + } + + if err := store.DeleteEvent(tag[1], evt.PubKey); err != nil { + ws.WriteJSON([]interface{}{"OK", evt.ID, false, fmt.Sprintf("error: %s", err.Error())}) + return + } + + if advancedDeleter != nil { + advancedDeleter.AfterDelete(tag[1], evt.PubKey) + } + } + } + return + } + + ok, message := AddEvent(s.relay, evt) + ws.WriteJSON([]interface{}{"OK", evt.ID, ok, message}) + break + case "REQ": + var id string + json.Unmarshal(request[1], &id) + if id == "" { + notice = "REQ has no " + return + } + + filters := make(nostr.Filters, len(request)-2) + for i, filterReq := range request[2:] { + if err := json.Unmarshal( + filterReq, + &filters[i], + ); err != nil { + notice = "failed to decode filter" + return + } + + filter := &filters[i] + + if advancedQuerier != nil { + advancedQuerier.BeforeQuery(filter) + } + + events, err := store.QueryEvents(filter) + if err != nil { + s.Log.Errorf("store: %v", err) + continue + } + + if advancedQuerier != nil { + advancedQuerier.AfterQuery(events, filter) + } + if filter.Limit > 0 && len(events) > filter.Limit { + events = events[0:filter.Limit] + } + for _, event := range events { + ws.WriteJSON([]interface{}{"EVENT", id, event}) + } + ws.WriteJSON([]interface{}{"EOSE", id}) + } + + setListener(id, ws, filters) + break + case "CLOSE": + var id string + json.Unmarshal(request[1], &id) + if id == "" { + notice = "CLOSE has no " + return + } + + removeListener(ws, id) + break + default: + if cwh, ok := s.relay.(CustomWebSocketHandler); ok { + cwh.HandleUnknownType(ws, typ, request) + } else { + notice = "unknown message type " + typ + } + return } + }(message) + } + }() - if typ == websocket.PingMessage { - ws.WriteMessage(websocket.PongMessage, nil) - continue - } - - go func(message []byte) { - var notice string - defer func() { - if notice != "" { - ws.WriteJSON([]interface{}{"NOTICE", notice}) - } - }() - - var request []json.RawMessage - if err := json.Unmarshal(message, &request); err != nil { - // stop silently - return - } - - if len(request) < 2 { - notice = "request has less than 2 parameters" - return - } - - var typ string - json.Unmarshal(request[0], &typ) - - switch typ { - case "EVENT": - // it's a new event - var evt nostr.Event - if err := json.Unmarshal(request[1], &evt); err != nil { - notice = "failed to decode event: " + err.Error() - return - } - - // check serialization - serialized := evt.Serialize() - - // assign ID - hash := sha256.Sum256(serialized) - evt.ID = hex.EncodeToString(hash[:]) - - // check signature (requires the ID to be set) - if ok, err := evt.CheckSignature(); err != nil { - ws.WriteJSON([]interface{}{"OK", evt.ID, false, "error: failed to verify signature"}) - return - } else if !ok { - ws.WriteJSON([]interface{}{"OK", evt.ID, false, "invalid: signature is invalid"}) - return - } - - if evt.Kind == 5 { - // event deletion -- nip09 - for _, tag := range evt.Tags { - if len(tag) >= 2 && tag[0] == "e" { - if advancedDeleter != nil { - advancedDeleter.BeforeDelete(tag[1], evt.PubKey) - } - - if err := store.DeleteEvent(tag[1], evt.PubKey); err != nil { - ws.WriteJSON([]interface{}{"OK", evt.ID, false, fmt.Sprintf("error: %s", err.Error())}) - return - } - - if advancedDeleter != nil { - advancedDeleter.AfterDelete(tag[1], evt.PubKey) - } - } - } - return - } - - ok, message := AddEvent(relay, evt) - ws.WriteJSON([]interface{}{"OK", evt.ID, ok, message}) - - break - case "REQ": - var id string - json.Unmarshal(request[1], &id) - if id == "" { - notice = "REQ has no " - return - } - - filters := make(nostr.Filters, len(request)-2) - for i, filterReq := range request[2:] { - if err := json.Unmarshal( - filterReq, - &filters[i], - ); err != nil { - notice = "failed to decode filter" - return - } - - filter := &filters[i] - - if advancedQuerier != nil { - advancedQuerier.BeforeQuery(filter) - } - - events, err := store.QueryEvents(filter) - if err == nil { - if advancedQuerier != nil { - advancedQuerier.AfterQuery(events, filter) - } - - if filter.Limit > 0 && len(events) > filter.Limit { - events = events[0:filter.Limit] - } - - for _, event := range events { - ws.WriteJSON([]interface{}{"EVENT", id, event}) - } - - ws.WriteJSON([]interface{}{"EOSE", id}) - } - } - - setListener(id, ws, filters) - break - case "CLOSE": - var id string - json.Unmarshal(request[1], &id) - if id == "" { - notice = "CLOSE has no " - return - } - - removeListener(ws, id) - break - default: - if cwh, ok := relay.(CustomWebSocketHandler); ok { - cwh.HandleUnknownType(ws, typ, request) - } else { - notice = "unknown message type " + typ - } - return - } - }(message) - } + // writer + go func() { + defer func() { + ticker.Stop() + conn.Close() }() - // writer - go func() { - defer func() { - ticker.Stop() - conn.Close() - }() - - for { - select { - case <-ticker.C: - err := ws.WriteMessage(websocket.PingMessage, nil) - if err != nil { - log.Warn().Err(err).Msg("error writing ping, closing websocket") - return - } + for { + select { + case <-ticker.C: + err := ws.WriteMessage(websocket.PingMessage, nil) + if err != nil { + s.Log.Errorf("error writing ping: %v; closing websocket", err) + return } } - }() - } + } + }() } -func handleNIP11(relay Relay) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") +func (s *Server) handleNIP11(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") - info := nip11.RelayInformationDocument{ - Name: relay.Name(), - Description: "relay powered by the relayer framework", - PubKey: "~", - Contact: "~", - SupportedNIPs: []int{9, 15, 16}, - Software: "https://github.com/fiatjaf/relayer", - Version: "~", - } - - if ifmer, ok := relay.(Informationer); ok { - info = ifmer.GetNIP11InformationDocument() - } - - json.NewEncoder(w).Encode(info) + info := nip11.RelayInformationDocument{ + Name: s.relay.Name(), + Description: "relay powered by the relayer framework", + PubKey: "~", + Contact: "~", + SupportedNIPs: []int{9, 15, 16}, + Software: "https://github.com/fiatjaf/relayer", + Version: "~", } + + if ifmer, ok := s.relay.(Informationer); ok { + info = ifmer.GetNIP11InformationDocument() + } + + json.NewEncoder(w).Encode(info) } diff --git a/interface.go b/interface.go index e77d01b..14742b2 100644 --- a/interface.go +++ b/interface.go @@ -1,20 +1,31 @@ package relayer import ( + "context" "encoding/json" "github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/nip11" ) -var Log = log - -// relay +// Relay is the main interface for implementing a nostr relay. type Relay interface { + // Name is used as the "name" field in NIP-11 and as a prefix in default Server logging. + // For other NIP-11 fields, see [Informationer]. Name() string + // Init is called at the very beginning by [Server.Start], allowing a relay + // to initialize its internal resources. + // Also see [Storage.Init]. Init() error - OnInitialized() + // OnInitialized is called by [Server.Start] right before starting to serve HTTP requests. + // It is passed the server to allow callers make final adjustments, such as custom routing. + OnInitialized(*Server) + // AcceptEvent is called for every nostr event received by the server. + // If the returned value is true, the event is passed on to [Storage.SaveEvent]. + // Otherwise, the server responds with a negative and "blocked" message as described + // in NIP-20. AcceptEvent(*nostr.Event) bool + // Storage returns the relay storage implementation. Storage() Storage } @@ -22,33 +33,60 @@ type Injector interface { InjectEvents() chan nostr.Event } +// Informationer is called to compose NIP-11 response to an HTTP request +// with application/nostr+json mime type. +// See also [Relay.Name]. type Informationer interface { GetNIP11InformationDocument() nip11.RelayInformationDocument } +// CustomWebSocketHandler, if implemented, is passed nostr message types unrecognized +// by the server. +// The server handles "EVENT", "REQ" and "CLOSE" messages, as described in NIP-01. type CustomWebSocketHandler interface { HandleUnknownType(ws *WebSocket, typ string, request []json.RawMessage) } -// storage +// ShutdownAware is called during the server shutdown. +// See [Server.Shutdown] for details. +type ShutdownAware interface { + OnShutdown(context.Context) +} + +// Logger is what [Server] uses to log messages. +type Logger interface { + Infof(format string, v ...any) + Warningf(format string, v ...any) + Errorf(format string, v ...any) +} + +// Storage is a persistence layer for nostr events handled by a relay. type Storage interface { + // Init is called at the very beginning by [Server.Start], after [Relay.Init], + // allowing a storage to initialize its internal resources. Init() error + // QueryEvents is invoked upon a client's REQ as described in NIP-01. QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error) + // DeleteEvent is used to handle deletion events, as per NIP-09. DeleteEvent(id string, pubkey string) error + // SaveEvent is called once Relay.AcceptEvent reports true. SaveEvent(event *nostr.Event) error } +// AdvancedQuerier methods are called before and after [Storage.QueryEvents]. type AdvancedQuerier interface { BeforeQuery(*nostr.Filter) AfterQuery([]nostr.Event, *nostr.Filter) } +// AdvancedDeleter methods are called before and after [Storage.DeleteEvent]. type AdvancedDeleter interface { BeforeDelete(id string, pubkey string) AfterDelete(id string, pubkey string) } +// AdvancedSaver methods are called before and after [Storage.SaveEvent]. type AdvancedSaver interface { BeforeSave(*nostr.Event) AfterSave(*nostr.Event) diff --git a/rss-bridge/handlers.go b/rss-bridge/handlers.go index be611c8..127e88b 100644 --- a/rss-bridge/handlers.go +++ b/rss-bridge/handlers.go @@ -3,10 +3,10 @@ package main import ( "encoding/json" "fmt" + "log" "net/http" "github.com/nbd-wtf/go-nostr" - "github.com/fiatjaf/relayer" . "github.com/stevelacy/daz" ) @@ -123,7 +123,7 @@ func handleCreateFeed(w http.ResponseWriter, r *http.Request) { return } - relayer.Log.Info().Str("url", feedurl).Str("pubkey", pubkey).Msg("saved feed") + log.Printf("saved feed at url %q as pubkey %s", feedurl, pubkey) fmt.Fprintf(w, "url : %s\npubkey: %s", feedurl, pubkey) return diff --git a/rss-bridge/main.go b/rss-bridge/main.go index d16cf9a..1e9c696 100644 --- a/rss-bridge/main.go +++ b/rss-bridge/main.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "sync" "time" @@ -29,7 +30,10 @@ func (relay *Relay) Name() string { return "relayer-rss-bridge" } -func (r *Relay) OnInitialized() {} +func (r *Relay) OnInitialized(s *relayer.Server) { + s.Router().Path("/").HandlerFunc(handleWebpage) + s.Router().Path("/create").HandlerFunc(handleCreateFeed) +} func (relay *Relay) Init() error { err := envconfig.Process("", relay) @@ -38,20 +42,16 @@ func (relay *Relay) Init() error { } if db, err := pebble.Open("db", nil); err != nil { - relayer.Log.Fatal().Err(err).Str("path", "db").Msg("failed to open db") + log.Fatalf("failed to open db: %v", err) } else { relay.db = db } - relayer.Router.Path("/").HandlerFunc(handleWebpage) - relayer.Router.Path("/create").HandlerFunc(handleCreateFeed) - go func() { time.Sleep(20 * time.Minute) filters := relayer.GetListeningFilters() - relayer.Log.Info().Int("filters active", len(filters)). - Msg("checking for updates") + log.Printf("checking for updates; %d filters active", len(filters)) for _, filter := range filters { if filter.Kinds == nil || filter.Kinds.Contains(nostr.KindTextNote) { @@ -61,16 +61,13 @@ func (relay *Relay) Init() error { var entity Entity if err := json.Unmarshal(val, &entity); err != nil { - relayer.Log.Error().Err(err).Str("key", pubkey). - Str("val", string(val)). - Msg("got invalid json from db") + log.Printf("got invalid json from db at key %s: %v", pubkey, err) continue } feed, err := parseFeed(entity.URL) if err != nil { - relayer.Log.Warn().Err(err).Str("url", entity.URL). - Msg("failed to parse feed") + log.Printf("failed to parse feed at url %q: %v", entity.URL, err) continue } @@ -121,15 +118,13 @@ func (b store) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) { var entity Entity if err := json.Unmarshal(val, &entity); err != nil { - relayer.Log.Error().Err(err).Str("key", pubkey).Str("val", string(val)). - Msg("got invalid json from db") + log.Printf("got invalid json from db at key %s: %v", pubkey, err) continue } feed, err := parseFeed(entity.URL) if err != nil { - relayer.Log.Warn().Err(err).Str("url", entity.URL). - Msg("failed to parse feed") + log.Printf("failed to parse feed at url %q: %v", entity.URL, err) continue } @@ -182,6 +177,6 @@ func (relay *Relay) InjectEvents() chan nostr.Event { func main() { if err := relayer.Start(relay); err != nil { - relayer.Log.Fatal().Err(err).Msg("server terminated") + log.Fatalf("server terminated: %v", err) } } diff --git a/start.go b/start.go index 574645e..4eb0849 100644 --- a/start.go +++ b/start.go @@ -1,29 +1,27 @@ package relayer import ( - "net" + "context" "fmt" + "log" + "net" "net/http" "os" + "sync" "time" "github.com/gorilla/mux" + "github.com/gorilla/websocket" "github.com/kelseyhightower/envconfig" "github.com/rs/cors" - "github.com/rs/zerolog" ) -// Settings specify initial startup parameters for a relay server. -// See StartConf for details. +// Settings specify initial startup parameters for Start and StartConf. type Settings struct { Host string `envconfig:"HOST" default:"0.0.0.0"` Port string `envconfig:"PORT" default:"7447"` } -var log = zerolog.New(os.Stderr).Output(zerolog.ConsoleWriter{Out: os.Stderr}) - -var Router = mux.NewRouter() - // Start calls StartConf with Settings parsed from the process environment. func Start(relay Relay) error { var s Settings @@ -33,34 +31,111 @@ func Start(relay Relay) error { return StartConf(s, relay) } -// StartConf initalizes the relay and its storage using their respective Init methods, -// returning any non-nil errors, and starts listening for HTTP requests on host:port otherwise, -// as specified in the settings. -// -// StartConf never returns until termination of the underlying http.Server, forwarding -// any but http.ErrServerClosed error from the server's ListenAndServe. +// StartConf creates a new Server, passing it host:port for the address, +// and starts serving propagating any error returned from [Server.Start]. func StartConf(s Settings, relay Relay) error { - // allow implementations to do initialization stuff - if err := relay.Init(); err != nil { + addr := net.JoinHostPort(s.Host, s.Port) + srv := NewServer(addr, relay) + return srv.Start() +} + +// Server is a base for package users to implement nostr relays. +// It can serve HTTP requests and websockets, passing control over to a relay implementation. +// +// To implement a relay, it is enough to satisfy [Relay] interface. Other interfaces are +// [Informationer], [CustomWebSocketHandler], [ShutdownAware] and AdvancedXxx types. +// See their respective doc comments. +// +// The basic usage is to call Start or StartConf, which starts serving immediately. +// For a more fine-grained control, use NewServer. +// See [basic/main.go], [whitelisted/main.go], [expensive/main.go] and [rss-bridge/main.go] +// for example implementations. +// +// The following resource is a good starting point for details on what nostr protocol is +// and how it works: https://github.com/nostr-protocol/nostr +type Server struct { + // Default logger, as set by NewServer, is a stdlib logger prefixed with [Relay.Name], + // outputting to stderr. + Log Logger + + addr string + relay Relay + router *mux.Router + httpServer *http.Server // set at Server.Start + + // keep a connection reference to all connected clients for Server.Shutdown + clientsMu sync.Mutex + clients map[*websocket.Conn]struct{} +} + +// NewServer creates a relay server with sensible defaults. +// The provided address is used to listen and respond to HTTP requests. +func NewServer(addr string, relay Relay) *Server { + srv := &Server{ + Log: defaultLogger(relay.Name() + ": "), + addr: addr, + relay: relay, + router: mux.NewRouter(), + clients: make(map[*websocket.Conn]struct{}), + } + srv.router.Path("/").Headers("Upgrade", "websocket").HandlerFunc(srv.handleWebsocket) + srv.router.Path("/").Headers("Accept", "application/nostr+json").HandlerFunc(srv.handleNIP11) + return srv +} + +// Router returns an http.Handler used to handle server's in-flight HTTP requests. +// By default, the router is setup to handle websocket upgrade and NIP-11 requests. +// +// In a larger system, where the relay server is not the only HTTP handler, +// prefer using s as http.Handler instead of the returned router. +func (s *Server) Router() *mux.Router { + return s.router +} + +// Addr returns Server's HTTP listener address in host:port form. +// If the initial port value provided in NewServer is 0, the actual port +// value is picked at random and available by the time [Relay.OnInitialized] +// is called. +func (s *Server) Addr() string { + return s.addr +} + +// ServeHTTP implements http.Handler interface. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.router.ServeHTTP(w, r) +} + +// Start initializes the relay and its storage using their respective Init methods, +// returning any non-nil errors, and starts listening for HTTP requests on the address +// provided to NewServer. +// +// Just before starting to serve HTTP requests, Start calls Relay.OnInitialized +// allowing package users to make last adjustments, such as setting up custom HTTP +// handlers using s.Router. +// +// Start never returns until termination of the underlying http.Server, forwarding +// any but http.ErrServerClosed error from the server's ListenAndServe. +// To terminate the server, call Shutdown. +func (s *Server) Start() error { + ln, err := net.Listen("tcp", s.addr) + if err != nil { + return err + } + s.addr = ln.Addr().String() + return s.startListener(ln) +} + +func (s *Server) startListener(ln net.Listener) error { + // init the relay + if err := s.relay.Init(); err != nil { return fmt.Errorf("relay init: %w", err) } - - // initialize storage - if err := relay.Storage().Init(); err != nil { + if err := s.relay.Storage().Init(); err != nil { return fmt.Errorf("storage init: %w", err) } - // expose this Log instance so implementations can use it - Log = log.With().Str("name", relay.Name()).Logger() - - // catch the websocket call before anything else - Router.Path("/").Headers("Upgrade", "websocket").HandlerFunc(handleWebsocket(relay)) - - // nip-11, relay information - Router.Path("/").Headers("Accept", "application/nostr+json").HandlerFunc(handleNIP11(relay)) - - // wait for events to come from implementations, if this is implemented - if inj, ok := relay.(Injector); ok { + // push events from implementations, if any + if inj, ok := s.relay.(Injector); ok { go func() { for event := range inj.InjectEvents() { notifyListeners(&event) @@ -68,21 +143,58 @@ func StartConf(s Settings, relay Relay) error { }() } - relay.OnInitialized() + s.httpServer = &http.Server{ + Handler: cors.Default().Handler(s), + Addr: s.addr, + WriteTimeout: 2 * time.Second, + ReadTimeout: 2 * time.Second, + IdleTimeout: 30 * time.Second, + } + s.httpServer.RegisterOnShutdown(s.disconnectAllClients) + // final callback, just before serving http + s.relay.OnInitialized(s) - // start http server - srv := &http.Server{ - Handler: cors.Default().Handler(Router), - Addr: net.JoinHostPort(s.Host, s.Port), - WriteTimeout: 2 * time.Second, - ReadTimeout: 2 * time.Second, - IdleTimeout: 30 * time.Second, - ReadHeaderTimeout: 2 * time.Second, + // start accepting incoming requests + s.Log.Infof("listening on %s", s.addr) + err := s.httpServer.Serve(ln) + if err == http.ErrServerClosed { + err = nil } - log.Debug().Str("addr", srv.Addr).Msg("listening") - srvErr := srv.ListenAndServe() - if srvErr == http.ErrServerClosed { - srvErr = nil - } - return srvErr + return err } + +// Shutdown stops serving HTTP requests and send a websocket close control message +// to all connected clients. +// +// If the relay is ShutdownAware, Shutdown calls its OnShutdown, passing the context as is. +// Note that the HTTP server make some time to shutdown and so the context deadline, +// if any, may have been shortened by the time OnShutdown is called. +func (s *Server) Shutdown(ctx context.Context) error { + err := s.httpServer.Shutdown(ctx) + if f, ok := s.relay.(ShutdownAware); ok { + f.OnShutdown(ctx) + } + return err +} + +func (s *Server) disconnectAllClients() { + s.clientsMu.Lock() + defer s.clientsMu.Unlock() + for conn := range s.clients { + conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second)) + conn.Close() + delete(s.clients, conn) + } +} + +func defaultLogger(prefix string) Logger { + l := log.New(os.Stderr, "", log.LstdFlags|log.Lmsgprefix) + l.SetPrefix(prefix) + return stdLogger{l} +} + +type stdLogger struct{ log *log.Logger } + +func (l stdLogger) Infof(format string, v ...any) { l.log.Printf(format, v...) } +func (l stdLogger) Warningf(format string, v ...any) { l.log.Printf(format, v...) } +func (l stdLogger) Errorf(format string, v ...any) { l.log.Printf(format, v...) } diff --git a/start_test.go b/start_test.go new file mode 100644 index 0000000..50267ad --- /dev/null +++ b/start_test.go @@ -0,0 +1,106 @@ +package relayer + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/nbd-wtf/go-nostr" +) + +func TestServerStartShutdown(t *testing.T) { + var ( + serverHost string + inited bool + storeInited bool + shutdown bool + ) + ready := make(chan struct{}) + rl := &testRelay{ + name: "test server start", + init: func() error { + inited = true + return nil + }, + onInitialized: func(s *Server) { + serverHost = s.Addr() + close(ready) + }, + onShutdown: func(context.Context) { shutdown = true }, + storage: &testStorage{ + init: func() error { storeInited = true; return nil }, + }, + } + srv := NewServer("127.0.0.1:0", rl) + done := make(chan error) + go func() { done <- srv.Start(); close(done) }() + + // verify everything's initialized + select { + case <-ready: + // continue + case <-time.After(time.Second): + t.Fatal("srv.Start too long to initialize") + } + if !inited { + t.Error("didn't call testRelay.init") + } + if !storeInited { + t.Error("didn't call testStorage.init") + } + + // check that http requests are served + if _, err := http.Get("http://" + serverHost); err != nil { + t.Errorf("GET %s: %v", serverHost, err) + } + + // verify server shuts down + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + t.Errorf("srv.Shutdown: %v", err) + } + if !shutdown { + t.Error("didn't call testRelay.onShutdown") + } + select { + case err := <-done: + if err != nil { + t.Errorf("srv.Start: %v", err) + } + case <-time.After(time.Second): + t.Error("srv.Start too long to return") + } +} + +func TestServerShutdownWebsocket(t *testing.T) { + // set up a new relay server + srv := startTestRelay(t, &testRelay{storage: &testStorage{}}) + + // connect a client to it + ctx1, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + client, err := nostr.RelayConnectContext(ctx1, "ws://"+srv.Addr()) + if err != nil { + t.Fatalf("nostr.RelayConnectContext: %v", err) + } + + // now, shut down the server + ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := srv.Shutdown(ctx2); err != nil { + t.Errorf("srv.Shutdown: %v", err) + } + + // wait for the client to receive a "connection close" + select { + case err := <-client.ConnectionError: + if _, ok := err.(*websocket.CloseError); !ok { + t.Errorf("client.ConnextionError: %v (%T); want websocket.CloseError", err, err) + } + case <-time.After(2 * time.Second): + t.Error("client took too long to disconnect") + } +} diff --git a/storage/postgresql/init.go b/storage/postgresql/init.go index 3f598e9..22a4858 100644 --- a/storage/postgresql/init.go +++ b/storage/postgresql/init.go @@ -1,7 +1,6 @@ package postgresql import ( - "github.com/fiatjaf/relayer" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" _ "github.com/lib/pq" @@ -41,8 +40,5 @@ CREATE INDEX IF NOT EXISTS timeidx ON event (created_at); CREATE INDEX IF NOT EXISTS kindidx ON event (kind); CREATE INDEX IF NOT EXISTS arbitrarytagvalues ON event USING gin (tagvalues); `) - if err != nil { - relayer.Log.Print(err) - } - return nil + return err } diff --git a/storage/postgresql/query.go b/storage/postgresql/query.go index 67ad738..8ead377 100644 --- a/storage/postgresql/query.go +++ b/storage/postgresql/query.go @@ -10,7 +10,6 @@ import ( "time" "github.com/nbd-wtf/go-nostr" - "github.com/fiatjaf/relayer" ) func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error) { @@ -137,11 +136,7 @@ func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event rows, err := b.DB.Query(query, params...) if err != nil && err != sql.ErrNoRows { - relayer.Log.Warn().Err(err). - Interface("filter", filter). - Str("query", query). - Msg("failed to fetch events") - return nil, fmt.Errorf("failed to fetch events: %w", err) + return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err) } for rows.Next() { diff --git a/util_test.go b/util_test.go new file mode 100644 index 0000000..f1fff91 --- /dev/null +++ b/util_test.go @@ -0,0 +1,104 @@ +package relayer + +import ( + "context" + "testing" + "time" + + "github.com/nbd-wtf/go-nostr" +) + +func startTestRelay(t *testing.T, tr *testRelay) *Server { + t.Helper() + ready := make(chan struct{}) + + onInitializedFn := tr.onInitialized + tr.onInitialized = func(s *Server) { + close(ready) + if onInitializedFn != nil { + onInitializedFn(s) + } + } + srv := NewServer("127.0.0.1:0", tr) + go srv.Start() + + select { + case <-ready: + case <-time.After(time.Second): + t.Fatal("server took too long to start up") + } + return srv +} + +type testRelay struct { + name string + storage Storage + init func() error + onInitialized func(*Server) + onShutdown func(context.Context) + acceptEvent func(*nostr.Event) bool +} + +func (tr *testRelay) Name() string { return tr.name } +func (tr *testRelay) Storage() Storage { return tr.storage } + +func (tr *testRelay) Init() error { + if fn := tr.init; fn != nil { + return fn() + } + return nil +} + +func (tr *testRelay) OnInitialized(s *Server) { + if fn := tr.onInitialized; fn != nil { + fn(s) + } +} + +func (tr *testRelay) OnShutdown(ctx context.Context) { + if fn := tr.onShutdown; fn != nil { + fn(ctx) + } +} + +func (tr *testRelay) AcceptEvent(e *nostr.Event) bool { + if fn := tr.acceptEvent; fn != nil { + return fn(e) + } + return true +} + +type testStorage struct { + init func() error + queryEvents func(*nostr.Filter) ([]nostr.Event, error) + deleteEvent func(id string, pubkey string) error + saveEvent func(*nostr.Event) error +} + +func (st *testStorage) Init() error { + if fn := st.init; fn != nil { + return fn() + } + return nil +} + +func (st *testStorage) QueryEvents(f *nostr.Filter) ([]nostr.Event, error) { + if fn := st.queryEvents; fn != nil { + return fn(f) + } + return nil, nil +} + +func (st *testStorage) DeleteEvent(id string, pubkey string) error { + if fn := st.deleteEvent; fn != nil { + return fn(id, pubkey) + } + return nil +} + +func (st *testStorage) SaveEvent(e *nostr.Event) error { + if fn := st.saveEvent; fn != nil { + return fn(e) + } + return nil +} diff --git a/whitelisted/main.go b/whitelisted/main.go index 79189dc..94b31b6 100644 --- a/whitelisted/main.go +++ b/whitelisted/main.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "log" "github.com/fiatjaf/relayer" "github.com/fiatjaf/relayer/storage/postgresql" @@ -20,7 +21,7 @@ func (r *Relay) Name() string { return "WhitelistedRelay" } -func (r *Relay) OnInitialized() {} +func (r *Relay) OnInitialized(*relayer.Server) {} func (r *Relay) Storage() relayer.Storage { return r.storage @@ -55,11 +56,11 @@ func (r *Relay) AcceptEvent(evt *nostr.Event) bool { func main() { r := Relay{} if err := envconfig.Process("", &r); err != nil { - relayer.Log.Fatal().Err(err).Msg("failed to read from env") + log.Fatalf("failed to read from env: %v", err) return } r.storage = &postgresql.PostgresBackend{DatabaseURL: r.PostgresDatabase} if err := relayer.Start(&r); err != nil { - relayer.Log.Fatal().Err(err).Msg("server terminated") + log.Fatalf("server terminated: %v", err) } }