htlcswitch: introduce and embed packetHandler interface in ChannelLink

This will allow separating the now-private *htlcPacket methods from
the publicly-used ChannelLink interface methods.
This commit is contained in:
eugene 2021-08-03 14:59:15 -04:00
parent 6c6e353597
commit 051cd8793a
No known key found for this signature in database
GPG Key ID: 118759E83439A9B1
6 changed files with 43 additions and 37 deletions

View File

@ -41,6 +41,22 @@ type InvoiceDatabase interface {
HodlUnsubscribeAll(subscriber chan<- interface{}) HodlUnsubscribeAll(subscriber chan<- interface{})
} }
// packetHandler is an interface used exclusively by the Switch to handle
// htlcPacket and pass them to the link implementation.
type packetHandler interface {
// handleSwitchPacket handles the switch packets. These packets might
// be forwarded to us from another channel link in case the htlc
// update came from another peer or if the update was created by user
// initially.
//
// NOTE: This function should block as little as possible.
handleSwitchPacket(*htlcPacket) error
// handleLocalAddPacket handles a locally-initiated UpdateAddHTLC
// packet. It will be processed synchronously.
handleLocalAddPacket(*htlcPacket) error
}
// ChannelLink is an interface which represents the subsystem for managing the // ChannelLink is an interface which represents the subsystem for managing the
// incoming htlc requests, applying the changes to the channel, and also // incoming htlc requests, applying the changes to the channel, and also
// propagating/forwarding it to htlc switch. // propagating/forwarding it to htlc switch.
@ -62,18 +78,8 @@ type InvoiceDatabase interface {
type ChannelLink interface { type ChannelLink interface {
// TODO(roasbeef): modify interface to embed mail boxes? // TODO(roasbeef): modify interface to embed mail boxes?
// HandleSwitchPacket handles the switch packets. This packets might be // Embed the packetHandler interface.
// forwarded to us from another channel link in case the htlc update packetHandler
// came from another peer or if the update was created by user
// initially.
//
// NOTE: This function MUST be non-blocking (or block as little as
// possible).
HandleSwitchPacket(*htlcPacket) error
// HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC
// packet. It will be processed synchronously.
HandleLocalAddPacket(*htlcPacket) error
// HandleChannelUpdate handles the htlc requests as settle/add/fail // HandleChannelUpdate handles the htlc requests as settle/add/fail
// which sent to us from remote peer we have a channel with. // which sent to us from remote peer we have a channel with.

View File

@ -2387,23 +2387,23 @@ func (l *channelLink) String() string {
return l.channel.ChannelPoint().String() return l.channel.ChannelPoint().String()
} }
// HandleSwitchPacket handles the switch packets. This packets which might be // handleSwitchPacket handles the switch packets. This packets which might be
// forwarded to us from another channel link in case the htlc update came from // forwarded to us from another channel link in case the htlc update came from
// another peer or if the update was created by user // another peer or if the update was created by user
// //
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the packetHandler interface.
func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error { func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
l.log.Tracef("received switch packet inkey=%v, outkey=%v", l.log.Tracef("received switch packet inkey=%v, outkey=%v",
pkt.inKey(), pkt.outKey()) pkt.inKey(), pkt.outKey())
return l.mailBox.AddPacket(pkt) return l.mailBox.AddPacket(pkt)
} }
// HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC packet. It // handleLocalAddPacket handles a locally-initiated UpdateAddHTLC packet. It
// will be processed synchronously. // will be processed synchronously.
// //
// NOTE: Part of the ChannelLink interface. // NOTE: Part of the packetHandler interface.
func (l *channelLink) HandleLocalAddPacket(pkt *htlcPacket) error { func (l *channelLink) handleLocalAddPacket(pkt *htlcPacket) error {
l.log.Tracef("received switch packet outkey=%v", pkt.outKey()) l.log.Tracef("received switch packet outkey=%v", pkt.outKey())
// Create a buffered result channel to prevent the link from blocking. // Create a buffered result channel to prevent the link from blocking.

View File

@ -56,7 +56,7 @@ func (l *linkTestContext) sendHtlcAliceToBob(htlcID int,
l.t.Fatalf("expected 1 adds, found %d", len(fwdActions.Adds)) l.t.Fatalf("expected 1 adds, found %d", len(fwdActions.Adds))
} }
err = l.aliceLink.HandleSwitchPacket(&htlcPacket{ err = l.aliceLink.handleSwitchPacket(&htlcPacket{
incomingHTLCID: uint64(htlcID), incomingHTLCID: uint64(htlcID),
htlc: htlc, htlc: htlc,
}) })

View File

@ -2247,7 +2247,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
} }
addPkt.circuit = &circuit addPkt.circuit = &circuit
if err := aliceLink.HandleSwitchPacket(&addPkt); err != nil { if err := aliceLink.handleSwitchPacket(&addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err) t.Fatalf("unable to handle switch packet: %v", err)
} }
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
@ -2327,7 +2327,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
} }
addPkt.circuit = &circuit addPkt.circuit = &circuit
if err := aliceLink.HandleSwitchPacket(&addPkt); err != nil { if err := aliceLink.handleSwitchPacket(&addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err) t.Fatalf("unable to handle switch packet: %v", err)
} }
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
@ -2467,7 +2467,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
obfuscator: NewMockObfuscator(), obfuscator: NewMockObfuscator(),
} }
if err := aliceLink.HandleSwitchPacket(&settlePkt); err != nil { if err := aliceLink.handleSwitchPacket(&settlePkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err) t.Fatalf("unable to handle switch packet: %v", err)
} }
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
@ -2571,7 +2571,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
obfuscator: NewMockObfuscator(), obfuscator: NewMockObfuscator(),
} }
if err := aliceLink.HandleSwitchPacket(&failPkt); err != nil { if err := aliceLink.handleSwitchPacket(&failPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err) t.Fatalf("unable to handle switch packet: %v", err)
} }
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
@ -2709,7 +2709,7 @@ func TestChannelLinkTrimCircuitsPending(t *testing.T) {
// Since both were committed successfully, we will now deliver them to // Since both were committed successfully, we will now deliver them to
// Alice's link. // Alice's link.
for _, addPkt := range addPkts[:halfHtlcs] { for _, addPkt := range addPkts[:halfHtlcs] {
if err := alice.link.HandleSwitchPacket(addPkt); err != nil { if err := alice.link.handleSwitchPacket(addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err) t.Fatalf("unable to handle switch packet: %v", err)
} }
} }
@ -2794,7 +2794,7 @@ func TestChannelLinkTrimCircuitsPending(t *testing.T) {
// Deliver the latter two HTLCs to Alice's links so that they can be // Deliver the latter two HTLCs to Alice's links so that they can be
// processed and added to the in-memory commitment state. // processed and added to the in-memory commitment state.
for _, addPkt := range addPkts[halfHtlcs:] { for _, addPkt := range addPkts[halfHtlcs:] {
if err := alice.link.HandleSwitchPacket(addPkt); err != nil { if err := alice.link.handleSwitchPacket(addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err) t.Fatalf("unable to handle switch packet: %v", err)
} }
} }
@ -2989,7 +2989,7 @@ func TestChannelLinkTrimCircuitsNoCommit(t *testing.T) {
// Since both were committed successfully, we will now deliver them to // Since both were committed successfully, we will now deliver them to
// Alice's link. // Alice's link.
for _, addPkt := range addPkts[:halfHtlcs] { for _, addPkt := range addPkts[:halfHtlcs] {
if err := alice.link.HandleSwitchPacket(addPkt); err != nil { if err := alice.link.handleSwitchPacket(addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err) t.Fatalf("unable to handle switch packet: %v", err)
} }
} }
@ -3082,7 +3082,7 @@ func TestChannelLinkTrimCircuitsNoCommit(t *testing.T) {
// Deliver the last two HTLCs to the link via Alice's mailbox. // Deliver the last two HTLCs to the link via Alice's mailbox.
for _, addPkt := range addPkts[halfHtlcs:] { for _, addPkt := range addPkts[halfHtlcs:] {
if err := alice.link.HandleSwitchPacket(addPkt); err != nil { if err := alice.link.handleSwitchPacket(addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err) t.Fatalf("unable to handle switch packet: %v", err)
} }
} }
@ -3249,7 +3249,7 @@ func TestChannelLinkTrimCircuitsRemoteCommit(t *testing.T) {
// Since both were committed successfully, we will now deliver them to // Since both were committed successfully, we will now deliver them to
// Alice's link. // Alice's link.
for _, addPkt := range addPkts { for _, addPkt := range addPkts {
if err := alice.link.HandleSwitchPacket(addPkt); err != nil { if err := alice.link.handleSwitchPacket(addPkt); err != nil {
t.Fatalf("unable to handle switch packet: %v", err) t.Fatalf("unable to handle switch packet: %v", err)
} }
} }
@ -3390,7 +3390,7 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) {
t.Fatalf("unable to commit circuit: %v", err) t.Fatalf("unable to commit circuit: %v", err)
} }
aliceLink.HandleSwitchPacket(addPkt) _ = aliceLink.handleSwitchPacket(addPkt)
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee)
@ -5139,7 +5139,7 @@ func TestChannelLinkCleanupSpuriousResponses(t *testing.T) {
obfuscator: NewMockObfuscator(), obfuscator: NewMockObfuscator(),
htlc: &lnwire.UpdateFailHTLC{}, htlc: &lnwire.UpdateFailHTLC{},
} }
aliceLink.HandleSwitchPacket(fail0) _ = aliceLink.handleSwitchPacket(fail0)
// Bob Alice // Bob Alice
// |<----- fal-1 ------| // |<----- fal-1 ------|
@ -5199,7 +5199,7 @@ func TestChannelLinkCleanupSpuriousResponses(t *testing.T) {
obfuscator: NewMockObfuscator(), obfuscator: NewMockObfuscator(),
htlc: &lnwire.UpdateFailHTLC{}, htlc: &lnwire.UpdateFailHTLC{},
} }
aliceLink.HandleSwitchPacket(fail1) _ = aliceLink.handleSwitchPacket(fail1)
// Bob Alice // Bob Alice
// |<----- fal-1 ------| // |<----- fal-1 ------|
@ -5256,7 +5256,7 @@ func TestChannelLinkCleanupSpuriousResponses(t *testing.T) {
// this should trigger an attempt to cleanup the spurious response. // this should trigger an attempt to cleanup the spurious response.
// However, we expect it to result in a NOP since it is still missing // However, we expect it to result in a NOP since it is still missing
// its sourceRef. // its sourceRef.
aliceLink.HandleSwitchPacket(fail0) _ = aliceLink.handleSwitchPacket(fail0)
// Allow the link enough time to process and reject the duplicate // Allow the link enough time to process and reject the duplicate
// packet, we'll also check that this doesn't trigger Alice to send the // packet, we'll also check that this doesn't trigger Alice to send the
@ -5311,7 +5311,7 @@ func TestChannelLinkCleanupSpuriousResponses(t *testing.T) {
obfuscator: NewMockObfuscator(), obfuscator: NewMockObfuscator(),
htlc: &lnwire.UpdateFailHTLC{}, htlc: &lnwire.UpdateFailHTLC{},
} }
aliceLink.HandleSwitchPacket(fail0) _ = aliceLink.handleSwitchPacket(fail0)
// Allow the link enough time to process and reject the duplicate // Allow the link enough time to process and reject the duplicate
// packet, we'll also check that this doesn't trigger Alice to send the // packet, we'll also check that this doesn't trigger Alice to send the

