k-skill/k-skill-cleaner/scripts/k_skill_cleaner.py
Jeffrey (Dongkyu) Kim 0b280839d6 Clarify cleaner usage evidence boundaries
The cleanup helper now streams local logs, reports which evidence sources were merged, and keeps README table coverage tied to the central skill-name fixture so the documented cleanup signal stays trustworthy for large local histories and mixed imported counts.

Constraint: Follow-up addresses PR #178 review comments without changing the non-destructive recommendation model.

Rejected: Filtering imported usage JSON by --days inside the helper | imported counts are already aggregated and lack per-record timestamps.

Confidence: high

Scope-risk: narrow

Directive: Keep --usage-json documented as pre-windowed unless the input schema gains timestamped per-record events.

Tested: PYTHONPATH=scripts python3 -m unittest scripts.test_k_skill_cleaner

Tested: node --test scripts/skill-docs.test.js

Tested: npm run lint

Tested: npm run typecheck && npm test

Tested: npm run ci
2026-04-28 18:08:17 +09:00

410 lines
15 KiB
Python
Executable file

#!/usr/bin/env python3
"""Utilities for the k-skill-cleaner skill.
The helper intentionally stays dependency-free: it scans root-level skill
folders, best-effort local agent logs, and optional interview choices to produce
a conservative cleanup shortlist. It never deletes files by itself.
"""
from __future__ import annotations
import argparse
import json
import os
import re
from collections.abc import Iterable, Mapping
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
# Directory names that are skipped when looking for skill folders: VCS,
# editor and agent metadata plus repository infrastructure directories.
EXCLUDED_ROOT_DIRS = {
    ".changeset",
    ".claude",
    ".codex",
    ".cursor",
    ".git",
    ".github",
    ".omx",
    ".ouroboros",
    ".vscode",
    "docs",
    "examples",
    "node_modules",
    "packages",
    "python-packages",
    "scripts",
}
# Known local log locations per agent. Each entry carries the glob patterns to
# scan ("paths"), a human-readable description of the detection approach
# ("method"), a confidence label, and optionally a "fallback" instruction.
# The list is emitted verbatim in the JSON report.
AGENT_USAGE_SOURCES = [
    {
        "agent": "Claude Code",
        "paths": ["~/.claude/projects/**/*.jsonl", "~/.claude/transcripts/**/*.jsonl"],
        "method": "Scan JSONL transcript lines for skill-trigger events, $skill mentions, and SKILL.md load markers.",
        "confidence": "best-effort",
    },
    {
        "agent": "Codex",
        "paths": ["~/.codex/sessions/**/*.jsonl", "~/.codex/log/**/*.log", ".omx/logs/**/*.log"],
        "method": "Scan Codex session/log lines for routed skill names, $skill invocations, and SKILL.md reads.",
        "confidence": "best-effort",
    },
    {
        "agent": "OpenCode",
        "paths": ["~/.local/share/opencode/**/*.jsonl", "~/.config/opencode/**/*.jsonl"],
        "method": "Scan OpenCode data/config logs when available; ask for an exported transcript otherwise.",
        "confidence": "best-effort",
    },
    {
        "agent": "OpenClaw/ClawHub",
        "paths": ["~/.openclaw/**/*.jsonl", "~/.clawhub/**/*.jsonl"],
        "method": "No stable public trigger-count schema is assumed; use local logs if present or imported JSON counts.",
        "confidence": "manual-confirm",
        "fallback": "Ask the user to export trigger stats or provide a usage JSON file.",
    },
    {
        "agent": "Hermes Agent",
        "paths": ["~/.hermes/**/*.jsonl", "~/.config/hermes/**/*.jsonl"],
        "method": "No stable public trigger-count schema is assumed; use local logs if present or imported JSON counts.",
        "confidence": "manual-confirm",
        "fallback": "Ask the user to export trigger stats or provide a usage JSON file.",
    },
]
def resolve_skills_root(root: Path | str) -> Path:
    """Return the directory whose children are the installable skills.

    ``root`` is expanded and resolved first. When it is itself a skill
    directory (it contains ``SKILL.md``) and its parent holds at least one
    non-excluded skill directory, the parent is returned so that running the
    helper from inside ``k-skill-cleaner`` with ``--skills-root .`` scans the
    whole installed bundle. Otherwise the resolved ``root`` is returned.
    """
    resolved = Path(root).expanduser().resolve()
    if not (resolved / "SKILL.md").is_file():
        return resolved

    bundle_dir = resolved.parent
    for entry in bundle_dir.iterdir():
        if not entry.is_dir():
            continue
        if entry.name in EXCLUDED_ROOT_DIRS:
            continue
        if (entry / "SKILL.md").is_file():
            return bundle_dir
    return resolved
