Merge pull request #5621 from Roasbeef/ping-pong-headers

peer: always send latest block header as part of ping messages
This commit is contained in:
Olaoluwa Osuntokun 2021-08-27 16:55:46 -07:00 committed by GitHub
commit c93824ec9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 2111 additions and 1906 deletions

View File

@ -157,6 +157,10 @@ func (b *BitcoindNotifier) startNotifier() error {
if err != nil {
return err
}
blockHeader, err := b.chainConn.GetBlockHeader(currentHash)
if err != nil {
return err
}
b.txNotifier = chainntnfs.NewTxNotifier(
uint32(currentHeight), chainntnfs.ReorgSafetyLimit,
@ -164,8 +168,9 @@ func (b *BitcoindNotifier) startNotifier() error {
)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
Hash: currentHash,
Height: currentHeight,
Hash: currentHash,
BlockHeader: blockHeader,
}
b.wg.Add(1)
@ -322,6 +327,7 @@ out:
b.notifyBlockEpochClient(
msg, b.bestBlock.Height,
b.bestBlock.Hash,
b.bestBlock.BlockHeader,
)
msg.errorChan <- nil
@ -343,6 +349,7 @@ out:
for _, block := range missedBlocks {
b.notifyBlockEpochClient(
msg, block.Height, block.Hash,
block.BlockHeader,
)
}
@ -392,8 +399,9 @@ out:
}
newBlock := chainntnfs.BlockEpoch{
Height: item.Height,
Hash: &item.Hash,
Height: item.Height,
Hash: &item.Hash,
BlockHeader: blockHeader,
}
if err := b.handleBlockConnected(newBlock); err != nil {
chainntnfs.Log.Error(err)
@ -589,26 +597,29 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
// satisfy any client requests based upon the new block.
b.bestBlock = block
b.notifyBlockEpochs(block.Height, block.Hash)
b.notifyBlockEpochs(block.Height, block.Hash, block.BlockHeader)
return b.txNotifier.NotifyHeight(uint32(block.Height))
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash,
blockHeader *wire.BlockHeader) {
for _, client := range b.blockEpochClients {
b.notifyBlockEpochClient(client, newHeight, newSha)
b.notifyBlockEpochClient(client, newHeight, newSha, blockHeader)
}
}
// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (b *BitcoindNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
height int32, sha *chainhash.Hash, header *wire.BlockHeader) {
epoch := &chainntnfs.BlockEpoch{
Height: height,
Hash: sha,
Height: height,
Hash: sha,
BlockHeader: header,
}
select {

View File

@ -206,14 +206,22 @@ func (b *BtcdNotifier) startNotifier() error {
return err
}
bestBlock, err := b.chainConn.GetBlock(currentHash)
if err != nil {
b.txUpdates.Stop()
b.chainUpdates.Stop()
return err
}
b.txNotifier = chainntnfs.NewTxNotifier(
uint32(currentHeight), chainntnfs.ReorgSafetyLimit,
b.confirmHintCache, b.spendHintCache,
)
b.bestBlock = chainntnfs.BlockEpoch{
Height: currentHeight,
Hash: currentHash,
Height: currentHeight,
Hash: currentHash,
BlockHeader: &bestBlock.Header,
}
if err := b.chainConn.NotifyBlocks(); err != nil {
@ -375,6 +383,7 @@ out:
b.notifyBlockEpochClient(
msg, b.bestBlock.Height,
b.bestBlock.Hash,
b.bestBlock.BlockHeader,
)
msg.errorChan <- nil
@ -396,6 +405,7 @@ out:
for _, block := range missedBlocks {
b.notifyBlockEpochClient(
msg, block.Height, block.Hash,
block.BlockHeader,
)
}
@ -405,8 +415,9 @@ out:
case item := <-b.chainUpdates.ChanOut():
update := item.(*chainUpdate)
if update.connect {
blockHeader, err :=
b.chainConn.GetBlockHeader(update.blockHash)
blockHeader, err := b.chainConn.GetBlockHeader(
update.blockHash,
)
if err != nil {
chainntnfs.Log.Errorf("Unable to fetch "+
"block header: %v", err)
@ -445,8 +456,9 @@ out:
}
newBlock := chainntnfs.BlockEpoch{
Height: update.blockHeight,
Hash: update.blockHash,
Height: update.blockHeight,
Hash: update.blockHash,
BlockHeader: blockHeader,
}
if err := b.handleBlockConnected(newBlock); err != nil {
chainntnfs.Log.Error(err)
@ -654,26 +666,34 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
// satisfy any client requests based upon the new block.
b.bestBlock = epoch
b.notifyBlockEpochs(epoch.Height, epoch.Hash)
b.notifyBlockEpochs(
epoch.Height, epoch.Hash, epoch.BlockHeader,
)
return b.txNotifier.NotifyHeight(uint32(epoch.Height))
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32,
newSha *chainhash.Hash, blockHeader *wire.BlockHeader) {
for _, client := range b.blockEpochClients {
b.notifyBlockEpochClient(client, newHeight, newSha)
b.notifyBlockEpochClient(
client, newHeight, newSha, blockHeader,
)
}
}
// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (b *BtcdNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
height int32, sha *chainhash.Hash, blockHeader *wire.BlockHeader) {
epoch := &chainntnfs.BlockEpoch{
Height: height,
Hash: sha,
Height: height,
Hash: sha,
BlockHeader: blockHeader,
}
select {

View File

@ -302,6 +302,9 @@ type BlockEpoch struct {
// Height is the height of the latest block to be added to the tip of
// the main chain.
Height int32
// BlockHeader is the block header of this new height.
BlockHeader *wire.BlockHeader
}
// BlockEpochEvent encapsulates an on-going stream of block epoch
@ -489,8 +492,9 @@ func RewindChain(chainConn ChainConn, txNotifier *TxNotifier,
currBestBlock BlockEpoch, targetHeight int32) (BlockEpoch, error) {
newBestBlock := BlockEpoch{
Height: currBestBlock.Height,
Hash: currBestBlock.Hash,
Height: currBestBlock.Height,
Hash: currBestBlock.Hash,
BlockHeader: currBestBlock.BlockHeader,
}
for height := currBestBlock.Height; height > targetHeight; height-- {
@ -500,6 +504,11 @@ func RewindChain(chainConn ChainConn, txNotifier *TxNotifier,
"find blockhash for disconnected height=%d: %v",
height, err)
}
header, err := chainConn.GetBlockHeader(hash)
if err != nil {
return newBestBlock, fmt.Errorf("unable to get block "+
"header for height=%v", height-1)
}
Log.Infof("Block disconnected from main chain: "+
"height=%v, sha=%v", height, newBestBlock.Hash)
@ -512,7 +521,9 @@ func RewindChain(chainConn ChainConn, txNotifier *TxNotifier,
}
newBestBlock.Height = height - 1
newBestBlock.Hash = hash
newBestBlock.BlockHeader = header
}
return newBestBlock, nil
}
@ -536,8 +547,9 @@ func HandleMissedBlocks(chainConn ChainConn, txNotifier *TxNotifier,
// If a reorg causes our best hash to be incorrect, rewind the
// chain so our best block is set to the closest common
// ancestor, then dispatch notifications from there.
hashAtBestHeight, err :=
chainConn.GetBlockHash(int64(currBestBlock.Height))
hashAtBestHeight, err := chainConn.GetBlockHash(
int64(currBestBlock.Height),
)
if err != nil {
return currBestBlock, nil, fmt.Errorf("unable to find "+
"blockhash for height=%d: %v",
@ -552,8 +564,9 @@ func HandleMissedBlocks(chainConn ChainConn, txNotifier *TxNotifier,
"common ancestor: %v", err)
}
currBestBlock, err = RewindChain(chainConn, txNotifier,
currBestBlock, startingHeight)
currBestBlock, err = RewindChain(
chainConn, txNotifier, currBestBlock, startingHeight,
)
if err != nil {
return currBestBlock, nil, fmt.Errorf("unable to "+
"rewind chain: %v", err)
@ -589,8 +602,20 @@ func getMissedBlocks(chainConn ChainConn, startingHeight,
return nil, fmt.Errorf("unable to find blockhash for "+
"height=%d: %v", height, err)
}
missedBlocks = append(missedBlocks,
BlockEpoch{Hash: hash, Height: height})
header, err := chainConn.GetBlockHeader(hash)
if err != nil {
return nil, fmt.Errorf("unable to find block header "+
"for height=%d: %v", height, err)
}
missedBlocks = append(
missedBlocks,
BlockEpoch{
Hash: hash,
Height: height,
BlockHeader: header,
},
)
}
return missedBlocks, nil

View File

@ -175,8 +175,18 @@ func (n *NeutrinoNotifier) startNotifier() error {
n.chainUpdates.Stop()
return err
}
startingHeader, err := n.p2pNode.GetBlockHeader(
&startingPoint.Hash,
)
if err != nil {
n.txUpdates.Stop()
n.chainUpdates.Stop()
return err
}
n.bestBlock.Hash = &startingPoint.Hash
n.bestBlock.Height = startingPoint.Height
n.bestBlock.BlockHeader = startingHeader
n.txNotifier = chainntnfs.NewTxNotifier(
uint32(n.bestBlock.Height), chainntnfs.ReorgSafetyLimit,
@ -226,6 +236,7 @@ func (n *NeutrinoNotifier) startNotifier() error {
// includes a transaction that confirmed one of our watched txids, or spends
// one of the outputs currently being watched.
type filteredBlock struct {
header *wire.BlockHeader
hash chainhash.Hash
height uint32
txns []*btcutil.Tx
@ -255,6 +266,7 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
hash: header.BlockHash(),
height: uint32(height),
txns: txns,
header: header,
connect: true,
}:
case <-n.quit:
@ -374,6 +386,7 @@ out:
n.notifyBlockEpochClient(
msg, n.bestBlock.Height,
n.bestBlock.Hash,
n.bestBlock.BlockHeader,
)
msg.errorChan <- nil
@ -399,6 +412,7 @@ out:
for _, block := range missedBlocks {
n.notifyBlockEpochClient(
msg, block.Height, block.Hash,
block.BlockHeader,
)
}
@ -629,8 +643,11 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// satisfy any client requests based upon the new block.
n.bestBlock.Hash = &newBlock.hash
n.bestBlock.Height = int32(newBlock.height)
n.bestBlock.BlockHeader = newBlock.header
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
n.notifyBlockEpochs(
int32(newBlock.height), &newBlock.hash, newBlock.header,
)
return n.txNotifier.NotifyHeight(newBlock.height)
}
@ -646,6 +663,7 @@ func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filte
block := &filteredBlock{
hash: *epoch.Hash,
height: uint32(epoch.Height),
header: &rawBlock.MsgBlock().Header,
txns: txns,
connect: true,
}
@ -654,20 +672,23 @@ func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filte
// notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain.
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash,
blockHeader *wire.BlockHeader) {
for _, client := range n.blockEpochClients {
n.notifyBlockEpochClient(client, newHeight, newSha)
n.notifyBlockEpochClient(client, newHeight, newSha, blockHeader)
}
}
// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
height int32, sha *chainhash.Hash, blockHeader *wire.BlockHeader) {
epoch := &chainntnfs.BlockEpoch{
Height: height,
Hash: sha,
Height: height,
Hash: sha,
BlockHeader: blockHeader,
}
select {

View File

@ -393,7 +393,6 @@ func testBlockEpochNotification(miner *rpctest.Harness,
// We'd like to test the case of multiple registered clients receiving
// block epoch notifications.
const numBlocks = 10
const numNtfns = numBlocks + 1
const numClients = 5
@ -403,6 +402,7 @@ func testBlockEpochNotification(miner *rpctest.Harness,
// expect each client to receive 11 notifications, one for the current
// tip of the chain, and one for each of the ten blocks we generate
// below. So we'll use a WaitGroup to synchronize the test.
clientErrors := make(chan error, numClients)
for i := 0; i < numClients; i++ {
epochClient, err := notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
@ -412,7 +412,24 @@ func testBlockEpochNotification(miner *rpctest.Harness,
wg.Add(numNtfns)
go func() {
for i := 0; i < numNtfns; i++ {
<-epochClient.Epochs
// Ensure that each block epoch has a header,
// and that header matches the contained header
// hash.
blockEpoch := <-epochClient.Epochs
if blockEpoch.BlockHeader == nil {
fmt.Println(i)
clientErrors <- fmt.Errorf("block " +
"header is nil")
return
}
if blockEpoch.BlockHeader.BlockHash() !=
*blockEpoch.Hash {
clientErrors <- fmt.Errorf("block " +
"header hash mismatch")
return
}
wg.Done()
}
}()
@ -431,6 +448,8 @@ func testBlockEpochNotification(miner *rpctest.Harness,
}
select {
case err := <-clientErrors:
t.Fatalf("block epoch case failed: %v", err)
case <-epochsSent:
case <-time.After(30 * time.Second):
t.Fatalf("all notifications not sent")

View File

@ -7,6 +7,25 @@ is only used for onion address connections, and clearnet for everything else.
This new behavior can be added using the `tor.skip-proxy-for-clearnet-targets`
flag.
## LN Peer-to-Peer Netowrk
### Bitcoin Blockheaders in Ping Messages
[In this release, we implement a long discussed mechanism to use the Lightning
Network as a redundant block header
source](https://github.com/lightningnetwork/lnd/pull/5621). By sending our
latest block header with each ping message, we give peers another source
(outside of the Bitcoin P2P network) they can use to spot check their chain
state. Peers can also use this information to detect if they've been eclipsed
from the traditional Bitcoin P2P network itself.
As is, we only send this data in Ping messages (which are periodically sent),
in the future we could also move to send them as the partial payload for our
pong messages, and also randomize the payload size requested as well.
The `ListPeers` RPC call will now also include a hex encoded version of the
last ping message the peer has sent to us.
## Backend Enhancements & Optimizations
### Full remote database support

File diff suppressed because it is too large Load Diff

View File

@ -1561,6 +1561,11 @@ message Peer {
zero, we have not observed any flaps for this peer.
*/
int64 last_flap_ns = 14;
/*
The last ping payload the peer has sent to us.
*/
bytes last_ping_payload = 15;
}
message TimestampedError {

View File

@ -5179,6 +5179,11 @@
"type": "string",
"format": "int64",
"description": "The timestamp of the last flap we observed for this peer. If this value is\nzero, we have not observed any flaps for this peer."
},
"last_ping_payload": {
"type": "string",
"format": "byte",
"description": "The last ping payload the peer has sent to us."
}
}
},

View File

@ -347,6 +347,13 @@ type Brontide struct {
// our last ping message. To be used atomically.
pingLastSend int64
// lastPingPayload stores an unsafe pointer wrapped as an atomic
// variable which points to the last payload the remote party sent us
// as their ping.
//
// MUST be used atomically.
lastPingPayload atomic.Value
cfg Config
// activeSignal when closed signals that the peer is now active and
@ -1368,6 +1375,14 @@ out:
atomic.StoreInt64(&p.pingTime, delay)
case *lnwire.Ping:
// First, we'll store their latest ping payload within
// the relevant atomic variable.
p.lastPingPayload.Store(msg.PaddingBytes[:])
// Next, we'll send over the amount of specified pong
// bytes.
//
// TODO(roasbeef): read out from pong scratch instead?
pongBytes := make([]byte, msg.NumPongBytes)
p.queueMsg(lnwire.NewPong(pongBytes), nil)
@ -1630,12 +1645,10 @@ func messageSummary(msg lnwire.Message) string {
msg.NodeID, time.Unix(int64(msg.Timestamp), 0))
case *lnwire.Ping:
// No summary.
return ""
return fmt.Sprintf("ping_bytes=%x", msg.PaddingBytes[:])
case *lnwire.Pong:
// No summary.
return ""
return fmt.Sprintf("pong_bytes=%x", msg.PongBytes[:])
case *lnwire.UpdateFee:
return fmt.Sprintf("chan_id=%v, fee_update_sat=%v",
@ -1983,13 +1996,50 @@ func (p *Brontide) pingHandler() {
defer pingTicker.Stop()
// TODO(roasbeef): make dynamic in order to create fake cover traffic
const numPingBytes = 16
const numPongBytes = 16
blockEpochs, err := p.cfg.ChainNotifier.RegisterBlockEpochNtfn(nil)
if err != nil {
peerLog.Errorf("unable to establish block epoch "+
"subscription: %v", err)
}
var (
pingPayload [wire.MaxBlockHeaderPayload]byte
blockHeader *wire.BlockHeader
)
out:
for {
select {
// Each time a new block comes in, we'll copy the raw header
// contents over to our ping payload declared above. Over time,
// we'll use this to disseminate the latest block header
// between all our peers, which can later be used to
// cross-check our own view of the network to mitigate various
// types of eclipse attacks.
case epoch, ok := <-blockEpochs.Epochs:
if !ok {
peerLog.Debugf("block notifications " +
"canceled")
return
}
blockHeader = epoch.BlockHeader
headerBuf := bytes.NewBuffer(pingPayload[0:0])
err := blockHeader.Serialize(headerBuf)
if err != nil {
peerLog.Errorf("unable to encode header: %v",
err)
}
case <-pingTicker.C:
p.queueMsg(lnwire.NewPing(numPingBytes), nil)
pingMsg := &lnwire.Ping{
NumPongBytes: numPongBytes,
PaddingBytes: pingPayload[:],
}
p.queueMsg(pingMsg, nil)
case <-p.quit:
break out
}
@ -3126,3 +3176,19 @@ func (p *Brontide) BytesReceived() uint64 {
func (p *Brontide) BytesSent() uint64 {
return atomic.LoadUint64(&p.bytesSent)
}
// LastRemotePingPayload returns the last payload the remote party sent as part
// of their ping.
func (p *Brontide) LastRemotePingPayload() []byte {
pingPayload := p.lastPingPayload.Load()
if pingPayload == nil {
return []byte{}
}
pingBytes, ok := pingPayload.(lnwire.PingPayload)
if !ok {
return nil
}
return pingBytes
}

View File

@ -2682,16 +2682,17 @@ func (r *rpcServer) ListPeers(ctx context.Context,
)
rpcPeer := &lnrpc.Peer{
PubKey: hex.EncodeToString(nodePub[:]),
Address: serverPeer.Conn().RemoteAddr().String(),
Inbound: serverPeer.Inbound(),
BytesRecv: serverPeer.BytesReceived(),
BytesSent: serverPeer.BytesSent(),
SatSent: satSent,
SatRecv: satRecv,
PingTime: serverPeer.PingTime(),
SyncType: lnrpcSyncType,
Features: features,
PubKey: hex.EncodeToString(nodePub[:]),
Address: serverPeer.Conn().RemoteAddr().String(),
Inbound: serverPeer.Inbound(),
BytesRecv: serverPeer.BytesReceived(),
BytesSent: serverPeer.BytesSent(),
SatSent: satSent,
SatRecv: satRecv,
PingTime: serverPeer.PingTime(),
SyncType: lnrpcSyncType,
Features: features,
LastPingPayload: serverPeer.LastRemotePingPayload(),
}
var peerErrors []interface{}