Merge pull request #1731 from halseth/link-policy-persist

Correctly apply min_htlc to forwarding policy
This commit is contained in:
Olaoluwa Osuntokun 2018-08-23 19:21:44 -07:00 committed by GitHub
commit a1a6845fb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 887 additions and 265 deletions

View File

@ -1617,7 +1617,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// is over.
// TODO(roasbeef): add abstraction over updates to accommodate
// long-polling, or SSE, etc.
resCtx.updates <- &lnrpc.OpenStatusUpdate{
upd := &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanPending{
ChanPending: &lnrpc.PendingUpdate{
Txid: fundingPoint.Hash[:],
@ -1626,6 +1626,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
},
}
select {
case resCtx.updates <- upd:
case <-f.quit:
return
}
// At this point we have broadcast the funding transaction and done all
// necessary processing.
f.wg.Add(1)
@ -1693,7 +1699,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// Give the caller a final update notifying them that
// the channel is now open.
// TODO(roasbeef): only notify after recv of funding locked?
resCtx.updates <- &lnrpc.OpenStatusUpdate{
upd := &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: &lnrpc.ChannelPoint{
@ -1706,6 +1712,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
},
}
select {
case resCtx.updates <- upd:
case <-f.quit:
return
}
err = f.annAfterSixConfs(completeChan, shortChanID)
if err != nil {
fndgLog.Errorf("failed sending channel announcement: %v",
@ -2060,16 +2072,17 @@ func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel,
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
// We'll obtain their min HTLC as we'll use this value within our
// ChannelUpdate. We use this value isn't of ours, as the remote party
// will be the one that's carrying the HTLC towards us.
remoteMinHTLC := completeChan.RemoteChanCfg.MinHTLC
// We'll obtain the min HTLC value we can forward in our direction, as
// we'll use this value within our ChannelUpdate. This constraint is
// originally set by the remote node, as it will be the one that will
// need to determine the smallest HTLC it deems economically relevant.
fwdMinHTLC := completeChan.LocalChanCfg.MinHTLC
ann, err := f.newChanAnnouncement(
f.cfg.IDKey, completeChan.IdentityPub,
completeChan.LocalChanCfg.MultiSigKey.PubKey,
completeChan.RemoteChanCfg.MultiSigKey.PubKey, *shortChanID,
chanID, remoteMinHTLC,
chanID, fwdMinHTLC,
)
if err != nil {
return fmt.Errorf("error generating channel "+
@ -2196,11 +2209,12 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v",
&fundingPoint, spew.Sdump(shortChanID))
// We'll obtain their min HTLC as we'll use this value within
// our ChannelUpdate. We use this value isn't of ours, as the
// remote party will be the one that's carrying the HTLC towards
// us.
remoteMinHTLC := completeChan.RemoteChanCfg.MinHTLC
// We'll obtain the min HTLC value we can forward in our
// direction, as we'll use this value within our ChannelUpdate.
// This constraint is originally set by the remote node, as it
// will be the one that will need to determine the smallest
// HTLC it deems economically relevant.
fwdMinHTLC := completeChan.LocalChanCfg.MinHTLC
// Create and broadcast the proofs required to make this channel
// public and usable for other nodes for routing.
@ -2208,7 +2222,7 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
f.cfg.IDKey, completeChan.IdentityPub,
completeChan.LocalChanCfg.MultiSigKey.PubKey,
completeChan.RemoteChanCfg.MultiSigKey.PubKey,
*shortChanID, chanID, remoteMinHTLC,
*shortChanID, chanID, fwdMinHTLC,
)
if err != nil {
return fmt.Errorf("channel announcement failed: %v", err)
@ -2375,10 +2389,10 @@ type chanAnnouncement struct {
// identity pub keys of both parties to the channel, and the second segment is
// authenticated only by us and contains our directional routing policy for the
// channel.
func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.PublicKey,
func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey,
localFundingKey, remoteFundingKey *btcec.PublicKey,
shortChanID lnwire.ShortChannelID, chanID lnwire.ChannelID,
remoteMinHTLC lnwire.MilliSatoshi) (*chanAnnouncement, error) {
fwdMinHTLC lnwire.MilliSatoshi) (*chanAnnouncement, error) {
chainHash := *f.cfg.Wallet.Cfg.NetParams.GenesisHash
@ -2432,9 +2446,10 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.Pu
Flags: chanFlags,
TimeLockDelta: uint16(f.cfg.DefaultRoutingPolicy.TimeLockDelta),
// We use the *remote* party's HtlcMinimumMsat, as they'll be
// the ones carrying the HTLC routed *towards* us.
HtlcMinimumMsat: remoteMinHTLC,
// We use the HtlcMinimumMsat that the remote party required us
// to use, as our ChannelUpdate will be used to carry HTLCs
// towards them.
HtlcMinimumMsat: fwdMinHTLC,
BaseFee: uint32(f.cfg.DefaultRoutingPolicy.BaseFee),
FeeRate: uint32(f.cfg.DefaultRoutingPolicy.FeeRate),
@ -2513,7 +2528,7 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.Pu
// finish, either successfully or with an error.
func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKey,
remoteFundingKey *btcec.PublicKey, shortChanID lnwire.ShortChannelID,
chanID lnwire.ChannelID, remoteMinHTLC lnwire.MilliSatoshi) error {
chanID lnwire.ChannelID, fwdMinHTLC lnwire.MilliSatoshi) error {
// First, we'll create the batch of announcements to be sent upon
// initial channel creation. This includes the channel announcement
@ -2521,7 +2536,7 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
// proof needed to fully authenticate the channel.
ann, err := f.newChanAnnouncement(localIDKey, remoteIDKey,
localFundingKey, remoteFundingKey, shortChanID, chanID,
remoteMinHTLC,
fwdMinHTLC,
)
if err != nil {
fndgLog.Errorf("can't generate channel announcement: %v", err)

View File

@ -432,6 +432,12 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
},
TempChanIDSeed: oldCfg.TempChanIDSeed,
FindChannel: oldCfg.FindChannel,
DefaultRoutingPolicy: htlcswitch.ForwardingPolicy{
MinHTLC: 5,
BaseFee: 100,
FeeRate: 1000,
TimeLockDelta: 10,
},
PublishTransaction: func(txn *wire.MsgTx) error {
publishChan <- txn
return nil
@ -810,7 +816,16 @@ func assertAddedToRouterGraph(t *testing.T, alice, bob *testNode,
assertDatabaseState(t, bob, fundingOutPoint, addedToRouterGraph)
}
func assertChannelAnnouncements(t *testing.T, alice, bob *testNode) {
// assertChannelAnnouncements checks that alice and bob both sends the expected
// announcements (ChannelAnnouncement, ChannelUpdate) after the funding tx has
// confirmed. The last arguments can be set if we expect the nodes to advertise
// custom min_htlc values as part of their ChannelUpdate. We expect Alice to
// advertise the value required by Bob and vice versa. If they are not set the
// advertised value will be checked againts the other node's default min_htlc
// value.
func assertChannelAnnouncements(t *testing.T, alice, bob *testNode,
customMinHtlc ...lnwire.MilliSatoshi) {
// After the FundingLocked message is sent, Alice and Bob will each
// send the following messages to their gossiper:
// 1) ChannelAnnouncement
@ -818,7 +833,8 @@ func assertChannelAnnouncements(t *testing.T, alice, bob *testNode) {
// The ChannelAnnouncement is kept locally, while the ChannelUpdate
// is sent directly to the other peer, so the edge policies are
// known to both peers.
for j, node := range []*testNode{alice, bob} {
nodes := []*testNode{alice, bob}
for j, node := range nodes {
announcements := make([]lnwire.Message, 2)
for i := 0; i < len(announcements); i++ {
select {
@ -831,10 +847,35 @@ func assertChannelAnnouncements(t *testing.T, alice, bob *testNode) {
gotChannelAnnouncement := false
gotChannelUpdate := false
for _, msg := range announcements {
switch msg.(type) {
switch m := msg.(type) {
case *lnwire.ChannelAnnouncement:
gotChannelAnnouncement = true
case *lnwire.ChannelUpdate:
// The channel update sent by the node should
// advertise the MinHTLC value required by the
// _other_ node.
other := (j + 1) % 2
minHtlc := nodes[other].fundingMgr.cfg.
DefaultRoutingPolicy.MinHTLC
// We might expect a custom MinHTLC value.
if len(customMinHtlc) > 0 {
if len(customMinHtlc) != 2 {
t.Fatalf("only 0 or 2 custom " +
"min htlc values " +
"currently supported")
}
minHtlc = customMinHtlc[j]
}
if m.HtlcMinimumMsat != minHtlc {
t.Fatalf("expected ChannelUpdate to "+
"advertise min HTLC %v, had %v",
minHtlc, m.HtlcMinimumMsat)
}
gotChannelUpdate = true
}
}
@ -2209,6 +2250,31 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
case <-time.After(time.Second * 5):
t.Fatalf("alice did not publish funding tx")
}
// Notify that transaction was mined.
alice.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{}
bob.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{}
// After the funding transaction is mined, Alice will send
// fundingLocked to Bob.
_ = assertFundingMsgSent(
t, alice.msgChan, "FundingLocked",
).(*lnwire.FundingLocked)
// And similarly Bob will send funding locked to Alice.
_ = assertFundingMsgSent(
t, bob.msgChan, "FundingLocked",
).(*lnwire.FundingLocked)
// Make sure both fundingManagers send the expected channel
// announcements. Alice should advertise the default MinHTLC value of
// 5, while bob should advertise the value minHtlc, since Alice
// required him to use it.
assertChannelAnnouncements(t, alice, bob, 5, minHtlc)
// The funding transaction is now confirmed, wait for the
// OpenStatusUpdate_ChanOpen update
waitForOpenUpdate(t, updateChan)
}
// TestFundingManagerMaxPendingChannels checks that trying to open another

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
)
// NetworkHarness is an integration testing harness for the lightning network.
@ -687,15 +688,35 @@ func (n *NetworkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.
}
}
// OpenChannelParams houses the params to specify when opening a new channel.
type OpenChannelParams struct {
// Amt is the local amount being put into the channel.
Amt btcutil.Amount
// PushAmt is the amount that should be pushed to the remote when the
// channel is opened.
PushAmt btcutil.Amount
// Private is a boolan indicating whether the opened channel should be
// private.
Private bool
// SpendUnconfirmed is a boolean indicating whether we can utilize
// unconfirmed outputs to fund the channel.
SpendUnconfirmed bool
// MinHtlc is the htlc_minumum_msat value set when opening the channel.
MinHtlc lnwire.MilliSatoshi
}
// OpenChannel attempts to open a channel between srcNode and destNode with the
// passed channel funding parameters. If the passed context has a timeout, then
// if the timeout is reached before the channel pending notification is
// received, an error is returned. The confirmed boolean determines whether we
// should fund the channel with confirmed outputs or not.
func (n *NetworkHarness) OpenChannel(ctx context.Context,
srcNode, destNode *HarnessNode, amt btcutil.Amount,
pushAmt btcutil.Amount,
private, confirmed bool) (lnrpc.Lightning_OpenChannelClient, error) {
srcNode, destNode *HarnessNode, p OpenChannelParams) (
lnrpc.Lightning_OpenChannelClient, error) {
// Wait until srcNode and destNode have the latest chain synced.
// Otherwise, we may run into a check within the funding manager that
@ -708,17 +729,18 @@ func (n *NetworkHarness) OpenChannel(ctx context.Context,
return nil, fmt.Errorf("Unable to sync destNode chain: %v", err)
}
minConfs := int32(0)
if confirmed {
minConfs = 1
minConfs := int32(1)
if p.SpendUnconfirmed {
minConfs = 0
}
openReq := &lnrpc.OpenChannelRequest{
NodePubkey: destNode.PubKey[:],
LocalFundingAmount: int64(amt),
PushSat: int64(pushAmt),
Private: private,
LocalFundingAmount: int64(p.Amt),
PushSat: int64(p.PushAmt),
Private: p.Private,
MinConfs: minConfs,
MinHtlcMsat: int64(p.MinHtlc),
}
respStream, err := srcNode.OpenChannel(ctx, openReq)

View File

@ -6254,3 +6254,9 @@ func (lc *LightningChannel) RemoteCommitHeight() uint64 {
return lc.channelState.RemoteCommitment.CommitHeight
}
// FwdMinHtlc returns the minimum HTLC value required by the remote node, i.e.
// the minimum value HTLC we can forward on this channel.
func (lc *LightningChannel) FwdMinHtlc() lnwire.MilliSatoshi {
return lc.localChanCfg.MinHTLC
}

82
peer.go
View File

@ -396,10 +396,14 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
TimeLockDelta: uint32(selfPolicy.TimeLockDelta),
}
} else {
peerLog.Warnf("Unable to find our forwarding policy "+
"for channel %v, using default values",
chanPoint)
forwardingPolicy = &p.server.cc.routingPolicy
}
peerLog.Tracef("Using link policy of: %v", spew.Sdump(forwardingPolicy))
peerLog.Tracef("Using link policy of: %v",
spew.Sdump(forwardingPolicy))
// Register this new channel link with the HTLC Switch. This is
// necessary to properly route multi-hop payments, and forward
@ -539,22 +543,20 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
}
linkCfg := htlcswitch.ChannelLinkConfig{
Peer: p,
DecodeHopIterators: p.server.sphinx.DecodeHopIterators,
ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: fetchLastChanUpdate(
p.server, p.PubKey(),
),
DebugHTLC: cfg.DebugHTLC,
HodlMask: cfg.Hodl.Mask(),
Registry: p.server.invoices,
Switch: p.server.htlcSwitch,
Circuits: p.server.htlcSwitch.CircuitModifier(),
ForwardPackets: p.server.htlcSwitch.ForwardPackets,
FwrdingPolicy: *forwardingPolicy,
FeeEstimator: p.server.cc.feeEstimator,
PreimageCache: p.server.witnessBeacon,
ChainEvents: chainEvents,
Peer: p,
DecodeHopIterators: p.server.sphinx.DecodeHopIterators,
ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: p.server.fetchLastChanUpdate(),
DebugHTLC: cfg.DebugHTLC,
HodlMask: cfg.Hodl.Mask(),
Registry: p.server.invoices,
Switch: p.server.htlcSwitch,
Circuits: p.server.htlcSwitch.CircuitModifier(),
ForwardPackets: p.server.htlcSwitch.ForwardPackets,
FwrdingPolicy: *forwardingPolicy,
FeeEstimator: p.server.cc.feeEstimator,
PreimageCache: p.server.witnessBeacon,
ChainEvents: chainEvents,
UpdateContractSignals: func(signals *contractcourt.ContractSignals) error {
return p.server.chainArb.UpdateContractSignals(
*chanPoint, signals,
@ -1550,9 +1552,23 @@ out:
continue
}
// We'll query the localChanCfg of the new channel to
// determine the minimum HTLC value that can be
// forwarded. For fees we'll use the default values, as
// they currently are always set to the default values
// at initial channel creation.
fwdMinHtlc := newChan.FwdMinHtlc()
defaultPolicy := p.server.cc.routingPolicy
forwardingPolicy := &htlcswitch.ForwardingPolicy{
MinHTLC: fwdMinHtlc,
BaseFee: defaultPolicy.BaseFee,
FeeRate: defaultPolicy.FeeRate,
TimeLockDelta: defaultPolicy.TimeLockDelta,
}
// Create the link and add it to the switch.
err = p.addLink(
chanPoint, newChan, &p.server.cc.routingPolicy,
chanPoint, newChan, forwardingPolicy,
chainEvents, currentHeight, false,
)
if err != nil {
@ -2098,33 +2114,3 @@ func (p *peer) StartTime() time.Time {
}
// TODO(roasbeef): make all start/stop mutexes a CAS
// fetchLastChanUpdate returns a function which is able to retrieve the last
// channel update for a target channel.
func fetchLastChanUpdate(s *server,
pubKey [33]byte) func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
info, edge1, edge2, err := s.chanRouter.GetChannelByID(cid)
if err != nil {
return nil, err
}
if edge1 == nil || edge2 == nil {
return nil, fmt.Errorf("unable to find channel by "+
"ShortChannelID(%v)", cid)
}
// If we're the outgoing node on the first edge, then that
// means the second edge is our policy. Otherwise, the first
// edge is our policy.
var local *channeldb.ChannelEdgePolicy
if bytes.Equal(edge1.Node.PubKeyBytes[:], pubKey[:]) {
local = edge2
} else {
local = edge1
}
return extractChannelUpdate(info, local)
}
}

View File

@ -336,7 +336,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
FwdingLog: chanDB.ForwardingLog(),
SwitchPackager: channeldb.NewSwitchPackager(),
ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: fetchLastChanUpdate(s, serializedPubKey),
FetchLastChannelUpdate: s.fetchLastChanUpdate(),
Notifier: s.cc.chainNotifier,
FwdEventTicker: ticker.New(
htlcswitch.DefaultFwdEventInterval),
@ -2952,6 +2952,33 @@ func (s *server) fetchLastChanUpdateByOutPoint(op wire.OutPoint) (
return nil, err
}
pubKey := s.identityPriv.PubKey().SerializeCompressed()
return extractChannelUpdate(pubKey, info, edge1, edge2)
}
// fetchLastChanUpdate returns a function which is able to retrieve our latest
// channel update for a target channel.
func (s *server) fetchLastChanUpdate() func(lnwire.ShortChannelID) (
*lnwire.ChannelUpdate, error) {
ourPubKey := s.identityPriv.PubKey().SerializeCompressed()
return func(cid lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
info, edge1, edge2, err := s.chanRouter.GetChannelByID(cid)
if err != nil {
return nil, err
}
return extractChannelUpdate(ourPubKey[:], info, edge1, edge2)
}
}
// extractChannelUpdate attempts to retrieve a lnwire.ChannelUpdate message
// from an edge's info and a set of routing policies.
// NOTE: the passed policies can be nil.
func extractChannelUpdate(ownerPubKey []byte,
info *channeldb.ChannelEdgeInfo,
policies ...*channeldb.ChannelEdgePolicy) (
*lnwire.ChannelUpdate, error) {
// Helper function to extract the owner of the given policy.
owner := func(edge *channeldb.ChannelEdgePolicy) []byte {
var pubKey *btcec.PublicKey
@ -2971,21 +2998,19 @@ func (s *server) fetchLastChanUpdateByOutPoint(op wire.OutPoint) (
}
// Extract the channel update from the policy we own, if any.
ourPubKey := s.identityPriv.PubKey().SerializeCompressed()
if edge1 != nil && bytes.Equal(ourPubKey, owner(edge1)) {
return extractChannelUpdate(info, edge1)
for _, edge := range policies {
if edge != nil && bytes.Equal(ownerPubKey, owner(edge)) {
return createChannelUpdate(info, edge)
}
}
if edge2 != nil && bytes.Equal(ourPubKey, owner(edge2)) {
return extractChannelUpdate(info, edge2)
}
return nil, fmt.Errorf("unable to find channel(%v)", op)
return nil, fmt.Errorf("unable to extract ChannelUpdate for channel %v",
info.ChannelPoint)
}
// extractChannelUpdate retrieves a lnwire.ChannelUpdate message from an edge's
// info and routing policy.
func extractChannelUpdate(info *channeldb.ChannelEdgeInfo,
// createChannelUpdate reconstructs a signed ChannelUpdate from the given edge
// info and policy.
func createChannelUpdate(info *channeldb.ChannelEdgeInfo,
policy *channeldb.ChannelEdgePolicy) (*lnwire.ChannelUpdate, error) {
update := &lnwire.ChannelUpdate{