View File

@ -706,12 +706,12 @@ func newMockChannelLink(htlcSwitch *Switch, chanID lnwire.ChannelID,
} }
} }
func (f *mockChannelLink) HandleSwitchPacket(pkt *htlcPacket) error { func (f *mockChannelLink) handleSwitchPacket(pkt *htlcPacket) error {
f.mailBox.AddPacket(pkt) f.mailBox.AddPacket(pkt)
return nil return nil
} }
func (f *mockChannelLink) HandleLocalAddPacket(pkt *htlcPacket) error { func (f *mockChannelLink) handleLocalAddPacket(pkt *htlcPacket) error {
_ = f.mailBox.AddPacket(pkt) _ = f.mailBox.AddPacket(pkt)
return nil return nil
} }

View File

@ -509,7 +509,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
return linkErr return linkErr
} }
return link.HandleLocalAddPacket(packet) return link.handleLocalAddPacket(packet)
} }
// UpdateForwardingPolicies sends a message to the switch to update the // UpdateForwardingPolicies sends a message to the switch to update the
@ -1101,7 +1101,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
// Send the packet to the destination channel link which // Send the packet to the destination channel link which
// manages the channel. // manages the channel.
packet.outgoingChanID = destination.ShortChanID() packet.outgoingChanID = destination.ShortChanID()
return destination.HandleSwitchPacket(packet) return destination.handleSwitchPacket(packet)
case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC: case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC:
// If the source of this packet has not been set, use the // If the source of this packet has not been set, use the