120 lines
3.9 KiB
Python
120 lines
3.9 KiB
Python
from http import HTTPStatus
|
|
from http.server import SimpleHTTPRequestHandler
|
|
import json
|
|
import os
|
|
import socketserver
|
|
from typing import Optional
|
|
from urllib.parse import urlparse, parse_qs, unquote
|
|
|
|
PORT = 8000
|
|
MAX_SIZE = 10e6
|
|
|
|
class MyHandler(SimpleHTTPRequestHandler):
|
|
DATA_DIR = "metadata"
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(
|
|
*args,
|
|
directory="public",
|
|
**kwargs
|
|
)
|
|
self.query: dict = {}
|
|
self.data: Optional[dict|list] = None
|
|
|
|
def read_body_data(self):
|
|
try:
|
|
size: int = int(self.headers["Content-Length"])
|
|
if size > MAX_SIZE:
|
|
self.send_error(HTTPStatus.CONTENT_TOO_LARGE)
|
|
self.log_error(f"Payload is too big ({MAX_SIZE=}B)")
|
|
return False
|
|
raw_data = self.rfile.read(size)
|
|
self.data = json.loads(raw_data)
|
|
except:
|
|
self.send_error(HTTPStatus.NOT_ACCEPTABLE, "Malformed JSON body")
|
|
self.log_error("Malformed JSON body")
|
|
return False
|
|
return True
|
|
|
|
def do_GET(self):
|
|
self.path = unquote(self.path)
|
|
self.query = parse_qs(urlparse(self.path).query)
|
|
if self.path.startswith("/api/"):
|
|
self.handle_api_get(self.path.removeprefix("/api/").removesuffix("/"))
|
|
return
|
|
super().do_GET()
|
|
|
|
def do_POST(self):
|
|
self.path = unquote(self.path)
|
|
self.query = parse_qs(urlparse(self.path).query)
|
|
if self.path.startswith("/api/"):
|
|
self.handle_api_post(self.path.removeprefix("/api/").removesuffix("/"))
|
|
return
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
|
|
def handle_api_get(self, path: str):
|
|
self.log_message(f"API request at {path}")
|
|
if path == "files":
|
|
files: list[str] = self.get_files()
|
|
self.send_json(files)
|
|
elif path.startswith("file"):
|
|
filename: str = path.split("/", 1)[1]
|
|
data = self.read_file(filename)
|
|
if data is None:
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
else:
|
|
self.send_json(data)
|
|
else:
|
|
self.send_response(HTTPStatus.NOT_FOUND, f"Unknown path {path}")
|
|
self.end_headers()
|
|
|
|
def handle_api_post(self, path: str):
|
|
if path.startswith("file"):
|
|
if self.read_body_data():
|
|
filename: str = path.split("/", 1)[1]
|
|
if self.write_file(filename, self.data):
|
|
self.send_response(HTTPStatus.OK)
|
|
self.end_headers()
|
|
else:
|
|
self.send_response(HTTPStatus.NOT_FOUND, f"Unknown path {path}")
|
|
self.end_headers()
|
|
|
|
def send_json(self, data: dict|list):
|
|
self.send_response(HTTPStatus.OK)
|
|
self.send_header("Content-Type", "application/json")
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(data).encode("utf-8"))
|
|
|
|
def get_files(self):
|
|
return os.listdir(self.DATA_DIR)
|
|
|
|
def read_file(self, filename: str) -> Optional[dict|list]:
|
|
if filename not in self.get_files():
|
|
return None
|
|
with open(os.path.join(self.DATA_DIR, filename), "r") as f:
|
|
data = json.load(f)
|
|
return data
|
|
|
|
def write_file(self, filename: str, data: dict|list) -> bool:
|
|
if filename not in self.get_files():
|
|
self.send_error(HTTPStatus.NOT_FOUND)
|
|
return False
|
|
|
|
try:
|
|
with open(os.path.join(self.DATA_DIR, filename), "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2, ensure_ascii=False)
|
|
except:
|
|
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
|
|
return False
|
|
return True
|
|
|
|
|
|
def main():
|
|
with socketserver.TCPServer(("", PORT), MyHandler) as httpd:
|
|
print(f"Serving on port {PORT}")
|
|
httpd.serve_forever()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|