Sai.MessageHub 3.0.0

dotnet add package Sai.MessageHub --version 3.0.0
                    
NuGet\Install-Package Sai.MessageHub -Version 3.0.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Sai.MessageHub" Version="3.0.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Sai.MessageHub" Version="3.0.0" />
                    
Directory.Packages.props
<PackageReference Include="Sai.MessageHub" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Sai.MessageHub --version 3.0.0
                    
#r "nuget: Sai.MessageHub, 3.0.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Sai.MessageHub@3.0.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Sai.MessageHub&version=3.0.0
                    
Install as a Cake Addin
#tool nuget:?package=Sai.MessageHub&version=3.0.0
                    
Install as a Cake Tool

Sai.MessageHub

A WebSocket message hub implemented as ASP.NET Core middleware. Provides real-time bidirectional communication with comprehensive error handling, thread safety, and type-safe message routing.

Message Format

All messages use UTF-8 encoded text with the format:

type:<data serialized as JSON>

Integration Approaches

  1. Minimal API: Use WebApplication extension methods for simple scenarios
  2. Service-based: Inject IMessageHubService for complex background services and controllers

Both approaches can be combined in the same application.

Quick Start

1. Installation

Install the NuGet package or reference the project directly.

2. Program.cs Setup

using Sai.MessageHub;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddMessageHub();
builder.Services.AddHostedService<MyWorker>(); // Example worker which sends a heartbeat all MessageHub clients every second

var app = builder.Build();

app.UseMessageHub();

// Specific message handlers with strong typing
app.AddMessageReceivedHandler<int>("counter", (context, data) =>
{
    context.SendMessageToClient("counter-response", data); // echo back to sender
    context.SendMessageToAllClients("last-counter-update", DateTime.UtcNow); // send last update timestamp to all clients
});

app.AddMessageReceivedHandler<string>("chat", (context, message) =>
{
    context.SendMessageToAllClients("chat-broadcast", new { 
        From = context.ConnectionID, 
        Message = message,
        Timestamp = DateTime.UtcNow 
    });
});

// Fallback handler for unhandled message types (only runs if no specific handler found)
app.AddFallbackMessageReceivedHandler((context, dataJson) =>
{
    Console.WriteLine($"Unhandled message type: {context.Type} from {context.ConnectionID}");
});

// All-message handler (runs for every message regardless of specific handlers)
app.AddAllMessageReceivedHandler((context, dataJson) =>
{
    Console.WriteLine($"Message received: {context.Type} from {context.ConnectionID}");
});

// Connection lifecycle handlers
app.AddClientConnectedHandler(context => 
{
    Console.WriteLine($"Client connected: {context.ConnectionID}");
    context.SendMessageToClient("welcome", $"Hello {context.ConnectionID}!");
});

app.AddClientDisconnectedHandler(context => 
    Console.WriteLine($"Client disconnected: {context.ConnectionID}"));

app.Run();

3. Example Background Service (MyWorker.cs)

public class MyWorker : BackgroundService
{
    private readonly ILogger<MyWorker> _logger;
    private readonly IMessageHubService _messageHubService;

    public MyWorker(ILogger<MyWorker> logger, IMessageHubService messageHubService)
    {
        _logger = logger;
        _messageHubService = messageHubService;

        // Register handlers in the service
        _messageHubService.AddClientConnectedHandler(context =>
        {
            _logger.LogInformation("Client {ConnectionId} connected", context.ConnectionID);
            context.SendMessageToClient("server-time", DateTime.UtcNow);
        });

        _messageHubService.AddMessageReceivedHandler<string>("ping", (context, data) =>
        {
            _logger.LogInformation("Ping received from {ConnectionId}: {Data}", context.ConnectionID, data);
            context.SendMessageToClient("pong", $"Server received: {data}");
        });

        // Fallback for error handling
        _messageHubService.AddFallbackMessageReceivedHandler((context, dataJson) =>
        {
            _logger.LogWarning("Unhandled message type '{Type}' from {ConnectionId}", 
                context.Type, context.ConnectionID);
        });
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(1000, stoppingToken);
                _messageHubService.SendMessageToAllClients("heartbeat", new { 
                    ServerTime = DateTime.UtcNow,
                    Uptime = Environment.TickCount64 
                });
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in background service");
            }
        }
    }
}

4. Client-Side Implementation

<div id="status">Disconnected</div>
<div id="messages"></div>
<button onclick="sendPing()">Send Ping</button>
<button onclick="sendCounter()">Send Counter</button>

