graph/db+sqldb: improve performance of chan update sql migration

This commit simplifies insertChanEdgePolicyMig. Much of the logic can be
removed given that this method is only used in the context of the graph
SQL migration.

This should improve the performance of the migration quite a lot since
it removes the extra GetChannelAndNodesBySCID call.
This commit is contained in:
Elle Mouton
2025-08-15 10:30:01 +02:00
parent 22bf88e900
commit f2ed5564ef
5 changed files with 66 additions and 79 deletions

View File

@@ -440,7 +440,9 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
Interval: 10 * time.Second,
}
)
migChanPolicy := func(policy *models.ChannelEdgePolicy) error {
migChanPolicy := func(dbChanInfo *dbChanInfo,
policy *models.ChannelEdgePolicy) error {
// If the policy is nil, we can skip it.
if policy == nil {
return nil
@@ -454,7 +456,7 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
policyCount++
_, _, _, err := insertChanEdgePolicyMig(ctx, sqlDB, policy)
err := insertChanEdgePolicyMig(ctx, sqlDB, dbChanInfo, policy)
if err != nil {
return fmt.Errorf("could not migrate channel "+
"policy %d: %w", policy.ChannelID, err)
@@ -531,12 +533,12 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
}
// Now, migrate the two channel policies for the channel.
err = migChanPolicy(policy1)
err = migChanPolicy(dbChanInfo, policy1)
if err != nil {
return fmt.Errorf("could not migrate policy1(%d): %w",
scid, err)
}
err = migChanPolicy(policy2)
err = migChanPolicy(dbChanInfo, policy2)
if err != nil {
return fmt.Errorf("could not migrate policy2(%d): %w",
scid, err)
@@ -1564,45 +1566,14 @@ func insertChannelMig(ctx context.Context, db SQLQueries,
// insertChanEdgePolicyMig inserts the channel policy info we have stored for
// a channel we already know of. This is used during the SQL migration
// process to insert channel policies.
//
// TODO(elle): update this function to be more performant in the migration
// setting. For the sake of keeping the commit that introduced this function
// simple, this is for now mostly the same as updateChanEdgePolicy.
func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries,
edge *models.ChannelEdgePolicy) (route.Vertex, route.Vertex, bool,
error) {
var (
node1Pub, node2Pub route.Vertex
isNode1 bool
chanIDB = channelIDToBytes(edge.ChannelID)
)
// Check that this edge policy refers to a channel that we already
// know of. We do this explicitly so that we can return the appropriate
// ErrEdgeNotFound error if the channel doesn't exist, rather than
// abort the transaction which would abort the entire batch.
dbChan, err := tx.GetChannelAndNodesBySCID(
ctx, sqlc.GetChannelAndNodesBySCIDParams{
Scid: chanIDB,
Version: int16(ProtocolV1),
},
)
if errors.Is(err, sql.ErrNoRows) {
return node1Pub, node2Pub, false, ErrEdgeNotFound
} else if err != nil {
return node1Pub, node2Pub, false, fmt.Errorf("unable to "+
"fetch channel(%v): %w", edge.ChannelID, err)
}
copy(node1Pub[:], dbChan.Node1PubKey)
copy(node2Pub[:], dbChan.Node2PubKey)
dbChan *dbChanInfo, edge *models.ChannelEdgePolicy) error {
// Figure out which node this edge is from.
isNode1 = edge.ChannelFlags&lnwire.ChanUpdateDirection == 0
nodeID := dbChan.NodeID1
isNode1 := edge.ChannelFlags&lnwire.ChanUpdateDirection == 0
nodeID := dbChan.node1ID
if !isNode1 {
nodeID = dbChan.NodeID2
nodeID = dbChan.node2ID
}
var (
@@ -1616,7 +1587,7 @@ func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries,
id, err := tx.InsertEdgePolicyMig(ctx, sqlc.InsertEdgePolicyMigParams{
Version: int16(ProtocolV1),
ChannelID: dbChan.ID,
ChannelID: dbChan.channelID,
NodeID: nodeID,
Timelock: int32(edge.TimeLockDelta),
FeePpm: int64(edge.FeeProportionalMillionths),
@@ -1638,24 +1609,32 @@ func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries,
Signature: edge.SigBytes,
})
if err != nil {
return node1Pub, node2Pub, isNode1,
fmt.Errorf("unable to upsert edge policy: %w", err)
return fmt.Errorf("unable to upsert edge policy: %w", err)
}
// Convert the flat extra opaque data into a map of TLV types to
// values.
extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
if err != nil {
return node1Pub, node2Pub, false, fmt.Errorf("unable to "+
"marshal extra opaque data: %w", err)
return fmt.Errorf("unable to marshal extra opaque data: %w",
err)
}
// Update the channel policy's extra signed fields.
err = upsertChanPolicyExtraSignedFields(ctx, tx, id, extra)
// Insert all new extra signed fields for the channel policy.
for tlvType, value := range extra {
err = tx.UpsertChanPolicyExtraType(
ctx, sqlc.UpsertChanPolicyExtraTypeParams{
ChannelPolicyID: id,
Type: int64(tlvType),
Value: value,
},
)
if err != nil {
return node1Pub, node2Pub, false, fmt.Errorf("inserting chan "+
"policy extra TLVs: %w", err)
return fmt.Errorf("unable to insert "+
"channel_policy(%d) extra signed field(%v): %w",
id, tlvType, err)
}
}
return node1Pub, node2Pub, isNode1, nil
return nil
}

View File

@@ -124,7 +124,7 @@ type SQLQueries interface {
GetChannelPolicyByChannelAndNode(ctx context.Context, arg sqlc.GetChannelPolicyByChannelAndNodeParams) (sqlc.GraphChannelPolicy, error)
GetV1DisabledSCIDs(ctx context.Context) ([][]byte, error)
InsertChanPolicyExtraType(ctx context.Context, arg sqlc.InsertChanPolicyExtraTypeParams) error
UpsertChanPolicyExtraType(ctx context.Context, arg sqlc.UpsertChanPolicyExtraTypeParams) error
GetChannelPolicyExtraTypesBatch(ctx context.Context, policyIds []int64) ([]sqlc.GetChannelPolicyExtraTypesBatchRow, error)
DeleteChannelPolicyExtraTypes(ctx context.Context, channelPolicyID int64) error
@@ -3934,8 +3934,8 @@ func upsertChanPolicyExtraSignedFields(ctx context.Context, db SQLQueries,
// Insert all new extra signed fields for the channel policy.
for tlvType, value := range extraFields {
err = db.InsertChanPolicyExtraType(
ctx, sqlc.InsertChanPolicyExtraTypeParams{
err = db.UpsertChanPolicyExtraType(
ctx, sqlc.UpsertChanPolicyExtraTypeParams{
ChannelPolicyID: chanPolicyID,
Type: int64(tlvType),
Value: value,

View File

@@ -2304,29 +2304,6 @@ func (q *Queries) HighestSCID(ctx context.Context, version int16) ([]byte, error
return scid, err
}
const insertChanPolicyExtraType = `-- name: InsertChanPolicyExtraType :exec
/* ─────────────────────────────────────────────
graph_channel_policy_extra_types table queries
─────────────────────────────────────────────
*/
INSERT INTO graph_channel_policy_extra_types (
channel_policy_id, type, value
)
VALUES ($1, $2, $3)
`
type InsertChanPolicyExtraTypeParams struct {
ChannelPolicyID int64
Type int64
Value []byte
}
func (q *Queries) InsertChanPolicyExtraType(ctx context.Context, arg InsertChanPolicyExtraTypeParams) error {
_, err := q.db.ExecContext(ctx, insertChanPolicyExtraType, arg.ChannelPolicyID, arg.Type, arg.Value)
return err
}
const insertChannelFeature = `-- name: InsertChannelFeature :exec
/* ─────────────────────────────────────────────
graph_channel_features table queries
@@ -3429,6 +3406,33 @@ func (q *Queries) ListNodesPaginated(ctx context.Context, arg ListNodesPaginated
return items, nil
}
const upsertChanPolicyExtraType = `-- name: UpsertChanPolicyExtraType :exec
/* ─────────────────────────────────────────────
graph_channel_policy_extra_types table queries
─────────────────────────────────────────────
*/
INSERT INTO graph_channel_policy_extra_types (
channel_policy_id, type, value
)
VALUES ($1, $2, $3)
ON CONFLICT (channel_policy_id, type)
-- If a conflict occurs on channel_policy_id and type, then we update the
-- value.
DO UPDATE SET value = EXCLUDED.value
`
type UpsertChanPolicyExtraTypeParams struct {
ChannelPolicyID int64
Type int64
Value []byte
}
func (q *Queries) UpsertChanPolicyExtraType(ctx context.Context, arg UpsertChanPolicyExtraTypeParams) error {
_, err := q.db.ExecContext(ctx, upsertChanPolicyExtraType, arg.ChannelPolicyID, arg.Type, arg.Value)
return err
}
const upsertChannelExtraType = `-- name: UpsertChannelExtraType :exec
/* ─────────────────────────────────────────────
graph_channel_extra_types table queries

View File

@@ -87,7 +87,6 @@ type Querier interface {
HighestSCID(ctx context.Context, version int16) ([]byte, error)
InsertAMPSubInvoice(ctx context.Context, arg InsertAMPSubInvoiceParams) error
InsertAMPSubInvoiceHTLC(ctx context.Context, arg InsertAMPSubInvoiceHTLCParams) error
InsertChanPolicyExtraType(ctx context.Context, arg InsertChanPolicyExtraTypeParams) error
InsertChannelFeature(ctx context.Context, arg InsertChannelFeatureParams) error
// NOTE: This query is only meant to be used by the graph SQL migration since
// for that migration, in order to be retry-safe, we don't want to error out if
@@ -141,6 +140,7 @@ type Querier interface {
UpdateInvoiceHTLCs(ctx context.Context, arg UpdateInvoiceHTLCsParams) error
UpdateInvoiceState(ctx context.Context, arg UpdateInvoiceStateParams) (sql.Result, error)
UpsertAMPSubInvoice(ctx context.Context, arg UpsertAMPSubInvoiceParams) (sql.Result, error)
UpsertChanPolicyExtraType(ctx context.Context, arg UpsertChanPolicyExtraTypeParams) error
UpsertChannelExtraType(ctx context.Context, arg UpsertChannelExtraTypeParams) error
UpsertEdgePolicy(ctx context.Context, arg UpsertEdgePolicyParams) (int64, error)
UpsertNode(ctx context.Context, arg UpsertNodeParams) (int64, error)

View File

@@ -869,11 +869,15 @@ WHERE c.scid = @scid
─────────────────────────────────────────────
*/
-- name: InsertChanPolicyExtraType :exec
-- name: UpsertChanPolicyExtraType :exec
INSERT INTO graph_channel_policy_extra_types (
channel_policy_id, type, value
)
VALUES ($1, $2, $3);
VALUES ($1, $2, $3)
ON CONFLICT (channel_policy_id, type)
-- If a conflict occurs on channel_policy_id and type, then we update the
-- value.
DO UPDATE SET value = EXCLUDED.value;
-- name: GetChannelPolicyExtraTypesBatch :many
SELECT