rpc: Add PendingOpenChannel to SubscribeChannelEvents

This commit adds PendingOpenChannel to SubscribeChannelEvents stream in
the gRPC API.
This is useful for keeping track of channel openings that Autopilot does.
It can also be used for the non-initator side of a channel opening to keep
track of channel openings.
This commit is contained in:
Hampus Sjöberg 2019-09-29 12:13:01 +02:00
parent 51d8c11eb5
commit 227e66c469
9 changed files with 720 additions and 644 deletions

View File

@ -20,6 +20,13 @@ type ChannelNotifier struct {
chanDB *channeldb.DB
}
// PendingOpenChannelEvent represents a new event where a new channel has
// entered a pending open state.
type PendingOpenChannelEvent struct {
// ChannelPoint is the channelpoint for the new channel.
ChannelPoint *wire.OutPoint
}
// OpenChannelEvent represents a new event where a channel goes from pending
// open to open.
type OpenChannelEvent struct {
@ -82,6 +89,16 @@ func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
return c.ntfnServer.Subscribe()
}
// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine that a
// new channel is pending.
func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint) {
event := PendingOpenChannelEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send pending open channel update: %v", err)
}
}
// NotifyOpenChannelEvent notifies the channelEventNotifier goroutine that a
// channel has gone from pending open to open.
func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) {

View File

@ -363,6 +363,10 @@ type fundingConfig struct {
// and on the requesting node's public key that returns a bool which tells
// the funding manager whether or not to accept the channel.
OpenChannelPredicate chanacceptor.ChannelAcceptor
// NotifyPendingOpenChannelEvent informs the ChannelNotifier when channels
// enter a pending state.
NotifyPendingOpenChannelEvent func(wire.OutPoint)
}
// fundingManager acts as an orchestrator/bridge between the wallet's
@ -1691,6 +1695,10 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
f.localDiscoverySignals[channelID] = make(chan struct{})
f.localDiscoveryMtx.Unlock()
// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(fundingOut)
// At this point we have sent our last funding message to the
// initiating peer before the funding transaction will be broadcast.
// With this last message, our job as the responder is now complete.
@ -1835,6 +1843,9 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
select {
case resCtx.updates <- upd:
// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint)
case <-f.quit:
return
}

View File

@ -376,11 +376,12 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
publTxChan <- txn
return nil
},
ZombieSweeperInterval: 1 * time.Hour,
ReservationTimeout: 1 * time.Nanosecond,
MaxPendingChannels: DefaultMaxPendingChannels,
NotifyOpenChannelEvent: func(wire.OutPoint) {},
OpenChannelPredicate: chainedAcceptor,
ZombieSweeperInterval: 1 * time.Hour,
ReservationTimeout: 1 * time.Nanosecond,
MaxPendingChannels: DefaultMaxPendingChannels,
NotifyOpenChannelEvent: func(wire.OutPoint) {},
OpenChannelPredicate: chainedAcceptor,
NotifyPendingOpenChannelEvent: func(wire.OutPoint) {},
}
for _, op := range options {

File diff suppressed because it is too large Load Diff

View File

@ -1930,6 +1930,7 @@ message ChannelEventUpdate {
ChannelCloseSummary closed_channel = 2 [ json_name = "closed_channel" ];
ChannelPoint active_channel = 3 [ json_name = "active_channel" ];
ChannelPoint inactive_channel = 4 [ json_name = "inactive_channel" ];
PendingUpdate pending_open_channel = 6 [json_name = "pending_open_channel"];
}
enum UpdateType {
@ -1937,6 +1938,7 @@ message ChannelEventUpdate {
CLOSED_CHANNEL = 1;
ACTIVE_CHANNEL = 2;
INACTIVE_CHANNEL = 3;
PENDING_OPEN_CHANNEL = 4;
}
UpdateType type = 5 [ json_name = "type" ];

View File

@ -1467,7 +1467,8 @@
"OPEN_CHANNEL",
"CLOSED_CHANNEL",
"ACTIVE_CHANNEL",
"INACTIVE_CHANNEL"
"INACTIVE_CHANNEL",
"PENDING_OPEN_CHANNEL"
],
"default": "OPEN_CHANNEL"
},
@ -2149,6 +2150,9 @@
"inactive_channel": {
"$ref": "#/definitions/lnrpcChannelPoint"
},
"pending_open_channel": {
"$ref": "#/definitions/lnrpcPendingUpdate"
},
"type": {
"$ref": "#/definitions/ChannelEventUpdateUpdateType"
}

