"""Download player-looped videos with youtube-dl and ffmpeg.

The script downloads an audio stream and a video stream from the same URL,
repeats the shorter of the two until it is as long as the longer one, and
muxes the result into a single output file.

The external tools ``youtube-dl``, ``ffmpeg`` and ``ffprobe`` are invoked by
name, so they have to be reachable through ``PATH``.
"""
from subprocess import call, Popen, PIPE, check_output, DEVNULL
from os import altsep, listdir, remove, sep
from os.path import splitext, exists
from re import match, search
from enum import Enum
from datetime import timedelta
from math import floor
from argparse import ArgumentParser
from functools import wraps


class Stream(Enum):
    """The two streams that are downloaded from the site."""
    AUDIO = 1
    VIDEO = 2


class File(Enum):
    """Helper and result files that the script creates itself."""
    LIST = 1
    LOOP = 2
    FRACTION = 3
    OUTPUT = 4


# Non-critical messages are printed unless main() switches this off (-nv).
VERBOSE = True

# URL of the page to download from; set by main() from the command line.
URL = None

# Maps every stream / helper file to its name on disk.  The stream entries
# start out without an extension (readExtensions() completes them after the
# download); LOOP and FRACTION receive the extension of the looped stream.
FILES = {Stream.AUDIO: 'audio',
         Stream.VIDEO: 'video',
         File.LIST: 'list.txt',
         File.LOOP: 'loop',
         File.FRACTION: 'fraction',
         File.OUTPUT: 'output.mp4'}

# Entries of FILES that must survive the final cleanup.
OUTPUT_KEYS = [File.OUTPUT]


def parse_arguments(argv=None):
    """Parse the command line.

    :param argv: list of argument strings; ``None`` makes argparse read
                 ``sys.argv[1:]``.
    :return: the populated argparse namespace (``nonverbose``, ``output``,
             ``extension``, ``url``).
    """
    parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
    parser.add_argument('-nv', '--nonverbose', action='store_true',
                        help='Turn off non-critical messages to user.')
    parser.add_argument('-o', '--output', default=None,
                        help='Specify name of the output file (use -e for extension).')
    parser.add_argument('-e', '--extension', default='mp4',
                        help='Set the container to use for the output.')
    parser.add_argument('url', type=str,
                        help='The URL of the site containing the video to download.')
    return parser.parse_args(argv)


