diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index cc3bd1c83..8bc8ba6da 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -202,8 +202,8 @@ const ( // NOTE: This method is part of the MailBox interface. func (m *memoryMailBox) Start() { m.started.Do(func() { - go m.mailCourier(wireCourier) - go m.mailCourier(pktCourier) + go m.wireMailCourier() + go m.pktMailCourier() }) } @@ -365,62 +365,84 @@ func (p *pktWithExpiry) deadline(clock clock.Clock) <-chan time.Time { return clock.TickAfter(p.expiry.Sub(clock.Now())) } -// mailCourier is a dedicated goroutine whose job is to reliably deliver -// messages of a particular type. There are two types of couriers: wire -// couriers, and mail couriers. Depending on the passed courierType, this -// goroutine will assume one of two roles. -func (m *memoryMailBox) mailCourier(cType courierType) { - switch cType { - case wireCourier: - defer close(m.wireShutdown) - case pktCourier: - defer close(m.pktShutdown) - } - - // TODO(roasbeef): refactor... +// wireMailCourier is a dedicated goroutine whose job is to reliably deliver +// wire messages. +func (m *memoryMailBox) wireMailCourier() { + defer close(m.wireShutdown) for { - // First, we'll check our condition. If our target mailbox is - // empty, then we'll wait until a new item is added. - switch cType { - case wireCourier: - m.wireCond.L.Lock() - for m.wireMessages.Front() == nil { - m.wireCond.Wait() + // First, we'll check our condition. If our mailbox is empty, + // then we'll wait until a new item is added. + m.wireCond.L.Lock() + for m.wireMessages.Front() == nil { + m.wireCond.Wait() - select { - case msgDone := <-m.msgReset: - m.wireMessages.Init() - - close(msgDone) - case <-m.quit: - m.wireCond.L.Unlock() - return - default: - } + select { + case msgDone := <-m.msgReset: + m.wireMessages.Init() + close(msgDone) + case <-m.quit: + m.wireCond.L.Unlock() + return + default: } + } - case pktCourier: - m.pktCond.L.Lock() - for m.repHead == nil && m.addHead == nil { - m.pktCond.Wait() + // Grab the datum off the front of the queue, shifting the + // slice's reference down one in order to remove the datum from + // the queue. + entry := m.wireMessages.Front() - select { - // Resetting the packet queue means just moving - // our pointer to the front. This ensures that - // any un-ACK'd messages are re-delivered upon - // reconnect. - case pktDone := <-m.pktReset: - m.repHead = m.repPkts.Front() - m.addHead = m.addPkts.Front() + //nolint:forcetypeassert + nextMsg := m.wireMessages.Remove(entry).(lnwire.Message) - close(pktDone) + // Now that we're done with the condition, we can unlock it to + // allow any callers to append to the end of our target queue. + m.wireCond.L.Unlock() - case <-m.quit: - m.pktCond.L.Unlock() - return - default: - } + // With the next message obtained, we'll now select to attempt + // to deliver the message. If we receive a kill signal, then + // we'll bail out. + select { + case m.messageOutbox <- nextMsg: + case msgDone := <-m.msgReset: + m.wireCond.L.Lock() + m.wireMessages.Init() + m.wireCond.L.Unlock() + + close(msgDone) + case <-m.quit: + return + } + } +} + +// pktMailCourier is a dedicated goroutine whose job is to reliably deliver +// packet messages. +func (m *memoryMailBox) pktMailCourier() { + defer close(m.pktShutdown) + + for { + // First, we'll check our condition. If our mailbox is empty, + // then we'll wait until a new item is added. + m.pktCond.L.Lock() + for m.repHead == nil && m.addHead == nil { + m.pktCond.Wait() + + select { + // Resetting the packet queue means just moving our + // pointer to the front. This ensures that any un-ACK'd + // messages are re-delivered upon reconnect. + case pktDone := <-m.pktReset: + m.repHead = m.repPkts.Front() + m.addHead = m.addPkts.Front() + + close(pktDone) + + case <-m.quit: + m.pktCond.L.Unlock() + return + default: } } @@ -429,142 +451,104 @@ func (m *memoryMailBox) mailCourier(cType courierType) { nextRepEl *list.Element nextAdd *pktWithExpiry nextAddEl *list.Element - nextMsg lnwire.Message ) - switch cType { - // Grab the datum off the front of the queue, shifting the - // slice's reference down one in order to remove the datum from - // the queue. - case wireCourier: - entry := m.wireMessages.Front() - nextMsg = m.wireMessages.Remove(entry).(lnwire.Message) - // For packets, we actually never remove an item until it has // been ACK'd by the link. This ensures that if a read packet // doesn't make it into a commitment, then it'll be // re-delivered once the link comes back online. - case pktCourier: - // Peek at the head of the Settle/Fails and Add queues. - // We peak both even if there is a Settle/Fail present - // because we need to set a deadline for the next - // pending Add if it's present. Due to clock - // monotonicity, we know that the head of the Adds is - // the next to expire. - if m.repHead != nil { - nextRep = m.repHead.Value.(*htlcPacket) - nextRepEl = m.repHead - } - if m.addHead != nil { - nextAdd = m.addHead.Value.(*pktWithExpiry) - nextAddEl = m.addHead - } + + // Peek at the head of the Settle/Fails and Add queues. We peak + // both even if there is a Settle/Fail present because we need + // to set a deadline for the next pending Add if it's present. + // Due to clock monotonicity, we know that the head of the Adds + // is the next to expire. + if m.repHead != nil { + //nolint:forcetypeassert + nextRep = m.repHead.Value.(*htlcPacket) + nextRepEl = m.repHead + } + if m.addHead != nil { + //nolint:forcetypeassert + nextAdd = m.addHead.Value.(*pktWithExpiry) + nextAddEl = m.addHead } // Now that we're done with the condition, we can unlock it to // allow any callers to append to the end of our target queue. - switch cType { - case wireCourier: - m.wireCond.L.Unlock() - case pktCourier: + m.pktCond.L.Unlock() + + var ( + pktOutbox chan *htlcPacket + addOutbox chan *htlcPacket + add *htlcPacket + deadline <-chan time.Time + ) + + // Prioritize delivery of Settle/Fail packets over Adds. This + // ensures that we actively clear the commitment of existing + // HTLCs before trying to add new ones. This can help to improve + // forwarding performance since the time to sign a commitment is + // linear in the number of HTLCs manifested on the commitments. + // + // NOTE: Both types are eventually delivered over the same + // channel, but we can control which is delivered by exclusively + // making one nil and the other non-nil. We know from our loop + // condition that at least one nextRep and nextAdd are non-nil. + if nextRep != nil { + pktOutbox = m.pktOutbox + } else { + addOutbox = m.pktOutbox + } + + // If we have a pending Add, we'll also construct the deadline + // so we can fail it back if we are unable to deliver any + // message in time. We also dereference the nextAdd's packet, + // since we will need access to it in the case we are delivering + // it and/or if the deadline expires. + // + // NOTE: It's possible after this point for add to be nil, but + // this can only occur when addOutbox is also nil, hence we + // won't accidentally deliver a nil packet. + if nextAdd != nil { + add = nextAdd.pkt + deadline = nextAdd.deadline(m.cfg.clock) + } + + select { + case pktOutbox <- nextRep: + m.pktCond.L.Lock() + // Only advance the repHead if this Settle or Fail is + // still at the head of the queue. + if m.repHead != nil && m.repHead == nextRepEl { + m.repHead = m.repHead.Next() + } m.pktCond.L.Unlock() + + case addOutbox <- add: + m.pktCond.L.Lock() + // Only advance the addHead if this Add is still at the + // head of the queue. + if m.addHead != nil && m.addHead == nextAddEl { + m.addHead = m.addHead.Next() + } + m.pktCond.L.Unlock() + + case <-deadline: + log.Debugf("Expiring add htlc with "+ + "keystone=%v", add.keystone()) + m.FailAdd(add) + + case pktDone := <-m.pktReset: + m.pktCond.L.Lock() + m.repHead = m.repPkts.Front() + m.addHead = m.addPkts.Front() + m.pktCond.L.Unlock() + + close(pktDone) + + case <-m.quit: + return } - - // With the next message obtained, we'll now select to attempt - // to deliver the message. If we receive a kill signal, then - // we'll bail out. - switch cType { - case wireCourier: - select { - case m.messageOutbox <- nextMsg: - case msgDone := <-m.msgReset: - m.wireCond.L.Lock() - m.wireMessages.Init() - m.wireCond.L.Unlock() - - close(msgDone) - case <-m.quit: - return - } - - case pktCourier: - var ( - pktOutbox chan *htlcPacket - addOutbox chan *htlcPacket - add *htlcPacket - deadline <-chan time.Time - ) - - // Prioritize delivery of Settle/Fail packets over Adds. - // This ensures that we actively clear the commitment of - // existing HTLCs before trying to add new ones. This - // can help to improve forwarding performance since the - // time to sign a commitment is linear in the number of - // HTLCs manifested on the commitments. - // - // NOTE: Both types are eventually delivered over the - // same channel, but we can control which is delivered - // by exclusively making one nil and the other non-nil. - // We know from our loop condition that at least one - // nextRep and nextAdd are non-nil. - if nextRep != nil { - pktOutbox = m.pktOutbox - } else { - addOutbox = m.pktOutbox - } - - // If we have a pending Add, we'll also construct the - // deadline so we can fail it back if we are unable to - // deliver any message in time. We also dereference the - // nextAdd's packet, since we will need access to it in - // the case we are delivering it and/or if the deadline - // expires. - // - // NOTE: It's possible after this point for add to be - // nil, but this can only occur when addOutbox is also - // nil, hence we won't accidentally deliver a nil - // packet. - if nextAdd != nil { - add = nextAdd.pkt - deadline = nextAdd.deadline(m.cfg.clock) - } - - select { - case pktOutbox <- nextRep: - m.pktCond.L.Lock() - // Only advance the repHead if this Settle or - // Fail is still at the head of the queue. - if m.repHead != nil && m.repHead == nextRepEl { - m.repHead = m.repHead.Next() - } - m.pktCond.L.Unlock() - - case addOutbox <- add: - m.pktCond.L.Lock() - // Only advance the addHead if this Add is still - // at the head of the queue. - if m.addHead != nil && m.addHead == nextAddEl { - m.addHead = m.addHead.Next() - } - m.pktCond.L.Unlock() - - case <-deadline: - log.Debugf("Expiring add htlc with "+ - "keystone=%v", add.keystone()) - m.FailAdd(add) - - case pktDone := <-m.pktReset: - m.pktCond.L.Lock() - m.repHead = m.repPkts.Front() - m.addHead = m.addPkts.Front() - m.pktCond.L.Unlock() - - close(pktDone) - - case <-m.quit: - return - } - } - } }