mirror of
https://github.com/nbd-wtf/go-nostr.git
synced 2025-04-10 04:39:09 +02:00
a million debug logs.
This commit is contained in:
parent
9c97d0fc28
commit
9d28f7e7f2
126
negentropy/encoding.go
Normal file
126
negentropy/encoding.go
Normal file
@ -0,0 +1,126 @@
|
||||
package negentropy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
func (n *Negentropy) DecodeTimestampIn(reader *bytes.Reader) (nostr.Timestamp, error) {
|
||||
t, err := decodeVarInt(reader)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
timestamp := nostr.Timestamp(t)
|
||||
if timestamp == 0 {
|
||||
timestamp = maxTimestamp
|
||||
} else {
|
||||
timestamp--
|
||||
}
|
||||
|
||||
timestamp += n.lastTimestampIn
|
||||
if timestamp < n.lastTimestampIn { // Check for overflow
|
||||
timestamp = maxTimestamp
|
||||
}
|
||||
n.lastTimestampIn = timestamp
|
||||
return timestamp, nil
|
||||
}
|
||||
|
||||
func (n *Negentropy) DecodeBound(reader *bytes.Reader) (Bound, error) {
|
||||
timestamp, err := n.DecodeTimestampIn(reader)
|
||||
if err != nil {
|
||||
return Bound{}, err
|
||||
}
|
||||
|
||||
length, err := decodeVarInt(reader)
|
||||
if err != nil {
|
||||
return Bound{}, err
|
||||
}
|
||||
|
||||
id := make([]byte, length)
|
||||
if _, err = reader.Read(id); err != nil {
|
||||
return Bound{}, err
|
||||
}
|
||||
|
||||
return Bound{Item{timestamp, hex.EncodeToString(id)}}, nil
|
||||
}
|
||||
|
||||
func (n *Negentropy) encodeTimestampOut(timestamp nostr.Timestamp) []byte {
|
||||
if timestamp == maxTimestamp {
|
||||
n.lastTimestampOut = maxTimestamp
|
||||
return encodeVarInt(0)
|
||||
}
|
||||
temp := timestamp
|
||||
timestamp -= n.lastTimestampOut
|
||||
n.lastTimestampOut = temp
|
||||
return encodeVarInt(int(timestamp + 1))
|
||||
}
|
||||
|
||||
func (n *Negentropy) encodeBound(bound Bound) []byte {
|
||||
var output []byte
|
||||
|
||||
t := n.encodeTimestampOut(bound.Timestamp)
|
||||
idlen := encodeVarInt(len(bound.ID) / 2)
|
||||
output = append(output, t...)
|
||||
output = append(output, idlen...)
|
||||
id, _ := hex.DecodeString(bound.Item.ID)
|
||||
|
||||
output = append(output, id...)
|
||||
return output
|
||||
}
|
||||
|
||||
func getMinimalBound(prev, curr Item) Bound {
|
||||
if curr.Timestamp != prev.Timestamp {
|
||||
return Bound{Item{curr.Timestamp, ""}}
|
||||
}
|
||||
|
||||
sharedPrefixBytes := 0
|
||||
|
||||
for i := 0; i < 32; i++ {
|
||||
if curr.ID[i:i+2] != prev.ID[i:i+2] {
|
||||
break
|
||||
}
|
||||
sharedPrefixBytes++
|
||||
}
|
||||
|
||||
// sharedPrefixBytes + 1 to include the first differing byte, or the entire ID if identical.
|
||||
return Bound{Item{curr.Timestamp, curr.ID[:sharedPrefixBytes*2+1]}}
|
||||
}
|
||||
|
||||
func decodeVarInt(reader *bytes.Reader) (int, error) {
|
||||
var res int = 0
|
||||
|
||||
for {
|
||||
b, err := reader.ReadByte()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
res = (res << 7) | (int(b) & 127)
|
||||
if (b & 128) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func encodeVarInt(n int) []byte {
|
||||
if n == 0 {
|
||||
return []byte{0}
|
||||
}
|
||||
|
||||
var o []byte
|
||||
for n != 0 {
|
||||
o = append([]byte{byte(n & 0x7F)}, o...)
|
||||
n >>= 7
|
||||
}
|
||||
|
||||
for i := 0; i < len(o)-1; i++ {
|
||||
o[i] |= 0x80
|
||||
}
|
||||
|
||||
return o
|
||||
}
|
@ -6,6 +6,8 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"slices"
|
||||
"unsafe"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
@ -15,69 +17,74 @@ const (
|
||||
maxTimestamp = nostr.Timestamp(math.MaxInt64)
|
||||
)
|
||||
|
||||
var infiniteBound = Bound{Item: Item{Timestamp: maxTimestamp}}
|
||||
|
||||
type Negentropy struct {
|
||||
storage Storage
|
||||
frameSizeLimit uint64
|
||||
idSize int // in bytes
|
||||
IsInitiator bool
|
||||
sealed bool
|
||||
frameSizeLimit int
|
||||
isInitiator bool
|
||||
lastTimestampIn nostr.Timestamp
|
||||
lastTimestampOut nostr.Timestamp
|
||||
haveIds []string
|
||||
needIds []string
|
||||
}
|
||||
|
||||
func NewNegentropy(storage Storage, frameSizeLimit uint64, IDSize int) (*Negentropy, error) {
|
||||
if frameSizeLimit != 0 && frameSizeLimit < 4096 {
|
||||
return nil, fmt.Errorf("frameSizeLimit too small")
|
||||
}
|
||||
if IDSize > 32 {
|
||||
return nil, fmt.Errorf("id size cannot be more than 32, got %d", IDSize)
|
||||
}
|
||||
func NewNegentropy(storage Storage, frameSizeLimit int) (*Negentropy, error) {
|
||||
return &Negentropy{
|
||||
storage: storage,
|
||||
frameSizeLimit: frameSizeLimit,
|
||||
idSize: IDSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (n *Negentropy) Insert(evt *nostr.Event) {
|
||||
err := n.storage.Insert(evt.CreatedAt, evt.ID[0:n.idSize*2])
|
||||
err := n.storage.Insert(evt.CreatedAt, evt.ID)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Negentropy) Initiate() ([]byte, error) {
|
||||
if n.IsInitiator {
|
||||
return []byte{}, fmt.Errorf("already initiated")
|
||||
func (n *Negentropy) seal() {
|
||||
if !n.sealed {
|
||||
n.storage.Seal()
|
||||
}
|
||||
n.IsInitiator = true
|
||||
|
||||
output := bytes.NewBuffer(make([]byte, 0, 1+n.storage.Size()*n.idSize))
|
||||
output.WriteByte(protocolVersion)
|
||||
n.SplitRange(0, n.storage.Size(), Bound{Item: Item{Timestamp: maxTimestamp}}, output)
|
||||
|
||||
return output.Bytes(), nil
|
||||
n.sealed = true
|
||||
}
|
||||
|
||||
func (n *Negentropy) Reconcile(query []byte) (output []byte, haveIds []string, needIds []string, err error) {
|
||||
func (n *Negentropy) Initiate() []byte {
|
||||
n.seal()
|
||||
n.isInitiator = true
|
||||
|
||||
n.haveIds = make([]string, 0, n.storage.Size()/2)
|
||||
n.needIds = make([]string, 0, n.storage.Size()/2)
|
||||
|
||||
output := bytes.NewBuffer(make([]byte, 0, 1+n.storage.Size()*32))
|
||||
output.WriteByte(protocolVersion)
|
||||
n.SplitRange(0, 0, n.storage.Size(), infiniteBound, output)
|
||||
|
||||
return output.Bytes()
|
||||
}
|
||||
|
||||
func (n *Negentropy) Reconcile(step int, query []byte) (output []byte, haveIds []string, needIds []string, err error) {
|
||||
n.seal()
|
||||
reader := bytes.NewReader(query)
|
||||
|
||||
haveIds = make([]string, 0, 100)
|
||||
needIds = make([]string, 0, 100)
|
||||
|
||||
output, err = n.reconcileAux(reader, &haveIds, &needIds)
|
||||
output, err = n.reconcileAux(step, reader)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
if len(output) == 1 && n.IsInitiator {
|
||||
return nil, haveIds, needIds, nil
|
||||
if len(output) == 1 && n.isInitiator {
|
||||
slices.Sort(n.haveIds)
|
||||
slices.Sort(n.needIds)
|
||||
return nil, n.haveIds, n.needIds, nil
|
||||
}
|
||||
|
||||
return output, nil, nil, nil
|
||||
}
|
||||
|
||||
func (n *Negentropy) reconcileAux(reader *bytes.Reader, haveIds, needIds *[]string) ([]byte, error) {
|
||||
n.lastTimestampIn, n.lastTimestampOut = 0, 0 // Reset for each message
|
||||
func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error) {
|
||||
n.lastTimestampIn, n.lastTimestampOut = 0, 0 // reset for each message
|
||||
|
||||
fullOutput := bytes.NewBuffer(make([]byte, 0, 5000))
|
||||
fullOutput.WriteByte(protocolVersion)
|
||||
@ -91,7 +98,7 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader, haveIds, needIds *[]stri
|
||||
return nil, fmt.Errorf("invalid protocol version byte")
|
||||
}
|
||||
if pv != protocolVersion {
|
||||
if n.IsInitiator {
|
||||
if n.isInitiator {
|
||||
return nil, fmt.Errorf("unsupported negentropy protocol version requested")
|
||||
}
|
||||
return fullOutput.Bytes(), nil
|
||||
@ -108,10 +115,8 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader, haveIds, needIds *[]stri
|
||||
doSkip := func() {
|
||||
if skip {
|
||||
skip = false
|
||||
encodedBound, err := n.encodeBound(prevBound) // Handle error appropriately
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
encodedBound := n.encodeBound(prevBound)
|
||||
fmt.Println(n.Name(), step, "~> skip", prevBound)
|
||||
partialOutput.Write(encodedBound)
|
||||
partialOutput.WriteByte(SkipMode)
|
||||
}
|
||||
@ -121,6 +126,7 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader, haveIds, needIds *[]stri
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
fmt.Println(n.Name(), step, "<~ read bound", currBound)
|
||||
modeVal, err := decodeVarInt(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -128,98 +134,121 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader, haveIds, needIds *[]stri
|
||||
mode := Mode(modeVal)
|
||||
|
||||
lower := prevIndex
|
||||
upper, err := n.storage.FindLowerBound(prevIndex, n.storage.Size(), currBound)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
upper := n.storage.FindLowerBound(prevIndex, n.storage.Size(), currBound)
|
||||
fmt.Println(n.Name(), step, "<~ [", lower, n.storage.GetBound(lower), "---", n.storage.GetBound(upper), upper, "]")
|
||||
|
||||
switch mode {
|
||||
case SkipMode:
|
||||
fmt.Println(n.Name(), step, "<~ skip")
|
||||
fmt.Println(n.Name(), step, "~> will be skipped")
|
||||
skip = true
|
||||
|
||||
case FingerprintMode:
|
||||
theirFingerprint := make([]byte, FingerprintSize)
|
||||
_, err := reader.Read(theirFingerprint)
|
||||
fmt.Println(n.Name(), step, "<~ fingerprint")
|
||||
|
||||
var theirFingerprint [FingerprintSize]byte
|
||||
_, err := reader.Read(theirFingerprint[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ourFingerprint, err := n.storage.Fingerprint(lower, upper)
|
||||
if err != nil {
|
||||
return nil, err // Handle the error appropriately
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !bytes.Equal(theirFingerprint, ourFingerprint.Buf[:]) {
|
||||
doSkip()
|
||||
n.SplitRange(lower, upper, currBound, partialOutput)
|
||||
} else {
|
||||
fmt.Println(n.Name(), step, "<~ ours", hex.EncodeToString(ourFingerprint[:]))
|
||||
fmt.Println(n.Name(), step, "<~ thrs", hex.EncodeToString(theirFingerprint[:]))
|
||||
|
||||
if theirFingerprint == ourFingerprint {
|
||||
skip = true
|
||||
} else {
|
||||
doSkip()
|
||||
n.SplitRange(step, lower, upper, currBound, partialOutput)
|
||||
}
|
||||
|
||||
case IdListMode:
|
||||
numIds64, err := decodeVarInt(reader)
|
||||
fmt.Print(n.Name(), " ", step, " <~ idlist")
|
||||
numIds, err := decodeVarInt(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
numIds := int(numIds64)
|
||||
fmt.Printf(" (%d)", numIds)
|
||||
|
||||
theirElems := make(map[string]struct{})
|
||||
idb := make([]byte, n.idSize)
|
||||
var idb [32]byte
|
||||
|
||||
firstid := "()"
|
||||
lastid := "()"
|
||||
for i := 0; i < numIds; i++ {
|
||||
_, err := reader.Read(idb)
|
||||
_, err := reader.Read(idb[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
theirElems[hex.EncodeToString(idb)] = struct{}{}
|
||||
// fmt.Println(n.Name(), step, "<~ id", hex.EncodeToString(idb))
|
||||
id := hex.EncodeToString(idb[:])
|
||||
if firstid == "()" {
|
||||
firstid = id
|
||||
}
|
||||
theirElems[id] = struct{}{}
|
||||
lastid = id
|
||||
}
|
||||
fmt.Println("", firstid, "---", lastid)
|
||||
|
||||
n.storage.Iterate(lower, upper, func(item Item, _ int) bool {
|
||||
k := item.ID
|
||||
if _, exists := theirElems[k]; !exists {
|
||||
if n.IsInitiator {
|
||||
*haveIds = append(*haveIds, k)
|
||||
id := item.ID
|
||||
if _, exists := theirElems[id]; !exists {
|
||||
if n.isInitiator {
|
||||
n.haveIds = append(n.haveIds, id)
|
||||
// fmt.Println(n.Name(), step, "<~ have", id)
|
||||
}
|
||||
} else {
|
||||
delete(theirElems, k)
|
||||
delete(theirElems, id)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if n.IsInitiator {
|
||||
if n.isInitiator {
|
||||
skip = true
|
||||
fmt.Println(n.Name(), step, "~> will be skipped")
|
||||
|
||||
for k := range theirElems {
|
||||
*needIds = append(*needIds, k)
|
||||
for id := range theirElems {
|
||||
n.needIds = append(n.needIds, id)
|
||||
// fmt.Println(n.Name(), step, "<~ need", id)
|
||||
}
|
||||
} else {
|
||||
doSkip()
|
||||
|
||||
responseIds := make([]byte, 0, n.idSize*n.storage.Size())
|
||||
responseIdsPtr := &responseIds
|
||||
numResponseIds := 0
|
||||
responseIds := make([]byte, 0, 32*n.storage.Size())
|
||||
endBound := currBound
|
||||
fmt.Print(n.Name(), " ", step, " ~> idlist")
|
||||
|
||||
firstid := "()"
|
||||
lastid := "()"
|
||||
n.storage.Iterate(lower, upper, func(item Item, index int) bool {
|
||||
if n.ExceededFrameSizeLimit(fullOutput.Len() + len(*responseIdsPtr)) {
|
||||
if firstid == "()" {
|
||||
firstid = item.ID
|
||||
}
|
||||
|
||||
if n.frameSizeLimit-200 < fullOutput.Len()+len(responseIds) {
|
||||
fmt.Println(" ###")
|
||||
endBound = Bound{item}
|
||||
upper = index
|
||||
return false
|
||||
}
|
||||
|
||||
lastid = item.ID
|
||||
id, _ := hex.DecodeString(item.ID)
|
||||
*responseIdsPtr = append(*responseIdsPtr, id...)
|
||||
numResponseIds++
|
||||
// fmt.Println(n.Name(), step, "~> id", item.ID)
|
||||
responseIds = append(responseIds, id...)
|
||||
return true
|
||||
})
|
||||
fmt.Println(endBound, firstid, "---", lastid)
|
||||
|
||||
encodedBound, err := n.encodeBound(endBound)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
panic(err)
|
||||
}
|
||||
encodedBound := n.encodeBound(endBound)
|
||||
|
||||
partialOutput.Write(encodedBound)
|
||||
partialOutput.WriteByte(IdListMode)
|
||||
partialOutput.Write(encodeVarInt(numResponseIds))
|
||||
partialOutput.Write(encodeVarInt(len(responseIds) / 32))
|
||||
partialOutput.Write(responseIds)
|
||||
|
||||
partialOutput.WriteTo(fullOutput)
|
||||
@ -230,26 +259,22 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader, haveIds, needIds *[]stri
|
||||
return nil, fmt.Errorf("unexpected mode %d", mode)
|
||||
}
|
||||
|
||||
// Check if the frame size limit is exceeded
|
||||
if n.ExceededFrameSizeLimit(fullOutput.Len() + partialOutput.Len()) {
|
||||
// Frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range
|
||||
if n.frameSizeLimit-200 < fullOutput.Len()+partialOutput.Len() {
|
||||
fmt.Println(" #####")
|
||||
// frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range
|
||||
remainingFingerprint, err := n.storage.Fingerprint(upper, n.storage.Size())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
encodedBound, err := n.encodeBound(Bound{Item: Item{Timestamp: maxTimestamp}})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fullOutput.Write(encodedBound)
|
||||
fullOutput.Write(n.encodeBound(infiniteBound))
|
||||
fullOutput.WriteByte(FingerprintMode)
|
||||
fullOutput.Write(remainingFingerprint.SV())
|
||||
fullOutput.Write(remainingFingerprint[:])
|
||||
fmt.Println(n.Name(), step, "~> last fingerprint", infiniteBound)
|
||||
|
||||
break // Stop processing further
|
||||
break // stop processing further
|
||||
} else {
|
||||
// Append the constructed output for this iteration
|
||||
// append the constructed output for this iteration
|
||||
partialOutput.WriteTo(fullOutput)
|
||||
}
|
||||
|
||||
@ -260,26 +285,34 @@ func (n *Negentropy) reconcileAux(reader *bytes.Reader, haveIds, needIds *[]stri
|
||||
return fullOutput.Bytes(), nil
|
||||
}
|
||||
|
||||
func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *bytes.Buffer) {
|
||||
func (n *Negentropy) SplitRange(step int, lower, upper int, upperBound Bound, output *bytes.Buffer) {
|
||||
numElems := upper - lower
|
||||
const buckets = 16
|
||||
|
||||
if numElems < buckets*2 {
|
||||
boundEncoded, err := n.encodeBound(upperBound)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println(n.Name(), step, "~> splitting range", lower, n.storage.GetBound(lower), "---", n.storage.GetBound(upper), upper)
|
||||
|
||||
if numElems < buckets*2 {
|
||||
// we just send the full ids here
|
||||
boundEncoded := n.encodeBound(upperBound)
|
||||
output.Write(boundEncoded)
|
||||
output.WriteByte(IdListMode)
|
||||
output.Write(encodeVarInt(numElems))
|
||||
|
||||
fmt.Print(n.Name(), " ", step, " ~> idlist ", upperBound)
|
||||
|
||||
firstid := "()"
|
||||
lastid := "()"
|
||||
n.storage.Iterate(lower, upper, func(item Item, _ int) bool {
|
||||
if firstid == "()" {
|
||||
firstid = item.ID
|
||||
}
|
||||
lastid = item.ID
|
||||
// fmt.Println(n.Name(), step, "~> ", item.ID)
|
||||
id, _ := hex.DecodeString(item.ID)
|
||||
output.Write(id)
|
||||
return true
|
||||
})
|
||||
fmt.Println("", firstid, "---", lastid)
|
||||
} else {
|
||||
itemsPerBucket := numElems / buckets
|
||||
bucketsWithExtra := numElems % buckets
|
||||
@ -313,110 +346,20 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *byte
|
||||
return true
|
||||
})
|
||||
|
||||
minBound := n.getMinimalBound(prevItem, currItem)
|
||||
minBound := getMinimalBound(prevItem, currItem)
|
||||
nextBound = minBound
|
||||
}
|
||||
|
||||
boundEncoded, err := n.encodeBound(nextBound)
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Println(n.Name(), step, "~> bound and fingerprint", nextBound, hex.EncodeToString(ourFingerprint[:]))
|
||||
boundEncoded := n.encodeBound(nextBound)
|
||||
output.Write(boundEncoded)
|
||||
output.WriteByte(FingerprintMode)
|
||||
output.Write(ourFingerprint.SV())
|
||||
output.Write(ourFingerprint[:])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Negentropy) ExceededFrameSizeLimit(size int) bool {
|
||||
return n.frameSizeLimit != 0 && size > int(n.frameSizeLimit)-200
|
||||
}
|
||||
|
||||
// Decoding
|
||||
|
||||
func (n *Negentropy) DecodeTimestampIn(reader *bytes.Reader) (nostr.Timestamp, error) {
|
||||
t, err := decodeVarInt(reader)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
timestamp := nostr.Timestamp(t)
|
||||
if timestamp == 0 {
|
||||
timestamp = maxTimestamp
|
||||
} else {
|
||||
timestamp--
|
||||
}
|
||||
|
||||
timestamp += n.lastTimestampIn
|
||||
if timestamp < n.lastTimestampIn { // Check for overflow
|
||||
timestamp = maxTimestamp
|
||||
}
|
||||
n.lastTimestampIn = timestamp
|
||||
return timestamp, nil
|
||||
}
|
||||
|
||||
func (n *Negentropy) DecodeBound(reader *bytes.Reader) (Bound, error) {
|
||||
timestamp, err := n.DecodeTimestampIn(reader)
|
||||
if err != nil {
|
||||
return Bound{}, err
|
||||
}
|
||||
|
||||
length, err := decodeVarInt(reader)
|
||||
if err != nil {
|
||||
return Bound{}, err
|
||||
}
|
||||
|
||||
id := make([]byte, length)
|
||||
if _, err = reader.Read(id); err != nil {
|
||||
return Bound{}, err
|
||||
}
|
||||
|
||||
return Bound{Item{timestamp, hex.EncodeToString(id)}}, nil
|
||||
}
|
||||
|
||||
// Encoding
|
||||
|
||||
// encodeTimestampOut encodes the given timestamp.
|
||||
func (n *Negentropy) encodeTimestampOut(timestamp nostr.Timestamp) []byte {
|
||||
if timestamp == maxTimestamp {
|
||||
n.lastTimestampOut = maxTimestamp
|
||||
return encodeVarInt(0)
|
||||
}
|
||||
temp := timestamp
|
||||
timestamp -= n.lastTimestampOut
|
||||
n.lastTimestampOut = temp
|
||||
return encodeVarInt(int(timestamp + 1))
|
||||
}
|
||||
|
||||
func (n *Negentropy) encodeBound(bound Bound) ([]byte, error) {
|
||||
var output []byte
|
||||
|
||||
t := n.encodeTimestampOut(bound.Timestamp)
|
||||
idlen := encodeVarInt(len(bound.ID) / 2)
|
||||
output = append(output, t...)
|
||||
output = append(output, idlen...)
|
||||
id := bound.Item.ID
|
||||
|
||||
output = append(output, id...)
|
||||
return output, nil
|
||||
}
|
||||
|
||||
func (n *Negentropy) getMinimalBound(prev, curr Item) Bound {
|
||||
if curr.Timestamp != prev.Timestamp {
|
||||
return Bound{Item{curr.Timestamp, ""}}
|
||||
}
|
||||
|
||||
sharedPrefixBytes := 0
|
||||
|
||||
for i := 0; i < n.idSize; i++ {
|
||||
if curr.ID[i:i+2] != prev.ID[i:i+2] {
|
||||
break
|
||||
}
|
||||
sharedPrefixBytes++
|
||||
}
|
||||
|
||||
// sharedPrefixBytes + 1 to include the first differing byte, or the entire ID if identical.
|
||||
return Bound{Item{curr.Timestamp, curr.ID[:sharedPrefixBytes*2+1]}}
|
||||
func (n *Negentropy) Name() string {
|
||||
p := unsafe.Pointer(n)
|
||||
return fmt.Sprintf("%d", uintptr(p)&127)
|
||||
}
|
||||
|
@ -4,6 +4,8 @@ import (
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
@ -20,12 +22,12 @@ const (
|
||||
|
||||
type Storage interface {
|
||||
Insert(nostr.Timestamp, string) error
|
||||
Seal() error
|
||||
|
||||
Seal()
|
||||
Size() int
|
||||
Iterate(begin, end int, cb func(item Item, i int) bool) error
|
||||
FindLowerBound(begin, end int, value Bound) (int, error)
|
||||
Fingerprint(begin, end int) (Fingerprint, error)
|
||||
FindLowerBound(begin, end int, value Bound) int
|
||||
GetBound(idx int) Bound
|
||||
Fingerprint(begin, end int) ([FingerprintSize]byte, error)
|
||||
}
|
||||
|
||||
type Item struct {
|
||||
@ -33,21 +35,22 @@ type Item struct {
|
||||
ID string
|
||||
}
|
||||
|
||||
func (i Item) LessThan(other Item) bool {
|
||||
if i.Timestamp != other.Timestamp {
|
||||
return i.Timestamp < other.Timestamp
|
||||
func itemCompare(a, b Item) int {
|
||||
if a.Timestamp != b.Timestamp {
|
||||
return int(a.Timestamp - b.Timestamp)
|
||||
}
|
||||
return i.ID < other.ID
|
||||
return strings.Compare(a.ID, b.ID)
|
||||
}
|
||||
|
||||
func (i Item) String() string { return fmt.Sprintf("Item<%d:%s>", i.Timestamp, i.ID) }
|
||||
|
||||
type Bound struct{ Item }
|
||||
|
||||
type Fingerprint struct {
|
||||
Buf [FingerprintSize]byte
|
||||
}
|
||||
|
||||
func (f *Fingerprint) SV() []byte {
|
||||
return f.Buf[:]
|
||||
func (b Bound) String() string {
|
||||
if b.Timestamp == infiniteBound.Timestamp {
|
||||
return "Bound<infinite>"
|
||||
}
|
||||
return fmt.Sprintf("Bound<%d:%s>", b.Timestamp, b.ID)
|
||||
}
|
||||
|
||||
type Accumulator struct {
|
||||
@ -107,13 +110,13 @@ func (acc *Accumulator) SV() []byte {
|
||||
return acc.Buf[:]
|
||||
}
|
||||
|
||||
func (acc *Accumulator) GetFingerprint(n int) Fingerprint {
|
||||
func (acc *Accumulator) GetFingerprint(n int) [FingerprintSize]byte {
|
||||
input := acc.SV()
|
||||
input = append(input, encodeVarInt(n)...)
|
||||
|
||||
hash := sha256.Sum256(input)
|
||||
|
||||
var fingerprint Fingerprint
|
||||
copy(fingerprint.Buf[:], hash[:FingerprintSize])
|
||||
var fingerprint [FingerprintSize]byte
|
||||
copy(fingerprint[:], hash[:FingerprintSize])
|
||||
return fingerprint
|
||||
}
|
||||
|
@ -1,39 +0,0 @@
|
||||
package negentropy
|
||||
|
||||
import "bytes"
|
||||
|
||||
func decodeVarInt(reader *bytes.Reader) (int, error) {
|
||||
var res int = 0
|
||||
|
||||
for {
|
||||
b, err := reader.ReadByte()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
res = (res << 7) | (int(b) & 127)
|
||||
if (b & 128) == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func encodeVarInt(n int) []byte {
|
||||
if n == 0 {
|
||||
return []byte{0}
|
||||
}
|
||||
|
||||
var o []byte
|
||||
for n != 0 {
|
||||
o = append([]byte{byte(n & 0x7F)}, o...)
|
||||
n >>= 7
|
||||
}
|
||||
|
||||
for i := 0; i < len(o)-1; i++ {
|
||||
o[i] |= 0x80
|
||||
}
|
||||
|
||||
return o
|
||||
}
|
@ -1,29 +1,26 @@
|
||||
package negentropy
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"slices"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
)
|
||||
|
||||
type Vector struct {
|
||||
items []Item
|
||||
idSize int
|
||||
sealed bool
|
||||
}
|
||||
|
||||
func NewVector(idSize int) *Vector {
|
||||
func NewVector() *Vector {
|
||||
return &Vector{
|
||||
items: make([]Item, 0, 30),
|
||||
idSize: idSize,
|
||||
items: make([]Item, 0, 30),
|
||||
}
|
||||
}
|
||||
|
||||
func (v *Vector) Insert(createdAt nostr.Timestamp, id string) error {
|
||||
// fmt.Fprintln(os.Stderr, "Insert", createdAt, id)
|
||||
if len(id)/2 != v.idSize {
|
||||
return fmt.Errorf("bad id size for added item: expected %d, got %d", v.idSize, len(id)/2)
|
||||
if len(id)/2 != 32 {
|
||||
return fmt.Errorf("bad id size for added item: expected %d, got %d", 32, len(id)/2)
|
||||
}
|
||||
|
||||
item := Item{createdAt, id}
|
||||
@ -31,20 +28,22 @@ func (v *Vector) Insert(createdAt nostr.Timestamp, id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *Vector) Seal() error {
|
||||
sort.Slice(v.items, func(i, j int) bool {
|
||||
return v.items[i].LessThan(v.items[j])
|
||||
})
|
||||
func (v *Vector) Size() int { return len(v.items) }
|
||||
|
||||
for i := 1; i < len(v.items); i++ {
|
||||
if v.items[i-1].ID == v.items[i].ID {
|
||||
return errors.New("duplicate item inserted")
|
||||
}
|
||||
func (v *Vector) Seal() {
|
||||
if v.sealed {
|
||||
panic("trying to seal an already sealed vector")
|
||||
}
|
||||
return nil
|
||||
v.sealed = true
|
||||
slices.SortFunc(v.items, itemCompare)
|
||||
}
|
||||
|
||||
func (v *Vector) Size() int { return len(v.items) }
|
||||
func (v *Vector) GetBound(idx int) Bound {
|
||||
if idx < len(v.items) {
|
||||
return Bound{v.items[idx]}
|
||||
}
|
||||
return infiniteBound
|
||||
}
|
||||
|
||||
func (v *Vector) Iterate(begin, end int, cb func(Item, int) bool) error {
|
||||
for i := begin; i < end; i++ {
|
||||
@ -55,14 +54,12 @@ func (v *Vector) Iterate(begin, end int, cb func(Item, int) bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *Vector) FindLowerBound(begin, end int, bound Bound) (int, error) {
|
||||
i := sort.Search(len(v.items[begin:end]), func(i int) bool {
|
||||
return !v.items[begin+i].LessThan(bound.Item)
|
||||
})
|
||||
return begin + i, nil
|
||||
func (v *Vector) FindLowerBound(begin, end int, bound Bound) int {
|
||||
idx, _ := slices.BinarySearchFunc(v.items[begin:end], bound.Item, itemCompare)
|
||||
return begin + idx
|
||||
}
|
||||
|
||||
func (v *Vector) Fingerprint(begin, end int) (Fingerprint, error) {
|
||||
func (v *Vector) Fingerprint(begin, end int) ([FingerprintSize]byte, error) {
|
||||
var out Accumulator
|
||||
out.SetToZero()
|
||||
|
||||
@ -70,7 +67,7 @@ func (v *Vector) Fingerprint(begin, end int) (Fingerprint, error) {
|
||||
out.Add(item.ID)
|
||||
return true
|
||||
}); err != nil {
|
||||
return Fingerprint{}, err
|
||||
return [FingerprintSize]byte{}, err
|
||||
}
|
||||
|
||||
return out.GetFingerprint(end - begin), nil
|
||||
|
@ -1,7 +1,9 @@
|
||||
package negentropy
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/nbd-wtf/go-nostr"
|
||||
@ -40,6 +42,16 @@ func TestBigNumbers(t *testing.T) {
|
||||
)
|
||||
}
|
||||
|
||||
func TestMuchBiggerNumbersAndConfusion(t *testing.T) {
|
||||
runTestWith(t,
|
||||
20000,
|
||||
[][]int{{20, 150}, {1700, 3400}, {7000, 8100}, {13800, 13816}, {13817, 14950}, {19800, 20000}}, // n1
|
||||
[][]int{{0, 2000}, {3000, 3600}, {10000, 12200}, {13799, 13801}, {14800, 19900}}, // n2
|
||||
[][]int{{0, 20}, {150, 1700}, {3400, 3600}, {10000, 12200}, {13799, 13800}, {14950, 19800}}, // n1 need
|
||||
[][]int{{2000, 3000}, {7000, 8100}, {13801, 13816}, {13817, 14800}, {19900, 20000}}, // n1 have
|
||||
)
|
||||
}
|
||||
|
||||
func runTestWith(t *testing.T,
|
||||
totalEvents int,
|
||||
n1Ranges [][]int, n2Ranges [][]int,
|
||||
@ -53,62 +65,61 @@ func runTestWith(t *testing.T,
|
||||
events := make([]*nostr.Event, totalEvents)
|
||||
for i := range events {
|
||||
evt := nostr.Event{}
|
||||
evt.Content = fmt.Sprintf("event %d", i+1)
|
||||
evt.Content = fmt.Sprintf("event %d", i)
|
||||
evt.Kind = 1
|
||||
evt.CreatedAt = nostr.Timestamp(i)
|
||||
evt.ID = evt.GetID()
|
||||
evt.ID = fmt.Sprintf("%064d", i)
|
||||
events[i] = &evt
|
||||
fmt.Println("evt", i, evt.ID)
|
||||
}
|
||||
|
||||
{
|
||||
n1, _ = NewNegentropy(NewVector(32), 1<<16, 32)
|
||||
n1, _ = NewNegentropy(NewVector(), 1<<16)
|
||||
for _, r := range n1Ranges {
|
||||
for i := r[0]; i < r[1]; i++ {
|
||||
n1.Insert(events[i])
|
||||
}
|
||||
}
|
||||
|
||||
q, err = n1.Initiate()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
q = n1.Initiate()
|
||||
|
||||
fmt.Println("[n1]:", len(q), q)
|
||||
fmt.Println("[n1]:", len(q)) //, hexedBytes(q))
|
||||
fmt.Println("")
|
||||
}
|
||||
|
||||
{
|
||||
n2, _ = NewNegentropy(NewVector(32), 1<<16, 32)
|
||||
n2, _ = NewNegentropy(NewVector(), 1<<16)
|
||||
for _, r := range n2Ranges {
|
||||
for i := r[0]; i < r[1]; i++ {
|
||||
n2.Insert(events[i])
|
||||
}
|
||||
}
|
||||
|
||||
q, _, _, err = n2.Reconcile(q)
|
||||
q, _, _, err = n2.Reconcile(1, q)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
fmt.Println("[n2]:", len(q), q)
|
||||
fmt.Println("[n2]:", len(q)) // , hexedBytes(q))
|
||||
fmt.Println("")
|
||||
}
|
||||
|
||||
invert := map[*Negentropy]*Negentropy{
|
||||
n1: n2,
|
||||
n2: n1,
|
||||
}
|
||||
|
||||
i := 1
|
||||
for n := n1; q != nil; n = invert[n] {
|
||||
i++
|
||||
var have []string
|
||||
var need []string
|
||||
|
||||
q, have, need, err = n.Reconcile(q)
|
||||
q, have, need, err = n.Reconcile(i, q)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
fmt.Println("[n-]:", len(q), q)
|
||||
fmt.Println("[n-]:", len(q)) //, hexedBytes(q))
|
||||
fmt.Println("")
|
||||
|
||||
if q == nil {
|
||||
fmt.Println("")
|
||||
@ -137,3 +148,17 @@ func runTestWith(t *testing.T,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func hexedBytes(o []byte) string {
|
||||
s := strings.Builder{}
|
||||
s.Grow(2 + 1 + len(o)*5)
|
||||
s.WriteString("[ ")
|
||||
for _, b := range o {
|
||||
x := hex.EncodeToString([]byte{b})
|
||||
s.WriteString("0x")
|
||||
s.WriteString(x)
|
||||
s.WriteString(" ")
|
||||
}
|
||||
s.WriteString("]")
|
||||
return s.String()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user