MCP Server Message Queue Security: Replay Attacks, Dead-Letter Queue Poisoning, Consumer Authentication, and Idempotent Tool Execution
When MCP tool calls move from synchronous HTTP to an async message broker, the threat model changes completely. Brokers like Kafka, RabbitMQ, and SQS provide strong delivery guarantees — but those guarantees describe delivery, not safe execution. A replayed message re-executes the tool call that already charged a card. A crafted message that reliably fails processing reaches the dead-letter queue, where a more privileged consumer picks it up. An unauthenticated broker connection lets any internal process inject tool calls as any tenant. Async MCP deployments need a security layer built on top of the broker's delivery semantics, not in place of it.
Why MCP servers use message queues
The synchronous MCP transport model (client sends a tool call, server executes it, server returns the result) breaks at scale in three ways. First, long-running tools (file ingestion, LLM sub-calls, external API calls with multi-second latencies) block the HTTP connection and the agent waiting on the other end. Second, a synchronous server that crashes mid-execution loses the in-flight tool call, and the client cannot tell whether the side effect already happened: it sees a 502 and retries, potentially executing the tool twice. Third, fan-out patterns, where one agent action triggers tool calls across multiple downstream services, require the caller to wait for all of them serially unless you add parallel coordination.
Message queues solve all three: the initial tool call acknowledgment is immediate, retries happen at the broker layer rather than requiring client-side retry logic, and fan-out becomes a simple multi-topic publish. But they move the execution window out of the synchronous request-response cycle, which breaks the security model that assumes tool execution is bounded by the authenticated HTTP request.
1. Broker authentication: preventing unauthorized tool injection
The first and most fundamental question: who is allowed to publish messages to the MCP tool execution queue? In many internal deployments, the answer is "any service on the internal network" — which is the same as "any attacker who compromises a single internal service can inject arbitrary tool calls as any tenant."
Apache Kafka
SASL/SCRAM-SHA-256 or SASL/SCRAM-SHA-512 for password-based auth. mTLS for certificate-based mutual auth (the strongest option; it requires a client certificate per service). Unauthenticated PLAINTEXT listeners, and SASL/PLAIN over SASL_PLAINTEXT, are the dangerous defaults that ship in many tutorials. ACLs via kafka-acls.sh or Confluent RBAC restrict which service principals can produce to which topics.
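For the mTLS option, kafkajs passes its `ssl` object through to Node's `tls.connect`, so a client certificate and key can be supplied there. A minimal sketch, assuming hypothetical file paths, a hypothetical helper name, and a broker listener configured with `ssl.client.auth=required`:

```javascript
const fs = require('node:fs');

// Hypothetical helper: builds a kafkajs client config for mutual TLS.
// Pass the result to `new Kafka(config)` from the kafkajs package.
function buildMtlsKafkaConfig({ clientId, brokers, caPath, certPath, keyPath }) {
  return {
    clientId,
    brokers,
    ssl: {
      // Verify the broker's certificate against the internal CA
      rejectUnauthorized: true,
      ca: [fs.readFileSync(caPath, 'utf-8')],
      // The client certificate is this service's identity; the broker
      // derives the ACL principal from it
      cert: fs.readFileSync(certPath, 'utf-8'),
      key: fs.readFileSync(keyPath, 'utf-8'),
    },
  };
}
```

No `sasl` block is needed because the certificate authenticates the client. By default Kafka uses the certificate's full distinguished name as the principal; `ssl.principal.mapping.rules` on the broker can reduce it to a short name such as `User:mcp-gateway`, which keeps ACLs readable.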
RabbitMQ
Username/password users, with permissions granted per virtual host (vhost). Certificate-based auth via the rabbitmq_auth_mechanism_ssl plugin. Vhost-level isolation for tenant separation. User permissions with rabbitmqctl set_permissions: configure, write, and read regular expressions matched against resource names. OAuth 2.0 integration via rabbitmq_auth_backend_oauth2 for service account tokens.
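Because the three permissions are regular expressions, an unanchored pattern such as `mcp-tool-calls` also matches `mcp-tool-calls-dlq`. A minimal sketch with hypothetical helper names and placeholder vhost/user names that builds anchored, escaped patterns and the argument list for `rabbitmqctl set_permissions -p <vhost> <user> <conf> <write> <read>`:

```javascript
// Escape regex metacharacters so resource names are matched literally
function escapeRegex(name) {
  return name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// Anchored pattern matching exactly the listed names; an empty list yields
// '^$', which matches only the empty string
function exactPattern(names) {
  if (names.length === 0) return '^$';
  return `^(?:${names.map(escapeRegex).join('|')})$`;
}

// Hypothetical helper: argv for rabbitmqctl (configure, write, read order)
function setPermissionsArgs({ vhost, user, configure = [], write = [], read = [] }) {
  return [
    'set_permissions', '-p', vhost, user,
    exactPattern(configure), exactPattern(write), exactPattern(read),
  ];
}
```

Passing these as separate argv entries (for example with `child_process.execFile`) avoids shell quoting problems with `^` and `$`.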
AWS SQS
IAM identity policies on producer and consumer roles, plus a resource-based policy on the queue itself. sqs:SendMessage, sqs:ReceiveMessage, and sqs:DeleteMessage are separate permissions; least privilege means producers don't receive and consumers don't send. VPC endpoint policies restrict which queues (including other accounts' queues) can be reached through the endpoint. An aws:SourceVpc condition in the queue policy blocks sends that don't arrive through your VPC.
import { Kafka } from 'kafkajs';

// DANGEROUS: no auth, open broker connection; any internal service can publish tool calls
const insecureKafka = new Kafka({ brokers: ['kafka:9092'] });

// CORRECT: SASL/SCRAM over TLS
const kafka = new Kafka({
  brokers: ['kafka:9092'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-256',
    username: process.env.KAFKA_USERNAME,
    password: process.env.KAFKA_PASSWORD,
  },
});

// ACL: only the MCP gateway service principal can produce to the mcp-tool-calls topic
// kafka-acls.sh --bootstrap-server kafka:9092 --add \
//   --allow-principal User:mcp-gateway \
//   --operation Write --topic mcp-tool-calls
SASL/PLAIN over a plaintext connection is little better than no auth. The username and password travel as a simple NUL-separated byte string (RFC 4616), not encrypted; protocols that base64-encode it add only an encoding. Any network observer reads the credentials in cleartext and can reuse them. Always pair SASL/PLAIN with TLS (ssl: true in kafkajs, security.protocol=SASL_SSL in Java client configs). Better still, use SASL/SCRAM, which never sends the password itself, or mTLS, which avoids shared passwords entirely.
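To make this concrete, the sketch below builds the PLAIN initial response defined in RFC 4616 and recovers the password from it without any key. Kafka's PLAIN mechanism carries this byte string in its SASL exchange, so on a `SASL_PLAINTEXT` listener these bytes cross the network as-is. The function names are hypothetical.

```javascript
// Build the RFC 4616 PLAIN message: [authzid] NUL authcid NUL passwd
function buildSaslPlainToken(username, password, authzid = '') {
  return Buffer.from(`${authzid}\0${username}\0${password}`, 'utf8');
}

// What a passive observer can do with captured bytes: split on NUL
function decodeSaslPlainToken(bytes) {
  const [authzid, username, password] = bytes.toString('utf8').split('\0');
  return { authzid, username, password };
}
```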
2. Replay attacks: re-executing tool calls after initial success
Message brokers guarantee at-least-once delivery by default — meaning a message may be delivered more than once, especially after consumer crashes, network partitions, or explicit redelivery requests. For tool calls that read data (readFile, listRecords), duplicate delivery is harmless — the second execution returns the same result. For tool calls with side effects, a replay is a second execution of the side effect:
- A sendEmail tool call replayed 3× sends 3 emails
- A chargeCard tool call replayed 2× charges the card twice
- A provisionVirtualMachine tool call replayed creates two VMs
- A postWebhook tool call replayed delivers the webhook twice; if the receiver isn't idempotent, this causes double-processing on their end
The replay may happen legitimately (broker redelivery after consumer crash) or maliciously (attacker captures a message off the wire or from a leaked DLQ and re-publishes it with the same payload).
Tool call published: MCP gateway publishes { toolName: "sendEmail", args: { to: "user@example.com", subject: "Order confirmed" }, messageId: "msg-abc123" } to the tool-calls topic.
Consumer processes successfully: Consumer reads the message, sends the email, acknowledges the message (ack / commit offset). Email delivered once. ✓
Replay event: An attacker who captured the message republishes msg-abc123, or an automation error resets the consumer offset to an earlier position and the broker redelivers it.
Consumer processes again: No deduplication check — consumer sends the email a second time. User receives a duplicate "Order confirmed" email. In a payment context, this is a double-charge.
Idempotency key pattern for exactly-once tool execution
The solution is an idempotency key stored in a shared deduplication table. Every tool call message carries a globally unique idempotency key. Before executing the side effect, the consumer attempts to reserve the key with an INSERT ... ON CONFLICT statement. If a row is written (rowCount = 1), this consumer owns the execution and proceeds with the side effect. If nothing is written (rowCount = 0), another delivery has already claimed the key: skip the side effect and, if that execution has finished, return the cached result.
-- Deduplication table
CREATE TABLE IF NOT EXISTS tool_idempotency_keys (
  key         TEXT PRIMARY KEY,
  tool_name   TEXT NOT NULL,
  tenant_id   TEXT NOT NULL,
  status      TEXT NOT NULL DEFAULT 'processing', -- processing | done | failed
  result      JSONB,
  created_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
  expires_at  TIMESTAMPTZ NOT NULL DEFAULT now() + INTERVAL '7 days'
);
CREATE INDEX ON tool_idempotency_keys (expires_at); -- for TTL cleanup
async function processToolCallMessage(message) {
  const { idempotencyKey, toolName, tenantId, args } = message;

  // Step 1: reserve the key before executing any side effect.
  // A new key is inserted; a key left in 'failed' by an earlier attempt of the
  // same tenant is re-claimed; 'processing' and 'done' keys are left untouched.
  const { rowCount } = await db.query(
    `INSERT INTO tool_idempotency_keys (key, tool_name, tenant_id)
     VALUES ($1, $2, $3)
     ON CONFLICT (key) DO UPDATE SET status = 'processing'
       WHERE tool_idempotency_keys.status = 'failed'
         AND tool_idempotency_keys.tenant_id = EXCLUDED.tenant_id`,
    [idempotencyKey, toolName, tenantId]
  );

  if (rowCount === 0) {
    // Already processed, or currently being processed by another consumer.
    // Scope the lookup to the tenant so a replayed key cannot read another
    // tenant's cached result.
    const existing = await db.query(
      'SELECT status, result FROM tool_idempotency_keys WHERE key = $1 AND tenant_id = $2',
      [idempotencyKey, tenantId]
    );
    const row = existing.rows[0];
    if (!row) {
      // Key belongs to a different tenant: do not execute
      throw new Error('Idempotency key not owned by this tenant');
    }
    if (row.status === 'done') {
      // Safe to ack: we already have the result
      return row.result;
    }
    // Still 'processing': nack and let the broker redeliver later. A consumer
    // that crashed mid-execution leaves the row in 'processing'; reclaiming it
    // needs a lease/timeout policy, which is not shown here.
    throw new Error('Concurrent processing: nack for redelivery');
  }

  // Step 2: execute the side effect (only runs if we reserved the key)
  let result;
  try {
    result = await executeTool(toolName, args, tenantId);
  } catch (err) {
    // Mark failed so a later delivery can re-claim the key (see Step 1)
    await db.query(
      `UPDATE tool_idempotency_keys SET status = 'failed' WHERE key = $1`,
      [idempotencyKey]
    );
    throw err;
  }

  // Step 3: mark done and cache the result
  await db.query(
    `UPDATE tool_idempotency_keys SET status = 'done', result = $1 WHERE key = $2`,
    [JSON.stringify(result), idempotencyKey]
  );
  return result;
}
The idempotency key must be generated by the original caller, not the consumer. If the consumer generates the key (e.g., from the message's broker-assigned delivery tag), a replayed message gets a new delivery tag and therefore a new key — the deduplication check never fires. The key must travel with the message payload from the caller that initiated the tool call, so replays carry the same key. Use a UUID generated at tool-call submission time, stored in the message body alongside the tool arguments. This also makes the key auditable — it ties the deduplicated result back to the specific agent session that requested it. See our post on distributed locking for the analogous database-level pattern.
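On the producer side this amounts to minting the key once, at submission time. A minimal sketch, assuming a hypothetical `buildToolCallMessage` helper and the field names used in the message schema later in this post:

```javascript
const { randomUUID } = require('node:crypto');

// Hypothetical producer-side helper: the idempotency key is created once, when
// the agent submits the tool call, and travels inside the message body
function buildToolCallMessage({ toolName, tenantId, sessionId, args }) {
  return {
    idempotencyKey: randomUUID(),
    toolName,
    tenantId,
    sessionId,
    args,
    publishedAt: new Date().toISOString(),
  };
}

// A broker redelivery or an attacker's re-publish copies the body verbatim, so
// the key survives even though the broker assigns new delivery metadata
function simulateRedelivery(message) {
  return JSON.parse(JSON.stringify(message));
}
```

The design choice that matters is that a caller-side retry of the same logical call reuses the stored message instead of calling the builder again; a second call would mint a new key, and the deduplication table would treat the retry as a new tool call.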
3. Dead-letter queue poisoning
A dead-letter queue (DLQ) is where messages go when the main consumer fails to process them after the configured retry count. DLQs serve a legitimate purpose: they prevent a poison-pill message (one that always throws an exception) from blocking the entire queue indefinitely. But they introduce a security vulnerability when:
- The DLQ consumer runs with different (often higher) permissions than the main consumer
- The DLQ consumer is a human-reviewed replay interface operated by an engineer with production access
- The DLQ message payload is rendered in a UI for inspection without sanitization
An attacker who can publish to the tool-calls queue can craft a message that reliably fails processing — for example, by including a tool argument that triggers a validation exception in the consumer — and then waits for that message to reach the DLQ. If the DLQ consumer re-processes messages with elevated permissions ("replay this failed job as admin"), the attacker has successfully escalated through the error path.
// DANGEROUS: DLQ consumer re-executes with elevated system permissions
const dlqConsumer = kafka.consumer({ groupId: 'dlq-replay-admin' });
await dlqConsumer.connect();
await dlqConsumer.subscribe({ topic: 'mcp-tool-calls-dlq' });
await dlqConsumer.run({
  eachMessage: async ({ message }) => {
    const payload = JSON.parse(message.value.toString());
    // Re-runs with system-level service account (more privileged than original caller)
    await executeTool(payload.toolName, payload.args, 'system'); // WRONG
  },
});

// CORRECT: DLQ consumer enforces the original tenant/permission context
await dlqConsumer.run({
  eachMessage: async ({ message }) => {
    let payload;
    try {
      payload = JSON.parse(message.value.toString());
    } catch {
      await quarantineMessage(message); // not valid JSON
      return;
    }
    // Required context must be present before anything is re-executed
    if (!payload.idempotencyKey || !payload.tenantId || !payload.originalPermissions) {
      // Malformed or tampered DLQ message: quarantine, do not execute
      await quarantineMessage(message);
      return;
    }
    // Presence is not enough: an attacker can write originalPermissions: ['admin']
    // into the payload. verifySignature is a placeholder for the producer-side
    // HMAC check, which rejects any payload modified after publish.
    if (!verifySignature(payload)) {
      await quarantineMessage(message);
      return;
    }
    // Re-execute with ORIGINAL permissions only: no elevation
    await executeTool(payload.toolName, payload.args, payload.tenantId, {
      permissions: payload.originalPermissions,
    });
  },
});
DLQ message payloads are attacker-controlled data. If your operations team has a UI that displays DLQ messages for inspection before replaying them — which is common — that UI must sanitize the message payload before rendering it. A DLQ message payload containing <script>...</script> in a string field will execute in the operations UI if the template renders fields as raw HTML. This is a stored XSS in your operations console, delivered via the error path. See our post on Content Security Policy for hardening the UI layer.
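Most template engines escape output by default, and that default should stay on. The sketch below only illustrates what escaping has to cover when DLQ fields are assembled into HTML by hand; `escapeHtml` and `renderDlqRow` are hypothetical names. Escaping these five characters is sufficient for element text and quoted attribute values, not for URLs or inline script.

```javascript
// Escape the five HTML-significant characters so field values render as text
const HTML_ESCAPES = { '&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;' };

function escapeHtml(value) {
  return String(value).replace(/[&<>"']/g, (ch) => HTML_ESCAPES[ch]);
}

// Hypothetical renderer for one DLQ message: every field, including nested
// args, is serialized and then escaped before it reaches the page
function renderDlqRow(payload) {
  const cells = Object.entries(payload).map(
    ([field, value]) =>
      `<td>${escapeHtml(field)}</td><td><pre>${escapeHtml(JSON.stringify(value))}</pre></td>`
  );
  return `<tr>${cells.join('')}</tr>`;
}
```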
The full DLQ hardening strategy:
- Preserve original context: embed the original tenantId, permissions, and sessionId in the message at publish time. DLQ re-processing must use these, never elevated credentials.
- Message signing: HMAC-sign the message payload at publish time with a producer-side secret. DLQ consumers verify the signature before re-execution; tampered payloads (with elevated permission fields injected) fail verification.
- Schema validation on DLQ entry: validate the message against the expected tool call schema before adding to DLQ. A message that fails schema validation is not a failed tool call — it is malformed input. Route to a quarantine store, not the DLQ.
- DLQ access controls: the DLQ topic should have stricter ACLs than the main topic — only the DLQ consumer service principal and designated operations accounts should read it.
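A minimal signing sketch, assuming a shared secret such as the `MESSAGE_SIGNING_SECRET` used by the consumer code later in this post; the helper names are hypothetical, and `Buffer.from(signPayload(...), 'hex')` would be one way to back that code's `computeHmac` placeholder. Two details matter. First, the HMAC must cover every field except `signature` itself. Second, both sides must serialize identically, and plain `JSON.stringify` follows key insertion order, which a schema parser may change (for example by returning keys in schema order rather than wire order).

```javascript
const { createHmac, timingSafeEqual } = require('node:crypto');

// Deterministic JSON for JSON-parsed data: object keys sorted recursively
function canonicalJson(value) {
  if (Array.isArray(value)) return `[${value.map(canonicalJson).join(',')}]`;
  if (value !== null && typeof value === 'object') {
    const keys = Object.keys(value).sort();
    return `{${keys.map((k) => `${JSON.stringify(k)}:${canonicalJson(value[k])}`).join(',')}}`;
  }
  return JSON.stringify(value);
}

// Sign every field except the signature; returns lowercase hex
function signPayload(payload, secret) {
  const { signature: _omit, ...unsigned } = payload;
  return createHmac('sha256', secret).update(canonicalJson(unsigned)).digest('hex');
}

function verifyPayload(payload, secret) {
  // Reject anything that is not exactly 64 hex chars before comparing
  if (typeof payload.signature !== 'string' || !/^[0-9a-f]{64}$/.test(payload.signature)) {
    return false;
  }
  const expected = Buffer.from(signPayload(payload, secret), 'hex');
  const received = Buffer.from(payload.signature, 'hex');
  // Both are 32 bytes here; timingSafeEqual would throw on a length mismatch
  return timingSafeEqual(expected, received);
}
```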
4. Consumer group isolation for multi-tenant MCP deployments
In Kafka, consumers that share a group ID split a topic's partitions between them: each partition is consumed by exactly one member of the group. Consumers with different group IDs each receive every message on the topic independently. Neither mechanism separates tenants. If all tenants' tool calls share one topic, any consumer that can read that topic, in any group, receives every tenant's messages, and a shared group assigns partitions without regard to tenant:
// DANGEROUS: all tenants' tool calls on one topic, consumed by a shared group
// Tenant A's tool call for their private data reaches Tenant B's consumer
// if partition assignment puts them on the same consumer instance
// CORRECT Option 1: per-tenant topics with ACLs
// Topic naming: mcp-tool-calls-tenant-{tenantId}
// Each tenant's consumer group has ACLs only on their own topic
// CORRECT Option 2: single topic with per-message tenant validation
// Consumer validates tenantId in message against the authenticated session
// before executing — rejects messages where tenantId doesn't match the
// consumer's assigned tenant scope
async function validateTenantScope(message, consumerTenantId) {
const payload = JSON.parse(message.value);
if (payload.tenantId !== consumerTenantId) {
// Cross-tenant message — could be routing error or injection attempt
await quarantineMessage(message, 'cross-tenant-routing');
throw new Error(`Message tenant ${payload.tenantId} !== consumer scope ${consumerTenantId}`);
}
return payload;
}
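For Option 1, the topic name and its ACLs can be derived mechanically from a validated tenant ID. The sketch below assumes UUID tenant IDs (as in the message schema later in this post), reuses the `User:mcp-gateway` principal from section 1, and uses hypothetical consumer principal and group names; it only builds argument lists for `kafka-acls.sh` and does not contact Kafka. It uses literal resource patterns because a prefixed grant on `mcp-tool-calls-tenant-<id>` would also cover any longer topic name that starts with it, such as a per-tenant DLQ topic if one were named that way.

```javascript
// Reject anything that is not a UUID so a crafted tenant ID cannot produce an
// unexpected topic name
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

function tenantTopic(tenantId) {
  if (!UUID_RE.test(tenantId)) throw new Error(`Invalid tenantId: ${tenantId}`);
  return `mcp-tool-calls-tenant-${tenantId}`;
}

// Hypothetical helper: kafka-acls.sh argument lists for one tenant
function tenantAclArgs({ bootstrapServer, tenantId, consumerPrincipal, consumerGroup }) {
  const topic = tenantTopic(tenantId);
  const base = ['--bootstrap-server', bootstrapServer, '--add', '--resource-pattern-type', 'literal'];
  return [
    // Only the gateway may produce to the tenant topic
    [...base, '--allow-principal', 'User:mcp-gateway', '--operation', 'Write', '--topic', topic],
    // The tenant's consumer may read the topic...
    [...base, '--allow-principal', consumerPrincipal, '--operation', 'Read', '--topic', topic],
    // ...and also needs Read on its consumer group
    [...base, '--allow-principal', consumerPrincipal, '--operation', 'Read', '--group', consumerGroup],
  ];
}
```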
For SQS, tenant isolation is straightforward: each tenant gets their own SQS queue, and the policy on each queue grants access only to that tenant's service account. The queue URL is not a capability: knowing it is not sufficient without IAM credentials that authorize sqs:ReceiveMessage on that queue.
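A minimal sketch of such a per-tenant queue policy, written as a JavaScript object that could be serialized with `JSON.stringify` and set as the queue's `Policy` attribute. The ARNs, VPC ID, and function name are placeholders. The first Deny mirrors the `aws:SourceVpc` condition from section 1; the second rejects any request not made over TLS via `aws:SecureTransport`.

```javascript
// Hypothetical builder for a per-tenant SQS queue policy (resource-based).
// The gateway may only send; the tenant's consumer may only receive and delete.
function tenantQueuePolicy({ queueArn, producerRoleArn, consumerRoleArn, vpcId }) {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Sid: 'GatewaySendOnly',
        Effect: 'Allow',
        Principal: { AWS: producerRoleArn },
        Action: 'sqs:SendMessage',
        Resource: queueArn,
      },
      {
        Sid: 'TenantConsumerReceiveDelete',
        Effect: 'Allow',
        Principal: { AWS: consumerRoleArn },
        Action: ['sqs:ReceiveMessage', 'sqs:DeleteMessage'],
        Resource: queueArn,
      },
      {
        // aws:SourceVpc is only present for requests through a VPC endpoint,
        // so sends arriving any other way are denied
        Sid: 'DenySendOutsideVpc',
        Effect: 'Deny',
        Principal: '*',
        Action: 'sqs:SendMessage',
        Resource: queueArn,
        Condition: { StringNotEquals: { 'aws:SourceVpc': vpcId } },
      },
      {
        Sid: 'DenyInsecureTransport',
        Effect: 'Deny',
        Principal: '*',
        Action: 'sqs:*',
        Resource: queueArn,
        Condition: { Bool: { 'aws:SecureTransport': 'false' } },
      },
    ],
  };
}
```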
5. Message schema validation
Consumers that trust the structure of inbound messages without validation are vulnerable to prototype pollution, type confusion, and injection attacks via unexpected field values. Message schema validation at the consumer boundary — before any processing — provides the same protection that HTTP request validation provides at the API boundary.
import { z } from 'zod';
import { timingSafeEqual } from 'crypto';

const ToolCallMessageSchema = z.object({
  idempotencyKey: z.string().uuid(),
  toolName: z.string().min(1).max(100).regex(/^[a-zA-Z][a-zA-Z0-9_]*$/),
  tenantId: z.string().uuid(),
  sessionId: z.string().uuid(),
  args: z.record(z.string(), z.unknown()), // further validated per-tool
  originalPermissions: z.array(z.string()).max(50),
  publishedAt: z.string().datetime(),
  signature: z.string().regex(/^[0-9a-f]{64}$/), // HMAC-SHA256, lowercase hex
}).strict(); // .strict() rejects unknown top-level fields, including a top-level __proto__ key

async function consumeMessage(rawMessage) {
  let payload;
  try {
    payload = ToolCallMessageSchema.parse(JSON.parse(rawMessage.value.toString()));
  } catch (err) {
    // Unparseable JSON or invalid schema: quarantine immediately, do not process
    await quarantineMessage(rawMessage, 'schema-validation-failed', err.issues ?? err.message);
    return;
  }
  // Verify the HMAC before processing. computeHmac (not shown) must cover every
  // field except `signature` and return a Buffer.
  const expectedSig = computeHmac(payload, process.env.MESSAGE_SIGNING_SECRET);
  const receivedSig = Buffer.from(payload.signature, 'hex');
  // timingSafeEqual throws on a length mismatch, so compare lengths first
  if (receivedSig.length !== expectedSig.length || !timingSafeEqual(receivedSig, expectedSig)) {
    await quarantineMessage(rawMessage, 'signature-mismatch');
    return;
  }
  await processToolCallMessage(payload);
}
Use .strict() in Zod (or additionalProperties: false in JSON Schema) on message schemas. Without it, an attacker can add extra fields to a message payload, including __proto__ or constructor keys, that survive JSON parsing (JSON.parse creates them as ordinary own properties) and tamper with the prototype chain when the payload later passes through Object.assign or a recursive deep-merge into configuration objects. Strict mode rejects messages with unexpected top-level fields, closing that vector at the top level and preventing accidental processing of injected fields. It does not reach into open-ended fields such as the args record, which need their own per-tool schemas.
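Because `.strict()` applies only to the object it is called on, the `args` record in the schema above is not covered: `z.record` permits arbitrary key names by design. One option, sketched here with hypothetical names, is a recursive scan of the raw `JSON.parse` output for `args` that rejects prototype-sensitive keys before per-tool validation. It assumes JSON-parsed input, so only plain objects, arrays, and primitives occur.

```javascript
// Keys that can reach Object.prototype through assignment or naive deep merges
const DANGEROUS_KEYS = new Set(['__proto__', 'constructor', 'prototype']);

// Return the path of the first dangerous key, or null if none is found.
// JSON.parse creates "__proto__" as an own property, so Object.keys sees it.
function findDangerousKeyPath(value, path = 'args') {
  if (value === null || typeof value !== 'object') return null;
  for (const key of Object.keys(value)) {
    const childPath = `${path}.${key}`;
    if (DANGEROUS_KEYS.has(key)) return childPath;
    const nested = findDangerousKeyPath(value[key], childPath);
    if (nested) return nested;
  }
  return null;
}
```

A consumer would quarantine the message whenever this returns a path, the same way it handles a schema failure.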
6. Transport encryption and at-rest protection for tool arguments
Tool call messages often contain arguments that are sensitive: file paths, search queries that reveal user intent, API parameters that encode PII. Two protection layers are needed:
In-flight encryption (TLS): All broker connections should use TLS; this is non-negotiable for any production deployment. For Kafka, set ssl: true in kafkajs (security.protocol=SASL_SSL or SSL in Java clients). For RabbitMQ, use the amqps:// scheme. For SQS, the AWS SDKs use HTTPS endpoints by default; to reject any request not made over TLS, add a queue-policy Deny on aws:SecureTransport = false. For self-hosted brokers, provision certificates from an internal CA and rotate them on the same schedule as your TLS certificates elsewhere.
At-rest encryption for sensitive argument fields: Broker-level disk encryption protects against physical storage compromise but not against anyone with broker read access. For tool arguments that contain secrets (passwords or API keys passed as tool args, an anti-pattern but a common one), encrypt the sensitive fields before publishing using envelope encryption: generate a per-message data encryption key (DEK), encrypt the sensitive fields with the DEK using AES-256-GCM, encrypt the DEK with a key encryption key (KEK) held in your KMS (AWS KMS, HashiCorp Vault), and store the encrypted DEK alongside the encrypted fields in the message payload. The consumer asks the KMS to decrypt the DEK, then decrypts the fields locally. The example below encrypts the whole args object for brevity; the same approach applies to individual fields.
// Envelope encryption for sensitive tool args
import { KMSClient, GenerateDataKeyCommand, DecryptCommand } from '@aws-sdk/client-kms';
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';
const kms = new KMSClient({ region: 'us-east-1' });
const KEY_ID = process.env.KMS_KEY_ARN;
async function encryptSensitiveArgs(args) {
const { Plaintext: dek, CiphertextBlob: encryptedDek } =
await kms.send(new GenerateDataKeyCommand({ KeyId: KEY_ID, KeySpec: 'AES_256' }));
const iv = randomBytes(12);
const cipher = createCipheriv('aes-256-gcm', dek, iv);
const encrypted = Buffer.concat([cipher.update(JSON.stringify(args)), cipher.final()]);
const authTag = cipher.getAuthTag();
return {
encryptedArgs: encrypted.toString('base64'),
encryptedDek: Buffer.from(encryptedDek).toString('base64'),
iv: iv.toString('base64'),
authTag: authTag.toString('base64'),
};
}
async function decryptSensitiveArgs(encryptedPayload) {
const { Plaintext: dek } = await kms.send(new DecryptCommand({
KeyId: KEY_ID,
CiphertextBlob: Buffer.from(encryptedPayload.encryptedDek, 'base64'),
}));
const decipher = createDecipheriv(
'aes-256-gcm',
dek,
Buffer.from(encryptedPayload.iv, 'base64')
);
decipher.setAuthTag(Buffer.from(encryptedPayload.authTag, 'base64'));
return JSON.parse(
Buffer.concat([
decipher.update(Buffer.from(encryptedPayload.encryptedArgs, 'base64')),
decipher.final()
]).toString()
);
}
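The KMS example above encrypts the arguments but does not bind the ciphertext to the message it travels in, so, independently of message signing, a party with write access to the topic could move an `encryptedArgs` blob from one message into another. AES-GCM's additional authenticated data (AAD) closes that gap. The sketch runs offline: a locally generated 32-byte key stands in for the KMS data key, and the AAD format and function names are assumptions. With AWS KMS, `GenerateDataKeyCommand` and `DecryptCommand` also accept an `EncryptionContext`, which plays a similar role for the wrapped DEK.

```javascript
const { createCipheriv, createDecipheriv, randomBytes } = require('node:crypto');

// AAD from fields the consumer already validates; JSON array avoids delimiter ambiguity
function argsAad({ tenantId, idempotencyKey, toolName }) {
  return JSON.stringify([tenantId, idempotencyKey, toolName]);
}

// Encrypt args with AES-256-GCM; `dek` stands in for the KMS data key (32 bytes)
function sealArgs(args, dek, aad) {
  const iv = randomBytes(12);
  const cipher = createCipheriv('aes-256-gcm', dek, iv);
  cipher.setAAD(Buffer.from(aad, 'utf8'));
  const ciphertext = Buffer.concat([cipher.update(JSON.stringify(args), 'utf8'), cipher.final()]);
  return { iv, ciphertext, authTag: cipher.getAuthTag() };
}

// decipher.final() throws if the ciphertext, auth tag, or AAD do not match
function openArgs({ iv, ciphertext, authTag }, dek, aad) {
  const decipher = createDecipheriv('aes-256-gcm', dek, iv);
  decipher.setAAD(Buffer.from(aad, 'utf8'));
  decipher.setAuthTag(authTag);
  const plaintext = Buffer.concat([decipher.update(ciphertext), decipher.final()]);
  return JSON.parse(plaintext.toString('utf8'));
}
```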
Delivery semantics: what "at-least-once" means for tool security
| Delivery guarantee | What the broker promises | Security implication for MCP tool calls |
|---|---|---|
| At-most-once | Message delivered 0 or 1 times; may be lost | No broker redelivery, but malicious re-publishing is still possible and lost tool calls fail silently; not suitable for critical tool execution |
| At-least-once (default) | Message delivered 1+ times; may duplicate | No lost messages, but every side-effectful tool needs an idempotency key to prevent double execution |
| Exactly-once (Kafka transactions) | Records produced back to Kafka and consumer offset commits are committed atomically | Prevents duplicates only for output written back to Kafka; external side effects (email, payments, webhooks) and malicious replay still need idempotency keys |
Kafka's exactly-once semantics prevent broker-level duplicates but not malicious replay or duplicated external side effects. Transactions make the records a consumer produces back to Kafka and its offset commits atomic; they cannot roll back an email that was sent or a card that was charged before the consumer crashed, so that message is processed again after a restart. And if an attacker captures the raw message bytes and re-publishes them as a new message with a new offset, Kafka treats it as a distinct message and delivers it exactly once, to your consumer. The idempotency key pattern (deduplication table checked before each side effect) is the defense that covers broker-level duplicates and malicious replay alike, regardless of delivery guarantee level.
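One gap remains: the deduplication table in section 2 expires keys after 7 days, so a replay that arrives after cleanup would pass the duplicate check. Rejecting messages whose `publishedAt` falls outside a window shorter than that TTL closes it. A minimal sketch, assuming the `publishedAt` field from the message schema in section 5 is covered by the HMAC signature (otherwise an attacker simply refreshes it); the window sizes and function name are illustrative.

```javascript
// Kept below the 7-day idempotency TTL to leave margin for clock skew and
// cleanup timing (illustrative values)
const MAX_MESSAGE_AGE_MS = 6 * 24 * 60 * 60 * 1000;
const MAX_CLOCK_SKEW_MS = 5 * 60 * 1000;

// True if publishedAt (ISO 8601) is recent enough for its key to still exist
function isWithinReplayWindow(publishedAt, now = Date.now()) {
  const published = Date.parse(publishedAt);
  if (Number.isNaN(published)) return false;
  if (published > now + MAX_CLOCK_SKEW_MS) return false; // future-dated: reject
  return now - published <= MAX_MESSAGE_AGE_MS;
}
```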
Deployment checklist
- Broker requires authentication on all connections (SASL/SCRAM or mTLS); no unauthenticated plaintext connections permitted in production
- Topic-level ACLs restrict produce/consume permissions to named service principals; no wildcard grants
- All broker connections use TLS (ssl: true / amqps:// / HTTPS-only endpoint)
- Every side-effectful tool call message carries a caller-generated idempotency key (UUID) in the message body
- Consumers check idempotency key against deduplication table before executing any side effect
- Message payload is HMAC-signed by the producer; consumers verify signature before processing
- Zod / JSON Schema validation with .strict() on every inbound message before processing
- DLQ consumer re-executes with original tenant permissions, never elevated credentials
- DLQ messages are schema-validated before DLQ entry; tampered or malformed messages go to quarantine
- DLQ operations UI sanitizes all message fields before rendering (treat as untrusted HTML)
- Per-tenant topic ACLs or per-message tenant validation prevents cross-tenant message delivery
- Sensitive tool argument fields encrypted with envelope encryption (DEK + KMS) before publishing
SkillAudit findings for message queue vulnerabilities in MCP servers
- … tenantId, permissions, or toolName fields in message payloads without detection; consumer processes tampered payloads as legitimate
- … __proto__ keys in message args affects downstream object operations; injection via unexpected fields

See also: Distributed locking and race conditions · OAuth security · CSRF security · Zero-trust architecture