def find_skill_dirs(root: Path | str) -> list[str]:
    """List, in sorted order, the names of root-level skill directories.

    A child of the resolved skills root counts as a skill when it is a
    directory, its name is not in ``EXCLUDED_ROOT_DIRS``, and it contains a
    ``SKILL.md`` file.
    """
    base = resolve_skills_root(root)
    return sorted(
        entry.name
        for entry in base.iterdir()
        if entry.is_dir()
        and entry.name not in EXCLUDED_ROOT_DIRS
        and (entry / "SKILL.md").is_file()
    )
def _walk_strings(value: Any, key_hint: str | None = None) -> Iterable[tuple[str | None, str]]:
    """Yield ``(key, text)`` for every string nested inside ``value``.

    Mappings and lists are traversed depth-first in their natural order. The
    key reported for a string is the nearest enclosing mapping key (converted
    with ``str``); list items inherit the key of the list itself. Values of
    any other type are ignored.
    """
    # Explicit stack; children are pushed reversed so they pop in source order.
    pending: list[tuple[str | None, Any]] = [(key_hint, value)]
    while pending:
        hint, node = pending.pop()
        if isinstance(node, str):
            yield hint, node
        elif isinstance(node, Mapping):
            pending.extend((str(key), child) for key, child in reversed(list(node.items())))
        elif isinstance(node, list):
            pending.extend((hint, child) for child in reversed(node))
# Compiled trigger patterns keyed by skill name. The key space is the set of
# skill names passed in by callers, so the cache stays small.
_SKILL_MENTION_PATTERNS: dict[str, tuple[re.Pattern[str], ...]] = {}


def _skill_mention_patterns(skill: str) -> tuple[re.Pattern[str], ...]:
    """Return the compiled trigger patterns for ``skill``, building them once."""
    compiled = _SKILL_MENTION_PATTERNS.get(skill)
    if compiled is None:
        escaped = re.escape(skill)
        compiled = tuple(
            re.compile(pattern)
            for pattern in (
                # "$skill" invocation (case-sensitive).
                rf"(?<![\w-])\${escaped}(?![\w-])",
                # "skill: name", "skill_name=name", "skill-id: 'name'", ...
                rf"(?i)\bskill(?:[_ -]?name|[_ -]?id)?\s*[:=]\s*['\"]?{escaped}(?![\w-])",
                # A read of "<skill>/SKILL.md".
                rf"(?<![\w-]){escaped}/SKILL\.md\b",
                # "loaded skill name" / "loaded skill: name".
                rf"(?i)\bloaded skill\s*[:=]?\s*['\"]?{escaped}(?![\w-])",
                # "using $skill", matched case-insensitively.
                rf"(?i)\busing\s+\${escaped}(?![\w-])",
            )
        )
        _SKILL_MENTION_PATTERNS[skill] = compiled
    return compiled


