k-skill/kakaotalk-mac/scripts/kakaotalk_mac.py
Jeffrey (Dongkyu) Kim c9116b555f
Feature/#121 (#127)
* Recover KakaoTalk mac skill auth when upstream user_id detection fails

Issue #121 reproduces on a real MacBook because `kakaocli auth` can fail even when the encrypted hex-named DB exists. This change adds a thin repo-owned helper that recovers the active user_id from plist revision hashes, caches the validated DB/key tuple, and reuses it for read-only `kakaocli` commands. The skill and feature docs now steer users to the helper when upstream auto-detection stops at candidate key mismatch, and regression tests lock the recovery flow before the implementation.

Constraint: Must stay a thin adapter around upstream kakaocli rather than forking the CLI
Constraint: Must verify on a real local macOS KakaoTalk install where issue #121 reproduces
Rejected: Full kakaocli reimplementation inside k-skill | too broad for the user_id/key-derivation failure scope
Rejected: Docs-only workaround | does not actually fix the broken auth path for users
Confidence: high
Scope-risk: moderate
Reversibility: clean
Directive: Keep this helper limited to auth/key recovery and read-only passthrough unless upstream gaps widen materially
Tested: python3 -m unittest scripts.test_kakaotalk_mac
Tested: node --test scripts/skill-docs.test.js
Tested: npm run ci
Tested: python3 scripts/kakaotalk_mac.py auth --refresh --max-user-id 800000000 --workers 8 --chunk-size 2000000
Tested: python3 scripts/kakaotalk_mac.py chats --limit 1 --json
Not-tested: Other kakaocli subcommands beyond auth/chats/messages/search/query/schema

* Protect the KakaoTalk helper's safe recovery path

Address the PR follow-up by treating malformed auth cache files as cache misses,
removing write-capable passthrough from the wrapper surface, and redacting
human-readable auth output so the cached SQLCipher key is not echoed back into
terminal history. The docs and regression suite now describe and enforce the
read-only contract that the helper is meant to preserve.

Constraint: Helper must remain a read-only recovery wrapper around local kakaocli access
Rejected: Keep query support with SQL validation | still leaves a risky write-capable escape hatch
Confidence: high
Scope-risk: narrow
Reversibility: clean
Directive: Do not re-expose arbitrary SQL passthrough or print the SQLCipher key in default text output
Tested: python3 -m unittest scripts.test_kakaotalk_mac; node --test scripts/skill-docs.test.js; npm run ci; python3 scripts/kakaotalk_mac.py auth --refresh --max-user-id 800000000 --workers 8 --chunk-size 2000000; python3 scripts/kakaotalk_mac.py chats --limit 1 --json; python3 scripts/kakaotalk_mac.py auth --cache-path <bad-json>; python3 scripts/kakaotalk_mac.py query --help
Not-tested: External automation consumers that depend on shell/json auth output beyond the documented helper flows

* Lock the helper CLI surface against accidental regressions

The approved issue #121 fixes already hardened the KakaoTalk Mac helper, but the test suite still only exercised the passthrough validator directly. Add an explicit parser-level regression so the public CLI contract stays read-only and `query` cannot quietly reappear in future edits.

Constraint: Follow-up is on the existing feature/#121 PR branch and must stay minimal
Rejected: Re-open helper implementation changes | current code already satisfies the approved review findings
Confidence: high
Scope-risk: narrow
Reversibility: clean
Directive: Keep parser exposure tests aligned with READ_ONLY_COMMANDS whenever helper subcommands change
Tested: python3 -m unittest scripts.test_kakaotalk_mac; node --test scripts/skill-docs.test.js; npm run ci; python3 scripts/kakaotalk_mac.py auth --refresh --max-user-id 800000000 --workers 8 --chunk-size 2000000; python3 scripts/kakaotalk_mac.py chats --limit 1 --json; python3 scripts/kakaotalk_mac.py auth --cache-path <bad-json>
Not-tested: No new production code paths changed in this follow-up

* Honor explicit Kakao auth recovery overrides

The helper now treats manual auth overrides as a cache-bypassing recovery request and rejects invalid brute-force tuning flags at the CLI boundary so users get deterministic behavior instead of stale cached tuples or Python tracebacks. Regression coverage locks both paths before the PR follow-up lands.

