Sai.MessageHub
3.0.0
dotnet add package Sai.MessageHub --version 3.0.0
A WebSocket message hub implemented as ASP.NET Core middleware. Provides real-time bidirectional communication with comprehensive error handling, thread safety, and type-safe message routing.
Message Format
All messages use UTF-8 encoded text with the format:
type:<data serialized as JSON>
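As a rough illustration of this wire format, the sketch below encodes and decodes a message by splitting on the first colon only. The helper names encodeMessage and decodeMessage are hypothetical and not part of the package; the bundled client performs the equivalent steps inside sendMessage and its onmessage callback.

```javascript
// Hypothetical helpers (not part of the library) for the "type:<json>" wire format.

// Build a wire message. The type must not contain ':' because the first colon
// separates the type from the JSON payload.
function encodeMessage(type, data) {
  if (typeof type !== "string" || type.length === 0 || type.includes(":")) {
    throw new Error("type must be a non-empty string without ':'");
  }
  // Mirror the client: a null or undefined payload is sent as an empty object
  return type + ":" + JSON.stringify(data ?? {});
}

// Split on the FIRST colon only, so colons inside the JSON payload are preserved.
function decodeMessage(text) {
  const index = text.indexOf(":");
  if (index < 0) {
    return null; // not a "type:<json>" message
  }
  return {
    type: text.substring(0, index),
    data: JSON.parse(text.substring(index + 1)),
  };
}

console.log(encodeMessage("counter", 5)); // counter:5
console.log(decodeMessage('chat:"see you at 10:30"'));
```

Because only the first colon is treated as the separator, payloads may freely contain colons, while message type names may not.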
Integration Approaches
- Minimal API: Use WebApplication extension methods for simple scenarios
- Service-based: Inject IMessageHubService for complex background services and controllers
Both approaches can be combined in the same application.
Quick Start
1. Installation
Install the NuGet package or reference the project directly.
2. Program.cs Setup
using Sai.MessageHub;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddMessageHub();
builder.Services.AddHostedService<MyWorker>(); // Example worker that sends a heartbeat to all MessageHub clients every second
var app = builder.Build();
app.UseMessageHub();
// Specific message handlers with strong typing
app.AddMessageReceivedHandler<int>("counter", (context, data) =>
{
context.SendMessageToClient("counter-response", data); // echo back to sender
context.SendMessageToAllClients("last-counter-update", DateTime.UtcNow); // send last update timestamp to all clients
});
app.AddMessageReceivedHandler<string>("chat", (context, message) =>
{
context.SendMessageToAllClients("chat-broadcast", new {
From = context.ConnectionID,
Message = message,
Timestamp = DateTime.UtcNow
});
});
// Fallback handler for unhandled message types (only runs if no specific handler found)
app.AddFallbackMessageReceivedHandler((context, dataJson) =>
{
Console.WriteLine($"Unhandled message type: {context.Type} from {context.ConnectionID}");
});
// All-message handler (runs for every message regardless of specific handlers)
app.AddAllMessageReceivedHandler((context, dataJson) =>
{
Console.WriteLine($"Message received: {context.Type} from {context.ConnectionID}");
});
// Connection lifecycle handlers
app.AddClientConnectedHandler(context =>
{
Console.WriteLine($"Client connected: {context.ConnectionID}");
context.SendMessageToClient("welcome", $"Hello {context.ConnectionID}!");
});
app.AddClientDisconnectedHandler(context =>
Console.WriteLine($"Client disconnected: {context.ConnectionID}"));
app.Run();
3. Example Background Service (MyWorker.cs)
public class MyWorker : BackgroundService
{
private readonly ILogger<MyWorker> _logger;
private readonly IMessageHubService _messageHubService;
public MyWorker(ILogger<MyWorker> logger, IMessageHubService messageHubService)
{
_logger = logger;
_messageHubService = messageHubService;
// Register handlers in the service
_messageHubService.AddClientConnectedHandler(context =>
{
_logger.LogInformation("Client {ConnectionId} connected", context.ConnectionID);
context.SendMessageToClient("server-time", DateTime.UtcNow);
});
_messageHubService.AddMessageReceivedHandler<string>("ping", (context, data) =>
{
_logger.LogInformation("Ping received from {ConnectionId}: {Data}", context.ConnectionID, data);
context.SendMessageToClient("pong", $"Server received: {data}");
});
// Fallback for error handling
_messageHubService.AddFallbackMessageReceivedHandler((context, dataJson) =>
{
_logger.LogWarning("Unhandled message type '{Type}' from {ConnectionId}",
context.Type, context.ConnectionID);
});
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await Task.Delay(1000, stoppingToken);
_messageHubService.SendMessageToAllClients("heartbeat", new {
ServerTime = DateTime.UtcNow,
Uptime = Environment.TickCount64
});
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in background service");
}
}
}
}
4. Client-Side Implementation
<div id="status">Disconnected</div>
<div id="messages"></div>
<button onclick="sendPing()">Send Ping</button>
<button onclick="sendCounter()">Send Counter</button>
<script type="module">
import { MessageHub } from "/js/messageHub.js";
const messageHub = new MessageHub();
const messagesDiv = document.getElementById('messages');
const statusDiv = document.getElementById('status');
// Connection handlers
messageHub.setConnectionClosedHandler(() => {
statusDiv.textContent = 'Disconnected';
statusDiv.style.color = 'red';
});
messageHub.setErrorHandler((error) => {
console.error('WebSocket error:', error);
statusDiv.textContent = 'Error';
statusDiv.style.color = 'red';
});
// Message handlers
messageHub.addMessageHandler("welcome", (data) => {
messagesDiv.innerHTML += `<p><strong>Welcome:</strong> ${data}</p>`;
});
messageHub.addMessageHandler("pong", (data) => {
messagesDiv.innerHTML += `<p><strong>Pong:</strong> ${data}</p>`;
});
messageHub.addMessageHandler("heartbeat", (data) => {
console.log('Heartbeat:', data);
});
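messageHub.addMessageHandler("chat-broadcast", (data) => {
// Use textContent so user-supplied chat text is never parsed as HTML
const entry = document.createElement('p');
entry.textContent = `${data.From}: ${data.Message} (${data.Timestamp})`;
messagesDiv.appendChild(entry);
});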
// Fallback handler for unhandled messages
messageHub.addFallbackMessageHandler((type, data) => {
console.log(`Unhandled message - Type: ${type}, Data:`, data);
});
// All-message handler for logging
messageHub.addAllMessageHandler((type, data) => {
console.log(`Message received - Type: ${type}, Data:`, data);
});
// Connect to server
try {
await messageHub.connect();
statusDiv.textContent = 'Connected';
statusDiv.style.color = 'green';
} catch (error) {
statusDiv.textContent = 'Failed to connect';
statusDiv.style.color = 'red';
console.error('Connection failed:', error);
}
// Global functions for buttons
let counter = 0;
window.sendPing = () => messageHub.sendMessage("ping", "Hello from client!");
window.sendCounter = () => messageHub.sendMessage("counter", ++counter);
</script>
JavaScript Client (messageHub.js)
/* MessageHub v3.0 */
export class MessageHub {
constructor() {
this.websocket = null;
this.handlers = [];
this.fallbackHandlers = [];
this.allMessageHandlers = [];
this.closedHandler = null;
this.errorHandler = null;
this.open = false;
// Worker-based timer properties
this.worker = null;
this.workerIntervals = new Map();
this.workerBlobUrl = null;
}
connect() {
if (window.location.protocol.startsWith("https"))
return this.connectUri("wss://" + window.location.host);
else
return this.connectUri("ws://" + window.location.host);
}
connectUri(wsUri) {
return new Promise((resolve, reject) => {
this.websocket = new WebSocket(wsUri);
this.websocket.onopen = evt => {
this.open = true;
resolve();
};
this.websocket.onclose = evt => {
this.websocket.close();
if (this.open && this.closedHandler != null) {
this.closedHandler();
}
};
this.websocket.onmessage = evt => {
const dataIndex = evt.data.indexOf(":");
if (dataIndex < 0)
return;
const type = evt.data.substring(0, dataIndex);
const jsonData = evt.data.substring(dataIndex + 1);
// Parse JSON once and handle errors
let parsedData;
try {
parsedData = JSON.parse(jsonData);
} catch (error) {
console.error(`Failed to parse JSON for message type '${type}':`, error);
console.error('Invalid JSON data:', jsonData);
// Notify error handlers if available
if (this.errorHandler) {
this.errorHandler({
type: 'json-parse-error',
message: `Failed to parse JSON for message type '${type}'`,
error: error,
data: jsonData
});
}
return; // Skip processing this message
}
// 1. Execute specific handlers for this message type
let handled = false;
for (let i = 0; i < this.handlers.length; i++) {
const handler = this.handlers[i];
if (handler.type == type) {
try {
// Create a deep clone for each handler to prevent shared mutation
const clonedData = structuredClone(parsedData);
handler.delegate(clonedData);
handled = true;
} catch (error) {
console.error(`Handler error for message type '${type}':`, error);
if (this.errorHandler) {
this.errorHandler({
type: 'handler-error',
message: `Handler error for message type '${type}'`,
error: error
});
}
}
}
}
// 2. Execute fallback handlers only if no specific handlers were found
if (!handled) {
for (let i = 0; i < this.fallbackHandlers.length; i++) {
const handler = this.fallbackHandlers[i];
try {
// Create a deep clone for each fallback handler to prevent shared mutation
const clonedData = structuredClone(parsedData);
handler(type, clonedData);
} catch (error) {
console.error(`Fallback handler error for message type '${type}':`, error);
if (this.errorHandler) {
this.errorHandler({
type: 'handler-error',
message: `Fallback handler error for message type '${type}'`,
error: error
});
}
}
}
}
// 3. Always execute all-message handlers
for (let i = 0; i < this.allMessageHandlers.length; i++) {
const handler = this.allMessageHandlers[i];
try {
// Create a deep clone for each all-message handler to prevent shared mutation
const clonedData = structuredClone(parsedData);
handler(type, clonedData);
} catch (error) {
console.error(`All-message handler error for message type '${type}':`, error);
if (this.errorHandler) {
this.errorHandler({
type: 'handler-error',
message: `All-message handler error for message type '${type}'`,
error: error
});
}
}
}
};
this.websocket.onerror = evt => {
if (!this.open) {
reject(`Failed to connect to web socket: ${wsUri}`);
}
else if (this.errorHandler != null) {
// Pass the event through so handlers declared as (error) => ... receive it
this.errorHandler(evt);
}
};
});
}
close() {
this.websocket.close();
this._cleanupWorker();
}
sendMessage(type, data) {
if (type.includes(":"))
throw new Error("'type' is invalid. Character ':' is not allowed.");
if (data == null)
data = {};
const json = JSON.stringify(data);
const message = type + ":" + json;
this.websocket.send(message);
}
addMessageHandler(type, delegate) {
this.handlers.push({ type: type, delegate: delegate });
}
addFallbackMessageHandler(delegate) {
this.fallbackHandlers.push(delegate);
}
addAllMessageHandler(delegate) {
this.allMessageHandlers.push(delegate);
}
setConnectionClosedHandler(delegate) {
this.closedHandler = delegate;
}
setErrorHandler(delegate) {
this.errorHandler = delegate;
}
// Worker-based timer methods (not subject to browser throttling)
_createWorkerCode() {
return `
const intervals = new Map();
self.onmessage = function(e) {
const { action, name, delay } = e.data;
switch (action) {
case 'setInterval':
// Clear existing interval if any
if (intervals.has(name)) {
clearInterval(intervals.get(name));
}
// Create new interval
const intervalId = setInterval(() => {
self.postMessage({ name: name, time: Date.now() });
}, delay);
intervals.set(name, intervalId);
break;
case 'clearInterval':
if (intervals.has(name)) {
clearInterval(intervals.get(name));
intervals.delete(name);
}
break;
case 'clearAll':
intervals.forEach(intervalId => clearInterval(intervalId));
intervals.clear();
break;
}
};
`;
}
_initializeWorker() {
if (this.worker) return;
try {
// Create worker from blob URL
const workerCode = this._createWorkerCode();
const blob = new Blob([workerCode], { type: 'application/javascript' });
this.workerBlobUrl = URL.createObjectURL(blob);
this.worker = new Worker(this.workerBlobUrl);
// Handle messages from worker
this.worker.onmessage = (e) => {
const { name } = e.data;
const callback = this.workerIntervals.get(name);
if (callback) {
try {
callback();
} catch (error) {
console.error(`Worker interval callback error for '${name}':`, error);
if (this.errorHandler) {
this.errorHandler({
type: 'worker-interval-error',
message: `Worker interval callback error for '${name}'`,
error: error
});
}
}
}
};
this.worker.onerror = (error) => {
console.error('Worker error:', error);
if (this.errorHandler) {
this.errorHandler({
type: 'worker-error',
message: 'Web Worker error',
error: error
});
}
};
} catch (error) {
console.error('Failed to create worker:', error);
throw error;
}
}
setWorkerInterval(name, callback, delay) {
if (!name || typeof name !== 'string') {
throw new Error('Interval name must be a non-empty string');
}
if (typeof callback !== 'function') {
throw new Error('Callback must be a function');
}
if (typeof delay !== 'number' || delay < 0) {
throw new Error('Delay must be a non-negative number');
}
// Initialize worker on first use
if (!this.worker) {
this._initializeWorker();
}
// Store callback
this.workerIntervals.set(name, callback);
// Send message to worker
this.worker.postMessage({ action: 'setInterval', name, delay });
}
clearWorkerInterval(name) {
if (!this.worker) return;
// Remove callback
this.workerIntervals.delete(name);
// Send message to worker
this.worker.postMessage({ action: 'clearInterval', name });
}
clearAllWorkerIntervals() {
if (!this.worker) return;
// Clear all callbacks
this.workerIntervals.clear();
// Send message to worker
this.worker.postMessage({ action: 'clearAll' });
}
_cleanupWorker() {
if (this.worker) {
this.clearAllWorkerIntervals();
this.worker.terminate();
this.worker = null;
}
if (this.workerBlobUrl) {
URL.revokeObjectURL(this.workerBlobUrl);
this.workerBlobUrl = null;
}
this.workerIntervals.clear();
}
}
Worker-Based Timers
The JavaScript client includes Web Worker-based timers to work around browser timer throttling.
The Problem
When a browser tab is inactive (minimized, in the background, or while the Windows session is locked), browsers throttle JavaScript timers such as setInterval() to conserve resources. An interval scheduled to run every second can slow down to roughly once per minute, which breaks real-time functionality.
The Solution
MessageHub provides worker-based timer methods that run in a separate Web Worker thread, where timers are generally not subject to the throttling that browsers apply to background pages:
// Instead of regular setInterval (subject to throttling):
setInterval(() => {
messageHub.sendMessage("ping", { time: Date.now() });
}, 1000);
// Use worker-based interval (consistent timing):
messageHub.setWorkerInterval('ping', () => {
messageHub.sendMessage("ping", { time: Date.now() });
}, 1000);
// Clear when done
messageHub.clearWorkerInterval('ping');
API Reference
setWorkerInterval(name, callback, delay)
- Creates a named interval in a Web Worker
- name: Unique identifier for the interval
- callback: Function to execute on each interval
- delay: Interval delay in milliseconds
clearWorkerInterval(name)
- Clears a specific interval by name
clearAllWorkerIntervals()
- Clears all worker intervals
Features
- Lazy initialization - Worker only created when first interval is set
- Named intervals for easy management
- Automatic cleanup on MessageHub close
- Error handling integration with MessageHub's error handler
- Zero configuration required
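The named-interval bookkeeping behind these features can be sketched as follows. This is a simplified illustration that runs on the main thread with plain setInterval, so it does not avoid throttling by itself; the class name NamedIntervals is hypothetical. The worker script shown in messageHub.js keeps a similar name-to-timer map inside the Web Worker.

```javascript
// Hypothetical sketch of name-keyed interval bookkeeping (not the library's code).
class NamedIntervals {
  constructor() {
    this.ids = new Map(); // name -> timer id
  }

  // Setting a name that already exists replaces the old timer
  // instead of stacking a second one.
  set(name, callback, delay) {
    this.clear(name);
    this.ids.set(name, setInterval(callback, delay));
  }

  clear(name) {
    if (this.ids.has(name)) {
      clearInterval(this.ids.get(name));
      this.ids.delete(name);
    }
  }

  clearAll() {
    this.ids.forEach(id => clearInterval(id));
    this.ids.clear();
  }
}

const timers = new NamedIntervals();
timers.set("ping", () => console.log("ping"), 1000);
timers.set("ping", () => console.log("ping (faster)"), 500); // replaces the first timer
console.log(timers.ids.size); // 1
timers.clearAll(); // stop all timers so the script can exit
```

Keying timers by name means callers never need to hold on to timer IDs, and re-registering a name cannot leak a forgotten interval.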
This is particularly useful for:
- Real-time dashboards that need consistent updates
- Chat applications with typing indicators
- Game state synchronization
- Any application requiring reliable periodic messaging
Error Handling & Validation
The MessageHub includes comprehensive error handling and validation:
Server-Side Validation
- Connection ID Validation: Non-null, non-empty, non-whitespace
- Message Type Validation: No colons, not empty
- Data Validation: Non-null objects required
- Handler Validation: All handlers must be non-null
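To make these rules concrete, here is a minimal client-side sketch that applies the same checks before a message is sent. The function validateOutgoing is hypothetical; it only mirrors the rules listed above and is not the server's actual validation code.

```javascript
// Hypothetical pre-send checks mirroring the listed rules (not the server's code).
function validateOutgoing(connectionId, type, data) {
  const errors = [];
  // Connection ID: non-null, non-empty, non-whitespace
  if (typeof connectionId !== "string" || connectionId.trim() === "") {
    errors.push("connection ID must be a non-empty, non-whitespace string");
  }
  // Message type: not empty, no colons (the colon separates type from JSON)
  if (typeof type !== "string" || type === "" || type.includes(":")) {
    errors.push("message type must be non-empty and contain no ':'");
  }
  // Data: must not be null
  if (data === null || data === undefined) {
    errors.push("data must not be null");
  }
  return errors;
}

console.log(validateOutgoing("abc123", "chat", "hello")); // []
console.log(validateOutgoing("   ", "bad:type", null).length); // 3
```

Returning a list of problems rather than throwing on the first one makes it easier to log every violation at once.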
Automatic Error Recovery
- WebSocket Exceptions: Automatic client cleanup on connection failures
- JSON Parsing: Graceful handling of malformed JSON with detailed logging
- Memory Management: Automatic cleanup of disconnected clients
- Thread Safety: Concurrent handler registration and execution
Logging
All errors are logged with structured logging including:
- Connection IDs for traceability
- Message types and data for debugging
- Exception details for troubleshooting
Architecture
Handler Types
- Specific Handlers: Handle messages of a specific type
- Fallback Handlers: Handle unmatched message types (only if no specific handler found)
- All-Message Handlers: Handle every message regardless of type
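The order in which the three handler types run can be sketched with a small dispatcher. The function createDispatcher and its method names are hypothetical and exist only for this illustration; the ordering follows the numbered steps in the bundled JavaScript client's onmessage callback.

```javascript
// Hypothetical dispatcher illustrating the three handler tiers (not the library's code).
function createDispatcher() {
  const specific = new Map(); // type -> array of handlers
  const fallback = [];
  const all = [];

  return {
    on(type, handler) {
      if (!specific.has(type)) specific.set(type, []);
      specific.get(type).push(handler);
    },
    onFallback(handler) { fallback.push(handler); },
    onAll(handler) { all.push(handler); },
    dispatch(type, data) {
      const matched = specific.get(type) ?? [];
      // 1. Specific handlers registered for this type
      matched.forEach(h => h(data));
      // 2. Fallback handlers, only when no specific handler matched
      if (matched.length === 0) fallback.forEach(h => h(type, data));
      // 3. All-message handlers, always
      all.forEach(h => h(type, data));
    },
  };
}

const log = [];
const hub = createDispatcher();
hub.on("chat", data => log.push(`specific:${data}`));
hub.onFallback(type => log.push(`fallback:${type}`));
hub.onAll(type => log.push(`all:${type}`));

hub.dispatch("chat", "hi");
hub.dispatch("unknown", 1);
console.log(log);
// [ 'specific:hi', 'all:chat', 'fallback:unknown', 'all:unknown' ]
```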
Thread Safety
- Concurrent collections for handler storage
- Lock-free snapshots for handler execution
- Thread-safe message queuing with channels
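The snapshot idea can be illustrated in single-threaded JavaScript, where the comparable hazard is a handler that registers another handler while a dispatch is in progress. This is only an analogy under that assumption: the class HandlerRegistry is hypothetical, and how the .NET implementation takes its snapshots is not shown in this document.

```javascript
// Hypothetical registry: dispatch iterates a snapshot, so handlers added
// during a dispatch only take effect from the next message onward.
class HandlerRegistry {
  constructor() {
    this.handlers = [];
  }

  add(handler) {
    // Copy-on-write: build a new array instead of mutating the one
    // that a dispatch in progress may still be iterating.
    this.handlers = [...this.handlers, handler];
  }

  dispatch(message) {
    const snapshot = this.handlers; // stable view for this dispatch
    let calls = 0;
    for (const handler of snapshot) {
      handler(message);
      calls++;
    }
    return calls;
  }
}

const registry = new HandlerRegistry();
registry.add(() => registry.add(() => {})); // registers another handler while dispatching
console.log(registry.dispatch("first"));  // 1
console.log(registry.dispatch("second")); // 2
```

Readers never see a half-updated list, and no lock is held while handlers run.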
Target Frameworks
- net8.0 is compatible. net9.0, net10.0, and the platform-specific variants of net8.0, net9.0, and net10.0 (android, browser, ios, maccatalyst, macos, tvos, windows) were computed as compatible.
Dependencies (net8.0)
- Microsoft.AspNetCore.Http.Abstractions (>= 2.3.0)
- Microsoft.AspNetCore.WebSockets (>= 2.3.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 6.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 8.0.3)