mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-09-27 18:46:18 +02:00
Merge pull request #9936 from ellemouton/graphSQL12
[12] graph/db: Implement more graph SQLStore methods
This commit is contained in:
@@ -85,6 +85,7 @@ circuit. The indices are only available for forwarding events saved after v0.20.
|
||||
* [3](https://github.com/lightningnetwork/lnd/pull/9887)
|
||||
* [4](https://github.com/lightningnetwork/lnd/pull/9931)
|
||||
* [5](https://github.com/lightningnetwork/lnd/pull/9935)
|
||||
* [6](https://github.com/lightningnetwork/lnd/pull/9936)
|
||||
|
||||
## RPC Updates
|
||||
|
||||
|
@@ -1277,7 +1277,7 @@ func TestForEachSourceNodeChannel(t *testing.T) {
|
||||
func TestGraphTraversal(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
graph := MakeTestGraph(t)
|
||||
graph := MakeTestGraphNew(t)
|
||||
|
||||
// We'd like to test some of the graph traversal capabilities within
|
||||
// the DB, so we'll create a series of fake nodes to insert into the
|
||||
@@ -1937,7 +1937,7 @@ func TestChanUpdatesInHorizon(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := context.Background()
|
||||
|
||||
graph := MakeTestGraph(t)
|
||||
graph := MakeTestGraphNew(t)
|
||||
|
||||
// If we issue an arbitrary query before any channel updates are
|
||||
// inserted in the database, we should get zero results.
|
||||
@@ -2727,7 +2727,7 @@ func TestFilterChannelRange(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := context.Background()
|
||||
|
||||
graph := MakeTestGraph(t)
|
||||
graph := MakeTestGraphNew(t)
|
||||
|
||||
// We'll first populate our graph with two nodes. All channels created
|
||||
// below will be made between these two nodes.
|
||||
|
@@ -2149,9 +2149,14 @@ func (c *KVStore) ChanUpdatesInHorizon(startTime,
|
||||
c.chanCache.insert(chanid, channel)
|
||||
}
|
||||
|
||||
log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
|
||||
float64(hits)/float64(len(edgesInHorizon)), hits,
|
||||
len(edgesInHorizon))
|
||||
if len(edgesInHorizon) > 0 {
|
||||
log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
|
||||
float64(hits)/float64(len(edgesInHorizon)), hits,
|
||||
len(edgesInHorizon))
|
||||
} else {
|
||||
log.Debugf("ChanUpdatesInHorizon returned no edges in "+
|
||||
"horizon (%s, %s)", startTime, endTime)
|
||||
}
|
||||
|
||||
return edgesInHorizon, nil
|
||||
}
|
||||
|
@@ -7,8 +7,10 @@ import (
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"maps"
|
||||
"math"
|
||||
"net"
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -90,6 +92,9 @@ type SQLQueries interface {
|
||||
GetChannelFeaturesAndExtras(ctx context.Context, channelID int64) ([]sqlc.GetChannelFeaturesAndExtrasRow, error)
|
||||
HighestSCID(ctx context.Context, version int16) ([]byte, error)
|
||||
ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error)
|
||||
ListChannelsWithPoliciesPaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesPaginatedParams) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, error)
|
||||
GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg sqlc.GetChannelsByPolicyLastUpdateRangeParams) ([]sqlc.GetChannelsByPolicyLastUpdateRangeRow, error)
|
||||
GetPublicV1ChannelsBySCID(ctx context.Context, arg sqlc.GetPublicV1ChannelsBySCIDParams) ([]sqlc.Channel, error)
|
||||
|
||||
CreateChannelExtraType(ctx context.Context, arg sqlc.CreateChannelExtraTypeParams) error
|
||||
InsertChannelFeature(ctx context.Context, arg sqlc.InsertChannelFeatureParams) error
|
||||
@@ -98,6 +103,7 @@ type SQLQueries interface {
|
||||
Channel Policy table queries.
|
||||
*/
|
||||
UpsertEdgePolicy(ctx context.Context, arg sqlc.UpsertEdgePolicyParams) (int64, error)
|
||||
GetChannelPolicyByChannelAndNode(ctx context.Context, arg sqlc.GetChannelPolicyByChannelAndNodeParams) (sqlc.ChannelPolicy, error)
|
||||
|
||||
InsertChanPolicyExtraType(ctx context.Context, arg sqlc.InsertChanPolicyExtraTypeParams) error
|
||||
GetChannelPolicyExtraTypes(ctx context.Context, arg sqlc.GetChannelPolicyExtraTypesParams) ([]sqlc.GetChannelPolicyExtraTypesRow, error)
|
||||
@@ -924,6 +930,469 @@ func (s *SQLStore) ForEachNodeChannel(nodePub route.Vertex,
|
||||
}, sqldb.NoOpReset)
|
||||
}
|
||||
|
||||
// ChanUpdatesInHorizon returns all the known channel edges which have at least
|
||||
// one edge that has an update timestamp within the specified horizon.
|
||||
//
|
||||
// NOTE: This is part of the V1Store interface.
|
||||
func (s *SQLStore) ChanUpdatesInHorizon(startTime,
|
||||
endTime time.Time) ([]ChannelEdge, error) {
|
||||
|
||||
s.cacheMu.Lock()
|
||||
defer s.cacheMu.Unlock()
|
||||
|
||||
var (
|
||||
ctx = context.TODO()
|
||||
// To ensure we don't return duplicate ChannelEdges, we'll use
|
||||
// an additional map to keep track of the edges already seen to
|
||||
// prevent re-adding it.
|
||||
edgesSeen = make(map[uint64]struct{})
|
||||
edgesToCache = make(map[uint64]ChannelEdge)
|
||||
edges []ChannelEdge
|
||||
hits int
|
||||
)
|
||||
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
|
||||
rows, err := db.GetChannelsByPolicyLastUpdateRange(
|
||||
ctx, sqlc.GetChannelsByPolicyLastUpdateRangeParams{
|
||||
Version: int16(ProtocolV1),
|
||||
StartTime: sqldb.SQLInt64(startTime.Unix()),
|
||||
EndTime: sqldb.SQLInt64(endTime.Unix()),
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
// If we've already retrieved the info and policies for
|
||||
// this edge, then we can skip it as we don't need to do
|
||||
// so again.
|
||||
chanIDInt := byteOrder.Uint64(row.Channel.Scid)
|
||||
if _, ok := edgesSeen[chanIDInt]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if channel, ok := s.chanCache.get(chanIDInt); ok {
|
||||
hits++
|
||||
edgesSeen[chanIDInt] = struct{}{}
|
||||
edges = append(edges, channel)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
node1, node2, err := buildNodes(
|
||||
ctx, db, row.Node, row.Node_2,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
channel, err := getAndBuildEdgeInfo(
|
||||
ctx, db, s.cfg.ChainHash, row.Channel.ID,
|
||||
row.Channel, node1.PubKeyBytes,
|
||||
node2.PubKeyBytes,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to build channel "+
|
||||
"info: %w", err)
|
||||
}
|
||||
|
||||
dbPol1, dbPol2, err := extractChannelPolicies(row)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to extract channel "+
|
||||
"policies: %w", err)
|
||||
}
|
||||
|
||||
p1, p2, err := getAndBuildChanPolicies(
|
||||
ctx, db, dbPol1, dbPol2, channel.ChannelID,
|
||||
node1.PubKeyBytes, node2.PubKeyBytes,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to build channel "+
|
||||
"policies: %w", err)
|
||||
}
|
||||
|
||||
edgesSeen[chanIDInt] = struct{}{}
|
||||
chanEdge := ChannelEdge{
|
||||
Info: channel,
|
||||
Policy1: p1,
|
||||
Policy2: p2,
|
||||
Node1: node1,
|
||||
Node2: node2,
|
||||
}
|
||||
edges = append(edges, chanEdge)
|
||||
edgesToCache[chanIDInt] = chanEdge
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
edgesSeen = make(map[uint64]struct{})
|
||||
edgesToCache = make(map[uint64]ChannelEdge)
|
||||
edges = nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to fetch channels: %w", err)
|
||||
}
|
||||
|
||||
// Insert any edges loaded from disk into the cache.
|
||||
for chanid, channel := range edgesToCache {
|
||||
s.chanCache.insert(chanid, channel)
|
||||
}
|
||||
|
||||
if len(edges) > 0 {
|
||||
log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
|
||||
float64(hits)/float64(len(edges)), hits, len(edges))
|
||||
} else {
|
||||
log.Debugf("ChanUpdatesInHorizon returned no edges in "+
|
||||
"horizon (%s, %s)", startTime, endTime)
|
||||
}
|
||||
|
||||
return edges, nil
|
||||
}
|
||||
|
||||
// ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel
|
||||
// data to the call-back.
|
||||
//
|
||||
// NOTE: The callback contents MUST not be modified.
|
||||
//
|
||||
// NOTE: part of the V1Store interface.
|
||||
func (s *SQLStore) ForEachNodeCached(cb func(node route.Vertex,
|
||||
chans map[uint64]*DirectedChannel) error) error {
|
||||
|
||||
var ctx = context.TODO()
|
||||
|
||||
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
|
||||
return forEachNodeCacheable(ctx, db, func(nodeID int64,
|
||||
nodePub route.Vertex) error {
|
||||
|
||||
features, err := getNodeFeatures(ctx, db, nodeID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to fetch "+
|
||||
"node(id=%d) features: %w", nodeID, err)
|
||||
}
|
||||
|
||||
toNodeCallback := func() route.Vertex {
|
||||
return nodePub
|
||||
}
|
||||
|
||||
rows, err := db.ListChannelsByNodeID(
|
||||
ctx, sqlc.ListChannelsByNodeIDParams{
|
||||
Version: int16(ProtocolV1),
|
||||
NodeID1: nodeID,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to fetch channels "+
|
||||
"of node(id=%d): %w", nodeID, err)
|
||||
}
|
||||
|
||||
channels := make(map[uint64]*DirectedChannel, len(rows))
|
||||
for _, row := range rows {
|
||||
node1, node2, err := buildNodeVertices(
|
||||
row.Node1Pubkey, row.Node2Pubkey,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e, err := getAndBuildEdgeInfo(
|
||||
ctx, db, s.cfg.ChainHash,
|
||||
row.Channel.ID, row.Channel, node1,
|
||||
node2,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to build "+
|
||||
"channel info: %w", err)
|
||||
}
|
||||
|
||||
dbPol1, dbPol2, err := extractChannelPolicies(
|
||||
row,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to "+
|
||||
"extract channel "+
|
||||
"policies: %w", err)
|
||||
}
|
||||
|
||||
p1, p2, err := getAndBuildChanPolicies(
|
||||
ctx, db, dbPol1, dbPol2, e.ChannelID,
|
||||
node1, node2,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to "+
|
||||
"build channel policies: %w",
|
||||
err)
|
||||
}
|
||||
|
||||
// Determine the outgoing and incoming policy
|
||||
// for this channel and node combo.
|
||||
outPolicy, inPolicy := p1, p2
|
||||
if p1 != nil && p1.ToNode == nodePub {
|
||||
outPolicy, inPolicy = p2, p1
|
||||
} else if p2 != nil && p2.ToNode != nodePub {
|
||||
outPolicy, inPolicy = p2, p1
|
||||
}
|
||||
|
||||
var cachedInPolicy *models.CachedEdgePolicy
|
||||
if inPolicy != nil {
|
||||
cachedInPolicy = models.NewCachedPolicy(
|
||||
p2,
|
||||
)
|
||||
cachedInPolicy.ToNodePubKey =
|
||||
toNodeCallback
|
||||
cachedInPolicy.ToNodeFeatures =
|
||||
features
|
||||
}
|
||||
|
||||
var inboundFee lnwire.Fee
|
||||
outPolicy.InboundFee.WhenSome(
|
||||
func(fee lnwire.Fee) {
|
||||
inboundFee = fee
|
||||
},
|
||||
)
|
||||
|
||||
directedChannel := &DirectedChannel{
|
||||
ChannelID: e.ChannelID,
|
||||
IsNode1: nodePub ==
|
||||
e.NodeKey1Bytes,
|
||||
OtherNode: e.NodeKey2Bytes,
|
||||
Capacity: e.Capacity,
|
||||
OutPolicySet: p1 != nil,
|
||||
InPolicy: cachedInPolicy,
|
||||
InboundFee: inboundFee,
|
||||
}
|
||||
|
||||
if nodePub == e.NodeKey2Bytes {
|
||||
directedChannel.OtherNode =
|
||||
e.NodeKey1Bytes
|
||||
}
|
||||
|
||||
channels[e.ChannelID] = directedChannel
|
||||
}
|
||||
|
||||
return cb(nodePub, channels)
|
||||
})
|
||||
}, sqldb.NoOpReset)
|
||||
}
|
||||
|
||||
// ForEachChannel iterates through all the channel edges stored within the
|
||||
// graph and invokes the passed callback for each edge. The callback takes two
|
||||
// edges as since this is a directed graph, both the in/out edges are visited.
|
||||
// If the callback returns an error, then the transaction is aborted and the
|
||||
// iteration stops early.
|
||||
//
|
||||
// NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer
|
||||
// for that particular channel edge routing policy will be passed into the
|
||||
// callback.
|
||||
//
|
||||
// NOTE: part of the V1Store interface.
|
||||
func (s *SQLStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
|
||||
*models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error {
|
||||
|
||||
ctx := context.TODO()
|
||||
|
||||
handleChannel := func(db SQLQueries,
|
||||
row sqlc.ListChannelsWithPoliciesPaginatedRow) error {
|
||||
|
||||
node1, node2, err := buildNodeVertices(
|
||||
row.Node1Pubkey, row.Node2Pubkey,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to build node vertices: %w",
|
||||
err)
|
||||
}
|
||||
|
||||
edge, err := getAndBuildEdgeInfo(
|
||||
ctx, db, s.cfg.ChainHash, row.Channel.ID, row.Channel,
|
||||
node1, node2,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to build channel info: %w",
|
||||
err)
|
||||
}
|
||||
|
||||
dbPol1, dbPol2, err := extractChannelPolicies(row)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to extract channel "+
|
||||
"policies: %w", err)
|
||||
}
|
||||
|
||||
p1, p2, err := getAndBuildChanPolicies(
|
||||
ctx, db, dbPol1, dbPol2, edge.ChannelID, node1, node2,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to build channel "+
|
||||
"policies: %w", err)
|
||||
}
|
||||
|
||||
err = cb(edge, p1, p2)
|
||||
if err != nil {
|
||||
return fmt.Errorf("callback failed for channel "+
|
||||
"id=%d: %w", edge.ChannelID, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
|
||||
var lastID int64
|
||||
for {
|
||||
//nolint:ll
|
||||
rows, err := db.ListChannelsWithPoliciesPaginated(
|
||||
ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{
|
||||
Version: int16(ProtocolV1),
|
||||
ID: lastID,
|
||||
Limit: pageSize,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(rows) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for _, row := range rows {
|
||||
err := handleChannel(db, row)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lastID = row.Channel.ID
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}, sqldb.NoOpReset)
|
||||
}
|
||||
|
||||
// FilterChannelRange returns the channel ID's of all known channels which were
|
||||
// mined in a block height within the passed range. The channel IDs are grouped
|
||||
// by their common block height. This method can be used to quickly share with a
|
||||
// peer the set of channels we know of within a particular range to catch them
|
||||
// up after a period of time offline. If withTimestamps is true then the
|
||||
// timestamp info of the latest received channel update messages of the channel
|
||||
// will be included in the response.
|
||||
//
|
||||
// NOTE: This is part of the V1Store interface.
|
||||
func (s *SQLStore) FilterChannelRange(startHeight, endHeight uint32,
|
||||
withTimestamps bool) ([]BlockChannelRange, error) {
|
||||
|
||||
var (
|
||||
ctx = context.TODO()
|
||||
startSCID = &lnwire.ShortChannelID{
|
||||
BlockHeight: startHeight,
|
||||
}
|
||||
endSCID = lnwire.ShortChannelID{
|
||||
BlockHeight: endHeight,
|
||||
TxIndex: math.MaxUint32 & 0x00ffffff,
|
||||
TxPosition: math.MaxUint16,
|
||||
}
|
||||
)
|
||||
|
||||
var chanIDStart [8]byte
|
||||
byteOrder.PutUint64(chanIDStart[:], startSCID.ToUint64())
|
||||
var chanIDEnd [8]byte
|
||||
byteOrder.PutUint64(chanIDEnd[:], endSCID.ToUint64())
|
||||
|
||||
// 1) get all channels where channelID is between start and end chan ID.
|
||||
// 2) skip if not public (ie, no channel_proof)
|
||||
// 3) collect that channel.
|
||||
// 4) if timestamps are wanted, fetch both policies for node 1 and node2
|
||||
// and add those timestamps to the collected channel.
|
||||
channelsPerBlock := make(map[uint32][]ChannelUpdateInfo)
|
||||
err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
|
||||
dbChans, err := db.GetPublicV1ChannelsBySCID(
|
||||
ctx, sqlc.GetPublicV1ChannelsBySCIDParams{
|
||||
StartScid: chanIDStart[:],
|
||||
EndScid: chanIDEnd[:],
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to fetch channel range: %w",
|
||||
err)
|
||||
}
|
||||
|
||||
for _, dbChan := range dbChans {
|
||||
cid := lnwire.NewShortChanIDFromInt(
|
||||
byteOrder.Uint64(dbChan.Scid),
|
||||
)
|
||||
chanInfo := NewChannelUpdateInfo(
|
||||
cid, time.Time{}, time.Time{},
|
||||
)
|
||||
|
||||
if !withTimestamps {
|
||||
channelsPerBlock[cid.BlockHeight] = append(
|
||||
channelsPerBlock[cid.BlockHeight],
|
||||
chanInfo,
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
//nolint:ll
|
||||
node1Policy, err := db.GetChannelPolicyByChannelAndNode(
|
||||
ctx, sqlc.GetChannelPolicyByChannelAndNodeParams{
|
||||
Version: int16(ProtocolV1),
|
||||
ChannelID: dbChan.ID,
|
||||
NodeID: dbChan.NodeID1,
|
||||
},
|
||||
)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return fmt.Errorf("unable to fetch node1 "+
|
||||
"policy: %w", err)
|
||||
} else if err == nil {
|
||||
chanInfo.Node1UpdateTimestamp = time.Unix(
|
||||
node1Policy.LastUpdate.Int64, 0,
|
||||
)
|
||||
}
|
||||
|
||||
//nolint:ll
|
||||
node2Policy, err := db.GetChannelPolicyByChannelAndNode(
|
||||
ctx, sqlc.GetChannelPolicyByChannelAndNodeParams{
|
||||
Version: int16(ProtocolV1),
|
||||
ChannelID: dbChan.ID,
|
||||
NodeID: dbChan.NodeID2,
|
||||
},
|
||||
)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return fmt.Errorf("unable to fetch node2 "+
|
||||
"policy: %w", err)
|
||||
} else if err == nil {
|
||||
chanInfo.Node2UpdateTimestamp = time.Unix(
|
||||
node2Policy.LastUpdate.Int64, 0,
|
||||
)
|
||||
}
|
||||
|
||||
channelsPerBlock[cid.BlockHeight] = append(
|
||||
channelsPerBlock[cid.BlockHeight], chanInfo,
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
channelsPerBlock = make(map[uint32][]ChannelUpdateInfo)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to fetch channel range: %w", err)
|
||||
}
|
||||
|
||||
if len(channelsPerBlock) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Return the channel ranges in ascending block height order.
|
||||
blocks := slices.Collect(maps.Keys(channelsPerBlock))
|
||||
slices.Sort(blocks)
|
||||
|
||||
return fn.Map(blocks, func(block uint32) BlockChannelRange {
|
||||
return BlockChannelRange{
|
||||
Height: block,
|
||||
Channels: channelsPerBlock[block],
|
||||
}
|
||||
}), nil
|
||||
}
|
||||
|
||||
// forEachNodeDirectedChannel iterates through all channels of a given
|
||||
// node, executing the passed callback on the directed edge representing the
|
||||
// channel and its incoming policy. If the node is not found, no error is
|
||||
@@ -977,12 +1446,7 @@ func forEachNodeDirectedChannel(ctx context.Context, db SQLQueries,
|
||||
err)
|
||||
}
|
||||
|
||||
edge, err := buildCacheableChannelInfo(
|
||||
row.Channel, node1, node2,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
edge := buildCacheableChannelInfo(row.Channel, node1, node2)
|
||||
|
||||
dbPol1, dbPol2, err := extractChannelPolicies(row)
|
||||
if err != nil {
|
||||
@@ -1286,14 +1750,14 @@ func getNodeByPubKey(ctx context.Context, db SQLQueries,
|
||||
// provided database channel row and the public keys of the two nodes
|
||||
// involved in the channel.
|
||||
func buildCacheableChannelInfo(dbChan sqlc.Channel, node1Pub,
|
||||
node2Pub route.Vertex) (*models.CachedEdgeInfo, error) {
|
||||
node2Pub route.Vertex) *models.CachedEdgeInfo {
|
||||
|
||||
return &models.CachedEdgeInfo{
|
||||
ChannelID: byteOrder.Uint64(dbChan.Scid),
|
||||
NodeKey1Bytes: node1Pub,
|
||||
NodeKey2Bytes: node2Pub,
|
||||
Capacity: btcutil.Amount(dbChan.Capacity.Int64),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// buildNode constructs a LightningNode instance from the given database node
|
||||
@@ -2302,17 +2766,76 @@ func buildChanPolicy(dbPolicy sqlc.ChannelPolicy, channelID uint64,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// buildNodes builds the models.LightningNode instances for the
|
||||
// given row which is expected to be a sqlc type that contains node information.
|
||||
func buildNodes(ctx context.Context, db SQLQueries, dbNode1,
|
||||
dbNode2 sqlc.Node) (*models.LightningNode, *models.LightningNode,
|
||||
error) {
|
||||
|
||||
node1, err := buildNode(ctx, db, &dbNode1)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
node2, err := buildNode(ctx, db, &dbNode2)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return node1, node2, nil
|
||||
}
|
||||
|
||||
// extractChannelPolicies extracts the sqlc.ChannelPolicy records from the give
|
||||
// row which is expected to be a sqlc type that contains channel policy
|
||||
// information. It returns two policies, which may be nil if the policy
|
||||
// information is not present in the row.
|
||||
//
|
||||
//nolint:ll
|
||||
//nolint:ll,dupl
|
||||
func extractChannelPolicies(row any) (*sqlc.ChannelPolicy, *sqlc.ChannelPolicy,
|
||||
error) {
|
||||
|
||||
var policy1, policy2 *sqlc.ChannelPolicy
|
||||
switch r := row.(type) {
|
||||
case sqlc.GetChannelsByPolicyLastUpdateRangeRow:
|
||||
if r.Policy1ID.Valid {
|
||||
policy1 = &sqlc.ChannelPolicy{
|
||||
ID: r.Policy1ID.Int64,
|
||||
Version: r.Policy1Version.Int16,
|
||||
ChannelID: r.Channel.ID,
|
||||
NodeID: r.Policy1NodeID.Int64,
|
||||
Timelock: r.Policy1Timelock.Int32,
|
||||
FeePpm: r.Policy1FeePpm.Int64,
|
||||
BaseFeeMsat: r.Policy1BaseFeeMsat.Int64,
|
||||
MinHtlcMsat: r.Policy1MinHtlcMsat.Int64,
|
||||
MaxHtlcMsat: r.Policy1MaxHtlcMsat,
|
||||
LastUpdate: r.Policy1LastUpdate,
|
||||
InboundBaseFeeMsat: r.Policy1InboundBaseFeeMsat,
|
||||
InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat,
|
||||
Disabled: r.Policy1Disabled,
|
||||
Signature: r.Policy1Signature,
|
||||
}
|
||||
}
|
||||
if r.Policy2ID.Valid {
|
||||
policy2 = &sqlc.ChannelPolicy{
|
||||
ID: r.Policy2ID.Int64,
|
||||
Version: r.Policy2Version.Int16,
|
||||
ChannelID: r.Channel.ID,
|
||||
NodeID: r.Policy2NodeID.Int64,
|
||||
Timelock: r.Policy2Timelock.Int32,
|
||||
FeePpm: r.Policy2FeePpm.Int64,
|
||||
BaseFeeMsat: r.Policy2BaseFeeMsat.Int64,
|
||||
MinHtlcMsat: r.Policy2MinHtlcMsat.Int64,
|
||||
MaxHtlcMsat: r.Policy2MaxHtlcMsat,
|
||||
LastUpdate: r.Policy2LastUpdate,
|
||||
InboundBaseFeeMsat: r.Policy2InboundBaseFeeMsat,
|
||||
InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat,
|
||||
Disabled: r.Policy2Disabled,
|
||||
Signature: r.Policy2Signature,
|
||||
}
|
||||
}
|
||||
|
||||
return policy1, policy2, nil
|
||||
|
||||
case sqlc.ListChannelsByNodeIDRow:
|
||||
if r.Policy1ID.Valid {
|
||||
policy1 = &sqlc.ChannelPolicy{
|
||||
@@ -2351,6 +2874,46 @@ func extractChannelPolicies(row any) (*sqlc.ChannelPolicy, *sqlc.ChannelPolicy,
|
||||
}
|
||||
}
|
||||
|
||||
return policy1, policy2, nil
|
||||
|
||||
case sqlc.ListChannelsWithPoliciesPaginatedRow:
|
||||
if r.Policy1ID.Valid {
|
||||
policy1 = &sqlc.ChannelPolicy{
|
||||
ID: r.Policy1ID.Int64,
|
||||
Version: r.Policy1Version.Int16,
|
||||
ChannelID: r.Channel.ID,
|
||||
NodeID: r.Policy1NodeID.Int64,
|
||||
Timelock: r.Policy1Timelock.Int32,
|
||||
FeePpm: r.Policy1FeePpm.Int64,
|
||||
BaseFeeMsat: r.Policy1BaseFeeMsat.Int64,
|
||||
MinHtlcMsat: r.Policy1MinHtlcMsat.Int64,
|
||||
MaxHtlcMsat: r.Policy1MaxHtlcMsat,
|
||||
LastUpdate: r.Policy1LastUpdate,
|
||||
InboundBaseFeeMsat: r.Policy1InboundBaseFeeMsat,
|
||||
InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat,
|
||||
Disabled: r.Policy1Disabled,
|
||||
Signature: r.Policy1Signature,
|
||||
}
|
||||
}
|
||||
if r.Policy2ID.Valid {
|
||||
policy2 = &sqlc.ChannelPolicy{
|
||||
ID: r.Policy2ID.Int64,
|
||||
Version: r.Policy2Version.Int16,
|
||||
ChannelID: r.Channel.ID,
|
||||
NodeID: r.Policy2NodeID.Int64,
|
||||
Timelock: r.Policy2Timelock.Int32,
|
||||
FeePpm: r.Policy2FeePpm.Int64,
|
||||
BaseFeeMsat: r.Policy2BaseFeeMsat.Int64,
|
||||
MinHtlcMsat: r.Policy2MinHtlcMsat.Int64,
|
||||
MaxHtlcMsat: r.Policy2MaxHtlcMsat,
|
||||
LastUpdate: r.Policy2LastUpdate,
|
||||
InboundBaseFeeMsat: r.Policy2InboundBaseFeeMsat,
|
||||
InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat,
|
||||
Disabled: r.Policy2Disabled,
|
||||
Signature: r.Policy2Signature,
|
||||
}
|
||||
}
|
||||
|
||||
return policy1, policy2, nil
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unexpected row type in "+
|
||||
|
@@ -316,6 +316,42 @@ func (q *Queries) GetChannelFeaturesAndExtras(ctx context.Context, channelID int
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getChannelPolicyByChannelAndNode = `-- name: GetChannelPolicyByChannelAndNode :one
|
||||
SELECT id, version, channel_id, node_id, timelock, fee_ppm, base_fee_msat, min_htlc_msat, max_htlc_msat, last_update, disabled, inbound_base_fee_msat, inbound_fee_rate_milli_msat, signature
|
||||
FROM channel_policies
|
||||
WHERE channel_id = $1
|
||||
AND node_id = $2
|
||||
AND version = $3
|
||||
`
|
||||
|
||||
type GetChannelPolicyByChannelAndNodeParams struct {
|
||||
ChannelID int64
|
||||
NodeID int64
|
||||
Version int16
|
||||
}
|
||||
|
||||
func (q *Queries) GetChannelPolicyByChannelAndNode(ctx context.Context, arg GetChannelPolicyByChannelAndNodeParams) (ChannelPolicy, error) {
|
||||
row := q.db.QueryRowContext(ctx, getChannelPolicyByChannelAndNode, arg.ChannelID, arg.NodeID, arg.Version)
|
||||
var i ChannelPolicy
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.Version,
|
||||
&i.ChannelID,
|
||||
&i.NodeID,
|
||||
&i.Timelock,
|
||||
&i.FeePpm,
|
||||
&i.BaseFeeMsat,
|
||||
&i.MinHtlcMsat,
|
||||
&i.MaxHtlcMsat,
|
||||
&i.LastUpdate,
|
||||
&i.Disabled,
|
||||
&i.InboundBaseFeeMsat,
|
||||
&i.InboundFeeRateMilliMsat,
|
||||
&i.Signature,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getChannelPolicyExtraTypes = `-- name: GetChannelPolicyExtraTypes :many
|
||||
SELECT
|
||||
cp.id AS policy_id,
|
||||
@@ -371,6 +407,178 @@ func (q *Queries) GetChannelPolicyExtraTypes(ctx context.Context, arg GetChannel
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getChannelsByPolicyLastUpdateRange = `-- name: GetChannelsByPolicyLastUpdateRange :many
|
||||
SELECT
|
||||
c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature,
|
||||
n1.id, n1.version, n1.pub_key, n1.alias, n1.last_update, n1.color, n1.signature,
|
||||
n2.id, n2.version, n2.pub_key, n2.alias, n2.last_update, n2.color, n2.signature,
|
||||
|
||||
-- Policy 1 (node_id_1)
|
||||
cp1.id AS policy1_id,
|
||||
cp1.node_id AS policy1_node_id,
|
||||
cp1.version AS policy1_version,
|
||||
cp1.timelock AS policy1_timelock,
|
||||
cp1.fee_ppm AS policy1_fee_ppm,
|
||||
cp1.base_fee_msat AS policy1_base_fee_msat,
|
||||
cp1.min_htlc_msat AS policy1_min_htlc_msat,
|
||||
cp1.max_htlc_msat AS policy1_max_htlc_msat,
|
||||
cp1.last_update AS policy1_last_update,
|
||||
cp1.disabled AS policy1_disabled,
|
||||
cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
|
||||
cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
|
||||
cp1.signature AS policy1_signature,
|
||||
|
||||
-- Policy 2 (node_id_2)
|
||||
cp2.id AS policy2_id,
|
||||
cp2.node_id AS policy2_node_id,
|
||||
cp2.version AS policy2_version,
|
||||
cp2.timelock AS policy2_timelock,
|
||||
cp2.fee_ppm AS policy2_fee_ppm,
|
||||
cp2.base_fee_msat AS policy2_base_fee_msat,
|
||||
cp2.min_htlc_msat AS policy2_min_htlc_msat,
|
||||
cp2.max_htlc_msat AS policy2_max_htlc_msat,
|
||||
cp2.last_update AS policy2_last_update,
|
||||
cp2.disabled AS policy2_disabled,
|
||||
cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
|
||||
cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
|
||||
cp2.signature AS policy2_signature
|
||||
|
||||
FROM channels c
|
||||
JOIN nodes n1 ON c.node_id_1 = n1.id
|
||||
JOIN nodes n2 ON c.node_id_2 = n2.id
|
||||
LEFT JOIN channel_policies cp1
|
||||
ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
|
||||
LEFT JOIN channel_policies cp2
|
||||
ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
|
||||
WHERE c.version = $1
|
||||
AND (
|
||||
(cp1.last_update >= $2 AND cp1.last_update < $3)
|
||||
OR
|
||||
(cp2.last_update >= $2 AND cp2.last_update < $3)
|
||||
)
|
||||
ORDER BY
|
||||
CASE
|
||||
WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0)
|
||||
THEN COALESCE(cp1.last_update, 0)
|
||||
ELSE COALESCE(cp2.last_update, 0)
|
||||
END ASC
|
||||
`
|
||||
|
||||
type GetChannelsByPolicyLastUpdateRangeParams struct {
|
||||
Version int16
|
||||
StartTime sql.NullInt64
|
||||
EndTime sql.NullInt64
|
||||
}
|
||||
|
||||
type GetChannelsByPolicyLastUpdateRangeRow struct {
|
||||
Channel Channel
|
||||
Node Node
|
||||
Node_2 Node
|
||||
Policy1ID sql.NullInt64
|
||||
Policy1NodeID sql.NullInt64
|
||||
Policy1Version sql.NullInt16
|
||||
Policy1Timelock sql.NullInt32
|
||||
Policy1FeePpm sql.NullInt64
|
||||
Policy1BaseFeeMsat sql.NullInt64
|
||||
Policy1MinHtlcMsat sql.NullInt64
|
||||
Policy1MaxHtlcMsat sql.NullInt64
|
||||
Policy1LastUpdate sql.NullInt64
|
||||
Policy1Disabled sql.NullBool
|
||||
Policy1InboundBaseFeeMsat sql.NullInt64
|
||||
Policy1InboundFeeRateMilliMsat sql.NullInt64
|
||||
Policy1Signature []byte
|
||||
Policy2ID sql.NullInt64
|
||||
Policy2NodeID sql.NullInt64
|
||||
Policy2Version sql.NullInt16
|
||||
Policy2Timelock sql.NullInt32
|
||||
Policy2FeePpm sql.NullInt64
|
||||
Policy2BaseFeeMsat sql.NullInt64
|
||||
Policy2MinHtlcMsat sql.NullInt64
|
||||
Policy2MaxHtlcMsat sql.NullInt64
|
||||
Policy2LastUpdate sql.NullInt64
|
||||
Policy2Disabled sql.NullBool
|
||||
Policy2InboundBaseFeeMsat sql.NullInt64
|
||||
Policy2InboundFeeRateMilliMsat sql.NullInt64
|
||||
Policy2Signature []byte
|
||||
}
|
||||
|
||||
func (q *Queries) GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg GetChannelsByPolicyLastUpdateRangeParams) ([]GetChannelsByPolicyLastUpdateRangeRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getChannelsByPolicyLastUpdateRange, arg.Version, arg.StartTime, arg.EndTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []GetChannelsByPolicyLastUpdateRangeRow
|
||||
for rows.Next() {
|
||||
var i GetChannelsByPolicyLastUpdateRangeRow
|
||||
if err := rows.Scan(
|
||||
&i.Channel.ID,
|
||||
&i.Channel.Version,
|
||||
&i.Channel.Scid,
|
||||
&i.Channel.NodeID1,
|
||||
&i.Channel.NodeID2,
|
||||
&i.Channel.Outpoint,
|
||||
&i.Channel.Capacity,
|
||||
&i.Channel.BitcoinKey1,
|
||||
&i.Channel.BitcoinKey2,
|
||||
&i.Channel.Node1Signature,
|
||||
&i.Channel.Node2Signature,
|
||||
&i.Channel.Bitcoin1Signature,
|
||||
&i.Channel.Bitcoin2Signature,
|
||||
&i.Node.ID,
|
||||
&i.Node.Version,
|
||||
&i.Node.PubKey,
|
||||
&i.Node.Alias,
|
||||
&i.Node.LastUpdate,
|
||||
&i.Node.Color,
|
||||
&i.Node.Signature,
|
||||
&i.Node_2.ID,
|
||||
&i.Node_2.Version,
|
||||
&i.Node_2.PubKey,
|
||||
&i.Node_2.Alias,
|
||||
&i.Node_2.LastUpdate,
|
||||
&i.Node_2.Color,
|
||||
&i.Node_2.Signature,
|
||||
&i.Policy1ID,
|
||||
&i.Policy1NodeID,
|
||||
&i.Policy1Version,
|
||||
&i.Policy1Timelock,
|
||||
&i.Policy1FeePpm,
|
||||
&i.Policy1BaseFeeMsat,
|
||||
&i.Policy1MinHtlcMsat,
|
||||
&i.Policy1MaxHtlcMsat,
|
||||
&i.Policy1LastUpdate,
|
||||
&i.Policy1Disabled,
|
||||
&i.Policy1InboundBaseFeeMsat,
|
||||
&i.Policy1InboundFeeRateMilliMsat,
|
||||
&i.Policy1Signature,
|
||||
&i.Policy2ID,
|
||||
&i.Policy2NodeID,
|
||||
&i.Policy2Version,
|
||||
&i.Policy2Timelock,
|
||||
&i.Policy2FeePpm,
|
||||
&i.Policy2BaseFeeMsat,
|
||||
&i.Policy2MinHtlcMsat,
|
||||
&i.Policy2MaxHtlcMsat,
|
||||
&i.Policy2LastUpdate,
|
||||
&i.Policy2Disabled,
|
||||
&i.Policy2InboundBaseFeeMsat,
|
||||
&i.Policy2InboundFeeRateMilliMsat,
|
||||
&i.Policy2Signature,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getExtraNodeTypes = `-- name: GetExtraNodeTypes :many
|
||||
SELECT node_id, type, value
|
||||
FROM node_extra_types
|
||||
@@ -595,6 +803,56 @@ func (q *Queries) GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByL
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getPublicV1ChannelsBySCID = `-- name: GetPublicV1ChannelsBySCID :many
|
||||
SELECT id, version, scid, node_id_1, node_id_2, outpoint, capacity, bitcoin_key_1, bitcoin_key_2, node_1_signature, node_2_signature, bitcoin_1_signature, bitcoin_2_signature
|
||||
FROM channels
|
||||
WHERE node_1_signature IS NOT NULL
|
||||
AND scid >= $1
|
||||
AND scid < $2
|
||||
`
|
||||
|
||||
type GetPublicV1ChannelsBySCIDParams struct {
|
||||
StartScid []byte
|
||||
EndScid []byte
|
||||
}
|
||||
|
||||
func (q *Queries) GetPublicV1ChannelsBySCID(ctx context.Context, arg GetPublicV1ChannelsBySCIDParams) ([]Channel, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getPublicV1ChannelsBySCID, arg.StartScid, arg.EndScid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []Channel
|
||||
for rows.Next() {
|
||||
var i Channel
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.Version,
|
||||
&i.Scid,
|
||||
&i.NodeID1,
|
||||
&i.NodeID2,
|
||||
&i.Outpoint,
|
||||
&i.Capacity,
|
||||
&i.BitcoinKey1,
|
||||
&i.BitcoinKey2,
|
||||
&i.Node1Signature,
|
||||
&i.Node2Signature,
|
||||
&i.Bitcoin1Signature,
|
||||
&i.Bitcoin2Signature,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getSourceNodesByVersion = `-- name: GetSourceNodesByVersion :many
|
||||
SELECT sn.node_id, n.pub_key
|
||||
FROM source_nodes sn
|
||||
@@ -898,6 +1156,159 @@ func (q *Queries) ListChannelsByNodeID(ctx context.Context, arg ListChannelsByNo
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listChannelsWithPoliciesPaginated = `-- name: ListChannelsWithPoliciesPaginated :many
|
||||
SELECT
|
||||
c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature,
|
||||
|
||||
-- Join node pubkeys
|
||||
n1.pub_key AS node1_pubkey,
|
||||
n2.pub_key AS node2_pubkey,
|
||||
|
||||
-- Node 1 policy
|
||||
cp1.id AS policy_1_id,
|
||||
cp1.node_id AS policy_1_node_id,
|
||||
cp1.version AS policy_1_version,
|
||||
cp1.timelock AS policy_1_timelock,
|
||||
cp1.fee_ppm AS policy_1_fee_ppm,
|
||||
cp1.base_fee_msat AS policy_1_base_fee_msat,
|
||||
cp1.min_htlc_msat AS policy_1_min_htlc_msat,
|
||||
cp1.max_htlc_msat AS policy_1_max_htlc_msat,
|
||||
cp1.last_update AS policy_1_last_update,
|
||||
cp1.disabled AS policy_1_disabled,
|
||||
cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
|
||||
cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
|
||||
cp1.signature AS policy_1_signature,
|
||||
|
||||
-- Node 2 policy
|
||||
cp2.id AS policy_2_id,
|
||||
cp2.node_id AS policy_2_node_id,
|
||||
cp2.version AS policy_2_version,
|
||||
cp2.timelock AS policy_2_timelock,
|
||||
cp2.fee_ppm AS policy_2_fee_ppm,
|
||||
cp2.base_fee_msat AS policy_2_base_fee_msat,
|
||||
cp2.min_htlc_msat AS policy_2_min_htlc_msat,
|
||||
cp2.max_htlc_msat AS policy_2_max_htlc_msat,
|
||||
cp2.last_update AS policy_2_last_update,
|
||||
cp2.disabled AS policy_2_disabled,
|
||||
cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
|
||||
cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
|
||||
cp2.signature AS policy_2_signature
|
||||
|
||||
FROM channels c
|
||||
JOIN nodes n1 ON c.node_id_1 = n1.id
|
||||
JOIN nodes n2 ON c.node_id_2 = n2.id
|
||||
LEFT JOIN channel_policies cp1
|
||||
ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
|
||||
LEFT JOIN channel_policies cp2
|
||||
ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
|
||||
WHERE c.version = $1 AND c.id > $2
|
||||
ORDER BY c.id
|
||||
LIMIT $3
|
||||
`
|
||||
|
||||
type ListChannelsWithPoliciesPaginatedParams struct {
|
||||
Version int16
|
||||
ID int64
|
||||
Limit int32
|
||||
}
|
||||
|
||||
type ListChannelsWithPoliciesPaginatedRow struct {
|
||||
Channel Channel
|
||||
Node1Pubkey []byte
|
||||
Node2Pubkey []byte
|
||||
Policy1ID sql.NullInt64
|
||||
Policy1NodeID sql.NullInt64
|
||||
Policy1Version sql.NullInt16
|
||||
Policy1Timelock sql.NullInt32
|
||||
Policy1FeePpm sql.NullInt64
|
||||
Policy1BaseFeeMsat sql.NullInt64
|
||||
Policy1MinHtlcMsat sql.NullInt64
|
||||
Policy1MaxHtlcMsat sql.NullInt64
|
||||
Policy1LastUpdate sql.NullInt64
|
||||
Policy1Disabled sql.NullBool
|
||||
Policy1InboundBaseFeeMsat sql.NullInt64
|
||||
Policy1InboundFeeRateMilliMsat sql.NullInt64
|
||||
Policy1Signature []byte
|
||||
Policy2ID sql.NullInt64
|
||||
Policy2NodeID sql.NullInt64
|
||||
Policy2Version sql.NullInt16
|
||||
Policy2Timelock sql.NullInt32
|
||||
Policy2FeePpm sql.NullInt64
|
||||
Policy2BaseFeeMsat sql.NullInt64
|
||||
Policy2MinHtlcMsat sql.NullInt64
|
||||
Policy2MaxHtlcMsat sql.NullInt64
|
||||
Policy2LastUpdate sql.NullInt64
|
||||
Policy2Disabled sql.NullBool
|
||||
Policy2InboundBaseFeeMsat sql.NullInt64
|
||||
Policy2InboundFeeRateMilliMsat sql.NullInt64
|
||||
Policy2Signature []byte
|
||||
}
|
||||
|
||||
func (q *Queries) ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, listChannelsWithPoliciesPaginated, arg.Version, arg.ID, arg.Limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []ListChannelsWithPoliciesPaginatedRow
|
||||
for rows.Next() {
|
||||
var i ListChannelsWithPoliciesPaginatedRow
|
||||
if err := rows.Scan(
|
||||
&i.Channel.ID,
|
||||
&i.Channel.Version,
|
||||
&i.Channel.Scid,
|
||||
&i.Channel.NodeID1,
|
||||
&i.Channel.NodeID2,
|
||||
&i.Channel.Outpoint,
|
||||
&i.Channel.Capacity,
|
||||
&i.Channel.BitcoinKey1,
|
||||
&i.Channel.BitcoinKey2,
|
||||
&i.Channel.Node1Signature,
|
||||
&i.Channel.Node2Signature,
|
||||
&i.Channel.Bitcoin1Signature,
|
||||
&i.Channel.Bitcoin2Signature,
|
||||
&i.Node1Pubkey,
|
||||
&i.Node2Pubkey,
|
||||
&i.Policy1ID,
|
||||
&i.Policy1NodeID,
|
||||
&i.Policy1Version,
|
||||
&i.Policy1Timelock,
|
||||
&i.Policy1FeePpm,
|
||||
&i.Policy1BaseFeeMsat,
|
||||
&i.Policy1MinHtlcMsat,
|
||||
&i.Policy1MaxHtlcMsat,
|
||||
&i.Policy1LastUpdate,
|
||||
&i.Policy1Disabled,
|
||||
&i.Policy1InboundBaseFeeMsat,
|
||||
&i.Policy1InboundFeeRateMilliMsat,
|
||||
&i.Policy1Signature,
|
||||
&i.Policy2ID,
|
||||
&i.Policy2NodeID,
|
||||
&i.Policy2Version,
|
||||
&i.Policy2Timelock,
|
||||
&i.Policy2FeePpm,
|
||||
&i.Policy2BaseFeeMsat,
|
||||
&i.Policy2MinHtlcMsat,
|
||||
&i.Policy2MaxHtlcMsat,
|
||||
&i.Policy2LastUpdate,
|
||||
&i.Policy2Disabled,
|
||||
&i.Policy2InboundBaseFeeMsat,
|
||||
&i.Policy2InboundFeeRateMilliMsat,
|
||||
&i.Policy2Signature,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listNodeIDsAndPubKeys = `-- name: ListNodeIDsAndPubKeys :many
|
||||
SELECT id, pub_key
|
||||
FROM nodes
|
||||
|
@@ -3,6 +3,7 @@ DROP INDEX IF EXISTS nodes_unique;
|
||||
DROP INDEX IF EXISTS node_extra_types_unique;
|
||||
DROP INDEX IF EXISTS node_features_unique;
|
||||
DROP INDEX IF EXISTS node_addresses_unique;
|
||||
DROP INDEX IF EXISTS node_last_update_idx;
|
||||
DROP INDEX IF EXISTS source_nodes_unique;
|
||||
DROP INDEX IF EXISTS channels_node_id_1_idx;
|
||||
DROP INDEX IF EXISTS channels_node_id_2_idx;
|
||||
@@ -12,6 +13,7 @@ DROP INDEX IF EXISTS channel_features_unique;
|
||||
DROP INDEX IF EXISTS channel_extra_types_unique;
|
||||
DROP INDEX IF EXISTS channel_policies_unique;
|
||||
DROP INDEX IF EXISTS channel_policy_extra_types_unique;
|
||||
DROP INDEX IF EXISTS channel_policy_last_update_idx;
|
||||
|
||||
-- Drop tables in order of reverse dependencies.
|
||||
DROP TABLE IF EXISTS channel_policy_extra_types;
|
||||
|
@@ -37,6 +37,7 @@ CREATE TABLE IF NOT EXISTS nodes (
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS nodes_unique ON nodes (
|
||||
pub_key, version
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS node_last_update_idx ON nodes(last_update);
|
||||
|
||||
-- node_extra_types stores any extra TLV fields covered by a node announcement that
|
||||
-- we do not have an explicit column for in the nodes table.
|
||||
@@ -273,6 +274,7 @@ CREATE TABLE IF NOT EXISTS channel_policies (
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS channel_policies_unique ON channel_policies (
|
||||
channel_id, node_id, version
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS channel_policy_last_update_idx ON channel_policies(last_update);
|
||||
|
||||
-- channel_policy_extra_types stores any extra TLV fields covered by a channel
|
||||
-- update that we do not have an explicit column for in the channel_policies
|
||||
|
@@ -30,7 +30,9 @@ type Querier interface {
|
||||
GetChannelAndNodesBySCID(ctx context.Context, arg GetChannelAndNodesBySCIDParams) (GetChannelAndNodesBySCIDRow, error)
|
||||
GetChannelBySCID(ctx context.Context, arg GetChannelBySCIDParams) (Channel, error)
|
||||
GetChannelFeaturesAndExtras(ctx context.Context, channelID int64) ([]GetChannelFeaturesAndExtrasRow, error)
|
||||
GetChannelPolicyByChannelAndNode(ctx context.Context, arg GetChannelPolicyByChannelAndNodeParams) (ChannelPolicy, error)
|
||||
GetChannelPolicyExtraTypes(ctx context.Context, arg GetChannelPolicyExtraTypesParams) ([]GetChannelPolicyExtraTypesRow, error)
|
||||
GetChannelsByPolicyLastUpdateRange(ctx context.Context, arg GetChannelsByPolicyLastUpdateRangeParams) ([]GetChannelsByPolicyLastUpdateRangeRow, error)
|
||||
GetDatabaseVersion(ctx context.Context) (int32, error)
|
||||
GetExtraNodeTypes(ctx context.Context, nodeID int64) ([]NodeExtraType, error)
|
||||
// This method may return more than one invoice if filter using multiple fields
|
||||
@@ -50,6 +52,7 @@ type Querier interface {
|
||||
GetNodeFeaturesByPubKey(ctx context.Context, arg GetNodeFeaturesByPubKeyParams) ([]int32, error)
|
||||
GetNodeIDByPubKey(ctx context.Context, arg GetNodeIDByPubKeyParams) (int64, error)
|
||||
GetNodesByLastUpdateRange(ctx context.Context, arg GetNodesByLastUpdateRangeParams) ([]Node, error)
|
||||
GetPublicV1ChannelsBySCID(ctx context.Context, arg GetPublicV1ChannelsBySCIDParams) ([]Channel, error)
|
||||
GetSourceNodesByVersion(ctx context.Context, version int16) ([]GetSourceNodesByVersionRow, error)
|
||||
HighestSCID(ctx context.Context, version int16) ([]byte, error)
|
||||
InsertAMPSubInvoice(ctx context.Context, arg InsertAMPSubInvoiceParams) error
|
||||
@@ -65,6 +68,7 @@ type Querier interface {
|
||||
InsertNodeAddress(ctx context.Context, arg InsertNodeAddressParams) error
|
||||
InsertNodeFeature(ctx context.Context, arg InsertNodeFeatureParams) error
|
||||
ListChannelsByNodeID(ctx context.Context, arg ListChannelsByNodeIDParams) ([]ListChannelsByNodeIDRow, error)
|
||||
ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error)
|
||||
ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndPubKeysParams) ([]ListNodeIDsAndPubKeysRow, error)
|
||||
ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]Node, error)
|
||||
NextInvoiceSettleIndex(ctx context.Context) (int64, error)
|
||||
|
@@ -206,6 +206,62 @@ SELECT
|
||||
FROM channel_extra_types cet
|
||||
WHERE cet.channel_id = $1;
|
||||
|
||||
-- name: GetChannelsByPolicyLastUpdateRange :many
|
||||
SELECT
|
||||
sqlc.embed(c),
|
||||
sqlc.embed(n1),
|
||||
sqlc.embed(n2),
|
||||
|
||||
-- Policy 1 (node_id_1)
|
||||
cp1.id AS policy1_id,
|
||||
cp1.node_id AS policy1_node_id,
|
||||
cp1.version AS policy1_version,
|
||||
cp1.timelock AS policy1_timelock,
|
||||
cp1.fee_ppm AS policy1_fee_ppm,
|
||||
cp1.base_fee_msat AS policy1_base_fee_msat,
|
||||
cp1.min_htlc_msat AS policy1_min_htlc_msat,
|
||||
cp1.max_htlc_msat AS policy1_max_htlc_msat,
|
||||
cp1.last_update AS policy1_last_update,
|
||||
cp1.disabled AS policy1_disabled,
|
||||
cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
|
||||
cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
|
||||
cp1.signature AS policy1_signature,
|
||||
|
||||
-- Policy 2 (node_id_2)
|
||||
cp2.id AS policy2_id,
|
||||
cp2.node_id AS policy2_node_id,
|
||||
cp2.version AS policy2_version,
|
||||
cp2.timelock AS policy2_timelock,
|
||||
cp2.fee_ppm AS policy2_fee_ppm,
|
||||
cp2.base_fee_msat AS policy2_base_fee_msat,
|
||||
cp2.min_htlc_msat AS policy2_min_htlc_msat,
|
||||
cp2.max_htlc_msat AS policy2_max_htlc_msat,
|
||||
cp2.last_update AS policy2_last_update,
|
||||
cp2.disabled AS policy2_disabled,
|
||||
cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
|
||||
cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
|
||||
cp2.signature AS policy2_signature
|
||||
|
||||
FROM channels c
|
||||
JOIN nodes n1 ON c.node_id_1 = n1.id
|
||||
JOIN nodes n2 ON c.node_id_2 = n2.id
|
||||
LEFT JOIN channel_policies cp1
|
||||
ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
|
||||
LEFT JOIN channel_policies cp2
|
||||
ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
|
||||
WHERE c.version = @version
|
||||
AND (
|
||||
(cp1.last_update >= @start_time AND cp1.last_update < @end_time)
|
||||
OR
|
||||
(cp2.last_update >= @start_time AND cp2.last_update < @end_time)
|
||||
)
|
||||
ORDER BY
|
||||
CASE
|
||||
WHEN COALESCE(cp1.last_update, 0) >= COALESCE(cp2.last_update, 0)
|
||||
THEN COALESCE(cp1.last_update, 0)
|
||||
ELSE COALESCE(cp2.last_update, 0)
|
||||
END ASC;
|
||||
|
||||
-- name: HighestSCID :one
|
||||
SELECT scid
|
||||
FROM channels
|
||||
@@ -261,6 +317,62 @@ FROM channels c
|
||||
WHERE c.version = $1
|
||||
AND (c.node_id_1 = $2 OR c.node_id_2 = $2);
|
||||
|
||||
-- name: GetPublicV1ChannelsBySCID :many
|
||||
SELECT *
|
||||
FROM channels
|
||||
WHERE node_1_signature IS NOT NULL
|
||||
AND scid >= @start_scid
|
||||
AND scid < @end_scid;
|
||||
|
||||
-- name: ListChannelsWithPoliciesPaginated :many
|
||||
SELECT
|
||||
sqlc.embed(c),
|
||||
|
||||
-- Join node pubkeys
|
||||
n1.pub_key AS node1_pubkey,
|
||||
n2.pub_key AS node2_pubkey,
|
||||
|
||||
-- Node 1 policy
|
||||
cp1.id AS policy_1_id,
|
||||
cp1.node_id AS policy_1_node_id,
|
||||
cp1.version AS policy_1_version,
|
||||
cp1.timelock AS policy_1_timelock,
|
||||
cp1.fee_ppm AS policy_1_fee_ppm,
|
||||
cp1.base_fee_msat AS policy_1_base_fee_msat,
|
||||
cp1.min_htlc_msat AS policy_1_min_htlc_msat,
|
||||
cp1.max_htlc_msat AS policy_1_max_htlc_msat,
|
||||
cp1.last_update AS policy_1_last_update,
|
||||
cp1.disabled AS policy_1_disabled,
|
||||
cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat,
|
||||
cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat,
|
||||
cp1.signature AS policy_1_signature,
|
||||
|
||||
-- Node 2 policy
|
||||
cp2.id AS policy_2_id,
|
||||
cp2.node_id AS policy_2_node_id,
|
||||
cp2.version AS policy_2_version,
|
||||
cp2.timelock AS policy_2_timelock,
|
||||
cp2.fee_ppm AS policy_2_fee_ppm,
|
||||
cp2.base_fee_msat AS policy_2_base_fee_msat,
|
||||
cp2.min_htlc_msat AS policy_2_min_htlc_msat,
|
||||
cp2.max_htlc_msat AS policy_2_max_htlc_msat,
|
||||
cp2.last_update AS policy_2_last_update,
|
||||
cp2.disabled AS policy_2_disabled,
|
||||
cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat,
|
||||
cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat,
|
||||
cp2.signature AS policy_2_signature
|
||||
|
||||
FROM channels c
|
||||
JOIN nodes n1 ON c.node_id_1 = n1.id
|
||||
JOIN nodes n2 ON c.node_id_2 = n2.id
|
||||
LEFT JOIN channel_policies cp1
|
||||
ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version
|
||||
LEFT JOIN channel_policies cp2
|
||||
ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version
|
||||
WHERE c.version = $1 AND c.id > $2
|
||||
ORDER BY c.id
|
||||
LIMIT $3;
|
||||
|
||||
/* ─────────────────────────────────────────────
|
||||
channel_features table queries
|
||||
─────────────────────────────────────────────
|
||||
@@ -315,6 +427,13 @@ ON CONFLICT (channel_id, node_id, version)
|
||||
WHERE EXCLUDED.last_update > channel_policies.last_update
|
||||
RETURNING id;
|
||||
|
||||
-- name: GetChannelPolicyByChannelAndNode :one
|
||||
SELECT *
|
||||
FROM channel_policies
|
||||
WHERE channel_id = $1
|
||||
AND node_id = $2
|
||||
AND version = $3;
|
||||
|
||||
/* ─────────────────────────────────────────────
|
||||
channel_policy_extra_types table queries
|
||||
─────────────────────────────────────────────
|
||||
|
Reference in New Issue
Block a user