k-skill/scripts/ktx_booking.py
Jeffrey (Dongkyu) Kim 3d8008f2f2 Harden KTX seat detail trust boundaries
Fail closed when Korail reports remaining seats but returns malformed, sentinel-only, or schema-incomplete detail rows, and keep the seat research endpoints inside the same Dynapath/Sid request boundary as other protected mobile calls.\n\nConstraint: PR #298 review required TDD regressions for false empty-seat success and raw external payload failures.\nRejected: Preserve lenient normalization of partial seat rows | it can mislead automation with authoritative empty results.\nConfidence: high\nScope-risk: narrow\nDirective: Keep KTX seat detail parsing fail-closed unless live Korail evidence proves a specific empty or partial shape is valid.\nTested: PYTHONPATH=.:scripts python3 -m unittest scripts.test_ktx_booking; node --test scripts/skill-docs.test.js; npm run typecheck; python3 -m compileall -q scripts/ktx_booking.py scripts/test_ktx_booking.py; ruff check scripts/ktx_booking.py scripts/test_ktx_booking.py; shellcheck scripts/validate-skills.sh; bash scripts/validate-skills.sh; PYENV_VERSION=3.11.9 npm run ci\nNot-tested: live Korail seat-detail network flow with production credentials
2026-05-29 01:55:52 +09:00

1270 lines
48 KiB
Python
Executable file

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import base64
import json
import os
import random
import re
import string
import sys
import time
from functools import reduce
# pycryptodome is treated as optional at import time: the failure is recorded
# in _CRYPTO_IMPORT_ERROR so ensure_runtime_dependencies() can report it with
# an install hint instead of the module crashing on import.
try:
    from Crypto.Cipher import AES
    from Crypto.Util.Padding import pad
except ModuleNotFoundError as exc:
    AES = None
    pad = None
    _CRYPTO_IMPORT_ERROR = exc
else:
    _CRYPTO_IMPORT_ERROR = None
# korail2 is treated as optional at import time. When it is missing, the
# failure is recorded in _KORAIL_IMPORT_ERROR and minimal stand-ins are defined
# for every imported name so the rest of this module can still be imported.
try:
    from korail2 import (
        AdultPassenger,
        ChildPassenger,
        Korail,
        KorailError,
        NeedToLoginError,
        NoResultsError,
        Passenger,
        ReserveOption,
        SeniorPassenger,
        SoldOutError,
        ToddlerPassenger,
        TrainType,
    )
    import korail2.korail2 as korail_mod
except ModuleNotFoundError as exc:
    _KORAIL_IMPORT_ERROR = exc

    # Stand-in exception hierarchy rooted at KorailError.
    class KorailError(Exception):
        pass

    class NeedToLoginError(KorailError):
        pass

    class NoResultsError(KorailError):
        pass

    class SoldOutError(KorailError):
        pass

    # Stand-in passenger types: they only carry a head count.
    class Passenger:
        def __init__(self, count: int = 1):
            self.count = count

        @staticmethod
        def reduce(passengers):
            # The stand-in returns the list unchanged.
            return passengers

        def get_dict(self, _: int) -> dict[str, str]:
            # The stand-in contributes no request fields.
            return {}

    class AdultPassenger(Passenger):
        pass

    class ChildPassenger(Passenger):
        pass

    class ToddlerPassenger(Passenger):
        pass

    class SeniorPassenger(Passenger):
        pass

    class ReserveOption:
        GENERAL_FIRST = "GENERAL_FIRST"
        GENERAL_ONLY = "GENERAL_ONLY"
        SPECIAL_FIRST = "SPECIAL_FIRST"
        SPECIAL_ONLY = "SPECIAL_ONLY"

    class TrainType:
        # Fallback constants used only when korail2 is missing so module
        # import succeeds and ensure_runtime_dependencies() can surface
        # the install message. Values mirror upstream korail2.TrainType.
        KTX = "100"
        KTX_SANCHEON = "100"
        ITX_SAEMAEUL = "101"
        SAEMAEUL = "101"
        MUGUNGHWA = "102"
        NURIRO = "102"
        TONGGUEN = "103"
        ITX_CHEONGCHUN = "104"
        AIRPORT = "105"
        ALL = "109"

    class Korail:
        # Instantiating the stand-in client always fails with the same
        # exception type that the missing import would have raised.
        def __init__(self, *args, **kwargs):
            raise ModuleNotFoundError("korail2")

    class _FallbackKorailModule:
        # Only the two regexes that module-level helpers read from korail_mod.
        EMAIL_REGEX = re.compile(r".+@.+")
        PHONE_NUMBER_REGEX = re.compile(r"(\d{3})-(\d{3,4})-(\d{4})")

    korail_mod = _FallbackKorailModule()
else:
    _KORAIL_IMPORT_ERROR = None
# NCardPassenger is not exported by every korail2 build. _NCARD_AVAILABLE
# records whether the real class was imported; otherwise a local stand-in
# that only stores the card fields is defined so the name always exists.
try:
    from korail2 import NCardPassenger
    _NCARD_AVAILABLE = True
except ImportError:
    _NCARD_AVAILABLE = False

    class NCardPassenger(AdultPassenger):
        def __init__(self, count=1, card_no='', card='', card_pw='', discount_type='153'):
            AdultPassenger.__init__(self, count)
            self.card_no = card_no
            self.card = card
            self.card_pw = card_pw
            self.discount_type = discount_type
# User-Agent header applied to the HTTP session by PatchedKorail.__init__.
DEFAULT_USER_AGENT = "Dalvik/2.1.0 (Linux; U; Android 13; SM-S928N Build/UP1A.231005.007)"

# URL fragments of protected endpoints. PatchedKorail._auth_headers_and_sid
# adds the x-dynapath-m-token header and a Sid value whenever the request URL
# contains one of these fragments.
DYNAPATH_PATHS = [
    "/classes/com.korail.mobile.certification.TicketReservation",
    "/classes/com.korail.mobile.nonMember.NonMemTicket",
    "/classes/com.korail.mobile.research.TrainResearch",
    "/classes/com.korail.mobile.research.ResidualSeatsResearch.do",
    "/classes/com.korail.mobile.seatMovie.ScheduleView",
    "/classes/com.korail.mobile.seatMovie.ScheduleViewSpecial",
    "/classes/com.korail.mobile.trn.prcFare.do",
    "/classes/com.korail.mobile.login.Login",
]

# Endpoints used by PatchedKorail.train_cars and PatchedKorail.car_seats.
KORAIL_CARS_INFO = "https://smart.letskorail.com:443/classes/com.korail.mobile.research.TrainResearch"
KORAIL_CAR_DETAIL = "https://smart.letskorail.com:443/classes/com.korail.mobile.research.ResidualSeatsResearch.do"

# CLI seat-option name -> ReserveOption value.
RESERVE_OPTION_MAP = {
    "general-first": ReserveOption.GENERAL_FIRST,
    "general-only": ReserveOption.GENERAL_ONLY,
    "special-first": ReserveOption.SPECIAL_FIRST,
    "special-only": ReserveOption.SPECIAL_ONLY,
}

# CLI train-type name -> TrainType code.
TRAIN_TYPE_MAP = {
    "ktx": TrainType.KTX,  # 100 — KTX / KTX-Sancheon
    "itx-saemaeul": TrainType.ITX_SAEMAEUL,  # 101 — ITX-Saemaeul
    "mugunghwa": TrainType.MUGUNGHWA,  # 102 — Mugunghwa
    "nuriro": TrainType.NURIRO,  # 102 — Nuriro
    "tonggeun": TrainType.TONGGUEN,  # 103 — commuter train
    "itx-cheongchun": TrainType.ITX_CHEONGCHUN,  # 104 — ITX-Cheongchun
    "airport": TrainType.AIRPORT,  # 105 — airport express
    "all": TrainType.ALL,  # 109 — all train types
}

# A train_id is TRAIN_ID_PREFIX followed by unpadded URL-safe base64 of a
# compact JSON object holding exactly TRAIN_ID_FIELDS (see build_train_id).
TRAIN_ID_PREFIX = "ktx:v1:"
TRAIN_ID_INVALID_MESSAGE = "train_id is invalid; rerun search and copy a fresh train_id"
TRAIN_ID_STALE_MESSAGE = "train_id no longer matches any current search result; rerun search and choose a fresh train_id"
TRAIN_ID_FIELDS = (
    "train_no",
    "dep_date",
    "dep_time",
    "arr_date",
    "arr_time",
    "run_date",
    "train_group",
    "dep_code",
    "arr_code",
)

# Login id written as digits only: "01" followed by 8 or 9 more digits.
PHONE_NUMBER_DIGITS_REGEX = re.compile(r"^01\d{8,9}$")

# CLI room name -> room class code sent as txtPsrmClCd.
ROOM_CLASS_MAP = {
    "general": "1",
    "special": "2",
}
# Room class code -> display name used by normalize_car.
ROOM_CLASS_NAME = {
    "1": "일반실",
    "2": "특실",
}

