from http import HTTPStatus from http.server import SimpleHTTPRequestHandler import json import os import socketserver from typing import Optional from urllib.parse import urlparse, parse_qs, unquote PORT = 8000 MAX_SIZE = 10e6 class MyHandler(SimpleHTTPRequestHandler): DATA_DIR = "metadata" CACHE = {} def __init__(self, *args, **kwargs): super().__init__( *args, directory="public", **kwargs ) self.query: dict = {} self.data: Optional[dict|list] = None def read_body_data(self): try: size: int = int(self.headers["Content-Length"]) if size > MAX_SIZE: self.send_error(HTTPStatus.CONTENT_TOO_LARGE) self.log_error(f"Payload is too big ({MAX_SIZE=}B)") return False raw_data = self.rfile.read(size) self.data = json.loads(raw_data) except: self.send_error(HTTPStatus.NOT_ACCEPTABLE, "Malformed JSON body") self.log_error("Malformed JSON body") return False return True def do_GET(self): self.path = unquote(self.path) self.query = parse_qs(urlparse(self.path).query) if self.path.startswith("/api/"): self.handle_api_get(self.path.removeprefix("/api/").removesuffix("/")) return super().do_GET() def do_POST(self): self.path = unquote(self.path) self.query = parse_qs(urlparse(self.path).query) if self.path.startswith("/api/"): self.handle_api_post(self.path.removeprefix("/api/").removesuffix("/")) return self.send_error(HTTPStatus.NOT_FOUND) def handle_api_get(self, path: str): self.log_message(f"API request at {path}") if path == "files": files: list[str] = self.get_files_meta() self.send_json(files) elif path.startswith("file"): filename: str = path.split("/", 1)[1] data = self.read_file(filename) if data is None: self.send_error(HTTPStatus.NOT_FOUND) else: self.send_json(data) else: self.send_response(HTTPStatus.NOT_FOUND, f"Unknown path {path}") self.end_headers() def handle_api_post(self, path: str): if path.startswith("file"): if self.read_body_data(): filename: str = path.split("/", 1)[1] if self.write_file(filename, self.data): self.send_response(HTTPStatus.OK) self.end_headers() else: self.send_response(HTTPStatus.NOT_FOUND, f"Unknown path {path}") self.end_headers() def send_json(self, data: dict|list): self.send_response(HTTPStatus.OK) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(json.dumps(data).encode("utf-8")) def get_files(self): return os.listdir(self.DATA_DIR) def read_file(self, filename: str) -> Optional[dict|list]: if filename not in self.get_files(): return None with open(os.path.join(self.DATA_DIR, filename), "r") as f: data = json.load(f) return data def write_file(self, filename: str, data: dict|list) -> bool: if filename not in self.get_files(): self.send_error(HTTPStatus.NOT_FOUND) return False try: with open(os.path.join(self.DATA_DIR, filename), "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) except: self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR) return False return True def get_files_meta(self): files: list[str] = self.get_files() files_meta: list[dict] = [] deleted = set(self.CACHE.keys()) - set(files) for filename in deleted: del self.CACHE[deleted] for filename in files: path: str = os.path.join(self.DATA_DIR, filename) last_modified: float = os.path.getmtime(path) if filename not in self.CACHE or self.CACHE[filename]["ts"] < last_modified: self.update_file_meta(filename) files_meta.append(self.CACHE[filename]) return files_meta def update_file_meta(self, filename: str): path: str = os.path.join(self.DATA_DIR, filename) meta = { "filename": filename, "ts": os.path.getmtime(path) } with open(path, "r") as f: data = json.load(f) is_series = "filename" not in data meta["type"] = "series" if is_series else "film" if is_series: meta["episodes"] = len(data) meta["title"] = filename.split("_metadata")[0] else: meta["title"] = data["title"] self.CACHE[filename] = meta def main(): with socketserver.TCPServer(("", PORT), MyHandler) as httpd: print(f"Serving on port {PORT}") httpd.serve_forever() if __name__ == "__main__": main()