graph/db+sqldb: improve efficiency of node migration

There is no need to use the "collect-then-update" pattern for node
insertion during the SQL migration since if we do have any previously
persisted data for the node and happen to re-run the insertion for that
node, the data will be exactly the same. So we can make use of "On
conflict, no nothing" here too.
This commit is contained in:
Elle Mouton
2025-08-15 09:43:40 +02:00
parent 7b61744bed
commit d9754c6e3a
4 changed files with 92 additions and 42 deletions

View File

@@ -1390,9 +1390,6 @@ func forEachClosedSCID(db kvdb.Backend,
// update have a newer timestamp than the existing one. This is because we want
// the migration to be idempotent and dont want to error out if we re-insert the
// exact same node.
//
// TODO(elle): update the upsert calls in this function to be more efficient.
// since no data collection steps should be required during the migration.
func insertNodeSQLMig(ctx context.Context, db SQLQueries,
node *models.LightningNode) (int64, error) {
@@ -1419,19 +1416,42 @@ func insertNodeSQLMig(ctx context.Context, db SQLQueries,
return nodeID, nil
}
// NOTE: The upserts here will be updated to be more efficient in the
// following commits.
// Update the node's features.
err = upsertNodeFeatures(ctx, db, nodeID, node.Features)
if err != nil {
return 0, fmt.Errorf("inserting node features: %w", err)
// Insert the node's features.
for feature := range node.Features.Features() {
err = db.InsertNodeFeature(ctx, sqlc.InsertNodeFeatureParams{
NodeID: nodeID,
FeatureBit: int32(feature),
})
if err != nil {
return 0, fmt.Errorf("unable to insert node(%d) "+
"feature(%v): %w", nodeID, feature, err)
}
}
// Update the node's addresses.
err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
newAddresses, err := collectAddressRecords(node.Addresses)
if err != nil {
return 0, fmt.Errorf("inserting node addresses: %w", err)
return 0, err
}
// Any remaining entries in newAddresses are new addresses that need to
// be added to the database for the first time.
for addrType, addrList := range newAddresses {
for position, addr := range addrList {
err := db.InsertNodeAddress(
ctx, sqlc.InsertNodeAddressParams{
NodeID: nodeID,
Type: int16(addrType),
Address: addr,
Position: int32(position),
},
)
if err != nil {
return 0, fmt.Errorf("unable to insert "+
"node(%d) address(%v): %w", nodeID,
addr, err)
}
}
}
// Convert the flat extra opaque data into a map of TLV types to
@@ -1442,10 +1462,19 @@ func insertNodeSQLMig(ctx context.Context, db SQLQueries,
err)
}
// Update the node's extra signed fields.
err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
if err != nil {
return 0, fmt.Errorf("inserting node extra TLVs: %w", err)
// Insert the node's extra signed fields.
for tlvType, value := range extra {
err = db.UpsertNodeExtraType(
ctx, sqlc.UpsertNodeExtraTypeParams{
NodeID: nodeID,
Type: int64(tlvType),
Value: value,
},
)
if err != nil {
return 0, fmt.Errorf("unable to upsert node(%d) extra "+
"signed field(%v): %w", nodeID, tlvType, err)
}
}
return nodeID, nil

View File

@@ -3534,23 +3534,11 @@ const (
addressTypeOpaque dbAddressType = math.MaxInt8
)
// upsertNodeAddresses updates the node's addresses in the database. This
// includes deleting any existing addresses and inserting the new set of
// addresses. The deletion is necessary since the ordering of the addresses may
// change, and we need to ensure that the database reflects the latest set of
// addresses so that at the time of reconstructing the node announcement, the
// order is preserved and the signature over the message remains valid.
func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64,
addresses []net.Addr) error {
// Delete any existing addresses for the node. This is required since
// even if the new set of addresses is the same, the ordering may have
// changed for a given address type.
err := db.DeleteNodeAddresses(ctx, nodeID)
if err != nil {
return fmt.Errorf("unable to delete node(%d) addresses: %w",
nodeID, err)
}
// collectAddressRecords collects the addresses from the provided
// net.Addr slice and returns a map of dbAddressType to a slice of address
// strings.
func collectAddressRecords(addresses []net.Addr) (map[dbAddressType][]string,
error) {
// Copy the nodes latest set of addresses.
newAddresses := map[dbAddressType][]string{
@@ -3573,8 +3561,8 @@ func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64,
} else if ip6 := addr.IP.To16(); ip6 != nil {
addAddr(addressTypeIPv6, addr)
} else {
return fmt.Errorf("unhandled IP address: %v",
addr)
return nil, fmt.Errorf("unhandled IP "+
"address: %v", addr)
}
case *tor.OnionAddr:
@@ -3584,8 +3572,8 @@ func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64,
case tor.V3Len:
addAddr(addressTypeTorV3, addr)
default:
return fmt.Errorf("invalid length for a tor " +
"address")
return nil, fmt.Errorf("invalid length for " +
"a tor address")
}
case *lnwire.DNSAddress:
@@ -3595,10 +3583,37 @@ func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64,
addAddr(addressTypeOpaque, addr)
default:
return fmt.Errorf("unhandled address type: %T", addr)
return nil, fmt.Errorf("unhandled address type: %T",
addr)
}
}
return newAddresses, nil
}
// upsertNodeAddresses updates the node's addresses in the database. This
// includes deleting any existing addresses and inserting the new set of
// addresses. The deletion is necessary since the ordering of the addresses may
// change, and we need to ensure that the database reflects the latest set of
// addresses so that at the time of reconstructing the node announcement, the
// order is preserved and the signature over the message remains valid.
func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64,
addresses []net.Addr) error {
// Delete any existing addresses for the node. This is required since
// even if the new set of addresses is the same, the ordering may have
// changed for a given address type.
err := db.DeleteNodeAddresses(ctx, nodeID)
if err != nil {
return fmt.Errorf("unable to delete node(%d) addresses: %w",
nodeID, err)
}
newAddresses, err := collectAddressRecords(addresses)
if err != nil {
return err
}
// Any remaining entries in newAddresses are new addresses that need to
// be added to the database for the first time.
for addrType, addrList := range newAddresses {

View File

@@ -2402,7 +2402,8 @@ INSERT INTO graph_node_addresses (
position
) VALUES (
$1, $2, $3, $4
)
) ON CONFLICT (node_id, type, position)
DO UPDATE SET address = EXCLUDED.address
`
type InsertNodeAddressParams struct {
@@ -2432,7 +2433,9 @@ INSERT INTO graph_node_features (
node_id, feature_bit
) VALUES (
$1, $2
)
) ON CONFLICT (node_id, feature_bit)
-- Do nothing if the feature already exists for the node.
DO NOTHING
`
type InsertNodeFeatureParams struct {

View File

@@ -105,7 +105,9 @@ INSERT INTO graph_node_features (
node_id, feature_bit
) VALUES (
$1, $2
);
) ON CONFLICT (node_id, feature_bit)
-- Do nothing if the feature already exists for the node.
DO NOTHING;
-- name: GetNodeFeatures :many
SELECT *
@@ -143,7 +145,8 @@ INSERT INTO graph_node_addresses (
position
) VALUES (
$1, $2, $3, $4
);
) ON CONFLICT (node_id, type, position)
DO UPDATE SET address = EXCLUDED.address;
-- name: GetNodeAddresses :many
SELECT type, address