graph/db+sqldb: implement ForEachSourceNodeChannel

In this commit, the ForEachSourceNodeChannel implementation of the
SQLStore is added. Since this is the first method of the SQLStore that
fetches channel and policy info, it also adds all the helpers that are
required to do so. These will be re-used in upcoming commits as more
"For"-type methods are added.

With this implementation, we convert the `TestForEachSourceNodeChannel`
such that it is run against SQL backends.
This commit is contained in:
Elle Mouton
2025-06-11 15:40:10 +02:00
parent 3ed53cdcba
commit f89e3ceced
6 changed files with 797 additions and 1 deletions

View File

@ -257,6 +257,120 @@ func (q *Queries) GetChannelBySCID(ctx context.Context, arg GetChannelBySCIDPara
return i, err
}
const getChannelFeaturesAndExtras = `-- name: GetChannelFeaturesAndExtras :many
SELECT
cf.channel_id,
true AS is_feature,
cf.feature_bit AS feature_bit,
NULL AS extra_key,
NULL AS value
FROM channel_features cf
WHERE cf.channel_id = $1
UNION ALL
SELECT
cet.channel_id,
false AS is_feature,
0 AS feature_bit,
cet.type AS extra_key,
cet.value AS value
FROM channel_extra_types cet
WHERE cet.channel_id = $1
`
type GetChannelFeaturesAndExtrasRow struct {
ChannelID int64
IsFeature bool
FeatureBit int32
ExtraKey interface{}
Value interface{}
}
func (q *Queries) GetChannelFeaturesAndExtras(ctx context.Context, channelID int64) ([]GetChannelFeaturesAndExtrasRow, error) {
rows, err := q.db.QueryContext(ctx, getChannelFeaturesAndExtras, channelID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetChannelFeaturesAndExtrasRow
for rows.Next() {
var i GetChannelFeaturesAndExtrasRow
if err := rows.Scan(
&i.ChannelID,
&i.IsFeature,
&i.FeatureBit,
&i.ExtraKey,
&i.Value,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getChannelPolicyExtraTypes = `-- name: GetChannelPolicyExtraTypes :many
SELECT
cp.id AS policy_id,
cp.channel_id,
cp.node_id,
cpet.type,
cpet.value
FROM channel_policies cp
JOIN channel_policy_extra_types cpet
ON cp.id = cpet.channel_policy_id
WHERE cp.id = $1 OR cp.id = $2
`
type GetChannelPolicyExtraTypesParams struct {
ID int64
ID_2 int64
}
type GetChannelPolicyExtraTypesRow struct {
PolicyID int64
ChannelID int64
NodeID int64
Type int64
Value []byte
}
func (q *Queries) GetChannelPolicyExtraTypes(ctx context.Context, arg GetChannelPolicyExtraTypesParams) ([]GetChannelPolicyExtraTypesRow, error) {
rows, err := q.db.QueryContext(ctx, getChannelPolicyExtraTypes, arg.ID, arg.ID_2)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetChannelPolicyExtraTypesRow
for rows.Next() {
var i GetChannelPolicyExtraTypesRow
if err := rows.Scan(
&i.PolicyID,
&i.ChannelID,
&i.NodeID,
&i.Type,
&i.Value,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getExtraNodeTypes = `-- name: GetExtraNodeTypes :many
SELECT node_id, type, value
FROM node_extra_types
@ -614,6 +728,158 @@ func (q *Queries) InsertNodeFeature(ctx context.Context, arg InsertNodeFeaturePa
return err
}
const listChannelsByNodeID = `-- name: ListChannelsByNodeID :many
SELECT c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature,
n1.pub_key AS node1_pubkey,
n2.pub_key AS node2_pubkey,
-- Policy 1
-- TODO(elle): use sqlc.embed to embed policy structs
-- once this issue is resolved:
-- https://github.com/sqlc-dev/sqlc/issues/2997
cp1.id AS policy1_id,
cp1.node_id AS policy1_node_id,
cp1.version AS policy1_version,
cp1.timelock AS policy1_timelock,
cp1.fee_ppm AS policy1_fee_ppm,
cp1.base_fee_msat AS policy1_base_fee_msat,
cp1.min_htlc_msat AS policy1_min_htlc_msat,
cp1.max_htlc_msat AS policy1_max_htlc_msat,
cp1.last_update AS policy1_last_update,
cp1.disabled AS policy1_disabled,
cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
cp1.signature AS policy1_signature,
-- Policy 2
cp2.id AS policy2_id,
cp2.node_id AS policy2_node_id,
cp2.version AS policy2_version,
cp2.timelock AS policy2_timelock,
cp2.fee_ppm AS policy2_fee_ppm,
cp2.base_fee_msat AS policy2_base_fee_msat,
cp2.min_htlc_msat AS policy2_min_htlc_msat,
cp2.max_htlc_msat AS policy2_max_htlc_msat,
cp2.last_update AS policy2_last_update,
cp2.disabled AS policy2_disabled,
cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
cp2.signature AS policy2_signature
FROM channels c
JOIN nodes n1 ON c.node_id_1 = n1.id
JOIN nodes n2 ON c.node_id_2 = n2.id
LEFT JOIN channel_policies cp1
ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
LEFT JOIN channel_policies cp2
ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
WHERE c.version = $1
AND (c.node_id_1 = $2 OR c.node_id_2 = $2)
`
type ListChannelsByNodeIDParams struct {
Version int16
NodeID1 int64
}
type ListChannelsByNodeIDRow struct {
Channel Channel
Node1Pubkey []byte
Node2Pubkey []byte
Policy1ID sql.NullInt64
Policy1NodeID sql.NullInt64
Policy1Version sql.NullInt16
Policy1Timelock sql.NullInt32
Policy1FeePpm sql.NullInt64
Policy1BaseFeeMsat sql.NullInt64
Policy1MinHtlcMsat sql.NullInt64
Policy1MaxHtlcMsat sql.NullInt64
Policy1LastUpdate sql.NullInt64
Policy1Disabled sql.NullBool
Policy1InboundBaseFeeMsat sql.NullInt64
Policy1InboundFeeRateMilliMsat sql.NullInt64
Policy1Signature []byte
Policy2ID sql.NullInt64
Policy2NodeID sql.NullInt64
Policy2Version sql.NullInt16
Policy2Timelock sql.NullInt32
Policy2FeePpm sql.NullInt64
Policy2BaseFeeMsat sql.NullInt64
Policy2MinHtlcMsat sql.NullInt64
Policy2MaxHtlcMsat sql.NullInt64
Policy2LastUpdate sql.NullInt64
Policy2Disabled sql.NullBool
Policy2InboundBaseFeeMsat sql.NullInt64
Policy2InboundFeeRateMilliMsat sql.NullInt64
Policy2Signature []byte
}
func (q *Queries) ListChannelsByNodeID(ctx context.Context, arg ListChannelsByNodeIDParams) ([]ListChannelsByNodeIDRow, error) {
rows, err := q.db.QueryContext(ctx, listChannelsByNodeID, arg.Version, arg.NodeID1)
if err != nil {
return nil, err
}
defer rows.Close()
var items []ListChannelsByNodeIDRow
for rows.Next() {
var i ListChannelsByNodeIDRow
if err := rows.Scan(
&i.Channel.ID,
&i.Channel.Version,
&i.Channel.Scid,
&i.Channel.NodeID1,
&i.Channel.NodeID2,
&i.Channel.Outpoint,
&i.Channel.Capacity,
&i.Channel.BitcoinKey1,
&i.Channel.BitcoinKey2,
&i.Channel.Node1Signature,
&i.Channel.Node2Signature,
&i.Channel.Bitcoin1Signature,
&i.Channel.Bitcoin2Signature,
&i.Node1Pubkey,
&i.Node2Pubkey,
&i.Policy1ID,
&i.Policy1NodeID,
&i.Policy1Version,
&i.Policy1Timelock,
&i.Policy1FeePpm,
&i.Policy1BaseFeeMsat,
&i.Policy1MinHtlcMsat,
&i.Policy1MaxHtlcMsat,
&i.Policy1LastUpdate,
&i.Policy1Disabled,
&i.Policy1InboundBaseFeeMsat,
&i.Policy1InboundFeeRateMilliMsat,
&i.Policy1Signature,
&i.Policy2ID,
&i.Policy2NodeID,
&i.Policy2Version,
&i.Policy2Timelock,
&i.Policy2FeePpm,
&i.Policy2BaseFeeMsat,
&i.Policy2MinHtlcMsat,
&i.Policy2MaxHtlcMsat,
&i.Policy2LastUpdate,
&i.Policy2Disabled,
&i.Policy2InboundBaseFeeMsat,
&i.Policy2InboundFeeRateMilliMsat,
&i.Policy2Signature,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const upsertEdgePolicy = `-- name: UpsertEdgePolicy :one
/* ─────────────────────────────────────────────
channel_policies table queries

View File

@ -29,6 +29,8 @@ type Querier interface {
GetAMPInvoiceID(ctx context.Context, setID []byte) (int64, error)
GetChannelAndNodesBySCID(ctx context.Context, arg GetChannelAndNodesBySCIDParams) (GetChannelAndNodesBySCIDRow, error)
GetChannelBySCID(ctx context.Context, arg GetChannelBySCIDParams) (Channel, error)
GetChannelFeaturesAndExtras(ctx context.Context, channelID int64) ([]GetChannelFeaturesAndExtrasRow, error)
GetChannelPolicyExtraTypes(ctx context.Context, arg GetChannelPolicyExtraTypesParams) ([]GetChannelPolicyExtraTypesRow, error)
GetDatabaseVersion(ctx context.Context) (int32, error)
GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]NodeExtraType, error)
// This method may return more than one invoice if filter using multiple fields
@ -61,6 +63,7 @@ type Querier interface {
InsertMigratedInvoice(ctx context.Context, arg InsertMigratedInvoiceParams) (int64, error)
InsertNodeAddress(ctx context.Context, arg InsertNodeAddressParams) error
InsertNodeFeature(ctx context.Context, arg InsertNodeFeatureParams) error
ListChannelsByNodeID(ctx context.Context, arg ListChannelsByNodeIDParams) ([]ListChannelsByNodeIDRow, error)
NextInvoiceSettleIndex(ctx context.Context) (int64, error)
OnAMPSubInvoiceCanceled(ctx context.Context, arg OnAMPSubInvoiceCanceledParams) error
OnAMPSubInvoiceCreated(ctx context.Context, arg OnAMPSubInvoiceCreatedParams) error

View File

@ -165,6 +165,27 @@ FROM channels c
WHERE c.scid = $1
AND c.version = $2;
-- name: GetChannelFeaturesAndExtras :many
SELECT
cf.channel_id,
true AS is_feature,
cf.feature_bit AS feature_bit,
NULL AS extra_key,
NULL AS value
FROM channel_features cf
WHERE cf.channel_id = $1
UNION ALL
SELECT
cet.channel_id,
false AS is_feature,
0 AS feature_bit,
cet.type AS extra_key,
cet.value AS value
FROM channel_extra_types cet
WHERE cet.channel_id = $1;
-- name: HighestSCID :one
SELECT scid
FROM channels
@ -172,6 +193,55 @@ WHERE version = $1
ORDER BY scid DESC
LIMIT 1;
-- name: ListChannelsByNodeID :many
SELECT sqlc.embed(c),
n1.pub_key AS node1_pubkey,
n2.pub_key AS node2_pubkey,
-- Policy 1
-- TODO(elle): use sqlc.embed to embed policy structs
-- once this issue is resolved:
-- https://github.com/sqlc-dev/sqlc/issues/2997
cp1.id AS policy1_id,
cp1.node_id AS policy1_node_id,
cp1.version AS policy1_version,
cp1.timelock AS policy1_timelock,
cp1.fee_ppm AS policy1_fee_ppm,
cp1.base_fee_msat AS policy1_base_fee_msat,
cp1.min_htlc_msat AS policy1_min_htlc_msat,
cp1.max_htlc_msat AS policy1_max_htlc_msat,
cp1.last_update AS policy1_last_update,
cp1.disabled AS policy1_disabled,
cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
cp1.signature AS policy1_signature,
-- Policy 2
cp2.id AS policy2_id,
cp2.node_id AS policy2_node_id,
cp2.version AS policy2_version,
cp2.timelock AS policy2_timelock,
cp2.fee_ppm AS policy2_fee_ppm,
cp2.base_fee_msat AS policy2_base_fee_msat,
cp2.min_htlc_msat AS policy2_min_htlc_msat,
cp2.max_htlc_msat AS policy2_max_htlc_msat,
cp2.last_update AS policy2_last_update,
cp2.disabled AS policy2_disabled,
cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
cp2.signature AS policy2_signature
FROM channels c
JOIN nodes n1 ON c.node_id_1 = n1.id
JOIN nodes n2 ON c.node_id_2 = n2.id
LEFT JOIN channel_policies cp1
ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
LEFT JOIN channel_policies cp2
ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
WHERE c.version = $1
AND (c.node_id_1 = $2 OR c.node_id_2 = $2);
/* ─────────────────────────────────────────────
channel_features table queries
─────────────────────────────────────────────
@ -237,6 +307,18 @@ INSERT INTO channel_policy_extra_types (
)
VALUES ($1, $2, $3);
-- name: GetChannelPolicyExtraTypes :many
SELECT
cp.id AS policy_id,
cp.channel_id,
cp.node_id,
cpet.type,
cpet.value
FROM channel_policies cp
JOIN channel_policy_extra_types cpet
ON cp.id = cpet.channel_policy_id
WHERE cp.id = $1 OR cp.id = $2;
-- name: DeleteChannelPolicyExtraTypes :exec
DELETE FROM channel_policy_extra_types
WHERE channel_policy_id = $1;