import re
from subprocess import run, CalledProcessError, PIPE
from getpass import getuser
from os import makedirs
from os.path import dirname
from os.path import isdir
from os.path import join as joinpath
from uuid import uuid4

from dateutil import parser as dateparser


class SnapshotProvider:
    """Take and restore snapshots of a directory using a dedicated git repository.

    Every git command is executed with ``self.gitenv`` as its *entire*
    environment, so ``GIT_DIR``/``GIT_WORK_TREE`` select the repository and
    the author/committer identity is fixed by this class.

    Public interface:
        * ``take_snapshot()`` / ``restore_snapshot(date)``
        * ``exclude`` (read/write the repository's ``info/exclude`` file)
        * ``timeline`` (current branch, settable), ``all_timelines``
        * ``snapshots`` (commits reachable from ``HEAD``)
    """

    # A `git branch` line that names a real branch: an optional marker column
    # ('*' and/or whitespace) followed by exactly one whitespace-free token.
    # Branch names cannot contain whitespace or '*', so pseudo entries such
    # as "* (HEAD detached at 1a2b3c4)" never match.
    _BRANCH_ENTRY = re.compile(r'^[*\s]*([^\s*]+)\s*$')

    def __init__(self, directory, git_dir, exclude_unix_patterns=None):
        """Prepare the repository in ``git_dir`` tracking ``directory``.

        Args:
            directory: Existing directory used as the git work tree.
            git_dir: Existing directory used as the git directory.
            exclude_unix_patterns: Optional iterable of exclude patterns,
                written to ``info/exclude`` before the initial snapshot.

        Raises:
            EnvironmentError: If either directory is missing, the initial
                snapshot cannot be committed, or HEAD is detached.
        """
        self._classname = self.__class__.__name__
        author = f'{getuser()} via TFW {self._classname}'
        self.gitenv = {
            'GIT_DIR': git_dir,
            'GIT_WORK_TREE': directory,
            'GIT_AUTHOR_NAME': author,
            'GIT_AUTHOR_EMAIL': '',
            'GIT_COMMITTER_NAME': author,
            'GIT_COMMITTER_EMAIL': '',
            'GIT_PAGER': 'cat'
        }
        self._init_repo(exclude_unix_patterns)
        self.__last_valid_branch = self._branch

    def _init_repo(self, exclude_patterns=None):
        """Initialise the repository if needed and make sure it has a commit.

        The exclude patterns are written *before* the first snapshot:
        exclude rules only apply to untracked files, so anything committed
        by the initial ``git add -A`` would stay tracked afterwards.
        """
        self._check_environment()
        if not self._repo_is_initialized:
            self._run(('git', 'init'))
        if exclude_patterns:
            self.exclude = exclude_patterns
        if self._number_of_commits == 0:
            try:
                self._snapshot()
            except CalledProcessError as err:
                raise EnvironmentError(
                    f'{self._classname} cannot init on empty directories!'
                ) from err
        self._check_head_not_detached()

    def _check_environment(self):
        """Raise EnvironmentError unless both configured directories exist."""
        if not isdir(self.gitenv['GIT_DIR']) or not isdir(self.gitenv['GIT_WORK_TREE']):
            raise EnvironmentError(f'{self._classname}: "directory" and "git_dir" must exist!')

    @property
    def _repo_is_initialized(self):
        """True when ``git status`` exits with status 0."""
        return self._run(
            ('git', 'status'),
            check=False
        ).returncode == 0

    @property
    def _number_of_commits(self):
        """Number of commits reachable from any ref, as an int."""
        return int(
            self._get_stdout((
                'git', 'rev-list',
                '--all',
                '--count'
            ))
        )

    def _snapshot(self):
        """Stage every change and commit it with the message 'Snapshot'.

        A failing commit is ignored only when git reports a clean working
        tree; any other CalledProcessError propagates to the caller.
        """
        self._run((
            'git', 'add',
            '-A'
        ))
        try:
            self._get_stdout((
                'git', 'commit',
                '-m', 'Snapshot'
            ))
        except CalledProcessError as err:
            if b'nothing to commit, working tree clean' not in err.output:
                raise

    def _check_head_not_detached(self):
        """Raise EnvironmentError when HEAD does not point at a branch."""
        if self._head_detached:
            raise EnvironmentError(f'{self._classname} cannot init from detached HEAD state!')

    @property
    def _head_detached(self):
        """True when ``git rev-parse --abbrev-ref HEAD`` prints ``HEAD``."""
        return self._branch == 'HEAD'

    @property
    def _branch(self):
        """Abbreviated name of what HEAD points at (``HEAD`` if detached)."""
        return self._get_stdout((
            'git', 'rev-parse',
            '--abbrev-ref', 'HEAD'
        ))

    def _get_stdout(self, *args, **kwargs):
        """Run a command via ``_run`` and return its decoded stdout.

        stdout and stderr are both captured; trailing newlines are stripped.
        """
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE
        stdout_bytes = self._run(*args, **kwargs).stdout
        return stdout_bytes.decode().rstrip('\n')

    def _run(self, *args, **kwargs):
        """Call ``subprocess.run`` with ``check=True`` and ``env=self.gitenv``.

        Both defaults can be overridden by passing the keyword explicitly.
        """
        kwargs.setdefault('check', True)
        kwargs.setdefault('env', self.gitenv)
        return run(*args, **kwargs)

    @property
    def exclude(self):
        """Raw contents of the repository's ``info/exclude`` file."""
        with open(self._exclude_path, 'r') as exclude_file:
            return exclude_file.read()

    @exclude.setter
    def exclude(self, exclude_patterns):
        """Overwrite ``info/exclude`` with the patterns, one per line."""
        # The 'info' directory is not guaranteed to exist inside GIT_DIR;
        # create it so that writing the file cannot fail on a missing parent.
        makedirs(dirname(self._exclude_path), exist_ok=True)
        with open(self._exclude_path, 'w') as exclude_file:
            exclude_file.write('\n'.join(exclude_patterns))

    @property
    def _exclude_path(self):
        """Path of ``info/exclude`` inside the configured git directory."""
        return joinpath(
            self.gitenv['GIT_DIR'],
            'info',
            'exclude'
        )

    def take_snapshot(self):
        """Commit the current state of the work tree.

        When HEAD is detached, a new branch is created from it and checked
        out first, so the snapshot lands on a branch.
        """
        if self._head_detached:
            self._checkout_new_branch_from_head()
        self._snapshot()

    def _checkout_new_branch_from_head(self):
        """Create a branch with a random UUID4 name at HEAD and check it out."""
        branch_name = str(uuid4())
        self._run((
            'git', 'branch',
            branch_name
        ))
        self._checkout(branch_name)

    def _checkout(self, what):
        """Run ``git checkout`` on the given branch name or commit."""
        self._run((
            'git', 'checkout',
            what
        ))

    def restore_snapshot(self, date):
        """Check out the newest snapshot committed before ``date``.

        Args:
            date: Object providing ``isoformat()`` (e.g. ``datetime``).
        """
        commit = self._get_commit_from_timestamp(date)
        branch = self._last_valid_branch
        # Check out the branch itself rather than its tip commit, so that
        # HEAD does not become detached when the newest snapshot is restored.
        if commit == self._latest_commit_on_branch(branch):
            commit = branch
        self._checkout(commit)

    def _get_commit_from_timestamp(self, date):
        """Return the newest commit before ``date`` on the last valid branch.

        Falls back to the parentless commit reachable from HEAD when no
        commit is older than ``date``.
        """
        commit = self._get_stdout((
            'git', 'rev-list',
            '--date=iso',
            '-n', '1',
            # No shell is involved, so quoting the value would hand literal
            # quote characters to git as part of the argument.
            f'--before={date.isoformat()}',
            self._last_valid_branch
        ))
        if not commit:
            commit = self._get_oldest_parent_of_head()
        return commit

    def _get_oldest_parent_of_head(self):
        """Output of ``git rev-list --max-parents=0 HEAD``."""
        return self._get_stdout((
            'git',
            'rev-list',
            '--max-parents=0',
            'HEAD'
        ))

    @property
    def _last_valid_branch(self):
        """Current branch, or the last one seen while HEAD is detached."""
        # Query git once per access; 'HEAD' is what rev-parse prints when
        # HEAD is detached, in which case the remembered branch is kept.
        branch = self._branch
        if branch != 'HEAD':
            self.__last_valid_branch = branch
        return self.__last_valid_branch

    def _latest_commit_on_branch(self, branch):
        """Full hash of the newest commit on ``branch``."""
        return self._get_stdout((
            'git', 'log',
            '-n', '1',
            '--pretty=format:%H',
            branch
        ))

    @property
    def all_timelines(self):
        """Names of all local branches."""
        return self._branches

    @property
    def _branches(self):
        """Local branch names parsed from the output of ``git branch``."""
        return self._parse_branch_names(self._get_stdout(('git', 'branch')))

    @classmethod
    def _parse_branch_names(cls, git_branch_output):
        """Extract branch names from ``git branch`` output.

        Lines that are not a single whitespace-free token after the marker
        column (e.g. ``* (HEAD detached at 1a2b3c4)``) are skipped.
        """
        matches = (
            cls._BRANCH_ENTRY.match(line)
            for line in git_branch_output.splitlines()
        )
        return [match.group(1) for match in matches if match]

    @property
    def timeline(self):
        """Name of the current (or last valid) branch."""
        return self._last_valid_branch

    @timeline.setter
    def timeline(self, value):
        """Switch timelines by checking out ``value``."""
        self._checkout(value)

    @property
    def snapshots(self):
        """Commits reachable from HEAD as ``{'hash', 'timestamp'}`` dicts."""
        return self._pretty_log_branch()

    def _pretty_log_branch(self):
        """Parse ``git log --pretty=%H@%aI`` into a list of commit dicts.

        Each entry holds the commit hash and its author date parsed by
        ``dateutil``; entries appear in the order git log prints them.
        """
        git_log_output = self._get_stdout((
            'git', 'log',
            '--pretty=%H@%aI'
        ))
        # Each line is '<hash>@<ISO 8601 author date>'.
        hash_date_pairs = (
            line.split('@', 1)
            for line in git_log_output.splitlines()
        )
        return [
            {
                'hash': commit_hash,
                'timestamp': dateparser.parse(timestamp)
            }
            for commit_hash, timestamp in hash_date_pairs
        ]