mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-05-03 08:20:30 +02:00
Merge pull request #1659 from cfromknecht/link-suppress-batch-ticker
htlcswitch/link: conditional batch ticker
This commit is contained in:
commit
639beb96ec
@ -107,22 +107,29 @@ type Ticker interface {
|
|||||||
|
|
||||||
// BatchTicker implements the Ticker interface, and wraps a time.Ticker.
|
// BatchTicker implements the Ticker interface, and wraps a time.Ticker.
|
||||||
type BatchTicker struct {
|
type BatchTicker struct {
|
||||||
ticker *time.Ticker
|
duration time.Duration
|
||||||
|
ticker *time.Ticker
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBatchTicker returns a new BatchTicker that wraps the passed time.Ticker.
|
// NewBatchTicker returns a new BatchTicker that wraps the passed time.Ticker.
|
||||||
func NewBatchTicker(t *time.Ticker) *BatchTicker {
|
func NewBatchTicker(d time.Duration) *BatchTicker {
|
||||||
return &BatchTicker{t}
|
return &BatchTicker{
|
||||||
|
duration: d,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start returns the tick channel for the underlying time.Ticker.
|
// Start returns the tick channel for the underlying time.Ticker.
|
||||||
func (t *BatchTicker) Start() <-chan time.Time {
|
func (t *BatchTicker) Start() <-chan time.Time {
|
||||||
|
t.ticker = time.NewTicker(t.duration)
|
||||||
return t.ticker.C
|
return t.ticker.C
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the underlying time.Ticker.
|
// Stop stops the underlying time.Ticker.
|
||||||
func (t *BatchTicker) Stop() {
|
func (t *BatchTicker) Stop() {
|
||||||
t.ticker.Stop()
|
if t.ticker != nil {
|
||||||
|
t.ticker.Stop()
|
||||||
|
t.ticker = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChannelLinkConfig defines the configuration for the channel link. ALL
|
// ChannelLinkConfig defines the configuration for the channel link. ALL
|
||||||
@ -794,6 +801,7 @@ func (l *channelLink) fwdPkgGarbager() {
|
|||||||
// NOTE: This MUST be run as a goroutine.
|
// NOTE: This MUST be run as a goroutine.
|
||||||
func (l *channelLink) htlcManager() {
|
func (l *channelLink) htlcManager() {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
l.cfg.BatchTicker.Stop()
|
||||||
l.wg.Done()
|
l.wg.Done()
|
||||||
log.Infof("ChannelLink(%v) has exited", l)
|
log.Infof("ChannelLink(%v) has exited", l)
|
||||||
}()
|
}()
|
||||||
@ -843,8 +851,12 @@ func (l *channelLink) htlcManager() {
|
|||||||
l.wg.Add(1)
|
l.wg.Add(1)
|
||||||
go l.fwdPkgGarbager()
|
go l.fwdPkgGarbager()
|
||||||
|
|
||||||
batchTick := l.cfg.BatchTicker.Start()
|
// We'll only need the batch ticker if we have outgoing updates that are
|
||||||
defer l.cfg.BatchTicker.Stop()
|
// not covered by our last signature. This value will be nil unless a
|
||||||
|
// downstream packet forces the batchCounter to be positive. After the
|
||||||
|
// batch is cleared, it will return to nil to prevent wasteful CPU time
|
||||||
|
// caused by the batch ticker waking up the htlcManager needlessly.
|
||||||
|
var maybeBatchTick <-chan time.Time
|
||||||
|
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
@ -926,10 +938,13 @@ out:
|
|||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-batchTick:
|
case <-maybeBatchTick:
|
||||||
// If the current batch is empty, then we have no work
|
// If the current batch is empty, then we have no work
|
||||||
// here.
|
// here. We also disable the batch ticker from waking up
|
||||||
|
// the htlcManager while the batch is empty.
|
||||||
if l.batchCounter == 0 {
|
if l.batchCounter == 0 {
|
||||||
|
l.cfg.BatchTicker.Stop()
|
||||||
|
maybeBatchTick = nil
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -955,6 +970,13 @@ out:
|
|||||||
|
|
||||||
l.handleDownStreamPkt(packet, true)
|
l.handleDownStreamPkt(packet, true)
|
||||||
|
|
||||||
|
// If the downstream packet resulted in a non-empty
|
||||||
|
// batch, reinstate the batch ticker so that it can be
|
||||||
|
// cleared.
|
||||||
|
if l.batchCounter > 0 && maybeBatchTick == nil {
|
||||||
|
maybeBatchTick = l.cfg.BatchTicker.Start()
|
||||||
|
}
|
||||||
|
|
||||||
// A message from the switch was just received. This indicates
|
// A message from the switch was just received. This indicates
|
||||||
// that the link is an intermediate hop in a multi-hop HTLC
|
// that the link is an intermediate hop in a multi-hop HTLC
|
||||||
// circuit.
|
// circuit.
|
||||||
@ -977,6 +999,13 @@ out:
|
|||||||
|
|
||||||
l.handleDownStreamPkt(pkt, false)
|
l.handleDownStreamPkt(pkt, false)
|
||||||
|
|
||||||
|
// If the downstream packet resulted in a non-empty
|
||||||
|
// batch, reinstate the batch ticker so that it can be
|
||||||
|
// cleared.
|
||||||
|
if l.batchCounter > 0 && maybeBatchTick == nil {
|
||||||
|
maybeBatchTick = l.cfg.BatchTicker.Start()
|
||||||
|
}
|
||||||
|
|
||||||
// A message from the connected peer was just received. This
|
// A message from the connected peer was just received. This
|
||||||
// indicates that we have a new incoming HTLC, either directly
|
// indicates that we have a new incoming HTLC, either directly
|
||||||
// for us, or part of a multi-hop HTLC circuit.
|
// for us, or part of a multi-hop HTLC circuit.
|
||||||
|
@ -1497,7 +1497,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
|
|||||||
Registry: invoiceRegistry,
|
Registry: invoiceRegistry,
|
||||||
ChainEvents: &contractcourt.ChainEventSubscription{},
|
ChainEvents: &contractcourt.ChainEventSubscription{},
|
||||||
BatchTicker: ticker,
|
BatchTicker: ticker,
|
||||||
FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)),
|
FwdPkgGCTicker: NewBatchTicker(5 * time.Second),
|
||||||
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
|
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
|
||||||
// to not trigger commit updates automatically during tests.
|
// to not trigger commit updates automatically during tests.
|
||||||
BatchSize: 10000,
|
BatchSize: 10000,
|
||||||
@ -3885,7 +3885,7 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch,
|
|||||||
Registry: invoiceRegistry,
|
Registry: invoiceRegistry,
|
||||||
ChainEvents: &contractcourt.ChainEventSubscription{},
|
ChainEvents: &contractcourt.ChainEventSubscription{},
|
||||||
BatchTicker: ticker,
|
BatchTicker: ticker,
|
||||||
FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)),
|
FwdPkgGCTicker: NewBatchTicker(5 * time.Second),
|
||||||
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
|
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
|
||||||
// to not trigger commit updates automatically during tests.
|
// to not trigger commit updates automatically during tests.
|
||||||
BatchSize: 10000,
|
BatchSize: 10000,
|
||||||
|
10
peer.go
10
peer.go
@ -542,12 +542,10 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
|
|||||||
*chanPoint, signals,
|
*chanPoint, signals,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
OnChannelFailure: onChannelFailure,
|
OnChannelFailure: onChannelFailure,
|
||||||
SyncStates: syncStates,
|
SyncStates: syncStates,
|
||||||
BatchTicker: htlcswitch.NewBatchTicker(
|
BatchTicker: htlcswitch.NewBatchTicker(50 * time.Millisecond),
|
||||||
time.NewTicker(50 * time.Millisecond)),
|
FwdPkgGCTicker: htlcswitch.NewBatchTicker(time.Minute),
|
||||||
FwdPkgGCTicker: htlcswitch.NewBatchTicker(
|
|
||||||
time.NewTicker(time.Minute)),
|
|
||||||
BatchSize: 10,
|
BatchSize: 10,
|
||||||
UnsafeReplay: cfg.UnsafeReplay,
|
UnsafeReplay: cfg.UnsafeReplay,
|
||||||
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,
|
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user