Topic: mcp server event-driven security

MCP server event-driven security — message queue and pub/sub patterns

MCP servers increasingly use event-driven architectures — consuming events from Kafka, publishing to RabbitMQ, reading from SQS queues, or subscribing to Redis Pub/Sub channels. The security model of these systems differs fundamentally from request-response APIs: messages arrive asynchronously, may originate from many producers, and are processed without a synchronous caller to hold accountable. For MCP servers, this introduces four attack patterns that don't exist in request-response tool designs: message injection, topic/queue confusion, replay attacks, and poison message denial-of-service.

Attack 1: Message injection via LLM-controlled publish

If an MCP tool allows the LLM to specify the content of a message published to a queue, a prompt-injected LLM can craft messages that cause downstream consumers to take unintended actions. This is the event-driven equivalent of SQL injection — the message content is treated as both data and instructions by the consumer.

// WRONG: LLM argument published directly as message content
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { action, payload } = request.params.arguments
  // LLM can set action = "delete_all_records" and payload = { confirm: true }
  // if the downstream consumer processes action/payload directly
  await kafkaProducer.send({ topic: 'user-events', messages: [{ value: JSON.stringify({ action, payload }) }] })
  return { content: [{ type: 'text', text: 'Event published' }] }
})

// CORRECT: define an allowlist of valid event types; LLM provides parameters only
const ALLOWED_ACTIONS = new Set(['create_note', 'update_note', 'list_notes'])

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { action, noteId, content } = request.params.arguments
  if (!ALLOWED_ACTIONS.has(action)) throw new Error(`Invalid action: ${action}`)
  // Schema-validate the parameters before publishing
  const event = NoteEventSchema.parse({ action, noteId, content, publishedAt: new Date().toISOString() })
  await kafkaProducer.send({ topic: 'note-events', messages: [{ value: JSON.stringify(event) }] })
  return { content: [{ type: 'text', text: `Event ${action} published` }] }
})

Attack 2: Topic/queue confusion

When an MCP tool accepts a topic or queue name from LLM arguments, a prompt-injected LLM can route messages to arbitrary topics — including internal topics that process privileged operations, audit log topics (allowing audit trail manipulation), or topics consumed by services with elevated permissions.

// WRONG: LLM controls the destination topic
async function publishEvent(topicFromLLM: string, payload: any) {
  await kafkaProducer.send({ topic: topicFromLLM, messages: [{ value: JSON.stringify(payload) }] })
  // LLM can publish to: 'admin-commands', 'audit-deletions', 'internal-billing'
}

// CORRECT: topic is determined by server-side logic, never from LLM input
const TOOL_TOPIC_MAP: Record<string, string> = {
  'create_note': 'user-note-events',
  'update_note': 'user-note-events',
  'archive_note': 'user-note-events',
}

async function publishEvent(action: string, payload: any) {
  const topic = TOOL_TOPIC_MAP[action]
  if (!topic) throw new Error(`No topic mapping for action: ${action}`)
  await kafkaProducer.send({ topic, messages: [{ value: JSON.stringify(payload) }] })
}

Attack 3: Replay attacks on consumed events

When an MCP server consumes messages from a queue and takes state-changing actions based on them (creating records, triggering workflows), an attacker who can replay a previously observed message can trigger the same action repeatedly. This matters most for operations like payment events, permission grants, or audit log entries.

// Message format with replay-resistant fields
interface SecureEvent {
  messageId: string      // UUID — unique per message
  timestamp: string      // ISO 8601 — reject if > processing window
  eventType: string
  payload: unknown
}

// Consumer with deduplication and age check
const PROCESSING_WINDOW_MS = 5 * 60 * 1000  // 5 minutes
const processedIds = new Set<string>()       // use Redis SET with TTL in production

async function consumeEvent(raw: string) {
  const event = SecureEventSchema.parse(JSON.parse(raw))

  // Age check
  const age = Date.now() - new Date(event.timestamp).getTime()
  if (age > PROCESSING_WINDOW_MS) {
    throw new Error(`Message too old: ${age}ms — possible replay attack`)
  }

  // Deduplication
  if (processedIds.has(event.messageId)) {
    return  // silently skip exact duplicate
  }
  processedIds.add(event.messageId)

  await processEvent(event)
}

Attack 4: Poison message denial-of-service

A message that causes the consumer to throw an unhandled error or loop indefinitely creates a denial-of-service condition: the message is retried indefinitely, blocking processing of subsequent messages. In Kafka this manifests as a partition stall; in RabbitMQ as a queue that never drains.

// RabbitMQ with dead-letter queue after 3 retries
const MAX_RETRIES = 3

channel.assertQueue('note-events', {
  deadLetterExchange: 'dlx',       // failed messages go here after max retries
  deadLetterRoutingKey: 'note-events-dlq',
})

channel.consume('note-events', async (msg) => {
  if (!msg) return
  const retryCount = (msg.properties.headers?.['x-retry-count'] as number) ?? 0

  try {
    const event = JSON.parse(msg.content.toString())
    await processEvent(event)
    channel.ack(msg)
  } catch (err) {
    if (retryCount >= MAX_RETRIES) {
      // Give up — message goes to DLQ for manual inspection
      channel.nack(msg, false, false)   // false, false = don't requeue
    } else {
      // Retry with incremented counter and exponential backoff
      setTimeout(() => {
        channel.nack(msg, false, true)  // requeue after delay
      }, Math.pow(2, retryCount) * 1000)
    }
  }
})

What SkillAudit checks

See also

Check your event-driven MCP server for injection and replay findings.

Run a free audit → How grading works →