diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index 30e516a6a..b88518fd8 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -440,7 +440,9 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig, Interval: 10 * time.Second, } ) - migChanPolicy := func(policy *models.ChannelEdgePolicy) error { + migChanPolicy := func(dbChanInfo *dbChanInfo, + policy *models.ChannelEdgePolicy) error { + // If the policy is nil, we can skip it. if policy == nil { return nil @@ -454,7 +456,7 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig, policyCount++ - _, _, _, err := insertChanEdgePolicyMig(ctx, sqlDB, policy) + err := insertChanEdgePolicyMig(ctx, sqlDB, dbChanInfo, policy) if err != nil { return fmt.Errorf("could not migrate channel "+ "policy %d: %w", policy.ChannelID, err) @@ -531,12 +533,12 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig, } // Now, migrate the two channel policies for the channel. - err = migChanPolicy(policy1) + err = migChanPolicy(dbChanInfo, policy1) if err != nil { return fmt.Errorf("could not migrate policy1(%d): %w", scid, err) } - err = migChanPolicy(policy2) + err = migChanPolicy(dbChanInfo, policy2) if err != nil { return fmt.Errorf("could not migrate policy2(%d): %w", scid, err) @@ -1564,45 +1566,14 @@ func insertChannelMig(ctx context.Context, db SQLQueries, // insertChanEdgePolicyMig inserts the channel policy info we have stored for // a channel we already know of. This is used during the SQL migration // process to insert channel policies. -// -// TODO(elle): update this function to be more performant in the migration -// setting. For the sake of keeping the commit that introduced this function -// simple, this is for now mostly the same as updateChanEdgePolicy. func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries, - edge *models.ChannelEdgePolicy) (route.Vertex, route.Vertex, bool, - error) { - - var ( - node1Pub, node2Pub route.Vertex - isNode1 bool - chanIDB = channelIDToBytes(edge.ChannelID) - ) - - // Check that this edge policy refers to a channel that we already - // know of. We do this explicitly so that we can return the appropriate - // ErrEdgeNotFound error if the channel doesn't exist, rather than - // abort the transaction which would abort the entire batch. - dbChan, err := tx.GetChannelAndNodesBySCID( - ctx, sqlc.GetChannelAndNodesBySCIDParams{ - Scid: chanIDB, - Version: int16(ProtocolV1), - }, - ) - if errors.Is(err, sql.ErrNoRows) { - return node1Pub, node2Pub, false, ErrEdgeNotFound - } else if err != nil { - return node1Pub, node2Pub, false, fmt.Errorf("unable to "+ - "fetch channel(%v): %w", edge.ChannelID, err) - } - - copy(node1Pub[:], dbChan.Node1PubKey) - copy(node2Pub[:], dbChan.Node2PubKey) + dbChan *dbChanInfo, edge *models.ChannelEdgePolicy) error { // Figure out which node this edge is from. - isNode1 = edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 - nodeID := dbChan.NodeID1 + isNode1 := edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 + nodeID := dbChan.node1ID if !isNode1 { - nodeID = dbChan.NodeID2 + nodeID = dbChan.node2ID } var ( @@ -1616,7 +1587,7 @@ func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries, id, err := tx.InsertEdgePolicyMig(ctx, sqlc.InsertEdgePolicyMigParams{ Version: int16(ProtocolV1), - ChannelID: dbChan.ID, + ChannelID: dbChan.channelID, NodeID: nodeID, Timelock: int32(edge.TimeLockDelta), FeePpm: int64(edge.FeeProportionalMillionths), @@ -1638,24 +1609,32 @@ func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries, Signature: edge.SigBytes, }) if err != nil { - return node1Pub, node2Pub, isNode1, - fmt.Errorf("unable to upsert edge policy: %w", err) + return fmt.Errorf("unable to upsert edge policy: %w", err) } // Convert the flat extra opaque data into a map of TLV types to // values. extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData) if err != nil { - return node1Pub, node2Pub, false, fmt.Errorf("unable to "+ - "marshal extra opaque data: %w", err) + return fmt.Errorf("unable to marshal extra opaque data: %w", + err) } - // Update the channel policy's extra signed fields. - err = upsertChanPolicyExtraSignedFields(ctx, tx, id, extra) - if err != nil { - return node1Pub, node2Pub, false, fmt.Errorf("inserting chan "+ - "policy extra TLVs: %w", err) + // Insert all new extra signed fields for the channel policy. + for tlvType, value := range extra { + err = tx.UpsertChanPolicyExtraType( + ctx, sqlc.UpsertChanPolicyExtraTypeParams{ + ChannelPolicyID: id, + Type: int64(tlvType), + Value: value, + }, + ) + if err != nil { + return fmt.Errorf("unable to insert "+ + "channel_policy(%d) extra signed field(%v): %w", + id, tlvType, err) + } } - return node1Pub, node2Pub, isNode1, nil + return nil } diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 60871e17b..1789c70c4 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -124,7 +124,7 @@ type SQLQueries interface { GetChannelPolicyByChannelAndNode(ctx context.Context, arg sqlc.GetChannelPolicyByChannelAndNodeParams) (sqlc.GraphChannelPolicy, error) GetV1DisabledSCIDs(ctx context.Context) ([][]byte, error) - InsertChanPolicyExtraType(ctx context.Context, arg sqlc.InsertChanPolicyExtraTypeParams) error + UpsertChanPolicyExtraType(ctx context.Context, arg sqlc.UpsertChanPolicyExtraTypeParams) error GetChannelPolicyExtraTypesBatch(ctx context.Context, policyIds []int64) ([]sqlc.GetChannelPolicyExtraTypesBatchRow, error) DeleteChannelPolicyExtraTypes(ctx context.Context, channelPolicyID int64) error @@ -3934,8 +3934,8 @@ func upsertChanPolicyExtraSignedFields(ctx context.Context, db SQLQueries, // Insert all new extra signed fields for the channel policy. for tlvType, value := range extraFields { - err = db.InsertChanPolicyExtraType( - ctx, sqlc.InsertChanPolicyExtraTypeParams{ + err = db.UpsertChanPolicyExtraType( + ctx, sqlc.UpsertChanPolicyExtraTypeParams{ ChannelPolicyID: chanPolicyID, Type: int64(tlvType), Value: value, diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index bdc049d85..4c2be19ea 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -2304,29 +2304,6 @@ func (q *Queries) HighestSCID(ctx context.Context, version int16) ([]byte, error return scid, err } -const insertChanPolicyExtraType = `-- name: InsertChanPolicyExtraType :exec -/* ───────────────────────────────────────────── - graph_channel_policy_extra_types table queries - ───────────────────────────────────────────── -*/ - -INSERT INTO graph_channel_policy_extra_types ( - channel_policy_id, type, value -) -VALUES ($1, $2, $3) -` - -type InsertChanPolicyExtraTypeParams struct { - ChannelPolicyID int64 - Type int64 - Value []byte -} - -func (q *Queries) InsertChanPolicyExtraType(ctx context.Context, arg InsertChanPolicyExtraTypeParams) error { - _, err := q.db.ExecContext(ctx, insertChanPolicyExtraType, arg.ChannelPolicyID, arg.Type, arg.Value) - return err -} - const insertChannelFeature = `-- name: InsertChannelFeature :exec /* ───────────────────────────────────────────── graph_channel_features table queries @@ -3429,6 +3406,33 @@ func (q *Queries) ListNodesPaginated(ctx context.Context, arg ListNodesPaginated return items, nil } +const upsertChanPolicyExtraType = `-- name: UpsertChanPolicyExtraType :exec +/* ───────────────────────────────────────────── + graph_channel_policy_extra_types table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO graph_channel_policy_extra_types ( + channel_policy_id, type, value +) +VALUES ($1, $2, $3) +ON CONFLICT (channel_policy_id, type) + -- If a conflict occurs on channel_policy_id and type, then we update the + -- value. + DO UPDATE SET value = EXCLUDED.value +` + +type UpsertChanPolicyExtraTypeParams struct { + ChannelPolicyID int64 + Type int64 + Value []byte +} + +func (q *Queries) UpsertChanPolicyExtraType(ctx context.Context, arg UpsertChanPolicyExtraTypeParams) error { + _, err := q.db.ExecContext(ctx, upsertChanPolicyExtraType, arg.ChannelPolicyID, arg.Type, arg.Value) + return err +} + const upsertChannelExtraType = `-- name: UpsertChannelExtraType :exec /* ───────────────────────────────────────────── graph_channel_extra_types table queries diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 6d38597a4..0087559be 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -87,7 +87,6 @@ type Querier interface { HighestSCID(ctx context.Context, version int16) ([]byte, error) InsertAMPSubInvoice(ctx context.Context, arg InsertAMPSubInvoiceParams) error InsertAMPSubInvoiceHTLC(ctx context.Context, arg InsertAMPSubInvoiceHTLCParams) error - InsertChanPolicyExtraType(ctx context.Context, arg InsertChanPolicyExtraTypeParams) error InsertChannelFeature(ctx context.Context, arg InsertChannelFeatureParams) error // NOTE: This query is only meant to be used by the graph SQL migration since // for that migration, in order to be retry-safe, we don't want to error out if @@ -141,6 +140,7 @@ type Querier interface { UpdateInvoiceHTLCs(ctx context.Context, arg UpdateInvoiceHTLCsParams) error UpdateInvoiceState(ctx context.Context, arg UpdateInvoiceStateParams) (sql.Result, error) UpsertAMPSubInvoice(ctx context.Context, arg UpsertAMPSubInvoiceParams) (sql.Result, error) + UpsertChanPolicyExtraType(ctx context.Context, arg UpsertChanPolicyExtraTypeParams) error UpsertChannelExtraType(ctx context.Context, arg UpsertChannelExtraTypeParams) error UpsertEdgePolicy(ctx context.Context, arg UpsertEdgePolicyParams) (int64, error) UpsertNode(ctx context.Context, arg UpsertNodeParams) (int64, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 7f12cd0c5..f2224c00f 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -869,11 +869,15 @@ WHERE c.scid = @scid ───────────────────────────────────────────── */ --- name: InsertChanPolicyExtraType :exec +-- name: UpsertChanPolicyExtraType :exec INSERT INTO graph_channel_policy_extra_types ( channel_policy_id, type, value ) -VALUES ($1, $2, $3); +VALUES ($1, $2, $3) +ON CONFLICT (channel_policy_id, type) + -- If a conflict occurs on channel_policy_id and type, then we update the + -- value. + DO UPDATE SET value = EXCLUDED.value; -- name: GetChannelPolicyExtraTypesBatch :many SELECT