# Copyright © 2017 Kristof Toth
# This program is free software. It comes without any warranty, to the extent
# permitted by applicable law. You can redistribute it and/or modify it under the
# terms of the Do What The Fuck You Want To Public License, Version 2,
# as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.

from subprocess import call, Popen, PIPE, check_output, DEVNULL, CalledProcessError
from os import listdir, remove, sep
from os.path import splitext, exists, join
from re import match, search
from enum import Enum
from datetime import timedelta
from math import floor
from argparse import ArgumentParser
from functools import wraps
from tempfile import mkdtemp
from shutil import rmtree
from signal import signal, SIGINT
from sys import exit

# Verbose by default so the helpers below are usable when this file is
# imported; main() overrides the value from the --nonverbose flag.
VERBOSE = True


class Stream(Enum):
    """Keys of the files dict that identify the two downloaded streams."""
    AUDIO = 1
    VIDEO = 2


class File(Enum):
    """Keys of the files dict that identify helper and output files."""
    LIST = 1      # concat list file handed to ffmpeg
    LOOP = 2      # the shorter stream after looping
    FRACTION = 3  # the trailing, partial repetition of the shorter stream
    OUTPUT = 4    # the final muxed file


def call_verbose(before_message='', after_message='Done!'):
    """Build a decorator that prints progress messages around a call.

    `before_message` is printed (without a newline) before the wrapped
    function runs and `after_message` after it returns; both go through
    print_opt, so they are suppressed when VERBOSE is false. The wrapped
    function's return value is passed through to the caller.
    """
    def tag(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            print_opt(before_message, end='', flush=True)
            result = f(*args, **kwargs)
            print_opt(after_message)
            return result
        return wrapper
    return tag


def print_opt(*args, **kwargs):
    """Forward to print() only when the module-level VERBOSE flag is set."""
    if VERBOSE:
        print(*args, **kwargs)


def get_output(*args, **kwargs):
    """Run check_output and return its decoded stdout without trailing newlines."""
    return check_output(*args, **kwargs).decode().rstrip('\n')


def run(url, files_dict, directory):
    """Download both streams, loop the shorter one and mux the result.

    `files_dict` maps Stream/File keys to paths and is updated in place:
    the stream entries receive their real file names and the LOOP/FRACTION
    entries receive the extension of the shorter stream.

    Raises ValueError if the shorter stream has zero length (it cannot be
    looped) or if ffprobe output cannot be parsed.
    """
    # download streams and update FILE dict with extensions
    download_audio_stream(url, files_dict)
    download_video_stream(url, files_dict)
    read_extensions(files_dict, directory)

    # get stream lengths via ffprobe
    audio_len = get_length(files_dict[Stream.AUDIO])
    video_len = get_length(files_dict[Stream.VIDEO])

    # decide which stream needs some looping (video on a tie)
    if audio_len < video_len:
        looped, shorter, longer = Stream.AUDIO, audio_len, video_len
    else:
        looped, shorter, longer = Stream.VIDEO, video_len, audio_len
    shorter_file = files_dict[looped]

    if shorter.total_seconds() <= 0:
        raise ValueError('Cannot loop a zero-length stream!')

    extension = splitext(shorter_file)[1]
    files_dict[File.LOOP] += extension
    files_dict[File.FRACTION] += extension

    # calculate how many times to loop
    times = longer.total_seconds() / shorter.total_seconds()
    loops_whole = floor(times)
    loops_fraction = times % 1

    # write concat helper file for ffmpeg
    with open(files_dict[File.LIST], 'w') as f:
        for _ in range(loops_whole):
            print("file '{}'".format(shorter_file), file=f)
        print("file '{}'".format(files_dict[File.FRACTION]), file=f)

    # loop & mux streams
    loop_shorter_stream(files_dict, shorter, shorter_file, loops_fraction)
    mux_streams(files_dict, looped)


@call_verbose(before_message='Downloading audio stream... ')
def download_audio_stream(url, file_dict):
    """Download the audio of `url` with youtube-dl to the Stream.AUDIO path.

    youtube-dl appends the extension itself; its output and exit status
    are discarded.
    """
    call(('youtube-dl', '--ignore-config', '--extract-audio', '--output',
          '{}.%(ext)s'.format(file_dict[Stream.AUDIO]), url),
         stdout=DEVNULL, stderr=DEVNULL)


@call_verbose(before_message='Downloading video stream... ')
def download_video_stream(url, file_dict):
    """Resolve the video URL and download it to the Stream.VIDEO path.

    The URL reported by youtube-dl is fetched with curl, the first quoted
    "http..." string is extracted with grep, every 'muted_' is removed
    with sed, and the result is downloaded with youtube-dl.
    """
    url = get_output(('youtube-dl', '--get-url', url))
    # NOTE(review): '--location' directly follows '--write-out', so curl
    # presumably consumes it as the write-out format and does not follow
    # redirects. The grep below seems to rely on the body returned that
    # way - confirm before "fixing" the argument order.
    curl = Popen(('curl', '--silent', '--write-out', '--location', url), stdout=PIPE)
    grep = Popen(('grep', '--only-matching', '"http.*"'), stdin=curl.stdout, stdout=PIPE)
    # drop our copy of the pipe so curl sees SIGPIPE if grep exits early
    curl.stdout.close()
    try:
        url = get_output(('sed', 's/muted_//g'), stdin=grep.stdout).strip('"')
    finally:
        # release the remaining pipe and reap both children
        grep.stdout.close()
        grep.wait()
        curl.wait()
    call(('youtube-dl', '--ignore-config', '--output',
          '{}.%(ext)s'.format(file_dict[Stream.VIDEO]), url),
         stdout=DEVNULL, stderr=DEVNULL)


def read_extensions(file_dict, directory):
    """Replace extension-less paths in `file_dict` with real file names.

    Every entry of `directory` whose full path starts with one of the
    values in `file_dict` replaces that value (in place). A plain prefix
    comparison is used so special characters in paths are taken literally.
    """
    for entry in listdir(directory):
        fullname = join(directory, entry)
        for key in file_dict:
            if fullname.startswith(file_dict[key]):
                file_dict[key] = fullname


def get_length(file):
    """Return the duration of `file`, as reported by ffprobe, as a timedelta.

    Raises ValueError when no duration can be found in the ffprobe output
    or when it is not of the form hours:minutes:seconds.
    """
    data = get_duration(get_command_stderr(('ffprobe', file))).split(':')
    return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2]))


