From 15f6e7a80af47bce968a64e53b92d373ba8d51f5 Mon Sep 17 00:00:00 2001 From: Matt Morehouse Date: Thu, 20 Jul 2023 14:53:46 -0500 Subject: [PATCH 1/2] htlcswitch: split mailCourier by courierType The goroutine is very long and littered with switches on courierType. By separating the implementations into different methods for each courierType we eliminate the cruft and improve readability. --- htlcswitch/mailbox.go | 334 ++++++++++++++++++++---------------------- 1 file changed, 159 insertions(+), 175 deletions(-) diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index cc3bd1c83..8bc8ba6da 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -202,8 +202,8 @@ const ( // NOTE: This method is part of the MailBox interface. func (m *memoryMailBox) Start() { m.started.Do(func() { - go m.mailCourier(wireCourier) - go m.mailCourier(pktCourier) + go m.wireMailCourier() + go m.pktMailCourier() }) } @@ -365,62 +365,84 @@ func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time { return clock.TickAfter(p.expiry.Sub(clock.Now())) } -// mailCourier is a dedicated goroutine whose job is to reliably deliver -// messages of a particular type. There are two types of couriers: wire -// couriers, and mail couriers. Depending on the passed courierType, this -// goroutine will assume one of two roles. -func (m *memoryMailBox) mailCourier(cType courierType) { - switch cType { - case wireCourier: - defer close(m.wireShutdown) - case pktCourier: - defer close(m.pktShutdown) - } - - // TODO(roasbeef): refactor... +// wireMailCourier is a dedicated goroutine whose job is to reliably deliver +// wire messages. +func (m *memoryMailBox) wireMailCourier() { + defer close(m.wireShutdown) for { - // First, we'll check our condition. If our target mailbox is - // empty, then we'll wait until a new item is added. - switch cType { - case wireCourier: - m.wireCond.L.Lock() - for m.wireMessages.Front() == nil { - m.wireCond.Wait() + // First, we'll check our condition. If our mailbox is empty, + // then we'll wait until a new item is added. + m.wireCond.L.Lock() + for m.wireMessages.Front() == nil { + m.wireCond.Wait() - select { - case msgDone := <-m.msgReset: - m.wireMessages.Init() - - close(msgDone) - case <-m.quit: - m.wireCond.L.Unlock() - return - default: - } + select { + case msgDone := <-m.msgReset: + m.wireMessages.Init() + close(msgDone) + case <-m.quit: + m.wireCond.L.Unlock() + return + default: } + } - case pktCourier: - m.pktCond.L.Lock() - for m.repHead == nil && m.addHead == nil { - m.pktCond.Wait() + // Grab the datum off the front of the queue, shifting the + // slice's reference down one in order to remove the datum from + // the queue. + entry := m.wireMessages.Front() - select { - // Resetting the packet queue means just moving - // our pointer to the front. This ensures that - // any un-ACK'd messages are re-delivered upon - // reconnect. - case pktDone := <-m.pktReset: - m.repHead = m.repPkts.Front() - m.addHead = m.addPkts.Front() + //nolint:forcetypeassert + nextMsg := m.wireMessages.Remove(entry).(lnwire.Message) - close(pktDone) + // Now that we're done with the condition, we can unlock it to + // allow any callers to append to the end of our target queue. + m.wireCond.L.Unlock() - case <-m.quit: - m.pktCond.L.Unlock() - return - default: - } + // With the next message obtained, we'll now select to attempt + // to deliver the message. If we receive a kill signal, then + // we'll bail out. + select { + case m.messageOutbox <- nextMsg: + case msgDone := <-m.msgReset: + m.wireCond.L.Lock() + m.wireMessages.Init() + m.wireCond.L.Unlock() + + close(msgDone) + case <-m.quit: + return + } + } +} + +// pktMailCourier is a dedicated goroutine whose job is to reliably deliver +// packet messages. +func (m *memoryMailBox) pktMailCourier() { + defer close(m.pktShutdown) + + for { + // First, we'll check our condition. If our mailbox is empty, + // then we'll wait until a new item is added. + m.pktCond.L.Lock() + for m.repHead == nil && m.addHead == nil { + m.pktCond.Wait() + + select { + // Resetting the packet queue means just moving our + // pointer to the front. This ensures that any un-ACK'd + // messages are re-delivered upon reconnect. + case pktDone := <-m.pktReset: + m.repHead = m.repPkts.Front() + m.addHead = m.addPkts.Front() + + close(pktDone) + + case <-m.quit: + m.pktCond.L.Unlock() + return + default: } } @@ -429,142 +451,104 @@ func (m *memoryMailBox) mailCourier(cType courierType) { nextRepEl *list.Element nextAdd *pktWithExpiry nextAddEl *list.Element - nextMsg lnwire.Message ) - switch cType { - // Grab the datum off the front of the queue, shifting the - // slice's reference down one in order to remove the datum from - // the queue. - case wireCourier: - entry := m.wireMessages.Front() - nextMsg = m.wireMessages.Remove(entry).(lnwire.Message) - // For packets, we actually never remove an item until it has // been ACK'd by the link. This ensures that if a read packet // doesn't make it into a commitment, then it'll be // re-delivered once the link comes back online. - case pktCourier: - // Peek at the head of the Settle/Fails and Add queues. - // We peak both even if there is a Settle/Fail present - // because we need to set a deadline for the next - // pending Add if it's present. Due to clock - // monotonicity, we know that the head of the Adds is - // the next to expire. - if m.repHead != nil { - nextRep = m.repHead.Value.(*htlcPacket) - nextRepEl = m.repHead - } - if m.addHead != nil { - nextAdd = m.addHead.Value.(*pktWithExpiry) - nextAddEl = m.addHead - } + + // Peek at the head of the Settle/Fails and Add queues. We peak + // both even if there is a Settle/Fail present because we need + // to set a deadline for the next pending Add if it's present. + // Due to clock monotonicity, we know that the head of the Adds + // is the next to expire. + if m.repHead != nil { + //nolint:forcetypeassert + nextRep = m.repHead.Value.(*htlcPacket) + nextRepEl = m.repHead + } + if m.addHead != nil { + //nolint:forcetypeassert + nextAdd = m.addHead.Value.(*pktWithExpiry) + nextAddEl = m.addHead } // Now that we're done with the condition, we can unlock it to // allow any callers to append to the end of our target queue. - switch cType { - case wireCourier: - m.wireCond.L.Unlock() - case pktCourier: + m.pktCond.L.Unlock() + + var ( + pktOutbox chan *htlcPacket + addOutbox chan *htlcPacket + add *htlcPacket + deadline <-chan time.Time + ) + + // Prioritize delivery of Settle/Fail packets over Adds. This + // ensures that we actively clear the commitment of existing + // HTLCs before trying to add new ones. This can help to improve + // forwarding performance since the time to sign a commitment is + // linear in the number of HTLCs manifested on the commitments. + // + // NOTE: Both types are eventually delivered over the same + // channel, but we can control which is delivered by exclusively + // making one nil and the other non-nil. We know from our loop + // condition that at least one nextRep and nextAdd are non-nil. + if nextRep != nil { + pktOutbox = m.pktOutbox + } else { + addOutbox = m.pktOutbox + } + + // If we have a pending Add, we'll also construct the deadline + // so we can fail it back if we are unable to deliver any + // message in time. We also dereference the nextAdd's packet, + // since we will need access to it in the case we are delivering + // it and/or if the deadline expires. + // + // NOTE: It's possible after this point for add to be nil, but + // this can only occur when addOutbox is also nil, hence we + // won't accidentally deliver a nil packet. + if nextAdd != nil { + add = nextAdd.pkt + deadline = nextAdd.deadline(m.cfg.clock) + } + + select { + case pktOutbox <- nextRep: + m.pktCond.L.Lock() + // Only advance the repHead if this Settle or Fail is + // still at the head of the queue. + if m.repHead != nil && m.repHead == nextRepEl { + m.repHead = m.repHead.Next() + } m.pktCond.L.Unlock() + + case addOutbox <- add: + m.pktCond.L.Lock() + // Only advance the addHead if this Add is still at the + // head of the queue. + if m.addHead != nil && m.addHead == nextAddEl { + m.addHead = m.addHead.Next() + } + m.pktCond.L.Unlock() + + case <-deadline: + log.Debugf("Expiring add htlc with "+ + "keystone=%v", add.keystone()) + m.FailAdd(add) + + case pktDone := <-m.pktReset: + m.pktCond.L.Lock() + m.repHead = m.repPkts.Front() + m.addHead = m.addPkts.Front() + m.pktCond.L.Unlock() + + close(pktDone) + + case <-m.quit: + return } - - // With the next message obtained, we'll now select to attempt - // to deliver the message. If we receive a kill signal, then - // we'll bail out. - switch cType { - case wireCourier: - select { - case m.messageOutbox <- nextMsg: - case msgDone := <-m.msgReset: - m.wireCond.L.Lock() - m.wireMessages.Init() - m.wireCond.L.Unlock() - - close(msgDone) - case <-m.quit: - return - } - - case pktCourier: - var ( - pktOutbox chan *htlcPacket - addOutbox chan *htlcPacket - add *htlcPacket - deadline <-chan time.Time - ) - - // Prioritize delivery of Settle/Fail packets over Adds. - // This ensures that we actively clear the commitment of - // existing HTLCs before trying to add new ones. This - // can help to improve forwarding performance since the - // time to sign a commitment is linear in the number of - // HTLCs manifested on the commitments. - // - // NOTE: Both types are eventually delivered over the - // same channel, but we can control which is delivered - // by exclusively making one nil and the other non-nil. - // We know from our loop condition that at least one - // nextRep and nextAdd are non-nil. - if nextRep != nil { - pktOutbox = m.pktOutbox - } else { - addOutbox = m.pktOutbox - } - - // If we have a pending Add, we'll also construct the - // deadline so we can fail it back if we are unable to - // deliver any message in time. We also dereference the - // nextAdd's packet, since we will need access to it in - // the case we are delivering it and/or if the deadline - // expires. - // - // NOTE: It's possible after this point for add to be - // nil, but this can only occur when addOutbox is also - // nil, hence we won't accidentally deliver a nil - // packet. - if nextAdd != nil { - add = nextAdd.pkt - deadline = nextAdd.deadline(m.cfg.clock) - } - - select { - case pktOutbox <- nextRep: - m.pktCond.L.Lock() - // Only advance the repHead if this Settle or - // Fail is still at the head of the queue. - if m.repHead != nil && m.repHead == nextRepEl { - m.repHead = m.repHead.Next() - } - m.pktCond.L.Unlock() - - case addOutbox <- add: - m.pktCond.L.Lock() - // Only advance the addHead if this Add is still - // at the head of the queue. - if m.addHead != nil && m.addHead == nextAddEl { - m.addHead = m.addHead.Next() - } - m.pktCond.L.Unlock() - - case <-deadline: - log.Debugf("Expiring add htlc with "+ - "keystone=%v", add.keystone()) - m.FailAdd(add) - - case pktDone := <-m.pktReset: - m.pktCond.L.Lock() - m.repHead = m.repPkts.Front() - m.addHead = m.addPkts.Front() - m.pktCond.L.Unlock() - - close(pktDone) - - case <-m.quit: - return - } - } - } } From b1a3c46759be0c5b1492c15a46bf81905e475a3e Mon Sep 17 00:00:00 2001 From: Matt Morehouse Date: Mon, 31 Jul 2023 09:52:33 -0500 Subject: [PATCH 2/2] htlcswitch: TODO for obviating type assertions memoryMailBox uses multiple container/list.List objects to track messages and packets, which use interface{} to accept objects of any type. go1.18 added generics to the language, which means we could use a typed list instead, allowing us to stop using forced type assertions when reading objects from the list. I'm not aware of any standard library implementation of a typed list yet, so let's just add a TODO for now. --- htlcswitch/mailbox.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 8bc8ba6da..99066497c 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -112,6 +112,8 @@ type mailBoxConfig struct { // memoryMailBox is an implementation of the MailBox struct backed by purely // in-memory queues. +// +// TODO(morehouse): use typed lists instead of list.Lists to avoid type asserts. type memoryMailBox struct { started sync.Once stopped sync.Once