From 3942c7ca02435f0305a95fdd029d7c822991cd4a Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 23 Jul 2021 09:35:28 +0800 Subject: [PATCH] htlcswitch: clean circuits and keystones for closed channels In this commit, a new method `cleanClosedChannels` is added and called when a circuit map is created. This method will delete the payment circuits and keystones for closed channels. --- htlcswitch/circuit_map.go | 215 ++++++++++++++++++ htlcswitch/circuit_map_test.go | 388 +++++++++++++++++++++++++++++++++ 2 files changed, 603 insertions(+) create mode 100644 htlcswitch/circuit_map_test.go diff --git a/htlcswitch/circuit_map.go b/htlcswitch/circuit_map.go index 8056120c6..951c922f0 100644 --- a/htlcswitch/circuit_map.go +++ b/htlcswitch/circuit_map.go @@ -219,6 +219,11 @@ func NewCircuitMap(cfg *CircuitMapConfig) (CircuitMap, error) { return nil, err } + // Delete old circuits and keystones of closed channels. + if err := cm.cleanClosedChannels(); err != nil { + return nil, err + } + // Load any previously persisted circuit into back into memory. if err := cm.restoreMemState(); err != nil { return nil, err @@ -250,6 +255,216 @@ func (cm *circuitMap) initBuckets() error { }, func() {}) } +// cleanClosedChannels deletes all circuits and keystones related to closed +// channels. It first reads all the closed channels and caches the ShortChanIDs +// into a map for fast lookup. Then it iterates the circuit bucket and keystone +// bucket and deletes items whose ChanID matches the ShortChanID. +// +// NOTE: this operation can also be built into restoreMemState since the latter +// already opens and iterates the two root buckets, circuitAddKey and +// circuitKeystoneKey. Depending on the size of the buckets, this marginal gain +// may be worth investigating. Atm, for clarity, this operation is wrapped into +// its own function. +func (cm *circuitMap) cleanClosedChannels() error { + log.Infof("Cleaning circuits from disk for closed channels") + + // closedChanIDSet stores the short channel IDs for closed channels. + closedChanIDSet := make(map[lnwire.ShortChannelID]struct{}) + + // circuitKeySet stores the incoming circuit keys of the payment + // circuits that need to be deleted. + circuitKeySet := make(map[CircuitKey]struct{}) + + // keystoneKeySet stores the outgoing keys of the keystones that need + // to be deleted. + keystoneKeySet := make(map[CircuitKey]struct{}) + + // isClosedChannel is a helper closure that returns a bool indicating + // the chanID belongs to a closed channel. + isClosedChannel := func(chanID lnwire.ShortChannelID) bool { + // Skip if the channel ID is zero value. This has the effect + // that a zero value incoming or outgoing key will never be + // matched and its corresponding circuits or keystones are not + // deleted. + if chanID.ToUint64() == 0 { + return false + } + + _, ok := closedChanIDSet[chanID] + return ok + } + + // Find closed channels and cache their ShortChannelIDs into a map. + // This map will be used for looking up relative circuits and keystones. + closedChannels, err := cm.cfg.DB.FetchClosedChannels(false) + if err != nil { + return err + } + + for _, closedChannel := range closedChannels { + // Skip if the channel close is pending. + if closedChannel.IsPending { + continue + } + + closedChanIDSet[closedChannel.ShortChanID] = struct{}{} + } + + log.Debugf("Found %v closed channels", len(closedChanIDSet)) + + // Exit early if there are no closed channels. + if len(closedChanIDSet) == 0 { + log.Infof("Finished cleaning: no closed channels found, " + + "no actions taken.", + ) + return nil + } + + // Find the payment circuits and keystones that need to be deleted. + if err := kvdb.View(cm.cfg.DB, func(tx kvdb.RTx) error { + circuitBkt := tx.ReadBucket(circuitAddKey) + if circuitBkt == nil { + return ErrCorruptedCircuitMap + } + keystoneBkt := tx.ReadBucket(circuitKeystoneKey) + if keystoneBkt == nil { + return ErrCorruptedCircuitMap + } + + // If a circuit's incoming/outgoing key prefix matches the + // ShortChanID, it will be deleted. However, if the ShortChanID + // of the incoming key is zero, the circuit will be kept as it + // indicates a locally initiated payment. + if err := circuitBkt.ForEach(func(_, v []byte) error { + circuit, err := cm.decodeCircuit(v) + if err != nil { + return err + } + + // Check if the incoming channel ID can be found in the + // closed channel ID map. + if !isClosedChannel(circuit.Incoming.ChanID) { + return nil + } + + circuitKeySet[circuit.Incoming] = struct{}{} + + return nil + }); err != nil { + return err + } + + // If a keystone's InKey or OutKey matches the short channel id + // in the closed channel ID map, it will be deleted. + err := keystoneBkt.ForEach(func(k, v []byte) error { + var ( + inKey CircuitKey + outKey CircuitKey + ) + + // Decode the incoming and outgoing circuit keys. + if err := inKey.SetBytes(v); err != nil { + return err + } + if err := outKey.SetBytes(k); err != nil { + return err + } + + // Check if the incoming channel ID can be found in the + // closed channel ID map. + if isClosedChannel(inKey.ChanID) { + // If the incoming channel is closed, we can + // skip checking on outgoing channel ID because + // this keystone will be deleted. + keystoneKeySet[outKey] = struct{}{} + + // Technically the incoming keys found in + // keystone bucket should be a subset of + // circuit bucket. So a previous loop should + // have this inKey put inside circuitAddKey map + // already. We do this again to be sure the + // circuits are properly cleaned. Even this + // inKey doesn't exist in circuit bucket, we + // are fine as db deletion is a noop. + circuitKeySet[inKey] = struct{}{} + return nil + } + + // Check if the outgoing channel ID can be found in the + // closed channel ID map. Notice that we need to store + // the outgoing key because it's used for db query. + if isClosedChannel(outKey.ChanID) { + keystoneKeySet[outKey] = struct{}{} + + // Also update circuitKeySet to mark the + // payment circuit needs to be deleted. + circuitKeySet[inKey] = struct{}{} + } + + return nil + }) + return err + + }, func() { + // Reset the sets. + circuitKeySet = make(map[CircuitKey]struct{}) + keystoneKeySet = make(map[CircuitKey]struct{}) + }); err != nil { + return err + } + + log.Debugf("To be deleted: num_circuits=%v, num_keystones=%v", + len(circuitKeySet), len(keystoneKeySet), + ) + + numCircuitsDeleted := 0 + numKeystonesDeleted := 0 + + // Delete all the circuits and keystones for closed channels. + if err := kvdb.Update(cm.cfg.DB, func(tx kvdb.RwTx) error { + circuitBkt := tx.ReadWriteBucket(circuitAddKey) + if circuitBkt == nil { + return ErrCorruptedCircuitMap + } + keystoneBkt := tx.ReadWriteBucket(circuitKeystoneKey) + if keystoneBkt == nil { + return ErrCorruptedCircuitMap + } + + // Delete the ciruit. + for inKey := range circuitKeySet { + if err := circuitBkt.Delete(inKey.Bytes()); err != nil { + return err + } + + numCircuitsDeleted++ + } + + // Delete the keystone using the outgoing key. + for outKey := range keystoneKeySet { + err := keystoneBkt.Delete(outKey.Bytes()) + if err != nil { + return err + } + + numKeystonesDeleted++ + } + + return nil + }, func() {}); err != nil { + numCircuitsDeleted = 0 + numKeystonesDeleted = 0 + return err + } + + log.Infof("Finished cleaning: num_closed_channel=%v, "+ + "num_circuits=%v, num_keystone=%v", + len(closedChannels), numCircuitsDeleted, numKeystonesDeleted, + ) + + return nil +} + // restoreMemState loads the contents of the half circuit and full circuit // buckets from disk and reconstructs the in-memory representation of the // circuit map. Afterwards, the state of the hash index is reconstructed using diff --git a/htlcswitch/circuit_map_test.go b/htlcswitch/circuit_map_test.go new file mode 100644 index 000000000..b2faa61ca --- /dev/null +++ b/htlcswitch/circuit_map_test.go @@ -0,0 +1,388 @@ +package htlcswitch_test + +import ( + "bytes" + "fmt" + "io" + "testing" + + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" +) + +var ( + // closedChannelBucket stores summarization information concerning + // previously open, but now closed channels. + closedChannelBucket = []byte("closed-chan-bucket") +) + +// TestCircuitMapCleanClosedChannels checks that the circuits and keystones are +// deleted for closed channels upon restart. +func TestCircuitMapCleanClosedChannels(t *testing.T) { + t.Parallel() + + var ( + // chanID0 is a zero value channel ID indicating a locally + // initiated payment. + chanID0 = lnwire.NewShortChanIDFromInt(uint64(0)) + chanID1 = lnwire.NewShortChanIDFromInt(uint64(1)) + chanID2 = lnwire.NewShortChanIDFromInt(uint64(2)) + + inKey00 = htlcswitch.CircuitKey{ChanID: chanID0, HtlcID: 0} + inKey10 = htlcswitch.CircuitKey{ChanID: chanID1, HtlcID: 0} + inKey11 = htlcswitch.CircuitKey{ChanID: chanID1, HtlcID: 1} + inKey20 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 0} + inKey21 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 1} + inKey22 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 2} + outKey00 = htlcswitch.CircuitKey{ChanID: chanID0, HtlcID: 0} + outKey10 = htlcswitch.CircuitKey{ChanID: chanID1, HtlcID: 0} + outKey11 = htlcswitch.CircuitKey{ChanID: chanID1, HtlcID: 1} + outKey20 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 0} + outKey21 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 1} + outKey22 = htlcswitch.CircuitKey{ChanID: chanID2, HtlcID: 2} + ) + + type closeChannelParams struct { + chanID lnwire.ShortChannelID + isPending bool + } + + testParams := []struct { + name string + + // keystones is used to create and open circuits. A keystone is + // a pair of circuit keys, inKey and outKey, with the outKey + // optionally being empty. If a keystone with an outKey is used, + // a circuit will be created and opened, thus creating a circuit + // and a keystone in the DB. Otherwise, only the circuit is + // created. + keystones []htlcswitch.Keystone + + chanParams []closeChannelParams + deleted []htlcswitch.Keystone + untouched []htlcswitch.Keystone + }{ + { + name: "no deletion if there are no closed channels", + keystones: []htlcswitch.Keystone{ + // Creates a circuit and a keystone + {InKey: inKey10, OutKey: outKey10}, + }, + untouched: []htlcswitch.Keystone{ + {InKey: inKey10, OutKey: outKey10}, + }, + }, + { + name: "no deletion if channel is pending close", + chanParams: []closeChannelParams{ + // Creates a pending close channel. + {chanID: chanID1, isPending: true}, + }, + keystones: []htlcswitch.Keystone{ + // Creates a circuit and a keystone + {InKey: inKey10, OutKey: outKey10}, + }, + untouched: []htlcswitch.Keystone{ + {InKey: inKey10, OutKey: outKey10}, + }, + }, + { + name: "no deletion if the chanID is zero value", + chanParams: []closeChannelParams{ + // Creates a close channel with chanID0. + {chanID: chanID0, isPending: false}, + }, + keystones: []htlcswitch.Keystone{ + // Creates a circuit and a keystone + {InKey: inKey00, OutKey: outKey00}, + }, + untouched: []htlcswitch.Keystone{ + {InKey: inKey00, OutKey: outKey00}, + }, + }, + { + name: "delete half circuits on inKey match", + chanParams: []closeChannelParams{ + // Creates a close channel with chanID1. + {chanID: chanID1, isPending: false}, + }, + keystones: []htlcswitch.Keystone{ + // Creates a circuit, no keystone created + {InKey: inKey10}, + // Creates a circuit, no keystone created + {InKey: inKey11}, + // Creates a circuit and a keystone + {InKey: inKey20, OutKey: outKey20}, + }, + deleted: []htlcswitch.Keystone{ + {InKey: inKey00}, {InKey: inKey11}, + }, + untouched: []htlcswitch.Keystone{ + {InKey: inKey20, OutKey: outKey20}, + }, + }, + { + name: "delete half circuits on outKey match", + chanParams: []closeChannelParams{ + // Creates a close channel with chanID1. + {chanID: chanID1, isPending: false}, + }, + keystones: []htlcswitch.Keystone{ + // Creates a circuit and a keystone + {InKey: inKey20, OutKey: outKey10}, + // Creates a circuit and a keystone + {InKey: inKey21, OutKey: outKey11}, + // Creates a circuit and a keystone + {InKey: inKey22, OutKey: outKey21}, + }, + deleted: []htlcswitch.Keystone{ + {InKey: inKey20, OutKey: outKey10}, + {InKey: inKey21, OutKey: outKey11}, + }, + untouched: []htlcswitch.Keystone{ + {InKey: inKey22, OutKey: outKey21}, + }, + }, + { + name: "delete full circuits on inKey match", + chanParams: []closeChannelParams{ + // Creates a close channel with chanID1. + {chanID: chanID1, isPending: false}, + }, + keystones: []htlcswitch.Keystone{ + // Creates a circuit and a keystone + {InKey: inKey10, OutKey: outKey20}, + // Creates a circuit and a keystone + {InKey: inKey11, OutKey: outKey21}, + // Creates a circuit and a keystone + {InKey: inKey20, OutKey: outKey22}, + }, + deleted: []htlcswitch.Keystone{ + {InKey: inKey10, OutKey: outKey20}, + {InKey: inKey11, OutKey: outKey21}, + }, + untouched: []htlcswitch.Keystone{ + {InKey: inKey20, OutKey: outKey22}, + }, + }, + { + name: "delete full circuits on outKey match", + chanParams: []closeChannelParams{ + // Creates a close channel with chanID1. + {chanID: chanID1, isPending: false}, + }, + keystones: []htlcswitch.Keystone{ + // Creates a circuit and a keystone + {InKey: inKey20, OutKey: outKey10}, + // Creates a circuit and a keystone + {InKey: inKey21, OutKey: outKey11}, + // Creates a circuit and a keystone + {InKey: inKey22, OutKey: outKey20}, + }, + deleted: []htlcswitch.Keystone{ + {InKey: inKey20, OutKey: outKey10}, + {InKey: inKey21, OutKey: outKey11}, + }, + untouched: []htlcswitch.Keystone{ + {InKey: inKey22, OutKey: outKey20}, + }, + }, + { + name: "delete all circuits", + chanParams: []closeChannelParams{ + // Creates a close channel with chanID1. + {chanID: chanID1, isPending: false}, + // Creates a close channel with chanID2. + {chanID: chanID2, isPending: false}, + }, + keystones: []htlcswitch.Keystone{ + // Creates a circuit and a keystone + {InKey: inKey20, OutKey: outKey10}, + // Creates a circuit and a keystone + {InKey: inKey21, OutKey: outKey11}, + // Creates a circuit and a keystone + {InKey: inKey22, OutKey: outKey20}, + }, + deleted: []htlcswitch.Keystone{ + {InKey: inKey20, OutKey: outKey10}, + {InKey: inKey21, OutKey: outKey11}, + {InKey: inKey22, OutKey: outKey20}, + }, + }, + } + + for _, tt := range testParams { + test := tt + + t.Run(test.name, func(t *testing.T) { + cfg, circuitMap := newCircuitMap(t) + + // create test circuits + for _, ks := range test.keystones { + err := createTestCircuit(ks, circuitMap) + require.NoError( + t, err, + "failed to create test circuit", + ) + } + + // create close channels + err := kvdb.Update(cfg.DB, func(tx kvdb.RwTx) error { + for _, channel := range test.chanParams { + if err := createTestCloseChannelSummery( + tx, channel.isPending, + channel.chanID, + ); err != nil { + return err + } + } + return nil + }, func() {}) + + require.NoError( + t, err, + "failed to create close channel summery", + ) + + // Now, restart the circuit map, and check that the + // circuits and keystones of closed channels are + // deleted in DB. + _, circuitMap = restartCircuitMap(t, cfg) + + // Check that items are deleted. LookupCircuit and + // LookupOpenCircuit will check the cached circuits, + // which are loaded on restart from the DB. + for _, ks := range test.deleted { + assertKeystoneDeleted(t, circuitMap, ks) + } + + // We also check we are not deleting wanted circuits. + for _, ks := range test.untouched { + assertKeystoneNotDeleted(t, circuitMap, ks) + } + + }) + } + +} + +// createTestCircuit creates a circuit for testing with its incoming key being +// the keystone's InKey. If the keystone has an OutKey, the circuit will be +// opened, which causes a Keystone to be created in DB. +func createTestCircuit(ks htlcswitch.Keystone, cm htlcswitch.CircuitMap) error { + circuit := &htlcswitch.PaymentCircuit{ + Incoming: ks.InKey, + ErrorEncrypter: testExtracter, + } + + // First we will try to add an new circuit to the circuit map, this + // should succeed. + _, err := cm.CommitCircuits(circuit) + if err != nil { + return fmt.Errorf("failed to commit circuits: %v", err) + } + + // If the keystone has no outgoing key, we won't open it. + if ks.OutKey == htlcswitch.EmptyCircuitKey { + return nil + } + + // Open the circuit, implicitly creates a keystone on disk. + err = cm.OpenCircuits(ks) + if err != nil { + return fmt.Errorf("failed to open circuits: %v", err) + } + + return nil +} + +// assertKeystoneDeleted checks that a given keystone is deleted from the +// circuit map. +func assertKeystoneDeleted(t *testing.T, + cm htlcswitch.CircuitLookup, ks htlcswitch.Keystone) { + + c := cm.LookupCircuit(ks.InKey) + require.Nil(t, c, "no circuit should be found using InKey") + + if ks.OutKey != htlcswitch.EmptyCircuitKey { + c = cm.LookupOpenCircuit(ks.OutKey) + require.Nil(t, c, "no circuit should be found using OutKey") + } +} + +// assertKeystoneDeleted checks that a given keystone is not deleted from the +// circuit map. +func assertKeystoneNotDeleted(t *testing.T, + cm htlcswitch.CircuitLookup, ks htlcswitch.Keystone) { + + c := cm.LookupCircuit(ks.InKey) + require.NotNil(t, c, "expecting circuit found using InKey") + + if ks.OutKey != htlcswitch.EmptyCircuitKey { + c = cm.LookupOpenCircuit(ks.OutKey) + require.NotNil(t, c, "expecting circuit found using OutKey") + } +} + +// createTestCloseChannelSummery creates a CloseChannelSummery for testing. +func createTestCloseChannelSummery(tx kvdb.RwTx, isPending bool, + chanID lnwire.ShortChannelID) error { + + closedChanBucket, err := tx.CreateTopLevelBucket(closedChannelBucket) + if err != nil { + return err + } + outputPoint := wire.OutPoint{Hash: hash1, Index: 1} + + ccs := &channeldb.ChannelCloseSummary{ + ChanPoint: outputPoint, + ShortChanID: chanID, + ChainHash: hash1, + ClosingTXID: hash2, + CloseHeight: 100, + RemotePub: testEphemeralKey, + Capacity: btcutil.Amount(10000), + SettledBalance: btcutil.Amount(50000), + CloseType: channeldb.RemoteForceClose, + IsPending: isPending, + } + var b bytes.Buffer + if err := serializeChannelCloseSummary(&b, ccs); err != nil { + return err + } + + var chanPointBuf bytes.Buffer + if err := lnwire.WriteOutPoint(&chanPointBuf, outputPoint); err != nil { + return err + } + + return closedChanBucket.Put(chanPointBuf.Bytes(), b.Bytes()) +} + +func serializeChannelCloseSummary( + w io.Writer, + cs *channeldb.ChannelCloseSummary) error { + + err := channeldb.WriteElements( + w, + cs.ChanPoint, cs.ShortChanID, cs.ChainHash, cs.ClosingTXID, + cs.CloseHeight, cs.RemotePub, cs.Capacity, cs.SettledBalance, + cs.TimeLockedBalance, cs.CloseType, cs.IsPending, + ) + if err != nil { + return err + } + + // If this is a close channel summary created before the addition of + // the new fields, then we can exit here. + if cs.RemoteCurrentRevocation == nil { + return channeldb.WriteElements(w, false) + } + + return nil +}