def _line_mentions_skill(line: str, skill: str) -> bool:
    """Return True when ``line`` contains a trigger-like mention of ``skill``.

    A mention is one of: a ``$skill`` token, a ``skill``/``skill name``/
    ``skill id`` assignment, a ``<skill>/SKILL.md`` path, a ``loaded skill``
    marker, or ``using $skill``. The skill name is regex-escaped and must not
    be followed by another word character or ``-``, so ``foo`` does not match
    ``foo-bar``.

    This function is called once per (line, skill) pair, so the compiled
    patterns are memoized per skill instead of being re-formatted and looked
    up in the ``re`` module cache on every call.
    """
    return any(pattern.search(line) for pattern in _skill_mention_patterns(skill))
# JSON keys (lowercased, with "-" and "_" removed) whose string value is taken
# to name a skill. Spellings with underscores are intentionally absent: keys
# are normalized before lookup, so such entries could never match.
_SKILL_KEY_NAMES = frozenset({"skill", "skillname", "skillid", "name"})


def _json_mentions_skill(record: Any, skill: str) -> bool:
    """Return True when a parsed JSON record refers to ``skill``.

    Every string nested in ``record`` is checked two ways: an exact match of
    the value against ``skill`` under a skill-naming key (``skill``,
    ``skill_name``, ``skill-id``, ``name``, ... compared after normalization),
    or a textual trigger mention inside the string itself.
    """
    for key, value in _walk_strings(record):
        # Only normalize the key when the value could match at all.
        if value == skill:
            normalized_key = (key or "").replace("-", "").replace("_", "").lower()
            if normalized_key in _SKILL_KEY_NAMES:
                return True
        if _line_mentions_skill(value, skill):
            return True
    return False
def _parse_datetime(value: str | datetime | None) -> datetime | None:
    """Normalize an ISO date/datetime string or ``datetime`` to aware UTC.

    ``None`` and blank strings yield ``None``. A trailing ``Z`` is accepted as
    UTC. Naive values are assumed to already be in UTC; aware values are
    converted to UTC.

    Raises:
        ValueError: if a non-blank string is not an ISO date or datetime.
    """
    if value is None:
        return None

    if isinstance(value, datetime):
        moment = value
    else:
        text = value.strip()
        if not text:
            return None
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        try:
            moment = datetime.fromisoformat(text)
        except ValueError:
            # Second chance: treat the text as a bare date at midnight.
            try:
                moment = datetime.fromisoformat(text + "T00:00:00")
            except ValueError as exc:
                raise ValueError("since must be an ISO date or datetime") from exc

    if moment.tzinfo is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(timezone.utc)
# Top-level JSON keys (lowercased, with "-" and "_" removed) that may carry a
# record timestamp. "created_at" is covered by its normalized form "createdat".
_TIMESTAMP_KEYS = frozenset({"timestamp", "time", "createdat", "date", "datetime", "ts"})


def _line_datetime_from_json(record: Any) -> datetime | None:
    """Extract a UTC timestamp from the top-level keys of a JSON record.

    Only string values under timestamp-like keys are considered. A key whose
    value is blank or not an ISO date/datetime is skipped, so a later key that
    does hold a valid timestamp is still found instead of the first bad value
    ending the search.

    Returns ``None`` when ``record`` is not a mapping or no key yields a
    parseable timestamp.
    """
    if not isinstance(record, Mapping):
        return None
    for key, value in record.items():
        if not isinstance(value, str):
            continue
        normalized_key = str(key).replace("-", "").replace("_", "").lower()
        if normalized_key not in _TIMESTAMP_KEYS:
            continue
        try:
            parsed = _parse_datetime(value)
        except ValueError:
            continue
        if parsed is not None:
            return parsed
    return None
def _line_datetime_from_text(line: str) -> datetime | None:
    """Parse the first ISO-like date or datetime that appears in ``line``.

    Accepts ``YYYY-MM-DD`` optionally followed by ``T`` or a space, a
    ``HH:MM:SS`` time, fractional seconds, and a ``Z`` or numeric offset.
    Returns ``None`` when nothing matches or the matched text cannot be
    parsed.
    """
    found = re.search(r"\b\d{4}-\d{2}-\d{2}(?:[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?)?\b", line)
    if found is None:
        return None

    stamp = found.group(0)
    has_time_part = "T" in stamp or " " in stamp
    if not has_time_part:
        stamp += "T00:00:00"
    # Rewrite a compact "+HHMM" offset as "+HH:MM" before parsing.
    if re.search(r"[+-]\d{4}$", stamp):
        stamp = stamp[:-2] + ":" + stamp[-2:]

    try:
        return _parse_datetime(stamp.replace(" ", "T", 1))
    except ValueError:
        return None
