diff --git a/add-event.go b/add-event.go index d7987f9..b686258 100644 --- a/add-event.go +++ b/add-event.go @@ -1,17 +1,18 @@ package relayer import ( + "context" "fmt" "github.com/fiatjaf/relayer/storage" "github.com/nbd-wtf/go-nostr" ) -func AddEvent(relay Relay, evt nostr.Event) (accepted bool, message string) { - store := relay.Storage() +func AddEvent(ctx context.Context, relay Relay, evt nostr.Event) (accepted bool, message string) { + store := relay.Storage(ctx) advancedSaver, _ := store.(AdvancedSaver) - if !relay.AcceptEvent(&evt) { + if !relay.AcceptEvent(ctx, &evt) { return false, "blocked: event blocked by relay" } @@ -19,10 +20,10 @@ func AddEvent(relay Relay, evt nostr.Event) (accepted bool, message string) { // do not store ephemeral events } else { if advancedSaver != nil { - advancedSaver.BeforeSave(&evt) + advancedSaver.BeforeSave(ctx, &evt) } - if saveErr := store.SaveEvent(&evt); saveErr != nil { + if saveErr := store.SaveEvent(ctx, &evt); saveErr != nil { switch saveErr { case storage.ErrDupEvent: return true, saveErr.Error() diff --git a/go.mod b/go.mod index 9830496..167631e 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/lib/pq v1.10.3 github.com/mattn/go-sqlite3 v1.14.6 github.com/mmcdole/gofeed v1.1.3 - github.com/nbd-wtf/go-nostr v0.13.2 + github.com/nbd-wtf/go-nostr v0.17.1 github.com/rif/cache2go v1.0.0 github.com/rs/cors v1.7.0 github.com/stevelacy/daz v0.1.4 @@ -32,7 +32,7 @@ require ( github.com/andybalholm/cascadia v1.3.1 // indirect github.com/btcsuite/btcd v0.23.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect - github.com/btcsuite/btcd/btcutil v1.1.1 // indirect + github.com/btcsuite/btcd/btcutil v1.1.3 // indirect github.com/btcsuite/btcd/btcutil/psbt v1.1.4 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect @@ -59,6 +59,7 @@ require ( github.com/go-errors/errors v1.0.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.11 // indirect github.com/kkdai/bstream v1.0.0 // indirect github.com/klauspost/compress v1.13.6 // indirect @@ -72,6 +73,7 @@ require ( github.com/lightningnetwork/lnd/ticker v1.1.0 // indirect github.com/lightningnetwork/lnd/tlv v1.0.3 // indirect github.com/lightningnetwork/lnd/tor v1.0.1 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/miekg/dns v1.1.43 // indirect github.com/mmcdole/goxpp v0.0.0-20181012175147-0068e33feabf // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -79,7 +81,6 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect - github.com/valyala/fastjson v1.6.4 // indirect golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect golang.org/x/sys v0.1.0 // indirect diff --git a/go.sum b/go.sum index 3186f35..2fe363f 100644 --- a/go.sum +++ b/go.sum @@ -47,8 +47,9 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/btcutil v1.0.0/go.mod h1:Uoxwv0pqYWhD//tfTiipkxNfdhG9UrLwaeswfjfdF0A= github.com/btcsuite/btcd/btcutil v1.1.0/go.mod h1:5OapHB7A2hBBWLm48mmw4MOHNJCcUBTwmWH/0Jn8VHE= -github.com/btcsuite/btcd/btcutil v1.1.1 h1:hDcDaXiP0uEzR8Biqo2weECKqEw0uHDZ9ixIWevVQqY= github.com/btcsuite/btcd/btcutil v1.1.1/go.mod h1:nbKlBMNm9FGsdvKvu0essceubPiAcI57pYBNnsLAa34= +github.com/btcsuite/btcd/btcutil v1.1.3 h1:xfbtw8lwpp0G6NwSHb+UE67ryTFHJAiNuipusjXSohQ= +github.com/btcsuite/btcd/btcutil v1.1.3/go.mod h1:UR7dsSJzJUfMmFiiLlIrMq1lS9jh9EdCV7FStZSnpi0= github.com/btcsuite/btcd/btcutil/psbt v1.1.4 h1:Edx4AfBn+YPam2KP5AobDitulGp4r1Oibm8oruzkMdI= github.com/btcsuite/btcd/btcutil/psbt v1.1.4/go.mod h1:9AyU6EQVJ9Iw9zPyNT1lcdHd6cnEZdno5wLu5FY74os= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= @@ -237,6 +238,8 @@ github.com/jgroeneveld/trial v2.0.0+incompatible/go.mod h1:I6INLW96EN8WysNBXUFI3 github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE= github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -305,6 +308,8 @@ github.com/lightningnetwork/lnd/tor v1.0.1 h1:A11FrpU0Y//g+fA827W4VnjOeoIvExONdc github.com/lightningnetwork/lnd/tor v1.0.1/go.mod h1:RDtaAdwfAm+ONuPYwUhNIH1RAvKPv+75lHPOegUcz64= github.com/ltcsuite/ltcd v0.0.0-20190101042124-f37f8bf35796 h1:sjOGyegMIhvgfq5oaue6Td+hxZuf3tDC8lAPrFldqFw= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= @@ -335,8 +340,8 @@ github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOA github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ3M8LwxM= github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nbd-wtf/go-nostr v0.13.2 h1:w/TgXbkWqkZQsPRZffPZpvR/uskOSSUCGYhtW6I3xPI= -github.com/nbd-wtf/go-nostr v0.13.2/go.mod h1:qFFTIxh15H5GGN0WsBI/P73DteqsevnhSEW/yk8nEf4= +github.com/nbd-wtf/go-nostr v0.17.1 h1:dRyNNf1rx5vGYi9AH1A/mcKKQg8ZSiY8uRUJGObXPPI= +github.com/nbd-wtf/go-nostr v0.17.1/go.mod h1:YCDHJtaFQE76d1ZkcUsTkz3dYNP+bldo5CIQwXPPcbk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nwaples/rardecode v1.1.2 h1:Cj0yZY6T1Zx1R7AhTbyGSALm44/Mmq+BAPc4B/p/d3M= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= @@ -411,8 +416,6 @@ github.com/urfave/cli v1.22.3/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w= -github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ= -github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/handlers.go b/handlers.go index d151e87..a636f74 100644 --- a/handlers.go +++ b/handlers.go @@ -38,10 +38,10 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } -func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) { - store := s.relay.Storage() +func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + store := s.relay.Storage(ctx) advancedDeleter, _ := store.(AdvancedDeleter) - advancedQuerier, _ := store.(AdvancedQuerier) conn, err := upgrader.Upgrade(w, r, nil) if err != nil { @@ -158,10 +158,10 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) { for _, tag := range evt.Tags { if len(tag) >= 2 && tag[0] == "e" { if advancedDeleter != nil { - advancedDeleter.BeforeDelete(tag[1], evt.PubKey) + advancedDeleter.BeforeDelete(ctx, tag[1], evt.PubKey) } - if err := store.DeleteEvent(tag[1], evt.PubKey); err != nil { + if err := store.DeleteEvent(ctx, tag[1], evt.PubKey); err != nil { ws.WriteJSON([]interface{}{"OK", evt.ID, false, fmt.Sprintf("error: %s", err.Error())}) return } @@ -174,7 +174,7 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) { return } - ok, message := AddEvent(s.relay, evt) + ok, message := AddEvent(ctx, s.relay, evt) ws.WriteJSON([]interface{}{"OK", evt.ID, ok, message}) case "REQ": @@ -222,33 +222,27 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) { } } - if advancedQuerier != nil { - advancedQuerier.BeforeQuery(filter) - } - - events, err := store.QueryEvents(filter) + events, err := store.QueryEvents(ctx, filter) if err != nil { s.Log.Errorf("store: %v", err) continue } - if advancedQuerier != nil { - advancedQuerier.AfterQuery(events, filter) + // ensures the client won't be bombarded with events in case Storage doesn't do limits right + if filter.Limit == 0 { + filter.Limit = 9999999999 } - - // this block should not trigger if the SQL query accounts for filter.Limit - // other implementations may be broken, and this ensures the client - // won't be bombarded. - if filter.Limit > 0 && len(events) > filter.Limit { - events = events[0:filter.Limit] - } - - for _, event := range events { + i := 0 + for event := range events { ws.WriteJSON([]interface{}{"EVENT", id, event}) + + i++ + if i > filter.Limit { + break + } } } - // moved EOSE out of for loop. - // otherwise subscriptions may be cancelled too early + ws.WriteJSON([]interface{}{"EOSE", id}) setListener(id, ws, filters) case "CLOSE": @@ -305,7 +299,7 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) { }() } -func (s *Server) handleNIP11(w http.ResponseWriter, r *http.Request) { +func (s *Server) HandleNIP11(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") supportedNIPs := []int{9, 11, 12, 15, 16, 20, 33} diff --git a/interface.go b/interface.go index 5d24cf2..1b4f336 100644 --- a/interface.go +++ b/interface.go @@ -17,16 +17,13 @@ type Relay interface { // to initialize its internal resources. // Also see [Storage.Init]. Init() error - // OnInitialized is called by [Server.Start] right before starting to serve HTTP requests. - // It is passed the server to allow callers make final adjustments, such as custom routing. - OnInitialized(*Server) // AcceptEvent is called for every nostr event received by the server. // If the returned value is true, the event is passed on to [Storage.SaveEvent]. // Otherwise, the server responds with a negative and "blocked" message as described // in NIP-20. - AcceptEvent(*nostr.Event) bool + AcceptEvent(context.Context, *nostr.Event) bool // Storage returns the relay storage implementation. - Storage() Storage + Storage(context.Context) Storage } // Auther is the interface for implementing NIP-42. @@ -73,27 +70,21 @@ type Storage interface { Init() error // QueryEvents is invoked upon a client's REQ as described in NIP-01. - QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error) + QueryEvents(ctx context.Context, filter *nostr.Filter) (chan *nostr.Event, error) // DeleteEvent is used to handle deletion events, as per NIP-09. - DeleteEvent(id string, pubkey string) error + DeleteEvent(ctx context.Context, id string, pubkey string) error // SaveEvent is called once Relay.AcceptEvent reports true. - SaveEvent(event *nostr.Event) error -} - -// AdvancedQuerier methods are called before and after [Storage.QueryEvents]. -type AdvancedQuerier interface { - BeforeQuery(*nostr.Filter) - AfterQuery([]nostr.Event, *nostr.Filter) + SaveEvent(ctx context.Context, event *nostr.Event) error } // AdvancedDeleter methods are called before and after [Storage.DeleteEvent]. type AdvancedDeleter interface { - BeforeDelete(id string, pubkey string) + BeforeDelete(ctx context.Context, id string, pubkey string) AfterDelete(id string, pubkey string) } // AdvancedSaver methods are called before and after [Storage.SaveEvent]. type AdvancedSaver interface { - BeforeSave(*nostr.Event) + BeforeSave(context.Context, *nostr.Event) AfterSave(*nostr.Event) } diff --git a/start.go b/start.go index 4eb0849..101a971 100644 --- a/start.go +++ b/start.go @@ -4,41 +4,14 @@ import ( "context" "fmt" "log" - "net" "net/http" "os" "sync" "time" - "github.com/gorilla/mux" "github.com/gorilla/websocket" - "github.com/kelseyhightower/envconfig" - "github.com/rs/cors" ) -// Settings specify initial startup parameters for Start and StartConf. -type Settings struct { - Host string `envconfig:"HOST" default:"0.0.0.0"` - Port string `envconfig:"PORT" default:"7447"` -} - -// Start calls StartConf with Settings parsed from the process environment. -func Start(relay Relay) error { - var s Settings - if err := envconfig.Process("", &s); err != nil { - return fmt.Errorf("envconfig: %w", err) - } - return StartConf(s, relay) -} - -// StartConf creates a new Server, passing it host:port for the address, -// and starts serving propagating any error returned from [Server.Start]. -func StartConf(s Settings, relay Relay) error { - addr := net.JoinHostPort(s.Host, s.Port) - srv := NewServer(addr, relay) - return srv.Start() -} - // Server is a base for package users to implement nostr relays. // It can serve HTTP requests and websockets, passing control over to a relay implementation. // @@ -58,84 +31,33 @@ type Server struct { // outputting to stderr. Log Logger - addr string - relay Relay - router *mux.Router - httpServer *http.Server // set at Server.Start + addr string + relay Relay // keep a connection reference to all connected clients for Server.Shutdown clientsMu sync.Mutex clients map[*websocket.Conn]struct{} } -// NewServer creates a relay server with sensible defaults. -// The provided address is used to listen and respond to HTTP requests. -func NewServer(addr string, relay Relay) *Server { +// NewServer initializes the relay and its storage using their respective Init methods, +// returning any non-nil errors, and returns a Server ready to listen for HTTP requests. +func NewServer(relay Relay) (*Server, error) { srv := &Server{ Log: defaultLogger(relay.Name() + ": "), - addr: addr, relay: relay, - router: mux.NewRouter(), clients: make(map[*websocket.Conn]struct{}), } - srv.router.Path("/").Headers("Upgrade", "websocket").HandlerFunc(srv.handleWebsocket) - srv.router.Path("/").Headers("Accept", "application/nostr+json").HandlerFunc(srv.handleNIP11) - return srv -} -// Router returns an http.Handler used to handle server's in-flight HTTP requests. -// By default, the router is setup to handle websocket upgrade and NIP-11 requests. -// -// In a larger system, where the relay server is not the only HTTP handler, -// prefer using s as http.Handler instead of the returned router. -func (s *Server) Router() *mux.Router { - return s.router -} - -// Addr returns Server's HTTP listener address in host:port form. -// If the initial port value provided in NewServer is 0, the actual port -// value is picked at random and available by the time [Relay.OnInitialized] -// is called. -func (s *Server) Addr() string { - return s.addr -} - -// ServeHTTP implements http.Handler interface. -func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - s.router.ServeHTTP(w, r) -} - -// Start initializes the relay and its storage using their respective Init methods, -// returning any non-nil errors, and starts listening for HTTP requests on the address -// provided to NewServer. -// -// Just before starting to serve HTTP requests, Start calls Relay.OnInitialized -// allowing package users to make last adjustments, such as setting up custom HTTP -// handlers using s.Router. -// -// Start never returns until termination of the underlying http.Server, forwarding -// any but http.ErrServerClosed error from the server's ListenAndServe. -// To terminate the server, call Shutdown. -func (s *Server) Start() error { - ln, err := net.Listen("tcp", s.addr) - if err != nil { - return err - } - s.addr = ln.Addr().String() - return s.startListener(ln) -} - -func (s *Server) startListener(ln net.Listener) error { // init the relay - if err := s.relay.Init(); err != nil { - return fmt.Errorf("relay init: %w", err) + if err := relay.Init(); err != nil { + return nil, fmt.Errorf("relay init: %w", err) } - if err := s.relay.Storage().Init(); err != nil { - return fmt.Errorf("storage init: %w", err) + if err := relay.Storage(context.Background()).Init(); err != nil { + return nil, fmt.Errorf("storage init: %w", err) } - // push events from implementations, if any - if inj, ok := s.relay.(Injector); ok { + // start listening from events from other sources, if any + if inj, ok := relay.(Injector); ok { go func() { for event := range inj.InjectEvents() { notifyListeners(&event) @@ -143,41 +65,24 @@ func (s *Server) startListener(ln net.Listener) error { }() } - s.httpServer = &http.Server{ - Handler: cors.Default().Handler(s), - Addr: s.addr, - WriteTimeout: 2 * time.Second, - ReadTimeout: 2 * time.Second, - IdleTimeout: 30 * time.Second, - } - s.httpServer.RegisterOnShutdown(s.disconnectAllClients) - // final callback, just before serving http - s.relay.OnInitialized(s) - - // start accepting incoming requests - s.Log.Infof("listening on %s", s.addr) - err := s.httpServer.Serve(ln) - if err == http.ErrServerClosed { - err = nil - } - return err + return srv, nil } -// Shutdown stops serving HTTP requests and send a websocket close control message -// to all connected clients. +// ServeHTTP implements http.Handler interface. +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Upgrade") == "websocket" { + s.HandleWebsocket(w, r) + } else if r.Header.Get("Accept") == "application/nostr+json" { + s.HandleNIP11(w, r) + } +} + +// Shutdown sends a websocket close control message to all connected clients. // // If the relay is ShutdownAware, Shutdown calls its OnShutdown, passing the context as is. // Note that the HTTP server make some time to shutdown and so the context deadline, // if any, may have been shortened by the time OnShutdown is called. -func (s *Server) Shutdown(ctx context.Context) error { - err := s.httpServer.Shutdown(ctx) - if f, ok := s.relay.(ShutdownAware); ok { - f.OnShutdown(ctx) - } - return err -} - -func (s *Server) disconnectAllClients() { +func (s *Server) Shutdown(ctx context.Context) { s.clientsMu.Lock() defer s.clientsMu.Unlock() for conn := range s.clients { @@ -185,6 +90,10 @@ func (s *Server) disconnectAllClients() { conn.Close() delete(s.clients, conn) } + + if f, ok := s.relay.(ShutdownAware); ok { + f.OnShutdown(ctx) + } } func defaultLogger(prefix string) Logger {