From d89f51d1d0297425e375106050c3939edb9a2602 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Mon, 26 Oct 2020 14:06:32 +0100 Subject: [PATCH] multi: add reset closure to kvdb.Update Similarly as with kvdb.View this commits adds a reset closure to the kvdb.Update call in order to be able to reset external state if the underlying db backend needs to retry the transaction. --- breacharbiter.go | 6 +-- channeldb/channel.go | 26 ++++++------ channeldb/channel_test.go | 2 +- channeldb/db.go | 23 ++++++---- channeldb/forwarding_package_test.go | 42 +++++++++---------- channeldb/graph.go | 22 ++++++---- channeldb/graph_test.go | 2 +- channeldb/invoices.go | 6 ++- channeldb/kvdb/etcd/db.go | 16 +++---- channeldb/kvdb/etcd/db_test.go | 4 +- channeldb/kvdb/etcd/readwrite_bucket_test.go | 20 ++++----- channeldb/kvdb/etcd/readwrite_cursor_test.go | 10 ++--- channeldb/kvdb/etcd/readwrite_tx_test.go | 2 +- channeldb/kvdb/interface.go | 26 +++++++----- channeldb/meta.go | 2 +- channeldb/meta_test.go | 14 +++---- channeldb/migration_01_to_11/db.go | 2 +- channeldb/migration_01_to_11/graph.go | 2 +- channeldb/migration_01_to_11/meta_test.go | 2 +- .../migration_09_legacy_serialization.go | 2 +- .../migration_11_invoices_test.go | 2 +- .../migration_01_to_11/migrations_test.go | 10 +++-- channeldb/migtest/migtest.go | 6 +-- channeldb/nodes.go | 4 +- channeldb/payments.go | 2 +- channeldb/payments_test.go | 6 +-- channeldb/peers.go | 2 +- channeldb/reports.go | 2 +- channeldb/reports_test.go | 4 +- channeldb/waitingproof.go | 4 +- contractcourt/briefcase.go | 4 +- discovery/message_store_test.go | 2 +- fundingmanager.go | 4 +- htlcswitch/circuit_map.go | 13 +++--- htlcswitch/decayedlog.go | 2 +- htlcswitch/payment_result.go | 2 +- htlcswitch/sequencer.go | 4 +- macaroons/store.go | 8 +++- nursery_store.go | 10 ++--- routing/missioncontrol_store.go | 14 ++++--- sweep/store.go | 4 +- watchtower/wtdb/client_db.go | 16 ++++--- watchtower/wtdb/tower_db.go | 10 +++-- watchtower/wtdb/version.go | 4 +- 44 files changed, 208 insertions(+), 162 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index cf96f5ad1..ab1a8bd54 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -1265,7 +1265,7 @@ func (rs *retributionStore) Add(ret *retributionInfo) error { } return retBucket.Put(outBuf.Bytes(), retBuf.Bytes()) - }) + }, func() {}) } // Finalize writes a signed justice transaction to the retribution store. This @@ -1290,7 +1290,7 @@ func (rs *retributionStore) Finalize(chanPoint *wire.OutPoint, } return justiceBkt.Put(chanBuf.Bytes(), txBuf.Bytes()) - }) + }, func() {}) } // GetFinalizedTxn loads the finalized justice transaction for the provided @@ -1396,7 +1396,7 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error { } return justiceBkt.Delete(chanBytes) - }) + }, func() {}) } // ForAll iterates through all stored retributions and executes the passed diff --git a/channeldb/channel.go b/channeldb/channel.go index e30d6eb55..ec6654aa3 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -893,7 +893,7 @@ func (c *OpenChannel) MarkAsOpen(openLoc lnwire.ShortChannelID) error { channel.ShortChannelID = openLoc return putOpenChannel(chanBucket.(kvdb.RwBucket), channel) - }); err != nil { + }, func() {}); err != nil { return err } @@ -1219,7 +1219,7 @@ func (c *OpenChannel) putChanStatus(status ChannelStatus, } return nil - }); err != nil { + }, func() {}); err != nil { return err } @@ -1248,7 +1248,7 @@ func (c *OpenChannel) clearChanStatus(status ChannelStatus) error { channel.chanStatus = status return putOpenChannel(chanBucket, channel) - }); err != nil { + }, func() {}); err != nil { return err } @@ -1356,7 +1356,7 @@ func (c *OpenChannel) SyncPending(addr net.Addr, pendingHeight uint32) error { return kvdb.Update(c.Db, func(tx kvdb.RwTx) error { return syncNewChannel(tx, c, []net.Addr{addr}) - }) + }, func() {}) } // syncNewChannel will write the passed channel to disk, and also create a @@ -1490,7 +1490,7 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment, } return nil - }) + }, func() {}) if err != nil { return err } @@ -2030,7 +2030,7 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error { return err } return chanBucket.Put(commitDiffKey, b.Bytes()) - }) + }, func() {}) } // RemoteCommitChainTip returns the "tip" of the current remote commitment @@ -2167,7 +2167,7 @@ func (c *OpenChannel) InsertNextRevocation(revKey *btcec.PublicKey) error { } return putChanRevocationState(chanBucket, c) - }) + }, func() {}) if err != nil { return err } @@ -2327,6 +2327,8 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg, newRemoteCommit = &newCommit.Commitment return nil + }, func() { + newRemoteCommit = nil }) if err != nil { return err @@ -2393,7 +2395,7 @@ func (c *OpenChannel) AckAddHtlcs(addRefs ...AddRef) error { return kvdb.Update(c.Db, func(tx kvdb.RwTx) error { return c.Packager.AckAddHtlcs(tx, addRefs...) - }) + }, func() {}) } // AckSettleFails updates the SettleFailFilter containing any of the provided @@ -2406,7 +2408,7 @@ func (c *OpenChannel) AckSettleFails(settleFailRefs ...SettleFailRef) error { return kvdb.Update(c.Db, func(tx kvdb.RwTx) error { return c.Packager.AckSettleFails(tx, settleFailRefs...) - }) + }, func() {}) } // SetFwdFilter atomically sets the forwarding filter for the forwarding package @@ -2417,7 +2419,7 @@ func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error { return kvdb.Update(c.Db, func(tx kvdb.RwTx) error { return c.Packager.SetFwdFilter(tx, height, fwdFilter) - }) + }, func() {}) } // RemoveFwdPkgs atomically removes forwarding packages specified by the remote @@ -2438,7 +2440,7 @@ func (c *OpenChannel) RemoveFwdPkgs(heights ...uint64) error { } return nil - }) + }, func() {}) } // RevocationLogTail returns the "tail", or the end of the current revocation @@ -2799,7 +2801,7 @@ func (c *OpenChannel) CloseChannel(summary *ChannelCloseSummary, return putChannelCloseSummary( tx, chanPointBuf.Bytes(), summary, chanState, ) - }) + }, func() {}) } // ChannelSnapshot is a frozen snapshot of the current channel state. A diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 656a885bf..50ffb3bfc 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -1447,7 +1447,7 @@ func TestBalanceAtHeight(t *testing.T) { commit.RemoteBalance = remote return appendChannelLogEntry(logBucket, &commit) - }) + }, func() {}) return err } diff --git a/channeldb/db.go b/channeldb/db.go index 02e1c6970..018c01b22 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -191,11 +191,16 @@ type DB struct { // Update is a wrapper around walletdb.Update which calls into the extended // backend when available. This call is needed to be able to cast DB to -// ExtendedBackend. -func (db *DB) Update(f func(tx walletdb.ReadWriteTx) error) error { +// ExtendedBackend. The passed reset function is called before the start of the +// transaction and can be used to reset intermediate state. As callers may +// expect retries of the f closure (depending on the database backend used), the +// reset function will be called before each retry respectively. +func (db *DB) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error { if v, ok := db.Backend.(kvdb.ExtendedBackend); ok { - return v.Update(f) + return v.Update(f, reset) } + + reset() return walletdb.Update(db, f) } @@ -310,7 +315,7 @@ func (d *DB) Wipe() error { } } return nil - }) + }, func() {}) } // createChannelDB creates and initializes a fresh version of channeldb. In @@ -364,7 +369,7 @@ func initChannelDB(db kvdb.Backend) error { meta.DbVersionNumber = getLatestDBVersion(dbVersions) return putMeta(meta, tx) - }) + }, func() {}) if err != nil { return fmt.Errorf("unable to create new channeldb: %v", err) } @@ -939,7 +944,7 @@ func (d *DB) MarkChanFullyClosed(chanPoint *wire.OutPoint) error { // garbage collect it to ensure we don't establish persistent // connections to peers without open channels. return d.pruneLinkNode(tx, chanSummary.RemotePub) - }) + }, func() {}) } // pruneLinkNode determines whether we should garbage collect a link node from @@ -979,7 +984,7 @@ func (d *DB) PruneLinkNodes() error { } return nil - }) + }, func() {}) } // ChannelShell is a shell of a channel that is meant to be used for channel @@ -1026,7 +1031,7 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error { } return nil - }) + }, func() {}) if err != nil { return err } @@ -1210,7 +1215,7 @@ func (d *DB) syncVersions(versions []version) error { } return nil - }) + }, func() {}) } // ChannelGraph returns a new instance of the directed channel graph. diff --git a/channeldb/forwarding_package_test.go b/channeldb/forwarding_package_test.go index daeb46210..a85c7420f 100644 --- a/channeldb/forwarding_package_test.go +++ b/channeldb/forwarding_package_test.go @@ -209,7 +209,7 @@ func TestPackagerEmptyFwdPkg(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AddFwdPkg(tx, fwdPkg) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to add fwd pkg: %v", err) } @@ -228,7 +228,7 @@ func TestPackagerEmptyFwdPkg(t *testing.T) { // fwd filter. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to set fwdfiter: %v", err) } @@ -246,7 +246,7 @@ func TestPackagerEmptyFwdPkg(t *testing.T) { // Lastly, remove the completed forwarding package from disk. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.RemovePkg(tx, fwdPkg.Height) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to remove fwdpkg: %v", err) } @@ -281,7 +281,7 @@ func TestPackagerOnlyAdds(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AddFwdPkg(tx, fwdPkg) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to add fwd pkg: %v", err) } @@ -302,7 +302,7 @@ func TestPackagerOnlyAdds(t *testing.T) { // was failed locally. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to set fwdfiter: %v", err) } @@ -326,7 +326,7 @@ func TestPackagerOnlyAdds(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AckAddHtlcs(tx, addRef) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to ack add htlc: %v", err) } } @@ -345,7 +345,7 @@ func TestPackagerOnlyAdds(t *testing.T) { // Lastly, remove the completed forwarding package from disk. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.RemovePkg(tx, fwdPkg.Height) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to remove fwdpkg: %v", err) } @@ -383,7 +383,7 @@ func TestPackagerOnlySettleFails(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AddFwdPkg(tx, fwdPkg) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to add fwd pkg: %v", err) } @@ -404,7 +404,7 @@ func TestPackagerOnlySettleFails(t *testing.T) { // was failed locally. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to set fwdfiter: %v", err) } @@ -430,7 +430,7 @@ func TestPackagerOnlySettleFails(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AckSettleFails(tx, failSettleRef) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to ack add htlc: %v", err) } } @@ -450,7 +450,7 @@ func TestPackagerOnlySettleFails(t *testing.T) { // Lastly, remove the completed forwarding package from disk. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.RemovePkg(tx, fwdPkg.Height) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to remove fwdpkg: %v", err) } @@ -488,7 +488,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AddFwdPkg(tx, fwdPkg) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to add fwd pkg: %v", err) } @@ -509,7 +509,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) { // was failed locally. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to set fwdfiter: %v", err) } @@ -534,7 +534,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AckAddHtlcs(tx, addRef) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to ack add htlc: %v", err) } } @@ -561,7 +561,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AckSettleFails(tx, failSettleRef) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to remove settle/fail htlc: %v", err) } } @@ -581,7 +581,7 @@ func TestPackagerAddsThenSettleFails(t *testing.T) { // Lastly, remove the completed forwarding package from disk. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.RemovePkg(tx, fwdPkg.Height) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to remove fwdpkg: %v", err) } @@ -621,7 +621,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AddFwdPkg(tx, fwdPkg) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to add fwd pkg: %v", err) } @@ -642,7 +642,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) { // was failed locally. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to set fwdfiter: %v", err) } @@ -671,7 +671,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AckSettleFails(tx, failSettleRef) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to remove settle/fail htlc: %v", err) } } @@ -698,7 +698,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) { if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.AckAddHtlcs(tx, addRef) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to ack add htlc: %v", err) } } @@ -718,7 +718,7 @@ func TestPackagerSettleFailsThenAdds(t *testing.T) { // Lastly, remove the completed forwarding package from disk. if err := kvdb.Update(db, func(tx kvdb.RwTx) error { return packager.RemovePkg(tx, fwdPkg.Height) - }); err != nil { + }, func() {}); err != nil { t.Fatalf("unable to remove fwdpkg: %v", err) } diff --git a/channeldb/graph.go b/channeldb/graph.go index 101f11a28..67910852e 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -434,7 +434,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { // Finally, we commit the information of the lightning node // itself. return addLightningNode(tx, node) - }) + }, func() {}) } // AddLightningNode adds a vertex/node to the graph database. If the node is not @@ -448,7 +448,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { func (c *ChannelGraph) AddLightningNode(node *LightningNode) error { return kvdb.Update(c.db, func(tx kvdb.RwTx) error { return addLightningNode(tx, node) - }) + }, func() {}) } func addLightningNode(tx kvdb.RwTx, node *LightningNode) error { @@ -519,7 +519,7 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error { } return c.deleteLightningNode(nodes, nodePub[:]) - }) + }, func() {}) } // deleteLightningNode uses an existing database transaction to remove a @@ -579,7 +579,7 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { err := kvdb.Update(c.db, func(tx kvdb.RwTx) error { return c.addChannelEdge(tx, edge) - }) + }, func() {}) if err != nil { return err } @@ -820,7 +820,7 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error { } return putChanEdgeInfo(edgeIndex, edge, chanKey) - }) + }, func() {}) } const ( @@ -943,6 +943,8 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, // prune any nodes that have had a channel closed within the // latest block. return c.pruneGraphNodes(nodes, edgeIndex) + }, func() { + chansClosed = nil }) if err != nil { return nil, err @@ -976,7 +978,7 @@ func (c *ChannelGraph) PruneGraphNodes() error { } return c.pruneGraphNodes(nodes, edgeIndex) - }) + }, func() {}) } // pruneGraphNodes attempts to remove any nodes from the graph who have had a @@ -1195,6 +1197,8 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf } return nil + }, func() { + removedChans = nil }); err != nil { return nil, err } @@ -1297,7 +1301,7 @@ func (c *ChannelGraph) DeleteChannelEdges(chanIDs ...uint64) error { } return nil - }) + }, func() {}) if err != nil { return err } @@ -1936,6 +1940,8 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { var err error isUpdate1, err = updateEdgePolicy(tx, edge) return err + }, func() { + isUpdate1 = false }) if err != nil { return err @@ -3268,7 +3274,7 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error { var k [8]byte byteOrder.PutUint64(k[:], chanID) return zombieIndex.Delete(k[:]) - }) + }, func() {}) if err != nil { return err } diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 43d786d9a..52d1114dc 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -2894,7 +2894,7 @@ func TestEdgePolicyMissingMaxHtcl(t *testing.T) { } return edges.Put(edgeKey[:], stripped) - }) + }, func() {}) if err != nil { t.Fatalf("error writing db: %v", err) } diff --git a/channeldb/invoices.go b/channeldb/invoices.go index c5209f1e8..e6bd46a09 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -562,6 +562,8 @@ func (d *DB) AddInvoice(newInvoice *Invoice, paymentHash lntypes.Hash) ( invoiceAddIndex = newIndex return nil + }, func() { + invoiceAddIndex = 0 }) if err != nil { return 0, err @@ -950,6 +952,8 @@ func (d *DB) UpdateInvoice(ref InvoiceRef, ) return err + }, func() { + updatedInvoice = nil }) return updatedInvoice, err @@ -1866,7 +1870,7 @@ func (d *DB) DeleteInvoice(invoicesToDelete []InvoiceDeleteRef) error { } return nil - }) + }, func() {}) return err } diff --git a/channeldb/kvdb/etcd/db.go b/channeldb/kvdb/etcd/db.go index c63bbdbd5..4684113af 100644 --- a/channeldb/kvdb/etcd/db.go +++ b/channeldb/kvdb/etcd/db.go @@ -234,13 +234,15 @@ func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error { } // Update opens a database read/write transaction and executes the function f -// with the transaction passed as a parameter. After f exits, if f did not -// error, the transaction is committed. Otherwise, if f did error, the -// transaction is rolled back. If the rollback fails, the original error -// returned by f is still returned. If the commit fails, the commit error is -// returned. -func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error { +// with the transaction passed as a parameter. After f exits, if f did not +// error, the transaction is committed. Otherwise, if f did error, the +// transaction is rolled back. If the rollback fails, the original error +// returned by f is still returned. If the commit fails, the commit error is +// returned. As callers may expect retries of the f closure, the reset function +// will be called before each retry respectively. +func (db *db) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error { apply := func(stm STM) error { + reset() return f(newReadWriteTx(stm, db.config.Prefix)) } @@ -304,5 +306,5 @@ func (db *db) Close() error { // // Batch is only useful when there are multiple goroutines calling it. func (db *db) Batch(apply func(tx walletdb.ReadWriteTx) error) error { - return db.Update(apply) + return db.Update(apply, func() {}) } diff --git a/channeldb/kvdb/etcd/db_test.go b/channeldb/kvdb/etcd/db_test.go index 8a5d623e5..191c35ab0 100644 --- a/channeldb/kvdb/etcd/db_test.go +++ b/channeldb/kvdb/etcd/db_test.go @@ -28,7 +28,7 @@ func TestCopy(t *testing.T) { require.NoError(t, apple.Put([]byte("key"), []byte("val"))) return nil - }) + }, func() {}) // Expect non-zero copy. var buf bytes.Buffer @@ -66,7 +66,7 @@ func TestAbortContext(t *testing.T) { require.Error(t, err, "context canceled") return nil - }) + }, func() {}) require.Error(t, err, "context canceled") diff --git a/channeldb/kvdb/etcd/readwrite_bucket_test.go b/channeldb/kvdb/etcd/readwrite_bucket_test.go index 2795dce34..085d6e6d6 100644 --- a/channeldb/kvdb/etcd/readwrite_bucket_test.go +++ b/channeldb/kvdb/etcd/readwrite_bucket_test.go @@ -79,7 +79,7 @@ func TestBucketCreation(t *testing.T) { require.NotNil(t, apple.NestedReadWriteBucket([]byte("banana"))) require.NotNil(t, apple.NestedReadBucket([]byte("banana"))) return nil - }) + }, func() {}) require.Nil(t, err) @@ -189,7 +189,7 @@ func TestBucketDeletion(t *testing.T) { // "aple/banana" exists require.NotNil(t, apple.NestedReadWriteBucket([]byte("banana"))) return nil - }) + }, func() {}) require.Nil(t, err) @@ -261,7 +261,7 @@ func TestBucketForEach(t *testing.T) { require.Equal(t, expected, got) return nil - }) + }, func() {}) require.Nil(t, err) @@ -354,7 +354,7 @@ func TestBucketForEachWithError(t *testing.T) { require.Equal(t, expected, got) require.Error(t, err) return nil - }) + }, func() {}) require.Nil(t, err) @@ -399,7 +399,7 @@ func TestBucketSequence(t *testing.T) { } return nil - }) + }, func() {}) require.Nil(t, err) } @@ -431,7 +431,7 @@ func TestKeyClash(t *testing.T) { require.NotNil(t, banana) return nil - }) + }, func() {}) require.Nil(t, err) @@ -457,7 +457,7 @@ func TestKeyClash(t *testing.T) { require.Error(t, walletdb.ErrIncompatibleValue, b) return nil - }) + }, func() {}) require.Nil(t, err) @@ -494,7 +494,7 @@ func TestBucketCreateDelete(t *testing.T) { require.NotNil(t, banana) return nil - }) + }, func() {}) require.NoError(t, err) err = db.Update(func(tx walletdb.ReadWriteTx) error { @@ -503,7 +503,7 @@ func TestBucketCreateDelete(t *testing.T) { require.NoError(t, apple.DeleteNestedBucket([]byte("banana"))) return nil - }) + }, func() {}) require.NoError(t, err) err = db.Update(func(tx walletdb.ReadWriteTx) error { @@ -512,7 +512,7 @@ func TestBucketCreateDelete(t *testing.T) { require.NoError(t, apple.Put([]byte("banana"), []byte("value"))) return nil - }) + }, func() {}) require.NoError(t, err) expected := map[string]string{ diff --git a/channeldb/kvdb/etcd/readwrite_cursor_test.go b/channeldb/kvdb/etcd/readwrite_cursor_test.go index 6b4f317fd..09d4bf7b8 100644 --- a/channeldb/kvdb/etcd/readwrite_cursor_test.go +++ b/channeldb/kvdb/etcd/readwrite_cursor_test.go @@ -24,7 +24,7 @@ func TestReadCursorEmptyInterval(t *testing.T) { require.NotNil(t, b) return nil - }) + }, func() {}) require.NoError(t, err) err = db.View(func(tx walletdb.ReadTx) error { @@ -78,7 +78,7 @@ func TestReadCursorNonEmptyInterval(t *testing.T) { require.NoError(t, b.Put([]byte(kv.key), []byte(kv.val))) } return nil - }) + }, func() {}) require.NoError(t, err) @@ -162,7 +162,7 @@ func TestReadWriteCursor(t *testing.T) { require.NoError(t, err) } return nil - })) + }, func() {})) err = db.Update(func(tx walletdb.ReadWriteTx) error { b := tx.ReadWriteBucket([]byte("apple")) @@ -276,7 +276,7 @@ func TestReadWriteCursor(t *testing.T) { require.Equal(t, reverseKVs(expected), kvs) return nil - }) + }, func() {}) require.NoError(t, err) @@ -320,7 +320,7 @@ func TestReadWriteCursorWithBucketAndValue(t *testing.T) { require.NotNil(t, b2) return nil - })) + }, func() {})) err = db.View(func(tx walletdb.ReadTx) error { b := tx.ReadBucket([]byte("apple")) diff --git a/channeldb/kvdb/etcd/readwrite_tx_test.go b/channeldb/kvdb/etcd/readwrite_tx_test.go index bab6967f8..f005b2f53 100644 --- a/channeldb/kvdb/etcd/readwrite_tx_test.go +++ b/channeldb/kvdb/etcd/readwrite_tx_test.go @@ -142,7 +142,7 @@ func TestChangeDuringUpdate(t *testing.T) { count++ return nil - }) + }, func() {}) require.Nil(t, err) require.Equal(t, count, 2) diff --git a/channeldb/kvdb/interface.go b/channeldb/kvdb/interface.go index 7d44f56c5..9ea4ccdf7 100644 --- a/channeldb/kvdb/interface.go +++ b/channeldb/kvdb/interface.go @@ -10,11 +10,15 @@ import ( // error, the transaction is committed. Otherwise, if f did error, the // transaction is rolled back. If the rollback fails, the original error // returned by f is still returned. If the commit fails, the commit error is -// returned. -func Update(db Backend, f func(tx RwTx) error) error { +// returned. As callers may expect retries of the f closure (depending on the +// database backend used), the reset function will be called before each retry +// respectively. +func Update(db Backend, f func(tx RwTx) error, reset func()) error { if extendedDB, ok := db.(ExtendedBackend); ok { - return extendedDB.Update(f) + return extendedDB.Update(f, reset) } + + reset() return walletdb.Update(db, f) } @@ -72,13 +76,15 @@ type ExtendedBackend interface { //called before each retry respectively. View(f func(tx walletdb.ReadTx) error, reset func()) error - // Update opens a database read/write transaction and executes the function - // f with the transaction passed as a parameter. After f exits, if f did not - // error, the transaction is committed. Otherwise, if f did error, the - // transaction is rolled back. If the rollback fails, the original error - // returned by f is still returned. If the commit fails, the commit error is - // returned. - Update(f func(tx walletdb.ReadWriteTx) error) error + // Update opens a database read/write transaction and executes the + // function f with the transaction passed as a parameter. After f exits, + // if f did not error, the transaction is committed. Otherwise, if f did + // error, the transaction is rolled back. If the rollback fails, the + // original error returned by f is still returned. If the commit fails, + // the commit error is returned. As callers may expect retries of the f + // closure (depending on the database backend used), the reset function + // will be called before each retry respectively. + Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error } // Open opens an existing database for the specified type. The arguments are diff --git a/channeldb/meta.go b/channeldb/meta.go index c8ade44ba..4612c38d3 100644 --- a/channeldb/meta.go +++ b/channeldb/meta.go @@ -60,7 +60,7 @@ func fetchMeta(meta *Meta, tx kvdb.RTx) error { func (d *DB) PutMeta(meta *Meta) error { return kvdb.Update(d, func(tx kvdb.RwTx) error { return putMeta(meta, tx) - }) + }, func() {}) } // putMeta is an internal helper function used in order to allow callers to diff --git a/channeldb/meta_test.go b/channeldb/meta_test.go index 7eabfc2c9..892f7eb05 100644 --- a/channeldb/meta_test.go +++ b/channeldb/meta_test.go @@ -209,7 +209,7 @@ func TestMigrationWithPanic(t *testing.T) { } return bucket.Put(keyPrefix, beforeMigration) - }) + }, func() {}) if err != nil { t.Fatalf("unable to insert: %v", err) } @@ -251,7 +251,7 @@ func TestMigrationWithPanic(t *testing.T) { } return nil - }) + }, func() {}) if err != nil { t.Fatal(err) } @@ -283,7 +283,7 @@ func TestMigrationWithFatal(t *testing.T) { } return bucket.Put(keyPrefix, beforeMigration) - }) + }, func() {}) if err != nil { t.Fatalf("unable to insert pre migration key: %v", err) } @@ -326,7 +326,7 @@ func TestMigrationWithFatal(t *testing.T) { } return nil - }) + }, func() {}) if err != nil { t.Fatal(err) } @@ -359,7 +359,7 @@ func TestMigrationWithoutErrors(t *testing.T) { } return bucket.Put(keyPrefix, beforeMigration) - }) + }, func() {}) if err != nil { t.Fatalf("unable to update db pre migration: %v", err) } @@ -401,7 +401,7 @@ func TestMigrationWithoutErrors(t *testing.T) { } return nil - }) + }, func() {}) if err != nil { t.Fatal(err) } @@ -448,7 +448,7 @@ func TestMigrationReversion(t *testing.T) { } return putMeta(newMeta, tx) - }) + }, func() {}) // Close the database. Even if we succeeded, our next step is to reopen. cdb.Close() diff --git a/channeldb/migration_01_to_11/db.go b/channeldb/migration_01_to_11/db.go index 71128a118..be690a987 100644 --- a/channeldb/migration_01_to_11/db.go +++ b/channeldb/migration_01_to_11/db.go @@ -152,7 +152,7 @@ func createChannelDB(dbPath string) error { DbVersionNumber: 0, } return putMeta(meta, tx) - }) + }, func() {}) if err != nil { return fmt.Errorf("unable to create new channeldb") } diff --git a/channeldb/migration_01_to_11/graph.go b/channeldb/migration_01_to_11/graph.go index 0e34b4058..c7e78e746 100644 --- a/channeldb/migration_01_to_11/graph.go +++ b/channeldb/migration_01_to_11/graph.go @@ -244,7 +244,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { // Finally, we commit the information of the lightning node // itself. return addLightningNode(tx, node) - }) + }, func() {}) } func addLightningNode(tx kvdb.RwTx, node *LightningNode) error { diff --git a/channeldb/migration_01_to_11/meta_test.go b/channeldb/migration_01_to_11/meta_test.go index d3850c155..b5dfee0a1 100644 --- a/channeldb/migration_01_to_11/meta_test.go +++ b/channeldb/migration_01_to_11/meta_test.go @@ -51,7 +51,7 @@ func applyMigration(t *testing.T, beforeMigration, afterMigration func(d *DB), // Apply migration. err = kvdb.Update(cdb, func(tx kvdb.RwTx) error { return migrationFunc(tx) - }) + }, func() {}) if err != nil { log.Error(err) } diff --git a/channeldb/migration_01_to_11/migration_09_legacy_serialization.go b/channeldb/migration_01_to_11/migration_09_legacy_serialization.go index b8f86d386..fda082269 100644 --- a/channeldb/migration_01_to_11/migration_09_legacy_serialization.go +++ b/channeldb/migration_01_to_11/migration_09_legacy_serialization.go @@ -95,7 +95,7 @@ func (db *DB) addPayment(payment *outgoingPayment) error { binary.BigEndian.PutUint64(paymentIDBytes, paymentID) return payments.Put(paymentIDBytes, paymentBytes) - }) + }, func() {}) } // fetchAllPayments returns all outgoing payments in DB. diff --git a/channeldb/migration_01_to_11/migration_11_invoices_test.go b/channeldb/migration_01_to_11/migration_11_invoices_test.go index 32899d8e0..f80ea1ecf 100644 --- a/channeldb/migration_01_to_11/migration_11_invoices_test.go +++ b/channeldb/migration_01_to_11/migration_11_invoices_test.go @@ -55,7 +55,7 @@ func beforeMigrationFuncV11(t *testing.T, d *DB, invoices []Invoice) { } return nil - }) + }, func() {}) if err != nil { t.Fatal(err) } diff --git a/channeldb/migration_01_to_11/migrations_test.go b/channeldb/migration_01_to_11/migrations_test.go index 010c10a1c..3677c90b7 100644 --- a/channeldb/migration_01_to_11/migrations_test.go +++ b/channeldb/migration_01_to_11/migrations_test.go @@ -125,7 +125,7 @@ func TestPaymentStatusesMigration(t *testing.T) { } return circuits.Put(inFlightKey, inFlightCircuit) - }) + }, func() {}) if err != nil { t.Fatalf("unable to add circuit map entry: %v", err) } @@ -385,7 +385,7 @@ func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) { return err } return closedChanBucket.Put(chanID, old) - }) + }, func() {}) if err != nil { t.Fatalf("unable to add old serialization: %v", err) @@ -493,7 +493,7 @@ func TestMigrateGossipMessageStoreKeys(t *testing.T) { } return messageStore.Put(oldMsgKey[:], b.Bytes()) - }) + }, func() {}) if err != nil { t.Fatal(err) } @@ -683,7 +683,7 @@ func TestOutgoingPaymentsMigration(t *testing.T) { } return nil - }) + }, func() {}) if err != nil { t.Fatal(err) } @@ -859,6 +859,8 @@ func TestPaymentRouteSerialization(t *testing.T) { } return nil + }, func() { + oldPayments = nil }) if err != nil { t.Fatalf("unable to create test payments: %v", err) diff --git a/channeldb/migtest/migtest.go b/channeldb/migtest/migtest.go index 09edc0333..9555d093c 100644 --- a/channeldb/migtest/migtest.go +++ b/channeldb/migtest/migtest.go @@ -47,7 +47,7 @@ func ApplyMigration(t *testing.T, // beforeMigration usually used for populating the database // with test data. - err = kvdb.Update(cdb, beforeMigration) + err = kvdb.Update(cdb, beforeMigration, func() {}) if err != nil { t.Fatal(err) } @@ -65,14 +65,14 @@ func ApplyMigration(t *testing.T, // afterMigration usually used for checking the database state and // throwing the error if something went wrong. - err = kvdb.Update(cdb, afterMigration) + err = kvdb.Update(cdb, afterMigration, func() {}) if err != nil { t.Fatal(err) } }() // Apply migration. - err = kvdb.Update(cdb, migrationFunc) + err = kvdb.Update(cdb, migrationFunc, func() {}) if err != nil { t.Logf("migration error: %v", err) } diff --git a/channeldb/nodes.go b/channeldb/nodes.go index 038712644..4ae9dfbdd 100644 --- a/channeldb/nodes.go +++ b/channeldb/nodes.go @@ -108,7 +108,7 @@ func (l *LinkNode) Sync() error { } return putLinkNode(nodeMetaBucket, l) - }) + }, func() {}) } // putLinkNode serializes then writes the encoded version of the passed link @@ -132,7 +132,7 @@ func putLinkNode(nodeMetaBucket kvdb.RwBucket, l *LinkNode) error { func (db *DB) DeleteLinkNode(identity *btcec.PublicKey) error { return kvdb.Update(db, func(tx kvdb.RwTx) error { return db.deleteLinkNode(tx, identity) - }) + }, func() {}) } func (db *DB) deleteLinkNode(tx kvdb.RwTx, identity *btcec.PublicKey) error { diff --git a/channeldb/payments.go b/channeldb/payments.go index 3344451c1..1a42d7180 100644 --- a/channeldb/payments.go +++ b/channeldb/payments.go @@ -749,7 +749,7 @@ func (db *DB) DeletePayments() error { } return nil - }) + }, func() {}) } // fetchSequenceNumbers fetches all the sequence numbers associated with a diff --git a/channeldb/payments_test.go b/channeldb/payments_test.go index 0dc059561..f3de20532 100644 --- a/channeldb/payments_test.go +++ b/channeldb/payments_test.go @@ -183,7 +183,7 @@ func deletePayment(t *testing.T, db *DB, paymentHash lntypes.Hash, seqNr uint64) // Delete the index that references this payment. indexes := tx.ReadWriteBucket(paymentsIndexBucket) return indexes.Delete(key) - }) + }, func() {}) if err != nil { t.Fatalf("could not delete "+ @@ -622,7 +622,7 @@ func TestFetchPaymentWithSequenceNumber(t *testing.T) { tx, test.paymentHash, seqNrBytes[:], ) return err - }) + }, func() {}) require.Equal(t, test.expectedErr, err) }) } @@ -666,7 +666,7 @@ func appendDuplicatePayment(t *testing.T, db *DB, paymentHash lntypes.Hash, require.NoError(t, err) return nil - }) + }, func() {}) if err != nil { t.Fatalf("could not create payment: %v", err) } diff --git a/channeldb/peers.go b/channeldb/peers.go index 55920b0db..9122f6bed 100644 --- a/channeldb/peers.go +++ b/channeldb/peers.go @@ -80,7 +80,7 @@ func (d *DB) WriteFlapCounts(flapCounts map[route.Vertex]*FlapCount) error { } return nil - }) + }, func() {}) } // ReadFlapCount attempts to read the flap count for a peer, failing if the diff --git a/channeldb/reports.go b/channeldb/reports.go index 111e787ea..1338a5194 100644 --- a/channeldb/reports.go +++ b/channeldb/reports.go @@ -130,7 +130,7 @@ func (d *DB) PutResolverReport(tx kvdb.RwTx, chainHash chainhash.Hash, // If the transaction is nil, we'll create a new one. if tx == nil { - return kvdb.Update(d, putReportFunc) + return kvdb.Update(d, putReportFunc, func() {}) } // Otherwise, we can write the report to disk using the existing diff --git a/channeldb/reports_test.go b/channeldb/reports_test.go index a63fe42b0..445be0161 100644 --- a/channeldb/reports_test.go +++ b/channeldb/reports_test.go @@ -202,7 +202,7 @@ func TestFetchChannelWriteBucket(t *testing.T) { defer cleanup() // Update our db to the starting state we expect. - err = kvdb.Update(db, test.setup) + err = kvdb.Update(db, test.setup, func() {}) require.NoError(t, err) // Try to get our report bucket. @@ -211,7 +211,7 @@ func TestFetchChannelWriteBucket(t *testing.T) { tx, testChainHash, &testChanPoint1, ) return err - }) + }, func() {}) require.NoError(t, err) }) } diff --git a/channeldb/waitingproof.go b/channeldb/waitingproof.go index 43a4fd8ad..81bfd6d4b 100644 --- a/channeldb/waitingproof.go +++ b/channeldb/waitingproof.go @@ -80,7 +80,7 @@ func (s *WaitingProofStore) Add(proof *WaitingProof) error { key := proof.Key() return bucket.Put(key[:], b.Bytes()) - }) + }, func() {}) if err != nil { return err } @@ -109,7 +109,7 @@ func (s *WaitingProofStore) Remove(key WaitingProofKey) error { } return bucket.Delete(key[:]) - }) + }, func() {}) if err != nil { return err } diff --git a/contractcourt/briefcase.go b/contractcourt/briefcase.go index d1dc17dd8..6b377eeb0 100644 --- a/contractcourt/briefcase.go +++ b/contractcourt/briefcase.go @@ -924,7 +924,7 @@ func (b *boltArbitratorLog) WipeHistory() error { // Finally, we'll delete the enclosing bucket itself. return tx.DeleteTopLevelBucket(b.scopeKey[:]) - }) + }, func() {}) } // checkpointContract is a private method that will be fed into @@ -951,7 +951,7 @@ func (b *boltArbitratorLog) checkpointContract(c ContractResolver, } return nil - }) + }, func() {}) } func encodeIncomingResolution(w io.Writer, i *lnwallet.IncomingHtlcResolution) error { diff --git a/discovery/message_store_test.go b/discovery/message_store_test.go index fc7ba3360..7d0fa1c26 100644 --- a/discovery/message_store_test.go +++ b/discovery/message_store_test.go @@ -239,7 +239,7 @@ func TestMessageStoreUnsupportedMessage(t *testing.T) { err = kvdb.Update(msgStore.db, func(tx kvdb.RwTx) error { messageStore := tx.ReadWriteBucket(messageStoreBucket) return messageStore.Put(msgKey, rawMsg.Bytes()) - }) + }, func() {}) if err != nil { t.Fatalf("unable to add unsupported message to store: %v", err) } diff --git a/fundingmanager.go b/fundingmanager.go index 1bd018b19..350e5a4a6 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -3505,7 +3505,7 @@ func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint, byteOrder.PutUint64(scratch[2:], shortChanID.ToUint64()) return bucket.Put(outpointBytes.Bytes(), scratch) - }) + }, func() {}) } // getChannelOpeningState fetches the channelOpeningState for the provided @@ -3560,5 +3560,5 @@ func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) err } return bucket.Delete(outpointBytes.Bytes()) - }) + }, func() {}) } diff --git a/htlcswitch/circuit_map.go b/htlcswitch/circuit_map.go index e167584e8..547bba3a4 100644 --- a/htlcswitch/circuit_map.go +++ b/htlcswitch/circuit_map.go @@ -227,7 +227,7 @@ func (cm *circuitMap) initBuckets() error { _, err = tx.CreateTopLevelBucket(circuitAddKey) return err - }) + }, func() {}) } // restoreMemState loads the contents of the half circuit and full circuit @@ -240,8 +240,8 @@ func (cm *circuitMap) restoreMemState() error { log.Infof("Restoring in-memory circuit state from disk") var ( - opened = make(map[CircuitKey]*PaymentCircuit) - pending = make(map[CircuitKey]*PaymentCircuit) + opened map[CircuitKey]*PaymentCircuit + pending map[CircuitKey]*PaymentCircuit ) if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error { @@ -331,6 +331,9 @@ func (cm *circuitMap) restoreMemState() error { return nil + }, func() { + opened = make(map[CircuitKey]*PaymentCircuit) + pending = make(map[CircuitKey]*PaymentCircuit) }); err != nil { return err } @@ -483,7 +486,7 @@ func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID, } return nil - }) + }, func() {}) } // LookupByHTLC looks up the payment circuit by the outgoing channel and HTLC @@ -730,7 +733,7 @@ func (cm *circuitMap) OpenCircuits(keystones ...Keystone) error { } return nil - }) + }, func() {}) if err != nil { return err diff --git a/htlcswitch/decayedlog.go b/htlcswitch/decayedlog.go index 8b8a78efd..59694b076 100644 --- a/htlcswitch/decayedlog.go +++ b/htlcswitch/decayedlog.go @@ -131,7 +131,7 @@ func (d *DecayedLog) initBuckets() error { } return nil - }) + }, func() {}) } // Stop halts the garbage collector and closes boltdb. diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index d341ba124..fa14f37a8 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -305,5 +305,5 @@ func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error { } return nil - }) + }, func() {}) } diff --git a/htlcswitch/sequencer.go b/htlcswitch/sequencer.go index 5b1526b6b..ae47c1827 100644 --- a/htlcswitch/sequencer.go +++ b/htlcswitch/sequencer.go @@ -100,6 +100,8 @@ func (s *persistentSequencer) NextID() (uint64, error) { nextIDBkt.SetSequence(nextHorizonID) return nil + }, func() { + nextHorizonID = 0 }); err != nil { return 0, err } @@ -124,5 +126,5 @@ func (s *persistentSequencer) initDB() error { return kvdb.Update(s.db, func(tx kvdb.RwTx) error { _, err := tx.CreateTopLevelBucket(nextPaymentIDKey) return err - }) + }, func() {}) } diff --git a/macaroons/store.go b/macaroons/store.go index 1e4447a8e..4bf758240 100644 --- a/macaroons/store.go +++ b/macaroons/store.go @@ -62,7 +62,7 @@ func NewRootKeyStorage(db kvdb.Backend) (*RootKeyStorage, error) { err := kvdb.Update(db, func(tx kvdb.RwTx) error { _, err := tx.CreateTopLevelBucket(rootKeyBucketName) return err - }) + }, func() {}) if err != nil { return nil, err } @@ -123,7 +123,7 @@ func (r *RootKeyStorage) CreateUnlock(password *[]byte) error { r.encKey = encKey return nil - }) + }, func() {}) } // Get implements the Get method for the bakery.RootKeyStorage interface. @@ -211,6 +211,8 @@ func (r *RootKeyStorage) RootKey(ctx context.Context) ([]byte, []byte, error) { return err } return ns.Put(id, encKey) + }, func() { + rootKey = nil }) if err != nil { return nil, nil, err @@ -310,6 +312,8 @@ func (r *RootKeyStorage) DeleteMacaroonID( rootKeyIDDeleted = rootKeyID return nil + }, func() { + rootKeyIDDeleted = nil }) if err != nil { return nil, err diff --git a/nursery_store.go b/nursery_store.go index 97fe08cc3..6254021f2 100644 --- a/nursery_store.go +++ b/nursery_store.go @@ -283,7 +283,7 @@ func (ns *nurseryStore) Incubate(kids []kidOutput, babies []babyOutput) error { } return nil - }) + }, func() {}) } // CribToKinder atomically moves a babyOutput in the crib bucket to the @@ -365,7 +365,7 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error { // This informs the utxo nursery that it should attempt to spend // this output when the blockchain reaches the maturity height. return hghtChanBucketCsv.Put(pfxOutputKey, []byte{}) - }) + }, func() {}) } // PreschoolToKinder atomically moves a kidOutput from the preschool bucket to @@ -463,7 +463,7 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput, // that this CSV delayed output will be ready to broadcast at // the maturity height, after a brief period of incubation. return hghtChanBucket.Put(pfxOutputKey, []byte{}) - }) + }, func() {}) } // GraduateKinder atomically moves an output at the provided height into the @@ -525,7 +525,7 @@ func (ns *nurseryStore) GraduateKinder(height uint32, kid *kidOutput) error { // using graduate-prefixed key. return chanBucket.Put(pfxOutputKey, gradBuffer.Bytes()) - }) + }, func() {}) } // FetchClass returns a list of babyOutputs in the crib bucket whose CLTV @@ -844,7 +844,7 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { } return removeBucketIfExists(chanIndex, chanBytes) - }) + }, func() {}) } // Helper Methods diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go index 5113b0b0f..a0243996b 100644 --- a/routing/missioncontrol_store.go +++ b/routing/missioncontrol_store.go @@ -41,10 +41,7 @@ type missionControlStore struct { } func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlStore, error) { - store := &missionControlStore{ - db: db, - maxRecords: maxRecords, - } + var store *missionControlStore // Create buckets if not yet existing. err := kvdb.Update(db, func(tx kvdb.RwTx) error { @@ -64,6 +61,11 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlSto } return nil + }, func() { + store = &missionControlStore{ + db: db, + maxRecords: maxRecords, + } }) if err != nil { return nil, err @@ -81,7 +83,7 @@ func (b *missionControlStore) clear() error { _, err := tx.CreateTopLevelBucket(resultsKey) return err - }) + }, func() {}) } // fetchAll returns all results currently stored in the database. @@ -251,7 +253,7 @@ func (b *missionControlStore) AddResult(rp *paymentResult) error { // Put into results bucket. return bucket.Put(k, v) - }) + }, func() {}) } // getResultKey returns a byte slice representing a unique key for this payment diff --git a/sweep/store.go b/sweep/store.go index c78759c48..1733107ff 100644 --- a/sweep/store.go +++ b/sweep/store.go @@ -92,7 +92,7 @@ func NewSweeperStore(db kvdb.Backend, chainHash *chainhash.Hash) ( err = migrateTxHashes(tx, txHashesBucket, chainHash) return err - }) + }, func() {}) if err != nil { return nil, err } @@ -193,7 +193,7 @@ func (s *sweeperStore) NotifyPublishTx(sweepTx *wire.MsgTx) error { hash := sweepTx.TxHash() return txHashesBucket.Put(hash[:], []byte{}) - }) + }, func() {}) } // GetLastPublishedTx returns the last tx that we called NotifyPublishTx diff --git a/watchtower/wtdb/client_db.go b/watchtower/wtdb/client_db.go index dc88d45c4..fb5117cc4 100644 --- a/watchtower/wtdb/client_db.go +++ b/watchtower/wtdb/client_db.go @@ -146,7 +146,7 @@ func OpenClientDB(dbPath string) (*ClientDB, error) { // initialized. This allows us to assume their presence throughout all // operations. If an known top-level bucket is expected to exist but is // missing, this will trigger a ErrUninitializedDB error. - err = kvdb.Update(clientDB.db, initClientDBBuckets) + err = kvdb.Update(clientDB.db, initClientDBBuckets, func() {}) if err != nil { bdb.Close() return nil, err @@ -293,6 +293,8 @@ func (c *ClientDB) CreateTower(lnAddr *lnwire.NetAddress) (*Tower, error) { // Store the new or updated tower under its tower id. return putTower(towers, tower) + }, func() { + tower = nil }) if err != nil { return nil, err @@ -379,7 +381,7 @@ func (c *ClientDB) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error { } return nil - }) + }, func() {}) } // LoadTowerByID retrieves a tower by its tower ID. @@ -506,6 +508,8 @@ func (c *ClientDB) NextSessionKeyIndex(towerID TowerID) (uint32, error) { // Record the reserved session key index under this tower's id. return keyIndex.Put(towerIDBytes, indexBuf[:]) + }, func() { + index = 0 }) if err != nil { return 0, err @@ -558,7 +562,7 @@ func (c *ClientDB) CreateClientSession(session *ClientSession) error { // Finally, write the client session's body in the sessions // bucket. return putClientSessionBody(sessions, session) - }) + }, func() {}) } // ListClientSessions returns the set of all client sessions known to the db. An @@ -686,7 +690,7 @@ func (c *ClientDB) RegisterChannel(chanID lnwire.ChannelID, } return putChanSummary(chanSummaries, chanID, &summary) - }) + }, func() {}) } // MarkBackupIneligible records that the state identified by the (channel id, @@ -794,6 +798,8 @@ func (c *ClientDB) CommitUpdate(id *SessionID, return nil + }, func() { + lastApplied = 0 }) if err != nil { return 0, err @@ -899,7 +905,7 @@ func (c *ClientDB) AckUpdate(id *SessionID, seqNum uint16, // Finally, insert the ack into the sessionAcks sub-bucket. return sessionAcks.Put(seqNumBuf[:], b.Bytes()) - }) + }, func() {}) } // getClientSessionBody loads the body of a ClientSession from the sessions diff --git a/watchtower/wtdb/tower_db.go b/watchtower/wtdb/tower_db.go index a788a1003..f3ba6ef53 100644 --- a/watchtower/wtdb/tower_db.go +++ b/watchtower/wtdb/tower_db.go @@ -88,7 +88,7 @@ func OpenTowerDB(dbPath string) (*TowerDB, error) { // initialized. This allows us to assume their presence throughout all // operations. If an known top-level bucket is expected to exist but is // missing, this will trigger a ErrUninitializedDB error. - err = kvdb.Update(towerDB.db, initTowerDBBuckets) + err = kvdb.Update(towerDB.db, initTowerDBBuckets, func() {}) if err != nil { bdb.Close() return nil, err @@ -214,7 +214,7 @@ func (t *TowerDB) InsertSessionInfo(session *SessionInfo) error { // be deleted without needing to iterate over the entire // database. return touchSessionHintBkt(updateIndex, &session.ID) - }) + }, func() {}) } // InsertStateUpdate stores an update sent by the client after validating that @@ -296,6 +296,8 @@ func (t *TowerDB) InsertStateUpdate(update *SessionStateUpdate) (uint16, error) // hint under its session id. This will allow us to delete the // entries efficiently if the session is ever removed. return putHintForSession(updateIndex, &update.ID, update.Hint) + }, func() { + lastApplied = 0 }) if err != nil { return 0, err @@ -385,7 +387,7 @@ func (t *TowerDB) DeleteSession(target SessionID) error { // Finally, remove this session from the update index, which // also removes any of the indexed hints beneath it. return removeSessionHintBkt(updateIndex, &target) - }) + }, func() {}) } // QueryMatches searches against all known state updates for any that match the @@ -484,7 +486,7 @@ func (t *TowerDB) SetLookoutTip(epoch *chainntnfs.BlockEpoch) error { } return putLookoutEpoch(lookoutTip, epoch) - }) + }, func() {}) } // GetLookoutTip retrieves the current lookout tip block epoch from the tower diff --git a/watchtower/wtdb/version.go b/watchtower/wtdb/version.go index b7f8ccc22..a59d8c0e1 100644 --- a/watchtower/wtdb/version.go +++ b/watchtower/wtdb/version.go @@ -107,7 +107,7 @@ func initOrSyncVersions(db versionedDB, init bool, versions []version) error { if init { return kvdb.Update(db.bdb(), func(tx kvdb.RwTx) error { return initDBVersion(tx, getLatestDBVersion(versions)) - }) + }, func() {}) } // Otherwise, ensure that any migrations are applied to ensure the data @@ -159,5 +159,5 @@ func syncVersions(db versionedDB, versions []version) error { } return putDBVersion(tx, latestVersion) - }) + }, func() {}) }