#!/usr/bin/env python3
"""Write metadata described in a JSON file to video files using mkvmerge.

The JSON file holds either the metadata of a single video (an object with a
"filename" field) or an object mapping filenames to per-file metadata.
"""

import argparse
import json
import os
import subprocess
import sys

# mkvmerge per-track flag options paired with the key read from a track's
# "flags" object. The order is the order in which options are emitted.
_FLAG_OPTIONS = (
    ("--default-track", "default"),
    ("--forced-track", "forced"),
    ("--original-flag", "original"),
)


def read_metadata_json(json_file):
    """
    Read metadata from a JSON file.

    Args:
        json_file (str): Path to the JSON file

    Returns:
        The decoded JSON content (normally a dict), or None if the file
        could not be read or parsed.
    """
    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"❌ Error reading JSON file: {str(e)}")
        return None


def _track_options(track):
    """Return the mkvmerge options (language, name, flags) for one track."""
    # NOTE(review): "index" is assumed to match mkvmerge's track ID. A track
    # without an index falls back to ID 0 - confirm this is intended.
    track_id = track.get("index", 0)
    options = []

    if "language" in track:
        options.extend(["--language", f"{track_id}:{track['language']}"])

    # An empty or missing name leaves the existing track name untouched.
    if track.get("name"):
        options.extend(["--track-name", f"{track_id}:{track['name']}"])

    # Every flag is always written explicitly, as "yes" or "no".
    # `or {}` tolerates an explicit null in the JSON.
    flags = track.get("flags") or {}
    for option, flag_name in _FLAG_OPTIONS:
        state = "yes" if flags.get(flag_name, False) else "no"
        options.extend([option, f"{track_id}:{state}"])

    return options


def build_mkvmerge_command(metadata, input_file, output_file):
    """
    Build the mkvmerge argument list that applies metadata to a file.

    Args:
        metadata (dict): Metadata information ("title", "audio_tracks",
            "subtitle_tracks")
        input_file (str): Path to the input video file
        output_file (str): Path to the output video file

    Returns:
        list: Arguments suitable for subprocess.run
    """
    cmd = ["mkvmerge", "-o", output_file]

    # Global metadata (title). str() keeps the command a list of strings
    # even when the JSON stores a number; a null title is skipped.
    title = metadata.get("title")
    if title is not None:
        cmd.extend(["--title", str(title)])

    # Audio tracks first, then subtitle tracks.
    for group in ("audio_tracks", "subtitle_tracks"):
        for track in metadata.get(group) or []:
            cmd.extend(_track_options(track))

    # The input file comes last so the track options above apply to it.
    cmd.append(input_file)
    return cmd


def write_metadata_to_video(metadata, input_file, output_file=None):
    """
    Write metadata to a video file using mkvmerge.

    Args:
        metadata (dict): Metadata information
        input_file (str): Path to the input video file
        output_file (str, optional): Path to the output video file.
            Defaults to the input path with "_modified" added before the
            extension.

    Returns:
        bool: True if successful, False otherwise
    """
    if not os.path.isfile(input_file):
        print(f"❌ Input file not found: {input_file}")
        return False

    if not output_file:
        base_name, ext = os.path.splitext(input_file)
        output_file = f"{base_name}_modified{ext}"

    # Refuse to write the result on top of the file being read.
    if os.path.exists(output_file) and os.path.samefile(input_file, output_file):
        print(f"❌ Output file would overwrite the input file: {input_file}")
        return False

    cmd = build_mkvmerge_command(metadata, input_file, output_file)

    print(f"🔄 Writing metadata to {os.path.basename(output_file)}")
    print(f"Command: {' '.join(cmd)}")

    try:
        result = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # OSError covers a missing mkvmerge executable.
        print(f"❌ Error executing mkvmerge: {str(e)}")
        return False

    # mkvmerge reports its diagnostics on stdout, so fall back to it when
    # stderr is empty.
    details = (result.stderr or result.stdout or "").strip()

    # Exit code 1 means warnings were emitted but the output was written.
    # Anything other than 0 or 1 is a real failure.
    if result.returncode not in (0, 1):
        print(f"❌ Error writing metadata: {details}")
        return False
    if result.returncode == 1:
        print(f"⚠️ mkvmerge reported warnings: {details}")

    print(f"✅ Metadata written to {output_file}")
    return True


