Merge pull request #10162 from ellemouton/graphMigUnwrapDNSAddrs

graph/db: unwrap dns addresses from opaque ones during migration
This commit is contained in:
Yong
2025-09-04 22:29:49 +08:00
committed by GitHub
4 changed files with 410 additions and 147 deletions

View File

@@ -435,6 +435,13 @@ func TestPopulateDBs(t *testing.T) {
// NOTE: the testPostgres variable can be set to true to test with a
// postgres backend instead of the kvdb-sqlite backend.
//
// NOTE: you will need to set the following build tags in order to run this
// test:
//
// test_native_sql
// kvdb_sqlite // If your source is kvdb-sqlite
// kvdb_postgres // If your source is kvdb-postgres
//
// NOTE: this is a helper test and is not run by default.
func TestPopulateViaMigration(t *testing.T) {
// ======= STEP 0 ===========

View File

@@ -260,18 +260,17 @@ func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
"opaque data for node %x: %w", pub, err)
}
if err = maybeOverrideNodeAddresses(node); err != nil {
skipped++
log.Warnf("Skipping migration of node %x with invalid "+
"address (%v): %v", pub, node.Addresses, err)
return nil
}
count++
chunk++
// TODO(elle): At this point, we should check the loaded node
// to see if we should extract any DNS addresses from its
// opaque type addresses. This is expected to be done in:
// https://github.com/lightningnetwork/lnd/pull/9455.
// This TODO is being tracked in
// https://github.com/lightningnetwork/lnd/issues/9795 as this
// must be addressed before making this code path active in
// production.
// Write the node to the SQL database.
id, err := insertNodeSQLMig(ctx, sqlDB, node)
if err != nil {
@@ -323,12 +322,82 @@ func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
}
log.Infof("Migrated %d nodes from KV to SQL in %v (skipped %d nodes "+
"due to invalid TLV streams)", count, time.Since(totalTime),
"due to invalid TLV streams or invalid addresses)", count,
time.Since(totalTime),
skipped)
return nil
}
// maybeOverrideNodeAddresses checks if the node has any opaque addresses that
// can be parsed. If so, it replaces the node's addresses with the parsed
// addresses. If the address is unparseable, it returns an error.
func maybeOverrideNodeAddresses(node *models.LightningNode) error {
// In the majority of cases, the number of node addresses will remain
// unchanged, so we pre-allocate a slice of the same length.
addrs := make([]net.Addr, 0, len(node.Addresses))
// Iterate over each address in search of any opaque addresses that we
// can inspect.
for _, addr := range node.Addresses {
opaque, ok := addr.(*lnwire.OpaqueAddrs)
if !ok {
// Any non-opaque address is left unchanged.
addrs = append(addrs, addr)
continue
}
// For each opaque address, we'll now attempt to parse out any
// known addresses. We'll do this in a loop, as it's possible
// that there are several addresses encoded in a single opaque
// address.
payload := opaque.Payload
for len(payload) > 0 {
var (
r = bytes.NewReader(payload)
numAddrBytes = uint16(len(payload))
)
byteRead, readAddr, err := lnwire.ReadAddress(
r, numAddrBytes,
)
if err != nil {
return err
}
// If we were able to read an address, we'll add it to
// our list of addresses.
if readAddr != nil {
addrs = append(addrs, readAddr)
}
// If the address we read was an opaque address, it
// means we've hit an unknown address type, and it has
// consumed the rest of the payload. We can break out
// of the loop.
if _, ok := readAddr.(*lnwire.OpaqueAddrs); ok {
break
}
// If we've read all the bytes, we can also break.
if byteRead >= numAddrBytes {
break
}
// Otherwise, we'll advance our payload slice and
// continue.
payload = payload[byteRead:]
}
}
// Override the node addresses if we have any.
if len(addrs) != 0 {
node.Addresses = addrs
}
return nil
}
// migrateSourceNode migrates the source node from the KV backend to the
// SQL database.
func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,

View File

