From 22bf88e900dc6290194f4f6fb16c4444181434ef Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Fri, 15 Aug 2025 10:21:11 +0200 Subject: [PATCH] graph/db+sqldb: make policy migration idempotent Finally, we make the channel-policy part of the SQL migration idempotent by adding a migration-only policy insert query which will not error out if the policy already exists and does not have a timestamp that is newer than the existing records timestamp. To keep the commit simple, a insertChanEdgePolicyMig function is added which is basically identical to the updateChanEdgePolicy function except for the fact that it uses the newly added query. In the next commit, it will be simplified even more. --- graph/db/sql_migration.go | 101 ++++++++++++++++++++++++++++++++- graph/db/sql_migration_test.go | 26 ++------- graph/db/sql_store.go | 1 + sqldb/sqlc/graph.sql.go | 76 +++++++++++++++++++++++++ sqldb/sqlc/querier.go | 6 ++ sqldb/sqlc/queries/graph.sql | 34 +++++++++++ 6 files changed, 221 insertions(+), 23 deletions(-) diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index 61e61526c..30e516a6a 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -454,7 +454,7 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig, policyCount++ - _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy) + _, _, _, err := insertChanEdgePolicyMig(ctx, sqlDB, policy) if err != nil { return fmt.Errorf("could not migrate channel "+ "policy %d: %w", policy.ChannelID, err) @@ -1560,3 +1560,102 @@ func insertChannelMig(ctx context.Context, db SQLQueries, node2ID: node2DBID, }, nil } + +// insertChanEdgePolicyMig inserts the channel policy info we have stored for +// a channel we already know of. This is used during the SQL migration +// process to insert channel policies. +// +// TODO(elle): update this function to be more performant in the migration +// setting. For the sake of keeping the commit that introduced this function +// simple, this is for now mostly the same as updateChanEdgePolicy. +func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries, + edge *models.ChannelEdgePolicy) (route.Vertex, route.Vertex, bool, + error) { + + var ( + node1Pub, node2Pub route.Vertex + isNode1 bool + chanIDB = channelIDToBytes(edge.ChannelID) + ) + + // Check that this edge policy refers to a channel that we already + // know of. We do this explicitly so that we can return the appropriate + // ErrEdgeNotFound error if the channel doesn't exist, rather than + // abort the transaction which would abort the entire batch. + dbChan, err := tx.GetChannelAndNodesBySCID( + ctx, sqlc.GetChannelAndNodesBySCIDParams{ + Scid: chanIDB, + Version: int16(ProtocolV1), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + return node1Pub, node2Pub, false, ErrEdgeNotFound + } else if err != nil { + return node1Pub, node2Pub, false, fmt.Errorf("unable to "+ + "fetch channel(%v): %w", edge.ChannelID, err) + } + + copy(node1Pub[:], dbChan.Node1PubKey) + copy(node2Pub[:], dbChan.Node2PubKey) + + // Figure out which node this edge is from. + isNode1 = edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 + nodeID := dbChan.NodeID1 + if !isNode1 { + nodeID = dbChan.NodeID2 + } + + var ( + inboundBase sql.NullInt64 + inboundRate sql.NullInt64 + ) + edge.InboundFee.WhenSome(func(fee lnwire.Fee) { + inboundRate = sqldb.SQLInt64(fee.FeeRate) + inboundBase = sqldb.SQLInt64(fee.BaseFee) + }) + + id, err := tx.InsertEdgePolicyMig(ctx, sqlc.InsertEdgePolicyMigParams{ + Version: int16(ProtocolV1), + ChannelID: dbChan.ID, + NodeID: nodeID, + Timelock: int32(edge.TimeLockDelta), + FeePpm: int64(edge.FeeProportionalMillionths), + BaseFeeMsat: int64(edge.FeeBaseMSat), + MinHtlcMsat: int64(edge.MinHTLC), + LastUpdate: sqldb.SQLInt64(edge.LastUpdate.Unix()), + Disabled: sql.NullBool{ + Valid: true, + Bool: edge.IsDisabled(), + }, + MaxHtlcMsat: sql.NullInt64{ + Valid: edge.MessageFlags.HasMaxHtlc(), + Int64: int64(edge.MaxHTLC), + }, + MessageFlags: sqldb.SQLInt16(edge.MessageFlags), + ChannelFlags: sqldb.SQLInt16(edge.ChannelFlags), + InboundBaseFeeMsat: inboundBase, + InboundFeeRateMilliMsat: inboundRate, + Signature: edge.SigBytes, + }) + if err != nil { + return node1Pub, node2Pub, isNode1, + fmt.Errorf("unable to upsert edge policy: %w", err) + } + + // Convert the flat extra opaque data into a map of TLV types to + // values. + extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData) + if err != nil { + return node1Pub, node2Pub, false, fmt.Errorf("unable to "+ + "marshal extra opaque data: %w", err) + } + + // Update the channel policy's extra signed fields. + err = upsertChanPolicyExtraSignedFields(ctx, tx, id, extra) + if err != nil { + return node1Pub, node2Pub, false, fmt.Errorf("inserting chan "+ + "policy extra TLVs: %w", err) + } + + return node1Pub, node2Pub, isNode1, nil +} diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index aaf70990b..3833b8cc6 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -105,15 +105,6 @@ func TestMigrateGraphToSQL(t *testing.T) { write func(t *testing.T, db *KVStore, object any) objects []any expGraphStats graphStats - - // expNotRetrySafety is true if we expect an error to occur for - // the test if the migration is run twice. In other-words, if - // the specific case in question is currently not idempotent. - // - // NOTE: we want _all_ the cases here to be idempotent, so this - // is a temporary field which will be removed once we have - // properly made the migration retry-safe. - expNotRetrySafety bool }{ { name: "empty", @@ -297,7 +288,6 @@ func TestMigrateGraphToSQL(t *testing.T) { numChannels: 3, numPolicies: 3, }, - expNotRetrySafety: true, }, { name: "prune log", @@ -409,19 +399,11 @@ func TestMigrateGraphToSQL(t *testing.T) { // Validate that the two databases are now in sync. assertInSync(t, kvDB, sql, test.expGraphStats) - // NOTE: for now, not all the cases in the test are - // retry safe! The aim is to completely remove this - // field once we have made the migration retry-safe. + // The migration should be retry-safe, so running it + // again should not change the state of the databases. err = MigrateGraphToSQL(ctx, sql.cfg, kvDB.db, sql.db) - if !test.expNotRetrySafety { - // The migration should be retry-safe, so - // running it again should not change the state - // of the databases. - require.NoError(t, err) - assertInSync(t, kvDB, sql, test.expGraphStats) - } else { - require.Error(t, err) - } + require.NoError(t, err) + assertInSync(t, kvDB, sql, test.expGraphStats) }) } } diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 1879f6f38..60871e17b 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -164,6 +164,7 @@ type SQLQueries interface { */ InsertNodeMig(ctx context.Context, arg sqlc.InsertNodeMigParams) (int64, error) InsertChannelMig(ctx context.Context, arg sqlc.InsertChannelMigParams) (int64, error) + InsertEdgePolicyMig(ctx context.Context, arg sqlc.InsertEdgePolicyMigParams) (int64, error) } // BatchedSQLQueries is a version of SQLQueries that's capable of batched diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 1e32bc33b..bdc049d85 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -2434,6 +2434,82 @@ func (q *Queries) InsertClosedChannel(ctx context.Context, scid []byte) error { return err } +const insertEdgePolicyMig = `-- name: InsertEdgePolicyMig :one +INSERT INTO graph_channel_policies ( + version, channel_id, node_id, timelock, fee_ppm, + base_fee_msat, min_htlc_msat, last_update, disabled, + max_htlc_msat, inbound_base_fee_msat, + inbound_fee_rate_milli_msat, message_flags, channel_flags, + signature +) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15 +) +ON CONFLICT (channel_id, node_id, version) + -- If a conflict occurs, we have already migrated this policy. However, we + -- still need to do an "UPDATE SET" here instead of "DO NOTHING" because + -- otherwise, the "RETURNING id" part does not work. + DO UPDATE SET + timelock = EXCLUDED.timelock, + fee_ppm = EXCLUDED.fee_ppm, + base_fee_msat = EXCLUDED.base_fee_msat, + min_htlc_msat = EXCLUDED.min_htlc_msat, + last_update = EXCLUDED.last_update, + disabled = EXCLUDED.disabled, + max_htlc_msat = EXCLUDED.max_htlc_msat, + inbound_base_fee_msat = EXCLUDED.inbound_base_fee_msat, + inbound_fee_rate_milli_msat = EXCLUDED.inbound_fee_rate_milli_msat, + message_flags = EXCLUDED.message_flags, + channel_flags = EXCLUDED.channel_flags, + signature = EXCLUDED.signature +RETURNING id +` + +type InsertEdgePolicyMigParams struct { + Version int16 + ChannelID int64 + NodeID int64 + Timelock int32 + FeePpm int64 + BaseFeeMsat int64 + MinHtlcMsat int64 + LastUpdate sql.NullInt64 + Disabled sql.NullBool + MaxHtlcMsat sql.NullInt64 + InboundBaseFeeMsat sql.NullInt64 + InboundFeeRateMilliMsat sql.NullInt64 + MessageFlags sql.NullInt16 + ChannelFlags sql.NullInt16 + Signature []byte +} + +// NOTE: This query is only meant to be used by the graph SQL migration since +// for that migration, in order to be retry-safe, we don't want to error out if +// we re-insert the same policy (which would error if the normal +// UpsertEdgePolicy query is used because of the constraint in that query that +// requires a policy update to have a newer last_update than the existing one). +func (q *Queries) InsertEdgePolicyMig(ctx context.Context, arg InsertEdgePolicyMigParams) (int64, error) { + row := q.db.QueryRowContext(ctx, insertEdgePolicyMig, + arg.Version, + arg.ChannelID, + arg.NodeID, + arg.Timelock, + arg.FeePpm, + arg.BaseFeeMsat, + arg.MinHtlcMsat, + arg.LastUpdate, + arg.Disabled, + arg.MaxHtlcMsat, + arg.InboundBaseFeeMsat, + arg.InboundFeeRateMilliMsat, + arg.MessageFlags, + arg.ChannelFlags, + arg.Signature, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + const insertNodeFeature = `-- name: InsertNodeFeature :exec /* ───────────────────────────────────────────── graph_node_features table queries diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 0011f086a..6d38597a4 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -96,6 +96,12 @@ type Querier interface { // and version columns). InsertChannelMig(ctx context.Context, arg InsertChannelMigParams) (int64, error) InsertClosedChannel(ctx context.Context, scid []byte) error + // NOTE: This query is only meant to be used by the graph SQL migration since + // for that migration, in order to be retry-safe, we don't want to error out if + // we re-insert the same policy (which would error if the normal + // UpsertEdgePolicy query is used because of the constraint in that query that + // requires a policy update to have a newer last_update than the existing one). + InsertEdgePolicyMig(ctx context.Context, arg InsertEdgePolicyMigParams) (int64, error) InsertInvoice(ctx context.Context, arg InsertInvoiceParams) (int64, error) InsertInvoiceFeature(ctx context.Context, arg InsertInvoiceFeatureParams) error InsertInvoiceHTLC(ctx context.Context, arg InsertInvoiceHTLCParams) (int64, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index d8215b22e..7f12cd0c5 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -1064,3 +1064,37 @@ INSERT INTO graph_channels ( bitcoin_1_signature = EXCLUDED.bitcoin_1_signature, bitcoin_2_signature = EXCLUDED.bitcoin_2_signature RETURNING id; + +-- NOTE: This query is only meant to be used by the graph SQL migration since +-- for that migration, in order to be retry-safe, we don't want to error out if +-- we re-insert the same policy (which would error if the normal +-- UpsertEdgePolicy query is used because of the constraint in that query that +-- requires a policy update to have a newer last_update than the existing one). +-- name: InsertEdgePolicyMig :one +INSERT INTO graph_channel_policies ( + version, channel_id, node_id, timelock, fee_ppm, + base_fee_msat, min_htlc_msat, last_update, disabled, + max_htlc_msat, inbound_base_fee_msat, + inbound_fee_rate_milli_msat, message_flags, channel_flags, + signature +) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15 +) +ON CONFLICT (channel_id, node_id, version) + -- If a conflict occurs, we have already migrated this policy. However, we + -- still need to do an "UPDATE SET" here instead of "DO NOTHING" because + -- otherwise, the "RETURNING id" part does not work. + DO UPDATE SET + timelock = EXCLUDED.timelock, + fee_ppm = EXCLUDED.fee_ppm, + base_fee_msat = EXCLUDED.base_fee_msat, + min_htlc_msat = EXCLUDED.min_htlc_msat, + last_update = EXCLUDED.last_update, + disabled = EXCLUDED.disabled, + max_htlc_msat = EXCLUDED.max_htlc_msat, + inbound_base_fee_msat = EXCLUDED.inbound_base_fee_msat, + inbound_fee_rate_milli_msat = EXCLUDED.inbound_fee_rate_milli_msat, + message_flags = EXCLUDED.message_flags, + channel_flags = EXCLUDED.channel_flags, + signature = EXCLUDED.signature +RETURNING id; \ No newline at end of file