graph/db: introduce ForEachSourceNodeChannel

In preparation for creating a clean interface for the graph store, we
want to hide anything that is DB specific from the exposed methods on
the interface. Currently the `ForEachNodeChannel` and the
`FetchOtherNode` methods of the `KVStore` expose a `kvdb.RTx` parameter
which is bbolt specific. There is only one call-site of
`ForEachNodeChannel` actually makes use of the passed `kvdb.RTx`
parameter, and that is in the `establishPersistentConnections` method of
the `server` which then passes the tx parameter to `FetchOtherNode`.

So to clean-up the interface such that the `kvdb.RTx` is no longer
exposed: we instead create one new method called
`ForEachSourceNodeChannel` which can be used to replace the above
mentioned call-site. So as of this commit, all the remaining call-site
of `ForEachNodeChannel` pass in a nil param for `kvdb.RTx` - meaning we
can remove the parameter in a future commit.
This commit is contained in:
Elle Mouton
2025-03-26 17:23:51 +02:00
parent cf1ec68c0c
commit 42e87396ec
3 changed files with 136 additions and 26 deletions

View File

@@ -3553,36 +3553,17 @@ func (s *server) establishPersistentConnections() error {
// After checking our previous connections for addresses to connect to,
// iterate through the nodes in our channel graph to find addresses
// that have been added via NodeAnnouncement messages.
sourceNode, err := s.graphDB.SourceNode()
if err != nil {
return fmt.Errorf("failed to fetch source node: %w", err)
}
// TODO(roasbeef): instead iterate over link nodes and query graph for
// each of the nodes.
selfPub := s.identityECDH.PubKey().SerializeCompressed()
err = s.graphDB.ForEachNodeChannel(sourceNode.PubKeyBytes, func(
tx kvdb.RTx,
chanInfo *models.ChannelEdgeInfo,
policy, _ *models.ChannelEdgePolicy) error {
err = s.graphDB.ForEachSourceNodeChannel(func(chanPoint wire.OutPoint,
havePolicy bool, channelPeer *models.LightningNode) error {
// If the remote party has announced the channel to us, but we
// haven't yet, then we won't have a policy. However, we don't
// need this to connect to the peer, so we'll log it and move on.
if policy == nil {
if !havePolicy {
srvrLog.Warnf("No channel policy found for "+
"ChannelPoint(%v): ", chanInfo.ChannelPoint)
}
// We'll now fetch the peer opposite from us within this
// channel so we can queue up a direct connection to them.
channelPeer, err := s.graphDB.FetchOtherNode(
tx, chanInfo, selfPub,
)
if err != nil {
return fmt.Errorf("unable to fetch channel peer for "+
"ChannelPoint(%v): %v", chanInfo.ChannelPoint,
err)
"ChannelPoint(%v): ", chanPoint)
}
pubStr := string(channelPeer.PubKeyBytes[:])
@@ -3642,8 +3623,8 @@ func (s *server) establishPersistentConnections() error {
return nil
})
if err != nil {
srvrLog.Errorf("Failed to iterate channels for node %x",
sourceNode.PubKeyBytes)
srvrLog.Errorf("Failed to iterate over source node channels: "+
"%v", err)
if !errors.Is(err, graphdb.ErrGraphNoEdgesFound) &&
!errors.Is(err, graphdb.ErrEdgeNotFound) {