lnd: integrate Sphinx onion routing when sending/receiving HTLC's

This commit alters the send/receive HTLC pipe line a bit in order to
fully integrate onion routing into the daemon.

The server now stores the global Sphinx router which all active
htlcManagers will used when processing upstream HTLC add messages.
Currently the onion routing private key is static, and identical to the
node’s current identity public key. In the future this key will be
rotated daily the node based on the current block hash.

When sending a payment via the SendPayment RPC, the routing manager is
now queried for the existence of a route before the payment request is
sent to the HTLC switch. If a path is found, then a Sphinx onion packet
encoding the route is created, then populated within the HTLC add
message.

Finally, when processing an upstream HTLC add request, the sphinx
packet is decoded, then processed by the target peer. If the peer is
indicated as the exit node, then the HTLC is queue’d to be settled
within the next state update.
This commit is contained in:
Olaoluwa Osuntokun
2016-09-20 17:15:26 -07:00
parent 62271768b0
commit e1b82566bd
3 changed files with 151 additions and 22 deletions

60
peer.go
View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"bytes"
"container/list" "container/list"
"fmt" "fmt"
"net" "net"
@ -11,6 +12,7 @@ import (
"github.com/BitfuryLightning/tools/rt/graph" "github.com/BitfuryLightning/tools/rt/graph"
"github.com/btcsuite/fastsha256" "github.com/btcsuite/fastsha256"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lndc" "github.com/lightningnetwork/lnd/lndc"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
@ -62,6 +64,7 @@ type peer struct {
conn net.Conn conn net.Conn
identityPub *btcec.PublicKey
lightningAddr *lndc.LNAdr lightningAddr *lndc.LNAdr
lightningID wire.ShaHash lightningID wire.ShaHash
@ -151,6 +154,7 @@ func newPeer(conn net.Conn, server *server, btcNet wire.BitcoinNet, inbound bool
p := &peer{ p := &peer{
conn: conn, conn: conn,
identityPub: nodePub,
lightningID: wire.ShaHash(fastsha256.Sum256(nodePub.SerializeCompressed())), lightningID: wire.ShaHash(fastsha256.Sum256(nodePub.SerializeCompressed())),
id: atomic.AddInt32(&numNodes, 1), id: atomic.AddInt32(&numNodes, 1),
chainNet: btcNet, chainNet: btcNet,
@ -882,6 +886,11 @@ type commitmentState struct {
// fowarding. // fowarding.
switchChan chan<- *htlcPacket switchChan chan<- *htlcPacket
// sphinx is an instance of the Sphinx onion Router for this node. The
// router will be used to process all incmoing Sphinx packets embedded
// within HTLC add messages.
sphinx *sphinx.Router
channel *lnwallet.LightningChannel channel *lnwallet.LightningChannel
chanPoint *wire.OutPoint chanPoint *wire.OutPoint
} }
@ -921,9 +930,14 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
chanPoint: channel.ChannelPoint(), chanPoint: channel.ChannelPoint(),
clearedHTCLs: make(map[uint32]*pendingPayment), clearedHTCLs: make(map[uint32]*pendingPayment),
htlcsToSettle: make(map[uint32]*channeldb.Invoice), htlcsToSettle: make(map[uint32]*channeldb.Invoice),
sphinx: p.server.sphinx,
switchChan: htlcPlex, switchChan: htlcPlex,
} }
// TODO(roasbeef): check to see if able to settle any currently pending
// HTLC's
// * also need signals when new invoices are added by the invoiceRegistry
batchTimer := time.Tick(10 * time.Millisecond) batchTimer := time.Tick(10 * time.Millisecond)
out: out:
for { for {
@ -1049,19 +1063,51 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// TODO(roasbeef): timeouts // TODO(roasbeef): timeouts
// * fail if can't parse sphinx mix-header // * fail if can't parse sphinx mix-header
case *lnwire.HTLCAddRequest: case *lnwire.HTLCAddRequest:
// Before adding the new HTLC to the state machine, parse the
// onion object in order to obtain the routing information.
blobReader := bytes.NewReader(htlcPkt.OnionBlob)
onionPkt := &sphinx.OnionPacket{}
if err := onionPkt.Decode(blobReader); err != nil {
peerLog.Errorf("unable to decode onion pkt: %v", err)
p.Disconnect()
return
}
mixHeader, err := state.sphinx.ProcessOnionPacket(onionPkt)
if err != nil {
peerLog.Errorf("unable to process onion pkt: %v", err)
p.Disconnect()
return
}
// We just received an add request from an upstream peer, so we // We just received an add request from an upstream peer, so we
// add it to our state machine, then add the HTLC to our // add it to our state machine, then add the HTLC to our
// "settle" list in the event that we know the pre-image // "settle" list in the event that we know the pre-image
index := state.channel.ReceiveHTLC(htlcPkt) index := state.channel.ReceiveHTLC(htlcPkt)
rHash := htlcPkt.RedemptionHashes[0] switch mixHeader.Action {
invoice, err := p.server.invoices.LookupInvoice(rHash) // We're the designated payment destination. Therefore we
if err == nil { // attempt to see if we have an invoice locally which'll
// TODO(roasbeef): check value // allow us to settle this HTLC.
// * onion layer strip should also be before invoice lookup case sphinx.ExitNode:
rHash := htlcPkt.RedemptionHashes[0]
invoice, err := p.server.invoices.LookupInvoice(rHash)
if err != nil {
// TODO(roasbeef): send a canceHTLC message if we can't settle.
peerLog.Errorf("unable to query to locate: %v", err)
p.Disconnect()
return
}
// TODO(roasbeef): check values accept if >=
state.htlcsToSettle[index] = invoice state.htlcsToSettle[index] = invoice
} else if err != channeldb.ErrInvoiceNotFound { return
peerLog.Errorf("unable to query for invoice: %v", err) case sphinx.MoreHops:
// TODO(roasbeef): parse out the next dest so can
// attach to packet when forwarding.
// * send cancel + error if not in rounting table
default:
peerLog.Errorf("mal formed onion packet")
p.Disconnect()
} }
case *lnwire.HTLCSettleRequest: case *lnwire.HTLCSettleRequest:
// TODO(roasbeef): this assumes no "multi-sig" // TODO(roasbeef): this assumes no "multi-sig"

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"bytes"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io" "io"
@ -9,12 +10,16 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/BitfuryLightning/tools/rt/graph"
"github.com/btcsuite/fastsha256" "github.com/btcsuite/fastsha256"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lndc" "github.com/lightningnetwork/lnd/lndc"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
@ -317,6 +322,7 @@ func (r *rpcServer) GetInfo(ctx context.Context,
return &lnrpc.GetInfoResponse{ return &lnrpc.GetInfoResponse{
LightningId: hex.EncodeToString(r.server.lightningID[:]), LightningId: hex.EncodeToString(r.server.lightningID[:]),
IdentityPubkey: hex.EncodeToString(idPub),
IdentityAddress: idAddr.String(), IdentityAddress: idAddr.String(),
NumPendingChannels: pendingChannels, NumPendingChannels: pendingChannels,
NumActiveChannels: activeChannels, NumActiveChannels: activeChannels,
@ -445,7 +451,9 @@ func (r *rpcServer) PendingChannels(ctx context.Context,
// bi-directional stream allowing clients to rapidly send payments through the // bi-directional stream allowing clients to rapidly send payments through the
// Lightning Network with a single persistent connection. // Lightning Network with a single persistent connection.
func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer) error { func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer) error {
queryTimeout := time.Duration(time.Minute)
errChan := make(chan error, 1) errChan := make(chan error, 1)
for { for {
select { select {
case err := <-errChan: case err := <-errChan:
@ -461,6 +469,28 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
return err return err
} }
// Query the routing table for a potential path to the
// destination node. If a path is ultimately
// unavailable, then an error will be returned.
destNode := hex.EncodeToString(nextPayment.Dest)
targetVertex := graph.NewID(destNode)
path, err := r.server.routingMgr.FindPath(targetVertex,
queryTimeout)
if err != nil {
return err
}
rpcsLog.Tracef("[sendpayment] selected route: %v", path)
// Generate the raw encoded sphinx packet to be
// included along with the HTLC add message.
// We snip off the first hop from the path as within
// the routing table's star graph, we're always the
// first hop.
sphinxPacket, err := generateSphinxPacket(path[1:])
if err != nil {
return err
}
// If we're in debug HTLC mode, then all outgoing // If we're in debug HTLC mode, then all outgoing
// HTLC's will pay to the same debug rHash. Otherwise, // HTLC's will pay to the same debug rHash. Otherwise,
// we pay to the rHash specified within the RPC // we pay to the rHash specified within the RPC
@ -471,19 +501,18 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
} else { } else {
copy(rHash[:], nextPayment.PaymentHash) copy(rHash[:], nextPayment.PaymentHash)
} }
// Craft an HTLC packet to send to the routing sub-system. The
// meta-data within this packet will be used to route the // Craft an HTLC packet to send to the routing
// payment through the network. // sub-system. The meta-data within this packet will be
// used to route the payment through the network.
htlcAdd := &lnwire.HTLCAddRequest{ htlcAdd := &lnwire.HTLCAddRequest{
Amount: lnwire.CreditsAmount(nextPayment.Amt), Amount: lnwire.CreditsAmount(nextPayment.Amt),
RedemptionHashes: [][32]byte{rHash}, RedemptionHashes: [][32]byte{rHash},
OnionBlob: sphinxPacket,
} }
destAddr, err := wire.NewShaHash(nextPayment.Dest) destAddr := wire.ShaHash(fastsha256.Sum256(nextPayment.Dest))
if err != nil {
return err
}
htlcPkt := &htlcPacket{ htlcPkt := &htlcPacket{
dest: *destAddr, dest: destAddr,
msg: htlcAdd, msg: htlcAdd,
} }
@ -512,6 +541,50 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
return nil return nil
} }
// generateSphinxPacket generates then encodes a sphinx packet which encodes
// the onion route specified by the passed list of graph vertexes. The blob
// returned from this function can immediately be included within an HTLC add
// packet to be sent to the first hop within the route.
func generateSphinxPacket(vertexes []graph.ID) ([]byte, error) {
var dest sphinx.LightningAddress
e2eMessage := []byte("test")
route := make([]*btcec.PublicKey, len(vertexes))
for i, vertex := range vertexes {
vertexBytes, err := hex.DecodeString(vertex.String())
if err != nil {
return nil, err
}
pub, err := btcec.ParsePubKey(vertexBytes, btcec.S256())
if err != nil {
return nil, err
}
route[i] = pub
}
// Next generate the onion routing packet which allows
// us to perform privacy preserving source routing
// across the network.
var onionBlob bytes.Buffer
mixHeader, err := sphinx.NewOnionPacket(route, dest,
e2eMessage)
if err != nil {
return nil, err
}
if err := mixHeader.Encode(&onionBlob); err != nil {
return nil, err
}
rpcsLog.Tracef("[sendpayment] generated sphinx packet: %v",
newLogClosure(func() string {
return spew.Sdump(mixHeader)
}))
return onionBlob.Bytes(), nil
}
// AddInvoice attempts to add a new invoice to the invoice database. Any // AddInvoice attempts to add a new invoice to the invoice database. Any
// duplicated invoices are rejected, therefore all invoices *must* have a // duplicated invoices are rejected, therefore all invoices *must* have a
// unique payment preimage. // unique payment preimage.
@ -610,8 +683,11 @@ func (r *rpcServer) ListInvoices(ctx context.Context,
func (r *rpcServer) ShowRoutingTable(ctx context.Context, func (r *rpcServer) ShowRoutingTable(ctx context.Context,
in *lnrpc.ShowRoutingTableRequest) (*lnrpc.ShowRoutingTableResponse, error) { in *lnrpc.ShowRoutingTableRequest) (*lnrpc.ShowRoutingTableResponse, error) {
rpcsLog.Debugf("[ShowRoutingTable]") rpcsLog.Debugf("[ShowRoutingTable]")
rtCopy := r.server.routingMgr.GetRTCopy() rtCopy := r.server.routingMgr.GetRTCopy()
channels := make([]*lnrpc.RoutingTableLink, 0) channels := make([]*lnrpc.RoutingTableLink, 0)
for _, channel := range rtCopy.AllChannels() { for _, channel := range rtCopy.AllChannels() {
channels = append(channels, channels = append(channels,
@ -624,6 +700,7 @@ func (r *rpcServer) ShowRoutingTable(ctx context.Context,
}, },
) )
} }
return &lnrpc.ShowRoutingTableResponse{ return &lnrpc.ShowRoutingTableResponse{
Channels: channels, Channels: channels,
}, nil }, nil

View File

@ -9,6 +9,7 @@ import (
"sync/atomic" "sync/atomic"
"github.com/btcsuite/fastsha256" "github.com/btcsuite/fastsha256"
"github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lndc" "github.com/lightningnetwork/lnd/lndc"
@ -58,6 +59,8 @@ type server struct {
utxoNursery *utxoNursery utxoNursery *utxoNursery
sphinx *sphinx.Router
newPeers chan *peer newPeers chan *peer
donePeers chan *peer donePeers chan *peer
queries chan interface{} queries chan interface{}
@ -95,13 +98,16 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
invoices: newInvoiceRegistry(chanDB), invoices: newInvoiceRegistry(chanDB),
lnwallet: wallet, lnwallet: wallet,
identityPriv: privKey, identityPriv: privKey,
lightningID: fastsha256.Sum256(serializedPubKey), // TODO(roasbeef): derive proper onion key based on rotation
listeners: listeners, // schedule
peers: make(map[int32]*peer), sphinx: sphinx.NewRouter(privKey, activeNetParams.Params),
newPeers: make(chan *peer, 100), lightningID: fastsha256.Sum256(serializedPubKey),
donePeers: make(chan *peer, 100), listeners: listeners,
queries: make(chan interface{}), peers: make(map[int32]*peer),
quit: make(chan struct{}), newPeers: make(chan *peer, 100),
donePeers: make(chan *peer, 100),
queries: make(chan interface{}),
quit: make(chan struct{}),
} }
// If the debug HTLC flag is on, then we invoice a "master debug" // If the debug HTLC flag is on, then we invoice a "master debug"