Merge pull request #5405 from ErikEk/routing-updatechanpolicy-chan-check

routing: report invalid channels in updatechanpolicy call
This commit is contained in:
Oliver Gugger 2021-09-16 09:41:02 +02:00 committed by GitHub
commit d9534ea108
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1787 additions and 1342 deletions

View File

@ -2311,7 +2311,22 @@ func updateChannelPolicy(ctx *cli.Context) error {
return err
}
printRespJSON(resp)
// Parse the response into the final json object that will be printed
// to stdout. At the moment, this filters out the raw txid bytes from
// each failed update's outpoint and only prints the txid string.
var listFailedUpdateResp = struct {
FailedUpdates []*FailedUpdate `json:"failed_updates"`
}{
FailedUpdates: make([]*FailedUpdate, 0, len(resp.FailedUpdates)),
}
for _, protoUpdate := range resp.FailedUpdates {
failedUpdate := NewFailedUpdateFromProto(protoUpdate)
listFailedUpdateResp.FailedUpdates = append(
listFailedUpdateResp.FailedUpdates, failedUpdate)
}
printJSON(listFailedUpdateResp)
return nil
}

View File

@ -66,3 +66,22 @@ func NewUtxoFromProto(utxo *lnrpc.Utxo) *Utxo {
Confirmations: utxo.Confirmations,
}
}
// FailedUpdate displays information about a failed update, including its
// address, reason and update error.
type FailedUpdate struct {
	// OutPoint is the display form of the channel point (txid string plus
	// output index) of the channel whose policy update failed.
	OutPoint OutPoint `json:"outpoint"`
	// Reason is the string name of the UpdateFailure enum value reported
	// for this channel.
	Reason string `json:"reason"`
	// UpdateError is the human-readable error text returned for the
	// failed policy update.
	UpdateError string `json:"update_error"`
}
// NewFailedUpdateFromProto converts an lnrpc.FailedUpdate into its display
// representation. The proto outpoint is mapped through NewOutPointFromProto
// so the raw txid bytes (which would otherwise render as base64) are dropped
// in favor of the txid string.
func NewFailedUpdateFromProto(update *lnrpc.FailedUpdate) *FailedUpdate {
	displayOutPoint := NewOutPointFromProto(update.Outpoint)

	return &FailedUpdate{
		OutPoint:    displayOutPoint,
		Reason:      update.Reason.String(),
		UpdateError: update.UpdateError,
	}
}

View File

