Message Queue Security · RabbitMQ · Redis Pub/Sub · AMQP
MCP server message queue security — RabbitMQ, Redis pub/sub, and AMQP risks
MCP servers that consume from RabbitMQ, Redis pub/sub, or other AMQP brokers treat the message queue as a trusted internal channel — but queue messages are data that may originate from external systems, other services, or attacker-controlled inputs. Shared admin credentials, user-controlled channel patterns in PSUBSCRIBE, deserializing queue payloads without schema validation, and connection strings embedded in environment variables are the four vulnerability classes that convert a routine message queue integration into a critical security finding.
AMQP authentication: vhost isolation and per-service credentials
RabbitMQ's virtual host (vhost) system is the primary isolation boundary. A vhost is a logical partition: queues, exchanges, and bindings in one vhost are invisible to connections authenticated to a different vhost. MCP servers should each have their own dedicated vhost and a per-service username with permissions restricted to that vhost. Using a shared admin user across all services — the most common lazy configuration — means that compromising any single MCP server's credentials grants full access to all queues on the broker:
// DANGEROUS: shared admin credentials used by all services
// amqp://admin:secretpassword@rabbitmq-host:5672/
// A compromised mcp-file-server can now read from the mcp-payment-server's queues
// SAFE: per-service credentials with vhost isolation
// Provision with rabbitmqctl on the broker:
// rabbitmqctl add_vhost mcp-file-vhost
// rabbitmqctl add_user mcp-file-svc "$(openssl rand -base64 32)"
// rabbitmqctl set_permissions -p mcp-file-vhost mcp-file-svc "^mcp-file\." "^mcp-file\." "^mcp-file\."
// # Pattern: configure/write/read restricted to queues matching ^mcp-file\..*
import amqp from 'amqplib';
async function createChannel() {
// Each service uses its own vhost and credentials
const connection = await amqp.connect({
protocol: 'amqps', // TLS — never plain amqp:// in production
hostname: process.env.RABBITMQ_HOST,
port: 5671,
username: process.env.RABBITMQ_USER, // From Vault/Secrets Manager
password: process.env.RABBITMQ_PASS, // From Vault/Secrets Manager
vhost: process.env.RABBITMQ_VHOST, // e.g. 'mcp-file-vhost'
});
const channel = await connection.createChannel();
// This connection cannot see or consume from any other service's vhost
return channel;
}
Redis pub/sub channel injection via attacker-controlled patterns
Redis PSUBSCRIBE accepts glob patterns (mcp.*, events.?) and subscribes to all channels matching the pattern. If an MCP server accepts a channel name or pattern from user input and passes it directly to PSUBSCRIBE, an attacker can subscribe to channels they should not be able to reach. The pattern * subscribes to every channel on the Redis instance, bypassing any per-channel isolation:
import { createClient } from 'redis';
// VULNERABLE: user-controlled channel pattern in PSUBSCRIBE
async function subscribeToEvents(args) {
// args.channel is MCP tool input — controlled by the LLM / caller
const subscriber = client.duplicate();
await subscriber.connect();
// An attacker passes args.channel = "*" to subscribe to all channels,
// or "payment.*" to intercept payment events from another service
await subscriber.pSubscribe(args.channel, (message, channel) => {
handleMessage(message, channel);
});
}
// SAFE: enforce an allowlist of exact channel names
const ALLOWED_CHANNELS = new Set([
'mcp:file:events',
'mcp:file:notifications',
]);
async function subscribeToEvents(args) {
// Validate: exact match against allowlist, no glob patterns accepted
if (!ALLOWED_CHANNELS.has(args.channel)) {
throw new Error(`Channel not permitted: ${args.channel}`);
}
const subscriber = client.duplicate();
await subscriber.connect();
// Use SUBSCRIBE (exact), not PSUBSCRIBE (glob) — no pattern expansion
await subscriber.subscribe(args.channel, (message) => {
handleMessage(message);
});
}
Redis 6+ provides ACL commands that enforce channel restrictions at the connection level, before application code runs. Create a dedicated Redis user that can only subscribe to specific channel name prefixes:
# Redis 6+ ACL: restrict mcp-file-svc to only subscribe on mcp:file:* channels # Run in redis-cli or via ACL SETUSER command: ACL SETUSER mcp-file-svc on ><password> ~* &mcp:file:* -@all +subscribe +psubscribe +unsubscribe +punsubscribe +ping +quit # &mcp:file:* — only channels matching mcp:file:* are permitted for pub/sub # -@all — deny all commands by default # +subscribe — allow SUBSCRIBE command # The server-level ACL rejects PSUBSCRIBE with pattern "*" even if app code passes it
Message payload deserialization without schema validation
Queue messages are not internal trusted data. They are serialized payloads that may originate from external systems, third-party services, or compromised upstream producers. An MCP server that deserializes a queue message and immediately uses its fields as operands — without validating the shape first — is vulnerable to malformed-payload attacks, type confusion, and injection through message content that is later used in database queries, shell commands, or file paths:
import amqp from 'amqplib';
import { z } from 'zod';
// VULNERABLE: direct destructuring of queue message, no validation
channel.consume('mcp-tasks', async (msg) => {
const task = JSON.parse(msg.content.toString());
// task.filePath, task.userId, task.action are used directly below
// An attacker who can publish to this queue controls all three fields
await processFile(task.filePath, task.userId, task.action);
channel.ack(msg);
});
// SAFE: parse with Zod schema before any use of message fields
const TaskSchema = z.object({
taskId: z.string().uuid(),
filePath: z.string().regex(/^[a-zA-Z0-9_\-./]+$/).max(255),
userId: z.string().uuid(),
action: z.enum(['read', 'index', 'summarize']), // only these values allowed
});
channel.consume('mcp-tasks', async (msg) => {
if (!msg) return;
let task;
try {
const raw = JSON.parse(msg.content.toString());
task = TaskSchema.parse(raw); // throws ZodError if shape is wrong
} catch (err) {
// Dead-letter the message rather than crashing or silently proceeding
channel.nack(msg, false, false); // requeue=false → goes to DLX
logger.warn({ err, content: msg.content.toString() }, 'Invalid task message schema');
return;
}
// task.filePath is now known to match the regex, task.action is a known enum
await processFile(task.filePath, task.userId, task.action);
channel.ack(msg);
});
Dead-letter queue monitoring: messages rejected to the dead-letter exchange (DLX) indicate either schema evolution issues or active attempts to send malformed payloads. Alert on sustained DLX message rates — a spike may indicate an attacker probing the MCP server via the queue.
Connection string exposure
AMQP and Redis connection strings carry full credentials: amqp://user:password@host:5672/vhost and redis://:password@host:6379. These are commonly found in three insecure locations: hardcoded in source code (visible in the repository), in .env files that get committed or leak via debug endpoints, and in container environment variables that appear in kubectl describe pod output or process listings. The safe pattern is to retrieve credentials from a secrets manager at startup and never log the assembled connection URL:
import { SecretsManagerClient, GetSecretValueCommand } from '@aws-sdk/client-secrets-manager';
import amqp from 'amqplib';
async function getAmqpConnection() {
const sm = new SecretsManagerClient({ region: 'us-east-1' });
// Retrieve credentials from Secrets Manager — never from process.env directly
const { SecretString } = await sm.send(new GetSecretValueCommand({
SecretId: 'prod/mcp-file-server/rabbitmq',
}));
const { host, port, username, password, vhost } = JSON.parse(SecretString);
// Do NOT log the assembled URL — it contains the password
const connection = await amqp.connect({
protocol: 'amqps',
hostname: host,
port: Number(port),
username,
password,
vhost,
});
// Log only the host and vhost for observability — never the credentials
logger.info({ host, vhost }, 'AMQP connection established');
return connection;
}
// In Vault-based environments, use the Node.js Vault SDK:
// const secret = await vault.read('secret/data/mcp-file-server/rabbitmq');
// const { username, password, host, vhost } = secret.data.data;
SkillAudit findings
JSON.parse() and fields are used directly without Zod or equivalent schema validation. A malformed or attacker-crafted message can cause type confusion, injection, or crash the MCP server process.
pSubscribe() without restricting the pattern. An attacker can subscribe to all channels with pattern *, intercepting messages from other services on the same Redis instance.
process.env.AMQP_URL or process.env.REDIS_URL and visible in process listings, kubectl describe pod output, and debug log lines. Use Secrets Manager or Vault; pass host/port/credentials as separate structured secrets, not as a single URL string.
channel.prefetch() or a maximum message byte size. A producer that sends a very large message (hundreds of MB) can cause the Node.js process to exhaust heap memory while parsing the payload.
Paste a GitHub URL at skillaudit.dev to get a graded report card covering message queue security alongside all other MCP security dimensions.