MCP server message queue security — RabbitMQ, Redis pub/sub, and AMQP risks

MCP servers that consume from RabbitMQ (AMQP), Redis pub/sub, or other message brokers often treat the queue as a trusted internal channel. Queue messages are data, though, and may originate from external systems, other services, or attacker-controlled inputs. Four vulnerability classes convert a routine message queue integration into a critical security finding: shared admin credentials, user-controlled channel patterns passed to PSUBSCRIBE, queue payloads deserialized without schema validation, and connection strings (password included) exposed through source code, .env files, or environment variables.

AMQP authentication: vhost isolation and per-service credentials

RabbitMQ's virtual host (vhost) system is the primary isolation boundary. A vhost is a logical partition: queues, exchanges, and bindings in one vhost are invisible to connections opened against a different vhost. Each MCP server should have its own dedicated vhost and a per-service username whose permissions are restricted to that vhost. Using a shared admin user across all services, a common shortcut, means that compromising any single MCP server's credentials grants full access to all queues on the broker:

// DANGEROUS: shared admin credentials used by all services
// amqp://admin:secretpassword@rabbitmq-host:5672/
// A compromised mcp-file-server can now read from the mcp-payment-server's queues

// SAFE: per-service credentials with vhost isolation
// Provision with rabbitmqctl on the broker:
//   rabbitmqctl add_vhost mcp-file-vhost
//   MCP_FILE_PASS="$(openssl rand -base64 32)"   # keep the value: store it in your secrets manager
//   rabbitmqctl add_user mcp-file-svc "$MCP_FILE_PASS"
//   rabbitmqctl set_permissions -p mcp-file-vhost mcp-file-svc "^mcp-file\." "^mcp-file\." "^mcp-file\."
//   # Argument order is configure, write, read; each regex limits the user to resources named mcp-file.*

import amqp from 'amqplib';

async function createChannel() {
  // Each service uses its own vhost and credentials
  const connection = await amqp.connect({
    protocol: 'amqps',            // TLS — never plain amqp:// in production
    hostname: process.env.RABBITMQ_HOST,
    port: 5671,
    username: process.env.RABBITMQ_USER,    // Populated from Vault/Secrets Manager, never hardcoded
    password: process.env.RABBITMQ_PASS,    // Populated from Vault/Secrets Manager, never hardcoded
    vhost: process.env.RABBITMQ_VHOST,      // e.g. 'mcp-file-vhost'
  });

  const channel = await connection.createChannel();
  // This connection cannot see or consume from any other service's vhost
  return channel;
}
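
To make the per-service rule harder to bypass by accident, a server can refuse to start when its AMQP settings look shared. The following is a minimal sketch of that idea, not a RabbitMQ or amqplib feature: the function name `assertScopedAmqpConfig`, the `BLOCKED_USERS` list, and the convention that a vhost name starts with the service name are all assumptions made for illustration.

```javascript
// Hypothetical startup guard: rejects AMQP settings that look shared or unencrypted.
// The blocked usernames and the "vhost starts with the service name" rule are assumptions.
const BLOCKED_USERS = new Set(['admin', 'guest', 'root']);

function assertScopedAmqpConfig(config, serviceName) {
  const problems = [];
  if (config.protocol !== 'amqps') {
    problems.push('protocol must be amqps (TLS)');
  }
  if (!config.username || BLOCKED_USERS.has(config.username.toLowerCase())) {
    problems.push('username must be a dedicated per-service account');
  }
  // "/" is RabbitMQ's default vhost, which is typically shared
  if (!config.vhost || config.vhost === '/' || !config.vhost.startsWith(serviceName)) {
    problems.push(`vhost must be dedicated to ${serviceName}`);
  }
  if (problems.length > 0) {
    // The message lists rule violations only; it never includes the password
    throw new Error(`Unsafe AMQP configuration: ${problems.join('; ')}`);
  }
  return config;
}
```

Calling it on the options object just before `amqp.connect(...)` turns a misconfigured deployment into a startup failure instead of a silent over-privileged connection.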

Redis pub/sub channel injection via attacker-controlled patterns

Redis PSUBSCRIBE accepts glob patterns (mcp.*, events.?) and subscribes to all channels matching the pattern. If an MCP server accepts a channel name or pattern from user input and passes it directly to PSUBSCRIBE, an attacker can subscribe to channels they should not be able to reach. The pattern * subscribes to every channel on the Redis instance, bypassing any per-channel isolation:

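import { createClient } from 'redis';

// Base client that the subscribers below duplicate. The host is a placeholder;
// supply the username and password from a secrets manager, never inline.
const client = createClient({ url: 'rediss://redis-host:6379' });
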
// VULNERABLE: user-controlled channel pattern in PSUBSCRIBE
async function subscribeToEvents(args) {
  // args.channel is MCP tool input — controlled by the LLM / caller
  const subscriber = client.duplicate();
  await subscriber.connect();

  // An attacker passes args.channel = "*" to subscribe to all channels,
  // or "payment.*" to intercept payment events from another service
  await subscriber.pSubscribe(args.channel, (message, channel) => {
    handleMessage(message, channel);
  });
}

// SAFE: enforce an allowlist of exact channel names
const ALLOWED_CHANNELS = new Set([
  'mcp:file:events',
  'mcp:file:notifications',
]);

async function subscribeToEvents(args) {
  // Validate: exact match against allowlist, no glob patterns accepted
  if (!ALLOWED_CHANNELS.has(args.channel)) {
    throw new Error(`Channel not permitted: ${args.channel}`);
  }

  const subscriber = client.duplicate();
  await subscriber.connect();

  // Use SUBSCRIBE (exact), not PSUBSCRIBE (glob) — no pattern expansion
  await subscriber.subscribe(args.channel, (message) => {
    handleMessage(message);
  });
}

Redis 6.2 and later extend ACLs with pub/sub channel permissions that are enforced on the server, before application code runs. Create a dedicated Redis user that can only subscribe to channels under a specific name prefix:

# Redis 6.2+ ACL: restrict mcp-file-svc to subscribing on mcp:file:* channels
# Run in redis-cli:
ACL SETUSER mcp-file-svc on ><password> resetchannels &mcp:file:* -@all +subscribe +psubscribe +unsubscribe +punsubscribe +ping +quit

# resetchannels: clear default channel access first (on Redis 6.2 new users get all channels unless acl-pubsub-default is changed)
# &mcp:file:*  : only channels matching mcp:file:* are permitted for pub/sub
# no ~ pattern : a pub/sub-only user needs no key access, so none is granted
# -@all        : deny all commands by default
# +subscribe   : allow SUBSCRIBE (likewise for the other listed commands)
# SUBSCRIBE channel names are glob-matched against the allowed patterns. PSUBSCRIBE requires a
# literal match with an allowed pattern, so PSUBSCRIBE "*" is rejected even if app code passes it.
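
The two matching rules can be easy to confuse, so here is a simplified model of how the server decides. It is a sketch for reasoning about the ACL, not Redis's implementation: the function names are hypothetical, and only the `*` and `?` glob operators are handled (real Redis globs also support character classes and escapes). The rule it encodes is that SUBSCRIBE channel names are glob-matched against the user's allowed patterns, while a PSUBSCRIBE pattern must literally equal one of them.

```javascript
// Simplified model of Redis ACL channel checks (assumption: only "*" and "?" globs).
function globToRegExp(glob) {
  // Escape regex metacharacters except the two glob operators handled below
  const escaped = glob.replace(/[.+^${}()|[\]\\]/g, '\\$&');
  return new RegExp('^' + escaped.replace(/\*/g, '.*').replace(/\?/g, '.') + '$', 's');
}

// SUBSCRIBE: the channel name is glob-matched against the allowed patterns
function subscribeAllowed(channel, allowedPatterns) {
  return allowedPatterns.some((p) => globToRegExp(p).test(channel));
}

// PSUBSCRIBE: the requested pattern must literally equal an allowed pattern
function psubscribeAllowed(pattern, allowedPatterns) {
  return allowedPatterns.includes(pattern);
}

const allowed = ['mcp:file:*'];
console.log(subscribeAllowed('mcp:file:events', allowed));  // true
console.log(subscribeAllowed('payment:events', allowed));   // false
console.log(psubscribeAllowed('*', allowed));               // false
console.log(psubscribeAllowed('mcp:file:*', allowed));      // true
```

One consequence worth noting: a narrower pattern such as `mcp:file:events:*` is also refused for PSUBSCRIBE under this ACL, because it is not a literal match, which is another reason to prefer exact SUBSCRIBE calls in application code.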

Message payload deserialization without schema validation

Queue messages are not internal trusted data. They are serialized payloads that may originate from external systems, third-party services, or compromised upstream producers. An MCP server that deserializes a queue message and immediately uses its fields as operands — without validating the shape first — is vulnerable to malformed-payload attacks, type confusion, and injection through message content that is later used in database queries, shell commands, or file paths:

import amqp from 'amqplib';
import { z } from 'zod';

// VULNERABLE: direct destructuring of queue message, no validation
channel.consume('mcp-tasks', async (msg) => {
  const task = JSON.parse(msg.content.toString());
  // task.filePath, task.userId, task.action are used directly below
  // An attacker who can publish to this queue controls all three fields
  await processFile(task.filePath, task.userId, task.action);
  channel.ack(msg);
});

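// SAFE: parse with Zod schema before any use of message fields
const TaskSchema = z.object({
  taskId:   z.string().uuid(),
  filePath: z.string().max(255)
    .regex(/^[a-zA-Z0-9_\-./]+$/)
    // The character class alone still admits "../" and absolute paths, so reject both
    .refine((p) => !p.startsWith('/') && !p.split('/').includes('..'), 'path traversal'),
  userId:   z.string().uuid(),
  action:   z.enum(['read', 'index', 'summarize']),  // only these values allowed
});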

channel.consume('mcp-tasks', async (msg) => {
  if (!msg) return;

  let task;
  try {
    const raw = JSON.parse(msg.content.toString());
    task = TaskSchema.parse(raw);   // throws ZodError if shape is wrong
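  } catch (err) {
    // Dead-letter the message rather than crashing or silently proceeding
    channel.nack(msg, false, false);  // requeue=false: routed to the DLX if one is configured, otherwise discarded
    // Log the size only; the raw payload is attacker-controlled and may be large or sensitive
    logger.warn({ err, bytes: msg.content.length }, 'Invalid task message schema');
    return;
  }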

  // task.filePath is now known to match the regex, task.action is a known enum
  await processFile(task.filePath, task.userId, task.action);
  channel.ack(msg);
});

Dead-letter queue monitoring: messages rejected to the dead-letter exchange (DLX) indicate either schema evolution issues or active attempts to send malformed payloads. Alert on sustained DLX message rates — a spike may indicate an attacker probing the MCP server via the queue.
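
As a rough illustration of both halves of that advice, the sketch below declares a queue with a dead-letter exchange using amqplib's `assertExchange`, `assertQueue` (with its `deadLetterExchange` option), and `bindQueue`, then adds a small sliding-window counter for alerting. The exchange and queue names, the window, and the threshold are hypothetical, and `createRejectionMonitor` is an illustrative helper rather than part of any library.

```javascript
// Declares a work queue whose rejected messages are routed to a dead-letter queue.
// `channel` is an amqplib channel; exchange and queue names are hypothetical.
async function setupDeadLettering(channel) {
  await channel.assertExchange('mcp-file.dlx', 'fanout', { durable: true });
  await channel.assertQueue('mcp-file.dead', { durable: true });
  await channel.bindQueue('mcp-file.dead', 'mcp-file.dlx', '');
  // With this argument set, nack(msg, false, false) dead-letters instead of discarding
  await channel.assertQueue('mcp-file.tasks', {
    durable: true,
    deadLetterExchange: 'mcp-file.dlx',
  });
}

// Sliding-window counter: returns true once rejections inside windowMs exceed threshold.
function createRejectionMonitor({ windowMs, threshold, now = Date.now }) {
  const timestamps = [];
  return function recordRejection() {
    const t = now();
    timestamps.push(t);
    // Drop entries that have aged out of the window
    while (timestamps.length > 0 && timestamps[0] <= t - windowMs) {
      timestamps.shift();
    }
    return timestamps.length > threshold;
  };
}
```

A consumer would call `recordRejection()` next to its `channel.nack(...)` and raise an alert when it returns true. Note that RabbitMQ refuses to redeclare an existing queue with different arguments, so on a queue that already exists the dead-letter exchange is usually applied through a policy instead.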

Connection string exposure

AMQP and Redis connection strings carry full credentials: amqp://user:password@host:5672/vhost and redis://:password@host:6379. They commonly end up in three insecure locations: hardcoded in source code (visible in the repository), in .env files that get committed or leak via debug endpoints, and in container environment variables, which show up in kubectl describe pod output when set as literal values and can be read from the process environment on the host. The safe pattern is to retrieve credentials from a secrets manager at startup and never log the assembled connection URL:

import { SecretsManagerClient, GetSecretValueCommand } from '@aws-sdk/client-secrets-manager';
import amqp from 'amqplib';

async function getAmqpConnection() {
  const sm = new SecretsManagerClient({ region: 'us-east-1' });

  // Retrieve credentials from Secrets Manager — never from process.env directly
  const { SecretString } = await sm.send(new GetSecretValueCommand({
    SecretId: 'prod/mcp-file-server/rabbitmq',
  }));

  const { host, port, username, password, vhost } = JSON.parse(SecretString);

  // Do NOT log the assembled URL — it contains the password
  const connection = await amqp.connect({
    protocol: 'amqps',
    hostname: host,
    port: Number(port),
    username,
    password,
    vhost,
  });

  // Log only the host and vhost for observability — never the credentials
  logger.info({ host, vhost }, 'AMQP connection established');
  return connection;
}

// In Vault-based environments, read the same fields with a Vault client
// (for example the community node-vault package; a KV v2 path is shown):
// const secret = await vault.read('secret/data/mcp-file-server/rabbitmq');
// const { username, password, host, vhost } = secret.data.data;
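
Where a legacy code path still handles a full connection URL, masking the password before it reaches a log line limits the damage. The helper below is a small sketch using the standard WHATWG `URL` class built into Node.js; the name `redactConnectionUrl` and the `REDACTED` marker are illustrative choices, not an existing API.

```javascript
// Returns a copy of a connection URL that is safe to log: the password is masked.
// Falls back to a fixed marker if the string cannot be parsed as a URL.
function redactConnectionUrl(rawUrl) {
  try {
    const url = new URL(rawUrl);
    if (url.password) url.password = 'REDACTED';
    return url.toString();
  } catch {
    // Never echo the raw input: it may still contain the secret
    return '[unparseable connection string]';
  }
}

// Both forms from the text above come back with the password replaced
console.log(redactConnectionUrl('amqp://user:password@host:5672/vhost'));
console.log(redactConnectionUrl('redis://:password@host:6379'));
```

This is a safety net rather than a substitute for structured secrets: the username, host, and vhost remain visible, and the unredacted string still exists in memory.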

SkillAudit findings

CRITICAL: Shared admin AMQP credentials used by MCP server. The connection string uses a RabbitMQ admin user or a user with permissions across all vhosts. A compromise of this MCP server's credentials grants full access to every queue on the broker. Provision per-service credentials scoped to a dedicated vhost.
CRITICAL: Message queue payload deserialized without schema validation. Queue message content is parsed with JSON.parse() and fields are used directly without Zod or equivalent schema validation. A malformed or attacker-crafted message can cause type confusion, injection, or crash the MCP server process.
HIGH: User-controlled channel pattern passed to Redis PSUBSCRIBE. The MCP tool accepts a channel name or pattern from its arguments and passes it to pSubscribe() without restricting the pattern. An attacker can subscribe to all channels with pattern *, intercepting messages from other services on the same Redis instance.
HIGH: AMQP or Redis connection string in environment variable. The connection URL, including the password, is stored in process.env.AMQP_URL or process.env.REDIS_URL and is visible in process listings, kubectl describe pod output, and debug log lines. Use Secrets Manager or Vault; pass host/port/credentials as separate structured secrets, not as a single URL string.
MEDIUM: No message size or in-flight limit enforced on queue consumption. The MCP server neither calls channel.prefetch() to cap the number of unacknowledged deliveries nor checks the message byte size before parsing. A producer that sends a very large message (hundreds of MB) can cause the Node.js process to exhaust heap memory while parsing the payload. Check msg.content.length before JSON.parse() and set a prefetch count so deliveries cannot pile up in memory.
MEDIUM: No dead-letter queue configured or monitored. Messages that fail processing are either silently dropped or cause the consumer to spin in a requeue loop. Configure a dead-letter exchange (DLX) on each queue and alert on DLX message accumulation to detect both bugs and active payload injection attempts.
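
The message size finding can be addressed with a byte check ahead of `JSON.parse()` combined with a prefetch limit. The sketch below shows one way to wire that up with amqplib's `channel.prefetch()`, `consume()`, `ack()`, and `nack()`; the 64 KiB limit, the prefetch count of 10, and the names `parseBoundedJson`, `consumeBounded`, and `handle` are assumptions for illustration.

```javascript
// Hypothetical limit; tune it to your workload.
const MAX_MESSAGE_BYTES = 64 * 1024;

// Parses a message body only if it is within the byte limit.
// Returns { ok: true, value } or { ok: false, reason } instead of throwing.
function parseBoundedJson(content, maxBytes = MAX_MESSAGE_BYTES) {
  if (content.length > maxBytes) {
    return { ok: false, reason: `payload of ${content.length} bytes exceeds ${maxBytes}` };
  }
  try {
    return { ok: true, value: JSON.parse(content.toString('utf8')) };
  } catch (err) {
    return { ok: false, reason: 'invalid JSON' };
  }
}

// Wiring sketch: `channel` is an amqplib channel, `handle` is your own task handler.
async function consumeBounded(channel, queue, handle) {
  await channel.prefetch(10);  // at most 10 unacknowledged deliveries in flight
  await channel.consume(queue, async (msg) => {
    if (!msg) return;  // null means the consumer was cancelled by the broker
    const parsed = parseBoundedJson(msg.content);
    if (!parsed.ok) {
      channel.nack(msg, false, false);  // reject without requeue
      return;
    }
    await handle(parsed.value);
    channel.ack(msg);
  });
}
```

The check runs after the broker has already delivered the body, so it limits parsing cost rather than network transfer. RabbitMQ's broker-side `max_message_size` setting is the complementary control for refusing oversized messages at publish time.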

Paste a GitHub URL at skillaudit.dev to get a graded report card covering message queue security alongside all other MCP security dimensions.