From 5181f100ed5cf288ee1e8b9a5d99d54db82d2212 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 12 Oct 2018 17:08:14 +0200 Subject: [PATCH] subscribe: add new subscribe package This commit creates a new package 'subscribe', that exposes a common Client-Server subscription system, that can be shared among packages. --- subscribe/subscribe.go | 216 ++++++++++++++++++++++++++++++++++++ subscribe/subscribe_test.go | 110 ++++++++++++++++++ 2 files changed, 326 insertions(+) create mode 100644 subscribe/subscribe.go create mode 100644 subscribe/subscribe_test.go diff --git a/subscribe/subscribe.go b/subscribe/subscribe.go new file mode 100644 index 000000000..1d413031c --- /dev/null +++ b/subscribe/subscribe.go @@ -0,0 +1,216 @@ +package subscribe + +import ( + "errors" + "sync" + "sync/atomic" + + "github.com/lightningnetwork/lnd/queue" +) + +// ErrServerShuttingDown is an error returned in case the server is in the +// process of shutting down. +var ErrServerShuttingDown = errors.New("subscription server shutting down") + +// Client is used to get notified about updates the caller has subscribed to, +type Client struct { + // Cancel should be called in case the client no longer wants to + // subscribe for updates from the server. + Cancel func() + + updates *queue.ConcurrentQueue + quit chan struct{} +} + +// Updates returns a read-only channel where the updates the client has +// subscribed to will be delivered. +func (c *Client) Updates() <-chan interface{} { + return c.updates.ChanOut() +} + +// Quit is a channel that will be closed in case the server decides to no +// longer deliver updates to this client. +func (c *Client) Quit() <-chan struct{} { + return c.quit +} + +// Server is a struct that manages a set of subscriptions and their +// corresponding clients. Any update will be delivered to all active clients. +type Server struct { + clientCounter uint64 // To be used atomically. + + started uint32 // To be used atomically. + stopped uint32 // To be used atomically. + + clients map[uint64]*Client + clientUpdates chan *clientUpdate + + updates chan interface{} + + quit chan struct{} + wg sync.WaitGroup +} + +// clientUpdate is an internal message sent to the subscriptionHandler to +// either register a new client for subscription or cancel an existing +// subscription. +type clientUpdate struct { + // cancel indicates if the update to the client is cancelling an + // existing client's subscription. If not then this update will be to + // subscribe a new client. + cancel bool + + // clientID is the unique identifier for this client. Any further + // updates (deleting or adding) to this notification client will be + // dispatched according to the target clientID. + clientID uint64 + + // client is the new client that will receive updates. Will be nil in + // case this is a cancallation update. + client *Client +} + +// NewServer returns a new Server. +func NewServer() *Server { + return &Server{ + clients: make(map[uint64]*Client), + clientUpdates: make(chan *clientUpdate), + updates: make(chan interface{}), + quit: make(chan struct{}), + } +} + +// Start starts the Server, making it ready to accept subscriptions and +// updates. +func (s *Server) Start() error { + if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { + return nil + } + + s.wg.Add(1) + go s.subscriptionHandler() + + return nil +} + +// Stop stops the server. +func (s *Server) Stop() error { + if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { + return nil + } + + close(s.quit) + s.wg.Wait() + + return nil +} + +// Subscribe returns a Client that will receive updates any time the Server is +// made aware of a new event. +func (s *Server) Subscribe() (*Client, error) { + // We'll first atomically obtain the next ID for this client from the + // incrementing client ID counter. + clientID := atomic.AddUint64(&s.clientCounter, 1) + + // Create the client that will be returned. The Cancel method is + // populated to send the cancellation intent to the + // subscriptionHandler. + client := &Client{ + updates: queue.NewConcurrentQueue(20), + quit: make(chan struct{}), + Cancel: func() { + select { + case s.clientUpdates <- &clientUpdate{ + cancel: true, + clientID: clientID, + }: + case <-s.quit: + return + } + }, + } + + select { + case s.clientUpdates <- &clientUpdate{ + cancel: false, + clientID: clientID, + client: client, + }: + case <-s.quit: + return nil, ErrServerShuttingDown + } + + return client, nil +} + +// SendUpdate is called to send the passed update to all currently active +// subscription clients. +func (s *Server) SendUpdate(update interface{}) error { + + select { + case s.updates <- update: + return nil + case <-s.quit: + return ErrServerShuttingDown + } +} + +// subscriptionHandler is the main handler for the Server. It will handle +// incoming updates and subscriptions, and forward the incoming updates to the +// registered clients. +// +// NOTE: MUST be run as a goroutine. +func (s *Server) subscriptionHandler() { + defer s.wg.Done() + + for { + select { + + // If a client update is received, the either a new + // subscription becomes active, or we cancel and existing one. + case update := <-s.clientUpdates: + clientID := update.clientID + + // In case this is a cancellation, stop the client's + // underlying queue, and remove the client from the set + // of active subscription clients. + if update.cancel { + client, ok := s.clients[update.clientID] + if ok { + client.updates.Stop() + close(client.quit) + delete(s.clients, clientID) + } + + continue + } + + // If this was not a cancellation, start the underlying + // queue and add the client to our set of subscription + // clients. It will be notified about any new updates + // the server receives. + update.client.updates.Start() + s.clients[update.clientID] = update.client + + // A new update was received, forward it to all active clients. + case upd := <-s.updates: + for _, client := range s.clients { + select { + case client.updates.ChanIn() <- upd: + case <-client.quit: + case <-s.quit: + return + } + } + + // In case the server is shutting down, stop the clients and + // close the quit channels to notify them. + case <-s.quit: + for _, client := range s.clients { + client.updates.Stop() + close(client.quit) + } + return + } + } +} diff --git a/subscribe/subscribe_test.go b/subscribe/subscribe_test.go new file mode 100644 index 000000000..b55f56daa --- /dev/null +++ b/subscribe/subscribe_test.go @@ -0,0 +1,110 @@ +package subscribe_test + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/subscribe" +) + +// TestSubscribe tests that the subscription clients receive the updates sent +// to them after they subscribe, and that cancelled clients don't get more +// updates. +func TestSubscribe(t *testing.T) { + t.Parallel() + + server := subscribe.NewServer() + if err := server.Start(); err != nil { + t.Fatalf("unable to start server") + } + + const numClients = 300 + const numUpdates = 1000 + + var clients [numClients]*subscribe.Client + + // Start by registering two thirds the clients. + for i := 0; i < numClients*2/3; i++ { + c, err := server.Subscribe() + if err != nil { + t.Fatalf("unable to subscribe: %v", err) + } + + clients[i] = c + } + + // Send half the updates. + for i := 0; i < numUpdates/2; i++ { + if err := server.SendUpdate(i); err != nil { + t.Fatalf("unable to send update") + } + } + + // Register the rest of the clients. + for i := numClients * 2 / 3; i < numClients; i++ { + c, err := server.Subscribe() + if err != nil { + t.Fatalf("unable to subscribe: %v", err) + } + + clients[i] = c + } + + // Cancel one third of the clients. + for i := 0; i < numClients/3; i++ { + clients[i].Cancel() + } + + // Send the rest of the updates. + for i := numUpdates / 2; i < numUpdates; i++ { + if err := server.SendUpdate(i); err != nil { + t.Fatalf("unable to send update") + } + } + + // Now ensure the clients got the updates we expect. + for i, c := range clients { + + var from, to int + switch { + + // We expect the first third of the clients to quit, since they + // were cancelled. + case i < numClients/3: + select { + case <-c.Quit(): + continue + case <-time.After(1 * time.Second): + t.Fatalf("cancelled client %v did not quit", i) + } + + // The next third should receive all updates. + case i < numClients*2/3: + from = 0 + to = numUpdates + + // And finally the last third should receive the last half of + // the updates. + default: + from = numUpdates / 2 + to = numUpdates + } + + for cnt := from; cnt < to; cnt++ { + select { + case upd := <-c.Updates(): + j := upd.(int) + if j != cnt { + t.Fatalf("expected %v, got %v, for "+ + "client %v", cnt, j, i) + } + + case <-time.After(1 * time.Second): + t.Fatalf("did not receive expected update %v "+ + "for client %v", cnt, i) + } + } + + } + +}