@ -62,6 +62,9 @@ proposed channel type is used.
* [Stub code for interacting with `lnrpc` from a WASM context through JSON
messages was added](https://github.com/lightningnetwork/lnd/pull/5601).
* The updatechanpolicy call now [detects invalid and pending channels, and
returns a policy update failure report](https://github.com/lightningnetwork/lnd/pull/5405).
* LND now [reports to systemd](https://github.com/lightningnetwork/lnd/pull/5536)
that RPC is ready (port bound, certificate generated, macaroons created,
in case of `wallet-unlock-password-file` wallet unlocked). This can be used to

File diff suppressed because it is too large Load Diff

View File

@ -3635,7 +3635,28 @@ message PolicyUpdateRequest {
// If true, min_htlc_msat is applied.
bool min_htlc_msat_specified = 8;
}
// UpdateFailure enumerates the reasons an individual channel's policy
// update can fail in a PolicyUpdateResponse.
enum UpdateFailure {
    // The reason for the failure could not be determined.
    UPDATE_FAILURE_UNKNOWN = 0;
    // The channel is not yet confirmed (still pending).
    UPDATE_FAILURE_PENDING = 1;
    // The channel could not be found.
    UPDATE_FAILURE_NOT_FOUND = 2;
    // An internal error occurred while looking up the channel.
    UPDATE_FAILURE_INTERNAL_ERR = 3;
    // The supplied policy parameters were invalid for this channel.
    UPDATE_FAILURE_INVALID_PARAMETER = 4;
}
message FailedUpdate {
// The outpoint in format txid:n
OutPoint outpoint = 1;
// Reason for the policy update failure.
UpdateFailure reason = 2;
// A string representation of the policy update error.
string update_error = 3;
}
message PolicyUpdateResponse {
// List of failed policy updates.
repeated FailedUpdate failed_updates = 1;
}
message ForwardingHistoryRequest {

View File

@ -3831,6 +3831,23 @@
}
}
},
"lnrpcFailedUpdate": {
"type": "object",
"properties": {
"outpoint": {
"$ref": "#/definitions/lnrpcOutPoint",
"title": "The outpoint in format txid:n"
},
"reason": {
"$ref": "#/definitions/lnrpcUpdateFailure",
"description": "Reason for the policy update failure."
},
"update_error": {
"type": "string",
"description": "A string representation of the policy update error."
}
}
},
"lnrpcFailure": {
"type": "object",
"properties": {
@ -5562,7 +5579,16 @@
}
},
"lnrpcPolicyUpdateResponse": {
"type": "object"
"type": "object",
"properties": {
"failed_updates": {
"type": "array",
"items": {
"$ref": "#/definitions/lnrpcFailedUpdate"
},
"description": "List of failed policy updates."
}
}
},
"lnrpcPsbtShim": {
"type": "object",
@ -6100,6 +6126,17 @@
}
}
},
"lnrpcUpdateFailure": {
"type": "string",
"enum": [
"UPDATE_FAILURE_UNKNOWN",
"UPDATE_FAILURE_PENDING",
"UPDATE_FAILURE_NOT_FOUND",
"UPDATE_FAILURE_INTERNAL_ERR",
"UPDATE_FAILURE_INVALID_PARAMETER"
],
"default": "UPDATE_FAILURE_UNKNOWN"
},
"lnrpcUtxo": {
"type": "object",
"properties": {

View File

@ -1,6 +1,7 @@
package localchans
import (
"errors"
"fmt"
"sync"
@ -9,6 +10,7 @@ import (
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)
@ -45,23 +47,24 @@ type Manager struct {
policyUpdateLock sync.Mutex
}
// UpdatePolicy updates the policy for the specified channels on disk and in the
// active links.
// UpdatePolicy updates the policy for the specified channels on disk and in
// the active links.
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
chanPoints ...wire.OutPoint) error {
chanPoints ...wire.OutPoint) ([]*lnrpc.FailedUpdate, error) {
r.policyUpdateLock.Lock()
defer r.policyUpdateLock.Unlock()
// First, we'll construct a set of all the channels that need to be
// updated.
chansToUpdate := make(map[wire.OutPoint]struct{})
// First, we'll construct a set of all the channels that we are
// trying to update.
unprocessedChans := make(map[wire.OutPoint]struct{})
for _, chanPoint := range chanPoints {
chansToUpdate[chanPoint] = struct{}{}
unprocessedChans[chanPoint] = struct{}{}
}
haveChanFilter := len(chansToUpdate) != 0
haveChanFilter := len(unprocessedChans) != 0
var failedUpdates []*lnrpc.FailedUpdate
var edgesToUpdate []discovery.EdgeWithInfo
policiesToUpdate := make(map[wire.OutPoint]htlcswitch.ForwardingPolicy)
@ -75,17 +78,24 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
// If we have a channel filter, and this channel isn't a part
// of it, then we'll skip it.
_, ok := chansToUpdate[info.ChannelPoint]
_, ok := unprocessedChans[info.ChannelPoint]
if !ok && haveChanFilter {
return nil
}
// Mark this channel as found by removing it. unprocessedChans
// will be used to report invalid channels later on.
delete(unprocessedChans, info.ChannelPoint)
// Apply the new policy to the edge.
err := r.updateEdge(tx, info.ChannelPoint, edge, newSchema)
if err != nil {
log.Warnf("Cannot update policy for %v: %v\n",
info.ChannelPoint, err,
)
failedUpdates = append(failedUpdates,
makeFailureItem(info.ChannelPoint,
lnrpc.UpdateFailure_UPDATE_FAILURE_INVALID_PARAMETER,
err.Error(),
))
return nil
}
@ -107,7 +117,41 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
return nil
})
if err != nil {
return err
return nil, err
}
// Construct a list of failed policy updates.
for chanPoint := range unprocessedChans {
channel, err := r.FetchChannel(nil, chanPoint)
switch {
case errors.Is(err, channeldb.ErrChannelNotFound):
failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint,
lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
"not found",
))
case err != nil:
failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint,
lnrpc.UpdateFailure_UPDATE_FAILURE_INTERNAL_ERR,
err.Error(),
))
case channel.IsPending:
failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint,
lnrpc.UpdateFailure_UPDATE_FAILURE_PENDING,
"not yet confirmed",
))
default:
failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint,
lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
"could not update policies",
))
}
}
// Commit the policy updates to disk and broadcast to the network. We
@ -117,13 +161,13 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
// multiple edge updates.
err = r.PropagateChanPolicyUpdate(edgesToUpdate)
if err != nil {
return err
return nil, err
}
// Update active links.
r.UpdateForwardingPolicies(policiesToUpdate)
return nil
return failedUpdates, nil
}
// updateEdge updates the given edge with the new schema.
@ -175,19 +219,22 @@ func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint,
// Validate htlc amount constraints.
switch {
case edge.MinHTLC < amtMin:
return fmt.Errorf("min htlc amount of %v msat is below "+
"min htlc parameter of %v msat for channel %v",
return fmt.Errorf(
"min htlc amount of %v is below min htlc parameter of %v",
edge.MinHTLC, amtMin,
chanPoint)
)
case edge.MaxHTLC > amtMax:
return fmt.Errorf("max htlc size of %v msat is above "+
"max pending amount of %v msat for channel %v",
edge.MaxHTLC, amtMax, chanPoint)
return fmt.Errorf(
"max htlc size of %v is above max pending amount of %v",
edge.MaxHTLC, amtMax,
)
case edge.MinHTLC > edge.MaxHTLC:
return fmt.Errorf("min_htlc %v greater than max_htlc %v",
edge.MinHTLC, edge.MaxHTLC)
return fmt.Errorf(
"min_htlc %v greater than max_htlc %v",
edge.MinHTLC, edge.MaxHTLC,
)
}
// Clear signature to help prevent usage of the previous signature.
@ -214,3 +261,20 @@ func (r *Manager) getHtlcAmtLimits(tx kvdb.RTx, chanPoint wire.OutPoint) (
return ch.LocalChanCfg.MinHTLC, maxAmt, nil
}
// makeFailureItem bundles a channel outpoint, a failure reason and an error
// string into the lnrpc.FailedUpdate wire type returned to RPC callers.
func makeFailureItem(op wire.OutPoint, reason lnrpc.UpdateFailure,
	errStr string) *lnrpc.FailedUpdate {

	return &lnrpc.FailedUpdate{
		// Populate both the raw txid bytes and the string form so
		// clients can use either representation.
		Outpoint: &lnrpc.OutPoint{
			TxidBytes:   op.Hash[:],
			TxidStr:     op.Hash.String(),
			OutputIndex: op.Index,
		},
		Reason:      reason,
		UpdateError: errStr,
	}
}