Constraint: The helper must remain a thin read-only wrapper around kakaocli auth recovery
Rejected: Require --refresh whenever --user-id/--uuid is passed | worse UX than honoring overrides directly
Confidence: high
Scope-risk: narrow
Reversibility: clean
Directive: Keep explicit auth overrides ahead of cache reuse unless the CLI contract is redesigned and documented
Tested: python3 -m unittest scripts.test_kakaotalk_mac; node --test scripts/skill-docs.test.js; npm run ci; python3 scripts/kakaotalk_mac.py auth --refresh --max-user-id 800000000 --workers 8 --chunk-size 2000000; python3 scripts/kakaotalk_mac.py chats --limit 1 --json; python3 scripts/kakaotalk_mac.py auth --cache-path <bad-json>; python3 scripts/kakaotalk_mac.py auth --refresh --max-user-id -1; python3 scripts/kakaotalk_mac.py auth --refresh --workers 2 --chunk-size 0 --max-user-id 10; python3 scripts/kakaotalk_mac.py auth --cache-path <temp-cache> --user-id 999; python3 scripts/kakaotalk_mac.py auth --cache-path <temp-cache> --uuid <live-uuid>
Not-tested: Manual override success with a truly alternate valid user_id/uuid pair on a multi-account local install
2026-04-18 01:08:06 +09:00