<script type="module">
    import { MessageHub } from "/js/messageHub.js";

    const messageHub = new MessageHub();
    const messagesDiv = document.getElementById('messages');
    const statusDiv = document.getElementById('status');

    // Connection handlers
    messageHub.setConnectionClosedHandler(() => {
        statusDiv.textContent = 'Disconnected';
        statusDiv.style.color = 'red';
    });

    messageHub.setErrorHandler((error) => {
        console.error('WebSocket error:', error);
        statusDiv.textContent = 'Error';
        statusDiv.style.color = 'red';
    });

    // Message handlers
    messageHub.addMessageHandler("welcome", (data) => {
        messagesDiv.innerHTML += `<p><strong>Welcome:</strong> ${data}</p>`;
    });

    messageHub.addMessageHandler("pong", (data) => {
        messagesDiv.innerHTML += `<p><strong>Pong:</strong> ${data}</p>`;
    });

    messageHub.addMessageHandler("heartbeat", (data) => {
        console.log('Heartbeat:', data);
    });

    messageHub.addMessageHandler("chat-broadcast", (data) => {
        messagesDiv.innerHTML += `<p><strong>${data.From}:</strong> ${data.Message} <small>(${data.Timestamp})</small></p>`;
    });

    // Fallback handler for unhandled messages
    messageHub.addFallbackMessageHandler((type, data) => {
        console.log(`Unhandled message - Type: ${type}, Data:`, data);
    });

    // All-message handler for logging
    messageHub.addAllMessageHandler((type, data) => {
        console.log(`Message received - Type: ${type}, Data:`, data);
    });

    // Connect to server
    try {
        await messageHub.connect();
        statusDiv.textContent = 'Connected';
        statusDiv.style.color = 'green';
    } catch (error) {
        statusDiv.textContent = 'Failed to connect';
        statusDiv.style.color = 'red';
        console.error('Connection failed:', error);
    }

    // Global functions for buttons
    let counter = 0;
    window.sendPing = () => messageHub.sendMessage("ping", "Hello from client!");
    window.sendCounter = () => messageHub.sendMessage("counter", ++counter);
</script>

JavaScript Client (messageHub.js)

/* MessageHub v3.0 */
export class MessageHub {

    constructor() {
        this.websocket = null;
        this.handlers = [];
        this.fallbackHandlers = [];
        this.allMessageHandlers = [];
        this.closedHandler = null;
        this.errorHandler = null;
        this.open = false;
        
        // Worker-based timer properties
        this.worker = null;
        this.workerIntervals = new Map();
        this.workerBlobUrl = null;
    }

    connect() {
        if (window.location.protocol.startsWith("https"))
            return this.connectUri("wss://" + window.location.host);
        else
            return this.connectUri("ws://" + window.location.host);
    }

    connectUri(wsUri) {
        return new Promise((resolve, reject) => {
            this.websocket = new WebSocket(wsUri);

            this.websocket.onopen = evt => {
                this.open = true;
                resolve();
            };

            this.websocket.onclose = evt => {
                this.websocket.close();
                if (this.open && this.closedHandler != null) {
                    this.closedHandler();
                }
            };

            this.websocket.onmessage = evt => {
                const dataIndex = evt.data.indexOf(":");
                if (dataIndex < 0)
                    return;
                const type = evt.data.substring(0, dataIndex);
                const jsonData = evt.data.substring(dataIndex + 1);
                
                // Parse JSON once and handle errors
                let parsedData;
                try {
                    parsedData = JSON.parse(jsonData);
                } catch (error) {
                    console.error(`Failed to parse JSON for message type '${type}':`, error);
                    console.error('Invalid JSON data:', jsonData);
                    
                    // Notify error handlers if available
                    if (this.onerror) {
                        this.onerror({
                            type: 'json-parse-error',
                            message: `Failed to parse JSON for message type '${type}'`,
                            error: error,
                            data: jsonData
                        });
                    }
                    return; // Skip processing this message
                }
                
                // 1. Execute specific handlers for this message type
                let handled = false;
                for (let i = 0; i < this.handlers.length; i++) {
                    const handler = this.handlers[i];
                    if (handler.type == type) {
                        try {
                            // Create a deep clone for each handler to prevent shared mutation
                            const clonedData = structuredClone(parsedData);
                            handler.delegate(clonedData);
                            handled = true;
                        } catch (error) {
                            console.error(`Handler error for message type '${type}':`, error);
                            if (this.onerror) {
                                this.onerror({
                                    type: 'handler-error',
                                    message: `Handler error for message type '${type}'`,
                                    error: error
                                });
                            }
                        }
                    }
                }

                // 2. Execute fallback handlers only if no specific handlers were found
                if (!handled) {
                    for (let i = 0; i < this.fallbackHandlers.length; i++) {
                        const handler = this.fallbackHandlers[i];
                        try {
                            // Create a deep clone for each fallback handler to prevent shared mutation
                            const clonedData = structuredClone(parsedData);
                            handler(type, clonedData);
                        } catch (error) {
                            console.error(`Fallback handler error for message type '${type}':`, error);
                            if (this.onerror) {
                                this.onerror({
                                    type: 'handler-error',
                                    message: `Fallback handler error for message type '${type}'`,
                                    error: error
                                });
                            }
                        }
                    }
                }

                // 3. Always execute all-message handlers
                for (let i = 0; i < this.allMessageHandlers.length; i++) {
                    const handler = this.allMessageHandlers[i];
                    try {
                        // Create a deep clone for each all-message handler to prevent shared mutation
                        const clonedData = structuredClone(parsedData);
                        handler(type, clonedData);
                    } catch (error) {
                        console.error(`All-message handler error for message type '${type}':`, error);
                        if (this.onerror) {
                            this.onerror({
                                type: 'handler-error',
                                message: `All-message handler error for message type '${type}'`,
                                error: error
                            });
                        }
                    }
                }

            };

            this.websocket.onerror = evt => {
                if (!this.open) {
                    reject(`Failed to connect to web socket: ${wsUri}`);
                }
                else if (this.errorHandler != null) {
                    this.errorHandler();
                }
            };
        });
    }

