diff --git a/channeldb/db.go b/channeldb/db.go index c30e6bdee..a4c9d13af 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -276,6 +276,11 @@ var ( // channelOpeningState for each channel that is currently in the process // of being opened. channelOpeningStateBucket = []byte("channelOpeningState") + + // initialChannelFwdingPolicyBucket is the database bucket used to store + // the forwarding policy for each permanent channel that is currently + // in the process of being opened. + initialChannelFwdingPolicyBucket = []byte("initialChannelFwdingPolicy") ) // DB is the primary datastore for the lnd daemon. The database stores @@ -324,7 +329,9 @@ func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) { // CreateWithBackend creates channeldb instance using the passed kvdb.Backend. // Any necessary schemas migrations due to updates will take place as necessary. -func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, error) { +func CreateWithBackend(backend kvdb.Backend, + modifiers ...OptionModifier) (*DB, error) { + opts := DefaultOptions() for _, modifier := range modifiers { modifier(&opts) @@ -656,7 +663,9 @@ func (c *ChannelStateDB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) ( // The next layer down is all the chains that this node // has channels on with us. - return nodeChanBucket.ForEach(func(chainHash, v []byte) error { + return nodeChanBucket.ForEach(func(chainHash, + v []byte) error { + // If there's a value, it's not a bucket so // ignore it. if v != nil { @@ -1108,8 +1117,8 @@ func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel, return c.linkNodeDB.DeleteLinkNode(remotePub) } -// PruneLinkNodes attempts to prune all link nodes found within the database with -// whom we no longer have any open channels with. +// PruneLinkNodes attempts to prune all link nodes found within the database +// with whom we no longer have any open channels with. func (c *ChannelStateDB) PruneLinkNodes() error { allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes() if err != nil { @@ -1290,6 +1299,64 @@ func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint, return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator) } +// SaveInitialFwdingPolicy saves the serialized forwarding policy for the +// provided permanent channel id to the initialChannelFwdingPolicyBucket. +func (c *ChannelStateDB) SaveInitialFwdingPolicy(chanID, + forwardingPolicy []byte) error { + + return kvdb.Update(c.backend, func(tx kvdb.RwTx) error { + bucket, err := tx.CreateTopLevelBucket( + initialChannelFwdingPolicyBucket, + ) + if err != nil { + return err + } + + return bucket.Put(chanID, forwardingPolicy) + }, func() {}) +} + +// GetInitialFwdingPolicy fetches the serialized forwarding policy for the +// provided channel id from the database, or returns ErrChannelNotFound if +// a forwarding policy for this channel id is not found. +func (c *ChannelStateDB) GetInitialFwdingPolicy(chanID []byte) ([]byte, error) { + var serializedState []byte + err := kvdb.View(c.backend, func(tx kvdb.RTx) error { + bucket := tx.ReadBucket(initialChannelFwdingPolicyBucket) + if bucket == nil { + // If the bucket does not exist, it means we + // never added a channel fees to the db, so + // return ErrChannelNotFound. + return ErrChannelNotFound + } + + stateBytes := bucket.Get(chanID) + if stateBytes == nil { + return ErrChannelNotFound + } + + serializedState = append(serializedState, stateBytes...) + + return nil + }, func() { + serializedState = nil + }) + return serializedState, err +} + +// DeleteInitialFwdingPolicy removes the forwarding policy for a given channel +// from the database. +func (c *ChannelStateDB) DeleteInitialFwdingPolicy(chanID []byte) error { + return kvdb.Update(c.backend, func(tx kvdb.RwTx) error { + bucket := tx.ReadWriteBucket(initialChannelFwdingPolicyBucket) + if bucket == nil { + return ErrChannelNotFound + } + + return bucket.Delete(chanID) + }, func() {}) +} + // SaveChannelOpeningState saves the serialized channel state for the provided // chanPoint to the channelOpeningStateBucket. func (c *ChannelStateDB) SaveChannelOpeningState(outPoint, @@ -1308,7 +1375,9 @@ func (c *ChannelStateDB) SaveChannelOpeningState(outPoint, // GetChannelOpeningState fetches the serialized channel state for the provided // outPoint from the database, or returns ErrChannelNotFound if the channel // is not found. -func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte, error) { +func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte, + error) { + var serializedState []byte err := kvdb.View(c.backend, func(tx kvdb.RTx) error { bucket := tx.ReadBucket(channelOpeningStateBucket) @@ -1392,7 +1461,8 @@ func (d *DB) syncVersions(versions []mandatoryVersion) error { continue } - log.Infof("Applying migration #%v", migrationVersions[i]) + log.Infof("Applying migration #%v", + migrationVersions[i]) if err := migration(tx); err != nil { log.Infof("Unable to apply migration #%v", @@ -1532,7 +1602,9 @@ func fetchHistoricalChanBucket(tx kvdb.RTx, if err := writeOutpoint(&chanPointBuf, outPoint); err != nil { return nil, err } - chanBucket := historicalChanBucket.NestedReadBucket(chanPointBuf.Bytes()) + chanBucket := historicalChanBucket.NestedReadBucket( + chanPointBuf.Bytes(), + ) if chanBucket == nil { return nil, ErrChannelNotFound } diff --git a/funding/manager.go b/funding/manager.go index 500d4426c..2bb3a07f5 100644 --- a/funding/manager.go +++ b/funding/manager.go @@ -138,6 +138,9 @@ type reservationWithCtx struct { chanAmt btcutil.Amount + // forwardingPolicy is the policy provided by the initFundingMsg. + forwardingPolicy htlcswitch.ForwardingPolicy + // Constraints we require for the remote. remoteCsvDelay uint16 remoteMinHtlc lnwire.MilliSatoshi @@ -197,6 +200,15 @@ type InitFundingMsg struct { // LocalFundingAmt is the size of the channel. LocalFundingAmt btcutil.Amount + // BaseFee is the base fee charged for routing payments regardless of the + // number of milli-satoshis sent. + BaseFee *uint64 + + // FeeRate is the fee rate in ppm (parts per million) that will be charged + // proportionally based on the value of each forwarded HTLC, the lowest + // possible rate is 0 with a granularity of 0.000001 (millionths). + FeeRate *uint64 + // PushAmt is the amount pushed to the counterparty. PushAmt lnwire.MilliSatoshi @@ -1592,6 +1604,14 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, minHtlc = acceptorResp.MinHtlcIn } + // If we are handling a FundingOpen request then we need to + // specify the default channel fees since they are not provided + // by the responder interactively. + forwardingPolicy := htlcswitch.ForwardingPolicy{ + BaseFee: f.cfg.DefaultRoutingPolicy.BaseFee, + FeeRate: f.cfg.DefaultRoutingPolicy.FeeRate, + } + // Once the reservation has been created successfully, we add it to // this peer's map of pending reservations to track this particular // reservation until either abort or completion. @@ -1600,16 +1620,17 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer, f.activeReservations[peerIDKey] = make(pendingChannels) } resCtx := &reservationWithCtx{ - reservation: reservation, - chanAmt: amt, - remoteCsvDelay: remoteCsvDelay, - remoteMinHtlc: minHtlc, - remoteMaxValue: remoteMaxValue, - remoteMaxHtlcs: maxHtlcs, - maxLocalCsv: f.cfg.MaxLocalCSVDelay, - channelType: msg.ChannelType, - err: make(chan error, 1), - peer: peer, + reservation: reservation, + chanAmt: amt, + forwardingPolicy: forwardingPolicy, + remoteCsvDelay: remoteCsvDelay, + remoteMinHtlc: minHtlc, + remoteMaxValue: remoteMaxValue, + remoteMaxHtlcs: maxHtlcs, + maxLocalCsv: f.cfg.MaxLocalCSVDelay, + channelType: msg.ChannelType, + err: make(chan error, 1), + peer: peer, } f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.resMtx.Unlock() @@ -2110,6 +2131,9 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, return } + // Get forwarding policy before deleting the reservation context. + forwardingPolicy := resCtx.forwardingPolicy + // The channel is marked IsPending in the database, and can be removed // from the set of active reservations. f.deleteReservationCtx(peerKey, msg.PendingChannelID) @@ -2176,6 +2200,14 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer, return } + // With a permanent channel id established we can save the respective + // forwarding policy in the database. In the channel announcement phase + // this forwarding policy is retrieved and applied. + err = f.saveInitialFwdingPolicy(channelID, &forwardingPolicy) + if err != nil { + log.Errorf("Unable to store the forwarding policy: %v", err) + } + // Now that we've sent over our final signature for this channel, we'll // send it to the ChainArbitrator so it can watch for any on-chain // actions during this final confirmation stage. @@ -2257,6 +2289,14 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer, f.localDiscoverySignals[permChanID] = make(chan struct{}) f.localDiscoveryMtx.Unlock() + // We have to store the forwardingPolicy before the reservation context + // is deleted. The policy will then be read and applied in + // newChanAnnouncement. + err = f.saveInitialFwdingPolicy(permChanID, &resCtx.forwardingPolicy) + if err != nil { + log.Errorf("Unable to store the forwarding policy: %v", err) + } + // The remote peer has responded with a signature for our commitment // transaction. We'll verify the signature for validity, then commit // the state to disk as we can now open the channel. @@ -3082,6 +3122,15 @@ func (f *Manager) annAfterSixConfs(completeChan *channeldb.OpenChannel, return fmt.Errorf("unable to send node announcement "+ "to peer %x: %v", pubKey, err) } + + // For private channels we do not announce the channel policy + // to the network but still need to delete them from the + // database. + err = f.deleteInitialFwdingPolicy(chanID) + if err != nil { + log.Infof("Could not delete channel fees "+ + "for chanId %x.", chanID) + } } else { // Otherwise, we'll wait until the funding transaction has // reached 6 confirmations before announcing it. @@ -3523,8 +3572,14 @@ func (f *Manager) newChanAnnouncement(localPubKey, if bytes.Compare(selfBytes, remoteBytes) == -1 { copy(chanAnn.NodeID1[:], localPubKey.SerializeCompressed()) copy(chanAnn.NodeID2[:], remotePubKey.SerializeCompressed()) - copy(chanAnn.BitcoinKey1[:], localFundingKey.PubKey.SerializeCompressed()) - copy(chanAnn.BitcoinKey2[:], remoteFundingKey.SerializeCompressed()) + copy( + chanAnn.BitcoinKey1[:], + localFundingKey.PubKey.SerializeCompressed(), + ) + copy( + chanAnn.BitcoinKey2[:], + remoteFundingKey.SerializeCompressed(), + ) // If we're the first node then update the chanFlags to // indicate the "direction" of the update. @@ -3532,8 +3587,14 @@ func (f *Manager) newChanAnnouncement(localPubKey, } else { copy(chanAnn.NodeID1[:], remotePubKey.SerializeCompressed()) copy(chanAnn.NodeID2[:], localPubKey.SerializeCompressed()) - copy(chanAnn.BitcoinKey1[:], remoteFundingKey.SerializeCompressed()) - copy(chanAnn.BitcoinKey2[:], localFundingKey.PubKey.SerializeCompressed()) + copy( + chanAnn.BitcoinKey1[:], + remoteFundingKey.SerializeCompressed(), + ) + copy( + chanAnn.BitcoinKey2[:], + localFundingKey.PubKey.SerializeCompressed(), + ) // If we're the second node then update the chanFlags to // indicate the "direction" of the update. @@ -3552,19 +3613,24 @@ func (f *Manager) newChanAnnouncement(localPubKey, Timestamp: uint32(time.Now().Unix()), MessageFlags: msgFlags, ChannelFlags: chanFlags, - TimeLockDelta: uint16(f.cfg.DefaultRoutingPolicy.TimeLockDelta), - - // We use the HtlcMinimumMsat that the remote party required us - // to use, as our ChannelUpdate will be used to carry HTLCs - // towards them. + TimeLockDelta: uint16( + f.cfg.DefaultRoutingPolicy.TimeLockDelta, + ), HtlcMinimumMsat: fwdMinHTLC, HtlcMaximumMsat: fwdMaxHTLC, - - BaseFee: uint32(f.cfg.DefaultRoutingPolicy.BaseFee), - FeeRate: uint32(f.cfg.DefaultRoutingPolicy.FeeRate), } - if ourPolicy != nil { + // The caller of newChanAnnouncement is expected to provide the initial + // forwarding policy to be announced. We abort the channel announcement + // if they are not provided. + storedFwdingPolicy, err := f.getInitialFwdingPolicy(chanID) + if err != nil { + return nil, errors.Errorf("unable to generate channel "+ + "update announcement: %v", err) + } + + switch { + case ourPolicy != nil: // If ourPolicy is non-nil, modify the default parameters of the // ChannelUpdate. chanUpdateAnn.MessageFlags = ourPolicy.MessageFlags @@ -3576,6 +3642,21 @@ func (f *Manager) newChanAnnouncement(localPubKey, chanUpdateAnn.FeeRate = uint32( ourPolicy.FeeProportionalMillionths, ) + + case storedFwdingPolicy != nil: + chanUpdateAnn.BaseFee = uint32(storedFwdingPolicy.BaseFee) + chanUpdateAnn.FeeRate = uint32(storedFwdingPolicy.FeeRate) + + default: + log.Infof("No channel forwaring policy specified for channel "+ + "announcement of ChannelID(%v). "+ + "Assuming default fee parameters.", chanID) + chanUpdateAnn.BaseFee = uint32( + f.cfg.DefaultRoutingPolicy.BaseFee, + ) + chanUpdateAnn.FeeRate = uint32( + f.cfg.DefaultRoutingPolicy.FeeRate, + ) } // With the channel update announcement constructed, we'll generate a @@ -3672,6 +3753,14 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey, return err } + // After the fee parameters have been stored in the announcement + // we can delete them from the database. + err = f.deleteInitialFwdingPolicy(chanID) + if err != nil { + log.Infof("Could not delete channel fees for chanId %x.", + chanID) + } + // We only send the channel proof announcement and the node announcement // because addToRouterGraph previously sent the ChannelAnnouncement and // the ChannelUpdate announcement messages. The channel proof and node @@ -3792,6 +3881,8 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) { var ( peerKey = msg.Peer.IdentityKey() localAmt = msg.LocalFundingAmt + baseFee = msg.BaseFee + feeRate = msg.FeeRate minHtlcIn = msg.MinHtlcIn remoteCsvDelay = msg.RemoteCsvDelay maxValue = msg.MaxValueInFlight @@ -3985,6 +4076,22 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) { maxHtlcs = f.cfg.RequiredRemoteMaxHTLCs(capacity) } + // Prepare the optional channel fee values from the initFundingMsg. + // If useBaseFee or useFeeRate are false the client did not + // provide fee values hence we assume default fee settings from + // the config. + forwardingPolicy := htlcswitch.ForwardingPolicy{ + BaseFee: f.cfg.DefaultRoutingPolicy.BaseFee, + FeeRate: f.cfg.DefaultRoutingPolicy.FeeRate, + } + if baseFee != nil { + forwardingPolicy.BaseFee = lnwire.MilliSatoshi(*baseFee) + } + + if feeRate != nil { + forwardingPolicy.FeeRate = lnwire.MilliSatoshi(*feeRate) + } + // If a pending channel map for this peer isn't already created, then // we create one, ultimately allowing us to track this pending // reservation within the target peer. @@ -3995,17 +4102,18 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) { } resCtx := &reservationWithCtx{ - chanAmt: capacity, - remoteCsvDelay: remoteCsvDelay, - remoteMinHtlc: minHtlcIn, - remoteMaxValue: maxValue, - remoteMaxHtlcs: maxHtlcs, - maxLocalCsv: maxCSV, - channelType: msg.ChannelType, - reservation: reservation, - peer: msg.Peer, - updates: msg.Updates, - err: msg.Err, + chanAmt: capacity, + forwardingPolicy: forwardingPolicy, + remoteCsvDelay: remoteCsvDelay, + remoteMinHtlc: minHtlcIn, + remoteMaxValue: maxValue, + remoteMaxHtlcs: maxHtlcs, + maxLocalCsv: maxCSV, + channelType: msg.ChannelType, + reservation: reservation, + peer: msg.Peer, + updates: msg.Updates, + err: msg.Err, } f.activeReservations[peerIDKey][chanID] = resCtx f.resMtx.Unlock() @@ -4267,6 +4375,70 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey { return btcec.NewPublicKey(&tmp.X, &tmp.Y) } +// saveInitialFwdingPolicy saves the forwarding policy for the provided +// chanPoint in the channelOpeningStateBucket. +func (f *Manager) saveInitialFwdingPolicy(permChanID lnwire.ChannelID, + forwardingPolicy *htlcswitch.ForwardingPolicy) error { + + chanID := make([]byte, 32) + copy(chanID, permChanID[:]) + + scratch := make([]byte, 36) + byteOrder.PutUint64(scratch[:8], uint64(forwardingPolicy.MinHTLCOut)) + byteOrder.PutUint64(scratch[8:16], uint64(forwardingPolicy.MaxHTLC)) + byteOrder.PutUint64(scratch[16:24], uint64(forwardingPolicy.BaseFee)) + byteOrder.PutUint64(scratch[24:32], uint64(forwardingPolicy.FeeRate)) + byteOrder.PutUint32(scratch[32:], forwardingPolicy.TimeLockDelta) + + return f.cfg.Wallet.Cfg.Database.SaveInitialFwdingPolicy( + chanID, scratch, + ) +} + +// getInitialFwdingPolicy fetches the initial forwarding policy for a given +// channel id from the database which will be applied during the channel +// announcement phase. +func (f *Manager) getInitialFwdingPolicy(permChanID lnwire.ChannelID) ( + *htlcswitch.ForwardingPolicy, error) { + + chanID := make([]byte, 32) + copy(chanID, permChanID[:]) + + value, err := f.cfg.Wallet.Cfg.Database.GetInitialFwdingPolicy( + chanID, + ) + + if err != nil { + return nil, err + } + + var fwdingPolicy htlcswitch.ForwardingPolicy + fwdingPolicy.MinHTLCOut = lnwire.MilliSatoshi( + byteOrder.Uint64(value[:8]), + ) + fwdingPolicy.MaxHTLC = lnwire.MilliSatoshi( + byteOrder.Uint64(value[8:16]), + ) + fwdingPolicy.BaseFee = lnwire.MilliSatoshi( + byteOrder.Uint64(value[16:24]), + ) + fwdingPolicy.FeeRate = lnwire.MilliSatoshi( + byteOrder.Uint64(value[24:32]), + ) + fwdingPolicy.TimeLockDelta = byteOrder.Uint32(value[32:36]) + + return &fwdingPolicy, nil +} + +// deleteInitialFwdingPolicy removes channel fees for this chanID from +// the database. +func (f *Manager) deleteInitialFwdingPolicy(permChanID lnwire.ChannelID) error { + chanID := make([]byte, 32) + copy(chanID, permChanID[:]) + + return f.cfg.Wallet.Cfg.Database.DeleteInitialFwdingPolicy(chanID) +} + // saveChannelOpeningState saves the channelOpeningState for the provided // chanPoint to the channelOpeningStateBucket. func (f *Manager) saveChannelOpeningState(chanPoint *wire.OutPoint,