Merge pull request #6221 from Crypt-iQ/link_handoff

multi: reliable hand-off from htlcswitch to contractcourt
This commit is contained in:
Olaoluwa Osuntokun
2022-02-24 17:01:36 -08:00
committed by GitHub
8 changed files with 199 additions and 75 deletions

View File

@@ -187,11 +187,14 @@ type ChannelLinkConfig struct {
LinkFailureError)
// UpdateContractSignals is a function closure that we'll use to update
// outside sub-systems with the latest signals for our inner Lightning
// channel. These signals will notify the caller when the channel has
// been closed, or when the set of active HTLC's is updated.
// outside sub-systems with this channel's latest ShortChannelID.
UpdateContractSignals func(*contractcourt.ContractSignals) error
// NotifyContractUpdate is a function closure that we'll use to update
// the contractcourt and more specifically the ChannelArbitrator of the
// latest channel state.
NotifyContractUpdate func(*contractcourt.ContractUpdate) error
// ChainEvents is an active subscription to the chain watcher for this
// channel to be notified of any on-chain activity related to this
// channel.
@@ -372,10 +375,6 @@ type channelLink struct {
// sent across.
localUpdateAdd chan *localUpdateAddMsg
// htlcUpdates is a channel that we'll use to update outside
// sub-systems with the latest set of active HTLC's on our channel.
htlcUpdates chan *contractcourt.ContractUpdate
// shutdownRequest is a channel that the channelLink will listen on to
// service shutdown requests from ShutdownIfChannelClean calls.
shutdownRequest chan *shutdownReq
@@ -421,11 +420,9 @@ func NewChannelLink(cfg ChannelLinkConfig,
logPrefix := fmt.Sprintf("ChannelLink(%v):", channel.ChannelPoint())
return &channelLink{
cfg: cfg,
channel: channel,
shortChanID: channel.ShortChanID(),
// TODO(roasbeef): just do reserve here?
htlcUpdates: make(chan *contractcourt.ContractUpdate),
cfg: cfg,
channel: channel,
shortChanID: channel.ShortChanID(),
shutdownRequest: make(chan *shutdownReq),
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
@@ -496,7 +493,6 @@ func (l *channelLink) Start() error {
// TODO(roasbeef): split goroutines within channel arb to avoid
go func() {
signals := &contractcourt.ContractSignals{
HtlcUpdates: l.htlcUpdates,
ShortChanID: l.channel.ShortChanID(),
}
@@ -1837,15 +1833,23 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
l.cfg.Peer.SendMessage(false, nextRevocation)
// Since we just revoked our commitment, we may have a new set
// of HTLC's on our commitment, so we'll send them over our
// HTLC update channel so any callers can be notified.
select {
case l.htlcUpdates <- &contractcourt.ContractUpdate{
// of HTLC's on our commitment, so we'll send them using our
// function closure NotifyContractUpdate.
newUpdate := &contractcourt.ContractUpdate{
HtlcKey: contractcourt.LocalHtlcSet,
Htlcs: currentHtlcs,
}:
}
err = l.cfg.NotifyContractUpdate(newUpdate)
if err != nil {
l.log.Errorf("unable to notify contract update: %v",
err)
return
}
select {
case <-l.quit:
return
default:
}
// If both commitment chains are fully synced from our PoV,
@@ -1879,13 +1883,21 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// The remote party now has a new primary commitment, so we'll
// update the contract court to be aware of this new set (the
// prior old remote pending).
select {
case l.htlcUpdates <- &contractcourt.ContractUpdate{
newUpdate := &contractcourt.ContractUpdate{
HtlcKey: contractcourt.RemoteHtlcSet,
Htlcs: remoteHTLCs,
}:
}
err = l.cfg.NotifyContractUpdate(newUpdate)
if err != nil {
l.log.Errorf("unable to notify contract update: %v",
err)
return
}
select {
case <-l.quit:
return
default:
}
// If we have a tower client for this channel type, we'll
@@ -2093,13 +2105,20 @@ func (l *channelLink) updateCommitTx() error {
// The remote party now has a new pending commitment, so we'll update
// the contract court to be aware of this new set (the prior old remote
// pending).
select {
case l.htlcUpdates <- &contractcourt.ContractUpdate{
newUpdate := &contractcourt.ContractUpdate{
HtlcKey: contractcourt.RemotePendingHtlcSet,
Htlcs: pendingHTLCs,
}:
}
err = l.cfg.NotifyContractUpdate(newUpdate)
if err != nil {
l.log.Errorf("unable to notify contract update: %v", err)
return err
}
select {
case <-l.quit:
return ErrLinkShuttingDown
default:
}
commitSig := &lnwire.CommitSig{
@@ -2167,7 +2186,6 @@ func (l *channelLink) UpdateShortChanID() (lnwire.ShortChannelID, error) {
go func() {
err := l.cfg.UpdateContractSignals(&contractcourt.ContractSignals{
HtlcUpdates: l.htlcUpdates,
ShortChanID: sid,
})
if err != nil {

View File

@@ -1944,6 +1944,17 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
return nil, nil, nil, nil, nil, nil, err
}
notifyUpdateChan := make(chan *contractcourt.ContractUpdate)
doneChan := make(chan struct{})
notifyContractUpdate := func(u *contractcourt.ContractUpdate) error {
select {
case notifyUpdateChan <- u:
case <-doneChan:
}
return nil
}
// Instantiate with a long interval, so that we can precisely control
// the firing via force feeding.
bticker := ticker.NewForce(time.Hour)
@@ -1967,12 +1978,13 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
Registry: invoiceRegistry,
FeeEstimator: newMockFeeEstimator(),
ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker,
FwdPkgGCTicker: ticker.NewForce(15 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
NotifyContractUpdate: notifyContractUpdate,
Registry: invoiceRegistry,
FeeEstimator: newMockFeeEstimator(),
ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker,
FwdPkgGCTicker: ticker.NewForce(15 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests.
BatchSize: 10000,
@@ -1993,8 +2005,9 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
go func() {
for {
select {
case <-aliceLink.(*channelLink).htlcUpdates:
case <-notifyUpdateChan:
case <-aliceLink.(*channelLink).quit:
close(doneChan)
return
}
}
@@ -4482,6 +4495,17 @@ func (h *persistentLinkHarness) restartLink(
}
}
notifyUpdateChan := make(chan *contractcourt.ContractUpdate)
doneChan := make(chan struct{})
notifyContractUpdate := func(u *contractcourt.ContractUpdate) error {
select {
case notifyUpdateChan <- u:
case <-doneChan:
}
return nil
}
// Instantiate with a long interval, so that we can precisely control
// the firing via force feeding.
bticker := ticker.NewForce(time.Hour)
@@ -4505,12 +4529,13 @@ func (h *persistentLinkHarness) restartLink(
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
Registry: h.coreLink.cfg.Registry,
FeeEstimator: newMockFeeEstimator(),
ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker,
FwdPkgGCTicker: ticker.New(5 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
NotifyContractUpdate: notifyContractUpdate,
Registry: h.coreLink.cfg.Registry,
FeeEstimator: newMockFeeEstimator(),
ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker,
FwdPkgGCTicker: ticker.New(5 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests.
BatchSize: 10000,
@@ -4534,8 +4559,9 @@ func (h *persistentLinkHarness) restartLink(
go func() {
for {
select {
case <-aliceLink.(*channelLink).htlcUpdates:
case <-notifyUpdateChan:
case <-aliceLink.(*channelLink).quit:
close(doneChan)
return
}
}

View File

@@ -1122,6 +1122,17 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
maxFeeUpdateTimeout = 40 * time.Minute
)
notifyUpdateChan := make(chan *contractcourt.ContractUpdate)
doneChan := make(chan struct{})
notifyContractUpdate := func(u *contractcourt.ContractUpdate) error {
select {
case notifyUpdateChan <- u:
case <-doneChan:
}
return nil
}
link := NewChannelLink(
ChannelLinkConfig{
Switch: server.htlcSwitch,
@@ -1142,6 +1153,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
NotifyContractUpdate: notifyContractUpdate,
ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true,
BatchSize: 10,
@@ -1169,8 +1181,9 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
go func() {
for {
select {
case <-link.(*channelLink).htlcUpdates:
case <-notifyUpdateChan:
case <-link.(*channelLink).quit:
close(doneChan)
return
}
}