Merge pull request from bottlepay/onchain-interceptor

contractcourt: onchain htlc interceptor
This commit is contained in:
Olaoluwa Osuntokun 2022-04-13 14:58:29 -07:00 committed by GitHub
commit cd8a87c0da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 323 additions and 54 deletions

@ -14,6 +14,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/labels"
@ -75,7 +76,9 @@ type WitnessSubscription struct {
type WitnessBeacon interface {
// SubscribeUpdates returns a channel that will be sent upon *each* time
// a new preimage is discovered.
SubscribeUpdates() *WitnessSubscription
SubscribeUpdates(chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
payload *hop.Payload,
nextHopOnionBlob []byte) (*WitnessSubscription, error)
// LookupPreImage attempts to lookup a preimage in the global cache.
// True is returned for the second argument if the preimage is found.

@ -13,6 +13,7 @@ import (
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
)
// htlcIncomingContestResolver is a ContractResolver that's able to resolve an
@ -70,7 +71,7 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
// First try to parse the payload. If that fails, we can stop resolution
// now.
payload, err := h.decodePayload()
payload, nextHopOnionBlob, err := h.decodePayload()
if err != nil {
log.Debugf("ChannelArbitrator(%v): cannot decode payload of "+
"htlc %v", h.ChanPoint, h.HtlcPoint())
@ -152,7 +153,7 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
// Update htlcResolution with the matching preimage.
h.htlcResolution.Preimage = preimage
log.Infof("%T(%v): extracted preimage=%v from beacon!", h,
log.Infof("%T(%v): applied preimage=%v", h,
h.htlcResolution.ClaimOutpoint, preimage)
// If this is our commitment transaction, then we'll need to
@ -277,7 +278,13 @@ func (h *htlcIncomingContestResolver) Resolve() (ContractResolver, error) {
// NOTE: This is done BEFORE opportunistically querying the db,
// to ensure the preimage can't be delivered between querying
// and registering for the preimage subscription.
preimageSubscription := h.PreimageDB.SubscribeUpdates()
preimageSubscription, err := h.PreimageDB.SubscribeUpdates(
h.htlcSuccessResolver.ShortChanID, &h.htlc,
payload, nextHopOnionBlob,
)
if err != nil {
return nil, err
}
defer preimageSubscription.CancelSubscription()
// With the epochs and preimage subscriptions initialized, we'll
@ -440,16 +447,31 @@ func (h *htlcIncomingContestResolver) SupplementState(_ *channeldb.OpenChannel)
}
// decodePayload (re)decodes the hop payload of a received htlc.
func (h *htlcIncomingContestResolver) decodePayload() (*hop.Payload, error) {
func (h *htlcIncomingContestResolver) decodePayload() (*hop.Payload,
[]byte, error) {
onionReader := bytes.NewReader(h.htlc.OnionBlob)
iterator, err := h.OnionProcessor.ReconstructHopIterator(
onionReader, h.htlc.RHash[:],
)
if err != nil {
return nil, err
return nil, nil, err
}
return iterator.HopPayload()
payload, err := iterator.HopPayload()
if err != nil {
return nil, nil, err
}
// Transform onion blob for the next hop.
var onionBlob [lnwire.OnionPacketSize]byte
buf := bytes.NewBuffer(onionBlob[0:0])
err = iterator.EncodeNextHop(buf)
if err != nil {
return nil, nil, err
}
return payload, onionBlob[:], nil
}
// A compile time assertion to ensure htlcIncomingContestResolver meets the

@ -276,6 +276,10 @@ func (h *mockHopIterator) HopPayload() (*hop.Payload, error) {
}), nil
}
func (h *mockHopIterator) EncodeNextHop(w io.Writer) error {
return nil
}
type mockOnionProcessor struct {
isExit bool
offeredOnionBlob []byte

@ -14,11 +14,13 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lntest/mock"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)
@ -36,11 +38,15 @@ func newMockWitnessBeacon() *mockWitnessBeacon {
}
}
func (m *mockWitnessBeacon) SubscribeUpdates() *WitnessSubscription {
func (m *mockWitnessBeacon) SubscribeUpdates(
chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
payload *hop.Payload,
nextHopOnionBlob []byte) (*WitnessSubscription, error) {
return &WitnessSubscription{
WitnessUpdates: m.preImageUpdates,
CancelSubscription: func() {},
}
}, nil
}
func (m *mockWitnessBeacon) LookupPreimage(payhash lntypes.Hash) (lntypes.Preimage, bool) {

@ -187,6 +187,9 @@ then watch it on chain. Taproot script spends are also supported through the
* [Support for making routes with the legacy onion payload format via `SendToRoute` has been removed.](https://github.com/lightningnetwork/lnd/pull/6385)
* Close a gap in the HTLC interceptor API by [intercepting htlcs in the on-chain
resolution flow](https://github.com/lightningnetwork/lnd/pull/6219) too.
## Database
* [Add ForAll implementation for etcd to speed up

@ -41,6 +41,8 @@ type InterceptableSwitch struct {
// interceptor client.
resolutionChan chan *fwdResolution
onchainIntercepted chan InterceptedForward
// interceptorRegistration is a channel that we use to synchronize
// client connect and disconnect.
interceptorRegistration chan ForwardInterceptor
@ -116,6 +118,7 @@ func NewInterceptableSwitch(s *Switch, cltvRejectDelta uint32,
return &InterceptableSwitch{
htlcSwitch: s,
intercepted: make(chan *interceptedPackets),
onchainIntercepted: make(chan InterceptedForward),
interceptorRegistration: make(chan ForwardInterceptor),
holdForwards: make(map[channeldb.CircuitKey]InterceptedForward),
resolutionChan: make(chan *fwdResolution),
@ -181,6 +184,16 @@ func (s *InterceptableSwitch) run() {
log.Errorf("Cannot forward packets: %v", err)
}
case fwd := <-s.onchainIntercepted:
// For on-chain interceptions, we don't know if it has
// already been offered before. This information is in
// the forwarding package which isn't easily accessible
// from contractcourt. It is likely though that it was
// already intercepted in the off-chain flow. And even
// if not, it is safe to signal replay so that we won't
// unexpectedly skip over this htlc.
s.forward(fwd, true)
case res := <-s.resolutionChan:
res.errChan <- s.resolve(res.resolution)
@ -308,17 +321,25 @@ func (s *InterceptableSwitch) ForwardPackets(linkQuit chan struct{}, isReplay bo
return nil
}
// ForwardPacket forwards a single htlc to the external interceptor.
func (s *InterceptableSwitch) ForwardPacket(
fwd InterceptedForward) error {
select {
case s.onchainIntercepted <- fwd:
case <-s.quit:
return errors.New("interceptable switch quit")
}
return nil
}
// interceptForward forwards the packet to the external interceptor after
// checking the interception criteria.
func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,
isReplay bool) bool {
// Process normally if an interceptor is not required and not
// registered.
if !s.requireInterceptor && s.interceptor == nil {
return false
}
switch htlc := packet.htlc.(type) {
case *lnwire.UpdateAddHTLC:
// We are not interested in intercepting initiated payments.
@ -326,16 +347,6 @@ func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,
return false
}
inKey := channeldb.CircuitKey{
ChanID: packet.incomingChanID,
HtlcID: packet.incomingHTLCID,
}
// Ignore already held htlcs.
if _, ok := s.holdForwards[inKey]; ok {
return true
}
intercepted := &interceptedForward{
htlc: htlc,
packet: packet,
@ -363,14 +374,37 @@ func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,
return true
}
if s.interceptor == nil && !isReplay {
// There is no interceptor registered, we are in
// interceptor-required mode, and this is a new packet
//
// Because the interceptor has never seen this packet
// yet, it is still safe to fail back. This limits the
// backlog of htlcs when the interceptor is down.
err := intercepted.FailWithCode(
return s.forward(intercepted, isReplay)
default:
return false
}
}
// forward records the intercepted htlc and forwards it to the interceptor.
func (s *InterceptableSwitch) forward(
fwd InterceptedForward, isReplay bool) bool {
inKey := fwd.Packet().IncomingCircuit
// Ignore already held htlcs.
if _, ok := s.holdForwards[inKey]; ok {
return true
}
// If there is no interceptor currently registered, configuration and packet
// replay status determine how the packet is handled.
if s.interceptor == nil {
// Process normally if an interceptor is not required.
if !s.requireInterceptor {
return false
}
// We are in interceptor-required mode. If this is a new packet, it is
// still safe to fail back. The interceptor has never seen this packet
// yet. This limits the backlog of htlcs when the interceptor is down.
if !isReplay {
err := fwd.FailWithCode(
lnwire.CodeTemporaryChannelFailure,
)
if err != nil {
@ -380,20 +414,20 @@ func (s *InterceptableSwitch) interceptForward(packet *htlcPacket,
return true
}
s.holdForwards[inKey] = intercepted
// If there is no interceptor registered, we must be in
// interceptor-required mode. The packet is kept in the queue
// until the interceptor registers itself.
if s.interceptor != nil {
s.sendForward(intercepted)
}
// This packet is a replay. It is not safe to fail back, because the
// interceptor may still signal otherwise upon reconnect. Keep the
// packet in the queue until then.
s.holdForwards[inKey] = fwd
return true
default:
return false
}
// There is an interceptor registered. We can forward the packet right now.
// Hold it in the queue too to track what is outstanding.
s.holdForwards[inKey] = fwd
s.sendForward(fwd)
return true
}
// handleExpired checks that the htlc isn't too close to the channel

@ -65,8 +65,12 @@ func (m *mockPreimageCache) AddPreimages(preimages ...lntypes.Preimage) error {
return nil
}
func (m *mockPreimageCache) SubscribeUpdates() *contractcourt.WitnessSubscription {
return nil
func (m *mockPreimageCache) SubscribeUpdates(
chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
payload *hop.Payload,
nextHopOnionBlob []byte) (*contractcourt.WitnessSubscription, error) {
return nil, nil
}
type mockFeeEstimator struct {

80
intercepted_forward.go Normal file

@ -0,0 +1,80 @@
package lnd
import (
"errors"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// ErrCannotResume is returned when an intercepted forward cannot be
// resumed. This is the case in the on-chain resolution flow.
ErrCannotResume = errors.New("cannot resume in the on-chain flow")
// ErrCannotFail is returned when an intercepted forward cannot be failed.
// This is the case in the on-chain resolution flow.
ErrCannotFail = errors.New("cannot fail in the on-chain flow")
// ErrPreimageMismatch is returned when the preimage that is specified to
// settle an htlc doesn't match the htlc hash.
ErrPreimageMismatch = errors.New("preimage does not match hash")
)
// interceptedForward implements the on-chain behavior for the resolution of
// a forwarded htlc.
type interceptedForward struct {
packet *htlcswitch.InterceptedPacket
beacon *preimageBeacon
}
func newInterceptedForward(
packet *htlcswitch.InterceptedPacket,
beacon *preimageBeacon) *interceptedForward {
return &interceptedForward{
beacon: beacon,
packet: packet,
}
}
// Packet returns the intercepted htlc packet.
func (f *interceptedForward) Packet() htlcswitch.InterceptedPacket {
return *f.packet
}
// Resume notifies the intention to resume an existing hold forward. This
// basically means the caller wants to resume with the default behavior for this
// htlc which usually means forward it.
func (f *interceptedForward) Resume() error {
return ErrCannotResume
}
// Fail notifies the intention to fail an existing hold forward with an
// encrypted failure reason.
func (f *interceptedForward) Fail(_ []byte) error {
// We can't actively fail an htlc. The best we could do is abandon the
// resolver, but this wouldn't be a safe operation. There may be a race
// with the preimage beacon supplying a preimage. Therefore we don't
// attempt to fail and just return an error here.
return ErrCannotFail
}
// FailWithCode notifies the intention to fail an existing hold forward with the
// specified failure code.
func (f *interceptedForward) FailWithCode(_ lnwire.FailCode) error {
return ErrCannotFail
}
// Settle notifies the intention to settle an existing hold forward with a given
// preimage.
func (f *interceptedForward) Settle(preimage lntypes.Preimage) error {
if !preimage.Matches(f.packet.Hash) {
return ErrPreimageMismatch
}
// Add preimage to the preimage beacon. The onchain resolver will pick
// up the preimage from the beacon.
return f.beacon.AddPreimages(preimage)
}

@ -595,11 +595,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
quit: make(chan struct{}),
}
s.witnessBeacon = &preimageBeacon{
wCache: dbs.ChanStateDB.NewWitnessCache(),
subscribers: make(map[uint64]*preimageSubscriber),
}
currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock()
if err != nil {
return nil, err
@ -659,6 +654,11 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
s.cfg.RequireInterceptor,
)
s.witnessBeacon = newPreimageBeacon(
dbs.ChanStateDB.NewWitnessCache(),
s.interceptableSwitch.ForwardPacket,
)
chanStatusMgrCfg := &netann.ChanStatusConfig{
ChanStatusSampleInterval: cfg.ChanStatusSampleInterval,
ChanEnableTimeout: cfg.ChanEnableTimeout,

@ -5,7 +5,10 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
)
// preimageSubscriber reprints an active subscription to be notified once the
@ -16,21 +19,48 @@ type preimageSubscriber struct {
quit chan struct{}
}
type witnessCache interface {
// LookupSha256Witness attempts to lookup the preimage for a sha256
// hash. If the witness isn't found, ErrNoWitnesses will be returned.
LookupSha256Witness(hash lntypes.Hash) (lntypes.Preimage, error)
// AddSha256Witnesses adds a batch of new sha256 preimages into the
// witness cache. This is an alias for AddWitnesses that uses
// Sha256HashWitness as the preimages' witness type.
AddSha256Witnesses(preimages ...lntypes.Preimage) error
}
// preimageBeacon is an implementation of the contractcourt.WitnessBeacon
// interface, and the lnwallet.PreimageCache interface. This implementation is
// concerned with a single witness type: sha256 hahsh preimages.
type preimageBeacon struct {
sync.RWMutex
wCache *channeldb.WitnessCache
wCache witnessCache
clientCounter uint64
subscribers map[uint64]*preimageSubscriber
interceptor func(htlcswitch.InterceptedForward) error
}
func newPreimageBeacon(wCache witnessCache,
interceptor func(htlcswitch.InterceptedForward) error) *preimageBeacon {
return &preimageBeacon{
wCache: wCache,
interceptor: interceptor,
subscribers: make(map[uint64]*preimageSubscriber),
}
}
// SubscribeUpdates returns a channel that will be sent upon *each* time a new
// preimage is discovered.
func (p *preimageBeacon) SubscribeUpdates() *contractcourt.WitnessSubscription {
func (p *preimageBeacon) SubscribeUpdates(
chanID lnwire.ShortChannelID, htlc *channeldb.HTLC,
payload *hop.Payload,
nextHopOnionBlob []byte) (*contractcourt.WitnessSubscription, error) {
p.Lock()
defer p.Unlock()
@ -47,7 +77,7 @@ func (p *preimageBeacon) SubscribeUpdates() *contractcourt.WitnessSubscription {
srvrLog.Debugf("Creating new witness beacon subscriber, id=%v",
p.clientCounter)
return &contractcourt.WitnessSubscription{
sub := &contractcourt.WitnessSubscription{
WitnessUpdates: client.updateChan,
CancelSubscription: func() {
p.Lock()
@ -58,6 +88,32 @@ func (p *preimageBeacon) SubscribeUpdates() *contractcourt.WitnessSubscription {
close(client.quit)
},
}
// Notify the htlc interceptor. There may be a client connected
// and willing to supply a preimage.
packet := &htlcswitch.InterceptedPacket{
Hash: htlc.RHash,
IncomingExpiry: htlc.RefundTimeout,
IncomingAmount: htlc.Amt,
IncomingCircuit: channeldb.CircuitKey{
ChanID: chanID,
HtlcID: htlc.HtlcIndex,
},
OutgoingChanID: payload.FwdInfo.NextHop,
OutgoingExpiry: payload.FwdInfo.OutgoingCTLV,
OutgoingAmount: payload.FwdInfo.AmountToForward,
CustomRecords: payload.CustomRecords(),
}
copy(packet.OnionBlob[:], nextHopOnionBlob)
fwd := newInterceptedForward(packet, p)
err := p.interceptor(fwd)
if err != nil {
return nil, err
}
return sub, nil
}
// LookupPreImage attempts to lookup a preimage in the global cache. True is

57
witness_beacon_test.go Normal file

@ -0,0 +1,57 @@
package lnd
import (
"testing"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)
// TestWitnessBeaconIntercept tests that the beacon passes on subscriptions to
// the interceptor correctly.
func TestWitnessBeaconIntercept(t *testing.T) {
var interceptedFwd htlcswitch.InterceptedForward
interceptor := func(fwd htlcswitch.InterceptedForward) error {
interceptedFwd = fwd
return nil
}
p := newPreimageBeacon(
&mockWitnessCache{}, interceptor,
)
preimage := lntypes.Preimage{1, 2, 3}
hash := preimage.Hash()
subscription, err := p.SubscribeUpdates(
lnwire.NewShortChanIDFromInt(1),
&channeldb.HTLC{
RHash: hash,
},
&hop.Payload{},
[]byte{2},
)
require.NoError(t, err)
defer subscription.CancelSubscription()
require.NoError(t, interceptedFwd.Settle(preimage))
update := <-subscription.WitnessUpdates
require.Equal(t, preimage, update)
}
type mockWitnessCache struct {
witnessCache
}
func (w *mockWitnessCache) AddSha256Witnesses(
preimages ...lntypes.Preimage) error {
return nil
}