|
|
@ -1,7 +1,8 @@
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
from sys import stdin
|
|
|
|
from sys import stdin, stderr
|
|
|
|
from sys import exit as sysexit
|
|
|
|
from sys import exit as sysexit
|
|
|
|
from io import BytesIO
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
from shutil import get_terminal_size
|
|
|
|
|
|
|
|
|
|
|
|
import click
|
|
|
|
import click
|
|
|
|
from blake3 import blake3
|
|
|
|
from blake3 import blake3
|
|
|
@ -16,51 +17,68 @@ BUF_SIZE = 65536 # Linux default pipe capacity is 64KiB (64 * 2^10)
|
|
|
|
@click.command(
|
|
|
|
@click.command(
|
|
|
|
help=(
|
|
|
|
help=(
|
|
|
|
'Generate OpenSSH style randomart identicon for arbitrary data.\n\n'
|
|
|
|
'Generate OpenSSH style randomart identicon for arbitrary data.\n\n'
|
|
|
|
'If TEXT or --file is not supplied, data is read from STDIN.'
|
|
|
|
'If PATHS is not supplied, data is read from STDIN.'
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
@click.argument('text', default=None, type=str, required=False)
|
|
|
|
@click.argument('paths', type=str, nargs=-1)
|
|
|
|
@click.option(
|
|
|
|
|
|
|
|
'--file', '-f', default=None, type=click.Path(exists=True),
|
|
|
|
|
|
|
|
help='Calculate from file or directory (recursive).'
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
@click.option(
|
|
|
|
@click.option(
|
|
|
|
'--fingerprint', '-p', default=False, required=False, is_flag=True,
|
|
|
|
'--fingerprint', '-p', default=False, required=False, is_flag=True,
|
|
|
|
help='Print fingerprint instead of identicon.'
|
|
|
|
help='Print fingerprint instead of identicon.'
|
|
|
|
)
|
|
|
|
)
|
|
|
|
def main(**kwargs):
|
|
|
|
@click.option(
|
|
|
|
if not (stream := get_input_stream(kwargs)):
|
|
|
|
'--tty', '-t', default=False, required=False, is_flag=True,
|
|
|
|
|
|
|
|
help='Read from STDIN even if it is a TTY.'
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
def main(paths, **kwargs):
|
|
|
|
|
|
|
|
kwargs['paths'] = paths
|
|
|
|
|
|
|
|
if not (streams := get_input_streams(kwargs)):
|
|
|
|
print_usage_and_exit()
|
|
|
|
print_usage_and_exit()
|
|
|
|
|
|
|
|
|
|
|
|
digest = get_digest(stream.stream)
|
|
|
|
digests = calculate_digests(streams)
|
|
|
|
stream.close()
|
|
|
|
print_output(digests, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
if not kwargs.get('fingerprint'):
|
|
|
|
|
|
|
|
i = Identicon(digest)
|
|
|
|
|
|
|
|
i.calculate()
|
|
|
|
|
|
|
|
print(i)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
print(digest.hex())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_input_stream(kwargs):
|
|
|
|
def get_input_streams(kwargs):
|
|
|
|
stream = None
|
|
|
|
streams = []
|
|
|
|
if (text := kwargs['text']) is not None:
|
|
|
|
if paths := kwargs['paths']:
|
|
|
|
stream = ClosableStream(BytesIO(text.encode()))
|
|
|
|
streams = get_streams_from(paths)
|
|
|
|
elif file := kwargs['file']:
|
|
|
|
elif not stdin.isatty() or kwargs['tty']:
|
|
|
|
stream = get_deterministic_stream(file, BUF_SIZE)
|
|
|
|
streams = [ClosableStream(stdin.buffer)]
|
|
|
|
elif not stdin.isatty():
|
|
|
|
return streams
|
|
|
|
stream = ClosableStream(stdin.buffer)
|
|
|
|
|
|
|
|
return stream
|
|
|
|
|
|
|
|
|
|
|
|
def get_streams_from(paths):
|
|
|
|
|
|
|
|
paths = assert_paths_exist(paths)
|
|
|
|
|
|
|
|
return [
|
|
|
|
|
|
|
|
get_deterministic_stream(path, BUF_SIZE)
|
|
|
|
|
|
|
|
for path in paths
|
|
|
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def assert_paths_exist(paths):
|
|
|
|
|
|
|
|
paths = [Path(path) for path in paths]
|
|
|
|
|
|
|
|
for path in paths:
|
|
|
|
|
|
|
|
if not path.exists():
|
|
|
|
|
|
|
|
print(f'No such file or directory: "{path}"', file=stderr)
|
|
|
|
|
|
|
|
sysexit(1)
|
|
|
|
|
|
|
|
return paths
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def print_usage_and_exit():
|
|
|
|
def print_usage_and_exit():
|
|
|
|
command = main
|
|
|
|
command = main
|
|
|
|
with click.Context(command) as ctx:
|
|
|
|
with click.Context(command) as ctx:
|
|
|
|
click.echo(command.get_help(ctx))
|
|
|
|
click.echo(command.get_help(ctx), err=True)
|
|
|
|
sysexit(1)
|
|
|
|
sysexit(1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_digests(streams):
|
|
|
|
|
|
|
|
digests = []
|
|
|
|
|
|
|
|
for stream in streams:
|
|
|
|
|
|
|
|
digests.append(get_digest(stream.stream))
|
|
|
|
|
|
|
|
stream.close()
|
|
|
|
|
|
|
|
return digests
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_digest(stream):
|
|
|
|
def get_digest(stream):
|
|
|
|
# pylint: disable=not-callable
|
|
|
|
# pylint: disable=not-callable
|
|
|
|
hasher = blake3()
|
|
|
|
hasher = blake3()
|
|
|
@ -69,6 +87,36 @@ def get_digest(stream):
|
|
|
|
return hasher.digest(length=DIGEST_SIZE)
|
|
|
|
return hasher.digest(length=DIGEST_SIZE)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def print_output(digests, **kwargs):
|
|
|
|
|
|
|
|
if not kwargs.get('fingerprint'):
|
|
|
|
|
|
|
|
print_identicons(digests)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
print_fingerprints(digests)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def print_identicons(digests):
|
|
|
|
|
|
|
|
icons = [Identicon(digest).calculate() for digest in digests]
|
|
|
|
|
|
|
|
for i in range(Identicon.HEIGHT+2):
|
|
|
|
|
|
|
|
line = ''
|
|
|
|
|
|
|
|
for i_icon, icon in enumerate(icons):
|
|
|
|
|
|
|
|
if i_icon != 0:
|
|
|
|
|
|
|
|
line += ' '
|
|
|
|
|
|
|
|
line += str(icon).splitlines()[i]
|
|
|
|
|
|
|
|
print(line)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_spacing(n_icons):
|
|
|
|
|
|
|
|
terminal_width = get_terminal_size().columns
|
|
|
|
|
|
|
|
icons_width = n_icons * (Identicon.WIDTH+2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return abs(terminal_width - icons_width) // n_icons
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def print_fingerprints(digests):
|
|
|
|
|
|
|
|
for digest in digests:
|
|
|
|
|
|
|
|
print(digest.hex())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
if __name__ == '__main__':
|
|
|
|
main()
|
|
|
|
main()
|
|
|
|