# Code -> display name tables used by normalize_seat; unknown codes are
# passed through unchanged there.
SEAT_DIRECTION_NAME = {
    "009": "순방향",
    "010": "역방향",
}
SEAT_POSITION_NAME = {
    "011": "1인",
    "012": "창측",
    "013": "내측",
}
SEAT_TYPE_NAME = {
    "015": "일반석",
    "018": "2층석",
    "019": "유아동반석",
    "021": "휠체어석",
    "023": "4인동반석",
    "027": "4인석",
    "028": "전동휠체어석",
    "032": "자전거",
    "052": "대피도우미",
}

# Inputs to power_outlet_match: a seat whose row is in POWER_OUTLET_ROWS is
# classified "direct" for columns A/D and "adjacent" for columns B/C.
# NOTE(review): presumably reflects the physical outlet layout of the cars —
# confirm which rolling stock these rows apply to.
POWER_OUTLET_ROWS = {1, 3, 5, 7, 10, 12, 14, 15}
POWER_OUTLET_DIRECT_COLUMNS = {"A", "D"}
POWER_OUTLET_ADJACENT_COLUMNS = {"B", "C"}
def is_phone_login_id(korail_id: str) -> bool:
    """Return True when *korail_id* fully matches either phone-number pattern.

    Two patterns are tried: ``korail_mod.PHONE_NUMBER_REGEX`` and the
    digits-only ``PHONE_NUMBER_DIGITS_REGEX`` defined in this module.
    """
    if korail_mod.PHONE_NUMBER_REGEX.fullmatch(korail_id):
        return True
    if PHONE_NUMBER_DIGITS_REGEX.fullmatch(korail_id):
        return True
    return False
def ensure_runtime_dependencies() -> None:
    """Exit with an install hint if korail2 or pycryptodome failed to import.

    Raises:
        SystemExit: when at least one recorded import error is present; the
            message names the missing packages and the pip command.
    """
    recorded_failures = (
        ("korail2", _KORAIL_IMPORT_ERROR),
        ("pycryptodome", _CRYPTO_IMPORT_ERROR),
    )
    missing: list[str] = [package for package, error in recorded_failures if error is not None]
    if not missing:
        return
    install_command = f"python3 -m pip install {' '.join(missing)}"
    raise SystemExit(
        "scripts/ktx_booking.py requires additional Python packages "
        f"({', '.join(missing)}). Install them before running this helper: {install_command}"
    )
class DynaPathMasterEngine:
    """Builds the value sent in the ``x-dynapath-m-token`` request header.

    The token is a fixed prefix, one length-marker character, an encoded
    per-request key string and an encoded body of device/app fields. Both
    encodings re-express byte-like values as characters of a 62-character
    table (see ``encode_normal_be``).
    """

    # Fixed fields embedded in the token plaintext by generate_token().
    APP_ID = "com.korail.talk"
    AS_VALUE = "%5B38ff229cb34c7dda8e28220a2d750cce%5D"
    DEVICE_MODEL = "SM-S928N"
    OS_TYPE = "Android"
    SDK_VERSION = "v1"

    def __init__(self) -> None:
        # Base character table; also the source for per-request tables.
        self.table = "3FE9jgRD4KdCyuawklqGJYmvfMn15P7US8XbxeLQtWT6OicBAopINs2Vh0HZrz"
        self.i8 = 161  # radix used when folding input values into one number
        self.i9 = 30  # radix (and table size) used when emitting characters
        self.i10 = 2  # number of input values folded per group
        # Millisecond timestamp taken once per engine; sent as the "it" field.
        self.app_start_ts = str(int(time.time() * 1000))

    def string2xa1s(self, data: str) -> list[int]:
        """Convert *data* into a list of small integers, one or more per character.

        Code points below 128 map to themselves; larger ones are split into
        7-bit pieces behind a marker value. Code points in the surrogate range
        (``(cp & 0xF800) == 0xD800``) produce no output.
        """
        result: list[int] = []
        idx = 0
        while idx < len(data):
            codepoint = ord(data[idx])
            idx += 1
            if codepoint < 128:
                result.append(codepoint)
            elif codepoint < 2048:
                # Two values: marker 128 plus the high bits, then the low 7 bits.
                result.append(128 | ((codepoint >> 7) & 15))
                result.append(codepoint & 127)
            elif codepoint >= 262144:
                # Four values: marker 160 followed by three 7-bit pieces.
                result.append(160)
                result.append((codepoint >> 14) & 127)
                result.append((codepoint >> 7) & 127)
                result.append(codepoint & 127)
            elif (63488 & codepoint) != 55296:
                # Three values: marker 144 plus the high bits, then two 7-bit pieces.
                result.append(((codepoint >> 14) & 15) | 144)
                result.append((codepoint >> 7) & 127)
                result.append(codepoint & 127)
        return result

    def make_key(self, key: str) -> int:
        """Fold the characters of *key* into one integer used to derive a table."""
        total = 0
        for char in key:
            codepoint = ord(char)
            # Find the highest set bit of the code point within 16 bits.
            bit = 32768
            for _ in range(16):
                if bit & codepoint:
                    break
                bit >>= 1
            # Shift the running total past that bit width, then add the character.
            total = (total * (bit << 1)) + codepoint
        return total

    def internal_char(self, base_table: str, remainder: int, current: str) -> str:
        """Return the *remainder*-th character of *base_table* not already in *current*.

        Returns a single space when fewer unused characters remain.
        """
        seen = 0
        for char in base_table:
            if char in current:
                continue
            if seen == remainder:
                return char
            seen += 1
        return " "

    def make_encode_table(self, number: int, encode_size: int, base_table: str) -> str:
        """Derive an *encode_size*-character table from *number*.

        Each position consumes ``number % (encode_size - index)`` to pick one
        of the not-yet-used characters of *base_table*.
        """
        chars = ""
        temp = number
        for index in range(encode_size):
            divisor = encode_size - index
            remainder = temp % divisor
            chars += self.internal_char(base_table, remainder, chars)
            temp //= divisor
        return chars

    def encode_normal_be(self, data: str, table: str) -> str:
        """Encode *data* as characters of *table*, most significant digit first.

        Values from ``string2xa1s`` are folded ``i10`` at a time in radix
        ``i8`` and emitted as ``i10 + 1`` digits in radix ``i9``. A shorter
        trailing group of ``tail`` values is emitted as ``tail + 1`` digits.
        """
        values = self.string2xa1s(data)
        output: list[str] = []
        digits = [0] * (self.i10 + 1)
        idx = 0
        tail = len(values) % self.i10
        body_size = len(values) - tail
        while idx < body_size:
            value = 0
            for _ in range(self.i10):
                value = (value * self.i8) + values[idx]
                idx += 1
            # Digits are produced least significant first ...
            for digit_index in range(self.i10 + 1):
                digits[digit_index] = value % self.i9
                value //= self.i9
            # ... and written out in reverse, i.e. most significant first.
            for digit_index in range(self.i10, -1, -1):
                output.append(table[digits[digit_index]])
        if tail > 0:
            value = 0
            for _ in range(tail):
                value = (value * self.i8) + values[idx]
                idx += 1
            for digit_index in range(tail + 1):
                digits[digit_index] = value % self.i9
                value //= self.i9
            while tail >= 0:
                output.append(table[digits[tail]])
                tail -= 1
        return "".join(output)

    def generate_token(self, device_id: str, timestamp_ms: int, nonce: str) -> str:
        """Return the token string for one request.

        Args:
            device_id: value placed in the ``di`` field.
            timestamp_ms: value placed in the ``ts`` field and in the key string.
            nonce: short random string mixed into the key string.
        """
        plaintext = (
            f"ai={self.APP_ID}&di={device_id}&as={self.AS_VALUE}&su=false&dbg=false&emu=false&hk=false"
            f"&it={self.app_start_ts}&ts={timestamp_ms}&rt=0&os=13&dm={self.DEVICE_MODEL}&st={self.OS_TYPE}&sv={self.SDK_VERSION}"
        )
        dyn_key = f"v1+{nonce}+{timestamp_ms}"
        # The key string is encoded with the base table; the body is encoded
        # with a table derived from that same key string.
        key_encoded = self.encode_normal_be(dyn_key, self.table)
        table = self.make_encode_table(self.make_key(dyn_key), self.i9, self.table)
        body_encoded = self.encode_normal_be(plaintext, table)
        # The character after the prefix encodes the length of key_encoded.
        return f"bEeEP{self.table[len(key_encoded)]}{key_encoded}{body_encoded}"
