start: introduce Server type and Shutdown (breaking change)

the main motivation for this change is to be able to run tests.
before this commit, Start, Router and Log operated on global variables,
making automated testing unreasonably hard.

this commit puts all that a server needs in a new Server type,
which also made it possible for a Server.Shutdown - see ShutdownAware
doc comments.

BREAKING CHANGES:
- Relay.OnInitialized takes one argument now, *relayer.Server.
- relayer.Router is now replaced by relayer.Server.Router().
  package users can still hook into the router from OnInitialized
  for custom HTTP routing.
- relayer.Log is gone. apart from another global var, imho this was
  a too opinionated choice for a framework to build a custom relay upon.
  this commit introduces a Logger interface which package users can implement
  for zerolog to make it log like before. see Server.Log for details.

other notable changes: finally added a couple basic tests, for start up
and shutdown. doc comments now explain most of the essentials,
hopefully making it more approachable for newcomers and easier to understand
the relayer package.

the changes in handlers.go are minimal, although git diff goes crazy.
this is because most of the lines are simply shifted indentation back by one
due to go fmt.

before this commit:

    func handleWebsocket(relay Relay) func(http.ResponseWriter, *http.Request)
    func handleNIP11(relay Relay) func(http.ResponseWriter, *http.Request)

after:

    func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request)
    func (s *Server) handleNIP11(w http.ResponseWriter, r *http.Request)
This commit is contained in:
alex 2022-12-24 22:21:26 +01:00 committed by fiatjaf
parent 932a9b62a7
commit 627724f702
14 changed files with 654 additions and 305 deletions

View File

