chore: init
This commit is contained in:
commit
8a407a186a
102
encode_video.py
Executable file
102
encode_video.py
Executable file
@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import os
|
||||
import subprocess
|
||||
import re
|
||||
import sys
|
||||
|
||||
SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
|
||||
|
||||
def get_duration(file_path):
|
||||
result = subprocess.run([
|
||||
"ffprobe", "-v", "error", "-show_entries", "format=duration",
|
||||
"-of", "default=noprint_wrappers=1:nokey=1", file_path
|
||||
], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
return float(result.stdout.strip())
|
||||
|
||||
def encode(input_file, codec):
|
||||
if codec == "x265":
|
||||
ffmpeg_codec = "libx265"
|
||||
crf = 26
|
||||
folder = "h265"
|
||||
elif codec == "av1":
|
||||
ffmpeg_codec = "libaom-av1"
|
||||
crf = 32
|
||||
folder = "av1"
|
||||
else:
|
||||
raise ValueError("Unsupported codec")
|
||||
|
||||
try:
|
||||
duration = get_duration(input_file)
|
||||
except Exception:
|
||||
print(f"\n❌ Could not read duration for {input_file}")
|
||||
return
|
||||
|
||||
filename = os.path.basename(input_file)
|
||||
name, _ = os.path.splitext(filename)
|
||||
outdir = os.path.join(os.path.dirname(input_file), folder)
|
||||
os.makedirs(outdir, exist_ok=True)
|
||||
output_file = os.path.join(outdir, f"{name}.mkv")
|
||||
|
||||
cmd = [
|
||||
"ffmpeg",
|
||||
"-i", input_file,
|
||||
"-map", "0",
|
||||
"-c:v", ffmpeg_codec,
|
||||
"-crf", str(crf),
|
||||
"-c:a", "copy",
|
||||
"-c:s", "copy",
|
||||
"-y", output_file
|
||||
]
|
||||
|
||||
print(f"\n🎬 Encoding {filename} → {folder}/{name}.mkv")
|
||||
process = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True)
|
||||
|
||||
time_re = re.compile(r"time=(\d+):(\d+):([\d.]+)")
|
||||
last_percent = -1
|
||||
|
||||
for line in process.stderr:
|
||||
match = time_re.search(line)
|
||||
if match:
|
||||
h, m, s = map(float, match.groups())
|
||||
current = h * 3600 + m * 60 + s
|
||||
percent = int((current / duration) * 100)
|
||||
if percent != last_percent:
|
||||
print(f"\r⏳ Progress: {percent}%", end='', flush=True)
|
||||
last_percent = percent
|
||||
|
||||
process.wait()
|
||||
print("\n✅ Done.")
|
||||
|
||||
def encode_batch(directory, codec):
|
||||
if not os.path.isdir(directory):
|
||||
print(f"❌ Not a valid directory: {directory}")
|
||||
return
|
||||
|
||||
for root, _, files in os.walk(directory):
|
||||
for file in files:
|
||||
if file.lower().endswith(SUPPORTED_EXTENSIONS):
|
||||
filepath = os.path.join(root, file)
|
||||
encode(filepath, codec)
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Encode video(s) to x265 or AV1.")
|
||||
parser.add_argument("input", nargs="?", help="Path to input file")
|
||||
parser.add_argument("-d", "--directory", help="Path to a directory for batch encoding")
|
||||
parser.add_argument("--codec", choices=["x265", "av1"], default="x265", help="Codec to use (default: x265)")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.directory:
|
||||
encode_batch(args.directory, args.codec)
|
||||
elif args.input:
|
||||
if not os.path.isfile(args.input):
|
||||
print(f"❌ File not found: {args.input}")
|
||||
sys.exit(1)
|
||||
encode(args.input, args.codec)
|
||||
else:
|
||||
print("❌ Please provide a file path or use -d for a directory.")
|
||||
parser.print_help()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
187
get_metadata.py
Executable file
187
get_metadata.py
Executable file
@ -0,0 +1,187 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import os
|
||||
import subprocess
|
||||
import json
|
||||
import sys
|
||||
|
||||
SUPPORTED_EXTENSIONS = (".mp4", ".mkv", ".mov", ".avi")
|
||||
|
||||
def get_video_metadata(file_path):
|
||||
"""
|
||||
Extract metadata from a video file using ffprobe.
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the video file
|
||||
|
||||
Returns:
|
||||
dict: Metadata information
|
||||
"""
|
||||
# Get general file info
|
||||
cmd = [
|
||||
"ffprobe", "-v", "quiet", "-print_format", "json",
|
||||
"-show_format", "-show_streams", file_path
|
||||
]
|
||||
|
||||
try:
|
||||
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
if result.returncode != 0:
|
||||
print(f"❌ Error processing {file_path}: {result.stderr}")
|
||||
return None
|
||||
|
||||
data = json.loads(result.stdout)
|
||||
|
||||
# Extract filename
|
||||
filename = os.path.basename(file_path)
|
||||
|
||||
# Initialize metadata structure
|
||||
metadata = {
|
||||
"filename": filename,
|
||||
"audio_tracks": [],
|
||||
"subtitle_tracks": []
|
||||
}
|
||||
|
||||
# Process streams
|
||||
for stream in data.get("streams", []):
|
||||
codec_type = stream.get("codec_type")
|
||||
|
||||
if codec_type == "audio":
|
||||
track = {
|
||||
"index": stream.get("index"),
|
||||
"language": stream.get("tags", {}).get("language", "und"),
|
||||
"region": stream.get("tags", {}).get("region", "und"),
|
||||
"name": stream.get("tags", {}).get("title", ""),
|
||||
"codec": stream.get("codec_name", ""),
|
||||
"channels": stream.get("channels", 0),
|
||||
"flags": {
|
||||
"default": stream.get("disposition", {}).get("default", 0) == 1,
|
||||
"forced": stream.get("disposition", {}).get("forced", 0) == 1,
|
||||
"hearing_impaired": stream.get("disposition", {}).get("hearing_impaired", 0) == 1,
|
||||
"visual_impaired": stream.get("disposition", {}).get("visual_impaired", 0) == 1,
|
||||
"original": stream.get("disposition", {}).get("original", 0) == 1,
|
||||
"commentary": stream.get("disposition", {}).get("comment", 0) == 1
|
||||
}
|
||||
}
|
||||
metadata["audio_tracks"].append(track)
|
||||
|
||||
elif codec_type == "subtitle":
|
||||
track = {
|
||||
"index": stream.get("index"),
|
||||
"language": stream.get("tags", {}).get("language", "und"),
|
||||
"region": stream.get("tags", {}).get("region", "und"),
|
||||
"name": stream.get("tags", {}).get("title", ""),
|
||||
"codec": stream.get("codec_name", ""),
|
||||
"flags": {
|
||||
"default": stream.get("disposition", {}).get("default", 0) == 1,
|
||||
"forced": stream.get("disposition", {}).get("forced", 0) == 1,
|
||||
"hearing_impaired": stream.get("disposition", {}).get("hearing_impaired", 0) == 1,
|
||||
"visual_impaired": stream.get("disposition", {}).get("visual_impaired", 0) == 1,
|
||||
"original": stream.get("disposition", {}).get("original", 0) == 1,
|
||||
"commentary": stream.get("disposition", {}).get("comment", 0) == 1
|
||||
}
|
||||
}
|
||||
metadata["subtitle_tracks"].append(track)
|
||||
|
||||
return metadata
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error processing {file_path}: {str(e)}")
|
||||
return None
|
||||
|
||||
def process_file(file_path, output_file=None):
|
||||
"""
|
||||
Process a single video file and write metadata to JSON.
|
||||
|
||||
Args:
|
||||
file_path (str): Path to the video file
|
||||
output_file (str, optional): Path to output JSON file
|
||||
"""
|
||||
if not os.path.isfile(file_path):
|
||||
print(f"❌ File not found: {file_path}")
|
||||
return False
|
||||
|
||||
if not file_path.lower().endswith(SUPPORTED_EXTENSIONS):
|
||||
print(f"❌ Unsupported file format: {file_path}")
|
||||
return False
|
||||
|
||||
print(f"📊 Extracting metadata from {os.path.basename(file_path)}")
|
||||
metadata = get_video_metadata(file_path)
|
||||
|
||||
if metadata:
|
||||
if not output_file:
|
||||
# Generate output filename based on input file
|
||||
base_name = os.path.splitext(file_path)[0]
|
||||
output_file = f"{base_name}_metadata.json"
|
||||
|
||||
# Write metadata to JSON file
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(metadata, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"✅ Metadata saved to {output_file}")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def process_directory(directory_path, output_file=None):
|
||||
"""
|
||||
Process all video files in a directory and write metadata to JSON.
|
||||
|
||||
Args:
|
||||
directory_path (str): Path to the directory
|
||||
output_file (str, optional): Path to output JSON file
|
||||
"""
|
||||
if not os.path.isdir(directory_path):
|
||||
print(f"❌ Directory not found: {directory_path}")
|
||||
return False
|
||||
|
||||
all_metadata = {}
|
||||
file_count = 0
|
||||
|
||||
for root, _, files in os.walk(directory_path):
|
||||
for file in files:
|
||||
if file.lower().endswith(SUPPORTED_EXTENSIONS):
|
||||
file_path = os.path.join(root, file)
|
||||
print(f"📊 Extracting metadata from {file}")
|
||||
metadata = get_video_metadata(file_path)
|
||||
|
||||
if metadata:
|
||||
# Use relative path as key
|
||||
rel_path = os.path.relpath(file_path, directory_path)
|
||||
all_metadata[rel_path] = metadata
|
||||
file_count += 1
|
||||
|
||||
if file_count == 0:
|
||||
print(f"❌ No supported video files found in {directory_path}")
|
||||
return False
|
||||
|
||||
if not output_file:
|
||||
# Generate output filename based on directory name
|
||||
dir_name = os.path.basename(os.path.normpath(directory_path))
|
||||
output_file = f"{dir_name}_metadata.json"
|
||||
|
||||
# Write all metadata to a single JSON file
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(all_metadata, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"✅ Metadata for {file_count} files saved to {output_file}")
|
||||
return True
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Extract metadata from video files and save as JSON.")
|
||||
parser.add_argument("input", help="Path to input video file or directory")
|
||||
parser.add_argument("-o", "--output", help="Path to output JSON file")
|
||||
args = parser.parse_args()
|
||||
|
||||
input_path = args.input
|
||||
output_file = args.output
|
||||
|
||||
if os.path.isfile(input_path):
|
||||
process_file(input_path, output_file)
|
||||
elif os.path.isdir(input_path):
|
||||
process_directory(input_path, output_file)
|
||||
else:
|
||||
print(f"❌ Path not found: {input_path}")
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
x
Reference in New Issue
Block a user