mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-08-04 02:36:17 +02:00
multi: add RPC middleware interception
With the middleware handler in place, we now need to add a new gRPC interceptor to the interceptor chain that will send messages to the registered middlewares for each event that could be of interest to them.
This commit is contained in:
@@ -75,7 +75,12 @@ var (
|
||||
"starting up, but not yet ready to accept calls")
|
||||
|
||||
// macaroonWhitelist defines methods that we don't require macaroons to
|
||||
// access.
|
||||
// access. We also allow these methods to be called even if not all
|
||||
// mandatory middlewares are registered yet. If the wallet is locked
|
||||
// then a middleware cannot register itself, creating an impossible
|
||||
// situation. Also, a middleware might want to check the state of lnd
|
||||
// by calling the State service before it registers itself. So we also
|
||||
// need to exclude those calls from the mandatory middleware check.
|
||||
macaroonWhitelist = map[string]struct{}{
|
||||
// We allow all calls to the WalletUnlocker without macaroons.
|
||||
"/lnrpc.WalletUnlocker/GenSeed": {},
|
||||
@@ -91,8 +96,43 @@ var (
|
||||
)
|
||||
|
||||
// InterceptorChain is a struct that can be added to the running GRPC server,
|
||||
// intercepting API calls. This is useful for logging, enforcing permissions
|
||||
// etc.
|
||||
// intercepting API calls. This is useful for logging, enforcing permissions,
|
||||
// supporting middleware etc. The following diagram shows the order of each
|
||||
// interceptor in the chain and when exactly requests/responses are intercepted
|
||||
// and forwarded to external middleware for approval/modification. Middleware in
|
||||
// general can only intercept gRPC requests/responses that are sent by the
|
||||
// client with a macaroon that contains a custom caveat that is supported by one
|
||||
// of the registered middlewares.
|
||||
//
|
||||
// |
|
||||
// | gRPC request from client
|
||||
// |
|
||||
// +---v--------------------------------+
|
||||
// | InterceptorChain |
|
||||
// +-+----------------------------------+
|
||||
// | Log Interceptor |
|
||||
// +----------------------------------+
|
||||
// | RPC State Interceptor |
|
||||
// +----------------------------------+
|
||||
// | Macaroon Interceptor |
|
||||
// +----------------------------------+--------> +---------------------+
|
||||
// | RPC Macaroon Middleware Handler |<-------- | External Middleware |
|
||||
// +----------------------------------+ | - approve request |
|
||||
// | Prometheus Interceptor | +---------------------+
|
||||
// +-+--------------------------------+
|
||||
// | validated gRPC request from client
|
||||
// +---v--------------------------------+
|
||||
// | main gRPC server |
|
||||
// +---+--------------------------------+
|
||||
// |
|
||||
// | original gRPC request to client
|
||||
// |
|
||||
// +---v--------------------------------+--------> +---------------------+
|
||||
// | RPC Macaroon Middleware Handler |<-------- | External Middleware |
|
||||
// +---+--------------------------------+ | - modify response |
|
||||
// | +---------------------+
|
||||
// | edited gRPC request to client
|
||||
// v
|
||||
type InterceptorChain struct {
|
||||
// Required by the grpc-gateway/v2 library for forward compatibility.
|
||||
lnrpc.UnimplementedStateServer
|
||||
@@ -117,9 +157,22 @@ type InterceptorChain struct {
|
||||
// permissionMap is the permissions to enforce if macaroons are used.
|
||||
permissionMap map[string][]bakery.Op
|
||||
|
||||
// rpcsLog is the logger used to log calles to the RPCs intercepted.
|
||||
// rpcsLog is the logger used to log calls to the RPCs intercepted.
|
||||
rpcsLog btclog.Logger
|
||||
|
||||
// registeredMiddleware is a map of all macaroon permission based RPC
|
||||
// middleware clients that are currently registered. The map is keyed
|
||||
// by the middleware's name.
|
||||
registeredMiddleware map[string]*MiddlewareHandler
|
||||
|
||||
// mandatoryMiddleware is a list of all middleware that is considered to
|
||||
// be mandatory. If any of them is not registered then all RPC requests
|
||||
// (except for the macaroon white listed methods and the middleware
|
||||
// registration itself) are blocked. This is a security feature to make
|
||||
// sure that requests can't just go through unobserved/unaudited if a
|
||||
// middleware crashes.
|
||||
mandatoryMiddleware []string
|
||||
|
||||
quit chan struct{}
|
||||
sync.RWMutex
|
||||
}
|
||||
@@ -129,14 +182,18 @@ type InterceptorChain struct {
|
||||
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
|
||||
|
||||
// NewInterceptorChain creates a new InterceptorChain.
|
||||
func NewInterceptorChain(log btclog.Logger, noMacaroons bool) *InterceptorChain {
|
||||
func NewInterceptorChain(log btclog.Logger, noMacaroons bool,
|
||||
mandatoryMiddleware []string) *InterceptorChain {
|
||||
|
||||
return &InterceptorChain{
|
||||
state: waitingToStart,
|
||||
ntfnServer: subscribe.NewServer(),
|
||||
noMacaroons: noMacaroons,
|
||||
permissionMap: make(map[string][]bakery.Op),
|
||||
rpcsLog: log,
|
||||
quit: make(chan struct{}),
|
||||
state: waitingToStart,
|
||||
ntfnServer: subscribe.NewServer(),
|
||||
noMacaroons: noMacaroons,
|
||||
permissionMap: make(map[string][]bakery.Op),
|
||||
rpcsLog: log,
|
||||
registeredMiddleware: make(map[string]*MiddlewareHandler),
|
||||
mandatoryMiddleware: mandatoryMiddleware,
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -241,7 +298,7 @@ func rpcStateToWalletState(state rpcState) (lnrpc.WalletState, error) {
|
||||
// state will always be delivered immediately.
|
||||
//
|
||||
// NOTE: Part of the StateService interface.
|
||||
func (r *InterceptorChain) SubscribeState(req *lnrpc.SubscribeStateRequest,
|
||||
func (r *InterceptorChain) SubscribeState(_ *lnrpc.SubscribeStateRequest,
|
||||
stream lnrpc.State_SubscribeStateServer) error {
|
||||
|
||||
sendStateUpdate := func(state rpcState) error {
|
||||
@@ -302,9 +359,9 @@ func (r *InterceptorChain) SubscribeState(req *lnrpc.SubscribeStateRequest,
|
||||
}
|
||||
}
|
||||
|
||||
// GetState returns he current wallet state.
|
||||
// GetState returns the current wallet state.
|
||||
func (r *InterceptorChain) GetState(_ context.Context,
|
||||
req *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
|
||||
_ *lnrpc.GetStateRequest) (*lnrpc.GetStateResponse, error) {
|
||||
|
||||
r.RLock()
|
||||
state := r.state
|
||||
@@ -359,6 +416,78 @@ func (r *InterceptorChain) Permissions() map[string][]bakery.Op {
|
||||
return c
|
||||
}
|
||||
|
||||
// RegisterMiddleware registers a new middleware that will handle request/
|
||||
// response interception for all RPC messages that are initiated with a custom
|
||||
// macaroon caveat. The name of the custom caveat a middleware is handling is
|
||||
// also its unique identifier. Only one middleware can be registered for each
|
||||
// custom caveat.
|
||||
func (r *InterceptorChain) RegisterMiddleware(mw *MiddlewareHandler) error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
// The name of the middleware is the unique identifier.
|
||||
registered, ok := r.registeredMiddleware[mw.middlewareName]
|
||||
if ok {
|
||||
return fmt.Errorf("a middleware with the name '%s' is already "+
|
||||
"registered", registered.middlewareName)
|
||||
}
|
||||
|
||||
// For now, we only want one middleware per custom caveat name. If we
|
||||
// allowed multiple middlewares handling the same caveat there would be
|
||||
// a need for extra call chaining logic, and they could overwrite each
|
||||
// other's responses.
|
||||
for name, middleware := range r.registeredMiddleware {
|
||||
if middleware.customCaveatName == mw.customCaveatName {
|
||||
return fmt.Errorf("a middleware is already registered "+
|
||||
"for the custom caveat name '%s': %v",
|
||||
mw.customCaveatName, name)
|
||||
}
|
||||
}
|
||||
|
||||
r.registeredMiddleware[mw.middlewareName] = mw
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveMiddleware removes the middleware that handles the given custom caveat
|
||||
// name.
|
||||
func (r *InterceptorChain) RemoveMiddleware(middlewareName string) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
log.Debugf("Removing middleware %s", middlewareName)
|
||||
|
||||
delete(r.registeredMiddleware, middlewareName)
|
||||
}
|
||||
|
||||
// CustomCaveatSupported makes sure a middleware that handles the given custom
|
||||
// caveat name is registered. If none is, an error is returned, signalling to
|
||||
// the macaroon bakery and its validator to reject macaroons that have a custom
|
||||
// caveat with that name.
|
||||
//
|
||||
// NOTE: This method is part of the macaroons.CustomCaveatAcceptor interface.
|
||||
func (r *InterceptorChain) CustomCaveatSupported(customCaveatName string) error {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
// We only accept requests with a custom caveat if we also have a
|
||||
// middleware registered that handles that custom caveat. That is
|
||||
// crucial for security! Otherwise a request with an encumbered (=has
|
||||
// restricted permissions based upon the custom caveat condition)
|
||||
// macaroon would not be validated against the limitations that the
|
||||
// custom caveat implicate. Since the map is keyed by the _name_ of the
|
||||
// middleware, we need to loop through all of them to see if one has
|
||||
// the given custom macaroon caveat name.
|
||||
for _, middleware := range r.registeredMiddleware {
|
||||
if middleware.customCaveatName == customCaveatName {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("cannot accept macaroon with custom caveat '%s', "+
|
||||
"no middleware registered to handle it", customCaveatName)
|
||||
}
|
||||
|
||||
// CreateServerOpts creates the GRPC server options that can be added to a GRPC
|
||||
// server in order to add this InterceptorChain.
|
||||
func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
|
||||
@@ -393,6 +522,15 @@ func (r *InterceptorChain) CreateServerOpts() []grpc.ServerOption {
|
||||
strmInterceptors, r.MacaroonStreamServerInterceptor(),
|
||||
)
|
||||
|
||||
// Next, we'll add the interceptors for our custom macaroon caveat based
|
||||
// middleware.
|
||||
unaryInterceptors = append(
|
||||
unaryInterceptors, r.middlewareUnaryServerInterceptor(),
|
||||
)
|
||||
strmInterceptors = append(
|
||||
strmInterceptors, r.middlewareStreamServerInterceptor(),
|
||||
)
|
||||
|
||||
// Get interceptors for Prometheus to gather gRPC performance metrics.
|
||||
// If monitoring is disabled, GetPromInterceptors() will return empty
|
||||
// slices.
|
||||
@@ -614,3 +752,245 @@ func (r *InterceptorChain) rpcStateStreamServerInterceptor() grpc.StreamServerIn
|
||||
return handler(srv, ss)
|
||||
}
|
||||
}
|
||||
|
||||
// middlewareUnaryServerInterceptor is a unary gRPC interceptor that intercepts
|
||||
// all requests and responses that are sent with a macaroon containing a custom
|
||||
// caveat condition that is handled by registered middleware.
|
||||
func (r *InterceptorChain) middlewareUnaryServerInterceptor() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context,
|
||||
req interface{}, info *grpc.UnaryServerInfo,
|
||||
handler grpc.UnaryHandler) (interface{}, error) {
|
||||
|
||||
// Make sure we don't allow any requests through if one of the
|
||||
// mandatory middlewares is missing.
|
||||
fullMethod := info.FullMethod
|
||||
if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
msg, err := NewMessageInterceptionRequest(
|
||||
ctx, TypeRequest, false, info.FullMethod, req,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = r.acceptRequest(msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, respErr := handler(ctx, req)
|
||||
if respErr != nil {
|
||||
return resp, respErr
|
||||
}
|
||||
|
||||
return r.interceptResponse(ctx, false, info.FullMethod, resp)
|
||||
}
|
||||
}
|
||||
|
||||
// middlewareStreamServerInterceptor is a streaming gRPC interceptor that
|
||||
// intercepts all requests and responses that are sent with a macaroon
|
||||
// containing a custom caveat condition that is handled by registered
|
||||
// middleware.
|
||||
func (r *InterceptorChain) middlewareStreamServerInterceptor() grpc.StreamServerInterceptor {
|
||||
return func(srv interface{},
|
||||
ss grpc.ServerStream, info *grpc.StreamServerInfo,
|
||||
handler grpc.StreamHandler) error {
|
||||
|
||||
// Don't intercept the interceptor itself which is a streaming
|
||||
// RPC too!
|
||||
fullMethod := info.FullMethod
|
||||
if fullMethod == lnrpc.RegisterRPCMiddlewareURI {
|
||||
return handler(srv, ss)
|
||||
}
|
||||
|
||||
// Make sure we don't allow any requests through if one of the
|
||||
// mandatory middlewares is missing. We add this check here to
|
||||
// make sure the middleware registration itself can still be
|
||||
// called.
|
||||
if err := r.checkMandatoryMiddleware(fullMethod); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// To give the middleware a chance to accept or reject the
|
||||
// establishment of the stream itself (and not only when the
|
||||
// first message is sent on the stream), we send an intercept
|
||||
// request for the stream auth now:
|
||||
msg, err := NewStreamAuthInterceptionRequest(
|
||||
ss.Context(), info.FullMethod,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = r.acceptRequest(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wrappedSS := &serverStreamWrapper{
|
||||
ServerStream: ss,
|
||||
fullMethod: info.FullMethod,
|
||||
interceptor: r,
|
||||
}
|
||||
|
||||
return handler(srv, wrappedSS)
|
||||
}
|
||||
}
|
||||
|
||||
// checkMandatoryMiddleware makes sure that each of the middlewares declared as
|
||||
// mandatory is currently registered.
|
||||
func (r *InterceptorChain) checkMandatoryMiddleware(fullMethod string) error {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
// Allow calls that are whitelisted for macaroons as well, otherwise we
|
||||
// get into impossible situations where the wallet is locked but the
|
||||
// unlock call is denied because the middleware isn't registered. But
|
||||
// the middleware cannot register itself because the wallet is locked.
|
||||
if _, ok := macaroonWhitelist[fullMethod]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Not a white listed call so make sure every mandatory middleware is
|
||||
// currently connected to lnd.
|
||||
for _, name := range r.mandatoryMiddleware {
|
||||
if _, ok := r.registeredMiddleware[name]; !ok {
|
||||
return fmt.Errorf("mandatory middleware '%s' is "+
|
||||
"currently not registered, not allowing any "+
|
||||
"RPC calls", name)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// acceptRequest sends an intercept request to all middlewares that have
|
||||
// registered for it. This means either a middleware has requested read-only
|
||||
// access or the request actually has a macaroon which a caveat the middleware
|
||||
// registered for.
|
||||
func (r *InterceptorChain) acceptRequest(msg *InterceptionRequest) error {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
for _, middleware := range r.registeredMiddleware {
|
||||
// If there is a custom caveat in the macaroon, make sure the
|
||||
// middleware registered for it. Or if a middleware registered
|
||||
// for read-only mode, it also gets the request.
|
||||
hasCustomCaveat := macaroons.HasCustomCaveat(
|
||||
msg.Macaroon, middleware.customCaveatName,
|
||||
)
|
||||
if !hasCustomCaveat && !middleware.readOnly {
|
||||
continue
|
||||
}
|
||||
|
||||
resp, err := middleware.intercept(msg)
|
||||
|
||||
// Error during interception itself.
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Error returned from middleware client.
|
||||
if resp.err != nil {
|
||||
return resp.err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// interceptResponse sends out an intercept request for an RPC response. Since
|
||||
// middleware that hasn't registered for the read-only mode has the option to
|
||||
// overwrite/replace the response, this needs to be handled differently than the
|
||||
// request/auth path above.
|
||||
func (r *InterceptorChain) interceptResponse(ctx context.Context,
|
||||
isStream bool, fullMethod string, m interface{}) (interface{}, error) {
|
||||
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
currentMessage := m
|
||||
for _, middleware := range r.registeredMiddleware {
|
||||
msg, err := NewMessageInterceptionRequest(
|
||||
ctx, TypeResponse, isStream, fullMethod, currentMessage,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If there is a custom caveat in the macaroon, make sure the
|
||||
// middleware registered for it. Or if a middleware registered
|
||||
// for read-only mode, it also gets the request.
|
||||
hasCustomCaveat := macaroons.HasCustomCaveat(
|
||||
msg.Macaroon, middleware.customCaveatName,
|
||||
)
|
||||
if !hasCustomCaveat && !middleware.readOnly {
|
||||
continue
|
||||
}
|
||||
|
||||
resp, err := middleware.intercept(msg)
|
||||
|
||||
// Error during interception itself.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Error returned from middleware client.
|
||||
if resp.err != nil {
|
||||
return nil, resp.err
|
||||
}
|
||||
|
||||
// The message was replaced, make sure the next middleware in
|
||||
// line receives the updated message.
|
||||
if !middleware.readOnly && resp.replace {
|
||||
currentMessage = resp.replacement
|
||||
}
|
||||
}
|
||||
|
||||
return currentMessage, nil
|
||||
}
|
||||
|
||||
// serverStreamWrapper is a struct that wraps a server stream in a way that all
|
||||
// requests and responses can be intercepted individually.
|
||||
type serverStreamWrapper struct {
|
||||
// ServerStream is the stream that's being wrapped.
|
||||
grpc.ServerStream
|
||||
|
||||
fullMethod string
|
||||
|
||||
interceptor *InterceptorChain
|
||||
}
|
||||
|
||||
// SendMsg is called when lnd sends a message to the client. This is wrapped to
|
||||
// intercept streaming RPC responses.
|
||||
func (w *serverStreamWrapper) SendMsg(m interface{}) error {
|
||||
newMsg, err := w.interceptor.interceptResponse(
|
||||
w.ServerStream.Context(), true, w.fullMethod, m,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return w.ServerStream.SendMsg(newMsg)
|
||||
}
|
||||
|
||||
// RecvMsg is called when lnd wants to receive a message from the client. This
|
||||
// is wrapped to intercept streaming RPC requests.
|
||||
func (w *serverStreamWrapper) RecvMsg(m interface{}) error {
|
||||
err := w.ServerStream.RecvMsg(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msg, err := NewMessageInterceptionRequest(
|
||||
w.ServerStream.Context(), TypeRequest, true, w.fullMethod,
|
||||
m,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return w.interceptor.acceptRequest(msg)
|
||||
}
|
||||
|
Reference in New Issue
Block a user