"""Download a URL's audio and video streams and mux them into one file.

The streams are fetched with youtube-dl, their lengths are read with
ffprobe, the shorter stream is repeated with ffmpeg's concat demuxer until
it covers the longer one, and both are muxed into '<video title>.mp4'.

Usage: pass the URL as the first command line argument.
"""
from subprocess import (call, Popen, PIPE, check_output, DEVNULL,
                        CalledProcessError)
from os import listdir, remove
from os.path import splitext, exists
from re import match, search
from sys import argv, exit
from enum import Enum
from datetime import timedelta
from math import ceil


class Stream(Enum):
    """Keys of the downloaded stream files in FILES."""
    AUDIO = 1
    VIDEO = 2


class File(Enum):
    """Keys of the helper and result files in FILES."""
    LIST = 1
    LOOP = 2
    OUTPUT = 3


VERBOSE = True


def print_opt(*args, **kwargs):
    """Forward to print() only when VERBOSE is set."""
    if VERBOSE:
        print(*args, **kwargs)


def getCmdStdErr(command):
    """Run `command` and return what it wrote to stderr (as bytes)."""
    process = Popen(command, stderr=PIPE, stdout=PIPE)
    _, err = process.communicate()
    return err


def getDuration(ffprobe_output):
    """Extract the 'Duration:' value from ffprobe's output.

    Accepts bytes (as returned by getCmdStdErr) or str and returns the text
    between 'Duration: ' and the following ', start'.

    Raises:
        ValueError: if no duration can be found in the output.
    """
    if isinstance(ffprobe_output, bytes):
        # Decode instead of matching against the bytes repr.
        text = ffprobe_output.decode('utf-8', errors='replace')
    else:
        text = str(ffprobe_output)
    # Non-greedy: stop at the first ', start' after 'Duration:'.
    found = search(r'Duration:\s(.+?),\sstart', text)
    duration = found.group(1) if found else None
    if not duration:
        raise ValueError('Cannot process ffprobe output!')
    return duration


def parseDuration(duration):
    """Convert an 'HH:MM:SS[.fraction]' string into a timedelta.

    Raises:
        ValueError: if `duration` does not have that shape (e.g. 'N/A').
    """
    fields = match(r'(\d+):(\d+):(\d+(?:\.\d+)?)$', duration.strip())
    if not fields:
        raise ValueError('Unsupported duration value: {}!'.format(duration))
    hours, minutes, seconds = (float(part) for part in fields.groups())
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


def loopCount(longer, shorter):
    """Return how often `shorter` must be repeated to cover `longer`.

    Both arguments are timedeltas.

    Raises:
        ValueError: if `shorter` has no positive length.
    """
    # total_seconds() keeps days and fractions; .seconds would drop both.
    shorter_seconds = shorter.total_seconds()
    if shorter_seconds <= 0:
        raise ValueError('Cannot loop a stream of zero length!')
    return ceil(longer.total_seconds() / shorter_seconds)


def downloadStreams():
    """Download the audio and the video stream of URL via youtube-dl.

    The files are named after FILES[Stream.AUDIO] / FILES[Stream.VIDEO];
    youtube-dl appends the extension.
    """
    print_opt('Downloading audio stream... ', end='', flush=True)
    call(('/usr/bin/env', 'youtube-dl', '--extract-audio', '--output',
          '{}.%(ext)s'.format(FILES[Stream.AUDIO]), URL),
         stdout=DEVNULL, stderr=DEVNULL)
    print_opt('Done!')

    print_opt('Downloading video stream... ', end='', flush=True)
    call(('/usr/bin/env', 'youtube-dl', '--output',
          '{}.%(ext)s'.format(FILES[Stream.VIDEO]), URL),
         stdout=DEVNULL, stderr=DEVNULL)
    print_opt('Done!')


def readExtensions():
    """Replace the stream entries in FILES with the real file names.

    Only the Stream entries are resolved, using a plain prefix test, so
    that unrelated FILES entries (the title-based output name in
    particular) are never interpreted as a regular expression.
    """
    for stream in Stream:
        prefix = FILES[stream] + '.'
        for entry in listdir():
            if entry.startswith(prefix):
                FILES[stream] = entry


