mirror of
https://github.com/fiatjaf/khatru.git
synced 2025-03-17 21:32:55 +01:00
blossom: store as a standalone interface (and an eventstore wrapper).
This commit is contained in:
parent
91e7737ec1
commit
a893dc2d2c
@ -1,11 +1,26 @@
|
||||
package blossom
|
||||
|
||||
import "github.com/nbd-wtf/go-nostr"
|
||||
import (
|
||||
"context"
|
||||
|
||||
type Blob struct {
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
type BlobDescriptor struct {
|
||||
URL string `json:"url"`
|
||||
SHA256 string `json:"sha256"`
|
||||
Size int `json:"size"`
|
||||
Type string `json:"type"`
|
||||
Uploaded nostr.Timestamp `json:"uploaded"`
|
||||
|
||||
Owner string
|
||||
}
|
||||
|
||||
type BlobIndex interface {
|
||||
Keep(ctx context.Context, blob BlobDescriptor, pubkey string) error
|
||||
List(ctx context.Context, pubkey string) (chan BlobDescriptor, error)
|
||||
Get(ctx context.Context, sha256 string) (*BlobDescriptor, error)
|
||||
Delete(ctx context.Context, sha256 string, pubkey string) error
|
||||
}
|
||||
|
||||
var _ BlobIndex = (*EventStoreBlobIndexWrapper)(nil)
|
||||
|
108
blossom/eventstorewrapper.go
Normal file
108
blossom/eventstorewrapper.go
Normal file
@ -0,0 +1,108 @@
|
||||
package blossom
|
||||
|
||||
import (
|
||||
"context"
|
||||
"mime"
|
||||
"strconv"
|
||||
|
||||
"github.com/fiatjaf/eventstore"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
// EventStoreBlobIndexWrapper uses fake events to keep track of what blobs we have stored and who owns them
|
||||
type EventStoreBlobIndexWrapper struct {
|
||||
eventstore.Store
|
||||
|
||||
ServiceURL string
|
||||
}
|
||||
|
||||
func (es EventStoreBlobIndexWrapper) Keep(ctx context.Context, blob BlobDescriptor, pubkey string) error {
|
||||
ch, err := es.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{pubkey}, Kinds: []int{24242}, Tags: nostr.TagMap{"x": []string{blob.SHA256}}})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if <-ch == nil {
|
||||
// doesn't exist, save
|
||||
evt := &nostr.Event{
|
||||
PubKey: pubkey,
|
||||
Kind: 24242,
|
||||
Tags: nostr.Tags{
|
||||
{"x", blob.SHA256},
|
||||
{"type", blob.Type},
|
||||
{"size", strconv.Itoa(blob.Size)},
|
||||
},
|
||||
CreatedAt: blob.Uploaded,
|
||||
}
|
||||
evt.ID = evt.GetID()
|
||||
es.Store.SaveEvent(ctx, evt)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es EventStoreBlobIndexWrapper) List(ctx context.Context, pubkey string) (chan BlobDescriptor, error) {
|
||||
ech, err := es.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{pubkey}, Kinds: []int{24242}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ch := make(chan BlobDescriptor)
|
||||
|
||||
go func() {
|
||||
for evt := range ech {
|
||||
ch <- es.parseEvent(evt)
|
||||
}
|
||||
}()
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
func (es EventStoreBlobIndexWrapper) Get(ctx context.Context, sha256 string) (*BlobDescriptor, error) {
|
||||
ech, err := es.Store.QueryEvents(ctx, nostr.Filter{Tags: nostr.TagMap{"x": []string{sha256}}, Kinds: []int{24242}, Limit: 1})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
evt := <-ech
|
||||
if evt != nil {
|
||||
bd := es.parseEvent(evt)
|
||||
return &bd, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (es EventStoreBlobIndexWrapper) Delete(ctx context.Context, sha256 string, pubkey string) error {
|
||||
ech, err := es.Store.QueryEvents(ctx, nostr.Filter{Authors: []string{pubkey}, Tags: nostr.TagMap{"x": []string{sha256}}, Kinds: []int{24242}, Limit: 1})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
evt := <-ech
|
||||
if evt != nil {
|
||||
return es.Store.DeleteEvent(ctx, evt)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es EventStoreBlobIndexWrapper) parseEvent(evt *nostr.Event) BlobDescriptor {
|
||||
hhash := evt.Tags[0][1]
|
||||
mimetype := evt.Tags[1][1]
|
||||
exts, _ := mime.ExtensionsByType(mimetype)
|
||||
var ext string
|
||||
if exts != nil {
|
||||
ext = exts[0]
|
||||
}
|
||||
size, _ := strconv.Atoi(evt.Tags[2][1])
|
||||
|
||||
return BlobDescriptor{
|
||||
Owner: evt.PubKey,
|
||||
Uploaded: evt.CreatedAt,
|
||||
URL: es.ServiceURL + "/" + hhash + ext,
|
||||
SHA256: hhash,
|
||||
Type: mimetype,
|
||||
Size: size,
|
||||
}
|
||||
}
|
@ -71,15 +71,23 @@ func (bs BlossomServer) handleUpload(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// store the authorization event, it serves as a way to keep track of the uploaded blobs
|
||||
if err := bs.Store.SaveEvent(r.Context(), auth); err != nil {
|
||||
hash := sha256.Sum256(b)
|
||||
hhash := hex.EncodeToString(hash[:])
|
||||
|
||||
// keep track of the blob descriptor
|
||||
bd := BlobDescriptor{
|
||||
URL: bs.ServiceURL + "/" + hhash + ft.Extension,
|
||||
SHA256: hhash,
|
||||
Size: len(b),
|
||||
Type: mime.TypeByExtension(ft.Extension),
|
||||
Uploaded: nostr.Now(),
|
||||
}
|
||||
if err := bs.Store.Keep(r.Context(), bd, auth.PubKey); err != nil {
|
||||
http.Error(w, "failed to save event: "+err.Error(), 400)
|
||||
return
|
||||
}
|
||||
|
||||
// save blob
|
||||
hash := sha256.Sum256(b)
|
||||
hhash := hex.EncodeToString(hash[:])
|
||||
for _, sb := range bs.StoreBlob {
|
||||
if err := sb(r.Context(), hhash, b); err != nil {
|
||||
http.Error(w, "failed to save: "+err.Error(), 500)
|
||||
@ -88,13 +96,7 @@ func (bs BlossomServer) handleUpload(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// return response
|
||||
json.NewEncoder(w).Encode(Blob{
|
||||
URL: bs.ServiceURL + "/" + hhash + ft.Extension,
|
||||
SHA256: hhash,
|
||||
Size: len(b),
|
||||
Type: mime.TypeByExtension(ft.Extension),
|
||||
Uploaded: nostr.Now(),
|
||||
})
|
||||
json.NewEncoder(w).Encode(bd)
|
||||
}
|
||||
|
||||
func (bs BlossomServer) handleGetBlob(w http.ResponseWriter, r *http.Request) {
|
||||
@ -162,13 +164,13 @@ func (bs BlossomServer) handleHasBlob(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
hhash = hhash[1:]
|
||||
|
||||
ch, err := bs.Store.QueryEvents(r.Context(), nostr.Filter{Tags: nostr.TagMap{"x": []string{hhash}}})
|
||||
bd, err := bs.Store.Get(r.Context(), hhash)
|
||||
if err != nil {
|
||||
http.Error(w, "failed to query: "+err.Error(), 500)
|
||||
return
|
||||
}
|
||||
|
||||
if <-ch == nil {
|
||||
if bd == nil {
|
||||
http.Error(w, "file not found", 404)
|
||||
return
|
||||
}
|
||||
@ -202,7 +204,7 @@ func (bs BlossomServer) handleList(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
ch, err := bs.Store.QueryEvents(r.Context(), nostr.Filter{Authors: []string{pubkey}})
|
||||
ch, err := bs.Store.List(r.Context(), pubkey)
|
||||
if err != nil {
|
||||
http.Error(w, "failed to query: "+err.Error(), 500)
|
||||
return
|
||||
@ -210,18 +212,56 @@ func (bs BlossomServer) handleList(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
w.Write([]byte{'['})
|
||||
enc := json.NewEncoder(w)
|
||||
for evt := range ch {
|
||||
hhashTag := evt.Tags.GetFirst([]string{"x", ""})
|
||||
enc.Encode(Blob{
|
||||
URL: bs.ServiceURL + "/" + (*hhashTag)[1],
|
||||
SHA256: (*hhashTag)[1],
|
||||
Uploaded: evt.CreatedAt,
|
||||
})
|
||||
for bd := range ch {
|
||||
enc.Encode(bd)
|
||||
}
|
||||
w.Write([]byte{']'})
|
||||
}
|
||||
|
||||
func (bs BlossomServer) handleDelete(w http.ResponseWriter, r *http.Request) {
|
||||
auth, err := readAuthorization(r)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), 400)
|
||||
return
|
||||
}
|
||||
|
||||
if auth.Tags.GetFirst([]string{"t", "delete"}) == nil {
|
||||
http.Error(w, "invalid Authorization event \"t\" tag", 403)
|
||||
return
|
||||
}
|
||||
|
||||
spl := strings.SplitN(r.URL.Path, ".", 2)
|
||||
hhash := spl[0]
|
||||
if len(hhash) != 65 {
|
||||
http.Error(w, "invalid /<sha256>[.ext] path", 400)
|
||||
return
|
||||
}
|
||||
hhash = hhash[1:]
|
||||
if auth.Tags.GetFirst([]string{"x", hhash}) == nil &&
|
||||
auth.Tags.GetFirst([]string{"server", bs.ServiceURL}) == nil {
|
||||
http.Error(w, "invalid Authorization event \"x\" or \"server\" tag", 403)
|
||||
return
|
||||
}
|
||||
|
||||
for _, rd := range bs.RejectDelete {
|
||||
reject, reason := rd(r.Context(), auth, hhash)
|
||||
if reject {
|
||||
http.Error(w, reason, 401)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
for _, del := range bs.DeleteBlob {
|
||||
if err := del(r.Context(), hhash); err != nil {
|
||||
http.Error(w, "failed to delete blob: "+err.Error(), 500)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if err := bs.Store.Delete(r.Context(), hhash, auth.PubKey); err != nil {
|
||||
http.Error(w, "delete of blob entry failed: "+err.Error(), 500)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (bs BlossomServer) handleMirror(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -5,14 +5,13 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/fiatjaf/eventstore"
|
||||
"github.com/fiatjaf/khatru"
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
type BlossomServer struct {
|
||||
ServiceURL string
|
||||
Store eventstore.Store
|
||||
Store BlobIndex
|
||||
|
||||
StoreBlob []func(ctx context.Context, sha256 string, body []byte) error
|
||||
LoadBlob []func(ctx context.Context, sha256 string) ([]byte, error)
|
||||
@ -21,12 +20,12 @@ type BlossomServer struct {
|
||||
RejectUpload []func(ctx context.Context, auth *nostr.Event, ext string) (bool, string)
|
||||
RejectGet []func(ctx context.Context, auth *nostr.Event, sha256 string) (bool, string)
|
||||
RejectList []func(ctx context.Context, auth *nostr.Event, pubkey string) (bool, string)
|
||||
RejectDelete []func(ctx context.Context, auth *nostr.Event, sha256 string) (bool, string)
|
||||
}
|
||||
|
||||
func New(rl *khatru.Relay, serviceURL string, store eventstore.Store) *BlossomServer {
|
||||
func New(rl *khatru.Relay, serviceURL string) *BlossomServer {
|
||||
bs := &BlossomServer{
|
||||
ServiceURL: serviceURL,
|
||||
Store: store,
|
||||
}
|
||||
|
||||
base := rl.Router()
|
||||
|
@ -23,7 +23,8 @@ func main() {
|
||||
relay.CountEvents = append(relay.CountEvents, db.CountEvents)
|
||||
relay.DeleteEvent = append(relay.DeleteEvent, db.DeleteEvent)
|
||||
|
||||
bl := blossom.New(relay, "http://localhost:3334", db)
|
||||
bl := blossom.New(relay, "http://localhost:3334")
|
||||
bl.Store = blossom.EventStoreBlobIndexWrapper{Store: db, ServiceURL: bl.ServiceURL}
|
||||
bl.StoreBlob = append(bl.StoreBlob, func(ctx context.Context, sha256 string, body []byte) error {
|
||||
fmt.Println("storing", sha256, len(body))
|
||||
return nil
|
||||
|
Loading…
x
Reference in New Issue
Block a user