From 8a407a186a3da7873434019a6165d35548ed00d8 Mon Sep 17 00:00:00 2001 From: Klagarge Date: Mon, 21 Apr 2025 11:39:04 +0200 Subject: [PATCH] chore: init --- encode_video.py | 102 ++++++++++++++++++++++++++ get_metadata.py | 187 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 289 insertions(+) create mode 100755 encode_video.py create mode 100755 get_metadata.py diff --git a/encode_video.py b/encode_video.py new file mode 100755 index 0000000..b1ef79c --- /dev/null +++ b/encode_video.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +import argparse +import os +import subprocess +import re +import sys + +SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi") + +def get_duration(file_path): + result = subprocess.run([ + "ffprobe", "-v", "error", "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", file_path + ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + return float(result.stdout.strip()) + +def encode(input_file, codec): + if codec == "x265": + ffmpeg_codec = "libx265" + crf = 26 + folder = "h265" + elif codec == "av1": + ffmpeg_codec = "libaom-av1" + crf = 32 + folder = "av1" + else: + raise ValueError("Unsupported codec") + + try: + duration = get_duration(input_file) + except Exception: + print(f"\n❌ Could not read duration for {input_file}") + return + + filename = os.path.basename(input_file) + name, _ = os.path.splitext(filename) + outdir = os.path.join(os.path.dirname(input_file), folder) + os.makedirs(outdir, exist_ok=True) + output_file = os.path.join(outdir, f"{name}.mkv") + + cmd = [ + "ffmpeg", + "-i", input_file, + "-map", "0", + "-c:v", ffmpeg_codec, + "-crf", str(crf), + "-c:a", "copy", + "-c:s", "copy", + "-y", output_file + ] + + print(f"\n🎬 Encoding {filename} → {folder}/{name}.mkv") + process = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True) + + time_re = re.compile(r"time=(\d+):(\d+):([\d.]+)") + last_percent = -1 + + for line in process.stderr: + match = time_re.search(line) + if match: + h, m, s = map(float, match.groups()) + current = h * 3600 + m * 60 + s + percent = int((current / duration) * 100) + if percent != last_percent: + print(f"\r⏳ Progress: {percent}%", end='', flush=True) + last_percent = percent + + process.wait() + print("\n✅ Done.") + +def encode_batch(directory, codec): + if not os.path.isdir(directory): + print(f"❌ Not a valid directory: {directory}") + return + + for root, _, files in os.walk(directory): + for file in files: + if file.lower().endswith(SUPPORTED_EXTENSIONS): + filepath = os.path.join(root, file) + encode(filepath, codec) + +def main(): + parser = argparse.ArgumentParser(description="Encode video(s) to x265 or AV1.") + parser.add_argument("input", nargs="?", help="Path to input file") + parser.add_argument("-d", "--directory", help="Path to a directory for batch encoding") + parser.add_argument("--codec", choices=["x265", "av1"], default="x265", help="Codec to use (default: x265)") + args = parser.parse_args() + + if args.directory: + encode_batch(args.directory, args.codec) + elif args.input: + if not os.path.isfile(args.input): + print(f"❌ File not found: {args.input}") + sys.exit(1) + encode(args.input, args.codec) + else: + print("❌ Please provide a file path or use -d for a directory.") + parser.print_help() + +if __name__ == "__main__": + main() + diff --git a/get_metadata.py b/get_metadata.py new file mode 100755 index 0000000..f0a325f --- /dev/null +++ b/get_metadata.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +import argparse +import os +import subprocess +import json +import sys + +SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi") + +def get_video_metadata(file_path): + """ + Extract metadata from a video file using ffprobe. + + Args: + file_path (str): Path to the video file + + Returns: + dict: Metadata information + """ + # Get general file info + cmd = [ + "ffprobe", "-v", "quiet", "-print_format", "json", + "-show_format", "-show_streams", file_path + ] + + try: + result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if result.returncode != 0: + print(f"❌ Error processing {file_path}: {result.stderr}") + return None + + data = json.loads(result.stdout) + + # Extract filename + filename = os.path.basename(file_path) + + # Initialize metadata structure + metadata = { + "filename": filename, + "audio_tracks": [], + "subtitle_tracks": [] + } + + # Process streams + for stream in data.get("streams", []): + codec_type = stream.get("codec_type") + + if codec_type == "audio": + track = { + "index": stream.get("index"), + "language": stream.get("tags", {}).get("language", "und"), + "region": stream.get("tags", {}).get("region", "und"), + "name": stream.get("tags", {}).get("title", ""), + "codec": stream.get("codec_name", ""), + "channels": stream.get("channels", 0), + "flags": { + "default": stream.get("disposition", {}).get("default", 0) == 1, + "forced": stream.get("disposition", {}).get("forced", 0) == 1, + "hearing_impaired": stream.get("disposition", {}).get("hearing_impaired", 0) == 1, + "visual_impaired": stream.get("disposition", {}).get("visual_impaired", 0) == 1, + "original": stream.get("disposition", {}).get("original", 0) == 1, + "commentary": stream.get("disposition", {}).get("comment", 0) == 1 + } + } + metadata["audio_tracks"].append(track) + + elif codec_type == "subtitle": + track = { + "index": stream.get("index"), + "language": stream.get("tags", {}).get("language", "und"), + "region": stream.get("tags", {}).get("region", "und"), + "name": stream.get("tags", {}).get("title", ""), + "codec": stream.get("codec_name", ""), + "flags": { + "default": stream.get("disposition", {}).get("default", 0) == 1, + "forced": stream.get("disposition", {}).get("forced", 0) == 1, + "hearing_impaired": stream.get("disposition", {}).get("hearing_impaired", 0) == 1, + "visual_impaired": stream.get("disposition", {}).get("visual_impaired", 0) == 1, + "original": stream.get("disposition", {}).get("original", 0) == 1, + "commentary": stream.get("disposition", {}).get("comment", 0) == 1 + } + } + metadata["subtitle_tracks"].append(track) + + return metadata + + except Exception as e: + print(f"❌ Error processing {file_path}: {str(e)}") + return None + +def process_file(file_path, output_file=None): + """ + Process a single video file and write metadata to JSON. + + Args: + file_path (str): Path to the video file + output_file (str, optional): Path to output JSON file + """ + if not os.path.isfile(file_path): + print(f"❌ File not found: {file_path}") + return False + + if not file_path.lower().endswith(SUPPORTED_EXTENSIONS): + print(f"❌ Unsupported file format: {file_path}") + return False + + print(f"📊 Extracting metadata from {os.path.basename(file_path)}") + metadata = get_video_metadata(file_path) + + if metadata: + if not output_file: + # Generate output filename based on input file + base_name = os.path.splitext(file_path)[0] + output_file = f"{base_name}_metadata.json" + + # Write metadata to JSON file + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(metadata, f, indent=2, ensure_ascii=False) + + print(f"✅ Metadata saved to {output_file}") + return True + + return False + +def process_directory(directory_path, output_file=None): + """ + Process all video files in a directory and write metadata to JSON. + + Args: + directory_path (str): Path to the directory + output_file (str, optional): Path to output JSON file + """ + if not os.path.isdir(directory_path): + print(f"❌ Directory not found: {directory_path}") + return False + + all_metadata = {} + file_count = 0 + + for root, _, files in os.walk(directory_path): + for file in files: + if file.lower().endswith(SUPPORTED_EXTENSIONS): + file_path = os.path.join(root, file) + print(f"📊 Extracting metadata from {file}") + metadata = get_video_metadata(file_path) + + if metadata: + # Use relative path as key + rel_path = os.path.relpath(file_path, directory_path) + all_metadata[rel_path] = metadata + file_count += 1 + + if file_count == 0: + print(f"❌ No supported video files found in {directory_path}") + return False + + if not output_file: + # Generate output filename based on directory name + dir_name = os.path.basename(os.path.normpath(directory_path)) + output_file = f"{dir_name}_metadata.json" + + # Write all metadata to a single JSON file + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(all_metadata, f, indent=2, ensure_ascii=False) + + print(f"✅ Metadata for {file_count} files saved to {output_file}") + return True + +def main(): + parser = argparse.ArgumentParser(description="Extract metadata from video files and save as JSON.") + parser.add_argument("input", help="Path to input video file or directory") + parser.add_argument("-o", "--output", help="Path to output JSON file") + args = parser.parse_args() + + input_path = args.input + output_file = args.output + + if os.path.isfile(input_path): + process_file(input_path, output_file) + elif os.path.isdir(input_path): + process_directory(input_path, output_file) + else: + print(f"❌ Path not found: {input_path}") + sys.exit(1) + +if __name__ == "__main__": + main()