graph/db+sqldb: make channel SQL mig retry-safe

In this commit, we make the channel part of the graph SQL migration
idempotent (retry-safe!). We do this by adding a migration-only channel
insert query that will not error out if a the query is called and a
chanenl with the given scid&version already exists. We also ensure that
errors are not thrown if existing channel features & extra types are
re-added.
This commit is contained in:
Elle Mouton
2025-08-15 10:11:20 +02:00
parent a291d6f1a6
commit 8736fcafa8
6 changed files with 267 additions and 61 deletions

View File

@@ -524,7 +524,7 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
chunk++
// Migrate the channel info along with its policies.
dbChanInfo, err := insertChannel(ctx, sqlDB, channel)
dbChanInfo, err := insertChannelMig(ctx, sqlDB, channel)
if err != nil {
return fmt.Errorf("could not insert record for "+
"channel %d in SQL store: %w", scid, err)
@@ -577,8 +577,13 @@ func migrateChannelsAndPolicies(ctx context.Context, cfg *SQLStoreConfig,
return nil
}, func() {
// No reset is needed since if a retry occurs, the entire
// migration will be retried from the start.
channelCount = 0
policyCount = 0
chunk = 0
skippedChanCount = 0
skippedPolicyCount = 0
t0 = time.Now()
batch = make(map[int64]*migChanInfo, cfg.QueryCfg.MaxBatchSize)
})
if err != nil {
return fmt.Errorf("could not migrate channels and policies: %w",
@@ -1452,3 +1457,106 @@ func insertNodeSQLMig(ctx context.Context, db SQLQueries,
return nodeID, nil
}
// dbChanInfo holds the DB level IDs of a channel and the nodes involved in the
// channel.
type dbChanInfo struct {
channelID int64
node1ID int64
node2ID int64
}
// insertChannelMig inserts a new channel record into the database during the
// graph SQL migration.
func insertChannelMig(ctx context.Context, db SQLQueries,
edge *models.ChannelEdgeInfo) (*dbChanInfo, error) {
// Make sure that at least a "shell" entry for each node is present in
// the nodes table.
//
// NOTE: we need this even during the SQL migration where nodes are
// migrated first because there are cases were some nodes may have
// been skipped due to invalid TLV data.
node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
if err != nil {
return nil, fmt.Errorf("unable to create shell node: %w", err)
}
node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
if err != nil {
return nil, fmt.Errorf("unable to create shell node: %w", err)
}
var capacity sql.NullInt64
if edge.Capacity != 0 {
capacity = sqldb.SQLInt64(int64(edge.Capacity))
}
createParams := sqlc.InsertChannelMigParams{
Version: int16(ProtocolV1),
Scid: channelIDToBytes(edge.ChannelID),
NodeID1: node1DBID,
NodeID2: node2DBID,
Outpoint: edge.ChannelPoint.String(),
Capacity: capacity,
BitcoinKey1: edge.BitcoinKey1Bytes[:],
BitcoinKey2: edge.BitcoinKey2Bytes[:],
}
if edge.AuthProof != nil {
proof := edge.AuthProof
createParams.Node1Signature = proof.NodeSig1Bytes
createParams.Node2Signature = proof.NodeSig2Bytes
createParams.Bitcoin1Signature = proof.BitcoinSig1Bytes
createParams.Bitcoin2Signature = proof.BitcoinSig2Bytes
}
// Insert the new channel record.
dbChanID, err := db.InsertChannelMig(ctx, createParams)
if err != nil {
return nil, err
}
// Insert any channel features.
for feature := range edge.Features.Features() {
err = db.InsertChannelFeature(
ctx, sqlc.InsertChannelFeatureParams{
ChannelID: dbChanID,
FeatureBit: int32(feature),
},
)
if err != nil {
return nil, fmt.Errorf("unable to insert channel(%d) "+
"feature(%v): %w", dbChanID, feature, err)
}
}
// Finally, insert any extra TLV fields in the channel announcement.
extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
if err != nil {
return nil, fmt.Errorf("unable to marshal extra opaque "+
"data: %w", err)
}
for tlvType, value := range extra {
err := db.UpsertChannelExtraType(
ctx, sqlc.UpsertChannelExtraTypeParams{
ChannelID: dbChanID,
Type: int64(tlvType),
Value: value,
},
)
if err != nil {
return nil, fmt.Errorf("unable to upsert "+
"channel(%d) extra signed field(%v): %w",
edge.ChannelID, tlvType, err)
}
}
return &dbChanInfo{
channelID: dbChanID,
node1ID: node1DBID,
node2ID: node2DBID,
}, nil
}

View File

@@ -225,7 +225,6 @@ func TestMigrateGraphToSQL(t *testing.T) {
numNodes: 4,
numChannels: 3,
},
expNotRetrySafety: true,
},
{
name: "channels and policies",

View File

@@ -112,7 +112,7 @@ type SQLQueries interface {
GetSCIDByOutpoint(ctx context.Context, arg sqlc.GetSCIDByOutpointParams) ([]byte, error)
DeleteChannels(ctx context.Context, ids []int64) error
CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
UpsertChannelExtraType(ctx context.Context, arg sqlc.UpsertChannelExtraTypeParams) error
GetChannelExtrasBatch(ctx context.Context, chanIds []int64) ([]sqlc.GraphChannelExtraType, error)
InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
GetChannelFeaturesBatch(ctx context.Context, chanIds []int64) ([]sqlc.GraphChannelFeature, error)
@@ -163,6 +163,7 @@ type SQLQueries interface {
structs.
*/
InsertNodeMig(ctx context.Context, arg sqlc.InsertNodeMigParams) (int64, error)
InsertChannelMig(ctx context.Context, arg sqlc.InsertChannelMigParams) (int64, error)
}
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
@@ -627,9 +628,7 @@ func (s *SQLStore) AddChannelEdge(ctx context.Context,
err)
}
_, err = insertChannel(ctx, tx, edge)
return err
return insertChannel(ctx, tx, edge)
},
OnCommit: func(err error) error {
switch {
@@ -3799,28 +3798,20 @@ func marshalExtraOpaqueData(data []byte) (map[uint64][]byte, error) {
return records, nil
}
// dbChanInfo holds the DB level IDs of a channel and the nodes involved in the
// channel.
type dbChanInfo struct {
channelID int64
node1ID int64
node2ID int64
}
// insertChannel inserts a new channel record into the database.
func insertChannel(ctx context.Context, db SQLQueries,
edge *models.ChannelEdgeInfo) (*dbChanInfo, error) {
edge *models.ChannelEdgeInfo) error {
// Make sure that at least a "shell" entry for each node is present in
// the nodes table.
node1DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey1Bytes)
if err != nil {
return nil, fmt.Errorf("unable to create shell node: %w", err)
return fmt.Errorf("unable to create shell node: %w", err)
}
node2DBID, err := maybeCreateShellNode(ctx, db, edge.NodeKey2Bytes)
if err != nil {
return nil, fmt.Errorf("unable to create shell node: %w", err)
return fmt.Errorf("unable to create shell node: %w", err)
}
var capacity sql.NullInt64
@@ -3851,7 +3842,7 @@ func insertChannel(ctx context.Context, db SQLQueries,
// Insert the new channel record.
dbChanID, err := db.CreateChannel(ctx, createParams)
if err != nil {
return nil, err
return err
}
// Insert any channel features.
@@ -3863,7 +3854,7 @@ func insertChannel(ctx context.Context, db SQLQueries,
},
)
if err != nil {
return nil, fmt.Errorf("unable to insert channel(%d) "+
return fmt.Errorf("unable to insert channel(%d) "+
"feature(%v): %w", dbChanID, feature, err)
}
}
@@ -3871,30 +3862,26 @@ func insertChannel(ctx context.Context, db SQLQueries,
// Finally, insert any extra TLV fields in the channel announcement.
extra, err := marshalExtraOpaqueData(edge.ExtraOpaqueData)
if err != nil {
return nil, fmt.Errorf("unable to marshal extra opaque "+
"data: %w", err)
return fmt.Errorf("unable to marshal extra opaque data: %w",
err)
}
for tlvType, value := range extra {
err := db.CreateChannelExtraType(
ctx, sqlc.CreateChannelExtraTypeParams{
err := db.UpsertChannelExtraType(
ctx, sqlc.UpsertChannelExtraTypeParams{
ChannelID: dbChanID,
Type: int64(tlvType),
Value: value,
},
)
if err != nil {
return nil, fmt.Errorf("unable to upsert "+
"channel(%d) extra signed field(%v): %w",
edge.ChannelID, tlvType, err)
return fmt.Errorf("unable to upsert channel(%d) "+
"extra signed field(%v): %w", edge.ChannelID,
tlvType, err)
}
}
return &dbChanInfo{
channelID: dbChanID,
node1ID: node1DBID,
node2ID: node2DBID,
}, nil
return nil
}
// maybeCreateShellNode checks if a shell node entry exists for the

View File

@@ -120,29 +120,6 @@ func (q *Queries) CreateChannel(ctx context.Context, arg CreateChannelParams) (i
return id, err
}
const createChannelExtraType = `-- name: CreateChannelExtraType :exec
/* ─────────────────────────────────────────────
graph_channel_extra_types table queries
─────────────────────────────────────────────
*/
INSERT INTO graph_channel_extra_types (
channel_id, type, value
)
VALUES ($1, $2, $3)
`
type CreateChannelExtraTypeParams struct {
ChannelID int64
Type int64
Value []byte
}
func (q *Queries) CreateChannelExtraType(ctx context.Context, arg CreateChannelExtraTypeParams) error {
_, err := q.db.ExecContext(ctx, createChannelExtraType, arg.ChannelID, arg.Type, arg.Value)
return err
}
const deleteChannelPolicyExtraTypes = `-- name: DeleteChannelPolicyExtraTypes :exec
DELETE FROM graph_channel_policy_extra_types
WHERE channel_policy_id = $1
@@ -2360,7 +2337,9 @@ INSERT INTO graph_channel_features (
channel_id, feature_bit
) VALUES (
$1, $2
)
) ON CONFLICT (channel_id, feature_bit)
-- Do nothing if the channel_id and feature_bit already exist.
DO NOTHING
`
type InsertChannelFeatureParams struct {
@@ -2373,6 +2352,72 @@ func (q *Queries) InsertChannelFeature(ctx context.Context, arg InsertChannelFea
return err
}
const insertChannelMig = `-- name: InsertChannelMig :one
INSERT INTO graph_channels (
version, scid, node_id_1, node_id_2,
outpoint, capacity, bitcoin_key_1, bitcoin_key_2,
node_1_signature, node_2_signature, bitcoin_1_signature,
bitcoin_2_signature
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
) ON CONFLICT (scid, version)
-- If a conflict occurs, we have already migrated this channel. However, we
-- still need to do an "UPDATE SET" here instead of "DO NOTHING" because
-- otherwise, the "RETURNING id" part does not work.
DO UPDATE SET
node_id_1 = EXCLUDED.node_id_1,
node_id_2 = EXCLUDED.node_id_2,
outpoint = EXCLUDED.outpoint,
capacity = EXCLUDED.capacity,
bitcoin_key_1 = EXCLUDED.bitcoin_key_1,
bitcoin_key_2 = EXCLUDED.bitcoin_key_2,
node_1_signature = EXCLUDED.node_1_signature,
node_2_signature = EXCLUDED.node_2_signature,
bitcoin_1_signature = EXCLUDED.bitcoin_1_signature,
bitcoin_2_signature = EXCLUDED.bitcoin_2_signature
RETURNING id
`
type InsertChannelMigParams struct {
Version int16
Scid []byte
NodeID1 int64
NodeID2 int64
Outpoint string
Capacity sql.NullInt64
BitcoinKey1 []byte
BitcoinKey2 []byte
Node1Signature []byte
Node2Signature []byte
Bitcoin1Signature []byte
Bitcoin2Signature []byte
}
// NOTE: This query is only meant to be used by the graph SQL migration since
// for that migration, in order to be retry-safe, we don't want to error out if
// we re-insert the same channel again (which would error if the normal
// CreateChannel query is used because of the uniqueness constraint on the scid
// and version columns).
func (q *Queries) InsertChannelMig(ctx context.Context, arg InsertChannelMigParams) (int64, error) {
row := q.db.QueryRowContext(ctx, insertChannelMig,
arg.Version,
arg.Scid,
arg.NodeID1,
arg.NodeID2,
arg.Outpoint,
arg.Capacity,
arg.BitcoinKey1,
arg.BitcoinKey2,
arg.Node1Signature,
arg.Node2Signature,
arg.Bitcoin1Signature,
arg.Bitcoin2Signature,
)
var id int64
err := row.Scan(&id)
return id, err
}
const insertClosedChannel = `-- name: InsertClosedChannel :exec
/* ─────────────────────────────────────────────
graph_closed_scid table queries
@@ -3308,6 +3353,32 @@ func (q *Queries) ListNodesPaginated(ctx context.Context, arg ListNodesPaginated
return items, nil
}
const upsertChannelExtraType = `-- name: UpsertChannelExtraType :exec
/* ─────────────────────────────────────────────
graph_channel_extra_types table queries
─────────────────────────────────────────────
*/
INSERT INTO graph_channel_extra_types (
channel_id, type, value
)
VALUES ($1, $2, $3)
ON CONFLICT (channel_id, type)
-- Update the value if a conflict occurs on channel_id and type.
DO UPDATE SET value = EXCLUDED.value
`
type UpsertChannelExtraTypeParams struct {
ChannelID int64
Type int64
Value []byte
}
func (q *Queries) UpsertChannelExtraType(ctx context.Context, arg UpsertChannelExtraTypeParams) error {
_, err := q.db.ExecContext(ctx, upsertChannelExtraType, arg.ChannelID, arg.Type, arg.Value)
return err
}
const upsertEdgePolicy = `-- name: UpsertEdgePolicy :one
/* ─────────────────────────────────────────────
graph_channel_policies table queries

View File

@@ -16,7 +16,6 @@ type Querier interface {
ClearKVInvoiceHashIndex(ctx context.Context) error
CountZombieChannels(ctx context.Context, version int16) (int64, error)
CreateChannel(ctx context.Context, arg CreateChannelParams) (int64, error)
CreateChannelExtraType(ctx context.Context, arg CreateChannelExtraTypeParams) error
DeleteCanceledInvoices(ctx context.Context) (sql.Result, error)
DeleteChannelPolicyExtraTypes(ctx context.Context, channelPolicyID int64) error
DeleteChannels(ctx context.Context, ids []int64) error
@@ -90,6 +89,12 @@ type Querier interface {
InsertAMPSubInvoiceHTLC(ctx context.Context, arg InsertAMPSubInvoiceHTLCParams) error
InsertChanPolicyExtraType(ctx context.Context, arg InsertChanPolicyExtraTypeParams) error
InsertChannelFeature(ctx context.Context, arg InsertChannelFeatureParams) error
// NOTE: This query is only meant to be used by the graph SQL migration since
// for that migration, in order to be retry-safe, we don't want to error out if
// we re-insert the same channel again (which would error if the normal
// CreateChannel query is used because of the uniqueness constraint on the scid
// and version columns).
InsertChannelMig(ctx context.Context, arg InsertChannelMigParams) (int64, error)
InsertClosedChannel(ctx context.Context, scid []byte) error
InsertInvoice(ctx context.Context, arg InsertInvoiceParams) (int64, error)
InsertInvoiceFeature(ctx context.Context, arg InsertInvoiceFeatureParams) error
@@ -130,6 +135,7 @@ type Querier interface {
UpdateInvoiceHTLCs(ctx context.Context, arg UpdateInvoiceHTLCsParams) error
UpdateInvoiceState(ctx context.Context, arg UpdateInvoiceStateParams) (sql.Result, error)
UpsertAMPSubInvoice(ctx context.Context, arg UpsertAMPSubInvoiceParams) (sql.Result, error)
UpsertChannelExtraType(ctx context.Context, arg UpsertChannelExtraTypeParams) error
UpsertEdgePolicy(ctx context.Context, arg UpsertEdgePolicyParams) (int64, error)
UpsertNode(ctx context.Context, arg UpsertNodeParams) (int64, error)
UpsertNodeAddress(ctx context.Context, arg UpsertNodeAddressParams) error

View File

@@ -738,7 +738,9 @@ INSERT INTO graph_channel_features (
channel_id, feature_bit
) VALUES (
$1, $2
);
) ON CONFLICT (channel_id, feature_bit)
-- Do nothing if the channel_id and feature_bit already exist.
DO NOTHING;
-- name: GetChannelFeaturesBatch :many
SELECT
@@ -753,11 +755,14 @@ ORDER BY channel_id, feature_bit;
─────────────────────────────────────────────
*/
-- name: CreateChannelExtraType :exec
-- name: UpsertChannelExtraType :exec
INSERT INTO graph_channel_extra_types (
channel_id, type, value
)
VALUES ($1, $2, $3);
VALUES ($1, $2, $3)
ON CONFLICT (channel_id, type)
-- Update the value if a conflict occurs on channel_id and type.
DO UPDATE SET value = EXCLUDED.value;
-- name: GetChannelExtrasBatch :many
SELECT
@@ -1029,3 +1034,33 @@ ON CONFLICT (pub_key, version)
color = EXCLUDED.color,
signature = EXCLUDED.signature
RETURNING id;
-- NOTE: This query is only meant to be used by the graph SQL migration since
-- for that migration, in order to be retry-safe, we don't want to error out if
-- we re-insert the same channel again (which would error if the normal
-- CreateChannel query is used because of the uniqueness constraint on the scid
-- and version columns).
-- name: InsertChannelMig :one
INSERT INTO graph_channels (
version, scid, node_id_1, node_id_2,
outpoint, capacity, bitcoin_key_1, bitcoin_key_2,
node_1_signature, node_2_signature, bitcoin_1_signature,
bitcoin_2_signature
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
) ON CONFLICT (scid, version)
-- If a conflict occurs, we have already migrated this channel. However, we
-- still need to do an "UPDATE SET" here instead of "DO NOTHING" because
-- otherwise, the "RETURNING id" part does not work.
DO UPDATE SET
node_id_1 = EXCLUDED.node_id_1,
node_id_2 = EXCLUDED.node_id_2,
outpoint = EXCLUDED.outpoint,
capacity = EXCLUDED.capacity,
bitcoin_key_1 = EXCLUDED.bitcoin_key_1,
bitcoin_key_2 = EXCLUDED.bitcoin_key_2,
node_1_signature = EXCLUDED.node_1_signature,
node_2_signature = EXCLUDED.node_2_signature,
bitcoin_1_signature = EXCLUDED.bitcoin_1_signature,
bitcoin_2_signature = EXCLUDED.bitcoin_2_signature
RETURNING id;