stop expecting each have/need to be only emitted once, adjust api.

This commit is contained in:
fiatjaf
2024-09-11 23:15:35 -03:00
parent 9584a84293
commit da8443b26d
2 changed files with 72 additions and 42 deletions

View File

@@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"math" "math"
"os" "os"
"slices"
"unsafe" "unsafe"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr"
@@ -26,15 +25,16 @@ type Negentropy struct {
isInitiator bool isInitiator bool
lastTimestampIn nostr.Timestamp lastTimestampIn nostr.Timestamp
lastTimestampOut nostr.Timestamp lastTimestampOut nostr.Timestamp
haveIds []string
needIds []string Haves chan string
HaveNots chan string
} }
func NewNegentropy(storage Storage, frameSizeLimit int) (*Negentropy, error) { func NewNegentropy(storage Storage, frameSizeLimit int) *Negentropy {
return &Negentropy{ return &Negentropy{
storage: storage, storage: storage,
frameSizeLimit: frameSizeLimit, frameSizeLimit: frameSizeLimit,
}, nil }
} }
func (n *Negentropy) Insert(evt *nostr.Event) { func (n *Negentropy) Insert(evt *nostr.Event) {
@@ -55,8 +55,8 @@ func (n *Negentropy) Initiate() []byte {
n.seal() n.seal()
n.isInitiator = true n.isInitiator = true
n.haveIds = make([]string, 0, n.storage.Size()/2) n.Haves = make(chan string, n.storage.Size()/2)
n.needIds = make([]string, 0, n.storage.Size()/2) n.HaveNots = make(chan string, n.storage.Size()/2)
output := bytes.NewBuffer(make([]byte, 0, 1+n.storage.Size()*32)) output := bytes.NewBuffer(make([]byte, 0, 1+n.storage.Size()*32))
output.WriteByte(protocolVersion) output.WriteByte(protocolVersion)
@@ -65,22 +65,22 @@ func (n *Negentropy) Initiate() []byte {
return output.Bytes() return output.Bytes()
} }
func (n *Negentropy) Reconcile(step int, query []byte) (output []byte, haveIds []string, needIds []string, err error) { func (n *Negentropy) Reconcile(step int, query []byte) (output []byte, err error) {
n.seal() n.seal()
reader := bytes.NewReader(query) reader := bytes.NewReader(query)
output, err = n.reconcileAux(step, reader) output, err = n.reconcileAux(step, reader)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, err
} }
if len(output) == 1 && n.isInitiator { if len(output) == 1 && n.isInitiator {
slices.Sort(n.haveIds) close(n.Haves)
slices.Sort(n.needIds) close(n.HaveNots)
return nil, n.haveIds, n.needIds, nil return nil, nil
} }
return output, nil, nil, nil return output, nil
} }
func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error) { func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error) {
@@ -178,7 +178,7 @@ func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error
id := item.ID id := item.ID
if _, exists := theirElems[id]; !exists { if _, exists := theirElems[id]; !exists {
if n.isInitiator { if n.isInitiator {
n.haveIds = append(n.haveIds, id) n.Haves <- id
} }
} else { } else {
delete(theirElems, id) delete(theirElems, id)
@@ -189,7 +189,7 @@ func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error
if n.isInitiator { if n.isInitiator {
skip = true skip = true
for id := range theirElems { for id := range theirElems {
n.needIds = append(n.needIds, id) n.HaveNots <- id
} }
} else { } else {
doSkip() doSkip()

View File

@@ -3,7 +3,9 @@ package negentropy
import ( import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"slices"
"strings" "strings"
"sync"
"testing" "testing"
"github.com/nbd-wtf/go-nostr" "github.com/nbd-wtf/go-nostr"
@@ -73,7 +75,7 @@ func runTestWith(t *testing.T,
} }
{ {
n1, _ = NewNegentropy(NewVector(), 1<<16) n1 = NewNegentropy(NewVector(), 1<<16)
for _, r := range n1Ranges { for _, r := range n1Ranges {
for i := r[0]; i < r[1]; i++ { for i := r[0]; i < r[1]; i++ {
n1.Insert(events[i]) n1.Insert(events[i])
@@ -84,14 +86,14 @@ func runTestWith(t *testing.T,
} }
{ {
n2, _ = NewNegentropy(NewVector(), 1<<16) n2 = NewNegentropy(NewVector(), 1<<16)
for _, r := range n2Ranges { for _, r := range n2Ranges {
for i := r[0]; i < r[1]; i++ { for i := r[0]; i < r[1]; i++ {
n2.Insert(events[i]) n2.Insert(events[i])
} }
} }
q, _, _, err = n2.Reconcile(1, q) q, err = n2.Reconcile(1, q)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
return return
@@ -103,36 +105,64 @@ func runTestWith(t *testing.T,
n2: n1, n2: n1,
} }
i := 1 i := 1
for n := n1; q != nil; n = invert[n] {
i++
var have []string
var need []string
q, have, need, err = n.Reconcile(i, q) wg := sync.WaitGroup{}
if err != nil { wg.Add(3)
t.Fatal(err)
return
}
if q == nil { go func() {
expectedNeed := make([]string, 0, 100) wg.Done()
for _, r := range expectedN1NeedRanges { for n := n1; q != nil; n = invert[n] {
for i := r[0]; i < r[1]; i++ { i++
expectedNeed = append(expectedNeed, events[i].ID)
} q, err = n.Reconcile(i, q)
if err != nil {
t.Fatal(err)
return
} }
expectedHave := make([]string, 0, 100) if q == nil {
for _, r := range expectedN1HaveRanges { return
for i := r[0]; i < r[1]; i++ {
expectedHave = append(expectedHave, events[i].ID)
}
} }
require.ElementsMatch(t, expectedNeed, need, "wrong need")
require.ElementsMatch(t, expectedHave, have, "wrong have")
} }
} }()
go func() {
defer wg.Done()
expectedHave := make([]string, 0, 100)
for _, r := range expectedN1HaveRanges {
for i := r[0]; i < r[1]; i++ {
expectedHave = append(expectedHave, events[i].ID)
}
}
haves := make([]string, 0, 100)
for item := range n1.Haves {
if slices.Contains(haves, item) {
continue
}
haves = append(haves, item)
}
require.ElementsMatch(t, expectedHave, haves, "wrong have")
}()
go func() {
defer wg.Done()
expectedNeed := make([]string, 0, 100)
for _, r := range expectedN1NeedRanges {
for i := r[0]; i < r[1]; i++ {
expectedNeed = append(expectedNeed, events[i].ID)
}
}
havenots := make([]string, 0, 100)
for item := range n1.HaveNots {
if slices.Contains(havenots, item) {
continue
}
havenots = append(havenots, item)
}
require.ElementsMatch(t, expectedNeed, havenots, "wrong need")
}()
wg.Wait()
} }
func hexedBytes(o []byte) string { func hexedBytes(o []byte) string {