diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index ae45aa9f5..2f4ede4f8 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -1363,9 +1363,6 @@ func forEachClosedSCID(db kvdb.Backend, // update have a newer timestamp than the existing one. This is because we want // the migration to be idempotent and dont want to error out if we re-insert the // exact same node. -// -// TODO(elle): update the upsert calls in this function to be more efficient. -// since no data collection steps should be required during the migration. func insertNodeSQLMig(ctx context.Context, db SQLQueries, node *models.LightningNode) (int64, error) { @@ -1392,19 +1389,42 @@ func insertNodeSQLMig(ctx context.Context, db SQLQueries, return nodeID, nil } - // NOTE: The upserts here will be updated to be more efficient in the - // following commits. - - // Update the node's features. - err = upsertNodeFeatures(ctx, db, nodeID, node.Features) - if err != nil { - return 0, fmt.Errorf("inserting node features: %w", err) + // Insert the node's features. + for feature := range node.Features.Features() { + err = db.InsertNodeFeature(ctx, sqlc.InsertNodeFeatureParams{ + NodeID: nodeID, + FeatureBit: int32(feature), + }) + if err != nil { + return 0, fmt.Errorf("unable to insert node(%d) "+ + "feature(%v): %w", nodeID, feature, err) + } } // Update the node's addresses. - err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses) + newAddresses, err := collectAddressRecords(node.Addresses) if err != nil { - return 0, fmt.Errorf("inserting node addresses: %w", err) + return 0, err + } + + // Any remaining entries in newAddresses are new addresses that need to + // be added to the database for the first time. + for addrType, addrList := range newAddresses { + for position, addr := range addrList { + err := db.UpsertNodeAddress( + ctx, sqlc.UpsertNodeAddressParams{ + NodeID: nodeID, + Type: int16(addrType), + Address: addr, + Position: int32(position), + }, + ) + if err != nil { + return 0, fmt.Errorf("unable to insert "+ + "node(%d) address(%v): %w", nodeID, + addr, err) + } + } } // Convert the flat extra opaque data into a map of TLV types to @@ -1415,10 +1435,19 @@ func insertNodeSQLMig(ctx context.Context, db SQLQueries, err) } - // Update the node's extra signed fields. - err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra) - if err != nil { - return 0, fmt.Errorf("inserting node extra TLVs: %w", err) + // Insert the node's extra signed fields. + for tlvType, value := range extra { + err = db.UpsertNodeExtraType( + ctx, sqlc.UpsertNodeExtraTypeParams{ + NodeID: nodeID, + Type: int64(tlvType), + Value: value, + }, + ) + if err != nil { + return 0, fmt.Errorf("unable to upsert node(%d) extra "+ + "signed field(%v): %w", nodeID, tlvType, err) + } } return nodeID, nil diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 449c4f35d..ef903ec67 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -70,7 +70,7 @@ type SQLQueries interface { UpsertNodeExtraType(ctx context.Context, arg sqlc.UpsertNodeExtraTypeParams) error DeleteExtraNodeType(ctx context.Context, arg sqlc.DeleteExtraNodeTypeParams) error - InsertNodeAddress(ctx context.Context, arg sqlc.InsertNodeAddressParams) error + UpsertNodeAddress(ctx context.Context, arg sqlc.UpsertNodeAddressParams) error GetNodeAddresses(ctx context.Context, nodeID int64) ([]sqlc.GetNodeAddressesRow, error) GetNodeAddressesBatch(ctx context.Context, ids []int64) ([]sqlc.GraphNodeAddress, error) DeleteNodeAddresses(ctx context.Context, nodeID int64) error @@ -3533,23 +3533,11 @@ const ( addressTypeOpaque dbAddressType = math.MaxInt8 ) -// upsertNodeAddresses updates the node's addresses in the database. This -// includes deleting any existing addresses and inserting the new set of -// addresses. The deletion is necessary since the ordering of the addresses may -// change, and we need to ensure that the database reflects the latest set of -// addresses so that at the time of reconstructing the node announcement, the -// order is preserved and the signature over the message remains valid. -func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64, - addresses []net.Addr) error { - - // Delete any existing addresses for the node. This is required since - // even if the new set of addresses is the same, the ordering may have - // changed for a given address type. - err := db.DeleteNodeAddresses(ctx, nodeID) - if err != nil { - return fmt.Errorf("unable to delete node(%d) addresses: %w", - nodeID, err) - } +// collectAddressRecords collects the addresses from the provided +// net.Addr slice and returns a map of dbAddressType to a slice of address +// strings. +func collectAddressRecords(addresses []net.Addr) (map[dbAddressType][]string, + error) { // Copy the nodes latest set of addresses. newAddresses := map[dbAddressType][]string{ @@ -3571,8 +3559,8 @@ func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64, } else if ip6 := addr.IP.To16(); ip6 != nil { addAddr(addressTypeIPv6, addr) } else { - return fmt.Errorf("unhandled IP address: %v", - addr) + return nil, fmt.Errorf("unhandled IP "+ + "address: %v", addr) } case *tor.OnionAddr: @@ -3582,24 +3570,51 @@ func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64, case tor.V3Len: addAddr(addressTypeTorV3, addr) default: - return fmt.Errorf("invalid length for a tor " + - "address") + return nil, fmt.Errorf("invalid length for " + + "a tor address") } case *lnwire.OpaqueAddrs: addAddr(addressTypeOpaque, addr) default: - return fmt.Errorf("unhandled address type: %T", addr) + return nil, fmt.Errorf("unhandled address type: %T", + addr) } } + return newAddresses, nil +} + +// upsertNodeAddresses updates the node's addresses in the database. This +// includes deleting any existing addresses and inserting the new set of +// addresses. The deletion is necessary since the ordering of the addresses may +// change, and we need to ensure that the database reflects the latest set of +// addresses so that at the time of reconstructing the node announcement, the +// order is preserved and the signature over the message remains valid. +func upsertNodeAddresses(ctx context.Context, db SQLQueries, nodeID int64, + addresses []net.Addr) error { + + // Delete any existing addresses for the node. This is required since + // even if the new set of addresses is the same, the ordering may have + // changed for a given address type. + err := db.DeleteNodeAddresses(ctx, nodeID) + if err != nil { + return fmt.Errorf("unable to delete node(%d) addresses: %w", + nodeID, err) + } + + newAddresses, err := collectAddressRecords(addresses) + if err != nil { + return err + } + // Any remaining entries in newAddresses are new addresses that need to // be added to the database for the first time. for addrType, addrList := range newAddresses { for position, addr := range addrList { - err := db.InsertNodeAddress( - ctx, sqlc.InsertNodeAddressParams{ + err := db.UpsertNodeAddress( + ctx, sqlc.UpsertNodeAddressParams{ NodeID: nodeID, Type: int16(addrType), Address: addr, diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 7afe8b1b0..ecf100e44 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -2389,39 +2389,6 @@ func (q *Queries) InsertClosedChannel(ctx context.Context, scid []byte) error { return err } -const insertNodeAddress = `-- name: InsertNodeAddress :exec -/* ───────────────────────────────────────────── - graph_node_addresses table queries - ───────────────────────────────────��───────── -*/ - -INSERT INTO graph_node_addresses ( - node_id, - type, - address, - position -) VALUES ( - $1, $2, $3, $4 - ) -` - -type InsertNodeAddressParams struct { - NodeID int64 - Type int16 - Address string - Position int32 -} - -func (q *Queries) InsertNodeAddress(ctx context.Context, arg InsertNodeAddressParams) error { - _, err := q.db.ExecContext(ctx, insertNodeAddress, - arg.NodeID, - arg.Type, - arg.Address, - arg.Position, - ) - return err -} - const insertNodeFeature = `-- name: InsertNodeFeature :exec /* ───────────────────────────────────────────── graph_node_features table queries @@ -2432,7 +2399,9 @@ INSERT INTO graph_node_features ( node_id, feature_bit ) VALUES ( $1, $2 -) +) ON CONFLICT (node_id, feature_bit) + -- Do nothing if the feature already exists for the node. + DO NOTHING ` type InsertNodeFeatureParams struct { @@ -3462,6 +3431,40 @@ func (q *Queries) UpsertNode(ctx context.Context, arg UpsertNodeParams) (int64, return id, err } +const upsertNodeAddress = `-- name: UpsertNodeAddress :exec +/* ───────────────────────────────────────────── + graph_node_addresses table queries + ───────────────────────────────────��───────── +*/ + +INSERT INTO graph_node_addresses ( + node_id, + type, + address, + position +) VALUES ( + $1, $2, $3, $4 +) ON CONFLICT (node_id, type, position) + DO UPDATE SET address = EXCLUDED.address +` + +type UpsertNodeAddressParams struct { + NodeID int64 + Type int16 + Address string + Position int32 +} + +func (q *Queries) UpsertNodeAddress(ctx context.Context, arg UpsertNodeAddressParams) error { + _, err := q.db.ExecContext(ctx, upsertNodeAddress, + arg.NodeID, + arg.Type, + arg.Address, + arg.Position, + ) + return err +} + const upsertNodeExtraType = `-- name: UpsertNodeExtraType :exec /* ───────────────────────────────────────────── graph_node_extra_types table queries diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 7696e3651..1123d6bc2 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -97,7 +97,6 @@ type Querier interface { InsertInvoiceHTLCCustomRecord(ctx context.Context, arg InsertInvoiceHTLCCustomRecordParams) error InsertKVInvoiceKeyAndAddIndex(ctx context.Context, arg InsertKVInvoiceKeyAndAddIndexParams) error InsertMigratedInvoice(ctx context.Context, arg InsertMigratedInvoiceParams) (int64, error) - InsertNodeAddress(ctx context.Context, arg InsertNodeAddressParams) error InsertNodeFeature(ctx context.Context, arg InsertNodeFeatureParams) error // NOTE: This query is only meant to be used by the graph SQL migration since // for that migration, in order to be retry-safe, we don't want to error out if @@ -133,6 +132,7 @@ type Querier interface { UpsertAMPSubInvoice(ctx context.Context, arg UpsertAMPSubInvoiceParams) (sql.Result, error) UpsertEdgePolicy(ctx context.Context, arg UpsertEdgePolicyParams) (int64, error) UpsertNode(ctx context.Context, arg UpsertNodeParams) (int64, error) + UpsertNodeAddress(ctx context.Context, arg UpsertNodeAddressParams) error UpsertNodeExtraType(ctx context.Context, arg UpsertNodeExtraTypeParams) error UpsertPruneLogEntry(ctx context.Context, arg UpsertPruneLogEntryParams) error UpsertZombieChannel(ctx context.Context, arg UpsertZombieChannelParams) error diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 82e9b8736..8a897003d 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -105,7 +105,9 @@ INSERT INTO graph_node_features ( node_id, feature_bit ) VALUES ( $1, $2 -); +) ON CONFLICT (node_id, feature_bit) + -- Do nothing if the feature already exists for the node. + DO NOTHING; -- name: GetNodeFeatures :many SELECT * @@ -135,7 +137,7 @@ WHERE node_id = $1 ───────────────────────────────────��───────── */ --- name: InsertNodeAddress :exec +-- name: UpsertNodeAddress :exec INSERT INTO graph_node_addresses ( node_id, type, @@ -143,7 +145,8 @@ INSERT INTO graph_node_addresses ( position ) VALUES ( $1, $2, $3, $4 - ); +) ON CONFLICT (node_id, type, position) + DO UPDATE SET address = EXCLUDED.address; -- name: GetNodeAddresses :many SELECT type, address