mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-27 18:22:24 +01:00
Merge pull request #9197 from guggero/aux-signer-batching-fixes
[custom channels]: Aux signer batching fixes
This commit is contained in:
commit
49275e1d46
1
.gitignore
vendored
1
.gitignore
vendored
@ -66,6 +66,7 @@ profile.tmp
|
||||
.DS_Store
|
||||
|
||||
.vscode
|
||||
*.code-workspace
|
||||
|
||||
# Coverage test
|
||||
coverage.txt
|
||||
|
@ -2,6 +2,7 @@ package contractcourt
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"testing"
|
||||
@ -145,17 +146,15 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) {
|
||||
|
||||
// With the HTLC added, we'll now manually initiate a state transition
|
||||
// from Alice to Bob.
|
||||
_, err = aliceChannel.SignNextCommitment()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
testQuit, testQuitFunc := context.WithCancel(context.Background())
|
||||
t.Cleanup(testQuitFunc)
|
||||
_, err = aliceChannel.SignNextCommitment(testQuit)
|
||||
require.NoError(t, err)
|
||||
|
||||
// At this point, we'll now Bob broadcasting this new pending unrevoked
|
||||
// commitment.
|
||||
bobPendingCommit, err := aliceChannel.State().RemoteCommitChainTip()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
// We'll craft a fake spend notification with Bob's actual commitment.
|
||||
// The chain watcher should be able to detect that this is a pending
|
||||
|
@ -568,7 +568,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
|
||||
&lnwallet.MockAuxLeafStore{},
|
||||
),
|
||||
AuxSigner: fn.Some[lnwallet.AuxSigner](
|
||||
&lnwallet.MockAuxSigner{},
|
||||
lnwallet.NewAuxSignerMock(lnwallet.EmptyMockJobHandler),
|
||||
),
|
||||
}
|
||||
|
||||
|
2
go.mod
2
go.mod
@ -35,7 +35,7 @@ require (
|
||||
github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb
|
||||
github.com/lightningnetwork/lnd/cert v1.2.2
|
||||
github.com/lightningnetwork/lnd/clock v1.1.1
|
||||
github.com/lightningnetwork/lnd/fn v1.2.2
|
||||
github.com/lightningnetwork/lnd/fn v1.2.3
|
||||
github.com/lightningnetwork/lnd/healthcheck v1.2.5
|
||||
github.com/lightningnetwork/lnd/kvdb v1.4.10
|
||||
github.com/lightningnetwork/lnd/queue v1.1.1
|
||||
|
4
go.sum
4
go.sum
@ -453,8 +453,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf
|
||||
github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U=
|
||||
github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0=
|
||||
github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ=
|
||||
github.com/lightningnetwork/lnd/fn v1.2.2 h1:rVtmGW1cQTmYce2XdUbRcc5qLDxqu+aQ6IGRpyspakk=
|
||||
github.com/lightningnetwork/lnd/fn v1.2.2/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0=
|
||||
github.com/lightningnetwork/lnd/fn v1.2.3 h1:Q1OrgNSgQynVheBNa16CsKVov1JI5N2AR6G07x9Mles=
|
||||
github.com/lightningnetwork/lnd/fn v1.2.3/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0=
|
||||
github.com/lightningnetwork/lnd/healthcheck v1.2.5 h1:aTJy5xeBpcWgRtW/PGBDe+LMQEmNm/HQewlQx2jt7OA=
|
||||
github.com/lightningnetwork/lnd/healthcheck v1.2.5/go.mod h1:G7Tst2tVvWo7cx6mSBEToQC5L1XOGxzZTPB29g9Rv2I=
|
||||
github.com/lightningnetwork/lnd/kvdb v1.4.10 h1:vK89IVv1oVH9ubQWU+EmoCQFeVRaC8kfmOrqHbY5zoY=
|
||||
|
@ -95,7 +95,7 @@ type InterceptableSwitch struct {
|
||||
|
||||
type interceptedPackets struct {
|
||||
packets []*htlcPacket
|
||||
linkQuit chan struct{}
|
||||
linkQuit <-chan struct{}
|
||||
isReplay bool
|
||||
}
|
||||
|
||||
@ -465,8 +465,8 @@ func (s *InterceptableSwitch) Resolve(res *FwdResolution) error {
|
||||
// interceptor. If the interceptor signals the resume action, the htlcs are
|
||||
// forwarded to the switch. The link's quit signal should be provided to allow
|
||||
// cancellation of forwarding during link shutdown.
|
||||
func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{}, isReplay bool,
|
||||
packets ...*htlcPacket) error {
|
||||
func (s *InterceptableSwitch) ForwardPackets(linkQuit <-chan struct{},
|
||||
isReplay bool, packets ...*htlcPacket) error {
|
||||
|
||||
// Synchronize with the main event loop. This should be light in the
|
||||
// case where there is no interceptor.
|
||||
|
@ -101,7 +101,7 @@ type ChannelLinkConfig struct {
|
||||
// switch. The function returns and error in case it fails to send one or
|
||||
// more packets. The link's quit signal should be provided to allow
|
||||
// cancellation of forwarding during link shutdown.
|
||||
ForwardPackets func(chan struct{}, bool, ...*htlcPacket) error
|
||||
ForwardPackets func(<-chan struct{}, bool, ...*htlcPacket) error
|
||||
|
||||
// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
|
||||
// blobs, which are then used to inform how to forward an HTLC.
|
||||
@ -387,8 +387,10 @@ type channelLink struct {
|
||||
// our next CommitSig.
|
||||
incomingCommitHooks hookMap
|
||||
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
// ContextGuard is a helper that encapsulates a wait group and quit
|
||||
// channel and allows contexts that either block or cancel on those
|
||||
// depending on the use case.
|
||||
*fn.ContextGuard
|
||||
}
|
||||
|
||||
// hookMap is a data structure that is used to track the hooks that need to be
|
||||
@ -469,7 +471,7 @@ func NewChannelLink(cfg ChannelLinkConfig,
|
||||
flushHooks: newHookMap(),
|
||||
outgoingCommitHooks: newHookMap(),
|
||||
incomingCommitHooks: newHookMap(),
|
||||
quit: make(chan struct{}),
|
||||
ContextGuard: fn.NewContextGuard(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -548,7 +550,7 @@ func (l *channelLink) Start() error {
|
||||
|
||||
l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())
|
||||
|
||||
l.wg.Add(1)
|
||||
l.Wg.Add(1)
|
||||
go l.htlcManager()
|
||||
|
||||
return nil
|
||||
@ -588,8 +590,8 @@ func (l *channelLink) Stop() {
|
||||
l.hodlQueue.Stop()
|
||||
}
|
||||
|
||||
close(l.quit)
|
||||
l.wg.Wait()
|
||||
close(l.Quit)
|
||||
l.Wg.Wait()
|
||||
|
||||
// Now that the htlcManager has completely exited, reset the packet
|
||||
// courier. This allows the mailbox to revaluate any lingering Adds that
|
||||
@ -614,7 +616,7 @@ func (l *channelLink) Stop() {
|
||||
// WaitForShutdown blocks until the link finishes shutting down, which includes
|
||||
// termination of all dependent goroutines.
|
||||
func (l *channelLink) WaitForShutdown() {
|
||||
l.wg.Wait()
|
||||
l.Wg.Wait()
|
||||
}
|
||||
|
||||
// EligibleToForward returns a bool indicating if the channel is able to
|
||||
@ -675,7 +677,7 @@ func (l *channelLink) IsFlushing(linkDirection LinkDirection) bool {
|
||||
func (l *channelLink) OnFlushedOnce(hook func()) {
|
||||
select {
|
||||
case l.flushHooks.newTransients <- hook:
|
||||
case <-l.quit:
|
||||
case <-l.Quit:
|
||||
}
|
||||
}
|
||||
|
||||
@ -694,7 +696,7 @@ func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) {
|
||||
|
||||
select {
|
||||
case queue <- hook:
|
||||
case <-l.quit:
|
||||
case <-l.Quit:
|
||||
}
|
||||
}
|
||||
|
||||
@ -903,8 +905,10 @@ func (l *channelLink) syncChanStates() error {
|
||||
// We've just received a ChanSync message from the remote
|
||||
// party, so we'll process the message in order to determine
|
||||
// if we need to re-transmit any messages to the remote party.
|
||||
ctx, cancel := l.WithCtxQuitNoTimeout()
|
||||
defer cancel()
|
||||
msgsToReSend, openedCircuits, closedCircuits, err =
|
||||
l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
|
||||
l.channel.ProcessChanSyncMsg(ctx, remoteChanSyncMsg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -933,7 +937,7 @@ func (l *channelLink) syncChanStates() error {
|
||||
l.cfg.Peer.SendMessage(false, msg)
|
||||
}
|
||||
|
||||
case <-l.quit:
|
||||
case <-l.Quit:
|
||||
return ErrLinkShuttingDown
|
||||
}
|
||||
|
||||
@ -1023,7 +1027,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
|
||||
//
|
||||
// NOTE: This MUST be run as a goroutine.
|
||||
func (l *channelLink) fwdPkgGarbager() {
|
||||
defer l.wg.Done()
|
||||
defer l.Wg.Done()
|
||||
|
||||
l.cfg.FwdPkgGCTicker.Resume()
|
||||
defer l.cfg.FwdPkgGCTicker.Stop()
|
||||
@ -1040,7 +1044,7 @@ func (l *channelLink) fwdPkgGarbager() {
|
||||
err)
|
||||
continue
|
||||
}
|
||||
case <-l.quit:
|
||||
case <-l.Quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -1163,7 +1167,7 @@ func (l *channelLink) handleChanSyncErr(err error) {
|
||||
func (l *channelLink) htlcManager() {
|
||||
defer func() {
|
||||
l.cfg.BatchTicker.Stop()
|
||||
l.wg.Done()
|
||||
l.Wg.Done()
|
||||
l.log.Infof("exited")
|
||||
}()
|
||||
|
||||
@ -1257,7 +1261,7 @@ func (l *channelLink) htlcManager() {
|
||||
// With our link's in-memory state fully reconstructed, spawn a
|
||||
// goroutine to manage the reclamation of disk space occupied by
|
||||
// completed forwarding packages.
|
||||
l.wg.Add(1)
|
||||
l.Wg.Add(1)
|
||||
go l.fwdPkgGarbager()
|
||||
}
|
||||
|
||||
@ -1441,7 +1445,7 @@ func (l *channelLink) htlcManager() {
|
||||
)
|
||||
}
|
||||
|
||||
case <-l.quit:
|
||||
case <-l.Quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -2299,7 +2303,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||
}
|
||||
|
||||
select {
|
||||
case <-l.quit:
|
||||
case <-l.Quit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
@ -2360,7 +2364,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||
}
|
||||
|
||||
select {
|
||||
case <-l.quit:
|
||||
case <-l.Quit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
@ -2590,7 +2594,10 @@ func (l *channelLink) updateCommitTx() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
newCommit, err := l.channel.SignNextCommitment()
|
||||
ctx, done := l.WithCtxQuitNoTimeout()
|
||||
defer done()
|
||||
|
||||
newCommit, err := l.channel.SignNextCommitment(ctx)
|
||||
if err == lnwallet.ErrNoWindow {
|
||||
l.cfg.PendingCommitTicker.Resume()
|
||||
l.log.Trace("PendingCommitTicker resumed")
|
||||
@ -2627,7 +2634,7 @@ func (l *channelLink) updateCommitTx() error {
|
||||
}
|
||||
|
||||
select {
|
||||
case <-l.quit:
|
||||
case <-l.Quit:
|
||||
return ErrLinkShuttingDown
|
||||
default:
|
||||
}
|
||||
@ -3233,7 +3240,7 @@ func (l *channelLink) handleSwitchPacket(pkt *htlcPacket) error {
|
||||
// NOTE: Part of the ChannelLink interface.
|
||||
func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
|
||||
select {
|
||||
case <-l.quit:
|
||||
case <-l.Quit:
|
||||
// Return early if the link is already in the process of
|
||||
// quitting. It doesn't make sense to hand the message to the
|
||||
// mailbox here.
|
||||
@ -3932,7 +3939,7 @@ func (l *channelLink) forwardBatch(replay bool, packets ...*htlcPacket) {
|
||||
filteredPkts = append(filteredPkts, pkt)
|
||||
}
|
||||
|
||||
err := l.cfg.ForwardPackets(l.quit, replay, filteredPkts...)
|
||||
err := l.cfg.ForwardPackets(l.Quit, replay, filteredPkts...)
|
||||
if err != nil {
|
||||
log.Errorf("Unhandled error while reforwarding htlc "+
|
||||
"settle/fail over htlcswitch: %v", err)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package htlcswitch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"testing"
|
||||
"time"
|
||||
@ -94,7 +95,9 @@ func (l *linkTestContext) receiveHtlcAliceToBob() {
|
||||
func (l *linkTestContext) sendCommitSigBobToAlice(expHtlcs int) {
|
||||
l.t.Helper()
|
||||
|
||||
sigs, err := l.bobChannel.SignNextCommitment()
|
||||
testQuit, testQuitFunc := context.WithCancel(context.Background())
|
||||
defer testQuitFunc()
|
||||
sigs, err := l.bobChannel.SignNextCommitment(testQuit)
|
||||
if err != nil {
|
||||
l.t.Fatalf("error signing commitment: %v", err)
|
||||
}
|
||||
|
@ -2197,17 +2197,21 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt,
|
||||
return nil
|
||||
}
|
||||
|
||||
forwardPackets := func(linkQuit <-chan struct{}, _ bool,
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
return aliceSwitch.ForwardPackets(linkQuit, packets...)
|
||||
}
|
||||
|
||||
// Instantiate with a long interval, so that we can precisely control
|
||||
// the firing via force feeding.
|
||||
bticker := ticker.NewForce(time.Hour)
|
||||
aliceCfg := ChannelLinkConfig{
|
||||
FwrdingPolicy: globalPolicy,
|
||||
Peer: alicePeer,
|
||||
BestHeight: aliceSwitch.BestHeight,
|
||||
Circuits: aliceSwitch.CircuitModifier(),
|
||||
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
|
||||
return aliceSwitch.ForwardPackets(linkQuit, packets...)
|
||||
},
|
||||
FwrdingPolicy: globalPolicy,
|
||||
Peer: alicePeer,
|
||||
BestHeight: aliceSwitch.BestHeight,
|
||||
Circuits: aliceSwitch.CircuitModifier(),
|
||||
ForwardPackets: forwardPackets,
|
||||
DecodeHopIterators: decoder.DecodeHopIterators,
|
||||
ExtractErrorEncrypter: func(*btcec.PublicKey) (
|
||||
hop.ErrorEncrypter, lnwire.FailCode) {
|
||||
@ -2248,12 +2252,14 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt,
|
||||
return aliceSwitch.AddLink(aliceLink)
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-aliceLink.(*channelLink).quit:
|
||||
close(doneChan)
|
||||
return
|
||||
if chanLink, ok := aliceLink.(*channelLink); ok {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-chanLink.Quit:
|
||||
close(doneChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -2320,7 +2326,10 @@ func handleStateUpdate(link *channelLink,
|
||||
}
|
||||
link.HandleChannelUpdate(remoteRev)
|
||||
|
||||
remoteSigs, err := remoteChannel.SignNextCommitment()
|
||||
ctx, done := link.WithCtxQuitNoTimeout()
|
||||
defer done()
|
||||
|
||||
remoteSigs, err := remoteChannel.SignNextCommitment(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -2363,7 +2372,7 @@ func updateState(batchTick chan time.Time, link *channelLink,
|
||||
// Trigger update by ticking the batchTicker.
|
||||
select {
|
||||
case batchTick <- time.Now():
|
||||
case <-link.quit:
|
||||
case <-link.Quit:
|
||||
return fmt.Errorf("link shutting down")
|
||||
}
|
||||
return handleStateUpdate(link, remoteChannel)
|
||||
@ -2371,7 +2380,10 @@ func updateState(batchTick chan time.Time, link *channelLink,
|
||||
|
||||
// The remote is triggering the state update, emulate this by
|
||||
// signing and sending CommitSig to the link.
|
||||
remoteSigs, err := remoteChannel.SignNextCommitment()
|
||||
ctx, done := link.WithCtxQuitNoTimeout()
|
||||
defer done()
|
||||
|
||||
remoteSigs, err := remoteChannel.SignNextCommitment(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -4867,17 +4879,21 @@ func (h *persistentLinkHarness) restartLink(
|
||||
return nil
|
||||
}
|
||||
|
||||
forwardPackets := func(linkQuit <-chan struct{}, _ bool,
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
return h.hSwitch.ForwardPackets(linkQuit, packets...)
|
||||
}
|
||||
|
||||
// Instantiate with a long interval, so that we can precisely control
|
||||
// the firing via force feeding.
|
||||
bticker := ticker.NewForce(time.Hour)
|
||||
aliceCfg := ChannelLinkConfig{
|
||||
FwrdingPolicy: globalPolicy,
|
||||
Peer: alicePeer,
|
||||
BestHeight: h.hSwitch.BestHeight,
|
||||
Circuits: h.hSwitch.CircuitModifier(),
|
||||
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
|
||||
return h.hSwitch.ForwardPackets(linkQuit, packets...)
|
||||
},
|
||||
FwrdingPolicy: globalPolicy,
|
||||
Peer: alicePeer,
|
||||
BestHeight: h.hSwitch.BestHeight,
|
||||
Circuits: h.hSwitch.CircuitModifier(),
|
||||
ForwardPackets: forwardPackets,
|
||||
DecodeHopIterators: decoder.DecodeHopIterators,
|
||||
ExtractErrorEncrypter: func(*btcec.PublicKey) (
|
||||
hop.ErrorEncrypter, lnwire.FailCode) {
|
||||
@ -4923,12 +4939,14 @@ func (h *persistentLinkHarness) restartLink(
|
||||
return nil, nil, err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-aliceLink.(*channelLink).quit:
|
||||
close(doneChan)
|
||||
return
|
||||
if chanLink, ok := aliceLink.(*channelLink); ok {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-chanLink.Quit:
|
||||
close(doneChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -5911,7 +5929,12 @@ func TestChannelLinkFail(t *testing.T) {
|
||||
|
||||
// Sign a commitment that will include
|
||||
// signature for the HTLC just sent.
|
||||
sigs, err := remoteChannel.SignNextCommitment()
|
||||
quitCtx, done := c.WithCtxQuitNoTimeout()
|
||||
defer done()
|
||||
|
||||
sigs, err := remoteChannel.SignNextCommitment(
|
||||
quitCtx,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("error signing commitment: %v",
|
||||
err)
|
||||
@ -5953,7 +5976,12 @@ func TestChannelLinkFail(t *testing.T) {
|
||||
|
||||
// Sign a commitment that will include
|
||||
// signature for the HTLC just sent.
|
||||
sigs, err := remoteChannel.SignNextCommitment()
|
||||
quitCtx, done := c.WithCtxQuitNoTimeout()
|
||||
defer done()
|
||||
|
||||
sigs, err := remoteChannel.SignNextCommitment(
|
||||
quitCtx,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("error signing commitment: %v",
|
||||
err)
|
||||
@ -7037,7 +7065,7 @@ func TestPipelineSettle(t *testing.T) {
|
||||
// erroneously forwarded. If the forwardChan is closed before the last
|
||||
// step, then the test will fail.
|
||||
forwardChan := make(chan struct{})
|
||||
fwdPkts := func(c chan struct{}, _ bool, hp ...*htlcPacket) error {
|
||||
fwdPkts := func(c <-chan struct{}, _ bool, hp ...*htlcPacket) error {
|
||||
close(forwardChan)
|
||||
return nil
|
||||
}
|
||||
@ -7223,7 +7251,7 @@ func TestChannelLinkShortFailureRelay(t *testing.T) {
|
||||
aliceMsgs := mockPeer.sentMsgs
|
||||
switchChan := make(chan *htlcPacket)
|
||||
|
||||
coreLink.cfg.ForwardPackets = func(linkQuit chan struct{}, _ bool,
|
||||
coreLink.cfg.ForwardPackets = func(linkQuit <-chan struct{}, _ bool,
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
for _, p := range packets {
|
||||
|
@ -95,7 +95,7 @@ type mailBoxConfig struct {
|
||||
// forwardPackets send a varidic number of htlcPackets to the switch to
|
||||
// be routed. A quit channel should be provided so that the call can
|
||||
// properly exit during shutdown.
|
||||
forwardPackets func(chan struct{}, ...*htlcPacket) error
|
||||
forwardPackets func(<-chan struct{}, ...*htlcPacket) error
|
||||
|
||||
// clock is a time source for the mailbox.
|
||||
clock clock.Clock
|
||||
@ -804,7 +804,7 @@ type mailOrchConfig struct {
|
||||
// forwardPackets send a varidic number of htlcPackets to the switch to
|
||||
// be routed. A quit channel should be provided so that the call can
|
||||
// properly exit during shutdown.
|
||||
forwardPackets func(chan struct{}, ...*htlcPacket) error
|
||||
forwardPackets func(<-chan struct{}, ...*htlcPacket) error
|
||||
|
||||
// clock is a time source for the generated mailboxes.
|
||||
clock clock.Clock
|
||||
|
@ -250,7 +250,7 @@ func newMailboxContext(t *testing.T, startTime time.Time,
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (c *mailboxContext) forward(_ chan struct{},
|
||||
func (c *mailboxContext) forward(_ <-chan struct{},
|
||||
pkts ...*htlcPacket) error {
|
||||
|
||||
for _, pkt := range pkts {
|
||||
@ -706,7 +706,7 @@ func TestMailOrchestrator(t *testing.T) {
|
||||
// First, we'll create a new instance of our orchestrator.
|
||||
mo := newMailOrchestrator(&mailOrchConfig{
|
||||
failMailboxUpdate: failMailboxUpdate,
|
||||
forwardPackets: func(_ chan struct{},
|
||||
forwardPackets: func(_ <-chan struct{},
|
||||
pkts ...*htlcPacket) error {
|
||||
|
||||
return nil
|
||||
|
@ -672,7 +672,7 @@ func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
|
||||
// given to forward them through the router. The sending link's quit channel is
|
||||
// used to prevent deadlocks when the switch stops a link in the midst of
|
||||
// forwarding.
|
||||
func (s *Switch) ForwardPackets(linkQuit chan struct{},
|
||||
func (s *Switch) ForwardPackets(linkQuit <-chan struct{},
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
var (
|
||||
@ -850,7 +850,7 @@ func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
|
||||
// receive a shutdown requuest. This method does not wait for a response from
|
||||
// the htlcForwarder before returning.
|
||||
func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error,
|
||||
linkQuit chan struct{}) error {
|
||||
linkQuit <-chan struct{}) error {
|
||||
|
||||
command := &plexPacket{
|
||||
pkt: packet,
|
||||
|
@ -1144,15 +1144,19 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
|
||||
return nil
|
||||
}
|
||||
|
||||
forwardPackets := func(linkQuit <-chan struct{}, _ bool,
|
||||
packets ...*htlcPacket) error {
|
||||
|
||||
return server.htlcSwitch.ForwardPackets(linkQuit, packets...)
|
||||
}
|
||||
|
||||
link := NewChannelLink(
|
||||
ChannelLinkConfig{
|
||||
BestHeight: server.htlcSwitch.BestHeight,
|
||||
FwrdingPolicy: h.globalPolicy,
|
||||
Peer: peer,
|
||||
Circuits: server.htlcSwitch.CircuitModifier(),
|
||||
ForwardPackets: func(linkQuit chan struct{}, _ bool, packets ...*htlcPacket) error {
|
||||
return server.htlcSwitch.ForwardPackets(linkQuit, packets...)
|
||||
},
|
||||
BestHeight: server.htlcSwitch.BestHeight,
|
||||
FwrdingPolicy: h.globalPolicy,
|
||||
Peer: peer,
|
||||
Circuits: server.htlcSwitch.CircuitModifier(),
|
||||
ForwardPackets: forwardPackets,
|
||||
DecodeHopIterators: decoder.DecodeHopIterators,
|
||||
ExtractErrorEncrypter: func(*btcec.PublicKey) (
|
||||
hop.ErrorEncrypter, lnwire.FailCode) {
|
||||
@ -1193,12 +1197,14 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-link.(*channelLink).quit:
|
||||
close(doneChan)
|
||||
return
|
||||
if chanLink, ok := link.(*channelLink); ok {
|
||||
for {
|
||||
select {
|
||||
case <-notifyUpdateChan:
|
||||
case <-chanLink.Quit:
|
||||
close(doneChan)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -2,6 +2,8 @@ package lnwallet
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"cmp"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -137,6 +139,10 @@ var (
|
||||
// errNoPartialSig is returned when a partial signature is required,
|
||||
// but none is found.
|
||||
errNoPartialSig = errors.New("no partial signature found")
|
||||
|
||||
// errQuit is returned when a quit signal was received, interrupting the
|
||||
// current operation.
|
||||
errQuit = errors.New("received quit signal")
|
||||
)
|
||||
|
||||
// ErrCommitSyncLocalDataLoss is returned in the case that we receive a valid
|
||||
@ -3892,7 +3898,9 @@ type NewCommitState struct {
|
||||
// for the remote party's commitment are also returned.
|
||||
//
|
||||
//nolint:funlen
|
||||
func (lc *LightningChannel) SignNextCommitment() (*NewCommitState, error) {
|
||||
func (lc *LightningChannel) SignNextCommitment(
|
||||
ctx context.Context) (*NewCommitState, error) {
|
||||
|
||||
lc.Lock()
|
||||
defer lc.Unlock()
|
||||
|
||||
@ -3996,10 +4004,10 @@ func (lc *LightningChannel) SignNextCommitment() (*NewCommitState, error) {
|
||||
// order as they appear on the commitment transaction after BIP 69
|
||||
// sorting.
|
||||
slices.SortFunc(sigBatch, func(i, j SignJob) int {
|
||||
return int(i.OutputIndex - j.OutputIndex)
|
||||
return cmp.Compare(i.OutputIndex, j.OutputIndex)
|
||||
})
|
||||
slices.SortFunc(auxSigBatch, func(i, j AuxSigJob) int {
|
||||
return int(i.OutputIndex - j.OutputIndex)
|
||||
return cmp.Compare(i.OutputIndex, j.OutputIndex)
|
||||
})
|
||||
|
||||
lc.sigPool.SubmitSignBatch(sigBatch)
|
||||
@ -4058,7 +4066,13 @@ func (lc *LightningChannel) SignNextCommitment() (*NewCommitState, error) {
|
||||
auxSigs := make([]fn.Option[tlv.Blob], 0, len(auxSigBatch))
|
||||
for i := range sigBatch {
|
||||
htlcSigJob := sigBatch[i]
|
||||
jobResp := <-htlcSigJob.Resp
|
||||
var jobResp SignJobResp
|
||||
|
||||
select {
|
||||
case jobResp = <-htlcSigJob.Resp:
|
||||
case <-ctx.Done():
|
||||
return nil, errQuit
|
||||
}
|
||||
|
||||
// If an error occurred, then we'll cancel any other active
|
||||
// jobs.
|
||||
@ -4074,7 +4088,13 @@ func (lc *LightningChannel) SignNextCommitment() (*NewCommitState, error) {
|
||||
}
|
||||
|
||||
auxHtlcSigJob := auxSigBatch[i]
|
||||
auxJobResp := <-auxHtlcSigJob.Resp
|
||||
var auxJobResp AuxSigJobResp
|
||||
|
||||
select {
|
||||
case auxJobResp = <-auxHtlcSigJob.Resp:
|
||||
case <-ctx.Done():
|
||||
return nil, errQuit
|
||||
}
|
||||
|
||||
// If an error occurred, then we'll cancel any other active
|
||||
// jobs.
|
||||
@ -4168,7 +4188,9 @@ func (lc *LightningChannel) resignMusigCommit(
|
||||
// previous commitment txn. This allows the link to clear its mailbox of those
|
||||
// circuits in case they are still in memory, and ensure the switch's circuit
|
||||
// map has been updated by deleting the closed circuits.
|
||||
func (lc *LightningChannel) ProcessChanSyncMsg(
|
||||
//
|
||||
//nolint:funlen
|
||||
func (lc *LightningChannel) ProcessChanSyncMsg(ctx context.Context,
|
||||
msg *lnwire.ChannelReestablish) ([]lnwire.Message, []models.CircuitKey,
|
||||
[]models.CircuitKey, error) {
|
||||
|
||||
@ -4332,7 +4354,7 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
|
||||
// revocation, but also initiate a state transition to re-sync
|
||||
// them.
|
||||
if lc.OweCommitment() {
|
||||
newCommit, err := lc.SignNextCommitment()
|
||||
newCommit, err := lc.SignNextCommitment(ctx)
|
||||
switch {
|
||||
|
||||
// If we signed this state, then we'll accumulate
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -443,9 +443,26 @@ func (*MockAuxLeafStore) ApplyHtlcView(
|
||||
return fn.Ok(fn.None[tlv.Blob]())
|
||||
}
|
||||
|
||||
// EmptyMockJobHandler is a mock job handler that just sends an empty response
|
||||
// to all jobs.
|
||||
func EmptyMockJobHandler(jobs []AuxSigJob) {
|
||||
for _, sigJob := range jobs {
|
||||
sigJob.Resp <- AuxSigJobResp{}
|
||||
}
|
||||
}
|
||||
|
||||
// MockAuxSigner is a mock implementation of the AuxSigner interface.
|
||||
type MockAuxSigner struct {
|
||||
mock.Mock
|
||||
|
||||
jobHandlerFunc func([]AuxSigJob)
|
||||
}
|
||||
|
||||
// NewAuxSignerMock creates a new mock aux signer with the given job handler.
|
||||
func NewAuxSignerMock(jobHandler func([]AuxSigJob)) *MockAuxSigner {
|
||||
return &MockAuxSigner{
|
||||
jobHandlerFunc: jobHandler,
|
||||
}
|
||||
}
|
||||
|
||||
// SubmitSecondLevelSigBatch takes a batch of aux sign jobs and
|
||||
@ -455,10 +472,8 @@ func (a *MockAuxSigner) SubmitSecondLevelSigBatch(chanState AuxChanState,
|
||||
|
||||
args := a.Called(chanState, tx, jobs)
|
||||
|
||||
// While we return, we'll also send back an instant response for the
|
||||
// set of jobs.
|
||||
for _, sigJob := range jobs {
|
||||
sigJob.Resp <- AuxSigJobResp{}
|
||||
if a.jobHandlerFunc != nil {
|
||||
a.jobHandlerFunc(jobs)
|
||||
}
|
||||
|
||||
return args.Error(0)
|
||||
|
@ -2,6 +2,7 @@ package lnwallet
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
@ -103,6 +104,10 @@ var (
|
||||
bobDustLimit = btcutil.Amount(1300)
|
||||
|
||||
testChannelCapacity float64 = 10
|
||||
|
||||
// ctxb is a context that will never be cancelled, that is used in
|
||||
// place of a real quit context.
|
||||
ctxb = context.Background()
|
||||
)
|
||||
|
||||
// CreateTestChannels creates to fully populated channels to be used within
|
||||
@ -557,7 +562,7 @@ func calcStaticFee(chanType channeldb.ChannelType, numHTLCs int) btcutil.Amount
|
||||
// pending updates. This method is useful when testing interactions between two
|
||||
// live state machines.
|
||||
func ForceStateTransition(chanA, chanB *LightningChannel) error {
|
||||
aliceNewCommit, err := chanA.SignNextCommitment()
|
||||
aliceNewCommit, err := chanA.SignNextCommitment(ctxb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -570,7 +575,7 @@ func ForceStateTransition(chanA, chanB *LightningChannel) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bobNewCommit, err := chanB.SignNextCommitment()
|
||||
bobNewCommit, err := chanB.SignNextCommitment(ctxb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -597,7 +602,7 @@ func ForceStateTransition(chanA, chanB *LightningChannel) error {
|
||||
}
|
||||
|
||||
func NewDefaultAuxSignerMock(t *testing.T) *MockAuxSigner {
|
||||
auxSigner := &MockAuxSigner{}
|
||||
auxSigner := NewAuxSignerMock(EmptyMockJobHandler)
|
||||
|
||||
type testSigBlob struct {
|
||||
BlobInt tlv.RecordT[tlv.TlvType65634, uint16]
|
||||
|
@ -357,7 +357,7 @@ func testVectors(t *testing.T, chanType channeldb.ChannelType, test testCase) {
|
||||
|
||||
// Execute commit dance to arrive at the point where the local node has
|
||||
// received the test commitment and the remote signature.
|
||||
localNewCommit, err := localChannel.SignNextCommitment()
|
||||
localNewCommit, err := localChannel.SignNextCommitment(ctxb)
|
||||
require.NoError(t, err, "local unable to sign commitment")
|
||||
|
||||
err = remoteChannel.ReceiveNewCommitment(localNewCommit.CommitSigs)
|
||||
@ -369,7 +369,7 @@ func testVectors(t *testing.T, chanType channeldb.ChannelType, test testCase) {
|
||||
_, _, err = localChannel.ReceiveRevocation(revMsg)
|
||||
require.NoError(t, err)
|
||||
|
||||
remoteNewCommit, err := remoteChannel.SignNextCommitment()
|
||||
remoteNewCommit, err := remoteChannel.SignNextCommitment(ctxb)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(
|
||||
|
@ -305,7 +305,9 @@ func createTestPeerWithChannel(t *testing.T, updateChan func(a,
|
||||
channelAlice, err := lnwallet.NewLightningChannel(
|
||||
aliceSigner, aliceChannelState, alicePool,
|
||||
lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
|
||||
lnwallet.WithAuxSigner(&lnwallet.MockAuxSigner{}),
|
||||
lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
|
||||
lnwallet.EmptyMockJobHandler,
|
||||
)),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -319,7 +321,9 @@ func createTestPeerWithChannel(t *testing.T, updateChan func(a,
|
||||
channelBob, err := lnwallet.NewLightningChannel(
|
||||
bobSigner, bobChannelState, bobPool,
|
||||
lnwallet.WithLeafStore(&lnwallet.MockAuxLeafStore{}),
|
||||
lnwallet.WithAuxSigner(&lnwallet.MockAuxSigner{}),
|
||||
lnwallet.WithAuxSigner(lnwallet.NewAuxSignerMock(
|
||||
lnwallet.EmptyMockJobHandler,
|
||||
)),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
Loading…
x
Reference in New Issue
Block a user