diff --git a/discovery/gossiper.go b/discovery/gossiper.go index a1578c928..b07500a36 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1773,239 +1773,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // the existence of a channel and not yet the routing policies in // either direction of the channel. case *lnwire.ChannelAnnouncement: - // We'll ignore any channel announcements that target any chain - // other than the set of chains we know of. - if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) { - err := fmt.Errorf("ignoring ChannelAnnouncement from "+ - "chain=%v, gossiper on chain=%v", msg.ChainHash, - d.cfg.ChainHash) - log.Errorf(err.Error()) - - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - - nMsg.err <- err - return nil, false - } - - // If the advertised inclusionary block is beyond our knowledge - // of the chain tip, then we'll ignore for it now. - d.Lock() - if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) { - log.Warnf("Announcement for chan_id=(%v), is "+ - "premature: advertises height %v, only "+ - "height %v is known", - msg.ShortChannelID.ToUint64(), - msg.ShortChannelID.BlockHeight, - d.bestHeight) - d.Unlock() - nMsg.err <- nil - return nil, false - } - d.Unlock() - - // At this point, we'll now ask the router if this is a - // zombie/known edge. If so we can skip all the processing - // below. - if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) { - nMsg.err <- nil - return nil, true - } - - // If this is a remote channel announcement, then we'll validate - // all the signatures within the proof as it should be well - // formed. - var proof *channeldb.ChannelAuthProof - if nMsg.isRemote { - if err := routing.ValidateChannelAnn(msg); err != nil { - err := fmt.Errorf("unable to validate "+ - "announcement: %v", err) - - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - - log.Error(err) - nMsg.err <- err - return nil, false - } - - // If the proof checks out, then we'll save the proof - // itself to the database so we can fetch it later when - // gossiping with other nodes. - proof = &channeldb.ChannelAuthProof{ - NodeSig1Bytes: msg.NodeSig1.ToSignatureBytes(), - NodeSig2Bytes: msg.NodeSig2.ToSignatureBytes(), - BitcoinSig1Bytes: msg.BitcoinSig1.ToSignatureBytes(), - BitcoinSig2Bytes: msg.BitcoinSig2.ToSignatureBytes(), - } - } - - // With the proof validate (if necessary), we can now store it - // within the database for our path finding and syncing needs. - var featureBuf bytes.Buffer - if err := msg.Features.Encode(&featureBuf); err != nil { - log.Errorf("unable to encode features: %v", err) - nMsg.err <- err - return nil, false - } - - edge := &channeldb.ChannelEdgeInfo{ - ChannelID: msg.ShortChannelID.ToUint64(), - ChainHash: msg.ChainHash, - NodeKey1Bytes: msg.NodeID1, - NodeKey2Bytes: msg.NodeID2, - BitcoinKey1Bytes: msg.BitcoinKey1, - BitcoinKey2Bytes: msg.BitcoinKey2, - AuthProof: proof, - Features: featureBuf.Bytes(), - ExtraOpaqueData: msg.ExtraOpaqueData, - } - - // If there were any optional message fields provided, we'll - // include them in its serialized disk representation now. - if nMsg.optionalMsgFields != nil { - if nMsg.optionalMsgFields.capacity != nil { - edge.Capacity = *nMsg.optionalMsgFields.capacity - } - if nMsg.optionalMsgFields.channelPoint != nil { - edge.ChannelPoint = *nMsg.optionalMsgFields.channelPoint - } - } - - // We will add the edge to the channel router. If the nodes - // present in this channel are not present in the database, a - // partial node will be added to represent each node while we - // wait for a node announcement. - // - // Before we add the edge to the database, we obtain - // the mutex for this channel ID. We do this to ensure - // no other goroutine has read the database and is now - // making decisions based on this DB state, before it - // writes to the DB. - d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) - err := d.cfg.Router.AddEdge(edge, schedulerOp...) - if err != nil { - defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - - // If the edge was rejected due to already being known, - // then it may be that case that this new message has a - // fresh channel proof, so we'll check. - if routing.IsError(err, routing.ErrIgnored) { - // Attempt to process the rejected message to - // see if we get any new announcements. - anns, rErr := d.processRejectedEdge(msg, proof) - if rErr != nil { - - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - - nMsg.err <- rErr - return nil, false - } - - // If while processing this rejected edge, we - // realized there's a set of announcements we - // could extract, then we'll return those - // directly. - if len(anns) != 0 { - nMsg.err <- nil - return anns, true - } - - // Otherwise, this is just a regular rejected - // edge. - log.Debugf("Router rejected channel "+ - "edge: %v", err) - } else { - log.Debugf("Router rejected channel "+ - "edge: %v", err) - - key := newRejectCacheKey( - msg.ShortChannelID.ToUint64(), - sourceToPub(nMsg.source), - ) - _, _ = d.recentRejects.Put(key, &cachedReject{}) - } - - nMsg.err <- err - return nil, false - } - - // If err is nil, release the lock immediately. - d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - - // If we earlier received any ChannelUpdates for this channel, - // we can now process them, as the channel is added to the - // graph. - shortChanID := msg.ShortChannelID.ToUint64() - var channelUpdates []*networkMsg - - earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID) - if err == nil { - // There was actually an entry in the map, so we'll - // accumulate it. We don't worry about deletion, since - // it'll eventually fall out anyway. - chanMsgs := earlyChanUpdates.(*cachedNetworkMsg) - channelUpdates = append(channelUpdates, chanMsgs.msgs...) - } - - // Launch a new goroutine to handle each ChannelUpdate, this to - // ensure we don't block here, as we can handle only one - // announcement at a time. - for _, cu := range channelUpdates { - d.wg.Add(1) - go func(nMsg *networkMsg) { - defer d.wg.Done() - - switch msg := nMsg.msg.(type) { - - // Reprocess the message, making sure we return - // an error to the original caller in case the - // gossiper shuts down. - case *lnwire.ChannelUpdate: - log.Debugf("Reprocessing"+ - " ChannelUpdate for "+ - "shortChanID=%v", - msg.ShortChannelID.ToUint64()) - - select { - case d.networkMsgs <- nMsg: - case <-d.quit: - nMsg.err <- ErrGossiperShuttingDown - } - - // We don't expect any other message type than - // ChannelUpdate to be in this map. - default: - log.Errorf("Unsupported message type "+ - "found among ChannelUpdates: "+ - "%T", msg) - } - }(cu) - } - - // Channel announcement was successfully proceeded and know it - // might be broadcast to other connected nodes if it was - // announcement with proof (remote). - if proof != nil { - announcements = append(announcements, networkMsg{ - peer: nMsg.peer, - source: nMsg.source, - msg: msg, - }) - } - - nMsg.err <- nil - return announcements, true + return d.handleChanAnnouncement(nMsg, msg, schedulerOp) // A new authenticated channel edge update has arrived. This indicates // that the directional information for an already known channel has @@ -2983,3 +2751,232 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, // TODO(roasbeef): get rid of the above return announcements, true } + +// handleChanAnnouncement processes a new channel announcement. +func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, + ann *lnwire.ChannelAnnouncement, + ops []batch.SchedulerOption) ([]networkMsg, bool) { + + // We'll ignore any channel announcements that target any chain other + // than the set of chains we know of. + if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) { + err := fmt.Errorf("ignoring ChannelAnnouncement from chain=%v"+ + ", gossiper on chain=%v", ann.ChainHash, + d.cfg.ChainHash) + log.Errorf(err.Error()) + + key := newRejectCacheKey( + ann.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + + nMsg.err <- err + return nil, false + } + + // If the advertised inclusionary block is beyond our knowledge of the + // chain tip, then we'll ignore it for now. + d.Lock() + if nMsg.isRemote && d.isPremature(ann.ShortChannelID, 0, nMsg) { + log.Warnf("Announcement for chan_id=(%v), is premature: "+ + "advertises height %v, only height %v is known", + ann.ShortChannelID.ToUint64(), + ann.ShortChannelID.BlockHeight, d.bestHeight) + d.Unlock() + nMsg.err <- nil + return nil, false + } + d.Unlock() + + // At this point, we'll now ask the router if this is a zombie/known + // edge. If so we can skip all the processing below. + if d.cfg.Router.IsKnownEdge(ann.ShortChannelID) { + nMsg.err <- nil + return nil, true + } + + // If this is a remote channel announcement, then we'll validate all + // the signatures within the proof as it should be well formed. + var proof *channeldb.ChannelAuthProof + if nMsg.isRemote { + if err := routing.ValidateChannelAnn(ann); err != nil { + err := fmt.Errorf("unable to validate announcement: "+ + "%v", err) + + key := newRejectCacheKey( + ann.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + + log.Error(err) + nMsg.err <- err + return nil, false + } + + // If the proof checks out, then we'll save the proof itself to + // the database so we can fetch it later when gossiping with + // other nodes. + proof = &channeldb.ChannelAuthProof{ + NodeSig1Bytes: ann.NodeSig1.ToSignatureBytes(), + NodeSig2Bytes: ann.NodeSig2.ToSignatureBytes(), + BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(), + BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(), + } + } + + // With the proof validated (if necessary), we can now store it within + // the database for our path finding and syncing needs. + var featureBuf bytes.Buffer + if err := ann.Features.Encode(&featureBuf); err != nil { + log.Errorf("unable to encode features: %v", err) + nMsg.err <- err + return nil, false + } + + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: ann.ShortChannelID.ToUint64(), + ChainHash: ann.ChainHash, + NodeKey1Bytes: ann.NodeID1, + NodeKey2Bytes: ann.NodeID2, + BitcoinKey1Bytes: ann.BitcoinKey1, + BitcoinKey2Bytes: ann.BitcoinKey2, + AuthProof: proof, + Features: featureBuf.Bytes(), + ExtraOpaqueData: ann.ExtraOpaqueData, + } + + // If there were any optional message fields provided, we'll include + // them in its serialized disk representation now. + if nMsg.optionalMsgFields != nil { + if nMsg.optionalMsgFields.capacity != nil { + edge.Capacity = *nMsg.optionalMsgFields.capacity + } + if nMsg.optionalMsgFields.channelPoint != nil { + cp := *nMsg.optionalMsgFields.channelPoint + edge.ChannelPoint = cp + } + } + + // We will add the edge to the channel router. If the nodes present in + // this channel are not present in the database, a partial node will be + // added to represent each node while we wait for a node announcement. + // + // Before we add the edge to the database, we obtain the mutex for this + // channel ID. We do this to ensure no other goroutine has read the + // database and is now making decisions based on this DB state, before + // it writes to the DB. + d.channelMtx.Lock(ann.ShortChannelID.ToUint64()) + err := d.cfg.Router.AddEdge(edge, ops...) + if err != nil { + defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) + + // If the edge was rejected due to already being known, then it + // may be the case that this new message has a fresh channel + // proof, so we'll check. + if routing.IsError(err, routing.ErrIgnored) { + // Attempt to process the rejected message to see if we + // get any new announcements. + anns, rErr := d.processRejectedEdge(ann, proof) + if rErr != nil { + key := newRejectCacheKey( + ann.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + cr := &cachedReject{} + _, _ = d.recentRejects.Put(key, cr) + + nMsg.err <- rErr + return nil, false + } + + // If while processing this rejected edge, we realized + // there's a set of announcements we could extract, + // then we'll return those directly. + if len(anns) != 0 { + nMsg.err <- nil + return anns, true + } + + // Otherwise, this is just a regular rejected edge. + log.Debugf("Router rejected channel edge: %v", err) + } else { + log.Debugf("Router rejected channel edge: %v", err) + + key := newRejectCacheKey( + ann.ShortChannelID.ToUint64(), + sourceToPub(nMsg.source), + ) + _, _ = d.recentRejects.Put(key, &cachedReject{}) + } + + nMsg.err <- err + return nil, false + } + + // If err is nil, release the lock immediately. + d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) + + // If we earlier received any ChannelUpdates for this channel, we can + // now process them, as the channel is added to the graph. + shortChanID := ann.ShortChannelID.ToUint64() + var channelUpdates []*networkMsg + + earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID) + if err == nil { + // There was actually an entry in the map, so we'll accumulate + // it. We don't worry about deletion, since it'll eventually + // fall out anyway. + chanMsgs := earlyChanUpdates.(*cachedNetworkMsg) + channelUpdates = append(channelUpdates, chanMsgs.msgs...) + } + + // Launch a new goroutine to handle each ChannelUpdate, this is to + // ensure we don't block here, as we can handle only one announcement + // at a time. + for _, cu := range channelUpdates { + d.wg.Add(1) + go func(updMsg *networkMsg) { + defer d.wg.Done() + + switch msg := updMsg.msg.(type) { + // Reprocess the message, making sure we return an + // error to the original caller in case the gossiper + // shuts down. + case *lnwire.ChannelUpdate: + log.Debugf("Reprocessing ChannelUpdate for "+ + "shortChanID=%v", + msg.ShortChannelID.ToUint64()) + + select { + case d.networkMsgs <- updMsg: + case <-d.quit: + updMsg.err <- ErrGossiperShuttingDown + } + + // We don't expect any other message type than + // ChannelUpdate to be in this cache. + default: + log.Errorf("Unsupported message type found "+ + "among ChannelUpdates: %T", msg) + } + }(cu) + } + + // Channel announcement was successfully processed and now it might be + // broadcast to other connected nodes if it was an announcement with + // proof (remote). + var announcements []networkMsg + + if proof != nil { + announcements = append(announcements, networkMsg{ + peer: nMsg.peer, + source: nMsg.source, + msg: ann, + }) + } + + nMsg.err <- nil + return announcements, true +}