@ -3,6 +3,7 @@ package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/fiatjaf/relayer"
@ -25,7 +26,7 @@ func (r *Relay) Storage() relayer.Storage {
return r.storage
}
func (r *Relay) OnInitialized() {}
func (r *Relay) OnInitialized(*relayer.Server) {}
func (r *Relay) Init() error {
err := envconfig.Process("", r)
@ -71,11 +72,11 @@ func (r *Relay) AfterSave(evt *nostr.Event) {
func main() {
r := Relay{}
if err := envconfig.Process("", &r); err != nil {
relayer.Log.Fatal().Err(err).Msg("failed to read from env")
log.Fatalf("failed to read from env: %v", err)
return
}
r.storage = &postgresql.PostgresBackend{DatabaseURL: r.PostgresDatabase}
if err := relayer.Start(&r); err != nil {
relayer.Log.Fatal().Err(err).Msg("server terminated")
log.Fatalf("server terminated: %v", err)
}
}

View File

@ -2,6 +2,7 @@ package main
import (
"encoding/json"
"log"
"time"
"github.com/fiatjaf/relayer"
@ -45,10 +46,10 @@ func (r *Relay) Init() error {
return nil
}
func (r *Relay) OnInitialized() {
func (r *Relay) OnInitialized(s *relayer.Server) {
// special handlers
relayer.Router.Path("/").HandlerFunc(handleWebpage)
relayer.Router.Path("/invoice").HandlerFunc(handleInvoice)
s.Router().Path("/").HandlerFunc(handleWebpage)
s.Router().Path("/invoice").HandlerFunc(handleInvoice)
}
func (r *Relay) AcceptEvent(evt *nostr.Event) bool {
@ -81,11 +82,11 @@ func (r *Relay) AfterSave(evt *nostr.Event) {
func main() {
r := Relay{}
if err := envconfig.Process("", &r); err != nil {
relayer.Log.Fatal().Err(err).Msg("failed to read from env")
log.Fatalf("failed to read from env: %v", err)
return
}
r.storage = &postgresql.PostgresBackend{DatabaseURL: r.PostgresDatabase}
if err := relayer.Start(&r); err != nil {
relayer.Log.Fatal().Err(err).Msg("server terminated")
log.Fatalf("server terminated: %v", err)
}
}

1
go.mod
View File

@ -16,7 +16,6 @@ require (
github.com/nbd-wtf/go-nostr v0.9.0
github.com/rif/cache2go v1.0.0
github.com/rs/cors v1.7.0
github.com/rs/zerolog v1.20.0
github.com/stevelacy/daz v0.1.4
github.com/tidwall/gjson v1.14.1
)

5
go.sum
View File

@ -100,7 +100,6 @@ github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
@ -353,9 +352,6 @@ github.com/rif/cache2go v1.0.0 h1:DhvZcxXvsuD9ExQ6ZO6f/sOE66OaAQIwB8Mfumap4w4=
github.com/rif/cache2go v1.0.0/go.mod h1:reDqW0mGufW34CGJ1tvjMobI1BY3dCTxA0ZWdbvm06s=
github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs=
github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
@ -553,7 +549,6 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20190327201419-c70d86f8b7cf/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200207183749-b753a1ba74fa/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=

View File

@ -13,6 +13,7 @@ import (
"github.com/nbd-wtf/go-nostr/nip11"
)
// TODO: consdier moving these to Server as config params
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
@ -27,237 +28,242 @@ const (
maxMessageSize = 512000
)
// TODO: consdier moving these to Server as config params
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
func handleWebsocket(relay Relay) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
store := relay.Storage()
advancedDeleter, _ := store.(AdvancedDeleter)
advancedQuerier, _ := store.(AdvancedQuerier)
func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) {
store := s.relay.Storage()
advancedDeleter, _ := store.(AdvancedDeleter)
advancedQuerier, _ := store.(AdvancedQuerier)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Warn().Err(err).Msg("failed to upgrade websocket")
return
}
ticker := time.NewTicker(pingPeriod)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
s.Log.Errorf("failed to upgrade websocket: %v", err)
return
}
s.clientsMu.Lock()
defer s.clientsMu.Unlock()
s.clients[conn] = struct{}{}
ticker := time.NewTicker(pingPeriod)
ws := &WebSocket{conn: conn}
ws := &WebSocket{conn: conn}
// reader
go func() {
defer func() {
ticker.Stop()
// reader
go func() {
defer func() {
ticker.Stop()
s.clientsMu.Lock()
if _, ok := s.clients[conn]; ok {
conn.Close()
}()
delete(s.clients, conn)
}
s.clientsMu.Unlock()
}()
conn.SetReadLimit(maxMessageSize)
conn.SetReadLimit(maxMessageSize)
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
return nil
})
for {
typ, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseGoingAway, // 1001
websocket.CloseNoStatusReceived, // 1005
websocket.CloseAbnormalClosure, // 1006
) {
log.Warn().Err(err).Str("ip", r.Header.Get("x-forwarded-for")).Msg("unexpected close error")
for {
typ, message, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(
err,
websocket.CloseGoingAway, // 1001
websocket.CloseNoStatusReceived, // 1005
websocket.CloseAbnormalClosure, // 1006
) {
s.Log.Warningf("unexpected close error from %s: %v", r.Header.Get("X-Forwarded-For"), err)
}
break
}
if typ == websocket.PingMessage {
ws.WriteMessage(websocket.PongMessage, nil)
continue
}
go func(message []byte) {
var notice string
defer func() {
if notice != "" {
ws.WriteJSON([]interface{}{"NOTICE", notice})
}
}()
var request []json.RawMessage
if err := json.Unmarshal(message, &request); err != nil {
// stop silently
return
}
if len(request) < 2 {
notice = "request has less than 2 parameters"
return
}
var typ string
json.Unmarshal(request[0], &typ)
switch typ {
case "EVENT":
// it's a new event
var evt nostr.Event
if err := json.Unmarshal(request[1], &evt); err != nil {
notice = "failed to decode event: " + err.Error()
return
}
// check serialization
serialized := evt.Serialize()
// assign ID
hash := sha256.Sum256(serialized)
evt.ID = hex.EncodeToString(hash[:])
// check signature (requires the ID to be set)
if ok, err := evt.CheckSignature(); err != nil {
ws.WriteJSON([]interface{}{"OK", evt.ID, false, "error: failed to verify signature"})
return
} else if !ok {
ws.WriteJSON([]interface{}{"OK", evt.ID, false, "invalid: signature is invalid"})
return
}
if evt.Kind == 5 {
// event deletion -- nip09
for _, tag := range evt.Tags {
if len(tag) >= 2 && tag[0] == "e" {
if advancedDeleter != nil {
advancedDeleter.BeforeDelete(tag[1], evt.PubKey)
}
if err := store.DeleteEvent(tag[1], evt.PubKey); err != nil {
ws.WriteJSON([]interface{}{"OK", evt.ID, false, fmt.Sprintf("error: %s", err.Error())})
return
}
if advancedDeleter != nil {
advancedDeleter.AfterDelete(tag[1], evt.PubKey)
}
}
}
return
}
ok, message := AddEvent(s.relay, evt)
ws.WriteJSON([]interface{}{"OK", evt.ID, ok, message})
break
case "REQ":
var id string
json.Unmarshal(request[1], &id)
if id == "" {
notice = "REQ has no <id>"
return
}
filters := make(nostr.Filters, len(request)-2)
for i, filterReq := range request[2:] {
if err := json.Unmarshal(
filterReq,
&filters[i],
); err != nil {
notice = "failed to decode filter"
return
}
filter := &filters[i]
if advancedQuerier != nil {
advancedQuerier.BeforeQuery(filter)
}
events, err := store.QueryEvents(filter)
if err != nil {
s.Log.Errorf("store: %v", err)
continue
}
if advancedQuerier != nil {
advancedQuerier.AfterQuery(events, filter)
}
if filter.Limit > 0 && len(events) > filter.Limit {
events = events[0:filter.Limit]
}
for _, event := range events {
ws.WriteJSON([]interface{}{"EVENT", id, event})
}
ws.WriteJSON([]interface{}{"EOSE", id})
}
setListener(id, ws, filters)
break
case "CLOSE":
var id string
json.Unmarshal(request[1], &id)
if id == "" {
notice = "CLOSE has no <id>"
return
}
removeListener(ws, id)
break
default:
if cwh, ok := s.relay.(CustomWebSocketHandler); ok {
cwh.HandleUnknownType(ws, typ, request)
} else {
notice = "unknown message type " + typ
}
return
}
}(message)
}
}()
if typ == websocket.PingMessage {
ws.WriteMessage(websocket.PongMessage, nil)
continue
}
go func(message []byte) {
var notice string
defer func() {
if notice != "" {
ws.WriteJSON([]interface{}{"NOTICE", notice})
}
}()
var request []json.RawMessage
if err := json.Unmarshal(message, &request); err != nil {
// stop silently
return
}
if len(request) < 2 {
notice = "request has less than 2 parameters"
return
}
var typ string
json.Unmarshal(request[0], &typ)
switch typ {
case "EVENT":
// it's a new event
var evt nostr.Event
if err := json.Unmarshal(request[1], &evt); err != nil {
notice = "failed to decode event: " + err.Error()
return
}
// check serialization
serialized := evt.Serialize()
// assign ID
hash := sha256.Sum256(serialized)
evt.ID = hex.EncodeToString(hash[:])
// check signature (requires the ID to be set)
if ok, err := evt.CheckSignature(); err != nil {
ws.WriteJSON([]interface{}{"OK", evt.ID, false, "error: failed to verify signature"})
return
} else if !ok {
ws.WriteJSON([]interface{}{"OK", evt.ID, false, "invalid: signature is invalid"})
return
}
if evt.Kind == 5 {
// event deletion -- nip09
for _, tag := range evt.Tags {
if len(tag) >= 2 && tag[0] == "e" {
if advancedDeleter != nil {
advancedDeleter.BeforeDelete(tag[1], evt.PubKey)
}
if err := store.DeleteEvent(tag[1], evt.PubKey); err != nil {
ws.WriteJSON([]interface{}{"OK", evt.ID, false, fmt.Sprintf("error: %s", err.Error())})
return
}
if advancedDeleter != nil {
advancedDeleter.AfterDelete(tag[1], evt.PubKey)
}
}
}
return
}
ok, message := AddEvent(relay, evt)
ws.WriteJSON([]interface{}{"OK", evt.ID, ok, message})
break
case "REQ":
var id string
json.Unmarshal(request[1], &id)
if id == "" {
notice = "REQ has no <id>"
return
}
filters := make(nostr.Filters, len(request)-2)
for i, filterReq := range request[2:] {
if err := json.Unmarshal(
filterReq,
&filters[i],
); err != nil {
notice = "failed to decode filter"
return
}
filter := &filters[i]
if advancedQuerier != nil {
advancedQuerier.BeforeQuery(filter)
}
events, err := store.QueryEvents(filter)
if err == nil {
if advancedQuerier != nil {
advancedQuerier.AfterQuery(events, filter)
}
if filter.Limit > 0 && len(events) > filter.Limit {
events = events[0:filter.Limit]
}
for _, event := range events {
ws.WriteJSON([]interface{}{"EVENT", id, event})
}
ws.WriteJSON([]interface{}{"EOSE", id})
}
}
setListener(id, ws, filters)
break
case "CLOSE":
var id string
json.Unmarshal(request[1], &id)
if id == "" {
notice = "CLOSE has no <id>"
return
}
removeListener(ws, id)
break
default:
if cwh, ok := relay.(CustomWebSocketHandler); ok {
cwh.HandleUnknownType(ws, typ, request)
} else {
notice = "unknown message type " + typ
}
return
}
}(message)
}
// writer
go func() {
defer func() {
ticker.Stop()
conn.Close()
}()
// writer
go func() {
defer func() {
ticker.Stop()
conn.Close()
}()
for {
select {
case <-ticker.C:
err := ws.WriteMessage(websocket.PingMessage, nil)
if err != nil {
log.Warn().Err(err).Msg("error writing ping, closing websocket")
return
}
for {
select {
case <-ticker.C:
err := ws.WriteMessage(websocket.PingMessage, nil)
if err != nil {
s.Log.Errorf("error writing ping: %v; closing websocket", err)
return
}
}
}()
}
}
}()
}
func handleNIP11(relay Relay) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
func (s *Server) handleNIP11(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
info := nip11.RelayInformationDocument{
Name: relay.Name(),
Description: "relay powered by the relayer framework",
PubKey: "~",
Contact: "~",
SupportedNIPs: []int{9, 15, 16},
Software: "https://github.com/fiatjaf/relayer",
Version: "~",
}
if ifmer, ok := relay.(Informationer); ok {
info = ifmer.GetNIP11InformationDocument()
}
json.NewEncoder(w).Encode(info)
info := nip11.RelayInformationDocument{
Name: s.relay.Name(),
Description: "relay powered by the relayer framework",
PubKey: "~",
Contact: "~",
SupportedNIPs: []int{9, 15, 16},
Software: "https://github.com/fiatjaf/relayer",
Version: "~",
}
if ifmer, ok := s.relay.(Informationer); ok {
info = ifmer.GetNIP11InformationDocument()
}
json.NewEncoder(w).Encode(info)
}

View File

@ -1,20 +1,31 @@
package relayer
import (
"context"
"encoding/json"
"github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip11"
)
var Log = log
// relay
// Relay is the main interface for implementing a nostr relay.
type Relay interface {
// Name is used as the "name" field in NIP-11 and as a prefix in default Server logging.
// For other NIP-11 fields, see [Informationer].
Name() string
// Init is called at the very beginning by [Server.Start], allowing a relay
// to initialize its internal resources.
// Also see [Storage.Init].
Init() error
OnInitialized()
// OnInitialized is called by [Server.Start] right before starting to serve HTTP requests.
// It is passed the server to allow callers make final adjustments, such as custom routing.
OnInitialized(*Server)
// AcceptEvent is called for every nostr event received by the server.
// If the returned value is true, the event is passed on to [Storage.SaveEvent].
// Otherwise, the server responds with a negative and "blocked" message as described
// in NIP-20.
AcceptEvent(*nostr.Event) bool
// Storage returns the relay storage implementation.
Storage() Storage
}
@ -22,33 +33,60 @@ type Injector interface {
InjectEvents() chan nostr.Event
}
// Informationer is called to compose NIP-11 response to an HTTP request
// with application/nostr+json mime type.
// See also [Relay.Name].
type Informationer interface {
GetNIP11InformationDocument() nip11.RelayInformationDocument
}
// CustomWebSocketHandler, if implemented, is passed nostr message types unrecognized
// by the server.
// The server handles "EVENT", "REQ" and "CLOSE" messages, as described in NIP-01.
type CustomWebSocketHandler interface {
HandleUnknownType(ws *WebSocket, typ string, request []json.RawMessage)
}
// storage
// ShutdownAware is called during the server shutdown.
// See [Server.Shutdown] for details.
type ShutdownAware interface {
OnShutdown(context.Context)
}
// Logger is what [Server] uses to log messages.
type Logger interface {
Infof(format string, v ...any)
Warningf(format string, v ...any)
Errorf(format string, v ...any)
}
// Storage is a persistence layer for nostr events handled by a relay.
type Storage interface {
// Init is called at the very beginning by [Server.Start], after [Relay.Init],
// allowing a storage to initialize its internal resources.
Init() error
// QueryEvents is invoked upon a client's REQ as described in NIP-01.
QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error)
// DeleteEvent is used to handle deletion events, as per NIP-09.
DeleteEvent(id string, pubkey string) error
// SaveEvent is called once Relay.AcceptEvent reports true.
SaveEvent(event *nostr.Event) error
}
// AdvancedQuerier methods are called before and after [Storage.QueryEvents].
type AdvancedQuerier interface {
BeforeQuery(*nostr.Filter)
AfterQuery([]nostr.Event, *nostr.Filter)
}
// AdvancedDeleter methods are called before and after [Storage.DeleteEvent].
type AdvancedDeleter interface {
BeforeDelete(id string, pubkey string)
AfterDelete(id string, pubkey string)
}
// AdvancedSaver methods are called before and after [Storage.SaveEvent].
type AdvancedSaver interface {
BeforeSave(*nostr.Event)
AfterSave(*nostr.Event)

View File

@ -3,10 +3,10 @@ package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/nbd-wtf/go-nostr"
"github.com/fiatjaf/relayer"
. "github.com/stevelacy/daz"
)
@ -123,7 +123,7 @@ func handleCreateFeed(w http.ResponseWriter, r *http.Request) {
return
}
relayer.Log.Info().Str("url", feedurl).Str("pubkey", pubkey).Msg("saved feed")
log.Printf("saved feed at url %q as pubkey %s", feedurl, pubkey)
fmt.Fprintf(w, "url : %s\npubkey: %s", feedurl, pubkey)
return

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"time"
@ -29,7 +30,10 @@ func (relay *Relay) Name() string {
return "relayer-rss-bridge"
}
func (r *Relay) OnInitialized() {}
func (r *Relay) OnInitialized(s *relayer.Server) {
s.Router().Path("/").HandlerFunc(handleWebpage)
s.Router().Path("/create").HandlerFunc(handleCreateFeed)
}
func (relay *Relay) Init() error {
err := envconfig.Process("", relay)
@ -38,20 +42,16 @@ func (relay *Relay) Init() error {
}
if db, err := pebble.Open("db", nil); err != nil {
relayer.Log.Fatal().Err(err).Str("path", "db").Msg("failed to open db")
log.Fatalf("failed to open db: %v", err)
} else {
relay.db = db
}
relayer.Router.Path("/").HandlerFunc(handleWebpage)
relayer.Router.Path("/create").HandlerFunc(handleCreateFeed)
go func() {
time.Sleep(20 * time.Minute)
filters := relayer.GetListeningFilters()
relayer.Log.Info().Int("filters active", len(filters)).
Msg("checking for updates")
log.Printf("checking for updates; %d filters active", len(filters))
for _, filter := range filters {
if filter.Kinds == nil || filter.Kinds.Contains(nostr.KindTextNote) {
@ -61,16 +61,13 @@ func (relay *Relay) Init() error {
var entity Entity
if err := json.Unmarshal(val, &entity); err != nil {
relayer.Log.Error().Err(err).Str("key", pubkey).
Str("val", string(val)).
Msg("got invalid json from db")
log.Printf("got invalid json from db at key %s: %v", pubkey, err)
continue
}
feed, err := parseFeed(entity.URL)
if err != nil {
relayer.Log.Warn().Err(err).Str("url", entity.URL).
Msg("failed to parse feed")
log.Printf("failed to parse feed at url %q: %v", entity.URL, err)
continue
}
@ -121,15 +118,13 @@ func (b store) QueryEvents(filter *nostr.Filter) ([]nostr.Event, error) {
var entity Entity
if err := json.Unmarshal(val, &entity); err != nil {
relayer.Log.Error().Err(err).Str("key", pubkey).Str("val", string(val)).
Msg("got invalid json from db")
log.Printf("got invalid json from db at key %s: %v", pubkey, err)
continue
}
feed, err := parseFeed(entity.URL)
if err != nil {
relayer.Log.Warn().Err(err).Str("url", entity.URL).
Msg("failed to parse feed")
log.Printf("failed to parse feed at url %q: %v", entity.URL, err)
continue
}
@ -182,6 +177,6 @@ func (relay *Relay) InjectEvents() chan nostr.Event {
func main() {
if err := relayer.Start(relay); err != nil {
relayer.Log.Fatal().Err(err).Msg("server terminated")
log.Fatalf("server terminated: %v", err)
}
}

202
start.go
View File

@ -1,29 +1,27 @@
package relayer
import (
"net"
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/kelseyhightower/envconfig"
"github.com/rs/cors"
"github.com/rs/zerolog"
)
// Settings specify initial startup parameters for a relay server.
// See StartConf for details.
// Settings specify initial startup parameters for Start and StartConf.
type Settings struct {
Host string `envconfig:"HOST" default:"0.0.0.0"`
Port string `envconfig:"PORT" default:"7447"`
}
var log = zerolog.New(os.Stderr).Output(zerolog.ConsoleWriter{Out: os.Stderr})
var Router = mux.NewRouter()
// Start calls StartConf with Settings parsed from the process environment.
func Start(relay Relay) error {
var s Settings
@ -33,34 +31,111 @@ func Start(relay Relay) error {
return StartConf(s, relay)
}
// StartConf initalizes the relay and its storage using their respective Init methods,
// returning any non-nil errors, and starts listening for HTTP requests on host:port otherwise,
// as specified in the settings.
//
// StartConf never returns until termination of the underlying http.Server, forwarding
// any but http.ErrServerClosed error from the server's ListenAndServe.
// StartConf creates a new Server, passing it host:port for the address,
// and starts serving propagating any error returned from [Server.Start].
func StartConf(s Settings, relay Relay) error {
// allow implementations to do initialization stuff
if err := relay.Init(); err != nil {
addr := net.JoinHostPort(s.Host, s.Port)
srv := NewServer(addr, relay)
return srv.Start()
}
// Server is a base for package users to implement nostr relays.
// It can serve HTTP requests and websockets, passing control over to a relay implementation.
//
// To implement a relay, it is enough to satisfy [Relay] interface. Other interfaces are
// [Informationer], [CustomWebSocketHandler], [ShutdownAware] and AdvancedXxx types.
// See their respective doc comments.
//
// The basic usage is to call Start or StartConf, which starts serving immediately.
// For a more fine-grained control, use NewServer.
// See [basic/main.go], [whitelisted/main.go], [expensive/main.go] and [rss-bridge/main.go]
// for example implementations.
//
// The following resource is a good starting point for details on what nostr protocol is
// and how it works: https://github.com/nostr-protocol/nostr
type Server struct {
// Default logger, as set by NewServer, is a stdlib logger prefixed with [Relay.Name],
// outputting to stderr.
Log Logger
addr string
relay Relay
router *mux.Router
httpServer *http.Server // set at Server.Start
// keep a connection reference to all connected clients for Server.Shutdown
clientsMu sync.Mutex
clients map[*websocket.Conn]struct{}
}
// NewServer creates a relay server with sensible defaults.
// The provided address is used to listen and respond to HTTP requests.
func NewServer(addr string, relay Relay) *Server {
srv := &Server{
Log: defaultLogger(relay.Name() + ": "),
addr: addr,
relay: relay,
router: mux.NewRouter(),
clients: make(map[*websocket.Conn]struct{}),
}
srv.router.Path("/").Headers("Upgrade", "websocket").HandlerFunc(srv.handleWebsocket)
srv.router.Path("/").Headers("Accept", "application/nostr+json").HandlerFunc(srv.handleNIP11)
return srv
}
// Router returns an http.Handler used to handle server's in-flight HTTP requests.
// By default, the router is setup to handle websocket upgrade and NIP-11 requests.
//
// In a larger system, where the relay server is not the only HTTP handler,
// prefer using s as http.Handler instead of the returned router.
func (s *Server) Router() *mux.Router {
return s.router
}
// Addr returns Server's HTTP listener address in host:port form.
// If the initial port value provided in NewServer is 0, the actual port
// value is picked at random and available by the time [Relay.OnInitialized]
// is called.
func (s *Server) Addr() string {
return s.addr
}
// ServeHTTP implements http.Handler interface.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.router.ServeHTTP(w, r)
}
// Start initializes the relay and its storage using their respective Init methods,
// returning any non-nil errors, and starts listening for HTTP requests on the address
// provided to NewServer.
//
// Just before starting to serve HTTP requests, Start calls Relay.OnInitialized
// allowing package users to make last adjustments, such as setting up custom HTTP
// handlers using s.Router.
//
// Start never returns until termination of the underlying http.Server, forwarding
// any but http.ErrServerClosed error from the server's ListenAndServe.
// To terminate the server, call Shutdown.
func (s *Server) Start() error {
ln, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
s.addr = ln.Addr().String()
return s.startListener(ln)
}
func (s *Server) startListener(ln net.Listener) error {
// init the relay
if err := s.relay.Init(); err != nil {
return fmt.Errorf("relay init: %w", err)
}
// initialize storage
if err := relay.Storage().Init(); err != nil {
if err := s.relay.Storage().Init(); err != nil {
return fmt.Errorf("storage init: %w", err)
}
// expose this Log instance so implementations can use it
Log = log.With().Str("name", relay.Name()).Logger()
// catch the websocket call before anything else
Router.Path("/").Headers("Upgrade", "websocket").HandlerFunc(handleWebsocket(relay))
// nip-11, relay information
Router.Path("/").Headers("Accept", "application/nostr+json").HandlerFunc(handleNIP11(relay))
// wait for events to come from implementations, if this is implemented
if inj, ok := relay.(Injector); ok {
// push events from implementations, if any
if inj, ok := s.relay.(Injector); ok {
go func() {
for event := range inj.InjectEvents() {
notifyListeners(&event)
@ -68,21 +143,58 @@ func StartConf(s Settings, relay Relay) error {
}()
}
relay.OnInitialized()
s.httpServer = &http.Server{
Handler: cors.Default().Handler(s),
Addr: s.addr,
WriteTimeout: 2 * time.Second,
ReadTimeout: 2 * time.Second,
IdleTimeout: 30 * time.Second,
}
s.httpServer.RegisterOnShutdown(s.disconnectAllClients)
// final callback, just before serving http
s.relay.OnInitialized(s)
// start http server
srv := &http.Server{
Handler: cors.Default().Handler(Router),
Addr: net.JoinHostPort(s.Host, s.Port),
WriteTimeout: 2 * time.Second,
ReadTimeout: 2 * time.Second,
IdleTimeout: 30 * time.Second,
ReadHeaderTimeout: 2 * time.Second,
// start accepting incoming requests
s.Log.Infof("listening on %s", s.addr)
err := s.httpServer.Serve(ln)
if err == http.ErrServerClosed {
err = nil
}
log.Debug().Str("addr", srv.Addr).Msg("listening")
srvErr := srv.ListenAndServe()
if srvErr == http.ErrServerClosed {
srvErr = nil
}
return srvErr
return err
}
// Shutdown stops serving HTTP requests and send a websocket close control message
// to all connected clients.
//
// If the relay is ShutdownAware, Shutdown calls its OnShutdown, passing the context as is.
// Note that the HTTP server make some time to shutdown and so the context deadline,
// if any, may have been shortened by the time OnShutdown is called.
func (s *Server) Shutdown(ctx context.Context) error {
err := s.httpServer.Shutdown(ctx)
if f, ok := s.relay.(ShutdownAware); ok {
f.OnShutdown(ctx)
}
return err
}
func (s *Server) disconnectAllClients() {
s.clientsMu.Lock()
defer s.clientsMu.Unlock()
for conn := range s.clients {
conn.WriteControl(websocket.CloseMessage, nil, time.Now().Add(time.Second))
conn.Close()
delete(s.clients, conn)
}
}
func defaultLogger(prefix string) Logger {
l := log.New(os.Stderr, "", log.LstdFlags|log.Lmsgprefix)
l.SetPrefix(prefix)
return stdLogger{l}
}
type stdLogger struct{ log *log.Logger }
func (l stdLogger) Infof(format string, v ...any) { l.log.Printf(format, v...) }
func (l stdLogger) Warningf(format string, v ...any) { l.log.Printf(format, v...) }
func (l stdLogger) Errorf(format string, v ...any) { l.log.Printf(format, v...) }

106
start_test.go Normal file
View File

@ -0,0 +1,106 @@
package relayer
import (
"context"
"net/http"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/nbd-wtf/go-nostr"
)
func TestServerStartShutdown(t *testing.T) {
var (
serverHost string
inited bool
storeInited bool
shutdown bool
)
ready := make(chan struct{})
rl := &testRelay{
name: "test server start",
init: func() error {
inited = true
return nil
},
onInitialized: func(s *Server) {
serverHost = s.Addr()
close(ready)
},
onShutdown: func(context.Context) { shutdown = true },
storage: &testStorage{
init: func() error { storeInited = true; return nil },
},
}
srv := NewServer("127.0.0.1:0", rl)
done := make(chan error)
go func() { done <- srv.Start(); close(done) }()
// verify everything's initialized
select {
case <-ready:
// continue
case <-time.After(time.Second):
t.Fatal("srv.Start too long to initialize")
}
if !inited {
t.Error("didn't call testRelay.init")
}
if !storeInited {
t.Error("didn't call testStorage.init")
}
// check that http requests are served
if _, err := http.Get("http://" + serverHost); err != nil {
t.Errorf("GET %s: %v", serverHost, err)
}
// verify server shuts down
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
t.Errorf("srv.Shutdown: %v", err)
}
if !shutdown {
t.Error("didn't call testRelay.onShutdown")
}
select {
case err := <-done:
if err != nil {
t.Errorf("srv.Start: %v", err)
}
case <-time.After(time.Second):
t.Error("srv.Start too long to return")
}
}
func TestServerShutdownWebsocket(t *testing.T) {
// set up a new relay server
srv := startTestRelay(t, &testRelay{storage: &testStorage{}})
// connect a client to it
ctx1, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
client, err := nostr.RelayConnectContext(ctx1, "ws://"+srv.Addr())
if err != nil {
t.Fatalf("nostr.RelayConnectContext: %v", err)
}
// now, shut down the server
ctx2, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := srv.Shutdown(ctx2); err != nil {
t.Errorf("srv.Shutdown: %v", err)
}
// wait for the client to receive a "connection close"
select {
case err := <-client.ConnectionError:
if _, ok := err.(*websocket.CloseError); !ok {
t.Errorf("client.ConnextionError: %v (%T); want websocket.CloseError", err, err)
}
case <-time.After(2 * time.Second):
t.Error("client took too long to disconnect")
}
}

View File

@ -1,7 +1,6 @@
package postgresql
import (
"github.com/fiatjaf/relayer"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
_ "github.com/lib/pq"
@ -41,8 +40,5 @@ CREATE INDEX IF NOT EXISTS timeidx ON event (created_at);
CREATE INDEX IF NOT EXISTS kindidx ON event (kind);
CREATE INDEX IF NOT EXISTS arbitrarytagvalues ON event USING gin (tagvalues);
`)
if err != nil {
relayer.Log.Print(err)
}
return nil
return err
}

View File

@ -10,7 +10,6 @@ import (
"time"
"github.com/nbd-wtf/go-nostr"
"github.com/fiatjaf/relayer"
)
func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error) {
@ -137,11 +136,7 @@ func (b PostgresBackend) QueryEvents(filter *nostr.Filter) (events []nostr.Event
rows, err := b.DB.Query(query, params...)
if err != nil && err != sql.ErrNoRows {
relayer.Log.Warn().Err(err).
Interface("filter", filter).
Str("query", query).
Msg("failed to fetch events")
return nil, fmt.Errorf("failed to fetch events: %w", err)
return nil, fmt.Errorf("failed to fetch events using query %q: %w", query, err)
}
for rows.Next() {

104
util_test.go Normal file
View File

@ -0,0 +1,104 @@
package relayer
import (
"context"
"testing"
"time"
"github.com/nbd-wtf/go-nostr"
)
func startTestRelay(t *testing.T, tr *testRelay) *Server {
t.Helper()
ready := make(chan struct{})
onInitializedFn := tr.onInitialized
tr.onInitialized = func(s *Server) {
close(ready)
if onInitializedFn != nil {
onInitializedFn(s)
}
}
srv := NewServer("127.0.0.1:0", tr)
go srv.Start()
select {
case <-ready:
case <-time.After(time.Second):
t.Fatal("server took too long to start up")
}
return srv
}
type testRelay struct {
name string
storage Storage
init func() error
onInitialized func(*Server)
onShutdown func(context.Context)
acceptEvent func(*nostr.Event) bool
}
func (tr *testRelay) Name() string { return tr.name }
func (tr *testRelay) Storage() Storage { return tr.storage }
func (tr *testRelay) Init() error {
if fn := tr.init; fn != nil {
return fn()
}
return nil
}
func (tr *testRelay) OnInitialized(s *Server) {
if fn := tr.onInitialized; fn != nil {
fn(s)
}
}
func (tr *testRelay) OnShutdown(ctx context.Context) {
if fn := tr.onShutdown; fn != nil {
fn(ctx)
}
}
func (tr *testRelay) AcceptEvent(e *nostr.Event) bool {
if fn := tr.acceptEvent; fn != nil {
return fn(e)
}
return true
}
type testStorage struct {
init func() error
queryEvents func(*nostr.Filter) ([]nostr.Event, error)
deleteEvent func(id string, pubkey string) error
saveEvent func(*nostr.Event) error
}
func (st *testStorage) Init() error {
if fn := st.init; fn != nil {
return fn()
}
return nil
}
func (st *testStorage) QueryEvents(f *nostr.Filter) ([]nostr.Event, error) {
if fn := st.queryEvents; fn != nil {
return fn(f)
}
return nil, nil
}
func (st *testStorage) DeleteEvent(id string, pubkey string) error {
if fn := st.deleteEvent; fn != nil {
return fn(id, pubkey)
}
return nil
}
func (st *testStorage) SaveEvent(e *nostr.Event) error {
if fn := st.saveEvent; fn != nil {
return fn(e)
}
return nil
}

View File

@ -2,6 +2,7 @@ package main
import (
"encoding/json"
"log"
"github.com/fiatjaf/relayer"
"github.com/fiatjaf/relayer/storage/postgresql"
@ -20,7 +21,7 @@ func (r *Relay) Name() string {
return "WhitelistedRelay"
}
func (r *Relay) OnInitialized() {}
func (r *Relay) OnInitialized(*relayer.Server) {}
func (r *Relay) Storage() relayer.Storage {
return r.storage
@ -55,11 +56,11 @@ func (r *Relay) AcceptEvent(evt *nostr.Event) bool {
func main() {
r := Relay{}
if err := envconfig.Process("", &r); err != nil {
relayer.Log.Fatal().Err(err).Msg("failed to read from env")
log.Fatalf("failed to read from env: %v", err)
return
}
r.storage = &postgresql.PostgresBackend{DatabaseURL: r.PostgresDatabase}
if err := relayer.Start(&r); err != nil {
relayer.Log.Fatal().Err(err).Msg("server terminated")
log.Fatalf("server terminated: %v", err)
}
}