class PatchedKorail(Korail):
    """korail2 client variant that signs protected requests and exposes seat lookups.

    Requests whose URL matches ``DYNAPATH_PATHS`` get an ``x-dynapath-m-token``
    header and a ``Sid`` form field (see ``_auth_headers_and_sid``). The class
    also adds raw-payload search results plus per-car and per-seat lookups.
    """

    # Values sent as "Device" / "Version" in every payload built by this class.
    _device = "AD"
    _version = "250601002"
    # AES key for _generate_sid; the same bytes are also used as the IV there.
    _sid_key = b"2485dd54d9deaa36"
    # Device id embedded in the Dynapath token.
    _device_id = "558a4f02041657ea"

    def __init__(self, korail_id: str, korail_pw: str, auto_login: bool = True, want_feedback: bool = False):
        """Create the HTTP session and token engine, then optionally log in.

        The base initializer is always called with ``auto_login=False`` so the
        login request goes through this class's ``login``.
        """
        import requests

        self._session = requests.session()
        self._session.headers.update({"User-Agent": DEFAULT_USER_AGENT})
        self._engine = DynaPathMasterEngine()
        super().__init__(korail_id, korail_pw, auto_login=False, want_feedback=want_feedback)
        # Re-apply the User-Agent after the base initializer has run.
        # NOTE(review): presumably the base class may overwrite session headers — confirm.
        self._session.headers.update({"User-Agent": DEFAULT_USER_AGENT})
        if auto_login:
            self.login(korail_id, korail_pw)

    def _generate_sid(self, timestamp_ms: int) -> str:
        """Return the ``Sid`` value for *timestamp_ms*.

        The device code followed by the timestamp is padded to a 16-byte block,
        encrypted with AES-CBC (key reused as IV), base64-encoded, and suffixed
        with a newline.
        """
        ensure_runtime_dependencies()
        plaintext = f"{self._device}{timestamp_ms}".encode("utf-8")
        cipher = AES.new(self._sid_key, AES.MODE_CBC, iv=self._sid_key)
        return base64.b64encode(cipher.encrypt(pad(plaintext, 16))).decode("utf-8") + "\n"

    def _auth_headers_and_sid(self, url: str) -> tuple[dict[str, str], str | None]:
        """Return ``(headers, sid)`` for *url*.

        For URLs containing a ``DYNAPATH_PATHS`` fragment the headers carry a
        fresh token and ``sid`` is set; otherwise the result is ``({}, None)``.
        """
        headers: dict[str, str] = {}
        sid = None
        if any(path in url for path in DYNAPATH_PATHS):
            # Token and Sid share one timestamp.
            timestamp_ms = int(time.time() * 1000)
            nonce = "".join(random.choices(string.ascii_uppercase + string.digits, k=4))
            headers["x-dynapath-m-token"] = self._engine.generate_token(self._device_id, timestamp_ms, nonce)
            sid = self._generate_sid(timestamp_ms)
        return headers, sid

    def login(self, korail_id: str | None = None, korail_pw: str | None = None) -> bool:
        """Log in and store the session key and member details on success.

        Arguments left as ``None`` fall back to the stored credentials; given
        arguments replace the stored ones.

        Returns:
            True when the response reports ``SUCC`` and includes a membership
            number, otherwise False. ``self.logined`` is set accordingly.
        """
        if korail_id is None:
            korail_id = self.korail_id
        else:
            self.korail_id = korail_id
        if korail_pw is None:
            korail_pw = self.korail_pw
        else:
            self.korail_pw = korail_pw
        # txtInputFlg depends on the id format: "5" for email-like ids,
        # "4" for phone-like ids, "2" otherwise.
        if korail_mod.EMAIL_REGEX.match(korail_id):
            input_flag = "5"
        elif is_phone_login_id(korail_id):
            input_flag = "4"
        else:
            input_flag = "2"
        headers, sid = self._auth_headers_and_sid(korail_mod.KORAIL_LOGIN)
        # Entry order matters: "txtPwd" is computed before "idx" is read.
        # NOTE(review): presumably the base class's password encryption sets
        # self._idx as a side effect — confirm against korail2.
        payload = {
            "Device": self._device,
            "Version": self._version,
            "txtInputFlg": input_flag,
            "txtMemberNo": korail_id,
            "txtPwd": self._Korail__enc_password(korail_pw),
            "idx": self._idx,
        }
        if sid:
            payload["Sid"] = sid
        response = self._session.post(korail_mod.KORAIL_LOGIN, data=payload, headers=headers)
        data = json.loads(response.text)
        if data["strResult"] == "SUCC" and data.get("strMbCrdNo") is not None:
            self._key = data["Key"]
            self.membership_number = data["strMbCrdNo"]
            self.name = data["strCustNm"]
            self.email = data["strEmailAdr"]
            self.logined = True
            return True
        self.logined = False
        return False

    def search_train_details(
        self,
        dep: str,
        arr: str,
        date: str | None = None,
        time_value: str | None = None,
        train_type: str = TrainType.ALL,
        passengers: list[Passenger] | None = None,
        include_no_seats: bool = False,
        include_waiting_list: bool = False,
    ):
        """Search trains and return ``(Train, raw_info_dict)`` pairs.

        Args:
            dep: departure station name; results with another name are dropped.
            arr: arrival station name; results with another name are dropped.
            date: ``YYYYMMDD``; defaults to today's date at UTC+9.
            time_value: ``HHMMSS``; defaults to the current time at UTC+9.
            train_type: train type code sent as ``selGoTrain`` and ``txtTrnGpCd``.
            passengers: defaults to a single adult.
            include_no_seats: also keep trains without seats.
            include_waiting_list: also keep trains that offer a waiting list.

        Raises:
            NoResultsError: when no train survives the filters.
        """
        kst_now = korail_mod.datetime.now(korail_mod.timezone.utc) + korail_mod.timedelta(hours=9)
        if date is None:
            date = kst_now.strftime("%Y%m%d")
        if time_value is None:
            time_value = kst_now.strftime("%H%M%S")
        if passengers is None:
            passengers = [AdultPassenger()]
        passengers = Passenger.reduce(passengers)
        # Head counts per passenger class; isinstance also matches subclasses.
        adult_count = reduce(lambda total, passenger: total + passenger.count, [p for p in passengers if isinstance(p, AdultPassenger)], 0)
        child_count = reduce(lambda total, passenger: total + passenger.count, [p for p in passengers if isinstance(p, ChildPassenger)], 0)
        toddler_count = reduce(
            lambda total, passenger: total + passenger.count,
            [p for p in passengers if isinstance(p, ToddlerPassenger)],
            0,
        )
        senior_count = reduce(lambda total, passenger: total + passenger.count, [p for p in passengers if isinstance(p, SeniorPassenger)], 0)
        headers, sid = self._auth_headers_and_sid(korail_mod.KORAIL_SEARCH_SCHEDULE)
        payload = {
            "Device": self._device,
            "radJobId": "1",
            "selGoTrain": train_type,
            "txtCardPsgCnt": "0",
            "txtGdNo": "",
            "txtGoAbrdDt": date,
            "txtGoEnd": arr,
            "txtGoHour": time_value,
            "txtGoStart": dep,
            "txtJobDv": "",
            "txtMenuId": "11",
            "txtPsgFlg_1": adult_count,
            "txtPsgFlg_2": child_count,
            "txtPsgFlg_8": toddler_count,
            "txtPsgFlg_3": senior_count,
            "txtPsgFlg_4": "0",
            "txtPsgFlg_5": "0",
            "txtSeatAttCd_2": "000",
            "txtSeatAttCd_3": "000",
            "txtSeatAttCd_4": "015",
            "txtTrnGpCd": train_type,
            "Version": self._version,
        }
        if sid:
            payload["Sid"] = sid
        # The payload is sent as query parameters of a POST request.
        response = self._session.post(korail_mod.KORAIL_SEARCH_SCHEDULE, params=payload, headers=headers)
        data = json.loads(response.text)
        # NOTE(review): _result_check is inherited; presumably it raises on
        # error responses — if it returns False this method returns None.
        if self._result_check(data):
            train_infos = data["trn_infos"]["trn_info"]
            # A single result may arrive as one object instead of a list.
            if isinstance(train_infos, dict):
                train_infos = [train_infos]
            details = [(korail_mod.Train(info), info) for info in train_infos]
            details = [(train, info) for train, info in details if train.dep_name == dep and train.arr_name == arr]
            # A train is kept when any enabled predicate accepts it.
            filters = [lambda train: train.has_seat()]
            if include_no_seats:
                filters.append(lambda train: not train.has_seat())
            if include_waiting_list:
                filters.append(lambda train: train.has_waiting_list())
            details = [(train, info) for train, info in details if any(check(train) for check in filters)]
            if not details:
                raise NoResultsError()
            return details

    def search_train(
        self,
        dep: str,
        arr: str,
        date: str | None = None,
        time_value: str | None = None,
        train_type: str = TrainType.ALL,
        passengers: list[Passenger] | None = None,
        include_no_seats: bool = False,
        include_waiting_list: bool = False,
    ):
        """Return only the Train objects from ``search_train_details``."""
        return [
            train
            for train, _ in self.search_train_details(
                dep,
                arr,
                date,
                time_value,
                train_type=train_type,
                passengers=passengers,
                include_no_seats=include_no_seats,
                include_waiting_list=include_waiting_list,
            )
        ]

    def train_cars(self, raw_train: dict[str, object], passenger_count: int = 1, room_class: str = "1") -> list[dict[str, object]]:
        """Return the raw car rows (``srcar_infos.srcar_info``) for one train.

        Returns an empty list when the result check does not pass or the
        response carries no car rows.
        """
        payload = self._seat_lookup_payload(raw_train, passenger_count, room_class)
        headers, sid = self._auth_headers_and_sid(KORAIL_CARS_INFO)
        if sid:
            payload["Sid"] = sid
        response = self._session.post(KORAIL_CARS_INFO, data=payload, headers=headers)
        data = json.loads(response.text)
        if self._result_check(data):
            cars = data.get("srcar_infos", {}).get("srcar_info", [])
            # A single car may arrive as one object instead of a list.
            if isinstance(cars, dict):
                cars = [cars]
            return cars
        return []

    def car_seats(
        self,
        raw_train: dict[str, object],
        car_no: str,
        passenger_count: int = 1,
        room_class: str = "1",
    ) -> dict[str, object]:
        """Return the raw seat-detail response for car *car_no* of one train.

        Returns an empty dict when the result check does not pass.
        """
        payload = self._seat_lookup_payload(raw_train, passenger_count, room_class)
        payload["txtSrcarNo"] = car_no
        headers, sid = self._auth_headers_and_sid(KORAIL_CAR_DETAIL)
        if sid:
            payload["Sid"] = sid
        response = self._session.post(KORAIL_CAR_DETAIL, data=payload, headers=headers)
        data = json.loads(response.text)
        if self._result_check(data):
            return data
        return {}

    def _seat_lookup_payload(self, raw_train: dict[str, object], passenger_count: int, room_class: str) -> dict[str, object]:
        """Build the form fields shared by ``train_cars`` and ``car_seats``.

        Train fields are copied from the raw search row; missing keys become
        empty strings.
        """
        return {
            "Device": self._device,
            "Version": self._version,
            "Key": self._key,
            "txtArvRsStnCd": raw_train.get("h_arv_rs_stn_cd", ""),
            "txtArvStnRunOrdr": raw_train.get("h_arv_stn_run_ordr", ""),
            "txtDptDt": raw_train.get("h_dpt_dt", ""),
            "txtDptRsStnCd": raw_train.get("h_dpt_rs_stn_cd", ""),
            "txtDptStnRunOrdr": raw_train.get("h_dpt_stn_run_ordr", ""),
            "txtGdNo": "",
            "txtMenuId": "11",
            "txtPsrmClCd": room_class,
            "txtRunDt": raw_train.get("h_run_dt", ""),
            "txtSeatAttCd": "015",
            "txtTotPsgCnt": str(passenger_count),
            "txtTrnClsfCd": raw_train.get("h_trn_clsf_cd", ""),
            "txtTrnGpCd": raw_train.get("h_trn_gp_cd", ""),
            "txtTrnNo": raw_train.get("h_trn_no", ""),
        }

    def reserve(self, train, passengers=None, option=ReserveOption.GENERAL_FIRST, try_waiting=False):
        """Reserve *train* and return the reloaded reservation.

        Args:
            train: a Train from ``search_train``.
            passengers: defaults to a single adult.
            option: one of the ``ReserveOption`` values; selects the room class.
            try_waiting: when the train is sold out, join the general-room
                waiting list if one exists (never for ``SPECIAL_ONLY``).

        Raises:
            SoldOutError: no suitable seat and no waiting-list fallback applies.
            ValueError: *option* is not a supported reserve option.
            KorailError: the reservation was created but is not found exactly
                once in the reservation list afterwards.
        """
        reserving_seat = True
        try:
            if not train.has_seat():
                raise SoldOutError()
            if option == ReserveOption.GENERAL_ONLY:
                if train.has_general_seat():
                    seat_type = "1"
                else:
                    raise SoldOutError()
            elif option == ReserveOption.SPECIAL_ONLY:
                if train.has_special_seat():
                    seat_type = "2"
                else:
                    raise SoldOutError()
            elif option == ReserveOption.GENERAL_FIRST:
                seat_type = "1" if train.has_general_seat() else "2"
            elif option == ReserveOption.SPECIAL_FIRST:
                seat_type = "2" if train.has_special_seat() else "1"
            else:
                raise ValueError(f"unsupported reserve option: {option}")
        except SoldOutError:
            # Fall back to the general-room waiting list when allowed.
            if try_waiting and option != ReserveOption.SPECIAL_ONLY and train.has_general_waiting_list():
                reserving_seat = False
                seat_type = "1"
            else:
                raise
        if passengers is None:
            passengers = [AdultPassenger()]
        passengers = Passenger.reduce(passengers)
        passenger_count = reduce(lambda total, passenger: total + passenger.count, passengers, 0)
        headers, sid = self._auth_headers_and_sid(korail_mod.KORAIL_TICKETRESERVATION)
        payload = {
            "Device": self._device,
            "Version": self._version,
            "Key": self._key,
            "txtGdNo": "",
            # "1101" is sent for a seat reservation, "1102" for the waiting list.
            "txtJobId": "1101" if reserving_seat else "1102",
            "txtTotPsgCnt": passenger_count,
            "txtSeatAttCd1": "000",
            "txtSeatAttCd2": "000",
            "txtSeatAttCd3": "000",
            "txtSeatAttCd4": "015",
            "txtSeatAttCd5": "000",
            "hidFreeFlg": "N",
            "txtStndFlg": "N",
            "txtMenuId": "11",
            "txtSrcarCnt": "0",
            "txtJrnyCnt": "1",
            "txtJrnySqno1": "001",
            "txtJrnyTpCd1": "11",
            "txtDptDt1": train.dep_date,
            "txtDptRsStnCd1": train.dep_code,
            "txtDptTm1": train.dep_time,
            "txtArvRsStnCd1": train.arr_code,
            "txtTrnNo1": train.train_no,
            "txtRunDt1": train.run_date,
            "txtTrnClsfCd1": train.train_type,
            "txtPsrmClCd1": seat_type,
            "txtTrnGpCd1": train.train_group,
            "txtChgFlg1": "",
            # Second-journey fields are always sent empty.
            "txtJrnySqno2": "",
            "txtJrnyTpCd2": "",
            "txtDptDt2": "",
            "txtDptRsStnCd2": "",
            "txtDptTm2": "",
            "txtArvRsStnCd2": "",
            "txtTrnNo2": "",
            "txtRunDt2": "",
            "txtTrnClsfCd2": "",
            "txtPsrmClCd2": "",
            "txtChgFlg2": "",
        }
        if sid:
            payload["Sid"] = sid
        # Each passenger group contributes its own indexed fields.
        for index, passenger in enumerate(passengers, start=1):
            payload.update(passenger.get_dict(index))
        response = self._session.get(korail_mod.KORAIL_TICKETRESERVATION, params=payload, headers=headers)
        data = json.loads(response.text)
        # NOTE(review): if the inherited _result_check returns False instead of
        # raising, this method returns None.
        if self._result_check(data):
            reservation_id = data["h_pnr_no"]
            # Reload through reservations() so callers get a Reservation object.
            matches = [reservation for reservation in self.reservations() if reservation.rsv_id == reservation_id]
            if len(matches) == 1:
                return matches[0]
            raise KorailError(f"reservation {reservation_id} was created but could not be reloaded")

    def reservations(self):
        """Return the current reservations as ``korail_mod.Reservation`` objects.

        Returns an empty list when the result check raises ``NoResultsError``
        or does not pass. No Dynapath header or Sid is attached here.
        """
        payload = {"Device": self._device, "Version": self._version, "Key": self._key}
        response = self._session.get(korail_mod.KORAIL_MYRESERVATIONLIST, params=payload)
        data = json.loads(response.text)
        try:
            if self._result_check(data):
                return [
                    korail_mod.Reservation(train_info)
                    for journey in data["jrny_infos"]["jrny_info"]
                    for train_info in journey["train_infos"]["train_info"]
                ]
        except NoResultsError:
            return []
        return []

    def cancel(self, reservation):
        """Cancel *reservation*; return True when the result check passes.

        No Dynapath header or Sid is attached here.
        """
        # NOTE(review): assert statements are removed under ``python -O``, so
        # this type check is not enforced in optimized runs.
        assert isinstance(reservation, korail_mod.Reservation)
        payload = {
            "Device": self._device,
            "Version": self._version,
            "Key": self._key,
            "txtPnrNo": reservation.rsv_id,
            "txtJrnySqno": reservation.journey_no,
            "txtJrnyCnt": reservation.journey_cnt,
            "hidRsvChgNo": reservation.rsv_chg_no,
        }
        response = self._session.get(korail_mod.KORAIL_CANCEL, params=payload)
        data = json.loads(response.text)
        if self._result_check(data):
            return True
        return False
