From d9754c6e3a94ef1081a75fd917a2f0772e2df48f Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 15 Aug 2025 09:43:40 +0200 Subject: [PATCH] graph/db+sqldb: improve efficiency of node migration There is no need to use the "collect-then-update" pattern for node insertion during the SQL migration since if we do have any previously persisted data for the node and happen to re-run the insertion for that node, the data will be exactly the same. So we can make use of "On conflict, no nothing" here too. --- graph/db/sql_migration.go | 61 ++++++++++++++++++++++++++---------- graph/db/sql_store.go | 59 +++++++++++++++++++++------------- sqldb/sqlc/graph.sql.go | 7 +++-- sqldb/sqlc/queries/graph.sql | 7 +++-- 4 files changed, 92 insertions(+), 42 deletions(-) diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index ec1c27c98..760ad8045 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -1390,9 +1390,6 @@ func forEachClosedSCID(db kvdb.Backend, // update have a newer timestamp than the existing one. This is because we want // the migration to be idempotent and dont want to error out if we re-insert the // exact same node. -// -// TODO(elle): update the upsert calls in this function to be more efficient. -// since no data collection steps should be required during the migration. func insertNodeSQLMig(ctx context.Context, db SQLQueries, node *models.LightningNode) (int64, error) { @@ -1419,19 +1416,42 @@ func insertNodeSQLMig(ctx context.Context, db SQLQueries, return nodeID, nil } - // NOTE: The upserts here will be updated to be more efficient in the - // following commits. - - // Update the node's features. - err = upsertNodeFeatures(ctx, db, nodeID, node.Features) - if err != nil { - return 0, fmt.Errorf("inserting node features: %w", err) + // Insert the node's features. + for feature := range node.Features.Features() { + err = db.InsertNodeFeature(ctx, sqlc.InsertNodeFeatureParams{ + NodeID: nodeID, + FeatureBit: int32(feature), + }) + if err != nil { + return 0, fmt.Errorf("unable to insert node(%d) "+ + "feature(%v): %w", nodeID, feature, err) + } } // Update the node's addresses. - err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses) + newAddresses, err := collectAddressRecords(node.Addresses) if err != nil { - return 0, fmt.Errorf("inserting node addresses: %w", err) + return 0, err + } + + // Any remaining entries in newAddresses are new addresses that need to + // be added to the database for the first time. + for addrType, addrList := range newAddresses { + for position, addr := range addrList { + err := db.InsertNodeAddress( + ctx, sqlc.InsertNodeAddressParams{ + NodeID: nodeID, + Type: int16(addrType), + Address: addr, + Position: int32(position), + }, + ) + if err != nil { + return 0, fmt.Errorf("unable to insert "+ + "node(%d) address(%v): %w", nodeID, + addr, err) + } + } } // Convert the flat extra opaque data into a map of TLV types to @@ -1442,10 +1462,19 @@ func insertNodeSQLMig(ctx context.Context, db SQLQueries, err) } - // Update the node's extra signed fields. - err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra) - if err != nil { - return 0, fmt.Errorf("inserting node extra TLVs: %w", err) + // Insert the node's extra signed fields. + for tlvType, value := range extra { + err = db.UpsertNodeExtraType( + ctx, sqlc.UpsertNodeExtraTypeParams{ + NodeID: nodeID, + Type: int64(tlvType), + Value: value, + }, + ) + if err != nil { + return 0, fmt.Errorf("unable to upsert node(%d) extra "+ + "signed field(%v): %w", nodeID, tlvType, err) + } } return nodeID, nil diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 5dff752f2..801634d31 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -3534,23 +3534,11 @@ const ( addressTypeOpaque dbAddressType = math.MaxInt8 ) -// upsertNodeAddresses updates the node's addresses in the database. This -// includes deleting any existing addresses and inserting the new set of -// addresses. The deletion is necessary since the ordering of the addresses may -// change, and we need to ensure that the database reflects the latest set of -// addresses so that at the time of reconstructing the node announcement, the -// order is preserved and the signature over the message remains valid. -func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64, - addresses []net.Addr) error { - - // Delete any existing addresses for the node. This is required since - // even if the new set of addresses is the same, the ordering may have - // changed for a given address type. - err := db.DeleteNodeAddresses(ctx, nodeID) - if err != nil { - return fmt.Errorf("unable to delete node(%d) addresses: %w", - nodeID, err) - } +// collectAddressRecords collects the addresses from the provided +// net.Addr slice and returns a map of dbAddressType to a slice of address +// strings. +func collectAddressRecords(addresses []net.Addr) (map[dbAddressType][]string, + error) { // Copy the nodes latest set of addresses. newAddresses := map[dbAddressType][]string{ @@ -3573,8 +3561,8 @@ func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64, } else if ip6 := addr.IP.To16(); ip6 != nil { addAddr(addressTypeIPv6, addr) } else { - return fmt.Errorf("unhandled IP address: %v", - addr) + return nil, fmt.Errorf("unhandled IP "+ + "address: %v", addr) } case *tor.OnionAddr: @@ -3584,8 +3572,8 @@ func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64, case tor.V3Len: addAddr(addressTypeTorV3, addr) default: - return fmt.Errorf("invalid length for a tor " + - "address") + return nil, fmt.Errorf("invalid length for " + + "a tor address") } case *lnwire.DNSAddress: @@ -3595,10 +3583,37 @@ func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64, addAddr(addressTypeOpaque, addr) default: - return fmt.Errorf("unhandled address type: %T", addr) + return nil, fmt.Errorf("unhandled address type: %T", + addr) } } + return newAddresses, nil +} + +// upsertNodeAddresses updates the node's addresses in the database. This +// includes deleting any existing addresses and inserting the new set of +// addresses. The deletion is necessary since the ordering of the addresses may +// change, and we need to ensure that the database reflects the latest set of +// addresses so that at the time of reconstructing the node announcement, the +// order is preserved and the signature over the message remains valid. +func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64, + addresses []net.Addr) error { + + // Delete any existing addresses for the node. This is required since + // even if the new set of addresses is the same, the ordering may have + // changed for a given address type. + err := db.DeleteNodeAddresses(ctx, nodeID) + if err != nil { + return fmt.Errorf("unable to delete node(%d) addresses: %w", + nodeID, err) + } + + newAddresses, err := collectAddressRecords(addresses) + if err != nil { + return err + } + // Any remaining entries in newAddresses are new addresses that need to // be added to the database for the first time. for addrType, addrList := range newAddresses { diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 7afe8b1b0..d8152ced8 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -2402,7 +2402,8 @@ INSERT INTO graph_node_addresses ( position ) VALUES ( $1, $2, $3, $4 - ) +) ON CONFLICT (node_id, type, position) + DO UPDATE SET address = EXCLUDED.address ` type InsertNodeAddressParams struct { @@ -2432,7 +2433,9 @@ INSERT INTO graph_node_features ( node_id, feature_bit ) VALUES ( $1, $2 -) +) ON CONFLICT (node_id, feature_bit) + -- Do nothing if the feature already exists for the node. + DO NOTHING ` type InsertNodeFeatureParams struct { diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 82e9b8736..c5ebcc0a4 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -105,7 +105,9 @@ INSERT INTO graph_node_features ( node_id, feature_bit ) VALUES ( $1, $2 -); +) ON CONFLICT (node_id, feature_bit) + -- Do nothing if the feature already exists for the node. + DO NOTHING; -- name: GetNodeFeatures :many SELECT * @@ -143,7 +145,8 @@ INSERT INTO graph_node_addresses ( position ) VALUES ( $1, $2, $3, $4 - ); +) ON CONFLICT (node_id, type, position) + DO UPDATE SET address = EXCLUDED.address; -- name: GetNodeAddresses :many SELECT type, address