diff --git a/main.py b/main.py old mode 100644 new mode 100755 index 289ef55..8b424c6 --- a/main.py +++ b/main.py @@ -1,37 +1,87 @@ -from sys import argv, stdin +#!/usr/bin/env python3 +from sys import stdin from sys import exit as sysexit from hashlib import blake2b from io import BytesIO +from subprocess import Popen, PIPE + +import click from identicon import Identicon DIGEST_SIZE = 20 -BUF_SIZE = 65536 +BUF_SIZE = 65536 # Linux default pipe capacity is 64KiB (64 * 2^10) -def main(): - if not (stream := get_input_stream()): +@click.command( + help=( + 'Print OpenSSH style randomart identicon for arbitrary data.\n\n' + 'If TEXT or --file is not supplied, data is read from STDIN.' + ) +) +@click.argument('text', default=None, type=str, required=False) +@click.option( + '--file', '-f', default=None, type=click.Path(exists=True), + help='Calculate from file or directory (recursive).' +) +def main(**kwargs): + if not (stream := get_input_stream(kwargs)): print_usage_and_exit() - i = Identicon(get_digest(stream)) + digest = get_digest(stream.stream) + stream.close() + + i = Identicon(digest) i.calculate() print(i) -def get_input_stream(): - io = None - if len(argv) == 2: - io = BytesIO(argv[1].encode()) +def get_input_stream(kwargs): + stream = None + if (text := kwargs["text"]) is not None: + stream = ClosableStream(BytesIO(text.encode())) + elif file := kwargs["file"]: + stream = get_deterministic_tar_stream(file) elif not stdin.isatty(): - io = stdin.buffer - return io + stream = ClosableStream(stdin.buffer) + return stream + + +class ClosableStream: + def __init__(self, stream, close_func=None): + self.stream = stream + self._close_func = close_func or (lambda: None) + + def close(self): + return self._close_func() + + +def get_deterministic_tar_stream(file): + # pylint: disable=consider-using-with + cmd = ( + 'tar', + f'--blocking-factor={BUF_SIZE//512}', + '--sort=name', + '--mtime=UTC 1970-01-01', + '--owner=root:0', '--group=root:0', '--numeric-owner', + '--mode=a=rwX', '--no-acls', '--no-xattrs', '--no-selinux', + '-C', file, '-cf', '-', '.' + ) + p = Popen(cmd, stdout=PIPE, stderr=PIPE) + + def wait_and_check_exitcode(): + exit_code = p.wait() + if exit_code != 0: + raise RuntimeError(f"Tar failed: {p.stderr.read().decode()}") + + return ClosableStream(p.stdout, wait_and_check_exitcode) def print_usage_and_exit(): - print('Usage: identicon [TEXT]') - print('Print OpenSSH style randomart identicon for arbitrary data.\n') - print('If TEXT is not supplied, data is read from STDIN.') + command = main + with click.Context(command) as ctx: + click.echo(command.get_help(ctx)) sysexit(1)