Merge pull request #4347 from Crypt-iQ/peer_pkg_0518

peer: Brontide Peer implementation
This commit is contained in:
Olaoluwa Osuntokun 2020-07-06 17:24:15 -07:00 committed by GitHub
commit 854a12e4c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1754 additions and 1127 deletions

View File

@ -166,6 +166,11 @@ func (m *mockChanEvent) NotifyPendingOpenChannelEvent(outpoint wire.OutPoint,
}
}
type newChannelMsg struct {
channel *channeldb.OpenChannel
err chan error
}
type testNode struct {
privKey *btcec.PrivateKey
addr *lnwire.NetAddress

View File

@ -9,8 +9,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
)
// Peer is an interface which represents the remote lightning node inside our
// system.
// Peer is an interface which represents a remote lightning node.
type Peer interface {
// SendMessage sends a variadic number of high-priority message to
// remote peer. The first argument denotes if the method should block

View File

@ -89,7 +89,7 @@ func (c *CommitSig) MaxPayloadLength(uint32) uint32 {
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
// NOTE: Part of peer.LinkUpdater interface.
func (c *CommitSig) TargetChanID() ChannelID {
return c.ChanID
}

View File

@ -85,7 +85,7 @@ func (c *RevokeAndAck) MaxPayloadLength(uint32) uint32 {
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
// NOTE: Part of peer.LinkUpdater interface.
func (c *RevokeAndAck) TargetChanID() ChannelID {
return c.ChanID
}

View File

@ -113,7 +113,7 @@ func (c *UpdateAddHTLC) MaxPayloadLength(uint32) uint32 {
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
// NOTE: Part of peer.LinkUpdater interface.
func (c *UpdateAddHTLC) TargetChanID() ChannelID {
return c.ChanID
}

View File

@ -89,7 +89,7 @@ func (c *UpdateFailHTLC) MaxPayloadLength(uint32) uint32 {
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
// NOTE: Part of peer.LinkUpdater interface.
func (c *UpdateFailHTLC) TargetChanID() ChannelID {
return c.ChanID
}

View File

@ -77,7 +77,7 @@ func (c *UpdateFailMalformedHTLC) MaxPayloadLength(uint32) uint32 {
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
// NOTE: Part of peer.LinkUpdater interface.
func (c *UpdateFailMalformedHTLC) TargetChanID() ChannelID {
return c.ChanID
}

View File

@ -72,7 +72,7 @@ func (c *UpdateFee) MaxPayloadLength(uint32) uint32 {
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
// NOTE: Part of peer.LinkUpdater interface.
func (c *UpdateFee) TargetChanID() ChannelID {
return c.ChanID
}

View File

@ -82,7 +82,7 @@ func (c *UpdateFulfillHTLC) MaxPayloadLength(uint32) uint32 {
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
// NOTE: Part of peer.LinkUpdater interface.
func (c *UpdateFulfillHTLC) TargetChanID() ChannelID {
return c.ChanID
}

3
log.go
View File

@ -30,6 +30,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
"github.com/lightningnetwork/lnd/monitoring"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/peer"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/localchans"
@ -75,7 +76,6 @@ var (
// function should always be called as soon as possible to finish
// setting them up properly with a root logger.
ltndLog = addLndPkgLogger("LTND")
peerLog = addLndPkgLogger("PEER")
rpcsLog = addLndPkgLogger("RPCS")
srvrLog = addLndPkgLogger("SRVR")
fndgLog = addLndPkgLogger("FNDG")
@ -122,6 +122,7 @@ func SetupLoggers(root *build.RotatingLogWriter) {
AddSubLogger(root, "WTCL", wtclient.UseLogger)
AddSubLogger(root, "PRNF", peernotifier.UseLogger)
AddSubLogger(root, "CHFD", chanfunding.UseLogger)
AddSubLogger(root, "PEER", peer.UseLogger)
AddSubLogger(root, "CHCL", chancloser.UseLogger)
AddSubLogger(root, routing.Subsystem, routing.UseLogger, localchans.UseLogger)

File diff suppressed because it is too large Load Diff

View File

@ -1,19 +1,17 @@
// +build !rpctest
package lnd
package peer
import (
"bytes"
"testing"
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
)
@ -35,12 +33,12 @@ var (
func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
t.Parallel()
notifier := &mockNotfier{
notifier := &mockNotifier{
confChannel: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
responder, responderChan, initiatorChan, cleanUp, err := createTestPeer(
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, noUpdate,
)
if err != nil {
@ -48,19 +46,19 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
}
defer cleanUp()
chanID := lnwire.NewChanIDFromOutPoint(responderChan.ChannelPoint())
chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint())
// We send a shutdown request to Alice. She will now be the responding
// node in this shutdown procedure. We first expect Alice to answer
// this shutdown request with a Shutdown message.
responder.chanCloseMsgs <- &closeMsg{
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: lnwire.NewShutdown(chanID, dummyDeliveryScript),
}
var msg lnwire.Message
select {
case outMsg := <-responder.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive shutdown message")
@ -73,49 +71,61 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
respDeliveryScript := shutdownMsg.Address
// Alice will thereafter send a ClosingSigned message, indicating her
// proposed closing transaction fee.
// Alice will then send a ClosingSigned message, indicating her proposed
// closing transaction fee. Alice sends the ClosingSigned message as she is
// the initiator of the channel.
select {
case outMsg := <-responder.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive ClosingSigned message")
}
responderClosingSigned, ok := msg.(*lnwire.ClosingSigned)
respClosingSigned, ok := msg.(*lnwire.ClosingSigned)
if !ok {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
// We accept the fee, and send a ClosingSigned with the same fee back,
// so she knows we agreed.
peerFee := responderClosingSigned.FeeSatoshis
initiatorSig, _, _, err := initiatorChan.CreateCloseProposal(
peerFee, dummyDeliveryScript, respDeliveryScript,
aliceFee := respClosingSigned.FeeSatoshis
bobSig, _, _, err := bobChan.CreateCloseProposal(
aliceFee, dummyDeliveryScript, respDeliveryScript,
)
if err != nil {
t.Fatalf("error creating close proposal: %v", err)
}
parsedSig, err := lnwire.NewSigFromSignature(initiatorSig)
parsedSig, err := lnwire.NewSigFromSignature(bobSig)
if err != nil {
t.Fatalf("error parsing signature: %v", err)
}
closingSigned := lnwire.NewClosingSigned(chanID, peerFee, parsedSig)
responder.chanCloseMsgs <- &closeMsg{
closingSigned := lnwire.NewClosingSigned(chanID, aliceFee, parsedSig)
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: closingSigned,
}
// The responder will now see that we agreed on the fee, and broadcast
// the closing transaction.
// Alice should now see that we agreed on the fee, and should broadcast the
// closing transaction.
select {
case <-broadcastTxChan:
case <-time.After(timeout):
t.Fatalf("closing tx not broadcast")
}
// And the initiator should be waiting for a confirmation notification.
// Need to pull the remaining message off of Alice's outgoing queue.
select {
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive ClosingSigned message")
}
if _, ok := msg.(*lnwire.ClosingSigned); !ok {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
// Alice should be waiting in a goroutine for a confirmation.
notifier.confChannel <- &chainntnfs.TxConfirmation{}
}
@ -124,12 +134,12 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
t.Parallel()
notifier := &mockNotfier{
notifier := &mockNotifier{
confChannel: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
initiator, initiatorChan, responderChan, cleanUp, err := createTestPeer(
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, noUpdate,
)
if err != nil {
@ -137,22 +147,22 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
}
defer cleanUp()
// We make the initiator send a shutdown request.
// We make Alice send a shutdown request.
updateChan := make(chan interface{}, 1)
errChan := make(chan error, 1)
closeCommand := &htlcswitch.ChanClose{
CloseType: htlcswitch.CloseRegular,
ChanPoint: initiatorChan.ChannelPoint(),
ChanPoint: bobChan.ChannelPoint(),
Updates: updateChan,
TargetFeePerKw: 12500,
Err: errChan,
}
initiator.localCloseChanReqs <- closeCommand
alicePeer.localCloseChanReqs <- closeCommand
// We should now be getting the shutdown request.
// We can now pull a Shutdown message off of Alice's outgoingQueue.
var msg lnwire.Message
select {
case outMsg := <-initiator.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive shutdown request")
@ -163,68 +173,78 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
t.Fatalf("expected Shutdown message, got %T", msg)
}
initiatorDeliveryScript := shutdownMsg.Address
aliceDeliveryScript := shutdownMsg.Address
// We'll answer the shutdown message with our own Shutdown, and then a
// ClosingSigned message.
// Bob will respond with his own Shutdown message.
chanID := shutdownMsg.ChannelID
initiator.chanCloseMsgs <- &closeMsg{
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: lnwire.NewShutdown(chanID,
dummyDeliveryScript),
}
estimator := chainfee.NewStaticEstimator(12500, 0)
feePerKw, err := estimator.EstimateFeePerKW(1)
if err != nil {
t.Fatalf("unable to query fee estimator: %v", err)
// Alice will reply with a ClosingSigned here.
select {
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed message")
}
fee := responderChan.CalcFee(feePerKw)
closeSig, _, _, err := responderChan.CreateCloseProposal(fee,
dummyDeliveryScript, initiatorDeliveryScript)
closingSignedMsg, ok := msg.(*lnwire.ClosingSigned)
if !ok {
t.Fatalf("expected to receive closing signed message, got %T", msg)
}
// Bob should reply with the exact same fee in his next ClosingSigned
// message.
bobFee := closingSignedMsg.FeeSatoshis
bobSig, _, _, err := bobChan.CreateCloseProposal(
bobFee, dummyDeliveryScript, aliceDeliveryScript,
)
if err != nil {
t.Fatalf("unable to create close proposal: %v", err)
}
parsedSig, err := lnwire.NewSigFromSignature(closeSig)
parsedSig, err := lnwire.NewSigFromSignature(bobSig)
if err != nil {
t.Fatalf("unable to parse signature: %v", err)
}
closingSigned := lnwire.NewClosingSigned(shutdownMsg.ChannelID,
fee, parsedSig)
initiator.chanCloseMsgs <- &closeMsg{
bobFee, parsedSig)
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: closingSigned,
}
// And we expect the initiator to accept the fee, and broadcast the
// closing transaction.
select {
case outMsg := <-initiator.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed message")
}
// Alice should accept Bob's fee, broadcast the cooperative close tx, and
// send a ClosingSigned message back to Bob.
closingSignedMsg, ok := msg.(*lnwire.ClosingSigned)
if !ok {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
if closingSignedMsg.FeeSatoshis != fee {
t.Fatalf("expected ClosingSigned fee to be %v, instead got %v",
fee, closingSignedMsg.FeeSatoshis)
}
// The initiator will now see that we agreed on the fee, and broadcast
// the closing transaction.
// Alice should now broadcast the closing transaction.
select {
case <-broadcastTxChan:
case <-time.After(timeout):
t.Fatalf("closing tx not broadcast")
}
// And the initiator should be waiting for a confirmation notification.
// Alice should respond with the ClosingSigned they both agreed upon.
select {
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed message")
}
closingSignedMsg, ok = msg.(*lnwire.ClosingSigned)
if !ok {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
if closingSignedMsg.FeeSatoshis != bobFee {
t.Fatalf("expected ClosingSigned fee to be %v, instead got %v",
bobFee, closingSignedMsg.FeeSatoshis)
}
// Alice should be waiting on a single confirmation for the coop close tx.
notifier.confChannel <- &chainntnfs.TxConfirmation{}
}
@ -234,12 +254,12 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
t.Parallel()
notifier := &mockNotfier{
notifier := &mockNotifier{
confChannel: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
responder, responderChan, initiatorChan, cleanUp, err := createTestPeer(
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, noUpdate,
)
if err != nil {
@ -247,12 +267,12 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
}
defer cleanUp()
chanID := lnwire.NewChanIDFromOutPoint(responderChan.ChannelPoint())
chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint())
// We send a shutdown request to Alice. She will now be the responding
// node in this shutdown procedure. We first expect Alice to answer
// this shutdown request with a Shutdown message.
responder.chanCloseMsgs <- &closeMsg{
// Bob sends a shutdown request to Alice. She will now be the responding
// node in this shutdown procedure. We first expect Alice to answer this
// Shutdown request with a Shutdown message.
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: lnwire.NewShutdown(chanID,
dummyDeliveryScript),
@ -260,7 +280,7 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
var msg lnwire.Message
select {
case outMsg := <-responder.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive shutdown message")
@ -271,140 +291,152 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
t.Fatalf("expected Shutdown message, got %T", msg)
}
respDeliveryScript := shutdownMsg.Address
aliceDeliveryScript := shutdownMsg.Address
// Alice will thereafter send a ClosingSigned message, indicating her
// proposed closing transaction fee.
// As Alice is the channel initiator, she will send her ClosingSigned
// message.
select {
case outMsg := <-responder.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed message")
}
responderClosingSigned, ok := msg.(*lnwire.ClosingSigned)
aliceClosingSigned, ok := msg.(*lnwire.ClosingSigned)
if !ok {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
// We don't agree with the fee, and will send back one that's 2.5x.
preferredRespFee := responderClosingSigned.FeeSatoshis
// Bob doesn't agree with the fee and will send one back that's 2.5x.
preferredRespFee := aliceClosingSigned.FeeSatoshis
increasedFee := btcutil.Amount(float64(preferredRespFee) * 2.5)
initiatorSig, _, _, err := initiatorChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, respDeliveryScript,
bobSig, _, _, err := bobChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, aliceDeliveryScript,
)
if err != nil {
t.Fatalf("error creating close proposal: %v", err)
}
parsedSig, err := lnwire.NewSigFromSignature(initiatorSig)
parsedSig, err := lnwire.NewSigFromSignature(bobSig)
if err != nil {
t.Fatalf("error parsing signature: %v", err)
}
closingSigned := lnwire.NewClosingSigned(chanID, increasedFee, parsedSig)
responder.chanCloseMsgs <- &closeMsg{
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: closingSigned,
}
// The responder will see the new fee we propose, but with current
// settings it won't accept it immediately as it differs too much by
// its ideal fee. We should get a new proposal back, which should have
// the average fee rate proposed.
// Alice will now see the new fee we propose, but with current settings it
// won't accept it immediately as it differs too much by its ideal fee. We
// should get a new proposal back, which should have the average fee rate
// proposed.
select {
case outMsg := <-responder.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed message")
}
responderClosingSigned, ok = msg.(*lnwire.ClosingSigned)
aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned)
if !ok {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
// The fee sent by the responder should be less than the fee we just
// sent as it should attempt to compromise.
peerFee := responderClosingSigned.FeeSatoshis
if peerFee > increasedFee {
// The fee sent by Alice should be less than the fee Bob just sent as Alice
// should attempt to compromise.
aliceFee := aliceClosingSigned.FeeSatoshis
if aliceFee > increasedFee {
t.Fatalf("new fee should be less than our fee: new=%v, "+
"prior=%v", peerFee, increasedFee)
"prior=%v", aliceFee, increasedFee)
}
lastFeeResponder := peerFee
lastFeeResponder := aliceFee
// We try negotiating a 2.1x fee, which should also be rejected.
increasedFee = btcutil.Amount(float64(preferredRespFee) * 2.1)
initiatorSig, _, _, err = initiatorChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, respDeliveryScript,
bobSig, _, _, err = bobChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, aliceDeliveryScript,
)
if err != nil {
t.Fatalf("error creating close proposal: %v", err)
}
parsedSig, err = lnwire.NewSigFromSignature(initiatorSig)
parsedSig, err = lnwire.NewSigFromSignature(bobSig)
if err != nil {
t.Fatalf("error parsing signature: %v", err)
}
closingSigned = lnwire.NewClosingSigned(chanID, increasedFee, parsedSig)
responder.chanCloseMsgs <- &closeMsg{
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: closingSigned,
}
// It still won't be accepted, and we should get a new proposal, the
// average of what we proposed, and what they proposed last time.
// Bob's latest proposal still won't be accepted and Alice should send over
// a new ClosingSigned message. It should be the average of what Bob and
// Alice each proposed last time.
select {
case outMsg := <-responder.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed message")
}
responderClosingSigned, ok = msg.(*lnwire.ClosingSigned)
aliceClosingSigned, ok = msg.(*lnwire.ClosingSigned)
if !ok {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
// The peer should inch towards our fee, in order to compromise.
// Additionally, this fee should be less than the fee we sent prior.
peerFee = responderClosingSigned.FeeSatoshis
if peerFee < lastFeeResponder {
// Alice should inch towards Bob's fee, in order to compromise.
// Additionally, this fee should be less than the fee Bob sent before.
aliceFee = aliceClosingSigned.FeeSatoshis
if aliceFee < lastFeeResponder {
t.Fatalf("new fee should be greater than prior: new=%v, "+
"prior=%v", peerFee, lastFeeResponder)
"prior=%v", aliceFee, lastFeeResponder)
}
if peerFee > increasedFee {
t.Fatalf("new fee should be less than our fee: new=%v, "+
"prior=%v", peerFee, increasedFee)
if aliceFee > increasedFee {
t.Fatalf("new fee should be less than Bob's fee: new=%v, "+
"prior=%v", aliceFee, increasedFee)
}
// Finally, we'll accept the fee by echoing back the same fee that they
// sent to us.
initiatorSig, _, _, err = initiatorChan.CreateCloseProposal(
peerFee, dummyDeliveryScript, respDeliveryScript,
// Finally, Bob will accept the fee by echoing back the same fee that Alice
// just sent over.
bobSig, _, _, err = bobChan.CreateCloseProposal(
aliceFee, dummyDeliveryScript, aliceDeliveryScript,
)
if err != nil {
t.Fatalf("error creating close proposal: %v", err)
}
parsedSig, err = lnwire.NewSigFromSignature(initiatorSig)
parsedSig, err = lnwire.NewSigFromSignature(bobSig)
if err != nil {
t.Fatalf("error parsing signature: %v", err)
}
closingSigned = lnwire.NewClosingSigned(chanID, peerFee, parsedSig)
responder.chanCloseMsgs <- &closeMsg{
closingSigned = lnwire.NewClosingSigned(chanID, aliceFee, parsedSig)
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: closingSigned,
}
// The responder will now see that we agreed on the fee, and broadcast
// the closing transaction.
// Alice will now see that Bob agreed on the fee, and broadcast the coop
// close transaction.
select {
case <-broadcastTxChan:
case <-time.After(timeout):
t.Fatalf("closing tx not broadcast")
}
// And the responder should be waiting for a confirmation notification.
// Alice should respond with the ClosingSigned they both agreed upon.
select {
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed message")
}
if _, ok := msg.(*lnwire.ClosingSigned); !ok {
t.Fatalf("expected to receive closing signed message, got %T", msg)
}
// Alice should be waiting on a single confirmation for the coop close tx.
notifier.confChannel <- &chainntnfs.TxConfirmation{}
}
@ -414,12 +446,12 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
t.Parallel()
notifier := &mockNotfier{
notifier := &mockNotifier{
confChannel: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
initiator, initiatorChan, responderChan, cleanUp, err := createTestPeer(
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, noUpdate,
)
if err != nil {
@ -432,18 +464,18 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
errChan := make(chan error, 1)
closeCommand := &htlcswitch.ChanClose{
CloseType: htlcswitch.CloseRegular,
ChanPoint: initiatorChan.ChannelPoint(),
ChanPoint: bobChan.ChannelPoint(),
Updates: updateChan,
TargetFeePerKw: 12500,
Err: errChan,
}
initiator.localCloseChanReqs <- closeCommand
alicePeer.localCloseChanReqs <- closeCommand
// We should now be getting the shutdown request.
// Alice should now send a Shutdown request to Bob.
var msg lnwire.Message
select {
case outMsg := <-initiator.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive shutdown request")
@ -454,47 +486,20 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
t.Fatalf("expected Shutdown message, got %T", msg)
}
initiatorDeliveryScript := shutdownMsg.Address
aliceDeliveryScript := shutdownMsg.Address
// We'll answer the shutdown message with our own Shutdown, and then a
// ClosingSigned message.
chanID := lnwire.NewChanIDFromOutPoint(initiatorChan.ChannelPoint())
// Bob will answer the Shutdown message with his own Shutdown.
chanID := lnwire.NewChanIDFromOutPoint(bobChan.ChannelPoint())
respShutdown := lnwire.NewShutdown(chanID, dummyDeliveryScript)
initiator.chanCloseMsgs <- &closeMsg{
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: respShutdown,
}
estimator := chainfee.NewStaticEstimator(12500, 0)
initiatorIdealFeeRate, err := estimator.EstimateFeePerKW(1)
if err != nil {
t.Fatalf("unable to query fee estimator: %v", err)
}
initiatorIdealFee := responderChan.CalcFee(initiatorIdealFeeRate)
increasedFee := btcutil.Amount(float64(initiatorIdealFee) * 2.5)
closeSig, _, _, err := responderChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, initiatorDeliveryScript,
)
if err != nil {
t.Fatalf("unable to create close proposal: %v", err)
}
parsedSig, err := lnwire.NewSigFromSignature(closeSig)
if err != nil {
t.Fatalf("unable to parse signature: %v", err)
}
closingSigned := lnwire.NewClosingSigned(
shutdownMsg.ChannelID, increasedFee, parsedSig,
)
initiator.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: closingSigned,
}
// We should get two closing signed messages, the first will be the
// ideal fee sent by the initiator in response to our shutdown request.
// Alice should now respond with a ClosingSigned message with her ideal
// fee rate.
select {
case outMsg := <-initiator.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed")
@ -503,16 +508,35 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
if !ok {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
if closingSignedMsg.FeeSatoshis != initiatorIdealFee {
t.Fatalf("expected ClosingSigned fee to be %v, instead got %v",
initiatorIdealFee, closingSignedMsg.FeeSatoshis)
}
lastFeeSent := closingSignedMsg.FeeSatoshis
// The second message should be the compromise fee sent in response to
// them receiving our fee proposal.
idealFeeRate := closingSignedMsg.FeeSatoshis
lastReceivedFee := idealFeeRate
increasedFee := btcutil.Amount(float64(idealFeeRate) * 2.1)
lastSentFee := increasedFee
bobSig, _, _, err := bobChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, aliceDeliveryScript,
)
if err != nil {
t.Fatalf("error creating close proposal: %v", err)
}
parsedSig, err := lnwire.NewSigFromSignature(bobSig)
if err != nil {
t.Fatalf("unable to parse signature: %v", err)
}
closingSigned := lnwire.NewClosingSigned(chanID, increasedFee, parsedSig)
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: closingSigned,
}
// It still won't be accepted, and we should get a new proposal, the
// average of what we proposed, and what they proposed last time.
select {
case outMsg := <-initiator.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed")
@ -522,80 +546,79 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
// The peer should inch towards our fee, in order to compromise.
// Additionally, this fee should be less than the fee we sent prior.
peerFee := closingSignedMsg.FeeSatoshis
if peerFee < lastFeeSent {
t.Fatalf("new fee should be greater than prior: new=%v, "+
"prior=%v", peerFee, lastFeeSent)
aliceFee := closingSignedMsg.FeeSatoshis
if aliceFee < lastReceivedFee {
t.Fatalf("new fee should be greater than prior: new=%v, old=%v",
aliceFee, lastReceivedFee)
}
if peerFee > increasedFee {
t.Fatalf("new fee should be less than our fee: new=%v, "+
"prior=%v", peerFee, increasedFee)
if aliceFee > lastSentFee {
t.Fatalf("new fee should be less than our fee: new=%v, old=%v",
aliceFee, lastSentFee)
}
lastFeeSent = closingSignedMsg.FeeSatoshis
// We try negotiating a 2.1x fee, which should also be rejected.
increasedFee = btcutil.Amount(float64(initiatorIdealFee) * 2.1)
responderSig, _, _, err := responderChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, initiatorDeliveryScript,
lastReceivedFee = aliceFee
// We'll try negotiating a 1.5x fee, which should also be rejected.
increasedFee = btcutil.Amount(float64(idealFeeRate) * 1.5)
lastSentFee = increasedFee
bobSig, _, _, err = bobChan.CreateCloseProposal(
increasedFee, dummyDeliveryScript, aliceDeliveryScript,
)
if err != nil {
t.Fatalf("error creating close proposal: %v", err)
}
parsedSig, err = lnwire.NewSigFromSignature(responderSig)
parsedSig, err = lnwire.NewSigFromSignature(bobSig)
if err != nil {
t.Fatalf("error parsing signature: %v", err)
}
closingSigned = lnwire.NewClosingSigned(chanID, increasedFee, parsedSig)
initiator.chanCloseMsgs <- &closeMsg{
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: closingSigned,
}
// It still won't be accepted, and we should get a new proposal, the
// average of what we proposed, and what they proposed last time.
// Alice won't accept Bob's new proposal, and Bob should receive a new
// proposal which is the average of what Bob proposed and Alice proposed
// last time.
select {
case outMsg := <-initiator.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed")
}
initiatorClosingSigned, ok := msg.(*lnwire.ClosingSigned)
closingSignedMsg, ok = msg.(*lnwire.ClosingSigned)
if !ok {
t.Fatalf("expected ClosingSigned message, got %T", msg)
}
// Once again, the fee sent by the initiator should be greater than the
// last fee they sent, but less than the last fee we sent.
peerFee = initiatorClosingSigned.FeeSatoshis
if peerFee < lastFeeSent {
t.Fatalf("new fee should be greater than prior: new=%v, "+
"prior=%v", peerFee, lastFeeSent)
aliceFee = closingSignedMsg.FeeSatoshis
if aliceFee < lastReceivedFee {
t.Fatalf("new fee should be greater than prior: new=%v, old=%v",
aliceFee, lastReceivedFee)
}
if peerFee > increasedFee {
t.Fatalf("new fee should be less than our fee: new=%v, "+
"prior=%v", peerFee, increasedFee)
if aliceFee > lastSentFee {
t.Fatalf("new fee should be less than Bob's fee: new=%v, old=%v",
aliceFee, lastSentFee)
}
// At this point, we'll accept their fee by sending back a CloseSigned
// message with an identical fee.
responderSig, _, _, err = responderChan.CreateCloseProposal(
peerFee, dummyDeliveryScript, initiatorDeliveryScript,
// Bob will now accept their fee by sending back a ClosingSigned message
// with an identical fee.
bobSig, _, _, err = bobChan.CreateCloseProposal(
aliceFee, dummyDeliveryScript, aliceDeliveryScript,
)
if err != nil {
t.Fatalf("error creating close proposal: %v", err)
}
parsedSig, err = lnwire.NewSigFromSignature(responderSig)
parsedSig, err = lnwire.NewSigFromSignature(bobSig)
if err != nil {
t.Fatalf("error parsing signature: %v", err)
}
closingSigned = lnwire.NewClosingSigned(chanID, peerFee, parsedSig)
initiator.chanCloseMsgs <- &closeMsg{
closingSigned = lnwire.NewClosingSigned(chanID, aliceFee, parsedSig)
alicePeer.chanCloseMsgs <- &closeMsg{
cid: chanID,
msg: closingSigned,
}
@ -606,6 +629,20 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
case <-time.After(timeout):
t.Fatalf("closing tx not broadcast")
}
// Alice should respond with the ClosingSigned they both agreed upon.
select {
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive closing signed message")
}
if _, ok := msg.(*lnwire.ClosingSigned); !ok {
t.Fatalf("expected to receive closing signed message, got %T", msg)
}
// Alice should be waiting on a single confirmation for the coop close tx.
notifier.confChannel <- &chainntnfs.TxConfirmation{}
}
// TestChooseDeliveryScript tests that chooseDeliveryScript correctly errors
@ -742,13 +779,13 @@ func TestCustomShutdownScript(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
notifier := &mockNotfier{
notifier := &mockNotifier{
confChannel: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
// Open a channel.
initiator, initiatorChan, _, cleanUp, err := createTestPeer(
alicePeer, bobChan, cleanUp, err := createTestPeer(
notifier, broadcastTxChan, test.update,
)
if err != nil {
@ -760,7 +797,7 @@ func TestCustomShutdownScript(t *testing.T) {
// a specified delivery address.
updateChan := make(chan interface{}, 1)
errChan := make(chan error, 1)
chanPoint := initiatorChan.ChannelPoint()
chanPoint := bobChan.ChannelPoint()
closeCommand := htlcswitch.ChanClose{
CloseType: htlcswitch.CloseRegular,
ChanPoint: chanPoint,
@ -772,11 +809,11 @@ func TestCustomShutdownScript(t *testing.T) {
// Send the close command for the correct channel and check that a
// shutdown message is sent.
initiator.localCloseChanReqs <- &closeCommand
alicePeer.localCloseChanReqs <- &closeCommand
var msg lnwire.Message
select {
case outMsg := <-initiator.outgoingQueue:
case outMsg := <-alicePeer.outgoingQueue:
msg = outMsg.msg
case <-time.After(timeout):
t.Fatalf("did not receive shutdown message")
@ -820,7 +857,7 @@ func genScript(t *testing.T, address string) lnwire.DeliveryAddress {
// Generate an address which can be used for testing.
deliveryAddr, err := btcutil.DecodeAddress(
address,
activeNetParams.Params,
&chaincfg.TestNet3Params,
)
if err != nil {
t.Fatalf("invalid delivery address: %v", err)

228
peer/config.go Normal file
View File

@ -0,0 +1,228 @@
package peer
import (
"net"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/connmgr"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/watchtower/wtclient"
)
// Config defines configuration fields that are necessary for a peer object
// to function.
type Config struct {
// Conn is the underlying network connection for this peer.
Conn net.Conn
// ConnReq stores information related to the persistent connection request
// for this peer.
ConnReq *connmgr.ConnReq
// PubKeyBytes is the serialized, compressed public key of this peer.
PubKeyBytes [33]byte
// Addr is the network address of the peer.
Addr *lnwire.NetAddress
// Inbound indicates whether or not the peer is an inbound peer.
Inbound bool
// Features is the set of features that we advertise to the remote party.
Features *lnwire.FeatureVector
// LegacyFeatures is the set of features that we advertise to the remote
// peer for backwards compatibility. Nodes that have not implemented
// flat features will still be able to read our feature bits from the
// legacy global field, but we will also advertise everything in the
// default features field.
LegacyFeatures *lnwire.FeatureVector
// OutgoingCltvRejectDelta defines the number of blocks before expiry of
// an htlc where we don't offer it anymore.
OutgoingCltvRejectDelta uint32
// ChanActiveTimeout specifies the duration the peer will wait to request
// a channel reenable, beginning from the time the peer was started.
ChanActiveTimeout time.Duration
// ErrorBuffer stores a set of errors related to a peer. It contains error
// messages that our peer has recently sent us over the wire and records of
// unknown messages that were sent to us so that we can have a full track
// record of the communication errors we have had with our peer. If we
// choose to disconnect from a peer, it also stores the reason we had for
// disconnecting.
ErrorBuffer *queue.CircularBuffer
// WritePool is the task pool that manages reuse of write buffers. Write
// tasks are submitted to the pool in order to conserve the total number of
// write buffers allocated at any one time, and decouple write buffer
// allocation from the peer life cycle.
WritePool *pool.Write
// ReadPool is the task pool that manages reuse of read buffers.
ReadPool *pool.Read
// Switch is a pointer to the htlcswitch. It is used to setup, get, and
// tear-down ChannelLinks.
Switch *htlcswitch.Switch
// InterceptSwitch is a pointer to the InterceptableSwitch, a wrapper around
// the regular Switch. We only export it here to pass ForwardPackets to the
// ChannelLinkConfig.
InterceptSwitch *htlcswitch.InterceptableSwitch
// ChannelDB is used to fetch opened channels, closed channels, and the
// channel graph.
ChannelDB *channeldb.DB
// ChainArb is used to subscribe to channel events, update contract signals,
// and force close channels.
ChainArb *contractcourt.ChainArbitrator
// AuthGossiper is needed so that the Brontide impl can register with the
// gossiper and process remote channel announcements.
AuthGossiper *discovery.AuthenticatedGossiper
// ChanStatusMgr is used to set or un-set the disabled bit in channel
// updates.
ChanStatusMgr *netann.ChanStatusManager
// ChainIO is used to retrieve the best block.
ChainIO lnwallet.BlockChainIO
// FeeEstimator is used to compute our target ideal fee-per-kw when
// initializing the coop close process.
FeeEstimator chainfee.Estimator
// Signer is used when creating *lnwallet.LightningChannel instances.
Signer input.Signer
// SigPool is used when creating *lnwallet.LightningChannel instances.
SigPool *lnwallet.SigPool
// Wallet is used to publish transactions and generate delivery scripts
// during the coop close process.
Wallet *lnwallet.LightningWallet
// ChainNotifier is used to receive confirmations of a coop close
// transaction.
ChainNotifier chainntnfs.ChainNotifier
// RoutingPolicy is used to set the forwarding policy for links created by
// the Brontide.
RoutingPolicy htlcswitch.ForwardingPolicy
// Sphinx is used when setting up ChannelLinks so they can decode sphinx
// onion blobs.
Sphinx *hop.OnionProcessor
// WitnessBeacon is used when setting up ChannelLinks so they can add any
// preimages that they learn.
WitnessBeacon contractcourt.WitnessBeacon
// Invoices is passed to the ChannelLink on creation and handles all
// invoice-related logic.
Invoices *invoices.InvoiceRegistry
// ChannelNotifier is used by the link to notify other sub-systems about
// channel-related events and by the Brontide to subscribe to
// ActiveLinkEvents.
ChannelNotifier *channelnotifier.ChannelNotifier
// HtlcNotifier is used when creating a ChannelLink.
HtlcNotifier *htlcswitch.HtlcNotifier
// TowerClient is used when creating a ChannelLink.
TowerClient wtclient.Client
// DisconnectPeer is used to disconnect this peer if the cooperative close
// process fails.
DisconnectPeer func(*btcec.PublicKey) error
// GenNodeAnnouncement is used to send our node announcement to the remote
// on startup.
GenNodeAnnouncement func(bool,
...netann.NodeAnnModifier) (lnwire.NodeAnnouncement, error)
// PrunePersistentPeerConnection is used to remove all internal state
// related to this peer in the server.
PrunePersistentPeerConnection func([33]byte)
// FetchLastChanUpdate fetches our latest channel update for a target
// channel.
FetchLastChanUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate,
error)
// ProcessFundingOpen is used to hand off an OpenChannel message to the
// funding manager.
ProcessFundingOpen func(*lnwire.OpenChannel, lnpeer.Peer)
// ProcessFundingAccept is used to hand off an AcceptChannel message to the
// funding manager.
ProcessFundingAccept func(*lnwire.AcceptChannel, lnpeer.Peer)
// ProcessFundingCreated is used to hand off a FundingCreated message to
// the funding manager.
ProcessFundingCreated func(*lnwire.FundingCreated, lnpeer.Peer)
// ProcessFundingSigned is used to hand off a FundingSigned message to the
// funding manager.
ProcessFundingSigned func(*lnwire.FundingSigned, lnpeer.Peer)
// ProcessFundingLocked is used to hand off a FundingLocked message to the
// funding manager.
ProcessFundingLocked func(*lnwire.FundingLocked, lnpeer.Peer)
// ProcessFundingError is used to hand off an Error message to the funding
// manager.
ProcessFundingError func(*lnwire.Error, *btcec.PublicKey)
// IsPendingChannel is used to determine whether to send an Error message
// to the funding manager or not.
IsPendingChannel func([32]byte, *btcec.PublicKey) bool
// Hodl is used when creating ChannelLinks to specify HodlFlags as
// breakpoints in dev builds.
Hodl *hodl.Config
// UnsafeReplay is used when creating ChannelLinks to specify whether or
// not to replay adds on its commitment tx.
UnsafeReplay bool
// MaxOutgoingCltvExpiry is used when creating ChannelLinks and is the max
// number of blocks that funds could be locked up for when forwarding
// payments.
MaxOutgoingCltvExpiry uint32
// MaxChannelFeeAllocation is used when creating ChannelLinks and is the
// maximum percentage of total funds that can be allocated to a channel's
// commitment fee. This only applies for the initiator of the channel.
MaxChannelFeeAllocation float64
// ServerPubKey is the serialized, compressed public key of our lnd node.
// It is used to determine which policy (channel edge) to pass to the
// ChannelLink.
ServerPubKey [33]byte
// Quit is the server's quit channel. If this is closed, we halt operation.
Quit chan struct{}
}

11
peer/interfaces.go Normal file
View File

@ -0,0 +1,11 @@
package peer
import "github.com/lightningnetwork/lnd/lnwire"
// LinkUpdater is an interface implemented by most messages in BOLT 2 that are
// allowed to update the channel state.
type LinkUpdater interface {
// TargetChanID returns the channel id of the link for which this message
// is intended.
TargetChanID() lnwire.ChannelID
}

40
peer/log.go Normal file
View File

@ -0,0 +1,40 @@
package peer
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// peerLog is a logger that is initialized with the btclog.Disabled logger.
var peerLog btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("PEER", nil))
}
// DisableLog disables all logging output.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info.
func UseLogger(logger btclog.Logger) {
peerLog = logger
}
// logClosure is used to provide a closure over expensive logging operations
// so they aren't performed when the logging level doesn't warrant it.
type logClosure func() string
// String invokes the underlying function and returns the result.
func (c logClosure) String() string {
return c()
}
// newLogClosure returns a new closure over a function that returns a string
// which itself provides a Stringer interface so that it can be used with the
// logging system.
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}

695
peer/test_utils.go Normal file
View File

@ -0,0 +1,695 @@
package peer
import (
"bytes"
crand "crypto/rand"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"os"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/wallet/txauthor"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/shachain"
"github.com/lightningnetwork/lnd/ticker"
)
const (
broadcastHeight = 100
)
var (
alicesPrivKey = []byte{
0x2b, 0xd8, 0x06, 0xc9, 0x7f, 0x0e, 0x00, 0xaf,
0x1a, 0x1f, 0xc3, 0x32, 0x8f, 0xa7, 0x63, 0xa9,
0x26, 0x97, 0x23, 0xc8, 0xdb, 0x8f, 0xac, 0x4f,
0x93, 0xaf, 0x71, 0xdb, 0x18, 0x6d, 0x6e, 0x90,
}
bobsPrivKey = []byte{
0x81, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda,
0x63, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17,
0xd, 0xe7, 0x95, 0xe4, 0xb7, 0x25, 0xb8, 0x4d,
0x1e, 0xb, 0x4c, 0xfd, 0x9e, 0xc5, 0x8c, 0xe9,
}
// Use a hard-coded HD seed.
testHdSeed = [32]byte{
0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab,
0x4d, 0x92, 0x73, 0xd1, 0x90, 0x63, 0x81, 0xb4,
0x4f, 0x2f, 0x6f, 0x25, 0x88, 0xa3, 0xef, 0xb9,
0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53,
}
// Just use some arbitrary bytes as delivery script.
dummyDeliveryScript = alicesPrivKey
// testTx is used as the default funding txn for single-funder channels.
testTx = &wire.MsgTx{
Version: 1,
TxIn: []*wire.TxIn{
{
PreviousOutPoint: wire.OutPoint{
Hash: chainhash.Hash{},
Index: 0xffffffff,
},
SignatureScript: []byte{0x04, 0x31, 0xdc, 0x00, 0x1b, 0x01, 0x62},
Sequence: 0xffffffff,
},
},
TxOut: []*wire.TxOut{
{
Value: 5000000000,
PkScript: []byte{
0x41, // OP_DATA_65
0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e, 0xb1, 0xc5,
0xfe, 0x29, 0x5a, 0xbd, 0xeb, 0x1d, 0xca, 0x42,
0x81, 0xbe, 0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1,
0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2, 0x86, 0x24,
0xe1, 0x81, 0x75, 0xe8, 0x51, 0xc9, 0x6b, 0x97,
0x3d, 0x81, 0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78,
0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed, 0xf6, 0x20,
0xd1, 0x84, 0x24, 0x1a, 0x6a, 0xed, 0x8b, 0x63,
0xa6, // 65-byte signature
0xac, // OP_CHECKSIG
},
},
},
LockTime: 5,
}
)
// noUpdate is a function which can be used as a parameter in createTestPeer to
// call the setup code with no custom values on the channels set up.
var noUpdate = func(a, b *channeldb.OpenChannel) {}
type mockSigner struct {
key *btcec.PrivateKey
}
func (m *mockSigner) SignOutputRaw(tx *wire.MsgTx,
signDesc *input.SignDescriptor) (input.Signature, error) {
amt := signDesc.Output.Value
witnessScript := signDesc.WitnessScript
privKey := m.key
if !privKey.PubKey().IsEqual(signDesc.KeyDesc.PubKey) {
return nil, fmt.Errorf("incorrect key passed")
}
switch {
case signDesc.SingleTweak != nil:
privKey = input.TweakPrivKey(privKey,
signDesc.SingleTweak)
case signDesc.DoubleTweak != nil:
privKey = input.DeriveRevocationPrivKey(privKey,
signDesc.DoubleTweak)
}
sig, err := txscript.RawTxInWitnessSignature(tx, signDesc.SigHashes,
signDesc.InputIndex, amt, witnessScript, signDesc.HashType,
privKey)
if err != nil {
return nil, err
}
return btcec.ParseDERSignature(sig[:len(sig)-1], btcec.S256())
}
func (m *mockSigner) ComputeInputScript(tx *wire.MsgTx,
signDesc *input.SignDescriptor) (*input.Script, error) {
// TODO(roasbeef): expose tweaked signer from lnwallet so don't need to
// duplicate this code?
privKey := m.key
switch {
case signDesc.SingleTweak != nil:
privKey = input.TweakPrivKey(privKey,
signDesc.SingleTweak)
case signDesc.DoubleTweak != nil:
privKey = input.DeriveRevocationPrivKey(privKey,
signDesc.DoubleTweak)
}
witnessScript, err := txscript.WitnessSignature(tx, signDesc.SigHashes,
signDesc.InputIndex, signDesc.Output.Value, signDesc.Output.PkScript,
signDesc.HashType, privKey, true)
if err != nil {
return nil, err
}
return &input.Script{
Witness: witnessScript,
}, nil
}
var _ input.Signer = (*mockSigner)(nil)
type mockChainIO struct {
bestHeight int32
}
func (m *mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) {
return nil, m.bestHeight, nil
}
func (*mockChainIO) GetUtxo(op *wire.OutPoint, _ []byte,
heightHint uint32, _ <-chan struct{}) (*wire.TxOut, error) {
return nil, nil
}
func (*mockChainIO) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) {
return nil, nil
}
func (*mockChainIO) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) {
return nil, nil
}
var _ lnwallet.BlockChainIO = (*mockChainIO)(nil)
type mockWalletController struct {
rootKey *btcec.PrivateKey
publishedTxns chan *wire.MsgTx
}
func (*mockWalletController) FetchInputInfo(prevOut *wire.OutPoint) (
*lnwallet.Utxo, error) {
return nil, nil
}
func (*mockWalletController) ConfirmedBalance(confs int32) (btcutil.Amount,
error) {
return 0, nil
}
func (m *mockWalletController) NewAddress(addrType lnwallet.AddressType,
change bool) (btcutil.Address, error) {
addr, _ := btcutil.NewAddressPubKey(
m.rootKey.PubKey().SerializeCompressed(), &chaincfg.MainNetParams,
)
return addr, nil
}
func (*mockWalletController) LastUnusedAddress(addrType lnwallet.AddressType) (
btcutil.Address, error) {
return nil, nil
}
func (*mockWalletController) IsOurAddress(a btcutil.Address) bool {
return false
}
func (*mockWalletController) SendOutputs(outputs []*wire.TxOut,
feeRate chainfee.SatPerKWeight, label string) (*wire.MsgTx, error) {
return nil, nil
}
func (*mockWalletController) CreateSimpleTx(outputs []*wire.TxOut,
feeRate chainfee.SatPerKWeight, dryRun bool) (*txauthor.AuthoredTx, error) {
return nil, nil
}
func (*mockWalletController) ListUnspentWitness(minconfirms,
maxconfirms int32) ([]*lnwallet.Utxo, error) {
return nil, nil
}
func (*mockWalletController) ListTransactionDetails(startHeight,
endHeight int32) ([]*lnwallet.TransactionDetail, error) {
return nil, nil
}
func (*mockWalletController) LockOutpoint(o wire.OutPoint) {}
func (*mockWalletController) UnlockOutpoint(o wire.OutPoint) {}
func (m *mockWalletController) PublishTransaction(tx *wire.MsgTx,
label string) error {
m.publishedTxns <- tx
return nil
}
func (*mockWalletController) LabelTransaction(hash chainhash.Hash,
label string, overwrite bool) error {
return nil
}
func (*mockWalletController) SubscribeTransactions() (
lnwallet.TransactionSubscription, error) {
return nil, nil
}
func (*mockWalletController) IsSynced() (bool, int64, error) {
return false, 0, nil
}
func (*mockWalletController) Start() error {
return nil
}
func (*mockWalletController) Stop() error {
return nil
}
func (*mockWalletController) BackEnd() string {
return ""
}
func (*mockWalletController) LeaseOutput(wtxmgr.LockID,
wire.OutPoint) (time.Time, error) {
return time.Now(), nil
}
func (*mockWalletController) ReleaseOutput(wtxmgr.LockID, wire.OutPoint) error {
return nil
}
func (*mockWalletController) GetRecoveryInfo() (bool, float64, error) {
return false, 0, nil
}
var _ lnwallet.WalletController = (*mockWalletController)(nil)
type mockNotifier struct {
confChannel chan *chainntnfs.TxConfirmation
}
func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
_ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent,
error) {
return &chainntnfs.ConfirmationEvent{
Confirmed: m.confChannel,
}, nil
}
func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte,
heightHint uint32) (*chainntnfs.SpendEvent, error) {
return &chainntnfs.SpendEvent{
Spend: make(chan *chainntnfs.SpendDetail),
Cancel: func() {},
}, nil
}
func (m *mockNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
return &chainntnfs.BlockEpochEvent{
Epochs: make(chan *chainntnfs.BlockEpoch),
Cancel: func() {},
}, nil
}
func (m *mockNotifier) Start() error {
return nil
}
func (m *mockNotifier) Stop() error {
return nil
}
func (m *mockNotifier) Started() bool {
return true
}
var _ chainntnfs.ChainNotifier = (*mockNotifier)(nil)
// createTestPeer creates a channel between two nodes, and returns a peer for
// one of the nodes, together with the channel seen from both nodes. It takes
// an updateChan function which can be used to modify the default values on
// the channel states for each peer.
func createTestPeer(notifier chainntnfs.ChainNotifier,
publTx chan *wire.MsgTx, updateChan func(a, b *channeldb.OpenChannel)) (
*Brontide, *lnwallet.LightningChannel, func(), error) {
aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
btcec.S256(), alicesPrivKey,
)
aliceKeySigner := &keychain.PrivKeyDigestSigner{PrivKey: aliceKeyPriv}
bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
btcec.S256(), bobsPrivKey,
)
channelCapacity := btcutil.Amount(10 * 1e8)
channelBal := channelCapacity / 2
aliceDustLimit := btcutil.Amount(200)
bobDustLimit := btcutil.Amount(1300)
csvTimeoutAlice := uint32(5)
csvTimeoutBob := uint32(4)
prevOut := &wire.OutPoint{
Hash: chainhash.Hash(testHdSeed),
Index: 0,
}
fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
aliceCfg := channeldb.ChannelConfig{
ChannelConstraints: channeldb.ChannelConstraints{
DustLimit: aliceDustLimit,
MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
ChanReserve: btcutil.Amount(rand.Int63()),
MinHTLC: lnwire.MilliSatoshi(rand.Int63()),
MaxAcceptedHtlcs: uint16(rand.Int31()),
CsvDelay: uint16(csvTimeoutAlice),
},
MultiSigKey: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
RevocationBasePoint: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
PaymentBasePoint: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
DelayBasePoint: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
HtlcBasePoint: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
}
bobCfg := channeldb.ChannelConfig{
ChannelConstraints: channeldb.ChannelConstraints{
DustLimit: bobDustLimit,
MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
ChanReserve: btcutil.Amount(rand.Int63()),
MinHTLC: lnwire.MilliSatoshi(rand.Int63()),
MaxAcceptedHtlcs: uint16(rand.Int31()),
CsvDelay: uint16(csvTimeoutBob),
},
MultiSigKey: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
RevocationBasePoint: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
PaymentBasePoint: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
DelayBasePoint: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
HtlcBasePoint: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
}
bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize())
if err != nil {
return nil, nil, nil, err
}
bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot)
bobFirstRevoke, err := bobPreimageProducer.AtIndex(0)
if err != nil {
return nil, nil, nil, err
}
bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:])
aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize())
if err != nil {
return nil, nil, nil, err
}
alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot)
aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0)
if err != nil {
return nil, nil, nil, err
}
aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:])
aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns(
channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint,
bobCommitPoint, *fundingTxIn, channeldb.SingleFunderTweaklessBit,
)
if err != nil {
return nil, nil, nil, err
}
alicePath, err := ioutil.TempDir("", "alicedb")
if err != nil {
return nil, nil, nil, err
}
dbAlice, err := channeldb.Open(alicePath)
if err != nil {
return nil, nil, nil, err
}
bobPath, err := ioutil.TempDir("", "bobdb")
if err != nil {
return nil, nil, nil, err
}
dbBob, err := channeldb.Open(bobPath)
if err != nil {
return nil, nil, nil, err
}
estimator := chainfee.NewStaticEstimator(12500, 0)
feePerKw, err := estimator.EstimateFeePerKW(1)
if err != nil {
return nil, nil, nil, err
}
// TODO(roasbeef): need to factor in commit fee?
aliceCommit := channeldb.ChannelCommitment{
CommitHeight: 0,
LocalBalance: lnwire.NewMSatFromSatoshis(channelBal),
RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
FeePerKw: btcutil.Amount(feePerKw),
CommitFee: feePerKw.FeeForWeight(input.CommitWeight),
CommitTx: aliceCommitTx,
CommitSig: bytes.Repeat([]byte{1}, 71),
}
bobCommit := channeldb.ChannelCommitment{
CommitHeight: 0,
LocalBalance: lnwire.NewMSatFromSatoshis(channelBal),
RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
FeePerKw: btcutil.Amount(feePerKw),
CommitFee: feePerKw.FeeForWeight(input.CommitWeight),
CommitTx: bobCommitTx,
CommitSig: bytes.Repeat([]byte{1}, 71),
}
var chanIDBytes [8]byte
if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil {
return nil, nil, nil, err
}
shortChanID := lnwire.NewShortChanIDFromInt(
binary.BigEndian.Uint64(chanIDBytes[:]),
)
aliceChannelState := &channeldb.OpenChannel{
LocalChanCfg: aliceCfg,
RemoteChanCfg: bobCfg,
IdentityPub: aliceKeyPub,
FundingOutpoint: *prevOut,
ShortChannelID: shortChanID,
ChanType: channeldb.SingleFunderTweaklessBit,
IsInitiator: true,
Capacity: channelCapacity,
RemoteCurrentRevocation: bobCommitPoint,
RevocationProducer: alicePreimageProducer,
RevocationStore: shachain.NewRevocationStore(),
LocalCommitment: aliceCommit,
RemoteCommitment: aliceCommit,
Db: dbAlice,
Packager: channeldb.NewChannelPackager(shortChanID),
FundingTxn: testTx,
}
bobChannelState := &channeldb.OpenChannel{
LocalChanCfg: bobCfg,
RemoteChanCfg: aliceCfg,
IdentityPub: bobKeyPub,
FundingOutpoint: *prevOut,
ChanType: channeldb.SingleFunderTweaklessBit,
IsInitiator: false,
Capacity: channelCapacity,
RemoteCurrentRevocation: aliceCommitPoint,
RevocationProducer: bobPreimageProducer,
RevocationStore: shachain.NewRevocationStore(),
LocalCommitment: bobCommit,
RemoteCommitment: bobCommit,
Db: dbBob,
Packager: channeldb.NewChannelPackager(shortChanID),
}
// Set custom values on the channel states.
updateChan(aliceChannelState, bobChannelState)
aliceAddr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil {
return nil, nil, nil, err
}
bobAddr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18556,
}
if err := bobChannelState.SyncPending(bobAddr, 0); err != nil {
return nil, nil, nil, err
}
cleanUpFunc := func() {
os.RemoveAll(bobPath)
os.RemoveAll(alicePath)
}
aliceSigner := &mockSigner{aliceKeyPriv}
bobSigner := &mockSigner{bobKeyPriv}
alicePool := lnwallet.NewSigPool(1, aliceSigner)
channelAlice, err := lnwallet.NewLightningChannel(
aliceSigner, aliceChannelState, alicePool,
)
if err != nil {
return nil, nil, nil, err
}
_ = alicePool.Start()
bobPool := lnwallet.NewSigPool(1, bobSigner)
channelBob, err := lnwallet.NewLightningChannel(
bobSigner, bobChannelState, bobPool,
)
if err != nil {
return nil, nil, nil, err
}
_ = bobPool.Start()
chainIO := &mockChainIO{
bestHeight: broadcastHeight,
}
wallet := &lnwallet.LightningWallet{
WalletController: &mockWalletController{
rootKey: aliceKeyPriv,
publishedTxns: publTx,
},
}
_, currentHeight, err := chainIO.GetBestBlock()
if err != nil {
return nil, nil, nil, err
}
htlcSwitch, err := htlcswitch.New(htlcswitch.Config{
DB: dbAlice,
SwitchPackager: channeldb.NewSwitchPackager(),
Notifier: notifier,
FwdEventTicker: ticker.New(
htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(
htlcswitch.DefaultLogInterval),
AckEventTicker: ticker.New(
htlcswitch.DefaultAckInterval),
}, uint32(currentHeight))
if err != nil {
return nil, nil, nil, err
}
if err = htlcSwitch.Start(); err != nil {
return nil, nil, nil, err
}
nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
const chanActiveTimeout = time.Minute
chanStatusMgr, err := netann.NewChanStatusManager(&netann.ChanStatusConfig{
ChanStatusSampleInterval: 30 * time.Second,
ChanEnableTimeout: chanActiveTimeout,
ChanDisableTimeout: 2 * time.Minute,
DB: dbAlice,
Graph: dbAlice.ChannelGraph(),
MessageSigner: nodeSignerAlice,
OurPubKey: aliceKeyPub,
IsChannelActive: htlcSwitch.HasActiveLink,
ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { return nil },
})
if err != nil {
return nil, nil, nil, err
}
if err = chanStatusMgr.Start(); err != nil {
return nil, nil, nil, err
}
errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
if err != nil {
return nil, nil, nil, err
}
var pubKey [33]byte
copy(pubKey[:], aliceKeyPub.SerializeCompressed())
cfgAddr := &lnwire.NetAddress{
IdentityKey: aliceKeyPub,
Address: aliceAddr,
ChainNet: wire.SimNet,
}
cfg := &Config{
Addr: cfgAddr,
PubKeyBytes: pubKey,
ErrorBuffer: errBuffer,
ChainIO: chainIO,
Switch: htlcSwitch,
ChanActiveTimeout: chanActiveTimeout,
InterceptSwitch: htlcswitch.NewInterceptableSwitch(htlcSwitch),
ChannelDB: dbAlice,
FeeEstimator: estimator,
Wallet: wallet,
ChainNotifier: notifier,
ChanStatusMgr: chanStatusMgr,
DisconnectPeer: func(b *btcec.PublicKey) error { return nil },
}
alicePeer := NewBrontide(*cfg)
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
alicePeer.activeChannels[chanID] = channelAlice
alicePeer.wg.Add(1)
go alicePeer.channelManager()
return alicePeer, channelBob, cleanUpFunc, nil
}

View File

@ -57,6 +57,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/macaroons"
"github.com/lightningnetwork/lnd/monitoring"
"github.com/lightningnetwork/lnd/peer"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/record"
"github.com/lightningnetwork/lnd/routing"
@ -2105,17 +2106,17 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
// With the transaction broadcast, we send our first update to
// the client.
updateChan = make(chan interface{}, 2)
updateChan <- &pendingUpdate{
updateChan <- &peer.PendingUpdate{
Txid: closingTxid[:],
}
errChan = make(chan error, 1)
notifier := r.server.cc.chainNotifier
go waitForChanToClose(uint32(bestHeight), notifier, errChan, chanPoint,
go peer.WaitForChanToClose(uint32(bestHeight), notifier, errChan, chanPoint,
&closingTxid, closingTx.TxOut[0].PkScript, func() {
// Respond to the local subsystem which
// requested the channel closure.
updateChan <- &channelCloseUpdate{
updateChan <- &peer.ChannelCloseUpdate{
ClosingTxid: closingTxid[:],
Success: true,
}
@ -2228,7 +2229,7 @@ out:
// then we can break out of our dispatch loop as we no
// longer need to process any further updates.
switch closeUpdate := closingUpdate.(type) {
case *channelCloseUpdate:
case *peer.ChannelCloseUpdate:
h, _ := chainhash.NewHash(closeUpdate.ClosingTxid)
rpcsLog.Infof("[closechannel] close completed: "+
"txid(%v)", h)
@ -2246,7 +2247,7 @@ func createRPCCloseUpdate(update interface{}) (
*lnrpc.CloseStatusUpdate, error) {
switch u := update.(type) {
case *channelCloseUpdate:
case *peer.ChannelCloseUpdate:
return &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ChanClose{
ChanClose: &lnrpc.ChannelCloseUpdate{
@ -2254,7 +2255,7 @@ func createRPCCloseUpdate(update interface{}) (
},
},
}, nil
case *pendingUpdate:
case *peer.PendingUpdate:
return &lnrpc.CloseStatusUpdate{
Update: &lnrpc.CloseStatusUpdate_ClosePending{
ClosePending: &lnrpc.PendingUpdate{
@ -2571,12 +2572,12 @@ func (r *rpcServer) ListPeers(ctx context.Context,
serverPeer.RemoteFeatures(),
)
peer := &lnrpc.Peer{
rpcPeer := &lnrpc.Peer{
PubKey: hex.EncodeToString(nodePub[:]),
Address: serverPeer.conn.RemoteAddr().String(),
Inbound: serverPeer.inbound,
BytesRecv: atomic.LoadUint64(&serverPeer.bytesReceived),
BytesSent: atomic.LoadUint64(&serverPeer.bytesSent),
Address: serverPeer.Conn().RemoteAddr().String(),
Inbound: serverPeer.Inbound(),
BytesRecv: serverPeer.BytesReceived(),
BytesSent: serverPeer.BytesSent(),
SatSent: satSent,
SatRecv: satRecv,
PingTime: serverPeer.PingTime(),
@ -2591,27 +2592,27 @@ func (r *rpcServer) ListPeers(ctx context.Context,
// it is non-nil. If we want all the stored errors, simply
// add the full list to our set of errors.
if in.LatestError {
latestErr := serverPeer.errorBuffer.Latest()
latestErr := serverPeer.ErrorBuffer().Latest()
if latestErr != nil {
peerErrors = []interface{}{latestErr}
}
} else {
peerErrors = serverPeer.errorBuffer.List()
peerErrors = serverPeer.ErrorBuffer().List()
}
// Add the relevant peer errors to our response.
for _, error := range peerErrors {
tsError := error.(*timestampedError)
tsError := error.(*peer.TimestampedError)
rpcErr := &lnrpc.TimestampedError{
Timestamp: uint64(tsError.timestamp.Unix()),
Error: tsError.error.Error(),
Timestamp: uint64(tsError.Timestamp.Unix()),
Error: tsError.Error.Error(),
}
peer.Errors = append(peer.Errors, rpcErr)
rpcPeer.Errors = append(rpcPeer.Errors, rpcErr)
}
resp.Peers = append(resp.Peers, peer)
resp.Peers = append(resp.Peers, rpcPeer)
}
rpcsLog.Debugf("[listpeers] yielded %v peers", serverPeers)

167
server.go
View File

@ -52,6 +52,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/nat"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/peer"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
@ -113,7 +114,7 @@ var (
// errPeerAlreadyConnected is an error returned by the server when we're
// commanded to connect to a peer, but they're already connected.
type errPeerAlreadyConnected struct {
peer *peer
peer *peer.Brontide
}
// Error returns the human readable version of this error type.
@ -167,10 +168,10 @@ type server struct {
lastDetectedIP net.IP
mu sync.RWMutex
peersByPub map[string]*peer
peersByPub map[string]*peer.Brontide
inboundPeers map[string]*peer
outboundPeers map[string]*peer
inboundPeers map[string]*peer.Brontide
outboundPeers map[string]*peer.Brontide
peerConnectedListeners map[string][]chan<- lnpeer.Peer
peerDisconnectedListeners map[string][]chan<- struct{}
@ -190,7 +191,7 @@ type server struct {
// a disconnect. Adding a peer to this map causes the peer termination
// watcher to short circuit in the event that peers are purposefully
// disconnected.
ignorePeerTermination map[*peer]struct{}
ignorePeerTermination map[*peer.Brontide]struct{}
// scheduledPeerConnection maps a pubkey string to a callback that
// should be executed in the peerTerminationWatcher the prior peer with
@ -452,12 +453,12 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
persistentConnReqs: make(map[string][]*connmgr.ConnReq),
persistentRetryCancels: make(map[string]chan struct{}),
peerErrors: make(map[string]*queue.CircularBuffer),
ignorePeerTermination: make(map[*peer]struct{}),
ignorePeerTermination: make(map[*peer.Brontide]struct{}),
scheduledPeerConnection: make(map[string]func()),
peersByPub: make(map[string]*peer),
inboundPeers: make(map[string]*peer),
outboundPeers: make(map[string]*peer),
peersByPub: make(map[string]*peer.Brontide),
inboundPeers: make(map[string]*peer.Brontide),
outboundPeers: make(map[string]*peer.Brontide),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}),
@ -491,15 +492,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, chanDB *channeldb.DB,
return
}
select {
case peer.localCloseChanReqs <- request:
srvrLog.Infof("Local close channel request "+
"delivered to peer: %x", pubKey[:])
case <-peer.quit:
srvrLog.Errorf("Unable to deliver local close "+
"channel request to peer %x, err: %v",
pubKey[:], err)
}
peer.HandleLocalCloseChanReqs(request)
},
FwdingLog: chanDB.ForwardingLog(),
SwitchPackager: channeldb.NewSwitchPackager(),
@ -1481,7 +1474,13 @@ func (s *server) Stop() error {
// Disconnect from each active peers to ensure that
// peerTerminationWatchers signal completion to each peer.
for _, peer := range s.Peers() {
s.DisconnectPeer(peer.addr.IdentityKey)
err := s.DisconnectPeer(peer.IdentityKey())
if err != nil {
srvrLog.Warnf("could not disconnect peer: %v"+
"received error: %v", peer.IdentityKey(),
err,
)
}
}
// Now that all connections have been torn down, stop the tower
@ -1820,7 +1819,7 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
s.mu.RLock()
ignoreList := make(map[autopilot.NodeID]struct{})
for _, peer := range s.peersByPub {
nID := autopilot.NewNodeID(peer.addr.IdentityKey)
nID := autopilot.NewNodeID(peer.IdentityKey())
ignoreList[nID] = struct{}{}
}
s.mu.RUnlock()
@ -2310,12 +2309,12 @@ func (s *server) BroadcastMessage(skips map[route.Vertex]struct{},
// peersByPub throughout this process to ensure we deliver messages to
// exact set of peers present at the time of invocation.
s.mu.RLock()
peers := make([]*peer, 0, len(s.peersByPub))
peers := make([]*peer.Brontide, 0, len(s.peersByPub))
for _, sPeer := range s.peersByPub {
if skips != nil {
if _, ok := skips[sPeer.pubKeyBytes]; ok {
if _, ok := skips[sPeer.PubKey()]; ok {
srvrLog.Tracef("Skipping %x in broadcast",
sPeer.pubKeyBytes[:])
sPeer.PubKey())
continue
}
}
@ -2413,7 +2412,7 @@ func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
// daemon's local representation of the remote peer.
//
// NOTE: This function is safe for concurrent access.
func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) {
func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer.Brontide, error) {
s.mu.RLock()
defer s.mu.RUnlock()
@ -2427,7 +2426,7 @@ func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) {
// public key.
//
// NOTE: This function is safe for concurrent access.
func (s *server) FindPeerByPubStr(pubStr string) (*peer, error) {
func (s *server) FindPeerByPubStr(pubStr string) (*peer.Brontide, error) {
s.mu.RLock()
defer s.mu.RUnlock()
@ -2436,7 +2435,7 @@ func (s *server) FindPeerByPubStr(pubStr string) (*peer, error) {
// findPeerByPubStr is an internal method that retrieves the specified peer from
// the server's internal state using.
func (s *server) findPeerByPubStr(pubStr string) (*peer, error) {
func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) {
peer, ok := s.peersByPub[pubStr]
if !ok {
return nil, ErrPeerNotConnected
@ -2565,7 +2564,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
// we'll close out the new connection s.t there's only a single
// connection between us.
localPub := s.identityECDH.PubKey()
if !connectedPeer.inbound &&
if !connectedPeer.Inbound() &&
!shouldDropLocalConnection(localPub, nodePub) {
srvrLog.Warnf("Received inbound connection from "+
@ -2676,7 +2675,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
// we'll close out the new connection s.t there's only a single
// connection between us.
localPub := s.identityECDH.PubKey()
if connectedPeer.inbound &&
if connectedPeer.Inbound() &&
shouldDropLocalConnection(localPub, nodePub) {
srvrLog.Warnf("Established outbound connection to "+
@ -2786,7 +2785,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
errBuffer, ok := s.peerErrors[pkStr]
if !ok {
var err error
errBuffer, err = queue.NewCircularBuffer(errorBufferSize)
errBuffer, err = queue.NewCircularBuffer(peer.ErrorBufferSize)
if err != nil {
srvrLog.Errorf("unable to create peer %v", err)
return
@ -2799,16 +2798,63 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
// offered that would trigger channel closure. In case of outgoing
// htlcs, an extra block is added to prevent the channel from being
// closed when the htlc is outstanding and a new block comes in.
p, err := newPeer(
s.cfg, conn, connReq, s, peerAddr, inbound, initFeatures,
legacyFeatures, s.cfg.ChanEnableTimeout,
lncfg.DefaultOutgoingCltvRejectDelta, errBuffer,
)
if err != nil {
srvrLog.Errorf("unable to create peer %v", err)
return
pCfg := peer.Config{
Conn: conn,
ConnReq: connReq,
Addr: peerAddr,
Inbound: inbound,
Features: initFeatures,
LegacyFeatures: legacyFeatures,
OutgoingCltvRejectDelta: lncfg.DefaultOutgoingCltvRejectDelta,
ChanActiveTimeout: s.cfg.ChanEnableTimeout,
ErrorBuffer: errBuffer,
WritePool: s.writePool,
ReadPool: s.readPool,
Switch: s.htlcSwitch,
InterceptSwitch: s.interceptableSwitch,
ChannelDB: s.chanDB,
ChainArb: s.chainArb,
AuthGossiper: s.authGossiper,
ChanStatusMgr: s.chanStatusMgr,
ChainIO: s.cc.chainIO,
FeeEstimator: s.cc.feeEstimator,
Signer: s.cc.wallet.Cfg.Signer,
SigPool: s.sigPool,
Wallet: s.cc.wallet,
ChainNotifier: s.cc.chainNotifier,
RoutingPolicy: s.cc.routingPolicy,
Sphinx: s.sphinx,
WitnessBeacon: s.witnessBeacon,
Invoices: s.invoices,
ChannelNotifier: s.channelNotifier,
HtlcNotifier: s.htlcNotifier,
TowerClient: s.towerClient,
DisconnectPeer: s.DisconnectPeer,
GenNodeAnnouncement: s.genNodeAnnouncement,
PrunePersistentPeerConnection: s.prunePersistentPeerConnection,
FetchLastChanUpdate: s.fetchLastChanUpdate(),
ProcessFundingOpen: s.fundingMgr.processFundingOpen,
ProcessFundingAccept: s.fundingMgr.processFundingAccept,
ProcessFundingCreated: s.fundingMgr.processFundingCreated,
ProcessFundingSigned: s.fundingMgr.processFundingSigned,
ProcessFundingLocked: s.fundingMgr.processFundingLocked,
ProcessFundingError: s.fundingMgr.processFundingError,
IsPendingChannel: s.fundingMgr.IsPendingChannel,
Hodl: s.cfg.Hodl,
UnsafeReplay: s.cfg.UnsafeReplay,
MaxOutgoingCltvExpiry: s.cfg.MaxOutgoingCltvExpiry,
MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation,
Quit: s.quit,
}
copy(pCfg.PubKeyBytes[:], peerAddr.IdentityKey.SerializeCompressed())
copy(pCfg.ServerPubKey[:], s.identityECDH.PubKey().SerializeCompressed())
p := peer.NewBrontide(pCfg)
// TODO(roasbeef): update IP address for link-node
// * also mark last-seen, do it one single transaction?
@ -2828,7 +2874,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
// addPeer adds the passed peer to the server's global state of all active
// peers.
func (s *server) addPeer(p *peer) {
func (s *server) addPeer(p *peer.Brontide) {
if p == nil {
return
}
@ -2844,12 +2890,12 @@ func (s *server) addPeer(p *peer) {
// TODO(roasbeef): pipe all requests through to the
// queryHandler/peerManager
pubSer := p.addr.IdentityKey.SerializeCompressed()
pubSer := p.IdentityKey().SerializeCompressed()
pubStr := string(pubSer)
s.peersByPub[pubStr] = p
if p.inbound {
if p.Inbound() {
s.inboundPeers[pubStr] = p
} else {
s.outboundPeers[pubStr] = p
@ -2872,7 +2918,7 @@ func (s *server) addPeer(p *peer) {
// be signaled of the new peer once the method returns.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerInitializer(p *peer) {
func (s *server) peerInitializer(p *peer.Brontide) {
defer s.wg.Done()
// Avoid initializing peers while the server is exiting.
@ -2905,7 +2951,7 @@ func (s *server) peerInitializer(p *peer) {
// was successful, and to begin watching the peer's wait group.
close(ready)
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
pubStr := string(p.IdentityKey().SerializeCompressed())
s.mu.Lock()
defer s.mu.Unlock()
@ -2933,7 +2979,7 @@ func (s *server) peerInitializer(p *peer) {
// successfully, otherwise the peer should be disconnected instead.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) {
defer s.wg.Done()
p.WaitForDisconnect(ready)
@ -2952,7 +2998,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
// available for use.
s.fundingMgr.CancelPeerReservations(p.PubKey())
pubKey := p.addr.IdentityKey
pubKey := p.IdentityKey()
// We'll also inform the gossiper that this peer is no longer active,
// so we don't need to maintain sync state for it any longer.
@ -2963,13 +3009,13 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
// with this interface should be closed.
//
// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
links, err := s.htlcSwitch.GetLinksByInterface(p.PubKey())
if err != nil && err != htlcswitch.ErrNoLinksFound {
srvrLog.Errorf("Unable to get channel links for %v: %v", p, err)
}
for _, link := range links {
p.server.htlcSwitch.RemoveLink(link.ChanID())
s.htlcSwitch.RemoveLink(link.ChanID())
}
s.mu.Lock()
@ -3022,12 +3068,12 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
// within the peer's address for reconnection purposes.
//
// TODO(roasbeef): use them all?
if p.inbound {
if p.Inbound() {
advertisedAddr, err := s.fetchNodeAdvertisedAddr(pubKey)
switch {
// We found an advertised address, so use it.
case err == nil:
p.addr.Address = advertisedAddr
p.SetAddress(advertisedAddr)
// The peer doesn't have an advertised address.
case err == errNoAdvertisedAddr:
@ -3060,7 +3106,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
// Otherwise, we'll launch a new connection request in order to
// attempt to maintain a persistent connection with this peer.
connReq := &connmgr.ConnReq{
Addr: p.addr,
Addr: p.NetAddress(),
Permanent: true,
}
s.persistentConnReqs[pubStr] = append(
@ -3103,7 +3149,7 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
// removePeer removes the passed peer from the server's state of all active
// peers.
func (s *server) removePeer(p *peer) {
func (s *server) removePeer(p *peer.Brontide) {
if p == nil {
return
}
@ -3115,8 +3161,8 @@ func (s *server) removePeer(p *peer) {
p.Disconnect(fmt.Errorf("server: disconnecting peer %v", p))
// If this peer had an active persistent connection request, remove it.
if p.connReq != nil {
s.connMgr.Remove(p.connReq.ID())
if p.ConnReq() != nil {
s.connMgr.Remove(p.ConnReq().ID())
}
// Ignore deleting peers if we're shutting down.
@ -3124,12 +3170,13 @@ func (s *server) removePeer(p *peer) {
return
}
pubSer := p.addr.IdentityKey.SerializeCompressed()
pKey := p.PubKey()
pubSer := pKey[:]
pubStr := string(pubSer)
delete(s.peersByPub, pubStr)
if p.inbound {
if p.Inbound() {
delete(s.inboundPeers, pubStr)
} else {
delete(s.outboundPeers, pubStr)
@ -3137,8 +3184,8 @@ func (s *server) removePeer(p *peer) {
// Copy the peer's error buffer across to the server if it has any items
// in it so that we can restore peer errors across connections.
if p.errorBuffer.Total() > 0 {
s.peerErrors[pubStr] = p.errorBuffer
if p.ErrorBuffer().Total() > 0 {
s.peerErrors[pubStr] = p.ErrorBuffer()
}
// Inform the peer notifier of a peer offline event so that it can be
@ -3358,8 +3405,8 @@ func (s *server) OpenChannel(
// We'll wait until the peer is active before beginning the channel
// opening process.
select {
case <-peer.activeSignal:
case <-peer.quit:
case <-peer.ActiveSignal():
case <-peer.QuitSignal():
req.err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
return req.updates, req.err
case <-s.quit:
@ -3391,11 +3438,11 @@ func (s *server) OpenChannel(
// Peers returns a slice of all active peers.
//
// NOTE: This function is safe for concurrent access.
func (s *server) Peers() []*peer {
func (s *server) Peers() []*peer.Brontide {
s.mu.RLock()
defer s.mu.RUnlock()
peers := make([]*peer, 0, len(s.peersByPub))
peers := make([]*peer.Brontide, 0, len(s.peersByPub))
for _, peer := range s.peersByPub {
peers = append(peers, peer)
}

View File

@ -1,34 +1,8 @@
package lnd
import (
"bytes"
crand "crypto/rand"
"encoding/binary"
"io"
"io/ioutil"
"math/rand"
"net"
"os"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/shachain"
"github.com/lightningnetwork/lnd/ticker"
)
var (
@ -54,9 +28,6 @@ var (
0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53,
}
// Just use some arbitrary bytes as delivery script.
dummyDeliveryScript = alicesPrivKey[:]
// testTx is used as the default funding txn for single-funder channels.
testTx = &wire.MsgTx{
Version: 1,
@ -91,372 +62,3 @@ var (
LockTime: 5,
}
)
// noUpdate is a function which can be used as a parameter in createTestPeer to
// call the setup code with no custom values on the channels set up.
var noUpdate = func(a, b *channeldb.OpenChannel) {}
// createTestPeer creates a channel between two nodes, and returns a peer for
// one of the nodes, together with the channel seen from both nodes. It takes
// an updateChan function which can be used to modify the default values on
// the channel states for each peer.
func createTestPeer(notifier chainntnfs.ChainNotifier, publTx chan *wire.MsgTx,
updateChan func(a, b *channeldb.OpenChannel)) (*peer, *lnwallet.LightningChannel,
*lnwallet.LightningChannel, func(), error) {
aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
btcec.S256(), alicesPrivKey,
)
aliceKeySigner := &keychain.PrivKeyDigestSigner{PrivKey: aliceKeyPriv}
bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
btcec.S256(), bobsPrivKey,
)
channelCapacity := btcutil.Amount(10 * 1e8)
channelBal := channelCapacity / 2
aliceDustLimit := btcutil.Amount(200)
bobDustLimit := btcutil.Amount(1300)
csvTimeoutAlice := uint32(5)
csvTimeoutBob := uint32(4)
prevOut := &wire.OutPoint{
Hash: chainhash.Hash(testHdSeed),
Index: 0,
}
fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
aliceCfg := channeldb.ChannelConfig{
ChannelConstraints: channeldb.ChannelConstraints{
DustLimit: aliceDustLimit,
MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
ChanReserve: btcutil.Amount(rand.Int63()),
MinHTLC: lnwire.MilliSatoshi(rand.Int63()),
MaxAcceptedHtlcs: uint16(rand.Int31()),
CsvDelay: uint16(csvTimeoutAlice),
},
MultiSigKey: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
RevocationBasePoint: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
PaymentBasePoint: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
DelayBasePoint: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
HtlcBasePoint: keychain.KeyDescriptor{
PubKey: aliceKeyPub,
},
}
bobCfg := channeldb.ChannelConfig{
ChannelConstraints: channeldb.ChannelConstraints{
DustLimit: bobDustLimit,
MaxPendingAmount: lnwire.MilliSatoshi(rand.Int63()),
ChanReserve: btcutil.Amount(rand.Int63()),
MinHTLC: lnwire.MilliSatoshi(rand.Int63()),
MaxAcceptedHtlcs: uint16(rand.Int31()),
CsvDelay: uint16(csvTimeoutBob),
},
MultiSigKey: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
RevocationBasePoint: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
PaymentBasePoint: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
DelayBasePoint: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
HtlcBasePoint: keychain.KeyDescriptor{
PubKey: bobKeyPub,
},
}
bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize())
if err != nil {
return nil, nil, nil, nil, err
}
bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot)
bobFirstRevoke, err := bobPreimageProducer.AtIndex(0)
if err != nil {
return nil, nil, nil, nil, err
}
bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:])
aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize())
if err != nil {
return nil, nil, nil, nil, err
}
alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot)
aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0)
if err != nil {
return nil, nil, nil, nil, err
}
aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:])
aliceCommitTx, bobCommitTx, err := lnwallet.CreateCommitmentTxns(
channelBal, channelBal, &aliceCfg, &bobCfg, aliceCommitPoint,
bobCommitPoint, *fundingTxIn, channeldb.SingleFunderTweaklessBit,
)
if err != nil {
return nil, nil, nil, nil, err
}
alicePath, err := ioutil.TempDir("", "alicedb")
if err != nil {
return nil, nil, nil, nil, err
}
dbAlice, err := channeldb.Open(alicePath)
if err != nil {
return nil, nil, nil, nil, err
}
bobPath, err := ioutil.TempDir("", "bobdb")
if err != nil {
return nil, nil, nil, nil, err
}
dbBob, err := channeldb.Open(bobPath)
if err != nil {
return nil, nil, nil, nil, err
}
estimator := chainfee.NewStaticEstimator(12500, 0)
feePerKw, err := estimator.EstimateFeePerKW(1)
if err != nil {
return nil, nil, nil, nil, err
}
// TODO(roasbeef): need to factor in commit fee?
aliceCommit := channeldb.ChannelCommitment{
CommitHeight: 0,
LocalBalance: lnwire.NewMSatFromSatoshis(channelBal),
RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
FeePerKw: btcutil.Amount(feePerKw),
CommitFee: feePerKw.FeeForWeight(input.CommitWeight),
CommitTx: aliceCommitTx,
CommitSig: bytes.Repeat([]byte{1}, 71),
}
bobCommit := channeldb.ChannelCommitment{
CommitHeight: 0,
LocalBalance: lnwire.NewMSatFromSatoshis(channelBal),
RemoteBalance: lnwire.NewMSatFromSatoshis(channelBal),
FeePerKw: btcutil.Amount(feePerKw),
CommitFee: feePerKw.FeeForWeight(input.CommitWeight),
CommitTx: bobCommitTx,
CommitSig: bytes.Repeat([]byte{1}, 71),
}
var chanIDBytes [8]byte
if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil {
return nil, nil, nil, nil, err
}
shortChanID := lnwire.NewShortChanIDFromInt(
binary.BigEndian.Uint64(chanIDBytes[:]),
)
aliceChannelState := &channeldb.OpenChannel{
LocalChanCfg: aliceCfg,
RemoteChanCfg: bobCfg,
IdentityPub: aliceKeyPub,
FundingOutpoint: *prevOut,
ShortChannelID: shortChanID,
ChanType: channeldb.SingleFunderTweaklessBit,
IsInitiator: true,
Capacity: channelCapacity,
RemoteCurrentRevocation: bobCommitPoint,
RevocationProducer: alicePreimageProducer,
RevocationStore: shachain.NewRevocationStore(),
LocalCommitment: aliceCommit,
RemoteCommitment: aliceCommit,
Db: dbAlice,
Packager: channeldb.NewChannelPackager(shortChanID),
FundingTxn: testTx,
}
bobChannelState := &channeldb.OpenChannel{
LocalChanCfg: bobCfg,
RemoteChanCfg: aliceCfg,
IdentityPub: bobKeyPub,
FundingOutpoint: *prevOut,
ChanType: channeldb.SingleFunderTweaklessBit,
IsInitiator: false,
Capacity: channelCapacity,
RemoteCurrentRevocation: aliceCommitPoint,
RevocationProducer: bobPreimageProducer,
RevocationStore: shachain.NewRevocationStore(),
LocalCommitment: bobCommit,
RemoteCommitment: bobCommit,
Db: dbBob,
Packager: channeldb.NewChannelPackager(shortChanID),
}
// Set custom values on the channel states.
updateChan(aliceChannelState, bobChannelState)
aliceAddr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil {
return nil, nil, nil, nil, err
}
bobAddr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18556,
}
if err := bobChannelState.SyncPending(bobAddr, 0); err != nil {
return nil, nil, nil, nil, err
}
cleanUpFunc := func() {
os.RemoveAll(bobPath)
os.RemoveAll(alicePath)
}
aliceSigner := &mockSigner{aliceKeyPriv}
bobSigner := &mockSigner{bobKeyPriv}
alicePool := lnwallet.NewSigPool(1, aliceSigner)
channelAlice, err := lnwallet.NewLightningChannel(
aliceSigner, aliceChannelState, alicePool,
)
if err != nil {
return nil, nil, nil, nil, err
}
alicePool.Start()
bobPool := lnwallet.NewSigPool(1, bobSigner)
channelBob, err := lnwallet.NewLightningChannel(
bobSigner, bobChannelState, bobPool,
)
if err != nil {
return nil, nil, nil, nil, err
}
bobPool.Start()
chainIO := &mockChainIO{
bestHeight: fundingBroadcastHeight,
}
wallet := &lnwallet.LightningWallet{
WalletController: &mockWalletController{
rootKey: aliceKeyPriv,
publishedTransactions: publTx,
},
}
cc := &chainControl{
feeEstimator: estimator,
chainIO: chainIO,
chainNotifier: notifier,
wallet: wallet,
}
breachArbiter := &breachArbiter{}
chainArb := contractcourt.NewChainArbitrator(
contractcourt.ChainArbitratorConfig{
Notifier: notifier,
ChainIO: chainIO,
IsForwardedHTLC: func(chanID lnwire.ShortChannelID,
htlcIndex uint64) bool {
return true
},
Clock: clock.NewDefaultClock(),
}, dbAlice,
)
chainArb.WatchNewChannel(aliceChannelState)
s := &server{
chanDB: dbAlice,
cc: cc,
breachArbiter: breachArbiter,
chainArb: chainArb,
}
_, currentHeight, err := s.cc.chainIO.GetBestBlock()
if err != nil {
return nil, nil, nil, nil, err
}
htlcSwitch, err := htlcswitch.New(htlcswitch.Config{
DB: dbAlice,
SwitchPackager: channeldb.NewSwitchPackager(),
Notifier: notifier,
FwdEventTicker: ticker.New(
htlcswitch.DefaultFwdEventInterval),
LogEventTicker: ticker.New(
htlcswitch.DefaultLogInterval),
AckEventTicker: ticker.New(
htlcswitch.DefaultAckInterval),
}, uint32(currentHeight))
if err != nil {
return nil, nil, nil, nil, err
}
if err = htlcSwitch.Start(); err != nil {
return nil, nil, nil, nil, err
}
s.htlcSwitch = htlcSwitch
nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
const chanActiveTimeout = time.Minute
chanStatusMgr, err := netann.NewChanStatusManager(&netann.ChanStatusConfig{
ChanStatusSampleInterval: 30 * time.Second,
ChanEnableTimeout: chanActiveTimeout,
ChanDisableTimeout: 2 * time.Minute,
DB: dbAlice,
Graph: dbAlice.ChannelGraph(),
MessageSigner: nodeSignerAlice,
OurPubKey: aliceKeyPub,
IsChannelActive: s.htlcSwitch.HasActiveLink,
ApplyChannelUpdate: func(*lnwire.ChannelUpdate) error { return nil },
})
if err != nil {
return nil, nil, nil, nil, err
}
if err = chanStatusMgr.Start(); err != nil {
return nil, nil, nil, nil, err
}
s.chanStatusMgr = chanStatusMgr
alicePeer := &peer{
addr: &lnwire.NetAddress{
IdentityKey: aliceKeyPub,
Address: aliceAddr,
},
server: s,
sendQueue: make(chan outgoingMsg, 1),
outgoingQueue: make(chan outgoingMsg, outgoingQueueLen),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
newChannels: make(chan *newChannelMsg, 1),
activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser),
localCloseChanReqs: make(chan *htlcswitch.ChanClose),
chanCloseMsgs: make(chan *closeMsg),
chanActiveTimeout: chanActiveTimeout,
queueQuit: make(chan struct{}),
quit: make(chan struct{}),
}
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
alicePeer.activeChannels[chanID] = channelAlice
alicePeer.wg.Add(1)
go alicePeer.channelManager()
return alicePeer, channelAlice, channelBob, cleanUpFunc, nil
}