diff --git a/breacharbiter.go b/breacharbiter.go index 1f93c6fec..1c1995dfc 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -7,6 +7,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/txscript" @@ -27,8 +28,8 @@ type breachArbiter struct { db *channeldb.DB notifier chainntnfs.ChainNotifier chainIO lnwallet.BlockChainIO - htlcSwitch *htlcSwitch estimator lnwallet.FeeEstimator + htlcSwitch *htlcswitch.Switch // breachObservers is a map which tracks all the active breach // observers we're currently managing. The key of the map is the @@ -64,7 +65,7 @@ type breachArbiter struct { // newBreachArbiter creates a new instance of a breachArbiter initialized with // its dependent objects. func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB, - notifier chainntnfs.ChainNotifier, h *htlcSwitch, + notifier chainntnfs.ChainNotifier, h *htlcswitch.Switch, chain lnwallet.BlockChainIO, fe lnwallet.FeeEstimator) *breachArbiter { return &breachArbiter{ @@ -482,7 +483,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // breached in order to ensure any incoming or outgoing // multi-hop HTLCs aren't sent over this link, nor any other // links associated with this peer. - b.htlcSwitch.CloseLink(chanPoint, CloseBreach) + b.htlcSwitch.CloseLink(chanPoint, htlcswitch.CloseBreach) chanInfo := contract.StateSnapshot() closeInfo := &channeldb.ChannelCloseSummary{ ChanPoint: *chanPoint, diff --git a/htlcswitch.go b/htlcswitch.go deleted file mode 100644 index 94a1d63a3..000000000 --- a/htlcswitch.go +++ /dev/null @@ -1,952 +0,0 @@ -package main - -import ( - "crypto/sha256" - "encoding/hex" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/davecgh/go-spew/spew" - "github.com/lightningnetwork/lightning-onion" - "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/lnrpc" - "github.com/lightningnetwork/lnd/lnwallet" - "github.com/lightningnetwork/lnd/lnwire" - "github.com/roasbeef/btcd/btcec" - "github.com/roasbeef/btcd/chaincfg/chainhash" - "github.com/roasbeef/btcd/wire" - "github.com/roasbeef/btcutil" - "golang.org/x/crypto/ripemd160" -) - -const ( - // htlcQueueSize... - // buffer bloat ;) - htlcQueueSize = 50 -) - -var ( - zeroBytes [32]byte -) - -// boundedLinkChan is a simple wrapper around a link's communication channel -// that bounds the total flow into and through the channel. Channels attached -// the link have a value which defines the max number of pending HTLC's present -// within the commitment transaction. Using this struct we establish a -// synchronization primitive that ensure we don't send additional htlcPackets -// to a link if the max limit has een reached. Once HTLC's are cleared from the -// commitment transaction, slots are freed up and more can proceed. -type boundedLinkChan struct { - // slots is a buffered channel whose buffer is the total number of - // outstanding HTLC's we can add to a link's commitment transaction. - // This channel is essentially used as a semaphore. - slots chan struct{} - - // linkChan is a channel that is connected to the channel state machine - // for a link. The switch will send adds, settles, and cancels over - // this channel. - linkChan chan *htlcPacket -} - -// newBoundedChan makes a new boundedLinkChan that has numSlots free slots that -// are depleted on each send until a slot is re-stored. linkChan is the -// underlying channel that will be sent upon. -func newBoundedLinkChan(numSlots uint32, - linkChan chan *htlcPacket) *boundedLinkChan { - - b := &boundedLinkChan{ - slots: make(chan struct{}, numSlots), - linkChan: linkChan, - } - - b.restoreSlots(numSlots) - return b -} - -// sendAndConsume sends a packet to the linkChan and consumes a single token in -// the process. -// -// TODO(roasbeef): add error fall through case? -func (b *boundedLinkChan) sendAndConsume(pkt *htlcPacket) { - <-b.slots - b.linkChan <- pkt -} - -// sendAndRestore sends a packet to the linkChan and consumes a single token in -// the process. This method is called when the switch sends either a cancel or -// settle HTLC message to the link. -func (b *boundedLinkChan) sendAndRestore(pkt *htlcPacket) { - b.linkChan <- pkt - b.slots <- struct{}{} -} - -// consumeSlot consumes a single slot from the bounded channel. This method is -// called once the switch receives a new htlc add message from a link right -// before forwarding it to the next hop. -func (b *boundedLinkChan) consumeSlot() { - <-b.slots -} - -// restoreSlot restores a single slots to the bounded channel. This method is -// called once the switch receives an HTLC cancel or settle from a link. -func (b *boundedLinkChan) restoreSlot() { - b.slots <- struct{}{} -} - -// restoreSlots adds numSlots additional slots to the bounded channel. -func (b *boundedLinkChan) restoreSlots(numSlots uint32) { - for i := uint32(0); i < numSlots; i++ { - b.slots <- struct{}{} - } -} - -// link represents an active channel capable of forwarding HTLCs. Each -// active channel registered with the htlc switch creates a new link which will -// be used for forwarding outgoing HTLCs. The link also has additional -// metadata such as the current available bandwidth of the link (in satoshis) -// which aid the switch in optimally forwarding HTLCs. -type link struct { - chanID lnwire.ChannelID - - capacity btcutil.Amount - - availableBandwidth int64 // atomic - - peer *peer - - *boundedLinkChan -} - -// htlcPacket is a wrapper around an lnwire message which adds, times out, or -// settles an active HTLC. The dest field denotes the name of the interface to -// forward this htlcPacket on. -type htlcPacket struct { - sync.RWMutex - - dest chainhash.Hash - - srcLink lnwire.ChannelID - onion *sphinx.ProcessedPacket - - msg lnwire.Message - - // TODO(roasbeef): refactor and add type to pkt message - payHash [32]byte - amt btcutil.Amount - - preImage chan [32]byte - - err chan error - done chan struct{} -} - -// circuitKey uniquely identifies an active Sphinx (onion routing) circuit -// between two open channels. Currently, the rHash of the HTLC which created -// the circuit is used to uniquely identify each circuit. -// TODO(roasbeef): need to also add in the settle/clear channel points in order -// to support fragmenting payments on the link layer: 1 to N, N to N, etc. -type circuitKey [32]byte - -// paymentCircuit represents an active Sphinx (onion routing) circuit between -// two active links within the htlcSwitch. A payment circuit is created once a -// link forwards an HTLC add request which initiates the creation of the -// circuit. The onion routing information contained within this message is -// used to identify the settle/clear ends of the circuit. A circuit may be -// re-used (not torndown) in the case that multiple HTLCs with the send RHash -// are sent. -type paymentCircuit struct { - refCount uint32 - - // clear is the link the htlcSwitch will forward the HTLC add message - // that initiated the circuit to. Once the message is forwarded, the - // payment circuit is considered "active" from the POV of the switch as - // both the incoming/outgoing channels have the cleared HTLC within - // their latest state. - clear *link - - // settle is the link the htlcSwitch will forward the HTLC settle it - // receives from the outgoing peer to. Once the switch forwards the - // settle message to this link, the payment circuit is considered - // complete unless the reference count on the circuit is greater than - // 1. - settle *link -} - -// htlcSwitch is a central messaging bus for all incoming/outgoing HTLCs. -// Connected peers with active channels are treated as named interfaces which -// refer to active channels as links. A link is the switch's message -// communication point with the goroutine that manages an active channel. New -// links are registered each time a channel is created, and unregistered once -// the channel is closed. The switch manages the hand-off process for multi-hop -// HTLCs, forwarding HTLCs initiated from within the daemon, and additionally -// splitting up incoming/outgoing HTLCs to a particular interface amongst many -// links (payment fragmentation). -// TODO(roasbeef): active sphinx circuits need to be synced to disk -type htlcSwitch struct { - started int32 // atomic - shutdown int32 // atomic - - // chanIndex maps a channel's ID to a link which contains additional - // information about the channel, and additionally houses a pointer to - // the peer managing the channel. - chanIndexMtx sync.RWMutex - chanIndex map[lnwire.ChannelID]*link - - // interfaces maps a node's ID to the set of links (active channels) we - // currently have open with that peer. - // TODO(roasbeef): combine w/ onionIndex? - interfaceMtx sync.RWMutex - interfaces map[chainhash.Hash][]*link - - // onionIndex is an index used to properly forward a message to the - // next hop within a Sphinx circuit. Within the sphinx packets, the - // "next-hop" destination is encoded as the hash160 of the node's - // public key serialized in compressed format. - onionMtx sync.RWMutex - onionIndex map[[ripemd160.Size]byte][]*link - - // paymentCircuits maps a circuit key to an active payment circuit - // amongst two open channels. This map is used to properly clear/settle - // onion routed payments within the network. - paymentCircuits map[circuitKey]*paymentCircuit - - // linkControl is a channel used by connected links to notify the - // switch of a non-multi-hop triggered link state update. - linkControl chan interface{} - - // outgoingPayments is a channel that outgoing payments initiated by - // the RPC system. - outgoingPayments chan *htlcPacket - - // htlcPlex is the channel which all connected links use to coordinate - // the setup/teardown of Sphinx (onion routing) payment circuits. - // Active links forward any add/settle messages over this channel each - // state transition, sending new adds/settles which are fully locked - // in. - htlcPlex chan *htlcPacket - - // TODO(roasbeef): sampler to log sat/sec and tx/sec - - wg sync.WaitGroup - quit chan struct{} -} - -// newHtlcSwitch creates a new htlcSwitch. -func newHtlcSwitch() *htlcSwitch { - return &htlcSwitch{ - chanIndex: make(map[lnwire.ChannelID]*link), - interfaces: make(map[chainhash.Hash][]*link), - onionIndex: make(map[[ripemd160.Size]byte][]*link), - paymentCircuits: make(map[circuitKey]*paymentCircuit), - linkControl: make(chan interface{}), - htlcPlex: make(chan *htlcPacket, htlcQueueSize), - outgoingPayments: make(chan *htlcPacket, htlcQueueSize), - quit: make(chan struct{}), - } -} - -// Start starts all helper goroutines required for the operation of the switch. -func (h *htlcSwitch) Start() error { - if !atomic.CompareAndSwapInt32(&h.started, 0, 1) { - return nil - } - - hswcLog.Tracef("Starting HTLC switch") - - h.wg.Add(2) - go h.networkAdmin() - go h.htlcForwarder() - - return nil -} - -// Stop gracefully stops all active helper goroutines, then waits until they've -// exited. -func (h *htlcSwitch) Stop() error { - if !atomic.CompareAndSwapInt32(&h.shutdown, 0, 1) { - return nil - } - - hswcLog.Infof("HLTC switch shutting down") - - close(h.quit) - h.wg.Wait() - - return nil -} - -// SendHTLC queues a HTLC packet for forwarding over the designated interface. -// In the event that the interface has insufficient capacity for the payment, -// an error is returned. Additionally, if the interface cannot be found, an -// alternative error is returned. -func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) ([32]byte, error) { - htlcPkt.err = make(chan error, 1) - htlcPkt.done = make(chan struct{}) - htlcPkt.preImage = make(chan [32]byte, 1) - - h.outgoingPayments <- htlcPkt - - return <-htlcPkt.preImage, <-htlcPkt.err -} - -// htlcForwarder is responsible for optimally forwarding (and possibly -// fragmenting) incoming/outgoing HTLCs amongst all active interfaces and -// their links. The duties of the forwarder are similar to that of a network -// switch, in that it facilitates multi-hop payments by acting as a central -// messaging bus. The switch communicates will active links to create, manage, -// and tear down active onion routed payments. Each active channel is modeled -// as networked device with metadata such as the available payment bandwidth, -// and total link capacity. -func (h *htlcSwitch) htlcForwarder() { - // TODO(roasbeef): track pending payments here instead of within each peer? - // Examine settles/timeouts from htlcPlex. Add src to htlcPacket, key by - // (src, htlcKey). - - // TODO(roasbeef): cleared vs settled distinction - var ( - deltaNumUpdates, totalNumUpdates uint64 - - deltaSatSent, deltaSatRecv btcutil.Amount - totalSatSent, totalSatRecv btcutil.Amount - ) - logTicker := time.NewTicker(10 * time.Second) -out: - for { - select { - case htlcPkt := <-h.outgoingPayments: - dest := htlcPkt.dest - h.interfaceMtx.RLock() - chanInterface, ok := h.interfaces[dest] - h.interfaceMtx.RUnlock() - if !ok { - err := fmt.Errorf("Unable to locate link %x", - dest[:]) - hswcLog.Errorf(err.Error()) - htlcPkt.preImage <- zeroBytes - htlcPkt.err <- err - continue - } - - wireMsg := htlcPkt.msg.(*lnwire.UpdateAddHTLC) - amt := wireMsg.Amount - - // Handle this send request in a distinct goroutine in - // order to avoid a possible deadlock between the htlc - // switch and channel's htlc manager. - for _, link := range chanInterface { - // TODO(roasbeef): implement HTLC fragmentation - // * avoid full channel depletion at higher - // level (here) instead of within state - // machine? - if atomic.LoadInt64(&link.availableBandwidth) < int64(amt) { - continue - } - - hswcLog.Tracef("Sending %v to %x", amt, dest[:]) - - go func() { - link.sendAndConsume(htlcPkt) - <-htlcPkt.done - link.restoreSlot() - }() - - n := atomic.AddInt64(&link.availableBandwidth, - -int64(amt)) - hswcLog.Tracef("Decrementing link %v bandwidth to %v", - link.chanID, n) - - continue out - } - - hswcLog.Errorf("Unable to send payment, insufficient capacity") - htlcPkt.preImage <- zeroBytes - htlcPkt.err <- fmt.Errorf("Insufficient capacity") - case pkt := <-h.htlcPlex: - // TODO(roasbeef): properly account with cleared vs settled - deltaNumUpdates++ - - hswcLog.Tracef("plex packet: %v", newLogClosure(func() string { - if pkt.onion != nil { - pkt.onion.Packet.Header.EphemeralKey.Curve = nil - } - return spew.Sdump(pkt) - })) - - switch wireMsg := pkt.msg.(type) { - // A link has just forwarded us a new HTLC, therefore - // we initiate the payment circuit within our internal - // state so we can properly forward the ultimate settle - // message. - case *lnwire.UpdateAddHTLC: - payHash := wireMsg.PaymentHash - - // Create the two ends of the payment circuit - // required to ensure completion of this new - // payment. - nextHop := pkt.onion.NextHop - h.onionMtx.RLock() - clearLink, ok := h.onionIndex[nextHop] - h.onionMtx.RUnlock() - if !ok { - hswcLog.Errorf("unable to find dest end of "+ - "circuit: %x", nextHop) - - // We were unable to locate the - // next-hop as encoded within the - // Sphinx packet. Therefore, we send a - // cancellation message back to the - // source of the packet so they can - // propagate the message back to the - // origin. - cancelPkt := &htlcPacket{ - payHash: payHash, - msg: &lnwire.UpdateFailHTLC{ - Reason: []byte{uint8(lnwire.UnknownDestination)}, - }, - err: make(chan error, 1), - } - - h.chanIndexMtx.RLock() - cancelLink := h.chanIndex[pkt.srcLink] - h.chanIndexMtx.RUnlock() - - cancelLink.linkChan <- cancelPkt - continue - } - - h.chanIndexMtx.RLock() - settleLink := h.chanIndex[pkt.srcLink] - h.chanIndexMtx.RUnlock() - - // As the link now has a new HTLC that's been - // propagated to us, we'll consume a slot from - // it's bounded channel. - settleLink.consumeSlot() - - // If the link we're attempting to forward the - // HTLC over has insufficient capacity, then - // we'll cancel the HTLC as the payment cannot - // succeed. - linkBandwidth := atomic.LoadInt64(&clearLink[0].availableBandwidth) - if linkBandwidth < int64(wireMsg.Amount) { - hswcLog.Errorf("unable to forward HTLC "+ - "link %v has insufficient "+ - "capacity, have %v need %v", - clearLink[0].chanID, linkBandwidth, - int64(wireMsg.Amount)) - - pkt := &htlcPacket{ - payHash: payHash, - msg: &lnwire.UpdateFailHTLC{ - Reason: []byte{uint8(lnwire.InsufficientCapacity)}, - }, - err: make(chan error, 1), - } - - // Send the cancel message along the - // link, restoring a slot in the - // bounded channel in the process. - settleLink.sendAndRestore(pkt) - continue - } - - // Examine the circuit map to see if this - // circuit is already in use or not. If so, - // then we'll simply increment the reference - // count. Otherwise, we'll create a new circuit - // from scratch. - // - // TODO(roasbeef): include dest+src+amt in key - cKey := circuitKey(wireMsg.PaymentHash) - circuit, ok := h.paymentCircuits[cKey] - if ok { - hswcLog.Debugf("Increasing ref_count "+ - "of circuit: %x, from %v to %v", - wireMsg.PaymentHash, - circuit.refCount, - circuit.refCount+1) - - circuit.refCount++ - } else { - hswcLog.Debugf("Creating onion "+ - "circuit for %x: %v<->%v", - cKey[:], clearLink[0].chanID, - settleLink.chanID) - - circuit = &paymentCircuit{ - clear: clearLink[0], - settle: settleLink, - refCount: 1, - } - - h.paymentCircuits[cKey] = circuit - } - - // With the circuit initiated, send the htlcPkt - // to the clearing link within the circuit to - // continue propagating the HTLC across the - // network. - circuit.clear.sendAndConsume(&htlcPacket{ - msg: wireMsg, - preImage: make(chan [32]byte, 1), - err: make(chan error, 1), - done: make(chan struct{}), - }) - - // Reduce the available bandwidth for the link - // as it will clear the above HTLC, increasing - // the limbo balance within the channel. - n := atomic.AddInt64(&circuit.clear.availableBandwidth, - -int64(pkt.amt)) - hswcLog.Tracef("Decrementing link %v bandwidth to %v", - circuit.clear.chanID, n) - - deltaSatRecv += pkt.amt - - // We've just received a settle message which means we - // can finalize the payment circuit by forwarding the - // settle msg to the link which initially created the - // circuit. - case *lnwire.UpdateFufillHTLC: - rHash := sha256.Sum256(wireMsg.PaymentPreimage[:]) - var cKey circuitKey - copy(cKey[:], rHash[:]) - - // If we initiated the payment then there won't - // be an active circuit to continue propagating - // the settle over. Therefore, we exit early. - circuit, ok := h.paymentCircuits[cKey] - if !ok { - hswcLog.Debugf("No existing circuit "+ - "for %x to settle", rHash[:]) - deltaSatSent += pkt.amt - continue - } - - circuit.clear.restoreSlot() - - circuit.settle.sendAndRestore(&htlcPacket{ - msg: wireMsg, - err: make(chan error, 1), - }) - - // Increase the available bandwidth for the - // link as it will settle the above HTLC, - // subtracting from the limbo balance and - // incrementing its local balance. - n := atomic.AddInt64(&circuit.settle.availableBandwidth, - int64(pkt.amt)) - hswcLog.Tracef("Incrementing link %v bandwidth to %v", - circuit.settle.chanID, n) - - deltaSatSent += pkt.amt - - if circuit.refCount--; circuit.refCount == 0 { - hswcLog.Debugf("Closing completed onion "+ - "circuit for %x: %v<->%v", rHash[:], - circuit.clear.chanID, - circuit.settle.chanID) - delete(h.paymentCircuits, cKey) - } - - // We've just received an HTLC cancellation triggered - // by an upstream peer somewhere within the ultimate - // route. In response, we'll terminate the payment - // circuit and propagate the error backwards. - case *lnwire.UpdateFailHTLC: - // In order to properly handle the error, we'll - // need to look up the original circuit that - // the incoming HTLC created. - circuit, ok := h.paymentCircuits[pkt.payHash] - if !ok { - hswcLog.Debugf("No existing circuit "+ - "for %x to cancel", pkt.payHash) - continue - } - - circuit.clear.restoreSlot() - - // Since an outgoing HTLC we sent on the clear - // link has been cancelled, we update the - // bandwidth of the clear link, restoring the - // value of the HTLC worth. - n := atomic.AddInt64(&circuit.clear.availableBandwidth, - int64(pkt.amt)) - hswcLog.Debugf("HTLC %x has been cancelled, "+ - "incrementing link %v bandwidth to %v", pkt.payHash, - circuit.clear.chanID, n) - - // With our link info updated, we now continue - // the error propagation by sending the - // cancellation message over the link that sent - // us the incoming HTLC. - circuit.settle.sendAndRestore(&htlcPacket{ - msg: wireMsg, - payHash: pkt.payHash, - err: make(chan error, 1), - }) - - if circuit.refCount--; circuit.refCount == 0 { - hswcLog.Debugf("Closing cancelled onion "+ - "circuit for %x: %v<->%v", pkt.payHash, - circuit.clear.chanID, - circuit.settle.chanID) - delete(h.paymentCircuits, pkt.payHash) - } - } - case <-logTicker.C: - if deltaNumUpdates == 0 { - continue - } - - oldSatSent := totalSatRecv - oldSatRecv := totalSatRecv - oldNumUpdates := totalNumUpdates - - newSatSent := oldSatRecv + deltaSatSent - newSatRecv := totalSatRecv + deltaSatRecv - newNumUpdates := totalNumUpdates + deltaNumUpdates - - satSent := newSatSent - oldSatSent - satRecv := newSatRecv - oldSatRecv - numUpdates := newNumUpdates - oldNumUpdates - hswcLog.Infof("Sent %v satoshis, received %v satoshis in "+ - "the last 10 seconds (%v tx/sec)", - satSent.ToUnit(btcutil.AmountSatoshi), - satRecv.ToUnit(btcutil.AmountSatoshi), - numUpdates) - - totalSatSent += deltaSatSent - deltaSatSent = 0 - - totalSatRecv += deltaSatRecv - deltaSatRecv = 0 - - totalNumUpdates += deltaNumUpdates - deltaNumUpdates = 0 - case <-h.quit: - break out - } - } - h.wg.Done() -} - -// networkAdmin is responsible for handling requests to register, unregister, -// and close any link. In the event that an unregister request leaves an -// interface with no active links, that interface is garbage collected. -func (h *htlcSwitch) networkAdmin() { -out: - for { - select { - case msg := <-h.linkControl: - switch req := msg.(type) { - case *closeLinkReq: - h.handleCloseLink(req) - case *registerLinkMsg: - h.handleRegisterLink(req) - case *unregisterLinkMsg: - h.handleUnregisterLink(req) - case *linkInfoUpdateMsg: - h.handleLinkUpdate(req) - } - case <-h.quit: - break out - } - } - h.wg.Done() -} - -// handleRegisterLink registers a new link within the channel index, and also -// adds the link to the existing set of links for the target interface. -func (h *htlcSwitch) handleRegisterLink(req *registerLinkMsg) { - chanPoint := req.linkInfo.ChannelPoint - chanID := lnwire.NewChanIDFromOutPoint(chanPoint) - newLink := &link{ - capacity: req.linkInfo.Capacity, - availableBandwidth: int64(req.linkInfo.LocalBalance), - peer: req.peer, - chanID: chanID, - } - - // To ensure we never accidentally cause an HTLC overflow, we'll limit, - // we'll use this buffered channel as as semaphore in order to limit - // the number of outstanding HTLC's we extend to the target link. - //const numSlots = (lnwallet.MaxHTLCNumber / 2) - 1 - const numSlots = lnwallet.MaxHTLCNumber - 5 - newLink.boundedLinkChan = newBoundedLinkChan(numSlots, req.linkChan) - - // First update the channel index with this new channel point. The - // channel index will be used to quickly lookup channels in order to: - // close them, update their link capacity, or possibly during multi-hop - // HTLC forwarding. - h.chanIndexMtx.Lock() - h.chanIndex[chanID] = newLink - h.chanIndexMtx.Unlock() - - interfaceID := req.peer.lightningID - - h.interfaceMtx.Lock() - h.interfaces[interfaceID] = append(h.interfaces[interfaceID], newLink) - h.interfaceMtx.Unlock() - - // Next, update the onion index which is used to look up the - // settle/clear links during multi-hop payments and to dispatch - // outgoing payments initiated by a local subsystem. - var onionID [ripemd160.Size]byte - copy(onionID[:], btcutil.Hash160(req.peer.addr.IdentityKey.SerializeCompressed())) - - h.onionMtx.Lock() - h.onionIndex[onionID] = h.interfaces[interfaceID] - h.onionMtx.Unlock() - - hswcLog.Infof("registering new link, interface=%x, onion_link=%x, "+ - "chan_point=%v, capacity=%v", interfaceID[:], onionID, - chanPoint, newLink.capacity) - - if req.done != nil { - req.done <- struct{}{} - } -} - -// handleUnregisterLink unregisters a currently active link. If the deletion of -// this link leaves the interface empty, then the interface entry itself is -// also deleted. -func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { - hswcLog.Debugf("unregistering active link, interface=%v, chan_id=%v", - hex.EncodeToString(req.chanInterface[:]), req.chanID) - - chanInterface := req.chanInterface - - h.interfaceMtx.RLock() - links := h.interfaces[chanInterface] - h.interfaceMtx.RUnlock() - - h.chanIndexMtx.Lock() - defer h.chanIndexMtx.Unlock() - - h.onionMtx.Lock() - defer h.onionMtx.Unlock() - - // A request with a nil channel point indicates that all the current - // links for this channel should be cleared. - if req.chanID == nil { - hswcLog.Debugf("purging all active links for interface %v", - hex.EncodeToString(chanInterface[:])) - - for _, link := range links { - delete(h.chanIndex, link.chanID) - } - - links = nil - } else { - delete(h.chanIndex, *req.chanID) - - for i := 0; i < len(links); i++ { - chanLink := links[i] - if chanLink.chanID == *req.chanID { - // We perform an in-place delete by sliding - // every element down one, then slicing off the - // last element. Additionally, we update the - // slice reference within the source map to - // ensure full deletion. - copy(links[i:], links[i+1:]) - links[len(links)-1] = nil - h.interfaceMtx.Lock() - h.interfaces[chanInterface] = links[:len(links)-1] - h.interfaceMtx.Unlock() - - break - } - } - } - - if len(links) == 0 { - hswcLog.Debugf("interface %v has no active links, destroying", - hex.EncodeToString(chanInterface[:])) - - // Delete the peer from the onion index so that the - // htlcForwarder knows not to attempt to forward any further - // HTLCs in this direction. - var onionID [ripemd160.Size]byte - copy(onionID[:], btcutil.Hash160(req.remoteID)) - delete(h.onionIndex, onionID) - - // Finally, delete the interface itself so that outgoing - // payments don't select this path. - h.interfaceMtx.Lock() - delete(h.interfaces, chanInterface) - h.interfaceMtx.Unlock() - - } - - if req.done != nil { - req.done <- struct{}{} - } -} - -// handleCloseLink sends a message to the peer responsible for the target -// channel point, instructing it to initiate a cooperative channel closure. -func (h *htlcSwitch) handleCloseLink(req *closeLinkReq) { - chanID := lnwire.NewChanIDFromOutPoint(req.chanPoint) - - h.chanIndexMtx.RLock() - targetLink, ok := h.chanIndex[chanID] - h.chanIndexMtx.RUnlock() - - if !ok { - req.err <- fmt.Errorf("channel %v not found, or peer "+ - "offline", req.chanPoint) - return - } - - hswcLog.Debugf("requesting interface %v to close link %v", - hex.EncodeToString(targetLink.peer.lightningID[:]), chanID) - targetLink.peer.localCloseChanReqs <- req - - // TODO(roasbeef): if type was CloseBreach initiate force closure with - // all other channels (if any) we have with the remote peer. -} - -// handleLinkUpdate processes the link info update message by adjusting the -// channel's available bandwidth by the delta specified within the message. -func (h *htlcSwitch) handleLinkUpdate(req *linkInfoUpdateMsg) { - h.chanIndexMtx.RLock() - link, ok := h.chanIndex[req.targetLink] - h.chanIndexMtx.RUnlock() - if !ok { - hswcLog.Errorf("received link update for non-existent link: %v", - req.targetLink) - return - } - - atomic.AddInt64(&link.availableBandwidth, int64(req.bandwidthDelta)) - - hswcLog.Tracef("adjusting bandwidth of link %v by %v", req.targetLink, - req.bandwidthDelta) -} - -// registerLinkMsg is message which requests a new link to be registered. -type registerLinkMsg struct { - peer *peer - linkInfo *channeldb.ChannelSnapshot - - linkChan chan *htlcPacket - - done chan struct{} -} - -// RegisterLink requests the htlcSwitch to register a new active link. The new -// link encapsulates an active channel. The htlc plex channel is returned. The -// plex channel allows the switch to properly de-multiplex incoming/outgoing -// HTLC messages forwarding them to their proper destination in the multi-hop -// settings. -func (h *htlcSwitch) RegisterLink(p *peer, linkInfo *channeldb.ChannelSnapshot, - linkChan chan *htlcPacket) chan *htlcPacket { - - done := make(chan struct{}, 1) - req := ®isterLinkMsg{p, linkInfo, linkChan, done} - h.linkControl <- req - - <-done - - return h.htlcPlex -} - -// unregisterLinkMsg is a message which requests the active link be unregistered. -type unregisterLinkMsg struct { - chanInterface [32]byte - chanID *lnwire.ChannelID - - // remoteID is the identity public key of the node we're removing the - // link between. The public key is expected to be serialized in - // compressed form. - // TODO(roasbeef): redo interface map - remoteID []byte - - done chan struct{} -} - -// UnregisterLink requests the htlcSwitch to register the new active link. An -// unregistered link will no longer be considered a candidate to forward -// HTLCs. -func (h *htlcSwitch) UnregisterLink(remotePub *btcec.PublicKey, - chanID *lnwire.ChannelID) { - - done := make(chan struct{}, 1) - rawPub := remotePub.SerializeCompressed() - - h.linkControl <- &unregisterLinkMsg{ - chanInterface: sha256.Sum256(rawPub), - chanID: chanID, - remoteID: rawPub, - done: done, - } - - <-done -} - -// LinkCloseType is an enum which signals the type of channel closure the switch -// should execute. -type LinkCloseType uint8 - -const ( - // CloseRegular indicates a regular cooperative channel closure should - // be attempted. - CloseRegular LinkCloseType = iota - - // CloseBreach indicates that a channel breach has been detected, and - // the link should immediately be marked as unavailable. - CloseBreach -) - -// closeLinkReq represents a request to close a particular channel specified by -// its outpoint. -type closeLinkReq struct { - CloseType LinkCloseType - - chanPoint *wire.OutPoint - - updates chan *lnrpc.CloseStatusUpdate - err chan error -} - -// CloseLink closes an active link targetted by its channel point. Closing the -// link initiates a cooperative channel closure iff forceClose is false. If -// forceClose is true, then a unilateral channel closure is executed. -// TODO(roasbeef): consolidate with UnregisterLink? -func (h *htlcSwitch) CloseLink(chanPoint *wire.OutPoint, - closeType LinkCloseType) (chan *lnrpc.CloseStatusUpdate, chan error) { - - updateChan := make(chan *lnrpc.CloseStatusUpdate, 1) - errChan := make(chan error, 1) - - h.linkControl <- &closeLinkReq{ - CloseType: closeType, - chanPoint: chanPoint, - updates: updateChan, - err: errChan, - } - - return updateChan, errChan -} - -// linkInfoUpdateMsg encapsulates a request for the htlc switch to update the -// metadata related to the target link. -type linkInfoUpdateMsg struct { - targetLink lnwire.ChannelID - - bandwidthDelta btcutil.Amount -} - -// UpdateLink sends a message to the switch to update the available bandwidth -// within the link by the passed satoshi delta. This function may be used when -// re-anchoring to boost the capacity of a channel, or once a peer settles an -// HTLC invoice. -func (h *htlcSwitch) UpdateLink(chanID lnwire.ChannelID, delta btcutil.Amount) { - h.linkControl <- &linkInfoUpdateMsg{ - targetLink: chanID, - bandwidthDelta: delta, - } -} diff --git a/log.go b/log.go index af4274d02..7ef208219 100644 --- a/log.go +++ b/log.go @@ -9,6 +9,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/discovery" + "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/routing" "github.com/roasbeef/btcd/connmgr" @@ -96,6 +97,7 @@ func useLogger(subsystemID string, logger btclog.Logger) { case "HSWC": hswcLog = logger + htlcswitch.UseLogger(logger) case "UTXN": utxnLog = logger diff --git a/peer.go b/peer.go index b68a40461..5abb40508 100644 --- a/peer.go +++ b/peer.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "container/list" "crypto/sha256" "fmt" @@ -11,11 +10,16 @@ import ( "time" "github.com/davecgh/go-spew/spew" - "github.com/go-errors/errors" - "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/brontide" + + "github.com/btcsuite/fastsha256" + + "bytes" + + "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -24,7 +28,6 @@ import ( "github.com/roasbeef/btcd/connmgr" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" - "github.com/roasbeef/btcutil" ) var ( @@ -121,16 +124,13 @@ type peer struct { activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel chanSnapshotReqs chan *chanSnapshotReq - htlcManMtx sync.RWMutex - htlcManagers map[lnwire.ChannelID]chan lnwire.Message - // newChannels is used by the fundingManager to send fully opened // channels to the source peer which handled the funding workflow. newChannels chan *newChannelMsg // localCloseChanReqs is a channel in which any local requests to close // a particular channel are sent over. - localCloseChanReqs chan *closeLinkReq + localCloseChanReqs chan *htlcswitch.ChanClose // shutdownChanReqs is used to send the Shutdown messages that initiate // the cooperative close workflow. @@ -180,11 +180,10 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, outgoingQueue: make(chan outgoinMsg, outgoingQueueLen), activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), - htlcManagers: make(map[lnwire.ChannelID]chan lnwire.Message), chanSnapshotReqs: make(chan *chanSnapshotReq), newChannels: make(chan *newChannelMsg, 1), - localCloseChanReqs: make(chan *closeLinkReq), + localCloseChanReqs: make(chan *htlcswitch.ChanClose), shutdownChanReqs: make(chan *lnwire.Shutdown), closingSignedChanReqs: make(chan *lnwire.ClosingSigned), @@ -310,17 +309,20 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { // Register this new channel link with the HTLC Switch. This is // necessary to properly route multi-hop payments, and forward // new payments triggered by RPC clients. - downstreamLink := make(chan *htlcPacket, 10) - plexChan := p.server.htlcSwitch.RegisterLink(p, - dbChan.Snapshot(), downstreamLink) + sphinxDecoder := htlcswitch.NewSphinxDecoder(p.server.sphinx) + link := htlcswitch.NewChannelLink( + &htlcswitch.ChannelLinkConfig{ + Peer: p, + DecodeOnion: sphinxDecoder.Decode, + SettledContracts: p.server.breachArbiter.settledContracts, + DebugHTLC: cfg.DebugHTLC, + Registry: p.server.invoices, + Switch: p.server.htlcSwitch, + }, lnChan) - upstreamLink := make(chan lnwire.Message, 10) - p.htlcManMtx.Lock() - p.htlcManagers[chanID] = upstreamLink - p.htlcManMtx.Unlock() - - p.wg.Add(1) - go p.htlcManager(lnChan, plexChan, downstreamLink, upstreamLink) + if err := p.server.htlcSwitch.AddLink(link); err != nil { + return err + } } return nil @@ -488,19 +490,15 @@ out: if isChanUpdate { sendUpdate := func() { - // Dispatch the commitment update message to - // the proper active goroutine dedicated to - // this channel. - p.htlcManMtx.RLock() - channel, ok := p.htlcManagers[targetChan] - p.htlcManMtx.RUnlock() - if !ok { + // Dispatch the commitment update message to the proper + // active goroutine dedicated to this channel. + link, err := p.server.htlcSwitch.GetLink(targetChan) + if err != nil { peerLog.Errorf("recv'd update for unknown "+ "channel %v from %v", targetChan, p) return } - - channel <- nextMsg + link.HandleChannelUpdate(nextMsg) } // Check the map of active channel streams, if this map @@ -756,7 +754,7 @@ func (p *peer) channelManager() { // a cooperative channel close. When an lnwire.Shutdown is received, // this allows the node to determine the next step to be taken in the // workflow. - chanShutdowns := make(map[lnwire.ChannelID]*closeLinkReq) + chanShutdowns := make(map[lnwire.ChannelID]*htlcswitch.ChanClose) // shutdownSigs is a map of signatures maintained by the responder in a // cooperative channel close. This map enables us to respond to @@ -788,26 +786,22 @@ out: peerLog.Infof("New channel active ChannelPoint(%v) "+ "with peerId(%v)", chanPoint, p.id) - // Now that the channel is open, notify the Htlc - // Switch of a new active link. - // TODO(roasbeef): register needs to account for - // in-flight htlc's on restart - chanSnapShot := newChanReq.channel.StateSnapshot() - downstreamLink := make(chan *htlcPacket, 10) - plexChan := p.server.htlcSwitch.RegisterLink(p, - chanSnapShot, downstreamLink) + decoder := htlcswitch.NewSphinxDecoder(p.server.sphinx) + link := htlcswitch.NewChannelLink( + &htlcswitch.ChannelLinkConfig{ + Peer: p, + DecodeOnion: decoder.Decode, + SettledContracts: p.server.breachArbiter.settledContracts, + DebugHTLC: cfg.DebugHTLC, + Registry: p.server.invoices, + Switch: p.server.htlcSwitch, + }, newChanReq.channel) - // With the channel registered to the HtlcSwitch spawn - // a goroutine to handle commitment updates for this - // new channel. - upstreamLink := make(chan lnwire.Message, 10) - p.htlcManMtx.Lock() - p.htlcManagers[chanID] = upstreamLink - p.htlcManMtx.Unlock() - - p.wg.Add(1) - go p.htlcManager(newChanReq.channel, plexChan, - downstreamLink, upstreamLink) + err := p.server.htlcSwitch.AddLink(link) + if err != nil { + peerLog.Errorf("can't register new channel "+ + "link(%v) with peerId(%v)", chanPoint, p.id) + } close(newChanReq.done) @@ -816,12 +810,12 @@ out: case req := <-p.localCloseChanReqs: // So we'll first transition the channel to a state of // pending shutdown. - chanID := lnwire.NewChanIDFromOutPoint(req.chanPoint) + chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) // We'll only track this shutdown request if this is a // regular close request, and not in response to a // channel breach. - if req.CloseType == CloseRegular { + if req.CloseType == htlcswitch.CloseRegular { chanShutdowns[chanID] = req } @@ -899,8 +893,8 @@ out: // // TODO(roasbeef): if no more active channels with peer call Remove on connMgr // with peerID -func (p *peer) handleLocalClose(req *closeLinkReq) { - chanID := lnwire.NewChanIDFromOutPoint(req.chanPoint) +func (p *peer) handleLocalClose(req *htlcswitch.ChanClose) { + chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) p.activeChanMtx.RLock() channel, ok := p.activeChannels[chanID] @@ -909,7 +903,7 @@ func (p *peer) handleLocalClose(req *closeLinkReq) { err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+ "unknown", chanID) peerLog.Errorf(err.Error()) - req.err <- err + req.Err <- err return } @@ -917,22 +911,22 @@ func (p *peer) handleLocalClose(req *closeLinkReq) { // A type of CloseRegular indicates that the user has opted to close // out this channel on-chain, so we execute the cooperative channel // closure workflow. - case CloseRegular: + case htlcswitch.CloseRegular: err := p.sendShutdown(channel) if err != nil { - req.err <- err + req.Err <- err return } // A type of CloseBreach indicates that the counterparty has breached // the channel therefore we need to clean up our local state. - case CloseBreach: + case htlcswitch.CloseBreach: peerLog.Infof("ChannelPoint(%v) has been breached, wiping "+ - "channel", req.chanPoint) - if err := wipeChannel(p, channel); err != nil { + "channel", req.ChanPoint) + if err := p.WipeChannel(channel); err != nil { peerLog.Infof("Unable to wipe channel after detected "+ "breach: %v", err) - req.err <- err + req.Err <- err return } return @@ -1003,8 +997,8 @@ func (p *peer) handleShutdownResponse(msg *lnwire.Shutdown) []byte { // of an unresponsive remote party, the initiator can either choose to execute // a force closure, or backoff for a period of time, and retry the cooperative // closure. -func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSigned) { - chanID := lnwire.NewChanIDFromOutPoint(req.chanPoint) +func (p *peer) handleInitClosingSigned(req *htlcswitch.ChanClose, msg *lnwire.ClosingSigned) { + chanID := lnwire.NewChanIDFromOutPoint(req.ChanPoint) p.activeChanMtx.RLock() channel, ok := p.activeChannels[chanID] p.activeChanMtx.RUnlock() @@ -1012,7 +1006,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+ "unknown", chanID) peerLog.Errorf(err.Error()) - req.err <- err + req.Err <- err return } @@ -1027,7 +1021,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig // so generate our signature. initiatorSig, proposedFee, err := channel.CreateCloseProposal(feeRate) if err != nil { - req.err <- err + req.Err <- err return } initSig := append(initiatorSig, byte(txscript.SigHashAll)) @@ -1039,7 +1033,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig closeTx, err := channel.CompleteCooperativeClose(initSig, respSig, feeRate) if err != nil { - req.err <- err + req.Err <- err // TODO(roasbeef): send ErrorGeneric to other side return } @@ -1048,7 +1042,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig // create a mirrored close signed message with our completed signature. parsedSig, err := btcec.ParseSignature(initSig, btcec.S256()) if err != nil { - req.err <- err + req.Err <- err return } closingSigned := lnwire.NewClosingSigned(chanID, proposedFee, parsedSig) @@ -1062,7 +1056,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig if err := p.server.lnwallet.PublishTransaction(closeTx); err != nil { peerLog.Errorf("channel close tx from "+ "ChannelPoint(%v) rejected: %v", - req.chanPoint, err) + req.ChanPoint, err) // TODO(roasbeef): send ErrorGeneric to other side return } @@ -1070,9 +1064,9 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig // Once we've completed the cooperative channel closure, we'll wipe the // channel so we reject any incoming forward or payment requests via // this channel. - p.server.breachArbiter.settledContracts <- req.chanPoint - if err := wipeChannel(p, channel); err != nil { - req.err <- err + p.server.breachArbiter.settledContracts <- req.ChanPoint + if err := p.WipeChannel(channel); err != nil { + req.Err <- err return } @@ -1081,7 +1075,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig closingTxid := closeTx.TxHash() chanInfo := channel.StateSnapshot() closeSummary := &channeldb.ChannelCloseSummary{ - ChanPoint: *req.chanPoint, + ChanPoint: *req.ChanPoint, ClosingTXID: closingTxid, RemotePub: &chanInfo.RemoteIdentity, Capacity: chanInfo.Capacity, @@ -1090,13 +1084,13 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig IsPending: true, } if err := channel.DeleteState(closeSummary); err != nil { - req.err <- err + req.Err <- err return } // Update the caller with a new event detailing the current pending // state of this request. - req.updates <- &lnrpc.CloseStatusUpdate{ + req.Updates <- &lnrpc.CloseStatusUpdate{ Update: &lnrpc.CloseStatusUpdate_ClosePending{ ClosePending: &lnrpc.PendingUpdate{ Txid: closingTxid[:], @@ -1106,7 +1100,7 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig _, bestHeight, err := p.server.bio.GetBestBlock() if err != nil { - req.err <- err + req.Err <- err return } @@ -1114,21 +1108,21 @@ func (p *peer) handleInitClosingSigned(req *closeLinkReq, msg *lnwire.ClosingSig // ChainNotifier once the closure transaction obtains a single // confirmation. notifier := p.server.chainNotifier - go waitForChanToClose(uint32(bestHeight), notifier, req.err, - req.chanPoint, &closingTxid, func() { + go waitForChanToClose(uint32(bestHeight), notifier, req.Err, + req.ChanPoint, &closingTxid, func() { // First, we'll mark the database as being fully closed // so we'll no longer watch for its ultimate closure // upon startup. - err := p.server.chanDB.MarkChanFullyClosed(req.chanPoint) + err := p.server.chanDB.MarkChanFullyClosed(req.ChanPoint) if err != nil { - req.err <- err + req.Err <- err return } // Respond to the local subsystem which requested the // channel closure. - req.updates <- &lnrpc.CloseStatusUpdate{ + req.Updates <- &lnrpc.CloseStatusUpdate{ Update: &lnrpc.CloseStatusUpdate_ChanClose{ ChanClose: &lnrpc.ChannelCloseUpdate{ ClosingTxid: closingTxid[:], @@ -1188,7 +1182,7 @@ func (p *peer) handleResponseClosingSigned(msg *lnwire.ClosingSigned, // we'll wipe the channel from all our local indexes and also signal to // the switch that this channel is now closed. peerLog.Infof("ChannelPoint(%v) is now closed", chanPoint) - if err := wipeChannel(p, channel); err != nil { + if err := p.WipeChannel(channel); err != nil { peerLog.Errorf("unable to wipe channel: %v", err) } @@ -1288,14 +1282,14 @@ func (p *peer) sendShutdown(channel *lnwallet.LightningChannel) error { // Finally, we'll unregister the link from the switch in order to // Prevent the HTLC switch from receiving additional HTLCs for this // channel. - p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, &chanID) + p.server.htlcSwitch.RemoveLink(chanID) return nil } -// wipeChannel removes the passed channel from all indexes associated with the +// WipeChannel removes the passed channel from all indexes associated with the // peer, and deletes the channel from the database. -func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error { +func (p *peer) WipeChannel(channel *lnwallet.LightningChannel) error { channel.Stop() chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint()) @@ -1306,247 +1300,19 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error { // Instruct the Htlc Switch to close this link as the channel is no // longer active. - p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, &chanID) - // Additionally, close up "down stream" link for the htlcManager which - // has been assigned to this channel. This servers the link between the - // htlcManager and the switch, signalling that the channel is no longer - // active. - p.htlcManMtx.RLock() - - // If the channel can't be found in the map, then this channel has - // already been wiped. - htlcWireLink, ok := p.htlcManagers[chanID] - if !ok { - p.htlcManMtx.RUnlock() - return nil + if err := p.server.htlcSwitch.RemoveLink(chanID); err != nil { + if err == htlcswitch.ErrChannelLinkNotFound { + peerLog.Warnf("unable remove channel link with "+ + "ChannelPoint(%v): %v", chanID, err) + return nil + } + return err } - close(htlcWireLink) - - p.htlcManMtx.RUnlock() - - // Next, we remove the htlcManager from our internal map as the - // goroutine should have exited gracefully due to the channel closure - // above. - p.htlcManMtx.RLock() - delete(p.htlcManagers, chanID) - p.htlcManMtx.RUnlock() - return nil } -// pendingPayment represents a pending HTLC which has yet to be settled by the -// upstream peer. A pending payment encapsulates the initial HTLC add request -// additionally coupling the index of the HTLC within the log, and an error -// channel to signal the payment requester once the payment has been fully -// fufilled. -type pendingPayment struct { - htlc *lnwire.UpdateAddHTLC - index uint64 - - preImage chan [32]byte - err chan error - done chan struct{} -} - -// commitmentState is the volatile+persistent state of an active channel's -// commitment update state-machine. This struct is used by htlcManager's to -// save meta-state required for proper functioning. -type commitmentState struct { - // htlcsToSettle is a list of preimages which allow us to settle one or - // many of the pending HTLCs we've received from the upstream peer. - htlcsToSettle map[uint64]*channeldb.Invoice - - // htlcsToCancel is a set of HTLCs identified by their log index which - // are to be cancelled upon the next state transition. - htlcsToCancel map[uint64]lnwire.FailCode - - // cancelReasons stores the reason why a particular HTLC was cancelled. - // The index of the HTLC within the log is mapped to the cancellation - // reason. This value is used to thread the proper error through to the - // htlcSwitch, or subsystem that initiated the HTLC. - cancelReasons map[uint64]lnwire.FailCode - - // pendingBatch is slice of payments which have been added to the - // channel update log, but not yet committed to latest commitment. - pendingBatch []*pendingPayment - - // pendingSettle is counter which tracks the current number of settles - // that have been sent, but not yet committed to the commitment. - pendingSettle uint32 - - // clearedHTCLs is a map of outgoing HTLCs we've committed to in our - // chain which have not yet been settled by the upstream peer. - clearedHTCLs map[uint64]*pendingPayment - - // switchChan is a channel used to send packets to the htlc switch for - // forwarding. - switchChan chan<- *htlcPacket - - // sphinx is an instance of the Sphinx onion Router for this node. The - // router will be used to process all incoming Sphinx packets embedded - // within HTLC add messages. - sphinx *sphinx.Router - - // pendingCircuits tracks the remote log index of the incoming HTLCs, - // mapped to the processed Sphinx packet contained within the HTLC. - // This map is used as a staging area between when an HTLC is added to - // the log, and when it's locked into the commitment state of both - // chains. Once locked in, the processed packet is sent to the switch - // along with the HTLC to forward the packet to the next hop. - pendingCircuits map[uint64]*sphinx.ProcessedPacket - - // logCommitTimer is a timer which is sent upon if we go an interval - // without receiving/sending a commitment update. It's role is to - // ensure both chains converge to identical state in a timely manner. - // TODO(roasbeef): timer should be >> then RTT - logCommitTimer *time.Timer - logCommitTick <-chan time.Time - - channel *lnwallet.LightningChannel - chanPoint *wire.OutPoint - chanID lnwire.ChannelID - - sync.RWMutex -} - -// htlcManager is the primary goroutine which drives a channel's commitment -// update state-machine in response to messages received via several channels. -// The htlcManager reads messages from the upstream (remote) peer, and also -// from several possible downstream channels managed by the htlcSwitch. In the -// event that an htlc needs to be forwarded, then send-only htlcPlex chan is -// used which sends htlc packets to the switch for forwarding. Additionally, -// the htlcManager handles acting upon all timeouts for any active HTLCs, -// manages the channel's revocation window, and also the htlc trickle -// queue+timer for this active channels. -func (p *peer) htlcManager(channel *lnwallet.LightningChannel, - htlcPlex chan<- *htlcPacket, downstreamLink <-chan *htlcPacket, - upstreamLink <-chan lnwire.Message) { - - chanStats := channel.StateSnapshot() - peerLog.Infof("HTLC manager for ChannelPoint(%v) started, "+ - "our_balance=%v, their_balance=%v, chain_height=%v", - channel.ChannelPoint(), chanStats.LocalBalance, - chanStats.RemoteBalance, chanStats.NumUpdates) - - // A new session for this active channel has just started, therefore we - // need to send our initial revocation window to the remote peer. - for i := 0; i < lnwallet.InitialRevocationWindow; i++ { - rev, err := channel.ExtendRevocationWindow() - if err != nil { - peerLog.Errorf("unable to expand revocation window: %v", err) - continue - } - p.queueMsg(rev, nil) - } - - chanPoint := channel.ChannelPoint() - state := &commitmentState{ - channel: channel, - chanPoint: chanPoint, - chanID: lnwire.NewChanIDFromOutPoint(chanPoint), - clearedHTCLs: make(map[uint64]*pendingPayment), - htlcsToSettle: make(map[uint64]*channeldb.Invoice), - htlcsToCancel: make(map[uint64]lnwire.FailCode), - cancelReasons: make(map[uint64]lnwire.FailCode), - pendingCircuits: make(map[uint64]*sphinx.ProcessedPacket), - sphinx: p.server.sphinx, - logCommitTimer: time.NewTimer(300 * time.Millisecond), - switchChan: htlcPlex, - } - - // TODO(roasbeef): check to see if able to settle any currently pending - // HTLCs - // * also need signals when new invoices are added by the - // invoiceRegistry - - batchTimer := time.NewTicker(50 * time.Millisecond) - defer batchTimer.Stop() - -out: - for { - select { - case <-channel.UnilateralCloseSignal: - // TODO(roasbeef): need to send HTLC outputs to nursery - peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain", - state.chanPoint) - if err := wipeChannel(p, channel); err != nil { - peerLog.Errorf("unable to wipe channel %v", err) - } - - p.server.breachArbiter.settledContracts <- state.chanPoint - - break out - - case <-channel.ForceCloseSignal: - // TODO(roasbeef): path never taken now that server - // force closes's directly? - peerLog.Warnf("ChannelPoint(%v) has been force "+ - "closed, disconnecting from peerID(%x)", - state.chanPoint, p.id) - break out - - case <-state.logCommitTick: - // If we haven't sent or received a new commitment - // update in some time, check to see if we have any - // pending updates we need to commit due to our - // commitment chains being desynchronized. - if state.channel.FullySynced() && - len(state.htlcsToSettle) == 0 { - continue - } - - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update commitment: %v", - err) - p.Disconnect() - break out - } - - case <-batchTimer.C: - // If the either batch is empty, then we have no work - // here. - // - // TODO(roasbeef): should be combined, will be fixed by - // andrew's PR - if len(state.pendingBatch) == 0 && state.pendingSettle == 0 { - continue - } - - // Otherwise, attempt to extend the remote commitment - // chain including all the currently pending entries. - // If the send was unsuccessful, then abandon the - // update, waiting for the revocation window to open - // up. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - p.Disconnect() - break out - } - - case pkt := <-downstreamLink: - p.handleDownStreamPkt(state, pkt) - - case msg, ok := <-upstreamLink: - // If the upstream message link is closed, this signals - // that the channel itself is being closed, therefore - // we exit. - if !ok { - break out - } - - p.handleUpstreamMsg(state, msg) - case <-p.quit: - break out - } - } - - p.wg.Done() - peerLog.Tracef("htlcManager for peer %v done", p) -} - // handleInitMsg handles the incoming init message which contains global and // local features vectors. If feature vectors are incompatible then disconnect. func (p *peer) handleInitMsg(msg *lnwire.Init) error { @@ -1582,557 +1348,21 @@ func (p *peer) sendInitMsg() error { return p.writeMessage(msg) } -// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC -// Switch. Possible messages sent by the switch include requests to forward new -// HTLCs, timeout previously cleared HTLCs, and finally to settle currently -// cleared HTLCs with the upstream peer. -func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { - var isSettle bool - switch htlc := pkt.msg.(type) { - case *lnwire.UpdateAddHTLC: - // A new payment has been initiated via the - // downstream channel, so we add the new HTLC - // to our local log, then update the commitment - // chains. - htlc.ChanID = state.chanID - index, err := state.channel.AddHTLC(htlc) - if err != nil { - // TODO: possibly perform fallback/retry logic - // depending on type of error - peerLog.Errorf("Adding HTLC rejected: %v", err) - pkt.err <- err - close(pkt.done) - - // The HTLC was unable to be added to the state - // machine, as a result, we'll signal the switch to - // cancel the pending payment. - // TODO(roasbeef): need to update link as well if local - // HTLC? - state.switchChan <- &htlcPacket{ - amt: htlc.Amount, - msg: &lnwire.UpdateFailHTLC{ - Reason: []byte{byte(0)}, - }, - srcLink: state.chanID, - } - return - } - - p.queueMsg(htlc, nil) - - state.pendingBatch = append(state.pendingBatch, &pendingPayment{ - htlc: htlc, - index: index, - preImage: pkt.preImage, - err: pkt.err, - done: pkt.done, - }) - - case *lnwire.UpdateFufillHTLC: - // An HTLC we forward to the switch has just settled somewhere - // upstream. Therefore we settle the HTLC within the our local - // state machine. - pre := htlc.PaymentPreimage - logIndex, err := state.channel.SettleHTLC(pre) - if err != nil { - // TODO(roasbeef): broadcast on-chain - peerLog.Errorf("settle for incoming HTLC rejected: %v", err) - p.Disconnect() - return - } - - // With the HTLC settled, we'll need to populate the wire - // message to target the specific channel and HTLC to be - // cancelled. - htlc.ChanID = state.chanID - htlc.ID = logIndex - - // Then we send the HTLC settle message to the connected peer - // so we can continue the propagation of the settle message. - p.queueMsg(htlc, nil) - isSettle = true - - state.pendingSettle++ - - case *lnwire.UpdateFailHTLC: - // An HTLC cancellation has been triggered somewhere upstream, - // we'll remove then HTLC from our local state machine. - logIndex, err := state.channel.FailHTLC(pkt.payHash) - if err != nil { - peerLog.Errorf("unable to cancel HTLC: %v", err) - return - } - - // With the HTLC removed, we'll need to populate the wire - // message to target the specific channel and HTLC to be - // cancelled. The "Reason" field will have already been set - // within the switch. - htlc.ChanID = state.chanID - htlc.ID = logIndex - - // Finally, we send the HTLC message to the peer which - // initially created the HTLC. - p.queueMsg(htlc, nil) - isSettle = true - - state.pendingSettle++ - } - - // If this newly added update exceeds the min batch size for adds, or - // this is a settle request, then initiate an update. - // TODO(roasbeef): enforce max HTLCs in flight limit - if len(state.pendingBatch) >= 10 || isSettle { - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - p.Disconnect() - return - } - } -} - -// handleUpstreamMsg processes wire messages related to commitment state -// updates from the upstream peer. The upstream peer is the peer whom we have a -// direct channel with, updating our respective commitment chains. -func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { - switch htlcPkt := msg.(type) { - // TODO(roasbeef): timeouts - // * fail if can't parse sphinx mix-header - case *lnwire.UpdateAddHTLC: - // Before adding the new HTLC to the state machine, parse the - // onion object in order to obtain the routing information. - blobReader := bytes.NewReader(htlcPkt.OnionBlob[:]) - onionPkt := &sphinx.OnionPacket{} - if err := onionPkt.Decode(blobReader); err != nil { - peerLog.Errorf("unable to decode onion pkt: %v", err) - p.Disconnect() - return - } - - // We just received an add request from an upstream peer, so we - // add it to our state machine, then add the HTLC to our - // "settle" list in the event that we know the preimage - index, err := state.channel.ReceiveHTLC(htlcPkt) - if err != nil { - peerLog.Errorf("Receiving HTLC rejected: %v", err) - p.Disconnect() - return - } - - // TODO(roasbeef): perform sanity checks on per-hop payload - // * time-lock is sane, fee, chain, etc - - // Attempt to process the Sphinx packet. We include the payment - // hash of the HTLC as it's authenticated within the Sphinx - // packet itself as associated data in order to thwart attempts - // a replay attacks. In the case of a replay, an attacker is - // *forced* to use the same payment hash twice, thereby losing - // their money entirely. - rHash := htlcPkt.PaymentHash[:] - sphinxPacket, err := state.sphinx.ProcessOnionPacket(onionPkt, rHash) - if err != nil { - // If we're unable to parse the Sphinx packet, then - // we'll cancel the HTLC after the current commitment - // transition. - peerLog.Errorf("unable to process onion pkt: %v", err) - state.htlcsToCancel[index] = lnwire.SphinxParseError - return - } - - switch sphinxPacket.Action { - // We're the designated payment destination. Therefore we - // attempt to see if we have an invoice locally which'll allow - // us to settle this HTLC. - case sphinx.ExitNode: - rHash := htlcPkt.PaymentHash - invoice, err := p.server.invoices.LookupInvoice(rHash) - if err != nil { - // If we're the exit node, but don't recognize - // the payment hash, then we'll fail the HTLC - // on the next state transition. - peerLog.Errorf("unable to settle HTLC, "+ - "payment hash (%x) unrecognized", rHash[:]) - state.htlcsToCancel[index] = lnwire.UnknownPaymentHash - return - } - - // If we're not currently in debug mode, and the - // extended HTLC doesn't meet the value requested, then - // we'll fail the HTLC. - if !cfg.DebugHTLC && htlcPkt.Amount < invoice.Terms.Value { - peerLog.Errorf("rejecting HTLC due to incorrect "+ - "amount: expected %v, received %v", - invoice.Terms.Value, htlcPkt.Amount) - state.htlcsToCancel[index] = lnwire.IncorrectValue - } else { - // Otherwise, everything is in order and we'll - // settle the HTLC after the current state - // transition. - state.htlcsToSettle[index] = invoice - } - - // There are additional hops left within this route, so we - // track the next hop according to the index of this HTLC - // within their log. When forwarding locked-in HLTC's to the - // switch, we'll attach the routing information so the switch - // can finalize the circuit. - case sphinx.MoreHops: - state.Lock() - state.pendingCircuits[index] = sphinxPacket - state.Unlock() - default: - peerLog.Errorf("mal formed onion packet") - state.htlcsToCancel[index] = lnwire.SphinxParseError - } - - case *lnwire.UpdateFufillHTLC: - pre := htlcPkt.PaymentPreimage - idx := htlcPkt.ID - if err := state.channel.ReceiveHTLCSettle(pre, idx); err != nil { - // TODO(roasbeef): broadcast on-chain - peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) - p.Disconnect() - return - } - - // TODO(roasbeef): add preimage to DB in order to swipe - // repeated r-values - case *lnwire.UpdateFailHTLC: - idx := htlcPkt.ID - if err := state.channel.ReceiveFailHTLC(idx); err != nil { - peerLog.Errorf("unable to recv HTLC cancel: %v", err) - p.Disconnect() - return - } - - state.Lock() - state.cancelReasons[idx] = lnwire.FailCode(htlcPkt.Reason[0]) - state.Unlock() - - case *lnwire.CommitSig: - // We just received a new update to our local commitment chain, - // validate this new commitment, closing the link if invalid. - // TODO(roasbeef): redundant re-serialization - sig := htlcPkt.CommitSig.Serialize() - if err := state.channel.ReceiveNewCommitment(sig); err != nil { - peerLog.Errorf("unable to accept new commitment: %v", err) - p.Disconnect() - return - } - - // As we've just just accepted a new state, we'll now - // immediately send the remote peer a revocation for our prior - // state. - nextRevocation, err := state.channel.RevokeCurrentCommitment() - if err != nil { - peerLog.Errorf("unable to revoke commitment: %v", err) - return - } - p.queueMsg(nextRevocation, nil) - - if !state.logCommitTimer.Stop() { - select { - case <-state.logCommitTimer.C: - default: - } - } - - state.logCommitTimer.Reset(300 * time.Millisecond) - state.logCommitTick = state.logCommitTimer.C - - // If both commitment chains are fully synced from our PoV, - // then we don't need to reply with a signature as both sides - // already have a commitment with the latest accepted state. - if state.channel.FullySynced() { - return - } - - // Otherwise, the remote party initiated the state transition, - // so we'll reply with a signature to provide them with their - // version of the latest commitment state. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update commitment: %v", err) - p.Disconnect() - return - } - - case *lnwire.RevokeAndAck: - // We've received a revocation from the remote chain, if valid, - // this moves the remote chain forward, and expands our - // revocation window. - htlcsToForward, err := state.channel.ReceiveRevocation(htlcPkt) - if err != nil { - peerLog.Errorf("unable to accept revocation: %v", err) - p.Disconnect() - return - } - - // If any of the HTLCs eligible for forwarding are pending - // settling or timing out previous outgoing payments, then we - // can them from the pending set, and signal the requester (if - // existing) that the payment has been fully fulfilled. - var bandwidthUpdate btcutil.Amount - settledPayments := make(map[lnwallet.PaymentHash]struct{}) - cancelledHtlcs := make(map[uint64]struct{}) - for _, htlc := range htlcsToForward { - parentIndex := htlc.ParentIndex - if p, ok := state.clearedHTCLs[parentIndex]; ok { - switch htlc.EntryType { - // If the HTLC was settled successfully, then - // we return a nil error as well as the payment - // preimage back to the possible caller. - case lnwallet.Settle: - p.preImage <- htlc.RPreimage - p.err <- nil - - // Otherwise, the HTLC failed, so we propagate - // the error back to the potential caller. - case lnwallet.Fail: - state.Lock() - errMsg := state.cancelReasons[parentIndex] - state.Unlock() - - p.preImage <- [32]byte{} - p.err <- errors.New(errMsg.String()) - } - - close(p.done) - - delete(state.clearedHTCLs, htlc.ParentIndex) - } - - // TODO(roasbeef): rework log entries to a shared - // interface. - if htlc.EntryType != lnwallet.Add { - continue - } - - // If we can settle this HTLC within our local state - // update log, then send the update entry to the remote - // party. - invoice, ok := state.htlcsToSettle[htlc.Index] - if ok { - preimage := invoice.Terms.PaymentPreimage - logIndex, err := state.channel.SettleHTLC(preimage) - if err != nil { - peerLog.Errorf("unable to settle htlc: %v", err) - p.Disconnect() - continue - } - - settleMsg := &lnwire.UpdateFufillHTLC{ - ChanID: state.chanID, - ID: logIndex, - PaymentPreimage: preimage, - } - p.queueMsg(settleMsg, nil) - - delete(state.htlcsToSettle, htlc.Index) - settledPayments[htlc.RHash] = struct{}{} - - bandwidthUpdate += htlc.Amount - continue - } - - // Alternatively, if we marked this HTLC for - // cancellation, then immediately cancel the HTLC as - // it's now locked in within both commitment - // transactions. - reason, ok := state.htlcsToCancel[htlc.Index] - if !ok { - continue - } - - logIndex, err := state.channel.FailHTLC(htlc.RHash) - if err != nil { - peerLog.Errorf("unable to cancel htlc: %v", err) - p.Disconnect() - continue - } - - cancelMsg := &lnwire.UpdateFailHTLC{ - ChanID: state.chanID, - ID: logIndex, - Reason: []byte{byte(reason)}, - } - p.queueMsg(cancelMsg, nil) - delete(state.htlcsToCancel, htlc.Index) - - cancelledHtlcs[htlc.Index] = struct{}{} - } - - go func() { - for _, htlc := range htlcsToForward { - // We don't need to forward any HTLCs that we - // just settled or cancelled above. - // TODO(roasbeef): key by index instead? - state.RLock() - if _, ok := settledPayments[htlc.RHash]; ok { - state.RUnlock() - continue - } - if _, ok := cancelledHtlcs[htlc.Index]; ok { - state.RUnlock() - continue - } - state.RUnlock() - - state.Lock() - onionPkt := state.pendingCircuits[htlc.Index] - delete(state.pendingCircuits, htlc.Index) - - reason := state.cancelReasons[htlc.ParentIndex] - delete(state.cancelReasons, htlc.ParentIndex) - state.Unlock() - - // Send this fully activated HTLC to the htlc - // switch to continue the chained clear/settle. - pkt, err := logEntryToHtlcPkt(state.chanID, - htlc, onionPkt, reason) - if err != nil { - peerLog.Errorf("unable to make htlc pkt: %v", - err) - continue - } - - state.switchChan <- pkt - } - - }() - - if len(settledPayments) == 0 && len(cancelledHtlcs) == 0 { - return - } - - // Send an update to the htlc switch of our newly available - // payment bandwidth. - // TODO(roasbeef): ideally should wait for next state update. - if bandwidthUpdate != 0 { - p.server.htlcSwitch.UpdateLink(state.chanID, - bandwidthUpdate) - } - - // With all the settle updates added to the local and remote - // HTLC logs, initiate a state transition by updating the - // remote commitment chain. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update commitment: %v", err) - p.Disconnect() - return - } - - // Notify the invoiceRegistry of the invoices we just settled - // with this latest commitment update. - // TODO(roasbeef): wait until next transition? - for invoice := range settledPayments { - err := p.server.invoices.SettleInvoice(chainhash.Hash(invoice)) - if err != nil { - peerLog.Errorf("unable to settle invoice: %v", err) - } - } - } -} - -// updateCommitTx signs, then sends an update to the remote peer adding a new -// commitment to their commitment chain which includes all the latest updates -// we've received+processed up to this point. -func (p *peer) updateCommitTx(state *commitmentState) error { - sigTheirs, err := state.channel.SignNextCommitment() - if err == lnwallet.ErrNoWindow { - peerLog.Tracef("ChannelPoint(%v): revocation window exhausted, unable to send %v", - state.chanPoint, len(state.pendingBatch)) - return nil - } else if err != nil { - return err - } - - parsedSig, err := btcec.ParseSignature(sigTheirs, btcec.S256()) - if err != nil { - return fmt.Errorf("unable to parse sig: %v", err) - } - - commitSig := &lnwire.CommitSig{ - ChanID: state.chanID, - CommitSig: parsedSig, - } - p.queueMsg(commitSig, nil) - - // As we've just cleared out a batch, move all pending updates to the - // map of cleared HTLCs, clearing out the set of pending updates. - for _, update := range state.pendingBatch { - state.clearedHTCLs[update.index] = update - } - - // We've just initiated a state transition, attempt to stop the - // logCommitTimer. If the timer already ticked, then we'll consume the - // value, dropping - if state.logCommitTimer != nil && !state.logCommitTimer.Stop() { - select { - case <-state.logCommitTimer.C: - default: - } - } - state.logCommitTick = nil - - // Finally, clear our the current batch, and flip the pendingUpdate - // bool to indicate were waiting for a commitment signature. - // TODO(roasbeef): re-slice instead to avoid GC? - state.pendingBatch = nil - state.pendingSettle = 0 - +// SendMessage sends message to the remote peer which represented by +// this peer. +func (p *peer) SendMessage(msg lnwire.Message) error { + p.queueMsg(msg, nil) return nil } -// logEntryToHtlcPkt converts a particular Lightning Commitment Protocol (LCP) -// log entry the corresponding htlcPacket with src/dest set along with the -// proper wire message. This helper method is provided in order to aid an -// htlcManager in forwarding packets to the htlcSwitch. -func logEntryToHtlcPkt(chanID lnwire.ChannelID, pd *lnwallet.PaymentDescriptor, - onionPkt *sphinx.ProcessedPacket, - reason lnwire.FailCode) (*htlcPacket, error) { +// ID returns the lightning network peer id. +func (p *peer) ID() [sha256.Size]byte { + return fastsha256.Sum256(p.PubKey()) +} - pkt := &htlcPacket{} - - // TODO(roasbeef): alter after switch to log entry interface - var msg lnwire.Message - switch pd.EntryType { - - case lnwallet.Add: - // TODO(roasbeef): timeout, onion blob, etc - var b bytes.Buffer - if err := onionPkt.Packet.Encode(&b); err != nil { - return nil, err - } - - htlc := &lnwire.UpdateAddHTLC{ - Amount: pd.Amount, - PaymentHash: pd.RHash, - } - copy(htlc.OnionBlob[:], b.Bytes()) - msg = htlc - - case lnwallet.Settle: - msg = &lnwire.UpdateFufillHTLC{ - PaymentPreimage: pd.RPreimage, - } - - case lnwallet.Fail: - // For cancellation messages, we'll also need to set the rHash - // within the htlcPacket so the switch knows on which outbound - // link to forward the cancellation message - msg = &lnwire.UpdateFailHTLC{ - Reason: []byte{byte(reason)}, - } - pkt.payHash = pd.RHash - } - - pkt.amt = pd.Amount - pkt.msg = msg - - pkt.srcLink = chanID - pkt.onion = onionPkt - - return pkt, nil +// PubKey returns the peer public key. +func (p *peer) PubKey() []byte { + return p.addr.IdentityKey.SerializeCompressed() } // TODO(roasbeef): make all start/stop mutexes a CAS diff --git a/rpcserver.go b/rpcserver.go index b52c69d81..8caad5d6b 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -19,6 +19,7 @@ import ( "github.com/boltdb/bolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -527,7 +528,7 @@ func (r *rpcServer) OpenChannelSync(ctx context.Context, } } -// CloseChannel attempts to close an active channel identified by its channel +// CloseLink attempts to close an active channel identified by its channel // point. The actions of this method can additionally be augmented to attempt // a force close after a timeout period in the case of an inactive peer. func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, @@ -577,10 +578,10 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, // TODO(roasbeef): actually get the active channel // instead too? // * so only need to grab from database - wipeChannel(peer, channel) + peer.WipeChannel(channel) } else { chanID := lnwire.NewChanIDFromOutPoint(channel.ChannelPoint()) - r.server.htlcSwitch.UnregisterLink(remotePub, &chanID) + r.server.htlcSwitch.RemoveLink(chanID) } r.server.breachArbiter.settledContracts <- chanPoint @@ -628,7 +629,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, // the htlc switch which will handle the negotiation and // broadcast details. updateChan, errChan = r.server.htlcSwitch.CloseLink(chanPoint, - CloseRegular) + htlcswitch.CloseRegular) } out: for { diff --git a/server.go b/server.go index 68d3cff88..da5565190 100644 --- a/server.go +++ b/server.go @@ -5,7 +5,6 @@ import ( "crypto/rand" "crypto/sha256" "encoding/hex" - "errors" "fmt" "net" "strconv" @@ -25,9 +24,11 @@ import ( "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/chainview" "github.com/roasbeef/btcd/btcec" - "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/connmgr" "github.com/roasbeef/btcutil" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/htlcswitch" ) // server is the main server of the Lightning Network Daemon. The server houses @@ -69,7 +70,7 @@ type server struct { fundingMgr *fundingManager chanDB *channeldb.DB - htlcSwitch *htlcSwitch + htlcSwitch *htlcswitch.Switch invoices *invoiceRegistry breachArbiter *breachArbiter @@ -136,7 +137,6 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, invoices: newInvoiceRegistry(chanDB), utxoNursery: newUtxoNursery(chanDB, notifier, wallet), - htlcSwitch: newHtlcSwitch(), identityPriv: privKey, nodeSigner: newNodeSigner(privKey), @@ -177,6 +177,23 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, debugPre[:], debugHash[:]) } + s.htlcSwitch = htlcswitch.New(htlcswitch.Config{ + LocalChannelClose: func(pubKey []byte, + request *htlcswitch.ChanClose) { + s.peersMtx.RLock() + peer, ok := s.peersByPub[string(pubKey)] + s.peersMtx.RUnlock() + + if !ok { + srvrLog.Error("unable to close channel, peer"+ + " with %v id can't be found", pubKey) + return + } + + peer.localCloseChanReqs <- request + }, + }) + // If external IP addresses have been specified, add those to the list // of this server's addresses. selfAddrs := make([]net.Addr, 0, len(cfg.ExternalIPs)) @@ -237,14 +254,8 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, ChainView: chainView, SendToSwitch: func(firstHop *btcec.PublicKey, htlcAdd *lnwire.UpdateAddHTLC) ([32]byte, error) { - firstHopPub := firstHop.SerializeCompressed() - destInterface := chainhash.Hash(sha256.Sum256(firstHopPub)) - - return s.htlcSwitch.SendHTLC(&htlcPacket{ - dest: destInterface, - msg: htlcAdd, - }) + return s.htlcSwitch.SendHTLC(firstHopPub, htlcAdd) }, }) if err != nil { @@ -657,10 +668,22 @@ func (s *server) peerTerminationWatcher(p *peer) { srvrLog.Debugf("Peer %v has been disconnected", p) - // Tell the switch to unregister all links associated with this peer. + // Tell the switch to remove all links associated with this peer. // Passing nil as the target link indicates that all links associated // with this interface should be closed. - p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil) + hop := htlcswitch.NewHopID(p.addr.IdentityKey.SerializeCompressed()) + links, err := p.server.htlcSwitch.GetLinks(hop) + if err != nil { + srvrLog.Errorf("unable to get channel links: %v", err) + } + + for _, link := range links { + err := p.server.htlcSwitch.RemoveLink(link.ChanID()) + if err != nil { + srvrLog.Errorf("unable to remove channel link: %v", + err) + } + } // Send the peer to be garbage collected by the server. p.server.donePeers <- p