diff --git a/routing/localchans/manager.go b/routing/localchans/manager.go index 25df1002a..5b35b346b 100644 --- a/routing/localchans/manager.go +++ b/routing/localchans/manager.go @@ -39,7 +39,7 @@ type Manager struct { edgesToUpdate []discovery.EdgeWithInfo) error // ForAllOutgoingChannels is required to iterate over all our local - // channels. + // channels. The ChannelEdgePolicy parameter may be nil. ForAllOutgoingChannels func(cb func(kvdb.RTx, *models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error) error @@ -82,6 +82,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, var edgesToUpdate []discovery.EdgeWithInfo policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy) + // NOTE: edge may be nil when this function is called. processChan := func( tx kvdb.RTx, info *models.ChannelEdgeInfo, @@ -99,7 +100,9 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, delete(unprocessedChans, info.ChannelPoint) // Apply the new policy to the edge. - err := r.updateEdge(tx, info.ChannelPoint, edge, newSchema) + edge, err := r.updateEdge( + tx, info.ChannelPoint, edge, newSchema, + ) if err != nil { failedUpdates = append(failedUpdates, makeFailureItem(info.ChannelPoint, @@ -306,10 +309,26 @@ func (r *Manager) createEdge(channel *channeldb.OpenChannel, return info, edge, nil } -// updateEdge updates the given edge with the new schema. +// updateEdge updates the given edge with the new schema. The edge parameter may +// be nil, in that case a new channel policy is returned. In other cases the +// passed in channel policy is returned after modification. func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint, edge *models.ChannelEdgePolicy, - newSchema routing.ChannelPolicy) error { + newSchema routing.ChannelPolicy) (*models.ChannelEdgePolicy, error) { + + channel, err := r.FetchChannel(tx, chanPoint) + if err != nil { + return nil, err + } + + // If due to some unforeseen circumstances the policy doesn't exist, + // recreate it here. + if edge == nil { + _, edge, err = r.createEdge(channel, time.Now()) + if err != nil { + return nil, err + } + } // Update forwarding fee scheme and required time lock delta. edge.FeeBaseMSat = newSchema.BaseFee @@ -318,7 +337,7 @@ func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint, ) // If inbound fees are set, we update the edge with them. - err := fn.MapOptionZ(newSchema.InboundFee, + err = fn.MapOptionZ(newSchema.InboundFee, func(f models.InboundFee) error { inboundWireFee := f.ToWire() return edge.ExtraOpaqueData.PackRecords( @@ -326,15 +345,15 @@ func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint, ) }) if err != nil { - return err + return nil, err } edge.TimeLockDelta = uint16(newSchema.TimeLockDelta) // Retrieve negotiated channel htlc amt limits. - amtMin, amtMax, err := r.getHtlcAmtLimits(tx, chanPoint) + amtMin, amtMax, err := r.getHtlcAmtLimits(channel) if err != nil { - return err + return nil, err } // We now update the edge max htlc value. @@ -367,19 +386,19 @@ func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint, // Validate htlc amount constraints. switch { case edge.MinHTLC < amtMin: - return fmt.Errorf( + return nil, fmt.Errorf( "min htlc amount of %v is below min htlc parameter of %v", edge.MinHTLC, amtMin, ) case edge.MaxHTLC > amtMax: - return fmt.Errorf( + return nil, fmt.Errorf( "max htlc size of %v is above max pending amount of %v", edge.MaxHTLC, amtMax, ) case edge.MinHTLC > edge.MaxHTLC: - return fmt.Errorf( + return nil, fmt.Errorf( "min_htlc %v greater than max_htlc %v", edge.MinHTLC, edge.MaxHTLC, ) @@ -388,19 +407,14 @@ func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint, // Clear signature to help prevent usage of the previous signature. edge.SetSigBytes(nil) - return nil + return edge, nil } // getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount // constraints. -func (r *Manager) getHtlcAmtLimits(tx kvdb.RTx, chanPoint wire.OutPoint) ( +func (r *Manager) getHtlcAmtLimits(ch *channeldb.OpenChannel) ( lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) { - ch, err := r.FetchChannel(tx, chanPoint) - if err != nil { - return 0, 0, err - } - // The max htlc policy field must be less than or equal to the channel // capacity AND less than or equal to the max in-flight HTLC value. // Since the latter is always less than or equal to the former, just diff --git a/server.go b/server.go index aaf3c0dfe..d825204e3 100644 --- a/server.go +++ b/server.go @@ -1098,11 +1098,25 @@ func newServer(cfg *Config, listenAddrs []net.Addr, ScidCloser: scidCloserMan, }, nodeKeyDesc) + selfVertex := route.Vertex(nodeKeyDesc.PubKey.SerializeCompressed()) //nolint:lll s.localChanMgr = &localchans.Manager{ - SelfPub: nodeKeyDesc.PubKey, - DefaultRoutingPolicy: cc.RoutingPolicy, - ForAllOutgoingChannels: s.graphBuilder.ForAllOutgoingChannels, + SelfPub: nodeKeyDesc.PubKey, + DefaultRoutingPolicy: cc.RoutingPolicy, + ForAllOutgoingChannels: func(cb func(kvdb.RTx, + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy) error) error { + + return s.graphDB.ForEachNodeChannel(selfVertex, + func(tx kvdb.RTx, c *models.ChannelEdgeInfo, + e *models.ChannelEdgePolicy, + _ *models.ChannelEdgePolicy) error { + + // NOTE: The invoked callback here may + // receive a nil channel policy. + return cb(tx, c, e) + }, + ) + }, PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate, UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies, FetchChannel: s.chanStateDB.FetchChannel,