@@ -49,6 +49,78 @@ var (
BitcoinSig1Bytes: testSig.Serialize(),
BitcoinSig2Bytes: testSig.Serialize(),
}
// testOpaqueAddrWithEmbeddedDNSAddr is an opaque address that contains
// a single DNS address within it.
testOpaqueAddrWithEmbeddedDNSAddr = &lnwire.OpaqueAddrs{
Payload: []byte{
// The protocol level type for DNS addresses.
0x05,
// Hostname length: 11.
0x0b,
// The hostname itself.
'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
// Port 8080 in big-endian.
0x1f, 0x90,
},
}
// testOpaqueAddrWithEmbeddedDNSAddrAndMore is an opaque address that
// contains a DNS address within it, along with some extra bytes that
// represent some other unknown address type.
testOpaqueAddrWithEmbeddedDNSAddrAndMore = &lnwire.OpaqueAddrs{
Payload: []byte{
// The protocol level type for DNS addresses.
0x05,
// Hostname length: 11.
0x0B,
// The hostname itself.
'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
// port 8080 in big-endian.
0x1F, 0x90,
// Now we add more opaque bytes to represent more
// addresses that we don't know about yet.
// NOTE: the 0xff is an address type that we definitely
// don't know about yet
0xff, 0x02, 0x03, 0x04, 0x05, 0x06,
},
}
// testOpaqueAddrWithEmbeddedBadDNSAddr is an opaque address that
// contains an invalid DNS address within it.
testOpaqueAddrWithEmbeddedBadDNSAddr = &lnwire.OpaqueAddrs{
Payload: []byte{
// The protocol level type for DNS addresses.
0x05,
// Hostname length: We set this to a size that is
// incorrect in order to simulate the bad DNS address.
0xAA,
// The hostname itself.
'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
// port 9735 in big-endian.
0x26, 0x07,
},
}
// testOpaqueAddrWithTwoEmbeddedDNSAddrs is an opaque address that
// contains two valid DNS addresses within it.
testOpaqueAddrWithTwoEmbeddedDNSAddrs = &lnwire.OpaqueAddrs{
Payload: []byte{
// The protocol level type for DNS addresses.
0x05,
// Hostname length: 11.
0x0B,
// The hostname itself.
'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
// port 8080 in big-endian.
0x1F, 0x90,
// Another DNS address.
0x05,
0x0B,
'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
0x1F, 0x90,
},
}
)
// TestMigrateGraphToSQL tests various deterministic cases that we want to test
@@ -1173,6 +1245,104 @@ func TestSQLMigrationEdgeCases(t *testing.T) {
zombies: []uint64{2},
})
})
// We have used this migration as a chance to also extract any DNS
// addresses that we previously may have wrapped in an opaque address.
// If we do encounter such a case, then the migrated node set will look
// slightly different from the original node set in the KV store, and so
// we test for that here.
t.Run("node with wrapped DNS address inside opaque addr",
func(t *testing.T) {
t.Parallel()
var expectedNodes []*models.LightningNode
// Let the first node have an opaque address that we
// still don't understand. This node will remain the
// same in the SQL store.
n1 := makeTestNode(t, func(n *models.LightningNode) {
n.Addresses = []net.Addr{
testOpaqueAddr,
}
})
expectedNodes = append(expectedNodes, n1)
// The second node will have a wrapped DNS address
// inside an opaque address. The opaque address will
// only contain a DNS address and so the migrated node
// will only contain a DNS address and no opaque
// address.
n2 := makeTestNode(t, func(n *models.LightningNode) {
n.Addresses = []net.Addr{
testOpaqueAddrWithEmbeddedDNSAddr,
}
})
n2Expected := *n2
n2Expected.Addresses = []net.Addr{
testDNSAddr,
}
expectedNodes = append(expectedNodes, &n2Expected)
// The third node will have an opaque address that
// wraps a DNS address along with some other data.
// So the resulting migrated node should have both
// the DNS address and remaining opaque address data.
n3 := makeTestNode(t, func(n *models.LightningNode) {
n.Addresses = []net.Addr{
//nolint:ll
testOpaqueAddrWithEmbeddedDNSAddrAndMore,
}
})
n3Expected := *n3
n3Expected.Addresses = []net.Addr{
testDNSAddr,
testOpaqueAddr,
}
expectedNodes = append(expectedNodes, &n3Expected)
// The fourth node will have an opaque address that
// wraps an invalid DNS address. Such a node will not be
// migrated since propagating an invalid DNS address
// is not allowed.
n4 := makeTestNode(t, func(n *models.LightningNode) {
n.Addresses = []net.Addr{
testOpaqueAddrWithEmbeddedBadDNSAddr,
}
})
// NOTE: we don't add this node to the expected nodes
// slice.
// The fifth node will have 2 DNS addresses embedded
// in the opaque address. The migration will result
// in _both_ dns addresses being extracted. This is
// invalid at a protocol level, and so we should not
// propagate such addresses, but this is left to higher
// level gossip logic.
n5 := makeTestNode(t, func(n *models.LightningNode) {
n.Addresses = []net.Addr{
testOpaqueAddrWithTwoEmbeddedDNSAddrs,
}
})
n5Expected := *n5
n5Expected.Addresses = []net.Addr{
testDNSAddr,
testDNSAddr,
}
expectedNodes = append(expectedNodes, &n5Expected)
populateKV := func(t *testing.T, db *KVStore) {
require.NoError(t, db.AddLightningNode(ctx, n1))
require.NoError(t, db.AddLightningNode(ctx, n2))
require.NoError(t, db.AddLightningNode(ctx, n3))
require.NoError(t, db.AddLightningNode(ctx, n4))
require.NoError(t, db.AddLightningNode(ctx, n5))
}
runTestMigration(t, populateKV, dbState{
nodes: expectedNodes,
})
},
)
}
// runTestMigration is a helper function that sets up the KVStore and SQLStore,

