diff --git a/graph/session/graph_session.go b/graph/session/graph_session.go index 8145ed273..54e9e9ee7 100644 --- a/graph/session/graph_session.go +++ b/graph/session/graph_session.go @@ -4,7 +4,6 @@ import ( "fmt" graphdb "github.com/lightningnetwork/lnd/graph/db" - "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/route" @@ -53,7 +52,7 @@ var _ routing.GraphSessionFactory = (*Factory)(nil) // access the backing channel graph. type session struct { graph graph - tx kvdb.RTx + tx RTx } // NewRoutingGraph constructs a session that which does not first start a @@ -72,7 +71,7 @@ func (g *session) close() error { return nil } - err := g.tx.Rollback() + err := g.tx.Close() if err != nil { return fmt.Errorf("error closing db tx: %w", err) } @@ -109,7 +108,7 @@ type ReadOnlyGraph interface { // NewPathFindTx returns a new read transaction that can be used for a // single path finding session. Will return nil if the graph cache is // enabled. - NewPathFindTx() (kvdb.RTx, error) + NewPathFindTx() (RTx, error) graph } @@ -128,7 +127,7 @@ type graph interface { // // NOTE: if a nil tx is provided, then it is expected that the // implementation create a read only tx. - ForEachNodeDirectedChannel(tx kvdb.RTx, node route.Vertex, + ForEachNodeDirectedChannel(tx RTx, node route.Vertex, cb func(channel *graphdb.DirectedChannel) error) error // FetchNodeFeatures returns the features of a given node. If no @@ -136,10 +135,6 @@ type graph interface { // // NOTE: if a nil tx is provided, then it is expected that the // implementation create a read only tx. - FetchNodeFeatures(tx kvdb.RTx, node route.Vertex) ( - *lnwire.FeatureVector, error) + FetchNodeFeatures(tx RTx, node route.Vertex) (*lnwire.FeatureVector, + error) } - -// A compile-time check to ensure that *channeldb.ChannelGraph implements the -// graph interface. -var _ graph = (*graphdb.ChannelGraph)(nil) diff --git a/graph/session/read_tx.go b/graph/session/read_tx.go new file mode 100644 index 000000000..f675e59db --- /dev/null +++ b/graph/session/read_tx.go @@ -0,0 +1,14 @@ +package session + +// RTx represents a read-only transaction that can only be used for graph +// reads during a path-finding session. +type RTx interface { + // Close closes the transaction. + Close() error + + // MustImplementRTx is a helper method that ensures that the RTx + // interface is implemented by the underlying type. This is useful since + // the other methods in the interface are quite generic and so many + // types will satisfy the interface if it only contains those methods. + MustImplementRTx() +} diff --git a/graph/sources/chan_graph.go b/graph/sources/chan_graph.go index a2acfcf6d..d12fc9cc6 100644 --- a/graph/sources/chan_graph.go +++ b/graph/sources/chan_graph.go @@ -1,6 +1,14 @@ package sources -import graphdb "github.com/lightningnetwork/lnd/graph/db" +import ( + "fmt" + + graphdb "github.com/lightningnetwork/lnd/graph/db" + "github.com/lightningnetwork/lnd/graph/session" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" +) // DBSource is an implementation of the GraphSource interface backed by a local // persistence layer holding graph related data. @@ -19,3 +27,102 @@ func NewDBGSource(db *graphdb.ChannelGraph) *DBSource { db: db, } } + +// NewPathFindTx returns a new read transaction that can be used for a single +// path finding session. Will return nil if the graph cache is enabled for the +// underlying graphdb.ChannelGraph. +// +// NOTE: this is part of the graphsession.ReadOnlyGraph interface. +func (s *DBSource) NewPathFindTx() (session.RTx, error) { + tx, err := s.db.NewPathFindTx() + if err != nil { + return nil, err + } + + return newKVDBRTx(tx), nil +} + +// ForEachNodeDirectedChannel iterates through all channels of a given node, +// executing the passed callback on the directed edge representing the channel +// and its incoming policy. If the callback returns an error, then the +// iteration is halted with the error propagated back up to the caller. An +// optional read transaction may be provided. If it is, then it will be cast +// into a kvdb.RTx and passed into the callback. +// +// Unknown policies are passed into the callback as nil values. +// +// NOTE: this is part of the graphsession.ReadOnlyGraph interface. +func (s *DBSource) ForEachNodeDirectedChannel(tx session.RTx, + node route.Vertex, + cb func(channel *graphdb.DirectedChannel) error) error { + + kvdbTx, err := extractKVDBRTx(tx) + if err != nil { + return err + } + + return s.db.ForEachNodeDirectedChannel(kvdbTx, node, cb) +} + +// FetchNodeFeatures returns the features of a given node. If no features are +// known for the node, an empty feature vector is returned. An optional read +// transaction may be provided. If it is, then it will be cast into a kvdb.RTx +// and passed into the callback. +// +// NOTE: this is part of the graphsession.ReadOnlyGraph interface. +func (s *DBSource) FetchNodeFeatures(tx session.RTx, + node route.Vertex) (*lnwire.FeatureVector, error) { + + kvdbTx, err := extractKVDBRTx(tx) + if err != nil { + return nil, err + } + + return s.db.FetchNodeFeatures(kvdbTx, node) +} + +// kvdbRTx is an implementation of graphdb.RTx backed by a KVDB database read +// transaction. +type kvdbRTx struct { + kvdb.RTx +} + +// newKVDBRTx constructs a kvdbRTx instance backed by the given kvdb.RTx. +func newKVDBRTx(tx kvdb.RTx) *kvdbRTx { + return &kvdbRTx{tx} +} + +// Close closes the underlying transaction. +// +// NOTE: this is part of the graphdb.RTx interface. +func (t *kvdbRTx) Close() error { + if t.RTx == nil { + return nil + } + + return t.RTx.Rollback() +} + +// MustImplementRTx is a helper method that ensures that the kvdbRTx type +// implements the RTx interface. +// +// NOTE: this is part of the graphdb.RTx interface. +func (t *kvdbRTx) MustImplementRTx() {} + +// A compile-time assertion to ensure that kvdbRTx implements the RTx interface. +var _ session.RTx = (*kvdbRTx)(nil) + +// extractKVDBRTx is a helper function that casts an RTx into a kvdbRTx and +// errors if the cast fails. +func extractKVDBRTx(tx session.RTx) (kvdb.RTx, error) { + if tx == nil { + return nil, nil + } + + kvdbTx, ok := tx.(*kvdbRTx) + if !ok { + return nil, fmt.Errorf("expected a graphdb.kvdbRTx, got %T", tx) + } + + return kvdbTx, nil +} diff --git a/graph/sources/interfaces.go b/graph/sources/interfaces.go index 99698d722..a99d779c0 100644 --- a/graph/sources/interfaces.go +++ b/graph/sources/interfaces.go @@ -1,6 +1,9 @@ package sources +import "github.com/lightningnetwork/lnd/graph/session" + // GraphSource defines the read-only graph interface required by LND for graph // related queries. type GraphSource interface { + session.ReadOnlyGraph } diff --git a/rpcserver.go b/rpcserver.go index 7e7fe9821..d4a55966e 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -695,6 +695,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, return err } graph := s.graphDB + graphSource := s.graphSource routerBackend := &routerrpc.RouterBackend{ SelfNode: selfNode.PubKeyBytes, @@ -711,7 +712,7 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, amount lnwire.MilliSatoshi) (btcutil.Amount, error) { return routing.FetchAmountPairCapacity( - graphsession.NewRoutingGraph(graph), + graphsession.NewRoutingGraph(graphSource), selfNode.PubKeyBytes, nodeFrom, nodeTo, amount, ) }, diff --git a/server.go b/server.go index 6f8c8c97a..4c0e52bc7 100644 --- a/server.go +++ b/server.go @@ -1026,7 +1026,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, } paymentSessionSource := &routing.SessionSource{ GraphSessionFactory: graphsession.NewGraphSessionFactory( - dbs.GraphDB, + graphSource, ), SourceNode: sourceNode, MissionControl: s.defaultMC, @@ -1060,7 +1060,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, s.chanRouter, err = routing.New(routing.Config{ SelfNode: selfNode.PubKeyBytes, - RoutingGraph: graphsession.NewRoutingGraph(dbs.GraphDB), + RoutingGraph: graphsession.NewRoutingGraph(graphSource), Chain: cc.ChainIO, Payer: s.htlcSwitch, Control: s.controlTower,