discovery: update graph API usage to match recent API changes

This commit is contained in:
Olaoluwa Osuntokun
2018-01-30 20:23:14 -08:00
parent 5e9166e478
commit cd9d2d7e6f
4 changed files with 319 additions and 122 deletions

View File

@@ -157,17 +157,17 @@ type AuthenticatedGossiper struct {
// TODO(roasbeef): limit premature networkMsgs to N
prematureAnnouncements map[uint32][]*networkMsg
// prematureChannelUpdates is a map of ChannelUpdates we have
// received that wasn't associated with any channel we know about.
// We store them temporarily, such that we can reprocess them when
// a ChannelAnnouncement for the channel is received.
// prematureChannelUpdates is a map of ChannelUpdates we have received
// that wasn't associated with any channel we know about. We store
// them temporarily, such that we can reprocess them when a
// ChannelAnnouncement for the channel is received.
prematureChannelUpdates map[uint64][]*networkMsg
pChanUpdMtx sync.Mutex
// waitingProofs is a persistent storage of partial channel proof
// announcement messages. We use it to buffer half of the material
// needed to reconstruct a full authenticated channel announcement. Once
// we receive the other half the channel proof, we'll be able to
// needed to reconstruct a full authenticated channel announcement.
// Once we receive the other half the channel proof, we'll be able to
// properly validate it an re-broadcast it out to the network.
waitingProofs *channeldb.WaitingProofStore
@@ -176,8 +176,8 @@ type AuthenticatedGossiper struct {
// networkHandler.
networkMsgs chan *networkMsg
// chanPolicyUpdates is a channel that requests to update the forwarding
// policy of a set of channels is sent over.
// chanPolicyUpdates is a channel that requests to update the
// forwarding policy of a set of channels is sent over.
chanPolicyUpdates chan *chanPolicyUpdateRequest
// bestHeight is the height of the block at the tip of the main chain
@@ -232,17 +232,22 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error {
// containing all the messages to be sent to the target peer.
var announceMessages []lnwire.Message
makeNodeAnn := func(n *channeldb.LightningNode) *lnwire.NodeAnnouncement {
makeNodeAnn := func(n *channeldb.LightningNode) (*lnwire.NodeAnnouncement, error) {
alias, _ := lnwire.NewNodeAlias(n.Alias)
wireSig, err := lnwire.NewSigFromRawSignature(n.AuthSigBytes)
if err != nil {
return nil, err
}
return &lnwire.NodeAnnouncement{
Signature: n.AuthSig,
Signature: wireSig,
Timestamp: uint32(n.LastUpdate.Unix()),
Addresses: n.Addresses,
NodeID: n.PubKey,
NodeID: n.PubKeyBytes,
Features: n.Features.RawFeatureVector,
RGBColor: n.Color,
Alias: alias,
}
}, nil
}
// As peers are expecting channel announcements before node
@@ -262,8 +267,12 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error {
// also has known validated nodes, then we'll send that as
// well.
if chanInfo.AuthProof != nil {
chanAnn, e1Ann, e2Ann := createChanAnnouncement(
chanInfo.AuthProof, chanInfo, e1, e2)
chanAnn, e1Ann, e2Ann, err := createChanAnnouncement(
chanInfo.AuthProof, chanInfo, e1, e2,
)
if err != nil {
return err
}
announceMessages = append(announceMessages, chanAnn)
if e1Ann != nil {
@@ -272,7 +281,10 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error {
// If this edge has a validated node
// announcement, then we'll send that as well.
if e1.Node.HaveNodeAnnouncement {
nodeAnn := makeNodeAnn(e1.Node)
nodeAnn, err := makeNodeAnn(e1.Node)
if err != nil {
return err
}
announceMessages = append(
announceMessages, nodeAnn,
)
@@ -285,7 +297,10 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error {
// If this edge has a validated node
// announcement, then we'll send that as well.
if e2.Node.HaveNodeAnnouncement {
nodeAnn := makeNodeAnn(e2.Node)
nodeAnn, err := makeNodeAnn(e2.Node)
if err != nil {
return err
}
announceMessages = append(
announceMessages, nodeAnn,
)
@@ -588,7 +603,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
// NodeID to create the corresponding Vertex.
case *lnwire.NodeAnnouncement:
sender := routing.NewVertex(message.peer)
deDupKey := routing.NewVertex(msg.NodeID)
deDupKey := routing.Vertex(msg.NodeID)
// We do the same for node announcements as we did for channel
// updates, as they also carry a timestamp.
@@ -1182,9 +1197,12 @@ func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAn
// We'll then create then validate the new fully assembled
// announcement.
chanAnn, e1Ann, e2Ann := createChanAnnouncement(
chanAnn, e1Ann, e2Ann, err := createChanAnnouncement(
proof, chanInfo, e1, e2,
)
if err != nil {
return nil, err
}
err = ValidateChannelAnn(chanAnn)
if err != nil {
err := errors.Errorf("assembled channel announcement proof "+
@@ -1265,9 +1283,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
HaveNodeAnnouncement: true,
LastUpdate: time.Unix(int64(msg.Timestamp), 0),
Addresses: msg.Addresses,
PubKey: msg.NodeID,
PubKeyBytes: msg.NodeID,
Alias: msg.Alias.String(),
AuthSig: msg.Signature,
AuthSigBytes: msg.Signature.ToSignatureBytes(),
Features: features,
Color: msg.RGBColor,
}
@@ -1348,10 +1366,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// itself to the database so we can fetch it later when
// gossiping with other nodes.
proof = &channeldb.ChannelAuthProof{
NodeSig1: msg.NodeSig1,
NodeSig2: msg.NodeSig2,
BitcoinSig1: msg.BitcoinSig1,
BitcoinSig2: msg.BitcoinSig2,
NodeSig1Bytes: msg.NodeSig1.ToSignatureBytes(),
NodeSig2Bytes: msg.NodeSig2.ToSignatureBytes(),
BitcoinSig1Bytes: msg.BitcoinSig1.ToSignatureBytes(),
BitcoinSig2Bytes: msg.BitcoinSig2.ToSignatureBytes(),
}
}
@@ -1365,14 +1383,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
}
edge := &channeldb.ChannelEdgeInfo{
ChannelID: msg.ShortChannelID.ToUint64(),
ChainHash: msg.ChainHash,
NodeKey1: msg.NodeID1,
NodeKey2: msg.NodeID2,
BitcoinKey1: msg.BitcoinKey1,
BitcoinKey2: msg.BitcoinKey2,
AuthProof: proof,
Features: featureBuf.Bytes(),
ChannelID: msg.ShortChannelID.ToUint64(),
ChainHash: msg.ChainHash,
NodeKey1Bytes: msg.NodeID1,
NodeKey2Bytes: msg.NodeID2,
BitcoinKey1Bytes: msg.BitcoinKey1,
BitcoinKey2Bytes: msg.BitcoinKey2,
AuthProof: proof,
Features: featureBuf.Bytes(),
}
// We will add the edge to the channel router. If the nodes
@@ -1560,7 +1578,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
d.prematureChannelUpdates[shortChanID],
nMsg)
d.pChanUpdMtx.Unlock()
log.Infof("Got ChannelUpdate for edge not "+
log.Debugf("Got ChannelUpdate for edge not "+
"found in graph(shortChanID=%v), "+
"saving for reprocessing later",
shortChanID)
@@ -1582,9 +1600,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
var pubKey *btcec.PublicKey
switch {
case msg.Flags&lnwire.ChanUpdateDirection == 0:
pubKey = chanInfo.NodeKey1
pubKey, _ = chanInfo.NodeKey1()
case msg.Flags&lnwire.ChanUpdateDirection == 1:
pubKey = chanInfo.NodeKey2
pubKey, _ = chanInfo.NodeKey2()
}
// Validate the channel announcement with the expected public
@@ -1601,7 +1619,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
}
update := &channeldb.ChannelEdgePolicy{
Signature: msg.Signature,
SigBytes: msg.Signature.ToSignatureBytes(),
ChannelID: shortChanID,
LastUpdate: time.Unix(int64(msg.Timestamp), 0),
Flags: msg.Flags,
@@ -1632,9 +1650,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
var remotePeer *btcec.PublicKey
switch {
case msg.Flags&lnwire.ChanUpdateDirection == 0:
remotePeer = chanInfo.NodeKey2
remotePeer, _ = chanInfo.NodeKey2()
case msg.Flags&lnwire.ChanUpdateDirection == 1:
remotePeer = chanInfo.NodeKey1
remotePeer, _ = chanInfo.NodeKey1()
}
// Send ChannelUpdate directly to remotePeer.
@@ -1726,9 +1744,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
}
isFirstNode := bytes.Equal(nMsg.peer.SerializeCompressed(),
chanInfo.NodeKey1.SerializeCompressed())
chanInfo.NodeKey1Bytes[:])
isSecondNode := bytes.Equal(nMsg.peer.SerializeCompressed(),
chanInfo.NodeKey2.SerializeCompressed())
chanInfo.NodeKey2Bytes[:])
// Ensure that channel that was retrieved belongs to the peer
// which sent the proof announcement.
@@ -1748,9 +1766,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
if !nMsg.isRemote {
var remotePeer *btcec.PublicKey
if isFirstNode {
remotePeer = chanInfo.NodeKey2
remotePeer, _ = chanInfo.NodeKey2()
} else {
remotePeer = chanInfo.NodeKey1
remotePeer, _ = chanInfo.NodeKey1()
}
// Since the remote peer might not be online
// we'll call a method that will attempt to
@@ -1786,9 +1804,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
msg.ChannelID,
peerID)
chanAnn, _, _ := createChanAnnouncement(
chanInfo.AuthProof, chanInfo, e1, e2)
err := d.cfg.SendToPeer(nMsg.peer, chanAnn)
chanAnn, _, _, err := createChanAnnouncement(
chanInfo.AuthProof, chanInfo, e1, e2,
)
if err != nil {
log.Error("unable to gen ann: %v", err)
return
}
err = d.cfg.SendToPeer(nMsg.peer, chanAnn)
if err != nil {
log.Errorf("Failed sending "+
"full proof to "+
@@ -1846,17 +1869,22 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// validate it shortly below.
var dbProof channeldb.ChannelAuthProof
if isFirstNode {
dbProof.NodeSig1 = msg.NodeSignature
dbProof.NodeSig2 = oppositeProof.NodeSignature
dbProof.BitcoinSig1 = msg.BitcoinSignature
dbProof.BitcoinSig2 = oppositeProof.BitcoinSignature
dbProof.NodeSig1Bytes = msg.NodeSignature.ToSignatureBytes()
dbProof.NodeSig2Bytes = oppositeProof.NodeSignature.ToSignatureBytes()
dbProof.BitcoinSig1Bytes = msg.BitcoinSignature.ToSignatureBytes()
dbProof.BitcoinSig2Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes()
} else {
dbProof.NodeSig1 = oppositeProof.NodeSignature
dbProof.NodeSig2 = msg.NodeSignature
dbProof.BitcoinSig1 = oppositeProof.BitcoinSignature
dbProof.BitcoinSig2 = msg.BitcoinSignature
dbProof.NodeSig1Bytes = oppositeProof.NodeSignature.ToSignatureBytes()
dbProof.NodeSig2Bytes = msg.NodeSignature.ToSignatureBytes()
dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes()
dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes()
}
chanAnn, e1Ann, e2Ann, err := createChanAnnouncement(&dbProof, chanInfo, e1, e2)
if err != nil {
log.Error(err)
nMsg.err <- err
return nil
}
chanAnn, e1Ann, e2Ann := createChanAnnouncement(&dbProof, chanInfo, e1, e2)
// With all the necessary components assembled validate the
// full channel announcement proof.
@@ -2017,6 +2045,8 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably(
func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, *lnwire.ChannelUpdate, error) {
var err error
// Make sure timestamp is always increased, such that our update
// gets propagated.
timestamp := time.Now().Unix()
@@ -2025,7 +2055,6 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
}
edge.LastUpdate = time.Unix(timestamp, 0)
chanUpdate := &lnwire.ChannelUpdate{
Signature: edge.Signature,
ChainHash: info.ChainHash,
ShortChannelID: lnwire.NewShortChanIDFromInt(edge.ChannelID),
Timestamp: uint32(timestamp),
@@ -2035,6 +2064,10 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
BaseFee: uint32(edge.FeeBaseMSat),
FeeRate: uint32(edge.FeeProportionalMillionths),
}
chanUpdate.Signature, err = lnwire.NewSigFromRawSignature(edge.SigBytes)
if err != nil {
return nil, nil, err
}
// With the update applied, we'll generate a new signature over a
// digest of the channel announcement itself.
@@ -2045,8 +2078,11 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
// Next, we'll set the new signature in place, and update the reference
// in the backing slice.
edge.Signature = sig
chanUpdate.Signature = sig
edge.SigBytes = sig.Serialize()
chanUpdate.Signature, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return nil, nil, err
}
// To ensure that our signature is valid, we'll verify it ourself
// before committing it to the slice returned.
@@ -2057,7 +2093,6 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
}
// Finally, we'll write the new edge policy to disk.
edge.Node.PubKey.Curve = nil
if err := d.cfg.Router.UpdateEdge(edge); err != nil {
return nil, nil, err
}
@@ -2069,17 +2104,37 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
if info.AuthProof != nil {
chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
chanAnn = &lnwire.ChannelAnnouncement{
NodeSig1: info.AuthProof.NodeSig1,
NodeSig2: info.AuthProof.NodeSig2,
ShortChannelID: chanID,
BitcoinSig1: info.AuthProof.BitcoinSig1,
BitcoinSig2: info.AuthProof.BitcoinSig2,
NodeID1: info.NodeKey1,
NodeID2: info.NodeKey2,
NodeID1: info.NodeKey1Bytes,
NodeID2: info.NodeKey2Bytes,
ChainHash: info.ChainHash,
BitcoinKey1: info.BitcoinKey1,
BitcoinKey1: info.BitcoinKey1Bytes,
Features: lnwire.NewRawFeatureVector(),
BitcoinKey2: info.BitcoinKey2,
BitcoinKey2: info.BitcoinKey2Bytes,
}
chanAnn.NodeSig1, err = lnwire.NewSigFromRawSignature(
info.AuthProof.NodeSig1Bytes,
)
if err != nil {
return nil, nil, err
}
chanAnn.NodeSig2, err = lnwire.NewSigFromRawSignature(
info.AuthProof.NodeSig2Bytes,
)
if err != nil {
return nil, nil, err
}
chanAnn.BitcoinSig1, err = lnwire.NewSigFromRawSignature(
info.AuthProof.BitcoinSig1Bytes,
)
if err != nil {
return nil, nil, err
}
chanAnn.BitcoinSig2, err = lnwire.NewSigFromRawSignature(
info.AuthProof.BitcoinSig2Bytes,
)
if err != nil {
return nil, nil, err
}
}