One of the major system engineering tasks I have encountered so far in my career was to implement an asynchronous communication mechanism that respects the following requirements:
- the system must be scalable (spawning multiple instances must be easy to integrate)
- the communication must be asynchronous (a service bus is the medium used to publish notifications)
- the system must be stateless (we can have multiple SignalR clients on different servers and publishers on other servers, and we must ensure there is no coupling between them)
Requirements:
- Sending a user notification: we target a specific user, and the message must be received on every device that user is connected from.
- Tracking a correlation id from publishing to the service bus until completion, when the logged-in user is notified back on the connected device.
My tech stack is the following:
SignalR, RabbitMQ, Redis
To read more about using Redis with SignalR, see https://docs.microsoft.com/en-us/aspnet/core/signalr/redis-backplane?view=aspnetcore-5.0
There are ways of creating a faster and more scalable system using Project Orleans, and I am considering creating a sample of that in the future.
Design
The following sketch represents the idea behind the library
Details
The steps in the sketch are the following:
- Subscription
  - The user logs in, then connects (to any instance) to the SignalR hub. On the hub, the logged-in user has both a connection id and a user id.
  - The SignalR subscription has 2 roles:
    - it notifies the user on a specified channel
    - it listens to Redis (using the pub/sub mechanism) on a channel for that user only
- Publisher
  - To send a notification, we publish a message to the service bus.
  - A worker listens to that queue, gets the message and pushes it to the Redis channel of the targeted user.
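The steps above can be sketched with in-memory stand-ins. This is only a minimal model of the flow, not the library's code: `InMemoryPubSub` plays the role of Redis pub/sub, `HubInstance` the role of one SignalR server, and `Worker` the role of the service bus consumer. All of these names, and the `notifications:{userId}` channel format, are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for Redis pub/sub: fire and forget, nothing is stored.
public sealed class InMemoryPubSub
{
    private readonly Dictionary<string, List<Action<string>>> _channels = new();

    public void Subscribe(string channel, Action<string> handler)
    {
        if (!_channels.TryGetValue(channel, out var handlers))
        {
            handlers = new List<Action<string>>();
            _channels[channel] = handlers;
        }
        handlers.Add(handler);
    }

    // Returns how many subscribers received the message; with none, it is simply dropped.
    public int Publish(string channel, string message)
    {
        if (!_channels.TryGetValue(channel, out var handlers)) return 0;
        foreach (var handler in handlers) handler(message);
        return handlers.Count;
    }
}

// Hypothetical stand-in for one SignalR server instance.
public sealed class HubInstance
{
    private readonly InMemoryPubSub _pubSub;

    // Records "connectionId:message" for every message pushed to a client of this instance.
    public List<string> Delivered { get; } = new();

    public HubInstance(InMemoryPubSub pubSub) => _pubSub = pubSub;

    // Assumed channel naming; the real library derives it through its key builder.
    public static string ChannelFor(string userId) => $"notifications:{userId}";

    // Called when a logged-in user connects: listen on that user's channel only.
    public void Connect(string userId, string connectionId) =>
        _pubSub.Subscribe(ChannelFor(userId), message => Delivered.Add($"{connectionId}:{message}"));
}

// Hypothetical stand-in for the worker that consumes the service bus queue.
public sealed class Worker
{
    private readonly InMemoryPubSub _pubSub;

    public Worker(InMemoryPubSub pubSub) => _pubSub = pubSub;

    // Takes messages one by one and pushes each to the channel of the targeted user.
    public int Drain(Queue<(string UserId, string Payload)> bus)
    {
        var delivered = 0;
        while (bus.Count > 0)
        {
            var (userId, payload) = bus.Dequeue();
            delivered += _pubSub.Publish(HubInstance.ChannelFor(userId), payload);
        }
        return delivered;
    }
}
```

In this model, a user connected to two different `HubInstance` objects receives the message on both, while a message for a user with no active connection is dropped. Neither the publisher nor the hub instances know about each other; they only share the channel name.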
Why Redis? Beyond the advantage of speed, I am using its pub/sub mechanism. While this would also have worked with other pub/sub mechanisms, Redis pub/sub is extremely simple and fast, and it is fire-and-forget: no messages are stored, and they are pushed instantly to whichever subscribers are active, if any. Publishing and subscribing to a channel in Redis has a small computation cost, which is exactly what we need.
Why publish to Rabbit first?
Because we don't want to choke the system. We can set a time to live on the messages in Rabbit and process them one after another.
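To make the time-to-live idea concrete, here is a small in-memory model of it. RabbitMQ enforces TTL itself (for example through the `x-message-ttl` queue argument or a per-message expiration), so none of this is needed in a real worker; `TtlQueue` and `QueuedMessage` are hypothetical names used only to show the behaviour.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message shape: who it is for, what it carries, and when it was queued.
public sealed record QueuedMessage(string UserId, string Payload, DateTimeOffset EnqueuedAt);

// Hypothetical in-memory model of a queue with a time to live.
public sealed class TtlQueue
{
    private readonly Queue<QueuedMessage> _messages = new();
    private readonly TimeSpan _timeToLive;

    public TtlQueue(TimeSpan timeToLive) => _timeToLive = timeToLive;

    public void Enqueue(string userId, string payload, DateTimeOffset now) =>
        _messages.Enqueue(new QueuedMessage(userId, payload, now));

    // Processes messages one after another, in arrival order.
    // Messages older than the time to live are discarded instead of handled.
    public int Drain(DateTimeOffset now, Action<QueuedMessage> handle)
    {
        var handled = 0;
        while (_messages.Count > 0)
        {
            var message = _messages.Dequeue();
            if (now - message.EnqueuedAt > _timeToLive) continue; // expired, skip it
            handle(message);
            handled++;
        }
        return handled;
    }
}
```

The point of the model is that a burst of notifications queues up instead of hitting Redis and SignalR all at once, and anything that has waited too long is dropped rather than delivered late.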
Library
You can find the library at:
Example Subscriber
- Create a project with SignalR support.
- Project Structure
- Add authentication and make sure to pass the bearer token forward to the hub (the hub requires authorization).
```csharp
services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
    .AddJwtBearer("Bearer", options =>
    {
        // GenericHttps http client name as defined in appsettings.json using configuredHttp
        options.BackchannelHttpHandler = HttpConfiguredClientFactory.CreateMessageHandler(SettingsConfiguredHttpClients.HTTPS, Configuration);
        options.Authority = Configuration["EndPoints:IdentityAuthority"];
        options.RequireHttpsMetadata = false;
        options.TokenValidationParameters = new Microsoft.IdentityModel.Tokens.TokenValidationParameters
        {
            // TODO: the audience is not being checked, nor is it defined in Identity Server
            // we need to reevaluate the whole process of getting scopes
            // we get the user once in angular with the scopes we want (maybe we can refresh the token?)
            ValidateAudience = false,
            ValidAudience = "comx.notifications.hubapi"
        };
        // for SIGNALR: from the official documentation
        options.Events = new JwtBearerEvents
        {
            OnMessageReceived = context =>
            {
                var accessToken = context.Request.Query["access_token"];
                // If the request is for our hub...
                var path = context.HttpContext.Request.Path;
                if (!string.IsNullOrEmpty(accessToken) && (path.StartsWithSegments(SYSTEM_UPDATE_HUB)))
                {
                    // Read the token out of the query string
                    context.Token = accessToken;
                }
                return Task.CompletedTask;
            }
        };
    });
```
- Register Services
```csharp
// Service registration
services.AddRedisManager<SettingsRedisConnection>(builder =>
{
    builder.AddManager<HubSystemUpdate, IHubSystemUpdate>(options =>
    {
        options.RegisterController<RedisControllerSystemUpdate, HubSystemUpdate, IHubSystemUpdate>();
        options.RegisterRoute<PubSubHubAsActive>(Routes.PubSubHubAsActive);
        options.RegisterRoute<PubSubHubDeleted>(Routes.PubSubHubDeleted);
        options.RegisterRoute<PubSubHubEdited>(Routes.PubSubHubEdited);
        options.RegisterRoute<PubSubHubCreated>(Routes.PubSubHubCreated);
        options.UseKeyBuilder<KeyBuilderSystemUpdate>();
        options.UseNotificationEvents<NotificationEventsLogger>();
    });
});
services.AddSignalR();

// Request pipeline
app.UseEndpoints(endpoints =>
{
    endpoints.MapHub<HubSystemUpdate>(SYSTEM_UPDATE_HUB);
    endpoints.MapHealthChecks("/health");
});
```
RegisterController: this system uses a controller-like approach to handling the various messages published to Redis. Here we register a controller type.
RegisterRoute: this binds a model (the model published to Redis) to a route (a name) that is found in one of the registered controllers.
UseKeyBuilder: a simple class that takes in a user id and a connection id and creates a key (the Redis channel name and the SignalR subscriber key).
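As an illustration of what a key builder does, here is a minimal sketch. The interface, the class name and the key format are all assumptions; the library's real key builder contract (the one `KeyBuilderSystemUpdate` implements) may look different.

```csharp
using System;

// Hypothetical contract, for illustration only.
public interface IExampleKeyBuilder
{
    // Redis channel shared by every connection of one user.
    string ChannelFor(string userId);

    // Key identifying one specific connection (device) of that user.
    string SubscriberKeyFor(string userId, string connectionId);
}

public sealed class ExampleKeyBuilder : IExampleKeyBuilder
{
    private readonly string _prefix;

    public ExampleKeyBuilder(string prefix)
    {
        if (string.IsNullOrWhiteSpace(prefix))
            throw new ArgumentException("A prefix is required.", nameof(prefix));
        _prefix = prefix;
    }

    public string ChannelFor(string userId) => $"{_prefix}:user:{userId}";

    public string SubscriberKeyFor(string userId, string connectionId) =>
        $"{ChannelFor(userId)}:conn:{connectionId}";
}
```

With this sketch, `new ExampleKeyBuilder("systemupdate").ChannelFor("42")` gives `systemupdate:user:42`. Because the subscriber and the publisher both build the channel name from the user id alone, they agree on it without knowing about each other.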
- Redis subscriber controller
All published models will be handled in a controller method (as we registered in startup).
```csharp
public class RedisControllerSystemUpdate : NotificationControllerBase<HubSystemUpdate, IHubSystemUpdate>
{
    public IHubContext<HubSystemUpdate, IHubSystemUpdate> HubContext { get; }
    public ILogger<RedisControllerSystemUpdate> Logger { get; }

    public RedisControllerSystemUpdate(
        IHubContext<HubSystemUpdate, IHubSystemUpdate> hubContext,
        ILogger<RedisControllerSystemUpdate> logger)
    {
        HubContext = hubContext;
        Logger = logger;
    }

    [NotificationRoute(Routes.PubSubHubAsActive)]
    public async Task HandleHubAsActive(PubSubHubAsActive notification)
    {
        try
        {
            IHubSystemUpdate clientHub = GetSubsriberHub(HubContext);
            await clientHub.HubSetAsActive();
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "Error processing redis message PubSubHubAsActive");
        }
    }
}
```
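One way such controller-style routing could work is attribute discovery through reflection. The sketch below is an assumption about the general technique, not the library's implementation: `ExampleRouteAttribute`, `ExampleRouter`, `HubEdited` and `ExampleController` are hypothetical names, and JSON is assumed as the wire format.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text.Json;

// Hypothetical attribute marking a method as the handler for a named route.
[AttributeUsage(AttributeTargets.Method)]
public sealed class ExampleRouteAttribute : Attribute
{
    public string Name { get; }
    public ExampleRouteAttribute(string name) => Name = name;
}

// Hypothetical router: maps a route name to a controller method and its model type.
public sealed class ExampleRouter
{
    private readonly Dictionary<string, (object Controller, MethodInfo Method, Type Model)> _routes = new();

    // Scans the public instance methods for the route attribute.
    public void RegisterController(object controller)
    {
        foreach (var method in controller.GetType().GetMethods(BindingFlags.Instance | BindingFlags.Public))
        {
            var route = method.GetCustomAttribute<ExampleRouteAttribute>();
            var parameters = method.GetParameters();
            if (route is null || parameters.Length != 1) continue;
            _routes[route.Name] = (controller, method, parameters[0].ParameterType);
        }
    }

    // Deserializes the payload into the model type bound to the route and invokes the handler.
    // Returns false when no handler is registered for the route.
    public bool Dispatch(string routeName, string json)
    {
        if (!_routes.TryGetValue(routeName, out var entry)) return false;
        var model = JsonSerializer.Deserialize(json, entry.Model);
        entry.Method.Invoke(entry.Controller, new[] { model });
        return true;
    }
}

// Hypothetical published model and controller.
public sealed record HubEdited(Guid HubId);

public sealed class ExampleController
{
    public List<Guid> Edited { get; } = new();

    [ExampleRoute("hub-edited")]
    public void HandleHubEdited(HubEdited notification) => Edited.Add(notification.HubId);
}
```

The route name travels with the published message, so the subscriber side only needs the name to find the right method and the right model type. In this sketch, an unknown route is ignored rather than treated as an error.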
- The hub
For the SignalR hub, we only need to declare the interface methods (client callbacks); subscribing and unsubscribing are handled behind the scenes. As you can see, the controller methods call the methods we declare here.
```csharp
public interface IHubSystemUpdate
{
    public Task HubSetAsActive();
    public Task HubDeleted(Guid hubId);
    public Task HubEdited(Guid hubId);
    public Task HubCreated(Guid hubId);
}

public class HubSystemUpdate : NotificationHub<HubSystemUpdate, IHubSystemUpdate>
{
    public HubSystemUpdate(INotificationPubSubProvider provider) : base(provider) { }
}
```
The library class NotificationHub implements the subscribe and unsubscribe methods, so you just need to connect to the hub from your front end. To read more about sending bearer tokens from a TypeScript client, see https://docs.microsoft.com/en-us/aspnet/core/signalr/authn-and-authz?view=aspnetcore-5.0
```csharp
[Authorize]
public class NotificationHub<THub, THubActions> : Hub
    where THub : Hub
    where THubActions : class
{
    public NotificationHub(INotificationPubSubProvider provider);
    public override Task OnDisconnectedAsync(Exception exception);
    public void Subscribe();
    public void Unubscribe();
}
```
Example Publisher
- Create a worker project.
- Register the hub system services (for the publisher).
We have a publisher manager and we need to know the routes (which are attached in the message exchange).
```csharp
services.AddNotificationPublisherProvider<SettingsRedisConnection>(builder =>
{
    builder.AddManager("SYSTEMUPDATE", options =>
    {
        options.RegisterRoute<PubSubHubAsActive>(Routes.PubSubHubAsActive);
        options.RegisterRoute<PubSubHubDeleted>(Routes.PubSubHubDeleted);
        options.RegisterRoute<PubSubHubEdited>(Routes.PubSubHubEdited);
        options.RegisterRoute<PubSubHubCreated>(Routes.PubSubHubCreated);
        options.UseKeyBuilder<KeyBuilderSystemUpdate>();
        options.UseNotificationEvents<NotificationEventsLogger>();
    });
});
```
- RabbitMQ
I will not go into detail on how to handle RabbitMQ here, and in fact you can choose any service bus you want; just make sure that you pass events containing the user id for which the message is intended.
My RabbitMQ message handler takes in the INotificationPublisherFactory, which handles delivering messages.
```csharp
public class EventSystemUpdateHandler : IntegrationEventHandler<EventSystemUpdate>
{
    private readonly INotificationPublisherFactory _publisherManager;
    private readonly ILogger<EventSystemUpdateHandler> _logger;

    public EventSystemUpdateHandler(
        INotificationPublisherFactory publisherProvider,
        ILogger<EventSystemUpdateHandler> logger)
    {
        _publisherManager = publisherProvider;
        _logger = logger;
    }

    public override Task<HandlerResult> HandleAsync(EventSystemUpdate @event)
    {
        NotificationPublisherClient publisherManager = _publisherManager.GetPublisherClient("SYSTEMUPDATE");
        NotificationPublisher publisher;
        switch (@event.Type)
        {
            case EventSystemUpdateType.SetHubAsActive:
                SuSetHubAsActive activeModel = (SuSetHubAsActive)EventSystemUpdate.To(@event);
                publisher = publisherManager.GetPublisherFor(activeModel.UserId.ToString());
                publisher.Publish(new PubSubHubAsActive());
                return Task.FromResult(HandlerResult.SetAcknowledged());
```