diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index c90efac28..d6ca33bcd 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -103,6 +103,7 @@ type SQLQueries interface { HighestSCID(ctx context.Context, version int16) ([]byte, error) ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesPaginatedParams) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, error) + ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesForCachePaginatedParams) ([]sqlc.ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsPaginated(ctx context.Context, arg sqlc.ListChannelsPaginatedParams) ([]sqlc.ListChannelsPaginatedRow, error) GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg sqlc.GetChannelsByPolicyLastUpdateRangeParams) ([]sqlc.GetChannelsByPolicyLastUpdateRangeRow, error) GetChannelByOutpointWithPolicies(ctx context.Context, arg sqlc.GetChannelByOutpointWithPoliciesParams) (sqlc.GetChannelByOutpointWithPoliciesRow, error) @@ -1247,8 +1248,8 @@ func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo, ctx := context.TODO() - handleChannel := func(db SQLQueries, - row sqlc.ListChannelsWithPoliciesPaginatedRow) error { + handleChannel := func( + row sqlc.ListChannelsWithPoliciesForCachePaginatedRow) error { node1, node2, err := buildNodeVertices( row.Node1Pubkey, row.Node2Pubkey, @@ -1258,7 +1259,7 @@ func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo, } edge := buildCacheableChannelInfo( - row.GraphChannel, node1, node2, + row.Scid, row.Capacity.Int64, node1, node2, ) dbPol1, dbPol2, err := extractChannelPolicies(row) @@ -1299,8 +1300,8 @@ func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo, lastID := int64(-1) for { //nolint:ll - rows, err := db.ListChannelsWithPoliciesPaginated( - ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{ + rows, err := db.ListChannelsWithPoliciesForCachePaginated( + ctx, sqlc.ListChannelsWithPoliciesForCachePaginatedParams{ Version: int16(ProtocolV1), ID: lastID, Limit: pageSize, @@ -1315,12 +1316,12 @@ func (s *SQLStore) ForEachChannelCacheable(cb func(*models.CachedEdgeInfo, } for _, row := range rows { - err := handleChannel(db, row) + err := handleChannel(row) if err != nil { return err } - lastID = row.GraphChannel.ID + lastID = row.ID } } @@ -3018,7 +3019,8 @@ func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries, } edge := buildCacheableChannelInfo( - row.GraphChannel, node1, node2, + row.GraphChannel.Scid, row.GraphChannel.Capacity.Int64, + node1, node2, ) dbPol1, dbPol2, err := extractChannelPolicies(row) @@ -3321,16 +3323,15 @@ func getNodeByPubKey(ctx context.Context, db SQLQueries, } // buildCacheableChannelInfo builds a models.CachedEdgeInfo instance from the -// provided database channel row and the public keys of the two nodes -// involved in the channel. -func buildCacheableChannelInfo(dbChan sqlc.GraphChannel, node1Pub, +// provided parameters. +func buildCacheableChannelInfo(scid []byte, capacity int64, node1Pub, node2Pub route.Vertex) *models.CachedEdgeInfo { return &models.CachedEdgeInfo{ - ChannelID: byteOrder.Uint64(dbChan.Scid), + ChannelID: byteOrder.Uint64(scid), NodeKey1Bytes: node1Pub, NodeKey2Bytes: node2Pub, - Capacity: btcutil.Amount(dbChan.Capacity.Int64), + Capacity: btcutil.Amount(capacity), } } @@ -4368,6 +4369,38 @@ func extractChannelPolicies(row any) (*sqlc.GraphChannelPolicy, var policy1, policy2 *sqlc.GraphChannelPolicy switch r := row.(type) { + case sqlc.ListChannelsWithPoliciesForCachePaginatedRow: + if r.Policy1Timelock.Valid { + policy1 = &sqlc.GraphChannelPolicy{ + Timelock: r.Policy1Timelock.Int32, + FeePpm: r.Policy1FeePpm.Int64, + BaseFeeMsat: r.Policy1BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy1MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy1MaxHtlcMsat, + InboundBaseFeeMsat: r.Policy1InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat, + Disabled: r.Policy1Disabled, + MessageFlags: r.Policy1MessageFlags, + ChannelFlags: r.Policy1ChannelFlags, + } + } + if r.Policy2Timelock.Valid { + policy2 = &sqlc.GraphChannelPolicy{ + Timelock: r.Policy2Timelock.Int32, + FeePpm: r.Policy2FeePpm.Int64, + BaseFeeMsat: r.Policy2BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy2MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy2MaxHtlcMsat, + InboundBaseFeeMsat: r.Policy2InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat, + Disabled: r.Policy2Disabled, + MessageFlags: r.Policy2MessageFlags, + ChannelFlags: r.Policy2ChannelFlags, + } + } + + return policy1, policy2, nil + case sqlc.GetChannelsBySCIDWithPoliciesRow: if r.Policy1ID.Valid { policy1 = &sqlc.GraphChannelPolicy{ diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 89a92927f..98b120ec0 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -2230,6 +2230,135 @@ func (q *Queries) ListChannelsPaginated(ctx context.Context, arg ListChannelsPag return items, nil } +const listChannelsWithPoliciesForCachePaginated = `-- name: ListChannelsWithPoliciesForCachePaginated :many +SELECT + c.id as id, + c.scid as scid, + c.capacity AS capacity, + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.message_flags AS policy1_message_flags, + cp1.channel_flags AS policy1_channel_flags, + + -- Node 2 policy + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.message_flags AS policy2_message_flags, + cp2.channel_flags AS policy2_channel_flags + +FROM graph_channels c +JOIN graph_nodes n1 ON c.node_id_1 = n1.id +JOIN graph_nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN graph_channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN graph_channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE c.version = $1 AND c.id > $2 +ORDER BY c.id +LIMIT $3 +` + +type ListChannelsWithPoliciesForCachePaginatedParams struct { + Version int16 + ID int64 + Limit int32 +} + +type ListChannelsWithPoliciesForCachePaginatedRow struct { + ID int64 + Scid []byte + Capacity sql.NullInt64 + Node1Pubkey []byte + Node2Pubkey []byte + Policy1Timelock sql.NullInt32 + Policy1FeePpm sql.NullInt64 + Policy1BaseFeeMsat sql.NullInt64 + Policy1MinHtlcMsat sql.NullInt64 + Policy1MaxHtlcMsat sql.NullInt64 + Policy1Disabled sql.NullBool + Policy1InboundBaseFeeMsat sql.NullInt64 + Policy1InboundFeeRateMilliMsat sql.NullInt64 + Policy1MessageFlags sql.NullInt16 + Policy1ChannelFlags sql.NullInt16 + Policy2Timelock sql.NullInt32 + Policy2FeePpm sql.NullInt64 + Policy2BaseFeeMsat sql.NullInt64 + Policy2MinHtlcMsat sql.NullInt64 + Policy2MaxHtlcMsat sql.NullInt64 + Policy2Disabled sql.NullBool + Policy2InboundBaseFeeMsat sql.NullInt64 + Policy2InboundFeeRateMilliMsat sql.NullInt64 + Policy2MessageFlags sql.NullInt16 + Policy2ChannelFlags sql.NullInt16 +} + +func (q *Queries) ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg ListChannelsWithPoliciesForCachePaginatedParams) ([]ListChannelsWithPoliciesForCachePaginatedRow, error) { + rows, err := q.db.QueryContext(ctx, listChannelsWithPoliciesForCachePaginated, arg.Version, arg.ID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListChannelsWithPoliciesForCachePaginatedRow + for rows.Next() { + var i ListChannelsWithPoliciesForCachePaginatedRow + if err := rows.Scan( + &i.ID, + &i.Scid, + &i.Capacity, + &i.Node1Pubkey, + &i.Node2Pubkey, + &i.Policy1Timelock, + &i.Policy1FeePpm, + &i.Policy1BaseFeeMsat, + &i.Policy1MinHtlcMsat, + &i.Policy1MaxHtlcMsat, + &i.Policy1Disabled, + &i.Policy1InboundBaseFeeMsat, + &i.Policy1InboundFeeRateMilliMsat, + &i.Policy1MessageFlags, + &i.Policy1ChannelFlags, + &i.Policy2Timelock, + &i.Policy2FeePpm, + &i.Policy2BaseFeeMsat, + &i.Policy2MinHtlcMsat, + &i.Policy2MaxHtlcMsat, + &i.Policy2Disabled, + &i.Policy2InboundBaseFeeMsat, + &i.Policy2InboundFeeRateMilliMsat, + &i.Policy2MessageFlags, + &i.Policy2ChannelFlags, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listChannelsWithPoliciesPaginated = `-- name: ListChannelsWithPoliciesPaginated :many SELECT c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature, diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 9b9d010b4..3cfb8924e 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -95,6 +95,7 @@ type Querier interface { IsZombieChannel(ctx context.Context, arg IsZombieChannelParams) (bool, error) ListChannelsByNodeID(ctx context.Context, arg ListChannelsByNodeIDParams) ([]ListChannelsByNodeIDRow, error) ListChannelsPaginated(ctx context.Context, arg ListChannelsPaginatedParams) ([]ListChannelsPaginatedRow, error) + ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg ListChannelsWithPoliciesForCachePaginatedParams) ([]ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndPubKeysParams) ([]ListNodeIDsAndPubKeysRow, error) ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]GraphNode, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 8551a7706..509a9434e 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -569,6 +569,51 @@ WHERE c.version = $1 AND c.id > $2 ORDER BY c.id LIMIT $3; +-- name: ListChannelsWithPoliciesForCachePaginated :many +SELECT + c.id as id, + c.scid as scid, + c.capacity AS capacity, + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.message_flags AS policy1_message_flags, + cp1.channel_flags AS policy1_channel_flags, + + -- Node 2 policy + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.message_flags AS policy2_message_flags, + cp2.channel_flags AS policy2_channel_flags + +FROM graph_channels c +JOIN graph_nodes n1 ON c.node_id_1 = n1.id +JOIN graph_nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN graph_channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN graph_channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE c.version = $1 AND c.id > $2 +ORDER BY c.id +LIMIT $3; + -- name: DeleteChannels :exec DELETE FROM graph_channels WHERE id IN (sqlc.slice('ids')/*SLICE:ids*/);