def parse_passengers(args: argparse.Namespace) -> list[Passenger]:
    """Turn the CLI head counts into passenger objects.

    One object is created per passenger class with a truthy count, in the
    order adults, children, toddlers, seniors. When every count is falsy a
    single default adult is returned.
    """
    requested = (
        (AdultPassenger, args.adults),
        (ChildPassenger, args.children),
        (ToddlerPassenger, args.toddlers),
        (SeniorPassenger, args.seniors),
    )
    passengers: list[Passenger] = []
    for passenger_class, head_count in requested:
        if head_count:
            passengers.append(passenger_class(head_count))
    if passengers:
        return passengers
    return [AdultPassenger()]
def build_train_id_payload(train) -> dict[str, str]:
    """Return the attributes of *train* named in TRAIN_ID_FIELDS, in that order."""
    identity: dict[str, str] = {}
    for field_name in TRAIN_ID_FIELDS:
        identity[field_name] = getattr(train, field_name)
    return identity
def build_train_id(train) -> str:
    """Encode the identifying fields of *train* as an opaque ``train_id``.

    The id is TRAIN_ID_PREFIX followed by URL-safe base64 (padding removed)
    of the compact JSON form of ``build_train_id_payload(train)``.
    """
    identity = build_train_id_payload(train)
    compact_json = json.dumps(identity, ensure_ascii=False, separators=(",", ":"))
    token = base64.urlsafe_b64encode(compact_json.encode("utf-8")).decode("ascii")
    return TRAIN_ID_PREFIX + token.rstrip("=")
