mirror of
https://github.com/nbd-wtf/go-nostr.git
synced 2025-07-01 19:11:05 +02:00
negentropy: do the algorithm entirely in hex.
plus: - nicer iterators - some optimizations here and there. - something else I forgot.
This commit is contained in:
@ -1,13 +1,12 @@
|
|||||||
package negentropy
|
package negentropy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"fmt"
|
||||||
"encoding/hex"
|
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (n *Negentropy) DecodeTimestampIn(reader *bytes.Reader) (nostr.Timestamp, error) {
|
func (n *Negentropy) DecodeTimestampIn(reader *StringHexReader) (nostr.Timestamp, error) {
|
||||||
t, err := decodeVarInt(reader)
|
t, err := decodeVarInt(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -28,47 +27,42 @@ func (n *Negentropy) DecodeTimestampIn(reader *bytes.Reader) (nostr.Timestamp, e
|
|||||||
return timestamp, nil
|
return timestamp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Negentropy) DecodeBound(reader *bytes.Reader) (Bound, error) {
|
func (n *Negentropy) DecodeBound(reader *StringHexReader) (Bound, error) {
|
||||||
timestamp, err := n.DecodeTimestampIn(reader)
|
timestamp, err := n.DecodeTimestampIn(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Bound{}, err
|
return Bound{}, fmt.Errorf("failed to decode bound timestamp: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
length, err := decodeVarInt(reader)
|
length, err := decodeVarInt(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Bound{}, err
|
return Bound{}, fmt.Errorf("failed to decode bound length: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
id := make([]byte, length)
|
id, err := reader.ReadString(length * 2)
|
||||||
if _, err = reader.Read(id); err != nil {
|
if err != nil {
|
||||||
return Bound{}, err
|
return Bound{}, fmt.Errorf("failed to read bound id: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return Bound{Item{timestamp, hex.EncodeToString(id)}}, nil
|
return Bound{Item{timestamp, id}}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Negentropy) encodeTimestampOut(timestamp nostr.Timestamp) []byte {
|
func (n *Negentropy) encodeTimestampOut(w *StringHexWriter, timestamp nostr.Timestamp) {
|
||||||
if timestamp == maxTimestamp {
|
if timestamp == maxTimestamp {
|
||||||
n.lastTimestampOut = maxTimestamp
|
n.lastTimestampOut = maxTimestamp
|
||||||
return encodeVarInt(0)
|
encodeVarIntToHex(w, 0)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
temp := timestamp
|
temp := timestamp
|
||||||
timestamp -= n.lastTimestampOut
|
timestamp -= n.lastTimestampOut
|
||||||
n.lastTimestampOut = temp
|
n.lastTimestampOut = temp
|
||||||
return encodeVarInt(int(timestamp + 1))
|
encodeVarIntToHex(w, int(timestamp+1))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Negentropy) encodeBound(bound Bound) []byte {
|
func (n *Negentropy) encodeBound(w *StringHexWriter, bound Bound) {
|
||||||
var output []byte
|
n.encodeTimestampOut(w, bound.Timestamp)
|
||||||
|
encodeVarIntToHex(w, len(bound.ID)/2)
|
||||||
t := n.encodeTimestampOut(bound.Timestamp)
|
w.WriteHex(bound.Item.ID)
|
||||||
idlen := encodeVarInt(len(bound.ID) / 2)
|
|
||||||
output = append(output, t...)
|
|
||||||
output = append(output, idlen...)
|
|
||||||
id, _ := hex.DecodeString(bound.Item.ID)
|
|
||||||
|
|
||||||
output = append(output, id...)
|
|
||||||
return output
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMinimalBound(prev, curr Item) Bound {
|
func getMinimalBound(prev, curr Item) Bound {
|
||||||
@ -89,11 +83,11 @@ func getMinimalBound(prev, curr Item) Bound {
|
|||||||
return Bound{Item{curr.Timestamp, curr.ID[:(sharedPrefixBytes+1)*2]}}
|
return Bound{Item{curr.Timestamp, curr.ID[:(sharedPrefixBytes+1)*2]}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeVarInt(reader *bytes.Reader) (int, error) {
|
func decodeVarInt(reader *StringHexReader) (int, error) {
|
||||||
var res int = 0
|
var res int = 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
b, err := reader.ReadByte()
|
b, err := reader.ReadHexByte()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@ -124,3 +118,21 @@ func encodeVarInt(n int) []byte {
|
|||||||
|
|
||||||
return o
|
return o
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func encodeVarIntToHex(w *StringHexWriter, n int) {
|
||||||
|
if n == 0 {
|
||||||
|
w.WriteByte(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
var o []byte
|
||||||
|
for n != 0 {
|
||||||
|
o = append([]byte{byte(n & 0x7F)}, o...)
|
||||||
|
n >>= 7
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(o)-1; i++ {
|
||||||
|
o[i] |= 0x80
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteBytes(o)
|
||||||
|
}
|
||||||
|
96
nip77/negentropy/hex.go
Normal file
96
nip77/negentropy/hex.go
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
package negentropy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/hex"
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewStringHexReader(source string) *StringHexReader {
|
||||||
|
return &StringHexReader{source, 0, make([]byte, 1)}
|
||||||
|
}
|
||||||
|
|
||||||
|
type StringHexReader struct {
|
||||||
|
source string
|
||||||
|
idx int
|
||||||
|
|
||||||
|
tmp []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexReader) Len() int {
|
||||||
|
return len(r.source) - r.idx
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexReader) ReadHexBytes(buf []byte) error {
|
||||||
|
n := len(buf) * 2
|
||||||
|
r.idx += n
|
||||||
|
if len(r.source) < r.idx {
|
||||||
|
return io.EOF
|
||||||
|
}
|
||||||
|
_, err := hex.Decode(buf, []byte(r.source[r.idx-n:r.idx]))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexReader) ReadHexByte() (byte, error) {
|
||||||
|
err := r.ReadHexBytes(r.tmp)
|
||||||
|
return r.tmp[0], err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexReader) ReadString(size int) (string, error) {
|
||||||
|
if size == 0 {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
r.idx += size
|
||||||
|
if len(r.source) < r.idx {
|
||||||
|
return "", io.EOF
|
||||||
|
}
|
||||||
|
return r.source[r.idx-size : r.idx], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStringHexWriter(buf []byte) *StringHexWriter {
|
||||||
|
return &StringHexWriter{buf, make([]byte, 2)}
|
||||||
|
}
|
||||||
|
|
||||||
|
type StringHexWriter struct {
|
||||||
|
hexbuf []byte
|
||||||
|
|
||||||
|
tmp []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexWriter) Len() int {
|
||||||
|
return len(r.hexbuf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexWriter) Hex() string {
|
||||||
|
return string(r.hexbuf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexWriter) Reset() {
|
||||||
|
r.hexbuf = r.hexbuf[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexWriter) WriteHex(hexString string) {
|
||||||
|
r.hexbuf = append(r.hexbuf, hexString...)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexWriter) WriteByte(b byte) error {
|
||||||
|
hex.Encode(r.tmp, []byte{b})
|
||||||
|
r.hexbuf = append(r.hexbuf, r.tmp...)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *StringHexWriter) WriteBytes(in []byte) {
|
||||||
|
r.hexbuf = hex.AppendEncode(r.hexbuf, in)
|
||||||
|
|
||||||
|
// curr := len(r.hexbuf)
|
||||||
|
// next := curr + len(in)*2
|
||||||
|
// for cap(r.hexbuf) < next {
|
||||||
|
// r.hexbuf = append(r.hexbuf, in...)
|
||||||
|
// }
|
||||||
|
// r.hexbuf = r.hexbuf[0:next]
|
||||||
|
// dst := r.hexbuf[curr:next]
|
||||||
|
|
||||||
|
// hex.Encode(dst, in)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
@ -1,11 +1,10 @@
|
|||||||
package negentropy
|
package negentropy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"slices"
|
||||||
|
"strings"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
@ -22,7 +21,7 @@ type Negentropy struct {
|
|||||||
storage Storage
|
storage Storage
|
||||||
sealed bool
|
sealed bool
|
||||||
frameSizeLimit int
|
frameSizeLimit int
|
||||||
isInitiator bool
|
isClient bool
|
||||||
lastTimestampIn nostr.Timestamp
|
lastTimestampIn nostr.Timestamp
|
||||||
lastTimestampOut nostr.Timestamp
|
lastTimestampOut nostr.Timestamp
|
||||||
|
|
||||||
@ -37,6 +36,17 @@ func NewNegentropy(storage Storage, frameSizeLimit int) *Negentropy {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Negentropy) String() string {
|
||||||
|
label := "unsealed"
|
||||||
|
if n.sealed {
|
||||||
|
label = "server"
|
||||||
|
if n.isClient {
|
||||||
|
label = "client"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("<Negentropy %s with %d items>", label, n.storage.Size())
|
||||||
|
}
|
||||||
|
|
||||||
func (n *Negentropy) Insert(evt *nostr.Event) {
|
func (n *Negentropy) Insert(evt *nostr.Event) {
|
||||||
err := n.storage.Insert(evt.CreatedAt, evt.ID)
|
err := n.storage.Insert(evt.CreatedAt, evt.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -51,83 +61,76 @@ func (n *Negentropy) seal() {
|
|||||||
n.sealed = true
|
n.sealed = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Negentropy) Initiate() []byte {
|
func (n *Negentropy) Initiate() string {
|
||||||
n.seal()
|
n.seal()
|
||||||
n.isInitiator = true
|
n.isClient = true
|
||||||
|
|
||||||
n.Haves = make(chan string, n.storage.Size()/2)
|
n.Haves = make(chan string, n.storage.Size()/2)
|
||||||
n.HaveNots = make(chan string, n.storage.Size()/2)
|
n.HaveNots = make(chan string, n.storage.Size()/2)
|
||||||
|
|
||||||
output := bytes.NewBuffer(make([]byte, 0, 1+n.storage.Size()*32))
|
output := NewStringHexWriter(make([]byte, 0, 1+n.storage.Size()*64))
|
||||||
output.WriteByte(protocolVersion)
|
output.WriteByte(protocolVersion)
|
||||||
n.SplitRange(0, n.storage.Size(), infiniteBound, output)
|
n.SplitRange(0, n.storage.Size(), infiniteBound, output)
|
||||||
|
|
||||||
return output.Bytes()
|
return output.Hex()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Negentropy) Reconcile(msg []byte) (output []byte, err error) {
|
func (n *Negentropy) Reconcile(msg string) (output string, err error) {
|
||||||
n.seal()
|
n.seal()
|
||||||
reader := bytes.NewReader(msg)
|
reader := NewStringHexReader(msg)
|
||||||
|
|
||||||
output, err = n.reconcileAux(reader)
|
output, err = n.reconcileAux(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(output) == 1 && n.isInitiator {
|
if len(output) == 2 && n.isClient {
|
||||||
close(n.Haves)
|
close(n.Haves)
|
||||||
close(n.HaveNots)
|
close(n.HaveNots)
|
||||||
return nil, nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return output, nil
|
return output, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) {
|
func (n *Negentropy) reconcileAux(reader *StringHexReader) (string, error) {
|
||||||
n.lastTimestampIn, n.lastTimestampOut = 0, 0 // reset for each message
|
n.lastTimestampIn, n.lastTimestampOut = 0, 0 // reset for each message
|
||||||
|
|
||||||
fullOutput := bytes.NewBuffer(make([]byte, 0, 5000))
|
fullOutput := NewStringHexWriter(make([]byte, 0, 5000))
|
||||||
fullOutput.WriteByte(protocolVersion)
|
fullOutput.WriteByte(protocolVersion)
|
||||||
|
|
||||||
pv, err := reader.ReadByte()
|
pv, err := reader.ReadHexByte()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", fmt.Errorf("failed to read pv: %w", err)
|
||||||
}
|
|
||||||
|
|
||||||
if pv < 0x60 || pv > 0x6f {
|
|
||||||
return nil, fmt.Errorf("invalid protocol version byte")
|
|
||||||
}
|
}
|
||||||
if pv != protocolVersion {
|
if pv != protocolVersion {
|
||||||
if n.isInitiator {
|
return "", fmt.Errorf("unsupported negentropy protocol version %v", pv)
|
||||||
return nil, fmt.Errorf("unsupported negentropy protocol version requested")
|
|
||||||
}
|
|
||||||
return fullOutput.Bytes(), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var prevBound Bound
|
var prevBound Bound
|
||||||
prevIndex := 0
|
prevIndex := 0
|
||||||
skip := false
|
skipping := false // this means we are currently coalescing ranges into skip
|
||||||
|
|
||||||
partialOutput := bytes.NewBuffer(make([]byte, 0, 100))
|
partialOutput := NewStringHexWriter(make([]byte, 0, 100))
|
||||||
for reader.Len() > 0 {
|
for reader.Len() > 0 {
|
||||||
partialOutput.Reset()
|
partialOutput.Reset()
|
||||||
|
|
||||||
doSkip := func() {
|
finishSkip := func() {
|
||||||
if skip {
|
// end skip range, if necessary, so we can start a new bound that isn't a skip
|
||||||
skip = false
|
if skipping {
|
||||||
encodedBound := n.encodeBound(prevBound)
|
skipping = false
|
||||||
partialOutput.Write(encodedBound)
|
n.encodeBound(partialOutput, prevBound)
|
||||||
partialOutput.WriteByte(SkipMode)
|
partialOutput.WriteByte(byte(SkipMode))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
currBound, err := n.DecodeBound(reader)
|
currBound, err := n.DecodeBound(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", fmt.Errorf("failed to decode bound: %w", err)
|
||||||
}
|
}
|
||||||
modeVal, err := decodeVarInt(reader)
|
modeVal, err := decodeVarInt(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", fmt.Errorf("failed to decode mode: %w", err)
|
||||||
}
|
}
|
||||||
mode := Mode(modeVal)
|
mode := Mode(modeVal)
|
||||||
|
|
||||||
@ -136,134 +139,129 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) {
|
|||||||
|
|
||||||
switch mode {
|
switch mode {
|
||||||
case SkipMode:
|
case SkipMode:
|
||||||
skip = true
|
skipping = true
|
||||||
|
|
||||||
case FingerprintMode:
|
case FingerprintMode:
|
||||||
var theirFingerprint [FingerprintSize]byte
|
var theirFingerprint [FingerprintSize]byte
|
||||||
_, err := reader.Read(theirFingerprint[:])
|
if err := reader.ReadHexBytes(theirFingerprint[:]); err != nil {
|
||||||
if err != nil {
|
return "", fmt.Errorf("failed to read fingerprint: %w", err)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
ourFingerprint, err := n.storage.Fingerprint(lower, upper)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
|
ourFingerprint := n.storage.Fingerprint(lower, upper)
|
||||||
|
|
||||||
if theirFingerprint == ourFingerprint {
|
if theirFingerprint == ourFingerprint {
|
||||||
skip = true
|
skipping = true
|
||||||
} else {
|
} else {
|
||||||
doSkip()
|
finishSkip()
|
||||||
n.SplitRange(lower, upper, currBound, partialOutput)
|
n.SplitRange(lower, upper, currBound, partialOutput)
|
||||||
}
|
}
|
||||||
|
|
||||||
case IdListMode:
|
case IdListMode:
|
||||||
numIds, err := decodeVarInt(reader)
|
numIds, err := decodeVarInt(reader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", fmt.Errorf("failed to decode number of ids: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
theirElems := make(map[string]struct{})
|
// what they have
|
||||||
var idb [32]byte
|
theirItems := make([]string, 0, numIds)
|
||||||
|
|
||||||
for i := 0; i < numIds; i++ {
|
for i := 0; i < numIds; i++ {
|
||||||
_, err := reader.Read(idb[:])
|
if id, err := reader.ReadString(64); err != nil {
|
||||||
if err != nil {
|
return "", fmt.Errorf("failed to read id (#%d/%d) in list: %w", i, numIds, err)
|
||||||
return nil, err
|
} else {
|
||||||
|
theirItems = append(theirItems, id)
|
||||||
}
|
}
|
||||||
id := hex.EncodeToString(idb[:])
|
|
||||||
theirElems[id] = struct{}{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
n.storage.Iterate(lower, upper, func(item Item, _ int) bool {
|
// what we have
|
||||||
|
for _, item := range n.storage.Range(lower, upper) {
|
||||||
id := item.ID
|
id := item.ID
|
||||||
if _, exists := theirElems[id]; !exists {
|
|
||||||
if n.isInitiator {
|
if idx, theyHave := slices.BinarySearch(theirItems, id); theyHave {
|
||||||
|
// if we have and they have, ignore
|
||||||
|
theirItems[idx] = ""
|
||||||
|
} else {
|
||||||
|
// if we have and they don't, notify client
|
||||||
|
if n.isClient {
|
||||||
n.Haves <- id
|
n.Haves <- id
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
delete(theirElems, id)
|
|
||||||
}
|
}
|
||||||
return true
|
}
|
||||||
})
|
|
||||||
|
|
||||||
if n.isInitiator {
|
if n.isClient {
|
||||||
skip = true
|
// notify client of what they have and we don't
|
||||||
for id := range theirElems {
|
for _, id := range theirItems {
|
||||||
n.HaveNots <- id
|
if id != "" {
|
||||||
|
n.HaveNots <- id
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// client got list of ids, it's done, skip
|
||||||
|
skipping = true
|
||||||
} else {
|
} else {
|
||||||
doSkip()
|
// server got list of ids, reply with their own ids for the same range
|
||||||
|
finishSkip()
|
||||||
|
|
||||||
|
responseIds := strings.Builder{}
|
||||||
|
responseIds.Grow(64 * 100)
|
||||||
|
responses := 0
|
||||||
|
|
||||||
responseIds := make([]byte, 0, 32*n.storage.Size())
|
|
||||||
endBound := currBound
|
endBound := currBound
|
||||||
|
|
||||||
n.storage.Iterate(lower, upper, func(item Item, index int) bool {
|
for index, item := range n.storage.Range(lower, upper) {
|
||||||
if n.frameSizeLimit-200 < fullOutput.Len()+len(responseIds) {
|
if n.frameSizeLimit-200 < fullOutput.Len()+1+8+responseIds.Len() {
|
||||||
endBound = Bound{item}
|
endBound = Bound{item}
|
||||||
upper = index
|
upper = index
|
||||||
return false
|
break
|
||||||
}
|
}
|
||||||
|
responseIds.WriteString(item.ID)
|
||||||
|
responses++
|
||||||
|
}
|
||||||
|
|
||||||
id, _ := hex.DecodeString(item.ID)
|
n.encodeBound(partialOutput, endBound)
|
||||||
responseIds = append(responseIds, id...)
|
partialOutput.WriteByte(byte(IdListMode))
|
||||||
return true
|
encodeVarIntToHex(partialOutput, responses)
|
||||||
})
|
partialOutput.WriteHex(responseIds.String())
|
||||||
|
|
||||||
encodedBound := n.encodeBound(endBound)
|
fullOutput.WriteHex(partialOutput.Hex())
|
||||||
|
|
||||||
partialOutput.Write(encodedBound)
|
|
||||||
partialOutput.WriteByte(IdListMode)
|
|
||||||
partialOutput.Write(encodeVarInt(len(responseIds) / 32))
|
|
||||||
partialOutput.Write(responseIds)
|
|
||||||
|
|
||||||
partialOutput.WriteTo(fullOutput)
|
|
||||||
partialOutput.Reset()
|
partialOutput.Reset()
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unexpected mode %d", mode)
|
return "", fmt.Errorf("unexpected mode %d", mode)
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.frameSizeLimit-200 < fullOutput.Len()+partialOutput.Len() {
|
if n.frameSizeLimit-200 < fullOutput.Len()+partialOutput.Len() {
|
||||||
// frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range
|
// frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range
|
||||||
remainingFingerprint, err := n.storage.Fingerprint(upper, n.storage.Size())
|
remainingFingerprint := n.storage.Fingerprint(upper, n.storage.Size())
|
||||||
if err != nil {
|
n.encodeBound(fullOutput, infiniteBound)
|
||||||
panic(err)
|
fullOutput.WriteByte(byte(FingerprintMode))
|
||||||
}
|
fullOutput.WriteBytes(remainingFingerprint[:])
|
||||||
|
|
||||||
fullOutput.Write(n.encodeBound(infiniteBound))
|
|
||||||
fullOutput.WriteByte(FingerprintMode)
|
|
||||||
fullOutput.Write(remainingFingerprint[:])
|
|
||||||
|
|
||||||
break // stop processing further
|
break // stop processing further
|
||||||
} else {
|
} else {
|
||||||
// append the constructed output for this iteration
|
// append the constructed output for this iteration
|
||||||
partialOutput.WriteTo(fullOutput)
|
fullOutput.WriteHex(partialOutput.Hex())
|
||||||
}
|
}
|
||||||
|
|
||||||
prevIndex = upper
|
prevIndex = upper
|
||||||
prevBound = currBound
|
prevBound = currBound
|
||||||
}
|
}
|
||||||
|
|
||||||
return fullOutput.Bytes(), nil
|
return fullOutput.Hex(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *bytes.Buffer) {
|
func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *StringHexWriter) {
|
||||||
numElems := upper - lower
|
numElems := upper - lower
|
||||||
const buckets = 16
|
const buckets = 16
|
||||||
|
|
||||||
if numElems < buckets*2 {
|
if numElems < buckets*2 {
|
||||||
// we just send the full ids here
|
// we just send the full ids here
|
||||||
boundEncoded := n.encodeBound(upperBound)
|
n.encodeBound(output, upperBound)
|
||||||
output.Write(boundEncoded)
|
output.WriteByte(byte(IdListMode))
|
||||||
output.WriteByte(IdListMode)
|
encodeVarIntToHex(output, numElems)
|
||||||
output.Write(encodeVarInt(numElems))
|
|
||||||
|
|
||||||
n.storage.Iterate(lower, upper, func(item Item, _ int) bool {
|
for _, item := range n.storage.Range(lower, upper) {
|
||||||
id, _ := hex.DecodeString(item.ID)
|
output.WriteHex(item.ID)
|
||||||
output.Write(id)
|
}
|
||||||
return true
|
|
||||||
})
|
|
||||||
} else {
|
} else {
|
||||||
itemsPerBucket := numElems / buckets
|
itemsPerBucket := numElems / buckets
|
||||||
bucketsWithExtra := numElems % buckets
|
bucketsWithExtra := numElems % buckets
|
||||||
@ -274,12 +272,7 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *byte
|
|||||||
if i < bucketsWithExtra {
|
if i < bucketsWithExtra {
|
||||||
bucketSize++
|
bucketSize++
|
||||||
}
|
}
|
||||||
ourFingerprint, err := n.storage.Fingerprint(curr, curr+bucketSize)
|
ourFingerprint := n.storage.Fingerprint(curr, curr+bucketSize)
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintln(os.Stderr, err)
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
curr += bucketSize
|
curr += bucketSize
|
||||||
|
|
||||||
var nextBound Bound
|
var nextBound Bound
|
||||||
@ -288,23 +281,21 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *byte
|
|||||||
} else {
|
} else {
|
||||||
var prevItem, currItem Item
|
var prevItem, currItem Item
|
||||||
|
|
||||||
n.storage.Iterate(curr-1, curr+1, func(item Item, index int) bool {
|
for index, item := range n.storage.Range(curr-1, curr+1) {
|
||||||
if index == curr-1 {
|
if index == curr-1 {
|
||||||
prevItem = item
|
prevItem = item
|
||||||
} else {
|
} else {
|
||||||
currItem = item
|
currItem = item
|
||||||
}
|
}
|
||||||
return true
|
}
|
||||||
})
|
|
||||||
|
|
||||||
minBound := getMinimalBound(prevItem, currItem)
|
minBound := getMinimalBound(prevItem, currItem)
|
||||||
nextBound = minBound
|
nextBound = minBound
|
||||||
}
|
}
|
||||||
|
|
||||||
boundEncoded := n.encodeBound(nextBound)
|
n.encodeBound(output, nextBound)
|
||||||
output.Write(boundEncoded)
|
output.WriteByte(byte(FingerprintMode))
|
||||||
output.WriteByte(FingerprintMode)
|
output.WriteBytes(ourFingerprint[:])
|
||||||
output.Write(ourFingerprint[:])
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,11 @@
|
|||||||
package negentropy
|
package negentropy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"cmp"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"iter"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
@ -12,22 +13,35 @@ import (
|
|||||||
|
|
||||||
const FingerprintSize = 16
|
const FingerprintSize = 16
|
||||||
|
|
||||||
type Mode int
|
type Mode uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
SkipMode = 0
|
SkipMode Mode = 0
|
||||||
FingerprintMode = 1
|
FingerprintMode Mode = 1
|
||||||
IdListMode = 2
|
IdListMode Mode = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (v Mode) String() string {
|
||||||
|
switch v {
|
||||||
|
case SkipMode:
|
||||||
|
return "SKIP"
|
||||||
|
case FingerprintMode:
|
||||||
|
return "FINGERPRINT"
|
||||||
|
case IdListMode:
|
||||||
|
return "IDLIST"
|
||||||
|
default:
|
||||||
|
return "<UNKNOWN-ERROR>"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type Storage interface {
|
type Storage interface {
|
||||||
Insert(nostr.Timestamp, string) error
|
Insert(nostr.Timestamp, string) error
|
||||||
Seal()
|
Seal()
|
||||||
Size() int
|
Size() int
|
||||||
Iterate(begin, end int, cb func(item Item, i int) bool) error
|
Range(begin, end int) iter.Seq2[int, Item]
|
||||||
FindLowerBound(begin, end int, value Bound) int
|
FindLowerBound(begin, end int, value Bound) int
|
||||||
GetBound(idx int) Bound
|
GetBound(idx int) Bound
|
||||||
Fingerprint(begin, end int) ([FingerprintSize]byte, error)
|
Fingerprint(begin, end int) [FingerprintSize]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type Item struct {
|
type Item struct {
|
||||||
@ -36,10 +50,10 @@ type Item struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func itemCompare(a, b Item) int {
|
func itemCompare(a, b Item) int {
|
||||||
if a.Timestamp != b.Timestamp {
|
if a.Timestamp == b.Timestamp {
|
||||||
return int(a.Timestamp - b.Timestamp)
|
return strings.Compare(a.ID, b.ID)
|
||||||
}
|
}
|
||||||
return strings.Compare(a.ID, b.ID)
|
return cmp.Compare(a.Timestamp, b.Timestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i Item) String() string { return fmt.Sprintf("Item<%d:%s>", i.Timestamp, i.ID) }
|
func (i Item) String() string { return fmt.Sprintf("Item<%d:%s>", i.Timestamp, i.ID) }
|
||||||
@ -61,11 +75,6 @@ func (acc *Accumulator) SetToZero() {
|
|||||||
acc.Buf = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
|
acc.Buf = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (acc *Accumulator) Add(id string) {
|
|
||||||
b, _ := hex.DecodeString(id)
|
|
||||||
acc.AddBytes(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (acc *Accumulator) AddAccumulator(other Accumulator) {
|
func (acc *Accumulator) AddAccumulator(other Accumulator) {
|
||||||
acc.AddBytes(other.Buf)
|
acc.AddBytes(other.Buf)
|
||||||
}
|
}
|
||||||
@ -95,12 +104,8 @@ func (acc *Accumulator) AddBytes(other []byte) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (acc *Accumulator) SV() []byte {
|
|
||||||
return acc.Buf[:]
|
|
||||||
}
|
|
||||||
|
|
||||||
func (acc *Accumulator) GetFingerprint(n int) [FingerprintSize]byte {
|
func (acc *Accumulator) GetFingerprint(n int) [FingerprintSize]byte {
|
||||||
input := acc.SV()
|
input := acc.Buf[:]
|
||||||
input = append(input, encodeVarInt(n)...)
|
input = append(input, encodeVarInt(n)...)
|
||||||
|
|
||||||
hash := sha256.Sum256(input)
|
hash := sha256.Sum256(input)
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
package negentropy
|
package negentropy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"iter"
|
||||||
"slices"
|
"slices"
|
||||||
|
|
||||||
"github.com/nbd-wtf/go-nostr"
|
"github.com/nbd-wtf/go-nostr"
|
||||||
@ -45,13 +47,14 @@ func (v *Vector) GetBound(idx int) Bound {
|
|||||||
return infiniteBound
|
return infiniteBound
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Vector) Iterate(begin, end int, cb func(Item, int) bool) error {
|
func (v *Vector) Range(begin, end int) iter.Seq2[int, Item] {
|
||||||
for i := begin; i < end; i++ {
|
return func(yield func(int, Item) bool) {
|
||||||
if !cb(v.items[i], i) {
|
for i := begin; i < end; i++ {
|
||||||
break
|
if !yield(i, v.items[i]) {
|
||||||
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Vector) FindLowerBound(begin, end int, bound Bound) int {
|
func (v *Vector) FindLowerBound(begin, end int, bound Bound) int {
|
||||||
@ -59,16 +62,15 @@ func (v *Vector) FindLowerBound(begin, end int, bound Bound) int {
|
|||||||
return begin + idx
|
return begin + idx
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Vector) Fingerprint(begin, end int) ([FingerprintSize]byte, error) {
|
func (v *Vector) Fingerprint(begin, end int) [FingerprintSize]byte {
|
||||||
var out Accumulator
|
var out Accumulator
|
||||||
out.SetToZero()
|
out.SetToZero()
|
||||||
|
|
||||||
if err := v.Iterate(begin, end, func(item Item, _ int) bool {
|
tmp := make([]byte, 32)
|
||||||
out.Add(item.ID)
|
for _, item := range v.Range(begin, end) {
|
||||||
return true
|
hex.Decode(tmp, []byte(item.ID))
|
||||||
}); err != nil {
|
out.AddBytes(tmp)
|
||||||
return [FingerprintSize]byte{}, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return out.GetFingerprint(end - begin), nil
|
return out.GetFingerprint(end - begin)
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,9 @@
|
|||||||
package negentropy
|
package negentropy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -60,7 +59,7 @@ func runTestWith(t *testing.T,
|
|||||||
expectedN1NeedRanges [][]int, expectedN1HaveRanges [][]int,
|
expectedN1NeedRanges [][]int, expectedN1HaveRanges [][]int,
|
||||||
) {
|
) {
|
||||||
var err error
|
var err error
|
||||||
var q []byte
|
var q string
|
||||||
var n1 *Negentropy
|
var n1 *Negentropy
|
||||||
var n2 *Negentropy
|
var n2 *Negentropy
|
||||||
|
|
||||||
@ -109,18 +108,21 @@ func runTestWith(t *testing.T,
|
|||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
wg.Add(3)
|
wg.Add(3)
|
||||||
|
|
||||||
|
var fatal error
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
wg.Done()
|
defer wg.Done()
|
||||||
for n := n1; q != nil; n = invert[n] {
|
for n := n1; q != ""; n = invert[n] {
|
||||||
i++
|
i++
|
||||||
|
|
||||||
|
fmt.Println("processing reconcile", n)
|
||||||
q, err = n.Reconcile(q)
|
q, err = n.Reconcile(q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
fatal = err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if q == nil {
|
if q == "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -141,6 +143,7 @@ func runTestWith(t *testing.T,
|
|||||||
}
|
}
|
||||||
haves = append(haves, item)
|
haves = append(haves, item)
|
||||||
}
|
}
|
||||||
|
slices.Sort(haves)
|
||||||
require.ElementsMatch(t, expectedHave, haves, "wrong have")
|
require.ElementsMatch(t, expectedHave, haves, "wrong have")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -159,22 +162,12 @@ func runTestWith(t *testing.T,
|
|||||||
}
|
}
|
||||||
havenots = append(havenots, item)
|
havenots = append(havenots, item)
|
||||||
}
|
}
|
||||||
|
slices.Sort(havenots)
|
||||||
require.ElementsMatch(t, expectedNeed, havenots, "wrong need")
|
require.ElementsMatch(t, expectedNeed, havenots, "wrong need")
|
||||||
}()
|
}()
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
if fatal != nil {
|
||||||
|
log.Fatal(fatal)
|
||||||
func hexedBytes(o []byte) string {
|
|
||||||
s := strings.Builder{}
|
|
||||||
s.Grow(2 + 1 + len(o)*5)
|
|
||||||
s.WriteString("[ ")
|
|
||||||
for _, b := range o {
|
|
||||||
x := hex.EncodeToString([]byte{b})
|
|
||||||
s.WriteString("0x")
|
|
||||||
s.WriteString(x)
|
|
||||||
s.WriteString(" ")
|
|
||||||
}
|
}
|
||||||
s.WriteString("]")
|
|
||||||
return s.String()
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user