Merge pull request #5642 from guggero/in-memory-graph

In-memory graph cache for faster pathfinding
This commit is contained in:
Oliver Gugger
2021-10-04 11:20:23 +02:00
committed by GitHub
65 changed files with 2595 additions and 1171 deletions

View File

@@ -23,7 +23,6 @@ import (
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/labels"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnrpc"
@@ -550,19 +549,6 @@ const (
addedToRouterGraph
)
var (
// channelOpeningStateBucket is the database bucket used to store the
// channelOpeningState for each channel that is currently in the process
// of being opened.
channelOpeningStateBucket = []byte("channelOpeningState")
// ErrChannelNotFound is an error returned when a channel is not known
// to us. In this case of the fundingManager, this error is returned
// when the channel in question is not considered being in an opening
// state.
ErrChannelNotFound = fmt.Errorf("channel not found")
)
// NewFundingManager creates and initializes a new instance of the
// fundingManager.
func NewFundingManager(cfg Config) (*Manager, error) {
@@ -887,7 +873,7 @@ func (f *Manager) advanceFundingState(channel *channeldb.OpenChannel,
channelState, shortChanID, err := f.getChannelOpeningState(
&channel.FundingOutpoint,
)
if err == ErrChannelNotFound {
if err == channeldb.ErrChannelNotFound {
// Channel not in fundingManager's opening database,
// meaning it was successfully announced to the
// network.
@@ -3551,26 +3537,20 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
// chanPoint to the channelOpeningStateBucket.
func (f *Manager) saveChannelOpeningState(chanPoint *wire.OutPoint,
state channelOpeningState, shortChanID *lnwire.ShortChannelID) error {
return kvdb.Update(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RwTx) error {
bucket, err := tx.CreateTopLevelBucket(channelOpeningStateBucket)
if err != nil {
return err
}
var outpointBytes bytes.Buffer
if err := WriteOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
var outpointBytes bytes.Buffer
if err = WriteOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
// Save state and the uint64 representation of the shortChanID
// for later use.
scratch := make([]byte, 10)
byteOrder.PutUint16(scratch[:2], uint16(state))
byteOrder.PutUint64(scratch[2:], shortChanID.ToUint64())
return bucket.Put(outpointBytes.Bytes(), scratch)
}, func() {})
// Save state and the uint64 representation of the shortChanID
// for later use.
scratch := make([]byte, 10)
byteOrder.PutUint16(scratch[:2], uint16(state))
byteOrder.PutUint64(scratch[2:], shortChanID.ToUint64())
return f.cfg.Wallet.Cfg.Database.SaveChannelOpeningState(
outpointBytes.Bytes(), scratch,
)
}
// getChannelOpeningState fetches the channelOpeningState for the provided
@@ -3579,51 +3559,31 @@ func (f *Manager) saveChannelOpeningState(chanPoint *wire.OutPoint,
func (f *Manager) getChannelOpeningState(chanPoint *wire.OutPoint) (
channelOpeningState, *lnwire.ShortChannelID, error) {
var state channelOpeningState
var shortChanID lnwire.ShortChannelID
err := kvdb.View(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RTx) error {
var outpointBytes bytes.Buffer
if err := WriteOutpoint(&outpointBytes, chanPoint); err != nil {
return 0, nil, err
}
bucket := tx.ReadBucket(channelOpeningStateBucket)
if bucket == nil {
// If the bucket does not exist, it means we never added
// a channel to the db, so return ErrChannelNotFound.
return ErrChannelNotFound
}
var outpointBytes bytes.Buffer
if err := WriteOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
value := bucket.Get(outpointBytes.Bytes())
if value == nil {
return ErrChannelNotFound
}
state = channelOpeningState(byteOrder.Uint16(value[:2]))
shortChanID = lnwire.NewShortChanIDFromInt(byteOrder.Uint64(value[2:]))
return nil
}, func() {})
value, err := f.cfg.Wallet.Cfg.Database.GetChannelOpeningState(
outpointBytes.Bytes(),
)
if err != nil {
return 0, nil, err
}
state := channelOpeningState(byteOrder.Uint16(value[:2]))
shortChanID := lnwire.NewShortChanIDFromInt(byteOrder.Uint64(value[2:]))
return state, &shortChanID, nil
}
// deleteChannelOpeningState removes any state for chanPoint from the database.
func (f *Manager) deleteChannelOpeningState(chanPoint *wire.OutPoint) error {
return kvdb.Update(f.cfg.Wallet.Cfg.Database, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(channelOpeningStateBucket)
if bucket == nil {
return fmt.Errorf("bucket not found")
}
var outpointBytes bytes.Buffer
if err := WriteOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
var outpointBytes bytes.Buffer
if err := WriteOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
return bucket.Delete(outpointBytes.Bytes())
}, func() {})
return f.cfg.Wallet.Cfg.Database.DeleteChannelOpeningState(
outpointBytes.Bytes(),
)
}

View File

@@ -262,7 +262,7 @@ func (n *testNode) AddNewChannel(channel *channeldb.OpenChannel,
}
}
func createTestWallet(cdb *channeldb.DB, netParams *chaincfg.Params,
func createTestWallet(cdb *channeldb.ChannelStateDB, netParams *chaincfg.Params,
notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController,
signer input.Signer, keyRing keychain.SecretKeyRing,
bio lnwallet.BlockChainIO,
@@ -330,11 +330,13 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
}
dbDir := filepath.Join(tempTestDir, "cdb")
cdb, err := channeldb.Open(dbDir)
fullDB, err := channeldb.Open(dbDir)
if err != nil {
return nil, err
}
cdb := fullDB.ChannelStateDB()
keyRing := &mock.SecretKeyRing{
RootKey: alicePrivKey,
}
@@ -923,12 +925,12 @@ func assertDatabaseState(t *testing.T, node *testNode,
}
state, _, err = node.fundingMgr.getChannelOpeningState(
fundingOutPoint)
if err != nil && err != ErrChannelNotFound {
if err != nil && err != channeldb.ErrChannelNotFound {
t.Fatalf("unable to get channel state: %v", err)
}
// If we found the channel, check if it had the expected state.
if err != ErrChannelNotFound && state == expectedState {
if err != channeldb.ErrChannelNotFound && state == expectedState {
// Got expected state, return with success.
return
}
@@ -1166,7 +1168,7 @@ func assertErrChannelNotFound(t *testing.T, node *testNode,
}
state, _, err = node.fundingMgr.getChannelOpeningState(
fundingOutPoint)
if err == ErrChannelNotFound {
if err == channeldb.ErrChannelNotFound {
// Got expected state, return with success.
return
} else if err != nil {