diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 5a42f155a..6fdda2865 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -86,14 +86,12 @@ type networkMsg struct { } // chanPolicyUpdateRequest is a request that is sent to the server when a caller -// wishes to update the channel policy (fees e.g.) for a particular set of -// channels. New ChannelUpdate messages will be crafted to be sent out during -// the next broadcast epoch and the fee updates committed to the lower layer. +// wishes to update a particular set of channels. New ChannelUpdate messages +// will be crafted to be sent out during the next broadcast epoch and the fee +// updates committed to the lower layer. type chanPolicyUpdateRequest struct { - targetChans []wire.OutPoint - newSchema routing.ChannelPolicy - - chanPolicies chan updatedChanPolicies + edgesToUpdate []EdgeWithInfo + errChan chan error } // Config defines the configuration for the service. ALL elements within the @@ -361,31 +359,36 @@ type updatedChanPolicies struct { err error } -// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the -// channel forwarding policies for the specified channels. If no channels are -// specified, then the update will be applied to all outgoing channels from the -// source node. Policy updates are done in two stages: first, the +// EdgeWithInfo contains the information that is required to update an edge. +type EdgeWithInfo struct { + // Info describes the channel. + Info *channeldb.ChannelEdgeInfo + + // Edge describes the policy in one direction of the channel. + Edge *channeldb.ChannelEdgePolicy +} + +// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the +// specified edge updates. Updates are done in two stages: first, the // AuthenticatedGossiper ensures the update has been committed by dependent // sub-systems, then it signs and broadcasts new updates to the network. A // mapping between outpoints and updated channel policies is returned, which is // used to update the forwarding policies of the underlying links. func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate( - newSchema routing.ChannelPolicy, chanPoints ...wire.OutPoint) ( - map[wire.OutPoint]*channeldb.ChannelEdgePolicy, error) { + edgesToUpdate []EdgeWithInfo) error { - chanPolicyChan := make(chan updatedChanPolicies, 1) + errChan := make(chan error, 1) policyUpdate := &chanPolicyUpdateRequest{ - targetChans: chanPoints, - newSchema: newSchema, - chanPolicies: chanPolicyChan, + edgesToUpdate: edgesToUpdate, + errChan: errChan, } select { case d.chanPolicyUpdates <- policyUpdate: - updatedPolicies := <-chanPolicyChan - return updatedPolicies.chanPolicies, updatedPolicies.err + err := <-errChan + return err case <-d.quit: - return nil, fmt.Errorf("AuthenticatedGossiper shutting down") + return fmt.Errorf("AuthenticatedGossiper shutting down") } } @@ -922,14 +925,10 @@ func (d *AuthenticatedGossiper) networkHandler() { // First, we'll now create new fully signed updates for // the affected channels and also update the underlying // graph with the new state. - chanPolicies, newChanUpdates, err := d.processChanPolicyUpdate( - policyUpdate, + newChanUpdates, err := d.processChanPolicyUpdate( + policyUpdate.edgesToUpdate, ) - update := updatedChanPolicies{ - chanPolicies, - err, - } - policyUpdate.chanPolicies <- update + policyUpdate.errChan <- err if err != nil { log.Errorf("Unable to craft policy updates: %v", err) @@ -1317,102 +1316,29 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error { return nil } -// processChanPolicyUpdate generates a new set of channel updates with the new -// channel policy applied for each specified channel identified by its channel -// point. In the case that no channel points are specified, then the update -// will be applied to all channels. Finally, the backing ChannelGraphSource is -// updated with the latest information reflecting the applied updates. -// -// TODO(roasbeef): generalize into generic for any channel update +// processChanPolicyUpdate generates a new set of channel updates for the +// provided list of edges and updates the backing ChannelGraphSource. func (d *AuthenticatedGossiper) processChanPolicyUpdate( - policyUpdate *chanPolicyUpdateRequest) ( - map[wire.OutPoint]*channeldb.ChannelEdgePolicy, []networkMsg, error) { + edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) { - // First, we'll construct a set of all the channels that need to be - // updated. - chansToUpdate := make(map[wire.OutPoint]struct{}) - for _, chanPoint := range policyUpdate.targetChans { - chansToUpdate[chanPoint] = struct{}{} - } - - // Next, we'll create a mapping from outpoint to edge policy that will - // be used by each edge's underlying link to update its policy. - chanPolicies := make(map[wire.OutPoint]*channeldb.ChannelEdgePolicy) - - haveChanFilter := len(chansToUpdate) != 0 - if haveChanFilter { - log.Infof("Updating routing policies for chan_points=%v", - spew.Sdump(chansToUpdate)) - } else { - log.Infof("Updating routing policies for all chans") - } - - type edgeWithInfo struct { - info *channeldb.ChannelEdgeInfo - edge *channeldb.ChannelEdgePolicy - } - var edgesToUpdate []edgeWithInfo - - // Next, we'll loop over all the outgoing channels the router knows of. - // If we have a filter then we'll only collected those channels, - // otherwise we'll collect them all. - err := d.cfg.Router.ForAllOutgoingChannels(func( - info *channeldb.ChannelEdgeInfo, - edge *channeldb.ChannelEdgePolicy) error { - - // If we have a channel filter, and this channel isn't a part - // of it, then we'll skip it. - if _, ok := chansToUpdate[info.ChannelPoint]; !ok && haveChanFilter { - return nil - } - - // Now that we know we should update this channel, we'll update - // its set of policies. - edge.FeeBaseMSat = policyUpdate.newSchema.BaseFee - edge.FeeProportionalMillionths = lnwire.MilliSatoshi( - policyUpdate.newSchema.FeeRate, - ) - edge.TimeLockDelta = uint16(policyUpdate.newSchema.TimeLockDelta) - - // Max htlc is currently always set to the channel capacity. - edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc - edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity) - - edgesToUpdate = append(edgesToUpdate, edgeWithInfo{ - info: info, - edge: edge, - }) - - return nil - }) - if err != nil { - return nil, nil, err - } - - // With the set of edges we need to update retrieved, we'll now re-sign - // them, and insert them into the database. var chanUpdates []networkMsg for _, edgeInfo := range edgesToUpdate { // Now that we've collected all the channels we need to update, - // we'll Re-sign and update the backing ChannelGraphSource, and + // we'll re-sign and update the backing ChannelGraphSource, and // retrieve our ChannelUpdate to broadcast. _, chanUpdate, err := d.updateChannel( - edgeInfo.info, edgeInfo.edge, + edgeInfo.Info, edgeInfo.Edge, ) if err != nil { - return nil, nil, err + return nil, err } - // Since the update succeeded, add the edge to our policy - // mapping. - chanPolicies[edgeInfo.info.ChannelPoint] = edgeInfo.edge - // We'll avoid broadcasting any updates for private channels to // avoid directly giving away their existence. Instead, we'll // send the update directly to the remote party. - if edgeInfo.info.AuthProof == nil { + if edgeInfo.Info.AuthProof == nil { remotePubKey := remotePubFromChanInfo( - edgeInfo.info, chanUpdate.ChannelFlags, + edgeInfo.Info, chanUpdate.ChannelFlags, ) err := d.reliableSender.sendMessage( chanUpdate, remotePubKey, @@ -1435,7 +1361,7 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( }) } - return chanPolicies, chanUpdates, nil + return chanUpdates, nil } // processRejectedEdge examines a rejected edge to see if we can extract any diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 40d8148e4..4252397c1 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -3602,20 +3602,26 @@ out: // Now that all of our channels are loaded, we'll attempt to update the // policy of all of them. const newTimeLockDelta = 100 - newPolicy := routing.ChannelPolicy{ - TimeLockDelta: newTimeLockDelta, - } - newChanPolicies, err := ctx.gossiper.PropagateChanPolicyUpdate(newPolicy) + var edgesToUpdate []EdgeWithInfo + err = ctx.router.ForAllOutgoingChannels(func( + info *channeldb.ChannelEdgeInfo, + edge *channeldb.ChannelEdgePolicy) error { + + edge.TimeLockDelta = uint16(newTimeLockDelta) + edgesToUpdate = append(edgesToUpdate, EdgeWithInfo{ + Info: info, + Edge: edge, + }) + + return nil + }) if err != nil { - t.Fatalf("unable to chan policies: %v", err) + t.Fatal(err) } - // Ensure that the updated channel policies are as expected. - for _, dbPolicy := range newChanPolicies { - if dbPolicy.TimeLockDelta != uint16(newPolicy.TimeLockDelta) { - t.Fatalf("wrong delta: expected %v, got %v", - newPolicy.TimeLockDelta, dbPolicy.TimeLockDelta) - } + err = ctx.gossiper.PropagateChanPolicyUpdate(edgesToUpdate) + if err != nil { + t.Fatalf("unable to chan policies: %v", err) } // Two channel updates should now be broadcast, with neither of them diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 0364b0c40..195eee9ae 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -431,7 +431,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64, // forwarding policies for all links have been updated, or the switch shuts // down. func (s *Switch) UpdateForwardingPolicies( - chanPolicies map[wire.OutPoint]*channeldb.ChannelEdgePolicy) { + chanPolicies map[wire.OutPoint]ForwardingPolicy) { log.Tracef("Updating link policies: %v", newLogClosure(func() string { return spew.Sdump(chanPolicies) @@ -440,7 +440,7 @@ func (s *Switch) UpdateForwardingPolicies( s.indexMtx.RLock() // Update each link in chanPolicies. - for targetLink := range chanPolicies { + for targetLink, policy := range chanPolicies { cid := lnwire.NewChanIDFromOutPoint(&targetLink) link, ok := s.linkIndex[cid] @@ -450,28 +450,12 @@ func (s *Switch) UpdateForwardingPolicies( continue } - newPolicy := dbPolicyToFwdingPolicy( - chanPolicies[*link.ChannelPoint()], - ) - link.UpdateForwardingPolicy(newPolicy) + link.UpdateForwardingPolicy(policy) } s.indexMtx.RUnlock() } -// dbPolicyToFwdingPolicy is a helper function that converts a channeldb -// ChannelEdgePolicy into a ForwardingPolicy struct for the purpose of updating -// the forwarding policy of a link. -func dbPolicyToFwdingPolicy(policy *channeldb.ChannelEdgePolicy) ForwardingPolicy { - return ForwardingPolicy{ - BaseFee: policy.FeeBaseMSat, - FeeRate: policy.FeeProportionalMillionths, - TimeLockDelta: uint32(policy.TimeLockDelta), - MinHTLC: policy.MinHTLC, - MaxHTLC: policy.MaxHTLC, - } -} - // forward is used in order to find next channel link and apply htlc update. // Also this function is used by channel links itself in order to forward the // update after it has been included in the channel. diff --git a/routing/localchans/manager.go b/routing/localchans/manager.go new file mode 100644 index 000000000..ea71df64b --- /dev/null +++ b/routing/localchans/manager.go @@ -0,0 +1,145 @@ +package localchans + +import ( + "fmt" + "sync" + + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/discovery" + "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing" +) + +// Manager manages the node's local channels. The only operation that is +// currently implemented is updating forwarding policies. +type Manager struct { + // UpdateForwardingPolicies is used by the manager to update active + // links with a new policy. + UpdateForwardingPolicies func( + chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy) + + // PropagateChanPolicyUpdate is called to persist a new policy to disk + // and broadcast it to the network. + PropagateChanPolicyUpdate func( + edgesToUpdate []discovery.EdgeWithInfo) error + + // ForAllOutgoingChannels is required to iterate over all our local + // channels. + ForAllOutgoingChannels func(cb func(*channeldb.ChannelEdgeInfo, + *channeldb.ChannelEdgePolicy) error) error + + // policyUpdateLock ensures that the database and the link do not fall + // out of sync if there are concurrent fee update calls. Without it, + // there is a chance that policy A updates the database, then policy B + // updates the database, then policy B updates the link, then policy A + // updates the link. + policyUpdateLock sync.Mutex +} + +// UpdatePolicy updates the policy for the specified channels on disk and in the +// active links. +func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, + chanPoints ...wire.OutPoint) error { + + r.policyUpdateLock.Lock() + defer r.policyUpdateLock.Unlock() + + // First, we'll construct a set of all the channels that need to be + // updated. + chansToUpdate := make(map[wire.OutPoint]struct{}) + for _, chanPoint := range chanPoints { + chansToUpdate[chanPoint] = struct{}{} + } + + haveChanFilter := len(chansToUpdate) != 0 + + var edgesToUpdate []discovery.EdgeWithInfo + policiesToUpdate := make(map[wire.OutPoint]htlcswitch.ForwardingPolicy) + + // Next, we'll loop over all the outgoing channels the router knows of. + // If we have a filter then we'll only collected those channels, + // otherwise we'll collect them all. + err := r.ForAllOutgoingChannels(func( + info *channeldb.ChannelEdgeInfo, + edge *channeldb.ChannelEdgePolicy) error { + + // If we have a channel filter, and this channel isn't a part + // of it, then we'll skip it. + _, ok := chansToUpdate[info.ChannelPoint] + if !ok && haveChanFilter { + return nil + } + + // Apply the new policy to the edge. + err := r.updateEdge(info.Capacity, edge, newSchema) + if err != nil { + return nil + } + + // Add updated edge to list of edges to send to gossiper. + edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{ + Info: info, + Edge: edge, + }) + + // Add updated policy to list of policies to send to switch. + policiesToUpdate[info.ChannelPoint] = htlcswitch.ForwardingPolicy{ + BaseFee: edge.FeeBaseMSat, + FeeRate: edge.FeeProportionalMillionths, + TimeLockDelta: uint32(edge.TimeLockDelta), + MinHTLC: edge.MinHTLC, + MaxHTLC: edge.MaxHTLC, + } + + return nil + }) + if err != nil { + return err + } + + // Commit the policy updates to disk and broadcast to the network. We + // validated the new policy above, so we expect no validation errors. If + // this would happen because of a bug, the link policy will be + // desynchronized. It is currently not possible to atomically commit + // multiple edge updates. + err = r.PropagateChanPolicyUpdate(edgesToUpdate) + if err != nil { + return err + } + + // Update active links. + r.UpdateForwardingPolicies(policiesToUpdate) + + return nil +} + +// updateEdge updates the given edge with the new schema. +func (r *Manager) updateEdge(capacity btcutil.Amount, + edge *channeldb.ChannelEdgePolicy, + newSchema routing.ChannelPolicy) error { + + // Update forwarding fee scheme and required time lock delta. + edge.FeeBaseMSat = newSchema.BaseFee + edge.FeeProportionalMillionths = lnwire.MilliSatoshi( + newSchema.FeeRate, + ) + edge.TimeLockDelta = uint16(newSchema.TimeLockDelta) + + // Max htlc is currently always set to the channel capacity. + edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc + edge.MaxHTLC = lnwire.NewMSatFromSatoshis(capacity) + + // Validate htlc amount constraints. + if edge.MinHTLC > edge.MaxHTLC { + return fmt.Errorf("min_htlc %v greater than max_htlc %v", + edge.MinHTLC, edge.MaxHTLC) + } + + // Clear signature to help prevent usage of the previous signature. + edge.SetSigBytes(nil) + + return nil +} diff --git a/routing/localchans/manager_test.go b/routing/localchans/manager_test.go new file mode 100644 index 000000000..ac10c369d --- /dev/null +++ b/routing/localchans/manager_test.go @@ -0,0 +1,111 @@ +package localchans + +import ( + "testing" + + "github.com/lightningnetwork/lnd/lnwire" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcutil" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/discovery" + "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/routing" +) + +// TestManager tests that the local channel manager properly propagates fee +// updates to gossiper and links. +func TestManager(t *testing.T) { + chanPoint := wire.OutPoint{Hash: chainhash.Hash{1}, Index: 2} + chanCap := btcutil.Amount(1000) + + newPolicy := routing.ChannelPolicy{ + FeeSchema: routing.FeeSchema{ + BaseFee: 100, + FeeRate: 200, + }, + TimeLockDelta: 80, + } + + updateForwardingPolicies := func( + chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy) { + + if len(chanPolicies) != 1 { + t.Fatal("unexpected number of policies to apply") + } + + policy := chanPolicies[chanPoint] + if policy.TimeLockDelta != newPolicy.TimeLockDelta { + t.Fatal("unexpected time lock delta") + } + if policy.BaseFee != newPolicy.BaseFee { + t.Fatal("unexpected base fee") + } + if uint32(policy.FeeRate) != newPolicy.FeeRate { + t.Fatal("unexpected base fee") + } + if policy.MaxHTLC != lnwire.NewMSatFromSatoshis(chanCap) { + t.Fatal("unexpected max htlc") + } + } + + propagateChanPolicyUpdate := func( + edgesToUpdate []discovery.EdgeWithInfo) error { + + if len(edgesToUpdate) != 1 { + t.Fatal("unexpected number of edges to update") + } + + policy := edgesToUpdate[0].Edge + if !policy.MessageFlags.HasMaxHtlc() { + t.Fatal("expected max htlc flag") + } + if policy.TimeLockDelta != uint16(newPolicy.TimeLockDelta) { + t.Fatal("unexpected time lock delta") + } + if policy.FeeBaseMSat != newPolicy.BaseFee { + t.Fatal("unexpected base fee") + } + if uint32(policy.FeeProportionalMillionths) != newPolicy.FeeRate { + t.Fatal("unexpected base fee") + } + if policy.MaxHTLC != lnwire.NewMSatFromSatoshis(chanCap) { + t.Fatal("unexpected max htlc") + } + + return nil + } + + forAllOutgoingChannels := func(cb func(*channeldb.ChannelEdgeInfo, + *channeldb.ChannelEdgePolicy) error) error { + + return cb( + &channeldb.ChannelEdgeInfo{ + Capacity: chanCap, + ChannelPoint: chanPoint, + }, + &channeldb.ChannelEdgePolicy{}, + ) + } + + manager := Manager{ + UpdateForwardingPolicies: updateForwardingPolicies, + PropagateChanPolicyUpdate: propagateChanPolicyUpdate, + ForAllOutgoingChannels: forAllOutgoingChannels, + } + + // Test updating a specific channels. + err := manager.UpdatePolicy(newPolicy, chanPoint) + if err != nil { + t.Fatal(err) + } + + // Test updating all channels, which comes down to the same as testing a + // specific channel because there is only one channel. + err = manager.UpdatePolicy(newPolicy) + if err != nil { + t.Fatal(err) + } +} diff --git a/rpcserver.go b/rpcserver.go index bafa8244c..3c7413dd8 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -4545,12 +4545,6 @@ func (r *rpcServer) FeeReport(ctx context.Context, // 0.000001, or 0.0001%. const minFeeRate = 1e-6 -// policyUpdateLock ensures that the database and the link do not fall out of -// sync if there are concurrent fee update calls. Without it, there is a chance -// that policy A updates the database, then policy B updates the database, then -// policy B updates the link, then policy A updates the link. -var policyUpdateLock sync.Mutex - // UpdateChannelPolicy allows the caller to update the channel forwarding policy // for all channels globally, or a particular channel. func (r *rpcServer) UpdateChannelPolicy(ctx context.Context, @@ -4615,22 +4609,13 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context, req.BaseFeeMsat, req.FeeRate, feeRateFixed, req.TimeLockDelta, spew.Sdump(targetChans)) - // With the scope resolved, we'll now send this to the - // AuthenticatedGossiper so it can propagate the new policy for our - // target channel(s). - policyUpdateLock.Lock() - defer policyUpdateLock.Unlock() - chanPolicies, err := r.server.authGossiper.PropagateChanPolicyUpdate( - chanPolicy, targetChans..., - ) + // With the scope resolved, we'll now send this to the local channel + // manager so it can propagate the new policy for our target channel(s). + err := r.server.localChanMgr.UpdatePolicy(chanPolicy, targetChans...) if err != nil { return nil, err } - // Finally, we'll apply the set of channel policies to the target - // channels' links. - r.server.htlcSwitch.UpdateForwardingPolicies(chanPolicies) - return &lnrpc.PolicyUpdateResponse{}, nil } diff --git a/server.go b/server.go index f90dcd970..cfa2585a3 100644 --- a/server.go +++ b/server.go @@ -48,6 +48,7 @@ import ( "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/routing" + "github.com/lightningnetwork/lnd/routing/localchans" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/sweep" "github.com/lightningnetwork/lnd/ticker" @@ -203,6 +204,8 @@ type server struct { authGossiper *discovery.AuthenticatedGossiper + localChanMgr *localchans.Manager + utxoNursery *utxoNursery sweeper *sweep.UtxoSweeper @@ -735,6 +738,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, s.identityPriv.PubKey(), ) + s.localChanMgr = &localchans.Manager{ + ForAllOutgoingChannels: s.chanRouter.ForAllOutgoingChannels, + PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate, + UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies, + } + utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB) if err != nil { srvrLog.Errorf("unable to create nursery store: %v", err)