    close() {
        this.websocket.close();
        this._cleanupWorker();
    }

    sendMessage(type, data) {
        if (type.includes(":"))
            throw new Error("'type' is invalid. Character ':' is not allowed.")
        if (data == null)
            data = {};
        const json = JSON.stringify(data);
        const message = type + ":" + json;
        this.websocket.send(message);
    }

    addMessageHandler(type, delegate) {
        this.handlers.push({ type: type, delegate: delegate });
    }


    addFallbackMessageHandler(delegate) {
        this.fallbackHandlers.push(delegate);
    }

    addAllMessageHandler(delegate) {
        this.allMessageHandlers.push(delegate);
    }

    setConnectionClosedHandler(delegate) {
        this.closedHandler = delegate;
    }

    setErrorHandler(delegate) {
        this.errorHandler = delegate;
    }
    
    // Worker-based timer methods (not subject to browser throttling)
    
    _createWorkerCode() {
        return `
            const intervals = new Map();
            
            self.onmessage = function(e) {
                const { action, name, delay } = e.data;
                
                switch (action) {
                    case 'setInterval':
                        // Clear existing interval if any
                        if (intervals.has(name)) {
                            clearInterval(intervals.get(name));
                        }
                        
                        // Create new interval
                        const intervalId = setInterval(() => {
                            self.postMessage({ name: name, time: Date.now() });
                        }, delay);
                        
                        intervals.set(name, intervalId);
                        break;
                        
                    case 'clearInterval':
                        if (intervals.has(name)) {
                            clearInterval(intervals.get(name));
                            intervals.delete(name);
                        }
                        break;
                        
                    case 'clearAll':
                        intervals.forEach(intervalId => clearInterval(intervalId));
                        intervals.clear();
                        break;
                }
            };
        `;
    }
    
    _initializeWorker() {
        if (this.worker) return;
        
        try {
            // Create worker from blob URL
            const workerCode = this._createWorkerCode();
            const blob = new Blob([workerCode], { type: 'application/javascript' });
            this.workerBlobUrl = URL.createObjectURL(blob);
            this.worker = new Worker(this.workerBlobUrl);
            
            // Handle messages from worker
            this.worker.onmessage = (e) => {
                const { name } = e.data;
                const callback = this.workerIntervals.get(name);
                if (callback) {
                    try {
                        callback();
                    } catch (error) {
                        console.error(`Worker interval callback error for '${name}':`, error);
                        if (this.errorHandler) {
                            this.errorHandler({
                                type: 'worker-interval-error',
                                message: `Worker interval callback error for '${name}'`,
                                error: error
                            });
                        }
                    }
                }
            };
            
            this.worker.onerror = (error) => {
                console.error('Worker error:', error);
                if (this.errorHandler) {
                    this.errorHandler({
                        type: 'worker-error',
                        message: 'Web Worker error',
                        error: error
                    });
                }
            };
        } catch (error) {
            console.error('Failed to create worker:', error);
            throw error;
        }
    }
    
