coub-dl/coub-dl.py

154 lines
5.0 KiB
Python

from subprocess import call, Popen, PIPE, check_output, DEVNULL
from os import listdir, remove
from os.path import splitext, exists
from re import match
from enum import Enum
from datetime import timedelta
from math import ceil
from argparse import ArgumentParser
# parse arguments
parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
parser.add_argument('-nv', '--nonverbose', help='Turn off non-critical messages to user.')
parser.add_argument('-e', '--extension', default='mp4', help='Set the container to use for the output.')
parser.add_argument('url', type=str, help='The URL of the site containing the video to download.')
args = parser.parse_args()
args.extension = '.' + args.extension
VERBOSE = False if args.nonverbose else True
class Stream(Enum):
AUDIO = 1
VIDEO = 2
class File(Enum):
LIST = 1
LOOP = 2
OUTPUT = 3
def print_opt(*args, **kwargs):
if VERBOSE:
print(*args, **kwargs)
def getCmdStdErr(command):
process = Popen(command, stderr=PIPE, stdout=PIPE)
out, err = process.communicate()
return err
def getDuration(ffprobe_output):
durationPattern = r'.*Duration:\s(.+),\sstart.*'
regex = match(durationPattern, str(ffprobe_output))
duration = regex.groups()[0] if regex else None
if not duration:
raise ValueError('Cannot process ffprobe output!')
return duration
def downloadStreams():
print_opt('Downloading audio stream... ', end='', flush=True)
call(('youtube-dl', '--ignore-config',
'--extract-audio',
'--output', '{}.%(ext)s'.format(FILES[Stream.AUDIO]),
URL),
stdout=DEVNULL, stderr=DEVNULL)
print_opt('Done!')
print_opt('Downloading video stream... ', end='', flush=True)
call(('youtube-dl', '--ignore-config',
'--output', '{}.%(ext)s'.format(FILES[Stream.VIDEO]),
URL),
stdout=DEVNULL, stderr=DEVNULL)
print_opt('Done!')
def readExtensions():
for file in listdir():
for filename in FILES:
if match('^{}.*'.format(FILES[filename]), file):
FILES[filename] = file
def yes_no_question(question, default):
valid = {"yes": True, "y": True, "ye": True,
"no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("Invalid default answer: {}!".format(default))
while True:
print(question + prompt)
choice = input().lower()
if default is not None and choice == '':
return valid[default]
elif choice in valid:
return valid[choice]
else:
print("Please respond with 'yes'(y) or 'no'(n)!")
FILES = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
File.LIST: 'list.txt', File.LOOP: 'loop', File.OUTPUT: 'output'+args.extension}
OUTPUT_KEYS = [File.OUTPUT]
URL = args.url
# fetch video title
FILES[File.OUTPUT] = check_output(('youtube-dl', '--get-title', args.url)).decode('utf-8').strip() + args.extension
# ask what to do if output exists
if exists(FILES[File.OUTPUT]):
answer = yes_no_question('A file named "{}" already exists! Overwrite?'.format(FILES[File.OUTPUT]), default='no')
if not answer:
print_opt('Exiting!')
exit()
else:
remove(FILES[File.OUTPUT])
# download streams and update FILE dict with extensions
downloadStreams()
readExtensions()
# get stream lengths via ffprobe
audioData= getDuration(getCmdStdErr(('ffprobe', FILES[Stream.AUDIO]))).split(':')
videoData = getDuration(getCmdStdErr(('ffprobe', FILES[Stream.VIDEO]))).split(':')
audioLen = timedelta(hours=float(audioData[0]), minutes=float(audioData[1]), seconds=float(audioData[2]))
videoLen = timedelta(hours=float(videoData[0]), minutes=float(videoData[1]), seconds=float(videoData[2]))
# decide which stream needs some looping
longer = audioLen if audioLen > videoLen else videoLen
shorter = audioLen if audioLen < videoLen else videoLen
shorterFile = FILES[Stream.AUDIO] if audioLen < videoLen else FILES[Stream.VIDEO]
FILES[File.LOOP] += splitext(shorterFile)[1]
timesLoop = ceil(longer.seconds / shorter.seconds)
# write concat helper file for ffmpeg
with open(FILES[File.LIST], 'w') as f:
for i in range(timesLoop):
print("file '{}'".format(shorterFile), file=f)
# loop shorter stream
print_opt('Looping shorter stream... ', end='', flush=True)
call(('ffmpeg', '-f', 'concat', '-i', FILES[File.LIST], '-c', 'copy', FILES[File.LOOP]),
stdout=DEVNULL, stderr=DEVNULL)
print_opt('Done!')
# mux with audio
print_opt('Muxing streams... ', end='', flush=True)
call(('ffmpeg', '-i', FILES[File.LOOP],
'-i', FILES[Stream.AUDIO],
'-map', '0:v:0', '-map', '1:a:0',
'-c', 'copy', FILES[File.OUTPUT]),
stdout=DEVNULL, stderr=DEVNULL)
print_opt('Done!')
# cleanup
for key in FILES:
if key not in OUTPUT_KEYS:
remove(FILES[key])