refactor: adapt metadata writer
This commit is contained in:
parent
ffe847fb5e
commit
7a49849ee9
45
scripts/write_metadata.py
Normal file
45
scripts/write_metadata.py
Normal file
@ -0,0 +1,45 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from src.metadata_writer import MetadataWriter
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="[%(levelname)s] %(message)s"
|
||||
)
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Write metadata from JSON to video files"
|
||||
)
|
||||
parser.add_argument(
|
||||
"json_file",
|
||||
help="Path to input JSON metadata file"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-o", "--output",
|
||||
help="Path of the output directory"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-s", "--source",
|
||||
help="Source directory (overrides automatic detection)"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
json_file = args.json_file
|
||||
output_dir = args.output
|
||||
source_dir = args.source
|
||||
|
||||
writer: MetadataWriter = MetadataWriter()
|
||||
|
||||
success: bool = writer.process_metadata(json_file, source_dir, output_dir)
|
||||
|
||||
if not success:
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -1,196 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import os
|
||||
import subprocess
|
||||
import json
|
||||
import sys
|
||||
|
||||
SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
|
||||
|
||||
def get_video_metadata(file_path):
|
||||
"""
|
||||
Extract metadata from a video file using ffprobe.
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the video file
|
||||
|
||||
Returns:
|
||||
dict: Metadata information
|
||||
"""
|
||||
# Get general file info
|
||||
cmd = [
|
||||
"ffprobe", "-v", "quiet", "-print_format", "json",
|
||||
"-show_format", "-show_streams", file_path
|
||||
]
|
||||
|
||||
try:
|
||||
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
if result.returncode != 0:
|
||||
print(f"❌ Error processing {file_path}: {result.stderr}")
|
||||
return None
|
||||
|
||||
data = json.loads(result.stdout)
|
||||
|
||||
# Extract filename and title
|
||||
filename = os.path.basename(file_path)
|
||||
title = data.get("format", {}).get("tags", {}).get("title", filename)
|
||||
|
||||
# Initialize metadata structure
|
||||
metadata = {
|
||||
"filename": filename,
|
||||
"title": title,
|
||||
"audio_tracks": [],
|
||||
"subtitle_tracks": []
|
||||
}
|
||||
|
||||
# Process streams
|
||||
for stream in data.get("streams", []):
|
||||
codec_type = stream.get("codec_type")
|
||||
|
||||
if codec_type == "audio":
|
||||
track = {
|
||||
"index": stream.get("index"),
|
||||
"language": stream.get("tags", {}).get("language", "und"),
|
||||
"name": stream.get("tags", {}).get("title", ""),
|
||||
"channels": stream.get("channels", 0),
|
||||
"flags": {
|
||||
"default": stream.get("disposition", {}).get("default", 0) == 1,
|
||||
"visual_impaired": stream.get("disposition", {}).get("visual_impaired", 0) == 1,
|
||||
"original": stream.get("disposition", {}).get("original", 0) == 1,
|
||||
"commentary": stream.get("disposition", {}).get("comment", 0) == 1
|
||||
}
|
||||
}
|
||||
metadata["audio_tracks"].append(track)
|
||||
|
||||
elif codec_type == "subtitle":
|
||||
track = {
|
||||
"index": stream.get("index"),
|
||||
"language": stream.get("tags", {}).get("language", "und"),
|
||||
"name": stream.get("tags", {}).get("title", ""),
|
||||
"flags": {
|
||||
"default": stream.get("disposition", {}).get("default", 0) == 1,
|
||||
"forced": stream.get("disposition", {}).get("forced", 0) == 1,
|
||||
"hearing_impaired": stream.get("disposition", {}).get("hearing_impaired", 0) == 1,
|
||||
"original": stream.get("disposition", {}).get("original", 0) == 1,
|
||||
"commentary": stream.get("disposition", {}).get("comment", 0) == 1
|
||||
}
|
||||
}
|
||||
metadata["subtitle_tracks"].append(track)
|
||||
|
||||
return metadata
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error processing {file_path}: {str(e)}")
|
||||
return None
|
||||
|
||||
def process_file(file_path, output_dir=None):
|
||||
"""
|
||||
Process a single video file and write metadata to JSON.
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the video file
|
||||
output_dir (str, optional): Directory where the output JSON file will be saved
|
||||
"""
|
||||
if not os.path.isfile(file_path):
|
||||
print(f"❌ File not found: {file_path}")
|
||||
return False
|
||||
|
||||
if not file_path.lower().endswith(SUPPORTED_EXTENSIONS):
|
||||
print(f"❌ Unsupported file format: {file_path}")
|
||||
return False
|
||||
|
||||
print(f"📊 Extracting metadata from {os.path.basename(file_path)}")
|
||||
metadata = get_video_metadata(file_path)
|
||||
|
||||
if metadata:
|
||||
# Generate output filename based on input file
|
||||
filename = os.path.basename(os.path.splitext(file_path)[0]) + "_metadata.json"
|
||||
|
||||
if output_dir:
|
||||
# Ensure output directory exists
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
output_path = os.path.join(output_dir, filename)
|
||||
else:
|
||||
# If no output directory specified, save in the same directory as the input file
|
||||
base_name = os.path.splitext(file_path)[0]
|
||||
output_path = f"{base_name}_metadata.json"
|
||||
|
||||
# Write metadata to JSON file
|
||||
with open(output_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"✅ Metadata saved to {output_path}")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def process_directory(directory_path, output_dir=None):
|
||||
"""
|
||||
Process all video files in a directory and write metadata to JSON.
|
||||
|
||||
Args:
|
||||
directory_path (str): Path to the directory
|
||||
output_dir (str, optional): Directory where the output JSON file will be saved
|
||||
"""
|
||||
if not os.path.isdir(directory_path):
|
||||
print(f"❌ Directory not found: {directory_path}")
|
||||
return False
|
||||
|
||||
all_metadata = {}
|
||||
file_count = 0
|
||||
|
||||
for root, _, files in os.walk(directory_path):
|
||||
for file in files:
|
||||
if file.lower().endswith(SUPPORTED_EXTENSIONS):
|
||||
file_path = os.path.join(root, file)
|
||||
print(f"📊 Extracting metadata from {file}")
|
||||
metadata = get_video_metadata(file_path)
|
||||
|
||||
if metadata:
|
||||
# Use relative path as key
|
||||
rel_path = os.path.relpath(file_path, directory_path)
|
||||
all_metadata[rel_path] = metadata
|
||||
file_count += 1
|
||||
|
||||
if file_count == 0:
|
||||
print(f"❌ No supported video files found in {directory_path}")
|
||||
return False
|
||||
|
||||
# Generate output filename based on directory name
|
||||
dir_name = os.path.basename(os.path.normpath(directory_path))
|
||||
filename = f"{dir_name}_metadata.json"
|
||||
|
||||
if output_dir:
|
||||
# Ensure output directory exists
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
output_path = os.path.join(output_dir, filename)
|
||||
else:
|
||||
# If no output directory specified, save in the current directory
|
||||
output_path = filename
|
||||
|
||||
# Write all metadata to a single JSON file
|
||||
with open(output_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(all_metadata, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"✅ Metadata for {file_count} files saved to {output_path}")
|
||||
return True
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Extract metadata from video files and save as JSON.")
|
||||
parser.add_argument("input", help="Path to input video file or directory")
|
||||
parser.add_argument("-o", "--output", help="Directory path where output JSON files will be saved")
|
||||
args = parser.parse_args()
|
||||
|
||||
input_path = args.input
|
||||
output_dir = args.output
|
||||
|
||||
if os.path.isfile(input_path):
|
||||
process_file(input_path, output_dir)
|
||||
elif os.path.isdir(input_path):
|
||||
process_directory(input_path, output_dir)
|
||||
else:
|
||||
print(f"❌ Path not found: {input_path}")
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
240
src/metadata_writer.py
Normal file
240
src/metadata_writer.py
Normal file
@ -0,0 +1,240 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class MetadataWriter:
|
||||
SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
|
||||
|
||||
def __init__(self):
|
||||
self.logger: logging.Logger = logging.getLogger("MetadataWriter")
|
||||
|
||||
def apply_metadata(self, metadata: dict, in_path: str, out_path: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Writes metadata to a video file using mkvmerge
|
||||
|
||||
:param metadata: Metadata information
|
||||
:param in_path: Path of the input video file
|
||||
:param out_path: Path of the output video file. If None, ``"_modified"`` is appended to ``in_path`` instead
|
||||
:return: True if successful, False otherwise
|
||||
"""
|
||||
|
||||
if not os.path.isfile(in_path):
|
||||
self.logger.error(f"Input file not found: {in_path}")
|
||||
return False
|
||||
|
||||
if out_path is None:
|
||||
# Create a temporary output file
|
||||
base_name, ext = os.path.splitext(in_path)
|
||||
out_path: str = f"{base_name}_modified{ext}"
|
||||
|
||||
# Start building the mkvmerge command
|
||||
cmd: list[str] = [
|
||||
"mkvmerge",
|
||||
"-o", out_path
|
||||
]
|
||||
|
||||
# Add global metadata (title)
|
||||
if "title" in metadata:
|
||||
cmd.extend(["--title", metadata["title"]])
|
||||
|
||||
# Process audio + subtitle tracks
|
||||
tracks: list[dict] = metadata.get("audio_tracks", []) + metadata.get("subtitle_tracks", [])
|
||||
for track in tracks:
|
||||
# Use the actual track index from the metadata
|
||||
track_id = track.get("index", 0)
|
||||
|
||||
# Set language
|
||||
if "language" in track:
|
||||
cmd.extend(["--language", f"{track_id}:{track["language"]}"])
|
||||
|
||||
# Set title/name
|
||||
if "name" in track and track["name"]:
|
||||
cmd.extend(["--track-name", f"{track_id}:{track["name"]}"])
|
||||
|
||||
# Set disposition flags
|
||||
flags = track.get("flags", {})
|
||||
|
||||
def yes_no(flag: str):
|
||||
return f"{track_id}:{"yes" if flags.get(flag, False) else "no"}"
|
||||
|
||||
cmd.extend(["--default-track", yes_no("default")])
|
||||
cmd.extend(["--forced-track", yes_no("forced")])
|
||||
cmd.extend(["--original-flag", yes_no("original")])
|
||||
|
||||
# Add input file
|
||||
cmd.append(in_path)
|
||||
|
||||
# Execute the mkvmerge command
|
||||
self.logger.debug(f"Writing metadata to {os.path.basename(out_path)}")
|
||||
|
||||
try:
|
||||
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
if result.returncode != 0:
|
||||
self.logger.error(f"Error writing metadata: {result.stderr}")
|
||||
return False
|
||||
|
||||
self.logger.debug(f"Metadata written to {out_path}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error executing mkvmerge: {str(e)}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def read_metadata(path: str) -> Optional[dict]:
|
||||
try:
|
||||
with open(path, "r") as f:
|
||||
metadata: dict = json.load(f)
|
||||
return metadata
|
||||
|
||||
except:
|
||||
return None
|
||||
|
||||
def process_file(self, metadata_or_path: str|dict, file_path: str, output_dir: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Processes a single video file with the given metadata
|
||||
|
||||
:param metadata_or_path: Metadata dict or path of the metadata file
|
||||
:param file_path: Path of the video file
|
||||
:param output_dir: Directory to save the output file to
|
||||
:return: True if successful, False otherwise
|
||||
"""
|
||||
|
||||
metadata: dict
|
||||
if isinstance(metadata_or_path, str):
|
||||
metadata = self.read_metadata(metadata_or_path)
|
||||
if metadata is None:
|
||||
return False
|
||||
else:
|
||||
metadata = metadata_or_path
|
||||
|
||||
# Create output file path
|
||||
if output_dir is not None:
|
||||
# Ensure output directory exists
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# Use the same filename in the output directory
|
||||
output_file = os.path.join(output_dir, os.path.basename(file_path))
|
||||
else:
|
||||
output_file = None
|
||||
|
||||
# Write metadata to video
|
||||
return self.apply_metadata(metadata, file_path, output_file)
|
||||
|
||||
def process_directory(self, metadata_or_path: str|dict, source_dir: str, output_dir: Optional[str] = None) -> bool:
|
||||
"""
|
||||
Processes all video files in the metadata dictionary
|
||||
|
||||
:param metadata_or_path: Dictionary of metadata keyed by filename
|
||||
:param source_dir: Directory containing the video files
|
||||
:param output_dir: Directory to save the output files to
|
||||
:return: True if all files were processed successfully, False otherwise
|
||||
"""
|
||||
|
||||
metadata: dict
|
||||
if isinstance(metadata_or_path, str):
|
||||
metadata = self.read_metadata(metadata_or_path)
|
||||
if metadata is None:
|
||||
return False
|
||||
else:
|
||||
metadata = metadata_or_path
|
||||
|
||||
if not os.path.isdir(source_dir):
|
||||
self.logger.error(f"Source directory not found: {source_dir}")
|
||||
return False
|
||||
|
||||
# Create output directory if specified
|
||||
if output_dir:
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
success: bool = True
|
||||
processed_count: int = 0
|
||||
|
||||
# Process each file in the metadata dictionary
|
||||
for filename, file_metadata in metadata.items():
|
||||
# Construct the full path to the video file
|
||||
video_file: str = os.path.join(source_dir, filename)
|
||||
|
||||
if not os.path.isfile(video_file):
|
||||
self.logger.error(f"Video file not found: {video_file}")
|
||||
success = False
|
||||
continue
|
||||
|
||||
# Process the file
|
||||
if self.process_file(file_metadata, video_file, output_dir):
|
||||
processed_count += 1
|
||||
else:
|
||||
success = False
|
||||
|
||||
self.logger.debug(f"Processed {processed_count} out of {len(metadata)} files")
|
||||
return success
|
||||
|
||||
def process_metadata(self, metadata_or_path: str|dict, source_dir: Optional[str] = None, output_dir: Optional[str] = None) -> bool:
|
||||
metadata_as_path: bool = isinstance(metadata_or_path, str)
|
||||
|
||||
metadata: dict
|
||||
if metadata_as_path:
|
||||
metadata = self.read_metadata(metadata_or_path)
|
||||
if metadata is None:
|
||||
return False
|
||||
else:
|
||||
metadata = metadata_or_path
|
||||
|
||||
# Determine if the JSON contains metadata for multiple files or a single file
|
||||
is_multi_file = isinstance(metadata, dict) and all(isinstance(metadata[key], dict) for key in metadata)
|
||||
|
||||
# If source directory is not specified, try to determine it from the JSON filename
|
||||
if source_dir is None and is_multi_file and metadata_as_path:
|
||||
# Extract folder name from JSON filename (e.g., "Millenium" from "Millenium_metadata.json")
|
||||
json_basename: str = os.path.basename(metadata_or_path)
|
||||
if json_basename.endswith("_metadata.json"):
|
||||
folder_name: str = json_basename.split("_metadata.json")[0]
|
||||
potential_source_dir: str = os.path.join(
|
||||
os.path.dirname(os.path.abspath(metadata_or_path)),
|
||||
folder_name
|
||||
)
|
||||
|
||||
if os.path.isdir(potential_source_dir):
|
||||
source_dir: str = potential_source_dir
|
||||
self.logger.debug(f"Using source directory: {source_dir}")
|
||||
|
||||
# If no output directory is specified, create one based on the source directory
|
||||
if output_dir is None and source_dir is not None:
|
||||
output_dir = os.path.join("ready", os.path.basename(source_dir))
|
||||
self.logger.debug(f"Using output directory: {output_dir}")
|
||||
|
||||
# Process files based on the metadata format
|
||||
if is_multi_file:
|
||||
if source_dir is None:
|
||||
self.logger.error(
|
||||
"Source directory not specified and could not be determined automatically. " +
|
||||
"Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'"
|
||||
)
|
||||
return False
|
||||
|
||||
success = self.process_directory(metadata, source_dir, output_dir)
|
||||
else:
|
||||
# Single file metadata
|
||||
if "filename" not in metadata:
|
||||
self.logger.error("Invalid metadata format: missing 'filename' field")
|
||||
return False
|
||||
|
||||
# If source directory is specified, look for the file there
|
||||
video_file: str
|
||||
if source_dir is not None:
|
||||
video_file = os.path.join(source_dir, metadata["filename"])
|
||||
elif metadata_as_path:
|
||||
# Look for the file in the same directory as the JSON
|
||||
video_file = os.path.join(os.path.dirname(metadata_or_path), metadata["filename"])
|
||||
else:
|
||||
self.logger.error(
|
||||
"Source directory not specified and video path could not be determined automatically. " +
|
||||
"Please specify a source directory with --source or use JSON filename like 'VideoName_metadata.json'"
|
||||
)
|
||||
return False
|
||||
|
||||
success = self.process_file(metadata, video_file, output_dir)
|
||||
return success
|
@ -1,273 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
def read_metadata_json(json_file):
|
||||
"""
|
||||
Read metadata from a JSON file.
|
||||
|
||||
Args:
|
||||
json_file (str): Path to the JSON file
|
||||
|
||||
Returns:
|
||||
dict: Metadata information
|
||||
"""
|
||||
try:
|
||||
with open(json_file, 'r', encoding='utf-8') as f:
|
||||
metadata = json.load(f)
|
||||
return metadata
|
||||
except Exception as e:
|
||||
print(f"❌ Error reading JSON file: {str(e)}")
|
||||
return None
|
||||
|
||||
def write_metadata_to_video(metadata, input_file, output_file=None):
|
||||
"""
|
||||
Write metadata to a video file using mkvmerge.
|
||||
|
||||
Args:
|
||||
metadata (dict): Metadata information
|
||||
input_file (str): Path to the input video file
|
||||
output_file (str, optional): Path to the output video file
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
if not os.path.isfile(input_file):
|
||||
print(f"❌ Input file not found: {input_file}")
|
||||
return False
|
||||
|
||||
if not output_file:
|
||||
# Create a temporary output file
|
||||
base_name, ext = os.path.splitext(input_file)
|
||||
output_file = f"{base_name}_modified{ext}"
|
||||
|
||||
# Start building the mkvmerge command
|
||||
cmd = ["mkvmerge", "-o", output_file]
|
||||
|
||||
# Add global metadata (title)
|
||||
if "title" in metadata:
|
||||
cmd.extend(["--title", metadata["title"]])
|
||||
|
||||
# Process audio tracks
|
||||
for track in metadata.get("audio_tracks", []):
|
||||
# Use the actual track index from the metadata
|
||||
track_id = track.get("index", 0)
|
||||
|
||||
# Set language
|
||||
if "language" in track:
|
||||
cmd.extend([f"--language", f"{track_id}:{track['language']}"])
|
||||
|
||||
# Set title/name
|
||||
if "name" in track and track["name"]:
|
||||
cmd.extend([f"--track-name", f"{track_id}:{track['name']}"])
|
||||
|
||||
# Set disposition flags
|
||||
flags = track.get("flags", {})
|
||||
|
||||
if flags.get("default", False):
|
||||
cmd.extend([f"--default-track", f"{track_id}:yes"])
|
||||
else:
|
||||
cmd.extend([f"--default-track", f"{track_id}:no"])
|
||||
|
||||
if flags.get("forced", False):
|
||||
cmd.extend([f"--forced-track", f"{track_id}:yes"])
|
||||
else:
|
||||
cmd.extend([f"--forced-track", f"{track_id}:no"])
|
||||
|
||||
if flags.get("original", False):
|
||||
cmd.extend([f"--original-flag", f"{track_id}:yes"])
|
||||
else:
|
||||
cmd.extend([f"--original-flag", f"{track_id}:no"])
|
||||
|
||||
# Process subtitle tracks
|
||||
for track in metadata.get("subtitle_tracks", []):
|
||||
# Use the actual track index from the metadata
|
||||
track_id = track.get("index", 0)
|
||||
|
||||
# Set language
|
||||
if "language" in track:
|
||||
cmd.extend([f"--language", f"{track_id}:{track['language']}"])
|
||||
|
||||
# Set title/name
|
||||
if "name" in track and track["name"]:
|
||||
cmd.extend([f"--track-name", f"{track_id}:{track['name']}"])
|
||||
|
||||
# Set disposition flags
|
||||
flags = track.get("flags", {})
|
||||
|
||||
if flags.get("default", False):
|
||||
cmd.extend([f"--default-track", f"{track_id}:yes"])
|
||||
else:
|
||||
cmd.extend([f"--default-track", f"{track_id}:no"])
|
||||
|
||||
if flags.get("forced", False):
|
||||
cmd.extend([f"--forced-track", f"{track_id}:yes"])
|
||||
else:
|
||||
cmd.extend([f"--forced-track", f"{track_id}:no"])
|
||||
|
||||
if flags.get("original", False):
|
||||
cmd.extend([f"--original-flag", f"{track_id}:yes"])
|
||||
else:
|
||||
cmd.extend([f"--original-flag", f"{track_id}:no"])
|
||||
|
||||
# Add input file
|
||||
cmd.append(input_file)
|
||||
|
||||
# Execute the mkvmerge command
|
||||
print(f"🔄 Writing metadata to {os.path.basename(output_file)}")
|
||||
print(f"Command: {' '.join(cmd)}")
|
||||
|
||||
try:
|
||||
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
if result.returncode != 0:
|
||||
print(f"❌ Error writing metadata: {result.stderr}")
|
||||
return False
|
||||
|
||||
print(f"✅ Metadata written to {output_file}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"❌ Error executing mkvmerge: {str(e)}")
|
||||
return False
|
||||
|
||||
def process_single_file(metadata, video_file, output_dir=None):
|
||||
"""
|
||||
Process a single video file with the given metadata.
|
||||
|
||||
Args:
|
||||
metadata (dict): Metadata for the video file
|
||||
video_file (str): Path to the video file
|
||||
output_dir (str, optional): Directory to save the output file
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
if not os.path.isfile(video_file):
|
||||
print(f"❌ Video file not found: {video_file}")
|
||||
return False
|
||||
|
||||
# Create output file path
|
||||
if output_dir:
|
||||
# Ensure output directory exists
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
# Use the same filename in the output directory
|
||||
output_file = os.path.join(output_dir, os.path.basename(video_file))
|
||||
else:
|
||||
output_file = None # Let write_metadata_to_video create a default output file
|
||||
|
||||
# Write metadata to video
|
||||
return write_metadata_to_video(metadata, video_file, output_file)
|
||||
|
||||
def process_directory(metadata_dict, source_dir, output_dir=None):
|
||||
"""
|
||||
Process all video files in the metadata dictionary.
|
||||
|
||||
Args:
|
||||
metadata_dict (dict): Dictionary of metadata keyed by filename
|
||||
source_dir (str): Directory containing the video files
|
||||
output_dir (str, optional): Directory to save the output files
|
||||
|
||||
Returns:
|
||||
bool: True if all files were processed successfully, False otherwise
|
||||
"""
|
||||
if not os.path.isdir(source_dir):
|
||||
print(f"❌ Source directory not found: {source_dir}")
|
||||
return False
|
||||
|
||||
# Create output directory if specified
|
||||
if output_dir:
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
success = True
|
||||
processed_count = 0
|
||||
|
||||
# Process each file in the metadata dictionary
|
||||
for filename, file_metadata in metadata_dict.items():
|
||||
# Construct the full path to the video file
|
||||
video_file = os.path.join(source_dir, filename)
|
||||
|
||||
if not os.path.isfile(video_file):
|
||||
print(f"❌ Video file not found: {video_file}")
|
||||
success = False
|
||||
continue
|
||||
|
||||
# Process the file
|
||||
if process_single_file(file_metadata, video_file, output_dir):
|
||||
processed_count += 1
|
||||
else:
|
||||
success = False
|
||||
|
||||
print(f"✅ Processed {processed_count} out of {len(metadata_dict)} files")
|
||||
return success
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Write metadata from JSON to video files.")
|
||||
parser.add_argument("json_file", help="Path to input JSON metadata file")
|
||||
parser.add_argument("-o", "--output", help="Path to output directory")
|
||||
parser.add_argument("-s", "--source", help="Source directory (overrides automatic detection)")
|
||||
args = parser.parse_args()
|
||||
|
||||
json_file = args.json_file
|
||||
output_dir = args.output
|
||||
source_dir = args.source
|
||||
|
||||
if not os.path.isfile(json_file):
|
||||
print(f"❌ JSON file not found: {json_file}")
|
||||
sys.exit(1)
|
||||
|
||||
# Read metadata from JSON
|
||||
metadata = read_metadata_json(json_file)
|
||||
if not metadata:
|
||||
sys.exit(1)
|
||||
|
||||
# Determine if the JSON contains metadata for multiple files or a single file
|
||||
is_multi_file = isinstance(metadata, dict) and all(isinstance(metadata[key], dict) for key in metadata)
|
||||
|
||||
# If source directory is not specified, try to determine it from the JSON filename
|
||||
if not source_dir and is_multi_file:
|
||||
# Extract folder name from JSON filename (e.g., "Millenium" from "Millenium_metadata.json")
|
||||
json_basename = os.path.basename(json_file)
|
||||
if "_metadata.json" in json_basename:
|
||||
folder_name = json_basename.split("_metadata.json")[0]
|
||||
potential_source_dir = os.path.join(os.path.dirname(os.path.abspath(json_file)), folder_name)
|
||||
|
||||
if os.path.isdir(potential_source_dir):
|
||||
source_dir = potential_source_dir
|
||||
print(f"📂 Using source directory: {source_dir}")
|
||||
|
||||
# If no output directory is specified, create one based on the source directory
|
||||
if not output_dir and source_dir:
|
||||
output_dir = os.path.join("ready", os.path.basename(source_dir))
|
||||
print(f"📂 Using output directory: {output_dir}")
|
||||
|
||||
# Process files based on the metadata format
|
||||
if is_multi_file:
|
||||
if not source_dir:
|
||||
print("❌ Source directory not specified and could not be determined automatically.")
|
||||
print(" Please specify a source directory with --source or use a JSON filename like 'FolderName_metadata.json'")
|
||||
sys.exit(1)
|
||||
|
||||
success = process_directory(metadata, source_dir, output_dir)
|
||||
else:
|
||||
# Single file metadata
|
||||
if "filename" not in metadata:
|
||||
print("❌ Invalid metadata format: missing 'filename' field")
|
||||
sys.exit(1)
|
||||
|
||||
# If source directory is specified, look for the file there
|
||||
if source_dir:
|
||||
video_file = os.path.join(source_dir, metadata["filename"])
|
||||
else:
|
||||
# Look for the file in the same directory as the JSON
|
||||
video_file = os.path.join(os.path.dirname(json_file), metadata["filename"])
|
||||
|
||||
success = process_single_file(metadata, video_file, output_dir)
|
||||
|
||||
if not success:
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
x
Reference in New Issue
Block a user