From 6910f391feafe5c6f404f7a6c4639bcf8e583f35 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Wed, 18 Sep 2024 15:47:08 -0300 Subject: [PATCH] negentropy: small refactors here and there, comments and making the code clearer. --- nip77/negentropy/encoding.go | 87 ++++++++++++++++--------------- nip77/negentropy/negentropy.go | 20 +++---- nip77/negentropy/whatever_test.go | 40 ++++++-------- 3 files changed, 69 insertions(+), 78 deletions(-) diff --git a/nip77/negentropy/encoding.go b/nip77/negentropy/encoding.go index 317b1e1..2029b2f 100644 --- a/nip77/negentropy/encoding.go +++ b/nip77/negentropy/encoding.go @@ -6,34 +6,38 @@ import ( "github.com/nbd-wtf/go-nostr" ) -func (n *Negentropy) DecodeTimestampIn(reader *StringHexReader) (nostr.Timestamp, error) { - t, err := decodeVarInt(reader) +func (n *Negentropy) readTimestamp(reader *StringHexReader) (nostr.Timestamp, error) { + delta, err := readVarInt(reader) if err != nil { return 0, err } - timestamp := nostr.Timestamp(t) - if timestamp == 0 { - timestamp = maxTimestamp - } else { - timestamp-- + if delta == 0 { + // zeroes are infinite + timestamp := maxTimestamp + n.lastTimestampIn = timestamp + return timestamp, nil } - timestamp += n.lastTimestampIn - if timestamp < n.lastTimestampIn { // Check for overflow - timestamp = maxTimestamp - } + // remove 1 as we always add 1 when encoding + delta-- + + // we add the previously cached timestamp to get the current + timestamp := n.lastTimestampIn + nostr.Timestamp(delta) + + // cache this so we can apply it to the delta next time n.lastTimestampIn = timestamp + return timestamp, nil } -func (n *Negentropy) DecodeBound(reader *StringHexReader) (Bound, error) { - timestamp, err := n.DecodeTimestampIn(reader) +func (n *Negentropy) readBound(reader *StringHexReader) (Bound, error) { + timestamp, err := n.readTimestamp(reader) if err != nil { return Bound{}, fmt.Errorf("failed to decode bound timestamp: %w", err) } - length, err := decodeVarInt(reader) + length, err := readVarInt(reader) if err != nil { return Bound{}, fmt.Errorf("failed to decode bound length: %w", err) } @@ -46,22 +50,28 @@ func (n *Negentropy) DecodeBound(reader *StringHexReader) (Bound, error) { return Bound{Item{timestamp, id}}, nil } -func (n *Negentropy) encodeTimestampOut(w *StringHexWriter, timestamp nostr.Timestamp) { +func (n *Negentropy) writeTimestamp(w *StringHexWriter, timestamp nostr.Timestamp) { if timestamp == maxTimestamp { - n.lastTimestampOut = maxTimestamp - encodeVarIntToHex(w, 0) + // zeroes are infinite + n.lastTimestampOut = maxTimestamp // cache this (see below) + writeVarInt(w, 0) return } - temp := timestamp - timestamp -= n.lastTimestampOut - n.lastTimestampOut = temp - encodeVarIntToHex(w, int(timestamp+1)) + + // we will only encode the difference between this timestamp and the previous + delta := timestamp - n.lastTimestampOut + + // we cache this here as the next timestamp we encode will be just a delta from this + n.lastTimestampOut = timestamp + + // add 1 to prevent zeroes from being read as infinites + writeVarInt(w, int(delta+1)) return } -func (n *Negentropy) encodeBound(w *StringHexWriter, bound Bound) { - n.encodeTimestampOut(w, bound.Timestamp) - encodeVarIntToHex(w, len(bound.ID)/2) +func (n *Negentropy) writeBound(w *StringHexWriter, bound Bound) { + n.writeTimestamp(w, bound.Timestamp) + writeVarInt(w, len(bound.ID)/2) w.WriteHex(bound.Item.ID) } @@ -83,7 +93,7 @@ func getMinimalBound(prev, curr Item) Bound { return Bound{Item{curr.Timestamp, curr.ID[:(sharedPrefixBytes+1)*2]}} } -func decodeVarInt(reader *StringHexReader) (int, error) { +func readVarInt(reader *StringHexReader) (int, error) { var res int = 0 for { @@ -101,6 +111,15 @@ func decodeVarInt(reader *StringHexReader) (int, error) { return res, nil } +func writeVarInt(w *StringHexWriter, n int) { + if n == 0 { + w.WriteByte(0) + return + } + + w.WriteBytes(encodeVarInt(n)) +} + func encodeVarInt(n int) []byte { if n == 0 { return []byte{0} @@ -118,21 +137,3 @@ func encodeVarInt(n int) []byte { return o } - -func encodeVarIntToHex(w *StringHexWriter, n int) { - if n == 0 { - w.WriteByte(0) - } - - var o []byte - for n != 0 { - o = append([]byte{byte(n & 0x7F)}, o...) - n >>= 7 - } - - for i := 0; i < len(o)-1; i++ { - o[i] |= 0x80 - } - - w.WriteBytes(o) -} diff --git a/nip77/negentropy/negentropy.go b/nip77/negentropy/negentropy.go index 3ce618f..2a35628 100644 --- a/nip77/negentropy/negentropy.go +++ b/nip77/negentropy/negentropy.go @@ -125,16 +125,16 @@ func (n *Negentropy) reconcileAux(reader *StringHexReader) (string, error) { // end skip range, if necessary, so we can start a new bound that isn't a skip if skipping { skipping = false - n.encodeBound(partialOutput, prevBound) + n.writeBound(partialOutput, prevBound) partialOutput.WriteByte(byte(SkipMode)) } } - currBound, err := n.DecodeBound(reader) + currBound, err := n.readBound(reader) if err != nil { return "", fmt.Errorf("failed to decode bound: %w", err) } - modeVal, err := decodeVarInt(reader) + modeVal, err := readVarInt(reader) if err != nil { return "", fmt.Errorf("failed to decode mode: %w", err) } @@ -162,7 +162,7 @@ func (n *Negentropy) reconcileAux(reader *StringHexReader) (string, error) { } case IdListMode: - numIds, err := decodeVarInt(reader) + numIds, err := readVarInt(reader) if err != nil { return "", fmt.Errorf("failed to decode number of ids: %w", err) } @@ -222,9 +222,9 @@ func (n *Negentropy) reconcileAux(reader *StringHexReader) (string, error) { responses++ } - n.encodeBound(partialOutput, endBound) + n.writeBound(partialOutput, endBound) partialOutput.WriteByte(byte(IdListMode)) - encodeVarIntToHex(partialOutput, responses) + writeVarInt(partialOutput, responses) partialOutput.WriteHex(responseIds.String()) fullOutput.WriteHex(partialOutput.Hex()) @@ -238,7 +238,7 @@ func (n *Negentropy) reconcileAux(reader *StringHexReader) (string, error) { if n.frameSizeLimit-200 < fullOutput.Len()+partialOutput.Len() { // frame size limit exceeded, handle by encoding a boundary and fingerprint for the remaining range remainingFingerprint := n.storage.Fingerprint(upper, n.storage.Size()) - n.encodeBound(fullOutput, infiniteBound) + n.writeBound(fullOutput, infiniteBound) fullOutput.WriteByte(byte(FingerprintMode)) fullOutput.WriteBytes(remainingFingerprint[:]) @@ -261,9 +261,9 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *Stri if numElems < buckets*2 { // we just send the full ids here - n.encodeBound(output, upperBound) + n.writeBound(output, upperBound) output.WriteByte(byte(IdListMode)) - encodeVarIntToHex(output, numElems) + writeVarInt(output, numElems) for _, item := range n.storage.Range(lower, upper) { output.WriteHex(item.ID) @@ -299,7 +299,7 @@ func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *Stri nextBound = minBound } - n.encodeBound(output, nextBound) + n.writeBound(output, nextBound) output.WriteByte(byte(FingerprintMode)) output.WriteBytes(ourFingerprint[:]) } diff --git a/nip77/negentropy/whatever_test.go b/nip77/negentropy/whatever_test.go index 3d6d11b..bd44924 100644 --- a/nip77/negentropy/whatever_test.go +++ b/nip77/negentropy/whatever_test.go @@ -2,7 +2,6 @@ package negentropy import ( "fmt" - "log" "slices" "sync" "testing" @@ -106,27 +105,7 @@ func runTestWith(t *testing.T, i := 1 wg := sync.WaitGroup{} - wg.Add(3) - - var fatal error - - go func() { - defer wg.Done() - for n := n1; q != ""; n = invert[n] { - i++ - - fmt.Println("processing reconcile", n) - q, err = n.Reconcile(q) - if err != nil { - fatal = err - return - } - - if q == "" { - return - } - } - }() + wg.Add(2) go func() { defer wg.Done() @@ -166,8 +145,19 @@ func runTestWith(t *testing.T, require.Equal(t, expectedNeed, havenots, "wrong need") }() - wg.Wait() - if fatal != nil { - log.Fatal(fatal) + for n := n1; q != ""; n = invert[n] { + i++ + + fmt.Println("processing reconcile", n) + q, err = n.Reconcile(q) + + if err != nil { + t.Fatalf("reconciliation failed: %s", err) + } + + if q == "" { + wg.Wait() + return + } } }