"""Download player-looped videos with youtube-dl & ffmpeg.

The script downloads the audio and the video of a URL separately, measures
both with ffprobe, repeats the shorter stream until it is as long as the
longer one and finally muxes the two into a single output file.
"""

from subprocess import call, Popen, PIPE, check_output, DEVNULL
from os import listdir, remove
from os.path import splitext, exists
from re import match
from enum import Enum
from datetime import timedelta
from math import floor
from argparse import ArgumentParser
from functools import wraps, partial
from atexit import register


# Module-wide verbosity switch read by print_opt(); main() clears it when the
# user passes --nonverbose.
VERBOSE = True


class Stream(Enum):
    """Keys of the file dictionary that name the downloaded streams."""

    AUDIO = 1
    VIDEO = 2


class File(Enum):
    """Keys of the file dictionary that name helper and output files."""

    LIST = 1      # concat list handed to ffmpeg
    LOOP = 2      # the shorter stream repeated to full length
    FRACTION = 3  # the partial repetition appended after the whole ones
    OUTPUT = 4    # the final muxed file


def call_verbose(before_message='', after_message='Done!'):
    """Build a decorator that announces a call via print_opt().

    before_message is printed (without a newline) before the wrapped function
    runs and after_message once it has returned. The wrapped function's
    return value is passed through to the caller.
    """
    def tag(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            print_opt(before_message, end='', flush=True)
            result = f(*args, **kwargs)
            print_opt(after_message)
            return result
        return wrapper
    return tag


def print_opt(*args, **kwargs):
    """Forward to print() unless the module-level VERBOSE flag is off."""
    if VERBOSE:
        print(*args, **kwargs)


def get_command_stderr(command):
    """Run command and return everything it wrote to stderr (as bytes)."""
    process = Popen(command, stderr=PIPE, stdout=PIPE)
    # stdout is captured only so that it does not reach the terminal.
    _, err = process.communicate()
    return err


def get_duration(ffprobe_output):
    """Extract the 'HH:MM:SS[.fraction]' duration from ffprobe output.

    ffprobe_output may be bytes (as returned by get_command_stderr) or str.

    Raises:
        ValueError: if no 'Duration: <timestamp>' entry is present, which
            includes the case where the value is not a timestamp.
    """
    if isinstance(ffprobe_output, bytes):
        text = ffprobe_output.decode('utf-8', errors='replace')
    else:
        text = str(ffprobe_output)
    # Scan line by line and capture only the timestamp itself, so the result
    # does not depend on what follows it on the line.
    for line in text.splitlines():
        found = match(r'.*?Duration:\s*(\d+:\d+:\d+(?:\.\d+)?)', line)
        if found:
            return found.group(1)
    raise ValueError('Cannot process ffprobe output!')


def parse_duration(duration):
    """Convert an 'HH:MM:SS[.fraction]' string into a timedelta."""
    hours, minutes, seconds = duration.split(':')
    return timedelta(hours=float(hours), minutes=float(minutes),
                     seconds=float(seconds))


def plan_loops(longer, shorter):
    """Return (whole, fraction) repetitions of shorter needed to fill longer.

    Both arguments are timedelta objects. whole is an int, fraction a float
    in [0, 1).

    Raises:
        ValueError: if shorter has no positive length.
    """
    short_seconds = shorter.total_seconds()
    if short_seconds <= 0:
        raise ValueError('Cannot loop a stream of zero length!')
    ratio = longer.total_seconds() / short_seconds
    whole = floor(ratio)
    return whole, ratio - whole


def safe_filename(title):
    """Replace path separators in title so it can be used as a file name."""
    return title.replace('/', '_').replace('\\', '_')


@call_verbose(before_message='Downloading audio stream... ')
def download_audio_stream(url, file_dict):
    """Download url with youtube-dl --extract-audio into the AUDIO file.

    The extension is chosen by youtube-dl; its exit status is not checked.
    """
    call(('youtube-dl', '--ignore-config', '--extract-audio',
          '--output', '{}.%(ext)s'.format(file_dict[Stream.AUDIO]), url),
         stdout=DEVNULL, stderr=DEVNULL)


@call_verbose(before_message='Downloading video stream... ')
def download_video_stream(url, file_dict):
    """Download url with youtube-dl into the VIDEO file.

    The extension is chosen by youtube-dl; its exit status is not checked.
    """
    call(('youtube-dl', '--ignore-config',
          '--output', '{}.%(ext)s'.format(file_dict[Stream.VIDEO]), url),
         stdout=DEVNULL, stderr=DEVNULL)


def read_extensions(file_dict, candidates=None):
    """Replace the extension-less stream names in file_dict by real files.

    For every Stream key, a candidate whose name without its extension equals
    the stored base name replaces that entry. Names are compared literally,
    so unrelated files and other entries of file_dict cannot interfere.

    candidates defaults to the contents of the current directory.
    """
    if candidates is None:
        candidates = listdir()
    for key in Stream:
        if key not in file_dict:
            continue
        base = file_dict[key]
        for name in candidates:
            stem, extension = splitext(name)
            if extension and stem == base:
                file_dict[key] = name


@call_verbose(before_message='Looping shorter stream... ')
def loop_shorter_stream(file_dict, shorter, shorter_file, loop_fraction):
    """Create the LOOP file by concatenating the entries of the LIST file.

    shorter is the timedelta length of shorter_file; loop_fraction is the
    share of it used for the last, partial repetition (written to FRACTION).
    """
    # prepare last fractional loop; a zero-length tail adds nothing, so it is
    # not generated (the caller leaves it out of the list file as well)
    if loop_fraction > 0:
        call(('ffmpeg', '-i', shorter_file,
              '-t', str(loop_fraction * shorter.total_seconds()),
              file_dict[File.FRACTION]),
             stdout=DEVNULL, stderr=DEVNULL)
    # concat them
    call(('ffmpeg', '-f', 'concat', '-i', file_dict[File.LIST],
          '-c', 'copy', file_dict[File.LOOP]),
         stdout=DEVNULL, stderr=DEVNULL)


@call_verbose(before_message='Muxing streams... ')
def mux_streams(file_dict, video_key=File.LOOP, audio_key=Stream.AUDIO):
    """Mux one video and one audio input into the OUTPUT file without re-encoding.

    video_key and audio_key select the entries of file_dict that supply the
    video and the audio stream. The defaults cover the case where the video
    was looped; pass video_key=Stream.VIDEO, audio_key=File.LOOP when the
    audio was looped instead.
    """
    call(('ffmpeg', '-i', file_dict[video_key], '-i', file_dict[audio_key],
          '-map', '0:v:0', '-map', '1:a:0', '-c', 'copy',
          file_dict[File.OUTPUT]),
         stdout=DEVNULL, stderr=DEVNULL)


def cleanup(file_dict, outputs):
    """Delete every file in file_dict whose key is not listed in outputs.

    Files that do not exist are skipped silently.
    """
    for key, path in file_dict.items():
        if key in outputs:
            continue
        try:
            remove(path)
        except FileNotFoundError:
            pass


def yes_no_question(question, default):
    """Ask question on the terminal until a yes/no answer is given.

    default is 'yes', 'no' or None and is returned for an empty answer.

    Returns:
        True for yes, False for no.

    Raises:
        ValueError: if default is none of the accepted values.
    """
    valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("Invalid default answer: {}!".format(default))

    while True:
        print(question + prompt)
        choice = input().strip().lower()
        if default is not None and choice == '':
            return valid[default]
        elif choice in valid:
            return valid[choice]
        else:
            print("Please respond with 'yes'(y) or 'no'(n)!")


def main(argv=None):
    """Parse the command line and run download, loop and mux.

    argv is the argument list to parse; None means sys.argv[1:].
    """
    global VERBOSE

    # parse arguments
    parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
    parser.add_argument('-nv', '--nonverbose', action='store_true',
                        help='Turn off non-critical messages to user.')
    parser.add_argument('-o', '--output', default=None,
                        help='Specify name of the output file (use -e for extension).')
    parser.add_argument('-e', '--extension', default='mp4',
                        help='Set the container to use for the output.')
    parser.add_argument('url', type=str,
                        help='The URL of the site containing the video to download.')
    args = parser.parse_args(argv)

    # Accept both 'mp4' and '.mp4' without producing a double dot.
    extension = '.' + args.extension.lstrip('.')
    VERBOSE = not args.nonverbose

    files = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
             File.LIST: 'list.txt', File.LOOP: 'loop',
             File.FRACTION: 'fraction', File.OUTPUT: 'output' + extension}
    output_keys = [File.OUTPUT]
    url = args.url

    # clean up on exit; the dict is shared, so later updates are seen
    register(partial(cleanup, files, output_keys))

    # fetch video title if no filename was specified
    if args.output is None:
        title = check_output(('youtube-dl', '--get-title', url)).decode('utf-8').strip()
        files[File.OUTPUT] = safe_filename(title)
    else:
        files[File.OUTPUT] = args.output
    files[File.OUTPUT] += extension

    # ask what to do if output exists
    if exists(files[File.OUTPUT]):
        answer = yes_no_question(
            'A file named "{}" already exists! Overwrite?'.format(files[File.OUTPUT]),
            default='no')
        if not answer:
            print_opt('Exiting!')
            raise SystemExit
        remove(files[File.OUTPUT])

    # download streams and update the file dict with extensions
    download_audio_stream(url, files)
    download_video_stream(url, files)
    read_extensions(files)

    # get stream lengths via ffprobe
    audio_len = parse_duration(get_duration(get_command_stderr(('ffprobe', files[Stream.AUDIO]))))
    video_len = parse_duration(get_duration(get_command_stderr(('ffprobe', files[Stream.VIDEO]))))

    # decide which stream needs some looping (the video on a tie)
    audio_is_shorter = audio_len < video_len
    if audio_is_shorter:
        shorter, longer = audio_len, video_len
        shorter_file = files[Stream.AUDIO]
    else:
        shorter, longer = video_len, audio_len
        shorter_file = files[Stream.VIDEO]

    # helper files share the container of the stream they are cut from
    shorter_extension = splitext(shorter_file)[1]
    files[File.LOOP] += shorter_extension
    files[File.FRACTION] += shorter_extension

    whole_loops, loop_fraction = plan_loops(longer, shorter)

    # write concat helper file for ffmpeg
    with open(files[File.LIST], 'w') as list_file:
        for _ in range(whole_loops):
            print("file '{}'".format(shorter_file), file=list_file)
        if loop_fraction > 0:
            print("file '{}'".format(files[File.FRACTION]), file=list_file)

    loop_shorter_stream(files, shorter, shorter_file, loop_fraction)

    # the looped file replaces whichever stream was the shorter one
    if audio_is_shorter:
        mux_streams(files, video_key=Stream.VIDEO, audio_key=File.LOOP)
    else:
        mux_streams(files)


if __name__ == '__main__':
    main()