@@ -911,6 +911,7 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime,
		edges []ChannelEdge
		hits  int
	)
	err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
		rows, err := db.GetChannelsByPolicyLastUpdateRange(
			ctx, sqlc.GetChannelsByPolicyLastUpdateRangeParams{
@@ -923,72 +924,61 @@ func (s *SQLStore) ChanUpdatesInHorizon(startTime,
			return err
		}

		if len(rows) == 0 {
			return nil
		}

		// We'll pre-allocate the slices and maps here with a best
		// effort size in order to avoid unnecessary allocations later
		// on.
		uncachedRows := make(
			[]sqlc.GetChannelsByPolicyLastUpdateRangeRow, 0,
			len(rows),
		)
		edgesToCache = make(map[uint64]ChannelEdge, len(rows))
		edgesSeen = make(map[uint64]struct{}, len(rows))
		edges = make([]ChannelEdge, 0, len(rows))

		// Separate cached from non-cached channels since we will only
		// batch load the data for the ones we haven't cached yet.
		for _, row := range rows {
			// If we've already retrieved the info and policies for
			// this edge, then we can skip it as we don't need to do
			// so again.
			chanIDInt := byteOrder.Uint64(row.GraphChannel.Scid)

			// Skip duplicates.
			if _, ok := edgesSeen[chanIDInt]; ok {
				continue
			}
			edgesSeen[chanIDInt] = struct{}{}

			// Check cache first.
			if channel, ok := s.chanCache.get(chanIDInt); ok {
				hits++
				edgesSeen[chanIDInt] = struct{}{}
				edges = append(edges, channel)

				continue
			}

			node1, node2, err := buildNodes(
				ctx, db, row.GraphNode, row.GraphNode_2,
			)
			if err != nil {
				return err
			}

			channel, err := getAndBuildEdgeInfo(
				ctx, db, s.cfg.ChainHash, row.GraphChannel,
				node1.PubKeyBytes, node2.PubKeyBytes,
			)
			if err != nil {
				return fmt.Errorf("unable to build channel "+
					"info: %w", err)
			}

			dbPol1, dbPol2, err := extractChannelPolicies(row)
			if err != nil {
				return fmt.Errorf("unable to extract channel "+
					"policies: %w", err)
			}

			p1, p2, err := getAndBuildChanPolicies(
				ctx, db, dbPol1, dbPol2, channel.ChannelID,
				node1.PubKeyBytes, node2.PubKeyBytes,
			)
			if err != nil {
				return fmt.Errorf("unable to build channel "+
					"policies: %w", err)
			}

			edgesSeen[chanIDInt] = struct{}{}
			chanEdge := ChannelEdge{
				Info:    channel,
				Policy1: p1,
				Policy2: p2,
				Node1:   node1,
				Node2:   node2,
			}
			edges = append(edges, chanEdge)
			edgesToCache[chanIDInt] = chanEdge

			// Mark this row as one we need to batch load data for.
			uncachedRows = append(uncachedRows, row)
		}

		// If there are no uncached rows, then we can return early.
		if len(uncachedRows) == 0 {
			return nil
		}

		// Batch load data for all uncached channels.
		newEdges, err := batchBuildChannelEdges(
			ctx, s.cfg, db, uncachedRows,
		)
		if err != nil {
			return fmt.Errorf("unable to batch build channel "+
				"edges: %w", err)
		}

		edges = append(edges, newEdges...)

		return nil
	}, func() {
		edgesSeen = make(map[uint64]struct{})
		edgesToCache = make(map[uint64]ChannelEdge)
		edges = nil
	})
	}, sqldb.NoOpReset)
	if err != nil {
		return nil, fmt.Errorf("unable to fetch channels: %w", err)
	}
@@ -1625,11 +1615,12 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
	}

	var (
		ctx     = context.TODO()
		deleted []*models.ChannelEdgeInfo
		edges   []*models.ChannelEdgeInfo
	)
	err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
		chanIDsToDelete := make([]int64, 0, len(chanIDs))

		// First, collect all channel rows.
		var channelRows []sqlc.GetChannelsBySCIDWithPoliciesRow
		chanCallBack := func(ctx context.Context,
			row sqlc.GetChannelsBySCIDWithPoliciesRow) error {
@@ -1638,64 +1629,7 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
			scid := byteOrder.Uint64(row.GraphChannel.Scid)
			delete(chanLookup, scid)

			node1, node2, err := buildNodeVertices(
				row.GraphNode.PubKey, row.GraphNode_2.PubKey,
			)
			if err != nil {
				return err
			}

			info, err := getAndBuildEdgeInfo(
				ctx, db, s.cfg.ChainHash, row.GraphChannel,
				node1, node2,
			)
			if err != nil {
				return err
			}

			deleted = append(deleted, info)
			chanIDsToDelete = append(
				chanIDsToDelete, row.GraphChannel.ID,
			)

			if !markZombie {
				return nil
			}

			nodeKey1, nodeKey2 := info.NodeKey1Bytes,
				info.NodeKey2Bytes
			if strictZombiePruning {
				var e1UpdateTime, e2UpdateTime *time.Time
				if row.Policy1LastUpdate.Valid {
					e1Time := time.Unix(
						row.Policy1LastUpdate.Int64, 0,
					)
					e1UpdateTime = &e1Time
				}
				if row.Policy2LastUpdate.Valid {
					e2Time := time.Unix(
						row.Policy2LastUpdate.Int64, 0,
					)
					e2UpdateTime = &e2Time
				}

				nodeKey1, nodeKey2 = makeZombiePubkeys(
					info, e1UpdateTime, e2UpdateTime,
				)
			}

			err = db.UpsertZombieChannel(
				ctx, sqlc.UpsertZombieChannelParams{
					Version:  int16(ProtocolV1),
					Scid:     channelIDToBytes(scid),
					NodeKey1: nodeKey1[:],
					NodeKey2: nodeKey2[:],
				},
			)
			if err != nil {
				return fmt.Errorf("unable to mark channel as "+
					"zombie: %w", err)
			}

			channelRows = append(channelRows, row)

			return nil
		}
@@ -1711,9 +1645,37 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
			return ErrEdgeNotFound
		}

		if len(channelRows) == 0 {
			return nil
		}

		// Batch build all channel edges.
		var chanIDsToDelete []int64
		edges, chanIDsToDelete, err = batchBuildChannelInfo(
			ctx, s.cfg, db, channelRows,
		)
		if err != nil {
			return err
		}
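		// NOTE: batchBuildChannelInfo builds the edges in the same
		// order as channelRows, so edges[i] below pairs with
		// channelRows[i].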
		if markZombie {
			for i, row := range channelRows {
				scid := byteOrder.Uint64(row.GraphChannel.Scid)

				err := handleZombieMarking(
					ctx, db, row, edges[i],
					strictZombiePruning, scid,
				)
				if err != nil {
					return fmt.Errorf("unable to mark "+
						"channel as zombie: %w", err)
				}
			}
		}

		return s.deleteChannels(ctx, db, chanIDsToDelete)
	}, func() {
		deleted = nil
		edges = nil

		// Re-fill the lookup map.
		for _, chanID := range chanIDs {
@@ -1730,7 +1692,7 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
		s.chanCache.remove(chanID)
	}

	return deleted, nil
	return edges, nil
}
// FetchChannelEdgesByID attempts to lookup the two directed edges for the
@@ -2093,55 +2055,40 @@ func (s *SQLStore) FetchChanInfos(chanIDs []uint64) ([]ChannelEdge, error) {
		edges = make(map[uint64]ChannelEdge)
	)
	err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
		// First, collect all channel rows.
		var channelRows []sqlc.GetChannelsBySCIDWithPoliciesRow
		chanCallBack := func(ctx context.Context,
			row sqlc.GetChannelsBySCIDWithPoliciesRow) error {

			node1, node2, err := buildNodes(
				ctx, db, row.GraphNode, row.GraphNode_2,
			)
			if err != nil {
				return fmt.Errorf("unable to fetch nodes: %w",
					err)
			}

			edge, err := getAndBuildEdgeInfo(
				ctx, db, s.cfg.ChainHash, row.GraphChannel,
				node1.PubKeyBytes, node2.PubKeyBytes,
			)
			if err != nil {
				return fmt.Errorf("unable to build "+
					"channel info: %w", err)
			}

			dbPol1, dbPol2, err := extractChannelPolicies(row)
			if err != nil {
				return fmt.Errorf("unable to extract channel "+
					"policies: %w", err)
			}

			p1, p2, err := getAndBuildChanPolicies(
				ctx, db, dbPol1, dbPol2, edge.ChannelID,
				node1.PubKeyBytes, node2.PubKeyBytes,
			)
			if err != nil {
				return fmt.Errorf("unable to build channel "+
					"policies: %w", err)
			}

			edges[edge.ChannelID] = ChannelEdge{
				Info:    edge,
				Policy1: p1,
				Policy2: p2,
				Node1:   node1,
				Node2:   node2,
			}

			channelRows = append(channelRows, row)

			return nil
		}

		return s.forEachChanWithPoliciesInSCIDList(
		err := s.forEachChanWithPoliciesInSCIDList(
			ctx, db, chanCallBack, chanIDs,
		)
		if err != nil {
			return err
		}

		if len(channelRows) == 0 {
			return nil
		}

		// Batch build all channel edges.
		chans, err := batchBuildChannelEdges(
			ctx, s.cfg, db, channelRows,
		)
		if err != nil {
			return fmt.Errorf("unable to build channel edges: %w",
				err)
		}
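		// Index the freshly built edges by channel ID for the result
		// map.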
		for _, c := range chans {
			edges[c.Info.ChannelID] = c
		}

		return err
	}, func() {
		clear(edges)
	})
@@ -2363,31 +2310,12 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint,
		prunedNodes []route.Vertex
	)
	err := s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
		var chansToDelete []int64

		// Define the callback function for processing each channel.
		// First, collect all channel rows that need to be pruned.
		var channelRows []sqlc.GetChannelsByOutpointsRow
		channelCallback := func(ctx context.Context,
			row sqlc.GetChannelsByOutpointsRow) error {

			node1, node2, err := buildNodeVertices(
				row.Node1Pubkey, row.Node2Pubkey,
			)
			if err != nil {
				return err
			}

			info, err := getAndBuildEdgeInfo(
				ctx, db, s.cfg.ChainHash, row.GraphChannel,
				node1, node2,
			)
			if err != nil {
				return err
			}

			closedChans = append(closedChans, info)
			chansToDelete = append(
				chansToDelete, row.GraphChannel.ID,
			)
			channelRows = append(channelRows, row)

			return nil
		}
@@ -2400,6 +2328,32 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint,
				"outpoints: %w", err)
		}

		if len(channelRows) == 0 {
			// There are no channels to prune. So we can exit early
			// after updating the prune log.
			err = db.UpsertPruneLogEntry(
				ctx, sqlc.UpsertPruneLogEntryParams{
					BlockHash:   blockHash[:],
					BlockHeight: int64(blockHeight),
				},
			)
			if err != nil {
				return fmt.Errorf("unable to insert prune log "+
					"entry: %w", err)
			}

			return nil
		}

		// Batch build all channel edges for pruning.
		var chansToDelete []int64
		closedChans, chansToDelete, err = batchBuildChannelInfo(
			ctx, s.cfg, db, channelRows,
		)
		if err != nil {
			return err
		}

		err = s.deleteChannels(ctx, db, chansToDelete)
		if err != nil {
			return fmt.Errorf("unable to delete channels: %w", err)
@@ -2658,27 +2612,29 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) (
			return fmt.Errorf("unable to fetch channels: %w", err)
		}

		chanIDsToDelete := make([]int64, len(rows))
		for i, row := range rows {
			node1, node2, err := buildNodeVertices(
				row.Node1PubKey, row.Node2PubKey,

		if len(rows) == 0 {
			// No channels to disconnect, but still clean up prune
			// log.
			return db.DeletePruneLogEntriesInRange(
				ctx, sqlc.DeletePruneLogEntriesInRangeParams{
					StartHeight: int64(height),
					EndHeight: int64(
						endShortChanID.BlockHeight,
					),
				},
			)
			if err != nil {
				return err
			}

			channel, err := getAndBuildEdgeInfo(
				ctx, db, s.cfg.ChainHash, row.GraphChannel,
				node1, node2,
			)
			if err != nil {
				return err
			}

			chanIDsToDelete[i] = row.GraphChannel.ID
			removedChans = append(removedChans, channel)
		}

		// Batch build all channel edges for disconnection.
		channelEdges, chanIDsToDelete, err := batchBuildChannelInfo(
			ctx, s.cfg, db, rows,
		)
		if err != nil {
			return err
		}

		removedChans = channelEdges

		err = s.deleteChannels(ctx, db, chanIDsToDelete)
		if err != nil {
			return fmt.Errorf("unable to delete channels: %w", err)
@@ -3230,7 +3186,7 @@ func getNodeByPubKey(ctx context.Context, db SQLQueries,
		return 0, nil, fmt.Errorf("unable to fetch node: %w", err)
	}

	node, err := buildNode(ctx, db, &dbNode)
	node, err := buildNode(ctx, db, dbNode)
	if err != nil {
		return 0, nil, fmt.Errorf("unable to build node: %w", err)
	}
@@ -3255,7 +3211,7 @@ func buildCacheableChannelInfo(scid []byte, capacity int64, node1Pub,
// record. The node's features, addresses and extra signed fields are also
// fetched from the database and set on the node.
func buildNode(ctx context.Context, db SQLQueries,
	dbNode *sqlc.GraphNode) (*models.LightningNode, error) {
	dbNode sqlc.GraphNode) (*models.LightningNode, error) {

	// NOTE: buildNode is only used to load the data for a single node, and
	// so no paged queries will be performed. This means that it's ok to
@@ -3275,7 +3231,7 @@ func buildNode(ctx context.Context, db SQLQueries,
// from the provided sqlc.GraphNode and batchNodeData. If the node does have
// features/addresses/extra fields, then the corresponding fields are expected
// to be present in the batchNodeData.
func buildNodeWithBatchData(dbNode *sqlc.GraphNode,
func buildNodeWithBatchData(dbNode sqlc.GraphNode,
	batchData *batchNodeData) (*models.LightningNode, error) {

	if dbNode.Version != int16(ProtocolV1) {
@@ -3363,7 +3319,7 @@ func forEachNodeInBatch(ctx context.Context, cfg *sqldb.QueryConfig,
	}

	for _, dbNode := range nodes {
		node, err := buildNodeWithBatchData(&dbNode, batchData)
		node, err := buildNodeWithBatchData(dbNode, batchData)
		if err != nil {
			return fmt.Errorf("unable to build node(id=%d): %w",
				dbNode.ID, err)
@@ -4228,25 +4184,6 @@ func buildChanPolicy(dbPolicy sqlc.GraphChannelPolicy, channelID uint64,
	}, nil
}

// buildNodes builds the models.LightningNode instances for the
// given row which is expected to be a sqlc type that contains node information.
func buildNodes(ctx context.Context, db SQLQueries, dbNode1,
	dbNode2 sqlc.GraphNode) (*models.LightningNode, *models.LightningNode,
	error) {

	node1, err := buildNode(ctx, db, &dbNode1)
	if err != nil {
		return nil, nil, err
	}

	node2, err := buildNode(ctx, db, &dbNode2)
	if err != nil {
		return nil, nil, err
	}

	return node1, node2, nil
}

// extractChannelPolicies extracts the sqlc.GraphChannelPolicy records from the
// given row which is expected to be a sqlc type that contains channel policy
// information. It returns two policies, which may be nil if the policy
@@ -5089,7 +5026,7 @@ func forEachNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig,
	processItem := func(ctx context.Context, dbNode sqlc.GraphNode,
		batchData *batchNodeData) error {

		node, err := buildNodeWithBatchData(&dbNode, batchData)
		node, err := buildNodeWithBatchData(dbNode, batchData)
		if err != nil {
			return fmt.Errorf("unable to build "+
				"node(id=%d): %w", dbNode.ID, err)
@@ -5297,3 +5234,208 @@ func buildDirectedChannel(chain chainhash.Hash, nodeID int64,
	return directedChannel, nil
}

// batchBuildChannelEdges builds a slice of ChannelEdge instances from the
// provided rows. It uses batch loading for channels, policies, and nodes.
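// The type parameter T is constrained to row types that expose the channel
// record and both node records (via the Channel, Node1 and Node2 methods).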
func batchBuildChannelEdges[T sqlc.ChannelAndNodes](ctx context.Context,
	cfg *SQLStoreConfig, db SQLQueries, rows []T) ([]ChannelEdge, error) {

	var (
		channelIDs = make([]int64, len(rows))
		policyIDs  = make([]int64, 0, len(rows)*2)
		nodeIDs    = make([]int64, 0, len(rows)*2)

		// nodeIDSet is used to ensure we only collect unique node IDs.
		nodeIDSet = make(map[int64]bool)

		// edges will hold the final channel edges built from the rows.
		edges = make([]ChannelEdge, 0, len(rows))
	)

	// Collect all IDs needed for batch loading.
	for i, row := range rows {
		channelIDs[i] = row.Channel().ID

		// Collect policy IDs.
		dbPol1, dbPol2, err := extractChannelPolicies(row)
		if err != nil {
			return nil, fmt.Errorf("unable to extract channel "+
				"policies: %w", err)
		}
		if dbPol1 != nil {
			policyIDs = append(policyIDs, dbPol1.ID)
		}
		if dbPol2 != nil {
			policyIDs = append(policyIDs, dbPol2.ID)
		}

		var (
			node1ID = row.Node1().ID
			node2ID = row.Node2().ID
		)

		// Collect unique node IDs.
		if !nodeIDSet[node1ID] {
			nodeIDs = append(nodeIDs, node1ID)
			nodeIDSet[node1ID] = true
		}

		if !nodeIDSet[node2ID] {
			nodeIDs = append(nodeIDs, node2ID)
			nodeIDSet[node2ID] = true
		}
	}

	// Batch the data for all the channels and policies.
	channelBatchData, err := batchLoadChannelData(
		ctx, cfg.QueryCfg, db, channelIDs, policyIDs,
	)
	if err != nil {
		return nil, fmt.Errorf("unable to batch load channel and "+
			"policy data: %w", err)
	}

	// Batch the data for all the nodes.
	nodeBatchData, err := batchLoadNodeData(ctx, cfg.QueryCfg, db, nodeIDs)
	if err != nil {
		return nil, fmt.Errorf("unable to batch load node data: %w",
			err)
	}

	// Build all channel edges using batch data.
	for _, row := range rows {
		// Build nodes using batch data.
		node1, err := buildNodeWithBatchData(row.Node1(), nodeBatchData)
		if err != nil {
			return nil, fmt.Errorf("unable to build node1: %w", err)
		}

		node2, err := buildNodeWithBatchData(row.Node2(), nodeBatchData)
		if err != nil {
			return nil, fmt.Errorf("unable to build node2: %w", err)
		}

		// Build channel info using batch data.
		channel, err := buildEdgeInfoWithBatchData(
			cfg.ChainHash, row.Channel(), node1.PubKeyBytes,
			node2.PubKeyBytes, channelBatchData,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to build channel "+
				"info: %w", err)
		}

		// Extract and build policies using batch data.
		dbPol1, dbPol2, err := extractChannelPolicies(row)
		if err != nil {
			return nil, fmt.Errorf("unable to extract channel "+
				"policies: %w", err)
		}

		p1, p2, err := buildChanPoliciesWithBatchData(
			dbPol1, dbPol2, channel.ChannelID,
			node1.PubKeyBytes, node2.PubKeyBytes, channelBatchData,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to build channel "+
				"policies: %w", err)
		}

		edges = append(edges, ChannelEdge{
			Info:    channel,
			Policy1: p1,
			Policy2: p2,
			Node1:   node1,
			Node2:   node2,
		})
	}

	return edges, nil
}
// batchBuildChannelInfo builds a slice of models.ChannelEdgeInfo
// instances from the provided rows using batch loading for channel data.
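// It returns the built edge infos along with the database IDs of the channel
// rows they were built from.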
func batchBuildChannelInfo[T sqlc.ChannelAndNodeIDs](ctx context.Context,
	cfg *SQLStoreConfig, db SQLQueries, rows []T) (
	[]*models.ChannelEdgeInfo, []int64, error) {

	if len(rows) == 0 {
		return nil, nil, nil
	}

	// Collect all the channel IDs needed for batch loading.
	channelIDs := make([]int64, len(rows))
	for i, row := range rows {
		channelIDs[i] = row.Channel().ID
	}

	// Batch load the channel data.
	channelBatchData, err := batchLoadChannelData(
		ctx, cfg.QueryCfg, db, channelIDs, nil,
	)
	if err != nil {
		return nil, nil, fmt.Errorf("unable to batch load channel "+
			"data: %w", err)
	}

	// Build all channel edges using batch data.
	edges := make([]*models.ChannelEdgeInfo, 0, len(rows))
	for _, row := range rows {
		node1, node2, err := buildNodeVertices(
			row.Node1Pub(), row.Node2Pub(),
		)
		if err != nil {
			return nil, nil, err
		}

		// Build channel info using batch data.
		info, err := buildEdgeInfoWithBatchData(
			cfg.ChainHash, row.Channel(), node1, node2,
			channelBatchData,
		)
		if err != nil {
			return nil, nil, err
		}

		edges = append(edges, info)
	}

	return edges, channelIDs, nil
}
// handleZombieMarking is a helper function that handles the logic of
// marking a channel as a zombie in the database. It takes into account whether
// we are in strict zombie pruning mode, and adjusts the node public keys
// accordingly based on the last update timestamps of the channel policies.
func handleZombieMarking(ctx context.Context, db SQLQueries,
	row sqlc.GetChannelsBySCIDWithPoliciesRow, info *models.ChannelEdgeInfo,
	strictZombiePruning bool, scid uint64) error {

	nodeKey1, nodeKey2 := info.NodeKey1Bytes, info.NodeKey2Bytes
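	// In strict zombie pruning mode, derive the zombie index keys from the
	// policies' last-update timestamps rather than always storing both
	// node keys.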
	if strictZombiePruning {
		var e1UpdateTime, e2UpdateTime *time.Time
		if row.Policy1LastUpdate.Valid {
			e1Time := time.Unix(row.Policy1LastUpdate.Int64, 0)
			e1UpdateTime = &e1Time
		}
		if row.Policy2LastUpdate.Valid {
			e2Time := time.Unix(row.Policy2LastUpdate.Int64, 0)
			e2UpdateTime = &e2Time
		}

		nodeKey1, nodeKey2 = makeZombiePubkeys(
			info.NodeKey1Bytes, info.NodeKey2Bytes, e1UpdateTime,
			e2UpdateTime,
		)
	}

	return db.UpsertZombieChannel(
		ctx, sqlc.UpsertZombieChannelParams{
			Version:  int16(ProtocolV1),
			Scid:     channelIDToBytes(scid),
			NodeKey1: nodeKey1[:],
			NodeKey2: nodeKey2[:],
		},
	)
}