identicon/main.py

109 lines
2.7 KiB
Python
Executable File

#!/usr/bin/env python3
# pylint: disable=consider-using-with
# (this code contains some IO stream juggling)
from sys import stdin
from sys import exit as sysexit
from io import BytesIO
from subprocess import Popen, PIPE
from pathlib import Path
import click
from blake3 import blake3
from identicon import Identicon
# Value passed as ``length=`` to the hasher's ``digest()`` call in get_digest
# (presumably a byte count -- confirm against the blake3 documentation).
DIGEST_SIZE = 20
# Chunk size used when reading the input stream for hashing; the tar blocking
# factor for directory input is derived from it as BUF_SIZE // 512.
BUF_SIZE = 65536 # Linux default pipe capacity is 64KiB (64 * 2^10)
@click.command(
    help=(
        'Print OpenSSH style randomart identicon for arbitrary data.\n\n'
        'If TEXT or --file is not supplied, data is read from STDIN.'
    )
)
@click.argument('text', default=None, type=str, required=False)
@click.option(
    '--file', '-f', default=None, type=click.Path(exists=True),
    help='Calculate from file or directory (recursive).'
)
def main(**kwargs):
    """CLI entry point: hash the selected input and print its identicon.

    ``kwargs`` carries the parsed ``text`` argument and ``file`` option.
    When no input source is available the usage text is printed and the
    process exits with status 1.

    The explicit ``help=`` passed to ``click.command`` above is what the
    CLI shows; this docstring is for maintainers only.
    """
    if not (stream := get_input_stream(kwargs)):
        print_usage_and_exit()
    try:
        digest = get_digest(stream.stream)
    finally:
        # Always release the underlying resource (open file, tar child
        # process, ...) even when reading or hashing fails part-way.
        stream.close()
    i = Identicon(digest)
    i.calculate()
    print(i)
def get_input_stream(kwargs):
    """Pick the input source described by the parsed CLI arguments.

    Precedence: the ``text`` argument (any string, including an empty
    one), then a truthy ``file`` option, then STDIN when it is not a
    terminal.  Returns a ``ClosableStream``, or ``None`` when none of
    these sources is available.
    """
    literal_text = kwargs['text']
    if literal_text is not None:
        encoded = literal_text.encode()
        return ClosableStream(BytesIO(encoded))

    path = kwargs['file']
    if path:
        return get_deterministic_stream(path)

    if stdin.isatty():
        # Interactive terminal and nothing else given: no input to hash.
        return None
    return ClosableStream(stdin.buffer)
class ClosableStream:
    """Pair a readable stream with the action that releases it.

    ``stream`` is exposed as a public attribute for reading.
    ``close_func`` is an optional zero-argument callable invoked by
    :meth:`close`; when omitted (or falsy), closing is a no-op.

    Instances can also be used as context managers: leaving the ``with``
    block calls :meth:`close`, whether or not an exception occurred.
    """

    def __init__(self, stream, close_func=None):
        self.stream = stream
        self._close_func = close_func or (lambda: None)

    def close(self):
        """Run the release action and return whatever it returns."""
        return self._close_func()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the ``with`` body.
        return False
def get_deterministic_stream(file):
    """Return a ``ClosableStream`` over the bytes of a file or directory.

    A directory is delegated to ``get_deterministic_tar_stream``; any
    other path is opened in binary mode, and closing the returned
    wrapper closes that file handle.
    """
    target = Path(file)
    if not target.is_dir():
        handle = open(file, 'rb')
        return ClosableStream(handle, handle.close)
    return get_deterministic_tar_stream(file)
def get_deterministic_tar_stream(file):
    """Stream a tar archive of directory ``file`` built with fixed metadata.

    ``tar`` is started with entries sorted by name and with mtime, owner,
    group and mode overridden, and ACLs/xattrs/SELinux labels dropped.
    The archive is written to the child's stdout, which is exposed as the
    ``stream`` of the returned ``ClosableStream``.

    Closing the returned wrapper closes the pipe, waits for ``tar`` and
    raises ``RuntimeError`` (including tar's stderr output) when it
    exited with a non-zero status.
    """
    # Function-scope import keeps this block self-contained.
    # pylint: disable=import-outside-toplevel
    from tempfile import TemporaryFile

    cmd = (
        'tar',
        f'--blocking-factor={BUF_SIZE//512}',
        '--sort=name',
        '--mtime=UTC 1970-01-01',
        '--owner=root:0', '--group=root:0', '--numeric-owner',
        '--mode=a=rwX', '--no-acls', '--no-xattrs', '--no-selinux',
        '-C', file, '-cf', '-', '.'
    )
    # Collect diagnostics in an anonymous temp file rather than a pipe:
    # nobody drains stderr while stdout is being consumed, so a tar that
    # emits more warnings than a pipe can hold would block forever.
    errlog = TemporaryFile()
    try:
        p = Popen(cmd, stdout=PIPE, stderr=errlog)
    except BaseException:
        errlog.close()
        raise

    def wait_and_check_exitcode():
        try:
            # Close our end first so tar can never stay blocked on a full
            # stdout pipe if the consumer stopped reading early.
            p.stdout.close()
            exit_code = p.wait()
            if exit_code != 0:
                errlog.seek(0)
                # Tolerate non-UTF-8 diagnostics instead of masking the
                # real failure with a UnicodeDecodeError.
                message = errlog.read().decode(errors='replace')
                raise RuntimeError(f'Tar failed: {message}')
        finally:
            errlog.close()

    return ClosableStream(p.stdout, wait_and_check_exitcode)
def print_usage_and_exit():
    """Echo the help text of the ``main`` command, then exit with status 1."""
    with click.Context(main) as context:
        help_text = main.get_help(context)
        click.echo(help_text)
    sysexit(1)
def get_digest(stream):
    """Hash everything readable from ``stream`` and return the digest.

    The stream is consumed in ``BUF_SIZE`` chunks until a read returns
    an empty (falsy) result; the digest is requested with
    ``length=DIGEST_SIZE``.
    """
    # pylint: disable=not-callable
    hasher = blake3()
    while True:
        chunk = stream.read(BUF_SIZE)
        if not chunk:
            break
        hasher.update(chunk)
    return hasher.digest(length=DIGEST_SIZE)
# Run the click command only when executed as a script, not on import.
if __name__ == '__main__':
    main()