diff --git a/main.py b/main.py index fbf109f..02c2155 100755 --- a/main.py +++ b/main.py @@ -1,16 +1,13 @@ #!/usr/bin/env python3 -# pylint: disable=consider-using-with -# (this code contains some IO stream juggling) from sys import stdin from sys import exit as sysexit from io import BytesIO -from subprocess import Popen, PIPE -from pathlib import Path import click from blake3 import blake3 from identicon import Identicon +from stream import get_deterministic_stream, ClosableStream DIGEST_SIZE = 20 @@ -52,49 +49,12 @@ def get_input_stream(kwargs): if (text := kwargs['text']) is not None: stream = ClosableStream(BytesIO(text.encode())) elif file := kwargs['file']: - stream = get_deterministic_stream(file) + stream = get_deterministic_stream(file, BUF_SIZE) elif not stdin.isatty(): stream = ClosableStream(stdin.buffer) return stream -class ClosableStream: - def __init__(self, stream, close_func=None): - self.stream = stream - self._close_func = close_func or (lambda: None) - - def close(self): - return self._close_func() - - -def get_deterministic_stream(file): - if Path(file).is_dir(): - return get_deterministic_tar_stream(file) - - ifile = open(file, 'rb') - return ClosableStream(ifile, ifile.close) - - -def get_deterministic_tar_stream(file): - cmd = ( - 'tar', - f'--blocking-factor={BUF_SIZE//512}', - '--sort=name', - '--mtime=UTC 1970-01-01', - '--owner=root:0', '--group=root:0', '--numeric-owner', - '--mode=a=rwX', '--no-acls', '--no-xattrs', '--no-selinux', - '-C', file, '-cf', '-', '.' 
- ) - p = Popen(cmd, stdout=PIPE, stderr=PIPE) - - def wait_and_check_exitcode(): - exit_code = p.wait() - if exit_code != 0: - raise RuntimeError(f'Tar failed: {p.stderr.read().decode()}') - - return ClosableStream(p.stdout, wait_and_check_exitcode) - - def print_usage_and_exit(): command = main with click.Context(command) as ctx: diff --git a/stream.py b/stream.py new file mode 100644 index 0000000..04b1889 --- /dev/null +++ b/stream.py @@ -0,0 +1,124 @@ +# pylint: disable=consider-using-with +# (this code contains some IO stream juggling) +from subprocess import Popen, PIPE, run, STDOUT +from os import walk +from os.path import abspath, relpath, join +from pathlib import Path +from sys import stderr + + +class ClosableStream: + def __init__(self, stream, close_func=None): + self.stream = stream + self._close_func = close_func or (lambda: None) + + def close(self): + return self._close_func() + + +def get_deterministic_stream(path, buffer_size): + if Path(path).is_dir(): + return _get_directorytree_stream(path, buffer_size) + + ifile = open(path, 'rb') + return ClosableStream(ifile, ifile.close) + + +def _get_directorytree_stream(path, buffer_size): + if _gnu_tar_available(): + return TarDirectorytreeStream(path, buffer_size) + print( + "GNU tar not found, falling back to less performant Python implementation.", + file=stderr + ) + return WalkDirectorytreeStream(path) + + +def _gnu_tar_available(): + proc = run(('tar', '--version'), stdout=PIPE, stderr=STDOUT, check=False) + if proc.returncode != 0: + return False + return 'GNU tar' in proc.stdout.decode() + + +class TarDirectorytreeStream: + def __init__(self, path, buffer_size): + self._path = path + self._buffer_size = buffer_size + self._process = None + + @property + def stream(self): + cmd = ( + 'tar', + f'--blocking-factor={self._buffer_size//512}', + '--sort=name', + '--mtime=UTC 1970-01-01', + '--owner=root:0', '--group=root:0', '--numeric-owner', + '--mode=a=rwX', '--no-acls', '--no-xattrs', 
'--no-selinux', + '-C', self._path, '-cf', '-', '.' + ) + self._process = Popen(cmd, stdout=PIPE, stderr=PIPE) + + return self._process.stdout + + def close(self): + if self._process is not None: + exit_code = self._process.wait() + if exit_code != 0: + raise RuntimeError(f'Tar failed: {self._process.stderr.read().decode()}') + + +class WalkDirectorytreeStream: + def __init__(self, path): + self._path = path + self.stream = self + self._stream_generator = None + + def read(self, buffer_size): + if self._stream_generator is None: + self._stream_generator = self._stream_all_files(buffer_size) + return next(self._stream_generator, None) + + def _stream_all_files(self, buffer_size): + """ + This method makes sure that the yielded bytestreams between + different invocations are equivalent if and only if the + input directory trees are equal in structure and contents. + + Problematic edge cases include: + - Different directory trees resulting in the same walk + (i.e. yielding the same files in the same order) + - Directory trees where files are split/merged + Solutions: + - We ensure a deterministic walk order by sorting the + filenames/dirnames + - We yield the relative path[1] of every file before yielding + their contents. 
These filenames will end up in a hash function + ensuring that different directory structures produce different + fingerprints + + [1]: relative to the directory being streamed + + Note that we use os.path instead of pathlib because: + - Path.resolve() reads the filesystem instead of just + manipulating strings like abspath and friends do + (which makes os.path a lot faster as it tends to avoid disk IO) + - pathlib has no relpath + """ + + for dirpath, dirnames, filenames in walk(self._path, topdown=True): + dirnames.sort() + filenames.sort() + for filename in filenames: + filepath = join(dirpath, filename) + yield self._get_relative_path(filepath).encode() + with open(filepath, 'rb') as ifile: + while data := ifile.read(buffer_size): + yield data + + def _get_relative_path(self, filepath): + return relpath(abspath(filepath), start=abspath(self._path)) + + def close(self): + pass