k-skill/foresttrip-vacancy/scripts/run_foresttrip_vacancy.py
Donghun Seol 00a6d9feae fix(foresttrip-vacancy): exclude 예비 rooms, gate useDt strictly, dedup duplicate rooms
월별예약조회 API가 srchDate 단일 일자 요청에도 5일 윈도우를 반환하고,
"예비"로 표기된 운영자 보유분이 raw 응답에 포함되며, 같은 객실이 다른
goodsId로 중복 표시되는 세 가지 문제를 한꺼번에 fix한다.

수정:
- collect_results 안에 strict useDt gate 추가 (today~last_day 범위 밖 행 차단)
- is_reserve_room() helper로 goodsNm에 "예비" 포함 객실 제외
- (forest_id, use_dt, name) 단위 dedup으로 중복 행 제거
- is_available()는 시그니처/로직 변경 없이 booking-state predicate 유지

추가:
- foresttrip-vacancy/tests/ 18개 단위 테스트 (mock + fixture 기반)
- IsReserveRoomTest, IsAvailableTest, CollectResultsFilterTest,
  StrictUseDtGateTest, GroundTruthTest 다섯 클래스
- 거제·구재봉 fixture로 사용자 라이브 검증 결과 회귀 보호
- package.json lint·test 스크립트에 등록

문서:
- SKILL.md: API 5일 윈도우/예비 객실/중복 dedup 자동 처리 명시 + 회복 시나리오 보강
- docs/features/foresttrip-vacancy.md: 기본 흐름 6단계와 주의할 점 보강

사용자 라이브 검증 ground truth (2026-05-12 기준):
- 거제자연휴양림 5/13 ~9개, 5/16 0개, 5/17 19개, 5/23 0개, 5/24 0개
- 구재봉자연휴양림 5/16 1개 (206호 쑥부쟁이방)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 20:57:25 +09:00

541 lines
18 KiB
Python
Executable file

