#!/usr/bin/env python3
"""Print an OpenSSH style randomart identicon for arbitrary data.

The input (a TEXT argument, a file or directory given with --file, or
STDIN) is hashed with BLAKE2b and the digest is passed to ``Identicon``,
whose string form is printed.
"""
from sys import stdin
from sys import exit as sysexit
from hashlib import blake2b
from io import BytesIO
from os.path import isdir
from subprocess import Popen, PIPE
from tempfile import TemporaryFile

import click
from identicon import Identicon

DIGEST_SIZE = 20
BUF_SIZE = 65536  # Linux default pipe capacity is 64KiB (64 * 2^10)


@click.command(
    help=(
        'Print OpenSSH style randomart identicon for arbitrary data.\n\n'
        'If TEXT or --file is not supplied, data is read from STDIN.'
    )
)
@click.argument('text', default=None, type=str, required=False)
@click.option(
    '--file', '-f',
    default=None,
    type=click.Path(exists=True),
    help='Calculate from file or directory (recursive).'
)
def main(**kwargs):
    """Hash the selected input and print its identicon.

    Prints the usage text and exits with status 1 when no input source is
    available (no TEXT, no --file, and STDIN is a terminal).
    """
    stream = get_input_stream(kwargs)
    if stream is None:
        print_usage_and_exit()

    # Always release the stream (and reap a tar child, if any), even when
    # hashing fails part-way through.
    try:
        digest = get_digest(stream.stream)
    finally:
        stream.close()

    i = Identicon(digest)
    i.calculate()
    print(i)


def get_input_stream(kwargs):
    """Return a ``ClosableStream`` for the requested input, or ``None``.

    Precedence: the ``text`` argument, then the ``file`` option, then STDIN
    (only when it is not a terminal). ``None`` means no input is available.

    A directory is streamed as a deterministic tar archive. Any other
    existing path is read as raw bytes, so ``--file F`` on a regular file
    hashes the same data as piping ``F`` on STDIN.
    """
    text = kwargs["text"]
    if text is not None:
        return ClosableStream(BytesIO(text.encode()))

    file = kwargs["file"]
    if file:
        if isdir(file):
            return get_deterministic_tar_stream(file)
        # ``tar -C <path>`` only works for directories; read plain files
        # directly instead of failing with "Not a directory".
        # pylint: disable=consider-using-with
        handle = open(file, 'rb')
        return ClosableStream(handle, handle.close)

    if not stdin.isatty():
        return ClosableStream(stdin.buffer)

    return None


class ClosableStream:
    """Pair a readable binary stream with an optional cleanup callback.

    ``stream`` is the object to read from; ``close()`` invokes the callback
    supplied at construction (a no-op when none was given) and returns its
    result.
    """

    def __init__(self, stream, close_func=None):
        self.stream = stream
        self._close_func = close_func or (lambda: None)

    def close(self):
        """Run the cleanup callback and return whatever it returns."""
        return self._close_func()


def get_deterministic_tar_stream(file):
    """Stream directory ``file`` as a tar archive with normalised metadata.

    The tar options sort entries by name and fix mtime, owner, group and
    mode, and drop ACLs/xattrs/SELinux data, so the metadata tar records
    does not vary between runs.

    Returns a ``ClosableStream`` wrapping tar's stdout. Its ``close()``
    waits for tar and raises ``RuntimeError`` carrying tar's stderr output
    if the exit code is non-zero.
    """
    # pylint: disable=consider-using-with
    cmd = (
        'tar',
        f'--blocking-factor={BUF_SIZE//512}',
        '--sort=name',
        '--mtime=UTC 1970-01-01',
        '--owner=root:0',
        '--group=root:0',
        '--numeric-owner',
        '--mode=a=rwX',
        '--no-acls',
        '--no-xattrs',
        '--no-selinux',
        '-C', file,
        '-cf', '-',
        '.'
    )

    # Collect stderr in a temporary file rather than a pipe: nobody drains
    # stderr while stdout is being hashed, so a full stderr pipe would block
    # tar and deadlock the reader.
    stderr_file = TemporaryFile()
    try:
        p = Popen(cmd, stdout=PIPE, stderr=stderr_file)
    except Exception:
        stderr_file.close()
        raise

    def wait_and_check_exitcode():
        try:
            # Close our end first so tar cannot block on a full stdout pipe
            # if the caller stopped reading early.
            p.stdout.close()
            exit_code = p.wait()
            if exit_code != 0:
                stderr_file.seek(0)
                message = stderr_file.read().decode(errors="replace")
                raise RuntimeError(f"Tar failed: {message}")
        finally:
            stderr_file.close()

    return ClosableStream(p.stdout, wait_and_check_exitcode)


def print_usage_and_exit():
    """Print the command's help text and exit with status 1."""
    command = main
    with click.Context(command) as ctx:
        click.echo(command.get_help(ctx))
    sysexit(1)


def get_digest(stream):
    """Return the BLAKE2b digest (``DIGEST_SIZE`` bytes) of ``stream``.

    The stream is consumed in ``BUF_SIZE`` chunks until ``read`` returns an
    empty value.
    """
    hasher = blake2b(digest_size=DIGEST_SIZE)
    while data := stream.read(BUF_SIZE):
        hasher.update(data)
    return hasher.digest()


if __name__ == '__main__':
    main()