Compare commits
7 commits (460ae94925...feat/docke)

Commits (SHA1):
- 04dd2309c6
- 958a55ff17
- 7061f8d64c
- 4b6576ec53
- 7a49849ee9
- ffe847fb5e
- 775d3da6ed
Dockerfile (10 changed lines)

@@ -1,4 +1,4 @@
-FROM debian:bullseye-slim AS builder
+FROM debian:bookworm-slim AS builder
 
 # Install ffmpeg and mkvtoolnix
 # but only keep the binaries and libs for ffprobe and mkvmerge
@@ -7,11 +7,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
     && mkdir -p /artifacts/bin /artifacts/lib \
     && cp $(which ffprobe) /artifacts/bin/ \
     && cp $(which mkvmerge) /artifacts/bin/ \
+    && cp $(which mkvpropedit) /artifacts/bin/ \
     && ldd $(which ffprobe) | awk '{print $3}' | xargs -I '{}' cp -v '{}' /artifacts/lib/ || true \
-    && ldd $(which mkvmerge) | awk '{print $3}' | xargs -I '{}' cp -v '{}' /artifacts/lib/ || true
+    && ldd $(which mkvmerge) | awk '{print $3}' | xargs -I '{}' cp -v '{}' /artifacts/lib/ || true \
+    && ldd $(which mkvpropedit) | awk '{print $3}' | xargs -I '{}' cp -v '{}' /artifacts/lib/ || true
 
 # Must be the same base as builder image for shared libraries compatibility
-FROM python:3.13.3-slim-bullseye
+FROM python:3.13.3-slim-bookworm
 
 COPY --from=builder /artifacts/bin/* /usr/local/bin/
 COPY --from=builder /artifacts/lib/* /usr/local/lib/
@@ -26,4 +28,4 @@ COPY . .
 
 EXPOSE 8000
 
-CMD ["python", "src/server.py"]
+CMD ["python", "-m", "scripts.server"]
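
Note: the builder stage's `ldd | awk '{print $3}'` pipeline copies every shared library a binary links against, which is why the runtime stage must keep the same Debian base (matching glibc and library versions). A rough Python sketch of what that extraction step does, for illustration only (not part of the diff):

# Sketch of the ldd pipeline above. Each dependency line looks like
# "libavutil.so.58 => /usr/lib/x86_64-linux-gnu/libavutil.so.58 (0x...)";
# the third whitespace-separated field is the resolved library path.
import shutil
import subprocess

def copy_linked_libs(binary: str, dest: str) -> None:
    out = subprocess.run(["ldd", binary], capture_output=True, text=True).stdout
    for line in out.splitlines():
        fields = line.split()
        if len(fields) >= 3 and fields[2].startswith("/"):
            # same role as: xargs -I '{}' cp -v '{}' /artifacts/lib/
            shutil.copy(fields[2], dest)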
__init__.py (new empty file)

scripts/__init__.py (new empty file)
scripts/extract_metadata.py (new executable file, 47 lines)

@@ -0,0 +1,47 @@
+#!/usr/bin/env python3
+
+import argparse
+import logging
+import os
+import sys
+
+from src.metadata_extractor import MetadataExtractor
+
+
+def main():
+    logging.basicConfig(
+        level=logging.INFO,
+        format="[%(levelname)s] %(message)s"
+    )
+
+    parser = argparse.ArgumentParser(
+        description="Extract metadata from video files and save as JSON"
+    )
+    parser.add_argument(
+        "input",
+        help="Path to input video file or directory"
+    )
+    parser.add_argument(
+        "-o", "--output",
+        help="Directory path where the output JSON files will be saved"
+    )
+    args = parser.parse_args()
+
+    input_path = args.input
+    output_dir = args.output
+
+    extractor: MetadataExtractor = MetadataExtractor()
+
+    success = False
+    if os.path.isfile(input_path):
+        success = extractor.process_file(input_path, output_dir)
+    elif os.path.isdir(input_path):
+        success = extractor.process_directory(input_path, output_dir)
+    else:
+        logging.error(f"Path not found: {input_path}")
+
+    if not success:
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main()
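
Note: the script is a thin CLI wrapper around the extractor class added later in this branch, so the same work can presumably be driven programmatically. A minimal sketch, with example paths:

# Programmatic use of the extractor introduced in this diff (paths are examples).
from src.metadata_extractor import MetadataExtractor

extractor = MetadataExtractor()
ok = extractor.process_file("to_convert/film.mkv", "metadata")  # writes metadata/film_metadata.json
if not ok:
    raise SystemExit(1)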
scripts/server.py (new executable file, 71 lines)

@@ -0,0 +1,71 @@
+#!/usr/bin/env python3
+
+import argparse
+import logging
+
+from src.env_default import EnvDefault
+from src.server import MeliesServer
+
+
+def main():
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s [%(levelname)s] %(message)s",
+        datefmt=r"%Y-%m-%d %H:%M:%S"
+    )
+
+    parser = argparse.ArgumentParser(
+        description="Starts the Melies server",
+        formatter_class=argparse.RawTextHelpFormatter
+    )
+    parser.add_argument(
+        "-p", "--port",
+        action=EnvDefault,
+        envvar="MELIES_PORT",
+        default=8000,
+        type=int,
+        help="Port on which the server listens"
+    )
+    parser.add_argument(
+        "--max-payload-size",
+        action=EnvDefault,
+        envvar="MELIES_MAX_PAYLOAD_SIZE",
+        default=1e6,
+        type=int,
+        help="Maximum POST payload size in bytes that the server accepts"
+    )
+    parser.add_argument(
+        "--to-convert-dir",
+        action=EnvDefault,
+        envvar="MELIES_TO_CONVERT_DIR",
+        default="to_convert",
+        help="Path to the directory containing medias to convert"
+    )
+    parser.add_argument(
+        "--converted-dir",
+        action=EnvDefault,
+        envvar="MELIES_CONVERTED_DIR",
+        default="converted",
+        help="Path to the directory containing converted medias"
+    )
+    parser.add_argument(
+        "--metadata-dir",
+        action=EnvDefault,
+        envvar="MELIES_METADATA_DIR",
+        default="metadata",
+        help="Path to the directory containing metadata files"
+    )
+    args = parser.parse_args()
+
+    server = MeliesServer(
+        args.port,
+        args.to_convert_dir,
+        args.converted_dir,
+        args.metadata_dir,
+        args.max_payload_size
+    )
+    server.start()
+
+
+if __name__ == "__main__":
+    main()
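
Note: this module path matches the new Dockerfile CMD ["python", "-m", "scripts.server"]. The server can also be started directly; a sketch assuming the constructor order used above (port, to_convert_dir, converted_dir, metadata_dir, max_payload_size):

# Starting the server programmatically instead of via the CLI wrapper.
from src.server import MeliesServer

server = MeliesServer(8000, "to_convert", "converted", "metadata", int(1e6))
server.start()  # blocks; KeyboardInterrupt stops the observer and shuts down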
scripts/write_metadata.py (new file, 45 lines)

@@ -0,0 +1,45 @@
+#!/usr/bin/env python3
+
+import argparse
+import logging
+import sys
+
+from src.metadata_writer import MetadataWriter
+
+
+def main():
+    logging.basicConfig(
+        level=logging.INFO,
+        format="[%(levelname)s] %(message)s"
+    )
+
+    parser = argparse.ArgumentParser(
+        description="Write metadata from JSON to video files"
+    )
+    parser.add_argument(
+        "json_file",
+        help="Path to input JSON metadata file"
+    )
+    parser.add_argument(
+        "-o", "--output",
+        help="Path of the output directory"
+    )
+    parser.add_argument(
+        "-s", "--source",
+        help="Source directory (overrides automatic detection)"
+    )
+    args = parser.parse_args()
+
+    json_file = args.json_file
+    output_dir = args.output
+    source_dir = args.source
+
+    writer: MetadataWriter = MetadataWriter()
+
+    success: bool = writer.process_metadata(json_file, source_dir, output_dir)
+
+    if not success:
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main()
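
Note: when --source is omitted, the writer (see src/metadata_writer.py below) tries to infer the source folder from the JSON filename. A sketch of that convention, using the "Millenium" example from the code's own comments:

# For a multi-file JSON named "Millenium_metadata.json", the writer looks for a
# sibling folder called "Millenium" and defaults the output to ready/Millenium.
from src.metadata_writer import MetadataWriter

writer = MetadataWriter()
writer.process_metadata("Millenium_metadata.json")  # source/output auto-detected if the folder exists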
src/__init__.py (new empty file)
src/env_default.py (new file, 25 lines)

@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+import argparse
+import os
+
+
+# https://stackoverflow.com/a/10551190/11109181
+class EnvDefault(argparse.Action):
+    def __init__(self, envvar, required=True, default=None, help=None, **kwargs):
+        if envvar:
+            if envvar in os.environ:
+                default = os.environ[envvar]
+        if required and default is not None:
+            required = False
+
+        if default is not None and help is not None:
+            help += f" (default: {default})"
+
+        if envvar and help is not None:
+            help += f"\nCan also be specified through the {envvar} environment variable"
+        super(EnvDefault, self).__init__(default=default, required=required, help=help,
+                                         **kwargs)
+
+    def __call__(self, parser, namespace, values, option_string=None):
+        setattr(namespace, self.dest, values)
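
Note: the resulting precedence is CLI flag > environment variable > hard-coded default, because the action swaps the env value in as the default and an explicit flag still overwrites it via __call__. A minimal, self-contained demonstration (option name is hypothetical):

import argparse
import os

from src.env_default import EnvDefault

os.environ["GREETING"] = "hi"
parser = argparse.ArgumentParser()
parser.add_argument("--greeting", action=EnvDefault, envvar="GREETING", default="hello")

print(parser.parse_args([]).greeting)                    # "hi" (env var wins over the default)
print(parser.parse_args(["--greeting", "yo"]).greeting)  # "yo" (explicit flag wins over the env var)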
src/file_handlers.py (new file, 106 lines)

@@ -0,0 +1,106 @@
+import json
+import os
+from typing import Optional
+
+
+class FileHandler:
+    def __init__(self, directory: str):
+        self.cache: dict[str, dict] = {}
+        self.directory: str = directory
+
+    def get_files(self, base_path: str = ""):
+        root_path: str = os.path.abspath(self.directory)
+        full_path: str = os.path.join(root_path, base_path)
+        full_path = os.path.abspath(full_path)
+        common_prefix: str = os.path.commonprefix([full_path, root_path])
+
+        if common_prefix != root_path:
+            return []
+
+        return [
+            os.path.join(base_path, f)
+            for f in os.listdir(full_path)
+        ]
+
+    def get_files_meta(self, base_path: str = ""):
+        files: list[str] = self.get_files(base_path)
+        files = [
+            os.path.join(self.directory, f)
+            for f in files
+        ]
+        files_meta: list[dict] = []
+
+        deleted = set(self.cache.keys()) - set(files)
+        for path in deleted:
+            del self.cache[path]
+
+        for path in files:
+            last_modified: float = os.path.getmtime(path)
+            if path not in self.cache or self.cache[path]["ts"] < last_modified:
+                self.update_meta(path)
+
+            files_meta.append(self.cache[path])
+
+        return files_meta
+
+    def update_meta(self, path: str) -> None:
+        self.cache[path] = self.get_meta(path)
+
+    def get_meta(self, path: str) -> dict:
+        return {
+            "path": os.path.relpath(path, self.directory),
+            "filename": os.path.basename(path),
+            "ts": os.path.getmtime(path)
+        }
+
+
+class JsonFileHandler(FileHandler):
+    def read(self, path: str) -> Optional[dict|list]:
+        if path not in self.get_files():
+            return None
+        with open(os.path.join(self.directory, path), "r") as f:
+            data = json.load(f)
+        return data
+
+    def write(self, path: str, data: dict|list) -> bool:
+        if path not in self.get_files():
+            return False
+
+        try:
+            with open(os.path.join(self.directory, path), "w", encoding="utf-8") as f:
+                json.dump(data, f, indent=2, ensure_ascii=False)
+        except:
+            return False
+        return True
+
+
+class MetadataFileHandler(JsonFileHandler):
+    def get_meta(self, path: str) -> dict:
+        meta: dict = super().get_meta(path)
+
+        with open(path, "r") as f:
+            data = json.load(f)
+        is_series = "filename" not in data
+        meta["type"] = "series" if is_series else "film"
+        if is_series:
+            meta["episodes"] = len(data)
+            meta["title"] = meta["filename"].split("_metadata")[0]
+        else:
+            meta["title"] = data["title"]
+
+        return meta
+
+
+class ToConvertFileHandler(FileHandler):
+    def get_meta(self, path: str) -> dict:
+        meta: dict = super().get_meta(path)
+        is_dir: bool = os.path.isdir(path)
+
+        meta["size"] = os.path.getsize(path)
+        meta["type"] = "folder" if is_dir else "media"
+        if is_dir:
+            meta["elements"] = len(os.listdir(path))
+            if not meta["path"].endswith("/"):
+                meta["path"] += "/"
+
+        return meta
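
Note: get_files guards against path traversal by comparing a common prefix, but os.path.commonprefix is purely textual, so a sibling directory that shares the prefix string slips through. A sketch of the pitfall and a component-wise alternative (os.path.commonpath); the hardening is a suggestion, not part of this diff:

import os

root = os.path.abspath("to_convert")
evil = os.path.abspath("to_convert_evil/x.mkv")  # reachable via base_path="../to_convert_evil/x.mkv"

# Textual check as in the diff: passes, because the string prefix matches.
print(os.path.commonprefix([evil, root]) == root)  # True -> traversal not caught

# Component-wise check (hypothetical hardening, not in this diff):
print(os.path.commonpath([evil, root]) == root)    # False -> correctly rejected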
(deleted file, 196 lines; filename not captured in this view)

@@ -1,196 +0,0 @@
-#!/usr/bin/env python3
-import argparse
-import os
-import subprocess
-import json
-import sys
-
-SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
-
-def get_video_metadata(file_path):
-    """
-    Extract metadata from a video file using ffprobe.
-
-    Args:
-        file_path (str): Path to the video file
-
-    Returns:
-        dict: Metadata information
-    """
-    # Get general file info
-    cmd = [
-        "ffprobe", "-v", "quiet", "-print_format", "json",
-        "-show_format", "-show_streams", file_path
-    ]
-
-    try:
-        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
-        if result.returncode != 0:
-            print(f"❌ Error processing {file_path}: {result.stderr}")
-            return None
-
-        data = json.loads(result.stdout)
-
-        # Extract filename and title
-        filename = os.path.basename(file_path)
-        title = data.get("format", {}).get("tags", {}).get("title", filename)
-
-        # Initialize metadata structure
-        metadata = {
-            "filename": filename,
-            "title": title,
-            "audio_tracks": [],
-            "subtitle_tracks": []
-        }
-
-        # Process streams
-        for stream in data.get("streams", []):
-            codec_type = stream.get("codec_type")
-
-            if codec_type == "audio":
-                track = {
-                    "index": stream.get("index"),
-                    "language": stream.get("tags", {}).get("language", "und"),
-                    "name": stream.get("tags", {}).get("title", ""),
-                    "channels": stream.get("channels", 0),
-                    "flags": {
-                        "default": stream.get("disposition", {}).get("default", 0) == 1,
-                        "visual_impaired": stream.get("disposition", {}).get("visual_impaired", 0) == 1,
-                        "original": stream.get("disposition", {}).get("original", 0) == 1,
-                        "commentary": stream.get("disposition", {}).get("comment", 0) == 1
-                    }
-                }
-                metadata["audio_tracks"].append(track)
-
-            elif codec_type == "subtitle":
-                track = {
-                    "index": stream.get("index"),
-                    "language": stream.get("tags", {}).get("language", "und"),
-                    "name": stream.get("tags", {}).get("title", ""),
-                    "flags": {
-                        "default": stream.get("disposition", {}).get("default", 0) == 1,
-                        "forced": stream.get("disposition", {}).get("forced", 0) == 1,
-                        "hearing_impaired": stream.get("disposition", {}).get("hearing_impaired", 0) == 1,
-                        "original": stream.get("disposition", {}).get("original", 0) == 1,
-                        "commentary": stream.get("disposition", {}).get("comment", 0) == 1
-                    }
-                }
-                metadata["subtitle_tracks"].append(track)
-
-        return metadata
-
-    except Exception as e:
-        print(f"❌ Error processing {file_path}: {str(e)}")
-        return None
-
-def process_file(file_path, output_dir=None):
-    """
-    Process a single video file and write metadata to JSON.
-
-    Args:
-        file_path (str): Path to the video file
-        output_dir (str, optional): Directory where the output JSON file will be saved
-    """
-    if not os.path.isfile(file_path):
-        print(f"❌ File not found: {file_path}")
-        return False
-
-    if not file_path.lower().endswith(SUPPORTED_EXTENSIONS):
-        print(f"❌ Unsupported file format: {file_path}")
-        return False
-
-    print(f"📊 Extracting metadata from {os.path.basename(file_path)}")
-    metadata = get_video_metadata(file_path)
-
-    if metadata:
-        # Generate output filename based on input file
-        filename = os.path.basename(os.path.splitext(file_path)[0]) + "_metadata.json"
-
-        if output_dir:
-            # Ensure output directory exists
-            os.makedirs(output_dir, exist_ok=True)
-            output_path = os.path.join(output_dir, filename)
-        else:
-            # If no output directory specified, save in the same directory as the input file
-            base_name = os.path.splitext(file_path)[0]
-            output_path = f"{base_name}_metadata.json"
-
-        # Write metadata to JSON file
-        with open(output_path, 'w', encoding='utf-8') as f:
-            json.dump(metadata, f, indent=2, ensure_ascii=False)
-
-        print(f"✅ Metadata saved to {output_path}")
-        return True
-
-    return False
-
-def process_directory(directory_path, output_dir=None):
-    """
-    Process all video files in a directory and write metadata to JSON.
-
-    Args:
-        directory_path (str): Path to the directory
-        output_dir (str, optional): Directory where the output JSON file will be saved
-    """
-    if not os.path.isdir(directory_path):
-        print(f"❌ Directory not found: {directory_path}")
-        return False
-
-    all_metadata = {}
-    file_count = 0
-
-    for root, _, files in os.walk(directory_path):
-        for file in files:
-            if file.lower().endswith(SUPPORTED_EXTENSIONS):
-                file_path = os.path.join(root, file)
-                print(f"📊 Extracting metadata from {file}")
-                metadata = get_video_metadata(file_path)
-
-                if metadata:
-                    # Use relative path as key
-                    rel_path = os.path.relpath(file_path, directory_path)
-                    all_metadata[rel_path] = metadata
-                    file_count += 1
-
-    if file_count == 0:
-        print(f"❌ No supported video files found in {directory_path}")
-        return False
-
-    # Generate output filename based on directory name
-    dir_name = os.path.basename(os.path.normpath(directory_path))
-    filename = f"{dir_name}_metadata.json"
-
-    if output_dir:
-        # Ensure output directory exists
-        os.makedirs(output_dir, exist_ok=True)
-        output_path = os.path.join(output_dir, filename)
-    else:
-        # If no output directory specified, save in the current directory
-        output_path = filename
-
-    # Write all metadata to a single JSON file
-    with open(output_path, 'w', encoding='utf-8') as f:
-        json.dump(all_metadata, f, indent=2, ensure_ascii=False)
-
-    print(f"✅ Metadata for {file_count} files saved to {output_path}")
-    return True
-
-def main():
-    parser = argparse.ArgumentParser(description="Extract metadata from video files and save as JSON.")
-    parser.add_argument("input", help="Path to input video file or directory")
-    parser.add_argument("-o", "--output", help="Directory path where output JSON files will be saved")
-    args = parser.parse_args()
-
-    input_path = args.input
-    output_dir = args.output
-
-    if os.path.isfile(input_path):
-        process_file(input_path, output_dir)
-    elif os.path.isdir(input_path):
-        process_directory(input_path, output_dir)
-    else:
-        print(f"❌ Path not found: {input_path}")
-        sys.exit(1)
-
-if __name__ == "__main__":
-    main()
src/metadata_extractor.py (new file, 191 lines)

@@ -0,0 +1,191 @@
+import json
+import logging
+import os
+import subprocess
+from typing import Optional
+
+
+class MetadataExtractor:
+    SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
+
+    def __init__(self):
+        self.logger: logging.Logger = logging.getLogger("MetadataExtractor")
+
+    def analyze_file(self, path: str) -> Optional[dict]:
+        """
+        Extracts metadata from a video file using ffprobe
+
+        :param path: Path to the video file
+        :return: Metadata information or ``None`` if an error occurred
+        """
+
+        # Get general file info in JSON format
+        cmd: list[str] = [
+            "ffprobe",
+            "-v", "quiet",
+            "-print_format", "json",
+            "-show_format",
+            "-show_streams",
+            path
+        ]
+
+        try:
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+            if result.returncode != 0:
+                self.logger.error(f"Error processing {path}: {result.stderr}")
+                return None
+
+            data: dict = json.loads(result.stdout)
+
+            # Extract filename and title
+            filename: str = os.path.basename(path)
+            title: str = data.get("format", {}).get("tags", {}).get("title", filename)
+
+            # Initialize metadata structure
+            metadata: dict = {
+                "filename": filename,
+                "title": title,
+                "audio_tracks": [],
+                "subtitle_tracks": []
+            }
+
+            # Process streams
+            for stream in data.get("streams", []):
+                codec_type = stream.get("codec_type")
+                tags = stream.get("tags", {})
+                disposition = stream.get("disposition", {})
+                track = {
+                    "index": stream.get("index"),
+                    "language": tags.get("language", "und"),
+                    "name": tags.get("title", ""),
+                    "flags": {
+                        "default": disposition.get("default", 0) == 1,
+                        "original": disposition.get("original", 0) == 1,
+                        "commentary": disposition.get("commentary", 0) == 1
+                    }
+                }
+
+                if codec_type == "audio":
+                    track |= {
+                        "channels": stream.get("channels", 0)
+                    }
+                    track["flags"] |= {
+                        "visual_impaired": disposition.get("visual_impaired", 0) == 1
+                    }
+                    metadata["audio_tracks"].append(track)
+
+                elif codec_type == "subtitle":
+                    track["flags"] |= {
+                        "forced": disposition.get("forced", 0) == 1,
+                        "hearing_impaired": disposition.get("hearing_impaired", 0) == 1
+                    }
+                    metadata["subtitle_tracks"].append(track)
+
+                elif codec_type == "video":
+                    pass
+
+                elif codec_type == "button":
+                    pass
+
+                else:
+                    self.logger.warning(f"Unknown track codec type '{codec_type}'")
+
+            return metadata
+
+        except Exception as e:
+            self.logger.error(f"Error processing {path}: {str(e)}")
+            return None
+
+    def process_file(self, file_path: str, output_dir: str) -> bool:
+        """
+        Processes a single video file and writes metadata to a JSON file
+
+        :param file_path: Path of the video file
+        :param output_dir: Path of the directory where the output JSON file will be saved
+        :return: True if successful, False otherwise
+        """
+
+        if not os.path.isfile(file_path):
+            self.logger.error(f"File not found: {file_path}")
+            return False
+
+        if not file_path.lower().endswith(self.SUPPORTED_EXTENSIONS):
+            self.logger.error(f"Unsupported file format: {file_path}")
+            return False
+
+        self.logger.debug(f"Extracting metadata from {os.path.basename(file_path)}")
+        metadata: Optional[dict] = self.analyze_file(file_path)
+
+        if metadata:
+            # Generate output filename based on input file
+            filename = os.path.basename(os.path.splitext(file_path)[0]) + "_metadata.json"
+
+            if output_dir:
+                # Ensure output directory exists
+                os.makedirs(output_dir, exist_ok=True)
+                output_path = os.path.join(output_dir, filename)
+            else:
+                # If no output directory specified, save in the same directory as the input file
+                base_name = os.path.splitext(file_path)[0]
+                output_path = f"{base_name}_metadata.json"
+
+            # Write metadata to JSON file
+            with open(output_path, "w", encoding="utf-8") as f:
+                json.dump(metadata, f, indent=2, ensure_ascii=False)
+
+            self.logger.debug(f"Metadata saved to {output_path}")
+            return True
+
+        return False
+
+    def process_directory(self, directory_path: str, output_dir: Optional[str] = None) -> bool:
+        """
+        Processes all video files in a directory and writes metadata to a JSON file
+
+        :param directory_path: Path of the directory
+        :param output_dir: Path of the directory where the output JSON file will be saved
+        :return: True if successful, False otherwise
+        """
+
+        if not os.path.isdir(directory_path):
+            self.logger.error(f"Directory not found: {directory_path}")
+            return False
+
+        all_metadata: dict[str, dict] = {}
+        file_count: int = 0
+
+        for root, _, files in os.walk(directory_path):
+            for file in files:
+                if file.lower().endswith(self.SUPPORTED_EXTENSIONS):
+                    file_path: str = os.path.join(root, file)
+                    self.logger.debug(f"Extracting metadata from {file}")
+                    metadata: Optional[dict] = self.analyze_file(file_path)
+
+                    if metadata:
+                        # Use relative path as key
+                        rel_path: str = os.path.relpath(file_path, directory_path)
+                        all_metadata[rel_path] = metadata
+                        file_count += 1
+
+        if file_count == 0:
+            self.logger.error(f"No supported video files found in {directory_path}")
+            return False
+
+        # Generate output filename based on directory name
+        dir_name: str = os.path.basename(os.path.normpath(directory_path))
+        filename: str = f"{dir_name}_metadata.json"
+
+        if output_dir is not None:
+            # Ensure output directory exists
+            os.makedirs(output_dir, exist_ok=True)
+            output_path = os.path.join(output_dir, filename)
+        else:
+            # If no output directory specified, save in the current directory
+            output_path = filename
+
+        # Write all metadata to a single JSON file
+        with open(output_path, "w", encoding="utf-8") as f:
+            json.dump(all_metadata, f, indent=2, ensure_ascii=False)
+
+        self.logger.debug(f"Metadata for {file_count} files saved to {output_path}")
+        return True
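
Note: the JSON this extractor emits per file has the shape below (values are illustrative, reconstructed from analyze_file above). One observation from the diff itself: the old implementation read the "comment" disposition while the new one reads "commentary"; which key ffprobe actually reports is worth double-checking.

# Example of one file's metadata as produced by analyze_file (illustrative values):
example_metadata = {
    "filename": "film.mkv",
    "title": "Film",
    "audio_tracks": [
        {"index": 1, "language": "eng", "name": "", "channels": 6,
         "flags": {"default": True, "original": False,
                   "commentary": False, "visual_impaired": False}},
    ],
    "subtitle_tracks": [
        {"index": 2, "language": "fre", "name": "",
         "flags": {"default": False, "original": False, "commentary": False,
                   "forced": False, "hearing_impaired": False}},
    ],
}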
src/metadata_writer.py (new file, 286 lines)

@@ -0,0 +1,286 @@
+import json
+import logging
+import os
+import subprocess
+from typing import Optional
+
+
+class MetadataWriter:
+    SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
+
+    def __init__(self):
+        self.logger: logging.Logger = logging.getLogger("MetadataWriter")
+
+    @staticmethod
+    def get_mkvmerge_cmd(metadata: dict, in_path: str, out_path: str) -> list[str]:
+        cmd: list[str] = [
+            "mkvmerge",
+            "-o", out_path
+        ]
+
+        # Add global metadata (title)
+        if "title" in metadata:
+            cmd.extend(["--title", metadata["title"]])
+
+        # Process audio + subtitle tracks
+        tracks: list[dict] = metadata.get("audio_tracks", []) + metadata.get("subtitle_tracks", [])
+        for track in tracks:
+            # Use the actual track index from the metadata
+            track_id = track.get("index", 0)
+
+            # Set language
+            if "language" in track:
+                cmd.extend(["--language", f"{track_id}:{track["language"]}"])
+
+            # Set title/name
+            if "name" in track and track["name"]:
+                cmd.extend(["--track-name", f"{track_id}:{track["name"]}"])
+
+            # Set disposition flags
+            flags = track.get("flags", {})
+
+            def yes_no(flag: str):
+                return f"{track_id}:{"yes" if flags.get(flag, False) else "no"}"
+
+            cmd.extend(["--default-track", yes_no("default")])
+            cmd.extend(["--forced-track", yes_no("forced")])
+            cmd.extend(["--original-flag", yes_no("original")])
+
+        # Add input file
+        cmd.append(in_path)
+        return cmd
+
+    @staticmethod
+    def get_mkvpropedit_cmd(metadata: dict, path: str) -> list[str]:
+        cmd: list[str] = [
+            "mkvpropedit",
+            path
+        ]
+
+        # Add global metadata (title)
+        if "title" in metadata:
+            cmd.extend(["--edit", "info", "--set", f"title={metadata["title"]}"])
+
+        # Process audio + subtitle tracks
+        tracks: list[dict] = metadata.get("audio_tracks", []) + metadata.get("subtitle_tracks", [])
+        for track in tracks:
+            # Use the actual track index from the metadata
+            track_id = track.get("index", 0)
+
+            cmd.extend(["--edit", f"track:{track_id}"])
+
+            # Set language
+            if "language" in track:
+                cmd.extend(["--set", f"language={track["language"]}"])
+
+            # Set title/name
+            if "name" in track and track["name"]:
+                cmd.extend(["--set", f"name={track["name"]}"])
+
+            # Set disposition flags
+            flags = track.get("flags", {})
+
+            cmd.extend(["--set", f"flag-default={int(flags.get("default", False))}"])
+            cmd.extend(["--set", f"flag-forced={int(flags.get("forced", False))}"])
+            cmd.extend(["--set", f"flag-original={int(flags.get("original", False))}"])
+
+        return cmd
+
+    def apply_metadata(self, metadata: dict, in_path: str, out_path: Optional[str] = None) -> bool:
+        """
+        Writes metadata to a video file using mkvmerge or mkvpropedit
+
+        :param metadata: Metadata information
+        :param in_path: Path of the input video file
+        :param out_path: Path of the output video file. If None, ``"_modified"`` is appended to ``in_path`` instead
+        :return: True if successful, False otherwise
+        """
+
+        if not os.path.isfile(in_path):
+            self.logger.error(f"Input file not found: {in_path}")
+            return False
+
+        if out_path is None:
+            # Create a temporary output file
+            base_name, ext = os.path.splitext(in_path)
+            out_path: str = f"{base_name}_modified{ext}"
+
+        # Build the command
+        overwriting: bool = os.path.abspath(in_path) == os.path.abspath(out_path)
+        cmd: list[str] = (
+            self.get_mkvpropedit_cmd(metadata, in_path)
+            if overwriting else
+            self.get_mkvmerge_cmd(metadata, in_path, out_path)
+        )
+
+        # Execute the command
+        self.logger.debug(f"Writing metadata to {os.path.basename(out_path)}")
+
+        try:
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+            if result.returncode != 0:
+                self.logger.error(f"Error writing metadata: {result.stderr}")
+                return False
+
+            self.logger.debug(f"Metadata written to {out_path}")
+            return True
+
+        except Exception as e:
+            self.logger.error(f"Error executing {cmd[0]}: {str(e)}")
+            return False
+
+    @staticmethod
+    def read_metadata(path: str) -> Optional[dict]:
+        try:
+            with open(path, "r") as f:
+                metadata: dict = json.load(f)
+            return metadata
+
+        except:
+            return None
+
+    def process_file(self, metadata_or_path: str|dict, file_path: str, output_dir: Optional[str] = None) -> bool:
+        """
+        Processes a single video file with the given metadata
+
+        :param metadata_or_path: Metadata dict or path of the metadata file
+        :param file_path: Path of the video file
+        :param output_dir: Directory to save the output file to
+        :return: True if successful, False otherwise
+        """
+
+        metadata: dict
+        if isinstance(metadata_or_path, str):
+            metadata = self.read_metadata(metadata_or_path)
+            if metadata is None:
+                return False
+        else:
+            metadata = metadata_or_path
+
+        # Create output file path
+        if output_dir is not None:
+            # Ensure output directory exists
+            os.makedirs(output_dir, exist_ok=True)
+
+            # Use the same filename in the output directory
+            output_file = os.path.join(output_dir, os.path.basename(file_path))
+        else:
+            output_file = None
+
+        # Write metadata to video
+        return self.apply_metadata(metadata, file_path, output_file)
+
+    def process_directory(self, metadata_or_path: str|dict, source_dir: str, output_dir: Optional[str] = None) -> bool:
+        """
+        Processes all video files in the metadata dictionary
+
+        :param metadata_or_path: Dictionary of metadata keyed by filename
+        :param source_dir: Directory containing the video files
+        :param output_dir: Directory to save the output files to
+        :return: True if all files were processed successfully, False otherwise
+        """
+
+        metadata: dict
+        if isinstance(metadata_or_path, str):
+            metadata = self.read_metadata(metadata_or_path)
+            if metadata is None:
+                return False
+        else:
+            metadata = metadata_or_path
+
+        if not os.path.isdir(source_dir):
+            self.logger.error(f"Source directory not found: {source_dir}")
+            return False
+
+        # Create output directory if specified
+        if output_dir:
+            os.makedirs(output_dir, exist_ok=True)
+
+        success: bool = True
+        processed_count: int = 0
+
+        # Process each file in the metadata dictionary
+        for filename, file_metadata in metadata.items():
+            # Construct the full path to the video file
+            video_file: str = os.path.join(source_dir, filename)
+
+            if not os.path.isfile(video_file):
+                self.logger.error(f"Video file not found: {video_file}")
+                success = False
+                continue
+
+            # Process the file
+            if self.process_file(file_metadata, video_file, output_dir):
+                processed_count += 1
+            else:
+                success = False
+
+        self.logger.debug(f"Processed {processed_count} out of {len(metadata)} files")
+        return success
+
+    def process_metadata(self, metadata_or_path: str|dict, source_dir: Optional[str] = None, output_dir: Optional[str] = None) -> bool:
+        metadata_as_path: bool = isinstance(metadata_or_path, str)
+
+        metadata: dict
+        if metadata_as_path:
+            metadata = self.read_metadata(metadata_or_path)
+            if metadata is None:
+                return False
+        else:
+            metadata = metadata_or_path
+
+        # Determine if the JSON contains metadata for multiple files or a single file
+        is_multi_file = isinstance(metadata, dict) and all(isinstance(metadata[key], dict) for key in metadata)
+
+        # If source directory is not specified, try to determine it from the JSON filename
+        if source_dir is None and is_multi_file and metadata_as_path:
+            # Extract folder name from JSON filename (e.g., "Millenium" from "Millenium_metadata.json")
+            json_basename: str = os.path.basename(metadata_or_path)
+            if json_basename.endswith("_metadata.json"):
+                folder_name: str = json_basename.split("_metadata.json")[0]
+                potential_source_dir: str = os.path.join(
+                    os.path.dirname(os.path.abspath(metadata_or_path)),
+                    folder_name
+                )
+
+                if os.path.isdir(potential_source_dir):
+                    source_dir: str = potential_source_dir
+                    self.logger.debug(f"Using source directory: {source_dir}")
+
+        # If no output directory is specified, create one based on the source directory
+        if output_dir is None and source_dir is not None:
+            output_dir = os.path.join("ready", os.path.basename(source_dir))
+            self.logger.debug(f"Using output directory: {output_dir}")
+
+        # Process files based on the metadata format
+        if is_multi_file:
+            if source_dir is None:
+                self.logger.error(
+                    "Source directory not specified and could not be determined automatically. " +
+                    "Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'"
+                )
+                return False
+
+            success = self.process_directory(metadata, source_dir, output_dir)
+        else:
+            # Single file metadata
+            if "filename" not in metadata:
+                self.logger.error("Invalid metadata format: missing 'filename' field")
+                return False
+
+            # If source directory is specified, look for the file there
+            video_file: str
+            if source_dir is not None:
+                video_file = os.path.join(source_dir, metadata["filename"])
+            elif metadata_as_path:
+                # Look for the file in the same directory as the JSON
+                video_file = os.path.join(os.path.dirname(metadata_or_path), metadata["filename"])
+            else:
+                self.logger.error(
+                    "Source directory not specified and video path could not be determined automatically. " +
+                    "Please specify a source directory with --source or use JSON filename like 'VideoName_metadata.json'"
+                )
+                return False
+
+            success = self.process_file(metadata, video_file, output_dir)
+        return success
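
Note: apply_metadata picks mkvpropedit for in-place edits and mkvmerge when remuxing to a new path. Tracing get_mkvmerge_cmd above for one audio track gives a command like the following (values are illustrative):

# Input metadata (illustrative):
metadata = {
    "title": "Film",
    "audio_tracks": [
        {"index": 1, "language": "eng", "name": "Stereo",
         "flags": {"default": True, "forced": False, "original": False}},
    ],
}
# MetadataWriter.get_mkvmerge_cmd(metadata, "in.mkv", "out.mkv") ->
# ["mkvmerge", "-o", "out.mkv", "--title", "Film",
#  "--language", "1:eng", "--track-name", "1:Stereo",
#  "--default-track", "1:yes", "--forced-track", "1:no",
#  "--original-flag", "1:no", "in.mkv"]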
(HTML template, file name not captured)

@@ -10,7 +10,8 @@
 <script src="/static/js/conversion.js"></script>
 </head>
 <body>
-    <header>
+    <header id="header">
+        <a href="/"><img class="logo" src="/static/images/icon3.svg"></a>
         <h1>Media Conversion</h1>
     </header>
     <main>

(HTML template, file name not captured)

@@ -10,8 +10,8 @@
 <script src="/static/js/index.js"></script>
 </head>
 <body>
-    <header>
-        <img src="/static/images/icon3.svg">
+    <header id="header">
+        <a href="/"><img class="logo" src="/static/images/icon3.svg"></a>
         <h1>Melies</h1>
     </header>
     <main>

(HTML template, file name not captured)

@@ -14,6 +14,10 @@
     <img src="/static/images/improve.svg">
     <img class="clicked" src="/static/images/improve_clicked.svg">
 </button>
+<header id="header">
+    <a href="/"><img class="logo" src="/static/images/icon3.svg"></a>
+    <h1>Metadata Editor</h1>
+</header>
 <header id="toolbar">
     <a href="/metadata/">Back</a>
     <button id="check-integrity">Check integrity</button>

(HTML template, file name not captured)

@@ -10,7 +10,8 @@
 <script src="/static/js/metadata.js"></script>
 </head>
 <body>
-    <header>
+    <header id="header">
+        <a href="/"><img class="logo" src="/static/images/icon3.svg"></a>
         <h1>Metadata Editor</h1>
     </header>
     <main>

(CSS stylesheet, file name not captured)

@@ -1,3 +1,13 @@
+@keyframes moon-pulse {
+    from {
+        filter: drop-shadow(0px 0px 0px #e7d7a8);
+    }
+
+    to {
+        filter: drop-shadow(0px 0px 12px white);
+    }
+}
+
 * {
     margin: 0;
     box-sizing: border-box;
@@ -29,19 +39,17 @@ header {
     gap: 0.8em;
     color: white;
 
-    a, button {
-        padding: 0.4em 0.8em;
-        border: none;
-        color: black;
-        background-color: #e4e4e4;
-        font-size: inherit;
-        font-family: inherit;
-        text-decoration: none;
-        border-radius: 0.2em;
-        cursor: pointer;
+    &#header {
+        align-items: center;
 
-        &:hover {
-            background-color: #dbdbdb;
+        img.logo {
+            width: 4em;
+            height: 4em;
+            object-fit: contain;
+
+            &:hover {
+                animation: moon-pulse 2s alternate infinite linear;
+            }
         }
     }
 }

(CSS stylesheet, file name not captured)

@@ -4,6 +4,27 @@ main {
     gap: 1.2em;
 }
 
+header#toolbar {
+    padding: 0.8em;
+    background-color: #4b4b4b;
+
+    a, button {
+        padding: 0.4em 0.8em;
+        border: none;
+        color: black;
+        background-color: #e4e4e4;
+        font-size: inherit;
+        font-family: inherit;
+        text-decoration: none;
+        border-radius: 0.2em;
+        cursor: pointer;
+
+        &:hover {
+            background-color: #dbdbdb;
+        }
+    }
+}
+
 #toggle-notifs {
     margin-left: auto;
 }

(CSS stylesheet, file name not captured)

@@ -1,12 +1,3 @@
-header {
-    align-items: center;
-    img {
-        width: 4em;
-        height: 4em;
-        object-fit: contain;
-    }
-}
-
 #pages {
     display: grid;
     max-width: calc(max(50%, 20em));

(JavaScript, file name not captured)

@@ -177,11 +177,7 @@ function showAgents() {
 function updateConvertBtn() {
     const agent = document.querySelector("#agents .agent input:checked")
     const convertBtn = document.getElementById("convert")
-    if (agent) {
-        convertBtn.disabled = false
-    } else {
-        convertBtn.disabled = true
-    }
+    convertBtn.disabled = !agent
 }
 
 function addAgents(agents) {

(JavaScript, file name not captured)

@@ -224,7 +224,7 @@ export default class IntegrityManager {
 
 if (parts.includes("pgs")) {
     fields.type = "PGS"
-} else {
+} else if (parts.includes("srt")) {
     fields.type = "SRT"
 }
 break
@@ -316,7 +316,9 @@ export default class IntegrityManager {
 if (fields.flags.hearing_impaired) {
     name += " SDH"
 }
-name += " | " + fields.type
+if (fields.type) {
+    name += " | " + fields.type
+}
 break
 }
 return name

(JavaScript, file name not captured)

@@ -119,6 +119,7 @@ export class Track {
 
 
     input.value = value
+    break
 
 default:
     break
270
src/server.py
Executable file → Normal file
270
src/server.py
Executable file → Normal file
@ -1,54 +1,30 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import argparse
|
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import socketserver
|
import socketserver
|
||||||
|
import time
|
||||||
|
from functools import partial
|
||||||
from http import HTTPStatus
|
from http import HTTPStatus
|
||||||
from http.server import SimpleHTTPRequestHandler
|
from http.server import SimpleHTTPRequestHandler
|
||||||
import time
|
from logging import Logger
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from urllib.parse import parse_qs, unquote, urlparse
|
from urllib.parse import parse_qs, unquote, urlparse
|
||||||
|
|
||||||
from watchdog.events import DirModifiedEvent, FileSystemEventHandler
|
from watchdog.events import (FileClosedEvent, FileDeletedEvent, FileMovedEvent,
|
||||||
|
FileSystemEventHandler)
|
||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
from watchdog.observers.api import BaseObserver
|
from watchdog.observers.api import BaseObserver
|
||||||
|
|
||||||
|
from src.file_handlers import ToConvertFileHandler, MetadataFileHandler
|
||||||
# https://stackoverflow.com/a/10551190/11109181
|
|
||||||
class EnvDefault(argparse.Action):
|
|
||||||
def __init__(self, envvar, required=True, default=None, help=None, **kwargs):
|
|
||||||
if envvar:
|
|
||||||
if envvar in os.environ:
|
|
||||||
default = os.environ[envvar]
|
|
||||||
if required and default is not None:
|
|
||||||
required = False
|
|
||||||
|
|
||||||
if default is not None and help is not None:
|
|
||||||
help += f" (default: {default})"
|
|
||||||
|
|
||||||
if envvar and help is not None:
|
|
||||||
help += f"\nCan also be specified through the {envvar} environment variable"
|
|
||||||
super(EnvDefault, self).__init__(default=default, required=required, help=help,
|
|
||||||
**kwargs)
|
|
||||||
|
|
||||||
def __call__(self, parser, namespace, values, option_string=None):
|
|
||||||
setattr(namespace, self.dest, values)
|
|
||||||
|
|
||||||
|
|
||||||
class HTTPHandler(SimpleHTTPRequestHandler):
|
class HTTPHandler(SimpleHTTPRequestHandler):
|
||||||
SERVER: MeliesServer = None
|
def __init__(self, server: MeliesServer, *args, **kwargs):
|
||||||
METADATA_CACHE = {}
|
self.server_: MeliesServer = server
|
||||||
TO_CONVERT_CACHE = {}
|
self.to_convert_files: ToConvertFileHandler = self.server_.to_convert_files
|
||||||
|
self.metadata_files: MetadataFileHandler = self.server_.metadata_files
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self.MAX_PAYLOAD_SIZE: int = self.SERVER.max_payload_size
|
|
||||||
self.TO_CONVERT_DIR: str = self.SERVER.to_convert_dir
|
|
||||||
self.CONVERTED_DIR: str = self.SERVER.converted_dir
|
|
||||||
self.METADATA_DIR: str = self.SERVER.metadata_dir
|
|
||||||
|
|
||||||
super().__init__(
|
super().__init__(
|
||||||
*args,
|
*args,
|
||||||
@ -60,7 +36,7 @@ class HTTPHandler(SimpleHTTPRequestHandler):
|
|||||||
self.data: Optional[dict|list] = None
|
self.data: Optional[dict|list] = None
|
||||||
|
|
||||||
def log_message(self, format, *args):
|
def log_message(self, format, *args):
|
||||||
logging.info("%s - %s" % (
|
self.server_.logger.info("%s - %s" % (
|
||||||
self.client_address[0],
|
self.client_address[0],
|
||||||
format % args
|
format % args
|
||||||
))
|
))
|
||||||
@ -68,9 +44,9 @@ class HTTPHandler(SimpleHTTPRequestHandler):
|
|||||||
def read_body_data(self):
|
def read_body_data(self):
|
||||||
try:
|
try:
|
||||||
size: int = int(self.headers["Content-Length"])
|
size: int = int(self.headers["Content-Length"])
|
||||||
if size > self.MAX_PAYLOAD_SIZE:
|
if size > self.server_.max_payload_size:
|
||||||
self.send_error(HTTPStatus.CONTENT_TOO_LARGE)
|
self.send_error(HTTPStatus.CONTENT_TOO_LARGE)
|
||||||
self.log_error(f"Payload is too big ({self.MAX_PAYLOAD_SIZE=}B)")
|
self.log_error(f"Payload is too big ({self.server_.max_payload_size=}B)")
|
||||||
return False
|
return False
|
||||||
raw_data = self.rfile.read(size)
|
raw_data = self.rfile.read(size)
|
||||||
self.data = json.loads(raw_data)
|
self.data = json.loads(raw_data)
|
||||||
@ -103,16 +79,17 @@ class HTTPHandler(SimpleHTTPRequestHandler):
|
|||||||
def handle_api_get(self, path: str):
|
def handle_api_get(self, path: str):
|
||||||
self.log_message(f"API request at {path}")
|
self.log_message(f"API request at {path}")
|
||||||
if path == "files/to_convert":
|
if path == "files/to_convert":
|
||||||
files: list[str] = self.get_to_convert_files_meta(self.query.get("f", [""])[0])
|
base_path: str = self.query.get("f", [""])[0]
|
||||||
|
files: list[dict] = self.to_convert_files.get_files_meta(base_path)
|
||||||
self.send_json(files)
|
self.send_json(files)
|
||||||
|
|
||||||
elif path == "files/metadata":
|
elif path == "files/metadata":
|
||||||
files: list[str] = self.get_metadata_files_meta()
|
files: list[dict] = self.metadata_files.get_files_meta()
|
||||||
self.send_json(files)
|
self.send_json(files)
|
||||||
|
|
||||||
elif path.startswith("file"):
|
elif path.startswith("file"):
|
||||||
filename: str = path.split("/", 1)[1]
|
filename: str = path.split("/", 1)[1]
|
||||||
data = self.read_file(filename)
|
data = self.metadata_files.read(filename)
|
||||||
if data is None:
|
if data is None:
|
||||||
self.send_error(HTTPStatus.NOT_FOUND)
|
self.send_error(HTTPStatus.NOT_FOUND)
|
||||||
else:
|
else:
|
||||||
@ -125,9 +102,11 @@ class HTTPHandler(SimpleHTTPRequestHandler):
|
|||||||
if path.startswith("file"):
|
if path.startswith("file"):
|
||||||
if self.read_body_data():
|
if self.read_body_data():
|
||||||
filename: str = path.split("/", 1)[1]
|
filename: str = path.split("/", 1)[1]
|
||||||
if self.write_file(filename, self.data):
|
if self.metadata_files.write(filename, self.data):
|
||||||
self.send_response(HTTPStatus.OK)
|
self.send_response(HTTPStatus.OK)
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
|
else:
|
||||||
|
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
|
||||||
else:
|
else:
|
||||||
self.send_response(HTTPStatus.NOT_FOUND, f"Unknown path {path}")
|
self.send_response(HTTPStatus.NOT_FOUND, f"Unknown path {path}")
|
||||||
self.end_headers()
|
self.end_headers()
|
||||||
@ -138,114 +117,6 @@ class HTTPHandler(SimpleHTTPRequestHandler):
|
|||||||
self.end_headers()
|
self.end_headers()
|
||||||
self.wfile.write(json.dumps(data).encode("utf-8"))
|
self.wfile.write(json.dumps(data).encode("utf-8"))
|
||||||
|
|
||||||
def get_to_convert_files(self, base_path: str):
|
|
||||||
root_path: str = os.path.abspath(self.TO_CONVERT_DIR)
|
|
||||||
full_path: str = os.path.join(root_path, base_path)
|
|
||||||
full_path = os.path.abspath(full_path)
|
|
||||||
common_prefix: str = os.path.commonprefix([full_path, root_path])
|
|
||||||
|
|
||||||
if common_prefix != root_path:
|
|
||||||
return []
|
|
||||||
|
|
||||||
return os.listdir(full_path)
|
|
||||||
|
|
||||||
def get_metadata_files(self):
|
|
||||||
return os.listdir(self.METADATA_DIR)
|
|
||||||
|
|
||||||
def read_file(self, filename: str) -> Optional[dict|list]:
|
|
||||||
if filename not in self.get_metadata_files():
|
|
||||||
return None
|
|
||||||
with open(os.path.join(self.METADATA_DIR, filename), "r") as f:
|
|
||||||
data = json.load(f)
|
|
||||||
return data
|
|
||||||
|
|
||||||
def write_file(self, filename: str, data: dict|list) -> bool:
|
|
||||||
if filename not in self.get_metadata_files():
|
|
||||||
self.send_error(HTTPStatus.NOT_FOUND)
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(os.path.join(self.METADATA_DIR, filename), "w", encoding="utf-8") as f:
|
|
||||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
|
||||||
except:
|
|
||||||
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
def get_to_convert_files_meta(self, base_path: str):
|
|
||||||
files: list[str] = self.get_to_convert_files(base_path)
|
|
||||||
files = [os.path.join(self.TO_CONVERT_DIR, base_path, f) for f in files]
|
|
||||||
files_meta: list[dict] = []
|
|
||||||
|
|
||||||
deleted = set(self.TO_CONVERT_CACHE.keys()) - set(files)
|
|
||||||
for path in deleted:
|
|
||||||
del self.TO_CONVERT_CACHE[path]
|
|
||||||
|
|
||||||
for path in files:
|
|
||||||
last_modified: float = os.path.getmtime(path)
|
|
||||||
if path not in self.TO_CONVERT_CACHE or self.TO_CONVERT_CACHE[path]["ts"] < last_modified:
|
|
||||||
self.update_to_convert_file_meta(path)
|
|
||||||
|
|
||||||
files_meta.append(self.TO_CONVERT_CACHE[path])
|
|
||||||
|
|
||||||
return files_meta
|
|
||||||
|
|
||||||
def get_metadata_files_meta(self):
|
|
||||||
files: list[str] = self.get_metadata_files()
|
|
||||||
files_meta: list[dict] = []
|
|
||||||
|
|
||||||
deleted = set(self.METADATA_CACHE.keys()) - set(files)
|
|
||||||
for filename in deleted:
|
|
||||||
del self.METADATA_CACHE[filename]
|
|
||||||
|
|
||||||
for filename in files:
|
|
||||||
path: str = os.path.join(self.METADATA_DIR, filename)
|
|
||||||
last_modified: float = os.path.getmtime(path)
|
|
||||||
if filename not in self.METADATA_CACHE or self.METADATA_CACHE[filename]["ts"] < last_modified:
|
|
||||||
self.update_metadata_file_meta(filename)
|
|
||||||
|
|
||||||
files_meta.append(self.METADATA_CACHE[filename])
|
|
||||||
|
|
||||||
return files_meta
|
|
||||||
|
|
||||||
def update_metadata_file_meta(self, filename: str):
|
|
||||||
path: str = os.path.join(self.METADATA_DIR, filename)
|
|
||||||
|
|
||||||
meta = {
|
|
||||||
"filename": filename,
|
|
||||||
"ts": os.path.getmtime(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
with open(path, "r") as f:
|
|
||||||
data = json.load(f)
|
|
||||||
is_series = "filename" not in data
|
|
||||||
meta["type"] = "series" if is_series else "film"
|
|
||||||
if is_series:
|
|
||||||
meta["episodes"] = len(data)
|
|
||||||
meta["title"] = filename.split("_metadata")[0]
|
|
||||||
else:
|
|
||||||
meta["title"] = data["title"]
|
|
||||||
|
|
||||||
self.METADATA_CACHE[filename] = meta
|
|
||||||
|
|
||||||
def update_to_convert_file_meta(self, path: str):
|
|
||||||
filename: str = os.path.basename(path)
|
|
||||||
|
|
||||||
is_dir: bool = os.path.isdir(path)
|
|
||||||
meta = {
|
|
||||||
"path": os.path.relpath(path, self.TO_CONVERT_DIR),
|
|
||||||
"filename": filename,
|
|
||||||
"ts": os.path.getmtime(path),
|
|
||||||
"size": os.path.getsize(path),
|
|
||||||
"type": "folder" if is_dir else "media"
|
|
||||||
}
|
|
||||||
if is_dir:
|
|
||||||
meta["elements"] = len(os.listdir(path))
|
|
||||||
if not meta["path"].endswith("/"):
|
|
||||||
meta["path"] += "/"
|
|
||||||
|
|
||||||
self.TO_CONVERT_CACHE[path] = meta
|
|
||||||
|
|
||||||
|
|
||||||
 class MeliesServer(FileSystemEventHandler):
     def __init__(
@@ -257,6 +128,7 @@ class MeliesServer(FileSystemEventHandler):
             max_payload_size: int):
         super().__init__()
+        self.logger: Logger = logging.getLogger("MeliesServer")
         self.port: int = port
         self.to_convert_dir: str = to_convert_dir
@@ -264,8 +136,6 @@ class MeliesServer(FileSystemEventHandler):
         self.metadata_dir: str = metadata_dir
         self.max_payload_size: int = max_payload_size

-        HTTPHandler.SERVER = self
-
         if not os.path.exists(self.to_convert_dir):
             os.mkdir(self.to_convert_dir)
         if not os.path.exists(self.converted_dir):
@@ -273,22 +143,26 @@ class MeliesServer(FileSystemEventHandler):
         if not os.path.exists(self.metadata_dir):
             os.mkdir(self.metadata_dir)

-        logging.basicConfig(
-            level=logging.INFO,
-            format="%(asctime)s [%(levelname)s] %(message)s",
-            datefmt=r"%Y-%m-%d %H:%M:%S"
-        )
+        self.to_convert_files: ToConvertFileHandler = ToConvertFileHandler(self.to_convert_dir)
+        self.metadata_files: MetadataFileHandler = MetadataFileHandler(self.metadata_dir)

         self.httpd: Optional[socketserver.TCPServer] = None
         self.observer: BaseObserver = Observer()
-        self.observer.schedule(self, self.converted_dir, event_filter=[DirModifiedEvent])
+        self.observer.schedule(
+            self,
+            self.converted_dir,
+            recursive=True,
+            event_filter=[FileDeletedEvent, FileMovedEvent, FileClosedEvent]
+        )
         self.last_event: float = time.time()

+        self.http_handler_cls = partial(HTTPHandler, self)
+
     def start(self):
         self.observer.start()
         try:
-            with socketserver.TCPServer(("", self.port), HTTPHandler) as self.httpd:
-                logging.info(f"Serving on port {self.port}")
+            with socketserver.TCPServer(("", self.port), self.http_handler_cls) as self.httpd:
+                self.logger.info(f"Serving on port {self.port}")
                 self.httpd.serve_forever()
         except KeyboardInterrupt:
             pass
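Dropping the HTTPHandler.SERVER = self class attribute in favour of partial(HTTPHandler, self) threads the server through the handler constructor instead of shared class state. socketserver only requires a callable that accepts the (request, client_address, server) tail, so a partial works as a handler factory. A self-contained sketch of the pattern — simplified Handler and App stand-ins, not this project's classes:

import socketserver
from functools import partial
from http.server import BaseHTTPRequestHandler


class Handler(BaseHTTPRequestHandler):
    def __init__(self, app, *args, **kwargs):
        # partial() prepends `app`; TCPServer then passes
        # (request, client_address, server) as usual. The attribute must be
        # set before super().__init__, which handles the request immediately.
        self.app = app
        super().__init__(*args, **kwargs)

    def do_GET(self):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(self.app.greeting.encode())


class App:
    greeting = "hello from the app"


if __name__ == "__main__":
    handler_cls = partial(Handler, App())
    with socketserver.TCPServer(("", 8000), handler_cls) as httpd:
        httpd.serve_forever()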
@@ -298,68 +172,26 @@ class MeliesServer(FileSystemEventHandler):
         self.observer.stop()
         self.observer.join()

-    def on_modified(self, event: DirModifiedEvent):
-        t: float = time.time()
-
-        logging.info(event)
-        if t - self.last_event > 1:
-            self.last_event = t
-
-
-def main():
-    parser = argparse.ArgumentParser(
-        description="Starts the Melies server",
-        formatter_class=argparse.RawTextHelpFormatter
-    )
-    parser.add_argument(
-        "-p", "--port",
-        action=EnvDefault,
-        envvar="MELIES_PORT",
-        default=8000,
-        type=int,
-        help="Port on which the server listens"
-    )
-    parser.add_argument(
-        "--max-payload-size",
-        action=EnvDefault,
-        envvar="MELIES_MAX_PAYLOAD_SIZE",
-        default=1e6,
-        type=int,
-        help="Maximum POST payload size in bytes that the server accepts"
-    )
-    parser.add_argument(
-        "--to-convert-dir",
-        action=EnvDefault,
-        envvar="MELIES_TO_CONVERT_DIR",
-        default="to_convert",
-        help="Path to the directory containing medias to convert"
-    )
-    parser.add_argument(
-        "--converted-dir",
-        action=EnvDefault,
-        envvar="MELIES_CONVERTED_DIR",
-        default="converted",
-        help="Path to the directory containing converted medias"
-    )
-    parser.add_argument(
-        "--metadata-dir",
-        action=EnvDefault,
-        envvar="MELIES_METADATA_DIR",
-        default="metadata",
-        help="Path to the directory containing metadata files"
-    )
-    args = parser.parse_args()
-
-    server = MeliesServer(
-        args.port,
-        args.to_convert_dir,
-        args.converted_dir,
-        args.metadata_dir,
-        args.max_payload_size
-    )
-    server.start()
-
-
-if __name__ == "__main__":
-    main()
+    def on_deleted(self, event: FileDeletedEvent):
+        self.logger.info(f"Converted media deleted: {event.src_path}")
+        self.delete_metadata(event.src_path)
+        return super().on_deleted(event)
+
+    def on_moved(self, event: FileMovedEvent):
+        self.logger.info(f"Converted media moved: {event.src_path} -> {event.dest_path}")
+        self.rename_metadata(event.src_path, event.dest_path)
+        return super().on_moved(event)
+
+    def on_closed(self, event: FileClosedEvent):
+        self.logger.info(f"Converted media created or modified: {event.src_path}")
+        self.extract_metadata(event.src_path)
+        return super().on_closed(event)
+
+    def extract_metadata(self, path: str):
+        pass
+
+    def rename_metadata(self, src: str, dst: str):
+        pass
+
+    def delete_metadata(self, path: str):
+        pass
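The watcher now reacts to concrete file events instead of debouncing DirModifiedEvent, so each event maps one-to-one onto a metadata operation (the three stubs left as pass). Two caveats: FileClosedEvent is emitted only by watchdog's inotify backend, so the on_closed path is effectively Linux-only, and the event_filter argument to schedule() needs a recent watchdog release. A standalone sketch of the same wiring, with hypothetical handler bodies and watch directory:

import time

from watchdog.events import (
    FileClosedEvent,
    FileDeletedEvent,
    FileMovedEvent,
    FileSystemEventHandler,
)
from watchdog.observers import Observer


class ConvertedWatcher(FileSystemEventHandler):
    def on_deleted(self, event: FileDeletedEvent):
        print(f"deleted: {event.src_path}")     # would drop the matching metadata

    def on_moved(self, event: FileMovedEvent):
        print(f"moved: {event.src_path} -> {event.dest_path}")  # would rename it

    def on_closed(self, event: FileClosedEvent):
        print(f"written: {event.src_path}")     # would (re)extract metadata


if __name__ == "__main__":
    observer = Observer()
    observer.schedule(
        ConvertedWatcher(),
        "converted",                            # hypothetical watch directory
        recursive=True,
        event_filter=[FileDeletedEvent, FileMovedEvent, FileClosedEvent],
    )
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()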
@@ -1,273 +0,0 @@
#!/usr/bin/env python3

import argparse
import json
import os
import subprocess
import sys


def read_metadata_json(json_file):
    """
    Read metadata from a JSON file.

    Args:
        json_file (str): Path to the JSON file

    Returns:
        dict: Metadata information
    """
    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
        return metadata
    except Exception as e:
        print(f"❌ Error reading JSON file: {str(e)}")
        return None


def write_metadata_to_video(metadata, input_file, output_file=None):
    """
    Write metadata to a video file using mkvmerge.

    Args:
        metadata (dict): Metadata information
        input_file (str): Path to the input video file
        output_file (str, optional): Path to the output video file

    Returns:
        bool: True if successful, False otherwise
    """
    if not os.path.isfile(input_file):
        print(f"❌ Input file not found: {input_file}")
        return False

    if not output_file:
        # Create a temporary output file
        base_name, ext = os.path.splitext(input_file)
        output_file = f"{base_name}_modified{ext}"

    # Start building the mkvmerge command
    cmd = ["mkvmerge", "-o", output_file]

    # Add global metadata (title)
    if "title" in metadata:
        cmd.extend(["--title", metadata["title"]])

    # Process audio tracks
    for track in metadata.get("audio_tracks", []):
        # Use the actual track index from the metadata
        track_id = track.get("index", 0)

        # Set language
        if "language" in track:
            cmd.extend(["--language", f"{track_id}:{track['language']}"])

        # Set title/name
        if "name" in track and track["name"]:
            cmd.extend(["--track-name", f"{track_id}:{track['name']}"])

        # Set disposition flags
        flags = track.get("flags", {})

        if flags.get("default", False):
            cmd.extend(["--default-track", f"{track_id}:yes"])
        else:
            cmd.extend(["--default-track", f"{track_id}:no"])

        if flags.get("forced", False):
            cmd.extend(["--forced-track", f"{track_id}:yes"])
        else:
            cmd.extend(["--forced-track", f"{track_id}:no"])

        if flags.get("original", False):
            cmd.extend(["--original-flag", f"{track_id}:yes"])
        else:
            cmd.extend(["--original-flag", f"{track_id}:no"])

    # Process subtitle tracks
    for track in metadata.get("subtitle_tracks", []):
        # Use the actual track index from the metadata
        track_id = track.get("index", 0)

        # Set language
        if "language" in track:
            cmd.extend(["--language", f"{track_id}:{track['language']}"])

        # Set title/name
        if "name" in track and track["name"]:
            cmd.extend(["--track-name", f"{track_id}:{track['name']}"])

        # Set disposition flags
        flags = track.get("flags", {})

        if flags.get("default", False):
            cmd.extend(["--default-track", f"{track_id}:yes"])
        else:
            cmd.extend(["--default-track", f"{track_id}:no"])

        if flags.get("forced", False):
            cmd.extend(["--forced-track", f"{track_id}:yes"])
        else:
            cmd.extend(["--forced-track", f"{track_id}:no"])

        if flags.get("original", False):
            cmd.extend(["--original-flag", f"{track_id}:yes"])
        else:
            cmd.extend(["--original-flag", f"{track_id}:no"])

    # Add input file
    cmd.append(input_file)

    # Execute the mkvmerge command
    print(f"🔄 Writing metadata to {os.path.basename(output_file)}")
    print(f"Command: {' '.join(cmd)}")

    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if result.returncode != 0:
            print(f"❌ Error writing metadata: {result.stderr}")
            return False

        print(f"✅ Metadata written to {output_file}")
        return True
    except Exception as e:
        print(f"❌ Error executing mkvmerge: {str(e)}")
        return False
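To make the command construction above concrete: for a single default English audio track, the assembled mkvmerge invocation looks roughly like this (hypothetical paths; note that mkvmerge remuxes the whole file rather than editing it in place):

metadata = {
    "title": "Some Film",
    "audio_tracks": [
        {"index": 1, "language": "eng", "name": "Stereo",
         "flags": {"default": True}}
    ],
}
# write_metadata_to_video(metadata, "film.mkv") assembles and runs:
#   mkvmerge -o film_modified.mkv --title "Some Film" \
#     --language 1:eng --track-name 1:Stereo \
#     --default-track 1:yes --forced-track 1:no --original-flag 1:no \
#     film.mkv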
def process_single_file(metadata, video_file, output_dir=None):
    """
    Process a single video file with the given metadata.

    Args:
        metadata (dict): Metadata for the video file
        video_file (str): Path to the video file
        output_dir (str, optional): Directory to save the output file

    Returns:
        bool: True if successful, False otherwise
    """
    if not os.path.isfile(video_file):
        print(f"❌ Video file not found: {video_file}")
        return False

    # Create output file path
    if output_dir:
        # Ensure output directory exists
        os.makedirs(output_dir, exist_ok=True)

        # Use the same filename in the output directory
        output_file = os.path.join(output_dir, os.path.basename(video_file))
    else:
        output_file = None  # Let write_metadata_to_video create a default output file

    # Write metadata to video
    return write_metadata_to_video(metadata, video_file, output_file)


def process_directory(metadata_dict, source_dir, output_dir=None):
    """
    Process all video files in the metadata dictionary.

    Args:
        metadata_dict (dict): Dictionary of metadata keyed by filename
        source_dir (str): Directory containing the video files
        output_dir (str, optional): Directory to save the output files

    Returns:
        bool: True if all files were processed successfully, False otherwise
    """
    if not os.path.isdir(source_dir):
        print(f"❌ Source directory not found: {source_dir}")
        return False

    # Create output directory if specified
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    success = True
    processed_count = 0

    # Process each file in the metadata dictionary
    for filename, file_metadata in metadata_dict.items():
        # Construct the full path to the video file
        video_file = os.path.join(source_dir, filename)

        if not os.path.isfile(video_file):
            print(f"❌ Video file not found: {video_file}")
            success = False
            continue

        # Process the file
        if process_single_file(file_metadata, video_file, output_dir):
            processed_count += 1
        else:
            success = False

    print(f"✅ Processed {processed_count} out of {len(metadata_dict)} files")
    return success
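process_directory consumes the multi-file JSON shape that main() below detects: a dict keyed by video filename. An illustrative call, following the "Millenium" naming convention referenced in main():

metadata_dict = {
    "Millenium.S01E01.mkv": {"title": "Episode 1"},
    "Millenium.S01E02.mkv": {"title": "Episode 2"},
}
# Reads Millenium/Millenium.S01E01.mkv etc. and writes the remuxed copies
# under ready/Millenium/ with unchanged filenames:
process_directory(metadata_dict, "Millenium", output_dir="ready/Millenium")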
def main():
    parser = argparse.ArgumentParser(description="Write metadata from JSON to video files.")
    parser.add_argument("json_file", help="Path to input JSON metadata file")
    parser.add_argument("-o", "--output", help="Path to output directory")
    parser.add_argument("-s", "--source", help="Source directory (overrides automatic detection)")
    args = parser.parse_args()

    json_file = args.json_file
    output_dir = args.output
    source_dir = args.source

    if not os.path.isfile(json_file):
        print(f"❌ JSON file not found: {json_file}")
        sys.exit(1)

    # Read metadata from JSON
    metadata = read_metadata_json(json_file)
    if not metadata:
        sys.exit(1)

    # Determine if the JSON contains metadata for multiple files or a single file
    is_multi_file = isinstance(metadata, dict) and all(isinstance(metadata[key], dict) for key in metadata)

    # If source directory is not specified, try to determine it from the JSON filename
    if not source_dir and is_multi_file:
        # Extract folder name from JSON filename (e.g., "Millenium" from "Millenium_metadata.json")
        json_basename = os.path.basename(json_file)
        if "_metadata.json" in json_basename:
            folder_name = json_basename.split("_metadata.json")[0]
            potential_source_dir = os.path.join(os.path.dirname(os.path.abspath(json_file)), folder_name)

            if os.path.isdir(potential_source_dir):
                source_dir = potential_source_dir
                print(f"📂 Using source directory: {source_dir}")

    # If no output directory is specified, create one based on the source directory
    if not output_dir and source_dir:
        output_dir = os.path.join("ready", os.path.basename(source_dir))
        print(f"📂 Using output directory: {output_dir}")

    # Process files based on the metadata format
    if is_multi_file:
        if not source_dir:
            print("❌ Source directory not specified and could not be determined automatically.")
            print("   Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'")
            sys.exit(1)

        success = process_directory(metadata, source_dir, output_dir)
    else:
        # Single file metadata
        if "filename" not in metadata:
            print("❌ Invalid metadata format: missing 'filename' field")
            sys.exit(1)

        # If source directory is specified, look for the file there
        if source_dir:
            video_file = os.path.join(source_dir, metadata["filename"])
        else:
            # Look for the file in the same directory as the JSON
            video_file = os.path.join(os.path.dirname(json_file), metadata["filename"])

        success = process_single_file(metadata, video_file, output_dir)

    if not success:
        sys.exit(1)


if __name__ == "__main__":
    main()
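This removed script rewrote the entire container with mkvmerge just to change tags. mkvpropedit, from the same MKVToolNix suite, can apply the same track properties in place without a remux. A hedged sketch of the per-track equivalent — property names per the MKVToolNix documentation, and note the assumed index shift, since mkvpropedit track selectors are 1-based while the mkvmerge track IDs used above are 0-based:

import subprocess


def set_track_metadata_in_place(path: str, track: dict) -> bool:
    # Assumed: track["index"] holds a 0-based mkvmerge track ID, so shift
    # by one for mkvpropedit's 1-based "track:N" selector.
    selector = f"track:{track.get('index', 0) + 1}"
    flags = track.get("flags", {})

    cmd = ["mkvpropedit", path, "--edit", selector]
    if "language" in track:
        cmd += ["--set", f"language={track['language']}"]
    if track.get("name"):
        cmd += ["--set", f"name={track['name']}"]
    cmd += ["--set", f"flag-default={int(flags.get('default', False))}"]
    cmd += ["--set", f"flag-forced={int(flags.get('forced', False))}"]

    result = subprocess.run(cmd, capture_output=True, text=True)
    return result.returncode == 0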