def parse_train_id(train_id: str) -> dict[str, str]:
    """Decode a ``train_id`` back into its TRAIN_ID_FIELDS mapping.

    Raises:
        SystemExit: when the prefix is missing, the token is not valid
            base64/UTF-8/JSON, the JSON is not an object, or any required
            field is missing, empty, or not a string.
    """
    if not train_id.startswith(TRAIN_ID_PREFIX):
        raise SystemExit("train_id must start with ktx:v1:")
    token = train_id[len(TRAIN_ID_PREFIX):]
    # build_train_id strips the "=" padding; restore it before decoding.
    missing_padding = -len(token) % 4
    padded = token + "=" * missing_padding
    try:
        decoded_bytes = base64.urlsafe_b64decode(padded.encode("ascii"))
        payload = json.loads(decoded_bytes.decode("utf-8"))
    except (ValueError, json.JSONDecodeError, UnicodeDecodeError) as exc:
        raise SystemExit(TRAIN_ID_INVALID_MESSAGE) from exc
    if not isinstance(payload, dict):
        raise SystemExit(TRAIN_ID_INVALID_MESSAGE)
    for field_name in TRAIN_ID_FIELDS:
        field_value = payload.get(field_name)
        if not isinstance(field_value, str) or not field_value:
            raise SystemExit(TRAIN_ID_INVALID_MESSAGE)
    # Drop any extra keys so only the known fields are compared later.
    return {field_name: payload[field_name] for field_name in TRAIN_ID_FIELDS}
def find_train_by_id(trains, train_id: str):
    """Return the first train whose identifying fields equal *train_id*, else None.

    Raises:
        SystemExit: when *train_id* cannot be decoded (see ``parse_train_id``).
    """
    wanted = parse_train_id(train_id)
    candidates = (train for train in trains if build_train_id_payload(train) == wanted)
    return next(candidates, None)
def find_train_detail_by_id(details, train_id: str):
    """Return the first ``(train, raw_train)`` pair matching *train_id*, else None.

    *details* is an iterable of ``(train, raw_train)`` pairs.

    Raises:
        SystemExit: when *train_id* cannot be decoded (see ``parse_train_id``).
    """
    wanted = parse_train_id(train_id)
    candidates = (
        (train, raw_train)
        for train, raw_train in details
        if build_train_id_payload(train) == wanted
    )
    return next(candidates, None)
def normalize_train(train, index: int) -> dict[str, object]:
    """Flatten *train* into a JSON-friendly dict for CLI output.

    *index* is the display position and is copied into the result as is.
    """
    summary: dict[str, object] = {"index": index, "train_id": build_train_id(train)}
    summary["train_no"] = train.train_no
    summary["train_type"] = train.train_type_name
    # These attributes are copied under their own names.
    for attribute in ("dep_name", "dep_date", "dep_time", "arr_name", "arr_date", "arr_time"):
        summary[attribute] = getattr(train, attribute)
    # These are methods on the train; store their results.
    for predicate in ("has_general_seat", "has_special_seat", "has_waiting_list"):
        summary[predicate] = getattr(train, predicate)()
    summary["description"] = str(train)
    return summary
def parse_seat_label(seat_label: str) -> tuple[int | None, str]:
    """Split a seat label such as ``"12A"`` into ``(row, column)``.

    The label must be one or more digits followed by exactly one ASCII
    letter, with nothing before or after. The column is upper-cased.

    Returns:
        ``(row, column)`` on success, or ``(None, "")`` when the label is
        empty, ``None``, or does not have that shape.
    """
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "12A\n" through as a valid label.
    match = re.fullmatch(r"(\d+)([A-Za-z])", seat_label or "")
    if match is None:
        return None, ""
    return int(match.group(1)), match.group(2).upper()
def power_outlet_match(seat_label: str) -> str:
    """Classify a seat's outlet access as ``"direct"``, ``"adjacent"`` or ``"none"``.

    The row must be in POWER_OUTLET_ROWS; the column then decides between
    direct and adjacent. Labels that cannot be parsed yield ``"none"``.
    """
    row, column = parse_seat_label(seat_label)
    if row in POWER_OUTLET_ROWS:
        if column in POWER_OUTLET_DIRECT_COLUMNS:
            return "direct"
        if column in POWER_OUTLET_ADJACENT_COLUMNS:
            return "adjacent"
    return "none"
def normalize_seat(raw_seat: dict[str, object]) -> dict[str, object]:
    """Convert one raw seat row into the JSON-friendly shape used in CLI output.

    Code fields are translated through the SEAT_*_NAME tables; a code that is
    not in its table is passed through unchanged.
    """

    def text(key: str) -> str:
        # Missing keys become an empty string.
        return str(raw_seat.get(key, ""))

    def display_name(names: dict[str, str], key: str) -> str:
        code = text(key)
        return names.get(code, code)

    seat_label = text("h_con_seat_no")
    return {
        "seat": seat_label,
        "seat_no": text("h_seat_no"),
        "available": raw_seat.get("h_sale_psb_flg") == "Y",
        "direction": display_name(SEAT_DIRECTION_NAME, "h_for_rev_dir_dv"),
        "position": display_name(SEAT_POSITION_NAME, "h_sigl_win_in_dv"),
        "seat_type": display_name(SEAT_TYPE_NAME, "h_dmd_seat_att"),
        "near_door": raw_seat.get("h_door_nbor_flg") == "Y",
        "power_outlet": power_outlet_match(seat_label),
    }
def validate_raw_seat(raw_seat: dict[str, object]) -> None:
    """Reject a raw seat row that lacks any required field.

    Raises:
        ValueError: when ``h_con_seat_no``, ``h_seat_no`` or
            ``h_sale_psb_flg`` is absent, ``None``, or an empty string.
    """
    for field_name in ("h_con_seat_no", "h_seat_no", "h_sale_psb_flg"):
        if raw_seat.get(field_name) in (None, ""):
            raise ValueError("seat row is missing required fields")
def parse_nonnegative_int_field(raw: object, field_name: str) -> int:
    """Parse an externally supplied count as a non-negative integer.

    Args:
        raw: the value to parse; it is converted with ``str()`` first, and
            ``None`` is treated as an empty string.
        field_name: name used in the error message.

    Returns:
        The parsed integer.

    Raises:
        ValueError: when the text is empty or contains anything other than
            the ASCII digits 0-9.
    """
    text = "" if raw is None else str(raw)
    # str.isdigit() alone also accepts non-ASCII digit characters: some of
    # them ("²") make int() fail with an unrelated message, others (Arabic-
    # Indic digits) are silently converted. Requiring ASCII keeps the parser
    # fail-closed with one consistent error.
    if not (text.isascii() and text.isdigit()):
        raise ValueError(f"{field_name} is not a non-negative integer")
    return int(text)
def normalize_car(raw_car: object) -> dict[str, object]:
    """Convert one raw car row into the JSON-friendly shape used in CLI output.

    Raises:
        ValueError: when *raw_car* is not a dict, or when the car number,
            seat count or remaining-seat count is not a non-negative integer.
    """
    if not isinstance(raw_car, dict):
        raise ValueError("car row is not an object")
    # Parse in this order so the first invalid field decides the error.
    car_no = parse_nonnegative_int_field(raw_car.get("h_srcar_no"), "h_srcar_no")
    car_no_raw = str(raw_car.get("h_srcar_no", ""))
    class_code = str(raw_car.get("h_psrm_cl_cd", ""))
    # Prefer the local display name; fall back to the name in the row.
    class_name = ROOM_CLASS_NAME.get(class_code, str(raw_car.get("h_psrm_cl_nm", "")))
    total_seats = parse_nonnegative_int_field(raw_car.get("h_seat_cnt"), "h_seat_cnt")
    remaining_seats = parse_nonnegative_int_field(raw_car.get("h_rest_seat_cnt"), "h_rest_seat_cnt")
    return {
        "car_no": car_no,
        "car_no_raw": car_no_raw,
        "room_class": class_name,
        "room_class_code": class_code,
        "total_seats": total_seats,
        "remaining_seats": remaining_seats,
    }
def car_center_priority(car: dict[str, object], car_numbers: list[int]) -> tuple[float, int]:
    """Return a sort key that ranks cars by distance from the middle car number.

    The middle is the mean of the smallest and largest entry of
    *car_numbers*. The car number itself is the tie-breaker. With an empty
    *car_numbers* the distance is ``0.0``.
    """
    car_no = int(car["car_no"])
    if car_numbers:
        midpoint = (min(car_numbers) + max(car_numbers)) / 2
        distance = abs(car_no - midpoint)
    else:
        distance = 0.0
    return (distance, car_no)
def sort_cars_for_booking(cars: list[dict[str, object]]) -> list[dict[str, object]]:
    """Return a new list of *cars* ordered from the middle of the train outwards."""
    all_numbers = [int(car["car_no"]) for car in cars]

    def priority(car: dict[str, object]) -> tuple[float, int]:
        return car_center_priority(car, all_numbers)

    ordered = list(cars)
    ordered.sort(key=priority)
    return ordered
def seat_preference_key(seat: dict[str, object]) -> tuple[int, int, int, str]:
    """Return the sort key used to rank normalized seats.

    Ordering, most important first: outlet access (direct, adjacent, other),
    forward-facing before anything else, row number (unparseable labels
    last, as row 999), then column letter.
    """
    outlet = str(seat.get("power_outlet"))
    if outlet == "direct":
        power_rank = 0
    elif outlet == "adjacent":
        power_rank = 1
    else:
        power_rank = 2
    if seat.get("direction") == "순방향":
        direction_rank = 0
    else:
        direction_rank = 1
    row, column = parse_seat_label(str(seat.get("seat", "")))
    if row is None:
        row = 999
    return (power_rank, direction_rank, row, column)
def sort_seats_for_booking(seats: list[dict[str, object]]) -> list[dict[str, object]]:
    """Return a new list of *seats* ordered by ``seat_preference_key``."""
    ordered = list(seats)
    ordered.sort(key=seat_preference_key)
    return ordered
def mask_identifier(value: object, visible: int = 4) -> str:
    """Mask an identifier so that only its last *visible* characters remain.

    Args:
        value: the identifier; falsy values produce an empty string.
        visible: how many trailing characters to leave readable. Zero or a
            negative number masks everything.

    Returns:
        A string of the same length as ``str(value)``. It is fully masked
        when the text is no longer than *visible*, so short identifiers are
        never shown in full.
    """
    text = str(value or "")
    if not text:
        return ""
    # visible <= 0 needs its own guard: text[-0:] is the whole string, so
    # slicing alone would append the full identifier after the mask.
    if visible <= 0 or len(text) <= visible:
        return "*" * len(text)
    return f"{'*' * (len(text) - visible)}{text[-visible:]}"
def normalize_reservation(reservation) -> dict[str, object]:
    """Flatten *reservation* into a JSON-friendly dict for CLI output."""
    # (output key, attribute name) pairs, in output order.
    attribute_for_key = (
        ("reservation_id", "rsv_id"),
        ("train_no", "train_no"),
        ("train_type", "train_type_name"),
        ("dep_name", "dep_name"),
        ("dep_date", "dep_date"),
        ("dep_time", "dep_time"),
        ("arr_name", "arr_name"),
        ("arr_date", "arr_date"),
        ("arr_time", "arr_time"),
        ("seat_count", "seat_no_count"),
        ("price", "price"),
        ("buy_limit_date", "buy_limit_date"),
        ("buy_limit_time", "buy_limit_time"),
        ("journey_no", "journey_no"),
        ("journey_cnt", "journey_cnt"),
        ("rsv_chg_no", "rsv_chg_no"),
    )
    summary: dict[str, object] = {}
    for key, attribute in attribute_for_key:
        summary[key] = getattr(reservation, attribute)
    summary["description"] = str(reservation)
    return summary
def print_json(payload: dict[str, object]) -> None:
    """Print *payload* to stdout as indented JSON, keeping non-ASCII text readable."""
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    print(rendered)
def build_client() -> PatchedKorail:
    """Create a logged-in client from the KSKILL_KTX_* environment variables.

    Raises:
        SystemExit: when a required package is missing or either credential
            variable is unset or empty.
        NeedToLoginError: when the client was created but is not logged in.
    """
    ensure_runtime_dependencies()
    korail_id = os.environ.get("KSKILL_KTX_ID")
    korail_pw = os.environ.get("KSKILL_KTX_PASSWORD")
    if not (korail_id and korail_pw):
        raise SystemExit(
            "이 작업에는 KSKILL_KTX_ID, KSKILL_KTX_PASSWORD 환경변수가 필요합니다. "
            "환경변수가 설정되어 있지 않으면 ~/.config/k-skill/secrets.env 에 추가하거나 "
            "에이전트의 secret vault에서 주입해 주세요."
        )
    client = PatchedKorail(korail_id, korail_pw)
    if client.logined:
        return client
    raise NeedToLoginError()
def command_search(args: argparse.Namespace) -> None:
    """Run the ``search`` subcommand: print up to ``args.limit`` trains as JSON."""
    client = build_client()
    passengers = parse_passengers(args)
    found = client.search_train(
        args.dep,
        args.arr,
        args.date,
        args.time,
        train_type=TRAIN_TYPE_MAP[args.train_type],
        passengers=passengers,
        include_no_seats=args.include_no_seats,
        include_waiting_list=args.include_waiting_list,
    )
    shown = found[: args.limit]
    # "index" in the output is the 1-based position within the shown list.
    rows = []
    for position, train in enumerate(shown, start=1):
        rows.append(normalize_train(train, position))
    print_json({"count": len(shown), "trains": rows})
def command_seats(args: argparse.Namespace) -> None:
    """Run the ``seats`` subcommand: print per-car seat details for one train.

    The train is found by re-running the search and matching
    ``args.train_id``. Car and seat responses are validated strictly: any
    missing, malformed or incomplete structure aborts the command instead of
    being reported as an empty result.

    Raises:
        SystemExit: when the train id is stale, car data is unavailable, the
            requested car does not exist, or seat detail data is malformed or
            contradicts a positive remaining-seat count.
    """
    client = build_client()
    passengers = parse_passengers(args)
    passenger_count = sum(passenger.count for passenger in Passenger.reduce(passengers))
    # Include sold-out and waiting-list trains so any train_id from a search can be matched.
    details = client.search_train_details(
        args.dep,
        args.arr,
        args.date,
        args.time,
        train_type=TRAIN_TYPE_MAP[args.train_type],
        passengers=passengers,
        include_no_seats=True,
        include_waiting_list=True,
    )
    match = find_train_detail_by_id(details, args.train_id)
    if match is None:
        raise SystemExit(TRAIN_ID_STALE_MESSAGE)
    train, raw_train = match
    room_class = ROOM_CLASS_MAP[args.room]

    seat_car_unavailable = f"seat car data is unavailable for {args.room}; retry search or choose another train"
    try:
        cars = [normalize_car(car) for car in client.train_cars(raw_train, passenger_count, room_class)]
    except (TypeError, ValueError, AttributeError) as exc:
        raise SystemExit(seat_car_unavailable) from exc
    if not cars:
        raise SystemExit(seat_car_unavailable)

    if args.car_no is None:
        cars = sort_cars_for_booking(cars)
    else:
        cars = [car for car in cars if car["car_no"] == args.car_no]
        if not cars:
            raise SystemExit(f"car_no {args.car_no} is not available for {args.room}")

    car_payloads: list[dict[str, object]] = []
    for car in cars:
        raw = client.car_seats(raw_train, str(car["car_no_raw"]), passenger_count, room_class)
        seat_detail_unavailable = (
            f"seat detail data is unavailable for car_no {car['car_no']}; "
            "retry search or choose another train"
        )

        # Structural checks on the response envelope.
        seat_infos = raw.get("seat_infos") if isinstance(raw, dict) else None
        if not isinstance(seat_infos, dict) or "seat_info" not in seat_infos:
            raise SystemExit(seat_detail_unavailable)
        raw_seats = seat_infos["seat_info"]
        # A single seat may arrive as one object instead of a list.
        if isinstance(raw_seats, dict):
            raw_seats = [raw_seats]
        if not isinstance(raw_seats, list) or not all(isinstance(seat, dict) for seat in raw_seats):
            raise SystemExit(seat_detail_unavailable)

        # Every row, including sentinel rows, must carry the required fields.
        try:
            for raw_seat in raw_seats:
                validate_raw_seat(raw_seat)
        except ValueError as exc:
            raise SystemExit(seat_detail_unavailable) from exc

        remaining_seats = car["remaining_seats"]
        if not isinstance(remaining_seats, int):
            raise SystemExit(seat_detail_unavailable)
        # Rows labelled "0A" are skipped. If nothing usable is left while the
        # car reports remaining seats, the data is treated as unavailable.
        all_seats = [normalize_seat(seat) for seat in raw_seats if seat.get("h_con_seat_no") != "0A"]
        if remaining_seats > 0 and not all_seats:
            raise SystemExit(seat_detail_unavailable)

        seats = sort_seats_for_booking(all_seats)
        if args.available_only:
            seats = [seat for seat in seats if seat["available"]]
        if args.power_only:
            seats = [seat for seat in seats if seat["power_outlet"] != "none"]
        # Availability is counted after the filters above but before the limit.
        available_seats = [seat for seat in seats if seat["available"]]
        shown_seats = seats[: args.limit]

        car_payloads.append(
            {
                **car,
                "available_seat_count": len(available_seats),
                "available_seats": [seat["seat"] for seat in available_seats],
                "shown_seat_count": len(shown_seats),
                "seats": shown_seats,
            }
        )

    print_json({
        "train": normalize_train(train, 1),
        "room": args.room,
        "passenger_count": passenger_count,
        "available_only": args.available_only,
        "power_only": args.power_only,
        "cars": car_payloads,
    })
