coub-dl/coub-dl.py

232 lines
8.3 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright © 2017 Kristof Toth <mrtoth@strongds.hu>
# This program is free software. It comes without any warranty, to the extent
# permitted by applicable law. You can redistribute it and/or modify it under the
# terms of the Do What The Fuck You Want To Public License, Version 2,
# as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.
from subprocess import call, Popen, PIPE, check_output, DEVNULL, CalledProcessError
from os import listdir, remove
from os.path import splitext, exists, join
from re import match
from enum import Enum
from datetime import timedelta
from math import floor
from argparse import ArgumentParser
from signal import signal, SIGINT
from sys import exit
from copy import deepcopy
import utility
from utility import call_verbose, print_opt, get_output, temporary_directory, yes_no_question
class Stream(Enum):
AUDIO = 1
VIDEO = 2
class File(Enum):
LIST = 1
LOOP = 2
FRACTION = 3
OUTPUT = 4
class coub_dl:
default_files = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction',
File.OUTPUT: 'out'}
output_files = {File.OUTPUT}
def __init__(self, url, files_dict, directory):
self._url = url
self._files_dict = files_dict
self._directory = directory
def __call__(self):
# download streams and update FILE dict with extensions
self.download_audio_stream()
self.download_video_stream()
self.read_extensions()
self.fix_video_stream()
# get stream lengths via ffprobe
audioLen = coub_dl.get_length(self._files_dict[Stream.AUDIO])
videoLen = coub_dl.get_length(self._files_dict[Stream.VIDEO])
# decide which stream needs some looping
longer = audioLen if audioLen > videoLen else videoLen
shorter = audioLen if audioLen < videoLen else videoLen
shorterFile = self._files_dict[Stream.AUDIO] if audioLen < videoLen else self._files_dict[Stream.VIDEO]
self._files_dict[File.LOOP] += splitext(shorterFile)[1]
self._files_dict[File.FRACTION] += splitext(shorterFile)[1]
# calculate how many times to loop
times = longer.total_seconds() / shorter.total_seconds()
timesLoop_base = int(floor(times))
timesLoop_fraction = times % 1
# write concat helper file for ffmpeg
with open(self._files_dict[File.LIST], 'w') as f:
for i in range(timesLoop_base):
print("file '{}'".format(shorterFile), file=f)
print("file '{}'".format(self._files_dict[File.FRACTION]), file=f)
# loop & mux streams
self.loop_shorter_stream(shorter, shorterFile, timesLoop_fraction)
self.mux_streams()
@call_verbose(before_message='Downloading audio stream... ')
def download_audio_stream(self):
call(('youtube-dl', '--ignore-config',
'--extract-audio',
'--output', '{}.%(ext)s'.format(self._files_dict[Stream.AUDIO]),
self._url),
stdout=DEVNULL, stderr=DEVNULL)
@call_verbose(before_message='Downloading video stream... ')
def download_video_stream(self):
call(('youtube-dl', '--ignore-config',
'--output', '{}.%(ext)s'.format(self._files_dict[Stream.VIDEO]),
self._url),
stdout=DEVNULL, stderr=DEVNULL)
def read_extensions(self):
for file in listdir(self._directory):
for filename in self._files_dict:
fullname = join(self._directory, file)
if match('^{}\..+$'.format(self._files_dict[filename]), fullname):
self._files_dict[filename] = fullname
def fix_video_stream(self):
""" magic fix for videos served by coub. see https://github.com/rg3/youtube-dl/issues/13754 """
with open(self._files_dict[Stream.VIDEO], 'r+b') as f:
f.seek(0)
f.write(bytes(2))
@call_verbose(before_message='Looping shorter stream... ')
def loop_shorter_stream(self, shorter, shorter_file, loop_fraction):
# prepare last fractional loop
call(('ffmpeg', '-i', shorter_file, '-t', str(loop_fraction * shorter.total_seconds()),
self._files_dict[File.FRACTION]),
stdout=DEVNULL, stderr=DEVNULL)
# concat them
call(('ffmpeg', '-f', 'concat', '-safe', '0', '-i', self._files_dict[File.LIST],
'-c', 'copy', self._files_dict[File.LOOP]),
stdout=DEVNULL, stderr=DEVNULL)
@call_verbose(before_message='Muxing streams... ')
def mux_streams(self):
call(('ffmpeg', '-i', self._files_dict[File.LOOP],
'-i', self._files_dict[Stream.AUDIO],
'-map', '0:v:0', '-map', '1:a:0',
'-c', 'copy', self._files_dict[File.OUTPUT]),
stdout=DEVNULL, stderr=DEVNULL)
@staticmethod
def get_length(file):
data = coub_dl.get_duration(coub_dl.get_command_stderr(('ffprobe', file))).split(':')
return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2]))
@staticmethod
def get_command_stderr(command):
process = Popen(command, stderr=PIPE, stdout=PIPE)
out, err = process.communicate()
return err
@staticmethod
def get_duration(ffprobe_output):
durationPattern = r'.*Duration:\s(.+),\sstart.*'
regex = match(durationPattern, str(ffprobe_output))
duration = regex.groups()[0] if regex else None
if not duration:
raise ValueError('Cannot process ffprobe output!')
return duration
@staticmethod
def get_title(url):
return get_output(('youtube-dl', '--get-title', url))
@staticmethod
@call_verbose(before_message='Checking your system for dependencies... ', after_message='Found all!')
def check_for_dependencies():
check_for = (('youtube-dl', '--version'), ('ffmpeg', '-version'))
error_str = '\nMissing dependencies: {}'
missing = []
for command in check_for:
try: check_output(command)
except (CalledProcessError, FileNotFoundError): missing.append(command[0])
if missing: exit(error_str.format(', '.join(missing)))
def run(URL, output, extension):
# create dict that contains files used
FILES = deepcopy(coub_dl.default_files)
determine_output_filename(URL, output, extension, FILES)
# ask what to do if output exists
if exists(FILES[File.OUTPUT]):
answer = yes_no_question('A file named "{}" already exists! Overwrite?'.format(FILES[File.OUTPUT]),
default='no')
if not answer:
print_opt('Exiting!')
exit()
else:
remove(FILES[File.OUTPUT])
# create temporary directory to work in
with temporary_directory() as dir:
# update temporary file locations in FILES dict
for key in {key: FILES[key] for key in FILES if key not in coub_dl.output_files}:
FILES[key] = join(dir, FILES[key])
coub_dl(URL, FILES, dir)()
def determine_output_filename(url, user_supplied, extension, files_dict):
if user_supplied is None:
files_dict[File.OUTPUT] = coub_dl.get_title(url)
else:
files_dict[File.OUTPUT] = user_supplied
files_dict[File.OUTPUT] += extension
def parse_cmd_arguments():
parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
parser.add_argument('-nv', '--nonverbose', action='store_true', help='Turn off non-critical messages to user')
parser.add_argument('-o', '--output', default=None, help='Specify name of the output file (use -e for extension)')
parser.add_argument('-e', '--extension', default='mp4', help='Set the container to use for the output')
parser.add_argument('URLs', type=str, nargs='+', help='The URLs of the sites containing the videos to download')
args = parser.parse_args()
args.extension = '.' + args.extension
return args
if __name__ == '__main__':
signal(SIGINT, lambda a, b: exit('\nExiting!'))
args = parse_cmd_arguments()
utility.VERBOSE = False if args.nonverbose else True
coub_dl.check_for_dependencies()
for url in set(args.URLs):
print_opt('\nCreating video from {}'.format(url))
run(url, args.output, args.extension)