Add setup.py and make cli easily installable via entry_points

This commit is contained in:
2022-02-15 22:38:02 +01:00
parent bde2b0067e
commit 8597e44d3b
4 changed files with 29 additions and 3 deletions
+1
View File
@@ -1 +1,2 @@
from .identicon import Identicon
+75
View File
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
from sys import stdin
from sys import exit as sysexit
from io import BytesIO
import click
from blake3 import blake3
from . import Identicon
from .stream import get_deterministic_stream, ClosableStream
DIGEST_SIZE = 20
BUF_SIZE = 65536 # Linux default pipe capacity is 64KiB (64 * 2^10)
@click.command(
help=(
'Generate OpenSSH style randomart identicon for arbitrary data.\n\n'
'If TEXT or --file is not supplied, data is read from STDIN.'
)
)
@click.argument('text', default=None, type=str, required=False)
@click.option(
'--file', '-f', default=None, type=click.Path(exists=True),
help='Calculate from file or directory (recursive).'
)
@click.option(
'--fingerprint', '-p', default=False, required=False, is_flag=True,
help='Print fingerprint instead of identicon.'
)
def main(**kwargs):
if not (stream := get_input_stream(kwargs)):
print_usage_and_exit()
digest = get_digest(stream.stream)
stream.close()
if not kwargs.get('fingerprint'):
i = Identicon(digest)
i.calculate()
print(i)
else:
print(digest.hex())
def get_input_stream(kwargs):
stream = None
if (text := kwargs['text']) is not None:
stream = ClosableStream(BytesIO(text.encode()))
elif file := kwargs['file']:
stream = get_deterministic_stream(file, BUF_SIZE)
elif not stdin.isatty():
stream = ClosableStream(stdin.buffer)
return stream
def print_usage_and_exit():
command = main
with click.Context(command) as ctx:
click.echo(command.get_help(ctx))
sysexit(1)
def get_digest(stream):
# pylint: disable=not-callable
hasher = blake3()
while data := stream.read(BUF_SIZE):
hasher.update(data)
return hasher.digest(length=DIGEST_SIZE)
if __name__ == '__main__':
main()
+124
View File
@@ -0,0 +1,124 @@
# pylint: disable=consider-using-with
# (this code contains some IO stream juggling)
from subprocess import Popen, PIPE, run, STDOUT
from os import walk
from os.path import abspath, relpath, join
from pathlib import Path
from sys import stderr
class ClosableStream:
def __init__(self, stream, close_func=None):
self.stream = stream
self._close_func = close_func or (lambda: None)
def close(self):
return self._close_func()
def get_deterministic_stream(path, buffer_size):
if Path(path).is_dir():
return _get_directorytree_stream(path, buffer_size)
ifile = open(path, 'rb')
return ClosableStream(ifile, ifile.close)
def _get_directorytree_stream(path, buffer_size):
if _gnu_tar_available():
return TarDirectorytreeStream(path, buffer_size)
print(
"GNU tar not found, falling back to less performant Python implementation.",
file=stderr
)
return WalkDirectorytreeStream(path)
def _gnu_tar_available():
proc = run(('tar', '--version'), stdout=PIPE, stderr=STDOUT, check=False)
if proc.returncode != 0:
return False
return 'GNU tar' in proc.stdout.decode()
class TarDirectorytreeStream:
def __init__(self, path, buffer_size):
self._path = path
self._buffer_size = buffer_size
self._process = None
@property
def stream(self):
cmd = (
'tar',
f'--blocking-factor={self._buffer_size//512}',
'--sort=name',
'--mtime=UTC 1970-01-01',
'--owner=root:0', '--group=root:0', '--numeric-owner',
'--mode=a=rwX', '--no-acls', '--no-xattrs', '--no-selinux',
'-C', self._path, '-cf', '-', '.'
)
self._process = Popen(cmd, stdout=PIPE, stderr=PIPE)
return self._process.stdout
def close(self):
if self._process is not None:
exit_code = self._process.wait()
if exit_code != 0:
raise RuntimeError(f'Tar failed: {self._process.stderr.read().decode()}')
class WalkDirectorytreeStream:
def __init__(self, path):
self._path = path
self.stream = self
self._stream_generator = None
def read(self, buffer_size):
if self._stream_generator is None:
self._stream_generator = self._stream_all_files(buffer_size)
return next(self._stream_generator, None)
def _stream_all_files(self, buffer_size):
"""
This method makes sure that the yielded bytestreams between
different invocations are equvivalent if and only if the
input directory trees are equal in structure and contents.
Problematic edge cases include:
- Different directory trees resulting in the same walk
(i.e. yielding the same files in the same order)
- Directory trees where files are split/merged
Solutions:
- We ensure a deterministic walk order by sorting the
filenames/dirnames
- We yield the relative path[1] of every file before yielding
their contents. These filenames will end up in a hash function
ensuring that different directory structures produce different
fingerprints
[1]: relative to the directory being streamed
Note that we use os.path instead of pathlib because:
- Path.resolve() reads the filesystem instead of just
manipulating strings like abspath and friends do
(which makes os.path a lot faster as it tends to avoid disk IO)
- pathlib has no relpath
"""
for dirpath, dirnames, filenames in walk(self._path, topdown=True):
dirnames.sort()
filenames.sort()
for filename in filenames:
filepath = join(dirpath, filename)
yield self._get_relative_path(filepath).encode()
with open(filepath, 'rb') as ifile:
while data := ifile.read(buffer_size):
yield data
def _get_relative_path(self, filepath):
return relpath(abspath(filepath), start=abspath(self._path))
def close(self):
pass