def yes_no_question(question, default):
    """Ask a yes/no question on stdin until a valid answer is given.

    Args:
        question: text printed in front of the prompt.
        default: 'yes', 'no' or None; used when the user just hits enter.

    Returns:
        True for yes, False for no.

    Raises:
        ValueError: if `default` is not one of the accepted values.
    """
    valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
    if default is None:
        prompt = " [y/n] "
    elif default == "yes":
        prompt = " [Y/n] "
    elif default == "no":
        prompt = " [y/N] "
    else:
        raise ValueError("Invalid default answer: {}!".format(default))

    while True:
        print(question + prompt)
        choice = input().lower()
        if default is not None and choice == '':
            return valid[default]
        if choice in valid:
            return valid[choice]
        print("Please respond with 'yes'(y) or 'no'(n)!")


FILES = {Stream.AUDIO: 'audio',
         Stream.VIDEO: 'video',
         File.LIST: 'list.txt',
         File.LOOP: 'loop',
         File.OUTPUT: 'output.mp4'}
OUTPUT_KEYS = [File.OUTPUT]
# argv[0] is the script itself, so a URL is only present with > 1 entries.
URL = argv[1] if len(argv) > 1 else ''  # youtube-dl error message will be shown if ''


def main():
    """Run the download / loop / mux pipeline for URL."""
    # fetch video title
    try:
        title = check_output(
            ('/usr/bin/env', 'youtube-dl', '--get-title', URL)
        ).decode('utf-8').strip()
    except CalledProcessError as err:
        # youtube-dl already printed its own message to stderr.
        exit(err.returncode)
    # A '/' in the title would otherwise be taken as a directory separator.
    FILES[File.OUTPUT] = title.replace('/', '_') + '.mp4'

    # ask what to do if output exists
    if exists(FILES[File.OUTPUT]):
        answer = yes_no_question(
            'A file named "{}" already exists! Overwrite?'.format(
                FILES[File.OUTPUT]),
            default='no')
        if not answer:
            print_opt('Exiting!')
            exit()
        else:
            remove(FILES[File.OUTPUT])

    # download streams and update FILE dict with extensions
    downloadStreams()
    readExtensions()

    # get stream lengths via ffprobe
    audioLen = parseDuration(getDuration(
        getCmdStdErr(('/usr/bin/env', 'ffprobe', FILES[Stream.AUDIO]))))
    videoLen = parseDuration(getDuration(
        getCmdStdErr(('/usr/bin/env', 'ffprobe', FILES[Stream.VIDEO]))))

    # decide which stream needs some looping
    if audioLen < videoLen:
        shorter, longer = audioLen, videoLen
        shorterFile, longerFile = FILES[Stream.AUDIO], FILES[Stream.VIDEO]
    else:
        shorter, longer = videoLen, audioLen
        shorterFile, longerFile = FILES[Stream.VIDEO], FILES[Stream.AUDIO]
    FILES[File.LOOP] += splitext(shorterFile)[1]
    timesLoop = loopCount(longer, shorter)

    # write concat helper file for ffmpeg
    with open(FILES[File.LIST], 'w') as f:
        for _ in range(timesLoop):
            print("file '{}'".format(shorterFile), file=f)

    # loop & mux
    print_opt('Looping shorter stream... ', end='', flush=True)
    # -y: a stale loop file must not trigger ffmpeg's (hidden) overwrite
    # prompt, which would block forever with stderr discarded.
    call(('/usr/bin/env', 'ffmpeg', '-y', '-f', 'concat',
          '-i', FILES[File.LIST], '-c', 'copy', FILES[File.LOOP]),
         stdout=DEVNULL, stderr=DEVNULL)
    print_opt('Done!')

    print_opt('Muxing streams... ', end='', flush=True)
    # Combine the looped (shorter) stream with the longer one, whichever
    # of audio/video that turned out to be.
    call(('/usr/bin/env', 'ffmpeg', '-i', FILES[File.LOOP],
          '-i', longerFile, '-map', '0', '-map', '1',
          '-c', 'copy', FILES[File.OUTPUT]),
         stdout=DEVNULL, stderr=DEVNULL)
    print_opt('Done!')

    # cleanup
    for key, path in FILES.items():
        # A temp file may be missing if an earlier step failed.
        if key not in OUTPUT_KEYS and exists(path):
            remove(path)


if __name__ == '__main__':
    main()