mirror of
https://github.com/fiatjaf/khatru.git
synced 2025-11-18 10:07:44 +01:00
v2 with breaking changes and a simpler API.
This commit is contained in:
11
add-event.go
11
add-event.go
@@ -1,17 +1,18 @@
|
||||
package relayer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/fiatjaf/relayer/storage"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
func AddEvent(relay Relay, evt nostr.Event) (accepted bool, message string) {
|
||||
store := relay.Storage()
|
||||
func AddEvent(ctx context.Context, relay Relay, evt nostr.Event) (accepted bool, message string) {
|
||||
store := relay.Storage(ctx)
|
||||
advancedSaver, _ := store.(AdvancedSaver)
|
||||
|
||||
if !relay.AcceptEvent(&evt) {
|
||||
if !relay.AcceptEvent(ctx, &evt) {
|
||||
return false, "blocked: event blocked by relay"
|
||||
}
|
||||
|
||||
@@ -19,10 +20,10 @@ func AddEvent(relay Relay, evt nostr.Event) (accepted bool, message string) {
|
||||
// do not store ephemeral events
|
||||
} else {
|
||||
if advancedSaver != nil {
|
||||
advancedSaver.BeforeSave(&evt)
|
||||
advancedSaver.BeforeSave(ctx, &evt)
|
||||
}
|
||||
|
||||
if saveErr := store.SaveEvent(&evt); saveErr != nil {
|
||||
if saveErr := store.SaveEvent(ctx, &evt); saveErr != nil {
|
||||
switch saveErr {
|
||||
case storage.ErrDupEvent:
|
||||
return true, saveErr.Error()
|
||||
|
||||
7
go.mod
7
go.mod
@@ -16,7 +16,7 @@ require (
|
||||
github.com/lib/pq v1.10.3
|
||||
github.com/mattn/go-sqlite3 v1.14.6
|
||||
github.com/mmcdole/gofeed v1.1.3
|
||||
github.com/nbd-wtf/go-nostr v0.13.2
|
||||
github.com/nbd-wtf/go-nostr v0.17.1
|
||||
github.com/rif/cache2go v1.0.0
|
||||
github.com/rs/cors v1.7.0
|
||||
github.com/stevelacy/daz v0.1.4
|
||||
@@ -32,7 +32,7 @@ require (
|
||||
github.com/andybalholm/cascadia v1.3.1 // indirect
|
||||
github.com/btcsuite/btcd v0.23.1 // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
|
||||
github.com/btcsuite/btcd/btcutil v1.1.1 // indirect
|
||||
github.com/btcsuite/btcd/btcutil v1.1.3 // indirect
|
||||
github.com/btcsuite/btcd/btcutil/psbt v1.1.4 // indirect
|
||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
|
||||
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f // indirect
|
||||
@@ -59,6 +59,7 @@ require (
|
||||
github.com/go-errors/errors v1.0.1 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.11 // indirect
|
||||
github.com/kkdai/bstream v1.0.0 // indirect
|
||||
github.com/klauspost/compress v1.13.6 // indirect
|
||||
@@ -72,6 +73,7 @@ require (
|
||||
github.com/lightningnetwork/lnd/ticker v1.1.0 // indirect
|
||||
github.com/lightningnetwork/lnd/tlv v1.0.3 // indirect
|
||||
github.com/lightningnetwork/lnd/tor v1.0.1 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/miekg/dns v1.1.43 // indirect
|
||||
github.com/mmcdole/goxpp v0.0.0-20181012175147-0068e33feabf // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
@@ -79,7 +81,6 @@ require (
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/tidwall/match v1.1.1 // indirect
|
||||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
github.com/valyala/fastjson v1.6.4 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
|
||||
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect
|
||||
golang.org/x/sys v0.1.0 // indirect
|
||||
|
||||
13
go.sum
13
go.sum
@@ -47,8 +47,9 @@ github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
|
||||
github.com/btcsuite/btcd/btcutil v1.0.0/go.mod h1:Uoxwv0pqYWhD//tfTiipkxNfdhG9UrLwaeswfjfdF0A=
|
||||
github.com/btcsuite/btcd/btcutil v1.1.0/go.mod h1:5OapHB7A2hBBWLm48mmw4MOHNJCcUBTwmWH/0Jn8VHE=
|
||||
github.com/btcsuite/btcd/btcutil v1.1.1 h1:hDcDaXiP0uEzR8Biqo2weECKqEw0uHDZ9ixIWevVQqY=
|
||||
github.com/btcsuite/btcd/btcutil v1.1.1/go.mod h1:nbKlBMNm9FGsdvKvu0essceubPiAcI57pYBNnsLAa34=
|
||||
github.com/btcsuite/btcd/btcutil v1.1.3 h1:xfbtw8lwpp0G6NwSHb+UE67ryTFHJAiNuipusjXSohQ=
|
||||
github.com/btcsuite/btcd/btcutil v1.1.3/go.mod h1:UR7dsSJzJUfMmFiiLlIrMq1lS9jh9EdCV7FStZSnpi0=
|
||||
github.com/btcsuite/btcd/btcutil/psbt v1.1.4 h1:Edx4AfBn+YPam2KP5AobDitulGp4r1Oibm8oruzkMdI=
|
||||
github.com/btcsuite/btcd/btcutil/psbt v1.1.4/go.mod h1:9AyU6EQVJ9Iw9zPyNT1lcdHd6cnEZdno5wLu5FY74os=
|
||||
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
|
||||
@@ -237,6 +238,8 @@ github.com/jgroeneveld/trial v2.0.0+incompatible/go.mod h1:I6INLW96EN8WysNBXUFI3
|
||||
github.com/jmoiron/sqlx v1.3.1 h1:aLN7YINNZ7cYOPK3QC83dbM6KT0NMqVMw961TqrejlE=
|
||||
github.com/jmoiron/sqlx v1.3.1/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
|
||||
github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/jrick/logrotate v1.0.0 h1:lQ1bL/n9mBNeIXoTUoYRlK4dHuNJVofX9oWqBtPnSzI=
|
||||
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
|
||||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
@@ -305,6 +308,8 @@ github.com/lightningnetwork/lnd/tor v1.0.1 h1:A11FrpU0Y//g+fA827W4VnjOeoIvExONdc
|
||||
github.com/lightningnetwork/lnd/tor v1.0.1/go.mod h1:RDtaAdwfAm+ONuPYwUhNIH1RAvKPv+75lHPOegUcz64=
|
||||
github.com/ltcsuite/ltcd v0.0.0-20190101042124-f37f8bf35796 h1:sjOGyegMIhvgfq5oaue6Td+hxZuf3tDC8lAPrFldqFw=
|
||||
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
|
||||
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
||||
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
||||
@@ -335,8 +340,8 @@ github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOA
|
||||
github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ3M8LwxM=
|
||||
github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/nbd-wtf/go-nostr v0.13.2 h1:w/TgXbkWqkZQsPRZffPZpvR/uskOSSUCGYhtW6I3xPI=
|
||||
github.com/nbd-wtf/go-nostr v0.13.2/go.mod h1:qFFTIxh15H5GGN0WsBI/P73DteqsevnhSEW/yk8nEf4=
|
||||
github.com/nbd-wtf/go-nostr v0.17.1 h1:dRyNNf1rx5vGYi9AH1A/mcKKQg8ZSiY8uRUJGObXPPI=
|
||||
github.com/nbd-wtf/go-nostr v0.17.1/go.mod h1:YCDHJtaFQE76d1ZkcUsTkz3dYNP+bldo5CIQwXPPcbk=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
github.com/nwaples/rardecode v1.1.2 h1:Cj0yZY6T1Zx1R7AhTbyGSALm44/Mmq+BAPc4B/p/d3M=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
@@ -411,8 +416,6 @@ github.com/urfave/cli v1.22.3/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
|
||||
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
|
||||
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
|
||||
github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=
|
||||
github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
|
||||
github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
|
||||
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
|
||||
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
|
||||
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
|
||||
|
||||
44
handlers.go
44
handlers.go
@@ -38,10 +38,10 @@ var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
store := s.relay.Storage()
|
||||
func (s *Server) HandleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
store := s.relay.Storage(ctx)
|
||||
advancedDeleter, _ := store.(AdvancedDeleter)
|
||||
advancedQuerier, _ := store.(AdvancedQuerier)
|
||||
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
@@ -158,10 +158,10 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
for _, tag := range evt.Tags {
|
||||
if len(tag) >= 2 && tag[0] == "e" {
|
||||
if advancedDeleter != nil {
|
||||
advancedDeleter.BeforeDelete(tag[1], evt.PubKey)
|
||||
advancedDeleter.BeforeDelete(ctx, tag[1], evt.PubKey)
|
||||
}
|
||||
|
||||
if err := store.DeleteEvent(tag[1], evt.PubKey); err != nil {
|
||||
if err := store.DeleteEvent(ctx, tag[1], evt.PubKey); err != nil {
|
||||
ws.WriteJSON([]interface{}{"OK", evt.ID, false, fmt.Sprintf("error: %s", err.Error())})
|
||||
return
|
||||
}
|
||||
@@ -174,7 +174,7 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
ok, message := AddEvent(s.relay, evt)
|
||||
ok, message := AddEvent(ctx, s.relay, evt)
|
||||
ws.WriteJSON([]interface{}{"OK", evt.ID, ok, message})
|
||||
|
||||
case "REQ":
|
||||
@@ -222,33 +222,27 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
if advancedQuerier != nil {
|
||||
advancedQuerier.BeforeQuery(filter)
|
||||
}
|
||||
|
||||
events, err := store.QueryEvents(filter)
|
||||
events, err := store.QueryEvents(ctx, filter)
|
||||
if err != nil {
|
||||
s.Log.Errorf("store: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if advancedQuerier != nil {
|
||||
advancedQuerier.AfterQuery(events, filter)
|
||||
// ensures the client won't be bombarded with events in case Storage doesn't do limits right
|
||||
if filter.Limit == 0 {
|
||||
filter.Limit = 9999999999
|
||||
}
|
||||
|
||||
// this block should not trigger if the SQL query accounts for filter.Limit
|
||||
// other implementations may be broken, and this ensures the client
|
||||
// won't be bombarded.
|
||||
if filter.Limit > 0 && len(events) > filter.Limit {
|
||||
events = events[0:filter.Limit]
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
i := 0
|
||||
for event := range events {
|
||||
ws.WriteJSON([]interface{}{"EVENT", id, event})
|
||||
|
||||
i++
|
||||
if i > filter.Limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
// moved EOSE out of for loop.
|
||||
// otherwise subscriptions may be cancelled too early
|
||||
|
||||
ws.WriteJSON([]interface{}{"EOSE", id})
|
||||
setListener(id, ws, filters)
|
||||
case "CLOSE":
|
||||
@@ -305,7 +299,7 @@ func (s *Server) handleWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Server) handleNIP11(w http.ResponseWriter, r *http.Request) {
|
||||
func (s *Server) HandleNIP11(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
supportedNIPs := []int{9, 11, 12, 15, 16, 20, 33}
|
||||
|
||||
23
interface.go
23
interface.go
@@ -17,16 +17,13 @@ type Relay interface {
|
||||
// to initialize its internal resources.
|
||||
// Also see [Storage.Init].
|
||||
Init() error
|
||||
// OnInitialized is called by [Server.Start] right before starting to serve HTTP requests.
|
||||
// It is passed the server to allow callers make final adjustments, such as custom routing.
|
||||
OnInitialized(*Server)
|
||||
// AcceptEvent is called for every nostr event received by the server.
|
||||
// If the returned value is true, the event is passed on to [Storage.SaveEvent].
|
||||
// Otherwise, the server responds with a negative and "blocked" message as described
|
||||
// in NIP-20.
|
||||
AcceptEvent(*nostr.Event) bool
|
||||
AcceptEvent(context.Context, *nostr.Event) bool
|
||||
// Storage returns the relay storage implementation.
|
||||
Storage() Storage
|
||||
Storage(context.Context) Storage
|
||||
}
|
||||
|
||||
// Auther is the interface for implementing NIP-42.
|
||||
@@ -73,27 +70,21 @@ type Storage interface {
|
||||
Init() error
|
||||
|
||||
// QueryEvents is invoked upon a client's REQ as described in NIP-01.
|
||||
QueryEvents(filter *nostr.Filter) (events []nostr.Event, err error)
|
||||
QueryEvents(ctx context.Context, filter *nostr.Filter) (chan *nostr.Event, error)
|
||||
// DeleteEvent is used to handle deletion events, as per NIP-09.
|
||||
DeleteEvent(id string, pubkey string) error
|
||||
DeleteEvent(ctx context.Context, id string, pubkey string) error
|
||||
// SaveEvent is called once Relay.AcceptEvent reports true.
|
||||
SaveEvent(event *nostr.Event) error
|
||||
}
|
||||
|
||||
// AdvancedQuerier methods are called before and after [Storage.QueryEvents].
|
||||
type AdvancedQuerier interface {
|
||||
BeforeQuery(*nostr.Filter)
|
||||
AfterQuery([]nostr.Event, *nostr.Filter)
|
||||
SaveEvent(ctx context.Context, event *nostr.Event) error
|
||||
}
|
||||
|
||||
// AdvancedDeleter methods are called before and after [Storage.DeleteEvent].
|
||||
type AdvancedDeleter interface {
|
||||
BeforeDelete(id string, pubkey string)
|
||||
BeforeDelete(ctx context.Context, id string, pubkey string)
|
||||
AfterDelete(id string, pubkey string)
|
||||
}
|
||||
|
||||
// AdvancedSaver methods are called before and after [Storage.SaveEvent].
|
||||
type AdvancedSaver interface {
|
||||
BeforeSave(*nostr.Event)
|
||||
BeforeSave(context.Context, *nostr.Event)
|
||||
AfterSave(*nostr.Event)
|
||||
}
|
||||
|
||||
145
start.go
145
start.go
@@ -4,41 +4,14 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/kelseyhightower/envconfig"
|
||||
"github.com/rs/cors"
|
||||
)
|
||||
|
||||
// Settings specify initial startup parameters for Start and StartConf.
|
||||
type Settings struct {
|
||||
Host string `envconfig:"HOST" default:"0.0.0.0"`
|
||||
Port string `envconfig:"PORT" default:"7447"`
|
||||
}
|
||||
|
||||
// Start calls StartConf with Settings parsed from the process environment.
|
||||
func Start(relay Relay) error {
|
||||
var s Settings
|
||||
if err := envconfig.Process("", &s); err != nil {
|
||||
return fmt.Errorf("envconfig: %w", err)
|
||||
}
|
||||
return StartConf(s, relay)
|
||||
}
|
||||
|
||||
// StartConf creates a new Server, passing it host:port for the address,
|
||||
// and starts serving propagating any error returned from [Server.Start].
|
||||
func StartConf(s Settings, relay Relay) error {
|
||||
addr := net.JoinHostPort(s.Host, s.Port)
|
||||
srv := NewServer(addr, relay)
|
||||
return srv.Start()
|
||||
}
|
||||
|
||||
// Server is a base for package users to implement nostr relays.
|
||||
// It can serve HTTP requests and websockets, passing control over to a relay implementation.
|
||||
//
|
||||
@@ -58,84 +31,33 @@ type Server struct {
|
||||
// outputting to stderr.
|
||||
Log Logger
|
||||
|
||||
addr string
|
||||
relay Relay
|
||||
router *mux.Router
|
||||
httpServer *http.Server // set at Server.Start
|
||||
addr string
|
||||
relay Relay
|
||||
|
||||
// keep a connection reference to all connected clients for Server.Shutdown
|
||||
clientsMu sync.Mutex
|
||||
clients map[*websocket.Conn]struct{}
|
||||
}
|
||||
|
||||
// NewServer creates a relay server with sensible defaults.
|
||||
// The provided address is used to listen and respond to HTTP requests.
|
||||
func NewServer(addr string, relay Relay) *Server {
|
||||
// NewServer initializes the relay and its storage using their respective Init methods,
|
||||
// returning any non-nil errors, and returns a Server ready to listen for HTTP requests.
|
||||
func NewServer(relay Relay) (*Server, error) {
|
||||
srv := &Server{
|
||||
Log: defaultLogger(relay.Name() + ": "),
|
||||
addr: addr,
|
||||
relay: relay,
|
||||
router: mux.NewRouter(),
|
||||
clients: make(map[*websocket.Conn]struct{}),
|
||||
}
|
||||
srv.router.Path("/").Headers("Upgrade", "websocket").HandlerFunc(srv.handleWebsocket)
|
||||
srv.router.Path("/").Headers("Accept", "application/nostr+json").HandlerFunc(srv.handleNIP11)
|
||||
return srv
|
||||
}
|
||||
|
||||
// Router returns an http.Handler used to handle server's in-flight HTTP requests.
|
||||
// By default, the router is setup to handle websocket upgrade and NIP-11 requests.
|
||||
//
|
||||
// In a larger system, where the relay server is not the only HTTP handler,
|
||||
// prefer using s as http.Handler instead of the returned router.
|
||||
func (s *Server) Router() *mux.Router {
|
||||
return s.router
|
||||
}
|
||||
|
||||
// Addr returns Server's HTTP listener address in host:port form.
|
||||
// If the initial port value provided in NewServer is 0, the actual port
|
||||
// value is picked at random and available by the time [Relay.OnInitialized]
|
||||
// is called.
|
||||
func (s *Server) Addr() string {
|
||||
return s.addr
|
||||
}
|
||||
|
||||
// ServeHTTP implements http.Handler interface.
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
s.router.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
// Start initializes the relay and its storage using their respective Init methods,
|
||||
// returning any non-nil errors, and starts listening for HTTP requests on the address
|
||||
// provided to NewServer.
|
||||
//
|
||||
// Just before starting to serve HTTP requests, Start calls Relay.OnInitialized
|
||||
// allowing package users to make last adjustments, such as setting up custom HTTP
|
||||
// handlers using s.Router.
|
||||
//
|
||||
// Start never returns until termination of the underlying http.Server, forwarding
|
||||
// any but http.ErrServerClosed error from the server's ListenAndServe.
|
||||
// To terminate the server, call Shutdown.
|
||||
func (s *Server) Start() error {
|
||||
ln, err := net.Listen("tcp", s.addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.addr = ln.Addr().String()
|
||||
return s.startListener(ln)
|
||||
}
|
||||
|
||||
func (s *Server) startListener(ln net.Listener) error {
|
||||
// init the relay
|
||||
if err := s.relay.Init(); err != nil {
|
||||
return fmt.Errorf("relay init: %w", err)
|
||||
if err := relay.Init(); err != nil {
|
||||
return nil, fmt.Errorf("relay init: %w", err)
|
||||
}
|
||||
if err := s.relay.Storage().Init(); err != nil {
|
||||
return fmt.Errorf("storage init: %w", err)
|
||||
if err := relay.Storage(context.Background()).Init(); err != nil {
|
||||
return nil, fmt.Errorf("storage init: %w", err)
|
||||
}
|
||||
|
||||
// push events from implementations, if any
|
||||
if inj, ok := s.relay.(Injector); ok {
|
||||
// start listening from events from other sources, if any
|
||||
if inj, ok := relay.(Injector); ok {
|
||||
go func() {
|
||||
for event := range inj.InjectEvents() {
|
||||
notifyListeners(&event)
|
||||
@@ -143,41 +65,24 @@ func (s *Server) startListener(ln net.Listener) error {
|
||||
}()
|
||||
}
|
||||
|
||||
s.httpServer = &http.Server{
|
||||
Handler: cors.Default().Handler(s),
|
||||
Addr: s.addr,
|
||||
WriteTimeout: 2 * time.Second,
|
||||
ReadTimeout: 2 * time.Second,
|
||||
IdleTimeout: 30 * time.Second,
|
||||
}
|
||||
s.httpServer.RegisterOnShutdown(s.disconnectAllClients)
|
||||
// final callback, just before serving http
|
||||
s.relay.OnInitialized(s)
|
||||
|
||||
// start accepting incoming requests
|
||||
s.Log.Infof("listening on %s", s.addr)
|
||||
err := s.httpServer.Serve(ln)
|
||||
if err == http.ErrServerClosed {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
return srv, nil
|
||||
}
|
||||
|
||||
// Shutdown stops serving HTTP requests and send a websocket close control message
|
||||
// to all connected clients.
|
||||
// ServeHTTP implements http.Handler interface.
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Header.Get("Upgrade") == "websocket" {
|
||||
s.HandleWebsocket(w, r)
|
||||
} else if r.Header.Get("Accept") == "application/nostr+json" {
|
||||
s.HandleNIP11(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown sends a websocket close control message to all connected clients.
|
||||
//
|
||||
// If the relay is ShutdownAware, Shutdown calls its OnShutdown, passing the context as is.
|
||||
// Note that the HTTP server make some time to shutdown and so the context deadline,
|
||||
// if any, may have been shortened by the time OnShutdown is called.
|
||||
func (s *Server) Shutdown(ctx context.Context) error {
|
||||
err := s.httpServer.Shutdown(ctx)
|
||||
if f, ok := s.relay.(ShutdownAware); ok {
|
||||
f.OnShutdown(ctx)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Server) disconnectAllClients() {
|
||||
func (s *Server) Shutdown(ctx context.Context) {
|
||||
s.clientsMu.Lock()
|
||||
defer s.clientsMu.Unlock()
|
||||
for conn := range s.clients {
|
||||
@@ -185,6 +90,10 @@ func (s *Server) disconnectAllClients() {
|
||||
conn.Close()
|
||||
delete(s.clients, conn)
|
||||
}
|
||||
|
||||
if f, ok := s.relay.(ShutdownAware); ok {
|
||||
f.OnShutdown(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func defaultLogger(prefix string) Logger {
|
||||
|
||||
Reference in New Issue
Block a user