View File

@ -4,7 +4,9 @@ import (
"testing"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcutil"
@ -19,11 +21,20 @@ import (
// TestManager tests that the local channel manager properly propagates fee
// updates to gossiper and links.
func TestManager(t *testing.T) {
t.Parallel()
type channel struct {
edgeInfo *channeldb.ChannelEdgeInfo
}
var (
chanPoint = wire.OutPoint{Hash: chainhash.Hash{1}, Index: 2}
chanCap = btcutil.Amount(1000)
maxPendingAmount = lnwire.MilliSatoshi(999000)
minHTLC = lnwire.MilliSatoshi(2000)
chanPointValid = wire.OutPoint{Hash: chainhash.Hash{1}, Index: 2}
chanCap = btcutil.Amount(1000)
chanPointMissing = wire.OutPoint{Hash: chainhash.Hash{2}, Index: 2}
maxPendingAmount = lnwire.MilliSatoshi(999000)
minHTLC = lnwire.MilliSatoshi(2000)
expectedNumUpdates int
channelSet []channel
)
newPolicy := routing.ChannelPolicy{
@ -43,11 +54,15 @@ func TestManager(t *testing.T) {
updateForwardingPolicies := func(
chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy) {
if len(chanPolicies) == 0 {
return
}
if len(chanPolicies) != 1 {
t.Fatal("unexpected number of policies to apply")
}
policy := chanPolicies[chanPoint]
policy := chanPolicies[chanPointValid]
if policy.TimeLockDelta != newPolicy.TimeLockDelta {
t.Fatal("unexpected time lock delta")
}
@ -65,25 +80,29 @@ func TestManager(t *testing.T) {
propagateChanPolicyUpdate := func(
edgesToUpdate []discovery.EdgeWithInfo) error {
if len(edgesToUpdate) != 1 {
t.Fatal("unexpected number of edges to update")
if len(edgesToUpdate) != expectedNumUpdates {
t.Fatalf("unexpected number of updates,"+
" expected %d got %d", expectedNumUpdates,
len(edgesToUpdate))
}
policy := edgesToUpdate[0].Edge
if !policy.MessageFlags.HasMaxHtlc() {
t.Fatal("expected max htlc flag")
}
if policy.TimeLockDelta != uint16(newPolicy.TimeLockDelta) {
t.Fatal("unexpected time lock delta")
}
if policy.FeeBaseMSat != newPolicy.BaseFee {
t.Fatal("unexpected base fee")
}
if uint32(policy.FeeProportionalMillionths) != newPolicy.FeeRate {
t.Fatal("unexpected base fee")
}
if policy.MaxHTLC != newPolicy.MaxHTLC {
t.Fatal("unexpected max htlc")
for _, edge := range edgesToUpdate {
policy := edge.Edge
if !policy.MessageFlags.HasMaxHtlc() {
t.Fatal("expected max htlc flag")
}
if policy.TimeLockDelta != uint16(newPolicy.TimeLockDelta) {
t.Fatal("unexpected time lock delta")
}
if policy.FeeBaseMSat != newPolicy.BaseFee {
t.Fatal("unexpected base fee")
}
if uint32(policy.FeeProportionalMillionths) != newPolicy.FeeRate {
t.Fatal("unexpected base fee")
}
if policy.MaxHTLC != newPolicy.MaxHTLC {
t.Fatal("unexpected max htlc")
}
}
return nil
@ -93,19 +112,21 @@ func TestManager(t *testing.T) {
*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error {
return cb(
nil,
&channeldb.ChannelEdgeInfo{
Capacity: chanCap,
ChannelPoint: chanPoint,
},
&currentPolicy,
)
for _, c := range channelSet {
if err := cb(nil, c.edgeInfo, &currentPolicy); err != nil {
return err
}
}
return nil
}
fetchChannel := func(tx kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error) {
if chanPoint == chanPointMissing {
return &channeldb.OpenChannel{}, channeldb.ErrChannelNotFound
}
constraints := channeldb.ChannelConstraints{
MaxPendingAmount: maxPendingAmount,
MinHTLC: minHTLC,
@ -125,27 +146,117 @@ func TestManager(t *testing.T) {
FetchChannel: fetchChannel,
}
// Test updating a specific channel.
err := manager.UpdatePolicy(newPolicy, chanPoint)
if err != nil {
t.Fatal(err)
}
// Test updating all channels, which comes down to the same as testing a
// specific channel because there is only one channel.
err = manager.UpdatePolicy(newPolicy)
if err != nil {
t.Fatal(err)
}
// If no max htlc is specified, the max htlc value should be kept
// unchanged.
currentPolicy.MaxHTLC = newPolicy.MaxHTLC
// Policy with no max htlc value.
MaxHTLCPolicy := currentPolicy
MaxHTLCPolicy.MaxHTLC = newPolicy.MaxHTLC
noMaxHtlcPolicy := newPolicy
noMaxHtlcPolicy.MaxHTLC = 0
err = manager.UpdatePolicy(noMaxHtlcPolicy)
if err != nil {
t.Fatal(err)
tests := []struct {
name string
currentPolicy channeldb.ChannelEdgePolicy
newPolicy routing.ChannelPolicy
channelSet []channel
specifiedChanPoints []wire.OutPoint
expectedNumUpdates int
expectedUpdateFailures []lnrpc.UpdateFailure
expectErr error
}{
{
name: "valid channel",
currentPolicy: currentPolicy,
newPolicy: newPolicy,
channelSet: []channel{
{
edgeInfo: &channeldb.ChannelEdgeInfo{
Capacity: chanCap,
ChannelPoint: chanPointValid,
},
},
},
specifiedChanPoints: []wire.OutPoint{chanPointValid},
expectedNumUpdates: 1,
expectedUpdateFailures: []lnrpc.UpdateFailure{},
expectErr: nil,
},
{
name: "all channels",
currentPolicy: currentPolicy,
newPolicy: newPolicy,
channelSet: []channel{
{
edgeInfo: &channeldb.ChannelEdgeInfo{
Capacity: chanCap,
ChannelPoint: chanPointValid,
},
},
},
specifiedChanPoints: []wire.OutPoint{},
expectedNumUpdates: 1,
expectedUpdateFailures: []lnrpc.UpdateFailure{},
expectErr: nil,
},
{
name: "missing channel",
currentPolicy: currentPolicy,
newPolicy: newPolicy,
channelSet: []channel{
{
edgeInfo: &channeldb.ChannelEdgeInfo{
Capacity: chanCap,
ChannelPoint: chanPointValid,
},
},
},
specifiedChanPoints: []wire.OutPoint{chanPointMissing},
expectedNumUpdates: 0,
expectedUpdateFailures: []lnrpc.UpdateFailure{
lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
},
expectErr: nil,
},
{
// Here, no max htlc is specified, the max htlc value
// should be kept unchanged.
name: "no max htlc specified",
currentPolicy: MaxHTLCPolicy,
newPolicy: noMaxHtlcPolicy,
channelSet: []channel{
{
edgeInfo: &channeldb.ChannelEdgeInfo{
Capacity: chanCap,
ChannelPoint: chanPointValid,
},
},
},
specifiedChanPoints: []wire.OutPoint{chanPointValid},
expectedNumUpdates: 1,
expectedUpdateFailures: []lnrpc.UpdateFailure{},
expectErr: nil,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
currentPolicy = test.currentPolicy
channelSet = test.channelSet
expectedNumUpdates = test.expectedNumUpdates
failedUpdates, err := manager.UpdatePolicy(test.newPolicy,
test.specifiedChanPoints...)
if len(failedUpdates) != len(test.expectedUpdateFailures) {
t.Fatalf("wrong number of failed policy updates")
}
if len(test.expectedUpdateFailures) > 0 {
if failedUpdates[0].Reason != test.expectedUpdateFailures[0] {
t.Fatalf("wrong expected policy update failure")
}
}
require.Equal(t, test.expectErr, err)
})
}
}

View File

@ -6312,12 +6312,15 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
// With the scope resolved, we'll now send this to the local channel
// manager so it can propagate the new policy for our target channel(s).
err := r.server.localChanMgr.UpdatePolicy(chanPolicy, targetChans...)
failedUpdates, err := r.server.localChanMgr.UpdatePolicy(chanPolicy,
targetChans...)
if err != nil {
return nil, err
}
return &lnrpc.PolicyUpdateResponse{}, nil
return &lnrpc.PolicyUpdateResponse{
FailedUpdates: failedUpdates,
}, nil
}
// ForwardingHistory allows the caller to query the htlcswitch for a record of