diff --git a/discovery/gossiper.go b/discovery/gossiper.go index b07500a36..ebf1ca918 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1779,315 +1779,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // that the directional information for an already known channel has // been updated. case *lnwire.ChannelUpdate: - // We'll ignore any channel announcements that target any chain - // other than the set of chains we know of. - if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) { - err := fmt.Errorf("ignoring ChannelUpdate from "+ - "chain=%v, gossiper on chain=%v", msg.ChainHash, - d.cfg.ChainHash) - log.Errorf(err.Error()) - - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - - nMsg.err <- err - return nil, false - } - - blockHeight := msg.ShortChannelID.BlockHeight - shortChanID := msg.ShortChannelID.ToUint64() - - // If the advertised inclusionary block is beyond our knowledge - // of the chain tip, then we'll put the announcement in limbo - // to be fully verified once we advance forward in the chain. - d.Lock() - if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) { - log.Warnf("Update announcement for "+ - "short_chan_id(%v), is premature: advertises "+ - "height %v, only height %v is known", - shortChanID, blockHeight, - d.bestHeight) - d.Unlock() - nMsg.err <- nil - return nil, false - } - d.Unlock() - - // Before we perform any of the expensive checks below, we'll - // check whether this update is stale or is for a zombie - // channel in order to quickly reject it. - timestamp := time.Unix(int64(msg.Timestamp), 0) - if d.cfg.Router.IsStaleEdgePolicy( - msg.ShortChannelID, timestamp, msg.ChannelFlags, - ) { - - log.Debugf("Ignored stale edge policy: peer=%v, "+ - "source=%x, msg=%s, is_remote=%v", nMsg.peer, - nMsg.source.SerializeCompressed(), - nMsg.msg.MsgType(), nMsg.isRemote) - - nMsg.err <- nil - return nil, true - } - - // Get the node pub key as far as we don't have it in channel - // update announcement message. We'll need this to properly - // verify message signature. - // - // We make sure to obtain the mutex for this channel ID - // before we access the database. This ensures the state - // we read from the database has not changed between this - // point and when we call UpdateEdge() later. - d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) - defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - chanInfo, edge1, edge2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) - switch err { - // No error, break. - case nil: - break - - case channeldb.ErrZombieEdge: - err = d.processZombieUpdate(chanInfo, msg) - if err != nil { - log.Debug(err) - nMsg.err <- err - return nil, false - } - - // We'll fallthrough to ensure we stash the update until - // we receive its corresponding ChannelAnnouncement. - // This is needed to ensure the edge exists in the graph - // before applying the update. - fallthrough - case channeldb.ErrGraphNotFound: - fallthrough - case channeldb.ErrGraphNoEdgesFound: - fallthrough - case channeldb.ErrEdgeNotFound: - // If the edge corresponding to this ChannelUpdate was - // not found in the graph, this might be a channel in - // the process of being opened, and we haven't processed - // our own ChannelAnnouncement yet, hence it is not - // found in the graph. This usually gets resolved after - // the channel proofs are exchanged and the channel is - // broadcasted to the rest of the network, but in case - // this is a private channel this won't ever happen. - // This can also happen in the case of a zombie channel - // with a fresh update for which we don't have a - // ChannelAnnouncement for since we reject them. Because - // of this, we temporarily add it to a map, and - // reprocess it after our own ChannelAnnouncement has - // been processed. - earlyMsgs, err := d.prematureChannelUpdates.Get( - shortChanID, - ) - switch { - // Nothing in the cache yet, we can just directly - // insert this element. - case err == cache.ErrElementNotFound: - _, _ = d.prematureChannelUpdates.Put( - shortChanID, &cachedNetworkMsg{ - msgs: []*networkMsg{nMsg}, - }) - - // There's already something in the cache, so we'll - // combine the set of messages into a single value. - default: - msgs := earlyMsgs.(*cachedNetworkMsg).msgs - msgs = append(msgs, nMsg) - _, _ = d.prematureChannelUpdates.Put( - shortChanID, &cachedNetworkMsg{ - msgs: msgs, - }) - } - - log.Debugf("Got ChannelUpdate for edge not found in "+ - "graph(shortChanID=%v), saving for "+ - "reprocessing later", shortChanID) - - // NOTE: We don't return anything on the error channel - // for this message, as we expect that will be done when - // this ChannelUpdate is later reprocessed. - return nil, false - - default: - err := fmt.Errorf("unable to validate channel update "+ - "short_chan_id=%v: %v", shortChanID, err) - log.Error(err) - nMsg.err <- err - - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - - return nil, false - } - - // The least-significant bit in the flag on the channel update - // announcement tells us "which" side of the channels directed - // edge is being updated. - var ( - pubKey *btcec.PublicKey - edgeToUpdate *channeldb.ChannelEdgePolicy - ) - direction := msg.ChannelFlags & lnwire.ChanUpdateDirection - switch direction { - case 0: - pubKey, _ = chanInfo.NodeKey1() - edgeToUpdate = edge1 - case 1: - pubKey, _ = chanInfo.NodeKey2() - edgeToUpdate = edge2 - } - - // If we have a previous version of the edge being updated, - // we'll want to rate limit its updates to prevent spam - // throughout the network. - if nMsg.isRemote && edgeToUpdate != nil { - // If it's a keep-alive update, we'll only propagate one - // if it's been a day since the previous. This follows - // our own heuristic of sending keep-alive updates after - // the same duration (see retransmitStaleAnns). - timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate) - if IsKeepAliveUpdate(msg, edgeToUpdate) { - if timeSinceLastUpdate < d.cfg.RebroadcastInterval { - log.Debugf("Ignoring keep alive update "+ - "not within %v period for "+ - "channel %v", - d.cfg.RebroadcastInterval, - shortChanID) - nMsg.err <- nil - return nil, false - } - } else { - // If it's not, we'll allow an update per minute - // with a maximum burst of 10. If we haven't - // seen an update for this channel before, we'll - // need to initialize a rate limiter for each - // direction. - d.Lock() - rateLimiters, ok := d.chanUpdateRateLimiter[shortChanID] - if !ok { - r := rate.Every(d.cfg.ChannelUpdateInterval) - b := d.cfg.MaxChannelUpdateBurst - rateLimiters = [2]*rate.Limiter{ - rate.NewLimiter(r, b), - rate.NewLimiter(r, b), - } - d.chanUpdateRateLimiter[shortChanID] = rateLimiters - } - d.Unlock() - - if !rateLimiters[direction].Allow() { - log.Debugf("Rate limiting update for "+ - "channel %v from direction %x", - shortChanID, - pubKey.SerializeCompressed()) - nMsg.err <- nil - return nil, false - } - } - } - - // Validate the channel announcement with the expected public key and - // channel capacity. In the case of an invalid channel update, we'll - // return an error to the caller and exit early. - err = routing.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, msg) - if err != nil { - rErr := fmt.Errorf("unable to validate channel "+ - "update announcement for short_chan_id=%v: %v", - spew.Sdump(msg.ShortChannelID), err) - - log.Error(rErr) - nMsg.err <- rErr - return nil, false - } - - update := &channeldb.ChannelEdgePolicy{ - SigBytes: msg.Signature.ToSignatureBytes(), - ChannelID: shortChanID, - LastUpdate: timestamp, - MessageFlags: msg.MessageFlags, - ChannelFlags: msg.ChannelFlags, - TimeLockDelta: msg.TimeLockDelta, - MinHTLC: msg.HtlcMinimumMsat, - MaxHTLC: msg.HtlcMaximumMsat, - FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee), - FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate), - ExtraOpaqueData: msg.ExtraOpaqueData, - } - - if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil { - if routing.IsError( - err, routing.ErrOutdated, - routing.ErrIgnored, - routing.ErrVBarrierShuttingDown, - ) { - - log.Debug(err) - } else { - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - - log.Error(err) - } - - nMsg.err <- err - return nil, false - } - - // If this is a local ChannelUpdate without an AuthProof, it - // means it is an update to a channel that is not (yet) - // supposed to be announced to the greater network. However, - // our channel counter party will need to be given the update, - // so we'll try sending the update directly to the remote peer. - if !nMsg.isRemote && chanInfo.AuthProof == nil { - // Get our peer's public key. - remotePubKey := remotePubFromChanInfo( - chanInfo, msg.ChannelFlags, - ) - - log.Debugf("The message %v has no AuthProof, sending "+ - "the update to remote peer %x", - msg.MsgType(), remotePubKey) - - // Now, we'll attempt to send the channel update message - // reliably to the remote peer in the background, so - // that we don't block if the peer happens to be offline - // at the moment. - err := d.reliableSender.sendMessage(msg, remotePubKey) - if err != nil { - err := fmt.Errorf("unable to reliably send %v "+ - "for channel=%v to peer=%x: %v", - msg.MsgType(), msg.ShortChannelID, - remotePubKey, err) - nMsg.err <- err - return nil, false - } - } - - // Channel update announcement was successfully processed and - // now it can be broadcast to the rest of the network. However, - // we'll only broadcast the channel update announcement if it - // has an attached authentication proof. - if chanInfo.AuthProof != nil { - announcements = append(announcements, networkMsg{ - peer: nMsg.peer, - source: nMsg.source, - msg: msg, - }) - } - - nMsg.err <- nil - return announcements, true + return d.handleChanUpdate(nMsg, msg, schedulerOp) // A new signature announcement has been received. This indicates // willingness of nodes involved in the funding of a channel to @@ -2980,3 +2672,309 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, nMsg.err <- nil return announcements, true } + +// handleChanUpdate processes a new channel update. +func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, + upd *lnwire.ChannelUpdate, + ops []batch.SchedulerOption) ([]networkMsg, bool) { + + // We'll ignore any channel updates that target any chain other than + // the set of chains we know of. + if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) { + err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+ + "gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash) + log.Errorf(err.Error()) + + key := newRejectCacheKey( + upd.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + + nMsg.err <- err + return nil, false + } + + blockHeight := upd.ShortChannelID.BlockHeight + shortChanID := upd.ShortChannelID.ToUint64() + + // If the advertised inclusionary block is beyond our knowledge of the + // chain tip, then we'll put the announcement in limbo to be fully + // verified once we advance forward in the chain. + d.Lock() + if nMsg.isRemote && d.isPremature(upd.ShortChannelID, 0, nMsg) { + log.Warnf("Update announcement for short_chan_id(%v), is "+ + "premature: advertises height %v, only height %v is "+ + "known", shortChanID, blockHeight, d.bestHeight) + d.Unlock() + nMsg.err <- nil + return nil, false + } + d.Unlock() + + // Before we perform any of the expensive checks below, we'll check + // whether this update is stale or is for a zombie channel in order to + // quickly reject it. + timestamp := time.Unix(int64(upd.Timestamp), 0) + if d.cfg.Router.IsStaleEdgePolicy( + upd.ShortChannelID, timestamp, upd.ChannelFlags, + ) { + + log.Debugf("Ignored stale edge policy: peer=%v, source=%x, "+ + "msg=%s, is_remote=%v", nMsg.peer, + nMsg.source.SerializeCompressed(), nMsg.msg.MsgType(), + nMsg.isRemote, + ) + + nMsg.err <- nil + return nil, true + } + + // Get the node pub key as far since we don't have it in the channel + // update announcement message. We'll need this to properly verify the + // message's signature. + // + // We make sure to obtain the mutex for this channel ID before we + // access the database. This ensures the state we read from the + // database has not changed between this point and when we call + // UpdateEdge() later. + d.channelMtx.Lock(upd.ShortChannelID.ToUint64()) + defer d.channelMtx.Unlock(upd.ShortChannelID.ToUint64()) + chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( + upd.ShortChannelID, + ) + switch err { + // No error, break. + case nil: + break + + case channeldb.ErrZombieEdge: + err = d.processZombieUpdate(chanInfo, upd) + if err != nil { + log.Debug(err) + nMsg.err <- err + return nil, false + } + + // We'll fallthrough to ensure we stash the update until we + // receive its corresponding ChannelAnnouncement. This is + // needed to ensure the edge exists in the graph before + // applying the update. + fallthrough + case channeldb.ErrGraphNotFound: + fallthrough + case channeldb.ErrGraphNoEdgesFound: + fallthrough + case channeldb.ErrEdgeNotFound: + // If the edge corresponding to this ChannelUpdate was not + // found in the graph, this might be a channel in the process + // of being opened, and we haven't processed our own + // ChannelAnnouncement yet, hence it is not not found in the + // graph. This usually gets resolved after the channel proofs + // are exchanged and the channel is broadcasted to the rest of + // the network, but in case this is a private channel this + // won't ever happen. This can also happen in the case of a + // zombie channel with a fresh update for which we don't have a + // ChannelAnnouncement for since we reject them. Because of + // this, we temporarily add it to a map, and reprocess it after + // our own ChannelAnnouncement has been processed. + earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID) + switch { + // Nothing in the cache yet, we can just directly insert this + // element. + case err == cache.ErrElementNotFound: + _, _ = d.prematureChannelUpdates.Put( + shortChanID, &cachedNetworkMsg{ + msgs: []*networkMsg{nMsg}, + }) + + // There's already something in the cache, so we'll combine the + // set of messages into a single value. + default: + msgs := earlyMsgs.(*cachedNetworkMsg).msgs + msgs = append(msgs, nMsg) + _, _ = d.prematureChannelUpdates.Put( + shortChanID, &cachedNetworkMsg{ + msgs: msgs, + }) + } + + log.Debugf("Got ChannelUpdate for edge not found in graph"+ + "(shortChanID=%v), saving for reprocessing later", + shortChanID) + + // NOTE: We don't return anything on the error channel for this + // message, as we expect that will be done when this + // ChannelUpdate is later reprocessed. + return nil, false + + default: + err := fmt.Errorf("unable to validate channel update "+ + "short_chan_id=%v: %v", shortChanID, err) + log.Error(err) + nMsg.err <- err + + key := newRejectCacheKey( + upd.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + + return nil, false + } + + // The least-significant bit in the flag on the channel update + // announcement tells us "which" side of the channels directed edge is + // being updated. + var ( + pubKey *btcec.PublicKey + edgeToUpdate *channeldb.ChannelEdgePolicy + ) + direction := upd.ChannelFlags & lnwire.ChanUpdateDirection + switch direction { + case 0: + pubKey, _ = chanInfo.NodeKey1() + edgeToUpdate = e1 + case 1: + pubKey, _ = chanInfo.NodeKey2() + edgeToUpdate = e2 + } + + // Validate the channel announcement with the expected public key and + // channel capacity. In the case of an invalid channel update, we'll + // return an error to the caller and exit early. + err = routing.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd) + if err != nil { + rErr := fmt.Errorf("unable to validate channel update "+ + "announcement for short_chan_id=%v: %v", + spew.Sdump(upd.ShortChannelID), err) + + log.Error(rErr) + nMsg.err <- rErr + return nil, false + } + + // If we have a previous version of the edge being updated, we'll want + // to rate limit its updates to prevent spam throughout the network. + if nMsg.isRemote && edgeToUpdate != nil { + // If it's a keep-alive update, we'll only propagate one if + // it's been a day since the previous. This follows our own + // heuristic of sending keep-alive updates after the same + // duration (see retransmitStaleAnns). + timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate) + if IsKeepAliveUpdate(upd, edgeToUpdate) { + if timeSinceLastUpdate < d.cfg.RebroadcastInterval { + log.Debugf("Ignoring keep alive update not "+ + "within %v period for channel %v", + d.cfg.RebroadcastInterval, shortChanID) + nMsg.err <- nil + return nil, false + } + } else { + // If it's not, we'll allow an update per minute with a + // maximum burst of 10. If we haven't seen an update + // for this channel before, we'll need to initialize a + // rate limiter for each direction. + d.Lock() + rls, ok := d.chanUpdateRateLimiter[shortChanID] + if !ok { + r := rate.Every(d.cfg.ChannelUpdateInterval) + b := d.cfg.MaxChannelUpdateBurst + rls = [2]*rate.Limiter{ + rate.NewLimiter(r, b), + rate.NewLimiter(r, b), + } + d.chanUpdateRateLimiter[shortChanID] = rls + } + d.Unlock() + + if !rls[direction].Allow() { + log.Debugf("Rate limiting update for channel "+ + "%v from direction %x", shortChanID, + pubKey.SerializeCompressed()) + nMsg.err <- nil + return nil, false + } + } + } + + update := &channeldb.ChannelEdgePolicy{ + SigBytes: upd.Signature.ToSignatureBytes(), + ChannelID: shortChanID, + LastUpdate: timestamp, + MessageFlags: upd.MessageFlags, + ChannelFlags: upd.ChannelFlags, + TimeLockDelta: upd.TimeLockDelta, + MinHTLC: upd.HtlcMinimumMsat, + MaxHTLC: upd.HtlcMaximumMsat, + FeeBaseMSat: lnwire.MilliSatoshi(upd.BaseFee), + FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate), + ExtraOpaqueData: upd.ExtraOpaqueData, + } + + if err := d.cfg.Router.UpdateEdge(update, ops...); err != nil { + if routing.IsError( + err, routing.ErrOutdated, + routing.ErrIgnored, + routing.ErrVBarrierShuttingDown, + ) { + + log.Debug(err) + } else { + key := newRejectCacheKey( + upd.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + + log.Error(err) + } + + nMsg.err <- err + return nil, false + } + + // If this is a local ChannelUpdate without an AuthProof, it means it + // is an update to a channel that is not (yet) supposed to be announced + // to the greater network. However, our channel counter party will need + // to be given the update, so we'll try sending the update directly to + // the remote peer. + if !nMsg.isRemote && chanInfo.AuthProof == nil { + // Get our peer's public key. + remotePubKey := remotePubFromChanInfo( + chanInfo, upd.ChannelFlags, + ) + + log.Debugf("The message %v has no AuthProof, sending the "+ + "update to remote peer %x", upd.MsgType(), + remotePubKey) + + // Now we'll attempt to send the channel update message + // reliably to the remote peer in the background, so that we + // don't block if the peer happens to be offline at the moment. + err := d.reliableSender.sendMessage(upd, remotePubKey) + if err != nil { + err := fmt.Errorf("unable to reliably send %v for "+ + "channel=%v to peer=%x: %v", upd.MsgType(), + upd.ShortChannelID, remotePubKey, err) + nMsg.err <- err + return nil, false + } + } + + // Channel update announcement was successfully processed and now it + // can be broadcast to the rest of the network. However, we'll only + // broadcast the channel update announcement if it has an attached + // authentication proof. + var announcements []networkMsg + if chanInfo.AuthProof != nil { + announcements = append(announcements, networkMsg{ + peer: nMsg.peer, + source: nMsg.source, + msg: upd, + }) + } + + nMsg.err <- nil + return announcements, true +}