View File

@ -6460,25 +6460,34 @@ func testBasicChannelCreationAndUpdates(net *lntest.NetworkHarness, t *harnessTe
// Since each of the channels just became open, Bob and Alice should
// each receive an open and an active notification for each channel.
var numChannelUpds int
const totalNtfns = 2 * numChannels
const totalNtfns = 3 * numChannels
verifyOpenUpdatesReceived := func(sub channelSubscription) error {
numChannelUpds = 0
for numChannelUpds < totalNtfns {
select {
case update := <-sub.updateChan:
switch update.Type {
case lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL:
if numChannelUpds%2 != 1 {
return fmt.Errorf("expected open" +
"channel ntfn, got active " +
case lnrpc.ChannelEventUpdate_PENDING_OPEN_CHANNEL:
if numChannelUpds%3 != 0 {
return fmt.Errorf("expected " +
"open or active" +
"channel ntfn, got pending open " +
"channel ntfn instead")
}
case lnrpc.ChannelEventUpdate_OPEN_CHANNEL:
if numChannelUpds%2 != 0 {
return fmt.Errorf("expected active" +
if numChannelUpds%3 != 1 {
return fmt.Errorf("expected " +
"pending open or active" +
"channel ntfn, got open" +
"channel ntfn instead")
}
case lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL:
if numChannelUpds%3 != 2 {
return fmt.Errorf("expected " +
"pending open or open" +
"channel ntfn, got active " +
"channel ntfn instead")
}
default:
return fmt.Errorf("update type mismatch: "+
"expected open or active channel "+

View File

@ -3187,6 +3187,16 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
case e := <-channelEventSub.Updates():
var update *lnrpc.ChannelEventUpdate
switch event := e.(type) {
case channelnotifier.PendingOpenChannelEvent:
update = &lnrpc.ChannelEventUpdate{
Type: lnrpc.ChannelEventUpdate_PENDING_OPEN_CHANNEL,
Channel: &lnrpc.ChannelEventUpdate_PendingOpenChannel{
PendingOpenChannel: &lnrpc.PendingUpdate{
Txid: event.ChannelPoint.Hash[:],
OutputIndex: event.ChannelPoint.Index,
},
},
}
case channelnotifier.OpenChannelEvent:
channel, err := createRPCOpenChannel(r, graph,
event.Channel, true)
@ -5601,7 +5611,9 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
switch e.(type) {
// We only care about new/closed channels, so we'll
// skip any events for active/inactive channels.
// skip any events for pending/active/inactive channels.
case channelnotifier.PendingOpenChannelEvent:
continue
case channelnotifier.ActiveChannelEvent:
continue
case channelnotifier.InactiveChannelEvent:

View File

@ -1113,13 +1113,14 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
// channel bandwidth.
return uint16(input.MaxHTLCNumber / 2)
},
ZombieSweeperInterval: 1 * time.Minute,
ReservationTimeout: 10 * time.Minute,
MinChanSize: btcutil.Amount(cfg.MinChanSize),
MaxPendingChannels: cfg.MaxPendingChannels,
RejectPush: cfg.RejectPush,
NotifyOpenChannelEvent: s.channelNotifier.NotifyOpenChannelEvent,
OpenChannelPredicate: chanPredicate,
ZombieSweeperInterval: 1 * time.Minute,
ReservationTimeout: 10 * time.Minute,
MinChanSize: btcutil.Amount(cfg.MinChanSize),
MaxPendingChannels: cfg.MaxPendingChannels,
RejectPush: cfg.RejectPush,
NotifyOpenChannelEvent: s.channelNotifier.NotifyOpenChannelEvent,
OpenChannelPredicate: chanPredicate,
NotifyPendingOpenChannelEvent: s.channelNotifier.NotifyPendingOpenChannelEvent,
})
if err != nil {
return nil, err