From 3f775778c391af533885574f2ed0cef80a68f978 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 23 Aug 2021 12:16:37 +0200 Subject: [PATCH 1/2] channeldb+routing: add tx parameter Adds an optional tx parameter to ForAllOutgoingChannels and FetchChannel so that data can be queried within the context of an existing database transaction. --- chanbackup/backup.go | 8 +++++--- chanbackup/backup_test.go | 5 ++++- channeldb/db.go | 15 ++++++++++++--- channeldb/db_test.go | 8 ++++---- channelnotifier/channelnotifier.go | 2 +- contractcourt/chain_arbitrator.go | 4 ++-- discovery/gossiper.go | 2 ++ discovery/gossiper_test.go | 9 +++++++-- pilot.go | 2 +- routing/localchans/manager.go | 14 +++++++++----- routing/localchans/manager_test.go | 9 ++++++--- routing/router.go | 12 +++++++----- rpcserver.go | 4 ++-- 13 files changed, 62 insertions(+), 32 deletions(-) diff --git a/chanbackup/backup.go b/chanbackup/backup.go index b956c6fa9..076e2fcf6 100644 --- a/chanbackup/backup.go +++ b/chanbackup/backup.go @@ -7,6 +7,7 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" ) // LiveChannelSource is an interface that allows us to query for the set of @@ -17,8 +18,9 @@ type LiveChannelSource interface { FetchAllChannels() ([]*channeldb.OpenChannel, error) // FetchChannel attempts to locate a live channel identified by the - // passed chanPoint. - FetchChannel(chanPoint wire.OutPoint) (*channeldb.OpenChannel, error) + // passed chanPoint. Optionally an existing db tx can be supplied. + FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) ( + *channeldb.OpenChannel, error) // AddrsForNode returns all known addresses for the target node public // key. @@ -55,7 +57,7 @@ func FetchBackupForChan(chanPoint wire.OutPoint, // First, we'll query the channel source to see if the channel is known // and open within the database. - targetChan, err := chanSource.FetchChannel(chanPoint) + targetChan, err := chanSource.FetchChannel(nil, chanPoint) if err != nil { // If we can't find the channel, then we return with an error, // as we have nothing to backup. diff --git a/chanbackup/backup_test.go b/chanbackup/backup_test.go index ea8bfceac..e718dce3e 100644 --- a/chanbackup/backup_test.go +++ b/chanbackup/backup_test.go @@ -8,6 +8,7 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" ) type mockChannelSource struct { @@ -38,7 +39,9 @@ func (m *mockChannelSource) FetchAllChannels() ([]*channeldb.OpenChannel, error) return chans, nil } -func (m *mockChannelSource) FetchChannel(chanPoint wire.OutPoint) (*channeldb.OpenChannel, error) { +func (m *mockChannelSource) FetchChannel(_ kvdb.RTx, chanPoint wire.OutPoint) ( + *channeldb.OpenChannel, error) { + if m.failQuery { return nil, fmt.Errorf("fail") } diff --git a/channeldb/db.go b/channeldb/db.go index e1a29dc1c..530c819a1 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -501,7 +501,11 @@ func (d *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error) // FetchChannel attempts to locate a channel specified by the passed channel // point. If the channel cannot be found, then an error will be returned. -func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) { +// Optionally an existing db tx can be supplied. Optionally an existing db tx +// can be supplied. +func (d *DB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (*OpenChannel, + error) { + var ( targetChan *OpenChannel targetChanPoint bytes.Buffer @@ -583,7 +587,12 @@ func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) { }) } - err := kvdb.View(d, chanScan, func() {}) + var err error + if tx == nil { + err = kvdb.View(d, chanScan, func() {}) + } else { + err = chanScan(tx) + } if err != nil { return nil, err } @@ -1102,7 +1111,7 @@ func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error { // With the chanPoint constructed, we'll attempt to find the target // channel in the database. If we can't find the channel, then we'll // return the error back to the caller. - dbChan, err := d.FetchChannel(*chanPoint) + dbChan, err := d.FetchChannel(nil, *chanPoint) switch { // If the channel wasn't found, then it's possible that it was already // abandoned from the database. diff --git a/channeldb/db_test.go b/channeldb/db_test.go index ed357e5e4..60e82e680 100644 --- a/channeldb/db_test.go +++ b/channeldb/db_test.go @@ -255,7 +255,7 @@ func TestFetchChannel(t *testing.T) { channelState := createTestChannel(t, cdb, openChannelOption()) // Next, attempt to fetch the channel by its chan point. - dbChannel, err := cdb.FetchChannel(channelState.FundingOutpoint) + dbChannel, err := cdb.FetchChannel(nil, channelState.FundingOutpoint) if err != nil { t.Fatalf("unable to fetch channel: %v", err) } @@ -275,7 +275,7 @@ func TestFetchChannel(t *testing.T) { } channelState2.FundingOutpoint.Index ^= 1 - _, err = cdb.FetchChannel(channelState2.FundingOutpoint) + _, err = cdb.FetchChannel(nil, channelState2.FundingOutpoint) if err == nil { t.Fatalf("expected query to fail") } @@ -416,7 +416,7 @@ func TestRestoreChannelShells(t *testing.T) { // We should also be able to find the channel if we query for it // directly. - _, err = cdb.FetchChannel(channelShell.Chan.FundingOutpoint) + _, err = cdb.FetchChannel(nil, channelShell.Chan.FundingOutpoint) if err != nil { t.Fatalf("unable to fetch channel: %v", err) } @@ -470,7 +470,7 @@ func TestAbandonChannel(t *testing.T) { // At this point, the channel should no longer be found in the set of // open channels. - _, err = cdb.FetchChannel(chanState.FundingOutpoint) + _, err = cdb.FetchChannel(nil, chanState.FundingOutpoint) if err != ErrChannelNotFound { t.Fatalf("channel should not have been found: %v", err) } diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go index df062c437..b99d18d40 100644 --- a/channelnotifier/channelnotifier.go +++ b/channelnotifier/channelnotifier.go @@ -134,7 +134,7 @@ func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint, // channel has gone from pending open to open. func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) { // Fetch the relevant channel from the database. - channel, err := c.chanDB.FetchChannel(chanPoint) + channel, err := c.chanDB.FetchChannel(nil, chanPoint) if err != nil { log.Warnf("Unable to fetch open channel from the db: %v", err) } diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index f653ebcad..2ddc9c8cd 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -258,7 +258,7 @@ func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions, // same instance that is used by the link. chanPoint := a.channel.FundingOutpoint - channel, err := a.c.chanSource.FetchChannel(chanPoint) + channel, err := a.c.chanSource.FetchChannel(nil, chanPoint) if err != nil { return nil, err } @@ -301,7 +301,7 @@ func (a *arbChannel) ForceCloseChan() (*lnwallet.LocalForceCloseSummary, error) // Now that we know the link can't mutate the channel // state, we'll read the channel from disk the target // channel according to its channel point. - channel, err := a.c.chanSource.FetchChannel(chanPoint) + channel, err := a.c.chanSource.FetchChannel(nil, chanPoint) if err != nil { return nil, err } diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 66174c05a..d7f2fd5fc 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -15,6 +15,7 @@ import ( "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -1210,6 +1211,7 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error { edgesToUpdate []updateTuple ) err := d.cfg.Router.ForAllOutgoingChannels(func( + _ kvdb.RTx, info *channeldb.ChannelEdgeInfo, edge *channeldb.ChannelEdgePolicy) error { diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index c12b9b532..55563018c 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -25,6 +25,7 @@ import ( "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lntest/mock" "github.com/lightningnetwork/lnd/lntest/wait" @@ -200,7 +201,8 @@ func (r *mockGraphSource) ForEachNode(func(node *channeldb.LightningNode) error) return nil } -func (r *mockGraphSource) ForAllOutgoingChannels(cb func(i *channeldb.ChannelEdgeInfo, +func (r *mockGraphSource) ForAllOutgoingChannels(cb func(tx kvdb.RTx, + i *channeldb.ChannelEdgeInfo, c *channeldb.ChannelEdgePolicy) error) error { r.mu.Lock() @@ -223,7 +225,9 @@ func (r *mockGraphSource) ForAllOutgoingChannels(cb func(i *channeldb.ChannelEdg } for _, channel := range chans { - cb(channel.Info, channel.Policy1) + if err := cb(nil, channel.Info, channel.Policy1); err != nil { + return err + } } return nil @@ -3568,6 +3572,7 @@ out: const newTimeLockDelta = 100 var edgesToUpdate []EdgeWithInfo err = ctx.router.ForAllOutgoingChannels(func( + _ kvdb.RTx, info *channeldb.ChannelEdgeInfo, edge *channeldb.ChannelEdgePolicy) error { diff --git a/pilot.go b/pilot.go index 13f500f56..a9ac84777 100644 --- a/pilot.go +++ b/pilot.go @@ -282,7 +282,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot, ChannelInfo: func(chanPoint wire.OutPoint) ( *autopilot.LocalChannel, error) { - channel, err := svr.chanStateDB.FetchChannel(chanPoint) + channel, err := svr.chanStateDB.FetchChannel(nil, chanPoint) if err != nil { return nil, err } diff --git a/routing/localchans/manager.go b/routing/localchans/manager.go index 413f86433..b88652e89 100644 --- a/routing/localchans/manager.go +++ b/routing/localchans/manager.go @@ -8,6 +8,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" ) @@ -27,12 +28,14 @@ type Manager struct { // ForAllOutgoingChannels is required to iterate over all our local // channels. - ForAllOutgoingChannels func(cb func(*channeldb.ChannelEdgeInfo, + ForAllOutgoingChannels func(cb func(kvdb.RTx, + *channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy) error) error - // FetchChannel is used to query local channel parameters. - FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel, - error) + // FetchChannel is used to query local channel parameters. Optionally an + // existing db tx can be supplied. + FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) ( + *channeldb.OpenChannel, error) // policyUpdateLock ensures that the database and the link do not fall // out of sync if there are concurrent fee update calls. Without it, @@ -66,6 +69,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, // If we have a filter then we'll only collected those channels, // otherwise we'll collect them all. err := r.ForAllOutgoingChannels(func( + tx kvdb.RTx, info *channeldb.ChannelEdgeInfo, edge *channeldb.ChannelEdgePolicy) error { @@ -197,7 +201,7 @@ func (r *Manager) updateEdge(chanPoint wire.OutPoint, func (r *Manager) getHtlcAmtLimits(chanPoint wire.OutPoint) ( lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) { - ch, err := r.FetchChannel(chanPoint) + ch, err := r.FetchChannel(nil, chanPoint) if err != nil { return 0, 0, err } diff --git a/routing/localchans/manager_test.go b/routing/localchans/manager_test.go index 7ad5128ce..05c97a845 100644 --- a/routing/localchans/manager_test.go +++ b/routing/localchans/manager_test.go @@ -3,6 +3,7 @@ package localchans import ( "testing" + "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnwire" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -88,10 +89,12 @@ func TestManager(t *testing.T) { return nil } - forAllOutgoingChannels := func(cb func(*channeldb.ChannelEdgeInfo, + forAllOutgoingChannels := func(cb func(kvdb.RTx, + *channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy) error) error { return cb( + nil, &channeldb.ChannelEdgeInfo{ Capacity: chanCap, ChannelPoint: chanPoint, @@ -100,8 +103,8 @@ func TestManager(t *testing.T) { ) } - fetchChannel := func(chanPoint wire.OutPoint) (*channeldb.OpenChannel, - error) { + fetchChannel := func(tx kvdb.RTx, chanPoint wire.OutPoint) ( + *channeldb.OpenChannel, error) { constraints := channeldb.ChannelConstraints{ MaxPendingAmount: maxPendingAmount, diff --git a/routing/router.go b/routing/router.go index e6f4c321e..17b5c4e72 100644 --- a/routing/router.go +++ b/routing/router.go @@ -137,7 +137,8 @@ type ChannelGraphSource interface { // ForAllOutgoingChannels is used to iterate over all channels // emanating from the "source" node which is the center of the // star-graph. - ForAllOutgoingChannels(cb func(c *channeldb.ChannelEdgeInfo, + ForAllOutgoingChannels(cb func(tx kvdb.RTx, + c *channeldb.ChannelEdgeInfo, e *channeldb.ChannelEdgePolicy) error) error // CurrentBlockHeight returns the block height from POV of the router @@ -2426,17 +2427,18 @@ func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) err // the router. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) ForAllOutgoingChannels(cb func(*channeldb.ChannelEdgeInfo, - *channeldb.ChannelEdgePolicy) error) error { +func (r *ChannelRouter) ForAllOutgoingChannels(cb func(kvdb.RTx, + *channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy) error) error { - return r.selfNode.ForEachChannel(nil, func(_ kvdb.RTx, c *channeldb.ChannelEdgeInfo, + return r.selfNode.ForEachChannel(nil, func(tx kvdb.RTx, + c *channeldb.ChannelEdgeInfo, e, _ *channeldb.ChannelEdgePolicy) error { if e == nil { return fmt.Errorf("channel from self node has no policy") } - return cb(c, e) + return cb(tx, c, e) }) } diff --git a/rpcserver.go b/rpcserver.go index a15235b92..947d28e1c 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2124,7 +2124,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, // First, we'll fetch the channel as is, as we'll need to examine it // regardless of if this is a force close or not. - channel, err := r.server.chanStateDB.FetchChannel(*chanPoint) + channel, err := r.server.chanStateDB.FetchChannel(nil, *chanPoint) if err != nil { return err } @@ -2402,7 +2402,7 @@ func (r *rpcServer) AbandonChannel(_ context.Context, return nil, err } - dbChan, err := r.server.chanStateDB.FetchChannel(*chanPoint) + dbChan, err := r.server.chanStateDB.FetchChannel(nil, *chanPoint) switch { // If the channel isn't found in the set of open channels, then we can // continue on as it can't be loaded into the link/peer. From bd07b5f49e9cbe4bbb35481408d683332474d630 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 19 Aug 2021 17:25:58 +0200 Subject: [PATCH 2/2] routing/localchans: fix nested db tx This commit fixes a bug where a db tx is opened within another db tx. --- docs/release-notes/release-notes-0.14.0.md | 2 ++ routing/localchans/manager.go | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md index b68efef4e..4cfc22bc9 100644 --- a/docs/release-notes/release-notes-0.14.0.md +++ b/docs/release-notes/release-notes-0.14.0.md @@ -148,6 +148,8 @@ you. * [Fixed context leak in integration tests, and properly handled context timeout](https://github.com/lightningnetwork/lnd/pull/5646). +* [Removed nested db tx](https://github.com/lightningnetwork/lnd/pull/5643) + ## Database * [Ensure single writer for legacy diff --git a/routing/localchans/manager.go b/routing/localchans/manager.go index b88652e89..f4d4e5589 100644 --- a/routing/localchans/manager.go +++ b/routing/localchans/manager.go @@ -81,7 +81,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, } // Apply the new policy to the edge. - err := r.updateEdge(info.ChannelPoint, edge, newSchema) + err := r.updateEdge(tx, info.ChannelPoint, edge, newSchema) if err != nil { log.Warnf("Cannot update policy for %v: %v\n", info.ChannelPoint, err, @@ -127,7 +127,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, } // updateEdge updates the given edge with the new schema. -func (r *Manager) updateEdge(chanPoint wire.OutPoint, +func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint, edge *channeldb.ChannelEdgePolicy, newSchema routing.ChannelPolicy) error { @@ -139,7 +139,7 @@ func (r *Manager) updateEdge(chanPoint wire.OutPoint, edge.TimeLockDelta = uint16(newSchema.TimeLockDelta) // Retrieve negotiated channel htlc amt limits. - amtMin, amtMax, err := r.getHtlcAmtLimits(chanPoint) + amtMin, amtMax, err := r.getHtlcAmtLimits(tx, chanPoint) if err != nil { return nil } @@ -198,10 +198,10 @@ func (r *Manager) updateEdge(chanPoint wire.OutPoint, // getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount // constraints. -func (r *Manager) getHtlcAmtLimits(chanPoint wire.OutPoint) ( +func (r *Manager) getHtlcAmtLimits(tx kvdb.RTx, chanPoint wire.OutPoint) ( lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) { - ch, err := r.FetchChannel(nil, chanPoint) + ch, err := r.FetchChannel(tx, chanPoint) if err != nil { return 0, 0, err }