Merge pull request from yyforyongyu/fix-channel-ready-race

Fix race between `channel_ready` and link update
This commit is contained in:
Olaoluwa Osuntokun 2023-08-08 17:10:31 -07:00 committed by GitHub
commit 8f693fe020
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1058 additions and 127 deletions

@ -214,10 +214,17 @@ issues:
- gosec
- funlen
- revive
# Allow duplications in tests so it's easier to follow a single unit
# test.
- dupl
- path: mock*
linters:
- revive
# forcetypeassert is skipped for the mock because the test would fail
# if the returned value doesn't match the type, so there's no need to
# check the convert.
- forcetypeassert
- path: test*
linters:

@ -490,6 +490,10 @@ type Config struct {
// Estimator is used to estimate routing probabilities.
Estimator routing.Estimator
// Dev specifies configs used for integration tests, which is always
// empty if not built with `integration` flag.
Dev *lncfg.DevConfig `group:"dev" namespace:"dev"`
}
// GRPCConfig holds the configuration options for the gRPC server.

@ -63,6 +63,16 @@ func (p *mockPeer) RemoteFeatures() *lnwire.FeatureVector {
return nil
}
func (p *mockPeer) AddPendingChannel(_ lnwire.ChannelID,
_ <-chan struct{}) error {
return nil
}
func (p *mockPeer) RemovePendingChannel(_ lnwire.ChannelID) error {
return nil
}
// mockMessageStore is an in-memory implementation of the MessageStore interface
// used for the gossiper's unit tests.
type mockMessageStore struct {

@ -216,6 +216,9 @@ compare the default values between lnd and sample-lnd.conf.
creation](https://github.com/lightningnetwork/lnd/pull/7856) that can arise
under rare scenarios.
- A race condition found between `channel_ready` and link updates is [now
fixed](https://github.com/lightningnetwork/lnd/pull/7518).
### Tooling and documentation
* Add support for [custom `RPCHOST` and

@ -337,10 +337,22 @@ func newSerializedKey(pubKey *btcec.PublicKey) serializedPubKey {
return s
}
// DevConfig specifies configs used for integration test only.
type DevConfig struct {
// ProcessChannelReadyWait is the duration to sleep before processing
// remote node's channel ready message once the channel as been marked
// as `channelReadySent`.
ProcessChannelReadyWait time.Duration
}
// Config defines the configuration for the FundingManager. All elements
// within the configuration MUST be non-nil for the FundingManager to carry out
// its duties.
type Config struct {
// Dev specifies config values used in integration test. For
// production, this config will always be an empty struct.
Dev *DevConfig
// NoWumboChans indicates if we're to reject all incoming wumbo channel
// requests, and also reject all outgoing wumbo channel requests.
NoWumboChans bool
@ -842,6 +854,48 @@ func (f *Manager) CancelPeerReservations(nodePub [33]byte) {
delete(f.activeReservations, nodePub)
}
// chanIdentifier wraps pending channel ID and channel ID into one struct so
// it's easier to identify a specific channel.
//
// TODO(yy): move to a different package to hide the private fields so direct
// access is disabled.
type chanIdentifier struct {
// tempChanID is the pending channel ID created by the funder when
// initializing the funding flow. For fundee, it's received from the
// `open_channel` message.
tempChanID lnwire.ChannelID
// chanID is the channel ID created by the funder once the
// `accept_channel` message is received. For fundee, it's received from
// the `funding_created` message.
chanID lnwire.ChannelID
// chanIDSet is a boolean indicates whether the active channel ID is
// set for this identifier. For zero conf channels, the `chanID` can be
// all-zero, which is the same as the empty value of `ChannelID`. To
// avoid the confusion, we use this boolean to explicitly signal
// whether the `chanID` is set or not.
chanIDSet bool
}
// newChanIdentifier creates a new chanIdentifier.
func newChanIdentifier(tempChanID lnwire.ChannelID) *chanIdentifier {
return &chanIdentifier{
tempChanID: tempChanID,
}
}
// setChanID updates the `chanIdentifier` with the active channel ID.
func (c *chanIdentifier) setChanID(chanID lnwire.ChannelID) {
c.chanID = chanID
c.chanIDSet = true
}
// hasChanID returns true if the active channel ID has been set.
func (c *chanIdentifier) hasChanID() bool {
return c.chanIDSet
}
// failFundingFlow will fail the active funding flow with the target peer,
// identified by its unique temporary channel ID. This method will send an
// error to the remote peer, and also remove the reservation from our set of
@ -849,14 +903,26 @@ func (f *Manager) CancelPeerReservations(nodePub [33]byte) {
//
// TODO(roasbeef): if peer disconnects, and haven't yet broadcast funding
// transaction, then all reservations should be cleared.
func (f *Manager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte,
func (f *Manager) failFundingFlow(peer lnpeer.Peer, cid *chanIdentifier,
fundingErr error) {
log.Debugf("Failing funding flow for pending_id=%x: %v",
tempChanID, fundingErr)
log.Debugf("Failing funding flow for pending_id=%v: %v",
cid.tempChanID, fundingErr)
// First, notify Brontide to remove the pending channel.
//
// NOTE: depending on where we fail the flow, we may not have the
// active channel ID yet.
if cid.hasChanID() {
err := peer.RemovePendingChannel(cid.chanID)
if err != nil {
log.Errorf("Unable to remove channel %v with peer %x: "+
"%v", cid, peer.IdentityKey(), err)
}
}
ctx, err := f.cancelReservationCtx(
peer.IdentityKey(), tempChanID, false,
peer.IdentityKey(), cid.tempChanID, false,
)
if err != nil {
log.Errorf("unable to cancel reservation: %v", err)
@ -887,7 +953,7 @@ func (f *Manager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte,
}
errMsg := &lnwire.Error{
ChanID: tempChanID,
ChanID: cid.tempChanID,
Data: msg,
}
@ -1325,13 +1391,14 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
}
f.resMtx.RUnlock()
// Create the channel identifier.
cid := newChanIdentifier(msg.PendingChannelID)
// Also count the channels that are already pending. There we don't know
// the underlying intent anymore, unfortunately.
channels, err := f.cfg.Wallet.Cfg.Database.FetchOpenChannels(peerPubKey)
if err != nil {
f.failFundingFlow(
peer, msg.PendingChannelID, err,
)
f.failFundingFlow(peer, cid, err)
return
}
@ -1350,29 +1417,20 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
// TODO(roasbeef): modify to only accept a _single_ pending channel per
// block unless white listed
if numPending >= f.cfg.MaxPendingChannels {
f.failFundingFlow(
peer, msg.PendingChannelID,
lnwire.ErrMaxPendingChannels,
)
f.failFundingFlow(peer, cid, lnwire.ErrMaxPendingChannels)
return
}
// Ensure that the pendingChansLimit is respected.
pendingChans, err := f.cfg.Wallet.Cfg.Database.FetchPendingChannels()
if err != nil {
f.failFundingFlow(
peer, msg.PendingChannelID, err,
)
f.failFundingFlow(peer, cid, err)
return
}
if len(pendingChans) > pendingChansLimit {
f.failFundingFlow(
peer, msg.PendingChannelID,
lnwire.ErrMaxPendingChannels,
)
f.failFundingFlow(peer, cid, lnwire.ErrMaxPendingChannels)
return
}
@ -1385,17 +1443,14 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
log.Errorf("unable to query wallet: %v", err)
}
err := errors.New("Synchronizing blockchain")
f.failFundingFlow(
peer, msg.PendingChannelID,
err,
)
f.failFundingFlow(peer, cid, err)
return
}
// Ensure that the remote party respects our maximum channel size.
if amt > f.cfg.MaxChanSize {
f.failFundingFlow(
peer, msg.PendingChannelID,
peer, cid,
lnwallet.ErrChanTooLarge(amt, f.cfg.MaxChanSize),
)
return
@ -1405,7 +1460,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
// a channel that's below our current min channel size.
if amt < f.cfg.MinChanSize {
f.failFundingFlow(
peer, msg.PendingChannelID,
peer, cid,
lnwallet.ErrChanTooSmall(amt, f.cfg.MinChanSize),
)
return
@ -1414,10 +1469,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
// If request specifies non-zero push amount and 'rejectpush' is set,
// signal an error.
if f.cfg.RejectPush && msg.PushAmount > 0 {
f.failFundingFlow(
peer, msg.PendingChannelID,
lnwallet.ErrNonZeroPushAmount(),
)
f.failFundingFlow(peer, cid, lnwallet.ErrNonZeroPushAmount())
return
}
@ -1432,10 +1484,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
// the channel.
acceptorResp := f.cfg.OpenChannelPredicate.Accept(chanReq)
if acceptorResp.RejectChannel() {
f.failFundingFlow(
peer, msg.PendingChannelID,
acceptorResp.ChanAcceptError,
)
f.failFundingFlow(peer, cid, acceptorResp.ChanAcceptError)
return
}
@ -1462,7 +1511,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
if err != nil {
// TODO(roasbeef): should be using soft errors
log.Errorf("channel type negotiation failed: %v", err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1498,9 +1547,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
// Fail the funding flow.
flowErr := fmt.Errorf("channel acceptor blocked " +
"zero-conf channel negotiation")
f.failFundingFlow(
peer, msg.PendingChannelID, flowErr,
)
f.failFundingFlow(peer, cid, flowErr)
return
}
@ -1514,9 +1561,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
// Fail the funding flow.
flowErr := fmt.Errorf("scid-alias feature " +
"must be negotiated for zero-conf")
f.failFundingFlow(
peer, msg.PendingChannelID, flowErr,
)
f.failFundingFlow(peer, cid, flowErr)
return
}
@ -1532,7 +1577,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
err = fmt.Errorf("option-scid-alias chantype for public " +
"channel")
log.Error(err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1557,7 +1602,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
reservation, err := f.cfg.Wallet.InitChannelReservation(req)
if err != nil {
log.Errorf("Unable to initialize reservation: %v", err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1571,7 +1616,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
aliasScid, err := f.cfg.AliasManager.RequestAlias()
if err != nil {
log.Errorf("Unable to request alias: %v", err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1612,7 +1657,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
)
if err != nil {
log.Errorf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1626,7 +1671,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
)
if err != nil {
f.failFundingFlow(
peer, msg.PendingChannelID,
peer, cid,
fmt.Errorf("getUpfrontShutdownScript error: %v", err),
)
return
@ -1638,7 +1683,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
if commitType == lnwallet.CommitmentTypeScriptEnforcedLease {
if msg.LeaseExpiry == nil {
err := errors.New("missing lease expiry")
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1651,9 +1696,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
reservation.LeaseExpiry() {
err := errors.New("lease expiry mismatch")
f.failFundingFlow(
peer, msg.PendingChannelID, err,
)
f.failFundingFlow(peer, cid, err)
return
}
}
@ -1775,7 +1818,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
err = reservation.ProcessSingleContribution(remoteContribution)
if err != nil {
log.Errorf("unable to add contribution reservation: %v", err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1809,7 +1852,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
if err := peer.SendMessage(true, &fundingAccept); err != nil {
log.Errorf("unable to send funding response to peer: %v", err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
}
@ -1840,6 +1883,9 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
log.Infof("Recv'd fundingResponse for pending_id(%x)",
pendingChanID[:])
// Create the channel identifier.
cid := newChanIdentifier(msg.PendingChannelID)
// Perform some basic validation of any custom TLV records included.
//
// TODO: Return errors as funding.Error to give context to remote peer?
@ -1849,14 +1895,14 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
if msg.ChannelType == nil {
err := errors.New("explicit channel type not echoed " +
"back")
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
proposedFeatures := lnwire.RawFeatureVector(*resCtx.channelType)
ackedFeatures := lnwire.RawFeatureVector(*msg.ChannelType)
if !proposedFeatures.Equals(&ackedFeatures) {
err := errors.New("channel type mismatch")
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1866,18 +1912,14 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
if msg.LeaseExpiry == nil {
err := errors.New("lease expiry not echoed " +
"back")
f.failFundingFlow(
peer, msg.PendingChannelID, err,
)
f.failFundingFlow(peer, cid, err)
return
}
if uint32(*msg.LeaseExpiry) !=
resCtx.reservation.LeaseExpiry() {
err := errors.New("lease expiry mismatch")
f.failFundingFlow(
peer, msg.PendingChannelID, err,
)
f.failFundingFlow(peer, cid, err)
return
}
}
@ -1899,7 +1941,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
)
if err != nil {
err := errors.New("received unexpected channel type")
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1909,7 +1951,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
// it's another type, we fail the flow.
if implicitChannelType != negotiatedChannelType {
err := errors.New("negotiated unexpected channel type")
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
}
@ -1922,7 +1964,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
msg.MinAcceptDepth, chainntnfs.MaxNumConfs,
)
log.Warnf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1930,7 +1972,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
if resCtx.reservation.IsZeroConf() && msg.MinAcceptDepth != 0 {
err = fmt.Errorf("zero-conf channel has min_depth non-zero")
log.Warn(err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1939,7 +1981,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
if !resCtx.reservation.IsZeroConf() && msg.MinAcceptDepth == 0 {
err = fmt.Errorf("non-zero-conf channel has min depth zero")
log.Warn(err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -1960,7 +2002,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
)
if err != nil {
log.Warnf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -2012,7 +2054,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
log.Errorf("Unable to process PSBT funding params "+
"for contribution from %x: %v", peerKeyBytes,
err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
var buf bytes.Buffer
@ -2020,7 +2062,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
if err != nil {
log.Errorf("Unable to serialize PSBT for "+
"contribution from %x: %v", peerKeyBytes, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
resCtx.updates <- &lnrpc.OpenStatusUpdate{
@ -2037,7 +2079,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
} else if err != nil {
log.Errorf("Unable to process contribution from %x: %v",
peerKeyBytes, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -2056,7 +2098,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
go func() {
defer f.wg.Done()
f.waitForPsbt(psbtIntent, resCtx, pendingChanID)
f.waitForPsbt(psbtIntent, resCtx, cid)
}()
// With the new goroutine spawned, we can now exit to unblock
@ -2066,7 +2108,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
// In a normal, non-PSBT funding flow, we can jump directly to the next
// step where we expect our contribution to be finalized.
f.continueFundingAccept(resCtx, pendingChanID)
f.continueFundingAccept(resCtx, cid)
}
// waitForPsbt blocks until either a signed PSBT arrives, an error occurs or
@ -2075,7 +2117,7 @@ func (f *Manager) handleFundingAccept(peer lnpeer.Peer,
//
// NOTE: This method must be called as a goroutine.
func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
resCtx *reservationWithCtx, pendingChanID [32]byte) {
resCtx *reservationWithCtx, cid *chanIdentifier) {
// failFlow is a helper that logs an error message with the current
// context and then fails the funding flow.
@ -2083,9 +2125,9 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
failFlow := func(errMsg string, cause error) {
log.Errorf("Unable to handle funding accept message "+
"for peer_key=%x, pending_chan_id=%x: %s: %v",
peerKey.SerializeCompressed(), pendingChanID, errMsg,
peerKey.SerializeCompressed(), cid.tempChanID, errMsg,
cause)
f.failFundingFlow(resCtx.peer, pendingChanID, cause)
f.failFundingFlow(resCtx.peer, cid, cause)
}
// We'll now wait until the intent has received the final and complete
@ -2106,7 +2148,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
case chanfunding.ErrRemoteCanceled:
log.Infof("Remote canceled, aborting PSBT flow "+
"for peer_key=%x, pending_chan_id=%x",
peerKey.SerializeCompressed(), pendingChanID)
peerKey.SerializeCompressed(), cid.tempChanID)
return
// Nil error means the flow continues normally now.
@ -2128,7 +2170,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
}
// We are now ready to continue the funding flow.
f.continueFundingAccept(resCtx, pendingChanID)
f.continueFundingAccept(resCtx, cid)
// Handle a server shutdown as well because the reservation won't
// survive a restart as it's in memory only.
@ -2136,7 +2178,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
log.Errorf("Unable to handle funding accept message "+
"for peer_key=%x, pending_chan_id=%x: funding manager "+
"shutting down", peerKey.SerializeCompressed(),
pendingChanID)
cid.tempChanID)
return
}
}
@ -2145,7 +2187,7 @@ func (f *Manager) waitForPsbt(intent *chanfunding.PsbtIntent,
// contribution is finalized, the channel output is known and the funding
// transaction is signed.
func (f *Manager) continueFundingAccept(resCtx *reservationWithCtx,
pendingChanID [32]byte) {
cid *chanIdentifier) {
// Now that we have their contribution, we can extract, then send over
// both the funding out point and our signature for their version of
@ -2166,26 +2208,40 @@ func (f *Manager) continueFundingAccept(resCtx *reservationWithCtx,
// so we can retrieve the reservation context once we get the
// FundingSigned message.
f.resMtx.Lock()
f.signedReservations[channelID] = pendingChanID
f.signedReservations[channelID] = cid.tempChanID
f.resMtx.Unlock()
log.Infof("Generated ChannelPoint(%v) for pending_id(%x)", outPoint,
pendingChanID[:])
cid.tempChanID[:])
var err error
// Before sending FundingCreated sent, we notify Brontide to keep track
// of this pending open channel.
err := resCtx.peer.AddPendingChannel(channelID, f.quit)
if err != nil {
pubKey := resCtx.peer.IdentityKey().SerializeCompressed()
log.Errorf("Unable to add pending channel %v with peer %x: %v",
channelID, pubKey, err)
}
// Once Brontide is aware of this channel, we need to set it in
// chanIdentifier so this channel will be removed from Brontide if the
// funding flow fails.
cid.setChanID(channelID)
// Send the FundingCreated msg.
fundingCreated := &lnwire.FundingCreated{
PendingChannelID: pendingChanID,
PendingChannelID: cid.tempChanID,
FundingPoint: *outPoint,
}
fundingCreated.CommitSig, err = lnwire.NewSigFromSignature(sig)
if err != nil {
log.Errorf("Unable to parse signature: %v", err)
f.failFundingFlow(resCtx.peer, pendingChanID, err)
f.failFundingFlow(resCtx.peer, cid, err)
return
}
if err := resCtx.peer.SendMessage(true, fundingCreated); err != nil {
log.Errorf("Unable to send funding complete message: %v", err)
f.failFundingFlow(resCtx.peer, pendingChanID, err)
f.failFundingFlow(resCtx.peer, cid, err)
return
}
}
@ -2216,10 +2272,13 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
log.Infof("completing pending_id(%x) with ChannelPoint(%v)",
pendingChanID[:], fundingOut)
// Create the channel identifier without setting the active channel ID.
cid := newChanIdentifier(pendingChanID)
commitSig, err := msg.CommitSig.ToSignature()
if err != nil {
log.Errorf("unable to parse signature: %v", err)
f.failFundingFlow(peer, pendingChanID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -2234,7 +2293,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
if err != nil {
// TODO(roasbeef): better error logging: peerID, channelID, etc.
log.Errorf("unable to complete single reservation: %v", err)
f.failFundingFlow(peer, pendingChanID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -2243,7 +2302,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
// The channel is marked IsPending in the database, and can be removed
// from the set of active reservations.
f.deleteReservationCtx(peerKey, msg.PendingChannelID)
f.deleteReservationCtx(peerKey, cid.tempChanID)
// If something goes wrong before the funding transaction is confirmed,
// we use this convenience method to delete the pending OpenChannel
@ -2289,18 +2348,31 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
ourCommitSig, err := lnwire.NewSigFromSignature(sig)
if err != nil {
log.Errorf("unable to parse signature: %v", err)
f.failFundingFlow(peer, pendingChanID, err)
f.failFundingFlow(peer, cid, err)
deleteFromDatabase()
return
}
// Before sending FundingSigned, we notify Brontide first to keep track
// of this pending open channel.
if err := peer.AddPendingChannel(channelID, f.quit); err != nil {
pubKey := peer.IdentityKey().SerializeCompressed()
log.Errorf("Unable to add pending channel %v with peer %x: %v",
cid.chanID, pubKey, err)
}
// Once Brontide is aware of this channel, we need to set it in
// chanIdentifier so this channel will be removed from Brontide if the
// funding flow fails.
cid.setChanID(channelID)
fundingSigned := &lnwire.FundingSigned{
ChanID: channelID,
ChanID: cid.chanID,
CommitSig: ourCommitSig,
}
if err := peer.SendMessage(true, fundingSigned); err != nil {
log.Errorf("unable to send FundingSigned message: %v", err)
f.failFundingFlow(peer, pendingChanID, err)
f.failFundingFlow(peer, cid, err)
deleteFromDatabase()
return
}
@ -2308,7 +2380,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
// With a permanent channel id established we can save the respective
// forwarding policy in the database. In the channel announcement phase
// this forwarding policy is retrieved and applied.
err = f.saveInitialFwdingPolicy(channelID, &forwardingPolicy)
err = f.saveInitialFwdingPolicy(cid.chanID, &forwardingPolicy)
if err != nil {
log.Errorf("Unable to store the forwarding policy: %v", err)
}
@ -2324,7 +2396,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
// Create an entry in the local discovery map so we can ensure that we
// process the channel confirmation fully before we receive a
// channel_ready message.
f.localDiscoverySignals.Store(channelID, make(chan struct{}))
f.localDiscoverySignals.Store(cid.chanID, make(chan struct{}))
// Inform the ChannelNotifier that the channel has entered
// pending open state.
@ -2365,11 +2437,29 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer,
pendingChanID, ok := f.signedReservations[msg.ChanID]
delete(f.signedReservations, msg.ChanID)
f.resMtx.Unlock()
// Create the channel identifier and set the channel ID.
//
// NOTE: we may get an empty pending channel ID here if the key cannot
// be found, which means when we cancel the reservation context in
// `failFundingFlow`, we will get an error. In this case, we will send
// an error msg to our peer using the active channel ID.
//
// TODO(yy): refactor the funding flow to fix this case.
cid := newChanIdentifier(pendingChanID)
cid.setChanID(msg.ChanID)
// If the pending channel ID is not found, fail the funding flow.
if !ok {
// NOTE: we directly overwrite the pending channel ID here for
// this rare case since we don't have a valid pending channel
// ID.
cid.tempChanID = msg.ChanID
err := fmt.Errorf("unable to find signed reservation for "+
"chan_id=%x", msg.ChanID)
log.Warnf(err.Error())
f.failFundingFlow(peer, msg.ChanID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -2379,7 +2469,7 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer,
log.Warnf("Unable to find reservation (peer_id:%v, "+
"chan_id:%x)", peerKey, pendingChanID[:])
// TODO: add ErrChanNotFound?
f.failFundingFlow(peer, pendingChanID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -2404,7 +2494,7 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer,
commitSig, err := msg.CommitSig.ToSignature()
if err != nil {
log.Errorf("Unable to parse signature: %v", err)
f.failFundingFlow(peer, pendingChanID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -2414,7 +2504,7 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer,
if err != nil {
log.Errorf("Unable to complete reservation sign "+
"complete: %v", err)
f.failFundingFlow(peer, pendingChanID, err)
f.failFundingFlow(peer, cid, err)
return
}
@ -2575,12 +2665,16 @@ func (f *Manager) fundingTimeout(c *channeldb.OpenChannel,
"to come online: %v", err)
}
// Create channel identifier and set the channel ID.
cid := newChanIdentifier(pendingID)
cid.setChanID(lnwire.NewChanIDFromOutPoint(&c.FundingOutpoint))
// TODO(halseth): should this send be made
// reliable?
// The reservation won't exist at this point, but we'll send an
// Error message over anyways with ChanID set to pendingID.
f.failFundingFlow(peer, pendingID, timeoutErr)
f.failFundingFlow(peer, cid, timeoutErr)
}()
return timeoutErr
@ -3441,6 +3535,24 @@ func (f *Manager) handleChannelReady(peer lnpeer.Peer,
msg *lnwire.ChannelReady) {
defer f.wg.Done()
// If we are in development mode, we'll wait for specified duration
// before processing the channel ready message.
if f.cfg.Dev != nil {
duration := f.cfg.Dev.ProcessChannelReadyWait
log.Warnf("Channel(%v): sleeping %v before processing "+
"channel_ready", msg.ChanID, duration)
select {
case <-time.After(duration):
log.Warnf("Channel(%v): slept %v before processing "+
"channel_ready", msg.ChanID, duration)
case <-f.quit:
log.Warnf("Channel(%v): quit sleeping", msg.ChanID)
return
}
}
log.Debugf("Received ChannelReady for ChannelID(%v) from "+
"peer %x", msg.ChanID,
peer.IdentityKey().SerializeCompressed())
@ -4386,7 +4498,16 @@ func (f *Manager) pruneZombieReservations() {
resCtx.peer.IdentityKey().SerializeCompressed(),
pendingChanID[:])
log.Warnf(err.Error())
f.failFundingFlow(resCtx.peer, pendingChanID, err)
chanID := lnwire.NewChanIDFromOutPoint(
resCtx.reservation.FundingOutpoint(),
)
// Create channel identifier and set the channel ID.
cid := newChanIdentifier(pendingChanID)
cid.setChanID(chanID)
f.failFundingFlow(resCtx.peer, cid, err)
}
}

@ -321,6 +321,16 @@ func (n *testNode) AddNewChannel(channel *channeldb.OpenChannel,
}
}
func (n *testNode) AddPendingChannel(_ lnwire.ChannelID,
quit <-chan struct{}) error {
return nil
}
func (n *testNode) RemovePendingChannel(_ lnwire.ChannelID) error {
return nil
}
func createTestWallet(cdb *channeldb.ChannelStateDB, netParams *chaincfg.Params,
notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController,
signer input.Signer, keyRing keychain.SecretKeyRing,

@ -1888,6 +1888,16 @@ func (m *mockPeer) RemoteFeatures() *lnwire.FeatureVector {
return nil
}
func (m *mockPeer) AddPendingChannel(_ lnwire.ChannelID,
_ <-chan struct{}) error {
return nil
}
func (m *mockPeer) RemovePendingChannel(_ lnwire.ChannelID) error {
return nil
}
func newSingleLinkTestHarness(t *testing.T, chanAmt, chanReserve btcutil.Amount) (
ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error,
func() (*lnwallet.LightningChannel, error), error) {

@ -672,6 +672,16 @@ func (s *mockServer) AddNewChannel(channel *channeldb.OpenChannel,
return nil
}
func (s *mockServer) AddPendingChannel(_ lnwire.ChannelID,
cancel <-chan struct{}) error {
return nil
}
func (s *mockServer) RemovePendingChannel(_ lnwire.ChannelID) error {
return nil
}
func (s *mockServer) WipeChannel(*wire.OutPoint) {}
func (s *mockServer) LocalFeatures() *lnwire.FeatureVector {

@ -546,4 +546,8 @@ var allTestCases = []*lntest.TestCase{
Name: "utxo selection funding",
TestFunc: testChannelUtxoSelection,
},
{
Name: "update pending open channels",
TestFunc: testUpdateOnPendingOpenChannels,
},
}

@ -10,6 +10,7 @@ import (
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/funding"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/node"
"github.com/lightningnetwork/lnd/lntest/rpc"
@ -488,6 +489,194 @@ func runBasicChannelCreationAndUpdates(ht *lntest.HarnessTest,
)
}
// testUpdateOnPendingOpenChannels checks that `update_add_htlc` followed by
// `channel_ready` is properly handled. In specific, when a node is in a state
// that it's still processing a remote `channel_ready` message, meanwhile an
// `update_add_htlc` is received, this HTLC message is cached and settled once
// processing `channel_ready` is complete.
func testUpdateOnPendingOpenChannels(ht *lntest.HarnessTest) {
// Test funder's behavior. Funder sees the channel pending, but fundee
// sees it active and sends an HTLC.
ht.Run("pending on funder side", func(t *testing.T) {
st := ht.Subtest(t)
testUpdateOnFunderPendingOpenChannels(st)
})
// Test fundee's behavior. Fundee sees the channel pending, but funder
// sees it active and sends an HTLC.
ht.Run("pending on fundee side", func(t *testing.T) {
st := ht.Subtest(t)
testUpdateOnFundeePendingOpenChannels(st)
})
}
// testUpdateOnFunderPendingOpenChannels checks that when the fundee sends an
// `update_add_htlc` followed by `channel_ready` while the funder is still
// processing the fundee's `channel_ready`, the HTLC will be cached and
// eventually settled.
func testUpdateOnFunderPendingOpenChannels(ht *lntest.HarnessTest) {
// Grab the channel participants.
alice, bob := ht.Alice, ht.Bob
// Restart Alice with the config so she won't process Bob's
// channel_ready msg immediately.
ht.RestartNodeWithExtraArgs(alice, []string{
"--dev.processchannelreadywait=10s",
})
// Make sure Alice and Bob are connected.
ht.EnsureConnected(alice, bob)
// Create a new channel that requires 1 confs before it's considered
// open.
params := lntest.OpenChannelParams{
Amt: funding.MaxBtcFundingAmount,
PushAmt: funding.MaxBtcFundingAmount / 2,
}
pendingChan := ht.OpenChannelAssertPending(alice, bob, params)
chanPoint := &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: pendingChan.Txid,
},
OutputIndex: pendingChan.OutputIndex,
}
// Alice and Bob should both consider the channel pending open.
ht.AssertNumPendingOpenChannels(alice, 1)
ht.AssertNumPendingOpenChannels(bob, 1)
// Mine one block to confirm the funding transaction.
ht.MineBlocksAndAssertNumTxes(1, 1)
// TODO(yy): we've prematurely marked the channel as open before
// processing channel ready messages. We need to mark it as open after
// we've processed channel ready messages and change the check to,
// ht.AssertNumPendingOpenChannels(alice, 1)
ht.AssertNumPendingOpenChannels(alice, 0)
// Bob will consider the channel open as there's no wait time to send
// and receive Alice's channel_ready message.
ht.AssertNumPendingOpenChannels(bob, 0)
// Alice and Bob now have different view of the channel. For Bob,
// since the channel_ready messages are processed, he will have a
// working link to route HTLCs. For Alice, because she hasn't handled
// Bob's channel_ready, there's no active link yet.
//
// Alice now adds an invoice.
req := &lnrpc.Invoice{
RPreimage: ht.Random32Bytes(),
Value: 10_000,
}
invoice := alice.RPC.AddInvoice(req)
// Bob sends an `update_add_htlc`, which would result in this message
// being cached in Alice's `peer.Brontide` and the payment will stay
// in-flight instead of being failed by Alice.
bobReq := &routerrpc.SendPaymentRequest{
PaymentRequest: invoice.PaymentRequest,
TimeoutSeconds: 60,
FeeLimitMsat: noFeeLimitMsat,
}
bobStream := bob.RPC.SendPayment(bobReq)
ht.AssertPaymentStatusFromStream(bobStream, lnrpc.Payment_IN_FLIGHT)
// Wait until Alice finishes processing Bob's channel_ready.
//
// NOTE: no effect before fixing the above TODO.
ht.AssertNumPendingOpenChannels(alice, 0)
// Once Alice sees the channel as active, she will process the cached
// premature `update_add_htlc` and settles the payment.
ht.AssertPaymentStatusFromStream(bobStream, lnrpc.Payment_SUCCEEDED)
// Close the channel.
ht.CloseChannel(alice, chanPoint)
}
// testUpdateOnFundeePendingOpenChannels checks that when the funder sends an
// `update_add_htlc` followed by `channel_ready` while the fundee is still
// processing the funder's `channel_ready`, the HTLC will be cached and
// eventually settled.
func testUpdateOnFundeePendingOpenChannels(ht *lntest.HarnessTest) {
// Grab the channel participants.
alice, bob := ht.Alice, ht.Bob
// Restart Bob with the config so he won't process Alice's
// channel_ready msg immediately.
ht.RestartNodeWithExtraArgs(bob, []string{
"--dev.processchannelreadywait=10s",
})
// Make sure Alice and Bob are connected.
ht.EnsureConnected(alice, bob)
// Create a new channel that requires 1 confs before it's considered
// open.
params := lntest.OpenChannelParams{
Amt: funding.MaxBtcFundingAmount,
}
pendingChan := ht.OpenChannelAssertPending(alice, bob, params)
chanPoint := &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: pendingChan.Txid,
},
OutputIndex: pendingChan.OutputIndex,
}
// Alice and Bob should both consider the channel pending open.
ht.AssertNumPendingOpenChannels(alice, 1)
ht.AssertNumPendingOpenChannels(bob, 1)
// Mine one block to confirm the funding transaction.
ht.MineBlocksAndAssertNumTxes(1, 1)
// Alice will consider the channel open as there's no wait time to send
// and receive Bob's channel_ready message.
ht.AssertNumPendingOpenChannels(alice, 0)
// TODO(yy): we've prematurely marked the channel as open before
// processing channel ready messages. We need to mark it as open after
// we've processed channel ready messages and change the check to,
// ht.AssertNumPendingOpenChannels(bob, 1)
ht.AssertNumPendingOpenChannels(bob, 0)
// Alice and Bob now have different view of the channel. For Alice,
// since the channel_ready messages are processed, she will have a
// working link to route HTLCs. For Bob, because he hasn't handled
// Alice's channel_ready, there's no active link yet.
//
// Bob now adds an invoice.
req := &lnrpc.Invoice{
RPreimage: ht.Random32Bytes(),
Value: 10_000,
}
bobInvoice := bob.RPC.AddInvoice(req)
// Alice sends an `update_add_htlc`, which would result in this message
// being cached in Bob's `peer.Brontide` and the payment will stay
// in-flight instead of being failed by Bob.
aliceReq := &routerrpc.SendPaymentRequest{
PaymentRequest: bobInvoice.PaymentRequest,
TimeoutSeconds: 60,
FeeLimitMsat: noFeeLimitMsat,
}
aliceStream := alice.RPC.SendPayment(aliceReq)
ht.AssertPaymentStatusFromStream(aliceStream, lnrpc.Payment_IN_FLIGHT)
// Wait until Bob finishes processing Alice's channel_ready.
//
// NOTE: no effect before fixing the above TODO.
ht.AssertNumPendingOpenChannels(bob, 0)
// Once Bob sees the channel as active, he will process the cached
// premature `update_add_htlc` and settles the payment.
ht.AssertPaymentStatusFromStream(aliceStream, lnrpc.Payment_SUCCEEDED)
// Close the channel.
ht.CloseChannel(alice, chanPoint)
}
// verifyCloseUpdate is used to verify that a closed channel update is of the
// expected type.
func verifyCloseUpdate(chanUpdate *lnrpc.ChannelEventUpdate,

23
lncfg/dev.go Normal file

@ -0,0 +1,23 @@
//go:build !integration
package lncfg
import "time"
// IsDevBuild returns a bool to indicate whether we are in a development
// environment.
//
// NOTE: always return false here.
func IsDevBuild() bool {
return false
}
// DevConfig specifies development configs used for production. This struct
// should always remain empty.
type DevConfig struct{}
// ChannelReadyWait returns the config value, which is always 0 for production
// build.
func (d *DevConfig) ChannelReadyWait() time.Duration {
return 0
}

26
lncfg/dev_integration.go Normal file

@ -0,0 +1,26 @@
//go:build integration
package lncfg
import "time"
// IsDevBuild returns a bool to indicate whether we are in a development
// environment.
//
// NOTE: always return true here.
func IsDevBuild() bool {
return true
}
// DevConfig specifies configs used for integration tests. These configs can
// only be used in tests and must NOT be exported for production usage.
//
//nolint:lll
type DevConfig struct {
ProcessChannelReadyWait time.Duration `long:"processchannelreadywait" description:"Time to sleep before processing remote node's channel_ready message."`
}
// ChannelReadyWait returns the config value `ProcessChannelReadyWait`.
func (d *DevConfig) ChannelReadyWait() time.Duration {
return d.ProcessChannelReadyWait
}

82
lnpeer/mock_peer.go Normal file

@ -0,0 +1,82 @@
package lnpeer
import (
"net"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/mock"
)
// MockPeer implements the `lnpeer.Peer` interface.
type MockPeer struct {
mock.Mock
}
// Compile time assertion that MockPeer implements lnpeer.Peer.
var _ Peer = (*MockPeer)(nil)
func (m *MockPeer) SendMessage(sync bool, msgs ...lnwire.Message) error {
args := m.Called(sync, msgs)
return args.Error(0)
}
func (m *MockPeer) SendMessageLazy(sync bool, msgs ...lnwire.Message) error {
args := m.Called(sync, msgs)
return args.Error(0)
}
func (m *MockPeer) AddNewChannel(channel *channeldb.OpenChannel,
cancel <-chan struct{}) error {
args := m.Called(channel, cancel)
return args.Error(0)
}
func (m *MockPeer) AddPendingChannel(cid lnwire.ChannelID,
cancel <-chan struct{}) error {
args := m.Called(cid, cancel)
return args.Error(0)
}
func (m *MockPeer) RemovePendingChannel(cid lnwire.ChannelID) error {
args := m.Called(cid)
return args.Error(0)
}
func (m *MockPeer) WipeChannel(op *wire.OutPoint) {
m.Called(op)
}
func (m *MockPeer) PubKey() [33]byte {
args := m.Called()
return args.Get(0).([33]byte)
}
func (m *MockPeer) IdentityKey() *btcec.PublicKey {
args := m.Called()
return args.Get(0).(*btcec.PublicKey)
}
func (m *MockPeer) Address() net.Addr {
args := m.Called()
return args.Get(0).(net.Addr)
}
func (m *MockPeer) QuitSignal() <-chan struct{} {
args := m.Called()
return args.Get(0).(<-chan struct{})
}
func (m *MockPeer) LocalFeatures() *lnwire.FeatureVector {
args := m.Called()
return args.Get(0).(*lnwire.FeatureVector)
}
func (m *MockPeer) RemoteFeatures() *lnwire.FeatureVector {
args := m.Called()
return args.Get(0).(*lnwire.FeatureVector)
}

@ -27,6 +27,13 @@ type Peer interface {
// to be added if the cancel channel is closed.
AddNewChannel(channel *channeldb.OpenChannel, cancel <-chan struct{}) error
// AddPendingChannel adds a pending open channel ID to the peer. The
// channel should fail to be added if the cancel chan is closed.
AddPendingChannel(cid lnwire.ChannelID, cancel <-chan struct{}) error
// RemovePendingChannel removes a pending open channel ID to the peer.
RemovePendingChannel(cid lnwire.ChannelID) error
// WipeChannel removes the channel uniquely identified by its channel
// point from all indexes associated with the peer.
WipeChannel(*wire.OutPoint)

@ -88,8 +88,13 @@ type outgoingMsg struct {
// the receiver of the request to report when the channel creation process has
// completed.
type newChannelMsg struct {
// channel is used when the pending channel becomes active.
channel *channeldb.OpenChannel
err chan error
// channelID is used when there's a new pending channel.
channelID lnwire.ChannelID
err chan error
}
type customMsg struct {
@ -427,6 +432,14 @@ type Brontide struct {
// channels to the source peer which handled the funding workflow.
newActiveChannel chan *newChannelMsg
// newPendingChannel is used by the fundingManager to send pending open
// channels to the source peer which handled the funding workflow.
newPendingChannel chan *newChannelMsg
// removePendingChannel is used by the fundingManager to cancel pending
// open channels to the source peer when the funding flow is failed.
removePendingChannel chan *newChannelMsg
// activeMsgStreams is a map from channel id to the channel streams that
// proxy messages to individual, active links.
activeMsgStreams map[lnwire.ChannelID]*msgStream
@ -493,7 +506,8 @@ func NewBrontide(cfg Config) *Brontide {
activeChannels: &lnutils.SyncMap[
lnwire.ChannelID, *lnwallet.LightningChannel,
]{},
newActiveChannel: make(chan *newChannelMsg, 1),
newActiveChannel: make(chan *newChannelMsg, 1),
newPendingChannel: make(chan *newChannelMsg, 1),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: make(map[lnwire.ChannelID]*chancloser.ChanCloser),
@ -1593,8 +1607,7 @@ out:
case *lnwire.ChannelReestablish:
targetChan = msg.ChanID
isLinkUpdate = p.isActiveChannel(targetChan) ||
p.isPendingChannel(targetChan)
isLinkUpdate = p.hasChannel(targetChan)
// If we failed to find the link in question, and the
// message received was a channel sync message, then
@ -1611,9 +1624,22 @@ out:
}
}
// For messages that implement the LinkUpdater interface, we
// will consider them as link updates and send them to
// chanStream. These messages will be queued inside chanStream
// if the channel is not active yet.
case LinkUpdater:
targetChan = msg.TargetChanID()
isLinkUpdate = p.isActiveChannel(targetChan)
isLinkUpdate = p.hasChannel(targetChan)
// Log an error if we don't have this channel. This
// means the peer has sent us a message with unknown
// channel ID.
if !isLinkUpdate {
p.log.Errorf("Unknown channel ID: %v found "+
"in received msg=%s", targetChan,
nextMsg.MsgType())
}
case *lnwire.ChannelUpdate,
*lnwire.ChannelAnnouncement,
@ -1647,20 +1673,7 @@ out:
if isLinkUpdate {
// If this is a channel update, then we need to feed it
// into the channel's in-order message stream.
chanStream, ok := p.activeMsgStreams[targetChan]
if !ok {
// If a stream hasn't yet been created, then
// we'll do so, add it to the map, and finally
// start it.
chanStream = newChanMsgStream(p, targetChan)
p.activeMsgStreams[targetChan] = chanStream
chanStream.Start()
defer chanStream.Stop()
}
// With the stream obtained, add the message to the
// stream so we can continue processing message.
chanStream.AddMsg(nextMsg)
p.sendLinkUpdateMsg(targetChan, nextMsg)
}
idleTimer.Reset(idleTimeout)
@ -1728,6 +1741,13 @@ func (p *Brontide) isPendingChannel(chanID lnwire.ChannelID) bool {
return channel == nil
}
// hasChannel returns true if the peer has a pending/active channel specified
// by the channel ID.
func (p *Brontide) hasChannel(chanID lnwire.ChannelID) bool {
_, ok := p.activeChannels.Load(chanID)
return ok
}
// storeError stores an error in our peer's buffer of recent errors with the
// current timestamp. Errors are only stored if we have at least one active
// channel with the peer to mitigate a dos vector where a peer costlessly
@ -2382,12 +2402,25 @@ func (p *Brontide) channelManager() {
out:
for {
select {
// A new pending channel has arrived which means we are about
// to complete a funding workflow and is waiting for the final
// `ChannelReady` messages to be exchanged. We will add this
// channel to the `activeChannels` with a nil value to indicate
// this is a pending channel.
case req := <-p.newPendingChannel:
p.handleNewPendingChannel(req)
// A new channel has arrived which means we've just completed a
// funding workflow. We'll initialize the necessary local
// state, and notify the htlc switch of a new link.
case req := <-p.newActiveChannel:
p.handleNewActiveChannel(req)
// The funding flow for a pending channel is failed, we will
// remove it from Brontide.
case req := <-p.removePendingChannel:
p.handleRemovePendingChannel(req)
// We've just received a local request to close an active
// channel. It will either kick of a cooperative channel
// closure negotiation, or be a notification of a breached
@ -3484,6 +3517,72 @@ func (p *Brontide) AddNewChannel(channel *channeldb.OpenChannel,
}
}
// AddPendingChannel adds a pending open channel to the peer. The channel
// should fail to be added if the cancel channel is closed.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) AddPendingChannel(cid lnwire.ChannelID,
cancel <-chan struct{}) error {
errChan := make(chan error, 1)
newChanMsg := &newChannelMsg{
channelID: cid,
err: errChan,
}
select {
case p.newPendingChannel <- newChanMsg:
case <-cancel:
return errors.New("canceled adding pending channel")
case <-p.quit:
return lnpeer.ErrPeerExiting
}
// We pause here to wait for the peer to recognize the new pending
// channel before we close the channel barrier corresponding to the
// channel.
select {
case err := <-errChan:
return err
case <-cancel:
return errors.New("canceled adding pending channel")
case <-p.quit:
return lnpeer.ErrPeerExiting
}
}
// RemovePendingChannel removes a pending open channel from the peer.
//
// NOTE: Part of the lnpeer.Peer interface.
func (p *Brontide) RemovePendingChannel(cid lnwire.ChannelID) error {
errChan := make(chan error, 1)
newChanMsg := &newChannelMsg{
channelID: cid,
err: errChan,
}
select {
case p.removePendingChannel <- newChanMsg:
case <-p.quit:
return lnpeer.ErrPeerExiting
}
// We pause here to wait for the peer to respond to the cancellation of
// the pending channel before we close the channel barrier
// corresponding to the channel.
select {
case err := <-errChan:
return err
case <-p.quit:
return lnpeer.ErrPeerExiting
}
}
// StartTime returns the time at which the connection was established if the
// peer started successfully, and zero otherwise.
func (p *Brontide) StartTime() time.Time {
@ -3654,7 +3753,7 @@ func (p *Brontide) attachChannelEventSubscription() error {
// updateNextRevocation updates the existing channel's next revocation if it's
// nil.
func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) {
func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) error {
chanPoint := &c.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -3664,32 +3763,35 @@ func (p *Brontide) updateNextRevocation(c *channeldb.OpenChannel) {
// currentChan should exist, but we perform a check anyway to avoid nil
// pointer dereference.
if !loaded {
p.log.Errorf("missing active channel with chanID=%v", chanID)
return
return fmt.Errorf("missing active channel with chanID=%v",
chanID)
}
// currentChan should not be nil, but we perform a check anyway to
// avoid nil pointer dereference.
if currentChan == nil {
p.log.Errorf("found nil active channel with chanID=%v", chanID)
return
return fmt.Errorf("found nil active channel with chanID=%v",
chanID)
}
// If we're being sent a new channel, and our existing channel doesn't
// have the next revocation, then we need to update the current
// existing channel.
if currentChan.RemoteNextRevocation() != nil {
return
return nil
}
p.log.Infof("Processing retransmitted ChannelReady for "+
"ChannelPoint(%v)", chanPoint)
nextRevoke := c.RemoteNextRevocation
err := currentChan.InitNextRevocation(nextRevoke)
if err != nil {
p.log.Errorf("unable to init chan revocation: %v", err)
return fmt.Errorf("unable to init next revocation: %w", err)
}
return nil
}
// addActiveChannel adds a new active channel to the `activeChannels` map. It
@ -3718,7 +3820,6 @@ func (p *Brontide) addActiveChannel(c *channeldb.OpenChannel) error {
// Store the channel in the activeChannels map.
p.activeChannels.Store(chanID, lnChan)
p.addedChannels.Store(chanID, struct{}{})
p.log.Infof("New channel active ChannelPoint(%v) with peer", chanPoint)
@ -3776,7 +3877,11 @@ func (p *Brontide) handleNewActiveChannel(req *newChannelMsg) {
// Handle it and close the err chan on the request.
close(req.err)
p.updateNextRevocation(newChan)
// Update the next revocation point.
if err := p.updateNextRevocation(newChan); err != nil {
p.log.Errorf(err.Error())
}
return
}
@ -3793,3 +3898,91 @@ func (p *Brontide) handleNewActiveChannel(req *newChannelMsg) {
// Close the err chan if everything went fine.
close(req.err)
}
// handleNewPendingChannel takes a `newChannelMsg` request and add it to
// `activeChannels` map with nil value. This pending channel will be saved as
// it may become active in the future. Once active, the funding manager will
// send it again via `AddNewChannel`, and we'd handle the link creation there.
func (p *Brontide) handleNewPendingChannel(req *newChannelMsg) {
defer close(req.err)
chanID := req.channelID
// If we already have this channel, something is wrong with the funding
// flow as it will only be marked as active after `ChannelReady` is
// handled. In this case, we will do nothing but log an error, just in
// case this is a legit channel.
if p.isActiveChannel(chanID) {
p.log.Errorf("Channel(%v) is already active, ignoring "+
"pending channel request", chanID)
return
}
// The channel has already been added, we will do nothing and return.
if p.isPendingChannel(chanID) {
p.log.Infof("Channel(%v) is already added, ignoring "+
"pending channel request", chanID)
return
}
// This is a new channel, we now add it to the map `activeChannels`
// with nil value and mark it as a newly added channel in
// `addedChannels`.
p.activeChannels.Store(chanID, nil)
p.addedChannels.Store(chanID, struct{}{})
}
// handleRemovePendingChannel takes a `newChannelMsg` request and removes it
// from `activeChannels` map. The request will be ignored if the channel is
// considered active by Brontide. Noop if the channel ID cannot be found.
func (p *Brontide) handleRemovePendingChannel(req *newChannelMsg) {
defer close(req.err)
chanID := req.channelID
// If we already have this channel, something is wrong with the funding
// flow as it will only be marked as active after `ChannelReady` is
// handled. In this case, we will log an error and exit.
if p.isActiveChannel(chanID) {
p.log.Errorf("Channel(%v) is active, ignoring remove request",
chanID)
return
}
// The channel has not been added yet, we will log a warning as there
// is an unexpected call from funding manager.
if !p.isPendingChannel(chanID) {
p.log.Warnf("Channel(%v) not found, removing it anyway", chanID)
}
// Remove the record of this pending channel.
p.activeChannels.Delete(chanID)
p.addedChannels.Delete(chanID)
}
// sendLinkUpdateMsg sends a message that updates the channel to the
// channel's message stream.
func (p *Brontide) sendLinkUpdateMsg(cid lnwire.ChannelID, msg lnwire.Message) {
p.log.Tracef("Sending link update msg=%v", msg.MsgType())
chanStream, ok := p.activeMsgStreams[cid]
if !ok {
// If a stream hasn't yet been created, then we'll do so, add
// it to the map, and finally start it.
chanStream = newChanMsgStream(p, cid)
p.activeMsgStreams[cid] = chanStream
chanStream.Start()
// Stop the stream when quit.
go func() {
<-p.quit
chanStream.Stop()
}()
}
// With the stream obtained, add the message to the stream so we can
// continue processing message.
chanStream.AddMsg(msg)
}

@ -16,6 +16,7 @@ import (
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntest/mock"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool"
@ -1118,3 +1119,224 @@ func TestPeerCustomMessage(t *testing.T) {
require.Equal(t, remoteKey, receivedCustom.peer)
require.Equal(t, receivedCustomMsg, &receivedCustom.msg)
}
// TestUpdateNextRevocation checks that the method `updateNextRevocation` is
// behave as expected.
func TestUpdateNextRevocation(t *testing.T) {
t.Parallel()
require := require.New(t)
// TODO(yy): create interface for lnwallet.LightningChannel so we can
// easily mock it without the following setups.
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, err := createTestPeer(
t, notifier, broadcastTxChan, noUpdate, mockSwitch,
)
require.NoError(err, "unable to create test channels")
// testChannel is used to test the updateNextRevocation function.
testChannel := bobChan.State()
// Update the next revocation for a known channel should give us no
// error.
err = alicePeer.updateNextRevocation(testChannel)
require.NoError(err, "expected no error")
// Test an error is returned when the chanID cannot be found in
// `activeChannels` map.
testChannel.FundingOutpoint = wire.OutPoint{Index: 0}
err = alicePeer.updateNextRevocation(testChannel)
require.Error(err, "expected an error")
// Test an error is returned when the chanID's corresponding channel is
// nil.
testChannel.FundingOutpoint = wire.OutPoint{Index: 1}
chanID := lnwire.NewChanIDFromOutPoint(&testChannel.FundingOutpoint)
alicePeer.activeChannels.Store(chanID, nil)
err = alicePeer.updateNextRevocation(testChannel)
require.Error(err, "expected an error")
// TODO(yy): should also test `InitNextRevocation` is called on
// `lnwallet.LightningWallet` once it's interfaced.
}
// TODO(yy): add test for `addActiveChannel` and `handleNewActiveChannel` once
// we have interfaced `lnwallet.LightningChannel` and
// `*contractcourt.ChainArbitrator`.
// TestHandleNewPendingChannel checks the method `handleNewPendingChannel`
// behaves as expected.
func TestHandleNewPendingChannel(t *testing.T) {
t.Parallel()
// Create three channel IDs for testing.
chanIDActive := lnwire.ChannelID{0}
chanIDNotExist := lnwire.ChannelID{1}
chanIDPending := lnwire.ChannelID{2}
// Create a test brontide.
dummyConfig := Config{}
peer := NewBrontide(dummyConfig)
// Create the test state.
peer.activeChannels.Store(chanIDActive, &lnwallet.LightningChannel{})
peer.activeChannels.Store(chanIDPending, nil)
// Assert test state, we should have two channels store, one active and
// one pending.
require.Equal(t, 2, peer.activeChannels.Len())
testCases := []struct {
name string
chanID lnwire.ChannelID
// expectChanAdded specifies whether this chanID will be added
// to the peer's state.
expectChanAdded bool
}{
{
name: "noop on active channel",
chanID: chanIDActive,
expectChanAdded: false,
},
{
name: "noop on pending channel",
chanID: chanIDPending,
expectChanAdded: false,
},
{
name: "new channel should be added",
chanID: chanIDNotExist,
expectChanAdded: true,
},
}
for _, tc := range testCases {
tc := tc
// Create a request for testing.
errChan := make(chan error, 1)
req := &newChannelMsg{
channelID: tc.chanID,
err: errChan,
}
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
require := require.New(t)
// Get the number of channels before mutating the
// state.
numChans := peer.activeChannels.Len()
// Call the method.
peer.handleNewPendingChannel(req)
// Add one if we expect this channel to be added.
if tc.expectChanAdded {
numChans++
}
// Assert the number of channels is correct.
require.Equal(numChans, peer.activeChannels.Len())
// Assert the request's error chan is closed.
err, ok := <-req.err
require.False(ok, "expect err chan to be closed")
require.NoError(err, "expect no error")
})
}
}
// TestHandleRemovePendingChannel checks the method
// `handleRemovePendingChannel` behaves as expected.
func TestHandleRemovePendingChannel(t *testing.T) {
t.Parallel()
// Create three channel IDs for testing.
chanIDActive := lnwire.ChannelID{0}
chanIDNotExist := lnwire.ChannelID{1}
chanIDPending := lnwire.ChannelID{2}
// Create a test brontide.
dummyConfig := Config{}
peer := NewBrontide(dummyConfig)
// Create the test state.
peer.activeChannels.Store(chanIDActive, &lnwallet.LightningChannel{})
peer.activeChannels.Store(chanIDPending, nil)
// Assert test state, we should have two channels store, one active and
// one pending.
require.Equal(t, 2, peer.activeChannels.Len())
testCases := []struct {
name string
chanID lnwire.ChannelID
// expectDeleted specifies whether this chanID will be removed
// from the peer's state.
expectDeleted bool
}{
{
name: "noop on active channel",
chanID: chanIDActive,
expectDeleted: false,
},
{
name: "pending channel should be removed",
chanID: chanIDPending,
expectDeleted: true,
},
{
name: "noop on non-exist channel",
chanID: chanIDNotExist,
expectDeleted: false,
},
}
for _, tc := range testCases {
tc := tc
// Create a request for testing.
errChan := make(chan error, 1)
req := &newChannelMsg{
channelID: tc.chanID,
err: errChan,
}
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
require := require.New(t)
// Get the number of channels before mutating the
// state.
numChans := peer.activeChannels.Len()
// Call the method.
peer.handleRemovePendingChannel(req)
// Minus one if we expect this channel to be removed.
if tc.expectDeleted {
numChans--
}
// Assert the number of channels is correct.
require.Equal(numChans, peer.activeChannels.Len())
// Assert the request's error chan is closed.
err, ok := <-req.err
require.False(ok, "expect err chan to be closed")
require.NoError(err, "expect no error")
})
}
}

@ -1276,8 +1276,18 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
return ourPolicy, err
}
// Get the development config for funding manager. If we are not in
// development mode, this would be nil.
var devCfg *funding.DevConfig
if lncfg.IsDevBuild() {
devCfg = &funding.DevConfig{
ProcessChannelReadyWait: cfg.Dev.ChannelReadyWait(),
}
}
//nolint:lll
s.fundingMgr, err = funding.NewFundingManager(funding.Config{
Dev: devCfg,
NoWumboChans: !cfg.ProtocolOptions.Wumbo(),
IDKey: nodeKeyDesc.PubKey,
IDKeyLoc: nodeKeyDesc.KeyLocator,