def call_verbose(before_message='', after_message='Done!'):
    """Build a decorator that frames a call with two optional messages.

    ``before_message`` is printed (without a newline) before the wrapped
    function runs and ``after_message`` once it has returned.  Both go
    through print_opt(), so they are suppressed in non-verbose mode.  The
    wrapped function's return value is passed through to the caller.
    """
    def tag(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            print_opt(before_message, end='', flush=True)
            result = f(*args, **kwargs)
            print_opt(after_message)
            return result
        return wrapper
    return tag


def print_opt(*args, **kwargs):
    """Forward to print(), but only while VERBOSE is set."""
    if VERBOSE:
        print(*args, **kwargs)


def run_quiet(command):
    """Run an external command with its output discarded.

    :param command: sequence of program name and arguments.
    :raises RuntimeError: if the command exits with a non-zero status.  The
        output is hidden, so without this check a failed download or
        conversion would go unnoticed until a later step breaks.
    """
    status = call(command, stdout=DEVNULL, stderr=DEVNULL)
    if status != 0:
        raise RuntimeError('Command failed with exit status {}: {}'.format(
            status, ' '.join(command)))


def getCmdStdErr(command):
    """Run ``command`` and return everything it wrote to stderr (bytes)."""
    process = Popen(command, stderr=PIPE, stdout=PIPE)
    out, err = process.communicate()
    return err


def getDuration(ffprobe_output):
    """Extract the ``Duration:`` value from ffprobe's report.

    :param ffprobe_output: the report as bytes (as captured from the process)
                           or as str.
    :return: the text between ``Duration:`` and the following ``, start``,
             e.g. ``'00:03:32.45'``.
    :raises ValueError: if no such field is present.
    """
    if isinstance(ffprobe_output, bytes):
        # Decode instead of str(): str() of bytes yields the b'...' repr.
        text = ffprobe_output.decode('utf-8', errors='replace')
    else:
        text = str(ffprobe_output)
    found = search(r'Duration:\s*([^,\s]+),\s*start', text)
    if not found:
        raise ValueError('Cannot process ffprobe output!')
    return found.group(1)


def parse_duration(duration):
    """Convert an ``HH:MM:SS[.fraction]`` string into a timedelta.

    :raises ValueError: if the text does not consist of three numeric,
        colon-separated fields.
    """
    parts = duration.strip().split(':')
    if len(parts) != 3:
        raise ValueError('Unexpected duration format: {!r}'.format(duration))
    try:
        hours, minutes, seconds = (float(part) for part in parts)
    except ValueError as err:
        raise ValueError('Unexpected duration format: {!r}'.format(duration)) from err
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


def plan_loop(longer, shorter):
    """Work out how often the shorter stream has to be repeated.

    :param longer: length of the longer stream (timedelta).
    :param shorter: length of the shorter stream (timedelta).
    :return: ``(full_repeats, remainder)`` - the number of complete
             repetitions and the length of the final partial one.  The
             remainder is ``timedelta(0)`` when the lengths divide evenly.
    :raises ValueError: if ``shorter`` is not a positive length.
    """
    if shorter <= timedelta(0):
        raise ValueError('Cannot loop a stream of zero length!')
    # timedelta division works on whole microseconds, so it is exact.
    return divmod(longer, shorter)


def build_mux_command(looped_stream, loop_file, other_file, output_file):
    """Assemble the ffmpeg call that joins video and audio.

    :param looped_stream: the Stream member that was looped.
    :param loop_file: file holding the looped stream.
    :param other_file: downloaded file of the stream that was not looped.
    :param output_file: name of the file to write.
    :return: the command as a tuple, video source as input 0 and audio
             source as input 1.
    """
    if looped_stream is Stream.VIDEO:
        video_input, audio_input = loop_file, other_file
    else:
        video_input, audio_input = other_file, loop_file
    return ('ffmpeg', '-y',
            '-i', video_input,
            '-i', audio_input,
            '-map', '0:v:0',
            '-map', '1:a:0',
            '-c', 'copy',
            output_file)


def safe_filename(title):
    """Replace path separators in ``title`` with underscores.

    A separator inside the name would make the output path point into a
    directory that does not exist.
    """
    for separator in (sep, altsep):
        if separator:
            title = title.replace(separator, '_')
    return title


@call_verbose(before_message='Downloading audio stream... ')
def download_audio_stream():
    """Download the audio of URL into ``<FILES[Stream.AUDIO]>.<ext>``."""
    run_quiet(('youtube-dl', '--ignore-config',
               '--extract-audio',
               '--output', '{}.%(ext)s'.format(FILES[Stream.AUDIO]),
               URL))


@call_verbose(before_message='Downloading video stream... ')
def download_video_stream():
    """Download the video of URL into ``<FILES[Stream.VIDEO]>.<ext>``."""
    run_quiet(('youtube-dl', '--ignore-config',
               '--output', '{}.%(ext)s'.format(FILES[Stream.VIDEO]),
               URL))


def readExtensions():
    """Complete the stream entries of FILES with their real file names.

    youtube-dl picks the extension, so the working directory is searched for
    files whose name without extension equals the stream's base name.  Only
    the Stream entries are touched, and names are compared literally rather
    than being used as regular expressions.
    """
    stems = {key: FILES[key] for key in Stream}
    for name in sorted(listdir()):
        stem = splitext(name)[0]
        for key, wanted in stems.items():
            if stem == wanted:
                FILES[key] = name


def yes_no_question(question, default):
    """Ask a yes/no question on the terminal until a valid answer arrives.

    :param question: text shown to the user.
    :param default: ``"yes"``, ``"no"`` or ``None``; the answer assumed when
                    the user just presses enter (``None`` forces an explicit
                    answer).
    :return: ``True`` for yes, ``False`` for no.
    :raises ValueError: if ``default`` is none of the accepted values.
    """
    valid = {"yes": True, "y": True, "ye": True,
             "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("Invalid default answer: {}!".format(default))

    while True:
        print(question + prompt)
        choice = input().strip().lower()
        if default is not None and choice == '':
            return valid[default]
        elif choice in valid:
            return valid[choice]
        else:
            print("Please respond with 'yes'(y) or 'no'(n)!")


def main(argv=None):
    """Run the complete download / loop / mux pipeline.

    :param argv: command line arguments; ``None`` uses ``sys.argv[1:]``.
    """
    global VERBOSE, URL

    args = parse_arguments(argv)
    VERBOSE = not args.nonverbose
    URL = args.url
    # Accept both "mkv" and ".mkv" without producing a double dot.
    extension = '.' + args.extension.lstrip('.')

    # fetch video title if no filename was specified
    if args.output is None:
        title = check_output(('youtube-dl', '--get-title', URL)).decode('utf-8').strip()
        FILES[File.OUTPUT] = safe_filename(title)
    else:
        FILES[File.OUTPUT] = args.output
    FILES[File.OUTPUT] += extension

    # ask what to do if output exists
    if exists(FILES[File.OUTPUT]):
        answer = yes_no_question(
            'A file named "{}" already exists! Overwrite?'.format(FILES[File.OUTPUT]),
            default='no')
        if not answer:
            print_opt('Exiting!')
            return
        remove(FILES[File.OUTPUT])

    # download streams and update FILES dict with extensions
    download_audio_stream()
    download_video_stream()
    readExtensions()

    # get stream lengths via ffprobe
    audioLen = parse_duration(getDuration(getCmdStdErr(('ffprobe', FILES[Stream.AUDIO]))))
    videoLen = parse_duration(getDuration(getCmdStdErr(('ffprobe', FILES[Stream.VIDEO]))))

    # decide which stream needs some looping (the video one on a tie)
    if audioLen < videoLen:
        looped, otherStream = Stream.AUDIO, Stream.VIDEO
        shorter, longer = audioLen, videoLen
    else:
        looped, otherStream = Stream.VIDEO, Stream.AUDIO
        shorter, longer = videoLen, audioLen
    shorterFile = FILES[looped]
    FILES[File.LOOP] += splitext(shorterFile)[1]
    FILES[File.FRACTION] += splitext(shorterFile)[1]

    repeats, remainder = plan_loop(longer, shorter)

    # write concat helper file for ffmpeg
    with open(FILES[File.LIST], 'w') as f:
        for _ in range(repeats):
            print("file '{}'".format(shorterFile), file=f)
        if remainder:
            print("file '{}'".format(FILES[File.FRACTION]), file=f)

    # Cut the last, partial loop accurately.  Skipped when the lengths divide
    # evenly, because there is nothing to cut then.
    if remainder:
        run_quiet(('ffmpeg', '-y',
                   '-i', shorterFile,
                   # fixed-point notation: str() of a tiny float would give
                   # scientific notation such as 1e-06
                   '-t', '{:.6f}'.format(remainder.total_seconds()),
                   FILES[File.FRACTION]))

    # loop shorter stream
    print_opt('Looping shorter stream... ', end='', flush=True)
    run_quiet(('ffmpeg', '-y',
               '-f', 'concat',
               '-i', FILES[File.LIST],
               '-c', 'copy',
               FILES[File.LOOP]))
    print_opt('Done!')

    # mux video and audio, whichever of them was the looped one
    print_opt('Muxing streams... ', end='', flush=True)
    run_quiet(build_mux_command(looped, FILES[File.LOOP], FILES[otherStream],
                                FILES[File.OUTPUT]))
    print_opt('Done!')

    # cleanup
    for key in FILES:
        if key not in OUTPUT_KEYS:
            try:
                remove(FILES[key])
            except FileNotFoundError:
                # e.g. the fraction file when no partial loop was needed
                pass


if __name__ == '__main__':
    main()