routing+channeldb: migrate MC store to use minimal Route encoding

Add a new mcRoute type that houses the data about a route that MC
actually uses. Then add a migration (channeldb/migration32) that
migrates the existing store from its current serialisation to the new,
more minimal serialisation.
This commit is contained in:
Elle Mouton
2024-08-06 14:33:07 +02:00
parent 96445f99b4
commit 383a6d274f
9 changed files with 873 additions and 136 deletions

View File

@@ -26,6 +26,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/migration29"
"github.com/lightningnetwork/lnd/channeldb/migration30"
"github.com/lightningnetwork/lnd/channeldb/migration31"
"github.com/lightningnetwork/lnd/channeldb/migration32"
"github.com/lightningnetwork/lnd/channeldb/migration_01_to_11"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/invoices"
@@ -286,6 +287,10 @@ var (
number: 31,
migration: migration31.DeleteLastPublishedTxTLB,
},
{
number: 32,
migration: migration32.MigrateMCRouteSerialisation,
},
}
// optionalVersions stores all optional migrations that are applied

View File

@@ -0,0 +1,53 @@
package migration32
import (
"bytes"
"fmt"
"github.com/lightningnetwork/lnd/kvdb"
)
// MigrateMCRouteSerialisation reads all the mission control store entries and
// re-serializes them using a minimal route serialisation so that only the parts
// of the route that are actually required for mission control are persisted.
func MigrateMCRouteSerialisation(tx kvdb.RwTx) error {
log.Infof("Migrating Mission Control store to use a more minimal " +
"encoding for routes")
resultBucket := tx.ReadWriteBucket(resultsKey)
// If the results bucket does not exist then there are no entries in
// the mission control store yet and so there is nothing to migrate.
if resultBucket == nil {
return nil
}
// For each entry, read it into memory using the old encoding. Then,
// extract the more minimal route, re-encode and persist the entry.
return resultBucket.ForEach(func(k, v []byte) error {
// Read the entry using the old encoding.
resultOld, err := deserializeOldResult(k, v)
if err != nil {
return err
}
// Convert to the new payment result format with the minimal
// route.
resultNew := convertPaymentResult(resultOld)
// Serialise the new payment result using the new encoding.
key, resultNewBytes, err := serializeNewResult(resultNew)
if err != nil {
return err
}
// Make sure that the derived key is the same.
if !bytes.Equal(key, k) {
return fmt.Errorf("new payment result key (%v) is "+
"not the same as the old key (%v)", key, k)
}
// Finally, overwrite the previous value with the new encoding.
return resultBucket.Put(k, resultNewBytes)
})
}

View File