658 lines
22 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import hashlib
import json
import multiprocessing as mp
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Iterable, Sequence
EMPTY_ACCOUNT_HASH = (
"31bca02094eb78126a517b206a88c73cfa9ec6f704c7030d18212cace820f025"
"f00bf0ea68dbf3f3a5436ca63b53bf7bf80ad8d5de7d8359d0b7fed9dbc3ab99"
)
HEX_DATABASE_PATTERN = re.compile(r"^[0-9a-f]{78}(?:\.db)?$")
DIRECT_USER_ID_KEYS = ("userId", "user_id", "KAKAO_USER_ID", "userID")
DEFAULT_MAX_USER_ID = 1_000_000_000
DEFAULT_CHUNK_SIZE = 500_000
DEFAULT_CACHE_PATH = Path.home() / ".cache" / "k-skill" / "kakaotalk-mac-auth.json"
READ_ONLY_COMMANDS = ("chats", "messages", "search", "schema")
class AuthResolutionError(RuntimeError):
pass
@dataclass
class DetectionState:
uuid: str
candidate_user_ids: list[int]
active_account_hash: str | None
database_files: list[Path]
@dataclass
class ResolvedAuth:
user_id: int
uuid: str
database_path: Path
database_name: str
key: str
source: str
def parse_plist_xml(xml_text: str) -> Any:
tokens = tokenize_plist_xml(xml_text)
if not tokens:
raise AuthResolutionError("plist XML was empty")
index = 0
if tokens[index] != ("start", "plist"):
raise AuthResolutionError("plist XML did not start with <plist>")
value, index = _parse_plist_tokens(tokens, index + 1)
if tokens[index] != ("end", "plist"):
raise AuthResolutionError("plist XML did not end with </plist>")
return value
def tokenize_plist_xml(xml_text: str) -> list[tuple[str, str]]:
normalized = re.sub(r"<\?xml[^>]*\?>", "", xml_text)
normalized = re.sub(r"<!DOCTYPE[^>]*>", "", normalized)
normalized = re.sub(r"<([A-Za-z0-9]+)\s*/>", r"<\1></\1>", normalized)
normalized = (
normalized.replace("\r", "")
)
tokens: list[tuple[str, str]] = []
position = 0
for match in re.finditer(r"<(/?)([A-Za-z0-9]+)(?: [^>]*)?>", normalized):
text = normalized[position : match.start()]
stripped = _unescape_xml(text).strip()
if stripped:
tokens.append(("text", stripped))
token_type = "end" if match.group(1) else "start"
tokens.append((token_type, match.group(2)))
position = match.end()
trailing = _unescape_xml(normalized[position:]).strip()
if trailing:
tokens.append(("text", trailing))
return [token for token in tokens if token[0] != "text" or token[1]]
def _parse_plist_tokens(tokens: list[tuple[str, str]], index: int) -> tuple[Any, int]:
token_type, tag = tokens[index]
if token_type != "start":
raise AuthResolutionError(f"Unexpected token {tokens[index]!r}")
if tag == "dict":
result: dict[str, Any] = {}
index += 1
while tokens[index] != ("end", "dict"):
if tokens[index] != ("start", "key"):
raise AuthResolutionError(f"Expected dict key, got {tokens[index]!r}")
key, index = _parse_scalar(tokens, index, "key", lambda value: value)
value, index = _parse_plist_tokens(tokens, index)
result[key] = value
return result, index + 1
if tag == "array":
items: list[Any] = []
index += 1
while tokens[index] != ("end", "array"):
value, index = _parse_plist_tokens(tokens, index)
items.append(value)
return items, index + 1
if tag == "integer":
return _parse_scalar(tokens, index, "integer", int)
if tag == "real":
return _parse_scalar(tokens, index, "real", float)
if tag == "string":
return _parse_scalar(tokens, index, "string", lambda value: value)
if tag == "date":
return _parse_scalar(tokens, index, "date", lambda value: value)
if tag == "data":
return _parse_scalar(tokens, index, "data", lambda value: value)
if tag == "true":
return True, index + 2
if tag == "false":
return False, index + 2
raise AuthResolutionError(f"Unsupported plist tag: {tag}")
def _parse_scalar(
tokens: list[tuple[str, str]],
index: int,
tag: str,
caster: Callable[[str], Any],
) -> tuple[Any, int]:
if tokens[index] != ("start", tag):
raise AuthResolutionError(f"Expected <{tag}>, got {tokens[index]!r}")
text = ""
index += 1
if tokens[index][0] == "text":
text = tokens[index][1]
index += 1
if tokens[index] != ("end", tag):
raise AuthResolutionError(f"Expected </{tag}>, got {tokens[index]!r}")
return caster(text), index + 1
def _unescape_xml(text: str) -> str:
return (
text.replace("&lt;", "<")
.replace("&gt;", ">")
.replace("&amp;", "&")
.replace("&quot;", '"')
.replace("&apos;", "'")
)
def collect_candidate_user_ids(plist_data: dict[str, Any]) -> list[int]:
candidates: list[int] = []
for key in DIRECT_USER_ID_KEYS:
value = plist_data.get(key)
if isinstance(value, int) and value > 0:
candidates.append(value)
elif isinstance(value, str) and value.isdigit():
candidates.append(int(value))
alert_ids = plist_data.get("AlertKakaoIDsList", [])
if isinstance(alert_ids, list):
for item in alert_ids:
if isinstance(item, int) and item > 0:
candidates.append(item)
elif isinstance(item, str) and item.isdigit():
numeric = int(item)
if numeric > 0:
candidates.append(numeric)
return unique_ints(candidates)
def find_active_account_hash(plist_data: dict[str, Any]) -> str | None:
prefix = "DESIGNATEDFRIENDSREVISION:"
for key, value in plist_data.items():
if not key.startswith(prefix):
continue
hash_hex = key[len(prefix) :]
if hash_hex == EMPTY_ACCOUNT_HASH:
continue
if not re.fullmatch(r"[0-9a-f]{128}", hash_hex):
continue
numeric_value = 0
if isinstance(value, (int, float)):
numeric_value = int(value)
elif isinstance(value, str) and value.isdigit():
numeric_value = int(value)
if numeric_value != 0:
return hash_hex
return None
def discover_database_files(container_path: Path) -> list[Path]:
if not container_path.exists():
return []
return sorted(
[path for path in container_path.iterdir() if path.is_file() and HEX_DATABASE_PATTERN.fullmatch(path.name)],
key=lambda item: item.name,
)
def unique_ints(values: Iterable[int]) -> list[int]:
seen: set[int] = set()
ordered: list[int] = []
for value in values:
if value not in seen:
seen.add(value)
ordered.append(value)
return ordered
def recover_user_id_from_sha512(
hex_hash: str,
*,
max_user_id: int = DEFAULT_MAX_USER_ID,
workers: int | None = None,
chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> int | None:
if not re.fullmatch(r"[0-9a-f]{128}", hex_hash):
raise ValueError("expected 128-char lowercase sha512 hex digest")
if max_user_id < 0:
raise ValueError("max_user_id must be non-negative")
normalized_workers = max(1, workers or (os.cpu_count() or 1))
if normalized_workers == 1:
return _scan_user_id_range((0, max_user_id + 1, hex_hash))
start_method = "fork" if "fork" in mp.get_all_start_methods() else mp.get_start_method()
ctx = mp.get_context(start_method)
job_iter = (
(start, min(start + chunk_size, max_user_id + 1), hex_hash)
for start in range(0, max_user_id + 1, chunk_size)
)
with ctx.Pool(processes=normalized_workers) as pool:
for result in pool.imap_unordered(_scan_user_id_range, job_iter, chunksize=1):
if result is not None:
pool.terminate()
return result
return None
def _scan_user_id_range(job: tuple[int, int, str]) -> int | None:
start, end, hex_hash = job
target = hex_hash.encode("ascii")
for user_id in range(start, end):
if hashlib.sha512(str(user_id).encode("utf-8")).hexdigest().encode("ascii") == target:
return user_id
return None
def database_name(user_id: int, uuid: str) -> str:
hawawa = ".".join([".", "F", str(user_id), "A", "F", uuid[::-1], ".", "|"])
salt = hashed_device_uuid(uuid)[::-1].encode("utf-8")
derived = hashlib.pbkdf2_hmac("sha256", hawawa.encode("utf-8"), salt, 100_000, 128)
hex_value = derived.hex()
return hex_value[28 : 28 + 78]
def secure_key(user_id: int, uuid: str) -> str:
hashed = hashed_device_uuid(uuid)
parts = ["A", hashed, "|", "F", uuid[:5], "H", str(user_id), "|", uuid[7:]]
hawawa = "F".join(parts)[::-1].encode("utf-8")
salt = uuid[int(len(uuid) * 0.3) :].encode("utf-8")
return hashlib.pbkdf2_hmac("sha256", hawawa, salt, 100_000, 128).hex()
def hashed_device_uuid(uuid: str) -> str:
uuid_bytes = uuid.encode("utf-8")
combined = hashlib.sha1(uuid_bytes).digest() + hashlib.sha256(uuid_bytes).digest()
return base64.b64encode(combined).decode("ascii")
def resolve_auth_state(
state: DetectionState,
*,
verify_access: Callable[[ResolvedAuth], bool],
cache_path: Path | None = None,
user_id_override: int | None = None,
max_user_id: int = DEFAULT_MAX_USER_ID,
workers: int | None = None,
chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> ResolvedAuth:
if not state.database_files:
raise AuthResolutionError("No KakaoTalk database files were discovered in the container path.")
candidates = list(state.candidate_user_ids)
if user_id_override is not None:
candidates = [user_id_override, *candidates]
candidates = unique_ints(candidates)
for user_id in candidates:
resolved = _try_resolved_auth(user_id, "candidate", state, verify_access)
if resolved is not None:
persist_auth_cache(resolved, cache_path)
return resolved
if state.active_account_hash:
recovered = recover_user_id_from_sha512(
state.active_account_hash,
max_user_id=max_user_id,
workers=workers,
chunk_size=chunk_size,
)
if recovered is not None and recovered not in candidates:
resolved = _try_resolved_auth(recovered, "hash-recovery", state, verify_access)
if resolved is not None:
persist_auth_cache(resolved, cache_path)
return resolved
raise AuthResolutionError(
"Failed to resolve a working KakaoTalk auth key. "
"Try a larger --max-user-id or pass --user-id explicitly."
)
def _try_resolved_auth(
user_id: int,
source: str,
state: DetectionState,
verify_access: Callable[[ResolvedAuth], bool],
) -> ResolvedAuth | None:
derived_name = database_name(user_id, state.uuid)
key = secure_key(user_id, state.uuid)
for database_path in prioritized_database_paths(state.database_files, derived_name):
resolved = ResolvedAuth(
user_id=user_id,
uuid=state.uuid,
database_path=database_path,
database_name=derived_name,
key=key,
source=source,
)
if verify_access(resolved):
return resolved
return None
def prioritized_database_paths(database_files: Sequence[Path], derived_name: str) -> list[Path]:
preferred_names = {derived_name, f"{derived_name}.db"}
preferred = [path for path in database_files if path.name in preferred_names]
fallback = [path for path in database_files if path.name not in preferred_names]
return [*preferred, *fallback]
def load_cached_auth(cache_path: Path) -> ResolvedAuth | None:
if not cache_path.exists():
return None
try:
payload = json.loads(cache_path.read_text(encoding="utf-8"))
database_path = Path(payload["database_path"]).expanduser()
user_id = int(payload["user_id"])
uuid = str(payload["uuid"])
database_name = str(payload["database_name"])
key = str(payload["key"])
source = str(payload.get("source", "cache"))
except (OSError, json.JSONDecodeError, KeyError, TypeError, ValueError):
return None
if user_id <= 0 or not uuid or not database_name or not key or not database_path.exists():
return None
return ResolvedAuth(
user_id=user_id,
uuid=uuid,
database_path=database_path,
database_name=database_name,
key=key,
source=source,
)
def persist_auth_cache(resolved: ResolvedAuth, cache_path: Path | None) -> None:
if cache_path is None:
return
cache_path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"user_id": resolved.user_id,
"uuid": resolved.uuid,
"database_path": str(resolved.database_path),
"database_name": resolved.database_name,
"key": resolved.key,
"source": resolved.source,
}
cache_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
try:
os.chmod(cache_path, 0o600)
except OSError:
pass
def platform_uuid() -> str:
result = run_command(["/usr/sbin/ioreg", "-rd1", "-c", "IOPlatformExpertDevice"], check=True)
match = re.search(r'"IOPlatformUUID" = "([0-9A-F-]+)"', result.stdout)
if not match:
raise AuthResolutionError("Could not read IOPlatformUUID from ioreg output.")
return match.group(1)
def convert_plist_to_xml(plist_path: Path) -> str:
result = run_command(["/usr/bin/plutil", "-convert", "xml1", "-o", "-", str(plist_path)], check=True)
return result.stdout
def read_plist_snapshot(plist_path: Path) -> dict[str, Any]:
return parse_plist_xml(convert_plist_to_xml(plist_path))
def collect_detection_state(uuid_override: str | None = None) -> DetectionState:
uuid = uuid_override or platform_uuid()
snapshots = []
for plist_path in preference_paths():
if plist_path.exists():
snapshots.append(read_plist_snapshot(plist_path))
candidate_user_ids: list[int] = []
active_account_hash: str | None = None
for snapshot in snapshots:
candidate_user_ids.extend(collect_candidate_user_ids(snapshot))
if active_account_hash is None:
active_account_hash = find_active_account_hash(snapshot)
return DetectionState(
uuid=uuid,
candidate_user_ids=unique_ints(candidate_user_ids),
active_account_hash=active_account_hash,
database_files=discover_database_files(container_path()),
)
def preference_paths() -> list[Path]:
pref_dir = (
Path.home()
/ "Library"
/ "Containers"
/ "com.kakao.KakaoTalkMac"
/ "Data"
/ "Library"
/ "Preferences"
)
paths = sorted(pref_dir.glob("com.kakao.KakaoTalkMac*.plist"))
global_pref = Path.home() / "Library" / "Preferences" / "com.kakao.KakaoTalkMac.plist"
if global_pref.exists():
paths.append(global_pref)
deduped: list[Path] = []
seen: set[Path] = set()
for path in paths:
if path not in seen:
seen.add(path)
deduped.append(path)
return deduped
def container_path() -> Path:
return (
Path.home()
/ "Library"
/ "Containers"
/ "com.kakao.KakaoTalkMac"
/ "Data"
/ "Library"
/ "Application Support"
/ "com.kakao.KakaoTalkMac"
)
def verify_database_access(resolved: ResolvedAuth) -> bool:
result = run_command(
[
"kakaocli",
"query",
"SELECT count(*) FROM sqlite_master",
"--db",
str(resolved.database_path),
"--key",
resolved.key,
],
check=False,
)
return result.returncode == 0
def run_command(args: Sequence[str], *, check: bool) -> subprocess.CompletedProcess[str]:
result = subprocess.run(args, capture_output=True, text=True, check=False)
if check and result.returncode != 0:
raise AuthResolutionError(result.stderr.strip() or result.stdout.strip() or f"command failed: {' '.join(args)}")
return result
def resolve_auth(
*,
refresh: bool,
cache_path: Path,
user_id_override: int | None,
uuid_override: str | None,
max_user_id: int,
workers: int | None,
chunk_size: int,
) -> ResolvedAuth:
use_cache = not refresh and user_id_override is None and uuid_override is None
if use_cache:
cached = load_cached_auth(cache_path)
if cached is not None:
return cached
state = collect_detection_state(uuid_override)
return resolve_auth_state(
state,
verify_access=verify_database_access,
cache_path=cache_path,
user_id_override=user_id_override,
max_user_id=max_user_id,
workers=workers,
chunk_size=chunk_size,
)
def render_auth(resolved: ResolvedAuth, *, output_format: str, cache_path: Path) -> str:
payload = {
"user_id": resolved.user_id,
"uuid": resolved.uuid,
"database_path": str(resolved.database_path),
"database_name": resolved.database_name,
"key": resolved.key,
"source": resolved.source,
"cache_path": str(cache_path),
}
if output_format == "json":
return json.dumps(payload, ensure_ascii=False, indent=2)
if output_format == "shell":
return "\n".join(
[
f"export KSKILL_KAKAOTALK_USER_ID='{resolved.user_id}'",
f"export KSKILL_KAKAOTALK_UUID='{resolved.uuid}'",
f"export KSKILL_KAKAOTALK_DB='{resolved.database_path}'",
f"export KSKILL_KAKAOTALK_KEY='{resolved.key}'",
f"export KSKILL_KAKAOTALK_AUTH_CACHE='{cache_path}'",
]
)
return "\n".join(
[
"KakaoTalk auth resolved",
f"- user_id: {resolved.user_id}",
f"- uuid: {resolved.uuid}",
f"- database: {resolved.database_path}",
f"- source: {resolved.source}",
f"- cache: {cache_path}",
"- secrets: redacted in text output (use --format json or --format shell only when automation needs them)",
"",
"You can now run:",
" python3 scripts/kakaotalk_mac.py chats --limit 10 --json",
" python3 scripts/kakaotalk_mac.py messages --chat \"채팅방 이름\" --since 1d --json",
]
)
def build_passthrough_command(command: str, auth: ResolvedAuth, forwarded_args: Sequence[str]) -> list[str]:
if command not in READ_ONLY_COMMANDS:
raise AuthResolutionError(
f"Unsupported command '{command}'. Allowed read-only commands: {', '.join(READ_ONLY_COMMANDS)}"
)
return [
"kakaocli",
command,
*forwarded_args,
"--db",
str(auth.database_path),
"--key",
auth.key,
]
def main(argv: Sequence[str] | None = None) -> int:
parser = build_parser()
try:
args, forwarded_args = parser.parse_known_args(argv)
cache_path = Path(args.cache_path).expanduser()
if args.command == "auth":
if forwarded_args:
raise AuthResolutionError(f"Unexpected auth arguments: {' '.join(forwarded_args)}")
resolved = resolve_auth(
refresh=args.refresh,
cache_path=cache_path,
user_id_override=args.user_id,
uuid_override=args.uuid,
max_user_id=args.max_user_id,
workers=args.workers,
chunk_size=args.chunk_size,
)
print(render_auth(resolved, output_format=args.format, cache_path=cache_path))
return 0
resolved = resolve_auth(
refresh=args.refresh_auth,
cache_path=cache_path,
user_id_override=args.user_id,
uuid_override=args.uuid,
max_user_id=args.max_user_id,
workers=args.workers,
chunk_size=args.chunk_size,
)
result = subprocess.run(build_passthrough_command(args.command, resolved, forwarded_args))
return result.returncode
except (AuthResolutionError, ValueError) as exc:
print(f"error: {exc}", file=sys.stderr)
return 1
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Thin k-skill adapter around kakaocli auth for user_id/hash recovery and cached read access.",
)
subparsers = parser.add_subparsers(dest="command", required=True)
auth_parser = subparsers.add_parser("auth", help="Recover/cache the working KakaoTalk DB/key tuple.")
add_auth_options(auth_parser)
auth_parser.add_argument("--refresh", action="store_true", help="Ignore cached auth and resolve again.")
auth_parser.add_argument("--format", choices=("text", "json", "shell"), default="text")
for command in READ_ONLY_COMMANDS:
passthrough = subparsers.add_parser(command, help=f"Run kakaocli {command} with cached/recovered auth.")
add_auth_options(passthrough)
passthrough.add_argument("--refresh-auth", action="store_true", help="Refresh cached auth before running.")
return parser
def add_auth_options(parser: argparse.ArgumentParser) -> None:
parser.add_argument("--cache-path", default=str(DEFAULT_CACHE_PATH))
parser.add_argument("--user-id", type=int, help="Explicit Kakao user_id override.")
parser.add_argument("--uuid", help="Explicit device UUID override.")
parser.add_argument("--max-user-id", type=non_negative_int, default=DEFAULT_MAX_USER_ID)
parser.add_argument("--workers", type=positive_int, default=None)
parser.add_argument("--chunk-size", type=positive_int, default=DEFAULT_CHUNK_SIZE)
def non_negative_int(value: str) -> int:
integer = int(value)
if integer < 0:
raise argparse.ArgumentTypeError("must be non-negative")
return integer
def positive_int(value: str) -> int:
integer = int(value)
if integer <= 0:
raise argparse.ArgumentTypeError("must be positive")
return integer
if __name__ == "__main__":
raise SystemExit(main())