From d61a8af4c4e0fe45186a1f7ac6fc781c7f597f00 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 3 Sep 2025 08:32:37 +0200
Subject: [PATCH 1/3] lnwire: re-usable address descriptor parsers

In this commit, the logic in ReadElement that is used to read a single
address descriptor from an io.Reader is separated out into a new
function so that the logic can be re-used elsewhere.
---
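For illustration, this is roughly how a caller would use the new exported
helper to parse a single descriptor on its own. This is a minimal sketch and
not part of the patch: only lnwire.ReadAddress and its signature come from the
diff below, and the 0x01 type value for IPv4 is the BOLT #7 descriptor type.

	package main

	import (
		"bytes"
		"fmt"

		"github.com/lightningnetwork/lnd/lnwire"
	)

	func main() {
		// One IPv4 descriptor: type 0x01, 127.0.0.1, then port 9735
		// (0x2607) in big-endian.
		section := []byte{0x01, 127, 0, 0, 1, 0x26, 0x07}

		// ReadAddress returns the number of bytes consumed, the
		// parsed net.Addr (nil for a noAddr descriptor) and an error.
		n, addr, err := lnwire.ReadAddress(
			bytes.NewReader(section), uint16(len(section)),
		)
		if err != nil {
			panic(err)
		}

		fmt.Println(n, addr)
	}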
 lnwire/lnwire.go | 291 +++++++++++++++++++++++++----------------------
 1 file changed, 154 insertions(+), 137 deletions(-)

diff --git a/lnwire/lnwire.go b/lnwire/lnwire.go
index 86f7caa61..938240b30 100644
--- a/lnwire/lnwire.go
+++ b/lnwire/lnwire.go
@@ -744,146 +744,19 @@ func ReadElement(r io.Reader, element interface{}) error {
 		)
 
 		for addrBytesRead < addrsLen {
-			var descriptor [1]byte
-			if _, err = io.ReadFull(addrBuf, descriptor[:]); err != nil {
-				return err
+			bytesRead, address, err := ReadAddress(
+				addrBuf, addrsLen-addrBytesRead,
+			)
+			if err != nil {
+				return fmt.Errorf("unable to read address: %w",
+					err)
 			}
+			addrBytesRead += bytesRead
 
-			addrBytesRead++
-
-			var address net.Addr
-			switch aType := addressType(descriptor[0]); aType {
-			case noAddr:
+			// If we encounter a noAddr descriptor, then we'll move
+			// on to the next address.
+			if address == nil {
 				continue
-
-			case tcp4Addr:
-				var ip [4]byte
-				if _, err := io.ReadFull(addrBuf, ip[:]); err != nil {
-					return err
-				}
-
-				var port [2]byte
-				if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
-					return err
-				}
-
-				address = &net.TCPAddr{
-					IP:   net.IP(ip[:]),
-					Port: int(binary.BigEndian.Uint16(port[:])),
-				}
-				addrBytesRead += tcp4AddrLen
-
-			case tcp6Addr:
-				var ip [16]byte
-				if _, err := io.ReadFull(addrBuf, ip[:]); err != nil {
-					return err
-				}
-
-				var port [2]byte
-				if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
-					return err
-				}
-
-				address = &net.TCPAddr{
-					IP:   net.IP(ip[:]),
-					Port: int(binary.BigEndian.Uint16(port[:])),
-				}
-				addrBytesRead += tcp6AddrLen
-
-			case v2OnionAddr:
-				var h [tor.V2DecodedLen]byte
-				if _, err := io.ReadFull(addrBuf, h[:]); err != nil {
-					return err
-				}
-
-				var p [2]byte
-				if _, err := io.ReadFull(addrBuf, p[:]); err != nil {
-					return err
-				}
-
-				onionService := tor.Base32Encoding.EncodeToString(h[:])
-				onionService += tor.OnionSuffix
-				port := int(binary.BigEndian.Uint16(p[:]))
-
-				address = &tor.OnionAddr{
-					OnionService: onionService,
-					Port:         port,
-				}
-				addrBytesRead += v2OnionAddrLen
-
-			case v3OnionAddr:
-				var h [tor.V3DecodedLen]byte
-				if _, err := io.ReadFull(addrBuf, h[:]); err != nil {
-					return err
-				}
-
-				var p [2]byte
-				if _, err := io.ReadFull(addrBuf, p[:]); err != nil {
-					return err
-				}
-
-				onionService := tor.Base32Encoding.EncodeToString(h[:])
-				onionService += tor.OnionSuffix
-				port := int(binary.BigEndian.Uint16(p[:]))
-
-				address = &tor.OnionAddr{
-					OnionService: onionService,
-					Port:         port,
-				}
-				addrBytesRead += v3OnionAddrLen
-
-			case dnsAddr:
-				var hostnameLen [1]byte
-				_, err := io.ReadFull(addrBuf, hostnameLen[:])
-				if err != nil {
-					return err
-				}
-
-				hostname := make([]byte, hostnameLen[0])
-				_, err = io.ReadFull(addrBuf, hostname)
-				if err != nil {
-					return err
-				}
-
-				var port [2]byte
-				_, err = io.ReadFull(addrBuf, port[:])
-				if err != nil {
-					return err
-				}
-
-				address = &DNSAddress{
-					Hostname: string(hostname),
-					Port: binary.BigEndian.Uint16(
-						port[:],
-					),
-				}
-				addrBytesRead += dnsAddrOverhead +
-					uint16(len(hostname))
-
-			default:
-				// If we don't understand this address type,
-				// we just store it along with the remaining
-				// address bytes as type OpaqueAddrs. We need
-				// to hold onto the bytes so that we can still
-				// write them back to the wire when we
-				// propagate this message.
-				payloadLen := 1 + addrsLen - addrBytesRead
-				payload := make([]byte, payloadLen)
-
-				// First write a byte for the address type that
-				// we already read.
-				payload[0] = byte(aType)
-
-				// Now append the rest of the address bytes.
-				_, err := io.ReadFull(addrBuf, payload[1:])
-				if err != nil {
-					return err
-				}
-
-				address = &OpaqueAddrs{
-					Payload: payload,
-				}
-				addrBytesRead = addrsLen
 			}
 
 			addresses = append(addresses, address)
@@ -949,3 +822,147 @@ func ReadElements(r io.Reader, elements ...interface{}) error {
 	}
 	return nil
 }
+
+// ReadAddress attempts to read a single address descriptor (as defined in
+// Bolt 7) from the passed io.Reader. The number of bytes remaining in the
+// address section must be provided so that we can ensure we don't read beyond
+// the end of the address section. The number of bytes read from the reader and
+// the parsed net.Addr are returned.
+//
+// NOTE: it is possible for the number of bytes read to be 1 even if a nil
+// address is returned, as is the case for a noAddr descriptor.
+func ReadAddress(addrBuf io.Reader, addrsLen uint16) (uint16, net.Addr, error) {
+	var descriptor [1]byte
+	if _, err := io.ReadFull(addrBuf, descriptor[:]); err != nil {
+		return 0, nil, err
+	}
+
+	addrBytesRead := uint16(1)
+
+	var address net.Addr
+	switch aType := addressType(descriptor[0]); aType {
+	case noAddr:
+		return addrBytesRead, nil, nil
+
+	case tcp4Addr:
+		var ip [4]byte
+		if _, err := io.ReadFull(addrBuf, ip[:]); err != nil {
+			return 0, nil, err
+		}
+
+		var port [2]byte
+		if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
+			return 0, nil, err
+		}
+
+		address = &net.TCPAddr{
+			IP:   net.IP(ip[:]),
+			Port: int(binary.BigEndian.Uint16(port[:])),
+		}
+		addrBytesRead += tcp4AddrLen
+
+	case tcp6Addr:
+		var ip [16]byte
+		if _, err := io.ReadFull(addrBuf, ip[:]); err != nil {
+			return 0, nil, err
+		}
+
+		var port [2]byte
+		if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
+			return 0, nil, err
+		}
+
+		address = &net.TCPAddr{
+			IP:   net.IP(ip[:]),
+			Port: int(binary.BigEndian.Uint16(port[:])),
+		}
+		addrBytesRead += tcp6AddrLen
+
+	case v2OnionAddr:
+		var h [tor.V2DecodedLen]byte
+		if _, err := io.ReadFull(addrBuf, h[:]); err != nil {
+			return 0, nil, err
+		}
+
+		var p [2]byte
+		if _, err := io.ReadFull(addrBuf, p[:]); err != nil {
+			return 0, nil, err
+		}
+
+		onionService := tor.Base32Encoding.EncodeToString(h[:])
+		onionService += tor.OnionSuffix
+		port := int(binary.BigEndian.Uint16(p[:]))
+
+		address = &tor.OnionAddr{
+			OnionService: onionService,
+			Port:         port,
+		}
+		addrBytesRead += v2OnionAddrLen
+
+	case v3OnionAddr:
+		var h [tor.V3DecodedLen]byte
+		if _, err := io.ReadFull(addrBuf, h[:]); err != nil {
+			return 0, nil, err
+		}
+
+		var p [2]byte
+		if _, err := io.ReadFull(addrBuf, p[:]); err != nil {
+			return 0, nil, err
+		}
+
+		onionService := tor.Base32Encoding.EncodeToString(h[:])
+		onionService += tor.OnionSuffix
+		port := int(binary.BigEndian.Uint16(p[:]))
+
+		address = &tor.OnionAddr{
+			OnionService: onionService,
+			Port:         port,
+		}
+		addrBytesRead += v3OnionAddrLen
+
+	case dnsAddr:
+		var hostnameLen [1]byte
+		if _, err := io.ReadFull(addrBuf, hostnameLen[:]); err != nil {
+			return 0, nil, err
+		}
+
+		hostname := make([]byte, hostnameLen[0])
+		if _, err := io.ReadFull(addrBuf, hostname); err != nil {
+			return 0, nil, err
+		}
+
+		var port [2]byte
+		if _, err := io.ReadFull(addrBuf, port[:]); err != nil {
+			return 0, nil, err
+		}
+
+		address = &DNSAddress{
+			Hostname: string(hostname),
+			Port:     binary.BigEndian.Uint16(port[:]),
+		}
+		addrBytesRead += dnsAddrOverhead + uint16(len(hostname))
+
+	default:
+		// If we don't understand this address type, we just store it
+		// along with the remaining address bytes as type OpaqueAddrs.
+		// We need to hold onto the bytes so that we can still write
+		// them back to the wire when we propagate this message.
+		payloadLen := 1 + addrsLen - addrBytesRead
+		payload := make([]byte, payloadLen)
+
+		// First write a byte for the address type that we already read.
+		payload[0] = byte(aType)
+
+		// Now append the rest of the address bytes.
+		if _, err := io.ReadFull(addrBuf, payload[1:]); err != nil {
+			return 0, nil, err
+		}
+
+		address = &OpaqueAddrs{
+			Payload: payload,
+		}
+		addrBytesRead = addrsLen
+	}
+
+	return addrBytesRead, address, nil
+}

From af380c9eb1046ab422cca274ea29616fdc56ad85 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 3 Sep 2025 09:13:42 +0200
Subject: [PATCH 2/3] graph/db: extract DNS addresses during SQL migration

In this commit, we take advantage of the graph SQL migration and use it
to also extract DNS addresses from the opaque address type. We use
opaque addresses to store addresses that we don't understand yet. We
recently added logic for DNS addresses, and so we may have persisted
node announcements that contain DNS addresses but currently have them
stored under the opaque address type. So we use this migration to see
if we can extract such addresses.

A few decisions were made here:

1) If multiple DNS addresses are extracted, this is ok and we continue
   to migrate the node even though this is actually invalid at a
   protocol level. We currently check (at a higher level) that a node
   announcement only has 1 DNS address in it before we broadcast it.

2) If an invalid DNS address is encountered (we hit the DNS type
   descriptor but the rest of the DNS address payload is invalid and
   cannot be parsed into the expected hostname:port), then we skip
   migrating the node completely.
---
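As a concrete illustration of the extraction behaviour described above, the
sketch below is not part of the patch: the extractAddrs helper and the main
function are made up, and only lnwire.ReadAddress, lnwire.OpaqueAddrs and
lnwire.DNSAddress come from the code. It shows how an opaque payload that
wraps a DNS address followed by an unknown address type is split into a
DNSAddress plus a trailing OpaqueAddrs, which is what the migration below does.

	package main

	import (
		"bytes"
		"fmt"
		"net"

		"github.com/lightningnetwork/lnd/lnwire"
	)

	// extractAddrs mirrors the loop used by the migration: parse known
	// addresses out of an opaque payload until an unknown type (which
	// consumes the remainder) or the end of the payload is reached.
	func extractAddrs(payload []byte) ([]net.Addr, error) {
		var addrs []net.Addr
		for len(payload) > 0 {
			r := bytes.NewReader(payload)
			n, addr, err := lnwire.ReadAddress(r, uint16(len(payload)))
			if err != nil {
				return nil, err
			}
			if addr != nil {
				addrs = append(addrs, addr)
			}

			// An OpaqueAddrs result means an unknown descriptor
			// swallowed the rest of the bytes, so stop parsing.
			if _, ok := addr.(*lnwire.OpaqueAddrs); ok {
				break
			}
			if n >= uint16(len(payload)) {
				break
			}
			payload = payload[n:]
		}

		return addrs, nil
	}

	func main() {
		// A DNS address (example.com:8080) followed by bytes of an
		// unknown address type, mirroring the test vectors in this
		// patch.
		payload := []byte{
			0x05, 0x0b,
			'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
			0x1f, 0x90,
			0xff, 0x02, 0x03, 0x04, 0x05, 0x06,
		}

		addrs, err := extractAddrs(payload)

		// Expected, per the migration tests in this patch: a
		// DNSAddress for example.com:8080 followed by an OpaqueAddrs
		// wrapping the unknown trailing bytes.
		fmt.Println(addrs, err)
	}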
 graph/db/sql_migration.go      |  89 +++++++++++++++--
 graph/db/sql_migration_test.go | 170 +++++++++++++++++++++++++++++++++
 2 files changed, 249 insertions(+), 10 deletions(-)

diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go
index b88518fd8..1071974cb 100644
--- a/graph/db/sql_migration.go
+++ b/graph/db/sql_migration.go
@@ -260,18 +260,17 @@ func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
 				"opaque data for node %x: %w", pub, err)
 		}
 
+		if err = maybeOverrideNodeAddresses(node); err != nil {
+			skipped++
+			log.Warnf("Skipping migration of node %x with invalid "+
+				"address (%v): %v", pub, node.Addresses, err)
+
+			return nil
+		}
+
 		count++
 		chunk++
 
-		// TODO(elle): At this point, we should check the loaded node
-		// to see if we should extract any DNS addresses from its
-		// opaque type addresses. This is expected to be done in:
-		// https://github.com/lightningnetwork/lnd/pull/9455.
-		// This TODO is being tracked in
-		// https://github.com/lightningnetwork/lnd/issues/9795 as this
-		// must be addressed before making this code path active in
-		// production.
-
 		// Write the node to the SQL database.
 		id, err := insertNodeSQLMig(ctx, sqlDB, node)
 		if err != nil {
@@ -323,12 +322,82 @@ func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
 	}
 
 	log.Infof("Migrated %d nodes from KV to SQL in %v (skipped %d nodes "+
-		"due to invalid TLV streams)", count, time.Since(totalTime),
+		"due to invalid TLV streams or invalid addresses)", count,
+		time.Since(totalTime),
 		skipped)
 
 	return nil
 }
 
+// maybeOverrideNodeAddresses checks if the node has any opaque addresses that
+// can be parsed. If so, it replaces the node's addresses with the parsed
+// addresses. If the address is unparseable, it returns an error.
+func maybeOverrideNodeAddresses(node *models.LightningNode) error {
+	// In the majority of cases, the number of node addresses will remain
+	// unchanged, so we pre-allocate a slice of the same length.
+	addrs := make([]net.Addr, 0, len(node.Addresses))
+
+	// Iterate over each address in search of any opaque addresses that we
+	// can inspect.
+	for _, addr := range node.Addresses {
+		opaque, ok := addr.(*lnwire.OpaqueAddrs)
+		if !ok {
+			// Any non-opaque address is left unchanged.
+			addrs = append(addrs, addr)
+			continue
+		}
+
+		// For each opaque address, we'll now attempt to parse out any
+		// known addresses. We'll do this in a loop, as it's possible
+		// that there are several addresses encoded in a single opaque
+		// address.
+		payload := opaque.Payload
+		for len(payload) > 0 {
+			var (
+				r            = bytes.NewReader(payload)
+				numAddrBytes = uint16(len(payload))
+			)
+			byteRead, readAddr, err := lnwire.ReadAddress(
+				r, numAddrBytes,
+			)
+			if err != nil {
+				return err
+			}
+
+			// If we were able to read an address, we'll add it to
+			// our list of addresses.
+			if readAddr != nil {
+				addrs = append(addrs, readAddr)
+			}
+
+			// If the address we read was an opaque address, it
+			// means we've hit an unknown address type, and it has
+			// consumed the rest of the payload. We can break out
+			// of the loop.
+			if _, ok := readAddr.(*lnwire.OpaqueAddrs); ok {
+				break
+			}
+
+			// If we've read all the bytes, we can also break.
+			if byteRead >= numAddrBytes {
+				break
+			}
+
+			// Otherwise, we'll advance our payload slice and
+			// continue.
+			payload = payload[byteRead:]
+		}
+	}
+
+	// Override the node addresses if we have any.
+	if len(addrs) != 0 {
+		node.Addresses = addrs
+	}
+
+	return nil
+}
+
 // migrateSourceNode migrates the source node from the KV backend to the
 // SQL database.
 func migrateSourceNode(ctx context.Context, kvdb kvdb.Backend,
diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go
index 90b2a8fd8..cb8443f81 100644
--- a/graph/db/sql_migration_test.go
+++ b/graph/db/sql_migration_test.go
@@ -49,6 +49,78 @@ var (
 		BitcoinSig1Bytes: testSig.Serialize(),
 		BitcoinSig2Bytes: testSig.Serialize(),
 	}
+
+	// testOpaqueAddrWithEmbeddedDNSAddr is an opaque address that contains
+	// a single DNS address within it.
+	testOpaqueAddrWithEmbeddedDNSAddr = &lnwire.OpaqueAddrs{
+		Payload: []byte{
+			// The protocol level type for DNS addresses.
+			0x05,
+			// Hostname length: 11.
+			0x0b,
+			// The hostname itself.
+			'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
+			// Port 8080 in big-endian.
+			0x1f, 0x90,
+		},
+	}
+
+	// testOpaqueAddrWithEmbeddedDNSAddrAndMore is an opaque address that
+	// contains a DNS address within it, along with some extra bytes that
+	// represent some other unknown address type.
+	testOpaqueAddrWithEmbeddedDNSAddrAndMore = &lnwire.OpaqueAddrs{
+		Payload: []byte{
+			// The protocol level type for DNS addresses.
+			0x05,
+			// Hostname length: 11.
+			0x0B,
+			// The hostname itself.
+			'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
+			// port 8080 in big-endian.
+			0x1F, 0x90,
+			// Now we add more opaque bytes to represent more
+			// addresses that we don't know about yet.
+			// NOTE: the 0xff is an address type that we definitely
+			// don't know about yet
+			0xff, 0x02, 0x03, 0x04, 0x05, 0x06,
+		},
+	}
+
+	// testOpaqueAddrWithEmbeddedBadDNSAddr is an opaque address that
+	// contains an invalid DNS address within it.
+	testOpaqueAddrWithEmbeddedBadDNSAddr = &lnwire.OpaqueAddrs{
+		Payload: []byte{
+			// The protocol level type for DNS addresses.
+			0x05,
+			// Hostname length: We set this to a size that is
+			// incorrect in order to simulate the bad DNS address.
+			0xAA,
+			// The hostname itself.
+			'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
+			// port 9735 in big-endian.
+			0x26, 0x07,
+		},
+	}
+
+	// testOpaqueAddrWithTwoEmbeddedDNSAddrs is an opaque address that
+	// contains two valid DNS addresses within it.
+	testOpaqueAddrWithTwoEmbeddedDNSAddrs = &lnwire.OpaqueAddrs{
+		Payload: []byte{
+			// The protocol level type for DNS addresses.
+			0x05,
+			// Hostname length: 11.
+			0x0B,
+			// The hostname itself.
+			'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
+			// port 8080 in big-endian.
+			0x1F, 0x90,
+			// Another DNS address.
+			0x05,
+			0x0B,
+			'e', 'x', 'a', 'm', 'p', 'l', 'e', '.', 'c', 'o', 'm',
+			0x1F, 0x90,
+		},
+	}
 )
 
 // TestMigrateGraphToSQL tests various deterministic cases that we want to test
@@ -1174,6 +1246,104 @@ func TestSQLMigrationEdgeCases(t *testing.T) {
 			zombies: []uint64{2},
 		})
 	})
+
+	// We have used this migration as a chance to also extract any DNS
+	// addresses that we previously may have wrapped in an opaque address.
+	// If we do encounter such a case, then the migrated node set will look
+	// slightly different from the original node set in the KV store, and so
+	// we test for that here.
+	t.Run("node with wrapped DNS address inside opaque addr",
+		func(t *testing.T) {
+			t.Parallel()
+
+			var expectedNodes []*models.LightningNode
+
+			// Let the first node have an opaque address that we
+			// still don't understand. This node will remain the
+			// same in the SQL store.
+			n1 := makeTestNode(t, func(n *models.LightningNode) {
+				n.Addresses = []net.Addr{
+					testOpaqueAddr,
+				}
+			})
+			expectedNodes = append(expectedNodes, n1)
+
+			// The second node will have a wrapped DNS address
+			// inside an opaque address. The opaque address will
+			// only contain a DNS address and so the migrated node
+			// will only contain a DNS address and no opaque
+			// address.
+			n2 := makeTestNode(t, func(n *models.LightningNode) {
+				n.Addresses = []net.Addr{
+					testOpaqueAddrWithEmbeddedDNSAddr,
+				}
+			})
+			n2Expected := *n2
+			n2Expected.Addresses = []net.Addr{
+				testDNSAddr,
+			}
+			expectedNodes = append(expectedNodes, &n2Expected)
+
+			// The third node will have an opaque address that
+			// wraps a DNS address along with some other data.
+			// So the resulting migrated node should have both
+			// the DNS address and remaining opaque address data.
+			n3 := makeTestNode(t, func(n *models.LightningNode) {
+				n.Addresses = []net.Addr{
+					//nolint:ll
+					testOpaqueAddrWithEmbeddedDNSAddrAndMore,
+				}
+			})
+			n3Expected := *n3
+			n3Expected.Addresses = []net.Addr{
+				testDNSAddr,
+				testOpaqueAddr,
+			}
+			expectedNodes = append(expectedNodes, &n3Expected)
+
+			// The fourth node will have an opaque address that
+			// wraps an invalid DNS address. Such a node will not be
+			// migrated since propagating an invalid DNS address
+			// is not allowed.
+			n4 := makeTestNode(t, func(n *models.LightningNode) {
+				n.Addresses = []net.Addr{
+					testOpaqueAddrWithEmbeddedBadDNSAddr,
+				}
+			})
+			// NOTE: we don't add this node to the expected nodes
+			// slice.
+
+			// The fifth node will have 2 DNS addresses embedded
+			// in the opaque address. The migration will result
+			// in _both_ DNS addresses being extracted. This is
+			// invalid at a protocol level, and so we should not
+			// propagate such addresses, but this is left to higher
+			// level gossip logic.
+			n5 := makeTestNode(t, func(n *models.LightningNode) {
+				n.Addresses = []net.Addr{
+					testOpaqueAddrWithTwoEmbeddedDNSAddrs,
+				}
+			})
+			n5Expected := *n5
+			n5Expected.Addresses = []net.Addr{
+				testDNSAddr,
+				testDNSAddr,
+			}
+			expectedNodes = append(expectedNodes, &n5Expected)
+
+			populateKV := func(t *testing.T, db *KVStore) {
+				require.NoError(t, db.AddLightningNode(ctx, n1))
+				require.NoError(t, db.AddLightningNode(ctx, n2))
+				require.NoError(t, db.AddLightningNode(ctx, n3))
+				require.NoError(t, db.AddLightningNode(ctx, n4))
+				require.NoError(t, db.AddLightningNode(ctx, n5))
+			}
+
+			runTestMigration(t, populateKV, dbState{
+				nodes: expectedNodes,
+			})
+		},
+	)
 }
 
 // runTestMigration is a helper function that sets up the KVStore and SQLStore,

From a74f0b133a876a3e1f8c84f370fdb4c8ddb0b512 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 3 Sep 2025 09:35:18 +0200
Subject: [PATCH 3/3] graph/db: expand test comment with build tag info

---
 graph/db/benchmark_test.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go
index 611653d53..20d194f70 100644
--- a/graph/db/benchmark_test.go
+++ b/graph/db/benchmark_test.go
@@ -435,6 +435,13 @@ func TestPopulateDBs(t *testing.T) {
 // NOTE: the testPostgres variable can be set to true to test with a
 // postgres backend instead of the kvdb-sqlite backend.
 //
+// NOTE: you will need to set the following build tags in order to run this
+// test:
+//
+//	test_native_sql
+//	kvdb_sqlite   // If your source is kvdb-sqlite
+//	kvdb_postgres // If your source is kvdb-postgres
+//
 // NOTE: this is a helper test and is not run by default.
 func TestPopulateViaMigration(t *testing.T) {
 	// ======= STEP 0 ===========
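For reference, one way to invoke the helper test once those build tags are in
place (the exact command is illustrative and assumes it is run from the lnd
repository root; only the tag names and the test name come from the patch):

	# Source data in a kvdb-sqlite backend:
	go test -tags=test_native_sql,kvdb_sqlite -run TestPopulateViaMigration ./graph/db/

	# Source data in a kvdb-postgres backend:
	go test -tags=test_native_sql,kvdb_postgres -run TestPopulateViaMigration ./graph/db/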