@@ -0,0 +1,237 @@
package migration32
import (
"encoding/hex"
"testing"
"time"
"github.com/btcsuite/btcd/btcec/v2"
lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
"github.com/lightningnetwork/lnd/channeldb/migtest"
"github.com/lightningnetwork/lnd/kvdb"
)
var (
failureIndex = 8
testPub = Vertex{2, 202, 4}
testPub2 = Vertex{22, 202, 4}
pubkeyBytes, _ = hex.DecodeString(
"598ec453728e0ffe0ae2f5e174243cf58f2" +
"a3f2c83d2457b43036db568b11093",
)
pubKeyY = new(btcec.FieldVal)
_ = pubKeyY.SetByteSlice(pubkeyBytes)
pubkey = btcec.NewPublicKey(new(btcec.FieldVal).SetInt(4), pubKeyY)
paymentResultCommon1 = paymentResultCommon{
id: 0,
timeFwd: time.Unix(0, 1),
timeReply: time.Unix(0, 2),
success: false,
failureSourceIdx: &failureIndex,
failure: &lnwire.FailFeeInsufficient{},
}
paymentResultCommon2 = paymentResultCommon{
id: 2,
timeFwd: time.Unix(0, 4),
timeReply: time.Unix(0, 7),
success: true,
}
)
// TestMigrateMCRouteSerialisation tests that the MigrateMCRouteSerialisation
// migration function correctly migrates the MC store from using the old route
// encoding to using the newer, more minimal route encoding.
func TestMigrateMCRouteSerialisation(t *testing.T) {
customRecord := map[uint64][]byte{
65536: {4, 2, 2},
}
resultsOld := []*paymentResultOld{
{
paymentResultCommon: paymentResultCommon1,
route: &Route{
TotalTimeLock: 100,
TotalAmount: 400,
SourcePubKey: testPub,
Hops: []*Hop{
// A hop with MPP, AMP and custom
// records.
{
PubKeyBytes: testPub,
ChannelID: 100,
OutgoingTimeLock: 300,
AmtToForward: 500,
MPP: &MPP{
paymentAddr: [32]byte{
4, 5,
},
totalMsat: 900,
},
AMP: &AMP{
rootShare: [32]byte{
0, 0,
},
setID: [32]byte{
5, 5, 5,
},
childIndex: 90,
},
CustomRecords: customRecord,
Metadata: []byte{6, 7, 7},
},
// A legacy hop.
{
PubKeyBytes: testPub,
ChannelID: 800,
OutgoingTimeLock: 4,
AmtToForward: 4,
LegacyPayload: true,
},
// A hop with a blinding key.
{
PubKeyBytes: testPub,
ChannelID: 800,
OutgoingTimeLock: 4,
AmtToForward: 4,
BlindingPoint: pubkey,
EncryptedData: []byte{
1, 2, 3,
},
TotalAmtMsat: 600,
},
// A hop with a blinding key and custom
// records.
{
PubKeyBytes: testPub,
ChannelID: 800,
OutgoingTimeLock: 4,
AmtToForward: 4,
CustomRecords: customRecord,
BlindingPoint: pubkey,
EncryptedData: []byte{
1, 2, 3,
},
TotalAmtMsat: 600,
},
},
},
},
{
paymentResultCommon: paymentResultCommon2,
route: &Route{
TotalTimeLock: 101,
TotalAmount: 401,
SourcePubKey: testPub2,
Hops: []*Hop{
{
PubKeyBytes: testPub,
ChannelID: 800,
OutgoingTimeLock: 4,
AmtToForward: 4,
BlindingPoint: pubkey,
EncryptedData: []byte{
1, 2, 3,
},
TotalAmtMsat: 600,
},
},
},
},
}
expectedResultsNew := []*paymentResultNew{
{
paymentResultCommon: paymentResultCommon1,
route: &mcRoute{
sourcePubKey: testPub,
totalAmount: 400,
hops: []*mcHop{
{
channelID: 100,
pubKeyBytes: testPub,
amtToFwd: 500,
hasCustomRecords: true,
},
{
channelID: 800,
pubKeyBytes: testPub,
amtToFwd: 4,
},
{
channelID: 800,
pubKeyBytes: testPub,
amtToFwd: 4,
hasBlindingPoint: true,
},
{
channelID: 800,
pubKeyBytes: testPub,
amtToFwd: 4,
hasBlindingPoint: true,
hasCustomRecords: true,
},
},
},
},
{
paymentResultCommon: paymentResultCommon2,
route: &mcRoute{
sourcePubKey: testPub2,
totalAmount: 401,
hops: []*mcHop{
{
channelID: 800,
pubKeyBytes: testPub,
amtToFwd: 4,
hasBlindingPoint: true,
},
},
},
},
}
// Prime the database with some mission control data that uses the
// old route encoding.
before := func(tx kvdb.RwTx) error {
resultBucket, err := tx.CreateTopLevelBucket(resultsKey)
if err != nil {
return err
}
for _, result := range resultsOld {
k, v, err := serializeOldResult(result)
if err != nil {
return err
}
if err := resultBucket.Put(k, v); err != nil {
return err
}
}
return nil
}
// After the migration, ensure that all the relevant info was
// maintained.
after := func(tx kvdb.RwTx) error {
m := make(map[string]interface{})
for _, result := range expectedResultsNew {
k, v, err := serializeNewResult(result)
if err != nil {
return err
}
m[string(k)] = string(v)
}
return migtest.VerifyDB(tx, resultsKey, m)
}
migtest.ApplyMigration(
t, before, after, MigrateMCRouteSerialisation, false,
)
}

View File

