From 4cc60493d233b4dcb4bc1ffab85a56f135d061e5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 10 May 2018 17:40:29 -0400 Subject: [PATCH 1/2] peer+htlcswitch: randomize link commitment fee updates In this commit, we modify the behavior of links updating their commitment fees. Rather than attempting to update the commitment fee for each link every time a new block comes in, we'll use a timer with a random interval between 10 and 60 minutes for each link to determine when to update their corresponding commitment fee. This prevents us from oscillating the fee rate for our various commitment transactions. --- htlcswitch/link.go | 54 +++++++++++++++++++++++------- htlcswitch/link_test.go | 72 +++++++++++----------------------------- htlcswitch/test_utils.go | 58 +++++++++++++++++++++----------- peer.go | 20 ++++++----- 4 files changed, 111 insertions(+), 93 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index c674a9524..668214444 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -3,6 +3,7 @@ package htlcswitch import ( "bytes" "fmt" + prand "math/rand" "sync" "sync/atomic" "time" @@ -21,6 +22,10 @@ import ( "github.com/roasbeef/btcd/chaincfg/chainhash" ) +func init() { + prand.Seed(time.Now().UnixNano()) +} + const ( // expiryGraceDelta is a grace period that the timeout of incoming // HTLC's that pay directly to us (i.e we're the "exit node") must up @@ -36,6 +41,12 @@ const ( // for a new fee update. We'll use this as a fee floor when proposing // and accepting updates. minCommitFeePerKw = 253 + + // DefaultMinLinkFeeUpdateTimeout and DefaultMaxLinkFeeUpdateTimeout + // represent the default timeout bounds in which a link should propose + // to update its commitment fee rate. + DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute + DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute ) // ForwardingPolicy describes the set of constraints that a given ChannelLink @@ -248,6 +259,12 @@ type ChannelLinkConfig struct { // in testing, it is here to ensure the sphinx replay detection on the // receiving node is persistent. UnsafeReplay bool + + // MinFeeUpdateTimeout and MaxFeeUpdateTimeout represent the timeout + // interval bounds in which a link will propose to update its commitment + // fee rate. A random timeout will be selected between these values. + MinFeeUpdateTimeout time.Duration + MaxFeeUpdateTimeout time.Duration } // channelLink is the service which drives a channel's commitment update @@ -342,6 +359,10 @@ type channelLink struct { logCommitTimer *time.Timer logCommitTick <-chan time.Time + // updateFeeTimer is the timer responsible for updating the link's + // commitment fee every time it fires. + updateFeeTimer *time.Timer + sync.RWMutex wg sync.WaitGroup @@ -427,6 +448,8 @@ func (l *channelLink) Start() error { } } + l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout()) + l.wg.Add(1) go l.htlcManager() @@ -449,8 +472,8 @@ func (l *channelLink) Stop() { l.cfg.ChainEvents.Cancel() } + l.updateFeeTimer.Stop() l.channel.Stop() - l.overflowQueue.Stop() close(l.quit) @@ -835,7 +858,6 @@ func (l *channelLink) htlcManager() { out: for { - // We must always check if we failed at some point processing // the last update before processing the next. if l.failed { @@ -844,16 +866,10 @@ out: } select { - - // A new block has arrived, we'll check the network fee to see - // if we should adjust our commitment fee, and also update our - // track of the best current height. - case blockEpoch, ok := <-l.cfg.BlockEpochs.Epochs: - if !ok { - break out - } - - l.bestHeight = uint32(blockEpoch.Height) + // Our update fee timer has fired, so we'll check the network + // fee to see if we should adjust our commitment fee. + case <-l.updateFeeTimer.C: + l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout()) // If we're not the initiator of the channel, don't we // don't control the fees, so we can ignore this. @@ -983,6 +999,20 @@ out: } } +// randomFeeUpdateTimeout returns a random timeout between the bounds defined +// within the link's configuration that will be used to determine when the link +// should propose an update to its commitment fee rate. +func (l *channelLink) randomFeeUpdateTimeout() time.Duration { + lower := int64(l.cfg.MinFeeUpdateTimeout) + upper := int64(l.cfg.MaxFeeUpdateTimeout) + rand := prand.Int63n(upper) + if rand < lower { + rand = lower + } + + return time.Duration(rand) +} + // handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC // Switch. Possible messages sent by the switch include requests to forward new // HTLCs, timeout previously cleared HTLCs, and finally to settle currently diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index ea4ba96ba..f145e33b4 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1498,9 +1498,11 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( BlockEpochs: globalEpoch, BatchTicker: ticker, FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)), - // Make the BatchSize large enough to not - // trigger commit update automatically during tests. - BatchSize: 10000, + // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough + // to not trigger commit updates automatically during tests. + BatchSize: 10000, + MinFeeUpdateTimeout: 30 * time.Minute, + MaxFeeUpdateTimeout: 30 * time.Minute, } const startingHeight = 100 @@ -3451,22 +3453,9 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { defer n.stop() defer n.feeEstimator.Stop() - // First, we'll start off all channels at "height" 9000 by sending a - // new epoch to all the clients. - select { - case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{ - Height: 9000, - }: - case <-time.After(time.Second * 5): - t.Fatalf("link didn't read block epoch") - } - select { - case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{ - Height: 9000, - }: - case <-time.After(time.Second * 5): - t.Fatalf("link didn't read block epoch") - } + // For the sake of this test, we'll reset the timer to fire in a second + // so that Alice's link queries for a new network fee. + n.aliceChannelLink.updateFeeTimer.Reset(time.Millisecond) startingFeeRate := channels.aliceToBob.CommitFeeRate() @@ -3480,20 +3469,15 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { select { case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte: case <-time.After(time.Second * 5): - t.Fatalf("alice didn't query for the new " + - "network fee") + t.Fatalf("alice didn't query for the new network fee") } - time.Sleep(time.Millisecond * 500) + time.Sleep(time.Second) // The fee rate on the alice <-> bob channel should still be the same // on both sides. aliceFeeRate := channels.aliceToBob.CommitFeeRate() bobFeeRate := channels.bobToAlice.CommitFeeRate() - if aliceFeeRate != bobFeeRate { - t.Fatalf("fee rates don't match: expected %v got %v", - aliceFeeRate, bobFeeRate) - } if aliceFeeRate != startingFeeRate { t.Fatalf("alice's fee rate shouldn't have changed: "+ "expected %v, got %v", aliceFeeRate, startingFeeRate) @@ -3503,22 +3487,9 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { "expected %v, got %v", bobFeeRate, startingFeeRate) } - // Now we'll send a new block update to all end points, with a new - // height THAT'S OVER 9000!!! - select { - case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{ - Height: 9001, - }: - case <-time.After(time.Second * 5): - t.Fatalf("link didn't read block epoch") - } - select { - case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{ - Height: 9001, - }: - case <-time.After(time.Second * 5): - t.Fatalf("link didn't read block epoch") - } + // We'll reset the timer once again to ensure Alice's link queries for a + // new network fee. + n.aliceChannelLink.updateFeeTimer.Reset(time.Millisecond) // Next, we'll set up a deliver a fee rate that's triple the current // fee rate. This should cause the Alice (the initiator) to trigger a @@ -3527,11 +3498,10 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { select { case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte * 3: case <-time.After(time.Second * 5): - t.Fatalf("alice didn't query for the new " + - "network fee") + t.Fatalf("alice didn't query for the new network fee") } - time.Sleep(time.Second * 2) + time.Sleep(time.Second) // At this point, Alice should've triggered a new fee update that // increased the fee rate to match the new rate. @@ -3545,10 +3515,6 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { t.Fatalf("bob's fee rate didn't change: expected %v, got %v", newFeeRate, aliceFeeRate) } - if aliceFeeRate != bobFeeRate { - t.Fatalf("fee rates don't match: expected %v got %v", - aliceFeeRate, bobFeeRate) - } } // TestChannelLinkAcceptDuplicatePayment tests that if a link receives an @@ -3917,9 +3883,11 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, BlockEpochs: globalEpoch, BatchTicker: ticker, FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)), - // Make the BatchSize large enough to not - // trigger commit update automatically during tests. - BatchSize: 10000, + // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough + // to not trigger commit updates automatically during tests. + BatchSize: 10000, + MinFeeUpdateTimeout: 30 * time.Minute, + MaxFeeUpdateTimeout: 30 * time.Minute, // Set any hodl flags requested for the new link. HodlMask: hodl.MaskFromFlags(hodlFlags...), DebugHTLC: len(hodlFlags) > 0, diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index da48a4126..06760bd13 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -882,6 +882,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, quit: make(chan struct{}), } + const ( + batchTimeout = 50 * time.Millisecond + fwdPkgTimeout = 5 * time.Second + feeUpdateTimeout = 30 * time.Minute + ) + pCache := &mockPreimageCache{ // hash -> preimage preimageMap: make(map[[32]byte][]byte), @@ -921,11 +927,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - ChainEvents: &contractcourt.ChainEventSubscription{}, - SyncStates: true, - BatchTicker: &mockTicker{aliceTicker.C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, - BatchSize: 10, + ChainEvents: &contractcourt.ChainEventSubscription{}, + SyncStates: true, + BatchSize: 10, + BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, + FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + MinFeeUpdateTimeout: feeUpdateTimeout, + MaxFeeUpdateTimeout: feeUpdateTimeout, + OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, aliceChannel, startingHeight, @@ -970,11 +979,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - ChainEvents: &contractcourt.ChainEventSubscription{}, - SyncStates: true, - BatchTicker: &mockTicker{firstBobTicker.C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, - BatchSize: 10, + ChainEvents: &contractcourt.ChainEventSubscription{}, + SyncStates: true, + BatchSize: 10, + BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, + FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + MinFeeUpdateTimeout: feeUpdateTimeout, + MaxFeeUpdateTimeout: feeUpdateTimeout, + OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, firstBobChannel, startingHeight, @@ -1019,11 +1031,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - ChainEvents: &contractcourt.ChainEventSubscription{}, - SyncStates: true, - BatchTicker: &mockTicker{secondBobTicker.C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, - BatchSize: 10, + ChainEvents: &contractcourt.ChainEventSubscription{}, + SyncStates: true, + BatchSize: 10, + BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, + FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + MinFeeUpdateTimeout: feeUpdateTimeout, + MaxFeeUpdateTimeout: feeUpdateTimeout, + OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, secondBobChannel, startingHeight, @@ -1068,11 +1083,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - ChainEvents: &contractcourt.ChainEventSubscription{}, - SyncStates: true, - BatchTicker: &mockTicker{carolTicker.C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, - BatchSize: 10, + ChainEvents: &contractcourt.ChainEventSubscription{}, + SyncStates: true, + BatchSize: 10, + BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, + FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + MinFeeUpdateTimeout: feeUpdateTimeout, + MaxFeeUpdateTimeout: feeUpdateTimeout, + OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, carolChannel, startingHeight, diff --git a/peer.go b/peer.go index f21ece2e5..26537cd29 100644 --- a/peer.go +++ b/peer.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "container/list" "fmt" "net" @@ -9,14 +10,11 @@ import ( "time" "github.com/davecgh/go-spew/spew" - "github.com/lightningnetwork/lnd/brontide" - "github.com/lightningnetwork/lnd/contractcourt" - - "bytes" - "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" @@ -551,11 +549,15 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, time.NewTicker(50 * time.Millisecond)), FwdPkgGCTicker: htlcswitch.NewBatchTicker( time.NewTicker(time.Minute)), - BatchSize: 10, - UnsafeReplay: cfg.UnsafeReplay, + BatchSize: 10, + UnsafeReplay: cfg.UnsafeReplay, + MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout, + MaxFeeUpdateTimeout: htlcswitch.DefaultMaxLinkFeeUpdateTimeout, } - link := htlcswitch.NewChannelLink(linkCfg, lnChan, - uint32(currentHeight)) + + link := htlcswitch.NewChannelLink( + linkCfg, lnChan, uint32(currentHeight), + ) // With the channel link created, we'll now notify the htlc switch so // this channel can be used to dispatch local payments and also From 8198466972d27f1667a3ae0b389698634785324e Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 31 May 2018 20:31:40 -0700 Subject: [PATCH 2/2] multi: move block epochs dependency from links to switch In this commit, we move the block height dependency from the links in the switch to the switch itself. This is possible due to a recent change on the links no longer depending on the block height to update their commitment fees. We'll now only have the switch be alerted of new blocks coming in and links will retrieve the height from it atomically. --- htlcswitch/link.go | 23 ++--------- htlcswitch/link_test.go | 26 +++---------- htlcswitch/mock.go | 23 ++++++----- htlcswitch/switch.go | 51 +++++++++++++++++++++---- htlcswitch/switch_test.go | 80 +++++++++++++++++++-------------------- htlcswitch/test_utils.go | 74 ++++-------------------------------- peer.go | 5 +-- server.go | 8 +++- test_utils.go | 9 ++++- 9 files changed, 128 insertions(+), 171 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 668214444..03ee9d5fe 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -2,17 +2,15 @@ package htlcswitch import ( "bytes" + "crypto/sha256" "fmt" prand "math/rand" "sync" "sync/atomic" "time" - "crypto/sha256" - "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch/hodl" @@ -213,13 +211,6 @@ type ChannelLinkConfig struct { // transaction to ensure timely confirmation. FeeEstimator lnwallet.FeeEstimator - // BlockEpochs is an active block epoch event stream backed by an - // active ChainNotifier instance. The ChannelLink will use new block - // notifications sent over this channel to decide when a _new_ HTLC is - // too close to expiry, and also when any active HTLC's have expired - // (or are close to expiry). - BlockEpochs *chainntnfs.BlockEpochEvent - // DebugHTLC should be turned on if you want all HTLCs sent to a node // with the debug htlc R-Hash are immediately settled in the next // available state transition. @@ -290,10 +281,6 @@ type channelLink struct { // method in state machine. batchCounter uint32 - // bestHeight is the best known height of the main chain. The link will - // use this information to govern decisions based on HTLC timeouts. - bestHeight uint32 - // keystoneBatch represents a volatile list of keystones that must be // written before attempting to sign the next commitment txn. These // represent all the HTLC's forwarded to the link from the switch. Once @@ -371,8 +358,8 @@ type channelLink struct { // NewChannelLink creates a new instance of a ChannelLink given a configuration // and active channel that will be used to verify/apply updates to. -func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, - currentHeight uint32) ChannelLink { +func NewChannelLink(cfg ChannelLinkConfig, + channel *lnwallet.LightningChannel) ChannelLink { return &channelLink{ cfg: cfg, @@ -381,7 +368,6 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, // TODO(roasbeef): just do reserve here? logCommitTimer: time.NewTimer(300 * time.Millisecond), overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2), - bestHeight: currentHeight, htlcUpdates: make(chan []channeldb.HTLC), quit: make(chan struct{}), } @@ -804,7 +790,6 @@ func (l *channelLink) fwdPkgGarbager() { func (l *channelLink) htlcManager() { defer func() { l.wg.Done() - l.cfg.BlockEpochs.Cancel() log.Infof("ChannelLink(%v) has exited", l) }() @@ -2095,7 +2080,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, continue } - heightNow := l.bestHeight + heightNow := l.cfg.Switch.BestHeight() fwdInfo := chanIterator.ForwardingInstructions() switch fwdInfo.NextHop { diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index f145e33b4..d76090125 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "fmt" "io" + "math" "reflect" "runtime" "strings" @@ -13,12 +14,9 @@ import ( "testing" "time" - "math" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch/hodl" @@ -1057,7 +1055,7 @@ func TestChannelLinkMultiHopUnknownNextHop(t *testing.T) { htlcAmt, totalTimelock, hops := generateHops(amount, testStartingHeight, n.firstBobChannelLink, n.carolChannelLink) - daveServer, err := newMockServer(t, "dave", nil) + daveServer, err := newMockServer(t, "dave", testStartingHeight, nil) if err != nil { t.Fatalf("unable to init dave's server: %v", err) } @@ -1443,11 +1441,6 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( } var ( - globalEpoch = &chainntnfs.BlockEpochEvent{ - Epochs: make(chan *chainntnfs.BlockEpoch), - Cancel: func() { - }, - } invoiceRegistry = newMockRegistry() decoder = newMockIteratorDecoder() obfuscator = NewMockObfuscator() @@ -1468,7 +1461,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( } aliceDb := aliceChannel.State().Db - aliceSwitch, err := initSwitchWithDB(aliceDb) + aliceSwitch, err := initSwitchWithDB(testStartingHeight, aliceDb) if err != nil { return nil, nil, nil, nil, nil, nil, err } @@ -1495,7 +1488,6 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( }, Registry: invoiceRegistry, ChainEvents: &contractcourt.ChainEventSubscription{}, - BlockEpochs: globalEpoch, BatchTicker: ticker, FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough @@ -1506,7 +1498,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( } const startingHeight = 100 - aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight) + aliceLink := NewChannelLink(aliceCfg, aliceChannel) start := func() error { return aliceSwitch.AddLink(aliceLink) } @@ -3825,11 +3817,6 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, hodlFlags []hodl.Flag) (ChannelLink, chan time.Time, func(), error) { var ( - globalEpoch = &chainntnfs.BlockEpochEvent{ - Epochs: make(chan *chainntnfs.BlockEpoch), - Cancel: func() { - }, - } invoiceRegistry = newMockRegistry() decoder = newMockIteratorDecoder() obfuscator = NewMockObfuscator() @@ -3854,7 +3841,7 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, if aliceSwitch == nil { var err error - aliceSwitch, err = initSwitchWithDB(aliceDb) + aliceSwitch, err = initSwitchWithDB(testStartingHeight, aliceDb) if err != nil { return nil, nil, nil, err } @@ -3880,7 +3867,6 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, }, Registry: invoiceRegistry, ChainEvents: &contractcourt.ChainEventSubscription{}, - BlockEpochs: globalEpoch, BatchTicker: ticker, FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough @@ -3894,7 +3880,7 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, } const startingHeight = 100 - aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight) + aliceLink := NewChannelLink(aliceCfg, aliceChannel) if err := aliceSwitch.AddLink(aliceLink); err != nil { return nil, nil, nil, err } diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index db1dcc655..dc47730f5 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -1,19 +1,17 @@ package htlcswitch import ( + "bytes" "crypto/sha256" "encoding/binary" "fmt" + "io" "io/ioutil" "sync" + "sync/atomic" "testing" "time" - "io" - "sync/atomic" - - "bytes" - "github.com/btcsuite/fastsha256" "github.com/go-errors/errors" "github.com/lightningnetwork/lightning-onion" @@ -122,7 +120,7 @@ type mockServer struct { var _ lnpeer.Peer = (*mockServer)(nil) -func initSwitchWithDB(db *channeldb.DB) (*Switch, error) { +func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error) { if db == nil { tempPath, err := ioutil.TempDir("", "switchdb") if err != nil { @@ -135,7 +133,7 @@ func initSwitchWithDB(db *channeldb.DB) (*Switch, error) { } } - return New(Config{ + cfg := Config{ DB: db, SwitchPackager: channeldb.NewSwitchPackager(), FwdingLog: &mockForwardingLog{ @@ -144,15 +142,20 @@ func initSwitchWithDB(db *channeldb.DB) (*Switch, error) { FetchLastChannelUpdate: func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) { return nil, nil }, - }) + Notifier: &mockNotifier{}, + } + + return New(cfg, startingHeight) } -func newMockServer(t testing.TB, name string, db *channeldb.DB) (*mockServer, error) { +func newMockServer(t testing.TB, name string, startingHeight uint32, + db *channeldb.DB) (*mockServer, error) { + var id [33]byte h := sha256.Sum256([]byte(name)) copy(id[:], h[:]) - htlcSwitch, err := initSwitchWithDB(db) + htlcSwitch, err := initSwitchWithDB(startingHeight, db) if err != nil { return nil, err } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 91d6b0e11..f7d3b5c88 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -2,23 +2,22 @@ package htlcswitch import ( "bytes" + "crypto/sha256" "fmt" "sync" "sync/atomic" "time" - "crypto/sha256" - "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" - "github.com/roasbeef/btcd/btcec" - "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" ) @@ -142,6 +141,10 @@ type Config struct { // provide payment senders our latest policy when sending encrypted // error messages. FetchLastChannelUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) + + // Notifier is an instance of a chain notifier that we'll use to signal + // the switch when a new block has arrived. + Notifier chainntnfs.ChainNotifier } // Switch is the central messaging bus for all incoming/outgoing HTLCs. @@ -155,8 +158,14 @@ type Config struct { type Switch struct { started int32 // To be used atomically. shutdown int32 // To be used atomically. - wg sync.WaitGroup - quit chan struct{} + + // bestHeight is the best known height of the main chain. The links will + // be used this information to govern decisions based on HTLC timeouts. + // This will be retrieved by the registered links atomically. + bestHeight uint32 + + wg sync.WaitGroup + quit chan struct{} // cfg is a copy of the configuration struct that the htlc switch // service was initialized with. @@ -229,10 +238,15 @@ type Switch struct { // to the forwarding log. fwdEventMtx sync.Mutex pendingFwdingEvents []channeldb.ForwardingEvent + + // blockEpochStream is an active block epoch event stream backed by an + // active ChainNotifier instance. This will be used to retrieve the + // lastest height of the chain. + blockEpochStream *chainntnfs.BlockEpochEvent } // New creates the new instance of htlc switch. -func New(cfg Config) (*Switch, error) { +func New(cfg Config, currentHeight uint32) (*Switch, error) { circuitMap, err := NewCircuitMap(&CircuitMapConfig{ DB: cfg.DB, ExtractErrorEncrypter: cfg.ExtractErrorEncrypter, @@ -247,6 +261,7 @@ func New(cfg Config) (*Switch, error) { } return &Switch{ + bestHeight: currentHeight, cfg: &cfg, circuits: circuitMap, paymentSequencer: sequencer, @@ -1339,8 +1354,10 @@ func (s *Switch) CloseLink(chanPoint *wire.OutPoint, closeType ChannelCloseType, func (s *Switch) htlcForwarder() { defer s.wg.Done() - // Remove all links once we've been signalled for shutdown. defer func() { + s.blockEpochStream.Cancel() + + // Remove all links once we've been signalled for shutdown. s.indexMtx.Lock() for _, link := range s.linkIndex { if err := s.removeLink(link.ChanID()); err != nil { @@ -1378,8 +1395,15 @@ func (s *Switch) htlcForwarder() { fwdEventTicker := time.NewTicker(15 * time.Second) defer fwdEventTicker.Stop() +out: for { select { + case blockEpoch, ok := <-s.blockEpochStream.Epochs: + if !ok { + break out + } + + atomic.StoreUint32(&s.bestHeight, uint32(blockEpoch.Height)) // A local close request has arrived, we'll forward this to the // relevant link (if it exists) so the channel can be // cooperatively closed (if possible). @@ -1549,6 +1573,12 @@ func (s *Switch) Start() error { log.Infof("Starting HTLC Switch") + blockEpochStream, err := s.cfg.Notifier.RegisterBlockEpochNtfn() + if err != nil { + return err + } + s.blockEpochStream = blockEpochStream + s.wg.Add(1) go s.htlcForwarder() @@ -2033,3 +2063,8 @@ func (s *Switch) FlushForwardingEvents() error { // forwarding log. return s.cfg.FwdingLog.AddForwardingEvents(events) } + +// BestHeight returns the best height known to the switch. +func (s *Switch) BestHeight() uint32 { + return atomic.LoadUint32(&s.bestHeight) +} diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index 82932d3c0..ae0a2586f 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -30,12 +30,12 @@ func genPreimage() ([32]byte, error) { func TestSwitchSendPending(t *testing.T) { t.Parallel() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - s, err := initSwitchWithDB(nil) + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -125,16 +125,16 @@ func TestSwitchSendPending(t *testing.T) { func TestSwitchForward(t *testing.T) { t.Parallel() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobPeer, err := newMockServer(t, "bob", nil) + bobPeer, err := newMockServer(t, "bob", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create bob server: %v", err) } - s, err := initSwitchWithDB(nil) + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -230,11 +230,11 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { chanID1, chanID2, aliceChanID, bobChanID := genIDs() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobPeer, err := newMockServer(t, "bob", nil) + bobPeer, err := newMockServer(t, "bob", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create bob server: %v", err) } @@ -249,7 +249,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { t.Fatalf("unable to open channeldb: %v", err) } - s, err := initSwitchWithDB(cdb) + s, err := initSwitchWithDB(testStartingHeight, cdb) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -344,7 +344,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { t.Fatalf("unable to reopen channeldb: %v", err) } - s2, err := initSwitchWithDB(cdb2) + s2, err := initSwitchWithDB(testStartingHeight, cdb2) if err != nil { t.Fatalf("unable reinit switch: %v", err) } @@ -421,11 +421,11 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { chanID1, chanID2, aliceChanID, bobChanID := genIDs() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobPeer, err := newMockServer(t, "bob", nil) + bobPeer, err := newMockServer(t, "bob", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create bob server: %v", err) } @@ -440,7 +440,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { t.Fatalf("unable to open channeldb: %v", err) } - s, err := initSwitchWithDB(cdb) + s, err := initSwitchWithDB(testStartingHeight, cdb) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -535,7 +535,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { t.Fatalf("unable to reopen channeldb: %v", err) } - s2, err := initSwitchWithDB(cdb2) + s2, err := initSwitchWithDB(testStartingHeight, cdb2) if err != nil { t.Fatalf("unable reinit switch: %v", err) } @@ -615,11 +615,11 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) { chanID1, chanID2, aliceChanID, bobChanID := genIDs() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobPeer, err := newMockServer(t, "bob", nil) + bobPeer, err := newMockServer(t, "bob", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create bob server: %v", err) } @@ -634,7 +634,7 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) { t.Fatalf("unable to open channeldb: %v", err) } - s, err := initSwitchWithDB(cdb) + s, err := initSwitchWithDB(testStartingHeight, cdb) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -721,7 +721,7 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) { t.Fatalf("unable to reopen channeldb: %v", err) } - s2, err := initSwitchWithDB(cdb2) + s2, err := initSwitchWithDB(testStartingHeight, cdb2) if err != nil { t.Fatalf("unable reinit switch: %v", err) } @@ -778,11 +778,11 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) { chanID1, chanID2, aliceChanID, bobChanID := genIDs() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobPeer, err := newMockServer(t, "bob", nil) + bobPeer, err := newMockServer(t, "bob", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create bob server: %v", err) } @@ -797,7 +797,7 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) { t.Fatalf("unable to open channeldb: %v", err) } - s, err := initSwitchWithDB(cdb) + s, err := initSwitchWithDB(testStartingHeight, cdb) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -879,7 +879,7 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) { t.Fatalf("unable to reopen channeldb: %v", err) } - s2, err := initSwitchWithDB(cdb2) + s2, err := initSwitchWithDB(testStartingHeight, cdb2) if err != nil { t.Fatalf("unable reinit switch: %v", err) } @@ -936,11 +936,11 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { chanID1, chanID2, aliceChanID, bobChanID := genIDs() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobPeer, err := newMockServer(t, "bob", nil) + bobPeer, err := newMockServer(t, "bob", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create bob server: %v", err) } @@ -955,7 +955,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { t.Fatalf("unable to open channeldb: %v", err) } - s, err := initSwitchWithDB(cdb) + s, err := initSwitchWithDB(testStartingHeight, cdb) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -1036,7 +1036,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { t.Fatalf("unable to reopen channeldb: %v", err) } - s2, err := initSwitchWithDB(cdb2) + s2, err := initSwitchWithDB(testStartingHeight, cdb2) if err != nil { t.Fatalf("unable reinit switch: %v", err) } @@ -1129,7 +1129,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { t.Fatalf("unable to reopen channeldb: %v", err) } - s3, err := initSwitchWithDB(cdb3) + s3, err := initSwitchWithDB(testStartingHeight, cdb3) if err != nil { t.Fatalf("unable reinit switch: %v", err) } @@ -1167,16 +1167,16 @@ func TestSkipIneligibleLinksMultiHopForward(t *testing.T) { var packet *htlcPacket - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobPeer, err := newMockServer(t, "bob", nil) + bobPeer, err := newMockServer(t, "bob", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create bob server: %v", err) } - s, err := initSwitchWithDB(nil) + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -1237,12 +1237,12 @@ func TestSkipIneligibleLinksLocalForward(t *testing.T) { // We'll create a single link for this test, marking it as being unable // to forward form the get go. - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - s, err := initSwitchWithDB(nil) + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -1289,16 +1289,16 @@ func TestSkipIneligibleLinksLocalForward(t *testing.T) { func TestSwitchCancel(t *testing.T) { t.Parallel() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobPeer, err := newMockServer(t, "bob", nil) + bobPeer, err := newMockServer(t, "bob", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create bob server: %v", err) } - s, err := initSwitchWithDB(nil) + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -1402,16 +1402,16 @@ func TestSwitchAddSamePayment(t *testing.T) { chanID1, chanID2, aliceChanID, bobChanID := genIDs() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobPeer, err := newMockServer(t, "bob", nil) + bobPeer, err := newMockServer(t, "bob", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create bob server: %v", err) } - s, err := initSwitchWithDB(nil) + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -1561,12 +1561,12 @@ func TestSwitchAddSamePayment(t *testing.T) { func TestSwitchSendPayment(t *testing.T) { t.Parallel() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - s, err := initSwitchWithDB(nil) + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -1805,8 +1805,6 @@ func TestMultiHopPaymentForwardingEvents(t *testing.T) { } } - time.Sleep(time.Millisecond * 200) - // With all 10 payments sent. We'll now manually stop each of the // switches so we can examine their end state. n.stop() diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 06760bd13..505b87c7e 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -17,7 +17,6 @@ import ( "github.com/btcsuite/fastsha256" "github.com/coreos/bbolt" "github.com/go-errors/errors" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/keychain" @@ -569,22 +568,13 @@ func generateRoute(hops ...ForwardingInfo) ([lnwire.OnionPacketSize]byte, error) type threeHopNetwork struct { aliceServer *mockServer aliceChannelLink *channelLink - aliceBlockEpoch chan *chainntnfs.BlockEpoch - aliceTicker *time.Ticker - - firstBobChannelLink *channelLink - bobFirstBlockEpoch chan *chainntnfs.BlockEpoch - firstBobTicker *time.Ticker bobServer *mockServer + firstBobChannelLink *channelLink secondBobChannelLink *channelLink - bobSecondBlockEpoch chan *chainntnfs.BlockEpoch - secondBobTicker *time.Ticker - carolChannelLink *channelLink carolServer *mockServer - carolBlockEpoch chan *chainntnfs.BlockEpoch - carolTicker *time.Ticker + carolChannelLink *channelLink feeEstimator *mockFeeEstimator @@ -762,11 +752,6 @@ func (n *threeHopNetwork) stop() { done <- struct{}{} }() - n.aliceTicker.Stop() - n.firstBobTicker.Stop() - n.secondBobTicker.Stop() - n.carolTicker.Stop() - for i := 0; i < 3; i++ { <-done } @@ -858,15 +843,15 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, carolDb := carolChannel.State().Db // Create three peers/servers. - aliceServer, err := newMockServer(t, "alice", aliceDb) + aliceServer, err := newMockServer(t, "alice", startingHeight, aliceDb) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - bobServer, err := newMockServer(t, "bob", bobDb) + bobServer, err := newMockServer(t, "bob", startingHeight, bobDb) if err != nil { t.Fatalf("unable to create bob server: %v", err) } - carolServer, err := newMockServer(t, "carol", carolDb) + carolServer, err := newMockServer(t, "carol", startingHeight, carolDb) if err != nil { t.Fatalf("unable to create carol server: %v", err) } @@ -900,13 +885,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } obfuscator := NewMockObfuscator() - aliceEpochChan := make(chan *chainntnfs.BlockEpoch) - aliceEpoch := &chainntnfs.BlockEpochEvent{ - Epochs: aliceEpochChan, - Cancel: func() { - }, - } - aliceTicker := time.NewTicker(50 * time.Millisecond) aliceChannelLink := NewChannelLink( ChannelLinkConfig{ Switch: aliceServer.htlcSwitch, @@ -921,7 +899,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, FetchLastChannelUpdate: mockGetChanUpdateMessage, Registry: aliceServer.registry, - BlockEpochs: aliceEpoch, FeeEstimator: feeEstimator, PreimageCache: pCache, UpdateContractSignals: func(*contractcourt.ContractSignals) error { @@ -937,7 +914,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, aliceChannel, - startingHeight, ) if err := aliceServer.htlcSwitch.AddLink(aliceChannelLink); err != nil { t.Fatalf("unable to add alice channel link: %v", err) @@ -952,13 +928,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } }() - bobFirstEpochChan := make(chan *chainntnfs.BlockEpoch) - bobFirstEpoch := &chainntnfs.BlockEpochEvent{ - Epochs: bobFirstEpochChan, - Cancel: func() { - }, - } - firstBobTicker := time.NewTicker(50 * time.Millisecond) firstBobChannelLink := NewChannelLink( ChannelLinkConfig{ Switch: bobServer.htlcSwitch, @@ -973,7 +942,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, FetchLastChannelUpdate: mockGetChanUpdateMessage, Registry: bobServer.registry, - BlockEpochs: bobFirstEpoch, FeeEstimator: feeEstimator, PreimageCache: pCache, UpdateContractSignals: func(*contractcourt.ContractSignals) error { @@ -989,7 +957,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, firstBobChannel, - startingHeight, ) if err := bobServer.htlcSwitch.AddLink(firstBobChannelLink); err != nil { t.Fatalf("unable to add first bob channel link: %v", err) @@ -1004,13 +971,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } }() - bobSecondEpochChan := make(chan *chainntnfs.BlockEpoch) - bobSecondEpoch := &chainntnfs.BlockEpochEvent{ - Epochs: bobSecondEpochChan, - Cancel: func() { - }, - } - secondBobTicker := time.NewTicker(50 * time.Millisecond) secondBobChannelLink := NewChannelLink( ChannelLinkConfig{ Switch: bobServer.htlcSwitch, @@ -1025,7 +985,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, FetchLastChannelUpdate: mockGetChanUpdateMessage, Registry: bobServer.registry, - BlockEpochs: bobSecondEpoch, FeeEstimator: feeEstimator, PreimageCache: pCache, UpdateContractSignals: func(*contractcourt.ContractSignals) error { @@ -1041,7 +1000,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, secondBobChannel, - startingHeight, ) if err := bobServer.htlcSwitch.AddLink(secondBobChannelLink); err != nil { t.Fatalf("unable to add second bob channel link: %v", err) @@ -1056,13 +1014,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } }() - carolBlockEpoch := make(chan *chainntnfs.BlockEpoch) - carolEpoch := &chainntnfs.BlockEpochEvent{ - Epochs: bobSecondEpochChan, - Cancel: func() { - }, - } - carolTicker := time.NewTicker(50 * time.Millisecond) carolChannelLink := NewChannelLink( ChannelLinkConfig{ Switch: carolServer.htlcSwitch, @@ -1077,7 +1028,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, FetchLastChannelUpdate: mockGetChanUpdateMessage, Registry: carolServer.registry, - BlockEpochs: carolEpoch, FeeEstimator: feeEstimator, PreimageCache: pCache, UpdateContractSignals: func(*contractcourt.ContractSignals) error { @@ -1093,7 +1043,6 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, carolChannel, - startingHeight, ) if err := carolServer.htlcSwitch.AddLink(carolChannelLink); err != nil { t.Fatalf("unable to add carol channel link: %v", err) @@ -1111,22 +1060,13 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, return &threeHopNetwork{ aliceServer: aliceServer, aliceChannelLink: aliceChannelLink.(*channelLink), - aliceBlockEpoch: aliceEpochChan, - aliceTicker: aliceTicker, - - firstBobChannelLink: firstBobChannelLink.(*channelLink), - bobFirstBlockEpoch: bobFirstEpochChan, - firstBobTicker: firstBobTicker, bobServer: bobServer, + firstBobChannelLink: firstBobChannelLink.(*channelLink), secondBobChannelLink: secondBobChannelLink.(*channelLink), - bobSecondBlockEpoch: bobSecondEpochChan, - secondBobTicker: secondBobTicker, - carolChannelLink: carolChannelLink.(*channelLink), carolServer: carolServer, - carolBlockEpoch: carolBlockEpoch, - carolTicker: carolTicker, + carolChannelLink: carolChannelLink.(*channelLink), feeEstimator: feeEstimator, globalPolicy: globalPolicy, diff --git a/peer.go b/peer.go index 26537cd29..91058d7db 100644 --- a/peer.go +++ b/peer.go @@ -535,7 +535,6 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, ForwardPackets: p.server.htlcSwitch.ForwardPackets, FwrdingPolicy: *forwardingPolicy, FeeEstimator: p.server.cc.feeEstimator, - BlockEpochs: blockEpoch, PreimageCache: p.server.witnessBeacon, ChainEvents: chainEvents, UpdateContractSignals: func(signals *contractcourt.ContractSignals) error { @@ -555,9 +554,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, MaxFeeUpdateTimeout: htlcswitch.DefaultMaxLinkFeeUpdateTimeout, } - link := htlcswitch.NewChannelLink( - linkCfg, lnChan, uint32(currentHeight), - ) + link := htlcswitch.NewChannelLink(linkCfg, lnChan) // With the channel link created, we'll now notify the htlc switch so // this channel can be used to dispatch local payments and also diff --git a/server.go b/server.go index 45ad1dbb2..743affb14 100644 --- a/server.go +++ b/server.go @@ -284,6 +284,11 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, debugPre[:], debugHash[:]) } + _, currentHeight, err := s.cc.chainIO.GetBestBlock() + if err != nil { + return nil, err + } + s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{ DB: chanDB, SelfKey: s.identityPriv.PubKey(), @@ -313,7 +318,8 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, SwitchPackager: channeldb.NewSwitchPackager(), ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter, FetchLastChannelUpdate: fetchLastChanUpdate(s, serializedPubKey), - }) + Notifier: s.cc.chainNotifier, + }, uint32(currentHeight)) if err != nil { return nil, err } diff --git a/test_utils.go b/test_utils.go index 0baf779d7..98531120e 100644 --- a/test_utils.go +++ b/test_utils.go @@ -341,10 +341,17 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, breachArbiter: breachArbiter, chainArb: chainArb, } + + _, currentHeight, err := s.cc.chainIO.GetBestBlock() + if err != nil { + return nil, nil, nil, nil, err + } + htlcSwitch, err := htlcswitch.New(htlcswitch.Config{ DB: dbAlice, SwitchPackager: channeldb.NewSwitchPackager(), - }) + Notifier: notifier, + }, uint32(currentHeight)) if err != nil { return nil, nil, nil, nil, err }