def get_command_stderr(command):
    """Run `command` and return everything it wrote to stderr (as bytes)."""
    process = Popen(command, stderr=PIPE, stdout=PIPE)
    _, err = process.communicate()
    return err


def get_duration(ffprobe_output):
    """Extract the text between 'Duration: ' and ', start' from ffprobe output.

    Accepts bytes (decoded leniently) or str. Raises ValueError when the
    pattern is not present.
    """
    if isinstance(ffprobe_output, bytes):
        text = ffprobe_output.decode(errors='replace')
    else:
        text = str(ffprobe_output)
    found = search(r'Duration:\s(.+?),\sstart', text)
    if not found:
        raise ValueError('Cannot process ffprobe output!')
    return found.group(1)


@call_verbose(before_message='Looping shorter stream... ')
def loop_shorter_stream(file_dict, shorter, shorter_file, loop_fraction):
    """Create the partial repetition, then concatenate everything in File.LIST.

    `shorter` is the timedelta length of `shorter_file`; `loop_fraction`
    is the fraction of it used for the last repetition. ffmpeg's output
    and exit status are discarded.
    """
    # prepare last fractional loop
    call(('ffmpeg', '-i', shorter_file, '-t', str(loop_fraction * shorter.total_seconds()),
          file_dict[File.FRACTION]),
         stdout=DEVNULL, stderr=DEVNULL)
    # concat them
    call(('ffmpeg', '-f', 'concat', '-safe', '0', '-i', file_dict[File.LIST],
          '-c', 'copy', file_dict[File.LOOP]),
         stdout=DEVNULL, stderr=DEVNULL)


@call_verbose(before_message='Muxing streams... ')
def mux_streams(file_dict, looped=Stream.VIDEO):
    """Mux one video and one audio input into File.OUTPUT with ffmpeg.

    `looped` names the stream that was looped into File.LOOP; the other
    stream is taken from its original download. The default (video) keeps
    the behaviour of calling mux_streams(file_dict) unchanged.
    """
    if looped is Stream.AUDIO:
        video_input, audio_input = file_dict[Stream.VIDEO], file_dict[File.LOOP]
    else:
        video_input, audio_input = file_dict[File.LOOP], file_dict[Stream.AUDIO]
    call(('ffmpeg', '-i', video_input, '-i', audio_input,
          '-map', '0:v:0', '-map', '1:a:0', '-c', 'copy', file_dict[File.OUTPUT]),
         stdout=DEVNULL, stderr=DEVNULL)


