# Mirror of https://github.com/avatao-content/baseimage-tutorial-framework
# (synced 2025-01-11 08:41:55 +00:00; 222 lines, 6.0 KiB, Python)
import re
from subprocess import run, CalledProcessError, PIPE
from getpass import getuser
from os import makedirs
from os.path import dirname
from os.path import isdir
from os.path import join as joinpath
from uuid import uuid4

from dateutil import parser as dateparser


class SnapshotProvider:
    """Take and restore snapshots of a directory using a git repository.

    Each snapshot is a git commit of the work tree and each "timeline" is a
    git branch.  Every git command is executed through :meth:`_run` with
    ``self.gitenv`` as the environment of the subprocess, which points git at
    the configured repository (``GIT_DIR``) and work tree (``GIT_WORK_TREE``).
    """

    def __init__(self, directory, git_dir, exclude_unix_patterns=None):
        """Prepare the repository and take an initial snapshot if needed.

        :param directory: existing directory to take snapshots of
                          (used as ``GIT_WORK_TREE``)
        :param git_dir: existing directory holding the git repository
                        (used as ``GIT_DIR``)
        :param exclude_unix_patterns: optional iterable of patterns written
                                      to the repository's ``info/exclude``
        :raises EnvironmentError: if either directory is missing, if the
                                  initial snapshot cannot be committed or if
                                  HEAD is detached
        """
        self._classname = self.__class__.__name__
        author = f'{getuser()} via TFW {self._classname}'
        self.gitenv = {
            'GIT_DIR': git_dir,
            'GIT_WORK_TREE': directory,
            'GIT_AUTHOR_NAME': author,
            'GIT_AUTHOR_EMAIL': '',
            'GIT_COMMITTER_NAME': author,
            'GIT_COMMITTER_EMAIL': '',
            'GIT_PAGER': 'cat'
        }

        self._init_repo()
        self.__last_valid_branch = self._branch
        if exclude_unix_patterns:
            self.exclude = exclude_unix_patterns

    def _init_repo(self):
        """Validate the directories, then ensure a repo with one commit exists."""
        self._check_environment()

        if not self._repo_is_initialized:
            self._run(('git', 'init'))

        if self._number_of_commits == 0:
            try:
                self._snapshot()
            except CalledProcessError as err:
                # Keep the failed git invocation attached as the explicit cause.
                raise EnvironmentError(
                    f'{self._classname} cannot init on empty directories!'
                ) from err

        self._check_head_not_detached()

    def _check_environment(self):
        """Raise EnvironmentError unless both configured directories exist."""
        if not isdir(self.gitenv['GIT_DIR']) or not isdir(self.gitenv['GIT_WORK_TREE']):
            raise EnvironmentError(f'{self._classname}: "directory" and "git_dir" must exist!')

    @property
    def _repo_is_initialized(self):
        """True if ``git status`` exits with 0 in the configured environment."""
        return self._run(
            ('git', 'status'),
            check=False
        ).returncode == 0

    @property
    def _number_of_commits(self):
        """Number of commits reachable from any ref, as an int."""
        return int(
            self._get_stdout((
                'git', 'rev-list',
                '--all',
                '--count'
            ))
        )

    def _snapshot(self):
        """Stage every change in the work tree and commit it as 'Snapshot'.

        A commit that fails only because there is nothing new to commit is
        not treated as an error.

        :raises CalledProcessError: if git fails for any other reason
        """
        self._run((
            'git', 'add',
            '-A'
        ))
        try:
            # _get_stdout captures the output, so it is available on the
            # exception as err.output (bytes) if the commit fails.
            self._get_stdout((
                'git', 'commit',
                '-m', 'Snapshot'
            ))
        except CalledProcessError as err:
            if b'nothing to commit, working tree clean' not in err.output:
                raise

    def _check_head_not_detached(self):
        """Raise EnvironmentError if HEAD is currently detached."""
        if self._head_detached:
            raise EnvironmentError(f'{self._classname} cannot init from detached HEAD state!')

    @property
    def _head_detached(self):
        """True if HEAD does not resolve to a branch name."""
        return self._branch == 'HEAD'

    @property
    def _branch(self):
        """Output of ``git rev-parse --abbrev-ref HEAD``."""
        return self._get_stdout((
            'git', 'rev-parse',
            '--abbrev-ref', 'HEAD'
        ))

    def _get_stdout(self, *args, **kwargs):
        """Run a command via :meth:`_run` and return its decoded stdout.

        stdout and stderr are both captured; trailing newlines are removed
        from the returned string.
        """
        kwargs['stdout'] = PIPE
        kwargs['stderr'] = PIPE
        stdout_bytes = self._run(*args, **kwargs).stdout
        return stdout_bytes.decode().rstrip('\n')

    def _run(self, *args, **kwargs):
        """Call ``subprocess.run`` with ``check=True`` and the git environment.

        Both defaults can be overridden by passing ``check`` or ``env``.
        """
        kwargs.setdefault('check', True)
        kwargs.setdefault('env', self.gitenv)
        return run(*args, **kwargs)

    @property
    def exclude(self):
        """Contents of the repository's ``info/exclude`` file as a string."""
        with open(self._exclude_path, 'r') as exclude_file:
            return exclude_file.read()

    @exclude.setter
    def exclude(self, exclude_patterns):
        """Overwrite ``info/exclude`` with the given patterns, one per line.

        :param exclude_patterns: iterable of pattern strings
        """
        exclude_path = self._exclude_path
        # The 'info' directory is not guaranteed to exist inside GIT_DIR, so
        # create it instead of failing with FileNotFoundError.
        makedirs(dirname(exclude_path), exist_ok=True)
        with open(exclude_path, 'w') as exclude_file:
            exclude_file.write('\n'.join(exclude_patterns))

    @property
    def _exclude_path(self):
        """Path of the ``info/exclude`` file inside ``GIT_DIR``."""
        return joinpath(
            self.gitenv['GIT_DIR'],
            'info',
            'exclude'
        )

    def take_snapshot(self):
        """Commit the current state of the work tree.

        If HEAD is detached, a new branch is created and checked out first,
        so the snapshot is recorded on a branch of its own.
        """
        if self._head_detached:
            self._checkout_new_branch_from_head()
        self._snapshot()

    def _checkout_new_branch_from_head(self):
        """Create a branch with a random UUID name at HEAD and check it out."""
        branch_name = str(uuid4())
        self._run((
            'git', 'branch',
            branch_name
        ))
        self._checkout(branch_name)

    def _checkout(self, what):
        """Run ``git checkout`` on the given branch name or commit hash."""
        self._run((
            'git', 'checkout',
            what
        ))

    def restore_snapshot(self, date):
        """Check out the latest snapshot taken before ``date``.

        :param date: object providing ``isoformat()`` (e.g. a datetime)
        """
        commit = self._get_commit_from_timestamp(date)
        branch = self._last_valid_branch
        # Check out the branch itself rather than its tip commit, so that
        # HEAD does not become detached needlessly.
        if commit == self._latest_commit_on_branch(branch):
            commit = branch
        self._checkout(commit)

    def _get_commit_from_timestamp(self, date):
        """Return the hash of the newest commit before ``date``.

        The search is done on the last valid branch.  If no commit is old
        enough, the oldest root commit of HEAD is returned instead.
        """
        commit = self._get_stdout((
            'git', 'rev-list',
            '--date=iso',
            '-n', '1',
            # No shell is involved, so the value must not be wrapped in
            # quote characters: they would be handed to git literally.
            f'--before={date.isoformat()}',
            self._last_valid_branch
        ))
        if not commit:
            commit = self._get_oldest_parent_of_head()
        return commit

    def _get_oldest_parent_of_head(self):
        """Return the hash of the oldest root commit reachable from HEAD."""
        roots = self._get_stdout((
            'git',
            'rev-list',
            '--max-parents=0',
            'HEAD'
        )).splitlines()
        # rev-list prints one line per root commit, newest first; a history
        # with several roots must still yield a single commit hash.
        return roots[-1] if roots else ''

    @property
    def _last_valid_branch(self):
        """Name of the branch HEAD was most recently seen attached to."""
        if not self._head_detached:
            self.__last_valid_branch = self._branch
        return self.__last_valid_branch

    def _latest_commit_on_branch(self, branch):
        """Return the full hash of the tip commit of ``branch``."""
        return self._get_stdout((
            'git', 'log',
            '-n', '1',
            '--pretty=format:%H',
            branch
        ))

    @property
    def all_timelines(self):
        """Names of all timelines (local branches) as a list of strings."""
        return self._branches

    @property
    def _branches(self):
        """Names of all local branches as a list of strings."""
        heads_prefix = 'refs/heads/'
        # Listing the refs directly (instead of parsing `git branch`) keeps
        # the "(HEAD detached at ...)" pseudo entry out of the result.
        refs = self._get_stdout((
            'git', 'for-each-ref',
            '--format=%(refname)',
            heads_prefix
        ))
        return [ref[len(heads_prefix):] for ref in refs.splitlines()]

    @property
    def timeline(self):
        """Name of the current timeline (the last valid branch)."""
        return self._last_valid_branch

    @timeline.setter
    def timeline(self, value):
        """Switch to another timeline by checking out ``value``."""
        self._checkout(value)

    @property
    def snapshots(self):
        """Snapshots reachable from HEAD, see :meth:`_pretty_log_branch`."""
        return self._pretty_log_branch()

    def _pretty_log_branch(self):
        """Return the commits listed by ``git log`` as a list of dicts.

        Each dict has a ``'hash'`` key (commit hash string) and a
        ``'timestamp'`` key (author date parsed by ``dateutil``).
        """
        git_log_output = self._get_stdout((
            'git', 'log',
            '--pretty=%H@%aI'
        ))

        commits = []
        for line in git_log_output.splitlines():
            # Split on the first '@' only: the hash comes first and can not
            # contain the separator.
            commit_hash, timestamp = line.split('@', 1)
            commits.append({
                'hash': commit_hash,
                'timestamp': dateparser.parse(timestamp)
            })

        return commits