routerrpc: add htlc notifier subscribed event

This commit is contained in:
Joost Jager 2022-07-11 15:02:17 +02:00
parent 281cdd4209
commit b85cda2a1d
No known key found for this signature in database
GPG Key ID: B9A26449A5528325
6 changed files with 484 additions and 376 deletions

File diff suppressed because it is too large Load Diff

View File

@ -618,6 +618,7 @@ message HtlcEvent {
ForwardFailEvent forward_fail_event = 8;
SettleEvent settle_event = 9;
LinkFailEvent link_fail_event = 10;
SubscribedEvent subscribed_event = 11;
}
}
@ -648,6 +649,9 @@ message SettleEvent {
bytes preimage = 1;
}
message SubscribedEvent {
}
message LinkFailEvent {
// Info contains details about the htlc that we failed.
HtlcInfo info = 1;

View File

@ -1317,6 +1317,9 @@
},
"link_fail_event": {
"$ref": "#/definitions/routerrpcLinkFailEvent"
},
"subscribed_event": {
"$ref": "#/definitions/routerrpcSubscribedEvent"
}
},
"title": "HtlcEvent contains the htlc event that was processed. These are served on a\nbest-effort basis; events are not persisted, delivery is not guaranteed\n(in the event of a crash in the switch, forward events may be lost) and\nsome events may be replayed upon restart. Events consumed from this package\nshould be de-duplicated by the htlc's unique combination of incoming and\noutgoing channel id and htlc id. [EXPERIMENTAL]"
@ -1748,6 +1751,9 @@
}
}
},
"routerrpcSubscribedEvent": {
"type": "object"
},
"routerrpcUpdateChanStatusRequest": {
"type": "object",
"properties": {

View File

@ -900,6 +900,16 @@ func (s *Server) SubscribeHtlcEvents(req *SubscribeHtlcEventsRequest,
}
defer htlcClient.Cancel()
// Send out an initial subscribed event so that the caller knows the
// point from which new events will be transmitted.
if err := stream.Send(&HtlcEvent{
Event: &HtlcEvent_SubscribedEvent{
SubscribedEvent: &SubscribedEvent{},
},
}); err != nil {
return err
}
for {
select {
case event := <-htlcClient.Updates():

View File

@ -168,6 +168,7 @@ out:
if err != nil {
t.Fatalf("could not subscribe events: %v", err)
}
assertSubscribed(t, aliceEvents)
bobEvents, err := net.Bob.RouterClient.SubscribeHtlcEvents(
ctxt, &routerrpc.SubscribeHtlcEventsRequest{},
@ -175,6 +176,7 @@ out:
if err != nil {
t.Fatalf("could not subscribe events: %v", err)
}
assertSubscribed(t, bobEvents)
carolEvents, err := carol.RouterClient.SubscribeHtlcEvents(
ctxt, &routerrpc.SubscribeHtlcEventsRequest{},
@ -182,6 +184,7 @@ out:
if err != nil {
t.Fatalf("could not subscribe events: %v", err)
}
assertSubscribed(t, carolEvents)
// For the first scenario, we'll test the cancellation of an HTLC with
// an unknown payment hash.

View File

@ -172,6 +172,7 @@ func testMultiHopPayments(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("could not subscribe events: %v", err)
}
assertSubscribed(t, aliceEvents)
bobEvents, err := net.Bob.RouterClient.SubscribeHtlcEvents(
ctxt, &routerrpc.SubscribeHtlcEventsRequest{},
@ -179,6 +180,7 @@ func testMultiHopPayments(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("could not subscribe events: %v", err)
}
assertSubscribed(t, bobEvents)
carolEvents, err := carol.RouterClient.SubscribeHtlcEvents(
ctxt, &routerrpc.SubscribeHtlcEventsRequest{},
@ -186,6 +188,7 @@ func testMultiHopPayments(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("could not subscribe events: %v", err)
}
assertSubscribed(t, carolEvents)
daveEvents, err := dave.RouterClient.SubscribeHtlcEvents(
ctxt, &routerrpc.SubscribeHtlcEventsRequest{},
@ -193,6 +196,7 @@ func testMultiHopPayments(net *lntest.NetworkHarness, t *harnessTest) {
if err != nil {
t.Fatalf("could not subscribe events: %v", err)
}
assertSubscribed(t, daveEvents)
// Using Carol as the source, pay to the 5 invoices from Bob created
// above.
@ -399,6 +403,14 @@ func assertEventAndType(t *harnessTest, eventType routerrpc.HtlcEvent_EventType,
return event
}
func assertSubscribed(t *harnessTest,
client routerrpc.Router_SubscribeHtlcEventsClient) {
event, err := client.Recv()
require.NoError(t.t, err)
require.NotNil(t.t, event.GetSubscribedEvent())
}
// updateChannelPolicy updates the channel policy of node to the
// given fees and timelock delta. This function blocks until
// listenerNode has received the policy update.