#!/usr/bin/env python3 from __future__ import annotations import argparse import base64 import json import os import random import re import string import sys import time from functools import reduce try: from Crypto.Cipher import AES from Crypto.Util.Padding import pad except ModuleNotFoundError as exc: AES = None pad = None _CRYPTO_IMPORT_ERROR = exc else: _CRYPTO_IMPORT_ERROR = None try: from korail2 import ( AdultPassenger, ChildPassenger, Korail, KorailError, NeedToLoginError, NoResultsError, Passenger, ReserveOption, SeniorPassenger, SoldOutError, ToddlerPassenger, TrainType, ) import korail2.korail2 as korail_mod except ModuleNotFoundError as exc: _KORAIL_IMPORT_ERROR = exc class KorailError(Exception): pass class NeedToLoginError(KorailError): pass class NoResultsError(KorailError): pass class SoldOutError(KorailError): pass class Passenger: def __init__(self, count: int = 1): self.count = count @staticmethod def reduce(passengers): return passengers def get_dict(self, _: int) -> dict[str, str]: return {} class AdultPassenger(Passenger): pass class ChildPassenger(Passenger): pass class ToddlerPassenger(Passenger): pass class SeniorPassenger(Passenger): pass class ReserveOption: GENERAL_FIRST = "GENERAL_FIRST" GENERAL_ONLY = "GENERAL_ONLY" SPECIAL_FIRST = "SPECIAL_FIRST" SPECIAL_ONLY = "SPECIAL_ONLY" class TrainType: # Fallback constants used only when korail2 is missing so module # import succeeds and ensure_runtime_dependencies() can surface # the install message. Values mirror upstream korail2.TrainType. KTX = "100" KTX_SANCHEON = "100" ITX_SAEMAEUL = "101" SAEMAEUL = "101" MUGUNGHWA = "102" NURIRO = "102" TONGGUEN = "103" ITX_CHEONGCHUN = "104" AIRPORT = "105" ALL = "109" class Korail: def __init__(self, *args, **kwargs): raise ModuleNotFoundError("korail2") class _FallbackKorailModule: EMAIL_REGEX = re.compile(r".+@.+") PHONE_NUMBER_REGEX = re.compile(r"(\d{3})-(\d{3,4})-(\d{4})") korail_mod = _FallbackKorailModule() else: _KORAIL_IMPORT_ERROR = None try: from korail2 import NCardPassenger _NCARD_AVAILABLE = True except ImportError: _NCARD_AVAILABLE = False class NCardPassenger(AdultPassenger): def __init__(self, count=1, card_no='', card='', card_pw='', discount_type='153'): AdultPassenger.__init__(self, count) self.card_no = card_no self.card = card self.card_pw = card_pw self.discount_type = discount_type DEFAULT_USER_AGENT = "Dalvik/2.1.0 (Linux; U; Android 13; SM-S928N Build/UP1A.231005.007)" DYNAPATH_PATHS = [ "/classes/com.korail.mobile.certification.TicketReservation", "/classes/com.korail.mobile.nonMember.NonMemTicket", "/classes/com.korail.mobile.research.TrainResearch", "/classes/com.korail.mobile.research.ResidualSeatsResearch.do", "/classes/com.korail.mobile.seatMovie.ScheduleView", "/classes/com.korail.mobile.seatMovie.ScheduleViewSpecial", "/classes/com.korail.mobile.trn.prcFare.do", "/classes/com.korail.mobile.login.Login", ] KORAIL_CARS_INFO = "https://smart.letskorail.com:443/classes/com.korail.mobile.research.TrainResearch" KORAIL_CAR_DETAIL = "https://smart.letskorail.com:443/classes/com.korail.mobile.research.ResidualSeatsResearch.do" RESERVE_OPTION_MAP = { "general-first": ReserveOption.GENERAL_FIRST, "general-only": ReserveOption.GENERAL_ONLY, "special-first": ReserveOption.SPECIAL_FIRST, "special-only": ReserveOption.SPECIAL_ONLY, } TRAIN_TYPE_MAP = { "ktx": TrainType.KTX, # 100 — KTX/KTX-산천 "itx-saemaeul": TrainType.ITX_SAEMAEUL, # 101 — ITX-새마을 "mugunghwa": TrainType.MUGUNGHWA, # 102 — 무궁화호 "nuriro": TrainType.NURIRO, # 102 — 누리로 "tonggeun": TrainType.TONGGUEN, # 103 — 통근열차 "itx-cheongchun": TrainType.ITX_CHEONGCHUN, # 104 — ITX-청춘 "airport": TrainType.AIRPORT, # 105 — 공항직통 "all": TrainType.ALL, # 109 — 전체 } TRAIN_ID_PREFIX = "ktx:v1:" TRAIN_ID_INVALID_MESSAGE = "train_id is invalid; rerun search and copy a fresh train_id" TRAIN_ID_STALE_MESSAGE = "train_id no longer matches any current search result; rerun search and choose a fresh train_id" TRAIN_ID_FIELDS = ( "train_no", "dep_date", "dep_time", "arr_date", "arr_time", "run_date", "train_group", "dep_code", "arr_code", ) PHONE_NUMBER_DIGITS_REGEX = re.compile(r"^01\d{8,9}$") ROOM_CLASS_MAP = { "general": "1", "special": "2", } ROOM_CLASS_NAME = { "1": "일반실", "2": "특실", } SEAT_DIRECTION_NAME = { "009": "순방향", "010": "역방향", } SEAT_POSITION_NAME = { "011": "1인", "012": "창측", "013": "내측", } SEAT_TYPE_NAME = { "015": "일반석", "018": "2층석", "019": "유아동반석", "021": "휠체어석", "023": "4인동반석", "027": "4인석", "028": "전동휠체어석", "032": "자전거", "052": "대피도우미", } POWER_OUTLET_ROWS = {1, 3, 5, 7, 10, 12, 14, 15} POWER_OUTLET_DIRECT_COLUMNS = {"A", "D"} POWER_OUTLET_ADJACENT_COLUMNS = {"B", "C"} def is_phone_login_id(korail_id: str) -> bool: return bool(korail_mod.PHONE_NUMBER_REGEX.fullmatch(korail_id) or PHONE_NUMBER_DIGITS_REGEX.fullmatch(korail_id)) def ensure_runtime_dependencies() -> None: missing: list[str] = [] if _KORAIL_IMPORT_ERROR is not None: missing.append("korail2") if _CRYPTO_IMPORT_ERROR is not None: missing.append("pycryptodome") if missing: install_command = f"python3 -m pip install {' '.join(missing)}" raise SystemExit( "scripts/ktx_booking.py requires additional Python packages " f"({', '.join(missing)}). Install them before running this helper: {install_command}" ) class DynaPathMasterEngine: APP_ID = "com.korail.talk" AS_VALUE = "%5B38ff229cb34c7dda8e28220a2d750cce%5D" DEVICE_MODEL = "SM-S928N" OS_TYPE = "Android" SDK_VERSION = "v1" def __init__(self) -> None: self.table = "3FE9jgRD4KdCyuawklqGJYmvfMn15P7US8XbxeLQtWT6OicBAopINs2Vh0HZrz" self.i8 = 161 self.i9 = 30 self.i10 = 2 self.app_start_ts = str(int(time.time() * 1000)) def string2xa1s(self, data: str) -> list[int]: result: list[int] = [] idx = 0 while idx < len(data): codepoint = ord(data[idx]) idx += 1 if codepoint < 128: result.append(codepoint) elif codepoint < 2048: result.append(128 | ((codepoint >> 7) & 15)) result.append(codepoint & 127) elif codepoint >= 262144: result.append(160) result.append((codepoint >> 14) & 127) result.append((codepoint >> 7) & 127) result.append(codepoint & 127) elif (63488 & codepoint) != 55296: result.append(((codepoint >> 14) & 15) | 144) result.append((codepoint >> 7) & 127) result.append(codepoint & 127) return result def make_key(self, key: str) -> int: total = 0 for char in key: codepoint = ord(char) bit = 32768 for _ in range(16): if bit & codepoint: break bit >>= 1 total = (total * (bit << 1)) + codepoint return total def internal_char(self, base_table: str, remainder: int, current: str) -> str: seen = 0 for char in base_table: if char in current: continue if seen == remainder: return char seen += 1 return " " def make_encode_table(self, number: int, encode_size: int, base_table: str) -> str: chars = "" temp = number for index in range(encode_size): divisor = encode_size - index remainder = temp % divisor chars += self.internal_char(base_table, remainder, chars) temp //= divisor return chars def encode_normal_be(self, data: str, table: str) -> str: values = self.string2xa1s(data) output: list[str] = [] digits = [0] * (self.i10 + 1) idx = 0 tail = len(values) % self.i10 body_size = len(values) - tail while idx < body_size: value = 0 for _ in range(self.i10): value = (value * self.i8) + values[idx] idx += 1 for digit_index in range(self.i10 + 1): digits[digit_index] = value % self.i9 value //= self.i9 for digit_index in range(self.i10, -1, -1): output.append(table[digits[digit_index]]) if tail > 0: value = 0 for _ in range(tail): value = (value * self.i8) + values[idx] idx += 1 for digit_index in range(tail + 1): digits[digit_index] = value % self.i9 value //= self.i9 while tail >= 0: output.append(table[digits[tail]]) tail -= 1 return "".join(output) def generate_token(self, device_id: str, timestamp_ms: int, nonce: str) -> str: plaintext = ( f"ai={self.APP_ID}&di={device_id}&as={self.AS_VALUE}&su=false&dbg=false&emu=false&hk=false" f"&it={self.app_start_ts}&ts={timestamp_ms}&rt=0&os=13&dm={self.DEVICE_MODEL}&st={self.OS_TYPE}&sv={self.SDK_VERSION}" ) dyn_key = f"v1+{nonce}+{timestamp_ms}" key_encoded = self.encode_normal_be(dyn_key, self.table) table = self.make_encode_table(self.make_key(dyn_key), self.i9, self.table) body_encoded = self.encode_normal_be(plaintext, table) return f"bEeEP{self.table[len(key_encoded)]}{key_encoded}{body_encoded}" class PatchedKorail(Korail): _device = "AD" _version = "250601002" _sid_key = b"2485dd54d9deaa36" _device_id = "558a4f02041657ea" def __init__(self, korail_id: str, korail_pw: str, auto_login: bool = True, want_feedback: bool = False): import requests self._session = requests.session() self._session.headers.update({"User-Agent": DEFAULT_USER_AGENT}) self._engine = DynaPathMasterEngine() super().__init__(korail_id, korail_pw, auto_login=False, want_feedback=want_feedback) self._session.headers.update({"User-Agent": DEFAULT_USER_AGENT}) if auto_login: self.login(korail_id, korail_pw) def _generate_sid(self, timestamp_ms: int) -> str: ensure_runtime_dependencies() plaintext = f"{self._device}{timestamp_ms}".encode("utf-8") cipher = AES.new(self._sid_key, AES.MODE_CBC, iv=self._sid_key) return base64.b64encode(cipher.encrypt(pad(plaintext, 16))).decode("utf-8") + "\n" def _auth_headers_and_sid(self, url: str) -> tuple[dict[str, str], str | None]: headers: dict[str, str] = {} sid = None if any(path in url for path in DYNAPATH_PATHS): timestamp_ms = int(time.time() * 1000) nonce = "".join(random.choices(string.ascii_uppercase + string.digits, k=4)) headers["x-dynapath-m-token"] = self._engine.generate_token(self._device_id, timestamp_ms, nonce) sid = self._generate_sid(timestamp_ms) return headers, sid def login(self, korail_id: str | None = None, korail_pw: str | None = None) -> bool: if korail_id is None: korail_id = self.korail_id else: self.korail_id = korail_id if korail_pw is None: korail_pw = self.korail_pw else: self.korail_pw = korail_pw if korail_mod.EMAIL_REGEX.match(korail_id): input_flag = "5" elif is_phone_login_id(korail_id): input_flag = "4" else: input_flag = "2" headers, sid = self._auth_headers_and_sid(korail_mod.KORAIL_LOGIN) payload = { "Device": self._device, "Version": self._version, "txtInputFlg": input_flag, "txtMemberNo": korail_id, "txtPwd": self._Korail__enc_password(korail_pw), "idx": self._idx, } if sid: payload["Sid"] = sid response = self._session.post(korail_mod.KORAIL_LOGIN, data=payload, headers=headers) data = json.loads(response.text) if data["strResult"] == "SUCC" and data.get("strMbCrdNo") is not None: self._key = data["Key"] self.membership_number = data["strMbCrdNo"] self.name = data["strCustNm"] self.email = data["strEmailAdr"] self.logined = True return True self.logined = False return False def search_train_details( self, dep: str, arr: str, date: str | None = None, time_value: str | None = None, train_type: str = TrainType.ALL, passengers: list[Passenger] | None = None, include_no_seats: bool = False, include_waiting_list: bool = False, ): kst_now = korail_mod.datetime.now(korail_mod.timezone.utc) + korail_mod.timedelta(hours=9) if date is None: date = kst_now.strftime("%Y%m%d") if time_value is None: time_value = kst_now.strftime("%H%M%S") if passengers is None: passengers = [AdultPassenger()] passengers = Passenger.reduce(passengers) adult_count = reduce(lambda total, passenger: total + passenger.count, [p for p in passengers if isinstance(p, AdultPassenger)], 0) child_count = reduce(lambda total, passenger: total + passenger.count, [p for p in passengers if isinstance(p, ChildPassenger)], 0) toddler_count = reduce( lambda total, passenger: total + passenger.count, [p for p in passengers if isinstance(p, ToddlerPassenger)], 0, ) senior_count = reduce(lambda total, passenger: total + passenger.count, [p for p in passengers if isinstance(p, SeniorPassenger)], 0) headers, sid = self._auth_headers_and_sid(korail_mod.KORAIL_SEARCH_SCHEDULE) payload = { "Device": self._device, "radJobId": "1", "selGoTrain": train_type, "txtCardPsgCnt": "0", "txtGdNo": "", "txtGoAbrdDt": date, "txtGoEnd": arr, "txtGoHour": time_value, "txtGoStart": dep, "txtJobDv": "", "txtMenuId": "11", "txtPsgFlg_1": adult_count, "txtPsgFlg_2": child_count, "txtPsgFlg_8": toddler_count, "txtPsgFlg_3": senior_count, "txtPsgFlg_4": "0", "txtPsgFlg_5": "0", "txtSeatAttCd_2": "000", "txtSeatAttCd_3": "000", "txtSeatAttCd_4": "015", "txtTrnGpCd": train_type, "Version": self._version, } if sid: payload["Sid"] = sid response = self._session.post(korail_mod.KORAIL_SEARCH_SCHEDULE, params=payload, headers=headers) data = json.loads(response.text) if self._result_check(data): train_infos = data["trn_infos"]["trn_info"] if isinstance(train_infos, dict): train_infos = [train_infos] details = [(korail_mod.Train(info), info) for info in train_infos] details = [(train, info) for train, info in details if train.dep_name == dep and train.arr_name == arr] filters = [lambda train: train.has_seat()] if include_no_seats: filters.append(lambda train: not train.has_seat()) if include_waiting_list: filters.append(lambda train: train.has_waiting_list()) details = [(train, info) for train, info in details if any(check(train) for check in filters)] if not details: raise NoResultsError() return details def search_train( self, dep: str, arr: str, date: str | None = None, time_value: str | None = None, train_type: str = TrainType.ALL, passengers: list[Passenger] | None = None, include_no_seats: bool = False, include_waiting_list: bool = False, ): return [ train for train, _ in self.search_train_details( dep, arr, date, time_value, train_type=train_type, passengers=passengers, include_no_seats=include_no_seats, include_waiting_list=include_waiting_list, ) ] def train_cars(self, raw_train: dict[str, object], passenger_count: int = 1, room_class: str = "1") -> list[dict[str, object]]: payload = self._seat_lookup_payload(raw_train, passenger_count, room_class) headers, sid = self._auth_headers_and_sid(KORAIL_CARS_INFO) if sid: payload["Sid"] = sid response = self._session.post(KORAIL_CARS_INFO, data=payload, headers=headers) data = json.loads(response.text) if self._result_check(data): cars = data.get("srcar_infos", {}).get("srcar_info", []) if isinstance(cars, dict): cars = [cars] return cars return [] def car_seats( self, raw_train: dict[str, object], car_no: str, passenger_count: int = 1, room_class: str = "1", ) -> dict[str, object]: payload = self._seat_lookup_payload(raw_train, passenger_count, room_class) payload["txtSrcarNo"] = car_no headers, sid = self._auth_headers_and_sid(KORAIL_CAR_DETAIL) if sid: payload["Sid"] = sid response = self._session.post(KORAIL_CAR_DETAIL, data=payload, headers=headers) data = json.loads(response.text) if self._result_check(data): return data return {} def _seat_lookup_payload(self, raw_train: dict[str, object], passenger_count: int, room_class: str) -> dict[str, object]: return { "Device": self._device, "Version": self._version, "Key": self._key, "txtArvRsStnCd": raw_train.get("h_arv_rs_stn_cd", ""), "txtArvStnRunOrdr": raw_train.get("h_arv_stn_run_ordr", ""), "txtDptDt": raw_train.get("h_dpt_dt", ""), "txtDptRsStnCd": raw_train.get("h_dpt_rs_stn_cd", ""), "txtDptStnRunOrdr": raw_train.get("h_dpt_stn_run_ordr", ""), "txtGdNo": "", "txtMenuId": "11", "txtPsrmClCd": room_class, "txtRunDt": raw_train.get("h_run_dt", ""), "txtSeatAttCd": "015", "txtTotPsgCnt": str(passenger_count), "txtTrnClsfCd": raw_train.get("h_trn_clsf_cd", ""), "txtTrnGpCd": raw_train.get("h_trn_gp_cd", ""), "txtTrnNo": raw_train.get("h_trn_no", ""), } def reserve(self, train, passengers=None, option=ReserveOption.GENERAL_FIRST, try_waiting=False): reserving_seat = True try: if not train.has_seat(): raise SoldOutError() if option == ReserveOption.GENERAL_ONLY: if train.has_general_seat(): seat_type = "1" else: raise SoldOutError() elif option == ReserveOption.SPECIAL_ONLY: if train.has_special_seat(): seat_type = "2" else: raise SoldOutError() elif option == ReserveOption.GENERAL_FIRST: seat_type = "1" if train.has_general_seat() else "2" elif option == ReserveOption.SPECIAL_FIRST: seat_type = "2" if train.has_special_seat() else "1" else: raise ValueError(f"unsupported reserve option: {option}") except SoldOutError: if try_waiting and option != ReserveOption.SPECIAL_ONLY and train.has_general_waiting_list(): reserving_seat = False seat_type = "1" else: raise if passengers is None: passengers = [AdultPassenger()] passengers = Passenger.reduce(passengers) passenger_count = reduce(lambda total, passenger: total + passenger.count, passengers, 0) headers, sid = self._auth_headers_and_sid(korail_mod.KORAIL_TICKETRESERVATION) payload = { "Device": self._device, "Version": self._version, "Key": self._key, "txtGdNo": "", "txtJobId": "1101" if reserving_seat else "1102", "txtTotPsgCnt": passenger_count, "txtSeatAttCd1": "000", "txtSeatAttCd2": "000", "txtSeatAttCd3": "000", "txtSeatAttCd4": "015", "txtSeatAttCd5": "000", "hidFreeFlg": "N", "txtStndFlg": "N", "txtMenuId": "11", "txtSrcarCnt": "0", "txtJrnyCnt": "1", "txtJrnySqno1": "001", "txtJrnyTpCd1": "11", "txtDptDt1": train.dep_date, "txtDptRsStnCd1": train.dep_code, "txtDptTm1": train.dep_time, "txtArvRsStnCd1": train.arr_code, "txtTrnNo1": train.train_no, "txtRunDt1": train.run_date, "txtTrnClsfCd1": train.train_type, "txtPsrmClCd1": seat_type, "txtTrnGpCd1": train.train_group, "txtChgFlg1": "", "txtJrnySqno2": "", "txtJrnyTpCd2": "", "txtDptDt2": "", "txtDptRsStnCd2": "", "txtDptTm2": "", "txtArvRsStnCd2": "", "txtTrnNo2": "", "txtRunDt2": "", "txtTrnClsfCd2": "", "txtPsrmClCd2": "", "txtChgFlg2": "", } if sid: payload["Sid"] = sid for index, passenger in enumerate(passengers, start=1): payload.update(passenger.get_dict(index)) response = self._session.get(korail_mod.KORAIL_TICKETRESERVATION, params=payload, headers=headers) data = json.loads(response.text) if self._result_check(data): reservation_id = data["h_pnr_no"] matches = [reservation for reservation in self.reservations() if reservation.rsv_id == reservation_id] if len(matches) == 1: return matches[0] raise KorailError(f"reservation {reservation_id} was created but could not be reloaded") def reservations(self): payload = {"Device": self._device, "Version": self._version, "Key": self._key} response = self._session.get(korail_mod.KORAIL_MYRESERVATIONLIST, params=payload) data = json.loads(response.text) try: if self._result_check(data): return [ korail_mod.Reservation(train_info) for journey in data["jrny_infos"]["jrny_info"] for train_info in journey["train_infos"]["train_info"] ] except NoResultsError: return [] return [] def cancel(self, reservation): assert isinstance(reservation, korail_mod.Reservation) payload = { "Device": self._device, "Version": self._version, "Key": self._key, "txtPnrNo": reservation.rsv_id, "txtJrnySqno": reservation.journey_no, "txtJrnyCnt": reservation.journey_cnt, "hidRsvChgNo": reservation.rsv_chg_no, } response = self._session.get(korail_mod.KORAIL_CANCEL, params=payload) data = json.loads(response.text) if self._result_check(data): return True return False def parse_passengers(args: argparse.Namespace) -> list[Passenger]: passengers: list[Passenger] = [] if args.adults: passengers.append(AdultPassenger(args.adults)) if args.children: passengers.append(ChildPassenger(args.children)) if args.toddlers: passengers.append(ToddlerPassenger(args.toddlers)) if args.seniors: passengers.append(SeniorPassenger(args.seniors)) if not passengers: passengers.append(AdultPassenger()) return passengers def build_train_id_payload(train) -> dict[str, str]: return {field: getattr(train, field) for field in TRAIN_ID_FIELDS} def build_train_id(train) -> str: payload = json.dumps(build_train_id_payload(train), ensure_ascii=False, separators=(",", ":")).encode("utf-8") encoded = base64.urlsafe_b64encode(payload).decode("ascii").rstrip("=") return f"{TRAIN_ID_PREFIX}{encoded}" def parse_train_id(train_id: str) -> dict[str, str]: if not train_id.startswith(TRAIN_ID_PREFIX): raise SystemExit("train_id must start with ktx:v1:") encoded = train_id.removeprefix(TRAIN_ID_PREFIX) padded = encoded + ("=" * ((4 - len(encoded) % 4) % 4)) try: payload = json.loads(base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")) except (ValueError, json.JSONDecodeError, UnicodeDecodeError) as exc: raise SystemExit(TRAIN_ID_INVALID_MESSAGE) from exc if not isinstance(payload, dict): raise SystemExit(TRAIN_ID_INVALID_MESSAGE) invalid_fields = [field for field in TRAIN_ID_FIELDS if not isinstance(payload.get(field), str) or not payload[field]] if invalid_fields: raise SystemExit(TRAIN_ID_INVALID_MESSAGE) return {field: payload[field] for field in TRAIN_ID_FIELDS} def find_train_by_id(trains, train_id: str): expected = parse_train_id(train_id) for train in trains: if build_train_id_payload(train) == expected: return train return None def find_train_detail_by_id(details, train_id: str): expected = parse_train_id(train_id) for train, raw_train in details: if build_train_id_payload(train) == expected: return train, raw_train return None def normalize_train(train, index: int) -> dict[str, object]: return { "index": index, "train_id": build_train_id(train), "train_no": train.train_no, "train_type": train.train_type_name, "dep_name": train.dep_name, "dep_date": train.dep_date, "dep_time": train.dep_time, "arr_name": train.arr_name, "arr_date": train.arr_date, "arr_time": train.arr_time, "has_general_seat": train.has_general_seat(), "has_special_seat": train.has_special_seat(), "has_waiting_list": train.has_waiting_list(), "description": str(train), } def parse_seat_label(seat_label: str) -> tuple[int | None, str]: match = re.match(r"^(\d+)([A-Za-z])$", seat_label or "") if not match: return None, "" return int(match.group(1)), match.group(2).upper() def power_outlet_match(seat_label: str) -> str: row, column = parse_seat_label(seat_label) if row not in POWER_OUTLET_ROWS: return "none" if column in POWER_OUTLET_DIRECT_COLUMNS: return "direct" if column in POWER_OUTLET_ADJACENT_COLUMNS: return "adjacent" return "none" def normalize_seat(raw_seat: dict[str, object]) -> dict[str, object]: seat_label = str(raw_seat.get("h_con_seat_no", "")) return { "seat": seat_label, "seat_no": str(raw_seat.get("h_seat_no", "")), "available": raw_seat.get("h_sale_psb_flg") == "Y", "direction": SEAT_DIRECTION_NAME.get(str(raw_seat.get("h_for_rev_dir_dv", "")), str(raw_seat.get("h_for_rev_dir_dv", ""))), "position": SEAT_POSITION_NAME.get(str(raw_seat.get("h_sigl_win_in_dv", "")), str(raw_seat.get("h_sigl_win_in_dv", ""))), "seat_type": SEAT_TYPE_NAME.get(str(raw_seat.get("h_dmd_seat_att", "")), str(raw_seat.get("h_dmd_seat_att", ""))), "near_door": raw_seat.get("h_door_nbor_flg") == "Y", "power_outlet": power_outlet_match(seat_label), } def validate_raw_seat(raw_seat: dict[str, object]) -> None: required_fields = ("h_con_seat_no", "h_seat_no", "h_sale_psb_flg") if any(raw_seat.get(field) in (None, "") for field in required_fields): raise ValueError("seat row is missing required fields") def parse_nonnegative_int_field(raw: object, field_name: str) -> int: text = "" if raw is None else str(raw) if not text.isdigit(): raise ValueError(f"{field_name} is not a non-negative integer") return int(text) def normalize_car(raw_car: object) -> dict[str, object]: if not isinstance(raw_car, dict): raise ValueError("car row is not an object") return { "car_no": parse_nonnegative_int_field(raw_car.get("h_srcar_no"), "h_srcar_no"), "car_no_raw": str(raw_car.get("h_srcar_no", "")), "room_class": ROOM_CLASS_NAME.get(str(raw_car.get("h_psrm_cl_cd", "")), str(raw_car.get("h_psrm_cl_nm", ""))), "room_class_code": str(raw_car.get("h_psrm_cl_cd", "")), "total_seats": parse_nonnegative_int_field(raw_car.get("h_seat_cnt"), "h_seat_cnt"), "remaining_seats": parse_nonnegative_int_field(raw_car.get("h_rest_seat_cnt"), "h_rest_seat_cnt"), } def car_center_priority(car: dict[str, object], car_numbers: list[int]) -> tuple[float, int]: car_no = int(car["car_no"]) if not car_numbers: return (0.0, car_no) center = (min(car_numbers) + max(car_numbers)) / 2 return (abs(car_no - center), car_no) def sort_cars_for_booking(cars: list[dict[str, object]]) -> list[dict[str, object]]: car_numbers = [int(car["car_no"]) for car in cars] return sorted(cars, key=lambda car: car_center_priority(car, car_numbers)) def seat_preference_key(seat: dict[str, object]) -> tuple[int, int, int, str]: power_rank = {"direct": 0, "adjacent": 1, "none": 2}.get(str(seat.get("power_outlet")), 2) direction_rank = 0 if seat.get("direction") == "순방향" else 1 row, column = parse_seat_label(str(seat.get("seat", ""))) return (power_rank, direction_rank, row if row is not None else 999, column) def sort_seats_for_booking(seats: list[dict[str, object]]) -> list[dict[str, object]]: return sorted(seats, key=seat_preference_key) def mask_identifier(value: object, visible: int = 4) -> str: text = str(value or "") if not text: return "" if len(text) <= visible: return "*" * len(text) return f"{'*' * (len(text) - visible)}{text[-visible:]}" def normalize_reservation(reservation) -> dict[str, object]: return { "reservation_id": reservation.rsv_id, "train_no": reservation.train_no, "train_type": reservation.train_type_name, "dep_name": reservation.dep_name, "dep_date": reservation.dep_date, "dep_time": reservation.dep_time, "arr_name": reservation.arr_name, "arr_date": reservation.arr_date, "arr_time": reservation.arr_time, "seat_count": reservation.seat_no_count, "price": reservation.price, "buy_limit_date": reservation.buy_limit_date, "buy_limit_time": reservation.buy_limit_time, "journey_no": reservation.journey_no, "journey_cnt": reservation.journey_cnt, "rsv_chg_no": reservation.rsv_chg_no, "description": str(reservation), } def print_json(payload: dict[str, object]) -> None: print(json.dumps(payload, ensure_ascii=False, indent=2)) def build_client() -> PatchedKorail: ensure_runtime_dependencies() korail_id = os.environ.get("KSKILL_KTX_ID") korail_pw = os.environ.get("KSKILL_KTX_PASSWORD") if not korail_id or not korail_pw: raise SystemExit( "이 작업에는 KSKILL_KTX_ID, KSKILL_KTX_PASSWORD 환경변수가 필요합니다. " "환경변수가 설정되어 있지 않으면 ~/.config/k-skill/secrets.env 에 추가하거나 " "에이전트의 secret vault에서 주입해 주세요." ) client = PatchedKorail(korail_id, korail_pw) if not client.logined: raise NeedToLoginError() return client def command_search(args: argparse.Namespace) -> None: client = build_client() passengers = parse_passengers(args) trains = client.search_train( args.dep, args.arr, args.date, args.time, train_type=TRAIN_TYPE_MAP[args.train_type], passengers=passengers, include_no_seats=args.include_no_seats, include_waiting_list=args.include_waiting_list, ) visible_trains = trains[: args.limit] print_json({ "count": len(visible_trains), "trains": [normalize_train(train, index) for index, train in enumerate(visible_trains, start=1)], }) def command_seats(args: argparse.Namespace) -> None: client = build_client() passengers = parse_passengers(args) passenger_count = sum(passenger.count for passenger in Passenger.reduce(passengers)) details = client.search_train_details( args.dep, args.arr, args.date, args.time, train_type=TRAIN_TYPE_MAP[args.train_type], passengers=passengers, include_no_seats=True, include_waiting_list=True, ) match = find_train_detail_by_id(details, args.train_id) if match is None: raise SystemExit(TRAIN_ID_STALE_MESSAGE) train, raw_train = match room_class = ROOM_CLASS_MAP[args.room] seat_car_unavailable = f"seat car data is unavailable for {args.room}; retry search or choose another train" try: cars = [normalize_car(car) for car in client.train_cars(raw_train, passenger_count, room_class)] except (TypeError, ValueError, AttributeError) as exc: raise SystemExit(seat_car_unavailable) from exc if not cars: raise SystemExit(seat_car_unavailable) if args.car_no is not None: cars = [car for car in cars if car["car_no"] == args.car_no] if not cars: raise SystemExit(f"car_no {args.car_no} is not available for {args.room}") else: cars = sort_cars_for_booking(cars) car_payloads: list[dict[str, object]] = [] for car in cars: raw = client.car_seats(raw_train, str(car["car_no_raw"]), passenger_count, room_class) seat_infos = raw.get("seat_infos") if isinstance(raw, dict) else None seat_detail_unavailable = ( f"seat detail data is unavailable for car_no {car['car_no']}; " "retry search or choose another train" ) if not isinstance(seat_infos, dict): raise SystemExit(seat_detail_unavailable) if "seat_info" not in seat_infos: raise SystemExit(seat_detail_unavailable) raw_seats = seat_infos["seat_info"] if isinstance(raw_seats, dict): raw_seats = [raw_seats] if not isinstance(raw_seats, list): raise SystemExit(seat_detail_unavailable) if any(not isinstance(seat, dict) for seat in raw_seats): raise SystemExit(seat_detail_unavailable) try: for raw_seat in raw_seats: validate_raw_seat(raw_seat) except ValueError as exc: raise SystemExit(seat_detail_unavailable) from exc remaining_seats = car["remaining_seats"] if not isinstance(remaining_seats, int): raise SystemExit(seat_detail_unavailable) if not raw_seats and remaining_seats > 0: raise SystemExit(seat_detail_unavailable) all_seats = [normalize_seat(seat) for seat in raw_seats if seat.get("h_con_seat_no") != "0A"] if not all_seats and remaining_seats > 0: raise SystemExit(seat_detail_unavailable) seats = sort_seats_for_booking(all_seats) if args.available_only: seats = [seat for seat in seats if seat["available"]] if args.power_only: seats = [seat for seat in seats if seat["power_outlet"] != "none"] available_seats = [seat for seat in seats if seat["available"]] seats = seats[: args.limit] car_payload = dict(car) car_payload["available_seat_count"] = len(available_seats) car_payload["available_seats"] = [seat["seat"] for seat in available_seats] car_payload["shown_seat_count"] = len(seats) car_payload["seats"] = seats car_payloads.append(car_payload) print_json({ "train": normalize_train(train, 1), "room": args.room, "passenger_count": passenger_count, "available_only": args.available_only, "power_only": args.power_only, "cars": car_payloads, }) def ensure_ncard_available() -> None: if not _NCARD_AVAILABLE: raise SystemExit( "N카드 기능을 사용하려면 korail2-ncard 패키지가 필요합니다: " "pip install korail2-ncard pycryptodome" ) def resolve_ncard_no(client: PatchedKorail, ncard_index: int | None, ncard_no: str | None) -> str | None: if ncard_index is None and not ncard_no: return None ensure_ncard_available() if ncard_index is None: return ncard_no ncards = client.owned_ncards() if not ncards: raise SystemExit("보유한 N카드가 없습니다.") if ncard_index < 1 or ncard_index > len(ncards): raise SystemExit(f"ncard-index는 1~{len(ncards)} 사이여야 합니다.") selected = ncards[ncard_index - 1] selected_no = getattr(selected, "discount_card_no", None) if not selected_no: raise SystemExit("선택한 N카드에서 카드 번호를 확인할 수 없습니다.") return selected_no def command_reserve(args: argparse.Namespace) -> None: client = build_client() ncard_no = resolve_ncard_no( client, getattr(args, "ncard_index", None), getattr(args, "ncard_no", None), ) if ncard_no: passengers = [NCardPassenger(card_no=ncard_no)] else: passengers = parse_passengers(args) include_waiting_list = args.include_waiting_list or args.try_waiting trains = client.search_train( args.dep, args.arr, args.date, args.time, train_type=TRAIN_TYPE_MAP[args.train_type], passengers=passengers, include_no_seats=args.include_no_seats, include_waiting_list=include_waiting_list, ) selected_train = find_train_by_id(trains, args.train_id) if selected_train is None: raise SystemExit(TRAIN_ID_STALE_MESSAGE) reservation = client.reserve( selected_train, passengers=passengers, option=RESERVE_OPTION_MAP[args.seat_option], try_waiting=args.try_waiting, ) print_json({"reservation": normalize_reservation(reservation)}) def command_reservations(_: argparse.Namespace) -> None: client = build_client() reservations = client.reservations() print_json({ "count": len(reservations), "reservations": [normalize_reservation(reservation) for reservation in reservations], }) def normalize_ncard(ncard, index: int) -> dict[str, object]: return { "index": index, "card_no": mask_identifier(getattr(ncard, "discount_card_no", "")), "card_no_masked": True, "ticket_kind": ncard.ticket_kind_name or "", "dep_name": ncard.dep_name or "", "arr_name": ncard.arr_name or "", "valid": ncard.valid or "", "description": str(ncard), } def normalize_ncard_train(train, index: int) -> dict[str, object]: base = normalize_train(train, index) base["price"] = getattr(train, "price", None) base["discount_name"] = getattr(train, "discount_name", None) base["general_remaining_seats"] = getattr(train, "general_remaining_seats", None) base["standing_remaining_seats"] = getattr(train, "standing_remaining_seats", None) return base def command_ncard_list(args: argparse.Namespace) -> None: ensure_ncard_available() client = build_client() ncards = client.owned_ncards() print_json({ "count": len(ncards), "ncards": [normalize_ncard(ncard, index) for index, ncard in enumerate(ncards, start=1)], }) def command_ncard_search(args: argparse.Namespace) -> None: ensure_ncard_available() client = build_client() ncards = client.owned_ncards() if not ncards: raise SystemExit("보유한 N카드가 없습니다.") if args.ncard_index < 1 or args.ncard_index > len(ncards): raise SystemExit(f"ncard-index는 1~{len(ncards)} 사이여야 합니다.") ncard = ncards[args.ncard_index - 1] trains = client.search_owned_ncard_trains( ncard, dep=args.dep, arr=args.arr, date=args.date, time=args.time, train_type=TRAIN_TYPE_MAP[args.train_type], ) visible_trains = trains[: args.limit] print_json({ "count": len(visible_trains), "ncard": normalize_ncard(ncard, args.ncard_index), "trains": [normalize_ncard_train(train, index) for index, train in enumerate(visible_trains, start=1)], }) def command_cancel(args: argparse.Namespace) -> None: client = build_client() reservations = client.reservations() match = next((reservation for reservation in reservations if reservation.rsv_id == args.reservation_id), None) if match is None: raise SystemExit(f"reservation {args.reservation_id} not found") client.cancel(match) print_json({"cancelled": True, "reservation_id": args.reservation_id}) def add_common_trip_args(parser: argparse.ArgumentParser) -> None: parser.add_argument("dep", help="출발역") parser.add_argument("arr", help="도착역") parser.add_argument("date", help="출발일 YYYYMMDD") parser.add_argument("time", help="희망 시작 시각 HHMMSS") parser.add_argument("--adults", type=int, default=1, help="성인 수") parser.add_argument("--children", type=int, default=0, help="어린이 수") parser.add_argument("--toddlers", type=int, default=0, help="유아 수") parser.add_argument("--seniors", type=int, default=0, help="경로 수") def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Patched KTX/Korail booking helper for k-skill") subparsers = parser.add_subparsers(dest="command", required=True) search_parser = subparsers.add_parser("search", help="KTX/Korail 열차를 조회합니다") add_common_trip_args(search_parser) search_parser.add_argument("--limit", type=int, default=5, help="출력할 최대 열차 수") search_parser.add_argument( "--train-type", choices=sorted(TRAIN_TYPE_MAP), default="ktx", help="조회할 열차 종류 (기본 ktx). ITX-청춘 노선은 itx-cheongchun, 무궁화는 mugunghwa, 전체는 all 사용", ) search_parser.add_argument("--include-no-seats", action="store_true", help="매진 열차도 포함") search_parser.add_argument("--include-waiting-list", action="store_true", help="예약 대기 가능 열차도 포함") search_parser.set_defaults(func=command_search) seats_parser = subparsers.add_parser("seats", help="조회 결과 중 하나의 호차별 좌석번호를 조회합니다") add_common_trip_args(seats_parser) seats_parser.add_argument("--train-id", required=True, help="search 결과에서 복사한 stable train_id") seats_parser.add_argument( "--room", choices=sorted(ROOM_CLASS_MAP), default="general", help="좌석을 조회할 객실 등급 (기본 general)", ) seats_parser.add_argument( "--train-type", choices=sorted(TRAIN_TYPE_MAP), default="ktx", help="재조회할 열차 종류 — search 단계에서 사용한 값과 동일하게 지정 (기본 ktx)", ) seats_parser.add_argument("--car-no", type=int, default=None, help="특정 호차만 조회") seats_parser.add_argument( "--available-only", "--remaining-only", dest="available_only", action="store_true", help="예약 가능한/남은 좌석만 출력", ) seats_parser.add_argument("--power-only", action="store_true", help="콘센트 꿀팁 좌석(direct/adjacent)만 출력") seats_parser.add_argument("--limit", type=int, default=100, help="호차별 출력할 최대 좌석 수") seats_parser.set_defaults(func=command_seats) reserve_parser = subparsers.add_parser("reserve", help="조회 결과 중 하나를 예약합니다") add_common_trip_args(reserve_parser) reserve_parser.add_argument("--train-id", required=True, help="search 결과에서 복사한 stable train_id") reserve_parser.add_argument("--seat-option", choices=sorted(RESERVE_OPTION_MAP), default="general-first") reserve_parser.add_argument( "--train-type", choices=sorted(TRAIN_TYPE_MAP), default="ktx", help="재조회할 열차 종류 — search 단계에서 사용한 값과 동일하게 지정 (기본 ktx)", ) reserve_parser.add_argument("--include-no-seats", action="store_true", help="검색 시 매진 열차도 포함") reserve_parser.add_argument("--include-waiting-list", action="store_true", help="검색 시 예약대기 열차도 포함") reserve_parser.add_argument( "--try-waiting", action="store_true", help="좌석이 없으면 예약대기를 시도 (reserve 재조회 시 예약대기 열차 자동 포함)", ) reserve_parser.add_argument( "--ncard-index", type=int, metavar="N", default=None, help="ncard-list 결과의 N카드 순번 (권장). 지정하면 N카드 할인 승객으로 예약", ) reserve_parser.add_argument( "--ncard-no", metavar="CARD_NO", default=None, help="N카드 번호 직접 입력 (비권장: 셸 히스토리에 남을 수 있음)", ) reserve_parser.set_defaults(func=command_reserve) ncard_list_parser = subparsers.add_parser("ncard-list", help="보유한 N카드 목록을 조회합니다") ncard_list_parser.set_defaults(func=command_ncard_list) ncard_search_parser = subparsers.add_parser("ncard-search", help="N카드 할인 열차를 조회합니다") ncard_search_parser.add_argument("dep", help="출발역") ncard_search_parser.add_argument("arr", help="도착역") ncard_search_parser.add_argument("date", help="출발일 YYYYMMDD") ncard_search_parser.add_argument("time", help="희망 시작 시각 HHMMSS") ncard_search_parser.add_argument( "--ncard-index", type=int, required=True, metavar="N", help="ncard-list 결과의 N카드 순번 (1부터)", ) ncard_search_parser.add_argument("--limit", type=int, default=5, help="출력할 최대 열차 수") ncard_search_parser.add_argument( "--train-type", choices=sorted(TRAIN_TYPE_MAP), default="ktx", help="조회할 열차 종류 (기본 ktx)", ) ncard_search_parser.set_defaults(func=command_ncard_search) reservations_parser = subparsers.add_parser("reservations", help="현재 예약 목록을 조회합니다") reservations_parser.set_defaults(func=command_reservations) cancel_parser = subparsers.add_parser("cancel", help="예약번호로 예약을 취소합니다") cancel_parser.add_argument("reservation_id", help="취소할 예약번호") cancel_parser.set_defaults(func=command_cancel) return parser def main() -> int: parser = build_parser() args = parser.parse_args() try: args.func(args) except (KorailError, NeedToLoginError, NoResultsError, SoldOutError) as exc: print(str(exc), file=sys.stderr) return 1 return 0 if __name__ == "__main__": raise SystemExit(main())