    setWorkerInterval(name, callback, delay) {
        if (!name || typeof name !== 'string') {
            throw new Error('Interval name must be a non-empty string');
        }
        if (typeof callback !== 'function') {
            throw new Error('Callback must be a function');
        }
        if (typeof delay !== 'number' || delay < 0) {
            throw new Error('Delay must be a positive number');
        }
        
        // Initialize worker on first use
        if (!this.worker) {
            this._initializeWorker();
        }
        
        // Store callback
        this.workerIntervals.set(name, callback);
        
        // Send message to worker
        this.worker.postMessage({ action: 'setInterval', name, delay });
    }
    
    clearWorkerInterval(name) {
        if (!this.worker) return;
        
        // Remove callback
        this.workerIntervals.delete(name);
        
        // Send message to worker
        this.worker.postMessage({ action: 'clearInterval', name });
    }
    
    clearAllWorkerIntervals() {
        if (!this.worker) return;
        
        // Clear all callbacks
        this.workerIntervals.clear();
        
        // Send message to worker
        this.worker.postMessage({ action: 'clearAll' });
    }
    
    _cleanupWorker() {
        if (this.worker) {
            this.clearAllWorkerIntervals();
            this.worker.terminate();
            this.worker = null;
        }
        
        if (this.workerBlobUrl) {
            URL.revokeObjectURL(this.workerBlobUrl);
            this.workerBlobUrl = null;
        }
        
        this.workerIntervals.clear();
    }
}

Worker-Based Timers

The JavaScript client includes Web Worker-based timer functionality to avoid browser throttling issues:

The Problem

When browser tabs are inactive (minimized, in background, or when Windows is locked), browsers throttle JavaScript timers like setInterval() to conserve resources. This can cause intervals set to run every second to slow down to once per minute, breaking real-time functionality.

The Solution

MessageHub provides worker-based timer methods that run in a separate Web Worker thread, unaffected by browser throttling:

// Instead of regular setInterval (subject to throttling):
setInterval(() => {
    messageHub.sendMessage("ping", { time: Date.now() });
}, 1000);

// Use worker-based interval (consistent timing):
messageHub.setWorkerInterval('ping', () => {
    messageHub.sendMessage("ping", { time: Date.now() });
}, 1000);

// Clear when done
messageHub.clearWorkerInterval('ping');

API Reference

  • setWorkerInterval(name, callback, delay) - Creates a named interval in a Web Worker
    • name: Unique identifier for the interval
    • callback: Function to execute on each interval
    • delay: Interval delay in milliseconds
  • clearWorkerInterval(name) - Clears a specific interval by name
  • clearAllWorkerIntervals() - Clears all worker intervals

Features

  • Lazy initialization - Worker only created when first interval is set
  • Named intervals for easy management
  • Automatic cleanup on MessageHub close
  • Error handling integration with MessageHub's error handler
  • Zero configuration required

This is particularly useful for:

  • Real-time dashboards that need consistent updates
  • Chat applications with typing indicators
  • Game state synchronization
  • Any application requiring reliable periodic messaging

Error Handling & Validation

The MessageHub includes comprehensive error handling and validation:

Server-Side Validation

  • Connection ID Validation: Non-null, non-empty, non-whitespace
  • Message Type Validation: No colons, not empty
  • Data Validation: Non-null objects required
  • Handler Validation: All handlers must be non-null

Automatic Error Recovery

  • WebSocket Exceptions: Automatic client cleanup on connection failures
  • JSON Parsing: Graceful handling of malformed JSON with detailed logging
  • Memory Management: Automatic cleanup of disconnected clients
  • Thread Safety: Concurrent handler registration and execution

Logging

All errors are logged with structured logging including:

  • Connection IDs for traceability
  • Message types and data for debugging
  • Exception details for troubleshooting

Architecture

Handler Types

  1. Specific Handlers: Handle messages of a specific type
  2. Fallback Handlers: Handle unmatched message types (only if no specific handler found)
  3. All-Message Handlers: Handle every message regardless of type

Thread Safety

  • Concurrent collections for handler storage
  • Lock-free snapshots for handler execution
  • Thread-safe message queuing with channels
Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
3.0.0 92 7/11/2025
2.0.4 159 7/20/2024
2.0.3 138 7/19/2024
2.0.2 128 7/19/2024
2.0.1 138 7/19/2024
2.0.0 132 7/19/2024
1.1.2 480 8/25/2022
1.1.1 518 4/1/2022
1.1.0 319 1/7/2022
1.0.3 366 12/17/2021
1.0.2 349 12/17/2021
1.0.1 385 12/17/2021
1.0.0 386 12/17/2021