diff --git a/fundingmanager.go b/fundingmanager.go index 8a1323bda..43134b220 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -182,9 +182,10 @@ type fundingConfig struct { // announcement from the backing Lighting Network node. CurrentNodeAnnouncement func() (lnwire.NodeAnnouncement, error) - // SendAnnouncement is used by the FundingManager to announce newly - // created channels to the rest of the Lightning Network. - SendAnnouncement func(msg lnwire.Message) error + // SendLocalAnnouncement is used by the FundingManager to send + // announcement messages to the Gossiper to possibly broadcast + // to the greater network. + SendLocalAnnouncement func(msg lnwire.Message) error // SendToPeer allows the FundingManager to send messages to the peer // node during the multiple steps involved in the creation of the @@ -260,6 +261,10 @@ type fundingManager struct { // funding workflows. activeReservations map[serializedPubKey]pendingChannels + // pendingChanAnnPrefs is a map which stores a pending channel's id + // along with its announcement preference. + pendingChanAnnPrefs map[[32]byte]bool + // signedReservations is a utility map that maps the permanent channel // ID of a funding reservation to its temporary channel ID. This is // required as mid funding flow, we switch to referencing the channel @@ -315,6 +320,12 @@ const ( // fundingLocked message has successfully been sent to the other peer, // but we still haven't announced the channel to the network. fundingLockedSent + + // addedToRouterGraph is the opening state of a channel if the + // channel has been successfully added to the router graph + // immediately after the fundingLocked message has been sent, but + // we still haven't announced the channel to the network. + addedToRouterGraph ) var ( @@ -323,6 +334,12 @@ var ( // of being opened. channelOpeningStateBucket = []byte("channelOpeningState") + // openChanAnnPrefBucket is the database bucket used to store each + // channel's announcement preference signalled by the LSB of + // channel_flags of the open_channel message in the funding workflow. + // It only stores open channels' announcement preferences. + openChanAnnPrefBucket = []byte("open_chan_ann") + // ErrChannelNotFound is returned when we are looking for a specific // channel opening state in the FundingManager's internal database, but // the channel in question is not considered being in an opening state. @@ -342,6 +359,7 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) { fundingRequests: make(chan *initFundingMsg, msgBufferSize), localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}), handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}), + pendingChanAnnPrefs: make(map[[32]byte]bool), queries: make(chan interface{}, 1), quit: make(chan struct{}), }, nil @@ -457,11 +475,6 @@ func (f *fundingManager) Start() error { f.newChanBarriers[chanID] = make(chan struct{}) f.barrierMtx.Unlock() - // Set up a localDiscoverySignals to make sure we finish sending - // our own fundingLocked and channel announcements before - // processing a received fundingLocked. - f.localDiscoverySignals[chanID] = make(chan struct{}) - // If we did find the channel in the opening state database, we // have seen the funding transaction being confirmed, but we // did not finish the rest of the setup procedure before we shut @@ -486,17 +499,92 @@ func (f *fundingManager) Start() error { case fundingLockedSent: // fundingLocked was sent to peer, but the channel + // was not added to the router graph and the channel // announcement was not sent. f.wg.Add(1) go func(dbChan *channeldb.OpenChannel) { defer f.wg.Done() - err = f.sendChannelAnnouncement(dbChan, shortChanID) + // Retrieve channel announcement preference + private, err := f.getOpenChanAnnPref( + &dbChan.FundingOutpoint) if err != nil { - fndgLog.Errorf("error sending channel "+ - "announcement: %v", err) + fndgLog.Errorf("unable to retrieve channel "+ + "announcement preference: %v", err) return } + + lnChannel, err := lnwallet.NewLightningChannel( + nil, nil, f.cfg.FeeEstimator, dbChan) + if err != nil { + fndgLog.Errorf("error creating "+ + "lightning channel: %v", err) + return + } + defer lnChannel.Stop() + + err = f.addToRouterGraph(dbChan, lnChannel, + shortChanID, private) + if err != nil { + fndgLog.Errorf("failed adding to "+ + "router graph: %v", err) + return + } + + if !private { + err = f.annAfterSixConfs(dbChan, + lnChannel, shortChanID) + if err != nil { + fndgLog.Errorf("error sending channel "+ + "announcement: %v", err) + return + } + } + }(channel) + + case addedToRouterGraph: + // The channel was added to the Router's topology, but + // the channel announcement was not sent. + f.wg.Add(1) + go func(dbChan *channeldb.OpenChannel) { + defer f.wg.Done() + // Retrieve channel announcement preference + private, err := f.getOpenChanAnnPref( + &dbChan.FundingOutpoint) + if err != nil { + fndgLog.Errorf("unable to retrieve channel "+ + "announcement preference: %v", err) + return + } + + lnChannel, err := lnwallet.NewLightningChannel( + nil, nil, f.cfg.FeeEstimator, dbChan) + if err != nil { + fndgLog.Errorf("error creating "+ + "lightning channel: %v", err) + return + } + defer lnChannel.Stop() + + if private { + // We delete the channel from our internal + // database. + err := f.deleteChannelOpeningState( + &channel.FundingOutpoint) + if err != nil { + fndgLog.Errorf("error deleting "+ + "channel state: %v", err) + return + } + } else { + err = f.annAfterSixConfs(channel, + lnChannel, shortChanID) + if err != nil { + fndgLog.Errorf("error sending channel "+ + "announcement: %v", err) + return + } + } }(channel) default: @@ -641,6 +729,7 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, return } + delete(f.pendingChanAnnPrefs, tempChanID) f.cancelReservationCtx(peer, tempChanID) return } @@ -737,7 +826,6 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // number and send ErrorGeneric to remote peer if condition is // violated. peerIDKey := newSerializedKey(fmsg.peerAddress.IdentityKey) - msg := fmsg.msg amt := msg.FundingAmount @@ -846,6 +934,16 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { } f.resMtx.Unlock() + // Save the announcement preference of this pending channel + if msg.ChannelFlags&1 == 0 { + // This channel WILL be announced to the greater network later. + f.pendingChanAnnPrefs[msg.PendingChannelID] = false + } else { + // This channel WILL NOT be announced to the greater network + // later. + f.pendingChanAnnPrefs[msg.PendingChannelID] = true + } + // Using the RequiredRemoteDelay closure, we'll compute the remote CSV // delay we require given the total amount of funds within the channel. remoteCsvDelay := f.cfg.RequiredRemoteDelay(amt) @@ -1080,6 +1178,12 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { peerKey, pendingChanID[:]) return } + private, ok := f.pendingChanAnnPrefs[pendingChanID] + if !ok { + fndgLog.Warnf("can't find channel announcement preference for"+ + "chanID:%x", pendingChanID[:]) + return + } // The channel initiator has responded with the funding outpoint of the // final funding transaction, as well as a signature for our version of @@ -1106,10 +1210,32 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { return } + // A new channel has almost finished the funding process. In order to + // properly synchronize with the writeHandler goroutine, we add a new + // channel to the barriers map which will be closed once the channel is + // fully open. + f.barrierMtx.Lock() + channelID := lnwire.NewChanIDFromOutPoint(&fundingOut) + fndgLog.Debugf("Creating chan barrier for ChanID(%v)", channelID) + f.newChanBarriers[channelID] = make(chan struct{}) + f.barrierMtx.Unlock() + + // Store channel announcement preference in boltdb + if err = f.saveOpenChanAnnPref(&fundingOut, private); err != nil { + fndgLog.Errorf("unable to save channel announcement preference "+ + "to db for chanID:%x", channelID) + } + // If something goes wrong before the funding transaction is confirmed, // we use this convenience method to delete the pending OpenChannel // from the database. deleteFromDatabase := func() { + err := f.deleteOpenChanAnnPref(&completeChan.FundingOutpoint) + if err != nil { + fndgLog.Errorf("Failed to delete channel announcement "+ + "preference: %v", err) + } + closeInfo := &channeldb.ChannelCloseSummary{ ChanPoint: completeChan.FundingOutpoint, ChainHash: completeChan.ChainHash, @@ -1184,7 +1310,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // completely forget about this channel if we haven't seen the funding // transaction in 288 blocks (~ 48 hrs), by canceling the reservation // and canceling the wait for the funding confirmation. + f.wg.Add(1) go func() { + defer f.wg.Done() confChan := make(chan *lnwire.ShortChannelID) timeoutChan := make(chan struct{}) go f.waitForFundingWithTimeout(completeChan, confChan, @@ -1263,6 +1391,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { pendingChanID, []byte(err.Error())) return } + private, ok := f.pendingChanAnnPrefs[pendingChanID] + if !ok { + fndgLog.Warnf("can't find channel announcement preference for"+ + "chanID:%x", pendingChanID[:]) + return + } // Create an entry in the local discovery map so we can ensure that we // process the channel confirmation fully before we receive a funding @@ -1273,6 +1407,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { f.localDiscoverySignals[permChanID] = make(chan struct{}) f.localDiscoveryMtx.Unlock() + // Store channel announcement preference in boltdb + if err = f.saveOpenChanAnnPref(fundingPoint, private); err != nil { + fndgLog.Errorf("unable to save channel announcement preference "+ + "to db for chanID:%x", permChanID) + } + // The remote peer has responded with a signature for our commitment // transaction. We'll verify the signature for validity, then commit // the state to disk as we can now open the channel. @@ -1531,15 +1671,31 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open case <-f.quit: return } + + // This closes the discoverySignal channel, indicating to a separate + // goroutine that it is acceptable to process funding locked messages + // from the peer. + f.localDiscoveryMtx.Lock() + if discoverySignal, ok := f.localDiscoverySignals[chanID]; ok { + close(discoverySignal) + } + f.localDiscoveryMtx.Unlock() } // handleFundingConfirmation is a wrapper method for creating a new -// lnwallet.LightningChannel object, calling sendFundingLocked, and for calling -// sendChannelAnnouncement. This is called after the funding transaction is +// lnwallet.LightningChannel object, calling sendFundingLocked, addToRouterGraph, +// and annAfterSixConfs. This is called after the funding transaction is // confirmed. func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenChannel, shortChanID *lnwire.ShortChannelID) error { + // Retrieve channel announcement preference + private, err := f.getOpenChanAnnPref(&completeChan.FundingOutpoint) + if err != nil { + return fmt.Errorf("unable to retrieve channel announcement "+ + "preference: %v", err) + } + // We create the state-machine object which wraps the database state. lnChannel, err := lnwallet.NewLightningChannel(nil, nil, f.cfg.FeeEstimator, completeChan) @@ -1559,10 +1715,23 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC if err != nil { return fmt.Errorf("failed sending fundingLocked: %v", err) } - err = f.sendChannelAnnouncement(completeChan, shortChanID) + err = f.addToRouterGraph(completeChan, lnChannel, shortChanID, private) if err != nil { - return fmt.Errorf("failed sending channel announcement: %v", - err) + return fmt.Errorf("failed adding to router graph: %v", err) + } + + if private { + // We delete the channel from our internal database. + err := f.deleteChannelOpeningState(&completeChan.FundingOutpoint) + if err != nil { + return fmt.Errorf("error deleting channel state: %v", err) + } + } else { + err = f.annAfterSixConfs(completeChan, lnChannel, shortChanID) + if err != nil { + return fmt.Errorf("failed sending channel announcement: %v", + err) + } } return nil @@ -1644,17 +1813,82 @@ func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel, return nil } -// sendChannelAnnouncement broadcast the necessary channel announcement -// messages to the network. Should be called after the fundingLocked message -// is sent (channelState is 'fundingLockedSent') and the channel is ready to -// be used. -func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenChannel, - shortChanID *lnwire.ShortChannelID) error { - - // TODO(eugene) wait for 6 confirmations here +// addToRouterGraph sends a private ChannelAnnouncement and a private +// ChannelUpdate to the gossiper so that it is added to the Router's internal +// graph before the announcement_signatures is sent in +// annAfterSixConfs. These private announcement messages are NOT +// broadcasted to the greater network. +func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel, + shortChanID *lnwire.ShortChannelID, private bool) error { chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) + + ann, err := f.newChanAnnouncement(f.cfg.IDKey, completeChan.IdentityPub, + completeChan.LocalChanCfg.MultiSigKey, + completeChan.RemoteChanCfg.MultiSigKey, *shortChanID, chanID) + if err != nil { + return fmt.Errorf("error generating channel "+ + "announcement: %v", err) + } + + // Send ChannelAnnouncement and ChannelUpdate to the gossiper to add + // to the Router's topology. + if err = f.cfg.SendLocalAnnouncement(ann.chanAnn); err != nil { + return fmt.Errorf("error sending private channel "+ + "announcement: %v", err) + } + if err = f.cfg.SendLocalAnnouncement(ann.chanUpdateAnn); err != nil { + return fmt.Errorf("error sending private channel "+ + "update: %v", err) + } + + // As the channel is now added to the ChannelRouter's topology, the + // channel is moved to the next state of the state machine. It will be + // moved to the last state (actually deleted from the database) after + // the channel is finally announced. + err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, addedToRouterGraph, + shortChanID) + if err != nil { + return fmt.Errorf("error setting channel state to"+ + " addedToRouterGraph: %v", err) + } + + return nil +} + +// annAfterSixConfs broadcasts the necessary channel announcement messages to +// the network after 6 confs. Should be called after the fundingLocked message +// is sent and the channel is added to the router graph (channelState is +// 'addedToRouterGraph') and the channel is ready to be used. +func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, + shortChanID *lnwire.ShortChannelID) error { + + // Register with the ChainNotifier for a notification once the + // funding transaction reaches 6 confirmations. + txid := completeChan.FundingOutpoint.Hash + confNtfn, err := f.cfg.Notifier.RegisterConfirmationsNtfn(&txid, 6, + completeChan.FundingBroadcastHeight) + if err != nil { + return fmt.Errorf("Unable to register for confirmation of "+ + "ChannelPoint(%v): %v", completeChan.FundingOutpoint, err) + } + + // Wait until 6 confirmations has been reached or the wallet signals + // a shutdown. + select { + case _, ok := <-confNtfn.Confirmed: + if !ok { + return fmt.Errorf("ChainNotifier shutting down, cannot "+ + "complete funding flow for ChannelPoint(%v)", + completeChan.FundingOutpoint) + } + case <-f.quit: + return fmt.Errorf("fundingManager shutting down, stopping funding "+ + "flow for ChannelPoint(%v)", completeChan.FundingOutpoint) + } + fundingPoint := completeChan.FundingOutpoint + chanID := lnwire.NewChanIDFromOutPoint(&fundingPoint) fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v", &fundingPoint, spew.Sdump(shortChanID)) @@ -1679,20 +1913,11 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha // we delete the channel from our internal database. We can do this // because we assume the AuthenticatedGossiper queues the announcement // messages, and persists them in case of a daemon shutdown. - err = f.deleteChannelOpeningState(&completeChan.FundingOutpoint) + err = f.deleteChannelOpeningState(&fundingPoint) if err != nil { return fmt.Errorf("error deleting channel state: %v", err) } - // Finally, as the local channel discovery has been fully processed, - // we'll trigger the signal indicating that it's safe for any funding - // locked messages related to this channel to be processed. - f.localDiscoveryMtx.Lock() - if discoverySignal, ok := f.localDiscoverySignals[chanID]; ok { - close(discoverySignal) - } - f.localDiscoveryMtx.Unlock() - return nil } @@ -2010,25 +2235,10 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe return err } - // With the announcements crafted, we'll now send the announcements to - // the rest of the network. - // - // TODO(roasbeef): add flag that indicates if should be announced or - // not - - // The announcement message consists of three distinct messages: - // 1. channel announcement 2. channel update 3. channel proof - // We must wait for them all to be successfully announced to the - // network, and/ if either fails we consider the announcement - // unsuccessful. - if err = f.cfg.SendAnnouncement(ann.chanAnn); err != nil { - fndgLog.Errorf("Unable to send channel announcement: %v", err) - return err - } - if err = f.cfg.SendAnnouncement(ann.chanUpdateAnn); err != nil { - fndgLog.Errorf("Unable to send channel update: %v", err) - return err - } + // We only send the channel proof announcement and the node announcement + // because addToRouterGraph previously send the ChannelAnnouncement and + // the ChannelUpdate announcement messages. The channel proof and node + // announcements are broadcast to the greater network. if err = f.cfg.SendAnnouncement(ann.chanProof); err != nil { fndgLog.Errorf("Unable to send channel proof: %v", err) return err @@ -2148,6 +2358,18 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { fndgLog.Infof("Starting funding workflow with %v for pendingID(%x)", msg.peerAddress.Address, chanID) + // Save the announcement preference of this pending channel + var channelFlags byte + if msg.openChanReq.private { + // This channel will be private + channelFlags = 1 + f.pendingChanAnnPrefs[chanID] = true + } else { + // This channel will be publicly announced to the greater network. + channelFlags = 0 + f.pendingChanAnnPrefs[chanID] = false + } + fundingOpen := lnwire.OpenChannel{ ChainHash: *f.cfg.Wallet.Cfg.NetParams.GenesisHash, PendingChannelID: chanID, @@ -2248,6 +2470,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { ) } + delete(f.pendingChanAnnPrefs, chanID) if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil { fndgLog.Warnf("unable to delete reservation: %v", err) return @@ -2316,6 +2539,86 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey { } } +// saveOpenChanAnnPref saves an open channel's announcement preference. +func (f *fundingManager) saveOpenChanAnnPref(chanPoint *wire.OutPoint, + pref bool) error { + return f.cfg.Wallet.Cfg.Database.Update(func(tx *bolt.Tx) error { + + bucket, err := tx.CreateBucketIfNotExists(openChanAnnPrefBucket) + if err != nil { + return err + } + + var outpointBytes bytes.Buffer + if err = writeOutpoint(&outpointBytes, chanPoint); err != nil { + return err + } + + scratch := make([]byte, 0) + var b byte + if pref { + b = 1 + } else { + b = 0 + } + scratch = append(scratch[:], b) + + return bucket.Put(outpointBytes.Bytes(), scratch[:]) + }) +} + +// getOpenChanAnnPref retrives an open channel's announcement preference. +func (f *fundingManager) getOpenChanAnnPref(chanPoint *wire.OutPoint) (bool, error) { + + var pref bool + err := f.cfg.Wallet.Cfg.Database.View(func(tx *bolt.Tx) error { + + bucket := tx.Bucket(openChanAnnPrefBucket) + if bucket == nil { + return fmt.Errorf("Channel announcement preference " + + "not found") + } + + var outpointBytes bytes.Buffer + if err := writeOutpoint(&outpointBytes, chanPoint); err != nil { + return err + } + + value := bucket.Get(outpointBytes.Bytes()) + if value == nil { + return fmt.Errorf("Channel announcement preference " + + "not found") + } + + if value[0] == 1 { + pref = true + } + return nil + }) + if err != nil { + return false, err + } + + return pref, nil +} + +// deleteOpenChanAnnPref deletes an open channel's announcement preference. +func (f *fundingManager) deleteOpenChanAnnPref(chanPoint *wire.OutPoint) error { + return f.cfg.Wallet.Cfg.Database.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(openChanAnnPrefBucket) + if bucket == nil { + return fmt.Errorf("Bucket not found") + } + + var outpointBytes bytes.Buffer + if err := writeOutpoint(&outpointBytes, chanPoint); err != nil { + return err + } + + return bucket.Delete(outpointBytes.Bytes()) + }) +} + // saveChannelOpeningState saves the channelOpeningState for the provided // chanPoint to the channelOpeningStateBucket. func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint, @@ -2338,10 +2641,7 @@ func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint, byteOrder.PutUint16(scratch[:2], uint16(state)) byteOrder.PutUint64(scratch[2:], shortChanID.ToUint64()) - if err = bucket.Put(outpointBytes.Bytes(), scratch); err != nil { - return err - } - return nil + return bucket.Put(outpointBytes.Bytes(), scratch) }) } @@ -2396,9 +2696,6 @@ func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) err return err } - if err := bucket.Delete(outpointBytes.Bytes()); err != nil { - return err - } - return nil + return bucket.Delete(outpointBytes.Bytes()) }) }