mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-12-10 04:43:33 +01:00
graph/db+sqldb: make node migration idempotent
In this commit, the graph SQL migration is updated so that the node migration step is retry-safe. This is done by using migration specific logic & queries that do not use the same node-update-constraint as the normal node upsert logic. For normal "run-time" logic, we always expect a node update to have a newer timestamp than any previously stored one. But for the migration, we will only ever be dealing with a single announcement for a given node & to make things retry-safe, we dont want the query to error if we re-insert the exact same node.
This commit is contained in:
@@ -273,7 +273,7 @@ func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
|
||||
// production.
|
||||
|
||||
// Write the node to the SQL database.
|
||||
id, err := upsertNode(ctx, sqlDB, node)
|
||||
id, err := insertNodeSQLMig(ctx, sqlDB, node)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not persist node(%x): %w", pub,
|
||||
err)
|
||||
@@ -303,8 +303,11 @@ func migrateNodes(ctx context.Context, cfg *sqldb.QueryConfig,
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
// No reset is needed since if a retry occurs, the entire
|
||||
// migration will be retried from the start.
|
||||
count = 0
|
||||
chunk = 0
|
||||
skipped = 0
|
||||
t0 = time.Now()
|
||||
batch = make(map[int64]*models.LightningNode, cfg.MaxBatchSize)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not migrate nodes: %w", err)
|
||||
@@ -1353,3 +1356,70 @@ func forEachClosedSCID(db kvdb.Backend,
|
||||
})
|
||||
}, reset)
|
||||
}
|
||||
|
||||
// insertNodeSQLMig inserts the node record into the database during the graph
|
||||
// SQL migration. No error is expected if the node already exists. Unlike the
|
||||
// main upsertNode function, this function does not require that a new node
|
||||
// update have a newer timestamp than the existing one. This is because we want
|
||||
// the migration to be idempotent and dont want to error out if we re-insert the
|
||||
// exact same node.
|
||||
//
|
||||
// TODO(elle): update the upsert calls in this function to be more efficient.
|
||||
// since no data collection steps should be required during the migration.
|
||||
func insertNodeSQLMig(ctx context.Context, db SQLQueries,
|
||||
node *models.LightningNode) (int64, error) {
|
||||
|
||||
params := sqlc.InsertNodeMigParams{
|
||||
Version: int16(ProtocolV1),
|
||||
PubKey: node.PubKeyBytes[:],
|
||||
}
|
||||
|
||||
if node.HaveNodeAnnouncement {
|
||||
params.LastUpdate = sqldb.SQLInt64(node.LastUpdate.Unix())
|
||||
params.Color = sqldb.SQLStr(EncodeHexColor(node.Color))
|
||||
params.Alias = sqldb.SQLStr(node.Alias)
|
||||
params.Signature = node.AuthSigBytes
|
||||
}
|
||||
|
||||
nodeID, err := db.InsertNodeMig(ctx, params)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
|
||||
err)
|
||||
}
|
||||
|
||||
// We can exit here if we don't have the announcement yet.
|
||||
if !node.HaveNodeAnnouncement {
|
||||
return nodeID, nil
|
||||
}
|
||||
|
||||
// NOTE: The upserts here will be updated to be more efficient in the
|
||||
// following commits.
|
||||
|
||||
// Update the node's features.
|
||||
err = upsertNodeFeatures(ctx, db, nodeID, node.Features)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("inserting node features: %w", err)
|
||||
}
|
||||
|
||||
// Update the node's addresses.
|
||||
err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("inserting node addresses: %w", err)
|
||||
}
|
||||
|
||||
// Convert the flat extra opaque data into a map of TLV types to
|
||||
// values.
|
||||
extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("unable to marshal extra opaque data: %w",
|
||||
err)
|
||||
}
|
||||
|
||||
// Update the node's extra signed fields.
|
||||
err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("inserting node extra TLVs: %w", err)
|
||||
}
|
||||
|
||||
return nodeID, nil
|
||||
}
|
||||
|
||||
@@ -155,7 +155,6 @@ func TestMigrateGraphToSQL(t *testing.T) {
|
||||
expGraphStats: graphStats{
|
||||
numNodes: 6,
|
||||
},
|
||||
expNotRetrySafety: true,
|
||||
},
|
||||
{
|
||||
name: "source node",
|
||||
@@ -173,7 +172,6 @@ func TestMigrateGraphToSQL(t *testing.T) {
|
||||
numNodes: 1,
|
||||
srcNodeSet: true,
|
||||
},
|
||||
expNotRetrySafety: true,
|
||||
},
|
||||
{
|
||||
name: "channel with no policies",
|
||||
|
||||
@@ -153,6 +153,16 @@ type SQLQueries interface {
|
||||
InsertClosedChannel(ctx context.Context, scid []byte) error
|
||||
IsClosedChannel(ctx context.Context, scid []byte) (bool, error)
|
||||
GetClosedChannelsSCIDs(ctx context.Context, scids [][]byte) ([][]byte, error)
|
||||
|
||||
/*
|
||||
Migration specific queries.
|
||||
|
||||
NOTE: these should not be used in code other than migrations.
|
||||
Once sqldbv2 is in place, these can be removed from this struct
|
||||
as then migrations will have their own dedicated queries
|
||||
structs.
|
||||
*/
|
||||
InsertNodeMig(ctx context.Context, arg sqlc.InsertNodeMigParams) (int64, error)
|
||||
}
|
||||
|
||||
// BatchedSQLQueries is a version of SQLQueries that's capable of batched
|
||||
|
||||
@@ -2445,6 +2445,61 @@ func (q *Queries) InsertNodeFeature(ctx context.Context, arg InsertNodeFeaturePa
|
||||
return err
|
||||
}
|
||||
|
||||
const insertNodeMig = `-- name: InsertNodeMig :one
|
||||
/* ─────────────────────────────────────────────
|
||||
Migration specific queries
|
||||
|
||||
NOTE: once sqldbv2 is in place, these queries can be contained to a package
|
||||
dedicated to the migration that requires it, and so we can then remove
|
||||
it from the main set of "live" queries that the code-base has access to.
|
||||
────────────────────────────────────────────-
|
||||
*/
|
||||
|
||||
INSERT INTO graph_nodes (
|
||||
version, pub_key, alias, last_update, color, signature
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6
|
||||
)
|
||||
ON CONFLICT (pub_key, version)
|
||||
-- If a conflict occurs, we have already migrated this node. However, we
|
||||
-- still need to do an "UPDATE SET" here instead of "DO NOTHING" because
|
||||
-- otherwise, the "RETURNING id" part does not work.
|
||||
DO UPDATE SET
|
||||
alias = EXCLUDED.alias,
|
||||
last_update = EXCLUDED.last_update,
|
||||
color = EXCLUDED.color,
|
||||
signature = EXCLUDED.signature
|
||||
RETURNING id
|
||||
`
|
||||
|
||||
type InsertNodeMigParams struct {
|
||||
Version int16
|
||||
PubKey []byte
|
||||
Alias sql.NullString
|
||||
LastUpdate sql.NullInt64
|
||||
Color sql.NullString
|
||||
Signature []byte
|
||||
}
|
||||
|
||||
// NOTE: This query is only meant to be used by the graph SQL migration since
|
||||
// for that migration, in order to be retry-safe, we don't want to error out if
|
||||
// we re-insert the same node (which would error if the normal UpsertNode query
|
||||
// is used because of the constraint in that query that requires a node update
|
||||
// to have a newer last_update than the existing node).
|
||||
func (q *Queries) InsertNodeMig(ctx context.Context, arg InsertNodeMigParams) (int64, error) {
|
||||
row := q.db.QueryRowContext(ctx, insertNodeMig,
|
||||
arg.Version,
|
||||
arg.PubKey,
|
||||
arg.Alias,
|
||||
arg.LastUpdate,
|
||||
arg.Color,
|
||||
arg.Signature,
|
||||
)
|
||||
var id int64
|
||||
err := row.Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
|
||||
const isClosedChannel = `-- name: IsClosedChannel :one
|
||||
SELECT EXISTS (
|
||||
SELECT 1
|
||||
|
||||
@@ -99,6 +99,12 @@ type Querier interface {
|
||||
InsertMigratedInvoice(ctx context.Context, arg InsertMigratedInvoiceParams) (int64, error)
|
||||
InsertNodeAddress(ctx context.Context, arg InsertNodeAddressParams) error
|
||||
InsertNodeFeature(ctx context.Context, arg InsertNodeFeatureParams) error
|
||||
// NOTE: This query is only meant to be used by the graph SQL migration since
|
||||
// for that migration, in order to be retry-safe, we don't want to error out if
|
||||
// we re-insert the same node (which would error if the normal UpsertNode query
|
||||
// is used because of the constraint in that query that requires a node update
|
||||
// to have a newer last_update than the existing node).
|
||||
InsertNodeMig(ctx context.Context, arg InsertNodeMigParams) (int64, error)
|
||||
IsClosedChannel(ctx context.Context, scid []byte) (bool, error)
|
||||
IsPublicV1Node(ctx context.Context, pubKey []byte) (bool, error)
|
||||
IsZombieChannel(ctx context.Context, arg IsZombieChannelParams) (bool, error)
|
||||
|
||||
@@ -995,3 +995,34 @@ SELECT EXISTS (
|
||||
SELECT scid
|
||||
FROM graph_closed_scids
|
||||
WHERE scid IN (sqlc.slice('scids')/*SLICE:scids*/);
|
||||
|
||||
/* ─────────────────────────────────────────────
|
||||
Migration specific queries
|
||||
|
||||
NOTE: once sqldbv2 is in place, these queries can be contained to a package
|
||||
dedicated to the migration that requires it, and so we can then remove
|
||||
it from the main set of "live" queries that the code-base has access to.
|
||||
────────────────────────────────────────────-
|
||||
*/
|
||||
|
||||
-- NOTE: This query is only meant to be used by the graph SQL migration since
|
||||
-- for that migration, in order to be retry-safe, we don't want to error out if
|
||||
-- we re-insert the same node (which would error if the normal UpsertNode query
|
||||
-- is used because of the constraint in that query that requires a node update
|
||||
-- to have a newer last_update than the existing node).
|
||||
-- name: InsertNodeMig :one
|
||||
INSERT INTO graph_nodes (
|
||||
version, pub_key, alias, last_update, color, signature
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6
|
||||
)
|
||||
ON CONFLICT (pub_key, version)
|
||||
-- If a conflict occurs, we have already migrated this node. However, we
|
||||
-- still need to do an "UPDATE SET" here instead of "DO NOTHING" because
|
||||
-- otherwise, the "RETURNING id" part does not work.
|
||||
DO UPDATE SET
|
||||
alias = EXCLUDED.alias,
|
||||
last_update = EXCLUDED.last_update,
|
||||
color = EXCLUDED.color,
|
||||
signature = EXCLUDED.signature
|
||||
RETURNING id;
|
||||
|
||||
Reference in New Issue
Block a user