Merge pull request #5840 from guggero/bolt-pathfinding-fallback

db: allow turning off in-memory graph cache for bbolt
This commit is contained in:
Olaoluwa Osuntokun 2021-10-25 14:24:10 -07:00 committed by GitHub
commit 0a3bc3ee3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 527 additions and 223 deletions

View File

@ -291,6 +291,7 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
chanDB.graph, err = NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
opts.UseGraphCache,
)
if err != nil {
return nil, err

View File

@ -188,8 +188,8 @@ type ChannelGraph struct {
// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// returned instance has its own unique reject cache and channel cache.
func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
batchCommitInterval time.Duration,
preAllocCacheNumNodes int) (*ChannelGraph, error) {
batchCommitInterval time.Duration, preAllocCacheNumNodes int,
useGraphCache bool) (*ChannelGraph, error) {
if err := initChannelGraph(db); err != nil {
return nil, err
@ -199,7 +199,6 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
db: db,
rejectCache: newRejectCache(rejectCacheSize),
chanCache: newChannelCache(chanCacheSize),
graphCache: NewGraphCache(preAllocCacheNumNodes),
}
g.chanScheduler = batch.NewTimeScheduler(
db, &g.cacheMu, batchCommitInterval,
@ -208,18 +207,25 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
db, nil, batchCommitInterval,
)
startTime := time.Now()
log.Debugf("Populating in-memory channel graph, this might take a " +
"while...")
err := g.ForEachNodeCacheable(func(tx kvdb.RTx, node GraphCacheNode) error {
return g.graphCache.AddNode(tx, node)
})
if err != nil {
return nil, err
}
// The graph cache can be turned off (e.g. for mobile users) for a
// speed/memory usage tradeoff.
if useGraphCache {
g.graphCache = NewGraphCache(preAllocCacheNumNodes)
startTime := time.Now()
log.Debugf("Populating in-memory channel graph, this might " +
"take a while...")
err := g.ForEachNodeCacheable(
func(tx kvdb.RTx, node GraphCacheNode) error {
return g.graphCache.AddNode(tx, node)
},
)
if err != nil {
return nil, err
}
log.Debugf("Finished populating in-memory channel graph (took %v, %s)",
time.Since(startTime), g.graphCache.Stats())
log.Debugf("Finished populating in-memory channel graph (took "+
"%v, %s)", time.Since(startTime), g.graphCache.Stats())
}
return g, nil
}
@ -302,6 +308,16 @@ func initChannelGraph(db kvdb.Backend) error {
return nil
}
// NewPathFindTx returns a new read transaction that can be used for a single
// path finding session. Will return nil if the graph cache is enabled.
func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) {
if c.graphCache != nil {
return nil, nil
}
return c.db.BeginReadTx()
}
// ForEachChannel iterates through all the channel edges stored within the
// graph and invokes the passed callback for each edge. The callback takes two
// edges as since this is a directed graph, both the in/out edges are visited.
@ -370,10 +386,45 @@ func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo,
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
func (c *ChannelGraph) ForEachNodeChannel(node route.Vertex,
func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, node route.Vertex,
cb func(channel *DirectedChannel) error) error {
return c.graphCache.ForEachChannel(node, cb)
if c.graphCache != nil {
return c.graphCache.ForEachChannel(node, cb)
}
// Fallback that uses the database.
toNodeCallback := func() route.Vertex {
return node
}
toNodeFeatures, err := c.FetchNodeFeatures(node)
if err != nil {
return err
}
dbCallback := func(tx kvdb.RTx, e *ChannelEdgeInfo, p1,
p2 *ChannelEdgePolicy) error {
cachedInPolicy := NewCachedPolicy(p2)
cachedInPolicy.ToNodePubKey = toNodeCallback
cachedInPolicy.ToNodeFeatures = toNodeFeatures
directedChannel := &DirectedChannel{
ChannelID: e.ChannelID,
IsNode1: node == e.NodeKey1Bytes,
OtherNode: e.NodeKey2Bytes,
Capacity: e.Capacity,
OutPolicySet: p1 != nil,
InPolicy: cachedInPolicy,
}
if node == e.NodeKey2Bytes {
directedChannel.OtherNode = e.NodeKey1Bytes
}
return cb(directedChannel)
}
return nodeTraversal(tx, node[:], c.db, dbCallback)
}
// FetchNodeFeatures returns the features of a given node. If no features are
@ -381,7 +432,27 @@ func (c *ChannelGraph) ForEachNodeChannel(node route.Vertex,
func (c *ChannelGraph) FetchNodeFeatures(
node route.Vertex) (*lnwire.FeatureVector, error) {
return c.graphCache.GetFeatures(node), nil
if c.graphCache != nil {
return c.graphCache.GetFeatures(node), nil
}
// Fallback that uses the database.
targetNode, err := c.FetchLightningNode(node)
switch err {
// If the node exists and has features, return them directly.
case nil:
return targetNode.Features, nil
// If we couldn't find a node announcement, populate a blank feature
// vector.
case ErrGraphNodeNotFound:
return lnwire.EmptyFeatureVector(), nil
// Otherwise, bubble the error up.
default:
return nil, err
}
}
// DisabledChannelIDs returns the channel ids of disabled channels.
@ -601,11 +672,14 @@ func (c *ChannelGraph) AddLightningNode(node *LightningNode,
r := &batch.Request{
Update: func(tx kvdb.RwTx) error {
cNode := newGraphCacheNode(
node.PubKeyBytes, node.Features,
)
if err := c.graphCache.AddNode(tx, cNode); err != nil {
return err
if c.graphCache != nil {
cNode := newGraphCacheNode(
node.PubKeyBytes, node.Features,
)
err := c.graphCache.AddNode(tx, cNode)
if err != nil {
return err
}
}
return addLightningNode(tx, node)
@ -686,7 +760,9 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
return ErrGraphNodeNotFound
}
c.graphCache.RemoveNode(nodePub)
if c.graphCache != nil {
c.graphCache.RemoveNode(nodePub)
}
return c.deleteLightningNode(nodes, nodePub[:])
}, func() {})
@ -814,7 +890,9 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx, edge *ChannelEdgeInfo) error
return ErrEdgeAlreadyExist
}
c.graphCache.AddChannel(edge, nil, nil)
if c.graphCache != nil {
c.graphCache.AddChannel(edge, nil, nil)
}
// Before we insert the channel into the database, we'll ensure that
// both nodes already exist in the channel graph. If either node
@ -1015,7 +1093,9 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error {
return ErrEdgeNotFound
}
c.graphCache.UpdateChannel(edge)
if c.graphCache != nil {
c.graphCache.UpdateChannel(edge)
}
return putChanEdgeInfo(edgeIndex, edge, chanKey)
}, func() {})
@ -1153,7 +1233,10 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
c.chanCache.remove(channel.ChannelID)
}
log.Debugf("Pruned graph, cache now has %s", c.graphCache.Stats())
if c.graphCache != nil {
log.Debugf("Pruned graph, cache now has %s",
c.graphCache.Stats())
}
return chansClosed, nil
}
@ -1255,7 +1338,9 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
continue
}
c.graphCache.RemoveNode(nodePubKey)
if c.graphCache != nil {
c.graphCache.RemoveNode(nodePubKey)
}
// If we reach this point, then there are no longer any edges
// that connect this node, so we can delete it.
@ -2100,10 +2185,12 @@ func (c *ChannelGraph) delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
return err
}
c.graphCache.RemoveChannel(
edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
edgeInfo.ChannelID,
)
if c.graphCache != nil {
c.graphCache.RemoveChannel(
edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
edgeInfo.ChannelID,
)
}
// We'll also remove the entry in the edge update index bucket before
// we delete the edges themselves so we can access their last update
@ -2360,7 +2447,12 @@ func updateEdgePolicy(tx kvdb.RwTx, edge *ChannelEdgePolicy,
)
copy(fromNodePubKey[:], fromNode)
copy(toNodePubKey[:], toNode)
graphCache.UpdatePolicy(edge, fromNodePubKey, toNodePubKey, isUpdate1)
if graphCache != nil {
graphCache.UpdatePolicy(
edge, fromNodePubKey, toNodePubKey, isUpdate1,
)
}
return isUpdate1, nil
}
@ -3629,7 +3721,9 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
"bucket: %w", err)
}
c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
if c.graphCache != nil {
c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
}
return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
})
@ -3691,10 +3785,13 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
if err != nil {
return err
}
for _, edgeInfo := range edgeInfos {
c.graphCache.AddChannel(
edgeInfo.Info, edgeInfo.Policy1, edgeInfo.Policy2,
)
if c.graphCache != nil {
for _, edgeInfo := range edgeInfos {
c.graphCache.AddChannel(
edgeInfo.Info, edgeInfo.Policy1,
edgeInfo.Policy2,
)
}
}
return nil

View File

@ -77,6 +77,7 @@ func MakeTestGraph(modifiers ...OptionModifier) (*ChannelGraph, func(), error) {
graph, err := NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
true,
)
if err != nil {
backendCleanup()

View File

@ -45,6 +45,11 @@ type Options struct {
// graph cache, so we can pre-allocate the map accordingly.
PreAllocCacheNumNodes int
// UseGraphCache denotes whether the in-memory graph cache should be
// used or a fallback version that uses the underlying database for
// path finding.
UseGraphCache bool
// clock is the time source used by the database.
clock clock.Clock
@ -65,6 +70,7 @@ func DefaultOptions() Options {
RejectCacheSize: DefaultRejectCacheSize,
ChannelCacheSize: DefaultChannelCacheSize,
PreAllocCacheNumNodes: DefaultPreAllocCacheNumNodes,
UseGraphCache: true,
clock: clock.NewDefaultClock(),
}
}
@ -93,6 +99,13 @@ func OptionSetPreAllocCacheNumNodes(n int) OptionModifier {
}
}
// OptionSetUseGraphCache sets the UseGraphCache option to the given value.
func OptionSetUseGraphCache(use bool) OptionModifier {
return func(o *Options) {
o.UseGraphCache = use
}
}
// OptionSetSyncFreelist allows the database to sync its freelist.
func OptionSetSyncFreelist(b bool) OptionModifier {
return func(o *Options) {

View File

@ -847,6 +847,7 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
channeldb.OptionSetChannelCacheSize(cfg.Caches.ChannelCacheSize),
channeldb.OptionSetBatchCommitInterval(cfg.DB.BatchCommitInterval),
channeldb.OptionDryRunMigration(cfg.DryRunMigration),
channeldb.OptionSetUseGraphCache(!cfg.DB.NoGraphCache),
}
// We want to pre-allocate the channel graph cache according to what we

View File

@ -92,6 +92,14 @@ usage. Users running `lnd` on low-memory systems are advised to run with the
removes zombie channels from the graph, reducing the number of channels that
need to be kept in memory.
There is a [fallback option](https://github.com/lightningnetwork/lnd/pull/5840)
`db.no-graph-cache=true` that can be used when running a Bolt (`bbolt`) based
database backend. Using the database for path finding is considerably slower
than using the in-memory graph cache but uses less RAM. The fallback option is
not available for `etcd` or Postgres database backends because of the way they
handle long-running database transactions that are required for the path finding
operations.
## Protocol Extensions
### Explicit Channel Negotiation

View File

@ -57,6 +57,8 @@ type DB struct {
Bolt *kvdb.BoltConfig `group:"bolt" namespace:"bolt" description:"Bolt settings."`
Postgres *postgres.Config `group:"postgres" namespace:"postgres" description:"Postgres settings."`
NoGraphCache bool `long:"no-graph-cache" description:"Don't use the in-memory graph cache for path finding. Much slower but uses less RAM. Can only be used with a bolt database backend."`
}
// DefaultDB creates and returns a new default DB config.
@ -87,8 +89,21 @@ func (db *DB) Validate() error {
}
default:
return fmt.Errorf("unknown backend, must be either \"%v\" or \"%v\"",
BoltBackend, EtcdBackend)
return fmt.Errorf("unknown backend, must be either '%v' or "+
"'%v'", BoltBackend, EtcdBackend)
}
// The path finding uses a manual read transaction that's open for a
// potentially long time. That works fine with the locking model of
// bbolt but can lead to locks or rolled back transactions with etcd or
// postgres. And since we already have a smaller memory footprint for
// remote database setups (due to not needing to memory-map the bbolt DB
// files), we can keep the graph in memory instead. But for mobile
// devices the tradeoff between a smaller memory footprint and the
// longer time needed for path finding might be a desirable one.
if db.NoGraphCache && db.Backend != BoltBackend {
return fmt.Errorf("cannot use no-graph-cache with database "+
"backend '%v'", db.Backend)
}
return nil

View File

@ -433,15 +433,46 @@ func testSingleHopSendToRouteCase(net *lntest.NetworkHarness, t *harnessTest,
// We'll query the daemon for routes from Alice to Carol and then
// send payments through the routes.
func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) {
t.t.Run("with cache", func(tt *testing.T) {
ht := newHarnessTest(tt, t.lndHarness)
runMultiHopSendToRoute(net, ht, true)
})
if *dbBackendFlag == "bbolt" {
t.t.Run("without cache", func(tt *testing.T) {
ht := newHarnessTest(tt, t.lndHarness)
runMultiHopSendToRoute(net, ht, false)
})
}
}
// runMultiHopSendToRoute tests that payments are properly processed
// through a provided route. We'll create the following network topology:
// Alice --100k--> Bob --100k--> Carol
// We'll query the daemon for routes from Alice to Carol and then
// send payments through the routes.
func runMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest,
useGraphCache bool) {
ctxb := context.Background()
var opts []string
if !useGraphCache {
opts = append(opts, "--db.no-graph-cache")
}
alice := net.NewNode(t.t, "Alice", opts)
defer shutdownAndAssert(net, t, alice)
net.ConnectNodes(t.t, alice, net.Bob)
net.SendCoins(t.t, btcutil.SatoshiPerBitcoin, alice)
const chanAmt = btcutil.Amount(100000)
var networkChans []*lnrpc.ChannelPoint
// Open a channel with 100k satoshis between Alice and Bob with Alice
// being the sole funder of the channel.
chanPointAlice := openChannelAndAssert(
t, net, net.Alice, net.Bob,
t, net, alice, net.Bob,
lntest.OpenChannelParams{
Amt: chanAmt,
},
@ -483,7 +514,7 @@ func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) {
}
// Wait for all nodes to have seen all channels.
nodes := []*lntest.HarnessNode{net.Alice, net.Bob, carol}
nodes := []*lntest.HarnessNode{alice, net.Bob, carol}
nodeNames := []string{"Alice", "Bob", "Carol"}
for _, chanPoint := range networkChans {
for i, node := range nodes {
@ -529,7 +560,7 @@ func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) {
FinalCltvDelta: chainreg.DefaultBitcoinTimeLockDelta,
}
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
routes, err := net.Alice.QueryRoutes(ctxt, routesReq)
routes, err := alice.QueryRoutes(ctxt, routesReq)
if err != nil {
t.Fatalf("unable to get route: %v", err)
}
@ -565,7 +596,7 @@ func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) {
PaymentHash: rHash,
Route: &route,
}
resp, err := net.Alice.RouterClient.SendToRouteV2(ctxt, sendReq)
resp, err := alice.RouterClient.SendToRouteV2(ctxt, sendReq)
if err != nil {
t.Fatalf("unable to send payment: %v", err)
}
@ -593,10 +624,10 @@ func testMultiHopSendToRoute(net *lntest.NetworkHarness, t *harnessTest) {
bobFundPoint, amountPaid, int64(0))
assertAmountPaid(t, "Alice(local) => Bob(remote)", net.Bob,
aliceFundPoint, int64(0), amountPaid+(baseFee*numPayments))
assertAmountPaid(t, "Alice(local) => Bob(remote)", net.Alice,
assertAmountPaid(t, "Alice(local) => Bob(remote)", alice,
aliceFundPoint, amountPaid+(baseFee*numPayments), int64(0))
closeChannelAndAssert(t, net, net.Alice, chanPointAlice, false)
closeChannelAndAssert(t, net, alice, chanPointAlice, false)
closeChannelAndAssert(t, net, carol, chanPointBob, false)
}

View File

@ -2,6 +2,7 @@ package routing
import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
@ -25,6 +26,7 @@ type routingGraph interface {
// database.
type CachedGraph struct {
graph *channeldb.ChannelGraph
tx kvdb.RTx
source route.Vertex
}
@ -32,27 +34,40 @@ type CachedGraph struct {
// interface.
var _ routingGraph = (*CachedGraph)(nil)
// NewCachedGraph instantiates a new db-connected routing graph. It implictly
// NewCachedGraph instantiates a new db-connected routing graph. It implicitly
// instantiates a new read transaction.
func NewCachedGraph(graph *channeldb.ChannelGraph) (*CachedGraph, error) {
sourceNode, err := graph.SourceNode()
func NewCachedGraph(sourceNode *channeldb.LightningNode,
graph *channeldb.ChannelGraph) (*CachedGraph, error) {
tx, err := graph.NewPathFindTx()
if err != nil {
return nil, err
}
return &CachedGraph{
graph: graph,
tx: tx,
source: sourceNode.PubKeyBytes,
}, nil
}
// close attempts to close the underlying db transaction. This is a no-op in
// case the underlying graph uses an in-memory cache.
func (g *CachedGraph) close() error {
if g.tx == nil {
return nil
}
return g.tx.Rollback()
}
// forEachNodeChannel calls the callback for every channel of the given node.
//
// NOTE: Part of the routingGraph interface.
func (g *CachedGraph) forEachNodeChannel(nodePub route.Vertex,
cb func(channel *channeldb.DirectedChannel) error) error {
return g.graph.ForEachNodeChannel(nodePub, cb)
return g.graph.ForEachNodeChannel(g.tx, nodePub, cb)
}
// sourceNode returns the source node of the graph.

View File

@ -145,7 +145,7 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32,
c.t.Fatal(err)
}
getBandwidthHints := func() (bandwidthHints, error) {
getBandwidthHints := func(_ routingGraph) (bandwidthHints, error) {
// Create bandwidth hints based on local channel balances.
bandwidthHints := map[uint64]lnwire.MilliSatoshi{}
for _, ch := range c.graph.nodes[c.source.pubkey].channels {
@ -179,7 +179,11 @@ func (c *integratedRoutingContext) testPayment(maxParts uint32,
}
session, err := newPaymentSession(
&payment, getBandwidthHints, c.graph, mc, c.pathFindingCfg,
&payment, getBandwidthHints,
func() (routingGraph, func(), error) {
return c.graph, func() {}, nil
},
mc, c.pathFindingCfg,
)
if err != nil {
c.t.Fatal(err)

View File

@ -150,7 +150,9 @@ type testChan struct {
// makeTestGraph creates a new instance of a channeldb.ChannelGraph for testing
// purposes. A callback which cleans up the created temporary directories is
// also returned and intended to be executed after the test completes.
func makeTestGraph() (*channeldb.ChannelGraph, kvdb.Backend, func(), error) {
func makeTestGraph(useCache bool) (*channeldb.ChannelGraph, kvdb.Backend,
func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
@ -173,6 +175,7 @@ func makeTestGraph() (*channeldb.ChannelGraph, kvdb.Backend, func(), error) {
graph, err := channeldb.NewChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.PreAllocCacheNumNodes,
useCache,
)
if err != nil {
cleanUp()
@ -184,7 +187,7 @@ func makeTestGraph() (*channeldb.ChannelGraph, kvdb.Backend, func(), error) {
// parseTestGraph returns a fully populated ChannelGraph given a path to a JSON
// file which encodes a test graph.
func parseTestGraph(path string) (*testGraphInstance, error) {
func parseTestGraph(useCache bool, path string) (*testGraphInstance, error) {
graphJSON, err := ioutil.ReadFile(path)
if err != nil {
return nil, err
@ -209,7 +212,7 @@ func parseTestGraph(path string) (*testGraphInstance, error) {
testAddrs = append(testAddrs, testAddr)
// Next, create a temporary graph database for usage within the test.
graph, graphBackend, cleanUp, err := makeTestGraph()
graph, graphBackend, cleanUp, err := makeTestGraph(useCache)
if err != nil {
return nil, err
}
@ -528,8 +531,8 @@ func (g *testGraphInstance) getLink(chanID lnwire.ShortChannelID) (
// a deterministical way and added to the channel graph. A list of nodes is
// not required and derived from the channel data. The goal is to keep
// instantiating a test channel graph as light weight as possible.
func createTestGraphFromChannels(testChannels []*testChannel, source string) (
*testGraphInstance, error) {
func createTestGraphFromChannels(useCache bool, testChannels []*testChannel,
source string) (*testGraphInstance, error) {
// We'll use this fake address for the IP address of all the nodes in
// our tests. This value isn't needed for path finding so it doesn't
@ -542,7 +545,7 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) (
testAddrs = append(testAddrs, testAddr)
// Next, create a temporary graph database for usage within the test.
graph, graphBackend, cleanUp, err := makeTestGraph()
graph, graphBackend, cleanUp, err := makeTestGraph(useCache)
if err != nil {
return nil, err
}
@ -768,13 +771,106 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) (
}, nil
}
// TestFindLowestFeePath tests that out of two routes with identical total
// TestPathFinding tests all path finding related cases both with the in-memory
// graph cached turned on and off.
func TestPathFinding(t *testing.T) {
testCases := []struct {
name string
fn func(t *testing.T, useCache bool)
}{{
name: "lowest fee path",
fn: runFindLowestFeePath,
}, {
name: "basic graph path finding",
fn: runBasicGraphPathFinding,
}, {
name: "path finding with additional edges",
fn: runPathFindingWithAdditionalEdges,
}, {
name: "new route path too long",
fn: runNewRoutePathTooLong,
}, {
name: "path not available",
fn: runPathNotAvailable,
}, {
name: "destination tlv graph fallback",
fn: runDestTLVGraphFallback,
}, {
name: "missing feature dependency",
fn: runMissingFeatureDep,
}, {
name: "unknown required features",
fn: runUnknownRequiredFeatures,
}, {
name: "destination payment address",
fn: runDestPaymentAddr,
}, {
name: "path insufficient capacity",
fn: runPathInsufficientCapacity,
}, {
name: "route fail min HTLC",
fn: runRouteFailMinHTLC,
}, {
name: "route fail max HTLC",
fn: runRouteFailMaxHTLC,
}, {
name: "route fail disabled edge",
fn: runRouteFailDisabledEdge,
}, {
name: "path source edges bandwidth",
fn: runPathSourceEdgesBandwidth,
}, {
name: "restrict outgoing channel",
fn: runRestrictOutgoingChannel,
}, {
name: "restrict last hop",
fn: runRestrictLastHop,
}, {
name: "CLTV limit",
fn: runCltvLimit,
}, {
name: "probability routing",
fn: runProbabilityRouting,
}, {
name: "equal cost route selection",
fn: runEqualCostRouteSelection,
}, {
name: "no cycle",
fn: runNoCycle,
}, {
name: "route to self",
fn: runRouteToSelf,
}}
// Run with graph cache enabled.
for _, tc := range testCases {
tc := tc
t.Run("cache=true/"+tc.name, func(tt *testing.T) {
tt.Parallel()
tc.fn(tt, true)
})
}
// And with the DB fallback to make sure everything works the same
// still.
for _, tc := range testCases {
tc := tc
t.Run("cache=false/"+tc.name, func(tt *testing.T) {
tt.Parallel()
tc.fn(tt, false)
})
}
}
// runFindLowestFeePath tests that out of two routes with identical total
// time lock values, the route with the lowest total fee should be returned.
// The fee rates are chosen such that the test failed on the previous edge
// weight function where one of the terms was fee squared.
func TestFindLowestFeePath(t *testing.T) {
t.Parallel()
func runFindLowestFeePath(t *testing.T, useCache bool) {
// Set up a test graph with two paths from roasbeef to target. Both
// paths have equal total time locks, but the path through b has lower
// fees (700 compared to 800 for the path through a).
@ -811,7 +907,7 @@ func TestFindLowestFeePath(t *testing.T) {
}),
}
ctx := newPathFindingTestContext(t, testChannels, "roasbeef")
ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef")
defer ctx.cleanup()
const (
@ -916,10 +1012,8 @@ var basicGraphPathFindingTests = []basicGraphPathFindingTestCase{
expectFailureNoPath: true,
}}
func TestBasicGraphPathFinding(t *testing.T) {
t.Parallel()
testGraphInstance, err := parseTestGraph(basicGraphFilePath)
func runBasicGraphPathFinding(t *testing.T, useCache bool) {
testGraphInstance, err := parseTestGraph(useCache, basicGraphFilePath)
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -1091,14 +1185,12 @@ func testBasicGraphPathFindingCase(t *testing.T, graphInstance *testGraphInstanc
}
}
// TestPathFindingWithAdditionalEdges asserts that we are able to find paths to
// runPathFindingWithAdditionalEdges asserts that we are able to find paths to
// nodes that do not exist in the graph by way of hop hints. We also test that
// the path can support custom TLV records for the receiver under the
// appropriate circumstances.
func TestPathFindingWithAdditionalEdges(t *testing.T) {
t.Parallel()
graph, err := parseTestGraph(basicGraphFilePath)
func runPathFindingWithAdditionalEdges(t *testing.T, useCache bool) {
graph, err := parseTestGraph(useCache, basicGraphFilePath)
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -1502,9 +1594,7 @@ func TestNewRoute(t *testing.T) {
}
}
func TestNewRoutePathTooLong(t *testing.T) {
t.Parallel()
func runNewRoutePathTooLong(t *testing.T, useCache bool) {
var testChannels []*testChannel
// Setup a linear network of 21 hops.
@ -1522,7 +1612,7 @@ func TestNewRoutePathTooLong(t *testing.T) {
fromNode = toNode
}
ctx := newPathFindingTestContext(t, testChannels, "start")
ctx := newPathFindingTestContext(t, useCache, testChannels, "start")
defer ctx.cleanup()
// Assert that we can find 20 hop routes.
@ -1552,10 +1642,8 @@ func TestNewRoutePathTooLong(t *testing.T) {
}
}
func TestPathNotAvailable(t *testing.T) {
t.Parallel()
graph, err := parseTestGraph(basicGraphFilePath)
func runPathNotAvailable(t *testing.T, useCache bool) {
graph, err := parseTestGraph(useCache, basicGraphFilePath)
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -1587,12 +1675,10 @@ func TestPathNotAvailable(t *testing.T) {
}
}
// TestDestTLVGraphFallback asserts that we properly detect when we can send TLV
// runDestTLVGraphFallback asserts that we properly detect when we can send TLV
// records to a receiver, and also that we fallback to the receiver's node
// announcement if we don't have an invoice features.
func TestDestTLVGraphFallback(t *testing.T) {
t.Parallel()
func runDestTLVGraphFallback(t *testing.T, useCache bool) {
testChannels := []*testChannel{
asymmetricTestChannel("roasbeef", "luoji", 100000,
&testChannelPolicy{
@ -1621,7 +1707,7 @@ func TestDestTLVGraphFallback(t *testing.T) {
}, 0),
}
ctx := newPathFindingTestContext(t, testChannels, "roasbeef")
ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef")
defer ctx.cleanup()
sourceNode, err := ctx.graph.SourceNode()
@ -1689,12 +1775,10 @@ func TestDestTLVGraphFallback(t *testing.T) {
assertExpectedPath(t, ctx.testGraphInstance.aliasMap, path, "luoji")
}
// TestMissingFeatureDep asserts that we fail path finding when the
// runMissingFeatureDep asserts that we fail path finding when the
// destination's features are broken, in that the feature vector doesn't signal
// all transitive dependencies.
func TestMissingFeatureDep(t *testing.T) {
t.Parallel()
func runMissingFeatureDep(t *testing.T, useCache bool) {
testChannels := []*testChannel{
asymmetricTestChannel("roasbeef", "conner", 100000,
&testChannelPolicy{
@ -1728,7 +1812,7 @@ func TestMissingFeatureDep(t *testing.T) {
),
}
ctx := newPathFindingTestContext(t, testChannels, "roasbeef")
ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef")
defer ctx.cleanup()
// Conner's node in the graph has a broken feature vector, since it
@ -1766,12 +1850,10 @@ func TestMissingFeatureDep(t *testing.T) {
}
}
// TestUnknownRequiredFeatures asserts that we fail path finding when the
// runUnknownRequiredFeatures asserts that we fail path finding when the
// destination requires an unknown required feature, and that we skip
// intermediaries that signal unknown required features.
func TestUnknownRequiredFeatures(t *testing.T) {
t.Parallel()
func runUnknownRequiredFeatures(t *testing.T, useCache bool) {
testChannels := []*testChannel{
asymmetricTestChannel("roasbeef", "conner", 100000,
&testChannelPolicy{
@ -1805,7 +1887,7 @@ func TestUnknownRequiredFeatures(t *testing.T) {
),
}
ctx := newPathFindingTestContext(t, testChannels, "roasbeef")
ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef")
defer ctx.cleanup()
conner := ctx.keyFromAlias("conner")
@ -1832,12 +1914,10 @@ func TestUnknownRequiredFeatures(t *testing.T) {
}
}
// TestDestPaymentAddr asserts that we properly detect when we can send a
// runDestPaymentAddr asserts that we properly detect when we can send a
// payment address to a receiver, and also that we fallback to the receiver's
// node announcement if we don't have an invoice features.
func TestDestPaymentAddr(t *testing.T) {
t.Parallel()
func runDestPaymentAddr(t *testing.T, useCache bool) {
testChannels := []*testChannel{
symmetricTestChannel("roasbeef", "luoji", 100000,
&testChannelPolicy{
@ -1849,7 +1929,7 @@ func TestDestPaymentAddr(t *testing.T) {
),
}
ctx := newPathFindingTestContext(t, testChannels, "roasbeef")
ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef")
defer ctx.cleanup()
luoji := ctx.keyFromAlias("luoji")
@ -1877,10 +1957,8 @@ func TestDestPaymentAddr(t *testing.T) {
assertExpectedPath(t, ctx.testGraphInstance.aliasMap, path, "luoji")
}
func TestPathInsufficientCapacity(t *testing.T) {
t.Parallel()
graph, err := parseTestGraph(basicGraphFilePath)
func runPathInsufficientCapacity(t *testing.T, useCache bool) {
graph, err := parseTestGraph(useCache, basicGraphFilePath)
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -1912,12 +1990,10 @@ func TestPathInsufficientCapacity(t *testing.T) {
}
}
// TestRouteFailMinHTLC tests that if we attempt to route an HTLC which is
// runRouteFailMinHTLC tests that if we attempt to route an HTLC which is
// smaller than the advertised minHTLC of an edge, then path finding fails.
func TestRouteFailMinHTLC(t *testing.T) {
t.Parallel()
graph, err := parseTestGraph(basicGraphFilePath)
func runRouteFailMinHTLC(t *testing.T, useCache bool) {
graph, err := parseTestGraph(useCache, basicGraphFilePath)
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -1943,11 +2019,9 @@ func TestRouteFailMinHTLC(t *testing.T) {
}
}
// TestRouteFailMaxHTLC tests that if we attempt to route an HTLC which is
// runRouteFailMaxHTLC tests that if we attempt to route an HTLC which is
// larger than the advertised max HTLC of an edge, then path finding fails.
func TestRouteFailMaxHTLC(t *testing.T) {
t.Parallel()
func runRouteFailMaxHTLC(t *testing.T, useCache bool) {
// Set up a test graph:
// roasbeef <--> firstHop <--> secondHop <--> target
// We will be adjusting the max HTLC of the edge between the first and
@ -1974,7 +2048,7 @@ func TestRouteFailMaxHTLC(t *testing.T) {
}),
}
ctx := newPathFindingTestContext(t, testChannels, "roasbeef")
ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef")
defer ctx.cleanup()
// First, attempt to send a payment greater than the max HTLC we are
@ -2007,15 +2081,13 @@ func TestRouteFailMaxHTLC(t *testing.T) {
}
}
// TestRouteFailDisabledEdge tests that if we attempt to route to an edge
// runRouteFailDisabledEdge tests that if we attempt to route to an edge
// that's disabled, then that edge is disqualified, and the routing attempt
// will fail. We also test that this is true only for non-local edges, as we'll
// ignore the disable flags, with the assumption that the correct bandwidth is
// found among the bandwidth hints.
func TestRouteFailDisabledEdge(t *testing.T) {
t.Parallel()
graph, err := parseTestGraph(basicGraphFilePath)
func runRouteFailDisabledEdge(t *testing.T, useCache bool) {
graph, err := parseTestGraph(useCache, basicGraphFilePath)
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -2089,13 +2161,11 @@ func TestRouteFailDisabledEdge(t *testing.T) {
}
}
// TestPathSourceEdgesBandwidth tests that explicitly passing in a set of
// runPathSourceEdgesBandwidth tests that explicitly passing in a set of
// bandwidth hints is used by the path finding algorithm to consider whether to
// use a local channel.
func TestPathSourceEdgesBandwidth(t *testing.T) {
t.Parallel()
graph, err := parseTestGraph(basicGraphFilePath)
func runPathSourceEdgesBandwidth(t *testing.T, useCache bool) {
graph, err := parseTestGraph(useCache, basicGraphFilePath)
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -2390,11 +2460,9 @@ func TestNewRouteFromEmptyHops(t *testing.T) {
}
}
// TestRestrictOutgoingChannel asserts that a outgoing channel restriction is
// runRestrictOutgoingChannel asserts that a outgoing channel restriction is
// obeyed by the path finding algorithm.
func TestRestrictOutgoingChannel(t *testing.T) {
t.Parallel()
func runRestrictOutgoingChannel(t *testing.T, useCache bool) {
// Define channel id constants
const (
chanSourceA = 1
@ -2430,7 +2498,7 @@ func TestRestrictOutgoingChannel(t *testing.T) {
}, chanSourceTarget),
}
ctx := newPathFindingTestContext(t, testChannels, "roasbeef")
ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef")
defer ctx.cleanup()
const (
@ -2473,11 +2541,9 @@ func TestRestrictOutgoingChannel(t *testing.T) {
}
}
// TestRestrictLastHop asserts that a last hop restriction is obeyed by the path
// runRestrictLastHop asserts that a last hop restriction is obeyed by the path
// finding algorithm.
func TestRestrictLastHop(t *testing.T) {
t.Parallel()
func runRestrictLastHop(t *testing.T, useCache bool) {
// Set up a test graph with three possible paths from roasbeef to
// target. The path via channel 1 and 2 is the lowest cost path.
testChannels := []*testChannel{
@ -2497,7 +2563,7 @@ func TestRestrictLastHop(t *testing.T) {
}, 4),
}
ctx := newPathFindingTestContext(t, testChannels, "source")
ctx := newPathFindingTestContext(t, useCache, testChannels, "source")
defer ctx.cleanup()
paymentAmt := lnwire.NewMSatFromSatoshis(100)
@ -2518,15 +2584,23 @@ func TestRestrictLastHop(t *testing.T) {
}
}
// TestCltvLimit asserts that a cltv limit is obeyed by the path finding
// runCltvLimit asserts that a cltv limit is obeyed by the path finding
// algorithm.
func TestCltvLimit(t *testing.T) {
t.Run("no limit", func(t *testing.T) { testCltvLimit(t, 2016, 1) })
t.Run("no path", func(t *testing.T) { testCltvLimit(t, 50, 0) })
t.Run("force high cost", func(t *testing.T) { testCltvLimit(t, 80, 3) })
func runCltvLimit(t *testing.T, useCache bool) {
t.Run("no limit", func(t *testing.T) {
testCltvLimit(t, useCache, 2016, 1)
})
t.Run("no path", func(t *testing.T) {
testCltvLimit(t, useCache, 50, 0)
})
t.Run("force high cost", func(t *testing.T) {
testCltvLimit(t, useCache, 80, 3)
})
}
func testCltvLimit(t *testing.T, limit uint32, expectedChannel uint64) {
func testCltvLimit(t *testing.T, useCache bool, limit uint32,
expectedChannel uint64) {
t.Parallel()
// Set up a test graph with three possible paths to the target. The path
@ -2560,7 +2634,7 @@ func testCltvLimit(t *testing.T, limit uint32, expectedChannel uint64) {
}),
}
ctx := newPathFindingTestContext(t, testChannels, "roasbeef")
ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef")
defer ctx.cleanup()
paymentAmt := lnwire.NewMSatFromSatoshis(100)
@ -2603,11 +2677,9 @@ func testCltvLimit(t *testing.T, limit uint32, expectedChannel uint64) {
}
}
// TestProbabilityRouting asserts that path finding not only takes into account
// runProbabilityRouting asserts that path finding not only takes into account
// fees but also success probability.
func TestProbabilityRouting(t *testing.T) {
t.Parallel()
func runProbabilityRouting(t *testing.T, useCache bool) {
testCases := []struct {
name string
p10, p11, p20 float64
@ -2693,15 +2765,16 @@ func TestProbabilityRouting(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
testProbabilityRouting(
t, tc.amount, tc.p10, tc.p11, tc.p20,
t, useCache, tc.amount, tc.p10, tc.p11, tc.p20,
tc.minProbability, tc.expectedChan,
)
})
}
}
func testProbabilityRouting(t *testing.T, paymentAmt btcutil.Amount,
p10, p11, p20, minProbability float64, expectedChan uint64) {
func testProbabilityRouting(t *testing.T, useCache bool,
paymentAmt btcutil.Amount, p10, p11, p20, minProbability float64,
expectedChan uint64) {
t.Parallel()
@ -2728,7 +2801,7 @@ func testProbabilityRouting(t *testing.T, paymentAmt btcutil.Amount,
}, 20),
}
ctx := newPathFindingTestContext(t, testChannels, "roasbeef")
ctx := newPathFindingTestContext(t, useCache, testChannels, "roasbeef")
defer ctx.cleanup()
alias := ctx.testGraphInstance.aliasMap
@ -2782,11 +2855,9 @@ func testProbabilityRouting(t *testing.T, paymentAmt btcutil.Amount,
}
}
// TestEqualCostRouteSelection asserts that route probability will be used as a
// runEqualCostRouteSelection asserts that route probability will be used as a
// tie breaker in case the path finding probabilities are equal.
func TestEqualCostRouteSelection(t *testing.T) {
t.Parallel()
func runEqualCostRouteSelection(t *testing.T, useCache bool) {
// Set up a test graph with two possible paths to the target: via a and
// via b. The routing fees and probabilities are chosen such that the
// algorithm will first explore target->a->source (backwards search).
@ -2811,7 +2882,7 @@ func TestEqualCostRouteSelection(t *testing.T) {
}, 2),
}
ctx := newPathFindingTestContext(t, testChannels, "source")
ctx := newPathFindingTestContext(t, useCache, testChannels, "source")
defer ctx.cleanup()
alias := ctx.testGraphInstance.aliasMap
@ -2848,11 +2919,9 @@ func TestEqualCostRouteSelection(t *testing.T) {
}
}
// TestNoCycle tries to guide the path finding algorithm into reconstructing an
// runNoCycle tries to guide the path finding algorithm into reconstructing an
// endless route. It asserts that the algorithm is able to handle this properly.
func TestNoCycle(t *testing.T) {
t.Parallel()
func runNoCycle(t *testing.T, useCache bool) {
// Set up a test graph with two paths: source->a->target and
// source->b->c->target. The fees are setup such that, searching
// backwards, the algorithm will evaluate the following end of the route
@ -2882,7 +2951,7 @@ func TestNoCycle(t *testing.T) {
}, 5),
}
ctx := newPathFindingTestContext(t, testChannels, "source")
ctx := newPathFindingTestContext(t, useCache, testChannels, "source")
defer ctx.cleanup()
const (
@ -2922,10 +2991,8 @@ func TestNoCycle(t *testing.T) {
}
}
// TestRouteToSelf tests that it is possible to find a route to the self node.
func TestRouteToSelf(t *testing.T) {
t.Parallel()
// runRouteToSelf tests that it is possible to find a route to the self node.
func runRouteToSelf(t *testing.T, useCache bool) {
testChannels := []*testChannel{
symmetricTestChannel("source", "a", 100000, &testChannelPolicy{
Expiry: 144,
@ -2941,7 +3008,7 @@ func TestRouteToSelf(t *testing.T) {
}, 3),
}
ctx := newPathFindingTestContext(t, testChannels, "source")
ctx := newPathFindingTestContext(t, useCache, testChannels, "source")
defer ctx.cleanup()
paymentAmt := lnwire.NewMSatFromSatoshis(100)
@ -2979,11 +3046,11 @@ type pathFindingTestContext struct {
source route.Vertex
}
func newPathFindingTestContext(t *testing.T, testChannels []*testChannel,
source string) *pathFindingTestContext {
func newPathFindingTestContext(t *testing.T, useCache bool,
testChannels []*testChannel, source string) *pathFindingTestContext {
testGraphInstance, err := createTestGraphFromChannels(
testChannels, source,
useCache, testChannels, source,
)
if err != nil {
t.Fatalf("unable to create graph: %v", err)
@ -3059,11 +3126,22 @@ func dbFindPath(graph *channeldb.ChannelGraph,
source, target route.Vertex, amt lnwire.MilliSatoshi,
finalHtlcExpiry int32) ([]*channeldb.CachedEdgePolicy, error) {
routingGraph, err := NewCachedGraph(graph)
sourceNode, err := graph.SourceNode()
if err != nil {
return nil, err
}
routingGraph, err := NewCachedGraph(sourceNode, graph)
if err != nil {
return nil, err
}
defer func() {
if err := routingGraph.close(); err != nil {
log.Errorf("Error closing db tx: %v", err)
}
}()
return findPath(
&graphParams{
additionalEdges: additionalEdges,

View File

@ -180,7 +180,7 @@ func TestRouterPaymentStateMachine(t *testing.T) {
}, 2),
}
testGraph, err := createTestGraphFromChannels(testChannels, "a")
testGraph, err := createTestGraphFromChannels(true, testChannels, "a")
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}

View File

@ -164,7 +164,7 @@ type PaymentSession interface {
type paymentSession struct {
additionalEdges map[route.Vertex][]*channeldb.CachedEdgePolicy
getBandwidthHints func() (bandwidthHints, error)
getBandwidthHints func(routingGraph) (bandwidthHints, error)
payment *LightningPayment
@ -172,7 +172,7 @@ type paymentSession struct {
pathFinder pathFinder
routingGraph routingGraph
getRoutingGraph func() (routingGraph, func(), error)
// pathFindingConfig defines global parameters that control the
// trade-off in path finding between fees and probabiity.
@ -192,8 +192,8 @@ type paymentSession struct {
// newPaymentSession instantiates a new payment session.
func newPaymentSession(p *LightningPayment,
getBandwidthHints func() (bandwidthHints, error),
routingGraph routingGraph,
getBandwidthHints func(routingGraph) (bandwidthHints, error),
getRoutingGraph func() (routingGraph, func(), error),
missionControl MissionController, pathFindingConfig PathFindingConfig) (
*paymentSession, error) {
@ -209,7 +209,7 @@ func newPaymentSession(p *LightningPayment,
getBandwidthHints: getBandwidthHints,
payment: p,
pathFinder: findPath,
routingGraph: routingGraph,
getRoutingGraph: getRoutingGraph,
pathFindingConfig: pathFindingConfig,
missionControl: missionControl,
minShardAmt: DefaultShardMinAmt,
@ -274,33 +274,42 @@ func (p *paymentSession) RequestRoute(maxAmt, feeLimit lnwire.MilliSatoshi,
}
for {
// Get a routing graph.
routingGraph, cleanup, err := p.getRoutingGraph()
if err != nil {
return nil, err
}
// We'll also obtain a set of bandwidthHints from the lower
// layer for each of our outbound channels. This will allow the
// path finding to skip any links that aren't active or just
// don't have enough bandwidth to carry the payment. New
// bandwidth hints are queried for every new path finding
// attempt, because concurrent payments may change balances.
bandwidthHints, err := p.getBandwidthHints()
bandwidthHints, err := p.getBandwidthHints(routingGraph)
if err != nil {
return nil, err
}
p.log.Debugf("pathfinding for amt=%v", maxAmt)
sourceVertex := p.routingGraph.sourceNode()
sourceVertex := routingGraph.sourceNode()
// Find a route for the current amount.
path, err := p.pathFinder(
&graphParams{
additionalEdges: p.additionalEdges,
bandwidthHints: bandwidthHints,
graph: p.routingGraph,
graph: routingGraph,
},
restrictions, &p.pathFindingConfig,
sourceVertex, p.payment.Target,
maxAmt, finalHtlcExpiry,
)
// Close routing graph.
cleanup()
switch {
case err == errNoPathFound:
// Don't split if this is a legacy payment without mpp

View File

@ -17,7 +17,10 @@ var _ PaymentSessionSource = (*SessionSource)(nil)
type SessionSource struct {
// Graph is the channel graph that will be used to gather metrics from
// and also to carry out path finding queries.
Graph routingGraph
Graph *channeldb.ChannelGraph
// SourceNode is the graph's source node.
SourceNode *channeldb.LightningNode
// GetLink is a method that allows querying the lower link layer
// to determine the up to date available bandwidth at a prospective link
@ -40,6 +43,21 @@ type SessionSource struct {
PathFindingConfig PathFindingConfig
}
// getRoutingGraph returns a routing graph and a clean-up function for
// pathfinding.
func (m *SessionSource) getRoutingGraph() (routingGraph, func(), error) {
routingTx, err := NewCachedGraph(m.SourceNode, m.Graph)
if err != nil {
return nil, nil, err
}
return routingTx, func() {
err := routingTx.close()
if err != nil {
log.Errorf("Error closing db tx: %v", err)
}
}, nil
}
// NewPaymentSession creates a new payment session backed by the latest prune
// view from Mission Control. An optional set of routing hints can be provided
// in order to populate additional edges to explore when finding a path to the
@ -47,14 +65,14 @@ type SessionSource struct {
func (m *SessionSource) NewPaymentSession(p *LightningPayment) (
PaymentSession, error) {
sourceNode := m.Graph.sourceNode()
getBandwidthHints := func() (bandwidthHints, error) {
return newBandwidthManager(m.Graph, sourceNode, m.GetLink)
getBandwidthHints := func(graph routingGraph) (bandwidthHints, error) {
return newBandwidthManager(
graph, m.SourceNode.PubKeyBytes, m.GetLink,
)
}
session, err := newPaymentSession(
p, getBandwidthHints, m.Graph,
p, getBandwidthHints, m.getRoutingGraph,
m.MissionControl, m.PathFindingConfig,
)
if err != nil {

View File

@ -116,10 +116,12 @@ func TestUpdateAdditionalEdge(t *testing.T) {
// Create the paymentsession.
session, err := newPaymentSession(
payment,
func() (bandwidthHints, error) {
func(routingGraph) (bandwidthHints, error) {
return &mockBandwidthHints{}, nil
},
&sessionGraph{},
func() (routingGraph, func(), error) {
return &sessionGraph{}, func() {}, nil
},
&MissionControl{},
PathFindingConfig{},
)
@ -194,10 +196,12 @@ func TestRequestRoute(t *testing.T) {
session, err := newPaymentSession(
payment,
func() (bandwidthHints, error) {
func(routingGraph) (bandwidthHints, error) {
return &mockBandwidthHints{}, nil
},
&sessionGraph{},
func() (routingGraph, func(), error) {
return &sessionGraph{}, func() {}, nil
},
&MissionControl{},
PathFindingConfig{},
)

View File

@ -129,11 +129,11 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
)
require.NoError(t, err, "failed to create missioncontrol")
cachedGraph, err := NewCachedGraph(graphInstance.graph)
sourceNode, err := graphInstance.graph.SourceNode()
require.NoError(t, err)
sessionSource := &SessionSource{
Graph: cachedGraph,
Graph: graphInstance.graph,
SourceNode: sourceNode,
GetLink: graphInstance.getLink,
PathFindingConfig: pathFindingConfig,
MissionControl: mc,
@ -190,7 +190,7 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
func createTestCtxSingleNode(t *testing.T,
startingHeight uint32) (*testCtx, func()) {
graph, graphBackend, cleanup, err := makeTestGraph()
graph, graphBackend, cleanup, err := makeTestGraph(true)
require.NoError(t, err, "failed to make test graph")
sourceNode, err := createTestNode()
@ -216,7 +216,7 @@ func createTestCtxFromFile(t *testing.T,
// We'll attempt to locate and parse out the file
// that encodes the graph that our tests should be run against.
graphInstance, err := parseTestGraph(testGraph)
graphInstance, err := parseTestGraph(true, testGraph)
require.NoError(t, err, "unable to create test graph")
return createTestCtxFromGraphInstance(
@ -387,7 +387,7 @@ func TestChannelUpdateValidation(t *testing.T) {
}, 2),
}
testGraph, err := createTestGraphFromChannels(testChannels, "a")
testGraph, err := createTestGraphFromChannels(true, testChannels, "a")
require.NoError(t, err, "unable to create graph")
defer testGraph.cleanUp()
@ -1246,7 +1246,7 @@ func TestIgnoreChannelEdgePolicyForUnknownChannel(t *testing.T) {
// Setup an initially empty network.
testChannels := []*testChannel{}
testGraph, err := createTestGraphFromChannels(
testChannels, "roasbeef",
true, testChannels, "roasbeef",
)
if err != nil {
t.Fatalf("unable to create graph: %v", err)
@ -2260,7 +2260,9 @@ func TestPruneChannelGraphStaleEdges(t *testing.T) {
for _, strictPruning := range []bool{true, false} {
// We'll create our test graph and router backed with these test
// channels we've created.
testGraph, err := createTestGraphFromChannels(testChannels, "a")
testGraph, err := createTestGraphFromChannels(
true, testChannels, "a",
)
if err != nil {
t.Fatalf("unable to create test graph: %v", err)
}
@ -2390,7 +2392,9 @@ func testPruneChannelGraphDoubleDisabled(t *testing.T, assumeValid bool) {
// We'll create our test graph and router backed with these test
// channels we've created.
testGraph, err := createTestGraphFromChannels(testChannels, "self")
testGraph, err := createTestGraphFromChannels(
true, testChannels, "self",
)
if err != nil {
t.Fatalf("unable to create test graph: %v", err)
}
@ -2760,7 +2764,7 @@ func TestUnknownErrorSource(t *testing.T) {
}, 4),
}
testGraph, err := createTestGraphFromChannels(testChannels, "a")
testGraph, err := createTestGraphFromChannels(true, testChannels, "a")
defer testGraph.cleanUp()
if err != nil {
t.Fatalf("unable to create graph: %v", err)
@ -2896,7 +2900,7 @@ func TestSendToRouteStructuredError(t *testing.T) {
}, 2),
}
testGraph, err := createTestGraphFromChannels(testChannels, "a")
testGraph, err := createTestGraphFromChannels(true, testChannels, "a")
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -3145,7 +3149,7 @@ func TestSendToRouteMaxHops(t *testing.T) {
}, 1),
}
testGraph, err := createTestGraphFromChannels(testChannels, "a")
testGraph, err := createTestGraphFromChannels(true, testChannels, "a")
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -3256,7 +3260,7 @@ func TestBuildRoute(t *testing.T) {
}, 4),
}
testGraph, err := createTestGraphFromChannels(testChannels, "a")
testGraph, err := createTestGraphFromChannels(true, testChannels, "a")
if err != nil {
t.Fatalf("unable to create graph: %v", err)
}
@ -3496,7 +3500,7 @@ func createDummyTestGraph(t *testing.T) *testGraphInstance {
}, 2),
}
testGraph, err := createTestGraphFromChannels(testChannels, "a")
testGraph, err := createTestGraphFromChannels(true, testChannels, "a")
require.NoError(t, err, "failed to create graph")
return testGraph
}

View File

@ -1118,6 +1118,9 @@ litecoin.node=ltcd
; a batch of modifications to disk. Defaults to 500 milliseconds.
; db.batch-commit-interval=500ms
; Don't use the in-memory graph cache for path finding. Much slower but uses
; less RAM. Can only be used with a bolt database backend.
; db.no-graph-cache=true
[etcd]
@ -1170,6 +1173,27 @@ litecoin.node=ltcd
; disable.
; db.postgres.timeout=
[bolt]
; If true, prevents the database from syncing its freelist to disk.
; db.bolt.nofreelistsync=1
; Whether the databases used within lnd should automatically be compacted on
; every startup (and if the database has the configured minimum age). This is
; disabled by default because it requires additional disk space to be available
; during the compaction that is freed afterwards. In general compaction leads to
; smaller database files.
; db.bolt.auto-compact=true
; How long ago the last compaction of a database file must be for it to be
; considered for auto compaction again. Can be set to 0 to compact on every
; startup. (default: 168h)
; db.bolt.auto-compact-min-age=0
; Specify the timeout to be used when opening the database.
; db.bolt.dbtimeout=60s
[cluster]
; Enables leader election if set.
@ -1217,26 +1241,6 @@ litecoin.node=ltcd
; The TLS certificate to use for establishing the remote signer's identity.
; remotesigner.tlscertpath=/path/to/remote/signer/tls.cert
[bolt]
; If true, prevents the database from syncing its freelist to disk.
; db.bolt.nofreelistsync=1
; Whether the databases used within lnd should automatically be compacted on
; every startup (and if the database has the configured minimum age). This is
; disabled by default because it requires additional disk space to be available
; during the compaction that is freed afterwards. In general compaction leads to
; smaller database files.
; db.bolt.auto-compact=true
; How long ago the last compaction of a database file must be for it to be
; considered for auto compaction again. Can be set to 0 to compact on every
; startup. (default: 168h)
; db.bolt.auto-compact-min-age=0
; Specify the timeout to be used when opening the database.
; db.bolt.dbtimeout=60s
[gossip]

View File

@ -860,12 +860,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
MinProbability: routingConfig.MinRouteProbability,
}
cachedGraph, err := routing.NewCachedGraph(chanGraph)
sourceNode, err := chanGraph.SourceNode()
if err != nil {
return nil, err
return nil, fmt.Errorf("error getting source node: %v", err)
}
paymentSessionSource := &routing.SessionSource{
Graph: cachedGraph,
Graph: chanGraph,
SourceNode: sourceNode,
MissionControl: s.missionControl,
GetLink: s.htlcSwitch.GetLinkByShortID,
PathFindingConfig: pathFindingConfig,