274 lines
9.2 KiB
Python
Executable File
274 lines
9.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
def read_metadata_json(json_file):
|
|
"""
|
|
Read metadata from a JSON file.
|
|
|
|
Args:
|
|
json_file (str): Path to the JSON file
|
|
|
|
Returns:
|
|
dict: Metadata information
|
|
"""
|
|
try:
|
|
with open(json_file, 'r', encoding='utf-8') as f:
|
|
metadata = json.load(f)
|
|
return metadata
|
|
except Exception as e:
|
|
print(f"❌ Error reading JSON file: {str(e)}")
|
|
return None
|
|
|
|
def write_metadata_to_video(metadata, input_file, output_file=None):
|
|
"""
|
|
Write metadata to a video file using mkvmerge.
|
|
|
|
Args:
|
|
metadata (dict): Metadata information
|
|
input_file (str): Path to the input video file
|
|
output_file (str, optional): Path to the output video file
|
|
|
|
Returns:
|
|
bool: True if successful, False otherwise
|
|
"""
|
|
if not os.path.isfile(input_file):
|
|
print(f"❌ Input file not found: {input_file}")
|
|
return False
|
|
|
|
if not output_file:
|
|
# Create a temporary output file
|
|
base_name, ext = os.path.splitext(input_file)
|
|
output_file = f"{base_name}_modified{ext}"
|
|
|
|
# Start building the mkvmerge command
|
|
cmd = ["mkvmerge", "-o", output_file]
|
|
|
|
# Add global metadata (title)
|
|
if "title" in metadata:
|
|
cmd.extend(["--title", metadata["title"]])
|
|
|
|
# Process audio tracks
|
|
for track in metadata.get("audio_tracks", []):
|
|
# Use the actual track index from the metadata
|
|
track_id = track.get("index", 0)
|
|
|
|
# Set language
|
|
if "language" in track:
|
|
cmd.extend([f"--language", f"{track_id}:{track['language']}"])
|
|
|
|
# Set title/name
|
|
if "name" in track and track["name"]:
|
|
cmd.extend([f"--track-name", f"{track_id}:{track['name']}"])
|
|
|
|
# Set disposition flags
|
|
flags = track.get("flags", {})
|
|
|
|
if flags.get("default", False):
|
|
cmd.extend([f"--default-track", f"{track_id}:yes"])
|
|
else:
|
|
cmd.extend([f"--default-track", f"{track_id}:no"])
|
|
|
|
if flags.get("forced", False):
|
|
cmd.extend([f"--forced-track", f"{track_id}:yes"])
|
|
else:
|
|
cmd.extend([f"--forced-track", f"{track_id}:no"])
|
|
|
|
if flags.get("original", False):
|
|
cmd.extend([f"--original-flag", f"{track_id}:yes"])
|
|
else:
|
|
cmd.extend([f"--original-flag", f"{track_id}:no"])
|
|
|
|
# Process subtitle tracks
|
|
for track in metadata.get("subtitle_tracks", []):
|
|
# Use the actual track index from the metadata
|
|
track_id = track.get("index", 0)
|
|
|
|
# Set language
|
|
if "language" in track:
|
|
cmd.extend([f"--language", f"{track_id}:{track['language']}"])
|
|
|
|
# Set title/name
|
|
if "name" in track and track["name"]:
|
|
cmd.extend([f"--track-name", f"{track_id}:{track['name']}"])
|
|
|
|
# Set disposition flags
|
|
flags = track.get("flags", {})
|
|
|
|
if flags.get("default", False):
|
|
cmd.extend([f"--default-track", f"{track_id}:yes"])
|
|
else:
|
|
cmd.extend([f"--default-track", f"{track_id}:no"])
|
|
|
|
if flags.get("forced", False):
|
|
cmd.extend([f"--forced-track", f"{track_id}:yes"])
|
|
else:
|
|
cmd.extend([f"--forced-track", f"{track_id}:no"])
|
|
|
|
if flags.get("original", False):
|
|
cmd.extend([f"--original-flag", f"{track_id}:yes"])
|
|
else:
|
|
cmd.extend([f"--original-flag", f"{track_id}:no"])
|
|
|
|
# Add input file
|
|
cmd.append(input_file)
|
|
|
|
# Execute the mkvmerge command
|
|
print(f"🔄 Writing metadata to {os.path.basename(output_file)}")
|
|
print(f"Command: {' '.join(cmd)}")
|
|
|
|
try:
|
|
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
if result.returncode != 0:
|
|
print(f"❌ Error writing metadata: {result.stderr}")
|
|
return False
|
|
|
|
print(f"✅ Metadata written to {output_file}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"❌ Error executing mkvmerge: {str(e)}")
|
|
return False
|
|
|
|
def process_single_file(metadata, video_file, output_dir=None):
|
|
"""
|
|
Process a single video file with the given metadata.
|
|
|
|
Args:
|
|
metadata (dict): Metadata for the video file
|
|
video_file (str): Path to the video file
|
|
output_dir (str, optional): Directory to save the output file
|
|
|
|
Returns:
|
|
bool: True if successful, False otherwise
|
|
"""
|
|
if not os.path.isfile(video_file):
|
|
print(f"❌ Video file not found: {video_file}")
|
|
return False
|
|
|
|
# Create output file path
|
|
if output_dir:
|
|
# Ensure output directory exists
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
# Use the same filename in the output directory
|
|
output_file = os.path.join(output_dir, os.path.basename(video_file))
|
|
else:
|
|
output_file = None # Let write_metadata_to_video create a default output file
|
|
|
|
# Write metadata to video
|
|
return write_metadata_to_video(metadata, video_file, output_file)
|
|
|
|
def process_directory(metadata_dict, source_dir, output_dir=None):
|
|
"""
|
|
Process all video files in the metadata dictionary.
|
|
|
|
Args:
|
|
metadata_dict (dict): Dictionary of metadata keyed by filename
|
|
source_dir (str): Directory containing the video files
|
|
output_dir (str, optional): Directory to save the output files
|
|
|
|
Returns:
|
|
bool: True if all files were processed successfully, False otherwise
|
|
"""
|
|
if not os.path.isdir(source_dir):
|
|
print(f"❌ Source directory not found: {source_dir}")
|
|
return False
|
|
|
|
# Create output directory if specified
|
|
if output_dir:
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
success = True
|
|
processed_count = 0
|
|
|
|
# Process each file in the metadata dictionary
|
|
for filename, file_metadata in metadata_dict.items():
|
|
# Construct the full path to the video file
|
|
video_file = os.path.join(source_dir, filename)
|
|
|
|
if not os.path.isfile(video_file):
|
|
print(f"❌ Video file not found: {video_file}")
|
|
success = False
|
|
continue
|
|
|
|
# Process the file
|
|
if process_single_file(file_metadata, video_file, output_dir):
|
|
processed_count += 1
|
|
else:
|
|
success = False
|
|
|
|
print(f"✅ Processed {processed_count} out of {len(metadata_dict)} files")
|
|
return success
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Write metadata from JSON to video files.")
|
|
parser.add_argument("json_file", help="Path to input JSON metadata file")
|
|
parser.add_argument("-o", "--output", help="Path to output directory")
|
|
parser.add_argument("-s", "--source", help="Source directory (overrides automatic detection)")
|
|
args = parser.parse_args()
|
|
|
|
json_file = args.json_file
|
|
output_dir = args.output
|
|
source_dir = args.source
|
|
|
|
if not os.path.isfile(json_file):
|
|
print(f"❌ JSON file not found: {json_file}")
|
|
sys.exit(1)
|
|
|
|
# Read metadata from JSON
|
|
metadata = read_metadata_json(json_file)
|
|
if not metadata:
|
|
sys.exit(1)
|
|
|
|
# Determine if the JSON contains metadata for multiple files or a single file
|
|
is_multi_file = isinstance(metadata, dict) and all(isinstance(metadata[key], dict) for key in metadata)
|
|
|
|
# If source directory is not specified, try to determine it from the JSON filename
|
|
if not source_dir and is_multi_file:
|
|
# Extract folder name from JSON filename (e.g., "Millenium" from "Millenium_metadata.json")
|
|
json_basename = os.path.basename(json_file)
|
|
if "_metadata.json" in json_basename:
|
|
folder_name = json_basename.split("_metadata.json")[0]
|
|
potential_source_dir = os.path.join(os.path.dirname(os.path.abspath(json_file)), folder_name)
|
|
|
|
if os.path.isdir(potential_source_dir):
|
|
source_dir = potential_source_dir
|
|
print(f"📂 Using source directory: {source_dir}")
|
|
|
|
# If no output directory is specified, create one based on the source directory
|
|
if not output_dir and source_dir:
|
|
output_dir = os.path.join("ready", os.path.basename(source_dir))
|
|
print(f"📂 Using output directory: {output_dir}")
|
|
|
|
# Process files based on the metadata format
|
|
if is_multi_file:
|
|
if not source_dir:
|
|
print("❌ Source directory not specified and could not be determined automatically.")
|
|
print(" Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'")
|
|
sys.exit(1)
|
|
|
|
success = process_directory(metadata, source_dir, output_dir)
|
|
else:
|
|
# Single file metadata
|
|
if "filename" not in metadata:
|
|
print("❌ Invalid metadata format: missing 'filename' field")
|
|
sys.exit(1)
|
|
|
|
# If source directory is specified, look for the file there
|
|
if source_dir:
|
|
video_file = os.path.join(source_dir, metadata["filename"])
|
|
else:
|
|
# Look for the file in the same directory as the JSON
|
|
video_file = os.path.join(os.path.dirname(json_file), metadata["filename"])
|
|
|
|
success = process_single_file(metadata, video_file, output_dir)
|
|
|
|
if not success:
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|