negentropy: refactor for allowing different types of storage.

This commit is contained in:
fiatjaf 2024-09-19 10:04:54 -03:00
parent 3d58ac5ec2
commit f94199cfc0
6 changed files with 59 additions and 63 deletions

View File

@ -16,11 +16,11 @@ const (
buckets = 16 buckets = 16
) )
var infiniteBound = Bound{Item: Item{Timestamp: maxTimestamp}} var InfiniteBound = Bound{Item: Item{Timestamp: maxTimestamp}}
type Negentropy struct { type Negentropy struct {
storage Storage storage Storage
sealed bool initialized bool
frameSizeLimit int frameSizeLimit int
isClient bool isClient bool
lastTimestampIn nostr.Timestamp lastTimestampIn nostr.Timestamp
@ -30,7 +30,7 @@ type Negentropy struct {
HaveNots chan string HaveNots chan string
} }
func NewNegentropy(storage Storage, frameSizeLimit int) *Negentropy { func New(storage Storage, frameSizeLimit int) *Negentropy {
if frameSizeLimit == 0 { if frameSizeLimit == 0 {
frameSizeLimit = math.MaxInt frameSizeLimit = math.MaxInt
} else if frameSizeLimit < 4096 { } else if frameSizeLimit < 4096 {
@ -46,8 +46,8 @@ func NewNegentropy(storage Storage, frameSizeLimit int) *Negentropy {
} }
func (n *Negentropy) String() string { func (n *Negentropy) String() string {
label := "unsealed" label := "uninitialized"
if n.sealed { if n.initialized {
label = "server" label = "server"
if n.isClient { if n.isClient {
label = "client" label = "client"
@ -56,33 +56,19 @@ func (n *Negentropy) String() string {
return fmt.Sprintf("<Negentropy %s with %d items>", label, n.storage.Size()) return fmt.Sprintf("<Negentropy %s with %d items>", label, n.storage.Size())
} }
func (n *Negentropy) Insert(evt *nostr.Event) {
err := n.storage.Insert(evt.CreatedAt, evt.ID)
if err != nil {
panic(err)
}
}
func (n *Negentropy) seal() {
if !n.sealed {
n.storage.Seal()
}
n.sealed = true
}
func (n *Negentropy) Initiate() string { func (n *Negentropy) Initiate() string {
n.seal() n.initialized = true
n.isClient = true n.isClient = true
output := NewStringHexWriter(make([]byte, 0, 1+n.storage.Size()*64)) output := NewStringHexWriter(make([]byte, 0, 1+n.storage.Size()*64))
output.WriteByte(protocolVersion) output.WriteByte(protocolVersion)
n.SplitRange(0, n.storage.Size(), infiniteBound, output) n.SplitRange(0, n.storage.Size(), InfiniteBound, output)
return output.Hex() return output.Hex()
} }
func (n *Negentropy) Reconcile(msg string) (output string, err error) { func (n *Negentropy) Reconcile(msg string) (output string, err error) {
n.seal() n.initialized = true
reader := NewStringHexReader(msg) reader := NewStringHexReader(msg)
output, err = n.reconcileAux(reader) output, err = n.reconcileAux(reader)
@ -238,7 +224,7 @@ func (n *Negentropy) reconcileAux(reader *StringHexReader) (string, error) {
if n.frameSizeLimit-200 < fullOutput.Len()+partialOutput.Len() { if n.frameSizeLimit-200 < fullOutput.Len()+partialOutput.Len() {
// frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range // frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range
remainingFingerprint := n.storage.Fingerprint(upper, n.storage.Size()) remainingFingerprint := n.storage.Fingerprint(upper, n.storage.Size())
n.writeBound(fullOutput, infiniteBound) n.writeBound(fullOutput, InfiniteBound)
fullOutput.WriteByte(byte(FingerprintMode)) fullOutput.WriteByte(byte(FingerprintMode))
fullOutput.WriteBytes(remainingFingerprint[:]) fullOutput.WriteBytes(remainingFingerprint[:])

View File

@ -0,0 +1,13 @@
package negentropy
import (
"iter"
)
type Storage interface {
Size() int
Range(begin, end int) iter.Seq2[int, Item]
FindLowerBound(begin, end int, value Bound) int
GetBound(idx int) Bound
Fingerprint(begin, end int) [FingerprintSize]byte
}

View File

@ -1,4 +1,4 @@
package negentropy package vector
import ( import (
"encoding/hex" "encoding/hex"
@ -7,16 +7,17 @@ import (
"slices" "slices"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip77/negentropy"
) )
type Vector struct { type Vector struct {
items []Item items []negentropy.Item
sealed bool sealed bool
} }
func NewVector() *Vector { func New() *Vector {
return &Vector{ return &Vector{
items: make([]Item, 0, 30), items: make([]negentropy.Item, 0, 30),
} }
} }
@ -25,7 +26,7 @@ func (v *Vector) Insert(createdAt nostr.Timestamp, id string) error {
return fmt.Errorf("bad id size for added item: expected %d, got %d", 32, len(id)/2) return fmt.Errorf("bad id size for added item: expected %d, got %d", 32, len(id)/2)
} }
item := Item{createdAt, id} item := negentropy.Item{Timestamp: createdAt, ID: id}
v.items = append(v.items, item) v.items = append(v.items, item)
return nil return nil
} }
@ -37,18 +38,18 @@ func (v *Vector) Seal() {
panic("trying to seal an already sealed vector") panic("trying to seal an already sealed vector")
} }
v.sealed = true v.sealed = true
slices.SortFunc(v.items, itemCompare) slices.SortFunc(v.items, negentropy.ItemCompare)
} }
func (v *Vector) GetBound(idx int) Bound { func (v *Vector) GetBound(idx int) negentropy.Bound {
if idx < len(v.items) { if idx < len(v.items) {
return Bound{v.items[idx]} return negentropy.Bound{Item: v.items[idx]}
} }
return infiniteBound return negentropy.InfiniteBound
} }
func (v *Vector) Range(begin, end int) iter.Seq2[int, Item] { func (v *Vector) Range(begin, end int) iter.Seq2[int, negentropy.Item] {
return func(yield func(int, Item) bool) { return func(yield func(int, negentropy.Item) bool) {
for i := begin; i < end; i++ { for i := begin; i < end; i++ {
if !yield(i, v.items[i]) { if !yield(i, v.items[i]) {
break break
@ -57,13 +58,13 @@ func (v *Vector) Range(begin, end int) iter.Seq2[int, Item] {
} }
} }
func (v *Vector) FindLowerBound(begin, end int, bound Bound) int { func (v *Vector) FindLowerBound(begin, end int, bound negentropy.Bound) int {
idx, _ := slices.BinarySearchFunc(v.items[begin:end], bound.Item, itemCompare) idx, _ := slices.BinarySearchFunc(v.items[begin:end], bound.Item, negentropy.ItemCompare)
return begin + idx return begin + idx
} }
func (v *Vector) Fingerprint(begin, end int) [FingerprintSize]byte { func (v *Vector) Fingerprint(begin, end int) [negentropy.FingerprintSize]byte {
var out Accumulator var out negentropy.Accumulator
out.SetToZero() out.SetToZero()
tmp := make([]byte, 32) tmp := make([]byte, 32)

View File

@ -5,7 +5,6 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"iter"
"strings" "strings"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr"
@ -34,22 +33,12 @@ func (v Mode) String() string {
} }
} }
type Storage interface {
Insert(nostr.Timestamp, string) error
Seal()
Size() int
Range(begin, end int) iter.Seq2[int, Item]
FindLowerBound(begin, end int, value Bound) int
GetBound(idx int) Bound
Fingerprint(begin, end int) [FingerprintSize]byte
}
type Item struct { type Item struct {
Timestamp nostr.Timestamp Timestamp nostr.Timestamp
ID string ID string
} }
func itemCompare(a, b Item) int { func ItemCompare(a, b Item) int {
if a.Timestamp == b.Timestamp { if a.Timestamp == b.Timestamp {
return strings.Compare(a.ID, b.ID) return strings.Compare(a.ID, b.ID)
} }
@ -61,7 +50,7 @@ func (i Item) String() string { return fmt.Sprintf("Item<%d:%s>", i.Timestamp, i
type Bound struct{ Item } type Bound struct{ Item }
func (b Bound) String() string { func (b Bound) String() string {
if b.Timestamp == infiniteBound.Timestamp { if b.Timestamp == InfiniteBound.Timestamp {
return "Bound<infinite>" return "Bound<infinite>"
} }
return fmt.Sprintf("Bound<%d:%s>", b.Timestamp, b.ID) return fmt.Sprintf("Bound<%d:%s>", b.Timestamp, b.ID)

View File

@ -1,4 +1,4 @@
package negentropy package negentropy_test
import ( import (
"fmt" "fmt"
@ -7,6 +7,8 @@ import (
"testing" "testing"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip77/negentropy"
"github.com/nbd-wtf/go-nostr/nip77/negentropy/storage/vector"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -59,8 +61,8 @@ func runTestWith(t *testing.T,
) { ) {
var err error var err error
var q string var q string
var n1 *Negentropy var n1 *negentropy.Negentropy
var n2 *Negentropy var n2 *negentropy.Negentropy
events := make([]*nostr.Event, totalEvents) events := make([]*nostr.Event, totalEvents)
for i := range events { for i := range events {
@ -73,23 +75,27 @@ func runTestWith(t *testing.T,
} }
{ {
n1 = NewNegentropy(NewVector(), 1<<16) n1s := vector.New()
n1 = negentropy.New(n1s, 1<<16)
for _, r := range n1Ranges { for _, r := range n1Ranges {
for i := r[0]; i < r[1]; i++ { for i := r[0]; i < r[1]; i++ {
n1.Insert(events[i]) n1s.Insert(events[i].CreatedAt, events[1].ID)
} }
} }
n1s.Seal()
q = n1.Initiate() q = n1.Initiate()
} }
{ {
n2 = NewNegentropy(NewVector(), 1<<16) n2s := vector.New()
n2 = negentropy.New(n2s, 1<<16)
for _, r := range n2Ranges { for _, r := range n2Ranges {
for i := r[0]; i < r[1]; i++ { for i := r[0]; i < r[1]; i++ {
n2.Insert(events[i]) n2s.Insert(events[i].CreatedAt, events[1].ID)
} }
} }
n2s.Seal()
q, err = n2.Reconcile(q) q, err = n2.Reconcile(q)
if err != nil { if err != nil {
@ -98,7 +104,7 @@ func runTestWith(t *testing.T,
} }
} }
invert := map[*Negentropy]*Negentropy{ invert := map[*negentropy.Negentropy]*negentropy.Negentropy{
n1: n2, n1: n2,
n2: n1, n2: n1,
} }
@ -148,9 +154,7 @@ func runTestWith(t *testing.T,
for n := n1; q != ""; n = invert[n] { for n := n1; q != ""; n = invert[n] {
i++ i++
fmt.Println("processing reconcile", n)
q, err = n.Reconcile(q) q, err = n.Reconcile(q)
if err != nil { if err != nil {
t.Fatalf("reconciliation failed: %s", err) t.Fatalf("reconciliation failed: %s", err)
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/greatroar/blobloom" "github.com/greatroar/blobloom"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr"
"github.com/nbd-wtf/go-nostr/nip77/negentropy" "github.com/nbd-wtf/go-nostr/nip77/negentropy"
"github.com/nbd-wtf/go-nostr/nip77/negentropy/storage/vector"
) )
func NegentropySync(ctx context.Context, store nostr.RelayStore, url string, filter nostr.Filter) error { func NegentropySync(ctx context.Context, store nostr.RelayStore, url string, filter nostr.Filter) error {
@ -19,10 +20,12 @@ func NegentropySync(ctx context.Context, store nostr.RelayStore, url string, fil
return fmt.Errorf("failed to query our local store: %w", err) return fmt.Errorf("failed to query our local store: %w", err)
} }
neg := negentropy.NewNegentropy(negentropy.NewVector(), 1024*1024) vec := vector.New()
neg := negentropy.New(vec, 1024*1024)
for _, evt := range data { for _, evt := range data {
neg.Insert(evt) vec.Insert(evt.CreatedAt, evt.ID)
} }
vec.Seal()
result := make(chan error) result := make(chan error)