#!/usr/bin/env python3
"""Read-only foresttrip.go.kr vacancy lookup helper.
The script logs in with Playwright to obtain a CSRF token and session cookies,
extracts forest IDs from the official monthly reservation status page, then
queries the read-only monthly availability JSON endpoint.
It intentionally does not click booking buttons, submit reservation forms,
handle payment, solve captcha, or bypass queues.
"""
from __future__ import annotations
import argparse
import concurrent.futures
import json
import os
import sys
import time
import urllib.error
import urllib.request
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
LOGIN_URL = "https://www.foresttrip.go.kr/com/login.do"
RSRVT_PAGE = "https://www.foresttrip.go.kr/rep/or/sssn/monthRsrvtSmplStatus.do"
POST_URL = "https://www.foresttrip.go.kr/rep/or/selectRsrvtAvailInfoListForMonthRsrvtSmpl.do"
DEFAULT_CONCURRENCY = 4
MAX_CONCURRENCY = 5
DEFAULT_WEEK_RANGE = 1
CATEGORY_CODES = {"01", "02"}
RESERVE_ROOM_MARKER = "예비"
@dataclass
class Session:
cookies: dict[str, str]
csrf: str
user_agent: str
forests: dict[str, str]
expires_at: float
def parse_csv(value: str) -> list[str]:
return [part.strip() for part in value.split(",") if part.strip()]
def parse_categories(value: str) -> tuple[str, ...]:
categories = parse_csv(value)
if not categories:
raise argparse.ArgumentTypeError("must include at least one category code")
invalid = [category for category in categories if category not in CATEGORY_CODES]
if invalid:
raise argparse.ArgumentTypeError(
"unknown category code(s): "
+ ", ".join(invalid)
+ " (allowed: 01=lodging, 02=camping)"
)
return tuple(dict.fromkeys(categories))
def parse_dates(value: str) -> tuple[str, ...]:
dates = parse_csv(value)
if not dates:
raise argparse.ArgumentTypeError("must include at least one YYYYMMDD date")
today = datetime.now().date()
normalized: list[str] = []
for raw_date in dates:
try:
parsed = datetime.strptime(raw_date, "%Y%m%d").date()
except ValueError as exc:
raise argparse.ArgumentTypeError(f"invalid YYYYMMDD date: {raw_date}") from exc
if parsed.strftime("%Y%m%d") != raw_date:
raise argparse.ArgumentTypeError(f"invalid YYYYMMDD date: {raw_date}")
if parsed < today:
raise argparse.ArgumentTypeError(f"date is in the past: {raw_date}")
normalized.append(raw_date)
return tuple(sorted(dict.fromkeys(normalized)))
def parse_concurrency(value: str) -> int:
try:
concurrency = int(value)
except ValueError as exc:
raise argparse.ArgumentTypeError("must be an integer") from exc
if not 1 <= concurrency <= MAX_CONCURRENCY:
raise argparse.ArgumentTypeError(f"must be between 1 and {MAX_CONCURRENCY}")
return concurrency
def parse_week_range(value: str) -> int:
try:
week_range = int(value)
except ValueError as exc:
raise argparse.ArgumentTypeError("must be an integer") from exc
if week_range < 1:
raise argparse.ArgumentTypeError("must be at least 1")
return week_range
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Read-only foresttrip.go.kr vacancy lookup.",
)
target = parser.add_argument_group("target selection")
target.add_argument("--all", action="store_true", help="Scan all extracted forest IDs.")
target.add_argument(
"--forest-id",
action="append",
help="ForestTrip insttId. Can be passed multiple times or comma-separated.",
)
target.add_argument(
"--forest-name",
action="append",
help="Substring to match against official forest names.",
)
output = parser.add_mutually_exclusive_group()
output.add_argument("--json", action="store_true", help="Print JSON output.")
output.add_argument("--text", action="store_true", help="Print human-readable output.")
parser.add_argument("--dates", type=parse_dates, help="Comma-separated YYYYMMDD dates.")
parser.add_argument(
"--categories",
type=parse_categories,
default=("01", "02"),
help="Comma-separated category codes: 01=lodging, 02=camping.",
)
parser.add_argument(
"--concurrency",
type=parse_concurrency,
default=DEFAULT_CONCURRENCY,
help=f"Parallel POST workers, 1-{MAX_CONCURRENCY}.",
)
parser.add_argument("--week-range", type=parse_week_range, help="Weeks ahead to scan when --dates is omitted.")
parser.add_argument("--refresh-session", action="store_true", help="Ignore session cache.")
parser.add_argument("--check-deps", action="store_true", help="Check Python and Playwright runtime dependencies.")
parser.add_argument(
"--session-cache",
default="~/.cache/k-skill/foresttrip-vacancy/session.json",
help="Session cache path.",
)
args = parser.parse_args()
if args.all and (args.forest_id or args.forest_name):
parser.error("--all cannot be combined with --forest-id or --forest-name")
if args.dates and args.week_range is not None:
parser.error("--week-range cannot be combined with --dates; the lookup range is derived from --dates")
return args
def require_env(name: str) -> str:
value = os.getenv(name)
if not value:
raise SystemExit(f"missing required environment variable: {name}")
return value
def check_dependencies(*, launch_browser: bool = True) -> None:
if sys.version_info < (3, 9):
raise SystemExit("python 3.9+ is required")
try:
from playwright.sync_api import Error as PlaywrightError
from playwright.sync_api import sync_playwright
except ImportError as exc:
raise SystemExit(
"playwright is required. Install with: python3 -m pip install playwright"
) from exc
if not launch_browser:
return
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
browser.close()
except PlaywrightError as exc:
raise SystemExit(
"playwright chromium browser is required. Install with: "
"python3 -m playwright install chromium"
) from exc
def load_session_cache(path: Path) -> Session | None:
if not path.exists():
return None
try:
data = json.loads(path.read_text(encoding="utf-8"))
if time.time() > float(data.get("expires_at", 0)):
return None
return Session(
cookies=dict(data["cookies"]),
csrf=str(data["csrf"]),
user_agent=str(data["user_agent"]),
forests=dict(data["forests"]),
expires_at=float(data["expires_at"]),
)
except Exception:
return None
def save_session_cache(path: Path, session: Session) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(asdict(session), ensure_ascii=False), encoding="utf-8")
try:
path.chmod(0o600)
except OSError:
pass
def bootstrap_session(*, forest_id: str, forest_pw: str, ttl_sec: int = 600) -> Session:
try:
from playwright.sync_api import Error as PlaywrightError
from playwright.sync_api import sync_playwright
except ImportError as exc:
raise SystemExit(
"playwright is required. Install with: python3 -m pip install playwright "
"&& python3 -m playwright install chromium"
) from exc
with sync_playwright() as p:
try:
browser = p.chromium.launch(headless=True)
except PlaywrightError as exc:
raise SystemExit(
"playwright chromium browser is required. Install with: "
"python3 -m playwright install chromium"
) from exc
page = browser.new_page()
page.goto(LOGIN_URL)
page.fill("#mmberId", forest_id)
page.fill("#gnrlMmberPssrd", forest_pw)
page.click("input.loginBtn")
page.wait_for_load_state("networkidle")
page.goto(RSRVT_PAGE)
page.wait_for_load_state("networkidle")
csrf_locator = page.locator('input[name="_csrf"]')
if csrf_locator.count() == 0:
browser.close()
raise SystemExit("login succeeded page did not expose a CSRF token")
csrf = csrf_locator.first.get_attribute("value") or ""
forests: dict[str, str] = {}
regions = page.evaluate(
"""
() => Array.from(document.querySelector('#srchSido').options)
.slice(1)
.map(o => ({ value: o.value, text: o.textContent.trim() }))
"""
)
for region in regions:
value = region.get("value")
if not value:
continue
page.select_option("#srchSido", value=value)
page.wait_for_timeout(500)
options = page.evaluate(
"""
() => Array.from(document.querySelector('#srchInstt').options)
.slice(1)
.map(o => ({ value: o.value, text: o.textContent.trim() }))
"""
)
for opt in options:
fid = str(opt.get("value") or "").strip()
name = str(opt.get("text") or "").strip()
if fid and name:
forests[fid] = name
cookies = {cookie["name"]: cookie["value"] for cookie in page.context.cookies()}
user_agent = page.evaluate("() => navigator.userAgent")
browser.close()
if not csrf or not cookies:
raise SystemExit("failed to bootstrap foresttrip session")
if not forests:
raise SystemExit("failed to extract forest list from reservation page")
return Session(
cookies=cookies,
csrf=csrf,
user_agent=user_agent,
forests=forests,
expires_at=time.time() + ttl_sec,
)
def get_session(args: argparse.Namespace) -> Session:
cache_path = Path(args.session_cache).expanduser()
if not args.refresh_session:
cached = load_session_cache(cache_path)
if cached is not None:
return cached
session = bootstrap_session(
forest_id=require_env("KSKILL_FORESTTRIP_ID"),
forest_pw=require_env("KSKILL_FORESTTRIP_PASSWORD"),
)
save_session_cache(cache_path, session)
return session
def split_csv(values: list[str] | None) -> list[str]:
out: list[str] = []
for value in values or []:
out.extend(part.strip() for part in value.split(",") if part.strip())
return out
def resolve_targets(args: argparse.Namespace, forests: dict[str, str]) -> dict[str, str]:
if args.all:
return dict(sorted(forests.items(), key=lambda item: item[1]))
requested_ids = split_csv(args.forest_id)
requested_names = split_csv(args.forest_name)
targets: dict[str, str] = {}
for fid in requested_ids:
targets[fid] = forests.get(fid, fid)
for needle in requested_names:
matches = {
fid: name
for fid, name in forests.items()
if needle.replace(" ", "") in name.replace(" ", "")
}
targets.update(matches)
if not targets:
raise SystemExit("choose a target with --all, --forest-id, or --forest-name")
return dict(sorted(targets.items(), key=lambda item: item[1]))
def build_headers(session: Session) -> dict[str, str]:
cookie_header = "; ".join(f"{k}={v}" for k, v in session.cookies.items())
return {
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "ko-KR,ko;q=0.9",
"Content-Type": "application/json; charset=UTF-8",
"Cookie": cookie_header,
"Origin": "https://www.foresttrip.go.kr",
"Referer": RSRVT_PAGE,
"User-Agent": session.user_agent,
"X-CSRF-Token": session.csrf,
"X-Requested-With": "XMLHttpRequest",
}
def fetch_one(
*,
session: Session,
forest_id: str,
category: str,
today: str,
last_day: str,
) -> tuple[str, str, list[dict[str, Any]] | None, str | None]:
payload = {
"insttId": forest_id,
"upperGoodsClsscCd": category,
"srchDate": today,
"lastDay": last_day,
"inqurSctin": "02",
}
body = json.dumps(payload).encode("utf-8")
request = urllib.request.Request(
POST_URL,
data=body,
headers=build_headers(session),
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=30) as response:
if response.status != 200:
return forest_id, category, None, f"http_{response.status}"
data = json.loads(response.read().decode("utf-8"))
if isinstance(data, list):
return forest_id, category, data, None
return forest_id, category, None, "unexpected_payload"
except urllib.error.HTTPError as exc:
return forest_id, category, None, f"http_{exc.code}"
except Exception as exc:
return forest_id, category, None, str(exc)
def is_available(row: dict[str, Any]) -> bool:
return row.get("rsrvtAvail") == "Y" and row.get("rsrvtCnt") == 0
def is_reserve_room(row: dict[str, Any]) -> bool:
return RESERVE_ROOM_MARKER in (row.get("goodsNm") or "")
def normalize_row(row: dict[str, Any], forests: dict[str, str]) -> dict[str, Any]:
instt_id = str(row.get("insttId") or "")
return {
"forest_id": instt_id,
"forest": forests.get(instt_id, row.get("insttNm") or instt_id),
"use_dt": row.get("useDt") or "",
"day": row.get("dywkDtTpcd"),
"name": row.get("goodsNm") or "",
"area": row.get("insttArea"),
"capacity": row.get("mxmmAccptCnt"),
"category": row.get("goodsClsscNm"),
"region": row.get("insttAreaNm"),
"waiting_possible": row.get("wtngPssblYn"),
}
def collect_results(
*,
session: Session,
targets: dict[str, str],
categories: tuple[str, ...],
dates: tuple[str, ...] | None,
week_range: int | None,
concurrency: int,
) -> dict[str, Any]:
now = datetime.now()
today = now.strftime("%Y%m%d")
last_day = (
max(dates)
if dates
else (now + timedelta(weeks=week_range or DEFAULT_WEEK_RANGE)).strftime("%Y%m%d")
)
date_filter = set(dates) if dates else None
failures: list[dict[str, str]] = []
rows: list[dict[str, Any]] = []
jobs = [
(forest_id, category)
for forest_id in targets
for category in categories
]
with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, concurrency)) as pool:
futures = [
pool.submit(
fetch_one,
session=session,
forest_id=forest_id,
category=category,
today=today,
last_day=last_day,
)
for forest_id, category in jobs
]
for future in concurrent.futures.as_completed(futures):
forest_id, category, data, error = future.result()
if error is not None or data is None:
failures.append({"forest_id": forest_id, "category": category, "error": error or "unknown"})
continue
for row in data:
if not is_available(row):
continue
if is_reserve_room(row):
continue
use_dt = row.get("useDt") or ""
if use_dt < today or use_dt > last_day:
continue
normalized = normalize_row(row, session.forests)
if date_filter is not None and normalized["use_dt"] not in date_filter:
continue
rows.append(normalized)
seen: set[tuple[str, str, str]] = set()
deduped: list[dict[str, Any]] = []
for row in rows:
key = (row["forest_id"], row["use_dt"], row["name"])
if key in seen:
continue
seen.add(key)
deduped.append(row)
rows = deduped
grouped: dict[str, dict[str, list[dict[str, Any]]]] = {}
for row in sorted(rows, key=lambda item: (item["forest"], item["use_dt"], item["name"])):
grouped.setdefault(row["forest"], {}).setdefault(row["use_dt"], []).append(row)
return {
"forests_scanned": len(targets),
"filter_hits": len(rows),
"fetch_failures": len(failures),
"failures": failures[:20],
"concurrency": concurrency,
"date_range": {"from": today, "to": last_day},
"results": [
{
"forest": forest_name,
"dates": [
{"use_dt": use_dt, "rooms": rooms}
for use_dt, rooms in sorted(rows_by_date.items())
],
}
for forest_name, rows_by_date in sorted(grouped.items())
],
}
def print_text(payload: dict[str, Any]) -> None:
print("=== ForestTrip Vacancy Lookup ===")
print(
f"filter_hits: {payload['filter_hits']} "
f"fetch_failures: {payload['fetch_failures']} "
f"forests_scanned: {payload['forests_scanned']}"
)
if not payload["results"]:
print("(no available rooms at lookup time)")
return
for forest in payload["results"]:
print(f"\n{forest['forest']}")
for date_group in forest["dates"]:
rooms = date_group["rooms"]
print(f" {date_group['use_dt']} - {len(rooms)} slot(s)")
for room in rooms[:8]:
capacity = room["capacity"] if room["capacity"] is not None else "?"
area = room["area"] if room["area"] is not None else "?"
print(f" - {room['name']} / {room['category']} / {area}sqm / max {capacity}")
def main() -> int:
args = parse_args()
if args.check_deps:
check_dependencies()
print("foresttrip-vacancy dependencies look ready")
return 0
session = get_session(args)
targets = resolve_targets(args, session.forests)
payload = collect_results(
session=session,
targets=targets,
categories=args.categories,
dates=args.dates,
week_range=args.week_range,
concurrency=args.concurrency,
)
if args.text and not args.json:
print_text(payload)
else:
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0 if payload["fetch_failures"] == 0 else 1
if __name__ == "__main__":
raise SystemExit(main())