def _mtime_datetime(path: Path) -> datetime:
    """Return the modification time of ``path`` as an aware UTC datetime."""
    modified_epoch = path.stat().st_mtime
    return datetime.fromtimestamp(modified_epoch, tz=timezone.utc)
def _line_is_in_window(path: Path, line: str, parsed: Any | None, since: datetime | None) -> bool:
    """Decide whether a log line falls on or after the ``since`` cutoff.

    Without a cutoff every line is in the window. Otherwise the line's time is
    taken from the parsed JSON record, then from a timestamp found in the raw
    text, and finally from the log file's modification time.
    """
    if since is None:
        return True
    # datetime objects are always truthy, so ``or`` picks the first source
    # that produced a value and evaluates later sources only when needed.
    moment = (
        (_line_datetime_from_json(parsed) if parsed is not None else None)
        or _line_datetime_from_text(line)
        or _mtime_datetime(path)
    )
    return moment >= since
def collect_skill_usage(
    log_paths: Iterable[Path | str],
    skill_names: Iterable[str],
    since: str | datetime | None = None,
) -> dict[str, int]:
    """Best-effort count of skill trigger mentions across local agent logs.

    Each log is streamed line by line. A line adds at most one to a skill's
    count, when either its parsed JSON record or its raw text mentions the
    skill.

    When ``since`` is provided, timestamped records older than the cutoff are
    skipped. Lines without parseable timestamps fall back to the log file
    mtime, which keeps the selected interview window enforceable even for
    mixed log formats.

    Paths that are not regular files are ignored, and a file that raises
    ``OSError`` while being read is abandoned. A file reached through several
    spellings (repeated argument, relative and absolute path, symlink) is
    scanned only once so its mentions are not double-counted.

    Returns:
        A mapping of every requested skill name to its mention count.

    Raises:
        ValueError: if ``since`` is a string that is not an ISO date/datetime.
    """
    since_dt = _parse_datetime(since)
    skills = sorted(set(skill_names))
    counts = {skill: 0 for skill in skills}
    seen: set[Path] = set()
    for raw_path in log_paths:
        path = Path(raw_path).expanduser()
        if not path.is_file():
            continue
        try:
            identity = path.resolve()
            if identity in seen:
                continue
            seen.add(identity)
            with path.open(encoding="utf-8", errors="replace") as handle:
                for line in handle:
                    # A blank line cannot mention a skill; skip it before the
                    # JSON, timestamp and regex work.
                    if not line.strip():
                        continue
                    parsed: Any | None = None
                    try:
                        parsed = json.loads(line)
                    except json.JSONDecodeError:
                        parsed = None
                    if not _line_is_in_window(path, line, parsed, since_dt):
                        continue
                    for skill in skills:
                        if (parsed is not None and _json_mentions_skill(parsed, skill)) or _line_mentions_skill(line, skill):
                            counts[skill] += 1
        except OSError:
            # Best-effort scan: an unreadable log must not abort the report.
            continue
    return counts
def load_usage_json(path: Path | str | None) -> dict[str, int]:
    """Load imported usage counts from a JSON file.

    The file must contain a JSON object mapping skill names to counts. Keys
    are converted with ``str`` and values with ``int``. ``None`` as ``path``
    yields an empty mapping.

    Raises:
        ValueError: if the top-level JSON value is not an object, or a count
            cannot be converted to an integer (including ``Infinity``, which
            Python's JSON parser accepts).
        OSError: if the file cannot be read.
    """
    if path is None:
        return {}
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(data, Mapping):
        raise ValueError("usage JSON must be an object mapping skill names to counts")
    counts: dict[str, int] = {}
    for key, value in data.items():
        try:
            counts[str(key)] = int(value)
        # int() raises OverflowError for infinities, not ValueError.
        except (TypeError, ValueError, OverflowError) as exc:
            raise ValueError(f"usage count for {key!r} must be an integer") from exc
    return counts
def rank_cleanup_candidates(
    skill_names: Iterable[str],
    usage_counts: Mapping[str, int] | None = None,
    never_use: Iterable[str] | None = None,
    keep: Iterable[str] | None = None,
    low_usage_threshold: int = 1,
) -> list[dict[str, Any]]:
    """Score skills as removal or review candidates; nothing is deleted.

    Skills listed in ``keep`` are never suggested. A skill named in
    ``never_use`` scores 100 and is marked ``remove``. Zero recorded triggers
    add 50; a non-zero count at or below ``low_usage_threshold`` adds 20.
    Skills with no reason attached are omitted. The result is ordered by
    descending score, then by skill name.
    """
    counts = usage_counts or {}
    never = set(never_use or [])
    protected = set(keep or [])

    ranked: list[dict[str, Any]] = []
    for skill in sorted(set(skill_names) - protected):
        triggers = int(counts.get(skill, 0))
        flagged_never = skill in never

        reasons = ["interview_never_use"] if flagged_never else []
        score = 100 if flagged_never else 0
        if triggers == 0:
            reasons.append("zero_triggers")
            score += 50
        elif triggers <= low_usage_threshold:
            reasons.append("low_usage")
            score += 20

        if reasons:
            ranked.append(
                {
                    "skill": skill,
                    "action": "remove" if flagged_never else "review",
                    "trigger_count": triggers,
                    "score": score,
                    "reasons": reasons,
                }
            )

    ranked.sort(key=lambda item: (-item["score"], item["skill"]))
    return ranked
def expand_default_log_paths() -> list[Path]:
    """Expand the glob patterns in ``AGENT_USAGE_SOURCES`` into existing files.

    Patterns starting with ``~`` are globbed relative to the home directory
    (the leading ``~/`` is stripped); all other patterns are globbed relative
    to the current working directory. Returns a sorted, de-duplicated list of
    regular files.
    """
    matches: set[Path] = set()
    for source in AGENT_USAGE_SOURCES:
        for pattern in source.get("paths", []):
            if pattern.startswith("~"):
                hits = Path.home().glob(pattern[2:])
            else:
                hits = Path().glob(os.path.expanduser(pattern))
            matches.update(hit for hit in hits if hit.is_file())
    return sorted(matches)
def parse_csv(value: str | None) -> set[str]:
    """Split a comma-separated string into a set of trimmed, non-empty items."""
    if not value:
        return set()
    # filter(None, ...) drops the empty strings left by stray commas.
    return set(filter(None, map(str.strip, value.split(","))))
