diff --git a/negentropy/negentropy.go b/negentropy/negentropy.go index adf8b75..0a66f4f 100644 --- a/negentropy/negentropy.go +++ b/negentropy/negentropy.go @@ -6,7 +6,6 @@ import ( "fmt" "math" "os" - "slices" "unsafe" "github.com/nbd-wtf/go-nostr" @@ -26,15 +25,16 @@ type Negentropy struct { isInitiator bool lastTimestampIn nostr.Timestamp lastTimestampOut nostr.Timestamp - haveIds []string - needIds []string + + Haves chan string + HaveNots chan string } -func NewNegentropy(storage Storage, frameSizeLimit int) (*Negentropy, error) { +func NewNegentropy(storage Storage, frameSizeLimit int) *Negentropy { return &Negentropy{ storage: storage, frameSizeLimit: frameSizeLimit, - }, nil + } } func (n *Negentropy) Insert(evt *nostr.Event) { @@ -55,8 +55,8 @@ func (n *Negentropy) Initiate() []byte { n.seal() n.isInitiator = true - n.haveIds = make([]string, 0, n.storage.Size()/2) - n.needIds = make([]string, 0, n.storage.Size()/2) + n.Haves = make(chan string, n.storage.Size()/2) + n.HaveNots = make(chan string, n.storage.Size()/2) output := bytes.NewBuffer(make([]byte, 0, 1+n.storage.Size()*32)) output.WriteByte(protocolVersion) @@ -65,22 +65,22 @@ func (n *Negentropy) Initiate() []byte { return output.Bytes() } -func (n *Negentropy) Reconcile(step int, query []byte) (output []byte, haveIds []string, needIds []string, err error) { +func (n *Negentropy) Reconcile(step int, query []byte) (output []byte, err error) { n.seal() reader := bytes.NewReader(query) output, err = n.reconcileAux(step, reader) if err != nil { - return nil, nil, nil, err + return nil, err } if len(output) == 1 && n.isInitiator { - slices.Sort(n.haveIds) - slices.Sort(n.needIds) - return nil, n.haveIds, n.needIds, nil + close(n.Haves) + close(n.HaveNots) + return nil, nil } - return output, nil, nil, nil + return output, nil } func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error) { @@ -178,7 +178,7 @@ func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error id := item.ID if _, exists := theirElems[id]; !exists { if n.isInitiator { - n.haveIds = append(n.haveIds, id) + n.Haves <- id } } else { delete(theirElems, id) @@ -189,7 +189,7 @@ func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error if n.isInitiator { skip = true for id := range theirElems { - n.needIds = append(n.needIds, id) + n.HaveNots <- id } } else { doSkip() diff --git a/negentropy/whatever_test.go b/negentropy/whatever_test.go index 3eaddda..4f4bb89 100644 --- a/negentropy/whatever_test.go +++ b/negentropy/whatever_test.go @@ -3,7 +3,9 @@ package negentropy import ( "encoding/hex" "fmt" + "slices" "strings" + "sync" "testing" "github.com/nbd-wtf/go-nostr" @@ -73,7 +75,7 @@ func runTestWith(t *testing.T, } { - n1, _ = NewNegentropy(NewVector(), 1<<16) + n1 = NewNegentropy(NewVector(), 1<<16) for _, r := range n1Ranges { for i := r[0]; i < r[1]; i++ { n1.Insert(events[i]) @@ -84,14 +86,14 @@ func runTestWith(t *testing.T, } { - n2, _ = NewNegentropy(NewVector(), 1<<16) + n2 = NewNegentropy(NewVector(), 1<<16) for _, r := range n2Ranges { for i := r[0]; i < r[1]; i++ { n2.Insert(events[i]) } } - q, _, _, err = n2.Reconcile(1, q) + q, err = n2.Reconcile(1, q) if err != nil { t.Fatal(err) return @@ -103,36 +105,64 @@ func runTestWith(t *testing.T, n2: n1, } i := 1 - for n := n1; q != nil; n = invert[n] { - i++ - var have []string - var need []string - q, have, need, err = n.Reconcile(i, q) - if err != nil { - t.Fatal(err) - return - } + wg := sync.WaitGroup{} + wg.Add(3) - if q == nil { - expectedNeed := make([]string, 0, 100) - for _, r := range expectedN1NeedRanges { - for i := r[0]; i < r[1]; i++ { - expectedNeed = append(expectedNeed, events[i].ID) - } + go func() { + wg.Done() + for n := n1; q != nil; n = invert[n] { + i++ + + q, err = n.Reconcile(i, q) + if err != nil { + t.Fatal(err) + return } - expectedHave := make([]string, 0, 100) - for _, r := range expectedN1HaveRanges { - for i := r[0]; i < r[1]; i++ { - expectedHave = append(expectedHave, events[i].ID) - } + if q == nil { + return } - - require.ElementsMatch(t, expectedNeed, need, "wrong need") - require.ElementsMatch(t, expectedHave, have, "wrong have") } - } + }() + + go func() { + defer wg.Done() + expectedHave := make([]string, 0, 100) + for _, r := range expectedN1HaveRanges { + for i := r[0]; i < r[1]; i++ { + expectedHave = append(expectedHave, events[i].ID) + } + } + haves := make([]string, 0, 100) + for item := range n1.Haves { + if slices.Contains(haves, item) { + continue + } + haves = append(haves, item) + } + require.ElementsMatch(t, expectedHave, haves, "wrong have") + }() + + go func() { + defer wg.Done() + expectedNeed := make([]string, 0, 100) + for _, r := range expectedN1NeedRanges { + for i := r[0]; i < r[1]; i++ { + expectedNeed = append(expectedNeed, events[i].ID) + } + } + havenots := make([]string, 0, 100) + for item := range n1.HaveNots { + if slices.Contains(havenots, item) { + continue + } + havenots = append(havenots, item) + } + require.ElementsMatch(t, expectedNeed, havenots, "wrong need") + }() + + wg.Wait() } func hexedBytes(o []byte) string {