MCP server event-driven security — message queue and pub/sub patterns
MCP servers increasingly use event-driven architectures: consuming events from Kafka, publishing to RabbitMQ, reading from SQS queues, or subscribing to Redis Pub/Sub channels. The security model of these systems differs from that of request-response APIs: messages arrive asynchronously, may originate from many producers, and are processed with no synchronous caller to authenticate or hold accountable. For MCP servers, this introduces four attack patterns that request-response tool designs either do not face or face in a different form: message injection, topic/queue confusion, replay attacks, and poison message denial-of-service.
Attack 1: Message injection via LLM-controlled publish
If an MCP tool allows the LLM to specify the content of a message published to a queue, a prompt-injected LLM can craft messages that cause downstream consumers to take unintended actions. This is the event-driven equivalent of SQL injection — the message content is treated as both data and instructions by the consumer.
// WRONG: LLM argument published directly as message content
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { action, payload } = request.params.arguments
  // LLM can set action = "delete_all_records" and payload = { confirm: true }
  // if the downstream consumer processes action/payload directly
  await kafkaProducer.send({ topic: 'user-events', messages: [{ value: JSON.stringify({ action, payload }) }] })
  return { content: [{ type: 'text', text: 'Event published' }] }
})
// CORRECT: define an allowlist of valid event types; LLM provides parameters only
const ALLOWED_ACTIONS = new Set(['create_note', 'update_note', 'list_notes'])
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { action, noteId, content } = request.params.arguments
  if (!ALLOWED_ACTIONS.has(action)) throw new Error(`Invalid action: ${action}`)
  // Schema-validate the parameters before publishing
  const event = NoteEventSchema.parse({ action, noteId, content, publishedAt: new Date().toISOString() })
  await kafkaProducer.send({ topic: 'note-events', messages: [{ value: JSON.stringify(event) }] })
  return { content: [{ type: 'text', text: `Event ${action} published` }] }
})
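The `NoteEventSchema` referenced above is not defined in this article. As a rough illustration of what such a schema might enforce, here is a dependency-free sketch. The `NoteEvent` type, the `parseNoteEvent` helper, the ID pattern, and the size limit are all assumptions made for this example; in practice a schema library would normally replace the hand-written checks.

```typescript
// Hypothetical stand-in for NoteEventSchema.parse(); names and limits are assumptions.
type NoteAction = 'create_note' | 'update_note' | 'list_notes'

interface NoteEvent {
  action: NoteAction
  noteId: string
  content: string
  publishedAt: string
}

const NOTE_ACTIONS: ReadonlySet<string> = new Set(['create_note', 'update_note', 'list_notes'])
const EVENT_KEYS: ReadonlySet<string> = new Set(['action', 'noteId', 'content', 'publishedAt'])
const NOTE_ID_PATTERN = /^[A-Za-z0-9_-]{1,64}$/ // assumed ID format
const MAX_CONTENT_LENGTH = 10000 // assumed size limit

function parseNoteEvent(input: unknown): NoteEvent {
  if (typeof input !== 'object' || input === null) throw new Error('Event must be an object')
  const record = input as Record<string, unknown>

  // Reject unknown keys so extra fields cannot ride along to downstream consumers
  for (const key of Object.keys(record)) {
    if (!EVENT_KEYS.has(key)) throw new Error(`Unexpected field: ${key}`)
  }

  const { action, noteId, content, publishedAt } = record
  if (typeof action !== 'string' || !NOTE_ACTIONS.has(action)) throw new Error('Invalid action')
  if (typeof noteId !== 'string' || !NOTE_ID_PATTERN.test(noteId)) throw new Error('Invalid noteId')
  if (typeof content !== 'string' || content.length > MAX_CONTENT_LENGTH) throw new Error('Invalid content')
  if (typeof publishedAt !== 'string' || Number.isNaN(Date.parse(publishedAt))) {
    throw new Error('Invalid publishedAt')
  }

  return { action: action as NoteAction, noteId, content, publishedAt }
}
```

Rejecting unknown keys is a deliberate choice in this sketch: a consumer that ignores extra fields today may start interpreting them after a later change.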
Attack 2: Topic/queue confusion
When an MCP tool accepts a topic or queue name from LLM arguments, a prompt-injected LLM can route messages to arbitrary topics — including internal topics that process privileged operations, audit log topics (allowing audit trail manipulation), or topics consumed by services with elevated permissions.
// WRONG: LLM controls the destination topic
async function publishEvent(topicFromLLM: string, payload: any) {
  await kafkaProducer.send({ topic: topicFromLLM, messages: [{ value: JSON.stringify(payload) }] })
  // LLM can publish to: 'admin-commands', 'audit-deletions', 'internal-billing'
}
// CORRECT: topic is determined by server-side logic, never from LLM input
// A Map is used so that inherited object keys such as 'constructor' cannot match
const TOOL_TOPIC_MAP = new Map<string, string>([
  ['create_note', 'user-note-events'],
  ['update_note', 'user-note-events'],
  ['archive_note', 'user-note-events'],
])
async function publishEvent(action: string, payload: unknown) {
  const topic = TOOL_TOPIC_MAP.get(action)
  if (!topic) throw new Error(`No topic mapping for action: ${action}`)
  await kafkaProducer.send({ topic, messages: [{ value: JSON.stringify(payload) }] })
}
Attack 3: Replay attacks on consumed events
When an MCP server consumes messages from a queue and takes state-changing actions based on them (creating records, triggering workflows), an attacker who can replay a previously observed message can trigger the same action repeatedly. This matters most for operations like payment events, permission grants, or audit log entries.
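The deduplication and age checks in this section only help if a replayed message cannot be altered first: an attacker who can edit `messageId` or `timestamp` sidesteps both. One possible way to make those fields tamper-evident is an HMAC over the serialized event, sketched below. The `SignedEnvelope` shape and the `signEvent` and `verifyEvent` helpers are hypothetical, and the sketch assumes trusted producers and the consumer already share a secret; key distribution and rotation are out of scope here.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

// Hypothetical signed envelope; field names are assumptions for this sketch.
interface SignedEnvelope {
  body: string // JSON-serialized event, including messageId and timestamp
  signature: string // hex-encoded HMAC-SHA256 of body
}

function signEvent(event: object, secret: string): SignedEnvelope {
  const body = JSON.stringify(event)
  const signature = createHmac('sha256', secret).update(body).digest('hex')
  return { body, signature }
}

// Returns the parsed event if the signature matches, otherwise throws.
function verifyEvent(envelope: SignedEnvelope, secret: string): unknown {
  const expected = createHmac('sha256', secret).update(envelope.body).digest('hex')
  const expectedBytes = new TextEncoder().encode(expected)
  const actualBytes = new TextEncoder().encode(envelope.signature)
  // timingSafeEqual throws on length mismatch, so compare lengths first
  if (expectedBytes.length !== actualBytes.length || !timingSafeEqual(expectedBytes, actualBytes)) {
    throw new Error('Signature mismatch: message was altered or not signed with the shared secret')
  }
  return JSON.parse(envelope.body)
}
```

The signature is computed over the exact serialized string rather than a re-serialized object, so the consumer verifies the bytes it received before parsing them.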
// Message format with replay-resistant fields
interface SecureEvent {
  messageId: string // UUID, unique per message
  timestamp: string // ISO 8601; reject if older than the processing window
  eventType: string
  payload: unknown
}
// Consumer with deduplication and age check
const PROCESSING_WINDOW_MS = 5 * 60 * 1000 // 5 minutes
const MAX_CLOCK_SKEW_MS = 30 * 1000 // tolerated clock drift (assumed value)
const processedIds = new Set<string>() // use Redis SET with TTL in production
async function consumeEvent(raw: string) {
  const event = SecureEventSchema.parse(JSON.parse(raw))
  // Age check: reject stale, future-dated, and unparseable timestamps.
  // An unparseable timestamp yields NaN, which would pass a plain "age > window" test.
  const age = Date.now() - new Date(event.timestamp).getTime()
  if (Number.isNaN(age) || age > PROCESSING_WINDOW_MS || age < -MAX_CLOCK_SKEW_MS) {
    throw new Error(`Message timestamp outside processing window (age ${age}ms): possible replay attack`)
  }
  // Deduplication
  if (processedIds.has(event.messageId)) {
    return // silently skip exact duplicate
  }
  processedIds.add(event.messageId)
  await processEvent(event)
}
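An unbounded in-memory set grows without limit and is lost on restart, which is why the comment above points to Redis with a TTL. To illustrate the expiry behaviour only, here is a minimal in-memory sketch with an injectable clock. The `TtlDedupStore` class and its `markIfNew` method are hypothetical names, not part of any library.

```typescript
// In-memory stand-in for a Redis SET with TTL; class and method names are assumptions.
class TtlDedupStore {
  private readonly expiresAt = new Map<string, number>()
  private readonly ttlMs: number
  private readonly now: () => number

  constructor(ttlMs: number, now: () => number = () => Date.now()) {
    this.ttlMs = ttlMs
    this.now = now
  }

  // Returns true the first time an ID is seen within the TTL, false for a duplicate.
  markIfNew(id: string): boolean {
    const current = this.now()
    this.evictExpired(current)
    if (this.expiresAt.has(id)) return false
    this.expiresAt.set(id, current + this.ttlMs)
    return true
  }

  // Drops every entry whose expiry time has passed.
  private evictExpired(current: number): void {
    this.expiresAt.forEach((expiry, id) => {
      if (expiry <= current) this.expiresAt.delete(id)
    })
  }
}
```

The TTL should be at least as long as the period during which a message can still pass the age check; otherwise an ID can expire while its message is still acceptable, reopening the replay window. With Redis, a single `SET <key> 1 NX PX <ttl>` command gives the same check-and-mark behaviour atomically across consumer instances.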
Attack 4: Poison message denial-of-service
A message that causes the consumer to throw an unhandled error or loop indefinitely creates a denial-of-service condition: the message is retried indefinitely, blocking processing of subsequent messages. In Kafka this manifests as a partition stall; in RabbitMQ as a queue that never drains.
// RabbitMQ with dead-letter queue after 3 retries
const MAX_RETRIES = 3
channel.assertQueue('note-events', {
  deadLetterExchange: 'dlx', // failed messages go here after max retries
  deadLetterRoutingKey: 'note-events-dlq',
})
channel.consume('note-events', async (msg) => {
  if (!msg) return
  const retryCount = (msg.properties.headers?.['x-retry-count'] as number) ?? 0
  try {
    const event = JSON.parse(msg.content.toString())
    await processEvent(event)
    channel.ack(msg)
  } catch (err) {
    if (retryCount >= MAX_RETRIES) {
      // Give up: reject without requeue so the broker routes the message to the DLX
      channel.nack(msg, false, false) // false, false = don't requeue
      return
    }
    // A nack with requeue redelivers the message with its headers unchanged,
    // so the counter would never advance and the retry loop would never end.
    // Instead, republish a copy with the counter incremented after an
    // exponential backoff delay, then ack the original.
    setTimeout(() => {
      channel.sendToQueue('note-events', msg.content, {
        persistent: true,
        headers: { ...msg.properties.headers, 'x-retry-count': retryCount + 1 },
      })
      channel.ack(msg)
    }, Math.pow(2, retryCount) * 1000)
  }
})
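Not every failure deserves a retry: a message that cannot be parsed will fail identically on every attempt, and a retry counter read from message headers should itself be sanity-checked. The sketch below separates that decision from the broker calls so it can be tested in isolation. The `decideOnFailure` helper, the `FailureAction` type, and the delay constants are assumptions for illustration, not part of any queue client.

```typescript
// Hypothetical helper: decides what a consumer should do with a failed message.
type FailureAction =
  | { kind: 'dead-letter'; reason: string }
  | { kind: 'retry'; delayMs: number; nextRetryCount: number }

const MAX_RETRIES = 3
const BASE_DELAY_MS = 1000 // assumed starting delay
const MAX_DELAY_MS = 30000 // assumed upper bound on backoff

function decideOnFailure(err: unknown, retryCount: number): FailureAction {
  // The counter comes from message headers, so treat anything odd as exhausted
  if (!Number.isInteger(retryCount) || retryCount < 0) {
    return { kind: 'dead-letter', reason: 'invalid retry counter' }
  }
  // JSON.parse throws SyntaxError; malformed input fails the same way every time
  if (err instanceof SyntaxError) {
    return { kind: 'dead-letter', reason: 'unparseable message' }
  }
  if (retryCount >= MAX_RETRIES) {
    return { kind: 'dead-letter', reason: 'retry limit reached' }
  }
  // Exponential backoff, capped so a large counter cannot produce an extreme delay
  const delayMs = Math.min(BASE_DELAY_MS * Math.pow(2, retryCount), MAX_DELAY_MS)
  return { kind: 'retry', delayMs, nextRetryCount: retryCount + 1 }
}
```

A consumer's catch block could call `decideOnFailure(err, retryCount)` and then either reject the message without requeue or republish it after `delayMs` with the incremented counter.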
What SkillAudit checks
- LLM-controlled topic or queue name — HIGH; topic confusion and privilege escalation
- No message schema validation on consumed events — WARN; poison message vulnerability
- No dead-letter queue configuration — WARN; poison message DoS
- No deduplication or age check on consumed events — WARN for state-changing consumers; replay attack surface
See also
- MCP server input sanitization — schema validation for synchronous tool inputs
- MCP server rate limit bypass — per-tool rate limits that complement message queue rate controls
- MCP server OWASP Top 10 — injection and resource exhaustion covering event-driven patterns
- Public audit corpus — event-driven and queue-related findings