@@ -0,0 +1,320 @@
package migration32
import (
"bytes"
"io"
"math"
"time"
"github.com/btcsuite/btcd/wire"
lnwire "github.com/lightningnetwork/lnd/channeldb/migration/lnwire21"
)
const (
// unknownFailureSourceIdx is the database encoding of an unknown error
// source.
unknownFailureSourceIdx = -1
)
var (
// resultsKey is the fixed key under which the attempt results are
// stored.
resultsKey = []byte("missioncontrol-results")
)
// paymentResultCommon holds the fields that are shared by the old and new
// payment result encoding.
type paymentResultCommon struct {
id uint64
timeFwd, timeReply time.Time
success bool
failureSourceIdx *int
failure lnwire.FailureMessage
}
// paymentResultOld is the information that becomes available when a payment
// attempt completes.
type paymentResultOld struct {
paymentResultCommon
route *Route
}
// deserializeOldResult deserializes a payment result using the old encoding.
func deserializeOldResult(k, v []byte) (*paymentResultOld, error) {
// Parse payment id.
result := paymentResultOld{
paymentResultCommon: paymentResultCommon{
id: byteOrder.Uint64(k[8:]),
},
}
r := bytes.NewReader(v)
// Read timestamps, success status and failure source index.
var (
timeFwd, timeReply uint64
dbFailureSourceIdx int32
)
err := ReadElements(
r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx,
)
if err != nil {
return nil, err
}
// Convert time stamps to local time zone for consistent logging.
result.timeFwd = time.Unix(0, int64(timeFwd)).Local()
result.timeReply = time.Unix(0, int64(timeReply)).Local()
// Convert from unknown index magic number to nil value.
if dbFailureSourceIdx != unknownFailureSourceIdx {
failureSourceIdx := int(dbFailureSourceIdx)
result.failureSourceIdx = &failureSourceIdx
}
// Read route.
route, err := DeserializeRoute(r)
if err != nil {
return nil, err
}
result.route = &route
// Read failure.
failureBytes, err := wire.ReadVarBytes(r, 0, math.MaxUint16, "failure")
if err != nil {
return nil, err
}
if len(failureBytes) > 0 {
result.failure, err = lnwire.DecodeFailureMessage(
bytes.NewReader(failureBytes), 0,
)
if err != nil {
return nil, err
}
}
return &result, nil
}
// convertPaymentResult converts a paymentResultOld to a paymentResultNew.
func convertPaymentResult(old *paymentResultOld) *paymentResultNew {
return &paymentResultNew{
paymentResultCommon: old.paymentResultCommon,
route: extractMCRoute(old.route),
}
}
// paymentResultNew is the information that becomes available when a payment
// attempt completes.
type paymentResultNew struct {
paymentResultCommon
route *mcRoute
}
// extractMCRoute extracts the fields required by MC from the Route struct to
// create the more minimal mcRoute struct.
func extractMCRoute(route *Route) *mcRoute {
return &mcRoute{
sourcePubKey: route.SourcePubKey,
totalAmount: route.TotalAmount,
hops: extractMCHops(route.Hops),
}
}
// extractMCHops extracts the Hop fields that MC actually uses from a slice of
// Hops.
func extractMCHops(hops []*Hop) []*mcHop {
mcHops := make([]*mcHop, len(hops))
for i, hop := range hops {
mcHops[i] = extractMCHop(hop)
}
return mcHops
}
// extractMCHop extracts the Hop fields that MC actually uses from a Hop.
func extractMCHop(hop *Hop) *mcHop {
return &mcHop{
channelID: hop.ChannelID,
pubKeyBytes: hop.PubKeyBytes,
amtToFwd: hop.AmtToForward,
hasBlindingPoint: hop.BlindingPoint != nil,
hasCustomRecords: len(hop.CustomRecords) > 0,
}
}
// mcRoute holds the bare minimum info about a payment attempt route that MC
// requires.
type mcRoute struct {
sourcePubKey Vertex
totalAmount lnwire.MilliSatoshi
hops []*mcHop
}
// mcHop holds the bare minimum info about a payment attempt route hop that MC
// requires.
type mcHop struct {
channelID uint64
pubKeyBytes Vertex
amtToFwd lnwire.MilliSatoshi
hasBlindingPoint bool
hasCustomRecords bool
}
// serializeOldResult serializes a payment result and returns a key and value
// byte slice to insert into the bucket.
func serializeOldResult(rp *paymentResultOld) ([]byte, []byte, error) {
// Write timestamps, success status, failure source index and route.
var b bytes.Buffer
var dbFailureSourceIdx int32
if rp.failureSourceIdx == nil {
dbFailureSourceIdx = unknownFailureSourceIdx
} else {
dbFailureSourceIdx = int32(*rp.failureSourceIdx)
}
err := WriteElements(
&b,
uint64(rp.timeFwd.UnixNano()),
uint64(rp.timeReply.UnixNano()),
rp.success, dbFailureSourceIdx,
)
if err != nil {
return nil, nil, err
}
if err := SerializeRoute(&b, *rp.route); err != nil {
return nil, nil, err
}
// Write failure. If there is no failure message, write an empty
// byte slice.
var failureBytes bytes.Buffer
if rp.failure != nil {
err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0)
if err != nil {
return nil, nil, err
}
}
err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes())
if err != nil {
return nil, nil, err
}
// Compose key that identifies this result.
key := getResultKeyOld(rp)
return key, b.Bytes(), nil
}
// getResultKeyOld returns a byte slice representing a unique key for this
// payment result.
func getResultKeyOld(rp *paymentResultOld) []byte {
var keyBytes [8 + 8 + 33]byte
// Identify records by a combination of time, payment id and sender pub
// key. This allows importing mission control data from an external
// source without key collisions and keeps the records sorted
// chronologically.
byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
byteOrder.PutUint64(keyBytes[8:], rp.id)
copy(keyBytes[16:], rp.route.SourcePubKey[:])
return keyBytes[:]
}
// serializeNewResult serializes a payment result and returns a key and value
// byte slice to insert into the bucket.
func serializeNewResult(rp *paymentResultNew) ([]byte, []byte, error) {
// Write timestamps, success status, failure source index and route.
var b bytes.Buffer
var dbFailureSourceIdx int32
if rp.failureSourceIdx == nil {
dbFailureSourceIdx = unknownFailureSourceIdx
} else {
dbFailureSourceIdx = int32(*rp.failureSourceIdx)
}
err := WriteElements(
&b,
uint64(rp.timeFwd.UnixNano()),
uint64(rp.timeReply.UnixNano()),
rp.success, dbFailureSourceIdx,
)
if err != nil {
return nil, nil, err
}
if err := serializeMCRoute(&b, rp.route); err != nil {
return nil, nil, err
}
// Write failure. If there is no failure message, write an empty
// byte slice.
var failureBytes bytes.Buffer
if rp.failure != nil {
err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0)
if err != nil {
return nil, nil, err
}
}
err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes())
if err != nil {
return nil, nil, err
}
// Compose key that identifies this result.
key := getResultKeyNew(rp)
return key, b.Bytes(), nil
}
// getResultKeyNew returns a byte slice representing a unique key for this
// payment result.
func getResultKeyNew(rp *paymentResultNew) []byte {
var keyBytes [8 + 8 + 33]byte
// Identify records by a combination of time, payment id and sender pub
// key. This allows importing mission control data from an external
// source without key collisions and keeps the records sorted
// chronologically.
byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
byteOrder.PutUint64(keyBytes[8:], rp.id)
copy(keyBytes[16:], rp.route.sourcePubKey[:])
return keyBytes[:]
}
// serializeMCRoute serializes an mcRoute and writes the bytes to the given
// io.Writer.
func serializeMCRoute(w io.Writer, r *mcRoute) error {
if err := WriteElements(
w, r.totalAmount, r.sourcePubKey[:],
); err != nil {
return err
}
if err := WriteElements(w, uint32(len(r.hops))); err != nil {
return err
}
for _, h := range r.hops {
if err := serializeNewHop(w, h); err != nil {
return err
}
}
return nil
}
// serializeMCRoute serializes an mcHop and writes the bytes to the given
// io.Writer.
func serializeNewHop(w io.Writer, h *mcHop) error {
return WriteElements(w,
h.pubKeyBytes[:],
h.channelID,
h.amtToFwd,
h.hasBlindingPoint,
h.hasCustomRecords,
)
}