diff --git a/negentropy/base.go b/negentropy/base.go deleted file mode 100644 index f409d1e..0000000 --- a/negentropy/base.go +++ /dev/null @@ -1,13 +0,0 @@ -package negentropy - -// Storage defines an interface for storage operations, similar to the abstract class in C++. -type Storage interface { - Insert(uint64, string) error - Seal() error - - Size() int - GetItem(i uint64) (Item, error) - Iterate(begin, end int, cb func(item Item, i int) bool) error - FindLowerBound(begin, end int, value Bound) (int, error) - Fingerprint(begin, end int) (Fingerprint, error) -} diff --git a/negentropy/negentropy.go b/negentropy/negentropy.go index 5c96cb9..52b15d8 100644 --- a/negentropy/negentropy.go +++ b/negentropy/negentropy.go @@ -3,51 +3,64 @@ package negentropy import ( "bytes" "encoding/hex" - "errors" "fmt" + "math" "os" + + "github.com/nbd-wtf/go-nostr" ) const ( - ProtocolVersion byte = 0x61 // Version 1 - MaxU64 uint64 = ^uint64(0) - FrameSizeMinLimit uint64 = 4096 + protocolVersion byte = 0x61 // version 1 + maxTimestamp = nostr.Timestamp(math.MaxInt64) ) type Negentropy struct { - Storage + storage Storage frameSizeLimit uint64 + idSize int // in bytes IsInitiator bool - lastTimestampIn uint64 - lastTimestampOut uint64 + lastTimestampIn nostr.Timestamp + lastTimestampOut nostr.Timestamp } -func NewNegentropy(storage Storage, frameSizeLimit uint64) (*Negentropy, error) { +func NewNegentropy(storage Storage, frameSizeLimit uint64, IDSize int) (*Negentropy, error) { if frameSizeLimit != 0 && frameSizeLimit < 4096 { - return nil, errors.New("frameSizeLimit too small") + return nil, fmt.Errorf("frameSizeLimit too small") + } + if IDSize > 32 { + return nil, fmt.Errorf("id size cannot be more than 32, got %d", IDSize) } return &Negentropy{ - Storage: storage, + storage: storage, frameSizeLimit: frameSizeLimit, + idSize: IDSize, }, nil } +func (n *Negentropy) Insert(evt *nostr.Event) { + err := n.storage.Insert(evt.CreatedAt, evt.ID[0:n.idSize*2]) + if err != nil { + panic(err) + } +} + func (n *Negentropy) Initiate() ([]byte, error) { if n.IsInitiator { - return []byte{}, errors.New("already initiated") + return []byte{}, fmt.Errorf("already initiated") } n.IsInitiator = true - output := make([]byte, 1, 1+n.Storage.Size()*IDSize) - output[0] = ProtocolVersion - n.SplitRange(0, n.Storage.Size(), Bound{Item: Item{Timestamp: MaxU64}}, &output) + output := make([]byte, 1, 1+n.storage.Size()*n.idSize) + output[0] = protocolVersion + n.SplitRange(0, n.storage.Size(), Bound{Item: Item{Timestamp: maxTimestamp}}, &output) return output, nil } func (n *Negentropy) Reconcile(query []byte) ([]byte, error) { if n.IsInitiator { - return []byte{}, errors.New("initiator not asking for have/need IDs") + return []byte{}, fmt.Errorf("initiator not asking for have/need IDs") } var haveIds, needIds []string @@ -66,7 +79,7 @@ func (n *Negentropy) Reconcile(query []byte) ([]byte, error) { // ReconcileWithIDs when IDs are expected to be returned. func (n *Negentropy) ReconcileWithIDs(query []byte, haveIds, needIds *[]string) ([]byte, error) { if !n.IsInitiator { - return nil, errors.New("non-initiator asking for have/need IDs") + return nil, fmt.Errorf("non-initiator asking for have/need IDs") } output, err := n.ReconcileAux(query, haveIds, needIds) @@ -85,28 +98,28 @@ func (n *Negentropy) ReconcileAux(query []byte, haveIds, needIds *[]string) ([]b n.lastTimestampIn, n.lastTimestampOut = 0, 0 // Reset for each message var fullOutput []byte - fullOutput = append(fullOutput, ProtocolVersion) + fullOutput = append(fullOutput, protocolVersion) protocolVersion, err := getByte(&query) if err != nil { return nil, err } if protocolVersion < 0x60 || protocolVersion > 0x6F { - return nil, errors.New("invalid negentropy protocol version byte") + return nil, fmt.Errorf("invalid negentropy protocol version byte") } - if protocolVersion != ProtocolVersion { + if protocolVersion != protocolVersion { if n.IsInitiator { - return nil, errors.New("unsupported negentropy protocol version requested") + return nil, fmt.Errorf("unsupported negentropy protocol version requested") } return fullOutput, nil } - storageSize := n.Storage.Size() + storageSize := n.storage.Size() var prevBound Bound prevIndex := 0 skip := false - // Convert the loop to process the query until it's consumed + // convert the loop to process the query until it's consumed for len(query) > 0 { var o []byte @@ -133,7 +146,7 @@ func (n *Negentropy) ReconcileAux(query []byte, haveIds, needIds *[]string) ([]b mode := Mode(modeVal) lower := prevIndex - upper, err := n.Storage.FindLowerBound(prevIndex, storageSize, currBound) + upper, err := n.storage.FindLowerBound(prevIndex, storageSize, currBound) if err != nil { return nil, err } @@ -147,7 +160,7 @@ func (n *Negentropy) ReconcileAux(query []byte, haveIds, needIds *[]string) ([]b if err != nil { return nil, err } - ourFingerprint, err := n.Storage.Fingerprint(lower, upper) + ourFingerprint, err := n.storage.Fingerprint(lower, upper) if err != nil { return nil, err // Handle the error appropriately } @@ -160,21 +173,22 @@ func (n *Negentropy) ReconcileAux(query []byte, haveIds, needIds *[]string) ([]b } case IdListMode: - numIds, err := decodeVarInt(&query) + numIds64, err := decodeVarInt(&query) if err != nil { return nil, err } + numIds := int(numIds64) theirElems := make(map[string]struct{}) for i := 0; i < numIds; i++ { - e, err := getBytes(&query, IDSize) + e, err := getBytes(&query, n.idSize) if err != nil { return nil, err } theirElems[hex.EncodeToString(e)] = struct{}{} } - n.Storage.Iterate(lower, upper, func(item Item, _ int) bool { + n.storage.Iterate(lower, upper, func(item Item, _ int) bool { k := item.ID if _, exists := theirElems[k]; !exists { if n.IsInitiator { @@ -195,14 +209,14 @@ func (n *Negentropy) ReconcileAux(query []byte, haveIds, needIds *[]string) ([]b } else { doSkip() - responseIds := make([]byte, 0, IDSize*n.Storage.Size()) + responseIds := make([]byte, 0, n.idSize*n.storage.Size()) responseIdsPtr := &responseIds numResponseIds := 0 endBound := currBound - n.Storage.Iterate(lower, upper, func(item Item, index int) bool { + n.storage.Iterate(lower, upper, func(item Item, index int) bool { if n.ExceededFrameSizeLimit(len(fullOutput) + len(*responseIdsPtr)) { - endBound = *NewBoundWithItem(item) + endBound = Bound{item} upper = index return false } @@ -229,18 +243,18 @@ func (n *Negentropy) ReconcileAux(query []byte, haveIds, needIds *[]string) ([]b } default: - return nil, errors.New("unexpected mode") + return nil, fmt.Errorf("unexpected mode %d", mode) } // Check if the frame size limit is exceeded if n.ExceededFrameSizeLimit(len(fullOutput) + len(o)) { // Frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range - remainingFingerprint, err := n.Storage.Fingerprint(upper, storageSize) + remainingFingerprint, err := n.storage.Fingerprint(upper, storageSize) if err != nil { panic(err) } - encodedBound, err := n.encodeBound(Bound{Item: Item{Timestamp: MaxU64}}) + encodedBound, err := n.encodeBound(Bound{Item: Item{Timestamp: maxTimestamp}}) if err != nil { panic(err) } @@ -271,11 +285,12 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *[]by fmt.Fprintln(os.Stderr, err) panic(err) } + fmt.Println("upp", upperBound, boundEncoded) *output = append(*output, boundEncoded...) *output = append(*output, encodeVarInt(IdListMode)...) *output = append(*output, encodeVarInt(numElems)...) - n.Storage.Iterate(lower, upper, func(item Item, _ int) bool { + n.storage.Iterate(lower, upper, func(item Item, _ int) bool { id, _ := hex.DecodeString(item.ID) *output = append(*output, id...) return true @@ -290,7 +305,7 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *[]by if i < bucketsWithExtra { bucketSize++ } - ourFingerprint, err := n.Storage.Fingerprint(curr, curr+bucketSize) + ourFingerprint, err := n.storage.Fingerprint(curr, curr+bucketSize) if err != nil { fmt.Fprintln(os.Stderr, err) panic(err) @@ -304,7 +319,7 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *[]by } else { var prevItem, currItem Item - n.Storage.Iterate(curr-1, curr+1, func(item Item, index int) bool { + n.storage.Iterate(curr-1, curr+1, func(item Item, index int) bool { if index == curr-1 { prevItem = item } else { @@ -313,11 +328,7 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *[]by return true }) - minBound, err := getMinimalBound(prevItem, currItem) - if err != nil { - fmt.Fprintln(os.Stderr, err) - panic(err) - } + minBound := n.getMinimalBound(prevItem, currItem) nextBound = minBound } @@ -339,21 +350,22 @@ func (n *Negentropy) ExceededFrameSizeLimit(size int) bool { // Decoding -func (n *Negentropy) DecodeTimestampIn(encoded *[]byte) (uint64, error) { +func (n *Negentropy) DecodeTimestampIn(encoded *[]byte) (nostr.Timestamp, error) { t, err := decodeVarInt(encoded) if err != nil { return 0, err } - timestamp := uint64(t) + + timestamp := nostr.Timestamp(t) if timestamp == 0 { - timestamp = MaxU64 + timestamp = maxTimestamp } else { timestamp-- } timestamp += n.lastTimestampIn if timestamp < n.lastTimestampIn { // Check for overflow - timestamp = MaxU64 + timestamp = maxTimestamp } n.lastTimestampIn = timestamp return timestamp, nil @@ -375,20 +387,15 @@ func (n *Negentropy) DecodeBound(encoded *[]byte) (Bound, error) { return Bound{}, err } - bound, err := NewBound(timestamp, hex.EncodeToString(id)) - if err != nil { - return Bound{}, err - } - - return *bound, nil + return Bound{Item{timestamp, hex.EncodeToString(id)}}, nil } // Encoding // encodeTimestampOut encodes the given timestamp. -func (n *Negentropy) encodeTimestampOut(timestamp uint64) []byte { - if timestamp == MaxU64 { - n.lastTimestampOut = MaxU64 +func (n *Negentropy) encodeTimestampOut(timestamp nostr.Timestamp) []byte { + if timestamp == maxTimestamp { + n.lastTimestampOut = maxTimestamp return encodeVarInt(0) } temp := timestamp @@ -397,32 +404,27 @@ func (n *Negentropy) encodeTimestampOut(timestamp uint64) []byte { return encodeVarInt(int(timestamp + 1)) } -// encodeBound encodes the given Bound into a byte slice. func (n *Negentropy) encodeBound(bound Bound) ([]byte, error) { var output []byte t := n.encodeTimestampOut(bound.Item.Timestamp) - idlen := encodeVarInt(bound.IDLen) + idlen := encodeVarInt(n.idSize) output = append(output, t...) output = append(output, idlen...) id := bound.Item.ID - if len(id) < bound.IDLen { - return nil, errors.New("ID length exceeds bound") - } output = append(output, id...) return output, nil } -func getMinimalBound(prev, curr Item) (Bound, error) { +func (n *Negentropy) getMinimalBound(prev, curr Item) Bound { if curr.Timestamp != prev.Timestamp { - bound, err := NewBound(curr.Timestamp, "") - return *bound, err + return Bound{Item{curr.Timestamp, ""}} } sharedPrefixBytes := 0 - for i := 0; i < IDSize; i++ { + for i := 0; i < n.idSize; i++ { if curr.ID[i:i+2] != prev.ID[i:i+2] { break } @@ -430,7 +432,5 @@ func getMinimalBound(prev, curr Item) (Bound, error) { } // sharedPrefixBytes + 1 to include the first differing byte, or the entire ID if identical. - // Ensure not to exceed the slice's length. - bound, err := NewBound(curr.Timestamp, curr.ID[:sharedPrefixBytes*2+1]) - return *bound, err + return Bound{Item{curr.Timestamp, curr.ID[:sharedPrefixBytes*2+1]}} } diff --git a/negentropy/types.go b/negentropy/types.go index da6583e..8b152a9 100644 --- a/negentropy/types.go +++ b/negentropy/types.go @@ -4,18 +4,11 @@ import ( "crypto/sha256" "encoding/binary" "encoding/hex" - "errors" - "math/big" + + "github.com/nbd-wtf/go-nostr" ) -const ( - IDSize = 32 - FingerprintSize = 16 -) - -var modulo = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil) - -var ErrBadIDSize = errors.New("bad id size") +const FingerprintSize = 16 type Mode int @@ -25,19 +18,22 @@ const ( IdListMode ) +type Storage interface { + Insert(nostr.Timestamp, string) error + Seal() error + + IDSize() int + Size() int + Iterate(begin, end int, cb func(item Item, i int) bool) error + FindLowerBound(begin, end int, value Bound) (int, error) + Fingerprint(begin, end int) (Fingerprint, error) +} + type Item struct { - Timestamp uint64 + Timestamp nostr.Timestamp ID string } -func NewItem(timestamp uint64, id string) *Item { - return &Item{Timestamp: timestamp, ID: id} -} - -func (i Item) Equals(other Item) bool { - return i.Timestamp == other.Timestamp && i.ID == other.ID -} - func (i Item) LessThan(other Item) bool { if i.Timestamp != other.Timestamp { return i.Timestamp < other.Timestamp @@ -45,37 +41,7 @@ func (i Item) LessThan(other Item) bool { return i.ID < other.ID } -type Bound struct { - Item Item - IDLen int -} - -// NewBound creates a new Bound instance with a timestamp and ID. -// It returns an error if the ID size is incorrect. -func NewBound(timestamp uint64, id string) (*Bound, error) { - b := &Bound{ - Item: *NewItem(timestamp, id), - IDLen: len(id), - } - return b, nil -} - -// NewBoundWithItem creates a new Bound instance from an existing Item. -func NewBoundWithItem(item Item) *Bound { - return &Bound{ - Item: item, - IDLen: len(item.ID), - } -} - -// Equals checks if two Bound instances are equal. -func (b Bound) Equals(other Bound) bool { - return b.Item.Equals(other.Item) -} - -func (b Bound) LessThan(other Bound) bool { - return b.Item.LessThan(other.Item) -} +type Bound struct{ Item } type Fingerprint struct { Buf [FingerprintSize]byte @@ -93,8 +59,8 @@ func (acc *Accumulator) SetToZero() { acc.Buf = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} } -func (acc *Accumulator) AddItem(other Item) { - b, _ := hex.DecodeString(other.ID) +func (acc *Accumulator) Add(id string) { + b, _ := hex.DecodeString(id) acc.AddBytes(b) } diff --git a/negentropy/encoding.go b/negentropy/utils.go similarity index 95% rename from negentropy/encoding.go rename to negentropy/utils.go index 0bcd95b..ae7e87a 100644 --- a/negentropy/encoding.go +++ b/negentropy/utils.go @@ -17,7 +17,7 @@ func getByte(encoded *[]byte) (byte, error) { } func getBytes(encoded *[]byte, n int) ([]byte, error) { - //fmt.Fprintln(os.Stderr, "getBytes", len(*encoded), n) + // fmt.Fprintln(os.Stderr, "getBytes", len(*encoded), n) if len(*encoded) < n { return nil, errors.New("parse ends prematurely") } @@ -39,7 +39,7 @@ func decodeVarInt(encoded *[]byte) (int, error) { // } //} //return 0, ErrParseEndsPrematurely - res := 0 + var res int = 0 for { if len(*encoded) == 0 { @@ -76,4 +76,4 @@ func encodeVarInt(n int) []byte { } return o -} \ No newline at end of file +} diff --git a/negentropy/vector.go b/negentropy/vector.go index 9a88fe7..170f340 100644 --- a/negentropy/vector.go +++ b/negentropy/vector.go @@ -2,27 +2,32 @@ package negentropy import ( "errors" + "fmt" "sort" + + "github.com/nbd-wtf/go-nostr" ) type Vector struct { - items []Item + items []Item + idSize int } -func NewVector() *Vector { +func NewVector(idSize int) *Vector { return &Vector{ - items: make([]Item, 0, 30), + items: make([]Item, 0, 30), + idSize: idSize, } } -func (v *Vector) Insert(createdAt uint64, id string) error { +func (v *Vector) Insert(createdAt nostr.Timestamp, id string) error { // fmt.Fprintln(os.Stderr, "Insert", createdAt, id) - if len(id) != IDSize*2 { - return errors.New("bad id size for added item") + if len(id)/2 != v.idSize { + return fmt.Errorf("bad id size for added item: expected %d, got %d", v.idSize, len(id)/2) } - item := NewItem(createdAt, id) - v.items = append(v.items, *item) + item := Item{createdAt, id} + v.items = append(v.items, item) return nil } @@ -32,23 +37,15 @@ func (v *Vector) Seal() error { }) for i := 1; i < len(v.items); i++ { - if v.items[i-1].Equals(v.items[i]) { + if v.items[i-1].ID == v.items[i].ID { return errors.New("duplicate item inserted") } } return nil } -func (v *Vector) Size() int { - return len(v.items) -} - -func (v *Vector) GetItem(i uint64) (Item, error) { - if i >= uint64(len(v.items)) { - return Item{}, errors.New("index out of bounds") - } - return v.items[i], nil -} +func (v *Vector) Size() int { return len(v.items) } +func (v *Vector) IDSize() int { return v.idSize } func (v *Vector) Iterate(begin, end int, cb func(Item, int) bool) error { for i := begin; i < end; i++ { @@ -71,7 +68,7 @@ func (v *Vector) Fingerprint(begin, end int) (Fingerprint, error) { out.SetToZero() if err := v.Iterate(begin, end, func(item Item, _ int) bool { - out.AddItem(item) + out.Add(item.ID) return true }); err != nil { return Fingerprint{}, err diff --git a/negentropy/whatever_test.go b/negentropy/whatever_test.go index dceef07..e6723c5 100644 --- a/negentropy/whatever_test.go +++ b/negentropy/whatever_test.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + "github.com/nbd-wtf/go-nostr" "github.com/stretchr/testify/require" ) @@ -13,13 +14,19 @@ func TestSimple(t *testing.T) { var n1 *Negentropy var n2 *Negentropy + events := make([]*nostr.Event, 20) + for i := range events { + evt := nostr.Event{Content: fmt.Sprintf("event %d", i+1)} + evt.CreatedAt = nostr.Timestamp(i) + evt.ID = evt.GetID() + events[i] = &evt + } + { - n1, _ = NewNegentropy(NewVector(), 1<<16) - n1.Insert(10, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") - n1.Insert(20, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") - n1.Insert(30, "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc") - n1.Insert(40, "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd") - n1.Seal() + n1, _ = NewNegentropy(NewVector(32), 1<<16, 32) + for i := 2; i < 15; i++ { + n1.Insert(events[i]) + } q, err = n1.Initiate() if err != nil { @@ -31,11 +38,13 @@ func TestSimple(t *testing.T) { } { - n2, _ = NewNegentropy(NewVector(), 1<<16) - n2.Insert(20, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") - n2.Insert(30, "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc") - n2.Insert(50, "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee") - n2.Seal() + n2, _ = NewNegentropy(NewVector(32), 1<<16, 32) + for i := 0; i < 2; i++ { + n2.Insert(events[i]) + } + for i := 15; i < 20; i++ { + n2.Insert(events[i]) + } q, err = n2.Reconcile(q) if err != nil {