TMP
This commit is contained in:
		@@ -1,7 +1,8 @@
 | 
				
			|||||||
#!/usr/bin/env python3
 | 
					#!/usr/bin/env python3
 | 
				
			||||||
from sys import stdin
 | 
					from sys import stdin, stderr
 | 
				
			||||||
from sys import exit as sysexit
 | 
					from sys import exit as sysexit
 | 
				
			||||||
from io import BytesIO
 | 
					from pathlib import Path
 | 
				
			||||||
 | 
					from shutil import get_terminal_size
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import click
 | 
					import click
 | 
				
			||||||
from blake3 import blake3
 | 
					from blake3 import blake3
 | 
				
			||||||
@@ -16,51 +17,68 @@ BUF_SIZE = 65536  # Linux default pipe capacity is 64KiB (64 * 2^10)
 | 
				
			|||||||
@click.command(
 | 
					@click.command(
 | 
				
			||||||
    help=(
 | 
					    help=(
 | 
				
			||||||
        'Generate OpenSSH style randomart identicon for arbitrary data.\n\n'
 | 
					        'Generate OpenSSH style randomart identicon for arbitrary data.\n\n'
 | 
				
			||||||
        'If TEXT or --file is not supplied, data is read from STDIN.'
 | 
					        'If PATHS is not supplied, data is read from STDIN.'
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@click.argument('text', default=None, type=str, required=False)
 | 
					@click.argument('paths', type=str, nargs=-1)
 | 
				
			||||||
@click.option(
 | 
					 | 
				
			||||||
    '--file', '-f', default=None, type=click.Path(exists=True),
 | 
					 | 
				
			||||||
    help='Calculate from file or directory (recursive).'
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
@click.option(
 | 
					@click.option(
 | 
				
			||||||
    '--fingerprint', '-p', default=False, required=False, is_flag=True,
 | 
					    '--fingerprint', '-p', default=False, required=False, is_flag=True,
 | 
				
			||||||
    help='Print fingerprint instead of identicon.'
 | 
					    help='Print fingerprint instead of identicon.'
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
def main(**kwargs):
 | 
					@click.option(
 | 
				
			||||||
    if not (stream := get_input_stream(kwargs)):
 | 
					    '--tty', '-t', default=False, required=False, is_flag=True,
 | 
				
			||||||
 | 
					    help='Read from STDIN even if it is a TTY.'
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					def main(paths, **kwargs):
 | 
				
			||||||
 | 
					    kwargs['paths'] = paths
 | 
				
			||||||
 | 
					    if not (streams := get_input_streams(kwargs)):
 | 
				
			||||||
        print_usage_and_exit()
 | 
					        print_usage_and_exit()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    digest = get_digest(stream.stream)
 | 
					    digests = calculate_digests(streams)
 | 
				
			||||||
    stream.close()
 | 
					    print_output(digests, **kwargs)
 | 
				
			||||||
 | 
					 | 
				
			||||||
    if not kwargs.get('fingerprint'):
 | 
					 | 
				
			||||||
        i = Identicon(digest)
 | 
					 | 
				
			||||||
        i.calculate()
 | 
					 | 
				
			||||||
        print(i)
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        print(digest.hex())
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def get_input_stream(kwargs):
 | 
					def get_input_streams(kwargs):
 | 
				
			||||||
    stream = None
 | 
					    streams = []
 | 
				
			||||||
    if (text := kwargs['text']) is not None:
 | 
					    if paths := kwargs['paths']:
 | 
				
			||||||
        stream = ClosableStream(BytesIO(text.encode()))
 | 
					        streams = get_streams_from(paths)
 | 
				
			||||||
    elif file := kwargs['file']:
 | 
					    elif not stdin.isatty() or kwargs['tty']:
 | 
				
			||||||
        stream = get_deterministic_stream(file, BUF_SIZE)
 | 
					        streams = [ClosableStream(stdin.buffer)]
 | 
				
			||||||
    elif not stdin.isatty():
 | 
					    return streams
 | 
				
			||||||
        stream = ClosableStream(stdin.buffer)
 | 
					
 | 
				
			||||||
    return stream
 | 
					
 | 
				
			||||||
 | 
					def get_streams_from(paths):
 | 
				
			||||||
 | 
					    paths = assert_paths_exist(paths)
 | 
				
			||||||
 | 
					    return [
 | 
				
			||||||
 | 
					        get_deterministic_stream(path, BUF_SIZE)
 | 
				
			||||||
 | 
					        for path in paths
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def assert_paths_exist(paths):
 | 
				
			||||||
 | 
					    paths = [Path(path) for path in paths]
 | 
				
			||||||
 | 
					    for path in paths:
 | 
				
			||||||
 | 
					        if not path.exists():
 | 
				
			||||||
 | 
					            print(f'No such file or directory: "{path}"', file=stderr)
 | 
				
			||||||
 | 
					            sysexit(1)
 | 
				
			||||||
 | 
					    return paths
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def print_usage_and_exit():
 | 
					def print_usage_and_exit():
 | 
				
			||||||
    command = main
 | 
					    command = main
 | 
				
			||||||
    with click.Context(command) as ctx:
 | 
					    with click.Context(command) as ctx:
 | 
				
			||||||
        click.echo(command.get_help(ctx))
 | 
					        click.echo(command.get_help(ctx), err=True)
 | 
				
			||||||
    sysexit(1)
 | 
					    sysexit(1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def calculate_digests(streams):
 | 
				
			||||||
 | 
					    digests = []
 | 
				
			||||||
 | 
					    for stream in streams:
 | 
				
			||||||
 | 
					        digests.append(get_digest(stream.stream))
 | 
				
			||||||
 | 
					        stream.close()
 | 
				
			||||||
 | 
					    return digests
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def get_digest(stream):
 | 
					def get_digest(stream):
 | 
				
			||||||
    # pylint: disable=not-callable
 | 
					    # pylint: disable=not-callable
 | 
				
			||||||
    hasher = blake3()
 | 
					    hasher = blake3()
 | 
				
			||||||
@@ -69,6 +87,36 @@ def get_digest(stream):
 | 
				
			|||||||
    return hasher.digest(length=DIGEST_SIZE)
 | 
					    return hasher.digest(length=DIGEST_SIZE)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def print_output(digests, **kwargs):
 | 
				
			||||||
 | 
					    if not kwargs.get('fingerprint'):
 | 
				
			||||||
 | 
					        print_identicons(digests)
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        print_fingerprints(digests)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def print_identicons(digests):
 | 
				
			||||||
 | 
					    icons = [Identicon(digest).calculate() for digest in digests]
 | 
				
			||||||
 | 
					    for i in range(Identicon.HEIGHT+2):
 | 
				
			||||||
 | 
					        line = ''
 | 
				
			||||||
 | 
					        for i_icon, icon in enumerate(icons):
 | 
				
			||||||
 | 
					            if i_icon != 0:
 | 
				
			||||||
 | 
					                line += ' '
 | 
				
			||||||
 | 
					            line += str(icon).splitlines()[i]
 | 
				
			||||||
 | 
					        print(line)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def calculate_spacing(n_icons):
 | 
				
			||||||
 | 
					    terminal_width = get_terminal_size().columns
 | 
				
			||||||
 | 
					    icons_width = n_icons * (Identicon.WIDTH+2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return abs(terminal_width - icons_width) // n_icons
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def print_fingerprints(digests):
 | 
				
			||||||
 | 
					    for digest in digests:
 | 
				
			||||||
 | 
					        print(digest.hex())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == '__main__':
 | 
					if __name__ == '__main__':
 | 
				
			||||||
    main()
 | 
					    main()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -39,6 +39,7 @@ class Identicon:
 | 
				
			|||||||
        for command in self._get_commands():
 | 
					        for command in self._get_commands():
 | 
				
			||||||
            self._execute(command)
 | 
					            self._execute(command)
 | 
				
			||||||
        self._end_position = self._bishop_position
 | 
					        self._end_position = self._bishop_position
 | 
				
			||||||
 | 
					        return self
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _get_commands(self):
 | 
					    def _get_commands(self):
 | 
				
			||||||
        commands = []
 | 
					        commands = []
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user