From 4835fdf237255db648ec776b0503ce222f291535 Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Mon, 29 Jul 2024 15:14:58 -0700 Subject: [PATCH 1/2] fn: Add new Req type to abstract the pattern of remote processing. It is common throughout the codebase to send data to a remote goroutine for processing. Typically, along with the data we are processing, we also send a one-shot channel where we intend to listen for the response. This type encapsulates that pattern. --- fn/req.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 fn/req.go diff --git a/fn/req.go b/fn/req.go new file mode 100644 index 000000000..0b200b44e --- /dev/null +++ b/fn/req.go @@ -0,0 +1,34 @@ +package fn + +// Req is a type to encapsulate RPC-like calls wherein we send some data +// structure as a request, as well as a channel to receive the response on where +// the remote goroutine will send the result. +// +// NOTE: This construct should only be used for request/response patterns for +// which there is only a single response for the request. +type Req[Input any, Output any] struct { + // Request is the data we are sending to the remote goroutine for + // processing. + Request Input + + // Response is the channel on which we will receive the result of the + // remote computation. + Response chan<- Output +} + +// NewReq is the base constructor of the Req type. It returns both the packaged +// Req object as well as the receive side of the response channel that we will +// listen on for the response. +func NewReq[Input, Output any](input Input) ( + Req[Input, Output], <-chan Output) { + + // Always buffer the response channel so that the goroutine doing the + // processing job does not block if the original requesting routine + // takes an unreasonably long time to read the response. + responseChan := make(chan Output, 1) + + return Req[Input, Output]{ + Request: input, + Response: responseChan, + }, responseChan +} From 2c6d229a6992c00a283fba392bb047056c747603 Mon Sep 17 00:00:00 2001 From: Keagan McClelland Date: Wed, 31 Jul 2024 12:57:07 -0700 Subject: [PATCH 2/2] fn: harden and refine the Req[I,O] API. In this commit we opt to make the internal response channel fully private and instead expose methods for doing resolution. This prevents internal implementation details from leaking a little bit better than the previous iteration. --- fn/req.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/fn/req.go b/fn/req.go index 0b200b44e..5428afd04 100644 --- a/fn/req.go +++ b/fn/req.go @@ -11,9 +11,27 @@ type Req[Input any, Output any] struct { // processing. Request Input - // Response is the channel on which we will receive the result of the + // response is the channel on which we will receive the result of the // remote computation. - Response chan<- Output + response chan<- Output +} + +// Dispatch is a convenience method that lifts a function that transforms the +// Input to the Output type into a full request handling cycle. +func (r *Req[Input, Output]) Dispatch(handler func(Input) Output) { + r.Resolve(handler(r.Request)) +} + +// Resolve is a function that is used to send a value of the Output type back +// to the requesting thread. +func (r *Req[Input, Output]) Resolve(output Output) { + select { + case r.response <- output: + default: + // We do nothing here because the only situation in which this + // case will fire is if the request handler attempts to resolve + // a request more than once which is explicitly forbidden. + } } // NewReq is the base constructor of the Req type. It returns both the packaged @@ -29,6 +47,6 @@ func NewReq[Input, Output any](input Input) ( return Req[Input, Output]{ Request: input, - Response: responseChan, + response: responseChan, }, responseChan }