k-skill/flight-ticket-search/scripts/flight_ticket_search.py
Jeffrey (Dongkyu) Kim 84b3c993df Validate flight search arguments before bootstrap
Parse CLI arguments before installing the cached fast-flights runtime, add numeric bounds, validate date relationships early, and always use the pinned private runtime instead of arbitrary global installs.

Constraint: PR #224 helper should keep --help and invalid input paths offline and deterministic.

Rejected: Importing any global fast_flights package opportunistically | version drift can break TFS URL generation and query behavior.

Confidence: high

Scope-risk: narrow

Directive: Keep provider execution behind explicit valid commands; do not bootstrap dependencies for help or parser errors.

Tested: python3 -m py_compile flight-ticket-search/scripts/flight_ticket_search.py; FLIGHT_TICKET_SEARCH_BOOTSTRAPPED=1 python3 flight-ticket-search/scripts/flight_ticket_search.py --help; invalid return-date and step-days parser checks; git diff --check

Not-tested: Live Google Flights fetch through fast-flights.
2026-05-12 19:11:29 +09:00

501 lines
19 KiB
Python

#!/usr/bin/env python3
"""Free flight ticket search helper for the k-skill flight-ticket-search skill.
Uses fast-flights (Google Flights public surface scraper) in an isolated user cache venv.
No API key, login, CAPTCHA bypass, purchase, or booking automation.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
import shutil
import statistics
import subprocess
import sys
import time
from dataclasses import asdict
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any, Iterable
from urllib.parse import urlencode
# Exact fast-flights requirement that gets pip-installed into the private venv.
PINNED_FAST_FLIGHTS = "fast-flights==2.2"
# Per-user cache directory that holds this skill's runtime files.
CACHE_ROOT = Path.home() / ".cache" / "k-skill" / "flight-ticket-search"
# Virtual environment that ensure_runtime() creates and re-executes the script in.
VENV_DIR = CACHE_ROOT / "venv"
def ensure_runtime() -> None:
    """Install the pinned fast-flights into a private cache venv, then re-exec there.

    Returns immediately when ``FLIGHT_TICKET_SEARCH_BOOTSTRAPPED`` is ``"1"``,
    which is the marker this function sets for the re-executed process.
    Otherwise the venv under ``VENV_DIR`` is created if missing,
    ``PINNED_FAST_FLIGHTS`` is installed unless the venv already provides that
    exact release, and the current process is replaced through ``os.execve``
    with the venv interpreter running this same script and arguments.

    Raises:
        RuntimeError: no candidate interpreter could create the venv.
        subprocess.CalledProcessError: installation still fails after the venv
            has been rebuilt from scratch.
    """
    if os.environ.get("FLIGHT_TICKET_SEARCH_BOOTSTRAPPED") == "1":
        return
    py = VENV_DIR / "bin" / "python"

    def candidate_python_executables() -> list[str]:
        """List interpreters to try, current one first, without duplicates."""
        candidates = [sys.executable]
        candidates.extend(
            found for name in ("python3.13", "python3.12", "python3.11", "python3")
            if (found := shutil.which(name))
        )
        seen: set[str] = set()
        unique: list[str] = []
        for candidate in candidates:
            # Compare resolved paths so symlinked aliases are tried only once.
            resolved = str(Path(candidate).resolve())
            if resolved not in seen:
                seen.add(resolved)
                unique.append(candidate)
        return unique

    def create_venv() -> None:
        """Create VENV_DIR with the first interpreter that manages to do so."""
        CACHE_ROOT.mkdir(parents=True, exist_ok=True)
        errors: list[str] = []
        for python in candidate_python_executables():
            # Always start from an empty directory; a half-built venv is useless.
            shutil.rmtree(VENV_DIR, ignore_errors=True)
            try:
                subprocess.check_call([python, "-m", "venv", str(VENV_DIR)])
                return
            except (OSError, subprocess.CalledProcessError) as exc:
                errors.append(f"{python}: {exc}")
        raise RuntimeError(
            "Unable to create flight-ticket-search venv with available Python interpreters: "
            + "; ".join(errors)
        )

    def venv_has_fast_flights() -> bool:
        """Report whether the venv imports fast_flights at the pinned release."""
        if not py.exists():
            return False
        dist_name, _, pinned_version = PINNED_FAST_FLIGHTS.partition("==")
        # Importability alone would accept a stale cache left behind by an older
        # pin, so the installed distribution version is compared as well. An
        # empty wanted version (no "==" in the pin) falls back to import-only.
        probe = (
            "import sys\n"
            "import fast_flights\n"
            "from importlib.metadata import version\n"
            "name, wanted = sys.argv[1], sys.argv[2]\n"
            "sys.exit(0 if not wanted or version(name) == wanted else 1)\n"
        )
        return subprocess.run(
            [str(py), "-c", probe, dist_name, pinned_version],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        ).returncode == 0

    def install_fast_flights() -> None:
        """Make sure pip exists in the venv, then install the pinned release."""
        subprocess.check_call([str(py), "-m", "ensurepip", "--upgrade"])
        subprocess.check_call([
            str(py),
            "-m",
            "pip",
            "install",
            "--disable-pip-version-check",
            "-q",
            PINNED_FAST_FLIGHTS,
        ])

    if not py.exists():
        create_venv()
    if not venv_has_fast_flights():
        try:
            install_fast_flights()
        except (OSError, subprocess.CalledProcessError):
            # Recover from interrupted or pip-less cache venvs before surfacing a hard failure.
            shutil.rmtree(VENV_DIR, ignore_errors=True)
            create_venv()
            install_fast_flights()
    env = os.environ.copy()
    # Marker that makes the re-executed process skip the bootstrap above.
    env["FLIGHT_TICKET_SEARCH_BOOTSTRAPPED"] = "1"
    os.execve(str(py), [str(py), __file__, *sys.argv[1:]], env)
def parse_date(s: str) -> date:
    """Convert a ``YYYY-MM-DD`` string into a ``date``.

    Raises:
        ValueError: the text does not match the ``%Y-%m-%d`` format.
    """
    parsed = datetime.strptime(s, "%Y-%m-%d")
    return parsed.date()
def positive_int(value: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 1.

    Raises:
        argparse.ArgumentTypeError: the integer is smaller than 1.
        ValueError: the text is not an integer at all.
    """
    number = int(value)
    if number >= 1:
        return number
    raise argparse.ArgumentTypeError("must be a positive integer")
