continued refactor streak

This commit is contained in:
Kjistóf 2017-09-02 19:25:35 +02:00
parent 720b9af4d8
commit 67fc2194fe
2 changed files with 130 additions and 123 deletions

View File

@ -13,12 +13,10 @@ from enum import Enum
from datetime import timedelta
from math import floor
from argparse import ArgumentParser
from functools import wraps
from tempfile import mkdtemp
from shutil import rmtree
from signal import signal, SIGINT
from sys import exit
import utility
from utility import call_verbose, print_opt, get_output, temporary_directory, yes_no_question
class Stream(Enum):
@ -32,24 +30,11 @@ class File(Enum):
OUTPUT = 4
def call_verbose(before_message='', after_message='Done!'):
def tag(f):
@wraps(f)
def wrapper(*args, **kwargs):
print_opt(before_message, end='', flush=True)
f(*args, **kwargs)
print_opt(after_message)
return wrapper
return tag
FILES_DICT_DEFAULT = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction',
File.OUTPUT: 'out'}
def print_opt(*args, **kwargs):
if VERBOSE:
print(*args, **kwargs)
def get_output(*args, **kwargs):
return check_output(*args, **kwargs).decode().rstrip('\n')
OUTPUT_FILES = {File.OUTPUT}
class coub_dl:
@ -66,8 +51,8 @@ class coub_dl:
self.read_extensions()
# get stream lengths via ffprobe
audioLen = get_length(self._files_dict[Stream.AUDIO])
videoLen = get_length(self._files_dict[Stream.VIDEO])
audioLen = coub_dl.get_length(self._files_dict[Stream.AUDIO])
videoLen = coub_dl.get_length(self._files_dict[Stream.VIDEO])
# decide which stream needs some looping
longer = audioLen if audioLen > videoLen else videoLen
@ -78,7 +63,7 @@ class coub_dl:
# calculate how many times to loop
times = longer.total_seconds() / shorter.total_seconds()
timesLoop_base = floor(times)
timesLoop_base = int(floor(times))
timesLoop_fraction = times % 1
# write concat helper file for ffmpeg
@ -154,117 +139,51 @@ class coub_dl:
stdout=DEVNULL, stderr=DEVNULL)
def determine_output_filename(url, user_supplied, extension, files_dict):
if user_supplied is None:
files_dict[File.OUTPUT] = get_output(('youtube-dl', '--get-title', url))
else:
files_dict[File.OUTPUT] = user_supplied
files_dict[File.OUTPUT] += extension
@staticmethod
def get_length(file):
data = coub_dl.get_duration(coub_dl.get_command_stderr(('ffprobe', file))).split(':')
return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2]))
def get_length(file):
data = get_duration(get_command_stderr(('ffprobe', file))).split(':')
return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2]))
@staticmethod
def get_command_stderr(command):
process = Popen(command, stderr=PIPE, stdout=PIPE)
out, err = process.communicate()
return err
def get_command_stderr(command):
process = Popen(command, stderr=PIPE, stdout=PIPE)
out, err = process.communicate()
return err
@staticmethod
def get_duration(ffprobe_output):
durationPattern = r'.*Duration:\s(.+),\sstart.*'
regex = match(durationPattern, str(ffprobe_output))
duration = regex.groups()[0] if regex else None
if not duration:
raise ValueError('Cannot process ffprobe output!')
return duration
def get_duration(ffprobe_output):
durationPattern = r'.*Duration:\s(.+),\sstart.*'
regex = match(durationPattern, str(ffprobe_output))
duration = regex.groups()[0] if regex else None
if not duration:
raise ValueError('Cannot process ffprobe output!')
return duration
@staticmethod
def get_title(url):
return get_output(('youtube-dl', '--get-title', url))
@call_verbose(before_message='Checking your system for dependencies... ', after_message='Found all!')
def check_for_dependencies():
check_for = (('youtube-dl', '--version'), ('ffmpeg', '-version'), ('curl', '--version'))
error_str = '\nMissing dependencies: {}'
missing = []
@staticmethod
@call_verbose(before_message='Checking your system for dependencies... ', after_message='Found all!')
def check_for_dependencies():
check_for = (('youtube-dl', '--version'), ('ffmpeg', '-version'), ('curl', '--version'))
error_str = '\nMissing dependencies: {}'
missing = []
for command in check_for:
try: check_output(command)
except (CalledProcessError, FileNotFoundError): missing.append(command[0])
for command in check_for:
try: check_output(command)
except (CalledProcessError, FileNotFoundError): missing.append(command[0])
if missing: exit(error_str.format(', '.join(missing)))
if missing: exit(error_str.format(', '.join(missing)))
def build_default_files_dict():
return {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction',
File.OUTPUT: ''}, [File.OUTPUT]
def parse_cmd_arguments():
parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
parser.add_argument('-nv', '--nonverbose', action='store_true', help='Turn off non-critical messages to user')
parser.add_argument('-o', '--output', default=None, help='Specify name of the output file (use -e for extension)')
parser.add_argument('-e', '--extension', default='mp4', help='Set the container to use for the output')
parser.add_argument('url', type=str, help='The URL of the site containing the video to download')
args = parser.parse_args()
args.extension = '.' + args.extension
return args
def yes_no_question(question, default):
valid = {"yes": True, "y": True, "ye": True,
"no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("Invalid default answer: {}!".format(default))
while True:
print(question + prompt)
choice = input().lower()
if default is not None and choice == '':
return valid[default]
elif choice in valid:
return valid[choice]
else:
print("Please respond with 'yes'(y) or 'no'(n)!")
# tempfile.TemporaryDirectory replacement to provide backwards compatibility
class temporary_directory:
def __enter__(self):
self.name = mkdtemp()
return self.name
def __exit__(self, exc_type, exc_val, exc_tb):
rmtree(self.name)
@call_verbose(before_message='\nExiting!\n', after_message='')
def sigint_handler(signal, frame):
exit()
if __name__ == '__main__':
signal(SIGINT, sigint_handler)
args = parse_cmd_arguments()
VERBOSE = False if args.nonverbose else True
check_for_dependencies()
def run(URL, args):
# create dict that contains files used
FILES, OUTPUT_KEYS = build_default_files_dict()
URL = args.url
FILES = FILES_DICT_DEFAULT
determine_output_filename(URL, args.output, args.extension, FILES)
# ask what to do if output exists
@ -280,7 +199,37 @@ if __name__ == '__main__':
# create temporary directory to work in
with temporary_directory() as dir:
# update temporary file locations in FILES dict
for key in {key: FILES[key] for key in FILES if key not in OUTPUT_KEYS}:
for key in {key: FILES[key] for key in FILES if key not in OUTPUT_FILES}:
FILES[key] = join(dir, FILES[key])
coub_dl(URL, FILES, dir)()
def determine_output_filename(url, user_supplied, extension, files_dict):
if user_supplied is None:
files_dict[File.OUTPUT] = coub_dl.get_title(url)
else:
files_dict[File.OUTPUT] = user_supplied
files_dict[File.OUTPUT] += extension
def parse_cmd_arguments():
parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
parser.add_argument('-nv', '--nonverbose', action='store_true', help='Turn off non-critical messages to user')
parser.add_argument('-o', '--output', default=None, help='Specify name of the output file (use -e for extension)')
parser.add_argument('-e', '--extension', default='mp4', help='Set the container to use for the output')
parser.add_argument('url', type=str, help='The URL of the site containing the video to download')
args = parser.parse_args()
args.extension = '.' + args.extension
return args
if __name__ == '__main__':
signal(SIGINT, lambda a, b: exit('\nExiting!'))
args = parse_cmd_arguments()
utility.VERBOSE = False if args.nonverbose else True
coub_dl.check_for_dependencies()
run(args.url, args)

58
utility.py Normal file
View File

@ -0,0 +1,58 @@
from functools import wraps
from tempfile import mkdtemp
from shutil import rmtree
from subprocess import check_output
VERBOSE = True
def call_verbose(before_message='', after_message='Done!'):
def tag(f):
@wraps(f)
def wrapper(*args, **kwargs):
print_opt(before_message, end='', flush=True)
f(*args, **kwargs)
print_opt(after_message)
return wrapper
return tag
def print_opt(*args, **kwargs):
if VERBOSE:
print(*args, **kwargs)
def yes_no_question(question, default):
valid = {"yes": True, "y": True, "ye": True,
"no": False, "n": False}
if default is None:
prompt = " [y/n] "
elif default == "yes":
prompt = " [Y/n] "
elif default == "no":
prompt = " [y/N] "
else:
raise ValueError("Invalid default answer: {}!".format(default))
while True:
print(question + prompt)
choice = input().lower()
if default is not None and choice == '':
return valid[default]
elif choice in valid:
return valid[choice]
else:
print("Please respond with 'yes'(y) or 'no'(n)!")
# tempfile.TemporaryDirectory replacement to provide backwards compatibility
class temporary_directory:
def __enter__(self):
self.name = mkdtemp()
return self.name
def __exit__(self, exc_type, exc_val, exc_tb):
rmtree(self.name)
def get_output(*args, **kwargs):
return check_output(*args, **kwargs).decode().rstrip('\n')