diff --git a/docs/release-notes/release-notes-0.20.0.md b/docs/release-notes/release-notes-0.20.0.md index a3e76581e..a6c2b0819 100644 --- a/docs/release-notes/release-notes-0.20.0.md +++ b/docs/release-notes/release-notes-0.20.0.md @@ -85,6 +85,7 @@ circuit. The indices are only available for forwarding events saved after v0.20. * [3](https://github.com/lightningnetwork/lnd/pull/9887) * [4](https://github.com/lightningnetwork/lnd/pull/9931) * [5](https://github.com/lightningnetwork/lnd/pull/9935) + * [6](https://github.com/lightningnetwork/lnd/pull/9936) ## RPC Updates diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index dbd828527..4cce1d9af 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -1277,7 +1277,7 @@ func TestForEachSourceNodeChannel(t *testing.T) { func TestGraphTraversal(t *testing.T) { t.Parallel() - graph := MakeTestGraph(t) + graph := MakeTestGraphNew(t) // We'd like to test some of the graph traversal capabilities within // the DB, so we'll create a series of fake nodes to insert into the @@ -1937,7 +1937,7 @@ func TestChanUpdatesInHorizon(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + graph := MakeTestGraphNew(t) // If we issue an arbitrary query before any channel updates are // inserted in the database, we should get zero results. @@ -2727,7 +2727,7 @@ func TestFilterChannelRange(t *testing.T) { t.Parallel() ctx := context.Background() - graph := MakeTestGraph(t) + graph := MakeTestGraphNew(t) // We'll first populate our graph with two nodes. All channels created // below will be made between these two nodes. diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 62dda158d..c2c681965 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -2149,9 +2149,14 @@ func (c *KVStore) ChanUpdatesInHorizon(startTime, c.chanCache.insert(chanid, channel) } - log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)", - float64(hits)/float64(len(edgesInHorizon)), hits, - len(edgesInHorizon)) + if len(edgesInHorizon) > 0 { + log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)", + float64(hits)/float64(len(edgesInHorizon)), hits, + len(edgesInHorizon)) + } else { + log.Debugf("ChanUpdatesInHorizon returned no edges in "+ + "horizon (%s, %s)", startTime, endTime) + } return edgesInHorizon, nil } diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 564ce96c6..3710efa8e 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -7,8 +7,10 @@ import ( "encoding/hex" "errors" "fmt" + "maps" "math" "net" + "slices" "strconv" "sync" "time" @@ -90,6 +92,9 @@ type SQLQueries interface { GetChannelFeaturesAndExtras(ctx context.Context, channelID int64) ([]sqlc.GetChannelFeaturesAndExtrasRow, error) HighestSCID(ctx context.Context, version int16) ([]byte, error) ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error) + ListChannelsWithPoliciesPaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesPaginatedParams) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, error) + GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg sqlc.GetChannelsByPolicyLastUpdateRangeParams) ([]sqlc.GetChannelsByPolicyLastUpdateRangeRow, error) + GetPublicV1ChannelsBySCID(ctx context.Context, arg sqlc.GetPublicV1ChannelsBySCIDParams) ([]sqlc.Channel, error) CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error @@ -98,6 +103,7 @@ type SQLQueries interface { Channel Policy table queries. */ UpsertEdgePolicy(ctx context.Context, arg sqlc.UpsertEdgePolicyParams) (int64, error) + GetChannelPolicyByChannelAndNode(ctx context.Context, arg sqlc.GetChannelPolicyByChannelAndNodeParams) (sqlc.ChannelPolicy, error) InsertChanPolicyExtraType(ctx context.Context, arg sqlc.InsertChanPolicyExtraTypeParams) error GetChannelPolicyExtraTypes(ctx context.Context, arg sqlc.GetChannelPolicyExtraTypesParams) ([]sqlc.GetChannelPolicyExtraTypesRow, error) @@ -924,6 +930,469 @@ func (s *SQLStore) ForEachNodeChannel(nodePub route.Vertex, }, sqldb.NoOpReset) } +// ChanUpdatesInHorizon returns all the known channel edges which have at least +// one edge that has an update timestamp within the specified horizon. +// +// NOTE: This is part of the V1Store interface. +func (s *SQLStore) ChanUpdatesInHorizon(startTime, + endTime time.Time) ([]ChannelEdge, error) { + + s.cacheMu.Lock() + defer s.cacheMu.Unlock() + + var ( + ctx = context.TODO() + // To ensure we don't return duplicate ChannelEdges, we'll use + // an additional map to keep track of the edges already seen to + // prevent re-adding it. + edgesSeen = make(map[uint64]struct{}) + edgesToCache = make(map[uint64]ChannelEdge) + edges []ChannelEdge + hits int + ) + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + rows, err := db.GetChannelsByPolicyLastUpdateRange( + ctx, sqlc.GetChannelsByPolicyLastUpdateRangeParams{ + Version: int16(ProtocolV1), + StartTime: sqldb.SQLInt64(startTime.Unix()), + EndTime: sqldb.SQLInt64(endTime.Unix()), + }, + ) + if err != nil { + return err + } + + for _, row := range rows { + // If we've already retrieved the info and policies for + // this edge, then we can skip it as we don't need to do + // so again. + chanIDInt := byteOrder.Uint64(row.Channel.Scid) + if _, ok := edgesSeen[chanIDInt]; ok { + continue + } + + if channel, ok := s.chanCache.get(chanIDInt); ok { + hits++ + edgesSeen[chanIDInt] = struct{}{} + edges = append(edges, channel) + + continue + } + + node1, node2, err := buildNodes( + ctx, db, row.Node, row.Node_2, + ) + if err != nil { + return err + } + + channel, err := getAndBuildEdgeInfo( + ctx, db, s.cfg.ChainHash, row.Channel.ID, + row.Channel, node1.PubKeyBytes, + node2.PubKeyBytes, + ) + if err != nil { + return fmt.Errorf("unable to build channel "+ + "info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return fmt.Errorf("unable to extract channel "+ + "policies: %w", err) + } + + p1, p2, err := getAndBuildChanPolicies( + ctx, db, dbPol1, dbPol2, channel.ChannelID, + node1.PubKeyBytes, node2.PubKeyBytes, + ) + if err != nil { + return fmt.Errorf("unable to build channel "+ + "policies: %w", err) + } + + edgesSeen[chanIDInt] = struct{}{} + chanEdge := ChannelEdge{ + Info: channel, + Policy1: p1, + Policy2: p2, + Node1: node1, + Node2: node2, + } + edges = append(edges, chanEdge) + edgesToCache[chanIDInt] = chanEdge + } + + return nil + }, func() { + edgesSeen = make(map[uint64]struct{}) + edgesToCache = make(map[uint64]ChannelEdge) + edges = nil + }) + if err != nil { + return nil, fmt.Errorf("unable to fetch channels: %w", err) + } + + // Insert any edges loaded from disk into the cache. + for chanid, channel := range edgesToCache { + s.chanCache.insert(chanid, channel) + } + + if len(edges) > 0 { + log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)", + float64(hits)/float64(len(edges)), hits, len(edges)) + } else { + log.Debugf("ChanUpdatesInHorizon returned no edges in "+ + "horizon (%s, %s)", startTime, endTime) + } + + return edges, nil +} + +// ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel +// data to the call-back. +// +// NOTE: The callback contents MUST not be modified. +// +// NOTE: part of the V1Store interface. +func (s *SQLStore) ForEachNodeCached(cb func(node route.Vertex, + chans map[uint64]*DirectedChannel) error) error { + + var ctx = context.TODO() + + return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + return forEachNodeCacheable(ctx, db, func(nodeID int64, + nodePub route.Vertex) error { + + features, err := getNodeFeatures(ctx, db, nodeID) + if err != nil { + return fmt.Errorf("unable to fetch "+ + "node(id=%d) features: %w", nodeID, err) + } + + toNodeCallback := func() route.Vertex { + return nodePub + } + + rows, err := db.ListChannelsByNodeID( + ctx, sqlc.ListChannelsByNodeIDParams{ + Version: int16(ProtocolV1), + NodeID1: nodeID, + }, + ) + if err != nil { + return fmt.Errorf("unable to fetch channels "+ + "of node(id=%d): %w", nodeID, err) + } + + channels := make(map[uint64]*DirectedChannel, len(rows)) + for _, row := range rows { + node1, node2, err := buildNodeVertices( + row.Node1Pubkey, row.Node2Pubkey, + ) + if err != nil { + return err + } + + e, err := getAndBuildEdgeInfo( + ctx, db, s.cfg.ChainHash, + row.Channel.ID, row.Channel, node1, + node2, + ) + if err != nil { + return fmt.Errorf("unable to build "+ + "channel info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies( + row, + ) + if err != nil { + return fmt.Errorf("unable to "+ + "extract channel "+ + "policies: %w", err) + } + + p1, p2, err := getAndBuildChanPolicies( + ctx, db, dbPol1, dbPol2, e.ChannelID, + node1, node2, + ) + if err != nil { + return fmt.Errorf("unable to "+ + "build channel policies: %w", + err) + } + + // Determine the outgoing and incoming policy + // for this channel and node combo. + outPolicy, inPolicy := p1, p2 + if p1 != nil && p1.ToNode == nodePub { + outPolicy, inPolicy = p2, p1 + } else if p2 != nil && p2.ToNode != nodePub { + outPolicy, inPolicy = p2, p1 + } + + var cachedInPolicy *models.CachedEdgePolicy + if inPolicy != nil { + cachedInPolicy = models.NewCachedPolicy( + p2, + ) + cachedInPolicy.ToNodePubKey = + toNodeCallback + cachedInPolicy.ToNodeFeatures = + features + } + + var inboundFee lnwire.Fee + outPolicy.InboundFee.WhenSome( + func(fee lnwire.Fee) { + inboundFee = fee + }, + ) + + directedChannel := &DirectedChannel{ + ChannelID: e.ChannelID, + IsNode1: nodePub == + e.NodeKey1Bytes, + OtherNode: e.NodeKey2Bytes, + Capacity: e.Capacity, + OutPolicySet: p1 != nil, + InPolicy: cachedInPolicy, + InboundFee: inboundFee, + } + + if nodePub == e.NodeKey2Bytes { + directedChannel.OtherNode = + e.NodeKey1Bytes + } + + channels[e.ChannelID] = directedChannel + } + + return cb(nodePub, channels) + }) + }, sqldb.NoOpReset) +} + +// ForEachChannel iterates through all the channel edges stored within the +// graph and invokes the passed callback for each edge. The callback takes two +// edges as since this is a directed graph, both the in/out edges are visited. +// If the callback returns an error, then the transaction is aborted and the +// iteration stops early. +// +// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer +// for that particular channel edge routing policy will be passed into the +// callback. +// +// NOTE: part of the V1Store interface. +func (s *SQLStore) ForEachChannel(cb func(*models.ChannelEdgeInfo, + *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { + + ctx := context.TODO() + + handleChannel := func(db SQLQueries, + row sqlc.ListChannelsWithPoliciesPaginatedRow) error { + + node1, node2, err := buildNodeVertices( + row.Node1Pubkey, row.Node2Pubkey, + ) + if err != nil { + return fmt.Errorf("unable to build node vertices: %w", + err) + } + + edge, err := getAndBuildEdgeInfo( + ctx, db, s.cfg.ChainHash, row.Channel.ID, row.Channel, + node1, node2, + ) + if err != nil { + return fmt.Errorf("unable to build channel info: %w", + err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return fmt.Errorf("unable to extract channel "+ + "policies: %w", err) + } + + p1, p2, err := getAndBuildChanPolicies( + ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2, + ) + if err != nil { + return fmt.Errorf("unable to build channel "+ + "policies: %w", err) + } + + err = cb(edge, p1, p2) + if err != nil { + return fmt.Errorf("callback failed for channel "+ + "id=%d: %w", edge.ChannelID, err) + } + + return nil + } + + return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + var lastID int64 + for { + //nolint:ll + rows, err := db.ListChannelsWithPoliciesPaginated( + ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{ + Version: int16(ProtocolV1), + ID: lastID, + Limit: pageSize, + }, + ) + if err != nil { + return err + } + + if len(rows) == 0 { + break + } + + for _, row := range rows { + err := handleChannel(db, row) + if err != nil { + return err + } + + lastID = row.Channel.ID + } + } + + return nil + }, sqldb.NoOpReset) +} + +// FilterChannelRange returns the channel ID's of all known channels which were +// mined in a block height within the passed range. The channel IDs are grouped +// by their common block height. This method can be used to quickly share with a +// peer the set of channels we know of within a particular range to catch them +// up after a period of time offline. If withTimestamps is true then the +// timestamp info of the latest received channel update messages of the channel +// will be included in the response. +// +// NOTE: This is part of the V1Store interface. +func (s *SQLStore) FilterChannelRange(startHeight, endHeight uint32, + withTimestamps bool) ([]BlockChannelRange, error) { + + var ( + ctx = context.TODO() + startSCID = &lnwire.ShortChannelID{ + BlockHeight: startHeight, + } + endSCID = lnwire.ShortChannelID{ + BlockHeight: endHeight, + TxIndex: math.MaxUint32 & 0x00ffffff, + TxPosition: math.MaxUint16, + } + ) + + var chanIDStart [8]byte + byteOrder.PutUint64(chanIDStart[:], startSCID.ToUint64()) + var chanIDEnd [8]byte + byteOrder.PutUint64(chanIDEnd[:], endSCID.ToUint64()) + + // 1) get all channels where channelID is between start and end chan ID. + // 2) skip if not public (ie, no channel_proof) + // 3) collect that channel. + // 4) if timestamps are wanted, fetch both policies for node 1 and node2 + // and add those timestamps to the collected channel. + channelsPerBlock := make(map[uint32][]ChannelUpdateInfo) + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + dbChans, err := db.GetPublicV1ChannelsBySCID( + ctx, sqlc.GetPublicV1ChannelsBySCIDParams{ + StartScid: chanIDStart[:], + EndScid: chanIDEnd[:], + }, + ) + if err != nil { + return fmt.Errorf("unable to fetch channel range: %w", + err) + } + + for _, dbChan := range dbChans { + cid := lnwire.NewShortChanIDFromInt( + byteOrder.Uint64(dbChan.Scid), + ) + chanInfo := NewChannelUpdateInfo( + cid, time.Time{}, time.Time{}, + ) + + if !withTimestamps { + channelsPerBlock[cid.BlockHeight] = append( + channelsPerBlock[cid.BlockHeight], + chanInfo, + ) + + continue + } + + //nolint:ll + node1Policy, err := db.GetChannelPolicyByChannelAndNode( + ctx, sqlc.GetChannelPolicyByChannelAndNodeParams{ + Version: int16(ProtocolV1), + ChannelID: dbChan.ID, + NodeID: dbChan.NodeID1, + }, + ) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("unable to fetch node1 "+ + "policy: %w", err) + } else if err == nil { + chanInfo.Node1UpdateTimestamp = time.Unix( + node1Policy.LastUpdate.Int64, 0, + ) + } + + //nolint:ll + node2Policy, err := db.GetChannelPolicyByChannelAndNode( + ctx, sqlc.GetChannelPolicyByChannelAndNodeParams{ + Version: int16(ProtocolV1), + ChannelID: dbChan.ID, + NodeID: dbChan.NodeID2, + }, + ) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return fmt.Errorf("unable to fetch node2 "+ + "policy: %w", err) + } else if err == nil { + chanInfo.Node2UpdateTimestamp = time.Unix( + node2Policy.LastUpdate.Int64, 0, + ) + } + + channelsPerBlock[cid.BlockHeight] = append( + channelsPerBlock[cid.BlockHeight], chanInfo, + ) + } + + return nil + }, func() { + channelsPerBlock = make(map[uint32][]ChannelUpdateInfo) + }) + if err != nil { + return nil, fmt.Errorf("unable to fetch channel range: %w", err) + } + + if len(channelsPerBlock) == 0 { + return nil, nil + } + + // Return the channel ranges in ascending block height order. + blocks := slices.Collect(maps.Keys(channelsPerBlock)) + slices.Sort(blocks) + + return fn.Map(blocks, func(block uint32) BlockChannelRange { + return BlockChannelRange{ + Height: block, + Channels: channelsPerBlock[block], + } + }), nil +} + // forEachNodeDirectedChannel iterates through all channels of a given // node, executing the passed callback on the directed edge representing the // channel and its incoming policy. If the node is not found, no error is @@ -977,12 +1446,7 @@ func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries, err) } - edge, err := buildCacheableChannelInfo( - row.Channel, node1, node2, - ) - if err != nil { - return err - } + edge := buildCacheableChannelInfo(row.Channel, node1, node2) dbPol1, dbPol2, err := extractChannelPolicies(row) if err != nil { @@ -1286,14 +1750,14 @@ func getNodeByPubKey(ctx context.Context, db SQLQueries, // provided database channel row and the public keys of the two nodes // involved in the channel. func buildCacheableChannelInfo(dbChan sqlc.Channel, node1Pub, - node2Pub route.Vertex) (*models.CachedEdgeInfo, error) { + node2Pub route.Vertex) *models.CachedEdgeInfo { return &models.CachedEdgeInfo{ ChannelID: byteOrder.Uint64(dbChan.Scid), NodeKey1Bytes: node1Pub, NodeKey2Bytes: node2Pub, Capacity: btcutil.Amount(dbChan.Capacity.Int64), - }, nil + } } // buildNode constructs a LightningNode instance from the given database node @@ -2302,17 +2766,76 @@ func buildChanPolicy(dbPolicy sqlc.ChannelPolicy, channelID uint64, }, nil } +// buildNodes builds the models.LightningNode instances for the +// given row which is expected to be a sqlc type that contains node information. +func buildNodes(ctx context.Context, db SQLQueries, dbNode1, + dbNode2 sqlc.Node) (*models.LightningNode, *models.LightningNode, + error) { + + node1, err := buildNode(ctx, db, &dbNode1) + if err != nil { + return nil, nil, err + } + + node2, err := buildNode(ctx, db, &dbNode2) + if err != nil { + return nil, nil, err + } + + return node1, node2, nil +} + // extractChannelPolicies extracts the sqlc.ChannelPolicy records from the give // row which is expected to be a sqlc type that contains channel policy // information. It returns two policies, which may be nil if the policy // information is not present in the row. // -//nolint:ll +//nolint:ll,dupl func extractChannelPolicies(row any) (*sqlc.ChannelPolicy, *sqlc.ChannelPolicy, error) { var policy1, policy2 *sqlc.ChannelPolicy switch r := row.(type) { + case sqlc.GetChannelsByPolicyLastUpdateRangeRow: + if r.Policy1ID.Valid { + policy1 = &sqlc.ChannelPolicy{ + ID: r.Policy1ID.Int64, + Version: r.Policy1Version.Int16, + ChannelID: r.Channel.ID, + NodeID: r.Policy1NodeID.Int64, + Timelock: r.Policy1Timelock.Int32, + FeePpm: r.Policy1FeePpm.Int64, + BaseFeeMsat: r.Policy1BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy1MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy1MaxHtlcMsat, + LastUpdate: r.Policy1LastUpdate, + InboundBaseFeeMsat: r.Policy1InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat, + Disabled: r.Policy1Disabled, + Signature: r.Policy1Signature, + } + } + if r.Policy2ID.Valid { + policy2 = &sqlc.ChannelPolicy{ + ID: r.Policy2ID.Int64, + Version: r.Policy2Version.Int16, + ChannelID: r.Channel.ID, + NodeID: r.Policy2NodeID.Int64, + Timelock: r.Policy2Timelock.Int32, + FeePpm: r.Policy2FeePpm.Int64, + BaseFeeMsat: r.Policy2BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy2MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy2MaxHtlcMsat, + LastUpdate: r.Policy2LastUpdate, + InboundBaseFeeMsat: r.Policy2InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat, + Disabled: r.Policy2Disabled, + Signature: r.Policy2Signature, + } + } + + return policy1, policy2, nil + case sqlc.ListChannelsByNodeIDRow: if r.Policy1ID.Valid { policy1 = &sqlc.ChannelPolicy{ @@ -2351,6 +2874,46 @@ func extractChannelPolicies(row any) (*sqlc.ChannelPolicy, *sqlc.ChannelPolicy, } } + return policy1, policy2, nil + + case sqlc.ListChannelsWithPoliciesPaginatedRow: + if r.Policy1ID.Valid { + policy1 = &sqlc.ChannelPolicy{ + ID: r.Policy1ID.Int64, + Version: r.Policy1Version.Int16, + ChannelID: r.Channel.ID, + NodeID: r.Policy1NodeID.Int64, + Timelock: r.Policy1Timelock.Int32, + FeePpm: r.Policy1FeePpm.Int64, + BaseFeeMsat: r.Policy1BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy1MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy1MaxHtlcMsat, + LastUpdate: r.Policy1LastUpdate, + InboundBaseFeeMsat: r.Policy1InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat, + Disabled: r.Policy1Disabled, + Signature: r.Policy1Signature, + } + } + if r.Policy2ID.Valid { + policy2 = &sqlc.ChannelPolicy{ + ID: r.Policy2ID.Int64, + Version: r.Policy2Version.Int16, + ChannelID: r.Channel.ID, + NodeID: r.Policy2NodeID.Int64, + Timelock: r.Policy2Timelock.Int32, + FeePpm: r.Policy2FeePpm.Int64, + BaseFeeMsat: r.Policy2BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy2MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy2MaxHtlcMsat, + LastUpdate: r.Policy2LastUpdate, + InboundBaseFeeMsat: r.Policy2InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat, + Disabled: r.Policy2Disabled, + Signature: r.Policy2Signature, + } + } + return policy1, policy2, nil default: return nil, nil, fmt.Errorf("unexpected row type in "+ diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 606d40217..76e61fe8c 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -316,6 +316,42 @@ func (q *Queries) GetChannelFeaturesAndExtras(ctx context.Context, channelID int return items, nil } +const getChannelPolicyByChannelAndNode = `-- name: GetChannelPolicyByChannelAndNode :one +SELECT id, version, channel_id, node_id, timelock, fee_ppm, base_fee_msat, min_htlc_msat, max_htlc_msat, last_update, disabled, inbound_base_fee_msat, inbound_fee_rate_milli_msat, signature +FROM channel_policies +WHERE channel_id = $1 + AND node_id = $2 + AND version = $3 +` + +type GetChannelPolicyByChannelAndNodeParams struct { + ChannelID int64 + NodeID int64 + Version int16 +} + +func (q *Queries) GetChannelPolicyByChannelAndNode(ctx context.Context, arg GetChannelPolicyByChannelAndNodeParams) (ChannelPolicy, error) { + row := q.db.QueryRowContext(ctx, getChannelPolicyByChannelAndNode, arg.ChannelID, arg.NodeID, arg.Version) + var i ChannelPolicy + err := row.Scan( + &i.ID, + &i.Version, + &i.ChannelID, + &i.NodeID, + &i.Timelock, + &i.FeePpm, + &i.BaseFeeMsat, + &i.MinHtlcMsat, + &i.MaxHtlcMsat, + &i.LastUpdate, + &i.Disabled, + &i.InboundBaseFeeMsat, + &i.InboundFeeRateMilliMsat, + &i.Signature, + ) + return i, err +} + const getChannelPolicyExtraTypes = `-- name: GetChannelPolicyExtraTypes :many SELECT cp.id AS policy_id, @@ -371,6 +407,178 @@ func (q *Queries) GetChannelPolicyExtraTypes(ctx context.Context, arg GetChannel return items, nil } +const getChannelsByPolicyLastUpdateRange = `-- name: GetChannelsByPolicyLastUpdateRange :many +SELECT + c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature, + n1.id, n1.version, n1.pub_key, n1.alias, n1.last_update, n1.color, n1.signature, + n2.id, n2.version, n2.pub_key, n2.alias, n2.last_update, n2.color, n2.signature, + + -- Policy 1 (node_id_1) + cp1.id AS policy1_id, + cp1.node_id AS policy1_node_id, + cp1.version AS policy1_version, + cp1.timelock AS policy1_timelock, + cp1.fee_ppm AS policy1_fee_ppm, + cp1.base_fee_msat AS policy1_base_fee_msat, + cp1.min_htlc_msat AS policy1_min_htlc_msat, + cp1.max_htlc_msat AS policy1_max_htlc_msat, + cp1.last_update AS policy1_last_update, + cp1.disabled AS policy1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.signature AS policy1_signature, + + -- Policy 2 (node_id_2) + cp2.id AS policy2_id, + cp2.node_id AS policy2_node_id, + cp2.version AS policy2_version, + cp2.timelock AS policy2_timelock, + cp2.fee_ppm AS policy2_fee_ppm, + cp2.base_fee_msat AS policy2_base_fee_msat, + cp2.min_htlc_msat AS policy2_min_htlc_msat, + cp2.max_htlc_msat AS policy2_max_htlc_msat, + cp2.last_update AS policy2_last_update, + cp2.disabled AS policy2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.signature AS policy2_signature + +FROM channels c + JOIN nodes n1 ON c.node_id_1 = n1.id + JOIN nodes n2 ON c.node_id_2 = n2.id + LEFT JOIN channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version + LEFT JOIN channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE c.version = $1 + AND ( + (cp1.last_update >= $2 AND cp1.last_update < $3) + OR + (cp2.last_update >= $2 AND cp2.last_update < $3) + ) +ORDER BY + CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END ASC +` + +type GetChannelsByPolicyLastUpdateRangeParams struct { + Version int16 + StartTime sql.NullInt64 + EndTime sql.NullInt64 +} + +type GetChannelsByPolicyLastUpdateRangeRow struct { + Channel Channel + Node Node + Node_2 Node + Policy1ID sql.NullInt64 + Policy1NodeID sql.NullInt64 + Policy1Version sql.NullInt16 + Policy1Timelock sql.NullInt32 + Policy1FeePpm sql.NullInt64 + Policy1BaseFeeMsat sql.NullInt64 + Policy1MinHtlcMsat sql.NullInt64 + Policy1MaxHtlcMsat sql.NullInt64 + Policy1LastUpdate sql.NullInt64 + Policy1Disabled sql.NullBool + Policy1InboundBaseFeeMsat sql.NullInt64 + Policy1InboundFeeRateMilliMsat sql.NullInt64 + Policy1Signature []byte + Policy2ID sql.NullInt64 + Policy2NodeID sql.NullInt64 + Policy2Version sql.NullInt16 + Policy2Timelock sql.NullInt32 + Policy2FeePpm sql.NullInt64 + Policy2BaseFeeMsat sql.NullInt64 + Policy2MinHtlcMsat sql.NullInt64 + Policy2MaxHtlcMsat sql.NullInt64 + Policy2LastUpdate sql.NullInt64 + Policy2Disabled sql.NullBool + Policy2InboundBaseFeeMsat sql.NullInt64 + Policy2InboundFeeRateMilliMsat sql.NullInt64 + Policy2Signature []byte +} + +func (q *Queries) GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg GetChannelsByPolicyLastUpdateRangeParams) ([]GetChannelsByPolicyLastUpdateRangeRow, error) { + rows, err := q.db.QueryContext(ctx, getChannelsByPolicyLastUpdateRange, arg.Version, arg.StartTime, arg.EndTime) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetChannelsByPolicyLastUpdateRangeRow + for rows.Next() { + var i GetChannelsByPolicyLastUpdateRangeRow + if err := rows.Scan( + &i.Channel.ID, + &i.Channel.Version, + &i.Channel.Scid, + &i.Channel.NodeID1, + &i.Channel.NodeID2, + &i.Channel.Outpoint, + &i.Channel.Capacity, + &i.Channel.BitcoinKey1, + &i.Channel.BitcoinKey2, + &i.Channel.Node1Signature, + &i.Channel.Node2Signature, + &i.Channel.Bitcoin1Signature, + &i.Channel.Bitcoin2Signature, + &i.Node.ID, + &i.Node.Version, + &i.Node.PubKey, + &i.Node.Alias, + &i.Node.LastUpdate, + &i.Node.Color, + &i.Node.Signature, + &i.Node_2.ID, + &i.Node_2.Version, + &i.Node_2.PubKey, + &i.Node_2.Alias, + &i.Node_2.LastUpdate, + &i.Node_2.Color, + &i.Node_2.Signature, + &i.Policy1ID, + &i.Policy1NodeID, + &i.Policy1Version, + &i.Policy1Timelock, + &i.Policy1FeePpm, + &i.Policy1BaseFeeMsat, + &i.Policy1MinHtlcMsat, + &i.Policy1MaxHtlcMsat, + &i.Policy1LastUpdate, + &i.Policy1Disabled, + &i.Policy1InboundBaseFeeMsat, + &i.Policy1InboundFeeRateMilliMsat, + &i.Policy1Signature, + &i.Policy2ID, + &i.Policy2NodeID, + &i.Policy2Version, + &i.Policy2Timelock, + &i.Policy2FeePpm, + &i.Policy2BaseFeeMsat, + &i.Policy2MinHtlcMsat, + &i.Policy2MaxHtlcMsat, + &i.Policy2LastUpdate, + &i.Policy2Disabled, + &i.Policy2InboundBaseFeeMsat, + &i.Policy2InboundFeeRateMilliMsat, + &i.Policy2Signature, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getExtraNodeTypes = `-- name: GetExtraNodeTypes :many SELECT node_id, type, value FROM node_extra_types @@ -595,6 +803,56 @@ func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByL return items, nil } +const getPublicV1ChannelsBySCID = `-- name: GetPublicV1ChannelsBySCID :many +SELECT id, version, scid, node_id_1, node_id_2, outpoint, capacity, bitcoin_key_1, bitcoin_key_2, node_1_signature, node_2_signature, bitcoin_1_signature, bitcoin_2_signature +FROM channels +WHERE node_1_signature IS NOT NULL + AND scid >= $1 + AND scid < $2 +` + +type GetPublicV1ChannelsBySCIDParams struct { + StartScid []byte + EndScid []byte +} + +func (q *Queries) GetPublicV1ChannelsBySCID(ctx context.Context, arg GetPublicV1ChannelsBySCIDParams) ([]Channel, error) { + rows, err := q.db.QueryContext(ctx, getPublicV1ChannelsBySCID, arg.StartScid, arg.EndScid) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Channel + for rows.Next() { + var i Channel + if err := rows.Scan( + &i.ID, + &i.Version, + &i.Scid, + &i.NodeID1, + &i.NodeID2, + &i.Outpoint, + &i.Capacity, + &i.BitcoinKey1, + &i.BitcoinKey2, + &i.Node1Signature, + &i.Node2Signature, + &i.Bitcoin1Signature, + &i.Bitcoin2Signature, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getSourceNodesByVersion = `-- name: GetSourceNodesByVersion :many SELECT sn.node_id, n.pub_key FROM source_nodes sn @@ -898,6 +1156,159 @@ func (q *Queries) ListChannelsByNodeID(ctx context.Context, arg ListChannelsByNo return items, nil } +const listChannelsWithPoliciesPaginated = `-- name: ListChannelsWithPoliciesPaginated :many +SELECT + c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature, + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.id AS policy_1_id, + cp1.node_id AS policy_1_node_id, + cp1.version AS policy_1_version, + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.last_update AS policy_1_last_update, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.signature AS policy_1_signature, + + -- Node 2 policy + cp2.id AS policy_2_id, + cp2.node_id AS policy_2_node_id, + cp2.version AS policy_2_version, + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + cp2.last_update AS policy_2_last_update, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.signature AS policy_2_signature + +FROM channels c +JOIN nodes n1 ON c.node_id_1 = n1.id +JOIN nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE c.version = $1 AND c.id > $2 +ORDER BY c.id +LIMIT $3 +` + +type ListChannelsWithPoliciesPaginatedParams struct { + Version int16 + ID int64 + Limit int32 +} + +type ListChannelsWithPoliciesPaginatedRow struct { + Channel Channel + Node1Pubkey []byte + Node2Pubkey []byte + Policy1ID sql.NullInt64 + Policy1NodeID sql.NullInt64 + Policy1Version sql.NullInt16 + Policy1Timelock sql.NullInt32 + Policy1FeePpm sql.NullInt64 + Policy1BaseFeeMsat sql.NullInt64 + Policy1MinHtlcMsat sql.NullInt64 + Policy1MaxHtlcMsat sql.NullInt64 + Policy1LastUpdate sql.NullInt64 + Policy1Disabled sql.NullBool + Policy1InboundBaseFeeMsat sql.NullInt64 + Policy1InboundFeeRateMilliMsat sql.NullInt64 + Policy1Signature []byte + Policy2ID sql.NullInt64 + Policy2NodeID sql.NullInt64 + Policy2Version sql.NullInt16 + Policy2Timelock sql.NullInt32 + Policy2FeePpm sql.NullInt64 + Policy2BaseFeeMsat sql.NullInt64 + Policy2MinHtlcMsat sql.NullInt64 + Policy2MaxHtlcMsat sql.NullInt64 + Policy2LastUpdate sql.NullInt64 + Policy2Disabled sql.NullBool + Policy2InboundBaseFeeMsat sql.NullInt64 + Policy2InboundFeeRateMilliMsat sql.NullInt64 + Policy2Signature []byte +} + +func (q *Queries) ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error) { + rows, err := q.db.QueryContext(ctx, listChannelsWithPoliciesPaginated, arg.Version, arg.ID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListChannelsWithPoliciesPaginatedRow + for rows.Next() { + var i ListChannelsWithPoliciesPaginatedRow + if err := rows.Scan( + &i.Channel.ID, + &i.Channel.Version, + &i.Channel.Scid, + &i.Channel.NodeID1, + &i.Channel.NodeID2, + &i.Channel.Outpoint, + &i.Channel.Capacity, + &i.Channel.BitcoinKey1, + &i.Channel.BitcoinKey2, + &i.Channel.Node1Signature, + &i.Channel.Node2Signature, + &i.Channel.Bitcoin1Signature, + &i.Channel.Bitcoin2Signature, + &i.Node1Pubkey, + &i.Node2Pubkey, + &i.Policy1ID, + &i.Policy1NodeID, + &i.Policy1Version, + &i.Policy1Timelock, + &i.Policy1FeePpm, + &i.Policy1BaseFeeMsat, + &i.Policy1MinHtlcMsat, + &i.Policy1MaxHtlcMsat, + &i.Policy1LastUpdate, + &i.Policy1Disabled, + &i.Policy1InboundBaseFeeMsat, + &i.Policy1InboundFeeRateMilliMsat, + &i.Policy1Signature, + &i.Policy2ID, + &i.Policy2NodeID, + &i.Policy2Version, + &i.Policy2Timelock, + &i.Policy2FeePpm, + &i.Policy2BaseFeeMsat, + &i.Policy2MinHtlcMsat, + &i.Policy2MaxHtlcMsat, + &i.Policy2LastUpdate, + &i.Policy2Disabled, + &i.Policy2InboundBaseFeeMsat, + &i.Policy2InboundFeeRateMilliMsat, + &i.Policy2Signature, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listNodeIDsAndPubKeys = `-- name: ListNodeIDsAndPubKeys :many SELECT id, pub_key FROM nodes diff --git a/sqldb/sqlc/migrations/000007_graph.down.sql b/sqldb/sqlc/migrations/000007_graph.down.sql index 5b42a3a9b..0990705ef 100644 --- a/sqldb/sqlc/migrations/000007_graph.down.sql +++ b/sqldb/sqlc/migrations/000007_graph.down.sql @@ -3,6 +3,7 @@ DROP INDEX IF EXISTS nodes_unique; DROP INDEX IF EXISTS node_extra_types_unique; DROP INDEX IF EXISTS node_features_unique; DROP INDEX IF EXISTS node_addresses_unique; +DROP INDEX IF EXISTS node_last_update_idx; DROP INDEX IF EXISTS source_nodes_unique; DROP INDEX IF EXISTS channels_node_id_1_idx; DROP INDEX IF EXISTS channels_node_id_2_idx; @@ -12,6 +13,7 @@ DROP INDEX IF EXISTS channel_features_unique; DROP INDEX IF EXISTS channel_extra_types_unique; DROP INDEX IF EXISTS channel_policies_unique; DROP INDEX IF EXISTS channel_policy_extra_types_unique; +DROP INDEX IF EXISTS channel_policy_last_update_idx; -- Drop tables in order of reverse dependencies. DROP TABLE IF EXISTS channel_policy_extra_types; diff --git a/sqldb/sqlc/migrations/000007_graph.up.sql b/sqldb/sqlc/migrations/000007_graph.up.sql index b52dd4508..0d179f1d3 100644 --- a/sqldb/sqlc/migrations/000007_graph.up.sql +++ b/sqldb/sqlc/migrations/000007_graph.up.sql @@ -37,6 +37,7 @@ CREATE TABLE IF NOT EXISTS nodes ( CREATE UNIQUE INDEX IF NOT EXISTS nodes_unique ON nodes ( pub_key, version ); +CREATE INDEX IF NOT EXISTS node_last_update_idx ON nodes(last_update); -- node_extra_types stores any extra TLV fields covered by a node announcement that -- we do not have an explicit column for in the nodes table. @@ -273,6 +274,7 @@ CREATE TABLE IF NOT EXISTS channel_policies ( CREATE UNIQUE INDEX IF NOT EXISTS channel_policies_unique ON channel_policies ( channel_id, node_id, version ); +CREATE INDEX IF NOT EXISTS channel_policy_last_update_idx ON channel_policies(last_update); -- channel_policy_extra_types stores any extra TLV fields covered by a channel -- update that we do not have an explicit column for in the channel_policies diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index c0abde3c2..8174abcd9 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -30,7 +30,9 @@ type Querier interface { GetChannelAndNodesBySCID(ctx context.Context, arg GetChannelAndNodesBySCIDParams) (GetChannelAndNodesBySCIDRow, error) GetChannelBySCID(ctx context.Context, arg GetChannelBySCIDParams) (Channel, error) GetChannelFeaturesAndExtras(ctx context.Context, channelID int64) ([]GetChannelFeaturesAndExtrasRow, error) + GetChannelPolicyByChannelAndNode(ctx context.Context, arg GetChannelPolicyByChannelAndNodeParams) (ChannelPolicy, error) GetChannelPolicyExtraTypes(ctx context.Context, arg GetChannelPolicyExtraTypesParams) ([]GetChannelPolicyExtraTypesRow, error) + GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg GetChannelsByPolicyLastUpdateRangeParams) ([]GetChannelsByPolicyLastUpdateRangeRow, error) GetDatabaseVersion(ctx context.Context) (int32, error) GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]NodeExtraType, error) // This method may return more than one invoice if filter using multiple fields @@ -50,6 +52,7 @@ type Querier interface { GetNodeFeaturesByPubKey(ctx context.Context, arg GetNodeFeaturesByPubKeyParams) ([]int32, error) GetNodeIDByPubKey(ctx context.Context, arg GetNodeIDByPubKeyParams) (int64, error) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]Node, error) + GetPublicV1ChannelsBySCID(ctx context.Context, arg GetPublicV1ChannelsBySCIDParams) ([]Channel, error) GetSourceNodesByVersion(ctx context.Context, version int16) ([]GetSourceNodesByVersionRow, error) HighestSCID(ctx context.Context, version int16) ([]byte, error) InsertAMPSubInvoice(ctx context.Context, arg InsertAMPSubInvoiceParams) error @@ -65,6 +68,7 @@ type Querier interface { InsertNodeAddress(ctx context.Context, arg InsertNodeAddressParams) error InsertNodeFeature(ctx context.Context, arg InsertNodeFeatureParams) error ListChannelsByNodeID(ctx context.Context, arg ListChannelsByNodeIDParams) ([]ListChannelsByNodeIDRow, error) + ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndPubKeysParams) ([]ListNodeIDsAndPubKeysRow, error) ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]Node, error) NextInvoiceSettleIndex(ctx context.Context) (int64, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index 510195906..5560ec90d 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -206,6 +206,62 @@ SELECT FROM channel_extra_types cet WHERE cet.channel_id = $1; +-- name: GetChannelsByPolicyLastUpdateRange :many +SELECT + sqlc.embed(c), + sqlc.embed(n1), + sqlc.embed(n2), + + -- Policy 1 (node_id_1) + cp1.id AS policy1_id, + cp1.node_id AS policy1_node_id, + cp1.version AS policy1_version, + cp1.timelock AS policy1_timelock, + cp1.fee_ppm AS policy1_fee_ppm, + cp1.base_fee_msat AS policy1_base_fee_msat, + cp1.min_htlc_msat AS policy1_min_htlc_msat, + cp1.max_htlc_msat AS policy1_max_htlc_msat, + cp1.last_update AS policy1_last_update, + cp1.disabled AS policy1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.signature AS policy1_signature, + + -- Policy 2 (node_id_2) + cp2.id AS policy2_id, + cp2.node_id AS policy2_node_id, + cp2.version AS policy2_version, + cp2.timelock AS policy2_timelock, + cp2.fee_ppm AS policy2_fee_ppm, + cp2.base_fee_msat AS policy2_base_fee_msat, + cp2.min_htlc_msat AS policy2_min_htlc_msat, + cp2.max_htlc_msat AS policy2_max_htlc_msat, + cp2.last_update AS policy2_last_update, + cp2.disabled AS policy2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.signature AS policy2_signature + +FROM channels c + JOIN nodes n1 ON c.node_id_1 = n1.id + JOIN nodes n2 ON c.node_id_2 = n2.id + LEFT JOIN channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version + LEFT JOIN channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE c.version = @version + AND ( + (cp1.last_update >= @start_time AND cp1.last_update < @end_time) + OR + (cp2.last_update >= @start_time AND cp2.last_update < @end_time) + ) +ORDER BY + CASE + WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0) + THEN COALESCE(cp1.last_update, 0) + ELSE COALESCE(cp2.last_update, 0) + END ASC; + -- name: HighestSCID :one SELECT scid FROM channels @@ -261,6 +317,62 @@ FROM channels c WHERE c.version = $1 AND (c.node_id_1 = $2 OR c.node_id_2 = $2); +-- name: GetPublicV1ChannelsBySCID :many +SELECT * +FROM channels +WHERE node_1_signature IS NOT NULL + AND scid >= @start_scid + AND scid < @end_scid; + +-- name: ListChannelsWithPoliciesPaginated :many +SELECT + sqlc.embed(c), + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.id AS policy_1_id, + cp1.node_id AS policy_1_node_id, + cp1.version AS policy_1_version, + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.last_update AS policy_1_last_update, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.signature AS policy_1_signature, + + -- Node 2 policy + cp2.id AS policy_2_id, + cp2.node_id AS policy_2_node_id, + cp2.version AS policy_2_version, + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + cp2.last_update AS policy_2_last_update, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.signature AS policy_2_signature + +FROM channels c +JOIN nodes n1 ON c.node_id_1 = n1.id +JOIN nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE c.version = $1 AND c.id > $2 +ORDER BY c.id +LIMIT $3; + /* ───────────────────────────────────────────── channel_features table queries ───────────────────────────────────────────── @@ -315,6 +427,13 @@ ON CONFLICT (channel_id, node_id, version) WHERE EXCLUDED.last_update > channel_policies.last_update RETURNING id; +-- name: GetChannelPolicyByChannelAndNode :one +SELECT * +FROM channel_policies +WHERE channel_id = $1 + AND node_id = $2 + AND version = $3; + /* ───────────────────────────────────────────── channel_policy_extra_types table queries ─────────────────────────────────────────────