def ensure_ncard_available() -> None:
    """Exit with an install hint when the real NCardPassenger could not be imported.

    Raises:
        SystemExit: when ``_NCARD_AVAILABLE`` is false.
    """
    if _NCARD_AVAILABLE:
        return
    raise SystemExit(
        "N카드 기능을 사용하려면 korail2-ncard 패키지가 필요합니다: "
        "pip install korail2-ncard pycryptodome"
    )
def resolve_ncard_no(client: PatchedKorail, ncard_index: int | None, ncard_no: str | None) -> str | None:
    """Work out which N-card number to use, if any.

    Args:
        client: logged-in client; only queried when *ncard_index* is given.
        ncard_index: 1-based position in the list of owned N-cards. Takes
            precedence over *ncard_no*.
        ncard_no: explicit card number, used when no index is given.

    Returns:
        The card number, or ``None`` when neither selector was supplied.

    Raises:
        SystemExit: when N-card support is unavailable, no cards are owned,
            the index is out of range, or the selected card has no number.
    """
    index_given = ncard_index is not None
    if not index_given and not ncard_no:
        return None
    ensure_ncard_available()
    if not index_given:
        return ncard_no
    ncards = client.owned_ncards()
    if not ncards:
        raise SystemExit("보유한 N카드가 없습니다.")
    if not 1 <= ncard_index <= len(ncards):
        raise SystemExit(f"ncard-index는 1~{len(ncards)} 사이여야 합니다.")
    selected_no = getattr(ncards[ncard_index - 1], "discount_card_no", None)
    if selected_no:
        return selected_no
    raise SystemExit("선택한 N카드에서 카드 번호를 확인할 수 없습니다.")
def command_reserve(args: argparse.Namespace) -> None:
    """Handle the ``reserve`` subcommand: search again, then book one train.

    The train list is queried again with the same trip arguments, the entry
    matching ``args.train_id`` is located with ``find_train_by_id`` and that
    train is reserved. The normalized reservation is printed as JSON.

    Args:
        args: Parsed command-line arguments for the ``reserve`` subcommand.

    Raises:
        SystemExit: With ``TRAIN_ID_STALE_MESSAGE`` when no searched train
            matches ``args.train_id``.
    """
    client = build_client()
    card_no = resolve_ncard_no(
        client,
        getattr(args, "ncard_index", None),
        getattr(args, "ncard_no", None),
    )
    # An N-card replaces the passenger mix given on the command line.
    passengers = [NCardPassenger(card_no=card_no)] if card_no else parse_passengers(args)

    # --try-waiting implies that waiting-list trains must show up in the search.
    with_waiting_list = args.include_waiting_list or args.try_waiting
    candidates = client.search_train(
        args.dep,
        args.arr,
        args.date,
        args.time,
        train_type=TRAIN_TYPE_MAP[args.train_type],
        passengers=passengers,
        include_no_seats=args.include_no_seats,
        include_waiting_list=with_waiting_list,
    )

    target = find_train_by_id(candidates, args.train_id)
    if target is None:
        raise SystemExit(TRAIN_ID_STALE_MESSAGE)

    booking = client.reserve(
        target,
        passengers=passengers,
        option=RESERVE_OPTION_MAP[args.seat_option],
        try_waiting=args.try_waiting,
    )
    print_json({"reservation": normalize_reservation(booking)})
def command_reservations(_: argparse.Namespace) -> None:
    """Handle the ``reservations`` subcommand: print current reservations as JSON.

    Args:
        _: Parsed command-line arguments; unused by this subcommand.
    """
    current = build_client().reservations()
    payload = {"count": len(current)}
    payload["reservations"] = [normalize_reservation(item) for item in current]
    print_json(payload)
def normalize_ncard(ncard, index: int) -> dict[str, object]:
    """Convert an N-card object into a JSON-friendly dictionary.

    Args:
        ncard: N-card object exposing ``ticket_kind_name``, ``dep_name``,
            ``arr_name`` and ``valid``; ``discount_card_no`` is optional.
        index: Position reported under the ``"index"`` key.

    Returns:
        A dictionary whose ``"card_no"`` is the card number passed through
        ``mask_identifier``; falsy text attributes are reported as ``""``.
    """
    raw_card_no = getattr(ncard, "discount_card_no", "")
    payload: dict[str, object] = {
        "index": index,
        "card_no": mask_identifier(raw_card_no),
        "card_no_masked": True,
    }
    # (output key, source attribute) pairs, kept in output order.
    text_fields = (
        ("ticket_kind", "ticket_kind_name"),
        ("dep_name", "dep_name"),
        ("arr_name", "arr_name"),
        ("valid", "valid"),
    )
    for key, attribute in text_fields:
        payload[key] = getattr(ncard, attribute) or ""
    payload["description"] = str(ncard)
    return payload
def normalize_ncard_train(train, index: int) -> dict[str, object]:
    """Extend the ``normalize_train`` payload with N-card specific fields.

    Args:
        train: Train object returned by an N-card search.
        index: Position forwarded to ``normalize_train``.

    Returns:
        The ``normalize_train`` dictionary with ``price``, ``discount_name``,
        ``general_remaining_seats`` and ``standing_remaining_seats`` added;
        each is ``None`` when the train lacks that attribute.
    """
    payload = normalize_train(train, index)
    extra_fields = (
        "price",
        "discount_name",
        "general_remaining_seats",
        "standing_remaining_seats",
    )
    for field in extra_fields:
        payload[field] = getattr(train, field, None)
    return payload
def command_ncard_list(args: argparse.Namespace) -> None:
    """Handle the ``ncard-list`` subcommand: print the owned N-cards as JSON.

    Each card is numbered from 1 in the output.

    Args:
        args: Parsed command-line arguments; not read by this subcommand.

    Raises:
        SystemExit: If N-card support is unavailable.
    """
    ensure_ncard_available()
    owned_cards = build_client().owned_ncards()
    payload = {"count": len(owned_cards)}
    entries = []
    for position, card in enumerate(owned_cards, start=1):
        entries.append(normalize_ncard(card, position))
    payload["ncards"] = entries
    print_json(payload)
