From 939c3e891919f66fa3a03eaaea3995afd4d99b50 Mon Sep 17 00:00:00 2001 From: Yusur Princeps Date: Sat, 4 Apr 2026 18:08:39 +0200 Subject: [PATCH] 0.2.0 add triwat post parsing --- CHANGELOG.md | 9 ++ src/micorail/__init__.py | 226 +++++++-------------------------------- src/micorail/rails.py | 173 ++++++++++++++++++++++++++++++ src/micorail/triwat.py | 117 ++++++++++++++++++++ src/micorail/utils.py | 11 ++ 5 files changed, 351 insertions(+), 185 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 src/micorail/rails.py create mode 100644 src/micorail/triwat.py create mode 100644 src/micorail/utils.py diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..9425ec5 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,9 @@ +# 0.2.0 + ++ Added Tríwat post parsing utilities ++ Turned into a multi command + +# 0.1.0 + ++ Initial commit ++ Added overworld route calculation \ No newline at end of file diff --git a/src/micorail/__init__.py b/src/micorail/__init__.py index 938c300..b01fc9a 100644 --- a/src/micorail/__init__.py +++ b/src/micorail/__init__.py @@ -1,199 +1,55 @@ -from __future__ import annotations -from dataclasses import dataclass -from functools import lru_cache -import json import argparse -from math import ceil -from heapq import heapify, heappush, heappop +import sys -__version__ = "0.1.0" +from .utils import HourMin +from .rails import main_rails +from .triwat import main_triwat +__version__ = "0.2.0" -ALL_DATA = json.load(open('data/network.json')) -LEGACY_DATA = json.load(open('data/network.1.json')) -INFINITY = 2147483648 - -@dataclass -class Station: - name: str - code: str - -@dataclass -class Line: - name: str - code: str - route: list[RouteStep] - - def __contains__(self, st: Station | str): - if isinstance(st, Station): - st = st.code - for rs in self.route: - if rs.origin.code == st or rs.target.code == st: - return True - return False - -@dataclass -class RouteStep: - origin: Station - target: Station - time: int - line: str - -def find_station(code: str) -> Station | None: - st_name = ALL_DATA.get('stations', {}).get(code) - if st_name: - return Station( - name = st_name, - code = code - ) - -def take_first(s): - if isinstance(s, (str, bytes)): - return s - elif hasattr(s, '__iter__'): - return list(s)[0] - return s - -def build_route_list(line_stops: list, line_code): - route_list = [] - last_step = None - for step_data in line_stops: - cur_step = step_data['code'] - if last_step: - try: - line_time = ceil(step_data['time']) - except Exception: - try: - line_time = ceil(step_data['dist'] / 64) - except Exception: - line_time = 100 # TODO better fallback - route_list.append(RouteStep( - origin = find_station(last_step) or Station(last_step, last_step), - target = find_station(cur_step) or Station(last_step, last_step), - time = line_time, - line = line_code - )) - last_step = cur_step - return route_list - -def build_all_lines(): - lines = {} - for line_data in ALL_DATA['lines']['overworld']: - - lines[line_data['code']] = Line( - code = line_data['code'], - name = line_data['name'], - route = build_route_list(line_data['stops'], line_data['code']) - ) - return lines - -ALL_LINES = build_all_lines() - -## TODO algorithms of research -def find_route(start: str, stop: str): - steps_i = [] - - dist, prev = dijkstra(start) - - cur = stop - steps_i.append((find_station(stop), dist[stop])) - while (cur_prev := prev[cur]) != start: - steps_i.insert(0, (find_station(cur_prev), dist[cur_prev])) - cur = cur_prev - - steps_i.insert(0, (find_station(start), 0)) - - return steps_i - -@lru_cache() -def find_neighbors(start): - neighs = [] - - for line in ALL_LINES.values(): - line: Line - if start in line: - for rs in line.route: - rs: RouteStep - if rs.origin.code == start: - neighs.append((rs.target.code, rs.time)) - if rs.target.code == start: - neighs.append((rs.origin.code, rs.time)) - - return neighs - -def dijkstra(start: str): - dist = {node: INFINITY for node in ALL_DATA['stations']} - dist[start] = 0 - prev = {node: None for node in ALL_DATA['stations']} - - pq = [(0, start)] - heapify(pq) - - visited = set() - - while pq: - cur_dist, cur_node = heappop(pq) - - if cur_node in visited: - continue - visited.add(cur_node) - - for neigh, time in find_neighbors(cur_node): - tentative_dist = cur_dist + time - if tentative_dist < dist.setdefault(neigh, INFINITY): - dist[neigh] = tentative_dist - prev[neigh] = cur_node - heappush(pq, (tentative_dist, neigh)) - - return dist, prev - - -class HourMin(int): - def __str__(self): - h, m = divmod(self, 60) - return f'{h:01}:{m:02}' - - def __new__(cls, *args): - if len(args) == 1 and isinstance(args[0], str) and ':' in args[0]: - h, m = args[0].split(':') - return super().__new__(cls, int(h) * 60 + int(m)) - return super().__new__(cls, *args) - -def parse_args(): +def build_parser(): parser = argparse.ArgumentParser() parser.add_argument('--version', action='version', version=__version__) - parser.add_argument('--time', help='time of start', type=HourMin, default=HourMin(0)) - parser.add_argument('--start', help='station of start') - parser.add_argument('--end', help='station of end') - parser.add_argument('--legacy', action='store_true', help="use legacy network") - parser.add_argument('--search', help='search a station by its name') - return parser.parse_args() + parsers = parser.add_subparsers(dest='action') + parser_r = parsers.add_parser("rail", aliases=('rails','r')) + + parser_r.add_argument('start', help='station of start', default=None, nargs='?') + parser_r.add_argument('end', help='station of end', default=None, nargs='?') + parser_r.add_argument('-t', '--time', help='time of start', type=HourMin, default=HourMin(0)) + parser_r.add_argument('--legacy', action='store_true', help="use legacy network") + parser_r.add_argument('-q', '--search', help='search a station by its name') + + parser_t = parsers.add_parser('triwat', aliases=('t',)) + parser_t.add_argument('id', help="post ID (copy from discord)") + parser_t.add_argument('parent', help="parent ID (copy from discord)", nargs='?', default=None) + + return parser + +SHORTCUTS = { + 'R': 'rail', + 'T': 'triwat' +} def main(): - args = parse_args() - - if args.legacy: - global ALL_DATA, ALL_LINES - ALL_DATA = LEGACY_DATA - ALL_LINES = build_all_lines() - - if args.search: - query = args.search.lower() - for st_code, st_name in ALL_DATA['stations'].items(): - if query in take_first(st_name).lower(): - print('*', st_code, st_name) - return + raw_args = sys.argv[1:] - st_start = find_station(args.start) - st_end = find_station(args.end) - st_time = args.time + # Arch-like switch + if raw_args and raw_args[0][0] == '-' and raw_args[0][1] in SHORTCUTS: + raw_args.insert(0, SHORTCUTS[raw_args[0][1]]) + if len(raw_args[1]) > 2: + raw_args[1] = '-' + raw_args[1][2:] + else: + raw_args.pop(1) - if not st_start or not st_end: - return + parser = build_parser() + args = parser.parse_args(raw_args) - route = find_route(st_start.code, st_end.code) + if args.action == 'rail': + main_rails(args) + elif args.action == 'triwat': + main_triwat(args) + else: + parser.print_help() - for st_step, time in route: - print(HourMin(st_time + time), st_step.code, st_step.name) \ No newline at end of file diff --git a/src/micorail/rails.py b/src/micorail/rails.py new file mode 100644 index 0000000..b254928 --- /dev/null +++ b/src/micorail/rails.py @@ -0,0 +1,173 @@ + +from __future__ import annotations +from dataclasses import dataclass +from functools import lru_cache +import json +from math import ceil +from heapq import heapify, heappush, heappop + +ALL_DATA = json.load(open('data/network.json')) +LEGACY_DATA = json.load(open('data/network.1.json')) +INFINITY = 2147483648 + +@dataclass +class Station: + name: str + code: str + +@dataclass +class Line: + name: str + code: str + route: list[RouteStep] + + def __contains__(self, st: Station | str): + if isinstance(st, Station): + st = st.code + for rs in self.route: + if rs.origin.code == st or rs.target.code == st: + return True + return False + +@dataclass +class RouteStep: + origin: Station + target: Station + time: int + line: str + +def find_station(code: str) -> Station | None: + st_name = ALL_DATA.get('stations', {}).get(code) + if st_name: + return Station( + name = st_name, + code = code + ) + +def take_first(s): + if isinstance(s, (str, bytes)): + return s + elif hasattr(s, '__iter__'): + return list(s)[0] + return s + +def build_route_list(line_stops: list, line_code): + route_list = [] + last_step = None + for step_data in line_stops: + cur_step = step_data['code'] + if last_step: + try: + line_time = ceil(step_data['time']) + except Exception: + try: + line_time = ceil(step_data['dist'] / 64) + except Exception: + line_time = 100 # TODO better fallback + route_list.append(RouteStep( + origin = find_station(last_step) or Station(last_step, last_step), + target = find_station(cur_step) or Station(last_step, last_step), + time = line_time, + line = line_code + )) + last_step = cur_step + return route_list + +def build_all_lines(): + lines = {} + for line_data in ALL_DATA['lines']['overworld']: + + lines[line_data['code']] = Line( + code = line_data['code'], + name = line_data['name'], + route = build_route_list(line_data['stops'], line_data['code']) + ) + return lines + +ALL_LINES = build_all_lines() + +## TODO algorithms of research +def find_route(start: str, stop: str): + steps_i = [] + + dist, prev = dijkstra(start) + + cur = stop + steps_i.append((find_station(stop), dist[stop])) + while (cur_prev := prev[cur]) != start: + steps_i.insert(0, (find_station(cur_prev), dist[cur_prev])) + cur = cur_prev + + steps_i.insert(0, (find_station(start), 0)) + + return steps_i + +@lru_cache() +def find_neighbors(start): + neighs = [] + + for line in ALL_LINES.values(): + line: Line + if start in line: + for rs in line.route: + rs: RouteStep + if rs.origin.code == start: + neighs.append((rs.target.code, rs.time)) + if rs.target.code == start: + neighs.append((rs.origin.code, rs.time)) + + return neighs + +def dijkstra(start: str): + dist = {node: INFINITY for node in ALL_DATA['stations']} + dist[start] = 0 + prev = {node: None for node in ALL_DATA['stations']} + + pq = [(0, start)] + heapify(pq) + + visited = set() + + while pq: + cur_dist, cur_node = heappop(pq) + + if cur_node in visited: + continue + visited.add(cur_node) + + for neigh, time in find_neighbors(cur_node): + tentative_dist = cur_dist + time + if tentative_dist < dist.setdefault(neigh, INFINITY): + dist[neigh] = tentative_dist + prev[neigh] = cur_node + heappush(pq, (tentative_dist, neigh)) + + return dist, prev + + + +def main_rails(args): + if args.legacy: + global ALL_DATA, ALL_LINES + ALL_DATA = LEGACY_DATA + ALL_LINES = build_all_lines() + + if args.search: + query = args.search.lower() + for st_code, st_name in ALL_DATA['stations'].items(): + if query in take_first(st_name).lower(): + print('*', st_code, st_name) + return + + st_start = find_station(args.start) + st_end = find_station(args.end) + st_time = args.time + + if not st_start or not st_end: + print('error: missing stations') + return + + route = find_route(st_start.code, st_end.code) + + for st_step, time in route: + print(HourMin(st_time + time), st_step.code, st_step.name) \ No newline at end of file diff --git a/src/micorail/triwat.py b/src/micorail/triwat.py new file mode 100644 index 0000000..2348e50 --- /dev/null +++ b/src/micorail/triwat.py @@ -0,0 +1,117 @@ + + + + + +from dataclasses import dataclass +import datetime +import json +import sys +import re + +@dataclass +class TriwatPost: + id: str + date: datetime.datetime + title: str + author: str + content: str + reply_to: str | None + tags: list[str] | None + muted: bool = False + + def to_json(self): + return { + "id": self.id, + "date": self.date.isoformat(), + "title": self.title, + "author": self.author, + "content": self.content, + "reply_to": self.reply_to, + "tags": self.tags, + "muted": self.muted + } + +EPOCH = 1420066800 + +def id_to_date(id: str | int): + id = int(id) + timestamp_millis = id >> 22 + timestamp = timestamp_millis / 1000 + return datetime.datetime.fromtimestamp(EPOCH + timestamp) + +def store_triwat_post(p: TriwatPost) -> bool: + try: + with open(f"data/triwat.{int(p.id) >> 51}.json") as f: + data = json.load(f) + except FileNotFoundError: + data = {'posts': []} + + post_exists = [x for x in data['posts'] if x['id'] == p.id] + if post_exists: + return False + + data['posts'].append(p.to_json()) + + with open(f'data/triwat.{int(p.id) >> 51}.json', 'w') as fw: + json.dump(data, fw, indent=2) + return True + + +def main_triwat(args): + post_id = args.id + reply_id = args.parent or None + + whole_post = sys.stdin.read().strip() + + first_line, *rest = whole_post.splitlines() + + post_is_reply = False + + if first_line.startswith(('Reply', '*Reply*', '**Reply**')): + post_is_reply = True + + title_part, _, author_part = first_line.partition('(') + author_name, _, _ = author_part.partition(')') + mg = re.search(r'(\*?)(\\?\*){1,2}([^\s\*]+)\1', author_name) + muted = False + if mg: + author = mg.group(3) + stars = mg.group(2).replace('\\', '') + muted = len(stars) > 1 + + title = title_part.strip('* ') + + while rest and not rest[0].strip(): + rest.pop(0) + + tags = [] + if rest[0].startswith('&'): + tags_line = rest.pop(0) + tags = [x.strip() for x in tags_line.split('&') if x.strip()] + + while rest and not rest[0].strip(): + rest.pop(0) + + content = '\n'.join(rest) + + p = TriwatPost( + id = post_id, + date = id_to_date(post_id), + title = title, + author = author, + content = content, + reply_to = reply_id if post_is_reply else None, + tags = tags, + muted = muted + ) + + if store_triwat_post(p): + print(f'** Post {p.id} stored!') + + + + + + + diff --git a/src/micorail/utils.py b/src/micorail/utils.py new file mode 100644 index 0000000..54bef4c --- /dev/null +++ b/src/micorail/utils.py @@ -0,0 +1,11 @@ + +class HourMin(int): + def __str__(self): + h, m = divmod(self, 60) + return f'{h:01}:{m:02}' + + def __new__(cls, *args): + if len(args) == 1 and isinstance(args[0], str) and ':' in args[0]: + h, m = args[0].split(':') + return super().__new__(cls, int(h) * 60 + int(m)) + return super().__new__(cls, *args) \ No newline at end of file