coub-dl/coub-dl.py

206 lines
6.4 KiB
Python

from subprocess import call, Popen, PIPE, check_output, DEVNULL
from os import listdir, remove
from os.path import splitext, exists
from re import match
from enum import Enum
from datetime import timedelta
from math import floor
from argparse import ArgumentParser
from functools import wraps, partial
from atexit import register
class Stream(Enum):
AUDIO = 1
VIDEO = 2
class File(Enum):
LIST = 1
LOOP = 2
FRACTION = 3
OUTPUT = 4
def call_verbose(before_message='', after_message='Done!'):
def tag(f):
@wraps(f)
def wrapper(*args, **kwargs):
print_opt(before_message, end='', flush=True)
f(*args, **kwargs)
print_opt(after_message)
return wrapper
return tag
def print_opt(*args, **kwargs):
if VERBOSE:
print(*args, **kwargs)
def get_command_stderr(command):
process = Popen(command, stderr=PIPE, stdout=PIPE)
out, err = process.communicate()
return err
def get_duration(ffprobe_output):
durationPattern = r'.*Duration:\s(.+),\sstart.*'
regex = match(durationPattern, str(ffprobe_output))
duration = regex.groups()[0] if regex else None
if not duration:
raise ValueError('Cannot process ffprobe output!')
return duration
@call_verbose(before_message='Downloading audio stream... ')
def download_audio_stream(url, file_dict):
call(('youtube-dl', '--ignore-config',
'--extract-audio',
'--output', '{}.%(ext)s'.format(file_dict[Stream.AUDIO]),
url),
stdout=DEVNULL, stderr=DEVNULL)
@call_verbose(before_message='Downloading video stream... ')
def download_video_stream(url, file_dict):
call(('youtube-dl', '--ignore-config',
'--output', '{}.%(ext)s'.format(file_dict[Stream.VIDEO]),
url),
stdout=DEVNULL, stderr=DEVNULL)
def read_extensions(file_dict):
for file in listdir():
for filename in file_dict:
if match('^{}.*'.format(file_dict[filename]), file):
file_dict[filename] = file
@call_verbose(before_message='Looping shorter stream... ')
def loop_shorter_stream(file_dict, shorter, shorter_file, loop_fraction):
# prepare last fractional loop
call(('ffmpeg', '-i', shorter_file, '-t', str(loop_fraction * shorter.total_seconds()), file_dict[File.FRACTION]),
stdout=DEVNULL, stderr=DEVNULL)
# concat them
call(('ffmpeg', '-f', 'concat', '-i', file_dict[File.LIST],
'-c', 'copy', file_dict[File.LOOP]),
stdout=DEVNULL, stderr=DEVNULL)
@call_verbose(before_message='Muxing streams... ')
def mux_streams(file_dict):
call(('ffmpeg', '-i', file_dict[File.LOOP],
'-i', file_dict[Stream.AUDIO],
'-map', '0:v:0', '-map', '1:a:0',
'-c', 'copy', file_dict[File.OUTPUT]),
stdout=DEVNULL, stderr=DEVNULL)
def cleanup(file_dict, outputs):
for key in file_dict:
if key not in outputs:
try:
remove(file_dict[key])
except FileNotFoundError:
pass
def get_length(file):
data = get_duration(get_command_stderr(('ffprobe', file))).split(':')
return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2]))
def yes_no_question(question, default):
valid = {"yes": True, "y": True, "ye": True,
"no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("Invalid default answer: {}!".format(default))
while True:
print(question + prompt)
choice = input().lower()
if default is not None and choice == '':
return valid[default]
elif choice in valid:
return valid[choice]
else:
print("Please respond with 'yes'(y) or 'no'(n)!")
# parse arguments
parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
parser.add_argument('-nv', '--nonverbose', action='store_true', help='Turn off non-critical messages to user.')
parser.add_argument('-o', '--output', default=None, help='Specify name of the output file (use -e for extension).')
parser.add_argument('-e', '--extension', default='mp4', help='Set the container to use for the output.')
parser.add_argument('url', type=str, help='The URL of the site containing the video to download.')
args = parser.parse_args()
args.extension = '.' + args.extension
VERBOSE = False if args.nonverbose else True
FILES = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction',
File.OUTPUT: 'output'+args.extension}
OUTPUT_KEYS = [File.OUTPUT]
URL = args.url
# clean up on exit
register(partial(cleanup, FILES, OUTPUT_KEYS))
# fetch video title if no filename was specified
if args.output is None:
FILES[File.OUTPUT] = check_output(('youtube-dl', '--get-title', args.url)).decode('utf-8').strip()
else:
FILES[File.OUTPUT] = args.output
FILES[File.OUTPUT] += args.extension
# ask what to do if output exists
if exists(FILES[File.OUTPUT]):
answer = yes_no_question('A file named "{}" already exists! Overwrite?'.format(FILES[File.OUTPUT]), default='no')
if not answer:
print_opt('Exiting!')
exit()
else:
remove(FILES[File.OUTPUT])
# download streams and update FILE dict with extensions
download_audio_stream(URL, FILES)
download_video_stream(URL, FILES)
read_extensions(FILES)
# get stream lengths via ffprobe
audioLen = get_length(FILES[Stream.AUDIO])
videoLen = get_length(FILES[Stream.VIDEO])
# decide which stream needs some looping
longer = audioLen if audioLen > videoLen else videoLen
shorter = audioLen if audioLen < videoLen else videoLen
shorterFile = FILES[Stream.AUDIO] if audioLen < videoLen else FILES[Stream.VIDEO]
FILES[File.LOOP] += splitext(shorterFile)[1]
FILES[File.FRACTION] += splitext(shorterFile)[1]
times = longer.total_seconds() / shorter.total_seconds()
timesLoop_base = floor(times)
timesLoop_fraction = times % 1
# write concat helper file for ffmpeg
with open(FILES[File.LIST], 'w') as f:
for i in range(timesLoop_base):
print("file '{}'".format(shorterFile), file=f)
print("file '{}'".format(FILES[File.FRACTION]), file=f)
loop_shorter_stream(FILES, shorter, shorterFile, timesLoop_fraction)
mux_streams(FILES)