Compare commits

7 Commits

24 changed files with 890 additions and 729 deletions

View File

@ -1,4 +1,4 @@
FROM debian:bullseye-slim AS builder
FROM debian:bookworm-slim AS builder
# Install ffmpeg and mkvtoolnix
# but only keep the binaries and libs for ffprobe and mkvmerge
@ -7,11 +7,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
&& mkdir -p /artifacts/bin /artifacts/lib \
&& cp $(which ffprobe) /artifacts/bin/ \
&& cp $(which mkvmerge) /artifacts/bin/ \
&& cp $(which mkvpropedit) /artifacts/bin/ \
&& ldd $(which ffprobe) | awk '{print $3}' | xargs -I '{}' cp -v '{}' /artifacts/lib/ || true \
&& ldd $(which mkvmerge) | awk '{print $3}' | xargs -I '{}' cp -v '{}' /artifacts/lib/ || true
&& ldd $(which mkvmerge) | awk '{print $3}' | xargs -I '{}' cp -v '{}' /artifacts/lib/ || true \
&& ldd $(which mkvpropedit) | awk '{print $3}' | xargs -I '{}' cp -v '{}' /artifacts/lib/ || true
# Must be the same base as builder image for shared libraries compatibility
FROM python:3.13.3-slim-bullseye
FROM python:3.13.3-slim-bookworm
COPY --from=builder /artifacts/bin/* /usr/local/bin/
COPY --from=builder /artifacts/lib/* /usr/local/lib/
@ -26,4 +28,4 @@ COPY . .
EXPOSE 8000
CMD ["python", "src/server.py"]
CMD ["python", "-m", "scripts.server"]

0
__init__.py Normal file
View File

0
scripts/__init__.py Normal file
View File

47
scripts/extract_metadata.py Executable file
View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3
import argparse
import logging
import os
import sys
from src.metadata_extractor import MetadataExtractor
def main():
logging.basicConfig(
level=logging.INFO,
format="[%(levelname)s] %(message)s"
)
parser = argparse.ArgumentParser(
description="Extract metadata from video files and save as JSON"
)
parser.add_argument(
"input",
help="Path to input video file or directory"
)
parser.add_argument(
"-o", "--output",
help="Directory path where the output JSON files will be saved"
)
args = parser.parse_args()
input_path = args.input
output_dir = args.output
extractor: MetadataExtractor = MetadataExtractor()
success = False
if os.path.isfile(input_path):
success = extractor.process_file(input_path, output_dir)
elif os.path.isdir(input_path):
success = extractor.process_directory(input_path, output_dir)
else:
logging.error(f"Path not found: {input_path}")
if not success:
sys.exit(1)
if __name__ == "__main__":
main()

71
scripts/server.py Executable file
View File

@ -0,0 +1,71 @@
#!/usr/bin/env python3
import argparse
import logging
from src.env_default import EnvDefault
from src.server import MeliesServer
def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt=r"%Y-%m-%d %H:%M:%S"
)
parser = argparse.ArgumentParser(
description="Starts the Melies server",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
"-p", "--port",
action=EnvDefault,
envvar="MELIES_PORT",
default=8000,
type=int,
help="Port on which the server listens"
)
parser.add_argument(
"--max-payload-size",
action=EnvDefault,
envvar="MELIES_MAX_PAYLOAD_SIZE",
default=1e6,
type=int,
help="Maximum POST payload size in bytes that the server accepts"
)
parser.add_argument(
"--to-convert-dir",
action=EnvDefault,
envvar="MELIES_TO_CONVERT_DIR",
default="to_convert",
help="Path to the directory containing medias to convert"
)
parser.add_argument(
"--converted-dir",
action=EnvDefault,
envvar="MELIES_CONVERTED_DIR",
default="converted",
help="Path to the directory containing converted medias"
)
parser.add_argument(
"--metadata-dir",
action=EnvDefault,
envvar="MELIES_METADATA_DIR",
default="metadata",
help="Path to the directory containing metadata files"
)
args = parser.parse_args()
server = MeliesServer(
args.port,
args.to_convert_dir,
args.converted_dir,
args.metadata_dir,
args.max_payload_size
)
server.start()
if __name__ == "__main__":
main()

45
scripts/write_metadata.py Normal file
View File

@ -0,0 +1,45 @@
#!/usr/bin/env python3
import argparse
import logging
import sys
from src.metadata_writer import MetadataWriter
def main():
logging.basicConfig(
level=logging.INFO,
format="[%(levelname)s] %(message)s"
)
parser = argparse.ArgumentParser(
description="Write metadata from JSON to video files"
)
parser.add_argument(
"json_file",
help="Path to input JSON metadata file"
)
parser.add_argument(
"-o", "--output",
help="Path of the output directory"
)
parser.add_argument(
"-s", "--source",
help="Source directory (overrides automatic detection)"
)
args = parser.parse_args()
json_file = args.json_file
output_dir = args.output
source_dir = args.source
writer: MetadataWriter = MetadataWriter()
success: bool = writer.process_metadata(json_file, source_dir, output_dir)
if not success:
sys.exit(1)
if __name__ == "__main__":
main()

0
src/__init__.py Normal file
View File

25
src/env_default.py Normal file
View File

@ -0,0 +1,25 @@
from __future__ import annotations
import argparse
import os
# https://stackoverflow.com/a/10551190/11109181
class EnvDefault(argparse.Action):
def __init__(self, envvar, required=True, default=None, help=None, **kwargs):
if envvar:
if envvar in os.environ:
default = os.environ[envvar]
if required and default is not None:
required = False
if default is not None and help is not None:
help += f" (default: {default})"
if envvar and help is not None:
help += f"\nCan also be specified through the {envvar} environment variable"
super(EnvDefault, self).__init__(default=default, required=required, help=help,
**kwargs)
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, values)

106
src/file_handlers.py Normal file
View File

@ -0,0 +1,106 @@
import json
import os
from typing import Optional
class FileHandler:
def __init__(self, directory: str):
self.cache: dict[str, dict] = {}
self.directory: str = directory
def get_files(self, base_path: str = ""):
root_path: str = os.path.abspath(self.directory)
full_path: str = os.path.join(root_path, base_path)
full_path = os.path.abspath(full_path)
common_prefix: str = os.path.commonprefix([full_path, root_path])
if common_prefix != root_path:
return []
return [
os.path.join(base_path, f)
for f in os.listdir(full_path)
]
def get_files_meta(self, base_path: str = ""):
files: list[str] = self.get_files(base_path)
files = [
os.path.join(self.directory, f)
for f in files
]
files_meta: list[dict] = []
deleted = set(self.cache.keys()) - set(files)
for path in deleted:
del self.cache[path]
for path in files:
last_modified: float = os.path.getmtime(path)
if path not in self.cache or self.cache[path]["ts"] < last_modified:
self.update_meta(path)
files_meta.append(self.cache[path])
return files_meta
def update_meta(self, path: str) -> None:
self.cache[path] = self.get_meta(path)
def get_meta(self, path: str) -> dict:
return {
"path": os.path.relpath(path, self.directory),
"filename": os.path.basename(path),
"ts": os.path.getmtime(path)
}
class JsonFileHandler(FileHandler):
def read(self, path: str) -> Optional[dict|list]:
if path not in self.get_files():
return None
with open(os.path.join(self.directory, path), "r") as f:
data = json.load(f)
return data
def write(self, path: str, data: dict|list) -> bool:
if path not in self.get_files():
return False
try:
with open(os.path.join(self.directory, path), "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except:
return False
return True
class MetadataFileHandler(JsonFileHandler):
def get_meta(self, path: str) -> dict:
meta: dict = super().get_meta(path)
with open(path, "r") as f:
data = json.load(f)
is_series = "filename" not in data
meta["type"] = "series" if is_series else "film"
if is_series:
meta["episodes"] = len(data)
meta["title"] = meta["filename"].split("_metadata")[0]
else:
meta["title"] = data["title"]
return meta
class ToConvertFileHandler(FileHandler):
def get_meta(self, path: str) -> dict:
meta: dict = super().get_meta(path)
is_dir: bool = os.path.isdir(path)
meta["size"] = os.path.getsize(path)
meta["type"] = "folder" if is_dir else "media"
if is_dir:
meta["elements"] = len(os.listdir(path))
if not meta["path"].endswith("/"):
meta["path"] += "/"
return meta

View File

@ -1,196 +0,0 @@
#!/usr/bin/env python3
import argparse
import os
import subprocess
import json
import sys
SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
def get_video_metadata(file_path):
"""
Extract metadata from a video file using ffprobe.
Args:
file_path (str): Path to the video file
Returns:
dict: Metadata information
"""
# Get general file info
cmd = [
"ffprobe", "-v", "quiet", "-print_format", "json",
"-show_format", "-show_streams", file_path
]
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
print(f"❌ Error processing {file_path}: {result.stderr}")
return None
data = json.loads(result.stdout)
# Extract filename and title
filename = os.path.basename(file_path)
title = data.get("format", {}).get("tags", {}).get("title", filename)
# Initialize metadata structure
metadata = {
"filename": filename,
"title": title,
"audio_tracks": [],
"subtitle_tracks": []
}
# Process streams
for stream in data.get("streams", []):
codec_type = stream.get("codec_type")
if codec_type == "audio":
track = {
"index": stream.get("index"),
"language": stream.get("tags", {}).get("language", "und"),
"name": stream.get("tags", {}).get("title", ""),
"channels": stream.get("channels", 0),
"flags": {
"default": stream.get("disposition", {}).get("default", 0) == 1,
"visual_impaired": stream.get("disposition", {}).get("visual_impaired", 0) == 1,
"original": stream.get("disposition", {}).get("original", 0) == 1,
"commentary": stream.get("disposition", {}).get("comment", 0) == 1
}
}
metadata["audio_tracks"].append(track)
elif codec_type == "subtitle":
track = {
"index": stream.get("index"),
"language": stream.get("tags", {}).get("language", "und"),
"name": stream.get("tags", {}).get("title", ""),
"flags": {
"default": stream.get("disposition", {}).get("default", 0) == 1,
"forced": stream.get("disposition", {}).get("forced", 0) == 1,
"hearing_impaired": stream.get("disposition", {}).get("hearing_impaired", 0) == 1,
"original": stream.get("disposition", {}).get("original", 0) == 1,
"commentary": stream.get("disposition", {}).get("comment", 0) == 1
}
}
metadata["subtitle_tracks"].append(track)
return metadata
except Exception as e:
print(f"❌ Error processing {file_path}: {str(e)}")
return None
def process_file(file_path, output_dir=None):
"""
Process a single video file and write metadata to JSON.
Args:
file_path (str): Path to the video file
output_dir (str, optional): Directory where the output JSON file will be saved
"""
if not os.path.isfile(file_path):
print(f"❌ File not found: {file_path}")
return False
if not file_path.lower().endswith(SUPPORTED_EXTENSIONS):
print(f"❌ Unsupported file format: {file_path}")
return False
print(f"📊 Extracting metadata from {os.path.basename(file_path)}")
metadata = get_video_metadata(file_path)
if metadata:
# Generate output filename based on input file
filename = os.path.basename(os.path.splitext(file_path)[0]) + "_metadata.json"
if output_dir:
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, filename)
else:
# If no output directory specified, save in the same directory as the input file
base_name = os.path.splitext(file_path)[0]
output_path = f"{base_name}_metadata.json"
# Write metadata to JSON file
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
print(f"✅ Metadata saved to {output_path}")
return True
return False
def process_directory(directory_path, output_dir=None):
"""
Process all video files in a directory and write metadata to JSON.
Args:
directory_path (str): Path to the directory
output_dir (str, optional): Directory where the output JSON file will be saved
"""
if not os.path.isdir(directory_path):
print(f"❌ Directory not found: {directory_path}")
return False
all_metadata = {}
file_count = 0
for root, _, files in os.walk(directory_path):
for file in files:
if file.lower().endswith(SUPPORTED_EXTENSIONS):
file_path = os.path.join(root, file)
print(f"📊 Extracting metadata from {file}")
metadata = get_video_metadata(file_path)
if metadata:
# Use relative path as key
rel_path = os.path.relpath(file_path, directory_path)
all_metadata[rel_path] = metadata
file_count += 1
if file_count == 0:
print(f"❌ No supported video files found in {directory_path}")
return False
# Generate output filename based on directory name
dir_name = os.path.basename(os.path.normpath(directory_path))
filename = f"{dir_name}_metadata.json"
if output_dir:
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, filename)
else:
# If no output directory specified, save in the current directory
output_path = filename
# Write all metadata to a single JSON file
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(all_metadata, f, indent=2, ensure_ascii=False)
print(f"✅ Metadata for {file_count} files saved to {output_path}")
return True
def main():
parser = argparse.ArgumentParser(description="Extract metadata from video files and save as JSON.")
parser.add_argument("input", help="Path to input video file or directory")
parser.add_argument("-o", "--output", help="Directory path where output JSON files will be saved")
args = parser.parse_args()
input_path = args.input
output_dir = args.output
if os.path.isfile(input_path):
process_file(input_path, output_dir)
elif os.path.isdir(input_path):
process_directory(input_path, output_dir)
else:
print(f"❌ Path not found: {input_path}")
sys.exit(1)
if __name__ == "__main__":
main()

191
src/metadata_extractor.py Normal file
View File

@ -0,0 +1,191 @@
import json
import logging
import os
import subprocess
from typing import Optional
class MetadataExtractor:
SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
def __init__(self):
self.logger: logging.Logger = logging.getLogger("MetadataExtractor")
def analyze_file(self, path: str) -> Optional[dict]:
"""
Extracts metadata from a video file using ffprobe
:param path: Path to the video file
:return: Metadata information or ``None`` if an error occurred
"""
# Get general file info in JSON format
cmd: list[str] = [
"ffprobe",
"-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
path
]
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
self.logger.error(f"Error processing {path}: {result.stderr}")
return None
data: dict = json.loads(result.stdout)
# Extract filename and title
filename: str = os.path.basename(path)
title: str = data.get("format", {}).get("tags", {}).get("title", filename)
# Initialize metadata structure
metadata: dict = {
"filename": filename,
"title": title,
"audio_tracks": [],
"subtitle_tracks": []
}
# Process streams
for stream in data.get("streams", []):
codec_type = stream.get("codec_type")
tags = stream.get("tags", {})
disposition = stream.get("disposition", {})
track = {
"index": stream.get("index"),
"language": tags.get("language", "und"),
"name": tags.get("title", ""),
"flags": {
"default": disposition.get("default", 0) == 1,
"original": disposition.get("original", 0) == 1,
"commentary": disposition.get("commentary", 0) == 1
}
}
if codec_type == "audio":
track |= {
"channels": stream.get("channels", 0)
}
track["flags"] |= {
"visual_impaired": disposition.get("visual_impaired", 0) == 1
}
metadata["audio_tracks"].append(track)
elif codec_type == "subtitle":
track["flags"] |= {
"forced": disposition.get("forced", 0) == 1,
"hearing_impaired": disposition.get("hearing_impaired", 0) == 1
}
metadata["subtitle_tracks"].append(track)
elif codec_type == "video":
pass
elif codec_type == "button":
pass
else:
self.logger.warning(f"Unknown track codec type '{codec_type}'")
return metadata
except Exception as e:
self.logger.error(f"Error processing {path}: {str(e)}")
return None
def process_file(self, file_path: str, output_dir: str) -> bool:
"""
Processes a single video file and writes metadata to a JSON file
:param file_path: Path of the video file
:param output_dir: Path of the directory where the output JSON file will be saved
:return: True if successful, False otherwise
"""
if not os.path.isfile(file_path):
self.logger.error(f"File not found: {file_path}")
return False
if not file_path.lower().endswith(self.SUPPORTED_EXTENSIONS):
self.logger.error(f"Unsupported file format: {file_path}")
return False
self.logger.debug(f"Extracting metadata from {os.path.basename(file_path)}")
metadata: Optional[dict] = self.analyze_file(file_path)
if metadata:
# Generate output filename based on input file
filename = os.path.basename(os.path.splitext(file_path)[0]) + "_metadata.json"
if output_dir:
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, filename)
else:
# If no output directory specified, save in the same directory as the input file
base_name = os.path.splitext(file_path)[0]
output_path = f"{base_name}_metadata.json"
# Write metadata to JSON file
with open(output_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
self.logger.debug(f"Metadata saved to {output_path}")
return True
return False
def process_directory(self, directory_path: str, output_dir: Optional[str] = None) -> bool:
"""
Processes all video files in a directory and writes metadata to a JSON file
:param directory_path: Path of the directory
:param output_dir: Path of the directory where the output JSON file will be saved
:return: True if successful, False otherwise
"""
if not os.path.isdir(directory_path):
self.logger.error(f"Directory not found: {directory_path}")
return False
all_metadata: dict[str, dict] = {}
file_count: int = 0
for root, _, files in os.walk(directory_path):
for file in files:
if file.lower().endswith(self.SUPPORTED_EXTENSIONS):
file_path: str = os.path.join(root, file)
self.logger.debug(f"Extracting metadata from {file}")
metadata: Optional[dict] = self.analyze_file(file_path)
if metadata:
# Use relative path as key
rel_path: str = os.path.relpath(file_path, directory_path)
all_metadata[rel_path] = metadata
file_count += 1
if file_count == 0:
self.logger.error(f"No supported video files found in {directory_path}")
return False
# Generate output filename based on directory name
dir_name: str = os.path.basename(os.path.normpath(directory_path))
filename: str = f"{dir_name}_metadata.json"
if output_dir is not None:
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, filename)
else:
# If no output directory specified, save in the current directory
output_path = filename
# Write all metadata to a single JSON file
with open(output_path, "w", encoding="utf-8") as f:
json.dump(all_metadata, f, indent=2, ensure_ascii=False)
self.logger.debug(f"Metadata for {file_count} files saved to {output_path}")
return True

286
src/metadata_writer.py Normal file
View File

@ -0,0 +1,286 @@
import json
import logging
import os
import subprocess
from typing import Optional
class MetadataWriter:
SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
def __init__(self):
self.logger: logging.Logger = logging.getLogger("MetadataWriter")
@staticmethod
def get_mkvmerge_cmd(metadata: dict, in_path: str, out_path: str) -> list[str]:
cmd: list[str] = [
"mkvmerge",
"-o", out_path
]
# Add global metadata (title)
if "title" in metadata:
cmd.extend(["--title", metadata["title"]])
# Process audio + subtitle tracks
tracks: list[dict] = metadata.get("audio_tracks", []) + metadata.get("subtitle_tracks", [])
for track in tracks:
# Use the actual track index from the metadata
track_id = track.get("index", 0)
# Set language
if "language" in track:
cmd.extend(["--language", f"{track_id}:{track["language"]}"])
# Set title/name
if "name" in track and track["name"]:
cmd.extend(["--track-name", f"{track_id}:{track["name"]}"])
# Set disposition flags
flags = track.get("flags", {})
def yes_no(flag: str):
return f"{track_id}:{"yes" if flags.get(flag, False) else "no"}"
cmd.extend(["--default-track", yes_no("default")])
cmd.extend(["--forced-track", yes_no("forced")])
cmd.extend(["--original-flag", yes_no("original")])
# Add input file
cmd.append(in_path)
return cmd
@staticmethod
def get_mkvpropedit_cmd(metadata: dict, path: str) -> list[str]:
cmd: list[str] = [
"mkvpropedit",
path
]
# Add global metadata (title)
if "title" in metadata:
cmd.extend(["--edit", "info", "--set", f"title={metadata["title"]}"])
# Process audio + subtitle tracks
tracks: list[dict] = metadata.get("audio_tracks", []) + metadata.get("subtitle_tracks", [])
for track in tracks:
# Use the actual track index from the metadata
track_id = track.get("index", 0)
cmd.extend(["--edit", f"track:{track_id}"])
# Set language
if "language" in track:
cmd.extend(["--set", f"language={track["language"]}"])
# Set title/name
if "name" in track and track["name"]:
cmd.extend(["--set", f"name={track["name"]}"])
# Set disposition flags
flags = track.get("flags", {})
cmd.extend(["--set", f"flag-default={int(flags.get("default", False))}"])
cmd.extend(["--set", f"flag-forced={int(flags.get("forced", False))}"])
cmd.extend(["--set", f"flag-original={int(flags.get("original", False))}"])
return cmd
def apply_metadata(self, metadata: dict, in_path: str, out_path: Optional[str] = None) -> bool:
"""
Writes metadata to a video file using mkvmerge or mkvpropedit
:param metadata: Metadata information
:param in_path: Path of the input video file
:param out_path: Path of the output video file. If None, ``"_modified"`` is appended to ``in_path`` instead
:return: True if successful, False otherwise
"""
if not os.path.isfile(in_path):
self.logger.error(f"Input file not found: {in_path}")
return False
if out_path is None:
# Create a temporary output file
base_name, ext = os.path.splitext(in_path)
out_path: str = f"{base_name}_modified{ext}"
# Build the command
overwriting: bool = os.path.abspath(in_path) == os.path.abspath(out_path)
cmd: list[str] = (
self.get_mkvpropedit_cmd(metadata, in_path)
if overwriting else
self.get_mkvmerge_cmd(metadata, in_path, out_path)
)
# Execute the command
self.logger.debug(f"Writing metadata to {os.path.basename(out_path)}")
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
self.logger.error(f"Error writing metadata: {result.stderr}")
return False
self.logger.debug(f"Metadata written to {out_path}")
return True
except Exception as e:
self.logger.error(f"Error executing {cmd[0]}: {str(e)}")
return False
@staticmethod
def read_metadata(path: str) -> Optional[dict]:
try:
with open(path, "r") as f:
metadata: dict = json.load(f)
return metadata
except:
return None
def process_file(self, metadata_or_path: str|dict, file_path: str, output_dir: Optional[str] = None) -> bool:
"""
Processes a single video file with the given metadata
:param metadata_or_path: Metadata dict or path of the metadata file
:param file_path: Path of the video file
:param output_dir: Directory to save the output file to
:return: True if successful, False otherwise
"""
metadata: dict
if isinstance(metadata_or_path, str):
metadata = self.read_metadata(metadata_or_path)
if metadata is None:
return False
else:
metadata = metadata_or_path
# Create output file path
if output_dir is not None:
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Use the same filename in the output directory
output_file = os.path.join(output_dir, os.path.basename(file_path))
else:
output_file = None
# Write metadata to video
return self.apply_metadata(metadata, file_path, output_file)
def process_directory(self, metadata_or_path: str|dict, source_dir: str, output_dir: Optional[str] = None) -> bool:
"""
Processes all video files in the metadata dictionary
:param metadata_or_path: Dictionary of metadata keyed by filename
:param source_dir: Directory containing the video files
:param output_dir: Directory to save the output files to
:return: True if all files were processed successfully, False otherwise
"""
metadata: dict
if isinstance(metadata_or_path, str):
metadata = self.read_metadata(metadata_or_path)
if metadata is None:
return False
else:
metadata = metadata_or_path
if not os.path.isdir(source_dir):
self.logger.error(f"Source directory not found: {source_dir}")
return False
# Create output directory if specified
if output_dir:
os.makedirs(output_dir, exist_ok=True)
success: bool = True
processed_count: int = 0
# Process each file in the metadata dictionary
for filename, file_metadata in metadata.items():
# Construct the full path to the video file
video_file: str = os.path.join(source_dir, filename)
if not os.path.isfile(video_file):
self.logger.error(f"Video file not found: {video_file}")
success = False
continue
# Process the file
if self.process_file(file_metadata, video_file, output_dir):
processed_count += 1
else:
success = False
self.logger.debug(f"Processed {processed_count} out of {len(metadata)} files")
return success
def process_metadata(self, metadata_or_path: str|dict, source_dir: Optional[str] = None, output_dir: Optional[str] = None) -> bool:
metadata_as_path: bool = isinstance(metadata_or_path, str)
metadata: dict
if metadata_as_path:
metadata = self.read_metadata(metadata_or_path)
if metadata is None:
return False
else:
metadata = metadata_or_path
# Determine if the JSON contains metadata for multiple files or a single file
is_multi_file = isinstance(metadata, dict) and all(isinstance(metadata[key], dict) for key in metadata)
# If source directory is not specified, try to determine it from the JSON filename
if source_dir is None and is_multi_file and metadata_as_path:
# Extract folder name from JSON filename (e.g., "Millenium" from "Millenium_metadata.json")
json_basename: str = os.path.basename(metadata_or_path)
if json_basename.endswith("_metadata.json"):
folder_name: str = json_basename.split("_metadata.json")[0]
potential_source_dir: str = os.path.join(
os.path.dirname(os.path.abspath(metadata_or_path)),
folder_name
)
if os.path.isdir(potential_source_dir):
source_dir: str = potential_source_dir
self.logger.debug(f"Using source directory: {source_dir}")
# If no output directory is specified, create one based on the source directory
if output_dir is None and source_dir is not None:
output_dir = os.path.join("ready", os.path.basename(source_dir))
self.logger.debug(f"Using output directory: {output_dir}")
# Process files based on the metadata format
if is_multi_file:
if source_dir is None:
self.logger.error(
"Source directory not specified and could not be determined automatically. " +
"Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'"
)
return False
success = self.process_directory(metadata, source_dir, output_dir)
else:
# Single file metadata
if "filename" not in metadata:
self.logger.error("Invalid metadata format: missing 'filename' field")
return False
# If source directory is specified, look for the file there
video_file: str
if source_dir is not None:
video_file = os.path.join(source_dir, metadata["filename"])
elif metadata_as_path:
# Look for the file in the same directory as the JSON
video_file = os.path.join(os.path.dirname(metadata_or_path), metadata["filename"])
else:
self.logger.error(
"Source directory not specified and video path could not be determined automatically. " +
"Please specify a source directory with --source or use JSON filename like 'VideoName_metadata.json'"
)
return False
success = self.process_file(metadata, video_file, output_dir)
return success

View File

@ -10,7 +10,8 @@
<script src="/static/js/conversion.js"></script>
</head>
<body>
<header>
<header id="header">
<a href="/"><img class="logo" src="/static/images/icon3.svg"></a>
<h1>Media Conversion</h1>
</header>
<main>

View File

@ -10,8 +10,8 @@
<script src="/static/js/index.js"></script>
</head>
<body>
<header>
<img src="/static/images/icon3.svg">
<header id="header">
<a href="/"><img class="logo" src="/static/images/icon3.svg"></a>
<h1>Melies</h1>
</header>
<main>

View File

@ -14,6 +14,10 @@
<img src="/static/images/improve.svg">
<img class="clicked" src="/static/images/improve_clicked.svg">
</button>
<header id="header">
<a href="/"><img class="logo" src="/static/images/icon3.svg"></a>
<h1>Metadata Editor</h1>
</header>
<header id="toolbar">
<a href="/metadata/">Back</a>
<button id="check-integrity">Check integrity</button>

View File

@ -10,7 +10,8 @@
<script src="/static/js/metadata.js"></script>
</head>
<body>
<header>
<header id="header">
<a href="/"><img class="logo" src="/static/images/icon3.svg"></a>
<h1>Metadata Editor</h1>
</header>
<main>

View File

@ -1,3 +1,13 @@
@keyframes moon-pulse {
from {
filter: drop-shadow(0px 0px 0px #e7d7a8);
}
to {
filter: drop-shadow(0px 0px 12px white);
}
}
* {
margin: 0;
box-sizing: border-box;
@ -29,19 +39,17 @@ header {
gap: 0.8em;
color: white;
a, button {
padding: 0.4em 0.8em;
border: none;
color: black;
background-color: #e4e4e4;
font-size: inherit;
font-family: inherit;
text-decoration: none;
border-radius: 0.2em;
cursor: pointer;
&#header {
align-items: center;
&:hover {
background-color: #dbdbdb;
img.logo {
width: 4em;
height: 4em;
object-fit: contain;
&:hover {
animation: moon-pulse 2s alternate infinite linear;
}
}
}
}

View File

@ -4,6 +4,27 @@ main {
gap: 1.2em;
}
header#toolbar {
padding: 0.8em;
background-color: #4b4b4b;
a, button {
padding: 0.4em 0.8em;
border: none;
color: black;
background-color: #e4e4e4;
font-size: inherit;
font-family: inherit;
text-decoration: none;
border-radius: 0.2em;
cursor: pointer;
&:hover {
background-color: #dbdbdb;
}
}
}
#toggle-notifs {
margin-left: auto;
}

View File

@ -1,12 +1,3 @@
header {
align-items: center;
img {
width: 4em;
height: 4em;
object-fit: contain;
}
}
#pages {
display: grid;
max-width: calc(max(50%, 20em));

View File

@ -177,11 +177,7 @@ function showAgents() {
function updateConvertBtn() {
const agent = document.querySelector("#agents .agent input:checked")
const convertBtn = document.getElementById("convert")
if (agent) {
convertBtn.disabled = false
} else {
convertBtn.disabled = true
}
convertBtn.disabled = !agent
}
function addAgents(agents) {

View File

@ -224,7 +224,7 @@ export default class IntegrityManager {
if (parts.includes("pgs")) {
fields.type = "PGS"
} else {
} else if (parts.includes("srt")) {
fields.type = "SRT"
}
break
@ -316,7 +316,9 @@ export default class IntegrityManager {
if (fields.flags.hearing_impaired) {
name += " SDH"
}
name += " | " + fields.type
if (fields.type) {
name += " | " + fields.type
}
break
}
return name

View File

@ -119,6 +119,7 @@ export class Track {
input.value = value
break
default:
break

280
src/server.py Executable file → Normal file
View File

@ -1,55 +1,31 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import logging
import os
import socketserver
import time
from functools import partial
from http import HTTPStatus
from http.server import SimpleHTTPRequestHandler
import time
from logging import Logger
from typing import Optional
from urllib.parse import parse_qs, unquote, urlparse
from watchdog.events import DirModifiedEvent, FileSystemEventHandler
from watchdog.events import (FileClosedEvent, FileDeletedEvent, FileMovedEvent,
FileSystemEventHandler)
from watchdog.observers import Observer
from watchdog.observers.api import BaseObserver
# https://stackoverflow.com/a/10551190/11109181
class EnvDefault(argparse.Action):
def __init__(self, envvar, required=True, default=None, help=None, **kwargs):
if envvar:
if envvar in os.environ:
default = os.environ[envvar]
if required and default is not None:
required = False
if default is not None and help is not None:
help += f" (default: {default})"
if envvar and help is not None:
help += f"\nCan also be specified through the {envvar} environment variable"
super(EnvDefault, self).__init__(default=default, required=required, help=help,
**kwargs)
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, values)
from src.file_handlers import ToConvertFileHandler, MetadataFileHandler
class HTTPHandler(SimpleHTTPRequestHandler):
SERVER: MeliesServer = None
METADATA_CACHE = {}
TO_CONVERT_CACHE = {}
def __init__(self, *args, **kwargs):
self.MAX_PAYLOAD_SIZE: int = self.SERVER.max_payload_size
self.TO_CONVERT_DIR: str = self.SERVER.to_convert_dir
self.CONVERTED_DIR: str = self.SERVER.converted_dir
self.METADATA_DIR: str = self.SERVER.metadata_dir
def __init__(self, server: MeliesServer, *args, **kwargs):
self.server_: MeliesServer = server
self.to_convert_files: ToConvertFileHandler = self.server_.to_convert_files
self.metadata_files: MetadataFileHandler = self.server_.metadata_files
super().__init__(
*args,
directory=os.path.join(os.path.dirname(__file__), "public"),
@ -60,7 +36,7 @@ class HTTPHandler(SimpleHTTPRequestHandler):
self.data: Optional[dict|list] = None
def log_message(self, format, *args):
logging.info("%s - %s" % (
self.server_.logger.info("%s - %s" % (
self.client_address[0],
format % args
))
@ -68,9 +44,9 @@ class HTTPHandler(SimpleHTTPRequestHandler):
def read_body_data(self):
try:
size: int = int(self.headers["Content-Length"])
if size > self.MAX_PAYLOAD_SIZE:
if size > self.server_.max_payload_size:
self.send_error(HTTPStatus.CONTENT_TOO_LARGE)
self.log_error(f"Payload is too big ({self.MAX_PAYLOAD_SIZE=}B)")
self.log_error(f"Payload is too big ({self.server_.max_payload_size=}B)")
return False
raw_data = self.rfile.read(size)
self.data = json.loads(raw_data)
@ -103,16 +79,17 @@ class HTTPHandler(SimpleHTTPRequestHandler):
def handle_api_get(self, path: str):
self.log_message(f"API request at {path}")
if path == "files/to_convert":
files: list[str] = self.get_to_convert_files_meta(self.query.get("f", [""])[0])
base_path: str = self.query.get("f", [""])[0]
files: list[dict] = self.to_convert_files.get_files_meta(base_path)
self.send_json(files)
elif path == "files/metadata":
files: list[str] = self.get_metadata_files_meta()
files: list[dict] = self.metadata_files.get_files_meta()
self.send_json(files)
elif path.startswith("file"):
filename: str = path.split("/", 1)[1]
data = self.read_file(filename)
data = self.metadata_files.read(filename)
if data is None:
self.send_error(HTTPStatus.NOT_FOUND)
else:
@ -125,9 +102,11 @@ class HTTPHandler(SimpleHTTPRequestHandler):
if path.startswith("file"):
if self.read_body_data():
filename: str = path.split("/", 1)[1]
if self.write_file(filename, self.data):
if self.metadata_files.write(filename, self.data):
self.send_response(HTTPStatus.OK)
self.end_headers()
else:
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
else:
self.send_response(HTTPStatus.NOT_FOUND, f"Unknown path {path}")
self.end_headers()
@ -137,114 +116,6 @@ class HTTPHandler(SimpleHTTPRequestHandler):
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data).encode("utf-8"))
def get_to_convert_files(self, base_path: str):
root_path: str = os.path.abspath(self.TO_CONVERT_DIR)
full_path: str = os.path.join(root_path, base_path)
full_path = os.path.abspath(full_path)
common_prefix: str = os.path.commonprefix([full_path, root_path])
if common_prefix != root_path:
return []
return os.listdir(full_path)
def get_metadata_files(self):
return os.listdir(self.METADATA_DIR)
def read_file(self, filename: str) -> Optional[dict|list]:
if filename not in self.get_metadata_files():
return None
with open(os.path.join(self.METADATA_DIR, filename), "r") as f:
data = json.load(f)
return data
def write_file(self, filename: str, data: dict|list) -> bool:
if filename not in self.get_metadata_files():
self.send_error(HTTPStatus.NOT_FOUND)
return False
try:
with open(os.path.join(self.METADATA_DIR, filename), "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except:
self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
return False
return True
def get_to_convert_files_meta(self, base_path: str):
files: list[str] = self.get_to_convert_files(base_path)
files = [os.path.join(self.TO_CONVERT_DIR, base_path, f) for f in files]
files_meta: list[dict] = []
deleted = set(self.TO_CONVERT_CACHE.keys()) - set(files)
for path in deleted:
del self.TO_CONVERT_CACHE[path]
for path in files:
last_modified: float = os.path.getmtime(path)
if path not in self.TO_CONVERT_CACHE or self.TO_CONVERT_CACHE[path]["ts"] < last_modified:
self.update_to_convert_file_meta(path)
files_meta.append(self.TO_CONVERT_CACHE[path])
return files_meta
def get_metadata_files_meta(self):
files: list[str] = self.get_metadata_files()
files_meta: list[dict] = []
deleted = set(self.METADATA_CACHE.keys()) - set(files)
for filename in deleted:
del self.METADATA_CACHE[filename]
for filename in files:
path: str = os.path.join(self.METADATA_DIR, filename)
last_modified: float = os.path.getmtime(path)
if filename not in self.METADATA_CACHE or self.METADATA_CACHE[filename]["ts"] < last_modified:
self.update_metadata_file_meta(filename)
files_meta.append(self.METADATA_CACHE[filename])
return files_meta
def update_metadata_file_meta(self, filename: str):
path: str = os.path.join(self.METADATA_DIR, filename)
meta = {
"filename": filename,
"ts": os.path.getmtime(path)
}
with open(path, "r") as f:
data = json.load(f)
is_series = "filename" not in data
meta["type"] = "series" if is_series else "film"
if is_series:
meta["episodes"] = len(data)
meta["title"] = filename.split("_metadata")[0]
else:
meta["title"] = data["title"]
self.METADATA_CACHE[filename] = meta
def update_to_convert_file_meta(self, path: str):
filename: str = os.path.basename(path)
is_dir: bool = os.path.isdir(path)
meta = {
"path": os.path.relpath(path, self.TO_CONVERT_DIR),
"filename": filename,
"ts": os.path.getmtime(path),
"size": os.path.getsize(path),
"type": "folder" if is_dir else "media"
}
if is_dir:
meta["elements"] = len(os.listdir(path))
if not meta["path"].endswith("/"):
meta["path"] += "/"
self.TO_CONVERT_CACHE[path] = meta
class MeliesServer(FileSystemEventHandler):
@ -257,6 +128,7 @@ class MeliesServer(FileSystemEventHandler):
max_payload_size: int):
super().__init__()
self.logger: Logger = logging.getLogger("MeliesServer")
self.port: int = port
self.to_convert_dir: str = to_convert_dir
@ -264,31 +136,33 @@ class MeliesServer(FileSystemEventHandler):
self.metadata_dir: str = metadata_dir
self.max_payload_size: int = max_payload_size
HTTPHandler.SERVER = self
if not os.path.exists(self.to_convert_dir):
os.mkdir(self.to_convert_dir)
if not os.path.exists(self.converted_dir):
os.mkdir(self.converted_dir)
if not os.path.exists(self.metadata_dir):
os.mkdir(self.metadata_dir)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt=r"%Y-%m-%d %H:%M:%S"
)
self.to_convert_files: ToConvertFileHandler = ToConvertFileHandler(self.to_convert_dir)
self.metadata_files: MetadataFileHandler = MetadataFileHandler(self.metadata_dir)
self.httpd: Optional[socketserver.TCPServer] = None
self.observer: BaseObserver = Observer()
self.observer.schedule(self, self.converted_dir, event_filter=[DirModifiedEvent])
self.observer.schedule(
self,
self.converted_dir,
recursive=True,
event_filter=[FileDeletedEvent, FileMovedEvent, FileClosedEvent]
)
self.last_event: float = time.time()
self.http_handler_cls = partial(HTTPHandler, self)
def start(self):
self.observer.start()
try:
with socketserver.TCPServer(("", self.port), HTTPHandler) as self.httpd:
logging.info(f"Serving on port {self.port}")
with socketserver.TCPServer(("", self.port), self.http_handler_cls) as self.httpd:
self.logger.info(f"Serving on port {self.port}")
self.httpd.serve_forever()
except KeyboardInterrupt:
pass
@ -297,69 +171,27 @@ class MeliesServer(FileSystemEventHandler):
def stop(self):
self.observer.stop()
self.observer.join()
def on_deleted(self, event: FileDeletedEvent):
self.logger.info(f"Converted media deleted: {event.src_path}")
self.delete_metadata(event.src_path)
return super().on_deleted(event)
def on_moved(self, event: FileMovedEvent):
self.logger.info(f"Converted media moved: {event.src_path} -> {event.dest_path}")
self.rename_metadata(event.src_path, event.dest_path)
return super().on_moved(event)
def on_closed(self, event: FileClosedEvent):
self.logger.info(f"Converted media created or modified: {event.src_path}")
self.extract_metadata(event.src_path)
return super().on_closed(event)
def on_modified(self, event: DirModifiedEvent):
t: float = time.time()
logging.info(event)
if t - self.last_event > 1:
self.last_event = t
def main():
parser = argparse.ArgumentParser(
description="Starts the Melies server",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
"-p", "--port",
action=EnvDefault,
envvar="MELIES_PORT",
default=8000,
type=int,
help="Port on which the server listens"
)
parser.add_argument(
"--max-payload-size",
action=EnvDefault,
envvar="MELIES_MAX_PAYLOAD_SIZE",
default=1e6,
type=int,
help="Maximum POST payload size in bytes that the server accepts"
)
parser.add_argument(
"--to-convert-dir",
action=EnvDefault,
envvar="MELIES_TO_CONVERT_DIR",
default="to_convert",
help="Path to the directory containing medias to convert"
)
parser.add_argument(
"--converted-dir",
action=EnvDefault,
envvar="MELIES_CONVERTED_DIR",
default="converted",
help="Path to the directory containing converted medias"
)
parser.add_argument(
"--metadata-dir",
action=EnvDefault,
envvar="MELIES_METADATA_DIR",
default="metadata",
help="Path to the directory containing metadata files"
)
args = parser.parse_args()
def extract_metadata(self, path: str):
pass
server = MeliesServer(
args.port,
args.to_convert_dir,
args.converted_dir,
args.metadata_dir,
args.max_payload_size
)
server.start()
def rename_metadata(self, src: str, dst: str):
pass
if __name__ == "__main__":
main()
def delete_metadata(self, path: str):
pass

View File

@ -1,273 +0,0 @@
#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
def read_metadata_json(json_file):
"""
Read metadata from a JSON file.
Args:
json_file (str): Path to the JSON file
Returns:
dict: Metadata information
"""
try:
with open(json_file, 'r', encoding='utf-8') as f:
metadata = json.load(f)
return metadata
except Exception as e:
print(f"❌ Error reading JSON file: {str(e)}")
return None
def write_metadata_to_video(metadata, input_file, output_file=None):
"""
Write metadata to a video file using mkvmerge.
Args:
metadata (dict): Metadata information
input_file (str): Path to the input video file
output_file (str, optional): Path to the output video file
Returns:
bool: True if successful, False otherwise
"""
if not os.path.isfile(input_file):
print(f"❌ Input file not found: {input_file}")
return False
if not output_file:
# Create a temporary output file
base_name, ext = os.path.splitext(input_file)
output_file = f"{base_name}_modified{ext}"
# Start building the mkvmerge command
cmd = ["mkvmerge", "-o", output_file]
# Add global metadata (title)
if "title" in metadata:
cmd.extend(["--title", metadata["title"]])
# Process audio tracks
for track in metadata.get("audio_tracks", []):
# Use the actual track index from the metadata
track_id = track.get("index", 0)
# Set language
if "language" in track:
cmd.extend([f"--language", f"{track_id}:{track['language']}"])
# Set title/name
if "name" in track and track["name"]:
cmd.extend([f"--track-name", f"{track_id}:{track['name']}"])
# Set disposition flags
flags = track.get("flags", {})
if flags.get("default", False):
cmd.extend([f"--default-track", f"{track_id}:yes"])
else:
cmd.extend([f"--default-track", f"{track_id}:no"])
if flags.get("forced", False):
cmd.extend([f"--forced-track", f"{track_id}:yes"])
else:
cmd.extend([f"--forced-track", f"{track_id}:no"])
if flags.get("original", False):
cmd.extend([f"--original-flag", f"{track_id}:yes"])
else:
cmd.extend([f"--original-flag", f"{track_id}:no"])
# Process subtitle tracks
for track in metadata.get("subtitle_tracks", []):
# Use the actual track index from the metadata
track_id = track.get("index", 0)
# Set language
if "language" in track:
cmd.extend([f"--language", f"{track_id}:{track['language']}"])
# Set title/name
if "name" in track and track["name"]:
cmd.extend([f"--track-name", f"{track_id}:{track['name']}"])
# Set disposition flags
flags = track.get("flags", {})
if flags.get("default", False):
cmd.extend([f"--default-track", f"{track_id}:yes"])
else:
cmd.extend([f"--default-track", f"{track_id}:no"])
if flags.get("forced", False):
cmd.extend([f"--forced-track", f"{track_id}:yes"])
else:
cmd.extend([f"--forced-track", f"{track_id}:no"])
if flags.get("original", False):
cmd.extend([f"--original-flag", f"{track_id}:yes"])
else:
cmd.extend([f"--original-flag", f"{track_id}:no"])
# Add input file
cmd.append(input_file)
# Execute the mkvmerge command
print(f"🔄 Writing metadata to {os.path.basename(output_file)}")
print(f"Command: {' '.join(cmd)}")
try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
print(f"❌ Error writing metadata: {result.stderr}")
return False
print(f"✅ Metadata written to {output_file}")
return True
except Exception as e:
print(f"❌ Error executing mkvmerge: {str(e)}")
return False
def process_single_file(metadata, video_file, output_dir=None):
"""
Process a single video file with the given metadata.
Args:
metadata (dict): Metadata for the video file
video_file (str): Path to the video file
output_dir (str, optional): Directory to save the output file
Returns:
bool: True if successful, False otherwise
"""
if not os.path.isfile(video_file):
print(f"❌ Video file not found: {video_file}")
return False
# Create output file path
if output_dir:
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Use the same filename in the output directory
output_file = os.path.join(output_dir, os.path.basename(video_file))
else:
output_file = None # Let write_metadata_to_video create a default output file
# Write metadata to video
return write_metadata_to_video(metadata, video_file, output_file)
def process_directory(metadata_dict, source_dir, output_dir=None):
"""
Process all video files in the metadata dictionary.
Args:
metadata_dict (dict): Dictionary of metadata keyed by filename
source_dir (str): Directory containing the video files
output_dir (str, optional): Directory to save the output files
Returns:
bool: True if all files were processed successfully, False otherwise
"""
if not os.path.isdir(source_dir):
print(f"❌ Source directory not found: {source_dir}")
return False
# Create output directory if specified
if output_dir:
os.makedirs(output_dir, exist_ok=True)
success = True
processed_count = 0
# Process each file in the metadata dictionary
for filename, file_metadata in metadata_dict.items():
# Construct the full path to the video file
video_file = os.path.join(source_dir, filename)
if not os.path.isfile(video_file):
print(f"❌ Video file not found: {video_file}")
success = False
continue
# Process the file
if process_single_file(file_metadata, video_file, output_dir):
processed_count += 1
else:
success = False
print(f"✅ Processed {processed_count} out of {len(metadata_dict)} files")
return success
def main():
parser = argparse.ArgumentParser(description="Write metadata from JSON to video files.")
parser.add_argument("json_file", help="Path to input JSON metadata file")
parser.add_argument("-o", "--output", help="Path to output directory")
parser.add_argument("-s", "--source", help="Source directory (overrides automatic detection)")
args = parser.parse_args()
json_file = args.json_file
output_dir = args.output
source_dir = args.source
if not os.path.isfile(json_file):
print(f"❌ JSON file not found: {json_file}")
sys.exit(1)
# Read metadata from JSON
metadata = read_metadata_json(json_file)
if not metadata:
sys.exit(1)
# Determine if the JSON contains metadata for multiple files or a single file
is_multi_file = isinstance(metadata, dict) and all(isinstance(metadata[key], dict) for key in metadata)
# If source directory is not specified, try to determine it from the JSON filename
if not source_dir and is_multi_file:
# Extract folder name from JSON filename (e.g., "Millenium" from "Millenium_metadata.json")
json_basename = os.path.basename(json_file)
if "_metadata.json" in json_basename:
folder_name = json_basename.split("_metadata.json")[0]
potential_source_dir = os.path.join(os.path.dirname(os.path.abspath(json_file)), folder_name)
if os.path.isdir(potential_source_dir):
source_dir = potential_source_dir
print(f"📂 Using source directory: {source_dir}")
# If no output directory is specified, create one based on the source directory
if not output_dir and source_dir:
output_dir = os.path.join("ready", os.path.basename(source_dir))
print(f"📂 Using output directory: {output_dir}")
# Process files based on the metadata format
if is_multi_file:
if not source_dir:
print("❌ Source directory not specified and could not be determined automatically.")
print(" Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'")
sys.exit(1)
success = process_directory(metadata, source_dir, output_dir)
else:
# Single file metadata
if "filename" not in metadata:
print("❌ Invalid metadata format: missing 'filename' field")
sys.exit(1)
# If source directory is specified, look for the file there
if source_dir:
video_file = os.path.join(source_dir, metadata["filename"])
else:
# Look for the file in the same directory as the JSON
video_file = os.path.join(os.path.dirname(json_file), metadata["filename"])
success = process_single_file(metadata, video_file, output_dir)
if not success:
sys.exit(1)
if __name__ == "__main__":
main()