def _resolve_since(days: int | None, since: str | None, now: datetime | None = None) -> datetime | None:
    """Compute the UTC cutoff for the usage window.

    An explicit ``since`` wins over ``days``. Otherwise the cutoff is ``days``
    before ``now`` (defaulting to the current UTC time; a naive ``now`` is
    treated as UTC). Returns ``None`` when neither option selects a window.

    Raises:
        ValueError: if ``since`` is not an ISO date/datetime, or ``days`` is
            negative.
    """
    cutoff = _parse_datetime(since)
    if cutoff is not None:
        return cutoff
    if days is None:
        return None
    if days < 0:
        raise ValueError("days must be zero or greater")

    reference = now or datetime.now(timezone.utc)
    reference = (
        reference.replace(tzinfo=timezone.utc)
        if reference.tzinfo is None
        else reference.astimezone(timezone.utc)
    )
    return reference - timedelta(days=days)
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the cleanup helper."""
    # (flag, add_argument keyword arguments), in the order shown by --help.
    option_specs = [
        (
            "--skills-root",
            {
                "default": ".",
                "help": "Directory containing root-level skills; a skill directory with SKILL.md auto-scans its parent",
            },
        ),
        ("--usage-json", {"help": "Optional JSON object mapping skill names to trigger counts"}),
        ("--log", {"action": "append", "default": [], "help": "Agent log file to scan; repeatable"}),
        (
            "--scan-default-logs",
            {"action": "store_true", "help": "Best-effort scan known local agent log locations"},
        ),
        ("--never-use", {"default": "", "help": "Comma-separated skills the user says they never use"}),
        ("--keep", {"default": "", "help": "Comma-separated skills to protect from suggestions"}),
        (
            "--low-usage-threshold",
            {"type": int, "default": 1, "help": "Counts at or below this threshold are review candidates"},
        ),
        (
            "--days",
            {
                "type": int,
                "help": "Only count log records from the last N days; untimestamped lines use file mtime fallback",
            },
        ),
        ("--since", {"help": "Only count log records on or after this ISO date/datetime; overrides --days"}),
    ]

    parser = argparse.ArgumentParser(description="Suggest K-skill cleanup candidates from interviews and usage logs.")
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser
def main(argv: list[str] | None = None) -> int:
    """Run the cleanup helper and print a JSON report to stdout.

    Imported usage counts (``--usage-json``) are merged with mention counts
    scanned from the selected log files, candidates are ranked, and the report
    records which evidence sources were used. No files are deleted.

    The set of files listed under ``scanned_logs`` is exactly the set that is
    scanned: a log supplied more than once (repeated ``--log``, or ``--log``
    overlapping ``--scan-default-logs``) is counted a single time.

    Args:
        argv: Command-line arguments; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The process exit status, always ``0`` on success.
    """
    args = build_parser().parse_args(argv)
    skill_names = find_skill_dirs(args.skills_root)
    usage_counts = {skill: 0 for skill in skill_names}
    usage_counts.update(load_usage_json(args.usage_json))
    log_paths = [Path(path) for path in args.log]
    if args.scan_default_logs:
        log_paths.extend(expand_default_log_paths())
    since = _resolve_since(args.days, args.since)

    # Key existing logs by resolved path so different spellings of one file
    # collapse to a single entry; report the first spelling seen.
    unique_logs: dict[Path, str] = {}
    for path in log_paths:
        expanded = path.expanduser()
        if expanded.is_file():
            unique_logs.setdefault(expanded.resolve(), str(expanded))
    scanned_log_paths = sorted(unique_logs.values())

    log_counts = collect_skill_usage(scanned_log_paths, skill_names, since=since)
    for skill, count in log_counts.items():
        usage_counts[skill] = usage_counts.get(skill, 0) + count
    report = {
        "skill_count": len(skill_names),
        "candidates": rank_cleanup_candidates(
            skill_names=skill_names,
            usage_counts=usage_counts,
            never_use=parse_csv(args.never_use),
            keep=parse_csv(args.keep),
            low_usage_threshold=args.low_usage_threshold,
        ),
        "agent_usage_sources": AGENT_USAGE_SOURCES,
        "time_window": {
            "since": since.isoformat() if since is not None else None,
            "days": args.days if args.since is None else None,
            "scope": "Applies to scanned logs only; usage JSON counts are merged as already aggregated/pre-windowed input.",
            "fallback": "Untimestamped log lines are included or skipped by log file mtime.",
        },
        "usage_json": {
            "applied": args.usage_json is not None,
            "path": args.usage_json,
            "caveat": "Usage JSON counts are treated as already aggregated/pre-windowed and are not filtered by --days or --since.",
        },
        "scanned_logs": {
            "count": len(scanned_log_paths),
            "paths": scanned_log_paths,
            "caveat": "Unreadable log files are skipped; trigger detection is best-effort.",
        },
        "safety": "No files were deleted. Review candidates and remove skills in a separate explicit edit.",
    }
    print(json.dumps(report, ensure_ascii=False, indent=2))
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)