Add support for indetifying whole directory trees using tar

This patch uses a clever invocation of GNU tar to produce a
deterministic bytestream from a directory tree. This stream is fed to a
hash function in 64KiB chunks (== Linux default pipe capacity) to
produce a fingerprint which can be displayed as an identicon.
Why would we do this instead of using the tarfile stdlib package or just
using os.walk plus some code?

The tarfile package is not capable of producing the output as a stream
of N byte chunks. The "most granual" mode of operation it can do is
producing all of the chunks belonging to a given file all at once.
This is problematic, because we could run out of memory or be forced to
write the tar archive to a temporary file - which would be painfully slow,
we could run out of disk space, wear out SSDs or outright refuse to run in a
container with a read-only rootfs and no tmpfs mounted.

An os.walk solution is doable, but will require some problem solving
which I am too lazy to do right now:
  - Forcing os.walk to walk in a deterministic order (should be easy)
  - Walks on different directory structures could theoretically produce
    the same bytestream (doable but requires some thinking)
The GNU tar solution is far from ideal (it forces an external dependency
and requires a subprocess call and some pipe juggling) but is very easy
to implement and should be fine performance wise:
  - The bottleneck on reasonable hardware configurations should
    be hashing or disk IO
  - The cost of doing a fork/exec is negligible compared to either

TL;DR os.walk: maybe in a future patch
This commit is contained in:
Kristóf Tóth 2022-02-13 20:14:02 +01:00
parent ce52ea6e58
commit 3fb80a4394
1 changed files with 64 additions and 14 deletions

78
main.py Normal file → Executable file
View File

@ -1,37 +1,87 @@
from sys import argv, stdin
#!/usr/bin/env python3
from sys import stdin
from sys import exit as sysexit
from hashlib import blake2b
from io import BytesIO
from subprocess import Popen, PIPE
import click
from identicon import Identicon
DIGEST_SIZE = 20
BUF_SIZE = 65536
BUF_SIZE = 65536 # Linux default pipe capacity is 64KiB (64 * 2^10)
def main():
if not (stream := get_input_stream()):
@click.command(
help=(
'Print OpenSSH style randomart identicon for arbitrary data.\n\n'
'If TEXT or --file is not supplied, data is read from STDIN.'
)
)
@click.argument('text', default=None, type=str, required=False)
@click.option(
'--file', '-f', default=None, type=click.Path(exists=True),
help='Calculate from file or directory (recursive).'
)
def main(**kwargs):
if not (stream := get_input_stream(kwargs)):
print_usage_and_exit()
i = Identicon(get_digest(stream))
digest = get_digest(stream.stream)
stream.close()
i = Identicon(digest)
i.calculate()
print(i)
def get_input_stream():
io = None
if len(argv) == 2:
io = BytesIO(argv[1].encode())
def get_input_stream(kwargs):
stream = None
if (text := kwargs["text"]) is not None:
stream = ClosableStream(BytesIO(text.encode()))
elif file := kwargs["file"]:
stream = get_deterministic_tar_stream(file)
elif not stdin.isatty():
io = stdin.buffer
return io
stream = ClosableStream(stdin.buffer)
return stream
class ClosableStream:
def __init__(self, stream, close_func=None):
self.stream = stream
self._close_func = close_func or (lambda: None)
def close(self):
return self._close_func()
def get_deterministic_tar_stream(file):
# pylint: disable=consider-using-with
cmd = (
'tar',
f'--blocking-factor={BUF_SIZE//512}',
'--sort=name',
'--mtime=UTC 1970-01-01',
'--owner=root:0', '--group=root:0', '--numeric-owner',
'--mode=a=rwX', '--no-acls', '--no-xattrs', '--no-selinux',
'-C', file, '-cf', '-', '.'
)
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
def wait_and_check_exitcode():
exit_code = p.wait()
if exit_code != 0:
raise RuntimeError(f"Tar failed: {p.stderr.read().decode()}")
return ClosableStream(p.stdout, wait_and_check_exitcode)
def print_usage_and_exit():
print('Usage: identicon [TEXT]')
print('Print OpenSSH style randomart identicon for arbitrary data.\n')
print('If TEXT is not supplied, data is read from STDIN.')
command = main
with click.Context(command) as ctx:
click.echo(command.get_help(ctx))
sysexit(1)