peer: implement the ping/pong workflow

This commit refactors the peer struct slightly in order to implement
the new ping/pong workflow added in a prior commit. Pings are currently
sent every 30 seconds unconditionally.
This commit is contained in:
Olaoluwa Osuntokun 2016-11-10 17:15:25 -08:00
parent 3f39f5413e
commit 297133316f
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

72
peer.go
View File

@ -3,6 +3,8 @@ package main
import ( import (
"bytes" "bytes"
"container/list" "container/list"
"crypto/rand"
"encoding/binary"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"net" "net"
@ -10,6 +12,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/BitfuryLightning/tools/rt"
"github.com/BitfuryLightning/tools/rt/graph" "github.com/BitfuryLightning/tools/rt/graph"
"github.com/btcsuite/fastsha256" "github.com/btcsuite/fastsha256"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
@ -53,9 +56,10 @@ type chanSnapshotReq struct {
} }
// peer is an active peer on the Lightning Network. This struct is responsible // peer is an active peer on the Lightning Network. This struct is responsible
// for managing any channel state related to this peer. To do so, it has several // for managing any channel state related to this peer. To do so, it has
// helper goroutines to handle events such as HTLC timeouts, new funding // several helper goroutines to handle events such as HTLC timeouts, new
// workflow, and detecting an uncooperative closure of any active channels. // funding workflow, and detecting an uncooperative closure of any active
// channels.
// TODO(roasbeef): proper reconnection logic // TODO(roasbeef): proper reconnection logic
type peer struct { type peer struct {
// MUST be used atomically. // MUST be used atomically.
@ -118,8 +122,8 @@ type peer struct {
// channels to the source peer which handled the funding workflow. // channels to the source peer which handled the funding workflow.
newChannels chan *lnwallet.LightningChannel newChannels chan *lnwallet.LightningChannel
// localCloseChanReqs is a channel in which any local requests to // localCloseChanReqs is a channel in which any local requests to close
// close a particular channel are sent over. // a particular channel are sent over.
localCloseChanReqs chan *closeLinkReq localCloseChanReqs chan *closeLinkReq
// remoteCloseChanReqs is a channel in which any remote requests // remoteCloseChanReqs is a channel in which any remote requests
@ -131,7 +135,7 @@ type peer struct {
// next pending channel. Pending channels are tracked by this id // next pending channel. Pending channels are tracked by this id
// throughout their lifetime until they become active channels, or are // throughout their lifetime until they become active channels, or are
// cancelled. Channels id's initiated by an outbound node start from 0, // cancelled. Channels id's initiated by an outbound node start from 0,
// while channels inititaed by an inbound node start from 2^63. In // while channels initiated by an inbound node start from 2^63. In
// either case, this value is always monotonically increasing. // either case, this value is always monotonically increasing.
nextPendingChannelID uint64 nextPendingChannelID uint64
pendingChannelMtx sync.RWMutex pendingChannelMtx sync.RWMutex
@ -241,7 +245,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
} }
// Start starts all helper goroutines the peer needs for normal operations. // Start starts all helper goroutines the peer needs for normal operations.
// In the case this peer has already beeen started, then this function is a // In the case this peer has already been started, then this function is a
// noop. // noop.
func (p *peer) Start() error { func (p *peer) Start() error {
if atomic.AddInt32(&p.started, 1) != 1 { if atomic.AddInt32(&p.started, 1) != 1 {
@ -250,11 +254,12 @@ func (p *peer) Start() error {
peerLog.Tracef("peer %v starting", p) peerLog.Tracef("peer %v starting", p)
p.wg.Add(4) p.wg.Add(5)
go p.readHandler() go p.readHandler()
go p.queueHandler() go p.queueHandler()
go p.writeHandler() go p.writeHandler()
go p.channelManager() go p.channelManager()
go p.pingHandler()
return nil return nil
} }
@ -347,6 +352,9 @@ out:
var targetChan *wire.OutPoint var targetChan *wire.OutPoint
switch msg := nextMsg.(type) { switch msg := nextMsg.(type) {
case *lnwire.Ping:
p.queueMsg(lnwire.NewPong(msg.Nonce), nil)
// TODO(roasbeef): consolidate into predicate (single vs dual) // TODO(roasbeef): consolidate into predicate (single vs dual)
case *lnwire.SingleFundingRequest: case *lnwire.SingleFundingRequest:
p.server.fundingMgr.processFundingRequest(msg, p) p.server.fundingMgr.processFundingRequest(msg, p)
@ -457,10 +465,6 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
// //
// NOTE: This method MUST be run as a goroutine. // NOTE: This method MUST be run as a goroutine.
func (p *peer) writeHandler() { func (p *peer) writeHandler() {
// pingTicker is used to periodically send pings to the remote peer.
pingTicker := time.NewTicker(pingInterval)
defer pingTicker.Stop()
out: out:
for { for {
select { select {
@ -477,8 +481,6 @@ out:
// Synchronize with the writeHandler. // Synchronize with the writeHandler.
p.sendQueueSync <- struct{}{} p.sendQueueSync <- struct{}{}
case <-pingTicker.C:
// TODO(roasbeef): move ping to time.AfterFunc
case <-p.quit: case <-p.quit:
break out break out
} }
@ -546,14 +548,50 @@ out:
p.wg.Done() p.wg.Done()
} }
// pingHandler is responsible for periodically sending ping messages to the
// remote peer in order to keep the connection alive and/or determine if the
// connection is still active.
//
// NOTE: This method MUST be run as a goroutine.
func (p *peer) pingHandler() {
pingTicker := time.NewTicker(pingInterval)
defer pingTicker.Stop()
var pingBuf [8]byte
out:
for {
select {
case <-pingTicker.C:
// Fill the ping buffer with fresh randomness. If we're
// unable to read enough bytes, then we simply defer
// sending the ping to the next interval.
if _, err := rand.Read(pingBuf[:]); err != nil {
peerLog.Errorf("unable to send ping to %v: %v", p,
err)
continue
}
// Convert the bytes read into a uint64, and queue the
// message for sending.
nonce := binary.BigEndian.Uint64(pingBuf[:])
p.queueMsg(lnwire.NewPing(nonce), nil)
case <-p.quit:
break out
}
}
p.wg.Done()
}
// queueMsg queues a new lnwire.Message to be eventually sent out on the // queueMsg queues a new lnwire.Message to be eventually sent out on the
// wire. // wire.
func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) { func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) {
p.outgoingQueue <- outgoinMsg{msg, doneChan} p.outgoingQueue <- outgoinMsg{msg, doneChan}
} }
// ChannelSnapshots returns a slice of channel snapshots detaling all currently // ChannelSnapshots returns a slice of channel snapshots detailing all
// active channels maintained with the remote peer. // currently active channels maintained with the remote peer.
func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot { func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot {
resp := make(chan []*channeldb.ChannelSnapshot, 1) resp := make(chan []*channeldb.ChannelSnapshot, 1)
p.chanSnapshotReqs <- &chanSnapshotReq{resp} p.chanSnapshotReqs <- &chanSnapshotReq{resp}
@ -1003,7 +1041,7 @@ out:
// Otherwise, attempt to extend the remote commitment // Otherwise, attempt to extend the remote commitment
// chain including all the currently pending entries. // chain including all the currently pending entries.
// If the send was unsuccesful, then abaondon the // If the send was unsuccessful, then abandon the
// update, waiting for the revocation window to open // update, waiting for the revocation window to open
// up. // up.
if sent, err := p.updateCommitTx(state); err != nil { if sent, err := p.updateCommitTx(state); err != nil {