multi: move all channelstate operations to ChannelStateDB

This commit is contained in:
Andras Banki-Horvath
2021-09-21 19:18:17 +02:00
committed by Oliver Gugger
parent ddea833d31
commit 11cf4216e4
35 changed files with 377 additions and 238 deletions

View File

@@ -222,8 +222,8 @@ var (
type DB struct {
kvdb.Backend
// linkNodeDB separates all DB operations on LinkNodes.
linkNodeDB *LinkNodeDB
// channelStateDB separates all DB operations on channel state.
channelStateDB *ChannelStateDB
dbPath string
graph *ChannelGraph
@@ -273,13 +273,19 @@ func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB,
chanDB := &DB{
Backend: backend,
linkNodeDB: &LinkNodeDB{
channelStateDB: &ChannelStateDB{
linkNodeDB: &LinkNodeDB{
backend: backend,
},
backend: backend,
},
clock: opts.clock,
dryRun: opts.dryRun,
}
// Set the parent pointer (only used in tests).
chanDB.channelStateDB.parent = chanDB
chanDB.graph = newChannelGraph(
backend, opts.RejectCacheSize, opts.ChannelCacheSize,
opts.BatchCommitInterval,
@@ -339,10 +345,10 @@ func (d *DB) Wipe() error {
return initChannelDB(d.Backend)
}
// createChannelDB creates and initializes a fresh version of channeldb. In
// the case that the target path has not yet been created or doesn't yet exist,
// then the path is created. Additionally, all required top-level buckets used
// within the database are created.
// initChannelDB creates and initializes a fresh version of channeldb. In the
// case that the target path has not yet been created or doesn't yet exist, then
// the path is created. Additionally, all required top-level buckets used within
// the database are created.
func initChannelDB(db kvdb.Backend) error {
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
meta := &Meta{}
@@ -409,15 +415,45 @@ func fileExists(path string) bool {
return true
}
// ChannelStateDB is a database that keeps track of all channel state.
type ChannelStateDB struct {
// linkNodeDB separates all DB operations on LinkNodes.
linkNodeDB *LinkNodeDB
// parent holds a pointer to the "main" channeldb.DB object. This is
// only used for testing and should never be used in production code.
// For testing use the ChannelStateDB.GetParentDB() function to retrieve
// this pointer.
parent *DB
// backend points to the actual backend holding the channel state
// database. This may be a real backend or a cache middleware.
backend kvdb.Backend
}
// GetParentDB returns the "main" channeldb.DB object that is the owner of this
// ChannelStateDB instance. Use this function only in tests where passing around
// pointers makes testing less readable. Never to be used in production code!
func (c *ChannelStateDB) GetParentDB() *DB {
return c.parent
}
// LinkNodeDB returns the current instance of the link node database.
func (c *ChannelStateDB) LinkNodeDB() *LinkNodeDB {
return c.linkNodeDB
}
// FetchOpenChannels starts a new database transaction and returns all stored
// currently active/open channels associated with the target nodeID. In the case
// that no active channels are known to have been created with this node, then a
// zero-length slice is returned.
func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
func (c *ChannelStateDB) FetchOpenChannels(nodeID *btcec.PublicKey) (
[]*OpenChannel, error) {
var channels []*OpenChannel
err := kvdb.View(d, func(tx kvdb.RTx) error {
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
var err error
channels, err = d.fetchOpenChannels(tx, nodeID)
channels, err = c.fetchOpenChannels(tx, nodeID)
return err
}, func() {
channels = nil
@@ -430,7 +466,7 @@ func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error)
// stored currently active/open channels associated with the target nodeID. In
// the case that no active channels are known to have been created with this
// node, then a zero-length slice is returned.
func (d *DB) fetchOpenChannels(tx kvdb.RTx,
func (c *ChannelStateDB) fetchOpenChannels(tx kvdb.RTx,
nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
// Get the bucket dedicated to storing the metadata for open channels.
@@ -466,7 +502,7 @@ func (d *DB) fetchOpenChannels(tx kvdb.RTx,
// Finally, we both of the necessary buckets retrieved, fetch
// all the active channels related to this node.
nodeChannels, err := d.fetchNodeChannels(chainBucket)
nodeChannels, err := c.fetchNodeChannels(chainBucket)
if err != nil {
return fmt.Errorf("unable to read channel for "+
"chain_hash=%x, node_key=%x: %v",
@@ -483,7 +519,8 @@ func (d *DB) fetchOpenChannels(tx kvdb.RTx,
// fetchNodeChannels retrieves all active channels from the target chainBucket
// which is under a node's dedicated channel bucket. This function is typically
// used to fetch all the active channels related to a particular node.
func (d *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error) {
func (c *ChannelStateDB) fetchNodeChannels(chainBucket kvdb.RBucket) (
[]*OpenChannel, error) {
var channels []*OpenChannel
@@ -509,7 +546,7 @@ func (d *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error)
return fmt.Errorf("unable to read channel data for "+
"chan_point=%v: %v", outPoint, err)
}
oChannel.Db = d
oChannel.Db = c
channels = append(channels, oChannel)
@@ -526,8 +563,8 @@ func (d *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error)
// point. If the channel cannot be found, then an error will be returned.
// Optionally an existing db tx can be supplied. Optionally an existing db tx
// can be supplied.
func (d *DB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (*OpenChannel,
error) {
func (c *ChannelStateDB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (
*OpenChannel, error) {
var (
targetChan *OpenChannel
@@ -603,7 +640,7 @@ func (d *DB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (*OpenChannel,
}
targetChan = channel
targetChan.Db = d
targetChan.Db = c
return nil
})
@@ -612,7 +649,7 @@ func (d *DB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (*OpenChannel,
var err error
if tx == nil {
err = kvdb.View(d, chanScan, func() {})
err = kvdb.View(c.backend, chanScan, func() {})
} else {
err = chanScan(tx)
}
@@ -632,16 +669,16 @@ func (d *DB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (*OpenChannel,
// FetchAllChannels attempts to retrieve all open channels currently stored
// within the database, including pending open, fully open and channels waiting
// for a closing transaction to confirm.
func (d *DB) FetchAllChannels() ([]*OpenChannel, error) {
return fetchChannels(d)
func (c *ChannelStateDB) FetchAllChannels() ([]*OpenChannel, error) {
return fetchChannels(c)
}
// FetchAllOpenChannels will return all channels that have the funding
// transaction confirmed, and is not waiting for a closing transaction to be
// confirmed.
func (d *DB) FetchAllOpenChannels() ([]*OpenChannel, error) {
func (c *ChannelStateDB) FetchAllOpenChannels() ([]*OpenChannel, error) {
return fetchChannels(
d,
c,
pendingChannelFilter(false),
waitingCloseFilter(false),
)
@@ -650,8 +687,8 @@ func (d *DB) FetchAllOpenChannels() ([]*OpenChannel, error) {
// FetchPendingChannels will return channels that have completed the process of
// generating and broadcasting funding transactions, but whose funding
// transactions have yet to be confirmed on the blockchain.
func (d *DB) FetchPendingChannels() ([]*OpenChannel, error) {
return fetchChannels(d,
func (c *ChannelStateDB) FetchPendingChannels() ([]*OpenChannel, error) {
return fetchChannels(c,
pendingChannelFilter(true),
waitingCloseFilter(false),
)
@@ -661,9 +698,9 @@ func (d *DB) FetchPendingChannels() ([]*OpenChannel, error) {
// but are now waiting for a closing transaction to be confirmed.
//
// NOTE: This includes channels that are also pending to be opened.
func (d *DB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
func (c *ChannelStateDB) FetchWaitingCloseChannels() ([]*OpenChannel, error) {
return fetchChannels(
d, waitingCloseFilter(true),
c, waitingCloseFilter(true),
)
}
@@ -704,10 +741,12 @@ func waitingCloseFilter(waitingClose bool) fetchChannelsFilter {
// which have a true value returned for *all* of the filters will be returned.
// If no filters are provided, every channel in the open channels bucket will
// be returned.
func fetchChannels(d *DB, filters ...fetchChannelsFilter) ([]*OpenChannel, error) {
func fetchChannels(c *ChannelStateDB, filters ...fetchChannelsFilter) (
[]*OpenChannel, error) {
var channels []*OpenChannel
err := kvdb.View(d, func(tx kvdb.RTx) error {
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
// Get the bucket dedicated to storing the metadata for open
// channels.
openChanBucket := tx.ReadBucket(openChannelBucket)
@@ -749,7 +788,7 @@ func fetchChannels(d *DB, filters ...fetchChannelsFilter) ([]*OpenChannel, error
"bucket for chain=%x", chainHash[:])
}
nodeChans, err := d.fetchNodeChannels(chainBucket)
nodeChans, err := c.fetchNodeChannels(chainBucket)
if err != nil {
return fmt.Errorf("unable to read "+
"channel for chain_hash=%x, "+
@@ -798,10 +837,12 @@ func fetchChannels(d *DB, filters ...fetchChannelsFilter) ([]*OpenChannel, error
// it becomes fully closed after a single confirmation. When a channel was
// forcibly closed, it will become fully closed after _all_ the pending funds
// (if any) have been swept.
func (d *DB) FetchClosedChannels(pendingOnly bool) ([]*ChannelCloseSummary, error) {
func (c *ChannelStateDB) FetchClosedChannels(pendingOnly bool) (
[]*ChannelCloseSummary, error) {
var chanSummaries []*ChannelCloseSummary
if err := kvdb.View(d, func(tx kvdb.RTx) error {
if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
closeBucket := tx.ReadBucket(closedChannelBucket)
if closeBucket == nil {
return ErrNoClosedChannels
@@ -839,9 +880,11 @@ var ErrClosedChannelNotFound = errors.New("unable to find closed channel summary
// FetchClosedChannel queries for a channel close summary using the channel
// point of the channel in question.
func (d *DB) FetchClosedChannel(chanID *wire.OutPoint) (*ChannelCloseSummary, error) {
func (c *ChannelStateDB) FetchClosedChannel(chanID *wire.OutPoint) (
*ChannelCloseSummary, error) {
var chanSummary *ChannelCloseSummary
if err := kvdb.View(d, func(tx kvdb.RTx) error {
if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
closeBucket := tx.ReadBucket(closedChannelBucket)
if closeBucket == nil {
return ErrClosedChannelNotFound
@@ -873,11 +916,11 @@ func (d *DB) FetchClosedChannel(chanID *wire.OutPoint) (*ChannelCloseSummary, er
// FetchClosedChannelForID queries for a channel close summary using the
// channel ID of the channel in question.
func (d *DB) FetchClosedChannelForID(cid lnwire.ChannelID) (
func (c *ChannelStateDB) FetchClosedChannelForID(cid lnwire.ChannelID) (
*ChannelCloseSummary, error) {
var chanSummary *ChannelCloseSummary
if err := kvdb.View(d, func(tx kvdb.RTx) error {
if err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
closeBucket := tx.ReadBucket(closedChannelBucket)
if closeBucket == nil {
return ErrClosedChannelNotFound
@@ -926,12 +969,12 @@ func (d *DB) FetchClosedChannelForID(cid lnwire.ChannelID) (
// cooperatively closed and it's reached a single confirmation, or after all
// the pending funds in a channel that has been forcibly closed have been
// swept.
func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
func (c *ChannelStateDB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
var (
openChannels []*OpenChannel
pruneLinkNode *btcec.PublicKey
)
err := kvdb.Update(d, func(tx kvdb.RwTx) error {
err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
var b bytes.Buffer
if err := writeOutpoint(&b, chanPoint); err != nil {
return err
@@ -978,7 +1021,9 @@ func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
// garbage collect it to ensure we don't establish persistent
// connections to peers without open channels.
pruneLinkNode = chanSummary.RemotePub
openChannels, err = d.fetchOpenChannels(tx, pruneLinkNode)
openChannels, err = c.fetchOpenChannels(
tx, pruneLinkNode,
)
if err != nil {
return fmt.Errorf("unable to fetch open channels for "+
"peer %x: %v",
@@ -996,13 +1041,13 @@ func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error {
// Decide whether we want to remove the link node, based upon the number
// of still open channels.
return d.pruneLinkNode(openChannels, pruneLinkNode)
return c.pruneLinkNode(openChannels, pruneLinkNode)
}
// pruneLinkNode determines whether we should garbage collect a link node from
// the database due to no longer having any open channels with it. If there are
// any left, then this acts as a no-op.
func (d *DB) pruneLinkNode(openChannels []*OpenChannel,
func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
remotePub *btcec.PublicKey) error {
if len(openChannels) > 0 {
@@ -1012,13 +1057,13 @@ func (d *DB) pruneLinkNode(openChannels []*OpenChannel,
log.Infof("Pruning link node %x with zero open channels from database",
remotePub.SerializeCompressed())
return d.linkNodeDB.DeleteLinkNode(remotePub)
return c.linkNodeDB.DeleteLinkNode(remotePub)
}
// PruneLinkNodes attempts to prune all link nodes found within the databse with
// whom we no longer have any open channels with.
func (d *DB) PruneLinkNodes() error {
allLinkNodes, err := d.linkNodeDB.FetchAllLinkNodes()
func (c *ChannelStateDB) PruneLinkNodes() error {
allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
if err != nil {
return err
}
@@ -1028,9 +1073,9 @@ func (d *DB) PruneLinkNodes() error {
openChannels []*OpenChannel
linkNode = linkNode
)
err := kvdb.View(d, func(tx kvdb.RTx) error {
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
var err error
openChannels, err = d.fetchOpenChannels(
openChannels, err = c.fetchOpenChannels(
tx, linkNode.IdentityPub,
)
return err
@@ -1041,7 +1086,7 @@ func (d *DB) PruneLinkNodes() error {
return err
}
err = d.pruneLinkNode(openChannels, linkNode.IdentityPub)
err = c.pruneLinkNode(openChannels, linkNode.IdentityPub)
if err != nil {
return err
}
@@ -1069,8 +1114,8 @@ type ChannelShell struct {
// addresses, and finally create an edge within the graph for the channel as
// well. This method is idempotent, so repeated calls with the same set of
// channel shells won't modify the database after the initial call.
func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
err := kvdb.Update(d, func(tx kvdb.RwTx) error {
func (c *ChannelStateDB) RestoreChannelShells(channelShells ...*ChannelShell) error {
err := kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
for _, channelShell := range channelShells {
channel := channelShell.Chan
@@ -1084,7 +1129,7 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
// and link node for this channel. If the channel
// already exists, then in order to ensure this method
// is idempotent, we'll continue to the next step.
channel.Db = d
channel.Db = c
err := syncNewChannel(
tx, channel, channelShell.NodeAddrs,
)
@@ -1104,8 +1149,10 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
// AddrsForNode consults the graph and channel database for all addresses known
// to the passed node public key.
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
linkNode, err := d.linkNodeDB.FetchLinkNode(nodePub)
func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr,
error) {
linkNode, err := d.channelStateDB.linkNodeDB.FetchLinkNode(nodePub)
if err != nil {
return nil, err
}
@@ -1157,16 +1204,18 @@ func (d *DB) AddrsForNode(nodePub *btcec.PublicKey) ([]net.Addr, error) {
// database. If the channel was already removed (has a closed channel entry),
// then we'll return a nil error. Otherwise, we'll insert a new close summary
// into the database.
func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
bestHeight uint32) error {
// With the chanPoint constructed, we'll attempt to find the target
// channel in the database. If we can't find the channel, then we'll
// return the error back to the caller.
dbChan, err := d.FetchChannel(nil, *chanPoint)
dbChan, err := c.FetchChannel(nil, *chanPoint)
switch {
// If the channel wasn't found, then it's possible that it was already
// abandoned from the database.
case err == ErrChannelNotFound:
_, closedErr := d.FetchClosedChannel(chanPoint)
_, closedErr := c.FetchClosedChannel(chanPoint)
if closedErr != nil {
return closedErr
}
@@ -1204,8 +1253,10 @@ func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
// SaveChannelOpeningState saves the serialized channel state for the provided
// chanPoint to the channelOpeningStateBucket.
func (d *DB) SaveChannelOpeningState(outPoint, serializedState []byte) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
serializedState []byte) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
if err != nil {
return err
@@ -1218,9 +1269,9 @@ func (d *DB) SaveChannelOpeningState(outPoint, serializedState []byte) error {
// GetChannelOpeningState fetches the serialized channel state for the provided
// outPoint from the database, or returns ErrChannelNotFound if the channel
// is not found.
func (d *DB) GetChannelOpeningState(outPoint []byte) ([]byte, error) {
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte, error) {
var serializedState []byte
err := kvdb.View(d, func(tx kvdb.RTx) error {
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(channelOpeningStateBucket)
if bucket == nil {
// If the bucket does not exist, it means we never added
@@ -1241,8 +1292,8 @@ func (d *DB) GetChannelOpeningState(outPoint []byte) ([]byte, error) {
}
// DeleteChannelOpeningState removes any state for outPoint from the database.
func (d *DB) DeleteChannelOpeningState(outPoint []byte) error {
return kvdb.Update(d, func(tx kvdb.RwTx) error {
func (c *ChannelStateDB) DeleteChannelOpeningState(outPoint []byte) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
if bucket == nil {
return ErrChannelNotFound
@@ -1330,9 +1381,10 @@ func (d *DB) ChannelGraph() *ChannelGraph {
return d.graph
}
// LinkNodeDB returns the current instance of the link node database.
func (d *DB) LinkNodeDB() *LinkNodeDB {
return d.linkNodeDB
// ChannelStateDB returns the sub database that is concerned with the channel
// state.
func (d *DB) ChannelStateDB() *ChannelStateDB {
return d.channelStateDB
}
func getLatestDBVersion(versions []version) uint32 {
@@ -1384,9 +1436,11 @@ func fetchHistoricalChanBucket(tx kvdb.RTx,
// FetchHistoricalChannel fetches open channel data from the historical channel
// bucket.
func (d *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, error) {
func (c *ChannelStateDB) FetchHistoricalChannel(outPoint *wire.OutPoint) (
*OpenChannel, error) {
var channel *OpenChannel
err := kvdb.View(d, func(tx kvdb.RTx) error {
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
chanBucket, err := fetchHistoricalChanBucket(tx, outPoint)
if err != nil {
return err
@@ -1394,7 +1448,7 @@ func (d *DB) FetchHistoricalChannel(outPoint *wire.OutPoint) (*OpenChannel, erro
channel, err = fetchOpenChannel(chanBucket, outPoint)
channel.Db = d
channel.Db = c
return err
}, func() {
channel = nil