now stuff are written to a temporary folder. also did some refactoring.

This commit is contained in:
Kjistóf 2017-02-05 22:53:08 +01:00
parent abc7d96b04
commit 6fae9697fd

View File

@ -6,8 +6,8 @@ from enum import Enum
from datetime import timedelta from datetime import timedelta
from math import floor from math import floor
from argparse import ArgumentParser from argparse import ArgumentParser
from functools import wraps, partial from functools import wraps
from atexit import register from tempfile import TemporaryDirectory
@ -70,11 +70,12 @@ def download_video_stream(url, file_dict):
stdout=DEVNULL, stderr=DEVNULL) stdout=DEVNULL, stderr=DEVNULL)
def read_extensions(file_dict): def read_extensions(file_dict, directory):
for file in listdir(): for file in listdir(directory):
for filename in file_dict: for filename in file_dict:
if match('^{}.*'.format(file_dict[filename]), file): fullname = '{}/{}'.format(directory, file)
file_dict[filename] = file if match('^{}.*'.format(file_dict[filename]), fullname):
file_dict[filename] = fullname
@call_verbose(before_message='Looping shorter stream... ') @call_verbose(before_message='Looping shorter stream... ')
@ -84,7 +85,7 @@ def loop_shorter_stream(file_dict, shorter, shorter_file, loop_fraction):
stdout=DEVNULL, stderr=DEVNULL) stdout=DEVNULL, stderr=DEVNULL)
# concat them # concat them
call(('ffmpeg', '-f', 'concat', '-i', file_dict[File.LIST], call(('ffmpeg', '-f', 'concat', '-safe', '0', '-i', file_dict[File.LIST],
'-c', 'copy', file_dict[File.LOOP]), '-c', 'copy', file_dict[File.LOOP]),
stdout=DEVNULL, stderr=DEVNULL) stdout=DEVNULL, stderr=DEVNULL)
@ -98,15 +99,6 @@ def mux_streams(file_dict):
stdout=DEVNULL, stderr=DEVNULL) stdout=DEVNULL, stderr=DEVNULL)
def cleanup(file_dict, outputs):
for key in file_dict:
if key not in outputs:
try:
remove(file_dict[key])
except FileNotFoundError:
pass
def get_length(file): def get_length(file):
data = get_duration(get_command_stderr(('ffprobe', file))).split(':') data = get_duration(get_command_stderr(('ffprobe', file))).split(':')
return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2])) return timedelta(hours=float(data[0]), minutes=float(data[1]), seconds=float(data[2]))
@ -135,71 +127,76 @@ def yes_no_question(question, default):
print("Please respond with 'yes'(y) or 'no'(n)!") print("Please respond with 'yes'(y) or 'no'(n)!")
def run(url, files_dict, directory):
# download streams and update FILE dict with extensions
download_audio_stream(url, files_dict)
download_video_stream(url, files_dict)
read_extensions(files_dict, directory)
# parse arguments # get stream lengths via ffprobe
parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.') audioLen = get_length(files_dict[Stream.AUDIO])
parser.add_argument('-nv', '--nonverbose', action='store_true', help='Turn off non-critical messages to user.') videoLen = get_length(files_dict[Stream.VIDEO])
parser.add_argument('-o', '--output', default=None, help='Specify name of the output file (use -e for extension).')
parser.add_argument('-e', '--extension', default='mp4', help='Set the container to use for the output.')
parser.add_argument('url', type=str, help='The URL of the site containing the video to download.')
args = parser.parse_args() # decide which stream needs some looping
args.extension = '.' + args.extension longer = audioLen if audioLen > videoLen else videoLen
VERBOSE = False if args.nonverbose else True shorter = audioLen if audioLen < videoLen else videoLen
shorterFile = files_dict[Stream.AUDIO] if audioLen < videoLen else files_dict[Stream.VIDEO]
files_dict[File.LOOP] += splitext(shorterFile)[1]
files_dict[File.FRACTION] += splitext(shorterFile)[1]
times = longer.total_seconds() / shorter.total_seconds()
timesLoop_base = floor(times)
timesLoop_fraction = times % 1
# write concat helper file for ffmpeg
with open(files_dict[File.LIST], 'w') as f:
for i in range(timesLoop_base):
print("file '{}'".format(shorterFile), file=f)
print("file '{}'".format(files_dict[File.FRACTION]), file=f)
loop_shorter_stream(files_dict, shorter, shorterFile, timesLoop_fraction)
mux_streams(files_dict)
FILES = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
if __name__ == '__main__':
# parse arguments
parser = ArgumentParser(description='Download player-looped videos with youtube-dl & ffmpeg.')
parser.add_argument('-nv', '--nonverbose', action='store_true', help='Turn off non-critical messages to user.')
parser.add_argument('-o', '--output', default=None, help='Specify name of the output file (use -e for extension).')
parser.add_argument('-e', '--extension', default='mp4', help='Set the container to use for the output.')
parser.add_argument('url', type=str, help='The URL of the site containing the video to download.')
args = parser.parse_args()
args.extension = '.' + args.extension
VERBOSE = False if args.nonverbose else True
FILES = {Stream.AUDIO: 'audio', Stream.VIDEO: 'video',
File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction', File.LIST: 'list.txt', File.LOOP: 'loop', File.FRACTION: 'fraction',
File.OUTPUT: 'output'+args.extension} File.OUTPUT: 'output'+args.extension}
OUTPUT_KEYS = [File.OUTPUT] OUTPUT_KEYS = [File.OUTPUT]
URL = args.url URL = args.url
# fetch video title if no filename was specified
# clean up on exit if args.output is None:
register(partial(cleanup, FILES, OUTPUT_KEYS))
# fetch video title if no filename was specified
if args.output is None:
FILES[File.OUTPUT] = check_output(('youtube-dl', '--get-title', args.url)).decode('utf-8').strip() FILES[File.OUTPUT] = check_output(('youtube-dl', '--get-title', args.url)).decode('utf-8').strip()
else: else:
FILES[File.OUTPUT] = args.output FILES[File.OUTPUT] = args.output
FILES[File.OUTPUT] += args.extension FILES[File.OUTPUT] += args.extension
# ask what to do if output exists # ask what to do if output exists
if exists(FILES[File.OUTPUT]): if exists(FILES[File.OUTPUT]):
answer = yes_no_question('A file named "{}" already exists! Overwrite?'.format(FILES[File.OUTPUT]), default='no') answer = yes_no_question('A file named "{}" already exists! Overwrite?'.format(FILES[File.OUTPUT]),
default='no')
if not answer: if not answer:
print_opt('Exiting!') print_opt('Exiting!')
exit() exit()
else: else:
remove(FILES[File.OUTPUT]) remove(FILES[File.OUTPUT])
# download streams and update FILE dict with extensions with TemporaryDirectory() as dir:
download_audio_stream(URL, FILES) for key in FILES:
download_video_stream(URL, FILES) if key not in OUTPUT_KEYS:
read_extensions(FILES) FILES[key] = '{}/{}'.format(dir, FILES[key])
# get stream lengths via ffprobe run(URL, FILES, dir)
audioLen = get_length(FILES[Stream.AUDIO])
videoLen = get_length(FILES[Stream.VIDEO])
# decide which stream needs some looping
longer = audioLen if audioLen > videoLen else videoLen
shorter = audioLen if audioLen < videoLen else videoLen
shorterFile = FILES[Stream.AUDIO] if audioLen < videoLen else FILES[Stream.VIDEO]
FILES[File.LOOP] += splitext(shorterFile)[1]
FILES[File.FRACTION] += splitext(shorterFile)[1]
times = longer.total_seconds() / shorter.total_seconds()
timesLoop_base = floor(times)
timesLoop_fraction = times % 1
# write concat helper file for ffmpeg
with open(FILES[File.LIST], 'w') as f:
for i in range(timesLoop_base):
print("file '{}'".format(shorterFile), file=f)
print("file '{}'".format(FILES[File.FRACTION]), file=f)
loop_shorter_stream(FILES, shorter, shorterFile, timesLoop_fraction)
mux_streams(FILES)