def command_ncard_search(args: argparse.Namespace) -> None:
    """Handle the ``ncard-search`` subcommand: list trains for one owned N-card.

    The card is chosen by the 1-based ``args.ncard_index``; at most
    ``args.limit`` trains are printed as JSON together with the chosen card.

    Args:
        args: Parsed command-line arguments for the ``ncard-search`` subcommand.

    Raises:
        SystemExit: If N-card support is unavailable, no cards are owned, or
            ``args.ncard_index`` is out of range.
    """
    ensure_ncard_available()
    client = build_client()
    owned_cards = client.owned_ncards()
    if not owned_cards:
        raise SystemExit("보유한 N카드가 없습니다.")

    card_count = len(owned_cards)
    position = args.ncard_index
    if not 1 <= position <= card_count:
        raise SystemExit(f"ncard-index는 1~{card_count} 사이여야 합니다.")

    # The index is 1-based on the command line, so shift to a list offset.
    chosen_card = owned_cards[position - 1]
    found = client.search_owned_ncard_trains(
        chosen_card,
        dep=args.dep,
        arr=args.arr,
        date=args.date,
        time=args.time,
        train_type=TRAIN_TYPE_MAP[args.train_type],
    )
    shown = found[: args.limit]

    payload = {"count": len(shown)}
    payload["ncard"] = normalize_ncard(chosen_card, position)
    payload["trains"] = [
        normalize_ncard_train(train, rank) for rank, train in enumerate(shown, start=1)
    ]
    print_json(payload)
def command_cancel(args: argparse.Namespace) -> None:
    """Handle the ``cancel`` subcommand: cancel the reservation with a given id.

    The first current reservation whose ``rsv_id`` equals
    ``args.reservation_id`` is cancelled and a confirmation is printed as JSON.

    Args:
        args: Parsed command-line arguments for the ``cancel`` subcommand.

    Raises:
        SystemExit: If no current reservation has the requested id.
    """
    client = build_client()
    for candidate in client.reservations():
        if candidate.rsv_id != args.reservation_id:
            continue
        client.cancel(candidate)
        print_json({"cancelled": True, "reservation_id": args.reservation_id})
        return
    raise SystemExit(f"reservation {args.reservation_id} not found")
def add_common_trip_args(parser: argparse.ArgumentParser) -> None:
    """Register the trip and passenger-count arguments on ``parser``.

    Adds four positionals (``dep``, ``arr``, ``date``, ``time``) followed by
    four integer options (``--adults`` defaulting to 1; ``--children``,
    ``--toddlers`` and ``--seniors`` defaulting to 0).

    Args:
        parser: Parser (or subparser) to extend in place.
    """
    positionals = (
        ("dep", "출발역"),
        ("arr", "도착역"),
        ("date", "출발일 YYYYMMDD"),
        ("time", "희망 시작 시각 HHMMSS"),
    )
    for name, description in positionals:
        parser.add_argument(name, help=description)

    passenger_counts = (
        ("--adults", 1, "성인 수"),
        ("--children", 0, "어린이 수"),
        ("--toddlers", 0, "유아 수"),
        ("--seniors", 0, "경로 수"),
    )
    for flag, default_count, description in passenger_counts:
        parser.add_argument(flag, type=int, default=default_count, help=description)
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one subparser per booking action.

    Subcommands are registered in this order: ``search``, ``seats``,
    ``reserve``, ``ncard-list``, ``ncard-search``, ``reservations`` and
    ``cancel``. Each subparser stores its handler under ``func``.

    Returns:
        The fully configured top-level parser; a subcommand is required.
    """
    root = argparse.ArgumentParser(description="Patched KTX/Korail booking helper for k-skill")
    commands = root.add_subparsers(dest="command", required=True)

    train_type_choices = sorted(TRAIN_TYPE_MAP)
    train_id_help = "search 결과에서 복사한 stable train_id"
    requery_help = "재조회할 열차 종류 — search 단계에서 사용한 값과 동일하게 지정 (기본 ktx)"

    def add_train_type(target: argparse.ArgumentParser, help_text: str) -> None:
        # The flag, choices and default are shared; only the help text varies.
        target.add_argument(
            "--train-type",
            choices=train_type_choices,
            default="ktx",
            help=help_text,
        )

    # --- search ---
    search = commands.add_parser("search", help="KTX/Korail 열차를 조회합니다")
    add_common_trip_args(search)
    search.add_argument("--limit", type=int, default=5, help="출력할 최대 열차 수")
    add_train_type(
        search,
        "조회할 열차 종류 (기본 ktx). ITX-청춘 노선은 itx-cheongchun, 무궁화는 mugunghwa, 전체는 all 사용",
    )
    search.add_argument("--include-no-seats", action="store_true", help="매진 열차도 포함")
    search.add_argument("--include-waiting-list", action="store_true", help="예약 대기 가능 열차도 포함")
    search.set_defaults(func=command_search)

    # --- seats ---
    seats = commands.add_parser("seats", help="조회 결과 중 하나의 호차별 좌석번호를 조회합니다")
    add_common_trip_args(seats)
    seats.add_argument("--train-id", required=True, help=train_id_help)
    seats.add_argument(
        "--room",
        choices=sorted(ROOM_CLASS_MAP),
        default="general",
        help="좌석을 조회할 객실 등급 (기본 general)",
    )
    add_train_type(seats, requery_help)
    seats.add_argument("--car-no", type=int, default=None, help="특정 호차만 조회")
    # --remaining-only is an alias that writes to the same destination.
    seats.add_argument(
        "--available-only",
        "--remaining-only",
        dest="available_only",
        action="store_true",
        help="예약 가능한/남은 좌석만 출력",
    )
    seats.add_argument("--power-only", action="store_true", help="콘센트 꿀팁 좌석(direct/adjacent)만 출력")
    seats.add_argument("--limit", type=int, default=100, help="호차별 출력할 최대 좌석 수")
    seats.set_defaults(func=command_seats)

    # --- reserve ---
    reserve = commands.add_parser("reserve", help="조회 결과 중 하나를 예약합니다")
    add_common_trip_args(reserve)
    reserve.add_argument("--train-id", required=True, help=train_id_help)
    reserve.add_argument("--seat-option", choices=sorted(RESERVE_OPTION_MAP), default="general-first")
    add_train_type(reserve, requery_help)
    reserve.add_argument("--include-no-seats", action="store_true", help="검색 시 매진 열차도 포함")
    reserve.add_argument("--include-waiting-list", action="store_true", help="검색 시 예약대기 열차도 포함")
    reserve.add_argument(
        "--try-waiting",
        action="store_true",
        help="좌석이 없으면 예약대기를 시도 (reserve 재조회 시 예약대기 열차 자동 포함)",
    )
    reserve.add_argument(
        "--ncard-index",
        type=int,
        metavar="N",
        default=None,
        help="ncard-list 결과의 N카드 순번 (권장). 지정하면 N카드 할인 승객으로 예약",
    )
    reserve.add_argument(
        "--ncard-no",
        metavar="CARD_NO",
        default=None,
        help="N카드 번호 직접 입력 (비권장: 셸 히스토리에 남을 수 있음)",
    )
    reserve.set_defaults(func=command_reserve)

    # --- ncard-list ---
    ncard_list = commands.add_parser("ncard-list", help="보유한 N카드 목록을 조회합니다")
    ncard_list.set_defaults(func=command_ncard_list)

    # --- ncard-search ---
    ncard_search = commands.add_parser("ncard-search", help="N카드 할인 열차를 조회합니다")
    # Only the trip positionals are registered here; no passenger-count options.
    for name, description in (
        ("dep", "출발역"),
        ("arr", "도착역"),
        ("date", "출발일 YYYYMMDD"),
        ("time", "희망 시작 시각 HHMMSS"),
    ):
        ncard_search.add_argument(name, help=description)
    ncard_search.add_argument(
        "--ncard-index",
        type=int,
        required=True,
        metavar="N",
        help="ncard-list 결과의 N카드 순번 (1부터)",
    )
    ncard_search.add_argument("--limit", type=int, default=5, help="출력할 최대 열차 수")
    add_train_type(ncard_search, "조회할 열차 종류 (기본 ktx)")
    ncard_search.set_defaults(func=command_ncard_search)

    # --- reservations ---
    reservations = commands.add_parser("reservations", help="현재 예약 목록을 조회합니다")
    reservations.set_defaults(func=command_reservations)

    # --- cancel ---
    cancel = commands.add_parser("cancel", help="예약번호로 예약을 취소합니다")
    cancel.add_argument("reservation_id", help="취소할 예약번호")
    cancel.set_defaults(func=command_cancel)

    return root
def main(argv: list[str] | None = None) -> int:
    """Parse command-line arguments and run the selected subcommand.

    Args:
        argv: Argument strings to parse instead of the process arguments.
            ``None`` (the default) makes argparse read ``sys.argv[1:]``, so
            existing ``main()`` callers behave exactly as before; passing a
            list lets the CLI be driven programmatically without touching
            ``sys.argv``.

    Returns:
        ``0`` when the subcommand handler returns normally, ``1`` when it
        raises one of the Korail errors caught below (the error text is
        written to standard error).
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        args.func(args)
    except (KorailError, NeedToLoginError, NoResultsError, SoldOutError) as exc:
        # Only Korail errors become exit code 1 here; a SystemExit raised by a
        # handler is not caught and propagates to the caller.
        print(str(exc), file=sys.stderr)
        return 1
    return 0
# Run the CLI only when executed as a script; main()'s return value is the exit status.
if __name__ == "__main__":
    sys.exit(main())