Messaging subsystem
The messaging subsystem is the backbone for all inter-component communication in RockBot. Agents, user proxies, script runners, and tool bridges communicate exclusively through a topic-based pub/sub message bus: no direct method calls, no shared memory, no in-process coupling.
Design principles
- Transport agnosticism: all contracts are defined in RockBot.Messaging.Abstractions, with no provider-specific types leaking into application code
- Explicit acknowledgment: handlers return MessageResult (Ack / Retry / DeadLetter); the infrastructure never silently drops or requeues messages
- Immutable envelopes: MessageEnvelope is a sealed record; the body is raw bytes, not a typed object, so the transport layer is agnostic to payload schema
- Built-in observability: W3C trace context is propagated through headers; metrics are recorded per destination and result
Core abstractions
MessageEnvelope
The universal message container. Every message flowing through the system, regardless of type
or direction, is wrapped in a MessageEnvelope.
public sealed record MessageEnvelope(
    string MessageId,                             // Unique ID (GUID, auto-generated by Create())
    string MessageType,                           // Type classifier; used for routing and deserialization
    string Source,                                // Originating component name
    ReadOnlyMemory<byte> Body,                    // Raw payload (JSON-encoded, but transport doesn't know this)
    DateTimeOffset Timestamp,
    string? CorrelationId,                        // Links a reply to its originating request
    string? ReplyTo,                              // Topic where replies should be published
    string? Destination,                          // Target agent name; null for broadcast
    IReadOnlyDictionary<string, string> Headers   // Custom metadata (see WellKnownHeaders)
);
MessageEnvelope.Create(messageType, source, body) generates a new MessageId (GUID) and
defaults Timestamp to DateTimeOffset.UtcNow.
IMessagePublisher
public interface IMessagePublisher : IAsyncDisposable
{
    Task PublishAsync(string topic, MessageEnvelope envelope, CancellationToken ct = default);
}
Publishes a message to a named topic. The topic is a hierarchical dot-separated string used as
a routing key (e.g. user.message, tool.invoke.mcp, script.invoke).
IMessageSubscriber
public interface IMessageSubscriber : IAsyncDisposable
{
    Task<ISubscription> SubscribeAsync(
        string topic,
        string subscriptionName,
        Func<MessageEnvelope, CancellationToken, Task<MessageResult>> handler,
        CancellationToken ct = default);
}
Creates a durable subscription. Each subscriptionName gets its own queue — multiple agents
with the same subscription name form a competing-consumer group (only one processes each
message). Different subscription names on the same topic receive independent copies.
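The delivery rule above can be pictured with a small in-memory model. This is only a sketch: SubscriptionGroups is a hypothetical stand-in, not a RockBot type, and the round-robin choice inside a group is an assumption (a real broker also takes prefetch and acknowledgments into account).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the broker's delivery rules; not part of RockBot.
public sealed class SubscriptionGroups
{
    private readonly Dictionary<string, List<Action<string>>> _consumers = new();
    private readonly Dictionary<string, int> _cursor = new();

    // Consumers that share a subscription name join the same group (one queue).
    public void Subscribe(string subscriptionName, Action<string> consumer)
    {
        if (!_consumers.TryGetValue(subscriptionName, out var group))
        {
            group = new List<Action<string>>();
            _consumers[subscriptionName] = group;
            _cursor[subscriptionName] = 0;
        }
        group.Add(consumer);
    }

    // Every group receives its own copy; inside a group exactly one consumer runs.
    public void Publish(string message)
    {
        foreach (var (name, group) in _consumers)
        {
            var index = _cursor[name];
            group[index](message);
            _cursor[name] = (index + 1) % group.Count;
        }
    }
}
```

With two consumers registered under "agent" and one under "audit", each published message is handled once by the "agent" group and once by the "audit" group.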
Wildcard patterns are supported:
- The * wildcard matches exactly one path segment: tool.invoke.* matches tool.invoke.mcp but not tool.invoke.mcp.list
- The # wildcard matches zero or more segments: agent.# matches everything below agent
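To make the wildcard semantics concrete, here is a minimal matcher sketch. TopicPattern is a hypothetical helper written for illustration; in the RabbitMQ provider the broker performs this matching, so application code would not normally need it.

```csharp
using System;

// Hypothetical helper illustrating the wildcard rules; not part of RockBot.
public static class TopicPattern
{
    public static bool IsMatch(string pattern, string topic) =>
        Match(pattern.Split('.'), 0, topic.Split('.'), 0);

    private static bool Match(string[] p, int pi, string[] t, int ti)
    {
        // Pattern exhausted: it is a match only if the topic is exhausted too.
        if (pi == p.Length) return ti == t.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more segments, so try every split point.
            for (var k = ti; k <= t.Length; k++)
                if (Match(p, pi + 1, t, k)) return true;
            return false;
        }

        // '*' and literal segments both consume exactly one topic segment.
        if (ti == t.Length) return false;
        return (p[pi] == "*" || p[pi] == t[ti]) && Match(p, pi + 1, t, ti + 1);
    }
}
```

For example, TopicPattern.IsMatch("tool.invoke.*", "tool.invoke.mcp") is true, while the same pattern against "tool.invoke.mcp.list" is false because * cannot span two segments.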
ISubscription
public interface ISubscription : IAsyncDisposable
{
    string Topic { get; }
    string SubscriptionName { get; }
    bool IsActive { get; }
}
A handle to an active subscription. Disposing it unsubscribes and releases the channel.
MessageResult
The handler’s explicit decision about what happens to the message after processing:
| Value | Meaning |
|---|---|
| Ack | Processed successfully; remove from queue |
| Retry | Transient failure; requeue for redelivery |
| DeadLetter | Poison message; route to dead-letter queue, do not retry |
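One way a handler might arrive at these values is to classify failures by exception type. The sketch below is an assumption, not RockBot's policy: the MessageResult enum is redeclared locally so the snippet stands alone, HandlerPolicy is a hypothetical helper, and the choice of which exceptions count as transient is illustrative.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

// Local stand-in mirroring the three documented values.
public enum MessageResult { Ack, Retry, DeadLetter }

// Hypothetical helper: maps the outcome of handler work to a MessageResult.
public static class HandlerPolicy
{
    public static async Task<MessageResult> RunAsync(Func<Task> work)
    {
        try
        {
            await work();
            return MessageResult.Ack;
        }
        catch (JsonException)
        {
            // A malformed payload will not fix itself on redelivery.
            return MessageResult.DeadLetter;
        }
        catch (Exception ex) when (ex is TimeoutException or IOException)
        {
            // Assumed to be transient: worth another delivery attempt.
            return MessageResult.Retry;
        }
        // Any other exception propagates to the caller unchanged.
    }
}
```

Keeping the classification in one place makes the Ack / Retry / DeadLetter decision explicit and easy to test independently of the transport.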
Payload helpers
MessageEnvelopeExtensions (in RockBot.Messaging.Abstractions) handles JSON serialization
using System.Text.Json with camelCase policy:
// Wrap a typed payload into an envelope
var envelope = userMessage.ToEnvelope<UserMessage>(
    source: "blazor-proxy",
    correlationId: Guid.NewGuid().ToString("N"),
    replyTo: "user.response.proxy-1",
    destination: "rockbot-agent");

// Unwrap a typed payload from an envelope (on the receiving side)
var received = envelope.GetPayload<UserMessage>();
Both methods accept an optional JsonSerializerOptions to override the default camelCase policy.
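The serialization step itself can be sketched with System.Text.Json alone. PayloadCodec and the shape of UserMessage below are hypothetical (the real UserMessage fields are not shown in this document); the sketch only illustrates a camelCase round trip through a raw byte body with an optional options override.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload shape, for illustration only.
public sealed record UserMessage(string UserId, string Text);

// Hypothetical helper showing the camelCase round trip through raw bytes.
public static class PayloadCodec
{
    private static readonly JsonSerializerOptions CamelCase = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    };

    // Serialize a typed payload into the raw bytes an envelope body would carry.
    public static ReadOnlyMemory<byte> Encode<T>(T payload, JsonSerializerOptions? options = null) =>
        JsonSerializer.SerializeToUtf8Bytes(payload, options ?? CamelCase);

    // Deserialize the raw body back into a typed payload.
    public static T? Decode<T>(ReadOnlyMemory<byte> body, JsonSerializerOptions? options = null) =>
        JsonSerializer.Deserialize<T>(body.Span, options ?? CamelCase);
}
```

Because the body is plain UTF-8 JSON, any consumer that knows the MessageType can decode it without sharing a .NET type with the producer.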
Headers and trust levels
WellKnownHeaders defines standard header keys used by the framework:
| Header key | Constant | Purpose |
|---|---|---|
| rb-content-trust | ContentTrust | Trust level of the content |
| rb-tool-provider | ToolProvider | Backend type (mcp, etc.) |
| rb-timeout-ms | TimeoutMs | Invocation timeout override |
ContentTrustValues defines standard values for the rb-content-trust header:
| Value | Meaning |
|---|---|
| system | Agent-generated system prompts and directives |
| user-input | Human user messages |
| tool-request | Outbound tool invocations |
| tool-output | External tool responses; treated as untrusted |
| agent-message | Agent-to-agent content |
Trust levels allow middleware and handlers to apply different validation or sanitization policies depending on the origin of content.
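As an illustration of such a policy, the sketch below decides whether content should be sanitized based on the rb-content-trust header. TrustPolicy is hypothetical, and which levels count as trusted is an assumption made for the example; only the header key and the value names come from the tables above.

```csharp
using System.Collections.Generic;

// Hypothetical policy; the trusted/untrusted split is an assumption.
public static class TrustPolicy
{
    public const string ContentTrustHeader = "rb-content-trust";

    public static bool RequiresSanitization(IReadOnlyDictionary<string, string> headers)
    {
        // Fail closed: content with no declared trust level is treated as untrusted.
        if (!headers.TryGetValue(ContentTrustHeader, out var trust))
            return true;

        return trust switch
        {
            "system" => false,
            "agent-message" => false,
            "tool-request" => false,
            "user-input" => true,
            "tool-output" => true,
            _ => true, // Unknown values are also treated as untrusted.
        };
    }
}
```

Failing closed on a missing or unknown value means a producer that forgets to set the header cannot accidentally bypass sanitization.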
Trace context propagation
TraceContextPropagator propagates W3C TraceContext through message headers using only
System.Diagnostics — no OpenTelemetry SDK dependency in the abstractions layer:
// Inject current Activity context into outgoing headers
TraceContextPropagator.Inject(Activity.Current, headers);
// Extract parent context from incoming headers
var parentContext = TraceContextPropagator.Extract(envelope.Headers);
The publisher injects trace context into every outgoing envelope’s headers. The subscriber extracts it and creates a child Activity, so distributed traces flow seamlessly across process boundaries through the message bus.
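A propagator of this kind can be built from System.Diagnostics primitives alone. The following is a sketch, not the actual TraceContextPropagator: TraceHeaders is a hypothetical class, and the "traceparent" header key is an assumption based on the W3C header name.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical sketch of inject/extract; not the RockBot implementation.
public static class TraceHeaders
{
    // Assumed header key, taken from the W3C Trace Context header name.
    public const string TraceParent = "traceparent";

    // Write the current activity's W3C id ("00-<trace-id>-<span-id>-<flags>") into the headers.
    public static void Inject(Activity? activity, IDictionary<string, string> headers)
    {
        if (activity?.Id is { } id && activity.IdFormat == ActivityIdFormat.W3C)
            headers[TraceParent] = id;
    }

    // Parse the parent context; returns default(ActivityContext) when absent or malformed.
    public static ActivityContext Extract(IReadOnlyDictionary<string, string> headers) =>
        headers.TryGetValue(TraceParent, out var value)
        && ActivityContext.TryParse(value, null, out var context)
            ? context
            : default;
}
```

On the consuming side, the extracted ActivityContext can be passed as the parent when starting the handler's Activity, which is what links the two spans into one trace.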
RabbitMQ provider
RockBot.Messaging.RabbitMQ implements the messaging abstractions using RabbitMQ with a topic
exchange. No application code references RabbitMQ types directly.
Exchange topology
rockbot (topic exchange)
├── rockbot.{subscriptionName}        ← durable queue per subscription
│       DLX: rockbot.dlx
│
└── rockbot.{subscriptionName}.dlq    ← dead-letter queue (auto-created)
        Bound to: rockbot.dlx
Every queue has a dead-letter exchange configured at creation time. Messages for which a
handler returns DeadLetter (or that are rejected after exhausting retries) are routed to
rockbot.{subscriptionName}.dlq for inspection.
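The naming scheme and the dead-letter wiring can be sketched without the client library. RockBotTopology is a hypothetical helper; "x-dead-letter-exchange" is the standard RabbitMQ queue argument that names a queue's DLX, but whether the provider also sets a dead-letter routing key is not stated here and is left out.

```csharp
using System.Collections.Generic;

// Hypothetical helper illustrating the naming scheme; not part of RockBot.
public static class RockBotTopology
{
    // Main queue: "{exchange}.{subscriptionName}"
    public static string QueueName(string exchange, string subscriptionName) =>
        $"{exchange}.{subscriptionName}";

    // Dead-letter queue: main queue name plus ".dlq"
    public static string DeadLetterQueueName(string exchange, string subscriptionName) =>
        $"{QueueName(exchange, subscriptionName)}.dlq";

    // Arguments supplied when declaring the main queue so rejected messages go to the DLX.
    public static IDictionary<string, object> QueueArguments(string deadLetterExchange) =>
        new Dictionary<string, object> { ["x-dead-letter-exchange"] = deadLetterExchange };
}
```

For a subscription named "agent-1" on the default exchange, this yields the queue rockbot.agent-1 and the dead-letter queue rockbot.agent-1.dlq.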
Connection and channel model
RabbitMQ connections are heavyweight (TCP + authentication); channels are lightweight. The provider follows RabbitMQ’s recommended pattern:
- One connection per process: RabbitMqConnectionManager holds a single shared IConnection
- One channel per publisher/consumer: channels are not thread-safe, so each publisher instance publishes on its own dedicated channel and each subscription gets its own dedicated channel
Configuration
public sealed class RabbitMqOptions
{
    public string HostName { get; set; } = "localhost";
    public int Port { get; set; } = 5672;
    public string UserName { get; set; } = "guest";
    public string Password { get; set; } = "guest";
    public string VirtualHost { get; set; } = "/";
    public string ExchangeName { get; set; } = "rockbot";
    public string DeadLetterExchangeName { get; set; } = "rockbot.dlx";
    public bool Durable { get; set; } = true;
    public ushort PrefetchCount { get; set; } = 10;
}
AMQP header mapping
Standard AMQP properties are mapped from MessageEnvelope fields:
| AMQP property | MessageEnvelope field |
|---|---|
| MessageId | MessageId |
| Type | MessageType |
| CorrelationId | CorrelationId |
| ReplyTo | ReplyTo |
| Timestamp | Timestamp (Unix epoch seconds) |
| ContentType | Always "application/json" |
| DeliveryMode | Always 2 (persistent) |
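One consequence of the Timestamp mapping is worth seeing in code: Unix epoch seconds carry no sub-second precision. The sketch below uses only DateTimeOffset; AmqpTimestamp is a hypothetical helper name, and how the provider itself performs the conversion is an assumption.

```csharp
using System;

// Hypothetical helper illustrating the epoch-seconds mapping.
public static class AmqpTimestamp
{
    // Envelope timestamp to whole seconds since 1970-01-01T00:00:00Z.
    public static long ToUnixSeconds(DateTimeOffset timestamp) =>
        timestamp.ToUnixTimeSeconds();

    // Whole seconds back to a UTC DateTimeOffset; milliseconds are not recoverable.
    public static DateTimeOffset FromUnixSeconds(long seconds) =>
        DateTimeOffset.FromUnixTimeSeconds(seconds);
}
```

If the mapping works this way, a timestamp read back from the AMQP property would be truncated to the second, so consumers should not rely on it for sub-second ordering.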
Custom headers are written to the AMQP Headers table with an rb- prefix:
| AMQP header key | Source |
|---|---|
| rb-source | envelope.Source |
| rb-destination | envelope.Destination |
| rb-traceparent | Injected by TraceContextPropagator |
| rb-{key} | Any user-supplied envelope.Headers entry |
Metrics
RabbitMqDiagnostics records zero-allocation metrics via System.Diagnostics.Metrics:
| Metric | Type | Tags |
|---|---|---|
| rockbot.messaging.publish.duration | Histogram (ms) | destination |
| rockbot.messaging.publish.messages | Counter | destination |
| rockbot.messaging.process.duration | Histogram (ms) | destination, result |
| rockbot.messaging.process.messages | Counter | destination, result |
| rockbot.messaging.active_messages | UpDownCounter | (none) |
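Instruments like these are typically declared once and recorded with tags. The sketch below shows the two publish-side instruments using System.Diagnostics.Metrics; MessagingMetrics and the meter name "RockBot.Messaging.Sketch" are assumptions for illustration, while the instrument names and the destination tag come from the table above.

```csharp
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical sketch; the real RabbitMqDiagnostics may be structured differently.
public static class MessagingMetrics
{
    // The meter name is an assumption made for this example.
    public static readonly Meter Meter = new("RockBot.Messaging.Sketch");

    private static readonly Histogram<double> PublishDuration =
        Meter.CreateHistogram<double>("rockbot.messaging.publish.duration", unit: "ms");

    private static readonly Counter<long> PublishMessages =
        Meter.CreateCounter<long>("rockbot.messaging.publish.messages");

    // Record one publish: its duration and a count of 1, both tagged by destination.
    public static void RecordPublish(string destination, double elapsedMs)
    {
        var tag = new KeyValuePair<string, object?>("destination", destination);
        PublishDuration.Record(elapsedMs, tag);
        PublishMessages.Add(1, tag);
    }
}
```

Any listener or exporter subscribed to the meter then receives the measurements; with no listener attached, recording is close to a no-op.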
DI registration
services.AddRockBotRabbitMq(opts =>
{
    opts.HostName = "rabbitmq.cluster.local";
    opts.Port = 5672;
    opts.UserName = "rockbot";
    opts.Password = "secret";
    opts.VirtualHost = "/";
});
This registers RabbitMqConnectionManager, IMessagePublisher, and IMessageSubscriber as
singletons.
Topic naming conventions
Topics follow a hierarchical dot-separated scheme. Wildcard subscribers can cover broad categories; point-to-point uses full topic names.
| Topic | Direction | Purpose |
|---|---|---|
| user.message | User proxy → Agent | User input messages |
| user.response.{proxyId} | Agent → User proxy | Agent replies |
| user.feedback | User proxy → Agent | Thumbs-up / thumbs-down |
| conversation.history.request | User proxy → Agent | Request conversation history |
| conversation.history.response.{proxyId} | Agent → User proxy | History reply |
| tool.invoke.* | Agent → Tool bridge | Tool invocation requests |
| tool.result.* | Tool bridge → Agent | Tool results |
| script.invoke | Agent → Scripts Manager | Script execution request |
| script.result.{correlationId} | Scripts Manager → Agent | Script output |
| agent.task.* | Agent → Agent | A2A task delegation |
In-process bus (development/testing)
RockBot.Messaging.InProcess provides an in-memory message bus with the same
IMessagePublisher / IMessageSubscriber interfaces. No RabbitMQ required.
Use for:
- Local development without a running RabbitMQ instance
- Unit and integration tests
- Single-process multi-agent scenarios
Register with:
services.AddInProcessMessaging();