0.1.0 initial commit
This commit is contained in:
commit
5b75dcd028
4 changed files with 243 additions and 0 deletions
26
.gitignore
vendored
Normal file
26
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
node_modules/
|
||||
__pycache__/
|
||||
**.pyc
|
||||
**.pyo
|
||||
**.egg-info
|
||||
**~
|
||||
.*.swp
|
||||
\#*\#
|
||||
.\#*
|
||||
alembic.ini
|
||||
.env
|
||||
.env.*
|
||||
.venv
|
||||
env
|
||||
venv
|
||||
venv-*/
|
||||
config/
|
||||
conf/
|
||||
config.json
|
||||
data/
|
||||
build/
|
||||
dist/
|
||||
/target
|
||||
.err
|
||||
.vscode
|
||||
/run.sh
|
||||
14
pyproject.toml
Normal file
14
pyproject.toml
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
[project]
|
||||
name = "sakuragasaki46_micorail"
|
||||
authors = [ { name = "Sakuragasaki46" } ]
|
||||
dynamic = [ "version" ]
|
||||
requires-python = ">=3.10"
|
||||
classifiers = [
|
||||
"Private :: X"
|
||||
]
|
||||
dependencies = [
|
||||
]
|
||||
|
||||
[tool.setuptools.dynamic]
|
||||
version = { attr = "micorail.__version__" }
|
||||
|
||||
199
src/micorail/__init__.py
Normal file
199
src/micorail/__init__.py
Normal file
|
|
@ -0,0 +1,199 @@
|
|||
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from functools import lru_cache
|
||||
import json
|
||||
import argparse
|
||||
from math import ceil
|
||||
from heapq import heapify, heappush, heappop
|
||||
|
||||
__version__ = "0.1.0"
|
||||
|
||||
|
||||
ALL_DATA = json.load(open('data/network.json'))
|
||||
LEGACY_DATA = json.load(open('data/network.1.json'))
|
||||
INFINITY = 2147483648
|
||||
|
||||
@dataclass
|
||||
class Station:
|
||||
name: str
|
||||
code: str
|
||||
|
||||
@dataclass
|
||||
class Line:
|
||||
name: str
|
||||
code: str
|
||||
route: list[RouteStep]
|
||||
|
||||
def __contains__(self, st: Station | str):
|
||||
if isinstance(st, Station):
|
||||
st = st.code
|
||||
for rs in self.route:
|
||||
if rs.origin.code == st or rs.target.code == st:
|
||||
return True
|
||||
return False
|
||||
|
||||
@dataclass
|
||||
class RouteStep:
|
||||
origin: Station
|
||||
target: Station
|
||||
time: int
|
||||
line: str
|
||||
|
||||
def find_station(code: str) -> Station | None:
|
||||
st_name = ALL_DATA.get('stations', {}).get(code)
|
||||
if st_name:
|
||||
return Station(
|
||||
name = st_name,
|
||||
code = code
|
||||
)
|
||||
|
||||
def take_first(s):
|
||||
if isinstance(s, (str, bytes)):
|
||||
return s
|
||||
elif hasattr(s, '__iter__'):
|
||||
return list(s)[0]
|
||||
return s
|
||||
|
||||
def build_route_list(line_stops: list, line_code):
|
||||
route_list = []
|
||||
last_step = None
|
||||
for step_data in line_stops:
|
||||
cur_step = step_data['code']
|
||||
if last_step:
|
||||
try:
|
||||
line_time = ceil(step_data['time'])
|
||||
except Exception:
|
||||
try:
|
||||
line_time = ceil(step_data['dist'] / 64)
|
||||
except Exception:
|
||||
line_time = 100 # TODO better fallback
|
||||
route_list.append(RouteStep(
|
||||
origin = find_station(last_step) or Station(last_step, last_step),
|
||||
target = find_station(cur_step) or Station(last_step, last_step),
|
||||
time = line_time,
|
||||
line = line_code
|
||||
))
|
||||
last_step = cur_step
|
||||
return route_list
|
||||
|
||||
def build_all_lines():
|
||||
lines = {}
|
||||
for line_data in ALL_DATA['lines']['overworld']:
|
||||
|
||||
lines[line_data['code']] = Line(
|
||||
code = line_data['code'],
|
||||
name = line_data['name'],
|
||||
route = build_route_list(line_data['stops'], line_data['code'])
|
||||
)
|
||||
return lines
|
||||
|
||||
ALL_LINES = build_all_lines()
|
||||
|
||||
## TODO algorithms of research
|
||||
def find_route(start: str, stop: str):
|
||||
steps_i = []
|
||||
|
||||
dist, prev = dijkstra(start)
|
||||
|
||||
cur = stop
|
||||
steps_i.append((find_station(stop), dist[stop]))
|
||||
while (cur_prev := prev[cur]) != start:
|
||||
steps_i.insert(0, (find_station(cur_prev), dist[cur_prev]))
|
||||
cur = cur_prev
|
||||
|
||||
steps_i.insert(0, (find_station(start), 0))
|
||||
|
||||
return steps_i
|
||||
|
||||
@lru_cache()
|
||||
def find_neighbors(start):
|
||||
neighs = []
|
||||
|
||||
for line in ALL_LINES.values():
|
||||
line: Line
|
||||
if start in line:
|
||||
for rs in line.route:
|
||||
rs: RouteStep
|
||||
if rs.origin.code == start:
|
||||
neighs.append((rs.target.code, rs.time))
|
||||
if rs.target.code == start:
|
||||
neighs.append((rs.origin.code, rs.time))
|
||||
|
||||
return neighs
|
||||
|
||||
def dijkstra(start: str):
|
||||
dist = {node: INFINITY for node in ALL_DATA['stations']}
|
||||
dist[start] = 0
|
||||
prev = {node: None for node in ALL_DATA['stations']}
|
||||
|
||||
pq = [(0, start)]
|
||||
heapify(pq)
|
||||
|
||||
visited = set()
|
||||
|
||||
while pq:
|
||||
cur_dist, cur_node = heappop(pq)
|
||||
|
||||
if cur_node in visited:
|
||||
continue
|
||||
visited.add(cur_node)
|
||||
|
||||
for neigh, time in find_neighbors(cur_node):
|
||||
tentative_dist = cur_dist + time
|
||||
if tentative_dist < dist.setdefault(neigh, INFINITY):
|
||||
dist[neigh] = tentative_dist
|
||||
prev[neigh] = cur_node
|
||||
heappush(pq, (tentative_dist, neigh))
|
||||
|
||||
return dist, prev
|
||||
|
||||
|
||||
class HourMin(int):
|
||||
def __str__(self):
|
||||
h, m = divmod(self, 60)
|
||||
return f'{h:01}:{m:02}'
|
||||
|
||||
def __new__(cls, *args):
|
||||
if len(args) == 1 and isinstance(args[0], str) and ':' in args[0]:
|
||||
h, m = args[0].split(':')
|
||||
return super().__new__(cls, int(h) * 60 + int(m))
|
||||
return super().__new__(cls, *args)
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('--version', action='version', version=__version__)
|
||||
parser.add_argument('--time', help='time of start', type=HourMin, default=HourMin(0))
|
||||
parser.add_argument('--start', help='station of start')
|
||||
parser.add_argument('--end', help='station of end')
|
||||
parser.add_argument('--legacy', action='store_true', help="use legacy network")
|
||||
parser.add_argument('--search', help='search a station by its name')
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
|
||||
if args.legacy:
|
||||
global ALL_DATA, ALL_LINES
|
||||
ALL_DATA = LEGACY_DATA
|
||||
ALL_LINES = build_all_lines()
|
||||
|
||||
if args.search:
|
||||
query = args.search.lower()
|
||||
for st_code, st_name in ALL_DATA['stations'].items():
|
||||
if query in take_first(st_name).lower():
|
||||
print('*', st_code, st_name)
|
||||
return
|
||||
|
||||
st_start = find_station(args.start)
|
||||
st_end = find_station(args.end)
|
||||
st_time = args.time
|
||||
|
||||
if not st_start or not st_end:
|
||||
return
|
||||
|
||||
route = find_route(st_start.code, st_end.code)
|
||||
|
||||
for st_step, time in route:
|
||||
print(HourMin(st_time + time), st_step.code, st_step.name)
|
||||
4
src/micorail/__main__.py
Normal file
4
src/micorail/__main__.py
Normal file
|
|
@ -0,0 +1,4 @@
|
|||
|
||||
from . import main
|
||||
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue