Merge pull request #7846 from morehouse/refactor_mailcourier

htlcswitch: split mailCourier by courierType
Oliver Gugger 2023-08-02 10:52:46 +02:00 committed by GitHub
commit 19c121c7a3

@@ -112,6 +112,8 @@ type mailBoxConfig struct {
 
 // memoryMailBox is an implementation of the MailBox struct backed by purely
 // in-memory queues.
+//
+// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts.
 type memoryMailBox struct {
 	started sync.Once
 	stopped sync.Once
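The new TODO refers to the interface type asserts (the `.(lnwire.Message)` and `.(*htlcPacket)` casts visible in the hunks below) that container/list forces on every read. A minimal sketch of the typed list it hints at, assuming Go 1.18+ generics; the typedList wrapper and its methods are illustrative only and not part of this change:

import "container/list"

// typedList is a generic wrapper around container/list.List that confines
// the unavoidable interface type assertion to a single, well-typed place.
type typedList[T any] struct {
	l list.List // the zero value is an empty list, ready to use
}

// PushBack appends v and returns its element handle.
func (t *typedList[T]) PushBack(v T) *list.Element {
	return t.l.PushBack(v)
}

// Front returns the first element, or nil if the list is empty.
func (t *typedList[T]) Front() *list.Element {
	return t.l.Front()
}

// Remove deletes e from the list and returns its value with a static type,
// so call sites like the couriers below need no assert of their own.
func (t *typedList[T]) Remove(e *list.Element) T {
	return t.l.Remove(e).(T)
}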
@@ -202,8 +204,8 @@ const (
 // NOTE: This method is part of the MailBox interface.
 func (m *memoryMailBox) Start() {
 	m.started.Do(func() {
-		go m.mailCourier(wireCourier)
-		go m.mailCourier(pktCourier)
+		go m.wireMailCourier()
+		go m.pktMailCourier()
 	})
 }
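Start launches one goroutine per courier, each guarded by a sync.Once and paired with a shutdown channel (wireShutdown, pktShutdown) that the courier itself closes on exit, as the defer close(...) calls in the next hunk show. A condensed sketch of that start/stop handshake under hypothetical names (worker, run); the real mailbox additionally signals its condition variables on shutdown so parked couriers wake up:

import "sync"

// worker mirrors the lifecycle pattern: Start and Stop are idempotent, and
// Stop blocks until the goroutine has actually exited.
type worker struct {
	started  sync.Once
	stopped  sync.Once
	quit     chan struct{} // closed by Stop to request exit
	shutdown chan struct{} // closed by run once it has returned
}

func newWorker() *worker {
	return &worker{
		quit:     make(chan struct{}),
		shutdown: make(chan struct{}),
	}
}

// Start spins up the goroutine exactly once.
func (w *worker) Start() {
	w.started.Do(func() {
		go w.run()
	})
}

// Stop requests shutdown exactly once and waits for run to finish.
func (w *worker) Stop() {
	w.stopped.Do(func() {
		close(w.quit)
		<-w.shutdown
	})
}

func (w *worker) run() {
	defer close(w.shutdown) // counterpart of close(m.wireShutdown) below

	<-w.quit // the real couriers loop here until quit closes
}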
@@ -365,62 +367,84 @@ func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
 	return clock.TickAfter(p.expiry.Sub(clock.Now()))
 }
 
-// mailCourier is a dedicated goroutine whose job is to reliably deliver
-// messages of a particular type. There are two types of couriers: wire
-// couriers, and mail couriers. Depending on the passed courierType, this
-// goroutine will assume one of two roles.
-func (m *memoryMailBox) mailCourier(cType courierType) {
-	switch cType {
-	case wireCourier:
-		defer close(m.wireShutdown)
-	case pktCourier:
-		defer close(m.pktShutdown)
-	}
-
-	// TODO(roasbeef): refactor...
+// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
+// wire messages.
+func (m *memoryMailBox) wireMailCourier() {
+	defer close(m.wireShutdown)
 
 	for {
-		// First, we'll check our condition. If our target mailbox is
-		// empty, then we'll wait until a new item is added.
-		switch cType {
-		case wireCourier:
-			m.wireCond.L.Lock()
-			for m.wireMessages.Front() == nil {
-				m.wireCond.Wait()
-
-				select {
-				case msgDone := <-m.msgReset:
-					m.wireMessages.Init()
-					close(msgDone)
-				case <-m.quit:
-					m.wireCond.L.Unlock()
-					return
-				default:
-				}
-			}
-
-		case pktCourier:
-			m.pktCond.L.Lock()
-			for m.repHead == nil && m.addHead == nil {
-				m.pktCond.Wait()
-
-				select {
-				// Resetting the packet queue means just moving
-				// our pointer to the front. This ensures that
-				// any un-ACK'd messages are re-delivered upon
-				// reconnect.
-				case pktDone := <-m.pktReset:
-					m.repHead = m.repPkts.Front()
-					m.addHead = m.addPkts.Front()
-
-					close(pktDone)
-				case <-m.quit:
-					m.pktCond.L.Unlock()
-					return
-				default:
-				}
-			}
+		// First, we'll check our condition. If our mailbox is empty,
+		// then we'll wait until a new item is added.
+		m.wireCond.L.Lock()
+		for m.wireMessages.Front() == nil {
+			m.wireCond.Wait()
+
+			select {
+			case msgDone := <-m.msgReset:
+				m.wireMessages.Init()
+				close(msgDone)
+			case <-m.quit:
+				m.wireCond.L.Unlock()
+				return
+			default:
+			}
+		}
+
+		// Grab the datum off the front of the queue, shifting the
+		// slice's reference down one in order to remove the datum from
+		// the queue.
+		entry := m.wireMessages.Front()
+
+		//nolint:forcetypeassert
+		nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
+
+		// Now that we're done with the condition, we can unlock it to
+		// allow any callers to append to the end of our target queue.
+		m.wireCond.L.Unlock()
+
+		// With the next message obtained, we'll now select to attempt
+		// to deliver the message. If we receive a kill signal, then
+		// we'll bail out.
+		select {
+		case m.messageOutbox <- nextMsg:
+		case msgDone := <-m.msgReset:
+			m.wireCond.L.Lock()
+			m.wireMessages.Init()
+			m.wireCond.L.Unlock()
+			close(msgDone)
+		case <-m.quit:
+			return
+		}
+	}
+}
+
+// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
+// packet messages.
+func (m *memoryMailBox) pktMailCourier() {
+	defer close(m.pktShutdown)
+
+	for {
+		// First, we'll check our condition. If our mailbox is empty,
+		// then we'll wait until a new item is added.
+		m.pktCond.L.Lock()
+		for m.repHead == nil && m.addHead == nil {
+			m.pktCond.Wait()
+
+			select {
+			// Resetting the packet queue means just moving our
+			// pointer to the front. This ensures that any un-ACK'd
+			// messages are re-delivered upon reconnect.
+			case pktDone := <-m.pktReset:
+				m.repHead = m.repPkts.Front()
+				m.addHead = m.addPkts.Front()
+
+				close(pktDone)
+			case <-m.quit:
+				m.pktCond.L.Unlock()
+				return
+			default:
+			}
 		}
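Both couriers share the same wait-loop shape: park on a sync.Cond until the queue predicate holds, and on every wakeup poll the reset and quit channels with a non-blocking select, since a Broadcast-driven wakeup may mean shutdown or a queue reset rather than new work. A standalone sketch of that pattern under hypothetical names (consumer, queue, next); the real code keeps two such loops, one per courier:

import "sync"

// consumer parks on a condition variable and re-checks its side channels on
// every wakeup, exactly once per Wait.
type consumer struct {
	mu    sync.Mutex
	cond  *sync.Cond
	queue []int              // stand-in for wireMessages / repPkts / addPkts
	reset chan chan struct{} // carries an ack channel, like msgReset
	quit  chan struct{}
}

func newConsumer() *consumer {
	c := &consumer{
		reset: make(chan chan struct{}),
		quit:  make(chan struct{}),
	}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// next blocks until an item is available, the queue is reset, or quit closes.
// It returns false only on shutdown.
func (c *consumer) next() (int, bool) {
	c.cond.L.Lock()
	for len(c.queue) == 0 {
		c.cond.Wait() // atomically unlocks while parked

		select {
		case done := <-c.reset:
			c.queue = nil // drop pending work, then ack the reset
			close(done)
		case <-c.quit:
			c.cond.L.Unlock()
			return 0, false
		default:
			// Ordinary wakeup: fall through and re-test the
			// predicate.
		}
	}

	item := c.queue[0]
	c.queue = c.queue[1:]
	c.cond.L.Unlock()

	return item, true
}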
@@ -429,142 +453,104 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
 			nextRepEl *list.Element
 			nextAdd   *pktWithExpiry
 			nextAddEl *list.Element
-			nextMsg   lnwire.Message
 		)
-		switch cType {
-		// Grab the datum off the front of the queue, shifting the
-		// slice's reference down one in order to remove the datum from
-		// the queue.
-		case wireCourier:
-			entry := m.wireMessages.Front()
-			nextMsg = m.wireMessages.Remove(entry).(lnwire.Message)
 
 		// For packets, we actually never remove an item until it has
 		// been ACK'd by the link. This ensures that if a read packet
 		// doesn't make it into a commitment, then it'll be
 		// re-delivered once the link comes back online.
-		case pktCourier:
-			// Peek at the head of the Settle/Fails and Add queues.
-			// We peak both even if there is a Settle/Fail present
-			// because we need to set a deadline for the next
-			// pending Add if it's present. Due to clock
-			// monotonicity, we know that the head of the Adds is
-			// the next to expire.
-			if m.repHead != nil {
-				nextRep = m.repHead.Value.(*htlcPacket)
-				nextRepEl = m.repHead
-			}
-			if m.addHead != nil {
-				nextAdd = m.addHead.Value.(*pktWithExpiry)
-				nextAddEl = m.addHead
-			}
-		}
+
+		// Peek at the head of the Settle/Fails and Add queues. We peek
+		// both even if there is a Settle/Fail present because we need
+		// to set a deadline for the next pending Add if it's present.
+		// Due to clock monotonicity, we know that the head of the Adds
+		// is the next to expire.
+		if m.repHead != nil {
+			//nolint:forcetypeassert
+			nextRep = m.repHead.Value.(*htlcPacket)
+			nextRepEl = m.repHead
+		}
+		if m.addHead != nil {
+			//nolint:forcetypeassert
+			nextAdd = m.addHead.Value.(*pktWithExpiry)
+			nextAddEl = m.addHead
+		}
 
 		// Now that we're done with the condition, we can unlock it to
 		// allow any callers to append to the end of our target queue.
-		switch cType {
-		case wireCourier:
-			m.wireCond.L.Unlock()
-		case pktCourier:
-			m.pktCond.L.Unlock()
-		}
+		m.pktCond.L.Unlock()
 
-		// With the next message obtained, we'll now select to attempt
-		// to deliver the message. If we receive a kill signal, then
-		// we'll bail out.
-		switch cType {
-		case wireCourier:
-			select {
-			case m.messageOutbox <- nextMsg:
-			case msgDone := <-m.msgReset:
-				m.wireCond.L.Lock()
-				m.wireMessages.Init()
-				m.wireCond.L.Unlock()
-				close(msgDone)
-			case <-m.quit:
-				return
-			}
-
-		case pktCourier:
-			var (
-				pktOutbox chan *htlcPacket
-				addOutbox chan *htlcPacket
-				add       *htlcPacket
-				deadline  <-chan time.Time
-			)
-
-			// Prioritize delivery of Settle/Fail packets over Adds.
-			// This ensures that we actively clear the commitment of
-			// existing HTLCs before trying to add new ones. This
-			// can help to improve forwarding performance since the
-			// time to sign a commitment is linear in the number of
-			// HTLCs manifested on the commitments.
-			//
-			// NOTE: Both types are eventually delivered over the
-			// same channel, but we can control which is delivered
-			// by exclusively making one nil and the other non-nil.
-			// We know from our loop condition that at least one
-			// nextRep and nextAdd are non-nil.
-			if nextRep != nil {
-				pktOutbox = m.pktOutbox
-			} else {
-				addOutbox = m.pktOutbox
-			}
-
-			// If we have a pending Add, we'll also construct the
-			// deadline so we can fail it back if we are unable to
-			// deliver any message in time. We also dereference the
-			// nextAdd's packet, since we will need access to it in
-			// the case we are delivering it and/or if the deadline
-			// expires.
-			//
-			// NOTE: It's possible after this point for add to be
-			// nil, but this can only occur when addOutbox is also
-			// nil, hence we won't accidentally deliver a nil
-			// packet.
-			if nextAdd != nil {
-				add = nextAdd.pkt
-				deadline = nextAdd.deadline(m.cfg.clock)
-			}
-
-			select {
-			case pktOutbox <- nextRep:
-				m.pktCond.L.Lock()
-				// Only advance the repHead if this Settle or
-				// Fail is still at the head of the queue.
-				if m.repHead != nil && m.repHead == nextRepEl {
-					m.repHead = m.repHead.Next()
-				}
-				m.pktCond.L.Unlock()
-			case addOutbox <- add:
-				m.pktCond.L.Lock()
-				// Only advance the addHead if this Add is still
-				// at the head of the queue.
-				if m.addHead != nil && m.addHead == nextAddEl {
-					m.addHead = m.addHead.Next()
-				}
-				m.pktCond.L.Unlock()
-			case <-deadline:
-				log.Debugf("Expiring add htlc with "+
-					"keystone=%v", add.keystone())
-				m.FailAdd(add)
-			case pktDone := <-m.pktReset:
-				m.pktCond.L.Lock()
-				m.repHead = m.repPkts.Front()
-				m.addHead = m.addPkts.Front()
-				m.pktCond.L.Unlock()
-				close(pktDone)
-			case <-m.quit:
-				return
-			}
-		}
+		var (
+			pktOutbox chan *htlcPacket
+			addOutbox chan *htlcPacket
+			add       *htlcPacket
+			deadline  <-chan time.Time
+		)
+
+		// Prioritize delivery of Settle/Fail packets over Adds. This
+		// ensures that we actively clear the commitment of existing
+		// HTLCs before trying to add new ones. This can help to improve
+		// forwarding performance since the time to sign a commitment is
+		// linear in the number of HTLCs manifested on the commitments.
+		//
+		// NOTE: Both types are eventually delivered over the same
+		// channel, but we can control which is delivered by exclusively
+		// making one nil and the other non-nil. We know from our loop
+		// condition that at least one nextRep and nextAdd are non-nil.
+		if nextRep != nil {
+			pktOutbox = m.pktOutbox
+		} else {
+			addOutbox = m.pktOutbox
+		}
+
+		// If we have a pending Add, we'll also construct the deadline
+		// so we can fail it back if we are unable to deliver any
+		// message in time. We also dereference the nextAdd's packet,
+		// since we will need access to it in the case we are delivering
+		// it and/or if the deadline expires.
+		//
+		// NOTE: It's possible after this point for add to be nil, but
+		// this can only occur when addOutbox is also nil, hence we
+		// won't accidentally deliver a nil packet.
+		if nextAdd != nil {
+			add = nextAdd.pkt
+			deadline = nextAdd.deadline(m.cfg.clock)
+		}
+
+		select {
+		case pktOutbox <- nextRep:
+			m.pktCond.L.Lock()
+			// Only advance the repHead if this Settle or Fail is
+			// still at the head of the queue.
+			if m.repHead != nil && m.repHead == nextRepEl {
+				m.repHead = m.repHead.Next()
+			}
+			m.pktCond.L.Unlock()
+		case addOutbox <- add:
+			m.pktCond.L.Lock()
+			// Only advance the addHead if this Add is still at the
+			// head of the queue.
+			if m.addHead != nil && m.addHead == nextAddEl {
+				m.addHead = m.addHead.Next()
+			}
+			m.pktCond.L.Unlock()
+		case <-deadline:
+			log.Debugf("Expiring add htlc with "+
+				"keystone=%v", add.keystone())
+			m.FailAdd(add)
+		case pktDone := <-m.pktReset:
+			m.pktCond.L.Lock()
+			m.repHead = m.repPkts.Front()
+			m.addHead = m.addPkts.Front()
+			m.pktCond.L.Unlock()
+			close(pktDone)
+		case <-m.quit:
+			return
+		}
 	}
 }
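The NOTE in the delivery section relies on two properties of select: a send on a nil channel and a receive from a nil channel both block forever, so a case can be switched off by leaving its channel nil. That is how pktOutbox/addOutbox steer both packet kinds into the single m.pktOutbox with Settle/Fail priority, and how deadline stays inert when no Add is pending. A runnable sketch under hypothetical names (deliver, outbox, expiry):

import "time"

// deliver pushes either a Settle/Fail (rep) or an Add onto the shared outbox.
// Exactly one of repOutbox/addOutbox is non-nil, so only that send case can
// ever fire; the expiry case likewise disables itself when the channel is nil.
func deliver(outbox chan string, rep, add string, haveRep bool,
	expiry <-chan time.Time) {

	var (
		repOutbox chan string
		addOutbox chan string
	)
	if haveRep {
		repOutbox = outbox // prefer clearing Settle/Fails first
	} else {
		addOutbox = outbox
	}

	select {
	case repOutbox <- rep: // eligible only when haveRep
	case addOutbox <- add: // eligible only when !haveRep
	case <-expiry:
		// Stand-in for FailAdd: the pending Add expired before the
		// outbox drained.
	}
}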