From 41c40a9560f242ee547761b0b05840aa3036e1de Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 22 Feb 2018 15:06:31 -0800 Subject: [PATCH 1/9] channeldb/forwarding_package: initial fwdpkg --- channeldb/forwarding_package.go | 921 ++++++++++++++++++++++++++++++++ 1 file changed, 921 insertions(+) create mode 100644 channeldb/forwarding_package.go diff --git a/channeldb/forwarding_package.go b/channeldb/forwarding_package.go new file mode 100644 index 000000000..d43904af9 --- /dev/null +++ b/channeldb/forwarding_package.go @@ -0,0 +1,921 @@ +package channeldb + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + + "github.com/boltdb/bolt" + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/lnwire" +) + +// ErrCorruptedFwdPkg signals that the on-disk structure of the forwarding +// package has potentially been mangled. +var ErrCorruptedFwdPkg = errors.New("fwding package db has been corrupted") + +// FwdState is an enum used to describe the lifecycle of a FwdPkg. +type FwdState byte + +const ( + // FwdStateLockedIn is the starting state for all forwarding packages. + // Packages in this state have not yet committed to the exact set of + // Adds to forward to the switch. + FwdStateLockedIn FwdState = iota + + // FwdStateProcessed marks the state in which all Adds have been + // locally processed and the forwarding decision to the switch has been + // persisted. + FwdStateProcessed + + // FwdStateCompleted signals that all Adds have been acked, and that all + // settles and fails have been delivered to their sources. Packages in + // this state can be removed permanently. + FwdStateCompleted +) + +var ( + // fwdPackagesKey is the root-level bucket that all forwarding packages + // are written. This bucket is further subdivided based on the short + // channel ID of each channel. + fwdPackagesKey = []byte("fwd-packages") + + // addBucketKey is the bucket to which all Add log updates are written. + addBucketKey = []byte("add-updates") + + // failSettleBucketKey is the bucket to which all Settle/Fail log + // updates are written. + failSettleBucketKey = []byte("fail-settle-updates") + + // fwdFilterKey is a key used to write the set of Adds that passed + // validation and are to be forwarded to the switch. + // NOTE: The presence of this key within a forwarding package indicates + // that the package has reached FwdStateProcessed. + fwdFilterKey = []byte("fwd-filter-key") + + // ackFilterKey is a key used to access the PkgFilter indicating which + // Adds have received a Settle/Fail. This response may come from a + // number of sources, including: exitHop settle/fails, switch failures, + // chain arbiter interjections, as well as settle/fails from the + // next hop in the route. + ackFilterKey = []byte("ack-filter-key") + + // settleFailFilterKey is a key used to access the PkgFilter indicating + // which Settles/Fails in have been received and processed by the link + // that originally received the Add. + settleFailFilterKey = []byte("settle-fail-filter-key") +) + +// PkgFilter is used to compactly represent a particular subset of the Adds in a +// forwarding package. Each filter is represented as a simple, statically-sized +// bitvector, where the elements are intended to be the indices of the Adds as +// they are written in the FwdPkg. +type PkgFilter struct { + count uint16 + filter []byte +} + +// NewPkgFilter initializes an empty PkgFilter supporting `count` elements. +func NewPkgFilter(count uint16) *PkgFilter { + // We add 7 to ensure that the integer division yields properly rounded + // values. + filterLen := (count + 7) / 8 + + return &PkgFilter{ + count: count, + filter: make([]byte, filterLen), + } +} + +// Count returns the number of elements represented by this PkgFilter. +func (f *PkgFilter) Count() uint16 { + return f.count +} + +// Set marks the `i`-th element as included by this filter. +// NOTE: It is assumed that i is always less than count. +func (f *PkgFilter) Set(i uint16) { + byt := i / 8 + bit := i % 8 + + // Set the i-th bit in the filter. + // TODO(conner): ignore if > count to prevent panic? + f.filter[byt] |= byte(1 << (7 - bit)) +} + +// Contains queries the filter for membership of index `i`. +// NOTE: It is assumed that i is always less than count. +func (f *PkgFilter) Contains(i uint16) bool { + byt := i / 8 + bit := i % 8 + + // Read the i-th bit in the filter. + // TODO(conner): ignore if > count to prevent panic? + return f.filter[byt]&(1<<(7-bit)) != 0 +} + +// Equal checks two PkgFilters for equality. +func (f *PkgFilter) Equal(f2 *PkgFilter) bool { + if f == f2 { + return true + } + if f.count != f2.count { + return false + } + + return bytes.Equal(f.filter, f2.filter) +} + +// IsFull returns true if every element in the filter has been Set, and false +// otherwise. +func (f *PkgFilter) IsFull() bool { + // Batch validate bytes that are fully used. + for i := uint16(0); i < f.count/8; i++ { + if f.filter[i] != 0xFF { + return false + } + } + + // If the count is not a multiple of 8, check that the filter contains + // all remaining bits. + rem := f.count % 8 + for idx := f.count - rem; idx < f.count; idx++ { + if !f.Contains(idx) { + return false + } + } + + return true +} + +// Size returns number of bytes produced when the PkgFilter is serialized. +func (f *PkgFilter) Size() uint16 { + // 2 bytes for uint16 `count`, then round up number of bytes required to + // represent `count` bits. + return 2 + (f.count+7)/8 +} + +// Encode writes the filter to the provided io.Writer. +func (f *PkgFilter) Encode(w io.Writer) error { + if err := binary.Write(w, binary.BigEndian, f.count); err != nil { + return err + } + + _, err := w.Write(f.filter) + + return err +} + +// Decode reads the filter from the provided io.Reader. +func (f *PkgFilter) Decode(r io.Reader) error { + if err := binary.Read(r, binary.BigEndian, &f.count); err != nil { + return err + } + + f.filter = make([]byte, f.Size()-2) + _, err := io.ReadFull(r, f.filter) + + return err +} + +// FwdPkg records all adds, settles, and fails that were locked in as a result +// of the remote peer sending us a revocation. Each package is identified by +// the short chanid and remote commitment height corresponding to the revocation +// that locked in the HTLCs. For everything except a locally initiated payment, +// settles and fails in a forwarding package must have a corresponding Add in +// another package, and can be removed individually once the source link has +// received the fail/settle. +// +// Adds cannot be removed, as we need to present the same batch of Adds to +// properly handle replay protection. Instead, we use a PkgFilter to mark that +// we have finished processing a particular Add. A FwdPkg should only be deleted +// after the AckFilter is full and all settles and fails have been persistently +// removed. +type FwdPkg struct { + // Source identifies the channel that wrote this forwarding package. + Source lnwire.ShortChannelID + + // Height is the height of the remote commitment chain that locked in + // this forwarding package. + Height uint64 + + // State signals the persistent condition of the package and directs how + // to reprocess the package in the event of failures. + State FwdState + + // Adds contains all add messages which need to be processed and + // forwarded to the switch. Adds does not change over the life of a + // forwarding package. + Adds []LogUpdate + + // FwdFilter is a filter containing the indices of all Adds that were + // forwarded to the switch. + FwdFilter *PkgFilter + + // AckFilter is a filter containing the indices of all Adds for which + // the source has received a settle or fail and is reflected in the next + // commitment txn. A package should not be removed until IsFull() + // returns true. + AckFilter *PkgFilter + + // SettleFails contains all settle and fail messages that should be + // forwarded to the switch. + SettleFails []LogUpdate + + // SettleFailFilter is a filter containing the indices of all Settle or + // Fails originating in this package that have been received and locked + // into the incoming link's commitment state. + SettleFailFilter *PkgFilter +} + +// NewFwdPkg initializes a new forwarding package in FwdStateLockedIn. This +// should be used to create a package at the time we receive a revocation. +func NewFwdPkg(source lnwire.ShortChannelID, height uint64, + addUpdates, settleFailUpdates []LogUpdate) *FwdPkg { + + nAddUpdates := uint16(len(addUpdates)) + nSettleFailUpdates := uint16(len(settleFailUpdates)) + + return &FwdPkg{ + Source: source, + Height: height, + State: FwdStateLockedIn, + Adds: addUpdates, + FwdFilter: NewPkgFilter(nAddUpdates), + AckFilter: NewPkgFilter(nAddUpdates), + SettleFails: settleFailUpdates, + SettleFailFilter: NewPkgFilter(nSettleFailUpdates), + } +} + +// ID returns an unique identifier for this package, used to ensure that sphinx +// replay processing of this batch is idempotent. +func (f *FwdPkg) ID() []byte { + var id = make([]byte, 16) + byteOrder.PutUint64(id[:8], f.Source.ToUint64()) + byteOrder.PutUint64(id[8:], f.Height) + return id +} + +// String returns a human-readable description of the forwarding package. +func (f *FwdPkg) String() string { + return fmt.Sprintf("%T(src=%v, height=%v, nadds=%v, nfailsettles=%v)", + f, f.Source, f.Height, len(f.Adds), len(f.SettleFails)) +} + +// AddRef is used to identify a particular Add in a FwdPkg. The short channel ID +// is assumed to be that of the packager. +type AddRef struct { + // Height is the remote commitment height that locked in the Add. + Height uint64 + + // Index is the index of the Add within the fwd pkg's Adds. + // + // NOTE: This index is static over the lifetime of a forwarding package. + Index uint16 +} + +// Encode serializes the AddRef to the given io.Writer. +func (a *AddRef) Encode(w io.Writer) error { + if err := binary.Write(w, binary.BigEndian, a.Height); err != nil { + return err + } + + return binary.Write(w, binary.BigEndian, a.Index) +} + +// Decode deserializes the AddRef from the given io.Reader. +func (a *AddRef) Decode(r io.Reader) error { + if err := binary.Read(r, binary.BigEndian, &a.Height); err != nil { + return err + } + + return binary.Read(r, binary.BigEndian, &a.Index) +} + +// SettleFailRef is used to locate a Settle/Fail in another channel's FwdPkg. A +// channel does not remove its own Settle/Fail htlcs, so the source is provided +// to locate a db bucket belonging to another channel. +type SettleFailRef struct { + // Source identifies the outgoing link that locked in the settle or + // fail. This is then used by the *incoming* link to find the settle + // fail in another link's forwarding packages. + Source lnwire.ShortChannelID + + // Height is the remote commitment height that locked in this + // Settle/Fail. + Height uint64 + + // Index is the index of the Add with the fwd pkg's SettleFails. + // + // NOTE: This index is static over the lifetime of a forwarding package. + Index uint16 +} + +// SettleFailAcker is a generic interface providing the ability to acknowledge +// settle/fail HTLCs stored in forwarding packages. +type SettleFailAcker interface { + // AckSettleFails atomically updates the settle-fail filters in *other* + // channels' forwarding packages. + AckSettleFails(tx *bolt.Tx, settleFailRefs ...SettleFailRef) error +} + +// GlobalFwdPkgReader is an interface used to retrieve the forwarding packages +// of any active channel. +type GlobalFwdPkgReader interface { + // LoadChannelFwdPkgs loads all known forwarding packages for the given + // channel. + LoadChannelFwdPkgs(tx *bolt.Tx, + source lnwire.ShortChannelID) ([]*FwdPkg, error) +} + +// FwdOperator defines the interfaces for managing forwarding packages that are +// external to a particular channel. This interface is used by the switch to +// read forwarding packages from arbitrary channels, and acknowledge settles and +// fails for locally-sourced payments. +type FwdOperator interface { + // GlobalFwdPkgReader provides read access to all known forwarding + // packages + GlobalFwdPkgReader + + // SettleFailAcker grants the ability to acknowledge settles or fails + // residing in arbitrary forwarding packages. + SettleFailAcker +} + +// SwitchPackager is a concrete implementation of the FwdOperator interface. +// A SwitchPackager offers the ability to read any forwarding package, and ack +// arbitrary settle and fail HTLCs. +type SwitchPackager struct{} + +// NewSwitchPackager instantiates a new SwitchPackager. +func NewSwitchPackager() *SwitchPackager { + return &SwitchPackager{} +} + +// AckSettleFails atomically updates the settle-fail filters in *other* +// channels' forwarding packages, to mark that the switch has received a settle +// or fail residing in the forwarding package of a link. +func (*SwitchPackager) AckSettleFails(tx *bolt.Tx, + settleFailRefs ...SettleFailRef) error { + + return ackSettleFails(tx, settleFailRefs) +} + +// LoadChannelFwdPkgs loads all forwarding packages for a particular channel. +func (*SwitchPackager) LoadChannelFwdPkgs(tx *bolt.Tx, + source lnwire.ShortChannelID) ([]*FwdPkg, error) { + + return loadChannelFwdPkgs(tx, source) +} + +// FwdPackager supports all operations required to modify fwd packages, such as +// creation, updates, reading, and removal. The interfaces are broken down in +// this way to support future delegation of the subinterfaces. +type FwdPackager interface { + // AddFwdPkg serializes and writes a FwdPkg for this channel at the + // remote commitment height included in the forwarding package. + AddFwdPkg(tx *bolt.Tx, fwdPkg *FwdPkg) error + + // SetFwdFilter looks up the forwarding package at the remote `height` + // and sets the `fwdFilter`, marking the Adds for which: + // 1) We are not the exit node + // 2) Passed all validation + // 3) Should be forwarded to the switch immediately after a failure + SetFwdFilter(tx *bolt.Tx, height uint64, fwdFilter *PkgFilter) error + + // AckAddHtlcs atomically updates the add filters in this channel's + // forwarding packages to mark the resolution of an Add that was + // received from the remote party. + AckAddHtlcs(tx *bolt.Tx, addRefs ...AddRef) error + + // SettleFailAcker allows a link to acknowledge settle/fail HTLCs + // belonging to other channels. + SettleFailAcker + + // LoadFwdPkgs loads all known forwarding packages owned by this + // channel. + LoadFwdPkgs(tx *bolt.Tx) ([]*FwdPkg, error) + + // RemovePkg deletes a forwarding package owned by this channel at + // the provided remote `height`. + RemovePkg(tx *bolt.Tx, height uint64) error +} + +// ChannelPackager is used by a channel to manage the lifecycle of its forwarding +// packages. The packager is tied to a particular source channel ID, allowing it +// to create and edit its own packages. Each packager also has the ability to +// remove fail/settle htlcs that correspond to an add contained in one of +// source's packages. +type ChannelPackager struct { + source lnwire.ShortChannelID +} + +// NewChannelPackager creates a new packager for a single channel. +func NewChannelPackager(source lnwire.ShortChannelID) *ChannelPackager { + return &ChannelPackager{ + source: source, + } +} + +// AddFwdPkg writes a newly locked in forwarding package to disk. +func (*ChannelPackager) AddFwdPkg(tx *bolt.Tx, fwdPkg *FwdPkg) error { + fwdPkgBkt, err := tx.CreateBucketIfNotExists(fwdPackagesKey) + if err != nil { + return err + } + + source := makeLogKey(fwdPkg.Source.ToUint64()) + sourceBkt, err := fwdPkgBkt.CreateBucketIfNotExists(source[:]) + if err != nil { + return err + } + + heightKey := makeLogKey(fwdPkg.Height) + heightBkt, err := sourceBkt.CreateBucketIfNotExists(heightKey[:]) + if err != nil { + return err + } + + // Write ADD updates we received at this commit height. + addBkt, err := heightBkt.CreateBucketIfNotExists(addBucketKey) + if err != nil { + return err + } + + // Write SETTLE/FAIL updates we received at this commit height. + failSettleBkt, err := heightBkt.CreateBucketIfNotExists(failSettleBucketKey) + if err != nil { + return err + } + + for i := range fwdPkg.Adds { + err = putLogUpdate(addBkt, uint16(i), &fwdPkg.Adds[i]) + if err != nil { + return err + } + } + + // Persist the initialized pkg filter, which will be used to determine + // when we can remove this forwarding package from disk. + var ackFilterBuf bytes.Buffer + if err := fwdPkg.AckFilter.Encode(&ackFilterBuf); err != nil { + return err + } + + if err := heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes()); err != nil { + return err + } + + for i := range fwdPkg.SettleFails { + err = putLogUpdate(failSettleBkt, uint16(i), &fwdPkg.SettleFails[i]) + if err != nil { + return err + } + } + + var settleFailFilterBuf bytes.Buffer + err = fwdPkg.SettleFailFilter.Encode(&settleFailFilterBuf) + if err != nil { + return err + } + + return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes()) +} + +// putLogUpdate writes an htlc to the provided `bkt`, using `index` as the key. +func putLogUpdate(bkt *bolt.Bucket, idx uint16, htlc *LogUpdate) error { + var b bytes.Buffer + if err := htlc.Encode(&b); err != nil { + return err + } + + return bkt.Put(uint16Key(idx), b.Bytes()) +} + +// LoadFwdPkgs scans the forwarding log for any packages that haven't been +// processed, and returns their deserialized log updates in a map indexed by the +// remote commitment height at which the updates were locked in. +func (p *ChannelPackager) LoadFwdPkgs(tx *bolt.Tx) ([]*FwdPkg, error) { + return loadChannelFwdPkgs(tx, p.source) +} + +// loadChannelFwdPkgs loads all forwarding packages owned by `source`. +func loadChannelFwdPkgs(tx *bolt.Tx, source lnwire.ShortChannelID) ([]*FwdPkg, error) { + fwdPkgBkt := tx.Bucket(fwdPackagesKey) + if fwdPkgBkt == nil { + return nil, nil + } + + sourceKey := makeLogKey(source.ToUint64()) + sourceBkt := fwdPkgBkt.Bucket(sourceKey[:]) + if sourceBkt == nil { + return nil, nil + } + + var heights []uint64 + if err := sourceBkt.ForEach(func(k, _ []byte) error { + if len(k) != 8 { + return ErrCorruptedFwdPkg + } + + heights = append(heights, byteOrder.Uint64(k)) + + return nil + }); err != nil { + return nil, err + } + + // Load the forwarding package for each retrieved height. + fwdPkgs := make([]*FwdPkg, 0, len(heights)) + for _, height := range heights { + fwdPkg, err := loadFwdPkg(fwdPkgBkt, source, height) + if err != nil { + return nil, err + } + + fwdPkgs = append(fwdPkgs, fwdPkg) + } + + return fwdPkgs, nil +} + +// loadFwPkg reads the packager's fwd pkg at a given height, and determines the +// appropriate FwdState. +func loadFwdPkg(fwdPkgBkt *bolt.Bucket, source lnwire.ShortChannelID, + height uint64) (*FwdPkg, error) { + + sourceKey := makeLogKey(source.ToUint64()) + sourceBkt := fwdPkgBkt.Bucket(sourceKey[:]) + if sourceBkt == nil { + return nil, ErrCorruptedFwdPkg + } + + heightKey := makeLogKey(height) + heightBkt := sourceBkt.Bucket(heightKey[:]) + if heightBkt == nil { + return nil, ErrCorruptedFwdPkg + } + + // Load ADDs from disk. + addBkt := heightBkt.Bucket(addBucketKey) + if addBkt == nil { + return nil, ErrCorruptedFwdPkg + } + + adds, err := loadHtlcs(addBkt) + if err != nil { + return nil, err + } + + // Load ack filter from disk. + ackFilterBytes := heightBkt.Get(ackFilterKey) + if ackFilterBytes == nil { + return nil, ErrCorruptedFwdPkg + } + ackFilterReader := bytes.NewReader(ackFilterBytes) + + ackFilter := &PkgFilter{} + if err := ackFilter.Decode(ackFilterReader); err != nil { + return nil, err + } + + // Load SETTLE/FAILs from disk. + failSettleBkt := heightBkt.Bucket(failSettleBucketKey) + if failSettleBkt == nil { + return nil, ErrCorruptedFwdPkg + } + + failSettles, err := loadHtlcs(failSettleBkt) + if err != nil { + return nil, err + } + + // Load settle fail filter from disk. + settleFailFilterBytes := heightBkt.Get(settleFailFilterKey) + if settleFailFilterBytes == nil { + return nil, ErrCorruptedFwdPkg + } + settleFailFilterReader := bytes.NewReader(settleFailFilterBytes) + + settleFailFilter := &PkgFilter{} + if err := settleFailFilter.Decode(settleFailFilterReader); err != nil { + return nil, err + } + + // Initialize the fwding package, which always starts in the + // FwdStateLockedIn. We can determine what state the package was left in + // by examining constraints on the information loaded from disk. + fwdPkg := &FwdPkg{ + Source: source, + State: FwdStateLockedIn, + Height: height, + Adds: adds, + AckFilter: ackFilter, + SettleFails: failSettles, + SettleFailFilter: settleFailFilter, + } + + // Check to see if we have written the set exported filter adds to + // disk. If we haven't, processing of this package was never started, or + // failed during the last attempt. + fwdFilterBytes := heightBkt.Get(fwdFilterKey) + if fwdFilterBytes == nil { + nAdds := uint16(len(adds)) + fwdPkg.FwdFilter = NewPkgFilter(nAdds) + return fwdPkg, nil + } + + fwdFilterReader := bytes.NewReader(fwdFilterBytes) + fwdPkg.FwdFilter = &PkgFilter{} + if err := fwdPkg.FwdFilter.Decode(fwdFilterReader); err != nil { + return nil, err + } + + // Otherwise, a complete round of processing was completed, and we + // advance the package to FwdStateProcessed. + fwdPkg.State = FwdStateProcessed + + // If every add, settle, and fail has been fully acknowledged, we can + // safely set the package's state to FwdStateCompleted, signalling that + // it can be garbage collected. + if fwdPkg.AckFilter.IsFull() && fwdPkg.SettleFailFilter.IsFull() { + fwdPkg.State = FwdStateCompleted + } + + return fwdPkg, nil +} + +// loadHtlcs retrieves all serialized htlcs in a bucket, returning +// them in order of the indexes they were written under. +func loadHtlcs(bkt *bolt.Bucket) ([]LogUpdate, error) { + var htlcs []LogUpdate + if err := bkt.ForEach(func(_, v []byte) error { + var htlc LogUpdate + if err := htlc.Decode(bytes.NewReader(v)); err != nil { + return err + } + + htlcs = append(htlcs, htlc) + + return nil + }); err != nil { + return nil, err + } + + return htlcs, nil +} + +// SetFwdFilter writes the set of indexes corresponding to Adds at the +// `height` that are to be forwarded to the switch. Calling this method causes +// the forwarding package at `height` to be in FwdStateProcessed. We write this +// forwarding decision so that we always arrive at the same behavior for HTLCs +// leaving this channel. After a restart, we skip validation of these Adds, +// since they are assumed to have already been validated, and make the switch or +// outgoing link responsible for handling replays. +func (p *ChannelPackager) SetFwdFilter(tx *bolt.Tx, height uint64, + fwdFilter *PkgFilter) error { + + fwdPkgBkt := tx.Bucket(fwdPackagesKey) + if fwdPkgBkt == nil { + return ErrCorruptedFwdPkg + } + + source := makeLogKey(p.source.ToUint64()) + sourceBkt := fwdPkgBkt.Bucket(source[:]) + if sourceBkt == nil { + return ErrCorruptedFwdPkg + } + + heightKey := makeLogKey(height) + heightBkt := sourceBkt.Bucket(heightKey[:]) + if heightBkt == nil { + return ErrCorruptedFwdPkg + } + + // If the fwd filter has already been written, we return early to avoid + // modifying the persistent state. + forwardedAddsBytes := heightBkt.Get(fwdFilterKey) + if forwardedAddsBytes != nil { + return nil + } + + // Otherwise we serialize and write the provided fwd filter. + var b bytes.Buffer + if err := fwdFilter.Encode(&b); err != nil { + return err + } + + return heightBkt.Put(fwdFilterKey, b.Bytes()) +} + +// AckAddHtlcs accepts a list of references to add htlcs, and updates the +// AckAddFilter of those forwarding packages to indicate that a settle or fail +// has been received in response to the add. +func (p *ChannelPackager) AckAddHtlcs(tx *bolt.Tx, addRefs ...AddRef) error { + if len(addRefs) == 0 { + return nil + } + + fwdPkgBkt := tx.Bucket(fwdPackagesKey) + if fwdPkgBkt == nil { + return ErrCorruptedFwdPkg + } + + sourceKey := makeLogKey(p.source.ToUint64()) + sourceBkt := fwdPkgBkt.Bucket(sourceKey[:]) + if sourceBkt == nil { + return ErrCorruptedFwdPkg + } + + // Organize the forward references such that we just get a single slice + // of indexes for each unique height. + heightDiffs := make(map[uint64][]uint16) + for _, addRef := range addRefs { + heightDiffs[addRef.Height] = append( + heightDiffs[addRef.Height], + addRef.Index, + ) + } + + // Load each height bucket once and remove all acked htlcs at that + // height. + for height, indexes := range heightDiffs { + err := ackAddHtlcsAtHeight(sourceBkt, height, indexes) + if err != nil { + return err + } + } + + return nil +} + +// ackAddHtlcsAtHeight updates the AddAckFilter of a single forwarding package +// with a list of indexes, writing the resulting filter back in its place. +func ackAddHtlcsAtHeight(sourceBkt *bolt.Bucket, height uint64, + indexes []uint16) error { + + heightKey := makeLogKey(height) + heightBkt := sourceBkt.Bucket(heightKey[:]) + if heightBkt == nil { + return ErrCorruptedFwdPkg + } + + // Load ack filter from disk. + ackFilterBytes := heightBkt.Get(ackFilterKey) + if ackFilterBytes == nil { + return ErrCorruptedFwdPkg + } + + ackFilter := &PkgFilter{} + ackFilterReader := bytes.NewReader(ackFilterBytes) + if err := ackFilter.Decode(ackFilterReader); err != nil { + return err + } + + // Update the ack filter for this height. + for _, index := range indexes { + ackFilter.Set(index) + } + + // Write the resulting filter to disk. + var ackFilterBuf bytes.Buffer + if err := ackFilter.Encode(&ackFilterBuf); err != nil { + return err + } + + return heightBkt.Put(ackFilterKey, ackFilterBuf.Bytes()) +} + +// AckSettleFails persistently acknowledges settles or fails from a remote forwarding +// package. This should only be called after the source of the Add has locked in +// the settle/fail, or it becomes otherwise safe to forgo retransmitting the +// settle/fail after a restart. +func (p *ChannelPackager) AckSettleFails(tx *bolt.Tx, settleFailRefs ...SettleFailRef) error { + return ackSettleFails(tx, settleFailRefs) +} + +// ackSettleFails persistently acknowledges a batch of settle fail references. +func ackSettleFails(tx *bolt.Tx, settleFailRefs []SettleFailRef) error { + if len(settleFailRefs) == 0 { + return nil + } + + fwdPkgBkt := tx.Bucket(fwdPackagesKey) + if fwdPkgBkt == nil { + return ErrCorruptedFwdPkg + } + + // Organize the forward references such that we just get a single slice + // of indexes for each unique destination-height pair. + destHeightDiffs := make(map[lnwire.ShortChannelID]map[uint64][]uint16) + for _, settleFailRef := range settleFailRefs { + destHeights, ok := destHeightDiffs[settleFailRef.Source] + if !ok { + destHeights = make(map[uint64][]uint16) + destHeightDiffs[settleFailRef.Source] = destHeights + } + + destHeights[settleFailRef.Height] = append( + destHeights[settleFailRef.Height], + settleFailRef.Index, + ) + } + + // With the references organized by destination and height, we now load + // each remote bucket, and update the settle fail filter for any + // settle/fail htlcs. + for dest, destHeights := range destHeightDiffs { + destKey := makeLogKey(dest.ToUint64()) + destBkt := fwdPkgBkt.Bucket(destKey[:]) + if destBkt == nil { + continue + } + + for height, indexes := range destHeights { + err := ackSettleFailsAtHeight(destBkt, height, indexes) + if err != nil { + return err + } + } + } + + return nil +} + +// ackSettleFailsAtHeight given a destination bucket, acks the provided indexes +// at particular a height by updating the settle fail filter. +func ackSettleFailsAtHeight(destBkt *bolt.Bucket, height uint64, + indexes []uint16) error { + + heightKey := makeLogKey(height) + heightBkt := destBkt.Bucket(heightKey[:]) + if heightBkt == nil { + return nil + } + + // Load ack filter from disk. + settleFailFilterBytes := heightBkt.Get(settleFailFilterKey) + if settleFailFilterBytes == nil { + return ErrCorruptedFwdPkg + } + + settleFailFilter := &PkgFilter{} + settleFailFilterReader := bytes.NewReader(settleFailFilterBytes) + if err := settleFailFilter.Decode(settleFailFilterReader); err != nil { + return err + } + + // Update the ack filter for this height. + for _, index := range indexes { + settleFailFilter.Set(index) + } + + // Write the resulting filter to disk. + var settleFailFilterBuf bytes.Buffer + if err := settleFailFilter.Encode(&settleFailFilterBuf); err != nil { + return err + } + + return heightBkt.Put(settleFailFilterKey, settleFailFilterBuf.Bytes()) +} + +// RemovePkg deletes the forwarding package at the given height from the +// packager's source bucket. +func (p *ChannelPackager) RemovePkg(tx *bolt.Tx, height uint64) error { + fwdPkgBkt := tx.Bucket(fwdPackagesKey) + if fwdPkgBkt == nil { + return nil + } + + sourceBytes := makeLogKey(p.source.ToUint64()) + sourceBkt := fwdPkgBkt.Bucket(sourceBytes[:]) + if sourceBkt == nil { + return ErrCorruptedFwdPkg + } + + heightKey := makeLogKey(height) + + return sourceBkt.DeleteBucket(heightKey[:]) +} + +// uint16Key writes the provided 16-bit unsigned integer to a 2-byte slice. +func uint16Key(i uint16) []byte { + key := make([]byte, 2) + byteOrder.PutUint16(key, i) + return key +} + +// uint16FromKey reconstructs a 16-bit unsigned integer from a 2-byte slice. +func uint16FromKey(key []byte) uint16 { + return byteOrder.Uint16(key) +} + +// Compile-time constraint to ensure that ChannelPackager implements the public +// FwdPackager interface. +var _ FwdPackager = (*ChannelPackager)(nil) + +// Compile-time constraint to ensure that SwitchPackager implements the public +// FwdOperator interface. +var _ FwdOperator = (*SwitchPackager)(nil) From d18c317220d8c100fc8e175c72e97be5195f3d23 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 12 Feb 2018 11:33:37 -0800 Subject: [PATCH 2/9] channeldb/forwarding_package_test: test PkgFilter --- channeldb/forwarding_package_test.go | 815 +++++++++++++++++++++++++++ 1 file changed, 815 insertions(+) create mode 100644 channeldb/forwarding_package_test.go diff --git a/channeldb/forwarding_package_test.go b/channeldb/forwarding_package_test.go new file mode 100644 index 000000000..6fd8fa775 --- /dev/null +++ b/channeldb/forwarding_package_test.go @@ -0,0 +1,815 @@ +package channeldb_test + +import ( + "bytes" + "io/ioutil" + "path/filepath" + "runtime" + "testing" + + "github.com/boltdb/bolt" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/wire" +) + +// TestPkgFilterBruteForce tests the behavior of a pkg filter up to size 1000, +// which is greater than the number of HTLCs we permit on a commitment txn. +// This should encapsulate every potential filter used in practice. +func TestPkgFilterBruteForce(t *testing.T) { + t.Parallel() + + checkPkgFilterRange(t, 1000) +} + +// checkPkgFilterRange verifies the behavior of a pkg filter when doing a linear +// insertion of `high` elements. This is primarily to test that IsFull functions +// properly for all relevant sizes of `high`. +func checkPkgFilterRange(t *testing.T, high int) { + for i := uint16(0); i < uint16(high); i++ { + f := channeldb.NewPkgFilter(i) + + if f.Count() != i { + t.Fatalf("pkg filter count=%d is actually %d", + i, f.Count()) + } + checkPkgFilterEncodeDecode(t, i, f) + + for j := uint16(0); j < i; j++ { + if f.Contains(j) { + t.Fatalf("pkg filter count=%d contains %d "+ + "before being added", i, j) + } + + f.Set(j) + checkPkgFilterEncodeDecode(t, i, f) + + if !f.Contains(j) { + t.Fatalf("pkg filter count=%d missing %d "+ + "after being added", i, j) + } + + if j < i-1 && f.IsFull() { + t.Fatalf("pkg filter count=%d already full", i) + } + } + + if !f.IsFull() { + t.Fatalf("pkg filter count=%d not full", i) + } + checkPkgFilterEncodeDecode(t, i, f) + } +} + +// TestPkgFilterRand uses a random permutation to verify the proper behavior of +// the pkg filter if the entries are not inserted in-order. +func TestPkgFilterRand(t *testing.T) { + t.Parallel() + + checkPkgFilterRand(t, 3, 17) +} + +// checkPkgFilterRand checks the behavior of a pkg filter by randomly inserting +// indices and asserting the invariants. The order in which indices are inserted +// is parameterized by a base `b` coprime to `p`, and using modular +// exponentiation to generate all elements in [1,p). +func checkPkgFilterRand(t *testing.T, b, p uint16) { + f := channeldb.NewPkgFilter(p) + var j = b + for i := uint16(1); i < p; i++ { + if f.Contains(j) { + t.Fatalf("pkg filter contains %d-%d "+ + "before being added", i, j) + } + + f.Set(j) + checkPkgFilterEncodeDecode(t, i, f) + + if !f.Contains(j) { + t.Fatalf("pkg filter missing %d-%d "+ + "after being added", i, j) + } + + if i < p-1 && f.IsFull() { + t.Fatalf("pkg filter %d already full", i) + } + checkPkgFilterEncodeDecode(t, i, f) + + j = (b * j) % p + } + + // Set 0 independently, since it will never be emitted by the generator. + f.Set(0) + checkPkgFilterEncodeDecode(t, p, f) + + if !f.IsFull() { + t.Fatalf("pkg filter count=%d not full", p) + } + checkPkgFilterEncodeDecode(t, p, f) +} + +// checkPkgFilterEncodeDecode tests the serialization of a pkg filter by: +// 1) writing it to a buffer +// 2) verifying the number of bytes written matches the filter's Size() +// 3) reconstructing the filter decoding the bytes +// 4) checking that the two filters are the same according to Equal +func checkPkgFilterEncodeDecode(t *testing.T, i uint16, f *channeldb.PkgFilter) { + var b bytes.Buffer + if err := f.Encode(&b); err != nil { + t.Fatalf("unable to serialize pkg filter: %v", err) + } + + // +2 for uint16 length + size := uint16(len(b.Bytes())) + if size != f.Size() { + t.Fatalf("pkg filter count=%d serialized size differs, "+ + "Size(): %d, len(bytes): %v", i, f.Size(), size) + } + + reader := bytes.NewReader(b.Bytes()) + + f2 := &channeldb.PkgFilter{} + if err := f2.Decode(reader); err != nil { + t.Fatalf("unable to deserialize pkg filter: %v", err) + } + + if !f.Equal(f2) { + t.Fatalf("pkg filter count=%v does is not equal "+ + "after deserialization, want: %v, got %v", + i, f, f2) + } +} + +var ( + chanID = lnwire.NewChanIDFromOutPoint(&wire.OutPoint{}) + + adds = []channeldb.LogUpdate{ + { + LogIndex: 0, + UpdateMsg: &lnwire.UpdateAddHTLC{ + ChanID: chanID, + ID: 1, + Amount: 100, + Expiry: 1000, + PaymentHash: [32]byte{0}, + }, + }, + { + LogIndex: 1, + UpdateMsg: &lnwire.UpdateAddHTLC{ + ChanID: chanID, + ID: 1, + Amount: 101, + Expiry: 1001, + PaymentHash: [32]byte{1}, + }, + }, + } + + settleFails = []channeldb.LogUpdate{ + { + LogIndex: 2, + UpdateMsg: &lnwire.UpdateFulfillHTLC{ + ChanID: chanID, + ID: 0, + PaymentPreimage: [32]byte{0}, + }, + }, + { + LogIndex: 3, + UpdateMsg: &lnwire.UpdateFailHTLC{ + ChanID: chanID, + ID: 1, + Reason: []byte{}, + }, + }, + } +) + +// TestPackagerEmptyFwdPkg checks that the state transitions exhibited by a +// forwarding package that contains no adds, fails or settles. We expect that +// the fwdpkg reaches FwdStateCompleted immediately after writing the forwarding +// decision via SetFwdFilter. +func TestPackagerEmptyFwdPkg(t *testing.T) { + t.Parallel() + + db := makeFwdPkgDB(t, "") + + shortChanID := lnwire.NewShortChanIDFromInt(1) + packager := channeldb.NewChannelPackager(shortChanID) + + // To begin, there should be no forwarding packages on disk. + fwdPkgs := loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } + + // Next, create and write a new forwarding package with no htlcs. + fwdPkg := channeldb.NewFwdPkg(shortChanID, 0, nil, nil) + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AddFwdPkg(tx, fwdPkg) + }); err != nil { + t.Fatalf("unable to add fwd pkg: %v", err) + } + + // There should now be one fwdpkg on disk. Since no forwarding decision + // has been written, we expect it to be FwdStateLockedIn. With no HTLCs, + // the ack filter will have no elements, and should always return true. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateLockedIn) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], 0, 0) + assertAckFilterIsFull(t, fwdPkgs[0], true) + + // Now, write the forwarding decision. In this case, its just an empty + // fwd filter. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) + }); err != nil { + t.Fatalf("unable to set fwdfiter: %v", err) + } + + // We should still have one package on disk. Since the forwarding + // decision has been written, it will minimally be in FwdStateProcessed. + // However with no htlcs, it should leap frog to FwdStateCompleted. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateCompleted) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], 0, 0) + assertAckFilterIsFull(t, fwdPkgs[0], true) + + // Lastly, remove the completed forwarding package from disk. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.RemovePkg(tx, fwdPkg.Height) + }); err != nil { + t.Fatalf("unable to remove fwdpkg: %v", err) + } + + // Check that the fwd package was actually removed. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } +} + +// TestPackagerOnlyAdds checks that the fwdpkg does not reach FwdStateCompleted +// as soon as all the adds in the package have been acked using AckAddHtlcs. +func TestPackagerOnlyAdds(t *testing.T) { + t.Parallel() + + db := makeFwdPkgDB(t, "") + + shortChanID := lnwire.NewShortChanIDFromInt(1) + packager := channeldb.NewChannelPackager(shortChanID) + + // To begin, there should be no forwarding packages on disk. + fwdPkgs := loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } + + // Next, create and write a new forwarding package that only has add + // htlcs. + fwdPkg := channeldb.NewFwdPkg(shortChanID, 0, adds, nil) + + nAdds := len(adds) + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AddFwdPkg(tx, fwdPkg) + }); err != nil { + t.Fatalf("unable to add fwd pkg: %v", err) + } + + // There should now be one fwdpkg on disk. Since no forwarding decision + // has been written, we expect it to be FwdStateLockedIn. The package + // has unacked add HTLCs, so the ack filter should not be full. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateLockedIn) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, 0) + assertAckFilterIsFull(t, fwdPkgs[0], false) + + // Now, write the forwarding decision. Since we have not explicitly + // added any adds to the fwdfilter, this would indicate that all of the + // adds were 1) settled locally by this link (exit hop), or 2) the htlc + // was failed locally. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) + }); err != nil { + t.Fatalf("unable to set fwdfiter: %v", err) + } + + for i := range adds { + // We should still have one package on disk. Since the forwarding + // decision has been written, it will minimally be in FwdStateProcessed. + // However not allf of the HTLCs have been acked, so should not + // have advanced further. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateProcessed) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, 0) + assertAckFilterIsFull(t, fwdPkgs[0], false) + + addRef := channeldb.AddRef{ + Height: fwdPkg.Height, + Index: uint16(i), + } + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AckAddHtlcs(tx, addRef) + }); err != nil { + t.Fatalf("unable to ack add htlc: %v", err) + } + } + + // We should still have one package on disk. Now that all adds have been + // acked, the ack filter should return true and the package should be + // FwdStateCompleted since there are no other settle/fail packets. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateCompleted) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, 0) + assertAckFilterIsFull(t, fwdPkgs[0], true) + + // Lastly, remove the completed forwarding package from disk. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.RemovePkg(tx, fwdPkg.Height) + }); err != nil { + t.Fatalf("unable to remove fwdpkg: %v", err) + } + + // Check that the fwd package was actually removed. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } +} + +// TestPackagerOnlySettleFails asserts that the fwdpkg remains in +// FwdStateProcessed after writing the forwarding decision when there are no +// adds in the fwdpkg. We expect this because an empty FwdFilter will always +// return true, but we are still waiting for the remaining fails and settles to +// be deleted. +func TestPackagerOnlySettleFails(t *testing.T) { + t.Parallel() + + db := makeFwdPkgDB(t, "") + + shortChanID := lnwire.NewShortChanIDFromInt(1) + packager := channeldb.NewChannelPackager(shortChanID) + + // To begin, there should be no forwarding packages on disk. + fwdPkgs := loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } + + // Next, create and write a new forwarding package that only has add + // htlcs. + fwdPkg := channeldb.NewFwdPkg(shortChanID, 0, nil, settleFails) + + nSettleFails := len(settleFails) + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AddFwdPkg(tx, fwdPkg) + }); err != nil { + t.Fatalf("unable to add fwd pkg: %v", err) + } + + // There should now be one fwdpkg on disk. Since no forwarding decision + // has been written, we expect it to be FwdStateLockedIn. The package + // has unacked add HTLCs, so the ack filter should not be full. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateLockedIn) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], 0, nSettleFails) + assertAckFilterIsFull(t, fwdPkgs[0], true) + + // Now, write the forwarding decision. Since we have not explicitly + // added any adds to the fwdfilter, this would indicate that all of the + // adds were 1) settled locally by this link (exit hop), or 2) the htlc + // was failed locally. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) + }); err != nil { + t.Fatalf("unable to set fwdfiter: %v", err) + } + + for i := range settleFails { + // We should still have one package on disk. Since the + // forwarding decision has been written, it will minimally be in + // FwdStateProcessed. However, not all of the HTLCs have been + // acked, so should not have advanced further. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateProcessed) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], 0, nSettleFails) + assertSettleFailFilterIsFull(t, fwdPkgs[0], false) + assertAckFilterIsFull(t, fwdPkgs[0], true) + + failSettleRef := channeldb.SettleFailRef{ + Source: shortChanID, + Height: fwdPkg.Height, + Index: uint16(i), + } + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AckSettleFails(tx, failSettleRef) + }); err != nil { + t.Fatalf("unable to ack add htlc: %v", err) + } + } + + // We should still have one package on disk. Now that all settles and + // fails have been removed, package should be FwdStateCompleted since + // there are no other add packets. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateCompleted) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], 0, nSettleFails) + assertSettleFailFilterIsFull(t, fwdPkgs[0], true) + assertAckFilterIsFull(t, fwdPkgs[0], true) + + // Lastly, remove the completed forwarding package from disk. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.RemovePkg(tx, fwdPkg.Height) + }); err != nil { + t.Fatalf("unable to remove fwdpkg: %v", err) + } + + // Check that the fwd package was actually removed. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } +} + +// TestPackagerAddsThenSettleFails writes a fwdpkg containing both adds and +// settle/fails, then checks the behavior when the adds are acked before any of +// the settle fails. Here we expect pkg to remain in FwdStateProcessed while the +// remainder of the fail/settles are being deleted. +func TestPackagerAddsThenSettleFails(t *testing.T) { + t.Parallel() + + db := makeFwdPkgDB(t, "") + + shortChanID := lnwire.NewShortChanIDFromInt(1) + packager := channeldb.NewChannelPackager(shortChanID) + + // To begin, there should be no forwarding packages on disk. + fwdPkgs := loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } + + // Next, create and write a new forwarding package that only has add + // htlcs. + fwdPkg := channeldb.NewFwdPkg(shortChanID, 0, adds, settleFails) + + nAdds := len(adds) + nSettleFails := len(settleFails) + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AddFwdPkg(tx, fwdPkg) + }); err != nil { + t.Fatalf("unable to add fwd pkg: %v", err) + } + + // There should now be one fwdpkg on disk. Since no forwarding decision + // has been written, we expect it to be FwdStateLockedIn. The package + // has unacked add HTLCs, so the ack filter should not be full. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateLockedIn) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, nSettleFails) + assertAckFilterIsFull(t, fwdPkgs[0], false) + + // Now, write the forwarding decision. Since we have not explicitly + // added any adds to the fwdfilter, this would indicate that all of the + // adds were 1) settled locally by this link (exit hop), or 2) the htlc + // was failed locally. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) + }); err != nil { + t.Fatalf("unable to set fwdfiter: %v", err) + } + + for i := range adds { + // We should still have one package on disk. Since the forwarding + // decision has been written, it will minimally be in FwdStateProcessed. + // However not allf of the HTLCs have been acked, so should not + // have advanced further. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateProcessed) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, nSettleFails) + assertSettleFailFilterIsFull(t, fwdPkgs[0], false) + assertAckFilterIsFull(t, fwdPkgs[0], false) + + addRef := channeldb.AddRef{ + Height: fwdPkg.Height, + Index: uint16(i), + } + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AckAddHtlcs(tx, addRef) + }); err != nil { + t.Fatalf("unable to ack add htlc: %v", err) + } + } + + for i := range settleFails { + // We should still have one package on disk. Since the + // forwarding decision has been written, it will minimally be in + // FwdStateProcessed. However not allf of the HTLCs have been + // acked, so should not have advanced further. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateProcessed) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, nSettleFails) + assertSettleFailFilterIsFull(t, fwdPkgs[0], false) + assertAckFilterIsFull(t, fwdPkgs[0], true) + + failSettleRef := channeldb.SettleFailRef{ + Source: shortChanID, + Height: fwdPkg.Height, + Index: uint16(i), + } + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AckSettleFails(tx, failSettleRef) + }); err != nil { + t.Fatalf("unable to remove settle/fail htlc: %v", err) + } + } + + // We should still have one package on disk. Now that all settles and + // fails have been removed, package should be FwdStateCompleted since + // there are no other add packets. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateCompleted) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, nSettleFails) + assertSettleFailFilterIsFull(t, fwdPkgs[0], true) + assertAckFilterIsFull(t, fwdPkgs[0], true) + + // Lastly, remove the completed forwarding package from disk. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.RemovePkg(tx, fwdPkg.Height) + }); err != nil { + t.Fatalf("unable to remove fwdpkg: %v", err) + } + + // Check that the fwd package was actually removed. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } +} + +// TestPackagerSettleFailsThenAdds writes a fwdpkg with both adds and +// settle/fails, then checks the behavior when the settle/fails are removed +// before any of the adds have been acked. This should cause the fwdpkg to +// remain in FwdStateProcessed until the final ack is recorded, at which point +// it should be promoted directly to FwdStateCompleted.since all adds have been +// removed. +func TestPackagerSettleFailsThenAdds(t *testing.T) { + t.Parallel() + + db := makeFwdPkgDB(t, "") + + shortChanID := lnwire.NewShortChanIDFromInt(1) + packager := channeldb.NewChannelPackager(shortChanID) + + // To begin, there should be no forwarding packages on disk. + fwdPkgs := loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } + + // Next, create and write a new forwarding package that has both add + // and settle/fail htlcs. + fwdPkg := channeldb.NewFwdPkg(shortChanID, 0, adds, settleFails) + + nAdds := len(adds) + nSettleFails := len(settleFails) + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AddFwdPkg(tx, fwdPkg) + }); err != nil { + t.Fatalf("unable to add fwd pkg: %v", err) + } + + // There should now be one fwdpkg on disk. Since no forwarding decision + // has been written, we expect it to be FwdStateLockedIn. The package + // has unacked add HTLCs, so the ack filter should not be full. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateLockedIn) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, nSettleFails) + assertAckFilterIsFull(t, fwdPkgs[0], false) + + // Now, write the forwarding decision. Since we have not explicitly + // added any adds to the fwdfilter, this would indicate that all of the + // adds were 1) settled locally by this link (exit hop), or 2) the htlc + // was failed locally. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.SetFwdFilter(tx, fwdPkg.Height, fwdPkg.FwdFilter) + }); err != nil { + t.Fatalf("unable to set fwdfiter: %v", err) + } + + // Simulate another channel deleting the settle/fails it received from + // the original fwd pkg. + // TODO(conner): use different packager/s? + for i := range settleFails { + // We should still have one package on disk. Since the + // forwarding decision has been written, it will minimally be in + // FwdStateProcessed. However none all of the add HTLCs have + // been acked, so should not have advanced further. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateProcessed) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, nSettleFails) + assertSettleFailFilterIsFull(t, fwdPkgs[0], false) + assertAckFilterIsFull(t, fwdPkgs[0], false) + + failSettleRef := channeldb.SettleFailRef{ + Source: shortChanID, + Height: fwdPkg.Height, + Index: uint16(i), + } + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AckSettleFails(tx, failSettleRef) + }); err != nil { + t.Fatalf("unable to remove settle/fail htlc: %v", err) + } + } + + // Now simulate this channel receiving a fail/settle for the adds in the + // fwdpkg. + for i := range adds { + // Again, we should still have one package on disk and be in + // FwdStateProcessed. This should not change until all of the + // add htlcs have been acked. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateProcessed) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, nSettleFails) + assertSettleFailFilterIsFull(t, fwdPkgs[0], true) + assertAckFilterIsFull(t, fwdPkgs[0], false) + + addRef := channeldb.AddRef{ + Height: fwdPkg.Height, + Index: uint16(i), + } + + if err := db.Update(func(tx *bolt.Tx) error { + return packager.AckAddHtlcs(tx, addRef) + }); err != nil { + t.Fatalf("unable to ack add htlc: %v", err) + } + } + + // We should still have one package on disk. Now that all settles and + // fails have been removed, package should be FwdStateCompleted since + // there are no other add packets. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 1 { + t.Fatalf("expected 1 fwdpkg, instead found %d", len(fwdPkgs)) + } + assertFwdPkgState(t, fwdPkgs[0], channeldb.FwdStateCompleted) + assertFwdPkgNumAddsSettleFails(t, fwdPkgs[0], nAdds, nSettleFails) + assertSettleFailFilterIsFull(t, fwdPkgs[0], true) + assertAckFilterIsFull(t, fwdPkgs[0], true) + + // Lastly, remove the completed forwarding package from disk. + if err := db.Update(func(tx *bolt.Tx) error { + return packager.RemovePkg(tx, fwdPkg.Height) + }); err != nil { + t.Fatalf("unable to remove fwdpkg: %v", err) + } + + // Check that the fwd package was actually removed. + fwdPkgs = loadFwdPkgs(t, db, packager) + if len(fwdPkgs) != 0 { + t.Fatalf("no forwarding packages should exist, found %d", len(fwdPkgs)) + } +} + +// assertFwdPkgState checks the current state of a fwdpkg meets our +// expectations. +func assertFwdPkgState(t *testing.T, fwdPkg *channeldb.FwdPkg, + state channeldb.FwdState) { + _, _, line, _ := runtime.Caller(1) + if fwdPkg.State != state { + t.Fatalf("line %d: expected fwdpkg in state %v, found %v", + line, state, fwdPkg.State) + } +} + +// assertFwdPkgNumAddsSettleFails checks that the number of adds and +// settle/fail log updates are correct. +func assertFwdPkgNumAddsSettleFails(t *testing.T, fwdPkg *channeldb.FwdPkg, + expectedNumAdds, expectedNumSettleFails int) { + _, _, line, _ := runtime.Caller(1) + if len(fwdPkg.Adds) != expectedNumAdds { + t.Fatalf("line %d: expected fwdpkg to have %d adds, found %d", + line, expectedNumAdds, len(fwdPkg.Adds)) + } + + if len(fwdPkg.SettleFails) != expectedNumSettleFails { + t.Fatalf("line %d: expected fwdpkg to have %d settle/fails, found %d", + line, expectedNumSettleFails, len(fwdPkg.SettleFails)) + } +} + +// assertAckFilterIsFull checks whether or not a fwdpkg's ack filter matches our +// expected full-ness. +func assertAckFilterIsFull(t *testing.T, fwdPkg *channeldb.FwdPkg, expected bool) { + _, _, line, _ := runtime.Caller(1) + if fwdPkg.AckFilter.IsFull() != expected { + t.Fatalf("line %d: expected fwdpkg ack filter IsFull to be %v, "+ + "found %v", line, expected, fwdPkg.AckFilter.IsFull()) + } +} + +// assertSettleFailFilterIsFull checks whether or not a fwdpkg's settle fail +// filter matches our expected full-ness. +func assertSettleFailFilterIsFull(t *testing.T, fwdPkg *channeldb.FwdPkg, expected bool) { + _, _, line, _ := runtime.Caller(1) + if fwdPkg.SettleFailFilter.IsFull() != expected { + t.Fatalf("line %d: expected fwdpkg settle/fail filter IsFull to be %v, "+ + "found %v", line, expected, fwdPkg.SettleFailFilter.IsFull()) + } +} + +// loadFwdPkgs is a helper method that reads all forwarding packages for a +// particular packager. +func loadFwdPkgs(t *testing.T, db *bolt.DB, + packager channeldb.FwdPackager) []*channeldb.FwdPkg { + + var fwdPkgs []*channeldb.FwdPkg + if err := db.View(func(tx *bolt.Tx) error { + var err error + fwdPkgs, err = packager.LoadFwdPkgs(tx) + return err + }); err != nil { + t.Fatalf("unable to load fwd pkgs: %v", err) + } + + return fwdPkgs +} + +// makeFwdPkgDB initializes a test database for forwarding packages. If the +// provided path is an empty, it will create a temp dir/file to use. +func makeFwdPkgDB(t *testing.T, path string) *bolt.DB { + if path == "" { + var err error + path, err = ioutil.TempDir("", "fwdpkgdb") + if err != nil { + t.Fatalf("unable to create temp path: %v", err) + } + + path = filepath.Join(path, "fwdpkg.db") + } + + db, err := bolt.Open(path, 0600, nil) + if err != nil { + t.Fatalf("unable to open boltdb: %v", err) + } + + return db +} From 2df9fb55101255432141153b21a8b5800975f3e5 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Sun, 26 Nov 2017 23:21:07 -0800 Subject: [PATCH 3/9] channeldb/channel: adds fwding package to channeldb --- channeldb/channel.go | 251 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 248 insertions(+), 3 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 0153f3d73..5a5c7f767 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -66,6 +66,12 @@ var ( // channel closure. This key should be accessed from within the // sub-bucket of a target channel, identified by its channel point. revocationLogBucket = []byte("revocation-log-key") + + // fwdPackageLogBucket is a bucket that stores the locked-in htlcs after + // having received a revocation from the remote party. The keys in this + // bucket represent the remote height at which these htlcs were + // accepted. + fwdPackageLogBucket = []byte("fwd-package-log-key") ) var ( @@ -86,6 +92,11 @@ var ( // each time we write a new state in order to be properly fault // tolerant. ErrNoPendingCommit = fmt.Errorf("no pending commits found") + + // ErrInvalidCircuitKeyLen signals that a circuit key could not be + // decoded because the byte slice is of an invalid length. + ErrInvalidCircuitKeyLen = fmt.Errorf( + "length of serialized circuit key must be 16 bytes") ) // ChannelType is an enum-like type that describes one of several possible @@ -387,6 +398,11 @@ type OpenChannel struct { // implementation of secret store is shachain store. RevocationStore shachain.Store + // Packager is used to create and update forwarding packages for this + // channel, which encodes all necessary information to recover from + // failures and reforward HTLCs that were not fully processed. + Packager FwdPackager + // TODO(roasbeef): eww Db *DB @@ -615,6 +631,8 @@ func fetchOpenChannel(chanBucket *bolt.Bucket, return nil, fmt.Errorf("unable to fetch chan revocations: %v", err) } + channel.Packager = NewChannelPackager(channel.ShortChanID) + return channel, nil } @@ -837,6 +855,84 @@ type LogUpdate struct { UpdateMsg lnwire.Message } +// Encode writes a log update to the provided io.Writer. +func (l *LogUpdate) Encode(w io.Writer) error { + return writeElements(w, l.LogIndex, l.UpdateMsg) +} + +// Decode reads a log update from the provided io.Reader. +func (l *LogUpdate) Decode(r io.Reader) error { + return readElements(r, &l.LogIndex, &l.UpdateMsg) +} + +// CircuitKey is used by a channel to uniquely identify the HTLCs it receives +// from the switch, and is used to purge our in-memory state of HTLCs that have +// already been processed by a link. Two list of CircuitKeys are included in +// each CommitDiff to allow a link to determine which in-memory htlcs directed +// the opening and closing of circuits in the switch's circuit map. +type CircuitKey struct { + // ChanID is the short chanid indicating the HTLC's origin. + // + // NOTE: It is fine for this value to be blank, as this indicates a + // locally-sourced payment. + ChanID lnwire.ShortChannelID + + // HtlcID is the unique htlc index predominately assigned by links, + // though can also be assigned by switch in the case of locally-sourced + // payments. + HtlcID uint64 +} + +// SetBytes deserializes the given bytes into this CircuitKey. +func (k *CircuitKey) SetBytes(bs []byte) error { + if len(bs) != 16 { + return ErrInvalidCircuitKeyLen + } + + k.ChanID = lnwire.NewShortChanIDFromInt( + binary.BigEndian.Uint64(bs[:8])) + k.HtlcID = binary.BigEndian.Uint64(bs[8:]) + + return nil +} + +// Bytes returns the serialized bytes for this circuit key. +func (k CircuitKey) Bytes() []byte { + var bs = make([]byte, 16) + binary.BigEndian.PutUint64(bs[:8], k.ChanID.ToUint64()) + binary.BigEndian.PutUint64(bs[8:], k.HtlcID) + return bs +} + +// Encode writes a CircuitKey to the provided io.Writer. +func (k *CircuitKey) Encode(w io.Writer) error { + var scratch [16]byte + binary.BigEndian.PutUint64(scratch[:8], k.ChanID.ToUint64()) + binary.BigEndian.PutUint64(scratch[8:], k.HtlcID) + + _, err := w.Write(scratch[:]) + return err +} + +// Decode reads a CircuitKey from the provided io.Reader. +func (k *CircuitKey) Decode(r io.Reader) error { + var scratch [16]byte + + if _, err := io.ReadFull(r, scratch[:]); err != nil { + return err + } + k.ChanID = lnwire.NewShortChanIDFromInt( + binary.BigEndian.Uint64(scratch[:8])) + k.HtlcID = binary.BigEndian.Uint64(scratch[8:]) + + return nil +} + +// String returns a string representation of the CircuitKey. +func (k CircuitKey) String() string { + return fmt.Sprintf("(Chan ID=%s, HTLC ID=%d)", k.ChanID, k.HtlcID) +} + // CommitDiff represents the delta needed to apply the state transition between // two subsequent commitment states. Given state N and state N+1, one is able // to apply the set of messages contained within the CommitDiff to N to arrive @@ -860,6 +956,36 @@ type CommitDiff struct { // within this message should properly cover the new commitment state // and also the HTLC's within the new commitment state. CommitSig *lnwire.CommitSig + + // OpenedCircuitKeys is a set of unique identifiers for any downstream + // Add packets included in this commitment txn. After a restart, this + // set of htlcs is acked from the link's incoming mailbox to ensure + // there isn't an attempt to re-add them to this commitment txn. + OpenedCircuitKeys []CircuitKey + + // ClosedCircuitKeys records the unique identifiers for any settle/fail + // packets that were resolved by this commitment txn. After a restart, + // this is used to ensure those circuits are removed from the circuit + // map, and the downstream packets in the link's mailbox are removed. + ClosedCircuitKeys []CircuitKey + + // AddAcks specifies the locations (commit height, pkg index) of any + // Adds that were failed/settled in this commit diff. This will ack + // entries in *this* channel's forwarding packages. + // + // NOTE: This value is not serialized, it is used to atomically mark the + // resolution of adds, such that they will not be reprocessed after a + // restart. + AddAcks []AddRef + + // SettleFailAcks specifies the locations (chan id, commit height, pkg + // index) of any Settles or Fails that were locked into this commit + // diff, and originate from *another* channel, i.e. the outgoing link. + // + // NOTE: This value is not serialized, it is used to atomically acks + // settles and fails from the forwarding packages of other channels, + // such that they will not be reforwarded internally after a restart. + SettleFailAcks []SettleFailRef } func serializeCommitDiff(w io.Writer, diff *CommitDiff) error { @@ -883,8 +1009,33 @@ func serializeCommitDiff(w io.Writer, diff *CommitDiff) error { } } + numOpenRefs := uint16(len(diff.OpenedCircuitKeys)) + if err := binary.Write(w, byteOrder, numOpenRefs); err != nil { + return err + } + + for _, openRef := range diff.OpenedCircuitKeys { + err := writeElements(w, openRef.ChanID, openRef.HtlcID) + if err != nil { + return err + } + } + + numClosedRefs := uint16(len(diff.ClosedCircuitKeys)) + if err := binary.Write(w, byteOrder, numClosedRefs); err != nil { + return err + } + + for _, closedRef := range diff.ClosedCircuitKeys { + err := writeElements(w, closedRef.ChanID, closedRef.HtlcID) + if err != nil { + return err + } + } + return nil } + func deserializeCommitDiff(r io.Reader) (*CommitDiff, error) { var ( d CommitDiff @@ -916,6 +1067,36 @@ func deserializeCommitDiff(r io.Reader) (*CommitDiff, error) { } } + var numOpenRefs uint16 + if err := binary.Read(r, byteOrder, &numOpenRefs); err != nil { + return nil, err + } + + d.OpenedCircuitKeys = make([]CircuitKey, numOpenRefs) + for i := 0; i < int(numOpenRefs); i++ { + err := readElements(r, + &d.OpenedCircuitKeys[i].ChanID, + &d.OpenedCircuitKeys[i].HtlcID) + if err != nil { + return nil, err + } + } + + var numClosedRefs uint16 + if err := binary.Read(r, byteOrder, &numClosedRefs); err != nil { + return nil, err + } + + d.ClosedCircuitKeys = make([]CircuitKey, numClosedRefs) + for i := 0; i < int(numClosedRefs); i++ { + err := readElements(r, + &d.ClosedCircuitKeys[i].ChanID, + &d.ClosedCircuitKeys[i].HtlcID) + if err != nil { + return nil, err + } + } + return &d, nil } @@ -938,6 +1119,26 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error { return err } + // Any outgoing settles and fails necessarily have a + // corresponding adds in this channel's forwarding packages. + // Mark all of these as being fully processed in our forwarding + // package, which prevents us from reprocessing them after + // startup. + err = c.Packager.AckAddHtlcs(tx, diff.AddAcks...) + if err != nil { + return err + } + + // Additionally, we ack from any fails or settles that are + // persisted in another channel's forwarding package. This + // prevents the same fails and settles from being retransmitted + // after restarts. The actual fail or settle we need to + // propagate to the remote party is now in the commit diff. + err = c.Packager.AckSettleFails(tx, diff.SettleFailAcks...) + if err != nil { + return err + } + // TODO(roasbeef): use seqno to derive key for later LCP // With the bucket retrieved, we'll now serialize the commit @@ -1021,15 +1222,15 @@ func (c *OpenChannel) InsertNextRevocation(revKey *btcec.PublicKey) error { // this log can be consulted in order to reconstruct the state needed to // rectify the situation. This method will add the current commitment for the // remote party to the revocation log, and promote the current pending -// commitment to the current remove commitment. -func (c *OpenChannel) AdvanceCommitChainTail() error { +// commitment to the current remote commitment. +func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error { c.Lock() defer c.Unlock() var newRemoteCommit *ChannelCommitment err := c.Db.Update(func(tx *bolt.Tx) error { - chanBucket, err := readChanBucket(tx, c.IdentityPub, + chanBucket, err := updateChanBucket(tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash) if err != nil { return err @@ -1081,7 +1282,15 @@ func (c *OpenChannel) AdvanceCommitChainTail() error { return err } + // Lastly, we write the forwarding package to disk so that we + // can properly recover from failures and reforward HTLCs that + // have not received a corresponding settle/fail. + if err := c.Packager.AddFwdPkg(tx, fwdPkg); err != nil { + return err + } + newRemoteCommit = &newCommit.Commitment + return nil }) if err != nil { @@ -1096,6 +1305,40 @@ func (c *OpenChannel) AdvanceCommitChainTail() error { return nil } +// LoadFwdPkgs scans the forwarding log for any packages that haven't been +// processed, and returns their deserialized log updates in map indexed by the +// remote commitment height at which the updates were locked in. +func (c *OpenChannel) LoadFwdPkgs() ([]*FwdPkg, error) { + var fwdPkgs []*FwdPkg + if err := c.Db.View(func(tx *bolt.Tx) error { + var err error + fwdPkgs, err = c.Packager.LoadFwdPkgs(tx) + return err + }); err != nil { + return nil, err + } + + return fwdPkgs, nil +} + +// SetFwdFilter atomically sets the forwarding filter for the forwarding package +// identified by `height`. +func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error { + return c.Db.Update(func(tx *bolt.Tx) error { + return c.Packager.SetFwdFilter(tx, height, fwdFilter) + }) +} + +// RemoveFwdPkg atomically removes a forwarding package specified by the remote +// commitment height. +// +// NOTE: This method should only be called on packages marked FwdStateCompleted. +func (c *OpenChannel) RemoveFwdPkg(height uint64) error { + return c.Db.Update(func(tx *bolt.Tx) error { + return c.Packager.RemovePkg(tx, height) + }) +} + // RevocationLogTail returns the "tail", or the end of the current revocation // log. This entry represents the last previous state for the remote node's // commitment chain. The ChannelDelta returned by this method will always lag @@ -1671,6 +1914,8 @@ func fetchChanInfo(chanBucket *bolt.Bucket, channel *OpenChannel) error { return err } + channel.Packager = NewChannelPackager(channel.ShortChanID) + return nil } From e4d2958f68c8c48f7061efadfbace27bcdb561d9 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 22 Feb 2018 15:02:02 -0800 Subject: [PATCH 4/9] channeldb/channel_test: init with Pacakager and construct FwdPkgs --- channeldb/channel_test.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 072d24a04..ba93410fa 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -219,6 +219,7 @@ func createTestChannelState(cdb *DB) (*OpenChannel, error) { RevocationProducer: producer, RevocationStore: store, Db: cdb, + Packager: NewChannelPackager(chanID), }, nil } @@ -475,6 +476,8 @@ func TestChannelStateTransition(t *testing.T) { }, }, }, + OpenedCircuitKeys: []CircuitKey{}, + ClosedCircuitKeys: []CircuitKey{}, } copy(commitDiff.LogUpdates[0].UpdateMsg.(*lnwire.UpdateAddHTLC).PaymentHash[:], bytes.Repeat([]byte{1}, 32)) @@ -509,7 +512,12 @@ func TestChannelStateTransition(t *testing.T) { t.Fatalf("unable to generate key: %v", err) } channel.RemoteNextRevocation = newPriv.PubKey() - if err := channel.AdvanceCommitChainTail(); err != nil { + + fwdPkg := NewFwdPkg(channel.ShortChanID, oldRemoteCommit.CommitHeight, + diskCommitDiff.LogUpdates, nil) + + err = channel.AdvanceCommitChainTail(fwdPkg) + if err != nil { t.Fatalf("unable to append to revocation log: %v", err) } @@ -553,7 +561,11 @@ func TestChannelStateTransition(t *testing.T) { if err := channel.AppendRemoteCommitChain(commitDiff); err != nil { t.Fatalf("unable to add to commit chain: %v", err) } - if err := channel.AdvanceCommitChainTail(); err != nil { + + fwdPkg = NewFwdPkg(channel.ShortChanID, oldRemoteCommit.CommitHeight, nil, nil) + + err = channel.AdvanceCommitChainTail(fwdPkg) + if err != nil { t.Fatalf("unable to append to revocation log: %v", err) } From 7a93c7530cfb21c0d04a47564751a16a9fef5f57 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 5 Feb 2018 19:46:19 -0800 Subject: [PATCH 5/9] channeldb/invoices: add idempotency to SettleInvoice --- channeldb/invoices.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/channeldb/invoices.go b/channeldb/invoices.go index 3614532ba..38f27aa27 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -432,6 +432,12 @@ func settleInvoice(invoices *bolt.Bucket, invoiceNum []byte) error { return err } + // Add idempotency to duplicate settles, return here to avoid + // overwriting the previous info. + if invoice.Terms.Settled { + return nil + } + invoice.Terms.Settled = true invoice.SettleDate = time.Now() From 53e4422a2ea127928079a832625f9d5d8f3e536d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 27 Feb 2018 20:01:41 -0800 Subject: [PATCH 6/9] lnwallet/channel: integrate fwdpkgs w/ in-mem buffering --- lnwallet/channel.go | 421 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 362 insertions(+), 59 deletions(-) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 3cb97f817..295a8b5d9 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -209,6 +209,37 @@ type PaymentDescriptor struct { // Settle. ParentIndex uint64 + // SourceRef points to an Add update in a forwarding package owned by + // this channel. + // + // NOTE: This field will only be populated if EntryType is Fail or + // Settle. + SourceRef *channeldb.AddRef + + // DestRef points to a Fail/Settle update in another link's forwarding + // package. + // + // NOTE: This field will only be populated if EntryType is Fail or + // Settle, and the forwarded Add successfully included in an outgoing + // link's commitment txn. + DestRef *channeldb.SettleFailRef + + // OpenCircuitRef references the incoming Chan/HTLC ID of an Add HTLC + // packet delivered by the switch. + // + // NOTE: This field is only populated for payment descriptors in the + // *local* update log, and if the Add packet was delivered by the + // switch. + OpenCircuitRef *channeldb.CircuitKey + + // ClosedCircuitRef references the incoming Chan/HTLC ID of the Add HTLC + // that opened the circuit. + // + // NOTE: This field is only populated for payment descriptors in the + // *local* update log, and if settle/fails have a committed circuit in + // the circuit map. + ClosedCircuitRef *channeldb.CircuitKey + // localOutputIndex is the output index of this HTLc output in the // commitment transaction of the local node. // @@ -291,6 +322,96 @@ type PaymentDescriptor struct { isForwarded bool } +// PayDescsFromRemoteLogUpdates converts a slice of LogUpdates received from the +// remote peer into PaymentDescriptors to inform a link's forwarding decisions. +// +// NOTE: The provided `logUpdates` MUST corresponding exactly to either the Adds +// or SettleFails in this channel's forwarding package at `height`. +func (lc *LightningChannel) PayDescsFromRemoteLogUpdates(height uint64, + logUpdates []channeldb.LogUpdate) []*PaymentDescriptor { + + lc.RLock() + defer lc.RUnlock() + + // Allocate enough space to hold all of the payment descriptors we will + // reconstruct, and also the list of pointers that will be returned to + // the caller. + payDescs := make([]PaymentDescriptor, 0, len(logUpdates)) + payDescPtrs := make([]*PaymentDescriptor, 0, len(logUpdates)) + + // Iterate over the log updates we loaded from disk, and reconstruct the + // payment descriptor corresponding to one of the four types of htlcs we + // can receive from the remote peer. We only repopulate the information + // necessary to process the packets and, if necessary, forward them to + // the switch. + // + // For each log update, we include either an AddRef or a SettleFailRef + // so that they can be ACK'd and garbage collected. + for i, logUpdate := range logUpdates { + var pd PaymentDescriptor + switch wireMsg := logUpdate.UpdateMsg.(type) { + + case *lnwire.UpdateAddHTLC: + pd = PaymentDescriptor{ + RHash: wireMsg.PaymentHash, + Timeout: wireMsg.Expiry, + Amount: wireMsg.Amount, + EntryType: Add, + HtlcIndex: wireMsg.ID, + LogIndex: logUpdate.LogIndex, + SourceRef: &channeldb.AddRef{ + Height: height, + Index: uint16(i), + }, + } + pd.OnionBlob = make([]byte, len(wireMsg.OnionBlob)) + copy(pd.OnionBlob[:], wireMsg.OnionBlob[:]) + + case *lnwire.UpdateFulfillHTLC: + pd = PaymentDescriptor{ + RPreimage: wireMsg.PaymentPreimage, + ParentIndex: wireMsg.ID, + EntryType: Settle, + DestRef: &channeldb.SettleFailRef{ + Source: lc.ShortChanID(), + Height: height, + Index: uint16(i), + }, + } + + case *lnwire.UpdateFailHTLC: + pd = PaymentDescriptor{ + ParentIndex: wireMsg.ID, + EntryType: Fail, + FailReason: wireMsg.Reason[:], + DestRef: &channeldb.SettleFailRef{ + Source: lc.ShortChanID(), + Height: height, + Index: uint16(i), + }, + } + + case *lnwire.UpdateFailMalformedHTLC: + pd = PaymentDescriptor{ + ParentIndex: wireMsg.ID, + EntryType: MalformedFail, + FailCode: wireMsg.FailureCode, + ShaOnionBlob: wireMsg.ShaOnionBlob, + DestRef: &channeldb.SettleFailRef{ + Source: lc.ShortChanID(), + Height: height, + Index: uint16(i), + }, + } + } + + payDescs = append(payDescs, pd) + payDescPtrs = append(payDescPtrs, &payDescs[i]) + } + + return payDescPtrs +} + // commitment represents a commitment to a new state within an active channel. // New commitments can be initiated by either side. Commitments are ordered // into a commitment chain, with one existing for both parties. Each side can @@ -2572,6 +2693,13 @@ func (lc *LightningChannel) createCommitDiff( }) } + var ( + ackAddRefs []channeldb.AddRef + settleFailRefs []channeldb.SettleFailRef + openCircuitKeys []channeldb.CircuitKey + closedCircuitKeys []channeldb.CircuitKey + ) + // We'll now run through our local update log to locate the items which // were only just committed within this pending state. This will be the // set of items we need to retransmit if we reconnect and find that @@ -2611,6 +2739,20 @@ func (lc *LightningChannel) createCommitDiff( copy(htlc.OnionBlob[:], pd.OnionBlob) logUpdate.UpdateMsg = htlc + // Gather any references for circuits opened by this Add + // HTLC. + if pd.OpenCircuitRef != nil { + openCircuitKeys = append(openCircuitKeys, + *pd.OpenCircuitRef) + } + + logUpdates = append(logUpdates, logUpdate) + + // Short circuit here since an add should not have any + // of the references gathered in the case of settles, + // fails or malformed fails. + continue + case Settle: logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{ ChanID: chanID, @@ -2634,6 +2776,19 @@ func (lc *LightningChannel) createCommitDiff( } } + // Gather the fwd pkg references from any settle or fail + // packets, if they exist. + if pd.SourceRef != nil { + ackAddRefs = append(ackAddRefs, *pd.SourceRef) + } + if pd.DestRef != nil { + settleFailRefs = append(settleFailRefs, *pd.DestRef) + } + if pd.ClosedCircuitRef != nil { + closedCircuitKeys = append(closedCircuitKeys, + *pd.ClosedCircuitRef) + } + logUpdates = append(logUpdates, logUpdate) } @@ -2651,7 +2806,11 @@ func (lc *LightningChannel) createCommitDiff( CommitSig: commitSig, HtlcSigs: htlcSigs, }, - LogUpdates: logUpdates, + LogUpdates: logUpdates, + OpenedCircuitKeys: openCircuitKeys, + ClosedCircuitKeys: closedCircuitKeys, + AddAcks: ackAddRefs, + SettleFailAcks: settleFailRefs, }, nil } @@ -2832,7 +2991,8 @@ func (lc *LightningChannel) SignNextCommitment() (lnwire.Sig, []lnwire.Sig, erro // have not received // * RevokeAndAck: if we sent a revocation message that they claim to have // not received -func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ([]lnwire.Message, error) { +func (lc *LightningChannel) ProcessChanSyncMsg( + msg *lnwire.ChannelReestablish) ([]lnwire.Message, error) { // We owe them a commitment if they have an un-acked commitment and the // tip of their chain (from our Pov) is equal to what they think their @@ -2850,7 +3010,13 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ( // chain sync message. If we're de-synchronized, then we'll send a // batch of messages which when applied will kick start the chain // resync. - var updates []lnwire.Message + var ( + updates []lnwire.Message + // TODO(conner): uncomment after API exposes these return + // variables, this permits compilation in the meantime + //openedCircuits []channeldb.CircuitKey + //closedCircuits []channeldb.CircuitKey + ) // If the remote party included the optional fields, then we'll verify // their correctness first, as it will influence our decisions below. @@ -2974,9 +3140,12 @@ func (lc *LightningChannel) ProcessChanSyncMsg(msg *lnwire.ChannelReestablish) ( // commitment chain with our local version of their chain. updates = append(updates, commitDiff.CommitSig) - } else if !oweCommitment && remoteChainTip.height+1 != - msg.NextLocalCommitHeight { + // TODO(conner): uncomment after API exposes these return + // variables, this permits compilation in the meantime + //openedCircuits = commitDiff.OpenedCircuitKeys + //closedCircuits = commitDiff.ClosedCircuitKeys + } else if remoteChainTip.height+1 != msg.NextLocalCommitHeight { if err := lc.channelState.MarkBorked(); err != nil { return nil, err } @@ -3692,7 +3861,9 @@ func (lc *LightningChannel) RevokeCurrentCommitment() (*lnwire.RevokeAndAck, []c // successful, then the remote commitment chain is advanced by a single // commitment, and a log compaction is attempted. In addition, a slice of // HTLC's which can be forwarded upstream are returned. -func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*PaymentDescriptor, error) { +func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ( + []*PaymentDescriptor, error) { + lc.Lock() defer lc.Unlock() @@ -3728,12 +3899,155 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*P lc.remoteCommitChain.tail().height, lc.remoteCommitChain.tail().height+1) + // Add one to the remote tail since this will be height *after* we write + // the revocation to disk, the local height will remain unchanged. + remoteChainTail := lc.remoteCommitChain.tail().height + 1 + localChainTail := lc.localCommitChain.tail().height + + chanID := lnwire.NewChanIDFromOutPoint(&lc.channelState.FundingOutpoint) + + // Determine the set of htlcs that can be forwarded as a result of + // having received the revocation. We will simultaneously construct the + // log updates and payment descriptors, allowing us to persist the log + // updates to disk and optimistically buffer the forwarding package in + // memory. + var ( + addsToForward []*PaymentDescriptor + addUpdates []channeldb.LogUpdate + settleFailsToForward []*PaymentDescriptor + settleFailUpdates []channeldb.LogUpdate + ) + + var addIndex, settleFailIndex uint16 + for e := lc.remoteUpdateLog.Front(); e != nil; e = e.Next() { + pd := e.Value.(*PaymentDescriptor) + + if pd.isForwarded { + continue + } + + uncommitted := (pd.addCommitHeightRemote == 0 || + pd.addCommitHeightLocal == 0) + if pd.EntryType == Add && uncommitted { + continue + } + + // Using the height of the remote and local commitments, + // preemptively compute whether or not to forward this HTLC for + // the case in which this in an Add HTLC, or if this is a + // Settle, Fail, or MalformedFail. + shouldFwdAdd := remoteChainTail == pd.addCommitHeightRemote && + localChainTail >= pd.addCommitHeightLocal + shouldFwdRmv := remoteChainTail >= pd.removeCommitHeightRemote && + localChainTail >= pd.removeCommitHeightLocal + + // We'll only forward any new HTLC additions iff, it's "freshly + // locked in". Meaning that the HTLC was only *just* considered + // locked-in at this new state. By doing this we ensure that we + // don't re-forward any already processed HTLC's after a + // restart. + switch { + case pd.EntryType == Add && shouldFwdAdd: + + // Construct a reference specifying the location that + // this forwarded Add will be written in the forwarding + // package constructed at this remote height. + pd.SourceRef = &channeldb.AddRef{ + Height: remoteChainTail, + Index: addIndex, + } + addIndex++ + + pd.isForwarded = true + addsToForward = append(addsToForward, pd) + + case pd.EntryType != Add && shouldFwdRmv: + + // Construct a reference specifying the location that + // this forwarded Settle/Fail will be written in the + // forwarding package constructed at this remote height. + pd.DestRef = &channeldb.SettleFailRef{ + Source: lc.ShortChanID(), + Height: remoteChainTail, + Index: settleFailIndex, + } + settleFailIndex++ + + pd.isForwarded = true + settleFailsToForward = append(settleFailsToForward, pd) + + default: + continue + } + + // If we've reached this point, this HTLC will be added to the + // forwarding package at the height of the remote commitment. + // All types of HTLCs will record their assigned log index. + logUpdate := channeldb.LogUpdate{ + LogIndex: pd.LogIndex, + } + + // Next, we'll map the type of the PaymentDescriptor to one of + // the four messages that it corresponds to and separate the + // updates into Adds and Settle/Fail/MalformedFail such that + // they can be written in the forwarding package. Adds are + // aggregated separately from the other types of HTLCs. + switch pd.EntryType { + case Add: + htlc := &lnwire.UpdateAddHTLC{ + ChanID: chanID, + ID: pd.HtlcIndex, + Amount: pd.Amount, + Expiry: pd.Timeout, + PaymentHash: pd.RHash, + } + copy(htlc.OnionBlob[:], pd.OnionBlob) + logUpdate.UpdateMsg = htlc + addUpdates = append(addUpdates, logUpdate) + + case Settle: + logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{ + ChanID: chanID, + ID: pd.ParentIndex, + PaymentPreimage: pd.RPreimage, + } + settleFailUpdates = append(settleFailUpdates, logUpdate) + + case Fail: + logUpdate.UpdateMsg = &lnwire.UpdateFailHTLC{ + ChanID: chanID, + ID: pd.ParentIndex, + Reason: pd.FailReason, + } + settleFailUpdates = append(settleFailUpdates, logUpdate) + + case MalformedFail: + logUpdate.UpdateMsg = &lnwire.UpdateFailMalformedHTLC{ + ChanID: chanID, + ID: pd.ParentIndex, + ShaOnionBlob: pd.ShaOnionBlob, + FailureCode: pd.FailCode, + } + settleFailUpdates = append(settleFailUpdates, logUpdate) + } + } + + source := lc.channelState.ShortChanID + + // Now that we have gathered the set of HTLCs to forward, separated by + // type, construct a forwarding package using the height that the remote + // commitment chain will be extended after persisting the revocation. + fwdPkg := channeldb.NewFwdPkg( + source, remoteChainTail, addUpdates, settleFailUpdates, + ) + // At this point, the revocation has been accepted, and we've rotated // the current revocation key+hash for the remote party. Therefore we // sync now to ensure the revocation producer state is consistent with // the current commitment height and also to advance the on-disk // commitment chain. - if err := lc.channelState.AdvanceCommitChainTail(); err != nil { + err = lc.channelState.AdvanceCommitChainTail(fwdPkg) + if err != nil { return nil, err } @@ -3741,59 +4055,37 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ([]*P // chain, we can advance their chain by a single commitment. lc.remoteCommitChain.advanceTail() - remoteChainTail := lc.remoteCommitChain.tail().height - localChainTail := lc.localCommitChain.tail().height - - // Now that we've verified the revocation update the state of the HTLC - // log as we may be able to prune portions of it now, and update their - // balance. - var htlcsToForward []*PaymentDescriptor - for e := lc.remoteUpdateLog.Front(); e != nil; e = e.Next() { - htlc := e.Value.(*PaymentDescriptor) - - if htlc.isForwarded { - continue - } - - uncommitted := (htlc.addCommitHeightRemote == 0 || - htlc.addCommitHeightLocal == 0) - if htlc.EntryType == Add && uncommitted { - continue - } - - // We'll only forward any new HTLC additions iff, it's "freshly - // locked in". Meaning that the HTLC was only *just* considered - // locked-in at this new state. By doing this we ensure that we - // don't re-forward any already processed HTLC's after a - // restart. - if htlc.EntryType == Add && - remoteChainTail == htlc.addCommitHeightRemote && - localChainTail >= htlc.addCommitHeightLocal { - - htlc.isForwarded = true - htlcsToForward = append(htlcsToForward, htlc) - continue - } - - if htlc.EntryType != Add && - remoteChainTail >= htlc.removeCommitHeightRemote && - localChainTail >= htlc.removeCommitHeightLocal { - - htlc.isForwarded = true - htlcsToForward = append(htlcsToForward, htlc) - continue - } - } - // As we've just completed a new state transition, attempt to see if we // can remove any entries from the update log which have been removed // from the PoV of both commitment chains. compactLogs(lc.localUpdateLog, lc.remoteUpdateLog, localChainTail, remoteChainTail) + htlcsToForward := append(settleFailsToForward, + addsToForward...) + return htlcsToForward, nil } +// LoadFwdPkgs loads any pending log updates from disk and returns the payment +// descriptors to be processed by the link. +func (lc *LightningChannel) LoadFwdPkgs() ([]*channeldb.FwdPkg, error) { + return lc.channelState.LoadFwdPkgs() +} + +// SetFwdFilter writes the forwarding decision for a given remote commitment +// height. +func (lc *LightningChannel) SetFwdFilter(height uint64, + fwdFilter *channeldb.PkgFilter) error { + + return lc.channelState.SetFwdFilter(height, fwdFilter) +} + +// RemoveFwdPkg permanently deletes the forwarding package at the given height. +func (lc *LightningChannel) RemoveFwdPkg(height uint64) error { + return lc.channelState.RemoveFwdPkg(height) +} + // NextRevocationKey returns the commitment point for the _next_ commitment // height. The pubkey returned by this function is required by the remote party // along with their revocation base to to extend our commitment chain with a @@ -3825,6 +4117,7 @@ func (lc *LightningChannel) InitNextRevocation(revKey *btcec.PublicKey) error { // AddHTLC adds an HTLC to the state machine's local update log. This method // should be called when preparing to send an outgoing HTLC. func (lc *LightningChannel) AddHTLC(htlc *lnwire.UpdateAddHTLC) (uint64, error) { + lc.Lock() defer lc.Unlock() @@ -3885,8 +4178,7 @@ func (lc *LightningChannel) ReceiveHTLC(htlc *lnwire.UpdateAddHTLC) (uint64, err // creating the corresponding wire message. In the case the supplied preimage // is invalid, an error is returned. Additionally, the value of the settled // HTLC is also returned. -func (lc *LightningChannel) SettleHTLC(preimage [32]byte, htlcIndex uint64, -) error { +func (lc *LightningChannel) SettleHTLC(preimage [32]byte, htlcIndex uint64) error { lc.Lock() defer lc.Unlock() @@ -3952,6 +4244,7 @@ func (lc *LightningChannel) ReceiveHTLCSettle(preimage [32]byte, htlcIndex uint6 // update. This method is intended to be called in order to cancel in // _incoming_ HTLC. func (lc *LightningChannel) FailHTLC(htlcIndex uint64, reason []byte) error { + lc.Lock() defer lc.Unlock() @@ -4041,9 +4334,6 @@ func (lc *LightningChannel) ReceiveFailHTLC(htlcIndex uint64, reason []byte, // created this active channel. This outpoint is used throughout various // subsystems to uniquely identify an open channel. func (lc *LightningChannel) ChannelPoint() *wire.OutPoint { - lc.RLock() - defer lc.RUnlock() - return &lc.channelState.FundingOutpoint } @@ -4051,9 +4341,6 @@ func (lc *LightningChannel) ChannelPoint() *wire.OutPoint { // ID encodes the exact location in the main chain that the original // funding output can be found. func (lc *LightningChannel) ShortChanID() lnwire.ShortChannelID { - lc.RLock() - defer lc.RUnlock() - return lc.channelState.ShortChanID } @@ -5417,3 +5704,19 @@ func (lc *LightningChannel) ActiveHtlcs() []channeldb.HTLC { func (lc *LightningChannel) LocalChanReserve() btcutil.Amount { return lc.localChanCfg.ChanReserve } + +// LocalHtlcIndex returns the next local htlc index to be allocated. +func (lc *LightningChannel) LocalHtlcIndex() uint64 { + lc.RLock() + defer lc.RUnlock() + + return lc.channelState.LocalCommitment.LocalHtlcIndex +} + +// RemoteCommitHeight returns the commitment height of the remote chain. +func (lc *LightningChannel) RemoteCommitHeight() uint64 { + lc.RLock() + defer lc.RUnlock() + + return lc.channelState.RemoteCommitment.CommitHeight +} From 6e542d5dfa6ed77c252918ac594d1289937d3313 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 23 Feb 2018 19:28:18 -0800 Subject: [PATCH 7/9] lnwallet/channel_test: init open channels with Packager --- lnwallet/channel_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 29895f6f3..d67d09817 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -2,9 +2,11 @@ package lnwallet import ( "bytes" + "crypto/rand" "crypto/sha256" + "encoding/binary" + "io" "io/ioutil" - "os" "reflect" "runtime" @@ -280,11 +282,21 @@ func createTestChannels(revocationWindow int) (*LightningChannel, CommitSig: bytes.Repeat([]byte{1}, 71), } + var chanIDBytes [8]byte + if _, err := io.ReadFull(rand.Reader, chanIDBytes[:]); err != nil { + return nil, nil, nil, err + } + + shortChanID := lnwire.NewShortChanIDFromInt( + binary.BigEndian.Uint64(chanIDBytes[:]), + ) + aliceChannelState := &channeldb.OpenChannel{ LocalChanCfg: aliceCfg, RemoteChanCfg: bobCfg, IdentityPub: aliceKeys[0].PubKey(), FundingOutpoint: *prevOut, + ShortChanID: shortChanID, ChanType: channeldb.SingleFunder, IsInitiator: true, Capacity: channelCapacity, @@ -294,12 +306,14 @@ func createTestChannels(revocationWindow int) (*LightningChannel, LocalCommitment: aliceCommit, RemoteCommitment: aliceCommit, Db: dbAlice, + Packager: channeldb.NewChannelPackager(shortChanID), } bobChannelState := &channeldb.OpenChannel{ LocalChanCfg: bobCfg, RemoteChanCfg: aliceCfg, IdentityPub: bobKeys[0].PubKey(), FundingOutpoint: *prevOut, + ShortChanID: shortChanID, ChanType: channeldb.SingleFunder, IsInitiator: false, Capacity: channelCapacity, @@ -309,6 +323,7 @@ func createTestChannels(revocationWindow int) (*LightningChannel, LocalCommitment: bobCommit, RemoteCommitment: bobCommit, Db: dbBob, + Packager: channeldb.NewChannelPackager(shortChanID), } aliceSigner := &mockSigner{privkeys: aliceKeys} From 970006ff2ad8c439425f0f20cb5044a8bfb1ca79 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 23 Feb 2018 19:28:36 -0800 Subject: [PATCH 8/9] breacharbiter_test: init open channels with Pacakager --- breacharbiter_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/breacharbiter_test.go b/breacharbiter_test.go index 6ce445db3..0f2fe16df 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -4,8 +4,11 @@ package main import ( "bytes" + crand "crypto/rand" "crypto/sha256" + "encoding/binary" "fmt" + "io" "io/ioutil" "math/rand" "net" @@ -1384,6 +1387,15 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa CommitSig: bytes.Repeat([]byte{1}, 71), } + var chanIDBytes [8]byte + if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil { + return nil, nil, nil, err + } + + shortChanID := lnwire.NewShortChanIDFromInt( + binary.BigEndian.Uint64(chanIDBytes[:]), + ) + aliceChannelState := &channeldb.OpenChannel{ LocalChanCfg: aliceCfg, RemoteChanCfg: bobCfg, @@ -1398,6 +1410,7 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa LocalCommitment: aliceCommit, RemoteCommitment: aliceCommit, Db: dbAlice, + Packager: channeldb.NewChannelPackager(shortChanID), } bobChannelState := &channeldb.OpenChannel{ LocalChanCfg: bobCfg, @@ -1413,6 +1426,7 @@ func createInitChannels(revocationWindow int) (*lnwallet.LightningChannel, *lnwa LocalCommitment: bobCommit, RemoteCommitment: bobCommit, Db: dbBob, + Packager: channeldb.NewChannelPackager(shortChanID), } pCache := &mockPreimageCache{ From 6a88ff940a18c105356ce7aa39d6ec723443ccc0 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 27 Feb 2018 19:32:36 -0800 Subject: [PATCH 9/9] htlcswitch_/test_utils: init OpenChannels w/ Packager --- htlcswitch/test_utils.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index a7f4dd84e..e95038de6 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -5,16 +5,13 @@ import ( "crypto/rand" "crypto/sha256" "fmt" + "io/ioutil" + "math/big" + "net" + "os" "testing" "time" - "io/ioutil" - "os" - - "math/big" - - "net" - "github.com/btcsuite/fastsha256" "github.com/go-errors/errors" "github.com/lightningnetwork/lightning-onion" @@ -266,6 +263,7 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, RemoteCommitment: aliceCommit, ShortChanID: chanID, Db: dbAlice, + Packager: channeldb.NewChannelPackager(chanID), } bobChannelState := &channeldb.OpenChannel{ @@ -283,6 +281,7 @@ func createTestChannel(alicePrivKey, bobPrivKey []byte, RemoteCommitment: bobCommit, ShortChanID: chanID, Db: dbBob, + Packager: channeldb.NewChannelPackager(chanID), } if err := aliceChannelState.SyncPending(bobAddr, broadcastHeight); err != nil {