# Copyright (C) 2018 Avatao.com Innovative Learning Kft.
# All Rights Reserved. See LICENSE file for details.

import re
from subprocess import run, CalledProcessError, PIPE
from getpass import getuser
from os.path import isdir
from datetime import datetime
from uuid import uuid4


class SnapshotProvider:
    """Take and restore snapshots of a directory by driving the git CLI.

    Every git invocation runs with an environment built in ``__init__``
    that points ``GIT_WORK_TREE`` at ``directory`` and ``GIT_DIR`` at
    ``git_dir``. Snapshots are commits; "timelines" are branches.
    """

    def __init__(self, directory, git_dir):
        """Prepare the git environment and initialise the repository.

        Args:
            directory: Existing directory used as the git work tree.
            git_dir: Existing directory used as the git directory.

        Raises:
            EnvironmentError: If either directory is missing, if the initial
                snapshot cannot be committed, or if HEAD is detached.
        """
        self._classname = self.__class__.__name__
        author = f'{getuser()} via TFW {self._classname}'
        # This mapping is handed to subprocess as the *whole* environment of
        # each git call (see _run), not merged into the parent environment.
        self.gitenv = {
            'GIT_DIR': git_dir,
            'GIT_WORK_TREE': directory,
            'GIT_AUTHOR_NAME': author,
            'GIT_AUTHOR_EMAIL': '',
            'GIT_COMMITTER_NAME': author,
            'GIT_COMMITTER_EMAIL': '',
            'GIT_PAGER': 'cat'
        }

        self._init_repo()
        self.__last_valid_branch = self._branch

    def _init_repo(self):
        """Ensure a repository with at least one commit and an attached HEAD."""
        self._check_environment()

        if not self._repo_is_initialized:
            self._run(('git', 'init'))

        if self._number_of_commits == 0:
            try:
                self._snapshot()
            except CalledProcessError as err:
                # Keep the failing git call attached as the cause.
                raise EnvironmentError(
                    f'{self._classname} cannot init on empty directories!'
                ) from err

        self._check_head_not_detached()

    def _check_environment(self):
        """Raise EnvironmentError unless both configured directories exist."""
        if not isdir(self.gitenv['GIT_DIR']) or not isdir(self.gitenv['GIT_WORK_TREE']):
            raise EnvironmentError(f'{self._classname}: "directory" and "git_dir" must exist!')

    @property
    def _repo_is_initialized(self):
        """True when ``git status`` exits with code 0."""
        return self._run(
            ('git', 'status'),
            check=False
        ).returncode == 0

    @property
    def _number_of_commits(self):
        """Commit count reported by ``git rev-list --all --count``."""
        return int(
            self._get_stdout((
                'git', 'rev-list',
                '--all',
                '--count'
            ))
        )

    def _snapshot(self):
        """Stage everything in the work tree and commit it as 'Snapshot'."""
        self._run((
            'git', 'add',
            '-A'
        ))
        self._run((
            'git', 'commit',
            '-m', 'Snapshot'
        ))

    def _check_head_not_detached(self):
        """Raise EnvironmentError when HEAD is detached."""
        if self._head_detached:
            raise EnvironmentError(f'{self._classname} cannot init from detached HEAD state!')

    @property
    def _head_detached(self):
        """True when the abbreviated ref of HEAD is the literal 'HEAD'."""
        return self._branch == 'HEAD'

    @property
    def _branch(self):
        """Output of ``git rev-parse --abbrev-ref HEAD``."""
        return self._get_stdout((
            'git', 'rev-parse',
            '--abbrev-ref', 'HEAD'
        ))

    def _get_stdout(self, *args, **kwargs):
        """Run a command via ``_run`` and return its decoded stdout.

        stdout and stderr are both captured; trailing newlines are stripped
        from the returned text.
        """
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE
        stdout_bytes = self._run(*args, **kwargs).stdout
        return stdout_bytes.decode().rstrip('\n')

    def _run(self, *args, **kwargs):
        """Call ``subprocess.run`` with this provider's defaults.

        Unless the caller overrides them, ``check=True`` is used (a non-zero
        exit raises ``CalledProcessError``) and ``env`` is ``self.gitenv``.
        """
        kwargs.setdefault('check', True)
        kwargs.setdefault('env', self.gitenv)
        return run(*args, **kwargs)

    def take_snapshot(self):
        """Commit the current state of the work tree.

        When HEAD is detached, a new branch is created from HEAD and checked
        out first, so the commit lands on a branch.
        """
        if self._head_detached:
            self._checkout_new_branch_from_head()
        self._snapshot()

    def _checkout_new_branch_from_head(self):
        """Create a branch named by a random UUID at HEAD and check it out."""
        # subprocess only accepts str/bytes/path-like argv items, so the UUID
        # object has to be converted before it is handed to git.
        branch_name = str(uuid4())
        self._run((
            'git', 'branch',
            branch_name
        ))
        self._checkout(branch_name)

    def _checkout(self, what):
        """Run ``git checkout`` on the given branch name or commit."""
        self._run((
            'git', 'checkout',
            what
        ))

    def restore_snapshot(self, date):
        """Check out the commit selected for ``date``.

        Args:
            date: Object providing ``isoformat()`` (e.g. a ``datetime``).
        """
        commit = self._get_commit_from_timestamp(date)
        self._checkout(commit)

    def _get_commit_from_timestamp(self, date):
        """Return the single commit ``git rev-list`` reports before ``date``.

        The search is limited to the last valid branch. The result is
        whatever git prints, which may be an empty string.
        """
        # NOTE(review): no shell is involved, so the double quotes below reach
        # git literally as part of the argument - confirm git's date parsing
        # tolerates them before changing this string.
        return self._get_stdout((
            'git', 'rev-list',
            '--date=iso',
            '-n', '1',
            f'--before="{date.isoformat()}"',
            self._last_valid_branch
        ))

    @property
    def _last_valid_branch(self):
        """Name of the branch HEAD was most recently seen attached to.

        The cached name is refreshed whenever HEAD is currently attached and
        returned unchanged while HEAD is detached.
        """
        current = self._branch
        if current != 'HEAD':
            self.__last_valid_branch = current
        return self.__last_valid_branch

    @property
    def all_timelines(self):
        """List of branch names parsed from ``git branch``."""
        return self._branches

    @property
    def _branches(self):
        """Lines of ``git branch`` output with '*' and whitespace removed."""
        git_branch_output = self._get_stdout(('git', 'branch'))
        regex_pattern = re.compile(r'(?:[^\S\n]|[*])')  # matches '*' and non-newline whitespace chars
        return re.sub(regex_pattern, '', git_branch_output).splitlines()

    @property
    def timeline(self):
        """The last valid branch; assigning a value checks that value out."""
        return self._last_valid_branch

    @timeline.setter
    def timeline(self, value):
        self._checkout(value)

    @property
    def snapshots(self):
        """List of ``{'hash', 'timestamp'}`` dicts parsed from ``git log``."""
        return self._pretty_log_branch()

    def _pretty_log_branch(self):
        """Parse ``git log --pretty=%H@%aI`` into a list of dicts.

        Returns:
            One dict per output line, in git's order, with ``'hash'`` (str)
            and ``'timestamp'`` (``datetime`` built by ``fromisoformat``).
        """
        git_log_output = self._get_stdout((
            'git', 'log',
            '--pretty=%H@%aI'
        ))

        commits = []
        for line in git_log_output.splitlines():
            # Split on the first '@' only: everything after it is the date.
            commit_hash, timestamp = line.split('@', 1)
            commits.append({
                'hash': commit_hash,
                'timestamp': datetime.fromisoformat(timestamp)
            })

        return commits