mirror of
https://github.com/NomaDamas/k-skill.git
synced 2026-06-24 02:04:11 +00:00
Compare commits
4 commits
main
...
feature/#1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c81b710cae | ||
|
|
5631f58b14 | ||
|
|
0b8609f5e0 | ||
|
|
c5be56b53c |
7 changed files with 1020 additions and 5 deletions
|
|
@ -14,6 +14,8 @@
|
|||
- KakaoTalk for Mac 설치
|
||||
- Homebrew
|
||||
- `brew install silver-flight-group/tap/kakaocli`
|
||||
- `python3` 3.10+
|
||||
- 이 저장소의 helper `scripts/kakaotalk_mac.py`
|
||||
- 터미널 앱에 **Full Disk Access** 와 **Accessibility** 권한 부여
|
||||
|
||||
카카오톡 앱이 없으면 `mas` 로 먼저 설치할 수 있다.
|
||||
|
|
@ -36,15 +38,20 @@ mas install 869223134
|
|||
|
||||
1. KakaoTalk for Mac 과 `kakaocli` 가 설치되어 있는지 확인한다.
|
||||
2. `kakaocli status`, `kakaocli auth` 로 권한과 DB 접근이 되는지 먼저 확인한다.
|
||||
3. 읽기/검색은 JSON 모드로 실행한 뒤 사람이 읽기 쉽게 요약한다.
|
||||
4. 전송은 먼저 `--me` 또는 `--dry-run` 으로 테스트한다.
|
||||
5. 다른 사람에게 보내는 메시지는 항상 최종 확인 후에만 전송한다.
|
||||
3. `user_id` 자동 감지가 실패하면 helper `python3 scripts/kakaotalk_mac.py auth --refresh` 로 복구한다.
|
||||
4. 읽기/검색은 JSON 모드로 실행한 뒤 사람이 읽기 쉽게 요약한다.
|
||||
5. 전송은 먼저 `--me` 또는 `--dry-run` 으로 테스트한다.
|
||||
6. 다른 사람에게 보내는 메시지는 항상 최종 확인 후에만 전송한다.
|
||||
|
||||
## 예시
|
||||
|
||||
```bash
|
||||
kakaocli status
|
||||
kakaocli auth
|
||||
python3 scripts/kakaotalk_mac.py auth --refresh
|
||||
python3 scripts/kakaotalk_mac.py chats --limit 10 --json
|
||||
python3 scripts/kakaotalk_mac.py messages --chat "지수" --since 1d --json
|
||||
python3 scripts/kakaotalk_mac.py search "회의" --json
|
||||
kakaocli chats --limit 10 --json
|
||||
kakaocli messages --chat "지수" --since 1d --json
|
||||
kakaocli search "회의" --json
|
||||
|
|
@ -52,9 +59,26 @@ kakaocli send --me _ "테스트 메시지"
|
|||
kakaocli send --dry-run "팀 공지방" "오늘 3시에 만나요"
|
||||
```
|
||||
|
||||
## helper 가 해결하는 문제
|
||||
|
||||
`kakaocli auth` 실패가 항상 “DB 파일이 없음”을 의미하지는 않는다. 실제 Mac 환경에서는:
|
||||
|
||||
- container 안에 `KakaoTalk.db` 라는 이름 대신 **78자 hex 파일**이 DB 로 존재할 수 있다.
|
||||
- `kakaocli status` 는 정상이어도 `auth` 는 `user_id 자동 감지 실패` 로 끝날 수 있다.
|
||||
- 이 경우 plist 의 `AlertKakaoIDsList` 후보만으로는 부족하고, `DESIGNATEDFRIENDSREVISION:<SHA-512(user_id)>` 에서 실제 `user_id` 를 더 오래 찾아야 할 수 있다.
|
||||
|
||||
helper `scripts/kakaotalk_mac.py` 는 그 얇은 read-only 어댑터 역할을 한다.
|
||||
|
||||
- plist 에서 후보 `user_id` 와 active hash 를 읽는다.
|
||||
- hash recovery 가 필요하면 더 긴 검색으로 실제 `user_id` 를 찾는다.
|
||||
- 검증된 DB 경로와 SQLCipher key 를 `~/.cache/k-skill/kakaotalk-mac-auth.json` 에 캐시한다.
|
||||
- 이후 read-only helper 명령 `chats`, `messages`, `search`, `schema` 를 cached `--db` / `--key` 와 함께 다시 실행한다.
|
||||
|
||||
## 주의할 점
|
||||
|
||||
- **Full Disk Access** 가 없으면 읽기 명령도 실패할 수 있다.
|
||||
- **Accessibility** 가 없으면 전송과 harvest 계열 자동화가 실패한다.
|
||||
- macOS 전용이므로 Windows/Linux 대체 구현으로 넘어가지 않는다.
|
||||
- 다른 사람에게 보내는 메시지는 자동 전송하지 말고 확인을 먼저 받는다.
|
||||
- helper cache 는 로컬 auth material 을 담으므로 본인 장비에서만 보관한다.
|
||||
- 기본 `auth` 텍스트 출력은 key 를 다시 보여주지 않는다. 자동화가 필요할 때만 `--format json` 또는 `--format shell` 을 사용한다.
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ metadata:
|
|||
- Homebrew
|
||||
- Mac App Store 로그인(`mas` 사용 시)
|
||||
- `kakaocli` 설치
|
||||
- `python3` 3.10+
|
||||
- 이 저장소의 helper `scripts/kakaotalk_mac.py`
|
||||
- 터미널 앱에 **Full Disk Access** 와 **Accessibility** 권한 부여
|
||||
|
||||
## Inputs
|
||||
|
|
@ -99,6 +101,28 @@ kakaocli chats --limit 10 --json
|
|||
|
||||
`auth` 가 성공하면 읽기 경로는 준비된 것이다.
|
||||
|
||||
### 3.5. Use the helper when `kakaocli auth` fails on `user_id` auto-detection
|
||||
|
||||
실제 macOS 환경에서는 `KakaoTalk.db` 라는 literal 파일이 없어도, container 안의 **78자 hex 파일**이 실제 SQLCipher DB 인 경우가 있다. 이때 `kakaocli status` 는 정상인데 `kakaocli auth` 만 실패하는 대표 원인은:
|
||||
|
||||
- `AlertKakaoIDsList` 후보로는 복호화가 안 됨
|
||||
- plist 의 `DESIGNATEDFRIENDSREVISION:<sha512(user_id)>` 만 남아 있음
|
||||
- upstream `kakaocli` 의 기본 `user_id` brute-force 시간이 짧아서 auto-detection 이 실패함
|
||||
|
||||
이 저장소는 그 구간만 보완하는 **read-only helper** 를 함께 제공한다.
|
||||
|
||||
```bash
|
||||
python3 scripts/kakaotalk_mac.py auth --refresh
|
||||
python3 scripts/kakaotalk_mac.py chats --limit 10 --json
|
||||
python3 scripts/kakaotalk_mac.py messages --chat "지수" --since 1d --json
|
||||
python3 scripts/kakaotalk_mac.py search "회의" --json
|
||||
```
|
||||
|
||||
- helper 는 plist 의 `AlertKakaoIDsList` 와 `DESIGNATEDFRIENDSREVISION` hash 를 읽는다.
|
||||
- 후보 `user_id` 가 모두 실패하면 SHA-512 preimage search 로 실제 `user_id` 를 더 오래 찾는다.
|
||||
- 성공한 `user_id`, DB 경로, derived key 는 `~/.cache/k-skill/kakaotalk-mac-auth.json` 에 캐시한다.
|
||||
- 이후 read-only helper 명령(`chats`, `messages`, `search`, `schema`)은 cached `--db` / `--key` 를 붙여 `kakaocli` 를 다시 호출한다.
|
||||
|
||||
### 4. Read or search messages
|
||||
|
||||
```bash
|
||||
|
|
@ -106,6 +130,13 @@ kakaocli messages --chat "지수" --since 1h --json
|
|||
kakaocli search "점심" --json
|
||||
```
|
||||
|
||||
helper 경유 예시:
|
||||
|
||||
```bash
|
||||
python3 scripts/kakaotalk_mac.py messages --chat "지수" --since 1h --json
|
||||
python3 scripts/kakaotalk_mac.py search "점심" --json
|
||||
```
|
||||
|
||||
응답은 가능하면 JSON 모드로 받고, 사람이 읽기 쉽게 다시 요약한다.
|
||||
|
||||
### 5. Use safe testing before real sends
|
||||
|
|
@ -158,6 +189,7 @@ kakaocli login --status
|
|||
- App Store 로그인 누락으로 `mas install` 실패
|
||||
- Full Disk Access 미부여
|
||||
- Accessibility 미부여
|
||||
- `status` 는 정상인데 `auth` 만 실패하는 `user_id` auto-detection / key mismatch 케이스
|
||||
- 채팅방 이름 substring 이 애매해서 잘못된 후보가 여러 개 잡힘
|
||||
|
||||
## Notes
|
||||
|
|
@ -165,3 +197,6 @@ kakaocli login --status
|
|||
- 이 스킬은 macOS 전용이다.
|
||||
- 다른 사람에게 보내는 메시지는 항상 confirm before sending 원칙을 지킨다.
|
||||
- 첫 검증은 `kakaocli status` 와 `kakaocli auth` 부터 시작하는 편이 안전하다.
|
||||
- `kakaocli auth` 가 `User ID: auto-detection failed` 로 멈추면 helper 경로를 우선 사용한다.
|
||||
- helper cache 는 로컬 SQLCipher key 를 포함하므로 본인 계정에서만 유지하고 공유하지 않는다.
|
||||
- 기본 `auth` 텍스트 출력은 key 를 다시 보여주지 않는다. 자동화가 필요할 때만 `--format json` 또는 `--format shell` 을 사용한다.
|
||||
|
|
|
|||
658
kakaotalk-mac/scripts/kakaotalk_mac.py
Normal file
658
kakaotalk-mac/scripts/kakaotalk_mac.py
Normal file
|
|
@ -0,0 +1,658 @@
|
|||
#!/usr/bin/env python3
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Iterable, Sequence
|
||||
|
||||
|
||||
EMPTY_ACCOUNT_HASH = (
|
||||
"31bca02094eb78126a517b206a88c73cfa9ec6f704c7030d18212cace820f025"
|
||||
"f00bf0ea68dbf3f3a5436ca63b53bf7bf80ad8d5de7d8359d0b7fed9dbc3ab99"
|
||||
)
|
||||
HEX_DATABASE_PATTERN = re.compile(r"^[0-9a-f]{78}(?:\.db)?$")
|
||||
DIRECT_USER_ID_KEYS = ("userId", "user_id", "KAKAO_USER_ID", "userID")
|
||||
DEFAULT_MAX_USER_ID = 1_000_000_000
|
||||
DEFAULT_CHUNK_SIZE = 500_000
|
||||
DEFAULT_CACHE_PATH = Path.home() / ".cache" / "k-skill" / "kakaotalk-mac-auth.json"
|
||||
READ_ONLY_COMMANDS = ("chats", "messages", "search", "schema")
|
||||
|
||||
|
||||
class AuthResolutionError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class DetectionState:
|
||||
uuid: str
|
||||
candidate_user_ids: list[int]
|
||||
active_account_hash: str | None
|
||||
database_files: list[Path]
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResolvedAuth:
|
||||
user_id: int
|
||||
uuid: str
|
||||
database_path: Path
|
||||
database_name: str
|
||||
key: str
|
||||
source: str
|
||||
|
||||
|
||||
def parse_plist_xml(xml_text: str) -> Any:
|
||||
tokens = tokenize_plist_xml(xml_text)
|
||||
if not tokens:
|
||||
raise AuthResolutionError("plist XML was empty")
|
||||
index = 0
|
||||
if tokens[index] != ("start", "plist"):
|
||||
raise AuthResolutionError("plist XML did not start with <plist>")
|
||||
value, index = _parse_plist_tokens(tokens, index + 1)
|
||||
if tokens[index] != ("end", "plist"):
|
||||
raise AuthResolutionError("plist XML did not end with </plist>")
|
||||
return value
|
||||
|
||||
|
||||
def tokenize_plist_xml(xml_text: str) -> list[tuple[str, str]]:
|
||||
normalized = re.sub(r"<\?xml[^>]*\?>", "", xml_text)
|
||||
normalized = re.sub(r"<!DOCTYPE[^>]*>", "", normalized)
|
||||
normalized = re.sub(r"<([A-Za-z0-9]+)\s*/>", r"<\1></\1>", normalized)
|
||||
normalized = (
|
||||
normalized.replace("\r", "")
|
||||
)
|
||||
tokens: list[tuple[str, str]] = []
|
||||
position = 0
|
||||
for match in re.finditer(r"<(/?)([A-Za-z0-9]+)(?: [^>]*)?>", normalized):
|
||||
text = normalized[position : match.start()]
|
||||
stripped = _unescape_xml(text).strip()
|
||||
if stripped:
|
||||
tokens.append(("text", stripped))
|
||||
token_type = "end" if match.group(1) else "start"
|
||||
tokens.append((token_type, match.group(2)))
|
||||
position = match.end()
|
||||
trailing = _unescape_xml(normalized[position:]).strip()
|
||||
if trailing:
|
||||
tokens.append(("text", trailing))
|
||||
return [token for token in tokens if token[0] != "text" or token[1]]
|
||||
|
||||
|
||||
def _parse_plist_tokens(tokens: list[tuple[str, str]], index: int) -> tuple[Any, int]:
|
||||
token_type, tag = tokens[index]
|
||||
if token_type != "start":
|
||||
raise AuthResolutionError(f"Unexpected token {tokens[index]!r}")
|
||||
|
||||
if tag == "dict":
|
||||
result: dict[str, Any] = {}
|
||||
index += 1
|
||||
while tokens[index] != ("end", "dict"):
|
||||
if tokens[index] != ("start", "key"):
|
||||
raise AuthResolutionError(f"Expected dict key, got {tokens[index]!r}")
|
||||
key, index = _parse_scalar(tokens, index, "key", lambda value: value)
|
||||
value, index = _parse_plist_tokens(tokens, index)
|
||||
result[key] = value
|
||||
return result, index + 1
|
||||
|
||||
if tag == "array":
|
||||
items: list[Any] = []
|
||||
index += 1
|
||||
while tokens[index] != ("end", "array"):
|
||||
value, index = _parse_plist_tokens(tokens, index)
|
||||
items.append(value)
|
||||
return items, index + 1
|
||||
|
||||
if tag == "integer":
|
||||
return _parse_scalar(tokens, index, "integer", int)
|
||||
if tag == "real":
|
||||
return _parse_scalar(tokens, index, "real", float)
|
||||
if tag == "string":
|
||||
return _parse_scalar(tokens, index, "string", lambda value: value)
|
||||
if tag == "date":
|
||||
return _parse_scalar(tokens, index, "date", lambda value: value)
|
||||
if tag == "data":
|
||||
return _parse_scalar(tokens, index, "data", lambda value: value)
|
||||
if tag == "true":
|
||||
return True, index + 2
|
||||
if tag == "false":
|
||||
return False, index + 2
|
||||
raise AuthResolutionError(f"Unsupported plist tag: {tag}")
|
||||
|
||||
|
||||
def _parse_scalar(
|
||||
tokens: list[tuple[str, str]],
|
||||
index: int,
|
||||
tag: str,
|
||||
caster: Callable[[str], Any],
|
||||
) -> tuple[Any, int]:
|
||||
if tokens[index] != ("start", tag):
|
||||
raise AuthResolutionError(f"Expected <{tag}>, got {tokens[index]!r}")
|
||||
text = ""
|
||||
index += 1
|
||||
if tokens[index][0] == "text":
|
||||
text = tokens[index][1]
|
||||
index += 1
|
||||
if tokens[index] != ("end", tag):
|
||||
raise AuthResolutionError(f"Expected </{tag}>, got {tokens[index]!r}")
|
||||
return caster(text), index + 1
|
||||
|
||||
|
||||
def _unescape_xml(text: str) -> str:
|
||||
return (
|
||||
text.replace("<", "<")
|
||||
.replace(">", ">")
|
||||
.replace("&", "&")
|
||||
.replace(""", '"')
|
||||
.replace("'", "'")
|
||||
)
|
||||
|
||||
|
||||
def collect_candidate_user_ids(plist_data: dict[str, Any]) -> list[int]:
|
||||
candidates: list[int] = []
|
||||
for key in DIRECT_USER_ID_KEYS:
|
||||
value = plist_data.get(key)
|
||||
if isinstance(value, int) and value > 0:
|
||||
candidates.append(value)
|
||||
elif isinstance(value, str) and value.isdigit():
|
||||
candidates.append(int(value))
|
||||
|
||||
alert_ids = plist_data.get("AlertKakaoIDsList", [])
|
||||
if isinstance(alert_ids, list):
|
||||
for item in alert_ids:
|
||||
if isinstance(item, int) and item > 0:
|
||||
candidates.append(item)
|
||||
elif isinstance(item, str) and item.isdigit():
|
||||
numeric = int(item)
|
||||
if numeric > 0:
|
||||
candidates.append(numeric)
|
||||
|
||||
return unique_ints(candidates)
|
||||
|
||||
|
||||
def find_active_account_hash(plist_data: dict[str, Any]) -> str | None:
|
||||
prefix = "DESIGNATEDFRIENDSREVISION:"
|
||||
for key, value in plist_data.items():
|
||||
if not key.startswith(prefix):
|
||||
continue
|
||||
hash_hex = key[len(prefix) :]
|
||||
if hash_hex == EMPTY_ACCOUNT_HASH:
|
||||
continue
|
||||
if not re.fullmatch(r"[0-9a-f]{128}", hash_hex):
|
||||
continue
|
||||
numeric_value = 0
|
||||
if isinstance(value, (int, float)):
|
||||
numeric_value = int(value)
|
||||
elif isinstance(value, str) and value.isdigit():
|
||||
numeric_value = int(value)
|
||||
if numeric_value != 0:
|
||||
return hash_hex
|
||||
return None
|
||||
|
||||
|
||||
def discover_database_files(container_path: Path) -> list[Path]:
|
||||
if not container_path.exists():
|
||||
return []
|
||||
return sorted(
|
||||
[path for path in container_path.iterdir() if path.is_file() and HEX_DATABASE_PATTERN.fullmatch(path.name)],
|
||||
key=lambda item: item.name,
|
||||
)
|
||||
|
||||
|
||||
def unique_ints(values: Iterable[int]) -> list[int]:
|
||||
seen: set[int] = set()
|
||||
ordered: list[int] = []
|
||||
for value in values:
|
||||
if value not in seen:
|
||||
seen.add(value)
|
||||
ordered.append(value)
|
||||
return ordered
|
||||
|
||||
|
||||
def recover_user_id_from_sha512(
|
||||
hex_hash: str,
|
||||
*,
|
||||
max_user_id: int = DEFAULT_MAX_USER_ID,
|
||||
workers: int | None = None,
|
||||
chunk_size: int = DEFAULT_CHUNK_SIZE,
|
||||
) -> int | None:
|
||||
if not re.fullmatch(r"[0-9a-f]{128}", hex_hash):
|
||||
raise ValueError("expected 128-char lowercase sha512 hex digest")
|
||||
if max_user_id < 0:
|
||||
raise ValueError("max_user_id must be non-negative")
|
||||
|
||||
normalized_workers = max(1, workers or (os.cpu_count() or 1))
|
||||
if normalized_workers == 1:
|
||||
return _scan_user_id_range((0, max_user_id + 1, hex_hash))
|
||||
|
||||
start_method = "fork" if "fork" in mp.get_all_start_methods() else mp.get_start_method()
|
||||
ctx = mp.get_context(start_method)
|
||||
job_iter = (
|
||||
(start, min(start + chunk_size, max_user_id + 1), hex_hash)
|
||||
for start in range(0, max_user_id + 1, chunk_size)
|
||||
)
|
||||
with ctx.Pool(processes=normalized_workers) as pool:
|
||||
for result in pool.imap_unordered(_scan_user_id_range, job_iter, chunksize=1):
|
||||
if result is not None:
|
||||
pool.terminate()
|
||||
return result
|
||||
return None
|
||||
|
||||
|
||||
def _scan_user_id_range(job: tuple[int, int, str]) -> int | None:
|
||||
start, end, hex_hash = job
|
||||
target = hex_hash.encode("ascii")
|
||||
for user_id in range(start, end):
|
||||
if hashlib.sha512(str(user_id).encode("utf-8")).hexdigest().encode("ascii") == target:
|
||||
return user_id
|
||||
return None
|
||||
|
||||
|
||||
def database_name(user_id: int, uuid: str) -> str:
|
||||
hawawa = ".".join([".", "F", str(user_id), "A", "F", uuid[::-1], ".", "|"])
|
||||
salt = hashed_device_uuid(uuid)[::-1].encode("utf-8")
|
||||
derived = hashlib.pbkdf2_hmac("sha256", hawawa.encode("utf-8"), salt, 100_000, 128)
|
||||
hex_value = derived.hex()
|
||||
return hex_value[28 : 28 + 78]
|
||||
|
||||
|
||||
def secure_key(user_id: int, uuid: str) -> str:
|
||||
hashed = hashed_device_uuid(uuid)
|
||||
parts = ["A", hashed, "|", "F", uuid[:5], "H", str(user_id), "|", uuid[7:]]
|
||||
hawawa = "F".join(parts)[::-1].encode("utf-8")
|
||||
salt = uuid[int(len(uuid) * 0.3) :].encode("utf-8")
|
||||
return hashlib.pbkdf2_hmac("sha256", hawawa, salt, 100_000, 128).hex()
|
||||
|
||||
|
||||
def hashed_device_uuid(uuid: str) -> str:
|
||||
uuid_bytes = uuid.encode("utf-8")
|
||||
combined = hashlib.sha1(uuid_bytes).digest() + hashlib.sha256(uuid_bytes).digest()
|
||||
return base64.b64encode(combined).decode("ascii")
|
||||
|
||||
|
||||
def resolve_auth_state(
|
||||
state: DetectionState,
|
||||
*,
|
||||
verify_access: Callable[[ResolvedAuth], bool],
|
||||
cache_path: Path | None = None,
|
||||
user_id_override: int | None = None,
|
||||
max_user_id: int = DEFAULT_MAX_USER_ID,
|
||||
workers: int | None = None,
|
||||
chunk_size: int = DEFAULT_CHUNK_SIZE,
|
||||
) -> ResolvedAuth:
|
||||
if not state.database_files:
|
||||
raise AuthResolutionError("No KakaoTalk database files were discovered in the container path.")
|
||||
|
||||
candidates = list(state.candidate_user_ids)
|
||||
if user_id_override is not None:
|
||||
candidates = [user_id_override, *candidates]
|
||||
candidates = unique_ints(candidates)
|
||||
|
||||
for user_id in candidates:
|
||||
resolved = _try_resolved_auth(user_id, "candidate", state, verify_access)
|
||||
if resolved is not None:
|
||||
persist_auth_cache(resolved, cache_path)
|
||||
return resolved
|
||||
|
||||
if state.active_account_hash:
|
||||
recovered = recover_user_id_from_sha512(
|
||||
state.active_account_hash,
|
||||
max_user_id=max_user_id,
|
||||
workers=workers,
|
||||
chunk_size=chunk_size,
|
||||
)
|
||||
if recovered is not None and recovered not in candidates:
|
||||
resolved = _try_resolved_auth(recovered, "hash-recovery", state, verify_access)
|
||||
if resolved is not None:
|
||||
persist_auth_cache(resolved, cache_path)
|
||||
return resolved
|
||||
|
||||
raise AuthResolutionError(
|
||||
"Failed to resolve a working KakaoTalk auth key. "
|
||||
"Try a larger --max-user-id or pass --user-id explicitly."
|
||||
)
|
||||
|
||||
|
||||
def _try_resolved_auth(
|
||||
user_id: int,
|
||||
source: str,
|
||||
state: DetectionState,
|
||||
verify_access: Callable[[ResolvedAuth], bool],
|
||||
) -> ResolvedAuth | None:
|
||||
derived_name = database_name(user_id, state.uuid)
|
||||
key = secure_key(user_id, state.uuid)
|
||||
for database_path in prioritized_database_paths(state.database_files, derived_name):
|
||||
resolved = ResolvedAuth(
|
||||
user_id=user_id,
|
||||
uuid=state.uuid,
|
||||
database_path=database_path,
|
||||
database_name=derived_name,
|
||||
key=key,
|
||||
source=source,
|
||||
)
|
||||
if verify_access(resolved):
|
||||
return resolved
|
||||
return None
|
||||
|
||||
|
||||
def prioritized_database_paths(database_files: Sequence[Path], derived_name: str) -> list[Path]:
|
||||
preferred_names = {derived_name, f"{derived_name}.db"}
|
||||
preferred = [path for path in database_files if path.name in preferred_names]
|
||||
fallback = [path for path in database_files if path.name not in preferred_names]
|
||||
return [*preferred, *fallback]
|
||||
|
||||
|
||||
def load_cached_auth(cache_path: Path) -> ResolvedAuth | None:
|
||||
if not cache_path.exists():
|
||||
return None
|
||||
try:
|
||||
payload = json.loads(cache_path.read_text(encoding="utf-8"))
|
||||
database_path = Path(payload["database_path"]).expanduser()
|
||||
user_id = int(payload["user_id"])
|
||||
uuid = str(payload["uuid"])
|
||||
database_name = str(payload["database_name"])
|
||||
key = str(payload["key"])
|
||||
source = str(payload.get("source", "cache"))
|
||||
except (OSError, json.JSONDecodeError, KeyError, TypeError, ValueError):
|
||||
return None
|
||||
|
||||
if user_id <= 0 or not uuid or not database_name or not key or not database_path.exists():
|
||||
return None
|
||||
|
||||
return ResolvedAuth(
|
||||
user_id=user_id,
|
||||
uuid=uuid,
|
||||
database_path=database_path,
|
||||
database_name=database_name,
|
||||
key=key,
|
||||
source=source,
|
||||
)
|
||||
|
||||
|
||||
def persist_auth_cache(resolved: ResolvedAuth, cache_path: Path | None) -> None:
|
||||
if cache_path is None:
|
||||
return
|
||||
cache_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
payload = {
|
||||
"user_id": resolved.user_id,
|
||||
"uuid": resolved.uuid,
|
||||
"database_path": str(resolved.database_path),
|
||||
"database_name": resolved.database_name,
|
||||
"key": resolved.key,
|
||||
"source": resolved.source,
|
||||
}
|
||||
cache_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
|
||||
try:
|
||||
os.chmod(cache_path, 0o600)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def platform_uuid() -> str:
|
||||
result = run_command(["/usr/sbin/ioreg", "-rd1", "-c", "IOPlatformExpertDevice"], check=True)
|
||||
match = re.search(r'"IOPlatformUUID" = "([0-9A-F-]+)"', result.stdout)
|
||||
if not match:
|
||||
raise AuthResolutionError("Could not read IOPlatformUUID from ioreg output.")
|
||||
return match.group(1)
|
||||
|
||||
|
||||
def convert_plist_to_xml(plist_path: Path) -> str:
|
||||
result = run_command(["/usr/bin/plutil", "-convert", "xml1", "-o", "-", str(plist_path)], check=True)
|
||||
return result.stdout
|
||||
|
||||
|
||||
def read_plist_snapshot(plist_path: Path) -> dict[str, Any]:
|
||||
return parse_plist_xml(convert_plist_to_xml(plist_path))
|
||||
|
||||
|
||||
def collect_detection_state(uuid_override: str | None = None) -> DetectionState:
|
||||
uuid = uuid_override or platform_uuid()
|
||||
snapshots = []
|
||||
for plist_path in preference_paths():
|
||||
if plist_path.exists():
|
||||
snapshots.append(read_plist_snapshot(plist_path))
|
||||
|
||||
candidate_user_ids: list[int] = []
|
||||
active_account_hash: str | None = None
|
||||
for snapshot in snapshots:
|
||||
candidate_user_ids.extend(collect_candidate_user_ids(snapshot))
|
||||
if active_account_hash is None:
|
||||
active_account_hash = find_active_account_hash(snapshot)
|
||||
|
||||
return DetectionState(
|
||||
uuid=uuid,
|
||||
candidate_user_ids=unique_ints(candidate_user_ids),
|
||||
active_account_hash=active_account_hash,
|
||||
database_files=discover_database_files(container_path()),
|
||||
)
|
||||
|
||||
|
||||
def preference_paths() -> list[Path]:
|
||||
pref_dir = (
|
||||
Path.home()
|
||||
/ "Library"
|
||||
/ "Containers"
|
||||
/ "com.kakao.KakaoTalkMac"
|
||||
/ "Data"
|
||||
/ "Library"
|
||||
/ "Preferences"
|
||||
)
|
||||
paths = sorted(pref_dir.glob("com.kakao.KakaoTalkMac*.plist"))
|
||||
global_pref = Path.home() / "Library" / "Preferences" / "com.kakao.KakaoTalkMac.plist"
|
||||
if global_pref.exists():
|
||||
paths.append(global_pref)
|
||||
deduped: list[Path] = []
|
||||
seen: set[Path] = set()
|
||||
for path in paths:
|
||||
if path not in seen:
|
||||
seen.add(path)
|
||||
deduped.append(path)
|
||||
return deduped
|
||||
|
||||
|
||||
def container_path() -> Path:
|
||||
return (
|
||||
Path.home()
|
||||
/ "Library"
|
||||
/ "Containers"
|
||||
/ "com.kakao.KakaoTalkMac"
|
||||
/ "Data"
|
||||
/ "Library"
|
||||
/ "Application Support"
|
||||
/ "com.kakao.KakaoTalkMac"
|
||||
)
|
||||
|
||||
|
||||
def verify_database_access(resolved: ResolvedAuth) -> bool:
|
||||
result = run_command(
|
||||
[
|
||||
"kakaocli",
|
||||
"query",
|
||||
"SELECT count(*) FROM sqlite_master",
|
||||
"--db",
|
||||
str(resolved.database_path),
|
||||
"--key",
|
||||
resolved.key,
|
||||
],
|
||||
check=False,
|
||||
)
|
||||
return result.returncode == 0
|
||||
|
||||
|
||||
def run_command(args: Sequence[str], *, check: bool) -> subprocess.CompletedProcess[str]:
|
||||
result = subprocess.run(args, capture_output=True, text=True, check=False)
|
||||
if check and result.returncode != 0:
|
||||
raise AuthResolutionError(result.stderr.strip() or result.stdout.strip() or f"command failed: {' '.join(args)}")
|
||||
return result
|
||||
|
||||
|
||||
def resolve_auth(
|
||||
*,
|
||||
refresh: bool,
|
||||
cache_path: Path,
|
||||
user_id_override: int | None,
|
||||
uuid_override: str | None,
|
||||
max_user_id: int,
|
||||
workers: int | None,
|
||||
chunk_size: int,
|
||||
) -> ResolvedAuth:
|
||||
use_cache = not refresh and user_id_override is None and uuid_override is None
|
||||
if use_cache:
|
||||
cached = load_cached_auth(cache_path)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
state = collect_detection_state(uuid_override)
|
||||
return resolve_auth_state(
|
||||
state,
|
||||
verify_access=verify_database_access,
|
||||
cache_path=cache_path,
|
||||
user_id_override=user_id_override,
|
||||
max_user_id=max_user_id,
|
||||
workers=workers,
|
||||
chunk_size=chunk_size,
|
||||
)
|
||||
|
||||
|
||||
def render_auth(resolved: ResolvedAuth, *, output_format: str, cache_path: Path) -> str:
|
||||
payload = {
|
||||
"user_id": resolved.user_id,
|
||||
"uuid": resolved.uuid,
|
||||
"database_path": str(resolved.database_path),
|
||||
"database_name": resolved.database_name,
|
||||
"key": resolved.key,
|
||||
"source": resolved.source,
|
||||
"cache_path": str(cache_path),
|
||||
}
|
||||
if output_format == "json":
|
||||
return json.dumps(payload, ensure_ascii=False, indent=2)
|
||||
if output_format == "shell":
|
||||
return "\n".join(
|
||||
[
|
||||
f"export KSKILL_KAKAOTALK_USER_ID='{resolved.user_id}'",
|
||||
f"export KSKILL_KAKAOTALK_UUID='{resolved.uuid}'",
|
||||
f"export KSKILL_KAKAOTALK_DB='{resolved.database_path}'",
|
||||
f"export KSKILL_KAKAOTALK_KEY='{resolved.key}'",
|
||||
f"export KSKILL_KAKAOTALK_AUTH_CACHE='{cache_path}'",
|
||||
]
|
||||
)
|
||||
return "\n".join(
|
||||
[
|
||||
"KakaoTalk auth resolved",
|
||||
f"- user_id: {resolved.user_id}",
|
||||
f"- uuid: {resolved.uuid}",
|
||||
f"- database: {resolved.database_path}",
|
||||
f"- source: {resolved.source}",
|
||||
f"- cache: {cache_path}",
|
||||
"- secrets: redacted in text output (use --format json or --format shell only when automation needs them)",
|
||||
"",
|
||||
"You can now run:",
|
||||
" python3 scripts/kakaotalk_mac.py chats --limit 10 --json",
|
||||
" python3 scripts/kakaotalk_mac.py messages --chat \"채팅방 이름\" --since 1d --json",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
def build_passthrough_command(command: str, auth: ResolvedAuth, forwarded_args: Sequence[str]) -> list[str]:
|
||||
if command not in READ_ONLY_COMMANDS:
|
||||
raise AuthResolutionError(
|
||||
f"Unsupported command '{command}'. Allowed read-only commands: {', '.join(READ_ONLY_COMMANDS)}"
|
||||
)
|
||||
return [
|
||||
"kakaocli",
|
||||
command,
|
||||
*forwarded_args,
|
||||
"--db",
|
||||
str(auth.database_path),
|
||||
"--key",
|
||||
auth.key,
|
||||
]
|
||||
|
||||
|
||||
def main(argv: Sequence[str] | None = None) -> int:
|
||||
parser = build_parser()
|
||||
|
||||
try:
|
||||
args, forwarded_args = parser.parse_known_args(argv)
|
||||
cache_path = Path(args.cache_path).expanduser()
|
||||
if args.command == "auth":
|
||||
if forwarded_args:
|
||||
raise AuthResolutionError(f"Unexpected auth arguments: {' '.join(forwarded_args)}")
|
||||
resolved = resolve_auth(
|
||||
refresh=args.refresh,
|
||||
cache_path=cache_path,
|
||||
user_id_override=args.user_id,
|
||||
uuid_override=args.uuid,
|
||||
max_user_id=args.max_user_id,
|
||||
workers=args.workers,
|
||||
chunk_size=args.chunk_size,
|
||||
)
|
||||
print(render_auth(resolved, output_format=args.format, cache_path=cache_path))
|
||||
return 0
|
||||
|
||||
resolved = resolve_auth(
|
||||
refresh=args.refresh_auth,
|
||||
cache_path=cache_path,
|
||||
user_id_override=args.user_id,
|
||||
uuid_override=args.uuid,
|
||||
max_user_id=args.max_user_id,
|
||||
workers=args.workers,
|
||||
chunk_size=args.chunk_size,
|
||||
)
|
||||
result = subprocess.run(build_passthrough_command(args.command, resolved, forwarded_args))
|
||||
return result.returncode
|
||||
except (AuthResolutionError, ValueError) as exc:
|
||||
print(f"error: {exc}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Thin k-skill adapter around kakaocli auth for user_id/hash recovery and cached read access.",
|
||||
)
|
||||
subparsers = parser.add_subparsers(dest="command", required=True)
|
||||
|
||||
auth_parser = subparsers.add_parser("auth", help="Recover/cache the working KakaoTalk DB/key tuple.")
|
||||
add_auth_options(auth_parser)
|
||||
auth_parser.add_argument("--refresh", action="store_true", help="Ignore cached auth and resolve again.")
|
||||
auth_parser.add_argument("--format", choices=("text", "json", "shell"), default="text")
|
||||
|
||||
for command in READ_ONLY_COMMANDS:
|
||||
passthrough = subparsers.add_parser(command, help=f"Run kakaocli {command} with cached/recovered auth.")
|
||||
add_auth_options(passthrough)
|
||||
passthrough.add_argument("--refresh-auth", action="store_true", help="Refresh cached auth before running.")
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def add_auth_options(parser: argparse.ArgumentParser) -> None:
|
||||
parser.add_argument("--cache-path", default=str(DEFAULT_CACHE_PATH))
|
||||
parser.add_argument("--user-id", type=int, help="Explicit Kakao user_id override.")
|
||||
parser.add_argument("--uuid", help="Explicit device UUID override.")
|
||||
parser.add_argument("--max-user-id", type=non_negative_int, default=DEFAULT_MAX_USER_ID)
|
||||
parser.add_argument("--workers", type=positive_int, default=None)
|
||||
parser.add_argument("--chunk-size", type=positive_int, default=DEFAULT_CHUNK_SIZE)
|
||||
|
||||
|
||||
def non_negative_int(value: str) -> int:
|
||||
integer = int(value)
|
||||
if integer < 0:
|
||||
raise argparse.ArgumentTypeError("must be non-negative")
|
||||
return integer
|
||||
|
||||
|
||||
def positive_int(value: str) -> int:
|
||||
integer = int(value)
|
||||
if integer <= 0:
|
||||
raise argparse.ArgumentTypeError("must be positive")
|
||||
return integer
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
|
|
@ -9,9 +9,9 @@
|
|||
],
|
||||
"scripts": {
|
||||
"build": "npm run build --workspaces --if-present",
|
||||
"lint": "node --check scripts/skill-docs.test.js scripts/korean_character_count.js scripts/test_korean_character_count.js && python3 -m py_compile scripts/fine_dust.py scripts/test_fine_dust.py scripts/ktx_booking.py scripts/test_ktx_booking.py scripts/sillok_search.py scripts/test_sillok_search.py scripts/korean_spell_check.py scripts/test_korean_spell_check.py scripts/patent_search.py scripts/test_patent_search.py scripts/mfds_drug_safety.py scripts/test_mfds_drug_safety.py scripts/mfds_food_safety.py scripts/test_mfds_food_safety.py scripts/zipcode_search.py scripts/test_zipcode_search.py scripts/subway_lost_property.py scripts/test_subway_lost_property.py scripts/geeknews_search.py scripts/test_geeknews_search.py scripts/test_naver_blog_search.py naver-blog-research/scripts/_naver_http.py naver-blog-research/scripts/naver_search.py naver-blog-research/scripts/naver_read.py naver-blog-research/scripts/naver_download_images.py korean-scholarship-search/scripts/scholarship_filter.py korean-scholarship-search/scripts/test_scholarship_filter.py korean-scholarship-search/scripts/university_search_plan.py && npm run lint --workspaces --if-present && ./scripts/validate-skills.sh",
|
||||
"lint": "node --check scripts/skill-docs.test.js scripts/korean_character_count.js scripts/test_korean_character_count.js && python3 -m py_compile scripts/fine_dust.py scripts/test_fine_dust.py scripts/ktx_booking.py scripts/test_ktx_booking.py scripts/sillok_search.py scripts/test_sillok_search.py scripts/korean_spell_check.py scripts/test_korean_spell_check.py scripts/patent_search.py scripts/test_patent_search.py scripts/mfds_drug_safety.py scripts/test_mfds_drug_safety.py scripts/mfds_food_safety.py scripts/test_mfds_food_safety.py scripts/zipcode_search.py scripts/test_zipcode_search.py scripts/subway_lost_property.py scripts/test_subway_lost_property.py scripts/geeknews_search.py scripts/test_geeknews_search.py scripts/test_naver_blog_search.py scripts/kakaotalk_mac.py scripts/test_kakaotalk_mac.py kakaotalk-mac/scripts/kakaotalk_mac.py naver-blog-research/scripts/_naver_http.py naver-blog-research/scripts/naver_search.py naver-blog-research/scripts/naver_read.py naver-blog-research/scripts/naver_download_images.py korean-scholarship-search/scripts/scholarship_filter.py korean-scholarship-search/scripts/test_scholarship_filter.py korean-scholarship-search/scripts/university_search_plan.py && npm run lint --workspaces --if-present && ./scripts/validate-skills.sh",
|
||||
"typecheck": "tsc --noEmit",
|
||||
"test": "node --test scripts/skill-docs.test.js scripts/test_korean_character_count.js && PYTHONPATH=.:scripts python3 -m unittest scripts.test_fine_dust scripts.test_ktx_booking scripts.test_sillok_search scripts.test_korean_spell_check scripts.test_patent_search scripts.test_mfds_drug_safety scripts.test_mfds_food_safety scripts.test_zipcode_search scripts.test_subway_lost_property scripts.test_geeknews_search scripts.test_naver_blog_search && PYTHONPATH=.:scripts:korean-scholarship-search/scripts python3 -m unittest discover -s korean-scholarship-search/scripts -p 'test_scholarship_filter.py' && npm run test --workspaces --if-present && ./scripts/validate-skills.sh",
|
||||
"test": "node --test scripts/skill-docs.test.js scripts/test_korean_character_count.js && PYTHONPATH=.:scripts python3 -m unittest scripts.test_fine_dust scripts.test_ktx_booking scripts.test_sillok_search scripts.test_korean_spell_check scripts.test_patent_search scripts.test_mfds_drug_safety scripts.test_mfds_food_safety scripts.test_zipcode_search scripts.test_subway_lost_property scripts.test_geeknews_search scripts.test_naver_blog_search scripts.test_kakaotalk_mac && PYTHONPATH=.:scripts:korean-scholarship-search/scripts python3 -m unittest discover -s korean-scholarship-search/scripts -p 'test_scholarship_filter.py' && npm run test --workspaces --if-present && ./scripts/validate-skills.sh",
|
||||
"pack:dry-run": "npm pack --workspace k-lotto --dry-run && npm pack --workspace daiso-product-search --dry-run && npm pack --workspace market-kurly-search --dry-run && npm pack --workspace blue-ribbon-nearby --dry-run && npm pack --workspace kakao-bar-nearby --dry-run && npm pack --workspace cheap-gas-nearby --dry-run && npm pack --workspace public-restroom-nearby --dry-run && npm pack --workspace kleague-results --dry-run && npm pack --workspace lck-analytics --dry-run && npm pack --workspace toss-securities --dry-run && npm pack --workspace hipass-receipt --dry-run && npm pack --workspace used-car-price-search --dry-run",
|
||||
|
||||
"ci": "npm run lint && npm run typecheck && npm run test && npm run pack:dry-run",
|
||||
|
|
|
|||
11
scripts/kakaotalk_mac.py
Normal file
11
scripts/kakaotalk_mac.py
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
_BUNDLED_HELPER = Path(__file__).resolve().parent.parent / "kakaotalk-mac" / "scripts" / "kakaotalk_mac.py"
|
||||
|
||||
if not _BUNDLED_HELPER.exists(): # pragma: no cover - defensive import guard
|
||||
raise FileNotFoundError(f"Bundled KakaoTalk helper not found: {_BUNDLED_HELPER}")
|
||||
|
||||
exec(compile(_BUNDLED_HELPER.read_text(encoding="utf-8"), str(_BUNDLED_HELPER), "exec"), globals())
|
||||
|
|
@ -499,8 +499,11 @@ test("korea-weather docs route short-term forecast calls through the proxy witho
|
|||
|
||||
test("kakaotalk-mac skill documents safe macOS kakaocli usage", () => {
|
||||
const skillPath = path.join(repoRoot, "kakaotalk-mac", "SKILL.md");
|
||||
const helperPath = path.join(repoRoot, "scripts", "kakaotalk_mac.py");
|
||||
const featureDoc = read(path.join("docs", "features", "kakaotalk-mac.md"));
|
||||
|
||||
assert.ok(fs.existsSync(skillPath), "expected kakaotalk-mac/SKILL.md to exist");
|
||||
assert.ok(fs.existsSync(helperPath), "expected scripts/kakaotalk_mac.py to exist");
|
||||
|
||||
const skill = read(path.join("kakaotalk-mac", "SKILL.md"));
|
||||
|
||||
|
|
@ -512,6 +515,17 @@ test("kakaotalk-mac skill documents safe macOS kakaocli usage", () => {
|
|||
assert.match(skill, /Accessibility/i);
|
||||
assert.match(skill, /--me/);
|
||||
assert.match(skill, /confirm before sending/i);
|
||||
|
||||
for (const doc of [skill, featureDoc]) {
|
||||
assert.match(doc, /python3 scripts\/kakaotalk_mac\.py auth/);
|
||||
assert.match(doc, /python3 scripts\/kakaotalk_mac\.py chats --limit 10 --json/);
|
||||
assert.match(doc, /python3 scripts\/kakaotalk_mac\.py messages --chat/);
|
||||
assert.match(doc, /python3 scripts\/kakaotalk_mac\.py search/);
|
||||
assert.match(doc, /user_id 자동 감지 실패|SHA-512|DESIGNATEDFRIENDSREVISION/i);
|
||||
assert.match(doc, /cache|캐시/);
|
||||
assert.match(doc, /read-only|읽기 전용/i);
|
||||
assert.doesNotMatch(doc, /`query`/);
|
||||
}
|
||||
});
|
||||
|
||||
test("repository docs advertise the KTX booking skill as supported", () => {
|
||||
|
|
|
|||
273
scripts/test_kakaotalk_mac.py
Normal file
273
scripts/test_kakaotalk_mac.py
Normal file
|
|
@ -0,0 +1,273 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import io
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
import scripts.kakaotalk_mac as kakaotalk_mac
|
||||
|
||||
|
||||
def sha512_hex(value: int) -> str:
|
||||
return hashlib.sha512(str(value).encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def make_resolved_auth(
|
||||
*,
|
||||
user_id: int = 123,
|
||||
uuid: str = "uuid",
|
||||
database_path: Path | None = None,
|
||||
database_name: str = "db-name",
|
||||
key: str = "super-secret",
|
||||
source: str = "cache",
|
||||
) -> kakaotalk_mac.ResolvedAuth:
|
||||
return kakaotalk_mac.ResolvedAuth(
|
||||
user_id=user_id,
|
||||
uuid=uuid,
|
||||
database_path=database_path or Path("/tmp/kakaotalk.db"),
|
||||
database_name=database_name,
|
||||
key=key,
|
||||
source=source,
|
||||
)
|
||||
|
||||
|
||||
class KakaoTalkMacHelperTests(unittest.TestCase):
|
||||
def test_parse_plist_xml_extracts_candidates_and_active_hash(self) -> None:
|
||||
active_hash = sha512_hex(123456)
|
||||
xml_text = f"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
<plist version="1.0">
|
||||
<dict>
|
||||
<key>AlertKakaoIDsList</key>
|
||||
<array>
|
||||
<integer>111</integer>
|
||||
<integer>222</integer>
|
||||
</array>
|
||||
<key>userId</key>
|
||||
<integer>333</integer>
|
||||
<key>DESIGNATEDFRIENDSREVISION:{active_hash}</key>
|
||||
<integer>5</integer>
|
||||
</dict>
|
||||
</plist>
|
||||
"""
|
||||
|
||||
parsed = kakaotalk_mac.parse_plist_xml(xml_text)
|
||||
|
||||
self.assertEqual(parsed["AlertKakaoIDsList"], [111, 222])
|
||||
self.assertEqual(kakaotalk_mac.collect_candidate_user_ids(parsed), [333, 111, 222])
|
||||
self.assertEqual(kakaotalk_mac.find_active_account_hash(parsed), active_hash)
|
||||
|
||||
def test_discover_database_files_filters_hex_names(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
root = Path(tempdir)
|
||||
expected = [
|
||||
root / ("a" * 78),
|
||||
root / ("b" * 78 + ".db"),
|
||||
]
|
||||
for path in expected:
|
||||
path.write_text("", encoding="utf-8")
|
||||
(root / ("c" * 40)).write_text("", encoding="utf-8")
|
||||
(root / ("d" * 78 + "-wal")).write_text("", encoding="utf-8")
|
||||
|
||||
discovered = kakaotalk_mac.discover_database_files(root)
|
||||
|
||||
self.assertEqual(discovered, expected)
|
||||
|
||||
def test_recover_user_id_from_sha512_supports_single_worker_search(self) -> None:
|
||||
target_user_id = 123456
|
||||
recovered = kakaotalk_mac.recover_user_id_from_sha512(
|
||||
sha512_hex(target_user_id),
|
||||
max_user_id=200000,
|
||||
workers=1,
|
||||
chunk_size=5000,
|
||||
)
|
||||
|
||||
self.assertEqual(recovered, target_user_id)
|
||||
|
||||
def test_resolve_auth_retries_with_hash_recovered_user_id_and_caches_result(self) -> None:
|
||||
target_user_id = 654321
|
||||
active_hash = sha512_hex(target_user_id)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
cache_path = Path(tempdir) / "auth-cache.json"
|
||||
database_path = Path(tempdir) / "kakaotalk.db"
|
||||
database_path.write_text("", encoding="utf-8")
|
||||
verification_calls: list[int] = []
|
||||
|
||||
state = kakaotalk_mac.DetectionState(
|
||||
uuid="42C34717-27C3-538C-81E4-8B568287C7A0",
|
||||
candidate_user_ids=[111, 222],
|
||||
active_account_hash=active_hash,
|
||||
database_files=[database_path],
|
||||
)
|
||||
|
||||
def verify(candidate: kakaotalk_mac.ResolvedAuth) -> bool:
|
||||
verification_calls.append(candidate.user_id)
|
||||
return candidate.user_id == target_user_id
|
||||
|
||||
resolved = kakaotalk_mac.resolve_auth_state(
|
||||
state,
|
||||
verify_access=verify,
|
||||
cache_path=cache_path,
|
||||
max_user_id=700000,
|
||||
workers=1,
|
||||
chunk_size=10000,
|
||||
)
|
||||
|
||||
cache_payload = json.loads(cache_path.read_text(encoding="utf-8"))
|
||||
|
||||
self.assertEqual(verification_calls, [111, 222, target_user_id])
|
||||
self.assertEqual(resolved.user_id, target_user_id)
|
||||
self.assertEqual(resolved.database_path, database_path)
|
||||
self.assertEqual(cache_payload["user_id"], target_user_id)
|
||||
self.assertEqual(cache_payload["database_path"], str(database_path))
|
||||
|
||||
def test_load_cached_auth_treats_corrupt_json_as_cache_miss(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
cache_path = Path(tempdir) / "auth-cache.json"
|
||||
cache_path.write_text("{bad json\n", encoding="utf-8")
|
||||
|
||||
self.assertIsNone(kakaotalk_mac.load_cached_auth(cache_path))
|
||||
|
||||
def test_resolve_auth_reuses_detection_when_cache_is_corrupt(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
cache_path = Path(tempdir) / "auth-cache.json"
|
||||
cache_path.write_text("{bad json\n", encoding="utf-8")
|
||||
database_path = Path(tempdir) / "kakaotalk.db"
|
||||
database_path.write_text("", encoding="utf-8")
|
||||
resolved = make_resolved_auth(database_path=database_path, source="hash-recovery")
|
||||
|
||||
with (
|
||||
mock.patch.object(kakaotalk_mac, "collect_detection_state", return_value=mock.sentinel.state) as collect_state,
|
||||
mock.patch.object(kakaotalk_mac, "resolve_auth_state", return_value=resolved) as resolve_state,
|
||||
):
|
||||
cached = kakaotalk_mac.resolve_auth(
|
||||
refresh=False,
|
||||
cache_path=cache_path,
|
||||
user_id_override=None,
|
||||
uuid_override=None,
|
||||
max_user_id=1000,
|
||||
workers=1,
|
||||
chunk_size=100,
|
||||
)
|
||||
|
||||
self.assertEqual(cached, resolved)
|
||||
collect_state.assert_called_once_with(None)
|
||||
resolve_state.assert_called_once()
|
||||
|
||||
def test_resolve_auth_bypasses_cache_when_user_id_override_is_supplied(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
cache_path = Path(tempdir) / "auth-cache.json"
|
||||
database_path = Path(tempdir) / "kakaotalk.db"
|
||||
database_path.write_text("", encoding="utf-8")
|
||||
persistable = make_resolved_auth(database_path=database_path, source="cache")
|
||||
kakaotalk_mac.persist_auth_cache(persistable, cache_path)
|
||||
override_result = make_resolved_auth(user_id=999, database_path=database_path, source="candidate")
|
||||
|
||||
with (
|
||||
mock.patch.object(kakaotalk_mac, "collect_detection_state", return_value=mock.sentinel.state) as collect_state,
|
||||
mock.patch.object(kakaotalk_mac, "resolve_auth_state", return_value=override_result) as resolve_state,
|
||||
):
|
||||
resolved = kakaotalk_mac.resolve_auth(
|
||||
refresh=False,
|
||||
cache_path=cache_path,
|
||||
user_id_override=999,
|
||||
uuid_override=None,
|
||||
max_user_id=1000,
|
||||
workers=1,
|
||||
chunk_size=100,
|
||||
)
|
||||
|
||||
self.assertEqual(resolved, override_result)
|
||||
collect_state.assert_called_once_with(None)
|
||||
resolve_state.assert_called_once_with(
|
||||
mock.sentinel.state,
|
||||
verify_access=kakaotalk_mac.verify_database_access,
|
||||
cache_path=cache_path,
|
||||
user_id_override=999,
|
||||
max_user_id=1000,
|
||||
workers=1,
|
||||
chunk_size=100,
|
||||
)
|
||||
|
||||
def test_resolve_auth_bypasses_cache_when_uuid_override_is_supplied(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
cache_path = Path(tempdir) / "auth-cache.json"
|
||||
database_path = Path(tempdir) / "kakaotalk.db"
|
||||
database_path.write_text("", encoding="utf-8")
|
||||
persistable = make_resolved_auth(database_path=database_path, source="cache")
|
||||
kakaotalk_mac.persist_auth_cache(persistable, cache_path)
|
||||
override_result = make_resolved_auth(uuid="override-uuid", database_path=database_path, source="candidate")
|
||||
|
||||
with (
|
||||
mock.patch.object(kakaotalk_mac, "collect_detection_state", return_value=mock.sentinel.state) as collect_state,
|
||||
mock.patch.object(kakaotalk_mac, "resolve_auth_state", return_value=override_result) as resolve_state,
|
||||
):
|
||||
resolved = kakaotalk_mac.resolve_auth(
|
||||
refresh=False,
|
||||
cache_path=cache_path,
|
||||
user_id_override=None,
|
||||
uuid_override="override-uuid",
|
||||
max_user_id=1000,
|
||||
workers=1,
|
||||
chunk_size=100,
|
||||
)
|
||||
|
||||
self.assertEqual(resolved, override_result)
|
||||
collect_state.assert_called_once_with("override-uuid")
|
||||
resolve_state.assert_called_once_with(
|
||||
mock.sentinel.state,
|
||||
verify_access=kakaotalk_mac.verify_database_access,
|
||||
cache_path=cache_path,
|
||||
user_id_override=None,
|
||||
max_user_id=1000,
|
||||
workers=1,
|
||||
chunk_size=100,
|
||||
)
|
||||
|
||||
def test_render_auth_text_redacts_key_material(self) -> None:
|
||||
resolved = make_resolved_auth(key="super-secret-key", source="hash-recovery")
|
||||
|
||||
rendered = kakaotalk_mac.render_auth(resolved, output_format="text", cache_path=Path("/tmp/cache.json"))
|
||||
|
||||
self.assertNotIn("super-secret-key", rendered)
|
||||
self.assertNotIn("--key", rendered)
|
||||
self.assertIn("python3 scripts/kakaotalk_mac.py chats --limit 10 --json", rendered)
|
||||
|
||||
def test_build_passthrough_command_rejects_non_read_only_command(self) -> None:
|
||||
auth = make_resolved_auth()
|
||||
|
||||
with self.assertRaises(kakaotalk_mac.AuthResolutionError):
|
||||
kakaotalk_mac.build_passthrough_command("query", auth, ["DELETE FROM chat_logs"])
|
||||
|
||||
def test_build_parser_only_exposes_read_only_commands(self) -> None:
|
||||
parser = kakaotalk_mac.build_parser()
|
||||
subcommands = parser._subparsers._group_actions[0].choices
|
||||
|
||||
self.assertEqual(sorted(subcommands), ["auth", "chats", "messages", "schema", "search"])
|
||||
self.assertNotIn("query", subcommands)
|
||||
|
||||
def test_build_parser_rejects_negative_max_user_id(self) -> None:
|
||||
parser = kakaotalk_mac.build_parser()
|
||||
stderr = io.StringIO()
|
||||
|
||||
with self.assertRaises(SystemExit) as exit_context, mock.patch("sys.stderr", stderr):
|
||||
parser.parse_args(["auth", "--max-user-id", "-1"])
|
||||
|
||||
self.assertEqual(exit_context.exception.code, 2)
|
||||
self.assertIn("must be non-negative", stderr.getvalue())
|
||||
|
||||
def test_build_parser_rejects_non_positive_chunk_size(self) -> None:
|
||||
parser = kakaotalk_mac.build_parser()
|
||||
stderr = io.StringIO()
|
||||
|
||||
with self.assertRaises(SystemExit) as exit_context, mock.patch("sys.stderr", stderr):
|
||||
parser.parse_args(["auth", "--chunk-size", "0"])
|
||||
|
||||
self.assertEqual(exit_context.exception.code, 2)
|
||||
self.assertIn("must be positive", stderr.getvalue())
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue