From f94199cfc0e9a290473c3e4fe99fb1867d7c9655 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 19 Sep 2024 10:04:54 -0300 Subject: [PATCH] negentropy: refactor for allowing different types of storage. --- nip77/negentropy/negentropy.go | 32 ++++++------------- nip77/negentropy/storage.go | 13 ++++++++ .../negentropy/{ => storage/vector}/vector.go | 31 +++++++++--------- nip77/negentropy/types.go | 15 ++------- nip77/negentropy/whatever_test.go | 24 ++++++++------ nip77/nip77.go | 7 ++-- 6 files changed, 59 insertions(+), 63 deletions(-) create mode 100644 nip77/negentropy/storage.go rename nip77/negentropy/{ => storage/vector}/vector.go (56%) diff --git a/nip77/negentropy/negentropy.go b/nip77/negentropy/negentropy.go index 5c7ee39..b433119 100644 --- a/nip77/negentropy/negentropy.go +++ b/nip77/negentropy/negentropy.go @@ -16,11 +16,11 @@ const ( buckets = 16 ) -var infiniteBound = Bound{Item: Item{Timestamp: maxTimestamp}} +var InfiniteBound = Bound{Item: Item{Timestamp: maxTimestamp}} type Negentropy struct { storage Storage - sealed bool + initialized bool frameSizeLimit int isClient bool lastTimestampIn nostr.Timestamp @@ -30,7 +30,7 @@ type Negentropy struct { HaveNots chan string } -func NewNegentropy(storage Storage, frameSizeLimit int) *Negentropy { +func New(storage Storage, frameSizeLimit int) *Negentropy { if frameSizeLimit == 0 { frameSizeLimit = math.MaxInt } else if frameSizeLimit < 4096 { @@ -46,8 +46,8 @@ func NewNegentropy(storage Storage, frameSizeLimit int) *Negentropy { } func (n *Negentropy) String() string { - label := "unsealed" - if n.sealed { + label := "uninitialized" + if n.initialized { label = "server" if n.isClient { label = "client" @@ -56,33 +56,19 @@ func (n *Negentropy) String() string { return fmt.Sprintf("", label, n.storage.Size()) } -func (n *Negentropy) Insert(evt *nostr.Event) { - err := n.storage.Insert(evt.CreatedAt, evt.ID) - if err != nil { - panic(err) - } -} - -func (n *Negentropy) seal() { - if !n.sealed { - n.storage.Seal() - } - n.sealed = true -} - func (n *Negentropy) Initiate() string { - n.seal() + n.initialized = true n.isClient = true output := NewStringHexWriter(make([]byte, 0, 1+n.storage.Size()*64)) output.WriteByte(protocolVersion) - n.SplitRange(0, n.storage.Size(), infiniteBound, output) + n.SplitRange(0, n.storage.Size(), InfiniteBound, output) return output.Hex() } func (n *Negentropy) Reconcile(msg string) (output string, err error) { - n.seal() + n.initialized = true reader := NewStringHexReader(msg) output, err = n.reconcileAux(reader) @@ -238,7 +224,7 @@ func (n *Negentropy) reconcileAux(reader *StringHexReader) (string, error) { if n.frameSizeLimit-200 < fullOutput.Len()+partialOutput.Len() { // frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range remainingFingerprint := n.storage.Fingerprint(upper, n.storage.Size()) - n.writeBound(fullOutput, infiniteBound) + n.writeBound(fullOutput, InfiniteBound) fullOutput.WriteByte(byte(FingerprintMode)) fullOutput.WriteBytes(remainingFingerprint[:]) diff --git a/nip77/negentropy/storage.go b/nip77/negentropy/storage.go new file mode 100644 index 0000000..54fa73c --- /dev/null +++ b/nip77/negentropy/storage.go @@ -0,0 +1,13 @@ +package negentropy + +import ( + "iter" +) + +type Storage interface { + Size() int + Range(begin, end int) iter.Seq2[int, Item] + FindLowerBound(begin, end int, value Bound) int + GetBound(idx int) Bound + Fingerprint(begin, end int) [FingerprintSize]byte +} diff --git a/nip77/negentropy/vector.go b/nip77/negentropy/storage/vector/vector.go similarity index 56% rename from nip77/negentropy/vector.go rename to nip77/negentropy/storage/vector/vector.go index 854927d..80062db 100644 --- a/nip77/negentropy/vector.go +++ b/nip77/negentropy/storage/vector/vector.go @@ -1,4 +1,4 @@ -package negentropy +package vector import ( "encoding/hex" @@ -7,16 +7,17 @@ import ( "slices" "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/nip77/negentropy" ) type Vector struct { - items []Item + items []negentropy.Item sealed bool } -func NewVector() *Vector { +func New() *Vector { return &Vector{ - items: make([]Item, 0, 30), + items: make([]negentropy.Item, 0, 30), } } @@ -25,7 +26,7 @@ func (v *Vector) Insert(createdAt nostr.Timestamp, id string) error { return fmt.Errorf("bad id size for added item: expected %d, got %d", 32, len(id)/2) } - item := Item{createdAt, id} + item := negentropy.Item{Timestamp: createdAt, ID: id} v.items = append(v.items, item) return nil } @@ -37,18 +38,18 @@ func (v *Vector) Seal() { panic("trying to seal an already sealed vector") } v.sealed = true - slices.SortFunc(v.items, itemCompare) + slices.SortFunc(v.items, negentropy.ItemCompare) } -func (v *Vector) GetBound(idx int) Bound { +func (v *Vector) GetBound(idx int) negentropy.Bound { if idx < len(v.items) { - return Bound{v.items[idx]} + return negentropy.Bound{Item: v.items[idx]} } - return infiniteBound + return negentropy.InfiniteBound } -func (v *Vector) Range(begin, end int) iter.Seq2[int, Item] { - return func(yield func(int, Item) bool) { +func (v *Vector) Range(begin, end int) iter.Seq2[int, negentropy.Item] { + return func(yield func(int, negentropy.Item) bool) { for i := begin; i < end; i++ { if !yield(i, v.items[i]) { break @@ -57,13 +58,13 @@ func (v *Vector) Range(begin, end int) iter.Seq2[int, Item] { } } -func (v *Vector) FindLowerBound(begin, end int, bound Bound) int { - idx, _ := slices.BinarySearchFunc(v.items[begin:end], bound.Item, itemCompare) +func (v *Vector) FindLowerBound(begin, end int, bound negentropy.Bound) int { + idx, _ := slices.BinarySearchFunc(v.items[begin:end], bound.Item, negentropy.ItemCompare) return begin + idx } -func (v *Vector) Fingerprint(begin, end int) [FingerprintSize]byte { - var out Accumulator +func (v *Vector) Fingerprint(begin, end int) [negentropy.FingerprintSize]byte { + var out negentropy.Accumulator out.SetToZero() tmp := make([]byte, 32) diff --git a/nip77/negentropy/types.go b/nip77/negentropy/types.go index c6e581e..d644c53 100644 --- a/nip77/negentropy/types.go +++ b/nip77/negentropy/types.go @@ -5,7 +5,6 @@ import ( "crypto/sha256" "encoding/binary" "fmt" - "iter" "strings" "github.com/nbd-wtf/go-nostr" @@ -34,22 +33,12 @@ func (v Mode) String() string { } } -type Storage interface { - Insert(nostr.Timestamp, string) error - Seal() - Size() int - Range(begin, end int) iter.Seq2[int, Item] - FindLowerBound(begin, end int, value Bound) int - GetBound(idx int) Bound - Fingerprint(begin, end int) [FingerprintSize]byte -} - type Item struct { Timestamp nostr.Timestamp ID string } -func itemCompare(a, b Item) int { +func ItemCompare(a, b Item) int { if a.Timestamp == b.Timestamp { return strings.Compare(a.ID, b.ID) } @@ -61,7 +50,7 @@ func (i Item) String() string { return fmt.Sprintf("Item<%d:%s>", i.Timestamp, i type Bound struct{ Item } func (b Bound) String() string { - if b.Timestamp == infiniteBound.Timestamp { + if b.Timestamp == InfiniteBound.Timestamp { return "Bound" } return fmt.Sprintf("Bound<%d:%s>", b.Timestamp, b.ID) diff --git a/nip77/negentropy/whatever_test.go b/nip77/negentropy/whatever_test.go index bd44924..1da0ad4 100644 --- a/nip77/negentropy/whatever_test.go +++ b/nip77/negentropy/whatever_test.go @@ -1,4 +1,4 @@ -package negentropy +package negentropy_test import ( "fmt" @@ -7,6 +7,8 @@ import ( "testing" "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/nip77/negentropy" + "github.com/nbd-wtf/go-nostr/nip77/negentropy/storage/vector" "github.com/stretchr/testify/require" ) @@ -59,8 +61,8 @@ func runTestWith(t *testing.T, ) { var err error var q string - var n1 *Negentropy - var n2 *Negentropy + var n1 *negentropy.Negentropy + var n2 *negentropy.Negentropy events := make([]*nostr.Event, totalEvents) for i := range events { @@ -73,23 +75,27 @@ func runTestWith(t *testing.T, } { - n1 = NewNegentropy(NewVector(), 1<<16) + n1s := vector.New() + n1 = negentropy.New(n1s, 1<<16) for _, r := range n1Ranges { for i := r[0]; i < r[1]; i++ { - n1.Insert(events[i]) + n1s.Insert(events[i].CreatedAt, events[1].ID) } } + n1s.Seal() q = n1.Initiate() } { - n2 = NewNegentropy(NewVector(), 1<<16) + n2s := vector.New() + n2 = negentropy.New(n2s, 1<<16) for _, r := range n2Ranges { for i := r[0]; i < r[1]; i++ { - n2.Insert(events[i]) + n2s.Insert(events[i].CreatedAt, events[1].ID) } } + n2s.Seal() q, err = n2.Reconcile(q) if err != nil { @@ -98,7 +104,7 @@ func runTestWith(t *testing.T, } } - invert := map[*Negentropy]*Negentropy{ + invert := map[*negentropy.Negentropy]*negentropy.Negentropy{ n1: n2, n2: n1, } @@ -148,9 +154,7 @@ func runTestWith(t *testing.T, for n := n1; q != ""; n = invert[n] { i++ - fmt.Println("processing reconcile", n) q, err = n.Reconcile(q) - if err != nil { t.Fatalf("reconciliation failed: %s", err) } diff --git a/nip77/nip77.go b/nip77/nip77.go index bec454c..a40a931 100644 --- a/nip77/nip77.go +++ b/nip77/nip77.go @@ -9,6 +9,7 @@ import ( "github.com/greatroar/blobloom" "github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr/nip77/negentropy" + "github.com/nbd-wtf/go-nostr/nip77/negentropy/storage/vector" ) func NegentropySync(ctx context.Context, store nostr.RelayStore, url string, filter nostr.Filter) error { @@ -19,10 +20,12 @@ func NegentropySync(ctx context.Context, store nostr.RelayStore, url string, fil return fmt.Errorf("failed to query our local store: %w", err) } - neg := negentropy.NewNegentropy(negentropy.NewVector(), 1024*1024) + vec := vector.New() + neg := negentropy.New(vec, 1024*1024) for _, evt := range data { - neg.Insert(evt) + vec.Insert(evt.CreatedAt, evt.ID) } + vec.Seal() result := make(chan error)