diff --git a/graph/db/sql_migration.go b/graph/db/sql_migration.go index 61e61526c..30e516a6a 100644 --- a/graph/db/sql_migration.go +++ b/graph/db/sql_migration.go @@ -454,7 +454,7 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig, policyCount++ - _, _, _, err := updateChanEdgePolicy(ctx, sqlDB, policy) + _, _, _, err := insertChanEdgePolicyMig(ctx, sqlDB, policy) if err != nil { return fmt.Errorf("could not migrate channel "+ "policy %d: %w", policy.ChannelID, err) @@ -1560,3 +1560,102 @@ func insertChannelMig(ctx context.Context, db SQLQueries, node2ID: node2DBID, }, nil } + +// insertChanEdgePolicyMig inserts the channel policy info we have stored for +// a channel we already know of. This is used during the SQL migration +// process to insert channel policies. +// +// TODO(elle): update this function to be more performant in the migration +// setting. For the sake of keeping the commit that introduced this function +// simple, this is for now mostly the same as updateChanEdgePolicy. +func insertChanEdgePolicyMig(ctx context.Context, tx SQLQueries, + edge *models.ChannelEdgePolicy) (route.Vertex, route.Vertex, bool, + error) { + + var ( + node1Pub, node2Pub route.Vertex + isNode1 bool + chanIDB = channelIDToBytes(edge.ChannelID) + ) + + // Check that this edge policy refers to a channel that we already + // know of. We do this explicitly so that we can return the appropriate + // ErrEdgeNotFound error if the channel doesn't exist, rather than + // abort the transaction which would abort the entire batch. + dbChan, err := tx.GetChannelAndNodesBySCID( + ctx, sqlc.GetChannelAndNodesBySCIDParams{ + Scid: chanIDB, + Version: int16(ProtocolV1), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + return node1Pub, node2Pub, false, ErrEdgeNotFound + } else if err != nil { + return node1Pub, node2Pub, false, fmt.Errorf("unable to "+ + "fetch channel(%v): %w", edge.ChannelID, err) + } + + copy(node1Pub[:], dbChan.Node1PubKey) + copy(node2Pub[:], dbChan.Node2PubKey) + + // Figure out which node this edge is from. + isNode1 = edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 + nodeID := dbChan.NodeID1 + if !isNode1 { + nodeID = dbChan.NodeID2 + } + + var ( + inboundBase sql.NullInt64 + inboundRate sql.NullInt64 + ) + edge.InboundFee.WhenSome(func(fee lnwire.Fee) { + inboundRate = sqldb.SQLInt64(fee.FeeRate) + inboundBase = sqldb.SQLInt64(fee.BaseFee) + }) + + id, err := tx.InsertEdgePolicyMig(ctx, sqlc.InsertEdgePolicyMigParams{ + Version: int16(ProtocolV1), + ChannelID: dbChan.ID, + NodeID: nodeID, + Timelock: int32(edge.TimeLockDelta), + FeePpm: int64(edge.FeeProportionalMillionths), + BaseFeeMsat: int64(edge.FeeBaseMSat), + MinHtlcMsat: int64(edge.MinHTLC), + LastUpdate: sqldb.SQLInt64(edge.LastUpdate.Unix()), + Disabled: sql.NullBool{ + Valid: true, + Bool: edge.IsDisabled(), + }, + MaxHtlcMsat: sql.NullInt64{ + Valid: edge.MessageFlags.HasMaxHtlc(), + Int64: int64(edge.MaxHTLC), + }, + MessageFlags: sqldb.SQLInt16(edge.MessageFlags), + ChannelFlags: sqldb.SQLInt16(edge.ChannelFlags), + InboundBaseFeeMsat: inboundBase, + InboundFeeRateMilliMsat: inboundRate, + Signature: edge.SigBytes, + }) + if err != nil { + return node1Pub, node2Pub, isNode1, + fmt.Errorf("unable to upsert edge policy: %w", err) + } + + // Convert the flat extra opaque data into a map of TLV types to + // values. + extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData) + if err != nil { + return node1Pub, node2Pub, false, fmt.Errorf("unable to "+ + "marshal extra opaque data: %w", err) + } + + // Update the channel policy's extra signed fields. + err = upsertChanPolicyExtraSignedFields(ctx, tx, id, extra) + if err != nil { + return node1Pub, node2Pub, false, fmt.Errorf("inserting chan "+ + "policy extra TLVs: %w", err) + } + + return node1Pub, node2Pub, isNode1, nil +} diff --git a/graph/db/sql_migration_test.go b/graph/db/sql_migration_test.go index aaf70990b..3833b8cc6 100644 --- a/graph/db/sql_migration_test.go +++ b/graph/db/sql_migration_test.go @@ -105,15 +105,6 @@ func TestMigrateGraphToSQL(t *testing.T) { write func(t *testing.T, db *KVStore, object any) objects []any expGraphStats graphStats - - // expNotRetrySafety is true if we expect an error to occur for - // the test if the migration is run twice. In other-words, if - // the specific case in question is currently not idempotent. - // - // NOTE: we want _all_ the cases here to be idempotent, so this - // is a temporary field which will be removed once we have - // properly made the migration retry-safe. - expNotRetrySafety bool }{ { name: "empty", @@ -297,7 +288,6 @@ func TestMigrateGraphToSQL(t *testing.T) { numChannels: 3, numPolicies: 3, }, - expNotRetrySafety: true, }, { name: "prune log", @@ -409,19 +399,11 @@ func TestMigrateGraphToSQL(t *testing.T) { // Validate that the two databases are now in sync. assertInSync(t, kvDB, sql, test.expGraphStats) - // NOTE: for now, not all the cases in the test are - // retry safe! The aim is to completely remove this - // field once we have made the migration retry-safe. + // The migration should be retry-safe, so running it + // again should not change the state of the databases. err = MigrateGraphToSQL(ctx, sql.cfg, kvDB.db, sql.db) - if !test.expNotRetrySafety { - // The migration should be retry-safe, so - // running it again should not change the state - // of the databases. - require.NoError(t, err) - assertInSync(t, kvDB, sql, test.expGraphStats) - } else { - require.Error(t, err) - } + require.NoError(t, err) + assertInSync(t, kvDB, sql, test.expGraphStats) }) } } diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 1879f6f38..60871e17b 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -164,6 +164,7 @@ type SQLQueries interface { */ InsertNodeMig(ctx context.Context, arg sqlc.InsertNodeMigParams) (int64, error) InsertChannelMig(ctx context.Context, arg sqlc.InsertChannelMigParams) (int64, error) + InsertEdgePolicyMig(ctx context.Context, arg sqlc.InsertEdgePolicyMigParams) (int64, error) } // BatchedSQLQueries is a version of SQLQueries that's capable of batched diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 1e32bc33b..bdc049d85 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -2434,6 +2434,82 @@ func (q *Queries) InsertClosedChannel(ctx context.Context, scid []byte) error { return err } +const insertEdgePolicyMig = `-- name: InsertEdgePolicyMig :one +INSERT INTO graph_channel_policies ( + version, channel_id, node_id, timelock, fee_ppm, + base_fee_msat, min_htlc_msat, last_update, disabled, + max_htlc_msat, inbound_base_fee_msat, + inbound_fee_rate_milli_msat, message_flags, channel_flags, + signature +) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15 +) +ON CONFLICT (channel_id, node_id, version) + -- If a conflict occurs, we have already migrated this policy. However, we + -- still need to do an "UPDATE SET" here instead of "DO NOTHING" because + -- otherwise, the "RETURNING id" part does not work. + DO UPDATE SET + timelock = EXCLUDED.timelock, + fee_ppm = EXCLUDED.fee_ppm, + base_fee_msat = EXCLUDED.base_fee_msat, + min_htlc_msat = EXCLUDED.min_htlc_msat, + last_update = EXCLUDED.last_update, + disabled = EXCLUDED.disabled, + max_htlc_msat = EXCLUDED.max_htlc_msat, + inbound_base_fee_msat = EXCLUDED.inbound_base_fee_msat, + inbound_fee_rate_milli_msat = EXCLUDED.inbound_fee_rate_milli_msat, + message_flags = EXCLUDED.message_flags, + channel_flags = EXCLUDED.channel_flags, + signature = EXCLUDED.signature +RETURNING id +` + +type InsertEdgePolicyMigParams struct { + Version int16 + ChannelID int64 + NodeID int64 + Timelock int32 + FeePpm int64 + BaseFeeMsat int64 + MinHtlcMsat int64 + LastUpdate sql.NullInt64 + Disabled sql.NullBool + MaxHtlcMsat sql.NullInt64 + InboundBaseFeeMsat sql.NullInt64 + InboundFeeRateMilliMsat sql.NullInt64 + MessageFlags sql.NullInt16 + ChannelFlags sql.NullInt16 + Signature []byte +} + +// NOTE: This query is only meant to be used by the graph SQL migration since +// for that migration, in order to be retry-safe, we don't want to error out if +// we re-insert the same policy (which would error if the normal +// UpsertEdgePolicy query is used because of the constraint in that query that +// requires a policy update to have a newer last_update than the existing one). +func (q *Queries) InsertEdgePolicyMig(ctx context.Context, arg InsertEdgePolicyMigParams) (int64, error) { + row := q.db.QueryRowContext(ctx, insertEdgePolicyMig, + arg.Version, + arg.ChannelID, + arg.NodeID, + arg.Timelock, + arg.FeePpm, + arg.BaseFeeMsat, + arg.MinHtlcMsat, + arg.LastUpdate, + arg.Disabled, + arg.MaxHtlcMsat, + arg.InboundBaseFeeMsat, + arg.InboundFeeRateMilliMsat, + arg.MessageFlags, + arg.ChannelFlags, + arg.Signature, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + const insertNodeFeature = `-- name: InsertNodeFeature :exec /* ───────────────────────────────────────────── graph_node_features table queries diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 0011f086a..6d38597a4 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -96,6 +96,12 @@ type Querier interface { // and version columns). InsertChannelMig(ctx context.Context, arg InsertChannelMigParams) (int64, error) InsertClosedChannel(ctx context.Context, scid []byte) error + // NOTE: This query is only meant to be used by the graph SQL migration since + // for that migration, in order to be retry-safe, we don't want to error out if + // we re-insert the same policy (which would error if the normal + // UpsertEdgePolicy query is used because of the constraint in that query that + // requires a policy update to have a newer last_update than the existing one). + InsertEdgePolicyMig(ctx context.Context, arg InsertEdgePolicyMigParams) (int64, error) InsertInvoice(ctx context.Context, arg InsertInvoiceParams) (int64, error) InsertInvoiceFeature(ctx context.Context, arg InsertInvoiceFeatureParams) error InsertInvoiceHTLC(ctx context.Context, arg InsertInvoiceHTLCParams) (int64, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index d8215b22e..7f12cd0c5 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -1064,3 +1064,37 @@ INSERT INTO graph_channels ( bitcoin_1_signature = EXCLUDED.bitcoin_1_signature, bitcoin_2_signature = EXCLUDED.bitcoin_2_signature RETURNING id; + +-- NOTE: This query is only meant to be used by the graph SQL migration since +-- for that migration, in order to be retry-safe, we don't want to error out if +-- we re-insert the same policy (which would error if the normal +-- UpsertEdgePolicy query is used because of the constraint in that query that +-- requires a policy update to have a newer last_update than the existing one). +-- name: InsertEdgePolicyMig :one +INSERT INTO graph_channel_policies ( + version, channel_id, node_id, timelock, fee_ppm, + base_fee_msat, min_htlc_msat, last_update, disabled, + max_htlc_msat, inbound_base_fee_msat, + inbound_fee_rate_milli_msat, message_flags, channel_flags, + signature +) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15 +) +ON CONFLICT (channel_id, node_id, version) + -- If a conflict occurs, we have already migrated this policy. However, we + -- still need to do an "UPDATE SET" here instead of "DO NOTHING" because + -- otherwise, the "RETURNING id" part does not work. + DO UPDATE SET + timelock = EXCLUDED.timelock, + fee_ppm = EXCLUDED.fee_ppm, + base_fee_msat = EXCLUDED.base_fee_msat, + min_htlc_msat = EXCLUDED.min_htlc_msat, + last_update = EXCLUDED.last_update, + disabled = EXCLUDED.disabled, + max_htlc_msat = EXCLUDED.max_htlc_msat, + inbound_base_fee_msat = EXCLUDED.inbound_base_fee_msat, + inbound_fee_rate_milli_msat = EXCLUDED.inbound_fee_rate_milli_msat, + message_flags = EXCLUDED.message_flags, + channel_flags = EXCLUDED.channel_flags, + signature = EXCLUDED.signature +RETURNING id; \ No newline at end of file