# pylint: disable=consider-using-with
# (this code contains some IO stream juggling)

from subprocess import Popen, PIPE, run, STDOUT
from os import walk
from os.path import abspath, relpath, join
from pathlib import Path
from sys import stderr
from tempfile import TemporaryFile


class ClosableStream:
    """Pair a readable stream with the callable that releases it.

    Attributes:
        stream: the wrapped object, exposed unchanged.
    """

    def __init__(self, stream, close_func=None):
        """
        Args:
            stream: object handed out through the ``stream`` attribute.
            close_func: zero-argument callable invoked by ``close()``;
                defaults to a no-op.
        """
        self.stream = stream
        self._close_func = close_func or (lambda: None)

    def close(self):
        """Invoke the close callable and return whatever it returns."""
        return self._close_func()


def get_deterministic_stream(path, buffer_size):
    """Return an object with a ``stream`` attribute and a ``close()`` method.

    Args:
        path: file or directory to stream.
        buffer_size: chunk size in bytes, forwarded to the directory
            streamers (unused for plain files).

    Returns:
        A directory-tree streamer when ``path`` is a directory, otherwise a
        ``ClosableStream`` wrapping the file opened in binary mode.
    """
    if Path(path).is_dir():
        return _get_directorytree_stream(path, buffer_size)
    # The file is deliberately left open: ownership passes to the caller,
    # who releases it through ClosableStream.close().
    ifile = open(path, 'rb')
    return ClosableStream(ifile, ifile.close)


def _get_directorytree_stream(path, buffer_size):
    """Pick the tar-based streamer if GNU tar is usable, else the walker."""
    if _gnu_tar_available():
        return TarDirectorytreeStream(path, buffer_size)
    print(
        "GNU tar not found, falling back to less performant Python implementation.",
        file=stderr
    )
    return WalkDirectorytreeStream(path)


def _gnu_tar_available():
    """Return True if running ``tar --version`` succeeds and reports GNU tar."""
    try:
        proc = run(('tar', '--version'), stdout=PIPE, stderr=STDOUT, check=False)
    except OSError:
        # No (executable) tar binary at all: that is simply "not available",
        # the caller falls back to the pure Python implementation.
        return False
    if proc.returncode != 0:
        return False
    return 'GNU tar' in proc.stdout.decode(errors='replace')


class TarDirectorytreeStream:
    """Stream a directory tree as a normalized tar archive produced by GNU tar.

    The tar process is started lazily on the first access of ``stream`` and
    reused on later accesses; ``close()` waits for it and reports failures.
    """

    # Chunk size used to discard unread tar output in close().
    _DRAIN_CHUNK_SIZE = 64 * 1024

    def __init__(self, path, buffer_size):
        """
        Args:
            path: directory to archive.
            buffer_size: desired record size in bytes; converted to tar's
                blocking factor (units of 512 bytes).
        """
        self._path = path
        self._buffer_size = buffer_size
        self._process = None
        self._stderr_file = None

    @property
    def stream(self):
        """The stdout pipe of the tar process (spawned on first access)."""
        if self._process is None:
            self._spawn_tar()
        return self._process.stdout

    def _spawn_tar(self):
        """Start tar with options that normalize order, times, owners and modes."""
        cmd = (
            'tar',
            # A blocking factor of 0 is rejected by tar, so never go below 1.
            f'--blocking-factor={max(1, self._buffer_size // 512)}',
            '--sort=name',
            '--mtime=UTC 1970-01-01',
            '--owner=root:0',
            '--group=root:0',
            '--numeric-owner',
            '--mode=a=rwX',
            '--no-acls',
            '--no-xattrs',
            '--no-selinux',
            '-C', self._path,
            '-cf', '-',
            '.'
        )
        # stderr goes to a temporary file rather than a pipe: nobody reads
        # stderr while stdout is being consumed, so a full stderr pipe would
        # block tar and deadlock the reader.
        stderr_file = TemporaryFile()
        try:
            process = Popen(cmd, stdout=PIPE, stderr=stderr_file)
        except Exception:
            stderr_file.close()
            raise
        self._stderr_file = stderr_file
        self._process = process

    def close(self):
        """Wait for tar to finish and release its resources.

        Does nothing if the stream was never accessed.

        Raises:
            RuntimeError: if tar exited with a non-zero status; the message
                carries tar's stderr output.
        """
        process, stderr_file = self._process, self._stderr_file
        if process is None:
            return
        self._process = None
        self._stderr_file = None
        try:
            stdout = process.stdout
            if not stdout.closed:
                # Discard unread output so tar is never stuck writing to a
                # full pipe while we wait for it to exit.
                while stdout.read(self._DRAIN_CHUNK_SIZE):
                    pass
            exit_code = process.wait()
            if exit_code != 0:
                stderr_file.seek(0)
                message = stderr_file.read().decode(errors='replace')
                raise RuntimeError(f'Tar failed: {message}')
        finally:
            process.stdout.close()
            stderr_file.close()


class WalkDirectorytreeStream:
    """Pure Python directory streamer used when GNU tar is unavailable.

    The instance is its own ``stream``: call ``read(buffer_size)`` repeatedly
    until it returns ``None``.
    """

    def __init__(self, path):
        """
        Args:
            path: directory to stream.
        """
        self._path = path
        self.stream = self
        self._stream_generator = None

    def read(self, buffer_size):
        """Return the next chunk of the stream, or ``None`` when exhausted.

        A chunk is either an encoded relative file path or up to
        ``buffer_size`` bytes of file content. Only the ``buffer_size`` of the
        first call is used; it is captured when the walk starts.
        """
        if self._stream_generator is None:
            self._stream_generator = self._stream_all_files(buffer_size)
        return next(self._stream_generator, None)

    def _stream_all_files(self, buffer_size):
        """
        This method makes sure that the yielded bytestreams between
        different invocations are equivalent if and only if the input
        directory trees are equal in structure and contents.

        Problematic edge cases include:
        - Different directory trees resulting in the same walk
          (i.e. yielding the same files in the same order)
        - Directory trees where files are split/merged

        Solutions:
        - We ensure a deterministic walk order by sorting the
          filenames/dirnames
        - We yield the relative path[1] of every file before yielding
          their contents. These filenames will end up in a hash function
          ensuring that different directory structures produce different
          fingerprints

        [1]: relative to the directory being streamed

        Note that we use os.path instead of pathlib because:
        - Path.resolve() reads the filesystem instead of just manipulating
          strings like abspath and friends do (which makes os.path a lot
          faster as it tends to avoid disk IO)
        - pathlib has no relpath
        """
        for dirpath, dirnames, filenames in walk(self._path, topdown=True):
            # Sorting dirnames in place steers the order of the top-down walk.
            dirnames.sort()
            filenames.sort()
            for filename in filenames:
                filepath = join(dirpath, filename)
                # surrogateescape yields the same bytes as a plain encode()
                # for every valid name, and additionally copes with names
                # containing undecodable bytes instead of raising.
                yield self._get_relative_path(filepath).encode(
                    'utf-8', errors='surrogateescape'
                )
                with open(filepath, 'rb') as ifile:
                    while data := ifile.read(buffer_size):
                        yield data

    def _get_relative_path(self, filepath):
        """Return ``filepath`` relative to the directory being streamed."""
        return relpath(abspath(filepath), start=abspath(self._path))

    def close(self):
        """Stop the walk, closing the file it may currently have open."""
        if self._stream_generator is not None:
            # Closing the generator unwinds its `with open(...)` block.
            self._stream_generator.close()