graph/db+sqldb: use batch fetching during channel&policy migration

Restructue the `migrateChannelsAndPolicies` function so that it does the
validation of migrated channels and policies in batches. So instead of
fetching channel and its policies individually after migrating it, we
wait for a minimum batch size to be reached and then validate a batch of
them together. This lets us make way fewer DB round trips.
This commit is contained in:
Elle Mouton
2025-08-13 13:37:25 +02:00
parent 03ef2740a6
commit 81c54611c1
5 changed files with 452 additions and 81 deletions

View File

@@ -873,6 +873,179 @@ func (q *Queries) GetChannelPolicyExtraTypesBatch(ctx context.Context, policyIds
return items, nil
}
const getChannelsByIDs = `-- name: GetChannelsByIDs :many
SELECT
c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature,
-- Minimal node data.
n1.id AS node1_id,
n1.pub_key AS node1_pub_key,
n2.id AS node2_id,
n2.pub_key AS node2_pub_key,
-- Policy 1
cp1.id AS policy1_id,
cp1.node_id AS policy1_node_id,
cp1.version AS policy1_version,
cp1.timelock AS policy1_timelock,
cp1.fee_ppm AS policy1_fee_ppm,
cp1.base_fee_msat AS policy1_base_fee_msat,
cp1.min_htlc_msat AS policy1_min_htlc_msat,
cp1.max_htlc_msat AS policy1_max_htlc_msat,
cp1.last_update AS policy1_last_update,
cp1.disabled AS policy1_disabled,
cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
cp1.message_flags AS policy1_message_flags,
cp1.channel_flags AS policy1_channel_flags,
cp1.signature AS policy1_signature,
-- Policy 2
cp2.id AS policy2_id,
cp2.node_id AS policy2_node_id,
cp2.version AS policy2_version,
cp2.timelock AS policy2_timelock,
cp2.fee_ppm AS policy2_fee_ppm,
cp2.base_fee_msat AS policy2_base_fee_msat,
cp2.min_htlc_msat AS policy2_min_htlc_msat,
cp2.max_htlc_msat AS policy2_max_htlc_msat,
cp2.last_update AS policy2_last_update,
cp2.disabled AS policy2_disabled,
cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
cp2.message_flags AS policy2_message_flags,
cp2.channel_flags AS policy2_channel_flags,
cp2.signature AS policy2_signature
FROM graph_channels c
JOIN graph_nodes n1 ON c.node_id_1 = n1.id
JOIN graph_nodes n2 ON c.node_id_2 = n2.id
LEFT JOIN graph_channel_policies cp1
ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
LEFT JOIN graph_channel_policies cp2
ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
WHERE c.id IN (/*SLICE:ids*/?)
`
type GetChannelsByIDsRow struct {
GraphChannel GraphChannel
Node1ID int64
Node1PubKey []byte
Node2ID int64
Node2PubKey []byte
Policy1ID sql.NullInt64
Policy1NodeID sql.NullInt64
Policy1Version sql.NullInt16
Policy1Timelock sql.NullInt32
Policy1FeePpm sql.NullInt64
Policy1BaseFeeMsat sql.NullInt64
Policy1MinHtlcMsat sql.NullInt64
Policy1MaxHtlcMsat sql.NullInt64
Policy1LastUpdate sql.NullInt64
Policy1Disabled sql.NullBool
Policy1InboundBaseFeeMsat sql.NullInt64
Policy1InboundFeeRateMilliMsat sql.NullInt64
Policy1MessageFlags sql.NullInt16
Policy1ChannelFlags sql.NullInt16
Policy1Signature []byte
Policy2ID sql.NullInt64
Policy2NodeID sql.NullInt64
Policy2Version sql.NullInt16
Policy2Timelock sql.NullInt32
Policy2FeePpm sql.NullInt64
Policy2BaseFeeMsat sql.NullInt64
Policy2MinHtlcMsat sql.NullInt64
Policy2MaxHtlcMsat sql.NullInt64
Policy2LastUpdate sql.NullInt64
Policy2Disabled sql.NullBool
Policy2InboundBaseFeeMsat sql.NullInt64
Policy2InboundFeeRateMilliMsat sql.NullInt64
Policy2MessageFlags sql.NullInt16
Policy2ChannelFlags sql.NullInt16
Policy2Signature []byte
}
func (q *Queries) GetChannelsByIDs(ctx context.Context, ids []int64) ([]GetChannelsByIDsRow, error) {
query := getChannelsByIDs
var queryParams []interface{}
if len(ids) > 0 {
for _, v := range ids {
queryParams = append(queryParams, v)
}
query = strings.Replace(query, "/*SLICE:ids*/?", makeQueryParams(len(queryParams), len(ids)), 1)
} else {
query = strings.Replace(query, "/*SLICE:ids*/?", "NULL", 1)
}
rows, err := q.db.QueryContext(ctx, query, queryParams...)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetChannelsByIDsRow
for rows.Next() {
var i GetChannelsByIDsRow
if err := rows.Scan(
&i.GraphChannel.ID,
&i.GraphChannel.Version,
&i.GraphChannel.Scid,
&i.GraphChannel.NodeID1,
&i.GraphChannel.NodeID2,
&i.GraphChannel.Outpoint,
&i.GraphChannel.Capacity,
&i.GraphChannel.BitcoinKey1,
&i.GraphChannel.BitcoinKey2,
&i.GraphChannel.Node1Signature,
&i.GraphChannel.Node2Signature,
&i.GraphChannel.Bitcoin1Signature,
&i.GraphChannel.Bitcoin2Signature,
&i.Node1ID,
&i.Node1PubKey,
&i.Node2ID,
&i.Node2PubKey,
&i.Policy1ID,
&i.Policy1NodeID,
&i.Policy1Version,
&i.Policy1Timelock,
&i.Policy1FeePpm,
&i.Policy1BaseFeeMsat,
&i.Policy1MinHtlcMsat,
&i.Policy1MaxHtlcMsat,
&i.Policy1LastUpdate,
&i.Policy1Disabled,
&i.Policy1InboundBaseFeeMsat,
&i.Policy1InboundFeeRateMilliMsat,
&i.Policy1MessageFlags,
&i.Policy1ChannelFlags,
&i.Policy1Signature,
&i.Policy2ID,
&i.Policy2NodeID,
&i.Policy2Version,
&i.Policy2Timelock,
&i.Policy2FeePpm,
&i.Policy2BaseFeeMsat,
&i.Policy2MinHtlcMsat,
&i.Policy2MaxHtlcMsat,
&i.Policy2LastUpdate,
&i.Policy2Disabled,
&i.Policy2InboundBaseFeeMsat,
&i.Policy2InboundFeeRateMilliMsat,
&i.Policy2MessageFlags,
&i.Policy2ChannelFlags,
&i.Policy2Signature,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getChannelsByOutpoints = `-- name: GetChannelsByOutpoints :many
SELECT
c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature,

View File

@@ -42,6 +42,7 @@ type Querier interface {
GetChannelFeaturesBatch(ctx context.Context, chanIds []int64) ([]GraphChannelFeature, error)
GetChannelPolicyByChannelAndNode(ctx context.Context, arg GetChannelPolicyByChannelAndNodeParams) (GraphChannelPolicy, error)
GetChannelPolicyExtraTypesBatch(ctx context.Context, policyIds []int64) ([]GetChannelPolicyExtraTypesBatchRow, error)
GetChannelsByIDs(ctx context.Context, ids []int64) ([]GetChannelsByIDsRow, error)
GetChannelsByOutpoints(ctx context.Context, outpoints []string) ([]GetChannelsByOutpointsRow, error)
GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg GetChannelsByPolicyLastUpdateRangeParams) ([]GetChannelsByPolicyLastUpdateRangeRow, error)
GetChannelsBySCIDRange(ctx context.Context, arg GetChannelsBySCIDRangeParams) ([]GetChannelsBySCIDRangeRow, error)

View File

@@ -335,6 +335,59 @@ WHERE
c.version = @version
AND c.scid IN (sqlc.slice('scids')/*SLICE:scids*/);
-- name: GetChannelsByIDs :many
SELECT
sqlc.embed(c),
-- Minimal node data.
n1.id AS node1_id,
n1.pub_key AS node1_pub_key,
n2.id AS node2_id,
n2.pub_key AS node2_pub_key,
-- Policy 1
cp1.id AS policy1_id,
cp1.node_id AS policy1_node_id,
cp1.version AS policy1_version,
cp1.timelock AS policy1_timelock,
cp1.fee_ppm AS policy1_fee_ppm,
cp1.base_fee_msat AS policy1_base_fee_msat,
cp1.min_htlc_msat AS policy1_min_htlc_msat,
cp1.max_htlc_msat AS policy1_max_htlc_msat,
cp1.last_update AS policy1_last_update,
cp1.disabled AS policy1_disabled,
cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
cp1.message_flags AS policy1_message_flags,
cp1.channel_flags AS policy1_channel_flags,
cp1.signature AS policy1_signature,
-- Policy 2
cp2.id AS policy2_id,
cp2.node_id AS policy2_node_id,
cp2.version AS policy2_version,
cp2.timelock AS policy2_timelock,
cp2.fee_ppm AS policy2_fee_ppm,
cp2.base_fee_msat AS policy2_base_fee_msat,
cp2.min_htlc_msat AS policy2_min_htlc_msat,
cp2.max_htlc_msat AS policy2_max_htlc_msat,
cp2.last_update AS policy2_last_update,
cp2.disabled AS policy2_disabled,
cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
cp2.message_flags AS policy2_message_flags,
cp2.channel_flags AS policy2_channel_flags,
cp2.signature AS policy2_signature
FROM graph_channels c
JOIN graph_nodes n1 ON c.node_id_1 = n1.id
JOIN graph_nodes n2 ON c.node_id_2 = n2.id
LEFT JOIN graph_channel_policies cp1
ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
LEFT JOIN graph_channel_policies cp2
ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
WHERE c.id IN (sqlc.slice('ids')/*SLICE:ids*/);
-- name: GetChannelsByPolicyLastUpdateRange :many
SELECT
sqlc.embed(c),