peer: add handleNewChannelMsg method to handle new channel request

This commit is contained in:
yyforyongyu 2023-03-16 10:04:53 +08:00
parent 60627f676f
commit 22d2819489
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868

View File

@ -1661,6 +1661,28 @@ func (p *Brontide) handleCustomMessage(msg *lnwire.Custom) error {
return p.cfg.HandleCustomMessage(p.PubKey(), msg)
}
// isLoadedFromDisk returns true if the provided channel ID is loaded from
// disk.
//
// NOTE: only returns true for pending channels.
func (p *Brontide) isLoadedFromDisk(chanID lnwire.ChannelID) bool {
// If this is a newly added channel, no need to reestablish.
_, added := p.addedChannels.Load(chanID)
if added {
return false
}
// Return false if the channel is unknown.
channel, ok := p.activeChannels.Load(chanID)
if !ok {
return false
}
// During startup, we will use a nil value to mark a pending channel
// that's loaded from disk.
return channel == nil
}
// isActiveChannel returns true if the provided channel id is active, otherwise
// returns false.
func (p *Brontide) isActiveChannel(chanID lnwire.ChannelID) bool {
@ -2371,125 +2393,7 @@ out:
// funding workflow. We'll initialize the necessary local
// state, and notify the htlc switch of a new link.
case newChanReq := <-p.newChannels:
newChan := newChanReq.channel
chanPoint := &newChan.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// Only update RemoteNextRevocation if the channel is in the
// activeChannels map and if we added the link to the switch.
// Only active channels will be added to the switch.
currentChan, ok := p.activeChannels.Load(chanID)
if ok && currentChan != nil {
p.log.Infof("Already have ChannelPoint(%v), "+
"ignoring.", chanPoint)
close(newChanReq.err)
// If we're being sent a new channel, and our
// existing channel doesn't have the next
// revocation, then we need to update the
// current existing channel.
if currentChan.RemoteNextRevocation() != nil {
continue
}
p.log.Infof("Processing retransmitted "+
"ChannelReady for ChannelPoint(%v)",
chanPoint)
nextRevoke := newChan.RemoteNextRevocation
err := currentChan.InitNextRevocation(nextRevoke)
if err != nil {
p.log.Errorf("unable to init chan "+
"revocation: %v", err)
continue
}
continue
}
// If not already active, we'll add this channel to the
// set of active channels, so we can look it up later
// easily according to its channel ID.
lnChan, err := lnwallet.NewLightningChannel(
p.cfg.Signer, newChan, p.cfg.SigPool,
)
if err != nil {
err := fmt.Errorf("unable to create "+
"LightningChannel: %v", err)
p.log.Errorf(err.Error())
newChanReq.err <- err
continue
}
// This refreshes the activeChannels entry if the link was not in
// the switch, also populates for new entries.
p.activeChannels.Store(chanID, lnChan)
p.addedChannels.Store(chanID, struct{}{})
p.log.Infof("New channel active ChannelPoint(%v) "+
"with peer", chanPoint)
// Next, we'll assemble a ChannelLink along with the
// necessary items it needs to function.
//
// TODO(roasbeef): panic on below?
chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(
*chanPoint,
)
if err != nil {
err := fmt.Errorf("unable to subscribe to "+
"chain events: %v", err)
p.log.Errorf(err.Error())
newChanReq.err <- err
continue
}
// We'll query the localChanCfg of the new channel to determine the
// minimum HTLC value that can be forwarded. For the maximum HTLC
// value that can be forwarded and fees we'll use the default
// values, as they currently are always set to the default values
// at initial channel creation. Note that the maximum HTLC value
// defaults to the cap on the total value of outstanding HTLCs.
//
// TODO(guggero): We should instead pass in the current
// forwarding policy from the funding manager to avoid
// needing us to update the link once the channel is
// announced to the network with custom user values.
fwdMinHtlc := lnChan.FwdMinHtlc()
defaultPolicy := p.cfg.RoutingPolicy
forwardingPolicy := &htlcswitch.ForwardingPolicy{
MinHTLCOut: fwdMinHtlc,
MaxHTLC: newChan.LocalChanCfg.MaxPendingAmount,
BaseFee: defaultPolicy.BaseFee,
FeeRate: defaultPolicy.FeeRate,
TimeLockDelta: defaultPolicy.TimeLockDelta,
}
// If we've reached this point, there are two possible scenarios.
// If the channel was in the active channels map as nil, then it
// was loaded from disk and we need to send reestablish. Else,
// it was not loaded from disk and we don't need to send
// reestablish as this is a fresh channel.
shouldReestablish := ok
// Create the link and add it to the switch.
err = p.addLink(
chanPoint, lnChan, forwardingPolicy,
chainEvents, shouldReestablish,
)
if err != nil {
err := fmt.Errorf("can't register new channel "+
"link(%v) with peer", chanPoint)
p.log.Errorf(err.Error())
newChanReq.err <- err
continue
}
close(newChanReq.err)
p.handleNewActiveChannel(newChanReq)
// We've just received a local request to close an active
// channel. It will either kick of a cooperative channel
@ -3754,3 +3658,145 @@ func (p *Brontide) attachChannelEventSubscription() error {
return nil
}
// updateNextRevocation updates the existing channel's next revocation if it's
// nil.
func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) {
chanPoint := &c.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// Read the current channel.
currentChan, loaded := p.activeChannels.Load(chanID)
// currentChan should exist, but we perform a check anyway to avoid nil
// pointer dereference.
if !loaded {
p.log.Errorf("missing active channel with chanID=%v", chanID)
return
}
// currentChan should not be nil, but we perform a check anyway to
// avoid nil pointer dereference.
if currentChan == nil {
p.log.Errorf("found nil active channel with chanID=%v", chanID)
return
}
// If we're being sent a new channel, and our existing channel doesn't
// have the next revocation, then we need to update the current
// existing channel.
if currentChan.RemoteNextRevocation() != nil {
return
}
p.log.Infof("Processing retransmitted ChannelReady for "+
"ChannelPoint(%v)", chanPoint)
nextRevoke := c.RemoteNextRevocation
err := currentChan.InitNextRevocation(nextRevoke)
if err != nil {
p.log.Errorf("unable to init chan revocation: %v", err)
}
}
// addActiveChannel adds a new active channel to the `activeChannels` map. It
// takes a `channeldb.OpenChannel`, creates a `lnwallet.LightningChannel` from
// it and assembles it with a channel link.
func (p *Brontide) addActiveChannel(c *channeldb.OpenChannel) error {
chanPoint := &c.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// If not already active, we'll add this channel to the set of active
// channels, so we can look it up later easily according to its channel
// ID.
lnChan, err := lnwallet.NewLightningChannel(
p.cfg.Signer, c, p.cfg.SigPool,
)
if err != nil {
return fmt.Errorf("unable to create LightningChannel: %w", err)
}
// If we've reached this point, there are two possible scenarios. If
// the channel was in the active channels map as nil, then it was
// loaded from disk and we need to send reestablish. Else, it was not
// loaded from disk and we don't need to send reestablish as this is a
// fresh channel.
shouldReestablish := p.isLoadedFromDisk(chanID)
// Store the channel in the activeChannels map.
p.activeChannels.Store(chanID, lnChan)
p.addedChannels.Store(chanID, struct{}{})
p.log.Infof("New channel active ChannelPoint(%v) with peer", chanPoint)
// Next, we'll assemble a ChannelLink along with the necessary items it
// needs to function.
chainEvents, err := p.cfg.ChainArb.SubscribeChannelEvents(*chanPoint)
if err != nil {
return fmt.Errorf("unable to subscribe to chain events: %w",
err)
}
// We'll query the localChanCfg of the new channel to determine the
// minimum HTLC value that can be forwarded. For the maximum HTLC value
// that can be forwarded and fees we'll use the default values, as they
// currently are always set to the default values at initial channel
// creation. Note that the maximum HTLC value defaults to the cap on
// the total value of outstanding HTLCs.
fwdMinHtlc := lnChan.FwdMinHtlc()
defaultPolicy := p.cfg.RoutingPolicy
forwardingPolicy := &htlcswitch.ForwardingPolicy{
MinHTLCOut: fwdMinHtlc,
MaxHTLC: c.LocalChanCfg.MaxPendingAmount,
BaseFee: defaultPolicy.BaseFee,
FeeRate: defaultPolicy.FeeRate,
TimeLockDelta: defaultPolicy.TimeLockDelta,
}
// Create the link and add it to the switch.
err = p.addLink(
chanPoint, lnChan, forwardingPolicy,
chainEvents, shouldReestablish,
)
if err != nil {
return fmt.Errorf("can't register new channel link(%v) with "+
"peer", chanPoint)
}
return nil
}
// handleNewActiveChannel handles a `newChannelMsg` request. Depending on we
// know this channel ID or not, we'll either add it to the `activeChannels` map
// or init the next revocation for it.
func (p *Brontide) handleNewActiveChannel(req *newChannelMsg) {
newChan := req.channel
chanPoint := &newChan.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// Only update RemoteNextRevocation if the channel is in the
// activeChannels map and if we added the link to the switch. Only
// active channels will be added to the switch.
if p.isActiveChannel(chanID) {
p.log.Infof("Already have ChannelPoint(%v), ignoring",
chanPoint)
// Handle it and close the err chan on the request.
close(req.err)
p.updateNextRevocation(newChan)
return
}
// This is a new channel, we now add it to the map.
if err := p.addActiveChannel(req.channel); err != nil {
// Log and send back the error to the request.
p.log.Errorf(err.Error())
req.err <- err
return
}
// Close the err chan if everything went fine.
close(req.err)
}