htlcswitch: split mailCourier by courierType

The mailCourier goroutine is very long and littered with switches on
courierType. By splitting the implementation into a dedicated method
per courierType, we eliminate the cruft and improve readability.
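
In shape, the change replaces one type-switched courier loop with two
dedicated loops. A minimal sketch of the before/after structure, with
stand-in loop bodies (only the names mirror the actual mailbox code; the
comments mark where the real queue-handling logic goes):

    package mailbox

    type courierType int

    const (
        wireCourier courierType = iota
        pktCourier
    )

    type memoryMailBox struct{}

    // Before: a single goroutine body parameterized by courierType,
    // forcing a switch on cType at every step of the loop.
    func (m *memoryMailBox) mailCourier(cType courierType) {
        for {
            switch cType {
            case wireCourier:
                // wire-specific delivery work
            case pktCourier:
                // packet-specific delivery work
            }
        }
    }

    // After: one dedicated method per courier type; each loop body is
    // straight-line code that touches only its own queue and condition
    // variable.
    func (m *memoryMailBox) wireMailCourier() {
        for {
            // wire-specific delivery work only
        }
    }

    func (m *memoryMailBox) pktMailCourier() {
        for {
            // packet-specific delivery work only
        }
    }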
Matt Morehouse
2023-07-20 14:53:46 -05:00
parent c3cd93c98a
commit 15f6e7a80a

htlcswitch/mailbox.go

@@ -202,8 +202,8 @@ const (
 // NOTE: This method is part of the MailBox interface.
 func (m *memoryMailBox) Start() {
     m.started.Do(func() {
-        go m.mailCourier(wireCourier)
-        go m.mailCourier(pktCourier)
+        go m.wireMailCourier()
+        go m.pktMailCourier()
     })
 }
@@ -365,25 +365,14 @@ func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time {
     return clock.TickAfter(p.expiry.Sub(clock.Now()))
 }
 
-// mailCourier is a dedicated goroutine whose job is to reliably deliver
-// messages of a particular type. There are two types of couriers: wire
-// couriers, and mail couriers. Depending on the passed courierType, this
-// goroutine will assume one of two roles.
-func (m *memoryMailBox) mailCourier(cType courierType) {
-    switch cType {
-    case wireCourier:
-        defer close(m.wireShutdown)
-    case pktCourier:
-        defer close(m.pktShutdown)
-    }
-
-    // TODO(roasbeef): refactor...
+// wireMailCourier is a dedicated goroutine whose job is to reliably deliver
+// wire messages.
+func (m *memoryMailBox) wireMailCourier() {
+    defer close(m.wireShutdown)
 
     for {
-        // First, we'll check our condition. If our target mailbox is
-        // empty, then we'll wait until a new item is added.
-        switch cType {
-        case wireCourier:
-            m.wireCond.L.Lock()
-            for m.wireMessages.Front() == nil {
-                m.wireCond.Wait()
+        // First, we'll check our condition. If our mailbox is empty,
+        // then we'll wait until a new item is added.
+        m.wireCond.L.Lock()
+        for m.wireMessages.Front() == nil {
+            m.wireCond.Wait()
@@ -391,7 +380,6 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
             select {
             case msgDone := <-m.msgReset:
                 m.wireMessages.Init()
-
                 close(msgDone)
             case <-m.quit:
                 m.wireCond.L.Unlock()
@@ -400,16 +388,51 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
             }
         }
 
-        case pktCourier:
+        // Grab the datum off the front of the queue, shifting the
+        // slice's reference down one in order to remove the datum from
+        // the queue.
+        entry := m.wireMessages.Front()
+        //nolint:forcetypeassert
+        nextMsg := m.wireMessages.Remove(entry).(lnwire.Message)
+
+        // Now that we're done with the condition, we can unlock it to
+        // allow any callers to append to the end of our target queue.
+        m.wireCond.L.Unlock()
+
+        // With the next message obtained, we'll now select to attempt
+        // to deliver the message. If we receive a kill signal, then
+        // we'll bail out.
+        select {
+        case m.messageOutbox <- nextMsg:
+        case msgDone := <-m.msgReset:
+            m.wireCond.L.Lock()
+            m.wireMessages.Init()
+            m.wireCond.L.Unlock()
+
+            close(msgDone)
+        case <-m.quit:
+            return
+        }
+    }
+}
+
+// pktMailCourier is a dedicated goroutine whose job is to reliably deliver
+// packet messages.
+func (m *memoryMailBox) pktMailCourier() {
+    defer close(m.pktShutdown)
+
+    for {
+        // First, we'll check our condition. If our mailbox is empty,
+        // then we'll wait until a new item is added.
         m.pktCond.L.Lock()
         for m.repHead == nil && m.addHead == nil {
             m.pktCond.Wait()
 
             select {
-            // Resetting the packet queue means just moving
-            // our pointer to the front. This ensures that
-            // any un-ACK'd messages are re-delivered upon
-            // reconnect.
+            // Resetting the packet queue means just moving our
+            // pointer to the front. This ensures that any un-ACK'd
+            // messages are re-delivered upon reconnect.
             case pktDone := <-m.pktReset:
                 m.repHead = m.repPkts.Front()
                 m.addHead = m.addPkts.Front()
@@ -422,71 +445,38 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
             default:
             }
         }
-        }
 
         var (
             nextRep   *htlcPacket
             nextRepEl *list.Element
             nextAdd   *pktWithExpiry
             nextAddEl *list.Element
-            nextMsg   lnwire.Message
         )
-        switch cType {
-
-        // Grab the datum off the front of the queue, shifting the
-        // slice's reference down one in order to remove the datum from
-        // the queue.
-        case wireCourier:
-            entry := m.wireMessages.Front()
-            nextMsg = m.wireMessages.Remove(entry).(lnwire.Message)
 
         // For packets, we actually never remove an item until it has
         // been ACK'd by the link. This ensures that if a read packet
         // doesn't make it into a commitment, then it'll be
         // re-delivered once the link comes back online.
-        case pktCourier:
-            // Peek at the head of the Settle/Fails and Add queues.
-            // We peak both even if there is a Settle/Fail present
-            // because we need to set a deadline for the next
-            // pending Add if it's present. Due to clock
-            // monotonicity, we know that the head of the Adds is
-            // the next to expire.
+        // Peek at the head of the Settle/Fails and Add queues. We peek
+        // at both even if there is a Settle/Fail present because we
+        // need to set a deadline for the next pending Add if it's
+        // present. Due to clock monotonicity, we know that the head of
+        // the Adds is the next to expire.
         if m.repHead != nil {
+            //nolint:forcetypeassert
             nextRep = m.repHead.Value.(*htlcPacket)
             nextRepEl = m.repHead
         }
 
         if m.addHead != nil {
+            //nolint:forcetypeassert
             nextAdd = m.addHead.Value.(*pktWithExpiry)
             nextAddEl = m.addHead
         }
-        }
 
         // Now that we're done with the condition, we can unlock it to
         // allow any callers to append to the end of our target queue.
-        switch cType {
-        case wireCourier:
-            m.wireCond.L.Unlock()
-        case pktCourier:
         m.pktCond.L.Unlock()
-        }
-
-        // With the next message obtained, we'll now select to attempt
-        // to deliver the message. If we receive a kill signal, then
-        // we'll bail out.
-        switch cType {
-        case wireCourier:
-            select {
-            case m.messageOutbox <- nextMsg:
-            case msgDone := <-m.msgReset:
-                m.wireCond.L.Lock()
-                m.wireMessages.Init()
-                m.wireCond.L.Unlock()
-                close(msgDone)
-            case <-m.quit:
-                return
-            }
-        case pktCourier:
         var (
             pktOutbox chan *htlcPacket
             addOutbox chan *htlcPacket
@@ -494,35 +484,31 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
             deadline  <-chan time.Time
         )
 
-        // Prioritize delivery of Settle/Fail packets over Adds.
-        // This ensures that we actively clear the commitment of
-        // existing HTLCs before trying to add new ones. This
-        // can help to improve forwarding performance since the
-        // time to sign a commitment is linear in the number of
-        // HTLCs manifested on the commitments.
+        // Prioritize delivery of Settle/Fail packets over Adds. This
+        // ensures that we actively clear the commitment of existing
+        // HTLCs before trying to add new ones. This can help to improve
+        // forwarding performance since the time to sign a commitment is
+        // linear in the number of HTLCs manifested on the commitments.
         //
-        // NOTE: Both types are eventually delivered over the
-        // same channel, but we can control which is delivered
-        // by exclusively making one nil and the other non-nil.
-        // We know from our loop condition that at least one
-        // nextRep and nextAdd are non-nil.
+        // NOTE: Both types are eventually delivered over the same
+        // channel, but we can control which is delivered by exclusively
+        // making one nil and the other non-nil. We know from our loop
+        // condition that at least one of nextRep and nextAdd is
+        // non-nil.
         if nextRep != nil {
             pktOutbox = m.pktOutbox
         } else {
             addOutbox = m.pktOutbox
         }
 
-        // If we have a pending Add, we'll also construct the
-        // deadline so we can fail it back if we are unable to
-        // deliver any message in time. We also dereference the
-        // nextAdd's packet, since we will need access to it in
-        // the case we are delivering it and/or if the deadline
-        // expires.
+        // If we have a pending Add, we'll also construct the deadline
+        // so we can fail it back if we are unable to deliver any
+        // message in time. We also dereference the nextAdd's packet,
+        // since we will need access to it in the case we are delivering
+        // it and/or if the deadline expires.
         //
-        // NOTE: It's possible after this point for add to be
-        // nil, but this can only occur when addOutbox is also
-        // nil, hence we won't accidentally deliver a nil
-        // packet.
+        // NOTE: It's possible after this point for add to be nil, but
+        // this can only occur when addOutbox is also nil, hence we
+        // won't accidentally deliver a nil packet.
         if nextAdd != nil {
             add = nextAdd.pkt
             deadline = nextAdd.deadline(m.cfg.clock)
@@ -531,8 +517,8 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
         select {
         case pktOutbox <- nextRep:
             m.pktCond.L.Lock()
-            // Only advance the repHead if this Settle or
-            // Fail is still at the head of the queue.
+            // Only advance the repHead if this Settle or Fail is
+            // still at the head of the queue.
             if m.repHead != nil && m.repHead == nextRepEl {
                 m.repHead = m.repHead.Next()
             }
@@ -540,8 +526,8 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
         case addOutbox <- add:
             m.pktCond.L.Lock()
-            // Only advance the addHead if this Add is still
-            // at the head of the queue.
+            // Only advance the addHead if this Add is still at the
+            // head of the queue.
             if m.addHead != nil && m.addHead == nextAddEl {
                 m.addHead = m.addHead.Next()
             }
@@ -564,8 +550,6 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
             return
         }
     }
-    }
 }
 
 // AddMessage appends a new message to the end of the message queue.
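
A note on the delivery prioritization seen in pktMailCourier above: it
relies on the Go idiom that a send on a nil channel blocks forever, so a
select can be steered toward one case by leaving exactly one of two
channel variables non-nil. A minimal, self-contained sketch of that
idiom (the names and string payloads here are illustrative, not lnd's):

    package main

    import "fmt"

    func main() {
        out := make(chan string, 1)

        // Both variables can alias the same outbox, but only one is
        // non-nil at a time. Sending on a nil channel blocks forever,
        // so the select below can only ever take the non-nil branch.
        var repOutbox, addOutbox chan string
        haveSettleFail := true
        if haveSettleFail {
            repOutbox = out // prioritize the Settle/Fail
        } else {
            addOutbox = out // otherwise deliver the Add
        }

        select {
        case repOutbox <- "settle/fail":
        case addOutbox <- "add":
        }

        fmt.Println(<-out) // prints "settle/fail"
    }

Because both cases send to the same underlying channel, the receiver
needs no knowledge of which priority class it is getting; the
nil/non-nil split alone decides what is offered.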