Production-Ready Python Script for Real-Time Compaction Latency Tracking in Cassandra 4.x/5.x
Compaction latency directly dictates read amplification, repair window viability, and node stability under sustained write pressure. In distributed deployments, tracking real-time compaction progress requires deterministic polling, stateful ID correlation across intervals, and strict operational boundaries. This workflow delivers an idempotent Python automation script engineered for Cassandra v4.x/v5.x, tailored for DBAs, distributed systems engineers, and DevOps teams managing high-throughput clusters. The methodology aligns with established Python Monitoring for Cassandra Compaction practices while providing actionable telemetry for Advanced Compaction Strategy Tuning & Monitoring. By capturing per-task runtime deltas and throughput estimates, operators can preemptively adjust compaction throughput limits, schedule repairs during low-impact windows, and isolate pathological compaction storms before they cascade into node exhaustion.
Prerequisites & Environment Validation
Before deployment, verify that the execution environment meets the requirements below. Each validation step lists its expected output, a safety check, and a rollback path to prevent configuration drift.
1. Python Runtime & Standard Library
python3 -c "import sys, subprocess, json, logging, time, re, signal; assert sys.version_info >= (3,8), 'Python 3.8+ required'; print('PASS: Runtime validated')"
- Expected Output: PASS: Runtime validated
- Safety Check: Fails fast if the version is below 3.8. No external dependencies are required.
- Rollback Path: If validation fails, install Python 3.8+ alongside the system interpreter via the package manager (apt install python3.10 / yum install python39). If update-alternatives was used to change the default interpreter, revert it to the system default. Do not symlink over the system Python.
2. nodetool Accessibility & JMX Connectivity
timeout 10 nodetool version > /dev/null 2>&1 && timeout 10 nodetool compactionstats > /dev/null 2>&1 && echo "PASS: nodetool responsive" || echo "FAIL: JMX/auth or nodetool unreachable"
- Expected Output: PASS: nodetool responsive
- Safety Check: The 10-second timeout on each call prevents hanging on unresponsive JMX ports. Verifies that read-only commands execute without authentication prompts.
- Rollback Path: If FAIL occurs, verify the JMX settings in cassandra-env.sh, ensure LOCAL_JMX=yes or that valid JMX credentials are passed to nodetool (-u/-pw, or -pwf for a password file), and restore the previous cassandra-env.sh if JMX was recently modified.
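If the preflight has to run from Python rather than a shell (for example inside a configuration-management hook), the same two checks can be expressed with subprocess.run, applying the timeout to each call. This is a minimal sketch: check_nodetool is a hypothetical helper name, and it assumes nodetool is on PATH. Global options such as -u or -pwf can be supplied by extending the nodetool tuple, since they must precede the subcommand.

```python
import subprocess
from typing import Sequence


def check_nodetool(nodetool: Sequence[str] = ("nodetool",), timeout: float = 10.0) -> bool:
    """Return True if `nodetool version` and `nodetool compactionstats` each
    finish successfully within `timeout` seconds.

    Any timeout, non-zero exit, or missing executable counts as a failure."""
    for subcommand in ("version", "compactionstats"):
        try:
            subprocess.run([*nodetool, subcommand], capture_output=True,
                           text=True, timeout=timeout, check=True)
        except (subprocess.TimeoutExpired, subprocess.CalledProcessError, OSError):
            return False
    return True
```

Calling check_nodetool() returns True only when both commands succeed, mirroring the PASS/FAIL logic of the shell one-liner.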
3. Execution Context & Permissions
[ "$(id -u)" -eq 0 ] && echo "WARNING: Running as root. Create dedicated service account." || echo "PASS: Non-root context verified"
- Expected Output: PASS: Non-root context verified
- Safety Check: Flags root execution, so the tracker is not left running with privileges that invite privilege escalation or accidental nodetool state mutations.
- Rollback Path: If running as root, create a cassandra-monitor user (useradd -r -s /sbin/nologin cassandra-monitor), assign a read-only sudoers rule for nodetool compactionstats, and switch the execution context immediately.
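The shell test above only warns. If you would rather have the tracker refuse to start as root, a guard like the following could be called at the top of the script's __main__ block. refuse_root is a hypothetical helper; it relies on os.geteuid, which is available only on Unix-like systems.

```python
import os
import sys
from typing import Optional


def refuse_root(euid: Optional[int] = None) -> None:
    """Exit with status 1 if the effective UID is 0 (root).

    euid can be passed explicitly for testing; by default os.geteuid() is used."""
    if euid is None:
        euid = os.geteuid()
    if euid == 0:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit("Refusing to run as root; use the cassandra-monitor service account.")
```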
Core Architecture & Telemetry Model
The script operates exclusively in read-only mode, polling nodetool compactionstats at a configurable interval. It parses the tabular output into positional columns, correlates active compaction IDs across cycles, calculates runtime latency from first-seen timestamps, estimates throughput from byte deltas, and emits structured JSON snapshots. Each cycle repeats the same sequence: poll, parse, correlate, emit, sleep.
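To make the correlation step concrete, here is a stripped-down sketch of the first-seen bookkeeping with plain numbers in place of nodetool output. correlate is a hypothetical function that mirrors, but is not identical to, the script's _correlate_and_emit: IDs that disappear are forgotten, new IDs start at zero elapsed time, and throughput is the average byte rate since first sighting.

```python
from typing import Dict, Tuple


def correlate(first_seen: Dict[str, Tuple[float, int]], now: float,
              completed: Dict[str, int]) -> Dict[str, Tuple[float, float]]:
    """Update first-seen state in place and return {id: (elapsed_sec, bytes_per_sec)}.

    first_seen maps compaction id -> (first-seen time, bytes completed then);
    completed maps each currently listed id -> bytes completed now."""
    for cid in list(first_seen):
        if cid not in completed:      # finished or cancelled since last poll
            del first_seen[cid]
    result = {}
    for cid, done in completed.items():
        t0, b0 = first_seen.setdefault(cid, (now, done))
        elapsed = now - t0
        rate = (done - b0) / elapsed if elapsed > 0 else 0.0
        result[cid] = (elapsed, rate)
    return result


state: Dict[str, Tuple[float, int]] = {}
print(correlate(state, 100.0, {"a": 1_000}))            # {'a': (0.0, 0.0)}
print(correlate(state, 105.0, {"a": 51_000, "b": 10}))  # {'a': (5.0, 10000.0), 'b': (0.0, 0.0)}
print(correlate(state, 110.0, {"b": 5_010}))            # {'b': (5.0, 1000.0)}
```

Compaction "a" moved 50,000 bytes in the 5 seconds between polls, giving 10,000 bytes/s; it then vanished from the listing and was dropped from the state.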
Key design principles:
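- Stateless Across Invocations: No disk persistence. Restarting does not corrupt metrics or leave residual artifacts, but it resets first-seen timestamps, so elapsed_sec restarts from zero for compactions already in progress.
- Deterministic Polling: A fixed, configurable interval. A failed cycle is skipped and retried after the same interval; no backoff is applied.
- Circuit Breaker: A nodetool call that fails or exceeds the timeout skips that cycle, so a slow JMX endpoint cannot stall the loop or stack up concurrent calls. The script does not measure host CPU or I/O saturation.
- Graceful Degradation: Errors are logged and the cycle is skipped instead of crashing; no snapshot is emitted for a failed cycle.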
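The script as written retries a failed cycle after the same fixed interval. If you want exponential backoff and breaker behaviour made explicit, one option is a small policy object like the sketch below. BackoffPolicy and its defaults (base 5 s, cap 120 s, open after 5 consecutive failures for a 300 s cooldown) are assumptions, not values taken from Cassandra or from the script.

```python
class BackoffPolicy:
    """Hypothetical retry policy: exponential backoff after consecutive
    failures, plus a simple breaker that pauses polling for `cooldown`
    seconds once `max_failures` failures occur in a row."""

    def __init__(self, base: float = 5.0, cap: float = 120.0,
                 max_failures: int = 5, cooldown: float = 300.0):
        self.base = base                # normal poll interval (seconds)
        self.cap = cap                  # upper bound for backoff delays
        self.max_failures = max_failures
        self.cooldown = cooldown        # pause while the breaker is "open"
        self.failures = 0

    def next_delay(self, ok: bool) -> float:
        """Record one poll result and return the sleep before the next poll."""
        if ok:
            self.failures = 0
            return self.base
        self.failures += 1
        if self.failures >= self.max_failures:
            # Breaker opens: wait out the full cooldown, then start afresh.
            self.failures = 0
            return self.cooldown
        # Double the delay per consecutive failure (base*2, base*4, ...) up to cap.
        return min(self.base * 2 ** self.failures, self.cap)
```

In run(), the fixed poll_interval sleep would be replaced by time.sleep(policy.next_delay(raw is not None)).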
Production Script
Save as compaction_latency_tracker.py. The implementation includes explicit timeout handling, whitespace-positional parsing of the tabular nodetool compactionstats output, signal trapping, and JSON emission.
#!/usr/bin/env python3
"""
Real-Time Compaction Latency Tracker for Cassandra v4.x/v5.x
Idempotent, read-only, production-hardened polling workflow.
"""
import json
import logging
import signal
import subprocess
import sys
import time
from typing import Dict, List, Optional

# Logs go to stderr so that stdout carries only JSON snapshots.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger("cassandra_compaction_tracker")

# Raw `nodetool compactionstats` reports completed/total in plain bytes
# (the `unit` column reads "bytes"). Unknown units (e.g. "keys" for cache
# saves) fall back to a multiplier of 1.
UNIT_MULTIPLIERS = {"BYTES": 1, "B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}


def parse_bytes(value: str, unit: str) -> float:
    """Convert a nodetool compactionstats value+unit pair to bytes."""
    try:
        num = float(value.replace(",", ""))
    except ValueError:
        return 0.0
    return num * UNIT_MULTIPLIERS.get(unit.upper(), 1)


def parse_progress(value: str) -> Optional[float]:
    """Convert a progress cell such as "15.65%" to 15.65; "n/a" yields None."""
    try:
        return float(value.rstrip("%"))
    except ValueError:
        return None


class CompactionLatencyTracker:
    def __init__(self, poll_interval: float = 5.0, timeout: int = 10,
                 nodetool_path: str = "nodetool", max_latency_threshold: float = 300.0):
        self.poll_interval = poll_interval
        self.timeout = timeout
        self.nodetool_path = nodetool_path
        self.max_latency_threshold = max_latency_threshold
        # In-memory only: compaction id -> first-seen time and bytes completed.
        self.active_compactions: Dict[str, dict] = {}
        self.running = True
        signal.signal(signal.SIGINT, self._graceful_shutdown)
        signal.signal(signal.SIGTERM, self._graceful_shutdown)

    def _graceful_shutdown(self, signum, frame):
        # Only clear the flag; the loop exits at its next check, so a
        # snapshot is never cut off mid-write.
        logger.info("Received termination signal. Exiting after the current cycle.")
        self.running = False

    def _sleep(self) -> None:
        """Wait poll_interval seconds in short slices so shutdown stays prompt."""
        deadline = time.monotonic() + self.poll_interval
        while self.running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(0.5, remaining))

    def _run_nodetool(self) -> Optional[str]:
        try:
            result = subprocess.run(
                [self.nodetool_path, "compactionstats"],
                capture_output=True, text=True, timeout=self.timeout, check=True
            )
            return result.stdout
        except subprocess.TimeoutExpired:
            logger.warning("nodetool timed out. Circuit breaker engaged for this cycle.")
            return None
        except subprocess.CalledProcessError as e:
            logger.error(f"nodetool failed (exit {e.returncode}): {e.stderr.strip()}")
            return None
        except OSError as e:
            # nodetool missing from PATH or not executable
            logger.error(f"Cannot execute {self.nodetool_path}: {e}")
            return None

    def _parse_compaction_stats(self, raw: str) -> List[dict]:
        """Parse the tabular `nodetool compactionstats` output.

        The data section starts at the header row:
            id  compaction type  keyspace  table  completed  total  unit  progress
        Each active operation is one whitespace-separated row. The header's
        "compaction type" label is two words, and some type values are too
        (e.g. "Anticompaction after repair"), so rows are split from both
        ends: the id is the first field, the last six fields are keyspace,
        table, completed, total, unit and progress, and everything in between
        is the type. We parse raw (non -H) output, where `completed` and
        `total` are plain integer byte counts.
        """
        tasks = []
        in_table = False
        for line in raw.splitlines():
            stripped = line.strip()
            if not stripped:
                continue
            # Detect the header row and switch into table-parsing mode.
            if stripped.startswith("id") and "compaction type" in stripped and "progress" in stripped:
                in_table = True
                continue
            if not in_table:
                continue
            # Stop at the trailing summary lines.
            if stripped.lower().startswith(("pending tasks", "active compaction")):
                break
            parts = stripped.split()
            # id + at least one type word + six trailing columns
            if len(parts) < 8:
                continue
            tasks.append({
                "id": parts[0],
                "compaction_type": " ".join(parts[1:-6]),
                "keyspace": parts[-6],
                "table": parts[-5],
                "completed": parts[-4],
                "total": parts[-3],
                "unit": parts[-2],
                "progress": parts[-1],
            })
        return tasks

    def _correlate_and_emit(self, tasks: List[dict]) -> List[dict]:
        now = time.monotonic()  # elapsed times are immune to wall-clock jumps
        timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        current_ids = {t["id"] for t in tasks}
        snapshot = []
        # Forget compactions that are no longer listed (finished or cancelled).
        for cid in list(self.active_compactions):
            if cid not in current_ids:
                del self.active_compactions[cid]
        for task in tasks:
            tid = task["id"]
            completed_now = parse_bytes(task["completed"], task["unit"])
            total = parse_bytes(task["total"], task["unit"])
            if tid not in self.active_compactions:
                # First sighting: elapsed time and throughput are measured from
                # here, not from the true start of the compaction.
                self.active_compactions[tid] = {"start_time": now, "initial_completed": completed_now}
            state = self.active_compactions[tid]
            elapsed = now - state["start_time"]
            # Average byte rate since first sighting (0 on the first cycle).
            throughput = (completed_now - state["initial_completed"]) / max(elapsed, 0.1)
            entry = {
                "id": tid,
                "keyspace": task["keyspace"],
                "table": task["table"],
                "type": task["compaction_type"],
                "progress_pct": parse_progress(task["progress"]),
                "elapsed_sec": round(elapsed, 2),
                "throughput_bytes_sec": round(throughput, 2),
                "completed_bytes": round(completed_now, 2),
                "total_bytes": round(total, 2),
                "timestamp_utc": timestamp,
            }
            if elapsed > self.max_latency_threshold:
                logger.warning(f"Compaction {tid} exceeded latency threshold ({elapsed:.0f}s > {self.max_latency_threshold}s). Flagging for review.")
                entry["latency_alert"] = True
            snapshot.append(entry)
        return snapshot

    def run(self) -> None:
        logger.info(f"Starting compaction latency tracker. Interval={self.poll_interval}s, Timeout={self.timeout}s")
        while self.running:
            raw = self._run_nodetool()
            if raw is not None:
                snapshot = self._correlate_and_emit(self._parse_compaction_stats(raw))
                # One compact JSON object per line; flush so consumers such as
                # journald receive each snapshot immediately.
                print(json.dumps({"compactions": snapshot, "active_count": len(snapshot)}), flush=True)
            self._sleep()


if __name__ == "__main__":
    tracker = CompactionLatencyTracker(poll_interval=5.0, timeout=10, nodetool_path="nodetool")
    tracker.run()
Script Safety & Rollback Configuration
- Expected Output: One JSON object per polling cycle, containing a compactions array (latency, throughput, and alert flags for each active compaction) and an active_count field.
- Safety Check: subprocess.run enforces a strict timeout. Failed or timed-out nodetool calls are logged and the cycle is skipped. Signal handlers ensure a clean exit without orphaned processes.
- Rollback Path: If JSON output floods logs or causes downstream ingestion bottlenecks, increase poll_interval to 15.0 (or higher) to reduce output volume, and raise max_latency_threshold if repeated latency warnings are the main source of noise. To revert entirely, stop the service (systemctl stop cassandra-compaction-tracker) and restore the baseline nodetool cron jobs.
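Downstream consumers only need to split the stream into JSON documents. The sketch below uses json.JSONDecoder.raw_decode so it accepts both one-object-per-line output and pretty-printed (indent=2) output, and it skips non-JSON lines such as log messages. iter_snapshots and alerts are hypothetical names, and the sketch assumes each snapshot is a JSON object with the compactions/active_count shape described above.

```python
import json
from typing import Iterable, Iterator, List, Tuple


def iter_snapshots(lines: Iterable[str]) -> Iterator[dict]:
    """Yield JSON objects from a line stream (e.g. sys.stdin), whether each
    object sits on one line or is pretty-printed across several."""
    decoder = json.JSONDecoder()
    buf = ""
    for line in lines:
        buf += line
        while True:
            buf = buf.lstrip()
            if not buf:
                break
            if not buf.startswith("{"):
                # Not a JSON object (e.g. a log line): drop it once complete.
                newline = buf.find("\n")
                if newline == -1:
                    break
                buf = buf[newline + 1:]
                continue
            try:
                doc, end = decoder.raw_decode(buf)
            except json.JSONDecodeError:
                break  # object still incomplete; read more lines
            yield doc
            buf = buf[end:]


def alerts(snapshot: dict) -> List[Tuple[str, str, float]]:
    """Return (keyspace.table, id, elapsed_sec) for entries flagged latency_alert."""
    return [
        (f"{c['keyspace']}.{c['table']}", c["id"], c["elapsed_sec"])
        for c in snapshot.get("compactions", [])
        if c.get("latency_alert")
    ]
```

Wrapped in a small script that loops over iter_snapshots(sys.stdin), this could sit at the end of a pipe such as python3 compaction_latency_tracker.py | python3 alert_filter.py, where alert_filter.py is a hypothetical file name.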
Deployment & Execution Workflow
1. Systemd Service Unit
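Save as /etc/systemd/system/cassandra-compaction-tracker.service:
[Unit]
Description=Cassandra Real-Time Compaction Latency Tracker
After=network.target cassandra.service

[Service]
Type=simple
User=cassandra-monitor
# Unbuffered stdout so each JSON snapshot reaches the journal immediately
Environment=PYTHONUNBUFFERED=1
ExecStart=/usr/local/bin/python3 /opt/scripts/compaction_latency_tracker.py
Restart=on-failure
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
Enable and start it with systemctl daemon-reload && systemctl enable --now cassandra-compaction-tracker.
- Expected Output: The service enters the active (running) state. Logs stream to journalctl -u cassandra-compaction-tracker.
- Safety Check: Restart=on-failure restarts the tracker automatically if it exits abnormally, so crashes are not silent. The service runs as an unprivileged user.
- Rollback Path: systemctl disable --now cassandra-compaction-tracker. Remove the unit file from /etc/systemd/system/, then reload systemd: systemctl daemon-reload.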
2. Log Rotation Configuration
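cat > /etc/logrotate.d/cassandra-compaction-tracker << 'EOF'
/var/log/cassandra/compaction-tracker.log {
    daily
    rotate 14
    compress
    missingok
    notifempty
    copytruncate
}
EOF
- Scope: This rule applies only if the tracker's output is written to /var/log/cassandra/compaction-tracker.log (for example with StandardOutput=append:/var/log/cassandra/compaction-tracker.log in the unit, supported by systemd 240 and later). With StandardOutput=journal, retention is governed by journald instead.
- Expected Output: Log files rotate daily; the 14 most recent rotated files are kept, compressed.
- Safety Check: copytruncate truncates the live file in place, so the tracker keeps writing through its existing file descriptor and needs no reload. Lines written between the copy and the truncate can be lost.
- Rollback Path: Delete /etc/logrotate.d/cassandra-compaction-tracker. Run logrotate -d /etc/logrotate.conf to verify that baseline rotation resumes.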
Operational Safety, Circuit Breakers & Rollback Paths
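Monitoring compaction must never become the source of instability. The script implements three safety boundaries:
- Polling Throttle: If nodetool does not respond within timeout seconds, the cycle is skipped, a warning is logged, and the next attempt waits a full poll interval. Because calls are sequential, a slow JMX endpoint never accumulates overlapping nodetool invocations.
- Latency Threshold Alerting: When elapsed_sec > max_latency_threshold, the script sets latency_alert on the entry and logs a warning. Because elapsed_sec is measured from when the tracker first saw a compaction, it understates compactions that were already running at startup. Operators should correlate flagged tasks with nodetool compactionhistory and adjust compaction throughput (at runtime with nodetool setcompactionthroughput, or persistently with compaction_throughput in cassandra.yaml, named compaction_throughput_mb_per_sec before 4.1).
- Graceful Shutdown: On SIGINT/SIGTERM, the handler only clears the running flag; the loop finishes its current cycle and the process exits with code 0. Because the handler never interrupts a write, no partial JSON is emitted.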
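As written, the tracker logs a threshold warning on every polling cycle for as long as a slow compaction runs. If that is too noisy, alerts can be de-duplicated per compaction ID. The sketch below is one way to do it, fed with the entries from a snapshot's compactions list; LatencyAlerter is a hypothetical helper.

```python
from typing import Iterable, List, Set


class LatencyAlerter:
    """Hypothetical helper: report each compaction id once, when it first
    exceeds the threshold, rather than on every polling cycle."""

    def __init__(self, threshold_sec: float = 300.0):
        self.threshold_sec = threshold_sec
        self.alerted: Set[str] = set()

    def check(self, entries: Iterable[dict]) -> List[str]:
        """Take one snapshot's compactions list; return ids that newly crossed the threshold."""
        items = list(entries)
        # Forget ids that are no longer active so the set cannot grow forever.
        self.alerted &= {e["id"] for e in items}
        new_ids = []
        for e in items:
            if e["elapsed_sec"] > self.threshold_sec and e["id"] not in self.alerted:
                self.alerted.add(e["id"])
                new_ids.append(e["id"])
        return new_ids
```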
Rollback Protocol:
- Immediate Stop: systemctl stop cassandra-compaction-tracker
- Verify Node Stability: Run nodetool compactionstats manually and confirm that pending tasks decrease.
- Revert Configuration: If throughput limits were adjusted at runtime during monitoring, restore the original value (ideally recorded beforehand with nodetool getcompactionthroughput) using nodetool setcompactionthroughput <value>; there is no nodetool reloadyaml. To persist the change, edit cassandra.yaml and restart the node, since most yaml settings are only read at startup.
- Post-Incident Audit: Review JSON snapshots for keyspaces and tables whose compactions repeatedly breach the threshold. Cross-reference with nodetool tpstats to identify thread pool saturation.
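Recording the throughput limit before any runtime change makes the Revert Configuration step mechanical. The sketch below wraps nodetool getcompactionthroughput and nodetool setcompactionthroughput. It assumes the first number printed by getcompactionthroughput is the current limit in MB/s (the exact wording of that output differs between versions) and that setcompactionthroughput takes a whole number; run_nodetool is a hypothetical helper, and the run parameter exists only so the functions can be exercised without a live node.

```python
import re
import subprocess
from typing import Callable, List

# A runner takes nodetool arguments and returns stdout; injectable for testing.
Runner = Callable[[List[str]], str]


def run_nodetool(args: List[str]) -> str:
    return subprocess.run(["nodetool", *args], capture_output=True,
                          text=True, timeout=30, check=True).stdout


def get_compaction_throughput(run: Runner = run_nodetool) -> float:
    """Return the current limit, assumed to be the first number printed by
    `nodetool getcompactionthroughput`."""
    out = run(["getcompactionthroughput"])
    match = re.search(r"\d+(?:\.\d+)?", out)
    if match is None:
        raise ValueError(f"unexpected getcompactionthroughput output: {out!r}")
    return float(match.group(0))


def restore_compaction_throughput(value: float, run: Runner = run_nodetool) -> None:
    """Re-apply a recorded limit, passed to setcompactionthroughput as an integer."""
    run(["setcompactionthroughput", str(int(value))])
```

A typical flow is to call get_compaction_throughput() before tuning and restore_compaction_throughput(saved) during rollback.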
Integration with Repair & Node Management
Real-time compaction latency tracking directly informs repair scheduling and node lifecycle automation. Compaction and repair compete for disk I/O and CPU; overlapping them can sharply increase latency for both. By consuming the JSON telemetry stream, automation pipelines can:
- Defer Repairs: Trigger repair windows only when active_count == 0 or when the average elapsed_sec is below a baseline threshold.
- Dynamic Throughput Tuning: Feed latency deltas into configuration management tools that adjust compaction throughput at runtime via nodetool setcompactionthroughput (the persistent cassandra.yaml setting is compaction_throughput, named compaction_throughput_mb_per_sec prior to 4.1).
- Node Decommissioning Safeguards: Block nodetool decommission or nodetool rebuild while active compaction latency exceeds operational SLAs, reducing the risk of prolonged streaming stalls.
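The Defer Repairs rule can be written as a pure function over one snapshot. repair_window_open is a hypothetical helper, and baseline_elapsed_sec is whatever baseline your team derives from historical snapshots; the function only implements the "no active compactions, or mean elapsed time below baseline" comparison described above.

```python
from statistics import mean


def repair_window_open(snapshot: dict, baseline_elapsed_sec: float) -> bool:
    """Hypothetical gate for the Defer Repairs rule: open when nothing is
    compacting, or when mean elapsed_sec of active compactions is below baseline."""
    entries = snapshot.get("compactions", [])
    if not entries:  # equivalent to active_count == 0
        return True
    return mean(e["elapsed_sec"] for e in entries) < baseline_elapsed_sec
```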
This telemetry model aligns with Apache Cassandra's official compaction documentation and provides deterministic inputs for infrastructure-as-code pipelines. For further detail, see the Python subprocess module documentation and the Apache Cassandra nodetool compactionstats reference.