from subprocess import call, Popen, PIPE, check_output, DEVNULL from os import listdir, remove from os.path import splitext, exists, join from re import match from enum import Enum from datetime import timedelta from math import floor from argparse import ArgumentParser from functools import wraps from tempfile import mkdtemp from shutil import rmtree class Stream(Enum): AUDIO = 1 VIDEO = 2 class File(Enum): LIST = 1 LOOP = 2 FRACTION = 3 OUTPUT = 4 def call_verbose(before_message='', after_message='Done!'): def tag(f): @wraps(f) def wrapper(*args, **kwargs): print_opt(before_message, end='', flush=True) f(*args, **kwargs) print_opt(after_message) return wrapper return tag def print_opt(*args, **kwargs): if VERBOSE: print(*args, **kwargs) def run(url, files_dict, directory): # download streams and update FILE dict with extensions download_audio_stream(url, files_dict) download_video_stream(url, files_dict) read_extensions(files_dict, directory) # get stream lengths via ffprobe audioLen = get_length(files_dict[Stream.AUDIO]) videoLen = get_length(files_dict[Stream.VIDEO]) # decide which stream needs some looping longer = audioLen if audioLen > videoLen else videoLen shorter = audioLen if audioLen < videoLen else videoLen shorterFile = files_dict[Stream.AUDIO] if audioLen < videoLen else files_dict[Stream.VIDEO] files_dict[File.LOOP] += splitext(shorterFile)[1] files_dict[File.FRACTION] += splitext(shorterFile)[1] # calculate how many times to loop times = longer.total_seconds() / shorter.total_seconds() timesLoop_base = floor(times) timesLoop_fraction = times % 1 # write concat helper file for ffmpeg with open(files_dict[File.LIST], 'w') as f: for i in range(timesLoop_base): print("file '{}'".format(shorterFile), file=f) print("file '{}'".format(files_dict[File.FRACTION]), file=f) # loop & mux streams loop_shorter_stream(files_dict, shorter, shorterFile, timesLoop_fraction) mux_streams(files_dict) @call_verbose(before_message='Downloading audio stream... ') def download_audio_stream(url, file_dict): call(('youtube-dl', '--ignore-config', '--extract-audio', '--output', '{}.%(ext)s'.format(file_dict[Stream.AUDIO]), url), stdout=DEVNULL, stderr=DEVNULL) @call_verbose(before_message='Downloading video stream... ') def download_video_stream(url, file_dict): call(('youtube-dl', '--ignore-config', '--output', '{}.%(ext)s'.format(file_dict[Stream.VIDEO]), url), stdout=DEVNULL, stderr=DEVNULL) def read_extensions(file_dict, directory): for file in listdir(directory): for filename in file_dict: fullname = join(directory, file) if match('^{}.*'.format(file_dict[filename]), fullname): file_dict[filename] = fullname def get_length(file): data = get_duration(get_command_stderr(('ffprobe', file))).split(':') return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2])) def get_command_stderr(command): process = Popen(command, stderr=PIPE, stdout=PIPE) out, err = process.communicate() return err def get_duration(ffprobe_output): durationPattern = r'.*Duration:\s(.+),\sstart.*' regex = match(durationPattern, str(ffprobe_output)) duration = regex.groups()[0] if regex else None if not duration: raise ValueError('Cannot process ffprobe output!') return duration @call_verbose(before_message='Looping shorter stream... ') def loop_shorter_stream(file_dict, shorter, shorter_file, loop_fraction): # prepare last fractional loop call(('ffmpeg', '-i', shorter_file, '-t', str(loop_fraction * shorter.total_seconds()), file_dict[File.FRACTION]), stdout=DEVNULL, stderr=DEVNULL) # concat them call(('ffmpeg', '-f', 'concat', '-safe', '0', '-i', file_dict[File.LIST], '-c', 'copy', file_dict[File.LOOP]), stdout=DEVNULL, stderr=DEVNULL) @call_verbose(before_message='Muxing streams... ') def mux_streams(file_dict): call(('ffmpeg', '-i', file_dict[File.LOOP], '-i', file_dict[Stream.AUDIO], '-map', '0:v:0', '-map', '1:a:0', '-c', 'copy', file_dict[File.OUTPUT]), stdout=DEVNULL, stderr=DEVNULL) def determine_output_filename(url, user_supplied, extension, files_dict): if user_supplied is None: files_dict[File.OUTPUT] = check_output(('youtube-dl', '--get-title', url)).decode('utf-8').strip() else: files_dict[File.OUTPUT] = user_supplied files_dict[File.OUTPUT] += extension def parse_cmd_arguments(): parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.') parser.add_argument('-nv', '--nonverbose', action='store_true', help='Turn off non-critical messages to user') parser.add_argument('-o', '--output', default=None, help='Specify name of the output file (use -e for extension)') parser.add_argument('-e', '--extension', default='mp4', help='Set the container to use for the output') parser.add_argument('url', type=str, help='The URL of the site containing the video to download') args = parser.parse_args() args.extension = '.' + args.extension return args def yes_no_question(question, default): valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} if default is None: prompt = " [y/n] " elif default == "yes": prompt = " [Y/n] " elif default == "no": prompt = " [y/N] " else: raise ValueError("Invalid default answer: {}!".format(default)) while True: print(question + prompt) choice = input().lower() if default is not None and choice == '': return valid[default] elif choice in valid: return valid[choice] else: print("Please respond with 'yes'(y) or 'no'(n)!") # tempfile.TemporaryDirectory replacement to provide backwards compatibility class temporary_directory: def __enter__(self): self.name = mkdtemp() return self.name def __exit__(self, exc_type, exc_val, exc_tb): rmtree(self.name) if __name__ == '__main__': args = parse_cmd_arguments() VERBOSE = False if args.nonverbose else True # create dict that contains files used FILES = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video', File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction', File.OUTPUT: 'output'+args.extension} OUTPUT_KEYS = [File.OUTPUT] URL = args.url determine_output_filename(URL, args.output, args.extension, FILES) # ask what to do if output exists if exists(FILES[File.OUTPUT]): answer = yes_no_question('A file named "{}" already exists! Overwrite?'.format(FILES[File.OUTPUT]), default='no') if not answer: print_opt('Exiting!') exit() else: remove(FILES[File.OUTPUT]) # create temporary directory to work in with temporary_directory() as dir: # update temporary file locations in FILES dict for key in {key: FILES[key] for key in FILES if key not in OUTPUT_KEYS}: FILES[key] = join(dir, FILES[key]) run(URL, FILES, dir)