View File

@@ -744,146 +744,19 @@ func ReadElement(r io.Reader, element interface{}) error {
)
for addrBytesRead < addrsLen {
var descriptor [1]byte
if _, err = io.ReadFull(addrBuf, descriptor[:]); err != nil {
return err
bytesRead, address, err := ReadAddress(
addrBuf, addrsLen-addrBytesRead,
)
if err != nil {
return fmt.Errorf("unable to read address: %w",
err)
}
addrBytesRead += bytesRead
addrBytesRead++
var address net.Addr
switch aType := addressType(descriptor[0]); aType {
case noAddr:
// If we encounter a noAddr descriptor, then we'll move
// on to the next address.
if address == nil {
continue
case tcp4Addr:
var ip [4]byte
if _, err := io.ReadFull(addrBuf, ip[:]); err != nil {
return err
}
var port [2]byte
if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
return err
}
address = &net.TCPAddr{
IP: net.IP(ip[:]),
Port: int(binary.BigEndian.Uint16(port[:])),
}
addrBytesRead += tcp4AddrLen
case tcp6Addr:
var ip [16]byte
if _, err := io.ReadFull(addrBuf, ip[:]); err != nil {
return err
}
var port [2]byte
if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
return err
}
address = &net.TCPAddr{
IP: net.IP(ip[:]),
Port: int(binary.BigEndian.Uint16(port[:])),
}
addrBytesRead += tcp6AddrLen
case v2OnionAddr:
var h [tor.V2DecodedLen]byte
if _, err := io.ReadFull(addrBuf, h[:]); err != nil {
return err
}
var p [2]byte
if _, err := io.ReadFull(addrBuf, p[:]); err != nil {
return err
}
onionService := tor.Base32Encoding.EncodeToString(h[:])
onionService += tor.OnionSuffix
port := int(binary.BigEndian.Uint16(p[:]))
address = &tor.OnionAddr{
OnionService: onionService,
Port: port,
}
addrBytesRead += v2OnionAddrLen
case v3OnionAddr:
var h [tor.V3DecodedLen]byte
if _, err := io.ReadFull(addrBuf, h[:]); err != nil {
return err
}
var p [2]byte
if _, err := io.ReadFull(addrBuf, p[:]); err != nil {
return err
}
onionService := tor.Base32Encoding.EncodeToString(h[:])
onionService += tor.OnionSuffix
port := int(binary.BigEndian.Uint16(p[:]))
address = &tor.OnionAddr{
OnionService: onionService,
Port: port,
}
addrBytesRead += v3OnionAddrLen
case dnsAddr:
var hostnameLen [1]byte
_, err := io.ReadFull(addrBuf, hostnameLen[:])
if err != nil {
return err
}
hostname := make([]byte, hostnameLen[0])
_, err = io.ReadFull(addrBuf, hostname)
if err != nil {
return err
}
var port [2]byte
_, err = io.ReadFull(addrBuf, port[:])
if err != nil {
return err
}
address = &DNSAddress{
Hostname: string(hostname),
Port: binary.BigEndian.Uint16(
port[:],
),
}
addrBytesRead += dnsAddrOverhead +
uint16(len(hostname))
default:
// If we don't understand this address type,
// we just store it along with the remaining
// address bytes as type OpaqueAddrs. We need
// to hold onto the bytes so that we can still
// write them back to the wire when we
// propagate this message.
payloadLen := 1 + addrsLen - addrBytesRead
payload := make([]byte, payloadLen)
// First write a byte for the address type that
// we already read.
payload[0] = byte(aType)
// Now append the rest of the address bytes.
_, err := io.ReadFull(addrBuf, payload[1:])
if err != nil {
return err
}
address = &OpaqueAddrs{
Payload: payload,
}
addrBytesRead = addrsLen
}
addresses = append(addresses, address)
@@ -949,3 +822,147 @@ func ReadElements(r io.Reader, elements ...interface{}) error {
}
return nil
}
// ReadAddress attempts to read a single address descriptor (as defined in
// Bolt 7) from the passed io.Reader. The total length of the address section
// (in bytes) must be provided so that we can ensure we don't read beyond the
// end of the address section. The number of bytes read from the reader and the
// parsed net.Addr are returned.
//
// NOTE: it is possible for the number of bytes read to be 1 even if a nil
// address is returned.
func ReadAddress(addrBuf io.Reader, addrsLen uint16) (uint16, net.Addr, error) {
var descriptor [1]byte
if _, err := io.ReadFull(addrBuf, descriptor[:]); err != nil {
return 0, nil, err
}
addrBytesRead := uint16(1)
var address net.Addr
switch aType := addressType(descriptor[0]); aType {
case noAddr:
return addrBytesRead, nil, nil
case tcp4Addr:
var ip [4]byte
if _, err := io.ReadFull(addrBuf, ip[:]); err != nil {
return 0, nil, err
}
var port [2]byte
if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
return 0, nil, err
}
address = &net.TCPAddr{
IP: net.IP(ip[:]),
Port: int(binary.BigEndian.Uint16(port[:])),
}
addrBytesRead += tcp4AddrLen
case tcp6Addr:
var ip [16]byte
if _, err := io.ReadFull(addrBuf, ip[:]); err != nil {
return 0, nil, err
}
var port [2]byte
if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
return 0, nil, err
}
address = &net.TCPAddr{
IP: net.IP(ip[:]),
Port: int(binary.BigEndian.Uint16(port[:])),
}
addrBytesRead += tcp6AddrLen
case v2OnionAddr:
var h [tor.V2DecodedLen]byte
if _, err := io.ReadFull(addrBuf, h[:]); err != nil {
return 0, nil, err
}
var p [2]byte
if _, err := io.ReadFull(addrBuf, p[:]); err != nil {
return 0, nil, err
}
onionService := tor.Base32Encoding.EncodeToString(h[:])
onionService += tor.OnionSuffix
port := int(binary.BigEndian.Uint16(p[:]))
address = &tor.OnionAddr{
OnionService: onionService,
Port: port,
}
addrBytesRead += v2OnionAddrLen
case v3OnionAddr:
var h [tor.V3DecodedLen]byte
if _, err := io.ReadFull(addrBuf, h[:]); err != nil {
return 0, nil, err
}
var p [2]byte
if _, err := io.ReadFull(addrBuf, p[:]); err != nil {
return 0, nil, err
}
onionService := tor.Base32Encoding.EncodeToString(h[:])
onionService += tor.OnionSuffix
port := int(binary.BigEndian.Uint16(p[:]))
address = &tor.OnionAddr{
OnionService: onionService,
Port: port,
}
addrBytesRead += v3OnionAddrLen
case dnsAddr:
var hostnameLen [1]byte
if _, err := io.ReadFull(addrBuf, hostnameLen[:]); err != nil {
return 0, nil, err
}
hostname := make([]byte, hostnameLen[0])
if _, err := io.ReadFull(addrBuf, hostname); err != nil {
return 0, nil, err
}
var port [2]byte
if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
return 0, nil, err
}
address = &DNSAddress{
Hostname: string(hostname),
Port: binary.BigEndian.Uint16(port[:]),
}
addrBytesRead += dnsAddrOverhead + uint16(len(hostname))
default:
// If we don't understand this address type, we just store it
// along with the remaining address bytes as type OpaqueAddrs.
// We need to hold onto the bytes so that we can still write
// them back to the wire when we propagate this message.
payloadLen := 1 + addrsLen - addrBytesRead
payload := make([]byte, payloadLen)
// First write a byte for the address type that we already read.
payload[0] = byte(aType)
// Now append the rest of the address bytes.
if _, err := io.ReadFull(addrBuf, payload[1:]); err != nil {
return 0, nil, err
}
address = &OpaqueAddrs{
Payload: payload,
}
addrBytesRead = addrsLen
}
return addrBytesRead, address, nil
}