From 7a49849ee96eb5906d48b146fb5fca4c2388f501 Mon Sep 17 00:00:00 2001
From: LordBaryhobal
Date: Mon, 5 May 2025 21:53:24 +0200
Subject: [PATCH] refactor: adapt metadata writer

---
 scripts/write_metadata.py |  45 +++++++
 src/get_metadata.py       | 196 ---------------------
 src/metadata_writer.py    | 240 +++++++++++++++++++++
 src/write_metadata.py     | 273 --------------------------------
 4 files changed, 285 insertions(+), 469 deletions(-)
 create mode 100644 scripts/write_metadata.py
 delete mode 100755 src/get_metadata.py
 create mode 100644 src/metadata_writer.py
 delete mode 100755 src/write_metadata.py

diff --git a/scripts/write_metadata.py b/scripts/write_metadata.py
new file mode 100644
index 0000000..4ea8d71
--- /dev/null
+++ b/scripts/write_metadata.py
@@ -0,0 +1,45 @@
+#!/usr/bin/env python3
+
+import argparse
+import logging
+import sys
+
+from src.metadata_writer import MetadataWriter
+
+
+def main():
+    logging.basicConfig(
+        level=logging.INFO,
+        format="[%(levelname)s] %(message)s"
+    )
+
+    parser = argparse.ArgumentParser(
+        description="Write metadata from JSON to video files"
+    )
+    parser.add_argument(
+        "json_file",
+        help="Path to input JSON metadata file"
+    )
+    parser.add_argument(
+        "-o", "--output",
+        help="Path of the output directory"
+    )
+    parser.add_argument(
+        "-s", "--source",
+        help="Source directory (overrides automatic detection)"
+    )
+    args = parser.parse_args()
+
+    json_file = args.json_file
+    output_dir = args.output
+    source_dir = args.source
+
+    writer: MetadataWriter = MetadataWriter()
+
+    success: bool = writer.process_metadata(json_file, source_dir, output_dir)
+
+    if not success:
+        sys.exit(1)
+
+if __name__ == "__main__":
+    main()
diff --git a/src/get_metadata.py b/src/get_metadata.py
deleted file mode 100755
index aff67b9..0000000
--- a/src/get_metadata.py
+++ /dev/null
@@ -1,196 +0,0 @@
-#!/usr/bin/env python3 -import argparse -import os -import subprocess -import json -import sys - -SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi") - -def get_video_metadata(file_path): - """ - Extract metadata from a video file using ffprobe.
- - Args: - file_path (str): Path to the video file - - Returns: - dict: Metadata information - """ - # Get general file info - cmd = [ - "ffprobe", "-v", "quiet", "-print_format", "json", - "-show_format", "-show_streams", file_path - ] - - try: - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - if result.returncode != 0: - print(f"❌ Error processing {file_path}: {result.stderr}") - return None - - data = json.loads(result.stdout) - - # Extract filename and title - filename = os.path.basename(file_path) - title = data.get("format", {}).get("tags", {}).get("title", filename) - - # Initialize metadata structure - metadata = { - "filename": filename, - "title": title, - "audio_tracks": [], - "subtitle_tracks": [] - } - - # Process streams - for stream in data.get("streams", []): - codec_type = stream.get("codec_type") - - if codec_type == "audio": - track = { - "index": stream.get("index"), - "language": stream.get("tags", {}).get("language", "und"), - "name": stream.get("tags", {}).get("title", ""), - "channels": stream.get("channels", 0), - "flags": { - "default": stream.get("disposition", {}).get("default", 0) == 1, - "visual_impaired": stream.get("disposition", {}).get("visual_impaired", 0) == 1, - "original": stream.get("disposition", {}).get("original", 0) == 1, - "commentary": stream.get("disposition", {}).get("comment", 0) == 1 - } - } - metadata["audio_tracks"].append(track) - - elif codec_type == "subtitle": - track = { - "index": stream.get("index"), - "language": stream.get("tags", {}).get("language", "und"), - "name": stream.get("tags", {}).get("title", ""), - "flags": { - "default": stream.get("disposition", {}).get("default", 0) == 1, - "forced": stream.get("disposition", {}).get("forced", 0) == 1, - "hearing_impaired": stream.get("disposition", {}).get("hearing_impaired", 0) == 1, - "original": stream.get("disposition", {}).get("original", 0) == 1, - "commentary": stream.get("disposition", {}).get("comment", 0) == 1 - } - } - metadata["subtitle_tracks"].append(track) - - return metadata - - except Exception as e: - print(f"❌ Error processing {file_path}: {str(e)}") - return None - -def process_file(file_path, output_dir=None): - """ - Process a single video file and write metadata to JSON. 
- - Args: - file_path (str): Path to the video file - output_dir (str, optional): Directory where the output JSON file will be saved - """ - if not os.path.isfile(file_path): - print(f"❌ File not found: {file_path}") - return False - - if not file_path.lower().endswith(SUPPORTED_EXTENSIONS): - print(f"❌ Unsupported file format: {file_path}") - return False - - print(f"📊 Extracting metadata from {os.path.basename(file_path)}") - metadata = get_video_metadata(file_path) - - if metadata: - # Generate output filename based on input file - filename = os.path.basename(os.path.splitext(file_path)[0]) + "_metadata.json" - - if output_dir: - # Ensure output directory exists - os.makedirs(output_dir, exist_ok=True) - output_path = os.path.join(output_dir, filename) - else: - # If no output directory specified, save in the same directory as the input file - base_name = os.path.splitext(file_path)[0] - output_path = f"{base_name}_metadata.json" - - # Write metadata to JSON file - with open(output_path, 'w', encoding='utf-8') as f: - json.dump(metadata, f, indent=2, ensure_ascii=False) - - print(f"✅ Metadata saved to {output_path}") - return True - - return False - -def process_directory(directory_path, output_dir=None): - """ - Process all video files in a directory and write metadata to JSON. - - Args: - directory_path (str): Path to the directory - output_dir (str, optional): Directory where the output JSON file will be saved - """ - if not os.path.isdir(directory_path): - print(f"❌ Directory not found: {directory_path}") - return False - - all_metadata = {} - file_count = 0 - - for root, _, files in os.walk(directory_path): - for file in files: - if file.lower().endswith(SUPPORTED_EXTENSIONS): - file_path = os.path.join(root, file) - print(f"📊 Extracting metadata from {file}") - metadata = get_video_metadata(file_path) - - if metadata: - # Use relative path as key - rel_path = os.path.relpath(file_path, directory_path) - all_metadata[rel_path] = metadata - file_count += 1 - - if file_count == 0: - print(f"❌ No supported video files found in {directory_path}") - return False - - # Generate output filename based on directory name - dir_name = os.path.basename(os.path.normpath(directory_path)) - filename = f"{dir_name}_metadata.json" - - if output_dir: - # Ensure output directory exists - os.makedirs(output_dir, exist_ok=True) - output_path = os.path.join(output_dir, filename) - else: - # If no output directory specified, save in the current directory - output_path = filename - - # Write all metadata to a single JSON file - with open(output_path, 'w', encoding='utf-8') as f: - json.dump(all_metadata, f, indent=2, ensure_ascii=False) - - print(f"✅ Metadata for {file_count} files saved to {output_path}") - return True - -def main(): - parser = argparse.ArgumentParser(description="Extract metadata from video files and save as JSON.") - parser.add_argument("input", help="Path to input video file or directory") - parser.add_argument("-o", "--output", help="Directory path where output JSON files will be saved") - args = parser.parse_args() - - input_path = args.input - output_dir = args.output - - if os.path.isfile(input_path): - process_file(input_path, output_dir) - elif os.path.isdir(input_path): - process_directory(input_path, output_dir) - else: - print(f"❌ Path not found: {input_path}") - sys.exit(1) - -if __name__ == "__main__": - main() diff --git a/src/metadata_writer.py b/src/metadata_writer.py new file mode 100644 index 0000000..b23ff66 --- /dev/null +++ b/src/metadata_writer.py @@ -0,0 +1,240 @@ 
+import json
+import logging
+import os
+import subprocess
+from typing import Optional
+
+
+class MetadataWriter:
+    SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
+
+    def __init__(self):
+        self.logger: logging.Logger = logging.getLogger("MetadataWriter")
+
+    def apply_metadata(self, metadata: dict, in_path: str, out_path: Optional[str] = None) -> bool:
+        """
+        Writes metadata to a video file using mkvmerge
+
+        :param metadata: Metadata information
+        :param in_path: Path of the input video file
+        :param out_path: Path of the output video file. If None, ``"_modified"`` is appended to ``in_path`` instead
+        :return: True if successful, False otherwise
+        """
+
+        if not os.path.isfile(in_path):
+            self.logger.error(f"Input file not found: {in_path}")
+            return False
+
+        if out_path is None:
+            # Derive an output path next to the input file
+            base_name, ext = os.path.splitext(in_path)
+            out_path = f"{base_name}_modified{ext}"
+
+        # Start building the mkvmerge command
+        cmd: list[str] = [
+            "mkvmerge",
+            "-o", out_path
+        ]
+
+        # Add global metadata (title)
+        if "title" in metadata:
+            cmd.extend(["--title", metadata["title"]])
+
+        # Process audio + subtitle tracks
+        tracks: list[dict] = metadata.get("audio_tracks", []) + metadata.get("subtitle_tracks", [])
+        for track in tracks:
+            # Use the actual track index from the metadata
+            track_id = track.get("index", 0)
+
+            # Set language
+            if "language" in track:
+                cmd.extend(["--language", f"{track_id}:{track['language']}"])
+
+            # Set title/name
+            if "name" in track and track["name"]:
+                cmd.extend(["--track-name", f"{track_id}:{track['name']}"])
+
+            # Set disposition flags
+            flags = track.get("flags", {})
+
+            def yes_no(flag: str):
+                return f"{track_id}:{'yes' if flags.get(flag, False) else 'no'}"
+
+            cmd.extend(["--default-track", yes_no("default")])
+            cmd.extend(["--forced-track", yes_no("forced")])
+            cmd.extend(["--original-flag", yes_no("original")])
+
+        # Add input file
+        cmd.append(in_path)
+
+        # Execute the mkvmerge command
+        self.logger.debug(f"Writing metadata to {os.path.basename(out_path)}")
+
+        try:
+            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
+            if result.returncode != 0:
+                self.logger.error(f"Error writing metadata: {result.stderr}")
+                return False
+
+            self.logger.debug(f"Metadata written to {out_path}")
+            return True
+
+        except Exception as e:
+            self.logger.error(f"Error executing mkvmerge: {str(e)}")
+            return False
+
+    @staticmethod
+    def read_metadata(path: str) -> Optional[dict]:
+        try:
+            with open(path, "r") as f:
+                metadata: dict = json.load(f)
+                return metadata
+
+        except (OSError, json.JSONDecodeError):
+            return None
+
+    def process_file(self, metadata_or_path: str | dict, file_path: str, output_dir: Optional[str] = None) -> bool:
+        """
+        Processes a single video file with the given metadata
+
+        :param metadata_or_path: Metadata dict or path of the metadata file
+        :param file_path: Path of the video file
+        :param output_dir: Directory to save the output file to
+        :return: True if successful, False otherwise
+        """
+
+        metadata: dict
+        if isinstance(metadata_or_path, str):
+            metadata = self.read_metadata(metadata_or_path)
+            if metadata is None:
+                return False
+        else:
+            metadata = metadata_or_path
+
+        # Create output file path
+        if output_dir is not None:
+            # Ensure output directory exists
+            os.makedirs(output_dir, exist_ok=True)
+
+            # Use the same filename in the output directory
+            output_file = os.path.join(output_dir, os.path.basename(file_path))
+        else:
+            output_file = None
+
+        # Write metadata to video
+        return self.apply_metadata(metadata, file_path, output_file)
+
+    def process_directory(self, metadata_or_path: str | dict, source_dir: str, output_dir: Optional[str] = None) -> bool:
+        """
+        Processes all video files in the metadata dictionary
+
+        :param metadata_or_path: Dictionary of metadata keyed by filename, or path of the metadata file
+        :param source_dir: Directory containing the video files
+        :param output_dir: Directory to save the output files to
+        :return: True if all files were processed successfully, False otherwise
+        """
+
+        metadata: dict
+        if isinstance(metadata_or_path, str):
+            metadata = self.read_metadata(metadata_or_path)
+            if metadata is None:
+                return False
+        else:
+            metadata = metadata_or_path
+
+        if not os.path.isdir(source_dir):
+            self.logger.error(f"Source directory not found: {source_dir}")
+            return False
+
+        # Create output directory if specified
+        if output_dir:
+            os.makedirs(output_dir, exist_ok=True)
+
+        success: bool = True
+        processed_count: int = 0
+
+        # Process each file in the metadata dictionary
+        for filename, file_metadata in metadata.items():
+            # Construct the full path to the video file
+            video_file: str = os.path.join(source_dir, filename)
+
+            if not os.path.isfile(video_file):
+                self.logger.error(f"Video file not found: {video_file}")
+                success = False
+                continue
+
+            # Process the file
+            if self.process_file(file_metadata, video_file, output_dir):
+                processed_count += 1
+            else:
+                success = False
+
+        self.logger.debug(f"Processed {processed_count} out of {len(metadata)} files")
+        return success
+
+    def process_metadata(self, metadata_or_path: str | dict, source_dir: Optional[str] = None, output_dir: Optional[str] = None) -> bool:
+        metadata_as_path: bool = isinstance(metadata_or_path, str)
+
+        metadata: dict
+        if metadata_as_path:
+            metadata = self.read_metadata(metadata_or_path)
+            if metadata is None:
+                return False
+        else:
+            metadata = metadata_or_path
+
+        # Determine if the JSON contains metadata for multiple files or a single file
+        is_multi_file = isinstance(metadata, dict) and all(isinstance(metadata[key], dict) for key in metadata)
+
+        # If source directory is not specified, try to determine it from the JSON filename
+        if source_dir is None and is_multi_file and metadata_as_path:
+            # Extract folder name from JSON filename (e.g., "Millenium" from "Millenium_metadata.json")
+            json_basename: str = os.path.basename(metadata_or_path)
+            if json_basename.endswith("_metadata.json"):
+                folder_name: str = json_basename.split("_metadata.json")[0]
+                potential_source_dir: str = os.path.join(
+                    os.path.dirname(os.path.abspath(metadata_or_path)),
+                    folder_name
+                )
+
+                if os.path.isdir(potential_source_dir):
+                    source_dir = potential_source_dir
+                    self.logger.debug(f"Using source directory: {source_dir}")
+
+        # If no output directory is specified, create one based on the source directory
+        if output_dir is None and source_dir is not None:
+            output_dir = os.path.join("ready", os.path.basename(source_dir))
+            self.logger.debug(f"Using output directory: {output_dir}")
+
+        # Process files based on the metadata format
+        if is_multi_file:
+            if source_dir is None:
+                self.logger.error(
+                    "Source directory not specified and could not be determined automatically. "
+                    + "Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'"
+                )
+                return False
+
+            success = self.process_directory(metadata, source_dir, output_dir)
+        else:
+            # Single file metadata
+            if "filename" not in metadata:
+                self.logger.error("Invalid metadata format: missing 'filename' field")
+                return False
+
+            # If source directory is specified, look for the file there
+            video_file: str
+            if source_dir is not None:
+                video_file = os.path.join(source_dir, metadata["filename"])
+            elif metadata_as_path:
+                # Look for the file in the same directory as the JSON
+                video_file = os.path.join(os.path.dirname(metadata_or_path), metadata["filename"])
+            else:
+                self.logger.error(
+                    "Source directory not specified and video path could not be determined automatically. "
+                    + "Please specify a source directory with --source or use a JSON filename like 'VideoName_metadata.json'"
+                )
+                return False
+
+            success = self.process_file(metadata, video_file, output_dir)
+        return success
\ No newline at end of file
diff --git a/src/write_metadata.py b/src/write_metadata.py
deleted file mode 100755
index d3153ef..0000000
--- a/src/write_metadata.py
+++ /dev/null
@@ -1,273 +0,0 @@
-#!/usr/bin/env python3 -import argparse -import json -import os -import subprocess -import sys - -def read_metadata_json(json_file): - """ - Read metadata from a JSON file. - - Args: - json_file (str): Path to the JSON file - - Returns: - dict: Metadata information - """ - try: - with open(json_file, 'r', encoding='utf-8') as f: - metadata = json.load(f) - return metadata - except Exception as e: - print(f"❌ Error reading JSON file: {str(e)}") - return None - -def write_metadata_to_video(metadata, input_file, output_file=None): - """ - Write metadata to a video file using mkvmerge.
- - Args: - metadata (dict): Metadata information - input_file (str): Path to the input video file - output_file (str, optional): Path to the output video file - - Returns: - bool: True if successful, False otherwise - """ - if not os.path.isfile(input_file): - print(f"❌ Input file not found: {input_file}") - return False - - if not output_file: - # Create a temporary output file - base_name, ext = os.path.splitext(input_file) - output_file = f"{base_name}_modified{ext}" - - # Start building the mkvmerge command - cmd = ["mkvmerge", "-o", output_file] - - # Add global metadata (title) - if "title" in metadata: - cmd.extend(["--title", metadata["title"]]) - - # Process audio tracks - for track in metadata.get("audio_tracks", []): - # Use the actual track index from the metadata - track_id = track.get("index", 0) - - # Set language - if "language" in track: - cmd.extend([f"--language", f"{track_id}:{track['language']}"]) - - # Set title/name - if "name" in track and track["name"]: - cmd.extend([f"--track-name", f"{track_id}:{track['name']}"]) - - # Set disposition flags - flags = track.get("flags", {}) - - if flags.get("default", False): - cmd.extend([f"--default-track", f"{track_id}:yes"]) - else: - cmd.extend([f"--default-track", f"{track_id}:no"]) - - if flags.get("forced", False): - cmd.extend([f"--forced-track", f"{track_id}:yes"]) - else: - cmd.extend([f"--forced-track", f"{track_id}:no"]) - - if flags.get("original", False): - cmd.extend([f"--original-flag", f"{track_id}:yes"]) - else: - cmd.extend([f"--original-flag", f"{track_id}:no"]) - - # Process subtitle tracks - for track in metadata.get("subtitle_tracks", []): - # Use the actual track index from the metadata - track_id = track.get("index", 0) - - # Set language - if "language" in track: - cmd.extend([f"--language", f"{track_id}:{track['language']}"]) - - # Set title/name - if "name" in track and track["name"]: - cmd.extend([f"--track-name", f"{track_id}:{track['name']}"]) - - # Set disposition flags - flags = track.get("flags", {}) - - if flags.get("default", False): - cmd.extend([f"--default-track", f"{track_id}:yes"]) - else: - cmd.extend([f"--default-track", f"{track_id}:no"]) - - if flags.get("forced", False): - cmd.extend([f"--forced-track", f"{track_id}:yes"]) - else: - cmd.extend([f"--forced-track", f"{track_id}:no"]) - - if flags.get("original", False): - cmd.extend([f"--original-flag", f"{track_id}:yes"]) - else: - cmd.extend([f"--original-flag", f"{track_id}:no"]) - - # Add input file - cmd.append(input_file) - - # Execute the mkvmerge command - print(f"🔄 Writing metadata to {os.path.basename(output_file)}") - print(f"Command: {' '.join(cmd)}") - - try: - result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - if result.returncode != 0: - print(f"❌ Error writing metadata: {result.stderr}") - return False - - print(f"✅ Metadata written to {output_file}") - return True - except Exception as e: - print(f"❌ Error executing mkvmerge: {str(e)}") - return False - -def process_single_file(metadata, video_file, output_dir=None): - """ - Process a single video file with the given metadata. 
- - Args: - metadata (dict): Metadata for the video file - video_file (str): Path to the video file - output_dir (str, optional): Directory to save the output file - - Returns: - bool: True if successful, False otherwise - """ - if not os.path.isfile(video_file): - print(f"❌ Video file not found: {video_file}") - return False - - # Create output file path - if output_dir: - # Ensure output directory exists - os.makedirs(output_dir, exist_ok=True) - - # Use the same filename in the output directory - output_file = os.path.join(output_dir, os.path.basename(video_file)) - else: - output_file = None # Let write_metadata_to_video create a default output file - - # Write metadata to video - return write_metadata_to_video(metadata, video_file, output_file) - -def process_directory(metadata_dict, source_dir, output_dir=None): - """ - Process all video files in the metadata dictionary. - - Args: - metadata_dict (dict): Dictionary of metadata keyed by filename - source_dir (str): Directory containing the video files - output_dir (str, optional): Directory to save the output files - - Returns: - bool: True if all files were processed successfully, False otherwise - """ - if not os.path.isdir(source_dir): - print(f"❌ Source directory not found: {source_dir}") - return False - - # Create output directory if specified - if output_dir: - os.makedirs(output_dir, exist_ok=True) - - success = True - processed_count = 0 - - # Process each file in the metadata dictionary - for filename, file_metadata in metadata_dict.items(): - # Construct the full path to the video file - video_file = os.path.join(source_dir, filename) - - if not os.path.isfile(video_file): - print(f"❌ Video file not found: {video_file}") - success = False - continue - - # Process the file - if process_single_file(file_metadata, video_file, output_dir): - processed_count += 1 - else: - success = False - - print(f"✅ Processed {processed_count} out of {len(metadata_dict)} files") - return success - -def main(): - parser = argparse.ArgumentParser(description="Write metadata from JSON to video files.") - parser.add_argument("json_file", help="Path to input JSON metadata file") - parser.add_argument("-o", "--output", help="Path to output directory") - parser.add_argument("-s", "--source", help="Source directory (overrides automatic detection)") - args = parser.parse_args() - - json_file = args.json_file - output_dir = args.output - source_dir = args.source - - if not os.path.isfile(json_file): - print(f"❌ JSON file not found: {json_file}") - sys.exit(1) - - # Read metadata from JSON - metadata = read_metadata_json(json_file) - if not metadata: - sys.exit(1) - - # Determine if the JSON contains metadata for multiple files or a single file - is_multi_file = isinstance(metadata, dict) and all(isinstance(metadata[key], dict) for key in metadata) - - # If source directory is not specified, try to determine it from the JSON filename - if not source_dir and is_multi_file: - # Extract folder name from JSON filename (e.g., "Millenium" from "Millenium_metadata.json") - json_basename = os.path.basename(json_file) - if "_metadata.json" in json_basename: - folder_name = json_basename.split("_metadata.json")[0] - potential_source_dir = os.path.join(os.path.dirname(os.path.abspath(json_file)), folder_name) - - if os.path.isdir(potential_source_dir): - source_dir = potential_source_dir - print(f"📂 Using source directory: {source_dir}") - - # If no output directory is specified, create one based on the source directory - if not output_dir and source_dir: - 
output_dir = os.path.join("ready", os.path.basename(source_dir)) - print(f"📂 Using output directory: {output_dir}") - - # Process files based on the metadata format - if is_multi_file: - if not source_dir: - print("❌ Source directory not specified and could not be determined automatically.") - print(" Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'") - sys.exit(1) - - success = process_directory(metadata, source_dir, output_dir) - else: - # Single file metadata - if "filename" not in metadata: - print("❌ Invalid metadata format: missing 'filename' field") - sys.exit(1) - - # If source directory is specified, look for the file there - if source_dir: - video_file = os.path.join(source_dir, metadata["filename"]) - else: - # Look for the file in the same directory as the JSON - video_file = os.path.join(os.path.dirname(json_file), metadata["filename"]) - - success = process_single_file(metadata, video_file, output_dir) - - if not success: - sys.exit(1) - -if __name__ == "__main__": - main()
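
Reviewer note (not part of the patch): a minimal usage sketch of the new MetadataWriter module. The metadata layout mirrors what the removed src/get_metadata.py emitted; the concrete file and directory names below are hypothetical.

from src.metadata_writer import MetadataWriter

# Single-file metadata, shaped like the JSON produced by get_metadata.py
episode_metadata = {
    "filename": "episode1.mkv",          # hypothetical file name
    "title": "Episode 1",
    "audio_tracks": [
        {
            "index": 1,
            "language": "eng",
            "name": "English 5.1",
            "flags": {"default": True, "original": True},
        }
    ],
    "subtitle_tracks": [
        {
            "index": 2,
            "language": "fre",
            "name": "Forced subtitles",
            "flags": {"forced": True},
        }
    ],
}

writer = MetadataWriter()

# Remux one file; the result is written to ready/Millenium/episode1.mkv
writer.process_file(episode_metadata, "Millenium/episode1.mkv", output_dir="ready/Millenium")

# Or point process_metadata at a multi-file JSON written by get_metadata.py;
# a name like "Millenium_metadata.json" lets it infer the source directory
# "Millenium" and the output directory "ready/Millenium" automatically.
writer.process_metadata("Millenium_metadata.json")

The CLI entry point wraps the same call: python scripts/write_metadata.py Millenium_metadata.json, with -s/-o overriding the detected directories.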
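
For the audio track above, apply_metadata would assemble an mkvmerge invocation along the following lines (a sketch of the argument list the method builds; the option names are the ones used in the patch, the paths are the same hypothetical ones as above):

cmd = [
    "mkvmerge",
    "-o", "ready/Millenium/episode1.mkv",   # output path chosen by process_file
    "--title", "Episode 1",
    "--language", "1:eng",
    "--track-name", "1:English 5.1",
    "--default-track", "1:yes",
    "--forced-track", "1:no",               # audio track carries no "forced" flag
    "--original-flag", "1:yes",
    # ...the analogous options for the subtitle track (id 2) follow here...
    "Millenium/episode1.mkv",               # input file is appended last
]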