# Copyright (C) 2018 Avatao.com Innovative Learning Kft. # All Rights Reserved. See LICENSE file for details. import re from subprocess import run from getpass import getuser from os.path import isdir from datetime import datetime class SnapshotProvider: def __init__(self, directory, git_dir): author = f'{getuser()} via TFW {self.__class__.__name__}' self.gitenv = { 'GIT_DIR': git_dir, 'GIT_WORK_TREE': directory, 'GIT_AUTHOR_NAME': author, 'GIT_AUTHOR_EMAIL': '', 'GIT_COMMITTER_NAME': author, 'GIT_COMMITTER_EMAIL': '', 'GIT_PAGER': 'cat' } self._check_environment() self.__last_valid_branch = self._branch self._init_repo_if_needed() def _check_environment(self): if not isdir(self.gitenv['GIT_DIR']) or not isdir(self.gitenv['GIT_WORK_TREE']): raise EnvironmentError('Directories "directory" and "git_dir" must exist!') if self._head_detached: raise EnvironmentError(f'{self.__class__.__name__} cannot init from detached HEAD state!') def _init_repo_if_needed(self): if not self._repo_is_initialized(): self._run(('git', 'init')) def _repo_is_initialized(self): return self._run(('git', 'status')).returncode == 0 def take_snapshot(self): if self._head_detached: self._checkout_new_branch_from_head() self._run(( 'git', 'add', '-A' )) self._run(( 'git', 'commit', '-m', 'Snapshot' )) def _checkout_new_branch_from_head(self): head_hash = self._get_head_hash() self._run(( 'git', 'branch', head_hash )) self._checkout(head_hash) def _get_head_hash(self): return self._get_stdout(( 'git', 'rev-parse', 'HEAD' )) def restore_snapshot(self, date): commit = self._get_commit_from_timestamp(date) self._checkout(commit) def _checkout(self, what): self._run(( 'git', 'checkout', what )) self._update_last_valid_branch() def _get_commit_from_timestamp(self, date): return self._get_stdout(( 'git', 'rev-list', '--date=iso', '-n', '1', f'--before="{date.isoformat()}"', self._last_valid_branch )) def _get_stdout(self, *args, **kwargs): kwargs['capture_output'] = True stdout_bytes = self._run(*args, **kwargs).stdout return stdout_bytes.decode().rstrip('\n') def _run(self, *args, **kwargs): kwargs['check'] = True if 'env' not in kwargs: kwargs['env'] = self.gitenv return run(*args, **kwargs) @property def all_timelines(self): return self._branches @property def _branches(self): git_branch_output = self._get_stdout(('git', 'branch')) regex_pattern = re.compile(r'(?:[^\S\n]|[*])') # matches '*' and non-newline whitespace chars return re.sub(regex_pattern, '', git_branch_output).splitlines() @property def timeline(self): return self._branch @property def _branch(self): return self._get_stdout(( 'git', 'rev-parse', '--abbrev-ref', 'HEAD' )) @property def _last_valid_branch(self): self._update_last_valid_branch() return self.__last_valid_branch def _update_last_valid_branch(self): if not self._head_detached: self.__last_valid_branch = self._branch @property def _head_detached(self): return self._branch == 'HEAD' @timeline.setter def timeline(self, value): self._checkout(value) @property def snapshots(self): return self._pretty_log_branch() def _pretty_log_branch(self): git_log_output = self._get_stdout(( 'git', 'log', '--pretty=%H@%aI' )) commits = [] for line in git_log_output.splitlines(): commit_hash, timestamp = line.split('@') commits.append({ 'hash': commit_hash, 'timestamp': datetime.fromisoformat(timestamp) }) return commits