def process_single_file(metadata, video_file, output_dir=None):
    """
    Process a single video file with the given metadata.

    Args:
        metadata (dict): Metadata for the video file
        video_file (str): Path to the video file
        output_dir (str, optional): Directory to save the output file

    Returns:
        bool: True if successful, False otherwise
    """
    if not os.path.isfile(video_file):
        print(f"❌ Video file not found: {video_file}")
        return False

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
        # Keep the same filename inside the output directory.
        output_file = os.path.join(output_dir, os.path.basename(video_file))
    else:
        # Let write_metadata_to_video pick its default output name.
        output_file = None

    return write_metadata_to_video(metadata, video_file, output_file)


def process_directory(metadata_dict, source_dir, output_dir=None):
    """
    Process all video files in the metadata dictionary.

    Args:
        metadata_dict (dict): Dictionary of metadata keyed by filename
        source_dir (str): Directory containing the video files
        output_dir (str, optional): Directory to save the output files

    Returns:
        bool: True if all files were processed successfully, False otherwise
    """
    if not os.path.isdir(source_dir):
        print(f"❌ Source directory not found: {source_dir}")
        return False

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    success = True
    processed_count = 0

    for filename, file_metadata in metadata_dict.items():
        video_file = os.path.join(source_dir, filename)

        if not os.path.isfile(video_file):
            print(f"❌ Video file not found: {video_file}")
            success = False
            continue

        if process_single_file(file_metadata, video_file, output_dir):
            processed_count += 1
        else:
            success = False

    print(f"✅ Processed {processed_count} out of {len(metadata_dict)} files")
    return success


def main():
    """Parse the command line and apply the JSON metadata to the video files."""
    parser = argparse.ArgumentParser(description="Write metadata from JSON to video files.")
    parser.add_argument("json_file", help="Path to input JSON metadata file")
    parser.add_argument("-o", "--output", help="Path to output directory")
    parser.add_argument("-s", "--source", help="Source directory (overrides automatic detection)")

    args = parser.parse_args()

    json_file = args.json_file
    output_dir = args.output
    source_dir = args.source

    if not os.path.isfile(json_file):
        print(f"❌ JSON file not found: {json_file}")
        sys.exit(1)

    metadata = read_metadata_json(json_file)
    if not metadata:
        # Either reading failed (already reported) or the JSON holds no data.
        sys.exit(1)

    # Anything but a JSON object cannot be interpreted below.
    if not isinstance(metadata, dict):
        print("❌ Invalid metadata format: expected a JSON object")
        sys.exit(1)

    # Multi-file metadata maps filenames to objects. Single-file metadata has
    # non-object values such as the "filename" string.
    is_multi_file = all(isinstance(value, dict) for value in metadata.values())

    # If source directory is not specified, try to determine it from the
    # JSON filename (e.g., "Millenium" from "Millenium_metadata.json").
    if not source_dir and is_multi_file:
        json_basename = os.path.basename(json_file)
        if "_metadata.json" in json_basename:
            folder_name = json_basename.split("_metadata.json")[0]
            potential_source_dir = os.path.join(
                os.path.dirname(os.path.abspath(json_file)), folder_name
            )
            if os.path.isdir(potential_source_dir):
                source_dir = potential_source_dir
                print(f"📂 Using source directory: {source_dir}")

    # If no output directory is specified, create one based on the source
    # directory. normpath drops a trailing separator, which would otherwise
    # make basename return an empty string.
    if not output_dir and source_dir:
        output_dir = os.path.join("ready", os.path.basename(os.path.normpath(source_dir)))
        print(f"📂 Using output directory: {output_dir}")

    if is_multi_file:
        if not source_dir:
            print("❌ Source directory not specified and could not be determined automatically.")
            print("   Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'")
            sys.exit(1)

        success = process_directory(metadata, source_dir, output_dir)
    else:
        # Single file metadata
        if "filename" not in metadata:
            print("❌ Invalid metadata format: missing 'filename' field")
            sys.exit(1)

        if source_dir:
            video_file = os.path.join(source_dir, metadata["filename"])
        else:
            # Look for the file in the same directory as the JSON.
            video_file = os.path.join(os.path.dirname(json_file), metadata["filename"])

        success = process_single_file(metadata, video_file, output_dir)

    if not success:
        sys.exit(1)


if __name__ == "__main__":
    main()