#!/usr/bin/env python3
"""Print an OpenSSH style randomart identicon for arbitrary data.

The data comes from a TEXT argument, from a file or directory given with
``--file``, or from standard input. It is hashed with BLAKE3 and the digest
is either rendered as an identicon or printed as a hex fingerprint.
"""

# pylint: disable=consider-using-with
# (this code contains some IO stream juggling)

from sys import stdin
from sys import exit as sysexit
from io import BytesIO
from subprocess import Popen, PIPE
from pathlib import Path
from tempfile import TemporaryFile

import click
from blake3 import blake3

from identicon import Identicon

DIGEST_SIZE = 20
BUF_SIZE = 65536  # Linux default pipe capacity is 64KiB (64 * 2^10)


@click.command(
    help=(
        'Print OpenSSH style randomart identicon for arbitrary data.\n\n'
        'If TEXT or --file is not supplied, data is read from STDIN.'
    )
)
@click.argument('text', default=None, type=str, required=False)
@click.option(
    '--file', '-f', default=None, type=click.Path(exists=True),
    help='Calculate from file or directory (recursive).'
)
@click.option(
    '--fingerprint', '-p', default=False, required=False, is_flag=True,
    help='Print fingerprint instead of identicon.'
)
def main(**kwargs):
    """Hash the selected input and print its identicon or fingerprint.

    Prints the usage text and exits with status 1 when no input source is
    available (no TEXT, no --file and STDIN is a terminal).
    """
    stream = get_input_stream(kwargs)
    if stream is None:
        print_usage_and_exit()

    # The context manager guarantees the underlying resource (file handle
    # or tar subprocess) is released even if hashing raises.
    with stream:
        digest = get_digest(stream.stream)

    if kwargs.get('fingerprint'):
        print(digest.hex())
    else:
        identicon = Identicon(digest)
        identicon.calculate()
        print(identicon)


def get_input_stream(kwargs):
    """Pick the input source from the parsed command line options.

    Precedence: the TEXT argument, then --file, then piped STDIN.

    Returns:
        A ClosableStream wrapping a binary stream, or None when no input
        source is available.
    """
    text = kwargs['text']
    if text is not None:
        return ClosableStream(BytesIO(text.encode()))

    file = kwargs['file']
    if file:
        return get_deterministic_stream(file)

    if not stdin.isatty():
        # STDIN is owned by the interpreter, so no close function is given.
        return ClosableStream(stdin.buffer)

    return None


class ClosableStream:
    """Pair a readable binary stream with the action that releases it.

    The stream is exposed as the ``stream`` attribute. ``close()`` runs the
    supplied ``close_func`` (a no-op when none was given). Instances can
    also be used as context managers, in which case ``close()`` is called
    on exit from the ``with`` block.
    """

    def __init__(self, stream, close_func=None):
        self.stream = stream
        self._close_func = close_func or (lambda: None)

    def close(self):
        """Run the close function and return whatever it returns."""
        return self._close_func()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning None lets any exception from the with-body propagate.
        self.close()


def get_deterministic_stream(file):
    """Return a ClosableStream for a file or a directory.

    A directory is streamed as a normalized tar archive; anything else is
    opened for binary reading and closed by the returned object's
    ``close()``.
    """
    if Path(file).is_dir():
        return get_deterministic_tar_stream(file)

    ifile = open(file, 'rb')
    return ClosableStream(ifile, ifile.close)


def get_deterministic_tar_stream(file):
    """Stream the directory ``file`` as a tar archive with fixed metadata.

    The tar options sort entries by name and pin mtime, owner, group and
    mode, and leave out ACLs, xattrs and SELinux contexts, so that the
    archive bytes do not vary with that metadata.

    Returns:
        A ClosableStream over tar's stdout. Its ``close()`` waits for tar
        and raises RuntimeError, carrying tar's stderr output, when tar
        exits with a non-zero status.
    """
    cmd = (
        'tar',
        f'--blocking-factor={BUF_SIZE//512}',
        '--sort=name',
        '--mtime=UTC 1970-01-01',
        '--owner=root:0',
        '--group=root:0',
        '--numeric-owner',
        '--mode=a=rwX',
        '--no-acls',
        '--no-xattrs',
        '--no-selinux',
        '-C', file,
        '-cf', '-',
        '.'
    )

    # Collect stderr in a temporary file rather than a pipe: nothing reads
    # stderr while stdout is being consumed, so a full stderr pipe would
    # block tar and stall the stdout reader forever.
    stderr_file = TemporaryFile()
    try:
        proc = Popen(cmd, stdout=PIPE, stderr=stderr_file)
    except Exception:
        stderr_file.close()
        raise

    def wait_and_check_exitcode():
        try:
            # Release our end of the pipe before waiting so tar can never
            # be left blocked on a write nobody is going to read.
            proc.stdout.close()
            exit_code = proc.wait()
            if exit_code != 0:
                stderr_file.seek(0)
                message = stderr_file.read().decode(errors='replace')
                raise RuntimeError(f'Tar failed: {message}')
        finally:
            stderr_file.close()

    return ClosableStream(proc.stdout, wait_and_check_exitcode)


def print_usage_and_exit():
    """Print the command's help text and exit with status 1."""
    command = main
    with click.Context(command) as ctx:
        click.echo(command.get_help(ctx))
    sysexit(1)


def get_digest(stream):
    """Return the BLAKE3 digest (DIGEST_SIZE bytes) of a binary stream.

    The stream is read in BUF_SIZE chunks until ``read()`` returns an
    empty result, so the input is never held in memory as a whole.
    """
    # pylint: disable=not-callable
    hasher = blake3()
    while data := stream.read(BUF_SIZE):
        hasher.update(data)
    return hasher.digest(length=DIGEST_SIZE)


if __name__ == '__main__':
    main()