diff --git a/negentropy/base.go b/negentropy/base.go new file mode 100644 index 0000000..f409d1e --- /dev/null +++ b/negentropy/base.go @@ -0,0 +1,13 @@ +package negentropy + +// Storage defines an interface for storage operations, similar to the abstract class in C++. +type Storage interface { + Insert(uint64, string) error + Seal() error + + Size() int + GetItem(i uint64) (Item, error) + Iterate(begin, end int, cb func(item Item, i int) bool) error + FindLowerBound(begin, end int, value Bound) (int, error) + Fingerprint(begin, end int) (Fingerprint, error) +} diff --git a/negentropy/encoding.go b/negentropy/encoding.go new file mode 100644 index 0000000..0bcd95b --- /dev/null +++ b/negentropy/encoding.go @@ -0,0 +1,79 @@ +package negentropy + +import ( + "errors" +) + +var ErrParseEndsPrematurely = errors.New("parse ends prematurely") + +func getByte(encoded *[]byte) (byte, error) { + if len(*encoded) < 1 { + return 0, ErrParseEndsPrematurely + } + b := (*encoded)[0] + *encoded = (*encoded)[1:] + + return b, nil +} + +func getBytes(encoded *[]byte, n int) ([]byte, error) { + //fmt.Fprintln(os.Stderr, "getBytes", len(*encoded), n) + if len(*encoded) < n { + return nil, errors.New("parse ends prematurely") + } + result := (*encoded)[:n] + *encoded = (*encoded)[n:] + return result, nil +} + +func decodeVarInt(encoded *[]byte) (int, error) { + //var res uint64 + // + //for i := 0; i < len(*encoded); i++ { + // byte := (*encoded)[i] + // res = (res << 7) | uint64(byte&0x7F) + // if (byte & 0x80) == 0 { + // fmt.Fprintln(os.Stderr, "decodeVarInt", encoded, i) + // *encoded = (*encoded)[i+1:] // Advance the slice to reflect consumed bytes + // return res, nil + // } + //} + //return 0, ErrParseEndsPrematurely + res := 0 + + for { + if len(*encoded) == 0 { + return 0, errors.New("parse ends prematurely") + } + + // Remove the first byte from the slice and update the slice. + // This simulates JavaScript's shift operation on arrays. + byte := (*encoded)[0] + *encoded = (*encoded)[1:] + + res = (res << 7) | (int(byte) & 127) + if (byte & 128) == 0 { + break + } + } + + return res, nil +} + +func encodeVarInt(n int) []byte { + if n == 0 { + return []byte{0} + } + + var o []byte + for n != 0 { + o = append([]byte{byte(n & 0x7F)}, o...) + n >>= 7 + } + + for i := 0; i < len(o)-1; i++ { + o[i] |= 0x80 + } + + return o +} \ No newline at end of file diff --git a/negentropy/negentropy.go b/negentropy/negentropy.go new file mode 100644 index 0000000..5c96cb9 --- /dev/null +++ b/negentropy/negentropy.go @@ -0,0 +1,436 @@ +package negentropy + +import ( + "bytes" + "encoding/hex" + "errors" + "fmt" + "os" +) + +const ( + ProtocolVersion byte = 0x61 // Version 1 + MaxU64 uint64 = ^uint64(0) + FrameSizeMinLimit uint64 = 4096 +) + +type Negentropy struct { + Storage + frameSizeLimit uint64 + IsInitiator bool + lastTimestampIn uint64 + lastTimestampOut uint64 +} + +func NewNegentropy(storage Storage, frameSizeLimit uint64) (*Negentropy, error) { + if frameSizeLimit != 0 && frameSizeLimit < 4096 { + return nil, errors.New("frameSizeLimit too small") + } + return &Negentropy{ + Storage: storage, + frameSizeLimit: frameSizeLimit, + }, nil +} + +func (n *Negentropy) Initiate() ([]byte, error) { + if n.IsInitiator { + return []byte{}, errors.New("already initiated") + } + n.IsInitiator = true + + output := make([]byte, 1, 1+n.Storage.Size()*IDSize) + output[0] = ProtocolVersion + n.SplitRange(0, n.Storage.Size(), Bound{Item: Item{Timestamp: MaxU64}}, &output) + + return output, nil +} + +func (n *Negentropy) Reconcile(query []byte) ([]byte, error) { + if n.IsInitiator { + return []byte{}, errors.New("initiator not asking for have/need IDs") + } + var haveIds, needIds []string + + output, err := n.ReconcileAux(query, &haveIds, &needIds) + if err != nil { + return nil, err + } + + if len(output) == 1 && n.IsInitiator { + return nil, nil + } + + return output, nil +} + +// ReconcileWithIDs when IDs are expected to be returned. +func (n *Negentropy) ReconcileWithIDs(query []byte, haveIds, needIds *[]string) ([]byte, error) { + if !n.IsInitiator { + return nil, errors.New("non-initiator asking for have/need IDs") + } + + output, err := n.ReconcileAux(query, haveIds, needIds) + if err != nil { + return nil, err + } + if len(output) == 1 { + // Assuming an empty string is a special case indicating a condition similar to std::nullopt + return nil, nil + } + + return output, nil +} + +func (n *Negentropy) ReconcileAux(query []byte, haveIds, needIds *[]string) ([]byte, error) { + n.lastTimestampIn, n.lastTimestampOut = 0, 0 // Reset for each message + + var fullOutput []byte + fullOutput = append(fullOutput, ProtocolVersion) + + protocolVersion, err := getByte(&query) + if err != nil { + return nil, err + } + if protocolVersion < 0x60 || protocolVersion > 0x6F { + return nil, errors.New("invalid negentropy protocol version byte") + } + if protocolVersion != ProtocolVersion { + if n.IsInitiator { + return nil, errors.New("unsupported negentropy protocol version requested") + } + return fullOutput, nil + } + + storageSize := n.Storage.Size() + var prevBound Bound + prevIndex := 0 + skip := false + + // Convert the loop to process the query until it's consumed + for len(query) > 0 { + var o []byte + + doSkip := func() { + if skip { + skip = false + encodedBound, err := n.encodeBound(prevBound) // Handle error appropriately + if err != nil { + panic(err) + } + o = append(o, encodedBound...) + o = append(o, encodeVarInt(SkipMode)...) + } + } + + currBound, err := n.DecodeBound(&query) + if err != nil { + return nil, err + } + modeVal, err := decodeVarInt(&query) + if err != nil { + return nil, err + } + mode := Mode(modeVal) + + lower := prevIndex + upper, err := n.Storage.FindLowerBound(prevIndex, storageSize, currBound) + if err != nil { + return nil, err + } + + switch mode { + case SkipMode: + skip = true + + case FingerprintMode: + theirFingerprint, err := getBytes(&query, FingerprintSize) + if err != nil { + return nil, err + } + ourFingerprint, err := n.Storage.Fingerprint(lower, upper) + if err != nil { + return nil, err // Handle the error appropriately + } + + if !bytes.Equal(theirFingerprint, ourFingerprint.Buf[:]) { + doSkip() + n.SplitRange(lower, upper, currBound, &o) + } else { + skip = true + } + + case IdListMode: + numIds, err := decodeVarInt(&query) + if err != nil { + return nil, err + } + + theirElems := make(map[string]struct{}) + for i := 0; i < numIds; i++ { + e, err := getBytes(&query, IDSize) + if err != nil { + return nil, err + } + theirElems[hex.EncodeToString(e)] = struct{}{} + } + + n.Storage.Iterate(lower, upper, func(item Item, _ int) bool { + k := item.ID + if _, exists := theirElems[k]; !exists { + if n.IsInitiator { + *haveIds = append(*haveIds, k) + } + } else { + delete(theirElems, k) + } + return true + }) + + if n.IsInitiator { + skip = true + + for k := range theirElems { + *needIds = append(*needIds, k) + } + } else { + doSkip() + + responseIds := make([]byte, 0, IDSize*n.Storage.Size()) + responseIdsPtr := &responseIds + numResponseIds := 0 + endBound := currBound + + n.Storage.Iterate(lower, upper, func(item Item, index int) bool { + if n.ExceededFrameSizeLimit(len(fullOutput) + len(*responseIdsPtr)) { + endBound = *NewBoundWithItem(item) + upper = index + return false + } + + id, _ := hex.DecodeString(item.ID) + *responseIdsPtr = append(*responseIdsPtr, id...) + numResponseIds++ + return true + }) + + encodedBound, err := n.encodeBound(endBound) + if err != nil { + fmt.Fprintln(os.Stderr, err) + panic(err) + } + + o = append(o, encodedBound...) + o = append(o, encodeVarInt(IdListMode)...) + o = append(o, encodeVarInt(numResponseIds)...) + o = append(o, responseIds...) + + fullOutput = append(fullOutput, o...) + o = []byte{} + } + + default: + return nil, errors.New("unexpected mode") + } + + // Check if the frame size limit is exceeded + if n.ExceededFrameSizeLimit(len(fullOutput) + len(o)) { + // Frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range + remainingFingerprint, err := n.Storage.Fingerprint(upper, storageSize) + if err != nil { + panic(err) + } + + encodedBound, err := n.encodeBound(Bound{Item: Item{Timestamp: MaxU64}}) + if err != nil { + panic(err) + } + fullOutput = append(fullOutput, encodedBound...) + fullOutput = append(fullOutput, encodeVarInt(FingerprintMode)...) + fullOutput = append(fullOutput, remainingFingerprint.SV()...) + + break // Stop processing further + } else { + // Append the constructed output for this iteration + fullOutput = append(fullOutput, o...) + } + + prevIndex = upper + prevBound = currBound + } + + return fullOutput, nil +} + +func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *[]byte) { + numElems := upper - lower + const Buckets = 16 + + if numElems < Buckets*2 { + boundEncoded, err := n.encodeBound(upperBound) + if err != nil { + fmt.Fprintln(os.Stderr, err) + panic(err) + } + *output = append(*output, boundEncoded...) + *output = append(*output, encodeVarInt(IdListMode)...) + *output = append(*output, encodeVarInt(numElems)...) + + n.Storage.Iterate(lower, upper, func(item Item, _ int) bool { + id, _ := hex.DecodeString(item.ID) + *output = append(*output, id...) + return true + }) + } else { + itemsPerBucket := numElems / Buckets + bucketsWithExtra := numElems % Buckets + curr := lower + + for i := 0; i < Buckets; i++ { + bucketSize := itemsPerBucket + if i < bucketsWithExtra { + bucketSize++ + } + ourFingerprint, err := n.Storage.Fingerprint(curr, curr+bucketSize) + if err != nil { + fmt.Fprintln(os.Stderr, err) + panic(err) + } + + curr += bucketSize + + var nextBound Bound + if curr == upper { + nextBound = upperBound + } else { + var prevItem, currItem Item + + n.Storage.Iterate(curr-1, curr+1, func(item Item, index int) bool { + if index == curr-1 { + prevItem = item + } else { + currItem = item + } + return true + }) + + minBound, err := getMinimalBound(prevItem, currItem) + if err != nil { + fmt.Fprintln(os.Stderr, err) + panic(err) + } + nextBound = minBound + } + + boundEncoded, err := n.encodeBound(nextBound) + if err != nil { + fmt.Fprintln(os.Stderr, err) + panic(err) + } + *output = append(*output, boundEncoded...) + *output = append(*output, encodeVarInt(FingerprintMode)...) + *output = append(*output, ourFingerprint.SV()...) + } + } +} + +func (n *Negentropy) ExceededFrameSizeLimit(size int) bool { + return n.frameSizeLimit != 0 && size > int(n.frameSizeLimit)-200 +} + +// Decoding + +func (n *Negentropy) DecodeTimestampIn(encoded *[]byte) (uint64, error) { + t, err := decodeVarInt(encoded) + if err != nil { + return 0, err + } + timestamp := uint64(t) + if timestamp == 0 { + timestamp = MaxU64 + } else { + timestamp-- + } + + timestamp += n.lastTimestampIn + if timestamp < n.lastTimestampIn { // Check for overflow + timestamp = MaxU64 + } + n.lastTimestampIn = timestamp + return timestamp, nil +} + +func (n *Negentropy) DecodeBound(encoded *[]byte) (Bound, error) { + timestamp, err := n.DecodeTimestampIn(encoded) + if err != nil { + return Bound{}, err + } + + length, err := decodeVarInt(encoded) + if err != nil { + return Bound{}, err + } + + id, err := getBytes(encoded, length) + if err != nil { + return Bound{}, err + } + + bound, err := NewBound(timestamp, hex.EncodeToString(id)) + if err != nil { + return Bound{}, err + } + + return *bound, nil +} + +// Encoding + +// encodeTimestampOut encodes the given timestamp. +func (n *Negentropy) encodeTimestampOut(timestamp uint64) []byte { + if timestamp == MaxU64 { + n.lastTimestampOut = MaxU64 + return encodeVarInt(0) + } + temp := timestamp + timestamp -= n.lastTimestampOut + n.lastTimestampOut = temp + return encodeVarInt(int(timestamp + 1)) +} + +// encodeBound encodes the given Bound into a byte slice. +func (n *Negentropy) encodeBound(bound Bound) ([]byte, error) { + var output []byte + + t := n.encodeTimestampOut(bound.Item.Timestamp) + idlen := encodeVarInt(bound.IDLen) + output = append(output, t...) + output = append(output, idlen...) + id := bound.Item.ID + + if len(id) < bound.IDLen { + return nil, errors.New("ID length exceeds bound") + } + output = append(output, id...) + return output, nil +} + +func getMinimalBound(prev, curr Item) (Bound, error) { + if curr.Timestamp != prev.Timestamp { + bound, err := NewBound(curr.Timestamp, "") + return *bound, err + } + + sharedPrefixBytes := 0 + + for i := 0; i < IDSize; i++ { + if curr.ID[i:i+2] != prev.ID[i:i+2] { + break + } + sharedPrefixBytes++ + } + + // sharedPrefixBytes + 1 to include the first differing byte, or the entire ID if identical. + // Ensure not to exceed the slice's length. + bound, err := NewBound(curr.Timestamp, curr.ID[:sharedPrefixBytes*2+1]) + return *bound, err +} diff --git a/negentropy/types.go b/negentropy/types.go new file mode 100644 index 0000000..da6583e --- /dev/null +++ b/negentropy/types.go @@ -0,0 +1,154 @@ +package negentropy + +import ( + "crypto/sha256" + "encoding/binary" + "encoding/hex" + "errors" + "math/big" +) + +const ( + IDSize = 32 + FingerprintSize = 16 +) + +var modulo = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil) + +var ErrBadIDSize = errors.New("bad id size") + +type Mode int + +const ( + SkipMode = iota + FingerprintMode + IdListMode +) + +type Item struct { + Timestamp uint64 + ID string +} + +func NewItem(timestamp uint64, id string) *Item { + return &Item{Timestamp: timestamp, ID: id} +} + +func (i Item) Equals(other Item) bool { + return i.Timestamp == other.Timestamp && i.ID == other.ID +} + +func (i Item) LessThan(other Item) bool { + if i.Timestamp != other.Timestamp { + return i.Timestamp < other.Timestamp + } + return i.ID < other.ID +} + +type Bound struct { + Item Item + IDLen int +} + +// NewBound creates a new Bound instance with a timestamp and ID. +// It returns an error if the ID size is incorrect. +func NewBound(timestamp uint64, id string) (*Bound, error) { + b := &Bound{ + Item: *NewItem(timestamp, id), + IDLen: len(id), + } + return b, nil +} + +// NewBoundWithItem creates a new Bound instance from an existing Item. +func NewBoundWithItem(item Item) *Bound { + return &Bound{ + Item: item, + IDLen: len(item.ID), + } +} + +// Equals checks if two Bound instances are equal. +func (b Bound) Equals(other Bound) bool { + return b.Item.Equals(other.Item) +} + +func (b Bound) LessThan(other Bound) bool { + return b.Item.LessThan(other.Item) +} + +type Fingerprint struct { + Buf [FingerprintSize]byte +} + +func (f *Fingerprint) SV() []byte { + return f.Buf[:] +} + +type Accumulator struct { + Buf []byte +} + +func (acc *Accumulator) SetToZero() { + acc.Buf = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} +} + +func (acc *Accumulator) AddItem(other Item) { + b, _ := hex.DecodeString(other.ID) + acc.AddBytes(b) +} + +func (acc *Accumulator) AddAccumulator(other Accumulator) { + acc.AddBytes(other.Buf) +} + +func (acc *Accumulator) AddBytes(other []byte) { + var currCarry, nextCarry uint32 + + if len(acc.Buf) < 32 { + newBuf := make([]byte, 32) + copy(newBuf, acc.Buf) + acc.Buf = newBuf + } + + for i := 0; i < 8; i++ { + offset := i * 4 + orig := binary.LittleEndian.Uint32(acc.Buf[offset:]) + otherV := binary.LittleEndian.Uint32(other[offset:]) + + next := orig + currCarry + otherV + if next < orig || next < otherV { + nextCarry = 1 + } + + binary.LittleEndian.PutUint32(acc.Buf[offset:], next&0xFFFFFFFF) + currCarry = nextCarry + nextCarry = 0 + } +} + +func (acc *Accumulator) Negate() { + for i := range acc.Buf { + acc.Buf[i] = ^acc.Buf[i] + } + + var one []byte + one[0] = 1 // Assuming little-endian; if big-endian, use one[len(one)-1] = 1 + + acc.AddBytes(one) +} + +func (acc *Accumulator) SV() []byte { + return acc.Buf[:] +} + +func (acc *Accumulator) GetFingerprint(n int) Fingerprint { + input := acc.SV() + input = append(input, encodeVarInt(n)...) + + hash := sha256.Sum256(input) + + var fingerprint Fingerprint + copy(fingerprint.Buf[:], hash[:FingerprintSize]) + return fingerprint +} diff --git a/negentropy/vector.go b/negentropy/vector.go new file mode 100644 index 0000000..9a88fe7 --- /dev/null +++ b/negentropy/vector.go @@ -0,0 +1,81 @@ +package negentropy + +import ( + "errors" + "sort" +) + +type Vector struct { + items []Item +} + +func NewVector() *Vector { + return &Vector{ + items: make([]Item, 0, 30), + } +} + +func (v *Vector) Insert(createdAt uint64, id string) error { + // fmt.Fprintln(os.Stderr, "Insert", createdAt, id) + if len(id) != IDSize*2 { + return errors.New("bad id size for added item") + } + item := NewItem(createdAt, id) + + v.items = append(v.items, *item) + return nil +} + +func (v *Vector) Seal() error { + sort.Slice(v.items, func(i, j int) bool { + return v.items[i].LessThan(v.items[j]) + }) + + for i := 1; i < len(v.items); i++ { + if v.items[i-1].Equals(v.items[i]) { + return errors.New("duplicate item inserted") + } + } + return nil +} + +func (v *Vector) Size() int { + return len(v.items) +} + +func (v *Vector) GetItem(i uint64) (Item, error) { + if i >= uint64(len(v.items)) { + return Item{}, errors.New("index out of bounds") + } + return v.items[i], nil +} + +func (v *Vector) Iterate(begin, end int, cb func(Item, int) bool) error { + for i := begin; i < end; i++ { + if !cb(v.items[i], i) { + break + } + } + return nil +} + +func (v *Vector) FindLowerBound(begin, end int, bound Bound) (int, error) { + i := sort.Search(len(v.items[begin:end]), func(i int) bool { + return !v.items[begin+i].LessThan(bound.Item) + }) + return begin + i, nil +} + +func (v *Vector) Fingerprint(begin, end int) (Fingerprint, error) { + var out Accumulator + out.SetToZero() + + if err := v.Iterate(begin, end, func(item Item, _ int) bool { + out.AddItem(item) + return true + }); err != nil { + return Fingerprint{}, err + } + + return out.GetFingerprint(end - begin), nil +} diff --git a/negentropy/whatever_test.go b/negentropy/whatever_test.go new file mode 100644 index 0000000..dceef07 --- /dev/null +++ b/negentropy/whatever_test.go @@ -0,0 +1,63 @@ +package negentropy + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSimple(t *testing.T) { + var err error + var q []byte + var n1 *Negentropy + var n2 *Negentropy + + { + n1, _ = NewNegentropy(NewVector(), 1<<16) + n1.Insert(10, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + n1.Insert(20, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + n1.Insert(30, "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc") + n1.Insert(40, "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd") + n1.Seal() + + q, err = n1.Initiate() + if err != nil { + t.Fatal(err) + return + } + + fmt.Println("n1:", q) + } + + { + n2, _ = NewNegentropy(NewVector(), 1<<16) + n2.Insert(20, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + n2.Insert(30, "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc") + n2.Insert(50, "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee") + n2.Seal() + + q, err = n2.Reconcile(q) + if err != nil { + t.Fatal(err) + return + } + fmt.Println("n2:", q) + } + + { + var have []string + var need []string + q, err = n1.ReconcileWithIDs(q, &have, &need) + if err != nil { + t.Fatal(err) + return + } + fmt.Println("n1:", q) + fmt.Println("have", have) + fmt.Println("need", need) + + require.Equal(t, have, []string{"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"}) + require.Equal(t, need, []string{"eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"}) + } +}