Security·Observability·Log Management

MCP server log aggregation security: shipping audit logs without leaking sensitive fields

Centralized log aggregation — Datadog, Elasticsearch/Kibana, Splunk — is essential for operating MCP servers in production. But log pipelines are themselves a credential-exposure surface: a tool argument containing a user's API key or PII that reaches your Datadog workspace is as much a breach as leaking it in an HTTP response. This guide covers how to structure MCP server audit logs for security queries, scrub sensitive fields before they leave the process, and configure TLS for the forwarding pipeline.

The log aggregation threat model

Your MCP server audit log is exposed at multiple points in its journey to a centralized platform:

  1. At emission: the log statement itself may include raw objects containing credentials or PII
  2. At the forwarder: the log agent (Fluent Bit, Vector, Datadog Agent) transmits log lines; if using HTTP without TLS, these are readable in transit
  3. At the platform: everyone with read access to your Datadog/Elastic workspace can query all fields, including fields you didn't intend to expose

SkillAudit grades log aggregation under the Credential Exposure axis. The most common HIGH findings: logging raw tool arguments (which may contain API keys or user tokens), and forwarding over plaintext HTTP rather than TLS.

Step 1: design a safe log schema

The foundation is a structured log schema that defines exactly which fields are safe to include. Define this explicitly — never rely on logging "everything" and hoping nothing sensitive appears.

// src/lib/audit-schema.ts — safe audit log fields
export interface AuditEvent {
  // Identity (safe: internal IDs, not credentials)
  callerId: string;        // opaque caller identifier, not the API key itself
  sessionId: string;       // request correlation ID

  // Tool invocation (safe: name + hash, not args content)
  tool: string;            // tool name
  argsHash: string;        // SHA-256 of serialized args, first 12 chars
  argsSize: number;        // byte size of args — useful for anomaly detection

  // Outcome (safe: code + message, not internal details)
  outcome: "success" | "error" | "denied" | "rate_limited";
  errorType?: string;      // error class name, not message
  durationMs: number;

  // Audit metadata (safe)
  timestamp: string;       // ISO 8601
  serverVersion: string;
}

// NEVER include in audit logs:
// - args: Record<string, unknown> — may contain API keys, tokens, PII
// - error.message — may contain schema details, internal paths
// - error.stack — contains file paths, library names
// - config — contains all secrets

Step 2: scrub at the emitter

The most reliable scrubbing happens at the point where the log is created — before it enters any pipeline. A scrubbing function runs synchronously in your process and never lets sensitive data into the log stream:

import { createHash } from "crypto";

const SENSITIVE_FIELDS = new Set([
  "apiKey", "api_key", "token", "accessToken", "access_token",
  "secret", "password", "credential", "authorization", "bearer",
  "privateKey", "private_key", "clientSecret", "client_secret",
]);

function scrubObject(obj: unknown, depth = 0): unknown {
  if (depth > 5) return "[truncated]";
  if (typeof obj !== "object" || obj === null) return obj;
  if (Array.isArray(obj)) return obj.slice(0, 10).map(v => scrubObject(v, depth + 1));

  const result: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(obj)) {
    const lkey = key.toLowerCase();
    if (SENSITIVE_FIELDS.has(lkey) || SENSITIVE_FIELDS.has(key)) {
      result[key] = "[redacted]";
    } else {
      result[key] = scrubObject(value, depth + 1);
    }
  }
  return result;
}

// Wrap pino/winston to always scrub before emitting
export function createSecureLogger(base: Logger) {
  return {
    info: (fields: Record<string, unknown>, msg?: string) =>
      base.info(scrubObject(fields) as object, msg),
    warn: (fields: Record<string, unknown>, msg?: string) =>
      base.warn(scrubObject(fields) as object, msg),
    error: (fields: Record<string, unknown>, msg?: string) =>
      base.error(scrubObject(fields) as object, msg),
  };
}

Step 3: TLS for log forwarding

Any log forwarder sending data outside your local network must use TLS. The common configurations:

# Fluent Bit → Datadog over TLS
[OUTPUT]
    Name        datadog
    Match       *
    Host        http-intake.logs.datadoghq.com
    TLS         On
    compress    gzip
    apikey      ${DATADOG_API_KEY}
    dd_service  mcp-server
    dd_source   nodejs

# Fluent Bit → Elasticsearch over TLS
[OUTPUT]
    Name            es
    Match           *
    Host            ${ES_HOST}
    Port            9243
    TLS             On
    TLS.Verify      On
    TLS.ca_file     /etc/ssl/certs/ca-bundle.crt
    HTTP_User       ${ES_USERNAME}
    HTTP_Passwd     ${ES_PASSWORD}
    Index           mcp-audit-logs
# Vector → Splunk HEC over TLS
sinks:
  splunk_hec:
    type: splunk_hec_logs
    inputs: ["mcp_logs"]
    endpoint: "https://splunk.internal:8088"
    token: "${SPLUNK_HEC_TOKEN}"
    tls:
      enabled: true
      verify_certificate: true
      ca_file: "/etc/ssl/certs/ca-bundle.crt"

Step 4: field-level access control at the platform

Even with scrubbing at the emitter, configure field-level restrictions at the platform level as defense-in-depth. This protects against a future emitter that doesn't use your scrubber:

Security queries to run after setup

Once logs are centralized, set up these alerting queries for anomaly detection:

# Datadog — alert on rate limit bursts (possible abuse or misconfiguration)
@tool:* @outcome:rate_limited | stats count() by @callerId, @tool
# Alert if any callerId exceeds 100 rate-limited events in 1 hour

# Datadog — alert on consecutive errors (possible probe)
@outcome:error | stats count() by @callerId | where count > 20

# Elasticsearch — callers invoking unusual tool combinations
{
  "query": {
    "bool": {
      "must": [
        { "term": { "outcome": "denied" } },
        { "range": { "@timestamp": { "gte": "now-1h" } } }
      ]
    }
  },
  "aggs": {
    "by_caller": { "terms": { "field": "callerId" } }
  }
}

For more on what to log and how to structure audit events, see MCP server observability and security logging. For the complete secrets management picture, see MCP server secrets management.