@call_verbose(before_message='Checking your system for dependencies... ',
              after_message='Found all!')
def check_for_dependencies():
    """Exit with a message listing every required tool that cannot be run.

    A tool counts as missing when invoking its version command raises
    CalledProcessError or FileNotFoundError.
    """
    check_for = (('youtube-dl', '--version'), ('ffmpeg', '-version'), ('curl', '--version'))
    error_str = '\nMissing dependencies: {}'
    missing = []
    for command in check_for:
        try:
            check_output(command)
        except (CalledProcessError, FileNotFoundError):
            missing.append(command[0])
    if missing:
        exit(error_str.format(', '.join(missing)))


def determine_output_filename(url, user_supplied, extension, files_dict):
    """Store the output file name (with `extension`) under File.OUTPUT.

    Uses `user_supplied` when given; otherwise the title reported by
    youtube-dl, with path separators replaced by '_' so the title cannot
    point into another directory.
    """
    if user_supplied is None:
        title = get_output(('youtube-dl', '--get-title', url))
        for separator in {'/', sep}:
            title = title.replace(separator, '_')
        files_dict[File.OUTPUT] = title
    else:
        files_dict[File.OUTPUT] = user_supplied
    files_dict[File.OUTPUT] += extension


def build_default_files_dict():
    """Return the default files dict and the list of keys that are outputs."""
    return {Stream.AUDIO: 'audio',
            Stream.VIDEO: 'video',
            File.LIST: 'list.txt',
            File.LOOP: 'loop',
            File.FRACTION: 'fraction',
            File.OUTPUT: ''}, [File.OUTPUT]


def parse_cmd_arguments(argv=None):
    """Parse the command line and return the argparse namespace.

    `argv` is the argument list to parse; None (the default) makes
    argparse read sys.argv. The returned `extension` always starts with
    exactly one dot, whether or not the user typed one.
    """
    parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
    parser.add_argument('-nv', '--nonverbose', action='store_true',
                        help='Turn off non-critical messages to user')
    parser.add_argument('-o', '--output', default=None,
                        help='Specify name of the output file (use -e for extension)')
    parser.add_argument('-e', '--extension', default='mp4',
                        help='Set the container to use for the output')
    parser.add_argument('url', type=str,
                        help='The URL of the site containing the video to download')
    args = parser.parse_args(argv)
    # tolerate "-e .mp4" as well as "-e mp4"
    args.extension = '.' + args.extension.lstrip('.')
    return args


def yes_no_question(question, default):
    """Ask `question` on stdin/stdout until a yes/no answer is given.

    `default` is 'yes', 'no' or None and is returned for an empty answer
    (None means an answer is required). Returns True for yes, False for
    no. Raises ValueError for any other `default`.
    """
    valid = {"yes": True, "y": True, "ye": True,
             "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("Invalid default answer: {}!".format(default))

    while True:
        print(question + prompt)
        choice = input().lower()
        if default is not None and choice == '':
            return valid[default]
        if choice in valid:
            return valid[choice]
        print("Please respond with 'yes'(y) or 'no'(n)!")


# tempfile.TemporaryDirectory replacement to provide backwards compatibility
class temporary_directory:
    """Context manager yielding the path of a fresh directory, removed on exit."""

    def __enter__(self):
        self.name = mkdtemp()
        return self.name

    def __exit__(self, exc_type, exc_val, exc_tb):
        rmtree(self.name)


@call_verbose(before_message='\nExiting!\n', after_message='')
def sigint_handler(signal, frame):
    """SIGINT handler: leave through sys.exit so context managers clean up."""
    exit()


def main():
    """Command-line entry point: parse arguments, then download, loop and mux."""
    global VERBOSE
    signal(SIGINT, sigint_handler)
    args = parse_cmd_arguments()
    VERBOSE = not args.nonverbose
    check_for_dependencies()

    # create dict that contains files used
    files, output_keys = build_default_files_dict()
    determine_output_filename(args.url, args.output, args.extension, files)

    # ask what to do if output exists
    if exists(files[File.OUTPUT]):
        overwrite = yes_no_question(
            'A file named "{}" already exists! Overwrite?'.format(files[File.OUTPUT]),
            default='no')
        if not overwrite:
            print_opt('Exiting!')
            exit()
        remove(files[File.OUTPUT])

    # create temporary directory to work in
    with temporary_directory() as workdir:
        # update temporary file locations in files dict
        for key in files:
            if key not in output_keys:
                files[key] = join(workdir, files[key])
        run(args.url, files, workdir)


if __name__ == '__main__':
    main()