channeldb: convert to uniformly use new kvdb abstractions

In this commit, we migrate all the code in `channeldb` to only reference
the new `kvdb` package rather than `bbolt` directly.

In many instances, we need to add two version to fetch a bucket as both
read and write when needed. As an example, we add a new
`fetchChanBucketRw` function. This function is identical to
`fetchChanBucket`, but it will be used to fetch the main channel bucket
for all _write_ transactions. We need a new method as you can pass a
write transaction where a read is accepted, but not the other way around
due to the stronger typing of the new `kvdb` package.
This commit is contained in:
Olaoluwa Osuntokun
2019-12-12 18:22:19 -08:00
parent fc808ac538
commit f0911765af
36 changed files with 804 additions and 752 deletions

View File

@@ -7,7 +7,7 @@ import (
"fmt"
"io"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
)
@@ -318,7 +318,7 @@ type SettleFailRef struct {
type SettleFailAcker interface {
// AckSettleFails atomically updates the settle-fail filters in *other*
// channels' forwarding packages.
AckSettleFails(tx *bbolt.Tx, settleFailRefs ...SettleFailRef) error
AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error
}
// GlobalFwdPkgReader is an interface used to retrieve the forwarding packages
@@ -326,7 +326,7 @@ type SettleFailAcker interface {
type GlobalFwdPkgReader interface {
// LoadChannelFwdPkgs loads all known forwarding packages for the given
// channel.
LoadChannelFwdPkgs(tx *bbolt.Tx,
LoadChannelFwdPkgs(tx kvdb.RwTx,
source lnwire.ShortChannelID) ([]*FwdPkg, error)
}
@@ -357,14 +357,14 @@ func NewSwitchPackager() *SwitchPackager {
// AckSettleFails atomically updates the settle-fail filters in *other*
// channels' forwarding packages, to mark that the switch has received a settle
// or fail residing in the forwarding package of a link.
func (*SwitchPackager) AckSettleFails(tx *bbolt.Tx,
func (*SwitchPackager) AckSettleFails(tx kvdb.RwTx,
settleFailRefs ...SettleFailRef) error {
return ackSettleFails(tx, settleFailRefs)
}
// LoadChannelFwdPkgs loads all forwarding packages for a particular channel.
func (*SwitchPackager) LoadChannelFwdPkgs(tx *bbolt.Tx,
func (*SwitchPackager) LoadChannelFwdPkgs(tx kvdb.RwTx,
source lnwire.ShortChannelID) ([]*FwdPkg, error) {
return loadChannelFwdPkgs(tx, source)
@@ -376,19 +376,19 @@ func (*SwitchPackager) LoadChannelFwdPkgs(tx *bbolt.Tx,
type FwdPackager interface {
// AddFwdPkg serializes and writes a FwdPkg for this channel at the
// remote commitment height included in the forwarding package.
AddFwdPkg(tx *bbolt.Tx, fwdPkg *FwdPkg) error
AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error
// SetFwdFilter looks up the forwarding package at the remote `height`
// and sets the `fwdFilter`, marking the Adds for which:
// 1) We are not the exit node
// 2) Passed all validation
// 3) Should be forwarded to the switch immediately after a failure
SetFwdFilter(tx *bbolt.Tx, height uint64, fwdFilter *PkgFilter) error
SetFwdFilter(tx kvdb.RwTx, height uint64, fwdFilter *PkgFilter) error
// AckAddHtlcs atomically updates the add filters in this channel's
// forwarding packages to mark the resolution of an Add that was
// received from the remote party.
AckAddHtlcs(tx *bbolt.Tx, addRefs ...AddRef) error
AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error
// SettleFailAcker allows a link to acknowledge settle/fail HTLCs
// belonging to other channels.
@@ -396,11 +396,11 @@ type FwdPackager interface {
// LoadFwdPkgs loads all known forwarding packages owned by this
// channel.
LoadFwdPkgs(tx *bbolt.Tx) ([]*FwdPkg, error)
LoadFwdPkgs(tx kvdb.ReadTx) ([]*FwdPkg, error)
// RemovePkg deletes a forwarding package owned by this channel at
// the provided remote `height`.
RemovePkg(tx *bbolt.Tx, height uint64) error
RemovePkg(tx kvdb.RwTx, height uint64) error
}
// ChannelPackager is used by a channel to manage the lifecycle of its forwarding
@@ -420,8 +420,8 @@ func NewChannelPackager(source lnwire.ShortChannelID) *ChannelPackager {
}
// AddFwdPkg writes a newly locked in forwarding package to disk.
func (*ChannelPackager) AddFwdPkg(tx *bbolt.Tx, fwdPkg *FwdPkg) error {
fwdPkgBkt, err := tx.CreateBucketIfNotExists(fwdPackagesKey)
func (*ChannelPackager) AddFwdPkg(tx kvdb.RwTx, fwdPkg *FwdPkg) error {
fwdPkgBkt, err := tx.CreateTopLevelBucket(fwdPackagesKey)
if err != nil {
return err
}
@@ -485,7 +485,7 @@ func (*ChannelPackager) AddFwdPkg(tx *bbolt.Tx, fwdPkg *FwdPkg) error {
}
// putLogUpdate writes an htlc to the provided `bkt`, using `index` as the key.
func putLogUpdate(bkt *bbolt.Bucket, idx uint16, htlc *LogUpdate) error {
func putLogUpdate(bkt kvdb.RwBucket, idx uint16, htlc *LogUpdate) error {
var b bytes.Buffer
if err := htlc.Encode(&b); err != nil {
return err
@@ -497,19 +497,19 @@ func putLogUpdate(bkt *bbolt.Bucket, idx uint16, htlc *LogUpdate) error {
// LoadFwdPkgs scans the forwarding log for any packages that haven't been
// processed, and returns their deserialized log updates in a map indexed by the
// remote commitment height at which the updates were locked in.
func (p *ChannelPackager) LoadFwdPkgs(tx *bbolt.Tx) ([]*FwdPkg, error) {
func (p *ChannelPackager) LoadFwdPkgs(tx kvdb.ReadTx) ([]*FwdPkg, error) {
return loadChannelFwdPkgs(tx, p.source)
}
// loadChannelFwdPkgs loads all forwarding packages owned by `source`.
func loadChannelFwdPkgs(tx *bbolt.Tx, source lnwire.ShortChannelID) ([]*FwdPkg, error) {
fwdPkgBkt := tx.Bucket(fwdPackagesKey)
func loadChannelFwdPkgs(tx kvdb.ReadTx, source lnwire.ShortChannelID) ([]*FwdPkg, error) {
fwdPkgBkt := tx.ReadBucket(fwdPackagesKey)
if fwdPkgBkt == nil {
return nil, nil
}
sourceKey := makeLogKey(source.ToUint64())
sourceBkt := fwdPkgBkt.Bucket(sourceKey[:])
sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
if sourceBkt == nil {
return nil, nil
}
@@ -543,23 +543,23 @@ func loadChannelFwdPkgs(tx *bbolt.Tx, source lnwire.ShortChannelID) ([]*FwdPkg,
// loadFwPkg reads the packager's fwd pkg at a given height, and determines the
// appropriate FwdState.
func loadFwdPkg(fwdPkgBkt *bbolt.Bucket, source lnwire.ShortChannelID,
func loadFwdPkg(fwdPkgBkt kvdb.ReadBucket, source lnwire.ShortChannelID,
height uint64) (*FwdPkg, error) {
sourceKey := makeLogKey(source.ToUint64())
sourceBkt := fwdPkgBkt.Bucket(sourceKey[:])
sourceBkt := fwdPkgBkt.NestedReadBucket(sourceKey[:])
if sourceBkt == nil {
return nil, ErrCorruptedFwdPkg
}
heightKey := makeLogKey(height)
heightBkt := sourceBkt.Bucket(heightKey[:])
heightBkt := sourceBkt.NestedReadBucket(heightKey[:])
if heightBkt == nil {
return nil, ErrCorruptedFwdPkg
}
// Load ADDs from disk.
addBkt := heightBkt.Bucket(addBucketKey)
addBkt := heightBkt.NestedReadBucket(addBucketKey)
if addBkt == nil {
return nil, ErrCorruptedFwdPkg
}
@@ -582,7 +582,7 @@ func loadFwdPkg(fwdPkgBkt *bbolt.Bucket, source lnwire.ShortChannelID,
}
// Load SETTLE/FAILs from disk.
failSettleBkt := heightBkt.Bucket(failSettleBucketKey)
failSettleBkt := heightBkt.NestedReadBucket(failSettleBucketKey)
if failSettleBkt == nil {
return nil, ErrCorruptedFwdPkg
}
@@ -649,7 +649,7 @@ func loadFwdPkg(fwdPkgBkt *bbolt.Bucket, source lnwire.ShortChannelID,
// loadHtlcs retrieves all serialized htlcs in a bucket, returning
// them in order of the indexes they were written under.
func loadHtlcs(bkt *bbolt.Bucket) ([]LogUpdate, error) {
func loadHtlcs(bkt kvdb.ReadBucket) ([]LogUpdate, error) {
var htlcs []LogUpdate
if err := bkt.ForEach(func(_, v []byte) error {
var htlc LogUpdate
@@ -674,22 +674,22 @@ func loadHtlcs(bkt *bbolt.Bucket) ([]LogUpdate, error) {
// leaving this channel. After a restart, we skip validation of these Adds,
// since they are assumed to have already been validated, and make the switch or
// outgoing link responsible for handling replays.
func (p *ChannelPackager) SetFwdFilter(tx *bbolt.Tx, height uint64,
func (p *ChannelPackager) SetFwdFilter(tx kvdb.RwTx, height uint64,
fwdFilter *PkgFilter) error {
fwdPkgBkt := tx.Bucket(fwdPackagesKey)
fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
if fwdPkgBkt == nil {
return ErrCorruptedFwdPkg
}
source := makeLogKey(p.source.ToUint64())
sourceBkt := fwdPkgBkt.Bucket(source[:])
sourceBkt := fwdPkgBkt.NestedReadWriteBucket(source[:])
if sourceBkt == nil {
return ErrCorruptedFwdPkg
}
heightKey := makeLogKey(height)
heightBkt := sourceBkt.Bucket(heightKey[:])
heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
if heightBkt == nil {
return ErrCorruptedFwdPkg
}
@@ -713,18 +713,18 @@ func (p *ChannelPackager) SetFwdFilter(tx *bbolt.Tx, height uint64,
// AckAddHtlcs accepts a list of references to add htlcs, and updates the
// AckAddFilter of those forwarding packages to indicate that a settle or fail
// has been received in response to the add.
func (p *ChannelPackager) AckAddHtlcs(tx *bbolt.Tx, addRefs ...AddRef) error {
func (p *ChannelPackager) AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error {
if len(addRefs) == 0 {
return nil
}
fwdPkgBkt := tx.Bucket(fwdPackagesKey)
fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
if fwdPkgBkt == nil {
return ErrCorruptedFwdPkg
}
sourceKey := makeLogKey(p.source.ToUint64())
sourceBkt := fwdPkgBkt.Bucket(sourceKey[:])
sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceKey[:])
if sourceBkt == nil {
return ErrCorruptedFwdPkg
}
@@ -753,11 +753,11 @@ func (p *ChannelPackager) AckAddHtlcs(tx *bbolt.Tx, addRefs ...AddRef) error {
// ackAddHtlcsAtHeight updates the AddAckFilter of a single forwarding package
// with a list of indexes, writing the resulting filter back in its place.
func ackAddHtlcsAtHeight(sourceBkt *bbolt.Bucket, height uint64,
func ackAddHtlcsAtHeight(sourceBkt kvdb.RwBucket, height uint64,
indexes []uint16) error {
heightKey := makeLogKey(height)
heightBkt := sourceBkt.Bucket(heightKey[:])
heightBkt := sourceBkt.NestedReadWriteBucket(heightKey[:])
if heightBkt == nil {
// If the height bucket isn't found, this could be because the
// forwarding package was already removed. We'll return nil to
@@ -796,17 +796,17 @@ func ackAddHtlcsAtHeight(sourceBkt *bbolt.Bucket, height uint64,
// package. This should only be called after the source of the Add has locked in
// the settle/fail, or it becomes otherwise safe to forgo retransmitting the
// settle/fail after a restart.
func (p *ChannelPackager) AckSettleFails(tx *bbolt.Tx, settleFailRefs ...SettleFailRef) error {
func (p *ChannelPackager) AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error {
return ackSettleFails(tx, settleFailRefs)
}
// ackSettleFails persistently acknowledges a batch of settle fail references.
func ackSettleFails(tx *bbolt.Tx, settleFailRefs []SettleFailRef) error {
func ackSettleFails(tx kvdb.RwTx, settleFailRefs []SettleFailRef) error {
if len(settleFailRefs) == 0 {
return nil
}
fwdPkgBkt := tx.Bucket(fwdPackagesKey)
fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
if fwdPkgBkt == nil {
return ErrCorruptedFwdPkg
}
@@ -832,7 +832,7 @@ func ackSettleFails(tx *bbolt.Tx, settleFailRefs []SettleFailRef) error {
// settle/fail htlcs.
for dest, destHeights := range destHeightDiffs {
destKey := makeLogKey(dest.ToUint64())
destBkt := fwdPkgBkt.Bucket(destKey[:])
destBkt := fwdPkgBkt.NestedReadWriteBucket(destKey[:])
if destBkt == nil {
// If the destination bucket is not found, this is
// likely the result of the destination channel being
@@ -855,11 +855,11 @@ func ackSettleFails(tx *bbolt.Tx, settleFailRefs []SettleFailRef) error {
// ackSettleFailsAtHeight given a destination bucket, acks the provided indexes
// at particular a height by updating the settle fail filter.
func ackSettleFailsAtHeight(destBkt *bbolt.Bucket, height uint64,
func ackSettleFailsAtHeight(destBkt kvdb.RwBucket, height uint64,
indexes []uint16) error {
heightKey := makeLogKey(height)
heightBkt := destBkt.Bucket(heightKey[:])
heightBkt := destBkt.NestedReadWriteBucket(heightKey[:])
if heightBkt == nil {
// If the height bucket isn't found, this could be because the
// forwarding package was already removed. We'll return nil to
@@ -895,21 +895,21 @@ func ackSettleFailsAtHeight(destBkt *bbolt.Bucket, height uint64,
// RemovePkg deletes the forwarding package at the given height from the
// packager's source bucket.
func (p *ChannelPackager) RemovePkg(tx *bbolt.Tx, height uint64) error {
fwdPkgBkt := tx.Bucket(fwdPackagesKey)
func (p *ChannelPackager) RemovePkg(tx kvdb.RwTx, height uint64) error {
fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey)
if fwdPkgBkt == nil {
return nil
}
sourceBytes := makeLogKey(p.source.ToUint64())
sourceBkt := fwdPkgBkt.Bucket(sourceBytes[:])
sourceBkt := fwdPkgBkt.NestedReadWriteBucket(sourceBytes[:])
if sourceBkt == nil {
return ErrCorruptedFwdPkg
}
heightKey := makeLogKey(height)
return sourceBkt.DeleteBucket(heightKey[:])
return sourceBkt.DeleteNestedBucket(heightKey[:])
}
// uint16Key writes the provided 16-bit unsigned integer to a 2-byte slice.