peer: Add new function to create test peer.

Signed-off-by: Ononiwu Maureen <maureen.ononiwu@outlook.com>
This commit is contained in:
Ononiwu Maureen 2024-03-15 12:21:49 +01:00 committed by Ononiwu Maureen
parent 8f76c5eeef
commit 2ec0fe0717
No known key found for this signature in database
GPG Key ID: 82470E85193AD2DA
2 changed files with 278 additions and 286 deletions

View File

@ -6,22 +6,18 @@ import (
"testing"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntest/mock"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool"
"github.com/stretchr/testify/require"
)
@ -42,8 +38,10 @@ func TestPeerChannelClosureShutdownResponseLinkRemoved(t *testing.T) {
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
alicePeer := harness.peer
bobChan := harness.channel
var (
alicePeer = harness.peer
bobChan = harness.channel
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -82,11 +80,13 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
alicePeer := harness.peer
bobChan := harness.channel
mockSwitch := harness.mockSwitch
broadcastTxChan := harness.publishTx
notifier := harness.notifier
var (
alicePeer = harness.peer
bobChan = harness.channel
mockSwitch = harness.mockSwitch
broadcastTxChan = harness.publishTx
notifier = harness.notifier
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -182,11 +182,13 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
bobChan := harness.channel
alicePeer := harness.peer
mockSwitch := harness.mockSwitch
broadcastTxChan := harness.publishTx
notifier := harness.notifier
var (
bobChan = harness.channel
alicePeer = harness.peer
mockSwitch = harness.mockSwitch
broadcastTxChan = harness.publishTx
notifier = harness.notifier
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -301,11 +303,13 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
bobChan := harness.channel
alicePeer := harness.peer
mockSwitch := harness.mockSwitch
broadcastTxChan := harness.publishTx
notifier := harness.notifier
var (
bobChan = harness.channel
alicePeer = harness.peer
mockSwitch = harness.mockSwitch
broadcastTxChan = harness.publishTx
notifier = harness.notifier
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -483,11 +487,13 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
alicePeer := harness.peer
bobChan := harness.channel
mockSwitch := harness.mockSwitch
broadcastTxChan := harness.publishTx
notifier := harness.notifier
var (
alicePeer = harness.peer
bobChan = harness.channel
mockSwitch = harness.mockSwitch
broadcastTxChan = harness.publishTx
notifier = harness.notifier
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -810,9 +816,11 @@ func TestCustomShutdownScript(t *testing.T) {
t.Fatalf("unable to create test channels: %v", err)
}
alicePeer := harness.peer
bobChan := harness.channel
mockSwitch := harness.mockSwitch
var (
alicePeer = harness.peer
bobChan = harness.channel
mockSwitch = harness.mockSwitch
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -966,35 +974,23 @@ func TestStaticRemoteDowngrade(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
params := createTestPeer(t)
var (
p = params.peer
mockConn = params.mockConn
writePool = p.cfg.WritePool
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
mockConn := newMockConn(t, 1)
p := Brontide{
cfg: Config{
LegacyFeatures: legacy,
Features: test.features,
Conn: mockConn,
WritePool: writePool,
PongBuf: make([]byte, lnwire.MaxPongBytes),
},
log: peerLog,
}
// Set feature bits.
p.cfg.LegacyFeatures = legacy
p.cfg.Features = test.features
var b bytes.Buffer
_, err := lnwire.WriteMessage(&b, test.expectedInit, 0)
require.NoError(t, err)
// Send our init message, assert that we write our expected message
// and shutdown our write pool.
// Send our init message, assert that we write our
// expected message and shutdown our write pool.
require.NoError(t, p.sendInitMsg(test.legacy))
mockConn.assertWrite(b.Bytes())
require.NoError(t, writePool.Stop())
@ -1022,78 +1018,15 @@ func genScript(t *testing.T, address string) lnwire.DeliveryAddress {
func TestPeerCustomMessage(t *testing.T) {
t.Parallel()
// Set up node Alice.
dbAlice, err := channeldb.Open(t.TempDir())
require.NoError(t, err)
params := createTestPeer(t)
aliceKey, err := btcec.NewPrivateKey()
require.NoError(t, err)
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
var (
mockConn = params.mockConn
alicePeer = params.peer
receivedCustomChan = params.customChan
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, 1, timeout,
)
require.NoError(t, readPool.Start())
mockConn := newMockConn(t, 1)
receivedCustomChan := make(chan *customMsg)
remoteKey := [33]byte{8}
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
// TODO(yy): change ChannelNotifier to be an interface.
channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
require.NoError(t, channelNotifier.Start())
t.Cleanup(func() {
require.NoError(t, channelNotifier.Stop(),
"stop channel notifier failed")
})
alicePeer := NewBrontide(Config{
PubKeyBytes: remoteKey,
ChannelDB: dbAlice.ChannelStateDB(),
Addr: &lnwire.NetAddress{
IdentityKey: aliceKey.PubKey(),
},
PrunePersistentPeerConnection: func([33]byte) {},
Features: lnwire.EmptyFeatureVector(),
LegacyFeatures: lnwire.EmptyFeatureVector(),
WritePool: writePool,
ReadPool: readPool,
Conn: mockConn,
ChainNotifier: notifier,
HandleCustomMessage: func(
peer [33]byte, msg *lnwire.Custom) error {
receivedCustomChan <- &customMsg{
peer: peer,
msg: *msg,
}
return nil
},
PongBuf: make([]byte, lnwire.MaxPongBytes),
ChannelNotifier: channelNotifier,
})
remoteKey := alicePeer.PubKey()
// Set up the init sequence.
go func() {
@ -1108,7 +1041,7 @@ func TestPeerCustomMessage(t *testing.T) {
lnwire.NewRawFeatureVector(),
)
var b bytes.Buffer
_, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
_, err := lnwire.WriteMessage(&b, initReplyMsg, 0)
require.NoError(t, err)
mockConn.readMessages <- b.Bytes()
@ -1394,28 +1327,6 @@ func TestStartupWriteMessageRace(t *testing.T) {
mockConn := newMockConn(t, 2)
peer.cfg.Conn = mockConn
// Set up other configuration necessary to successfully execute
// peer.Start().
peer.cfg.LegacyFeatures = lnwire.EmptyFeatureVector()
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
peer.cfg.WritePool = writePool
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, 1, timeout,
)
require.NoError(t, readPool.Start())
peer.cfg.ReadPool = readPool
// Send a message while starting the peer. As the peer starts up, it
// should not trigger a data race between the sending of this message
// and the sending of the channel reestablish message.

View File

@ -27,6 +27,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/shachain"
"github.com/stretchr/testify/require"
@ -54,11 +55,16 @@ var (
var noUpdate = func(a, b *channeldb.OpenChannel) {}
type peerTestCtx struct {
peer *Brontide
channel *lnwallet.LightningChannel
notifier *mock.ChainNotifier
publishTx <-chan *wire.MsgTx
mockSwitch *mockMessageSwitch
peer *Brontide
channel *lnwallet.LightningChannel
notifier *mock.ChainNotifier
publishTx <-chan *wire.MsgTx
mockSwitch *mockMessageSwitch
db *channeldb.DB
privKey *btcec.PrivateKey
mockConn *mockMessageConn
customChan chan *customMsg
chanStatusMgr *netann.ChanStatusManager
}
// createTestPeerWithChannel creates a channel between two nodes, and returns a
@ -68,30 +74,26 @@ type peerTestCtx struct {
func createTestPeerWithChannel(t *testing.T, updateChan func(a,
b *channeldb.OpenChannel)) (*peerTestCtx, error) {
// TODO(yy): create interface for lnwallet.LightningChannel so we can
// easily mock it without the following setups.
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
params := createTestPeer(t)
var mockSwitch *mockMessageSwitch
var (
publishTx = params.publishTx
mockSwitch = params.mockSwitch
alicePeer = params.peer
notifier = params.notifier
aliceKeyPriv = params.privKey
dbAlice = params.db
chanStatusMgr = params.chanStatusMgr
)
publishTx := make(chan *wire.MsgTx)
err := chanStatusMgr.Start()
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, chanStatusMgr.Stop())
})
nodeKeyLocator := keychain.KeyLocator{
Family: keychain.KeyFamilyNodeKey,
}
aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
channels.AlicesPrivKey,
)
aliceKeySigner := keychain.NewPrivKeyMessageSigner(
aliceKeyPriv, nodeKeyLocator,
)
bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
channels.BobsPrivKey,
)
aliceKeyPub := alicePeer.IdentityKey()
estimator := alicePeer.cfg.FeeEstimator
channelCapacity := btcutil.Amount(10 * 1e8)
channelBal := channelCapacity / 2
@ -107,6 +109,10 @@ func createTestPeerWithChannel(t *testing.T, updateChan func(a,
}
fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
channels.BobsPrivKey,
)
aliceCfg := channeldb.ChannelConfig{
ChannelConstraints: channeldb.ChannelConstraints{
DustLimit: aliceDustLimit,
@ -189,14 +195,6 @@ func createTestPeerWithChannel(t *testing.T, updateChan func(a,
return nil, err
}
dbAlice, err := channeldb.Open(t.TempDir())
if err != nil {
return nil, err
}
t.Cleanup(func() {
require.NoError(t, dbAlice.Close())
})
dbBob, err := channeldb.Open(t.TempDir())
if err != nil {
return nil, err
@ -205,7 +203,6 @@ func createTestPeerWithChannel(t *testing.T, updateChan func(a,
require.NoError(t, dbBob.Close())
})
estimator := chainfee.NewStaticEstimator(12500, 0)
feePerKw, err := estimator.EstimateFeePerKW(1)
if err != nil {
return nil, err
@ -278,11 +275,7 @@ func createTestPeerWithChannel(t *testing.T, updateChan func(a,
// Set custom values on the channel states.
updateChan(aliceChannelState, bobChannelState)
aliceAddr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
aliceAddr := alicePeer.cfg.Addr.Address
if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil {
return nil, err
}
@ -327,109 +320,9 @@ func createTestPeerWithChannel(t *testing.T, updateChan func(a,
require.NoError(t, bobPool.Stop())
})
chainIO := &mock.ChainIO{
BestHeight: broadcastHeight,
}
wallet := &lnwallet.LightningWallet{
WalletController: &mock.WalletController{
RootKey: aliceKeyPriv,
PublishedTransactions: publishTx,
},
}
// If mockSwitch is not set by the caller, set it to the default as the
// caller does not need to control it.
if mockSwitch == nil {
mockSwitch = &mockMessageSwitch{}
}
nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
const chanActiveTimeout = time.Minute
chanStatusMgr, err := netann.NewChanStatusManager(&netann.ChanStatusConfig{
ChanStatusSampleInterval: 30 * time.Second,
ChanEnableTimeout: chanActiveTimeout,
ChanDisableTimeout: 2 * time.Minute,
DB: dbAlice.ChannelStateDB(),
Graph: dbAlice.ChannelGraph(),
MessageSigner: nodeSignerAlice,
OurPubKey: aliceKeyPub,
OurKeyLoc: testKeyLoc,
IsChannelActive: func(lnwire.ChannelID) bool { return true },
ApplyChannelUpdate: func(*lnwire.ChannelUpdate,
*wire.OutPoint, bool) error {
return nil
},
})
if err != nil {
return nil, err
}
if err = chanStatusMgr.Start(); err != nil {
return nil, err
}
errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
if err != nil {
return nil, err
}
var pubKey [33]byte
copy(pubKey[:], aliceKeyPub.SerializeCompressed())
cfgAddr := &lnwire.NetAddress{
IdentityKey: aliceKeyPub,
Address: aliceAddr,
ChainNet: wire.SimNet,
}
interceptableSwitchNotifier := &mock.ChainNotifier{
EpochChan: make(chan *chainntnfs.BlockEpoch, 1),
}
interceptableSwitchNotifier.EpochChan <- &chainntnfs.BlockEpoch{
Height: 1,
}
interceptableSwitch, err := htlcswitch.NewInterceptableSwitch(
&htlcswitch.InterceptableSwitchConfig{
CltvRejectDelta: testCltvRejectDelta,
CltvInterceptDelta: testCltvRejectDelta + 3,
Notifier: interceptableSwitchNotifier,
},
alicePeer.remoteFeatures = lnwire.NewFeatureVector(
nil, lnwire.Features,
)
if err != nil {
return nil, err
}
// TODO(yy): change ChannelNotifier to be an interface.
channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
require.NoError(t, channelNotifier.Start())
t.Cleanup(func() {
require.NoError(t, channelNotifier.Stop(),
"stop channel notifier failed")
})
cfg := &Config{
Addr: cfgAddr,
PubKeyBytes: pubKey,
ErrorBuffer: errBuffer,
ChainIO: chainIO,
Switch: mockSwitch,
ChanActiveTimeout: chanActiveTimeout,
InterceptSwitch: interceptableSwitch,
ChannelDB: dbAlice.ChannelStateDB(),
FeeEstimator: estimator,
Wallet: wallet,
ChainNotifier: notifier,
ChanStatusMgr: chanStatusMgr,
Features: lnwire.NewFeatureVector(nil, lnwire.Features),
DisconnectPeer: func(b *btcec.PublicKey) error { return nil },
ChannelNotifier: channelNotifier,
}
alicePeer := NewBrontide(*cfg)
alicePeer.remoteFeatures = lnwire.NewFeatureVector(nil, lnwire.Features)
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
alicePeer.activeChannels.Store(chanID, channelAlice)
@ -643,3 +536,191 @@ func (m *mockMessageConn) LocalAddr() net.Addr {
func (m *mockMessageConn) Close() error {
return nil
}
// createTestPeer creates a new peer for testing and returns a context struct
// containing necessary handles and mock objects for conducting tests on peer
// functionalities.
func createTestPeer(t *testing.T) *peerTestCtx {
nodeKeyLocator := keychain.KeyLocator{
Family: keychain.KeyFamilyNodeKey,
}
aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
channels.AlicesPrivKey,
)
aliceKeySigner := keychain.NewPrivKeyMessageSigner(
aliceKeyPriv, nodeKeyLocator,
)
aliceAddr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
cfgAddr := &lnwire.NetAddress{
IdentityKey: aliceKeyPub,
Address: aliceAddr,
ChainNet: wire.SimNet,
}
errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
require.NoError(t, err)
chainIO := &mock.ChainIO{
BestHeight: broadcastHeight,
}
publishTx := make(chan *wire.MsgTx)
wallet := &lnwallet.LightningWallet{
WalletController: &mock.WalletController{
RootKey: aliceKeyPriv,
PublishedTransactions: publishTx,
},
}
const chanActiveTimeout = time.Minute
dbAlice, err := channeldb.Open(t.TempDir())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, dbAlice.Close())
})
nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
chanStatusMgr, err := netann.NewChanStatusManager(&netann.
ChanStatusConfig{
ChanStatusSampleInterval: 30 * time.Second,
ChanEnableTimeout: chanActiveTimeout,
ChanDisableTimeout: 2 * time.Minute,
DB: dbAlice.ChannelStateDB(),
Graph: dbAlice.ChannelGraph(),
MessageSigner: nodeSignerAlice,
OurPubKey: aliceKeyPub,
OurKeyLoc: testKeyLoc,
IsChannelActive: func(lnwire.ChannelID) bool {
return true
},
ApplyChannelUpdate: func(*lnwire.ChannelUpdate,
*wire.OutPoint, bool) error {
return nil
},
})
require.NoError(t, err)
interceptableSwitchNotifier := &mock.ChainNotifier{
EpochChan: make(chan *chainntnfs.BlockEpoch, 1),
}
interceptableSwitchNotifier.EpochChan <- &chainntnfs.BlockEpoch{
Height: 1,
}
interceptableSwitch, err := htlcswitch.NewInterceptableSwitch(
&htlcswitch.InterceptableSwitchConfig{
CltvRejectDelta: testCltvRejectDelta,
CltvInterceptDelta: testCltvRejectDelta + 3,
Notifier: interceptableSwitchNotifier,
},
)
require.NoError(t, err)
// TODO(yy): create interface for lnwallet.LightningChannel so we can
// easily mock it without the following setups.
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
mockSwitch := &mockMessageSwitch{}
// TODO(yy): change ChannelNotifier to be an interface.
channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
require.NoError(t, channelNotifier.Start())
t.Cleanup(func() {
require.NoError(t, channelNotifier.Stop(),
"stop channel notifier failed")
})
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, 1, timeout,
)
require.NoError(t, readPool.Start())
mockConn := newMockConn(t, 1)
receivedCustomChan := make(chan *customMsg)
var pubKey [33]byte
copy(pubKey[:], aliceKeyPub.SerializeCompressed())
estimator := chainfee.NewStaticEstimator(12500, 0)
cfg := &Config{
Addr: cfgAddr,
PubKeyBytes: pubKey,
ErrorBuffer: errBuffer,
ChainIO: chainIO,
Switch: mockSwitch,
ChanActiveTimeout: chanActiveTimeout,
InterceptSwitch: interceptableSwitch,
ChannelDB: dbAlice.ChannelStateDB(),
FeeEstimator: estimator,
Wallet: wallet,
ChainNotifier: notifier,
ChanStatusMgr: chanStatusMgr,
Features: lnwire.NewFeatureVector(
nil, lnwire.Features,
),
DisconnectPeer: func(b *btcec.PublicKey) error {
return nil
},
ChannelNotifier: channelNotifier,
PrunePersistentPeerConnection: func([33]byte) {},
LegacyFeatures: lnwire.EmptyFeatureVector(),
WritePool: writePool,
ReadPool: readPool,
Conn: mockConn,
HandleCustomMessage: func(
peer [33]byte, msg *lnwire.Custom) error {
receivedCustomChan <- &customMsg{
peer: peer,
msg: *msg,
}
return nil
},
PongBuf: make([]byte, lnwire.MaxPongBytes),
}
alicePeer := NewBrontide(*cfg)
return &peerTestCtx{
publishTx: publishTx,
mockSwitch: mockSwitch,
peer: alicePeer,
notifier: notifier,
db: dbAlice,
privKey: aliceKeyPriv,
mockConn: mockConn,
customChan: receivedCustomChan,
chanStatusMgr: chanStatusMgr,
}
}