def nonnegative_int(value: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 0.

    Raises:
        argparse.ArgumentTypeError: the integer is negative.
        ValueError: the text is not an integer at all.
    """
    number = int(value)
    if number >= 0:
        return number
    raise argparse.ArgumentTypeError("must be zero or a positive integer")
def nonnegative_float(value: str) -> float:
    """argparse ``type=`` converter that accepts finite numbers >= 0.

    ``float()`` also parses ``"nan"`` and ``"inf"``, and ``nan < 0`` is False,
    so those values used to pass and only failed later when used as a sleep
    duration. They are rejected here so the error surfaces at parse time.

    Raises:
        argparse.ArgumentTypeError: the number is negative, NaN or infinite.
        ValueError: the text is not a number at all.
    """
    parsed = float(value)
    if not math.isfinite(parsed) or parsed < 0:
        raise argparse.ArgumentTypeError("must be zero or a positive number")
    return parsed
def iter_dates(start: date, end: date, step_days: int) -> Iterable[date]:
    """Yield ``start``, ``start + step_days``, ... for every date up to ``end`` inclusive.

    Nothing is yielded when ``end`` is before ``start``.

    Raises:
        ValueError: ``step_days`` is smaller than 1. Checked eagerly, because a
            zero or negative step could never reach ``end``.
    """
    if step_days < 1:
        raise ValueError("step_days must be a positive integer")
    span = (end - start).days
    # Offsets are derived from the span, so no date past ``end`` is ever
    # computed; stepping beyond date.max would raise OverflowError.
    return (start + timedelta(days=offset) for offset in range(0, span + 1, step_days))
def parse_price(price_text: str | None) -> int | None:
if not price_text or "unavailable" in price_text.lower():
return None
digits = re.sub(r"[^0-9]", "", price_text)
return int(digits) if digits else None
def money_krw(value: int | float | None) -> str:
if value is None:
return "확인 불가"
return f"{int(round(value)):,}"
def validate_airport(code: str, field: str) -> str:
    """Normalize an airport code to upper case and check its shape.

    Args:
        code: user-supplied code; surrounding whitespace is ignored.
        field: option name used in the error message.

    Returns:
        The stripped, upper-cased three-letter code.

    Raises:
        SystemExit: the normalized code is not exactly three letters A-Z.
    """
    normalized = code.strip().upper()
    if re.fullmatch(r"[A-Z]{3}", normalized) is None:
        raise SystemExit(f"{field} must be a 3-letter IATA airport code, got: {normalized!r}")
    return normalized
def build_query_url(flight_data: list[Any], trip: str, adults: int, seat: str) -> str:
    """Build the Google Flights search URL for a query.

    The query is encoded by fast-flights' ``TFSData`` and sent as the ``tfs``
    parameter, together with fixed ``hl=en``, ``tfu`` and ``curr=KRW`` values.
    """
    # Imported lazily: fast-flights only exists inside the bootstrapped venv.
    from fast_flights.flights_impl import Passengers, TFSData

    passengers = Passengers(adults=adults)
    encoded = TFSData.from_interface(
        flight_data=flight_data,
        trip=trip,
        passengers=passengers,
        seat=seat,
    ).as_b64()
    query = urlencode({
        "tfs": encoded.decode("utf-8"),
        "hl": "en",
        "tfu": "EgQIABABIgA",
        "curr": "KRW",
    })
    return "https://www.google.com/travel/flights?" + query
def make_flight_data(from_airport: str, to_airport: str, outbound: str, return_date: str | None = None) -> tuple[list[Any], str]:
    """Build the fast-flights leg list and trip type for a query.

    Returns:
        ``(legs, "one-way")`` with a single outbound leg when ``return_date``
        is empty, otherwise ``(legs, "round-trip")`` with the reversed leg
        appended.

    Raises:
        SystemExit: an airport code is invalid, both airports are equal, or
            the return date is before the outbound date.
        ValueError: a date string is not ``YYYY-MM-DD``.
    """
    origin = validate_airport(from_airport, "from")
    dest = validate_airport(to_airport, "to")
    if origin == dest:
        raise SystemExit("from and to airports must be different")
    outbound_date = parse_date(outbound)
    # Imported lazily: fast-flights only exists inside the bootstrapped venv.
    from fast_flights import FlightData

    legs = [FlightData(date=outbound_date.isoformat(), from_airport=origin, to_airport=dest)]
    if not return_date:
        return legs, "one-way"
    inbound_date = parse_date(return_date)
    if inbound_date < outbound_date:
        raise SystemExit("return-date must be on or after date")
    legs.append(FlightData(date=inbound_date.isoformat(), from_airport=dest, to_airport=origin))
    return legs, "round-trip"
def fetch_flights(flight_data: list[Any], trip: str, adults: int, seat: str) -> Any:
    """Run the fast-flights query and return its result object unchanged.

    The request is always issued with ``fetch_mode="fallback"``.
    """
    # Imported lazily: fast-flights only exists inside the bootstrapped venv.
    from fast_flights import Passengers, get_flights

    passengers = Passengers(adults=adults)
    result = get_flights(
        flight_data=flight_data,
        trip=trip,
        passengers=passengers,
        seat=seat,
        fetch_mode="fallback",
    )
    return result
def normalize_flight(f: Any) -> dict[str, Any]:
    """Turn one flight dataclass into a dict with derived price and quality fields.

    Added keys:
        price_value: integer parsed from ``price``, or ``None``.
        price_text: formatted ``price_value``; otherwise the raw ``price``
            text, or the unknown-price placeholder when that is empty.
        quality: ``"complete"`` when name, departure and arrival are all
            truthy, else ``"partial"``.
    """
    record = asdict(f)
    price_value = parse_price(record.get("price"))
    if price_value is not None:
        price_text = money_krw(price_value)
    else:
        price_text = record.get("price") or "확인 불가"
    has_core_fields = record.get("name") and record.get("departure") and record.get("arrival")
    record["price_value"] = price_value
    record["price_text"] = price_text
    record["quality"] = "complete" if has_core_fields else "partial"
    return record
def summarize_result(res: Any, query_url: str, limit: int) -> dict[str, Any]:
    """Condense a fast-flights result into ``meta``, ``stats`` and top ``flights``.

    ``flights`` holds the ``limit`` cheapest priced entries, drawn only from
    entries of "complete" quality when at least one exists. ``stats`` covers
    every priced entry regardless of quality.
    """
    flights = [normalize_flight(item) for item in res.flights]
    priced = [item for item in flights if item["price_value"] is not None]
    complete = [item for item in priced if item["quality"] == "complete"]
    # Every candidate comes from ``priced``, so price_value is never None here.
    ranked = sorted(complete or priced, key=lambda item: item["price_value"])
    values = [item["price_value"] for item in priced]

    meta = {
        "provider": "google-flights-fast-flights",
        "source": "Google Flights public search surface via fast-flights",
        "price_band": getattr(res, "current_price", ""),
        "currency": "KRW",
        "queried_at": datetime.now().astimezone().isoformat(timespec="seconds"),
        "booking_search_url": query_url,
        "note": "예약 링크는 특정 판매자 결제 deep link가 아니라 Google Flights 검색 결과 링크입니다.",
    }
    if values:
        lowest, average, highest = min(values), statistics.mean(values), max(values)
    else:
        lowest = average = highest = None
    stats = {
        "result_count": len(flights),
        "priced_count": len(priced),
        "complete_count": len(complete),
        "min_price": lowest,
        "avg_price": average,
        "max_price": highest,
    }
    return {"meta": meta, "stats": stats, "flights": ranked[:limit]}
def validate_date_text(value: str, field: str) -> date:
    """Parse ``value`` as ``YYYY-MM-DD`` and return the date.

    Raises:
        SystemExit: the text is not a valid date; the message names ``field``.
    """
    try:
        parsed = parse_date(value)
    except ValueError as exc:
        raise SystemExit(f"{field} must be YYYY-MM-DD, got: {value!r}") from exc
    return parsed
def validate_month_text(value: str) -> None:
    """Check that ``value`` is a ``YYYY-MM`` month.

    The check appends day 01 and parses the result as a full date.

    Raises:
        SystemExit: the text does not form a valid month.
    """
    first_day = "-".join((value, "01"))
    try:
        datetime.strptime(first_day, "%Y-%m-%d")
    except ValueError as exc:
        raise SystemExit(f"month must be YYYY-MM, got: {value!r}") from exc
def validate_month_day_text(value: str) -> None:
    """Check that ``value`` is an ``MM-DD`` month-day.

    The check prefixes year 2000 and parses the result as a full date. 2000 is
    a leap year, so ``02-29`` is accepted here.

    Raises:
        SystemExit: the text does not form a valid month-day.
    """
    probe = "-".join(("2000", value))
    try:
        datetime.strptime(probe, "%Y-%m-%d")
    except ValueError as exc:
        raise SystemExit(f"month-day must be MM-DD, got: {value!r}") from exc
def preflight_validate_args(args: argparse.Namespace) -> None:
    """Reject invalid CLI input using only the standard library.

    Checks both airport codes, then the date-like options of the selected
    sub-command, so that bad input fails before any provider code is needed.

    Raises:
        SystemExit: any argument is malformed or inconsistent.
    """
    origin = validate_airport(args.from_airport, "from")
    dest = validate_airport(args.to_airport, "to")
    if origin == dest:
        raise SystemExit("from and to airports must be different")
    if args.command == "search":
        outbound = validate_date_text(args.date, "date")
        if args.return_date:
            inbound = validate_date_text(args.return_date, "return-date")
            if inbound < outbound:
                raise SystemExit("return-date must be on or after date")
    elif args.command == "compare-month":
        validate_month_text(args.month)
    elif args.command == "compare-range":
        start = validate_date_text(args.start_date, "start-date")
        end = validate_date_text(args.end_date, "end-date")
        if end < start:
            raise SystemExit("end-date must be on or after start-date")
    elif args.command == "compare-years":
        validate_month_day_text(args.month_day)
        try:
            years = [int(x) for x in re.split(r"[, ]+", args.years.strip()) if x]
        except ValueError as exc:
            raise SystemExit("years must be comma-separated numbers, e.g. 2026,2027") from exc
        if not years:
            raise SystemExit("years is required, e.g. 2026,2027")
        # The month-day check above uses a leap year and ignores the requested
        # years, so 02-29 in a non-leap year or a year that is not four digits
        # would only fail when the command builds its dates. Build the same
        # "<year>-<month-day>" strings here to reject them up front.
        for year in years:
            candidate = f"{year}-{args.month_day}"
            try:
                datetime.strptime(candidate, "%Y-%m-%d")
            except ValueError as exc:
                raise SystemExit(
                    f"{candidate} is not a valid calendar date; check years and month-day"
                ) from exc
def command_search(args: argparse.Namespace) -> dict[str, Any]:
    """Run a single one-way or round-trip search and return its summary payload.

    The payload is the ``summarize_result`` output plus a ``query`` section
    echoing the request.
    """
    legs, trip = make_flight_data(args.from_airport, args.to_airport, args.date, args.return_date)
    result = fetch_flights(legs, trip, args.adults, args.seat)
    search_url = build_query_url(legs, trip, args.adults, args.seat)
    payload = summarize_result(result, search_url, args.limit)
    payload["query"] = {
        "from": args.from_airport.upper(),
        "to": args.to_airport.upper(),
        "date": args.date,
        "return_date": args.return_date,
        "trip": trip,
        "adults": args.adults,
        "seat": args.seat,
    }
    return payload
def scan_dates(args: argparse.Namespace, dates: list[date]) -> dict[str, Any]:
    """Query each date as a one-way search and aggregate the per-date minimums.

    A failing date is recorded as an ``ok: False`` row with a truncated error
    message instead of aborting the scan. Between queries the function sleeps
    ``args.sleep`` seconds, but not after the last date.

    Returns:
        A payload with ``meta``, ``query``, ``stats`` over the per-date
        minimum prices, the ``args.limit`` cheapest dates, and every row.
    """
    rows: list[dict[str, Any]] = []
    last_index = len(dates) - 1
    for idx, day in enumerate(dates):
        iso_day = day.isoformat()
        try:
            legs, trip = make_flight_data(args.from_airport, args.to_airport, iso_day, None)
            result = fetch_flights(legs, trip, args.adults, args.seat)
            url = build_query_url(legs, trip, args.adults, args.seat)
            summary = summarize_result(result, url, args.limit)
            stats = summary["stats"]
            row = {
                "date": iso_day,
                "ok": True,
                "min_price": stats["min_price"],
                "avg_price": stats["avg_price"],
                "priced_count": stats["priced_count"],
                "price_band": summary["meta"]["price_band"],
                "top": summary["flights"][: min(3, args.limit)],
                "booking_search_url": url,
            }
        except Exception as e:  # keep scans robust
            row = {"date": iso_day, "ok": False, "error": f"{type(e).__name__}: {str(e)[:300]}"}
        rows.append(row)
        # Pause between provider queries, but not after the final one.
        if idx != last_index and args.sleep > 0:
            time.sleep(args.sleep)

    ok_rows = [row for row in rows if row.get("ok")]
    priced_rows = [row for row in ok_rows if row.get("min_price") is not None]
    prices = [row["min_price"] for row in priced_rows]
    cheapest = sorted(priced_rows, key=lambda row: row["min_price"])[: args.limit]

    meta = {
        "provider": "google-flights-fast-flights",
        "currency": "KRW",
        "queried_at": datetime.now().astimezone().isoformat(timespec="seconds"),
        "sampled_dates": len(rows),
        "successful_dates": len(ok_rows),
        "note": "월/연도 비교는 지정 날짜들을 실제 조회해 산출한 샘플 기반 비교입니다. Google Flights 가격은 수시 변동됩니다.",
    }
    query = {
        "from": args.from_airport.upper(),
        "to": args.to_airport.upper(),
        "adults": args.adults,
        "seat": args.seat,
    }
    if prices:
        stats_out = {
            "min_price": min(prices),
            "avg_of_daily_min": statistics.mean(prices),
            "max_of_daily_min": max(prices),
        }
    else:
        stats_out = {"min_price": None, "avg_of_daily_min": None, "max_of_daily_min": None}
    return {
        "meta": meta,
        "query": query,
        "stats": stats_out,
        "cheapest_dates": cheapest,
        "rows": rows,
    }
def command_compare_month(args: argparse.Namespace) -> dict[str, Any]:
    """Scan one calendar month, sampling every day or every seventh day.

    Sampling starts on the first of the month. A truthy ``args.max_dates``
    caps the number of sampled dates.
    """
    first = datetime.strptime(args.month + "-01", "%Y-%m-%d").date()
    # The last day of the month is the day before the first of the next month.
    if first.month == 12:
        next_year, next_month = first.year + 1, 1
    else:
        next_year, next_month = first.year, first.month + 1
    last = date(next_year, next_month, 1) - timedelta(days=1)

    step = 7 if args.sample != "daily" else 1
    sampled = list(iter_dates(first, last, step))
    if args.max_dates:
        sampled = sampled[: args.max_dates]
    payload = scan_dates(args, sampled)
    payload["query"]["month"] = args.month
    payload["query"]["sample"] = args.sample
    return payload
def command_compare_range(args: argparse.Namespace) -> dict[str, Any]:
    """Scan a custom date range every ``args.step_days`` days.

    A truthy ``args.max_dates`` caps the number of sampled dates.

    Raises:
        SystemExit: the end date is before the start date.
    """
    first = parse_date(args.start_date)
    last = parse_date(args.end_date)
    if last < first:
        raise SystemExit("end-date must be on or after start-date")
    step = args.step_days
    sampled = list(iter_dates(first, last, step))
    if args.max_dates:
        sampled = sampled[: args.max_dates]
    payload = scan_dates(args, sampled)
    payload["query"].update(
        start_date=args.start_date,
        end_date=args.end_date,
        step_days=step,
    )
    return payload
def command_compare_years(args: argparse.Namespace) -> dict[str, Any]:
    """Scan the same month-day in each requested year.

    ``args.years`` is split on commas and spaces. A truthy ``args.max_dates``
    caps how many of the resulting dates are queried, matching the other
    compare commands; the ``--max-dates`` option is defined for this
    sub-command but was previously ignored.

    Raises:
        SystemExit: no year was given, or a year/month-day pair is not a real
            calendar date (for example 02-29 in a non-leap year).
        ValueError: a year token is not an integer.
    """
    years = [int(x) for x in re.split(r"[, ]+", args.years.strip()) if x]
    if not years:
        raise SystemExit("years is required, e.g. 2026,2027")
    mmdd = args.month_day
    dates: list[date] = []
    for year in years:
        candidate = f"{year}-{mmdd}"
        try:
            dates.append(datetime.strptime(candidate, "%Y-%m-%d").date())
        except ValueError as exc:
            # Exit with a message like the sibling commands, not a traceback.
            raise SystemExit(f"{candidate} is not a valid calendar date") from exc
    max_dates = getattr(args, "max_dates", None)
    if max_dates:
        dates = dates[:max_dates]
    out = scan_dates(args, dates)
    out["query"]["years"] = years
    out["query"]["month_day"] = mmdd
    return out
def print_markdown(payload: dict[str, Any]) -> None:
    """Print a payload as plain text lines on stdout.

    Payloads with a ``flights`` key are rendered as a single search; all
    others are rendered as a date comparison, including up to five failed
    dates. The ``meta`` note, when present, is printed last.
    """
    meta = payload.get("meta", {})
    stats = payload.get("stats", {})
    query_json = json.dumps(payload.get("query", {}), ensure_ascii=False)

    print(f"provider: {meta.get('provider')}")
    print(f"queried_at: {meta.get('queried_at')}")
    print(f"query: {query_json}")
    print()

    if "flights" in payload:
        lowest = money_krw(stats.get("min_price"))
        average = money_krw(stats.get("avg_price"))
        highest = money_krw(stats.get("max_price"))
        print(f"price_band: {meta.get('price_band')}")
        print(f"min/avg/max: {lowest} / {average} / {highest}")
        print(f"booking_search_url: {meta.get('booking_search_url')}")
        print("\nflights:")
        for rank, flight in enumerate(payload.get("flights", []), 1):
            name = flight.get("name") or "항공편 상세 확인 불가"
            departure = flight.get("departure") or "시간 확인 불가"
            arrival = flight.get("arrival") or "시간 확인 불가"
            duration = flight.get("duration") or "소요시간 확인 불가"
            print(f"{rank}. {name} | {departure} -> {arrival} | {duration} | stops={flight.get('stops')} | {flight.get('price_text')}")
    else:
        lowest = money_krw(stats.get("min_price"))
        average = money_krw(stats.get("avg_of_daily_min"))
        highest = money_krw(stats.get("max_of_daily_min"))
        print(f"sampled/success: {meta.get('sampled_dates')} / {meta.get('successful_dates')}")
        print(f"min / avg(daily min) / max(daily min): {lowest} / {average} / {highest}")
        print("\ncheapest_dates:")
        for rank, row in enumerate(payload.get("cheapest_dates", []), 1):
            day_min = money_krw(row.get("min_price"))
            day_avg = money_krw(row.get("avg_price"))
            print(f"{rank}. {row.get('date')} | min={day_min} | avg={day_avg} | band={row.get('price_band')} | {row.get('booking_search_url')}")
        failed = [row for row in payload.get("rows", []) if not row.get("ok")]
        if failed:
            print("\nfailures:")
            # Only the first five failures are listed.
            for row in failed[:5]:
                print(f"- {row.get('date')}: {row.get('error')}")

    note = meta.get("note")
    if note:
        print(f"\nnote: {note}")
def build_parser() -> argparse.ArgumentParser:
    """Create the CLI parser with its four sub-commands.

    Sub-commands: ``search``, ``compare-month``, ``compare-range`` and
    ``compare-years``. Each one stores its handler in ``func`` and shares the
    route, passenger, seat, limit, sleep and output-format options.
    """
    parser = argparse.ArgumentParser(description="Free Google Flights-based flight search and comparison helper")
    subcommands = parser.add_subparsers(dest="command", required=True)

    def add_common(command_parser: argparse.ArgumentParser) -> None:
        """Attach the options shared by every sub-command."""
        command_parser.add_argument("--from", dest="from_airport", required=True, help="IATA origin airport, e.g. ICN")
        command_parser.add_argument("--to", dest="to_airport", required=True, help="IATA destination airport, e.g. NRT")
        command_parser.add_argument("--adults", type=positive_int, default=1)
        command_parser.add_argument("--seat", choices=["economy", "premium-economy", "business", "first"], default="economy")
        command_parser.add_argument("--limit", type=positive_int, default=5)
        command_parser.add_argument("--sleep", type=nonnegative_float, default=1.5, help="seconds between comparison queries")
        command_parser.add_argument("--format", choices=["json", "markdown"], default="markdown")

    search = subcommands.add_parser("search", help="single one-way or round-trip search")
    add_common(search)
    search.add_argument("--date", required=True, help="YYYY-MM-DD outbound date")
    search.add_argument("--return-date", help="YYYY-MM-DD return date for round trip")
    search.set_defaults(func=command_search)

    month = subcommands.add_parser("compare-month", help="sample a whole month and rank cheapest dates")
    add_common(month)
    month.add_argument("--month", required=True, help="YYYY-MM")
    month.add_argument("--sample", choices=["weekly", "daily"], default="weekly")
    month.add_argument("--max-dates", type=nonnegative_int, default=0, help="cap dates for quick tests; 0 means no cap")
    month.set_defaults(func=command_compare_month)

    date_range = subcommands.add_parser("compare-range", help="compare a custom date range")
    add_common(date_range)
    date_range.add_argument("--start-date", required=True)
    date_range.add_argument("--end-date", required=True)
    date_range.add_argument("--step-days", type=positive_int, default=7)
    date_range.add_argument("--max-dates", type=nonnegative_int, default=0)
    date_range.set_defaults(func=command_compare_range)

    years = subcommands.add_parser("compare-years", help="compare the same month-day across years")
    add_common(years)
    years.add_argument("--years", required=True, help="comma separated years, e.g. 2026,2027")
    years.add_argument("--month-day", required=True, help="MM-DD, e.g. 06-01")
    years.add_argument("--max-dates", type=nonnegative_int, default=0)
    years.set_defaults(func=command_compare_years)

    return parser
def main() -> None:
    """Parse and validate arguments, bootstrap the runtime, run the command, print the result.

    Validation runs before ``ensure_runtime`` so that ``--help`` and invalid
    input never trigger the dependency bootstrap.
    """
    args = build_parser().parse_args()
    preflight_validate_args(args)
    ensure_runtime()
    # A cap of 0 means "no cap"; commands without the option get None as well.
    if getattr(args, "max_dates", 0) == 0:
        args.max_dates = None
    payload = args.func(args)
    if args.format != "json":
        print_markdown(payload)
        return
    print(json.dumps(payload, ensure_ascii=False, indent=2))
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()