multi: carve out LinkNodeDB from channeldb.DB for cleaner separation

This commit is contained in:
Andras Banki-Horvath
2021-09-21 19:18:13 +02:00
committed by Oliver Gugger
parent 292b8e1ce6
commit 60cccf8409
6 changed files with 115 additions and 65 deletions

View File

@@ -1501,7 +1501,10 @@ func syncNewChannel(tx kvdb.RwTx, c *OpenChannel, addrs []net.Addr) error {
// Next, we need to establish a (possibly) new LinkNode relationship // Next, we need to establish a (possibly) new LinkNode relationship
// for this channel. The LinkNode metadata contains reachability, // for this channel. The LinkNode metadata contains reachability,
// up-time, and service bits related information. // up-time, and service bits related information.
linkNode := c.Db.NewLinkNode(wire.MainNet, c.IdentityPub, addrs...) linkNode := NewLinkNode(
&LinkNodeDB{backend: c.Db.Backend},
wire.MainNet, c.IdentityPub, addrs...,
)
// TODO(roasbeef): do away with link node all together? // TODO(roasbeef): do away with link node all together?

View File

@@ -217,6 +217,9 @@ var (
type DB struct { type DB struct {
kvdb.Backend kvdb.Backend
// linkNodeDB separates all DB operations on LinkNodes.
linkNodeDB *LinkNodeDB
dbPath string dbPath string
graph *ChannelGraph graph *ChannelGraph
clock clock.Clock clock clock.Clock
@@ -265,9 +268,13 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
chanDB := &DB{ chanDB := &DB{
Backend: backend, Backend: backend,
clock: opts.clock, linkNodeDB: &LinkNodeDB{
dryRun: opts.dryRun, backend: backend,
},
clock: opts.clock,
dryRun: opts.dryRun,
} }
chanDB.graph = newChannelGraph( chanDB.graph = newChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize, backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval, opts.BatchCommitInterval,
@@ -915,7 +922,11 @@ func (d *DB) FetchClosedChannelForID(cid lnwire.ChannelID) (
// the pending funds in a channel that has been forcibly closed have been // the pending funds in a channel that has been forcibly closed have been
// swept. // swept.
func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error { func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error { var (
openChannels []*OpenChannel
pruneLinkNode *btcec.PublicKey
)
err := kvdb.Update(d, func(tx kvdb.RwTx) error {
var b bytes.Buffer var b bytes.Buffer
if err := writeOutpoint(&b, chanPoint); err != nil { if err := writeOutpoint(&b, chanPoint); err != nil {
return err return err
@@ -961,19 +972,33 @@ func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
// other open channels with this peer. If we don't we'll // other open channels with this peer. If we don't we'll
// garbage collect it to ensure we don't establish persistent // garbage collect it to ensure we don't establish persistent
// connections to peers without open channels. // connections to peers without open channels.
return d.pruneLinkNode(tx, chanSummary.RemotePub) pruneLinkNode = chanSummary.RemotePub
}, func() {}) openChannels, err = d.fetchOpenChannels(tx, pruneLinkNode)
if err != nil {
return fmt.Errorf("unable to fetch open channels for "+
"peer %x: %v",
pruneLinkNode.SerializeCompressed(), err)
}
return nil
}, func() {
openChannels = nil
pruneLinkNode = nil
})
if err != nil {
return err
}
// Decide whether we want to remove the link node, based upon the number
// of still open channels.
return d.pruneLinkNode(openChannels, pruneLinkNode)
} }
// pruneLinkNode determines whether we should garbage collect a link node from // pruneLinkNode determines whether we should garbage collect a link node from
// the database due to no longer having any open channels with it. If there are // the database due to no longer having any open channels with it. If there are
// any left, then this acts as a no-op. // any left, then this acts as a no-op.
func (d *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error { func (d *DB) pruneLinkNode(openChannels []*OpenChannel,
openChannels, err := d.fetchOpenChannels(tx, remotePub) remotePub *btcec.PublicKey) error {
if err != nil {
return fmt.Errorf("unable to fetch open channels for peer %x: "+
"%v", remotePub.SerializeCompressed(), err)
}
if len(openChannels) > 0 { if len(openChannels) > 0 {
return nil return nil
@@ -982,27 +1007,42 @@ func (d *DB) pruneLinkNode(tx kvdb.RwTx, remotePub *btcec.PublicKey) error {
log.Infof("Pruning link node %x with zero open channels from database", log.Infof("Pruning link node %x with zero open channels from database",
remotePub.SerializeCompressed()) remotePub.SerializeCompressed())
return d.deleteLinkNode(tx, remotePub) return d.linkNodeDB.DeleteLinkNode(remotePub)
} }
// PruneLinkNodes attempts to prune all link nodes found within the databse with // PruneLinkNodes attempts to prune all link nodes found within the databse with
// whom we no longer have any open channels with. // whom we no longer have any open channels with.
func (d *DB) PruneLinkNodes() error { func (d *DB) PruneLinkNodes() error {
return kvdb.Update(d, func(tx kvdb.RwTx) error { allLinkNodes, err := d.linkNodeDB.FetchAllLinkNodes()
linkNodes, err := d.fetchAllLinkNodes(tx) if err != nil {
return err
}
for _, linkNode := range allLinkNodes {
var (
openChannels []*OpenChannel
linkNode = linkNode
)
err := kvdb.View(d, func(tx kvdb.RTx) error {
var err error
openChannels, err = d.fetchOpenChannels(
tx, linkNode.IdentityPub,
)
return err
}, func() {
openChannels = nil
})
if err != nil { if err != nil {
return err return err
} }
for _, linkNode := range linkNodes { err = d.pruneLinkNode(openChannels, linkNode.IdentityPub)
err := d.pruneLinkNode(tx, linkNode.IdentityPub) if err != nil {
if err != nil { return err
return err
}
} }
}
return nil return nil
}, func() {})
} }
// ChannelShell is a shell of a channel that is meant to be used for channel // ChannelShell is a shell of a channel that is meant to be used for channel
@@ -1060,19 +1100,13 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
// AddrsForNode consults the graph and channel database for all addresses known // AddrsForNode consults the graph and channel database for all addresses known
// to the passed node public key. // to the passed node public key.
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) { func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
var ( linkNode, err := d.linkNodeDB.FetchLinkNode(nodePub)
linkNode *LinkNode if err != nil {
graphNode LightningNode return nil, err
) }
dbErr := kvdb.View(d, func(tx kvdb.RTx) error {
var err error
linkNode, err = fetchLinkNode(tx, nodePub)
if err != nil {
return err
}
var graphNode LightningNode
err = kvdb.View(d, func(tx kvdb.RTx) error {
// We'll also query the graph for this peer to see if they have // We'll also query the graph for this peer to see if they have
// any addresses that we don't currently have stored within the // any addresses that we don't currently have stored within the
// link node database. // link node database.
@@ -1092,8 +1126,8 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
}, func() { }, func() {
linkNode = nil linkNode = nil
}) })
if dbErr != nil { if err != nil {
return nil, dbErr return nil, err
} }
// Now that we have both sources of addrs for this node, we'll use a // Now that we have both sources of addrs for this node, we'll use a
@@ -1236,11 +1270,16 @@ func (d *DB) syncVersions(versions []version) error {
}, func() {}) }, func() {})
} }
// ChannelGraph returns a new instance of the directed channel graph. // ChannelGraph returns the current instance of the directed channel graph.
func (d *DB) ChannelGraph() *ChannelGraph { func (d *DB) ChannelGraph() *ChannelGraph {
return d.graph return d.graph
} }
// LinkNodeDB returns the current instance of the link node database.
func (d *DB) LinkNodeDB() *LinkNodeDB {
return d.linkNodeDB
}
func getLatestDBVersion(versions []version) uint32 { func getLatestDBVersion(versions []version) uint32 {
return versions[len(versions)-1].number return versions[len(versions)-1].number
} }

View File

@@ -210,8 +210,8 @@ func TestAddrsForNode(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unable to recv node pub: %v", err) t.Fatalf("unable to recv node pub: %v", err)
} }
linkNode := cdb.NewLinkNode( linkNode := NewLinkNode(
wire.MainNet, nodePub, anotherAddr, cdb.linkNodeDB, wire.MainNet, nodePub, anotherAddr,
) )
if err := linkNode.Sync(); err != nil { if err := linkNode.Sync(); err != nil {
t.Fatalf("unable to sync link node: %v", err) t.Fatalf("unable to sync link node: %v", err)
@@ -423,7 +423,9 @@ func TestRestoreChannelShells(t *testing.T) {
// We should also be able to find the link node that was inserted by // We should also be able to find the link node that was inserted by
// its public key. // its public key.
linkNode, err := cdb.FetchLinkNode(channelShell.Chan.IdentityPub) linkNode, err := cdb.linkNodeDB.FetchLinkNode(
channelShell.Chan.IdentityPub,
)
if err != nil { if err != nil {
t.Fatalf("unable to fetch link node: %v", err) t.Fatalf("unable to fetch link node: %v", err)
} }

View File

@@ -56,12 +56,14 @@ type LinkNode struct {
// authenticated connection for the stored identity public key. // authenticated connection for the stored identity public key.
Addresses []net.Addr Addresses []net.Addr
db *DB // db is the database instance this node was fetched from. This is used
// to sync back the node's state if it is updated.
db *LinkNodeDB
} }
// NewLinkNode creates a new LinkNode from the provided parameters, which is // NewLinkNode creates a new LinkNode from the provided parameters, which is
// backed by an instance of channeldb. // backed by an instance of a link node DB.
func (d *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey, func NewLinkNode(db *LinkNodeDB, bitNet wire.BitcoinNet, pub *btcec.PublicKey,
addrs ...net.Addr) *LinkNode { addrs ...net.Addr) *LinkNode {
return &LinkNode{ return &LinkNode{
@@ -69,7 +71,7 @@ func (d *DB) NewLinkNode(bitNet wire.BitcoinNet, pub *btcec.PublicKey,
IdentityPub: pub, IdentityPub: pub,
LastSeen: time.Now(), LastSeen: time.Now(),
Addresses: addrs, Addresses: addrs,
db: d, db: db,
} }
} }
@@ -98,10 +100,9 @@ func (l *LinkNode) AddAddress(addr net.Addr) error {
// Sync performs a full database sync which writes the current up-to-date data // Sync performs a full database sync which writes the current up-to-date data
// within the struct to the database. // within the struct to the database.
func (l *LinkNode) Sync() error { func (l *LinkNode) Sync() error {
// Finally update the database by storing the link node and updating // Finally update the database by storing the link node and updating
// any relevant indexes. // any relevant indexes.
return kvdb.Update(l.db, func(tx kvdb.RwTx) error { return kvdb.Update(l.db.backend, func(tx kvdb.RwTx) error {
nodeMetaBucket := tx.ReadWriteBucket(nodeInfoBucket) nodeMetaBucket := tx.ReadWriteBucket(nodeInfoBucket)
if nodeMetaBucket == nil { if nodeMetaBucket == nil {
return ErrLinkNodesNotFound return ErrLinkNodesNotFound
@@ -127,15 +128,20 @@ func putLinkNode(nodeMetaBucket kvdb.RwBucket, l *LinkNode) error {
return nodeMetaBucket.Put(nodePub, b.Bytes()) return nodeMetaBucket.Put(nodePub, b.Bytes())
} }
// LinkNodeDB is a database that keeps track of all link nodes.
type LinkNodeDB struct {
backend kvdb.Backend
}
// DeleteLinkNode removes the link node with the given identity from the // DeleteLinkNode removes the link node with the given identity from the
// database. // database.
func (d *DB) DeleteLinkNode(identity *btcec.PublicKey) error { func (l *LinkNodeDB) DeleteLinkNode(identity *btcec.PublicKey) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error { return kvdb.Update(l.backend, func(tx kvdb.RwTx) error {
return d.deleteLinkNode(tx, identity) return deleteLinkNode(tx, identity)
}, func() {}) }, func() {})
} }
func (d *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error { func deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
nodeMetaBucket := tx.ReadWriteBucket(nodeInfoBucket) nodeMetaBucket := tx.ReadWriteBucket(nodeInfoBucket)
if nodeMetaBucket == nil { if nodeMetaBucket == nil {
return ErrLinkNodesNotFound return ErrLinkNodesNotFound
@@ -148,9 +154,9 @@ func (d *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error {
// FetchLinkNode attempts to lookup the data for a LinkNode based on a target // FetchLinkNode attempts to lookup the data for a LinkNode based on a target
// identity public key. If a particular LinkNode for the passed identity public // identity public key. If a particular LinkNode for the passed identity public
// key cannot be found, then ErrNodeNotFound if returned. // key cannot be found, then ErrNodeNotFound if returned.
func (d *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) { func (l *LinkNodeDB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
var linkNode *LinkNode var linkNode *LinkNode
err := kvdb.View(d, func(tx kvdb.RTx) error { err := kvdb.View(l.backend, func(tx kvdb.RTx) error {
node, err := fetchLinkNode(tx, identity) node, err := fetchLinkNode(tx, identity)
if err != nil { if err != nil {
return err return err
@@ -191,10 +197,10 @@ func fetchLinkNode(tx kvdb.RTx, targetPub *btcec.PublicKey) (*LinkNode, error) {
// FetchAllLinkNodes starts a new database transaction to fetch all nodes with // FetchAllLinkNodes starts a new database transaction to fetch all nodes with
// whom we have active channels with. // whom we have active channels with.
func (d *DB) FetchAllLinkNodes() ([]*LinkNode, error) { func (l *LinkNodeDB) FetchAllLinkNodes() ([]*LinkNode, error) {
var linkNodes []*LinkNode var linkNodes []*LinkNode
err := kvdb.View(d, func(tx kvdb.RTx) error { err := kvdb.View(l.backend, func(tx kvdb.RTx) error {
nodes, err := d.fetchAllLinkNodes(tx) nodes, err := fetchAllLinkNodes(tx)
if err != nil { if err != nil {
return err return err
} }
@@ -213,7 +219,7 @@ func (d *DB) FetchAllLinkNodes() ([]*LinkNode, error) {
// fetchAllLinkNodes uses an existing database transaction to fetch all nodes // fetchAllLinkNodes uses an existing database transaction to fetch all nodes
// with whom we have active channels with. // with whom we have active channels with.
func (d *DB) fetchAllLinkNodes(tx kvdb.RTx) ([]*LinkNode, error) { func fetchAllLinkNodes(tx kvdb.RTx) ([]*LinkNode, error) {
nodeMetaBucket := tx.ReadBucket(nodeInfoBucket) nodeMetaBucket := tx.ReadBucket(nodeInfoBucket)
if nodeMetaBucket == nil { if nodeMetaBucket == nil {
return nil, ErrLinkNodesNotFound return nil, ErrLinkNodesNotFound

View File

@@ -34,8 +34,8 @@ func TestLinkNodeEncodeDecode(t *testing.T) {
// Create two fresh link node instances with the above dummy data, then // Create two fresh link node instances with the above dummy data, then
// fully sync both instances to disk. // fully sync both instances to disk.
node1 := cdb.NewLinkNode(wire.MainNet, pub1, addr1) node1 := NewLinkNode(cdb.linkNodeDB, wire.MainNet, pub1, addr1)
node2 := cdb.NewLinkNode(wire.TestNet3, pub2, addr2) node2 := NewLinkNode(cdb.linkNodeDB, wire.TestNet3, pub2, addr2)
if err := node1.Sync(); err != nil { if err := node1.Sync(); err != nil {
t.Fatalf("unable to sync node: %v", err) t.Fatalf("unable to sync node: %v", err)
} }
@@ -46,7 +46,7 @@ func TestLinkNodeEncodeDecode(t *testing.T) {
// Fetch all current link nodes from the database, they should exactly // Fetch all current link nodes from the database, they should exactly
// match the two created above. // match the two created above.
originalNodes := []*LinkNode{node2, node1} originalNodes := []*LinkNode{node2, node1}
linkNodes, err := cdb.FetchAllLinkNodes() linkNodes, err := cdb.linkNodeDB.FetchAllLinkNodes()
if err != nil { if err != nil {
t.Fatalf("unable to fetch nodes: %v", err) t.Fatalf("unable to fetch nodes: %v", err)
} }
@@ -82,7 +82,7 @@ func TestLinkNodeEncodeDecode(t *testing.T) {
} }
// Fetch the same node from the database according to its public key. // Fetch the same node from the database according to its public key.
node1DB, err := cdb.FetchLinkNode(pub1) node1DB, err := cdb.linkNodeDB.FetchLinkNode(pub1)
if err != nil { if err != nil {
t.Fatalf("unable to find node: %v", err) t.Fatalf("unable to find node: %v", err)
} }
@@ -121,20 +121,20 @@ func TestDeleteLinkNode(t *testing.T) {
IP: net.ParseIP("127.0.0.1"), IP: net.ParseIP("127.0.0.1"),
Port: 1337, Port: 1337,
} }
linkNode := cdb.NewLinkNode(wire.TestNet3, pubKey, addr) linkNode := NewLinkNode(cdb.linkNodeDB, wire.TestNet3, pubKey, addr)
if err := linkNode.Sync(); err != nil { if err := linkNode.Sync(); err != nil {
t.Fatalf("unable to write link node to db: %v", err) t.Fatalf("unable to write link node to db: %v", err)
} }
if _, err := cdb.FetchLinkNode(pubKey); err != nil { if _, err := cdb.linkNodeDB.FetchLinkNode(pubKey); err != nil {
t.Fatalf("unable to find link node: %v", err) t.Fatalf("unable to find link node: %v", err)
} }
if err := cdb.DeleteLinkNode(pubKey); err != nil { if err := cdb.linkNodeDB.DeleteLinkNode(pubKey); err != nil {
t.Fatalf("unable to delete link node from db: %v", err) t.Fatalf("unable to delete link node from db: %v", err)
} }
if _, err := cdb.FetchLinkNode(pubKey); err == nil { if _, err := cdb.linkNodeDB.FetchLinkNode(pubKey); err == nil {
t.Fatal("should not have found link node in db, but did") t.Fatal("should not have found link node in db, but did")
} }
} }

View File

@@ -2527,7 +2527,7 @@ func (s *server) establishPersistentConnections() error {
// Iterate through the list of LinkNodes to find addresses we should // Iterate through the list of LinkNodes to find addresses we should
// attempt to connect to based on our set of previous connections. Set // attempt to connect to based on our set of previous connections. Set
// the reconnection port to the default peer port. // the reconnection port to the default peer port.
linkNodes, err := s.chanStateDB.FetchAllLinkNodes() linkNodes, err := s.chanStateDB.LinkNodeDB().FetchAllLinkNodes()
if err != nil && err != channeldb.ErrLinkNodesNotFound { if err != nil && err != channeldb.ErrLinkNodesNotFound {
return err return err
} }