#!/usr/bin/env python3 # Copyright © 2017 Kristof Toth # This program is free software. It comes without any warranty, to the extent # permitted by applicable law. You can redistribute it and/or modify it under the # terms of the Do What The Fuck You Want To Public License, Version 2, # as published by Sam Hocevar. See http://www.wtfpl.net/ for more details. from subprocess import call, Popen, PIPE, DEVNULL from os import listdir, remove from os.path import splitext, exists, join from re import match from enum import Enum from datetime import timedelta from math import floor from argparse import ArgumentParser from signal import signal, SIGINT from sys import exit from copy import deepcopy import utility from utility import call_verbose, print_opt, get_output, temporary_directory, yes_no_question, check_dependencies class Stream(Enum): AUDIO = 1 VIDEO = 2 class File(Enum): LIST = 1 LOOP = 2 FRACTION = 3 OUTPUT = 4 class DownloadFailure(RuntimeError): pass class coub_dl: default_files = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video', File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction', File.OUTPUT: 'out'} output_files = {File.OUTPUT} def __init__(self, url, files_dict, directory): self._url = url self._files_dict = files_dict self._directory = directory def __call__(self): # download streams and update FILE dict with extensions self.download_audio_stream() self.download_video_stream() self.read_extensions() self.check_downloads() self.fix_video_stream() # get stream lengths via ffprobe audioLen = coub_dl.get_length(self._files_dict[Stream.AUDIO]) videoLen = coub_dl.get_length(self._files_dict[Stream.VIDEO]) # decide which stream needs some looping longer = audioLen if audioLen > videoLen else videoLen shorter = audioLen if audioLen < videoLen else videoLen shorterFile = self._files_dict[Stream.AUDIO] if audioLen < videoLen else self._files_dict[Stream.VIDEO] self._files_dict[File.LOOP] += splitext(shorterFile)[1] self._files_dict[File.FRACTION] += splitext(shorterFile)[1] # calculate how many times to loop times = longer.total_seconds() / shorter.total_seconds() timesLoop_base = int(floor(times)) timesLoop_fraction = times % 1 # write concat helper file for ffmpeg with open(self._files_dict[File.LIST], 'w') as f: for i in range(timesLoop_base): print("file '{}'".format(shorterFile), file=f) print("file '{}'".format(self._files_dict[File.FRACTION]), file=f) # loop & mux streams self.loop_shorter_stream(shorter, shorterFile, timesLoop_fraction) self.mux_streams() def download_audio_stream(self): call(('youtube-dl', '--ignore-config', '--extract-audio', '--output', '{}.%(ext)s'.format(self._files_dict[Stream.AUDIO]), self._url), stdout=DEVNULL, stderr=DEVNULL) def download_video_stream(self): call(('youtube-dl', '--ignore-config', '--output', '{}.%(ext)s'.format(self._files_dict[Stream.VIDEO]), self._url), stdout=DEVNULL, stderr=DEVNULL) def read_extensions(self): for file in listdir(self._directory): for filename in self._files_dict: fullname = join(self._directory, file) if match('^{}\..+$'.format(self._files_dict[filename]), fullname): self._files_dict[filename] = fullname def check_downloads(self): check = {Stream.VIDEO, Stream.AUDIO} if not all({exists(self._files_dict[item]) for item in check}): raise DownloadFailure() def fix_video_stream(self): """ magic fix for videos served by coub. see https://github.com/rg3/youtube-dl/issues/13754 """ with open(self._files_dict[Stream.VIDEO], 'r+b') as f: f.seek(0) f.write(bytes(2)) def loop_shorter_stream(self, shorter, shorter_file, loop_fraction): # prepare last fractional loop call(('ffmpeg', '-i', shorter_file, '-t', str(loop_fraction * shorter.total_seconds()), self._files_dict[File.FRACTION]), stdout=DEVNULL, stderr=DEVNULL) # concat them call(('ffmpeg', '-f', 'concat', '-safe', '0', '-i', self._files_dict[File.LIST], '-c', 'copy', self._files_dict[File.LOOP]), stdout=DEVNULL, stderr=DEVNULL) def mux_streams(self): call(('ffmpeg', '-i', self._files_dict[File.LOOP], '-i', self._files_dict[Stream.AUDIO], '-map', '0:v:0', '-map', '1:a:0', '-c', 'copy', self._files_dict[File.OUTPUT]), stdout=DEVNULL, stderr=DEVNULL) @staticmethod def get_length(file): data = coub_dl.get_duration(coub_dl.get_command_stderr(('ffprobe', file))).split(':') return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2])) @staticmethod def get_command_stderr(command): process = Popen(command, stderr=PIPE, stdout=PIPE) out, err = process.communicate() return err @staticmethod def get_duration(ffprobe_output): durationPattern = r'.*Duration:\s(.+),\sstart.*' regex = match(durationPattern, str(ffprobe_output)) duration = regex.groups()[0] if regex else None if not duration: raise ValueError('Cannot process ffprobe output!') return duration @staticmethod def get_title(url): return get_output(('youtube-dl', '--get-title', url)) def run(URL, output, extension): # create dict that contains files used FILES = deepcopy(coub_dl.default_files) determine_output_filename(URL, output, extension, FILES) # ask what to do if output exists if exists(FILES[File.OUTPUT]): answer = yes_no_question('A file named "{}" already exists! Overwrite?'.format(FILES[File.OUTPUT]), default='no') if not answer: print_opt('Exiting!') exit() else: remove(FILES[File.OUTPUT]) # create temporary directory to work in with temporary_directory() as dir: # update temporary file locations in FILES dict for key in {key: FILES[key] for key in FILES if key not in coub_dl.output_files}: FILES[key] = join(dir, FILES[key]) coub_dl(URL, FILES, dir)() def determine_output_filename(url, user_supplied, extension, files_dict): if user_supplied is None: files_dict[File.OUTPUT] = coub_dl.get_title(url) else: files_dict[File.OUTPUT] = user_supplied files_dict[File.OUTPUT] += extension def parse_cmd_arguments(): parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.') parser.add_argument('-nv', '--nonverbose', action='store_true', help='Turn off non-critical messages to user') parser.add_argument('-o', '--output', default=None, help='Specify name of the output file (use -e for extension)') parser.add_argument('-e', '--extension', default='mp4', help='Set the container to use for the output') parser.add_argument('URLs', type=str, nargs='+', help='The URLs of the sites containing the videos to download') args = parser.parse_args() args.extension = '.' + args.extension return args def decorate_coubdl_uimsgs(*args): for item in args: setattr(coub_dl, item[0], call_verbose(**item[1])(getattr(coub_dl, item[0]))) if __name__ == '__main__': signal(SIGINT, lambda a, b: exit('\nExiting!')) args = parse_cmd_arguments() utility.VERBOSE = False if args.nonverbose else True decorate_coubdl_uimsgs(('download_audio_stream', {'before_message': 'Downloading audio stream... '}), ('download_video_stream', {'before_message': 'Downloading video stream... '}), ('loop_shorter_stream', {'before_message': 'Looping shorter stream... '}), ('mux_streams', {'before_message': 'Muxing streams... '})) check_dependencies((('youtube-dl', '--version'), ('ffmpeg', '-version'))) for url in set(args.URLs): print_opt('\nCreating video from {}'.format(url)) try: run(url, args.output, args.extension) except DownloadFailure: exit('Failed to download streams! This usually happens when Coub changes something.')