htlcswitch/switch: segment pending links from live links

This commit is contained in:
Conner Fromknecht
2018-05-01 16:30:30 -07:00
parent 1dace1fed4
commit d3403306b6

View File

@@ -175,17 +175,20 @@ type Switch struct {
// forward the settle/fail htlc updates back to the add htlc initiator. // forward the settle/fail htlc updates back to the add htlc initiator.
circuits CircuitMap circuits CircuitMap
// mailMtx is a read/write mutex that protects the mailboxes map. // mailOrchestrator manages the lifecycle of mailboxes used throughout
mailMtx sync.RWMutex // the switch, and facilitates delayed delivery of packets to links that
// later come online.
// mailboxes is a map of channel id to mailboxes, which allows the mailOrchestrator *mailOrchestrator
// switch to buffer messages for peers that have not come back online.
mailboxes map[lnwire.ShortChannelID]MailBox
// indexMtx is a read/write mutex that protects the set of indexes // indexMtx is a read/write mutex that protects the set of indexes
// below. // below.
indexMtx sync.RWMutex indexMtx sync.RWMutex
// pendingLinkIndex holds links that have not had their final, live
// short_chan_id assigned. These links can be transitioned into the
// primary linkIndex by using UpdateShortChanID to load their live id.
pendingLinkIndex map[lnwire.ChannelID]ChannelLink
// links is a map of channel id and channel link which manages // links is a map of channel id and channel link which manages
// this channel. // this channel.
linkIndex map[lnwire.ChannelID]ChannelLink linkIndex map[lnwire.ChannelID]ChannelLink
@@ -248,9 +251,10 @@ func New(cfg Config) (*Switch, error) {
circuits: circuitMap, circuits: circuitMap,
paymentSequencer: sequencer, paymentSequencer: sequencer,
linkIndex: make(map[lnwire.ChannelID]ChannelLink), linkIndex: make(map[lnwire.ChannelID]ChannelLink),
mailboxes: make(map[lnwire.ShortChannelID]MailBox), mailOrchestrator: newMailOrchestrator(),
forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink), forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink),
interfaceIndex: make(map[[33]byte]map[ChannelLink]struct{}), interfaceIndex: make(map[[33]byte]map[ChannelLink]struct{}),
pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink),
pendingPayments: make(map[uint64]*pendingPayment), pendingPayments: make(map[uint64]*pendingPayment),
htlcPlex: make(chan *plexPacket), htlcPlex: make(chan *plexPacket),
chanCloseRequests: make(chan *ChanClose), chanCloseRequests: make(chan *ChanClose),
@@ -1089,8 +1093,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
// Check to see that the source link is online before removing // Check to see that the source link is online before removing
// the circuit. // the circuit.
sourceMailbox := s.getOrCreateMailBox(packet.incomingChanID) return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
return sourceMailbox.AddPacket(packet)
default: default:
return errors.New("wrong update type") return errors.New("wrong update type")
@@ -1116,16 +1119,18 @@ func (s *Switch) failAddPacket(packet *htlcPacket,
log.Error(failErr) log.Error(failErr)
// Route a fail packet back to the source link. failPkt := &htlcPacket{
sourceMailbox := s.getOrCreateMailBox(packet.incomingChanID)
if err = sourceMailbox.AddPacket(&htlcPacket{
incomingChanID: packet.incomingChanID, incomingChanID: packet.incomingChanID,
incomingHTLCID: packet.incomingHTLCID, incomingHTLCID: packet.incomingHTLCID,
circuit: packet.circuit, circuit: packet.circuit,
htlc: &lnwire.UpdateFailHTLC{ htlc: &lnwire.UpdateFailHTLC{
Reason: reason, Reason: reason,
}, },
}); err != nil { }
// Route a fail packet back to the source link.
err = s.mailOrchestrator.Deliver(failPkt.incomingChanID, failPkt)
if err != nil {
err = errors.Errorf("source chanid=%v unable to "+ err = errors.Errorf("source chanid=%v unable to "+
"handle switch packet: %v", "handle switch packet: %v",
packet.incomingChanID, err) packet.incomingChanID, err)
@@ -1343,6 +1348,12 @@ func (s *Switch) htlcForwarder() {
"channel link on stop: %v", err) "channel link on stop: %v", err)
} }
} }
for _, link := range s.pendingLinkIndex {
if err := s.removeLink(link.ChanID()); err != nil {
log.Errorf("unable to remove pending "+
"channel link on stop: %v", err)
}
}
s.indexMtx.Unlock() s.indexMtx.Unlock()
// Before we exit fully, we'll attempt to flush out any // Before we exit fully, we'll attempt to flush out any
@@ -1699,11 +1710,7 @@ func (s *Switch) Stop() error {
// Wait until all active goroutines have finished exiting before // Wait until all active goroutines have finished exiting before
// stopping the mailboxes, otherwise the mailbox map could still be // stopping the mailboxes, otherwise the mailbox map could still be
// accessed and modified. // accessed and modified.
for _, mailBox := range s.mailboxes { s.mailOrchestrator.Stop()
mailBox.Stop()
}
for _, mailBox := range s.pendingMailboxes {
mailBox.Stop()
return nil return nil
} }
@@ -1714,64 +1721,67 @@ func (s *Switch) AddLink(link ChannelLink) error {
s.indexMtx.Lock() s.indexMtx.Lock()
defer s.indexMtx.Unlock() defer s.indexMtx.Unlock()
// First we'll add the link to the linkIndex which lets us quickly look chanID := link.ChanID()
// up a channel when we need to close or register it, and the
// forwarding index which'll be used when forwarding HTLC's in the // Get and attach the mailbox for this link, which buffers packets in
// multi-hop setting. // case there packets that we tried to deliver while this link was
// offline.
mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID)
link.AttachMailBox(mailbox)
if err := link.Start(); err != nil {
s.removeLink(chanID)
return err
}
shortChanID := link.ShortChanID()
if shortChanID == sourceHop {
log.Infof("Adding pending link chan_id=%v, short_chan_id=%v",
chanID, shortChanID)
s.pendingLinkIndex[chanID] = link
} else {
log.Infof("Adding live link chan_id=%v, short_chan_id=%v",
chanID, shortChanID)
s.addLiveLink(link)
s.mailOrchestrator.BindLiveShortChanID(
mailbox, chanID, shortChanID,
)
}
return nil
}
// addLiveLink adds a link to all associated forwarding index, this makes it a
// candidate for forwarding HTLCs.
func (s *Switch) addLiveLink(link ChannelLink) {
// We'll add the link to the linkIndex which lets us quickly
// look up a channel when we need to close or register it, and
// the forwarding index which'll be used when forwarding HTLC's
// in the multi-hop setting.
s.linkIndex[link.ChanID()] = link s.linkIndex[link.ChanID()] = link
s.forwardingIndex[link.ShortChanID()] = link s.forwardingIndex[link.ShortChanID()] = link
// Next we'll add the link to the interface index so we can quickly // Next we'll add the link to the interface index so we can
// look up all the channels for a particular node. // quickly look up all the channels for a particular node.
peerPub := link.Peer().PubKey() peerPub := link.Peer().PubKey()
if _, ok := s.interfaceIndex[peerPub]; !ok { if _, ok := s.interfaceIndex[peerPub]; !ok {
s.interfaceIndex[peerPub] = make(map[ChannelLink]struct{}) s.interfaceIndex[peerPub] = make(map[ChannelLink]struct{})
} }
s.interfaceIndex[peerPub][link] = struct{}{} s.interfaceIndex[peerPub][link] = struct{}{}
// Get the mailbox for this link, which buffers packets in case there
// packets that we tried to deliver while this link was offline.
mailbox := s.getOrCreateMailBox(link.ShortChanID())
// Give the link its mailbox, we only need to start the mailbox if it
// wasn't previously found.
link.AttachMailBox(mailbox)
if err := link.Start(); err != nil {
s.removeLink(link.ChanID())
return err
}
log.Infof("Added channel link with chan_id=%v, short_chan_id=(%v)",
link.ChanID(), spew.Sdump(link.ShortChanID()))
return nil
} }
// getOrCreateMailBox returns the known mailbox for a particular short channel // removeLiveLink removes a link from all associated forwarding indexes, this
// id, or creates one if the link has no existing mailbox. // prevents it from being a candidate in forwarding.
func (s *Switch) getOrCreateMailBox(chanID lnwire.ShortChannelID) MailBox { func (s *Switch) removeLiveLink(link ChannelLink) {
// Check to see if we have a mailbox already populated for this link. // Remove the channel from live link indexes.
s.mailMtx.RLock() delete(s.linkIndex, link.ChanID())
mailbox, ok := s.mailboxes[chanID] delete(s.forwardingIndex, link.ShortChanID())
if ok {
s.mailMtx.RUnlock()
return mailbox
}
s.mailMtx.RUnlock()
// Otherwise, we will make a new one only if the mailbox still is not // Remove the channel from channel index.
// present after the exclusive mutex is acquired. peerPub := link.Peer().PubKey()
s.mailMtx.Lock() delete(s.interfaceIndex, peerPub)
mailbox, ok = s.mailboxes[chanID]
if !ok {
mailbox = newMemoryMailBox()
mailbox.Start()
s.mailboxes[chanID] = mailbox
}
s.mailMtx.Unlock()
return mailbox
} }
// GetLink is used to initiate the handling of the get link command. The // GetLink is used to initiate the handling of the get link command. The
@@ -1782,7 +1792,10 @@ func (s *Switch) GetLink(chanID lnwire.ChannelID) (ChannelLink, error) {
link, ok := s.linkIndex[chanID] link, ok := s.linkIndex[chanID]
if !ok { if !ok {
return nil, ErrChannelLinkNotFound link, ok = s.pendingLinkIndex[chanID]
if !ok {
return nil, ErrChannelLinkNotFound
}
} }
return link, nil return link, nil
@@ -1817,52 +1830,68 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) error {
log.Infof("Removing channel link with ChannelID(%v)", chanID) log.Infof("Removing channel link with ChannelID(%v)", chanID)
link, ok := s.linkIndex[chanID] link, ok := s.linkIndex[chanID]
if !ok { if ok {
return ErrChannelLinkNotFound s.removeLiveLink(link)
link.Stop()
return nil
} }
// Remove the channel from channel map. link, ok = s.pendingLinkIndex[chanID]
delete(s.linkIndex, chanID) if ok {
delete(s.forwardingIndex, link.ShortChanID()) delete(s.pendingLinkIndex, chanID)
link.Stop()
// Remove the channel from channel index. return nil
peerPub := link.Peer().PubKey() }
delete(s.interfaceIndex, peerPub)
link.Stop() return ErrChannelLinkNotFound
return nil
} }
// UpdateShortChanID updates the short chan ID for an existing channel. This is // UpdateShortChanID updates the short chan ID for an existing channel. This is
// required in the case of a re-org and re-confirmation or a channel, or in the // required in the case of a re-org and re-confirmation or a channel, or in the
// case that a link was added to the switch before its short chan ID was known. // case that a link was added to the switch before its short chan ID was known.
func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID, func (s *Switch) UpdateShortChanID(chanID lnwire.ChannelID) error {
shortChanID lnwire.ShortChannelID) error {
s.indexMtx.Lock() s.indexMtx.Lock()
defer s.indexMtx.Unlock()
// First, we'll extract the current link as is from the link // Locate the target link in the pending link index. If no such link
// index. If the link isn't even in the index, then we'll return an // exists, then we will ignore the request.
// error. link, ok := s.pendingLinkIndex[chanID]
link, ok := s.linkIndex[chanID]
if !ok { if !ok {
s.indexMtx.Unlock()
return fmt.Errorf("link %v not found", chanID) return fmt.Errorf("link %v not found", chanID)
} }
log.Infof("Updating short_chan_id for ChannelLink(%v): old=%v, new=%v", oldShortChanID := link.ShortChanID()
chanID, link.ShortChanID(), shortChanID)
// At this point the link is actually active, so we'll update the // Try to update the link's short channel ID, returning early if this
// forwarding index with the next short channel ID. // update failed.
s.forwardingIndex[shortChanID] = link shortChanID, err := link.UpdateShortChanID()
if err != nil {
return err
}
s.indexMtx.Unlock() // Reject any blank short channel ids.
if shortChanID == sourceHop {
return fmt.Errorf("refusing trivial short_chan_id for chan_id=%v"+
"live link", chanID)
}
// Finally, we'll notify the link of its new short channel ID. log.Infof("Updated short_chan_id for ChannelLink(%v): old=%v, new=%v",
link.UpdateShortChanID(shortChanID) chanID, oldShortChanID, shortChanID)
// Since the link was in the pending state before, we will remove it
// from the pending link index and add it to the live link index so that
// it can be available in forwarding.
delete(s.pendingLinkIndex, chanID)
s.addLiveLink(link)
// Finally, alert the mail orchestrator to the change of short channel
// ID, and deliver any unclaimed packets to the link.
mailbox := s.mailOrchestrator.GetOrCreateMailBox(chanID)
s.mailOrchestrator.BindLiveShortChanID(
mailbox, chanID, shortChanID,
)
return nil return nil
} }