Python git 模块,GitCommandError() 实例源码

我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用git.GitCommandError()

项目:poco    作者:shiwaforce    | 项目源码 | 文件源码
def set_branch(self, branch, force=False):
        """Switch the working copy to *branch*, or pull when it is already active.

        If *branch* is ``'master'`` and is not among the repository's
        branches, everything is committed as an initial commit and pushed
        through a newly created remote named ``master``.  In developer mode
        nothing further happens.  A ``git.GitCommandError`` is reported by
        handing its ``stderr`` to ``ColorPrint.exit_after_print_messages``.

        :param branch: name of the branch to work on
        :param force: forwarded to ``git checkout`` as its ``force`` option
        """
        try:
            needs_initial_commit = branch not in self.repo.branches and branch == 'master'
            if needs_initial_commit:
                # Make init commit
                self.repo.index.add(["*"])
                self.repo.index.commit("init")
                origin_url = self.repo.remotes.origin.url
                remote = self.repo.create_remote('master', origin_url)
                refspec = '{}:{}'.format(self.repo.active_branch, 'master')
                remote.push(refspec=refspec)
            if self.is_developer_mode():
                return
            already_active = str(self.repo.active_branch) == branch
            if already_active:
                self.pull()
            else:
                self.repo.git.checkout(branch, force=force)
        except git.GitCommandError as exc:
            ColorPrint.exit_after_print_messages(message=exc.stderr)
项目:sql-updater    作者:hive-tools    | 项目源码 | 文件源码
def init_project(self):
        """Clone the configured repository, or pull it if a checkout already exists.

        The working copy lives in the ``repo`` sub-directory of
        ``self._project_dir``.  When that directory already contains a
        ``.git`` entry its ``origin`` remote is pulled; otherwise the URL in
        ``self._config['repo']`` is cloned into it.  A failed clone is
        reported on stdout and not re-raised.

        :raises Exception: if ``self._config['repo']`` is empty
        """
        project_dir_git = os.path.join(self._project_dir, 'repo')

        if not self._config['repo']:
            raise Exception('Empty repo configuration')

        create_dir(project_dir_git)
        git_path = os.path.join(project_dir_git, '.git')

        # print() with a single parenthesised argument produces the same
        # output on Python 2 and Python 3, unlike the print statement.
        if os.path.exists(git_path):
            repo = Repo(project_dir_git)
            print(colored('Pulling', 'green') + ' repo %s' % self._config['repo'])
            repo.remotes.origin.pull()
        else:
            try:
                print('cloning... %s' % self._config['repo'])
                Repo.clone_from(self._config['repo'], project_dir_git)
            except GitCommandError:
                print('Repo cannot be found')
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def git_pull():
    """Confirm and perform a ``git pull`` of the current application.

    Once the confirmation dialog is accepted, the app's ``origin`` remote is
    fetched and pulled.  Success and every handled failure store a message
    in ``session.flash`` and redirect to the ``site`` page.  When the dialog
    was neither accepted nor cancelled, the app name and the dialog are
    returned for rendering.
    """
    def flash_and_leave(message):
        # Shared exit path: remember the message, then go back to 'site'.
        session.flash = message
        redirect(URL('site'))

    app = get_app()
    if not have_git:
        flash_and_leave(GIT_MISSING)
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if dialog.accepted:
        try:
            app_dir = os.path.join(apath(r=request), app)
            remote = git.Repo(app_dir).remotes.origin
            remote.fetch()
            remote.pull()
            flash_and_leave(T("Application updated via git pull"))

        except git.CheckoutError:
            flash_and_leave(T("Pull failed, certain files could not be checked out. Check logs for details."))
        except git.UnmergedEntriesError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
        except git.GitCommandError:
            flash_and_leave(T(
                "Pull failed, git exited abnormally. See logs for details."))
        except AssertionError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
    elif 'cancel' in request.vars:
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def getCloneURL(self, name):
        """Return the clone URL for repository *name*, checking that it is reachable.

        When ``self.clone_url`` is set, the URL comes from
        ``getCloneURLFromPattern`` and is returned without contacting the
        remote.  Otherwise ``<self.url>/<name>`` is probed with
        ``git ls-remote`` while the environment from ``makeGitEnvironment``
        is temporarily merged into ``os.environ``.

        :param name: repository name, appended to ``self.url``
        :returns: the clone URL
        :raises RuntimeError: if ``git ls-remote`` fails; the message is
            git's stderr with GitCommandError's decoration removed when possible
        """
        if self.clone_url is not None:
            return self.getCloneURLFromPattern(name)

        # Nothing stops 'name' from escaping the path specified by self.url, like '../../../foo'. I can't see a problem with allowing it other than that it's weird, and allowing normal subdirectory traversal could be useful, so not currently putting any restrictions on 'name'
        rtn = f"{self.url}/{name}"
        try:
            oldEnv = dict(os.environ)
            os.environ.update(makeGitEnvironment(self))
            try:
                git.Git().ls_remote(rtn)
            finally:
                # Put the process environment back exactly as it was.
                os.environ.clear()
                os.environ.update(oldEnv)
        except git.GitCommandError as e:
            # Fall back to the exception text if no stderr was captured, so
            # the regex below never receives None.
            err = e.stderr or str(e)
            # Try to strip off the formatting GitCommandError puts on stderr.
            # DOTALL lets the capture span multi-line git output; without it
            # '.' stops at the first newline and the pattern never matches.
            match = re.search(r"stderr: '(.*)'$", err, re.DOTALL)
            if match:
                err = match.group(1)
            raise RuntimeError(err) from e
        return rtn

# Patch stashy's AuthenticationException to print the server's message (mostly for issue #16, detecting a captcha check)
项目:ndeploy    作者:sglebs    | 项目源码 | 文件源码
def git_clone_all(config_as_dict):
    """Clone every configured repository, or update the ones already present.

    For each repository yielded by
    ``repo_and_branch_and_app_name_and_app_props_iterator``:

    * if its local path exists, the wanted branch is checked out when it is
      not the active one, and ``origin`` is pulled when
      ``config_as_dict["gitpull"]`` is truthy;
    * otherwise the path is created and the repository is cloned into it.

    :param config_as_dict: deployment configuration
    :returns: ``False`` as soon as a clone fails; ``None`` otherwise
    """
    import shutil  # local import: only needed on the clone-failure path

    progress = GitProgress()
    for repo_url, branch, app_name, app_props in repo_and_branch_and_app_name_and_app_props_iterator(config_as_dict):
        repo_full_path = get_repo_full_path_for_repo_url(repo_url, config_as_dict)
        if os.path.exists(repo_full_path):
            print("Already cloned %s from %s" % (app_name, repo_url))
            repo = git.Repo(repo_full_path)
            if repo.active_branch.name != branch:
                print("Your local checkout is in a different branch (%s) from the branch you want to deploy (%s). URL: %s" %
                      (repo.active_branch.name, branch, repo_url))
                repo.git.checkout(branch)
            origin = repo.remotes.origin
            if config_as_dict["gitpull"]:
                origin.pull(branch)
        else:
            os.makedirs(repo_full_path)
            try:
                repo = git.Repo.clone_from(repo_url, repo_full_path, branch=branch, progress=progress)
                print("Cloned: %s" % repo)
            except git.GitCommandError as e:
                print(e)
                # Remove the directory created just above; otherwise the next
                # run would take the "already cloned" branch and try to open
                # it as a repository.
                shutil.rmtree(repo_full_path, ignore_errors=True)
                return False
项目:poco    作者:shiwaforce    | 项目源码 | 文件源码
def __init__(self, target_dir, url, branch, git_ssh_identity_file=None, force=False):
        """Open or clone the repository at *target_dir* and select *branch*.

        An empty or missing *target_dir* is cloned from *url*.  An existing
        checkout is reused unless ``GitRepository.is_same_host`` reports that
        its ``origin`` URL does not match *url*; in that case developer mode
        exits with a message, and otherwise the directory is deleted and
        cloned again.  A ``git.GitCommandError`` is reported by handing its
        ``stderr`` to ``ColorPrint.exit_after_print_messages``.

        :param target_dir: local directory of the working copy
        :param url: remote URL; ``None`` is reported as an error
        :param branch: branch passed on to ``set_branch``
        :param git_ssh_identity_file: defaults to ``~/.ssh/id_rsa`` (expanded)
        :param force: passed on to ``set_branch``
        """
        super(GitRepository, self).__init__(target_dir)
        self.branch = branch

        try:
            if url is None:
                ColorPrint.exit_after_print_messages(message="GIT URL is empty")
            identity_file = git_ssh_identity_file
            if identity_file is None:
                identity_file = os.path.expanduser("~/.ssh/id_rsa")
            # NOTE(review): the environment is set on a throwaway git.Git()
            # instance - confirm it actually reaches the clone calls below.
            with git.Git().custom_environment(GIT_SSH=identity_file):
                if os.path.exists(target_dir) and os.listdir(target_dir):
                    self.repo = git.Repo(target_dir)
                    current_url = self.repo.remotes.origin.url

                    if not GitRepository.is_same_host(current_url, url):
                        if not self.is_developer_mode():
                            shutil.rmtree(target_dir, onerror=FileUtils.remove_readonly)
                            self.repo = git.Repo.clone_from(url=url, to_path=target_dir)
                        else:
                            ColorPrint.exit_after_print_messages(
                                message="This directory exists with not matching git repository " + target_dir)
                else:
                    self.repo = git.Repo.clone_from(url=url, to_path=target_dir)
                self.set_branch(branch=branch, force=force)
        except git.GitCommandError as exc:
            ColorPrint.exit_after_print_messages(message=exc.stderr)
项目:lgsm-python    作者:jaredballou    | 项目源码 | 文件源码
def clone(self, url=None, path=None):
        """Clone *url* into *path*, printing diagnostics instead of raising.

        On ``git.GitCommandError`` the exception is printed, followed by its
        captured stdout and stderr when they are non-empty.
        """
        try:
            git.Repo.clone_from(url, path)
        except git.GitCommandError as exception:
            print(exception)
            captured_streams = (
                ('!! stdout was:', exception.stdout),
                ('!! stderr was:', exception.stderr),
            )
            for banner, text in captured_streams:
                if text:
                    print(banner)
                    print(text)
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def git_pull():
    """Confirm and perform a ``git pull`` of the current application.

    Once the confirmation dialog is accepted, the app's ``origin`` remote is
    fetched and pulled.  Success and every handled failure store a message
    in ``session.flash`` and redirect to the ``site`` page.  When the dialog
    was neither accepted nor cancelled, the app name and the dialog are
    returned for rendering.
    """
    def flash_and_leave(message):
        # Shared exit path: remember the message, then go back to 'site'.
        session.flash = message
        redirect(URL('site'))

    app = get_app()
    if not have_git:
        flash_and_leave(GIT_MISSING)
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if dialog.accepted:
        try:
            app_dir = os.path.join(apath(r=request), app)
            remote = git.Repo(app_dir).remotes.origin
            remote.fetch()
            remote.pull()
            flash_and_leave(T("Application updated via git pull"))

        except git.CheckoutError:
            flash_and_leave(T("Pull failed, certain files could not be checked out. Check logs for details."))
        except git.UnmergedEntriesError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
        except git.GitCommandError:
            flash_and_leave(T(
                "Pull failed, git exited abnormally. See logs for details."))
        except AssertionError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
    elif 'cancel' in request.vars:
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
项目:airflow-hovercraft    作者:gtoonstra    | 项目源码 | 文件源码
def git_version(version):
    """
    Return a version suffix identifying the state of the git repo in ``.git``.

    The result is one of:

    * ``''`` when gitpython cannot be imported or the repository cannot be opened;
    * ``'.dev0+{sha}.dirty'`` when the working tree has uncommitted changes;
    * ``'.release:{version}+{sha}'`` when HEAD is exactly on a tag equal to
      ``version``;
    * ``'.dev0+{sha}'`` when ``git describe`` finds no exactly matching tag;
    * ``'no_git_version'`` when the repository object is falsy.

    ``sha`` is the hex SHA of the HEAD commit.

    :param version: the release version the tag on HEAD is expected to equal
    :raises AssertionError: if HEAD is on a matching tag that differs from
        ``version``
    """
    try:
        import git
        repo = git.Repo('.git')
    except ImportError:
        logger.warning('gitpython not found: Cannot compute the git version.')
        return ''
    except Exception:
        logger.warning('Git repo not found: Cannot compute the git version.')
        return ''
    if not repo:
        return 'no_git_version'

    sha = repo.head.commit.hexsha
    if repo.is_dirty():
        return '.dev0+{sha}.dirty'.format(sha=sha)
    # commit is clean
    # is it release of `version` ?
    try:
        tag = repo.git.describe(
            match='[0-9]*', exact_match=True,
            tags=True, dirty=True)
    except git.GitCommandError:
        return '.dev0+{sha}'.format(sha=sha)
    # Explicit raise instead of `assert`, so the check is not stripped when
    # Python runs with -O; the exception type and payload are unchanged.
    if tag != version:
        raise AssertionError((tag, version))
    return '.release:{version}+{sha}'.format(version=version,
                                             sha=sha)
项目:vj4    作者:vijos    | 项目源码 | 文件源码
def get():
  """Return ``git describe`` output for the repository containing the vj4 package.

  The repository root is taken to be two directory levels above
  ``vj4.__file__``.  Returns ``'unknown'`` (after logging the error) when that
  directory is not a valid git repository or the git command fails.
  """
  try:
    package_dir = path.dirname(vj4.__file__)
    repo = git.Repo(path.dirname(package_dir))
    return repo.git.describe(always=True, tags=True, dirty=True)
  except (git.InvalidGitRepositoryError, git.GitCommandError) as e:
    _logger.error('Failed to get repository: %s', repr(e))
    return 'unknown'
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def test_clone_repo_failed(app, pipeline):
    """A failing clone is reported and stops the pipeline before spec parsing."""
    clone_failure = git.GitCommandError('git clone', 1)
    # Mock behaviour is configured through patch keyword arguments, which are
    # passed on to the created mock.
    with mock.patch.object(pipeline, 'clone', side_effect=clone_failure), \
            mock.patch.object(pipeline, '_report_git_error', return_value=None) as report_git_error, \
            mock.patch.object(pipeline, 'parse_spec') as mock_spec, \
            mock.patch.object(PullRequest, 'comment', return_value=None), \
            mock.patch.object(Changesets, 'comment', return_value=None):
        pipeline.start()
        assert report_git_error.called
        mock_spec.assert_not_called()
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def start(self):
        """Run the pipeline: clone, parse spec, build, save artifacts, lint, deploy.

        Linting is skipped when the build exit code is 137, and deployment
        only happens after a successful build (exit code 0).  Git errors and
        invalid specifications are reported, BitBucket API errors are logged
        and sent to sentry, and other ``BadwolfException`` errors are
        ignored.  ``self.clean()`` always runs at the end.
        """
        logger.info('Pipeline started for repository %s', self.context.repository)
        try:
            self.clone()
            self.parse_spec()
            status = self.build()
            succeeded = (status == 0)
            self.save_artifacts(succeeded)
            # 137 means build cancelled
            cancelled = (status == 137)
            if not cancelled:
                self.lint()
            if succeeded:
                self.deploy()
        except git.GitCommandError as git_err:
            logger.exception('Git command error')
            self._report_git_error(git_err)
        except BitbucketAPIError:
            logger.exception('Error calling BitBucket API')
            sentry.captureException()
        except InvalidSpecification as err:
            self._report_error(':umbrella: Invalid badwolf configuration: ' + str(err))
        except BadwolfException:
            pass
        finally:
            self.clean()
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def get_conflicted_files(repo_path):
        """Return the output of ``git diff --name-only --diff-filter=U`` for *repo_path*.

        Returns ``None`` (after logging the exception) when the git command fails.
        """
        runner = git.Git(repo_path)
        try:
            conflicted = runner.diff('--name-only', '--diff-filter=U')
        except git.GitCommandError:
            logger.exception('Error get conflicted files by git diff command')
            conflicted = None
        return conflicted
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def git_pull():
    """Confirm and perform a ``git pull`` of the current application.

    Once the confirmation dialog is accepted, the app's ``origin`` remote is
    fetched and pulled.  Success and every handled failure store a message
    in ``session.flash`` and redirect to the ``site`` page.  When the dialog
    was neither accepted nor cancelled, the app name and the dialog are
    returned for rendering.
    """
    def flash_and_leave(message):
        # Shared exit path: remember the message, then go back to 'site'.
        session.flash = message
        redirect(URL('site'))

    app = get_app()
    if not have_git:
        flash_and_leave(GIT_MISSING)
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if dialog.accepted:
        try:
            app_dir = os.path.join(apath(r=request), app)
            remote = git.Repo(app_dir).remotes.origin
            remote.fetch()
            remote.pull()
            flash_and_leave(T("Application updated via git pull"))

        except git.CheckoutError:
            flash_and_leave(T("Pull failed, certain files could not be checked out. Check logs for details."))
        except git.UnmergedEntriesError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
        except git.GitCommandError:
            flash_and_leave(T(
                "Pull failed, git exited abnormally. See logs for details."))
        except AssertionError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
    elif 'cancel' in request.vars:
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def git_version(version):
    """
    Return a version suffix identifying the state of the git repo in ``.git``.

    The result is one of:

    * ``''`` when gitpython cannot be imported or the repository cannot be opened;
    * ``'.dev0+{sha}.dirty'`` when the working tree has uncommitted changes;
    * ``'.release:{version}+{sha}'`` when HEAD is exactly on a tag equal to
      ``version``;
    * ``'.dev0+{sha}'`` when ``git describe`` finds no exactly matching tag;
    * ``'no_git_version'`` when the repository object is falsy.

    ``sha`` is the hex SHA of the HEAD commit.

    :param version: the release version the tag on HEAD is expected to equal
    :raises AssertionError: if HEAD is on a matching tag that differs from
        ``version``
    """
    try:
        import git
        repo = git.Repo('.git')
    except ImportError:
        logger.warning('gitpython not found: Cannot compute the git version.')
        return ''
    except Exception:
        logger.warning('Git repo not found: Cannot compute the git version.')
        return ''
    if not repo:
        return 'no_git_version'

    sha = repo.head.commit.hexsha
    if repo.is_dirty():
        return '.dev0+{sha}.dirty'.format(sha=sha)
    # commit is clean
    # is it release of `version` ?
    try:
        tag = repo.git.describe(
            match='[0-9]*', exact_match=True,
            tags=True, dirty=True)
    except git.GitCommandError:
        return '.dev0+{sha}'.format(sha=sha)
    # Explicit raise instead of `assert`, so the check is not stripped when
    # Python runs with -O; the exception type and payload are unchanged.
    if tag != version:
        raise AssertionError((tag, version))
    return '.release:{version}+{sha}'.format(version=version,
                                             sha=sha)
项目:paper-to-git    作者:maxking    | 项目源码 | 文件源码
def commit_changes(self, push=False):
        """Stage ``content/`` and commit it, optionally pushing to ``origin``.

        After a successful commit ``self.last_run`` is set to the current
        time and ``self.save()`` is called.  A ``GitCommandError`` raised in
        that step is reported as 'Nothing to commit'.

        :param push: when true, push to ``origin`` afterwards
        """
        repository = Repo(self.repo)
        repository.git.add('content/')
        try:
            repository.git.commit('-m', 'Commit added by PaperGit')
            self.last_run = time.time()
            self.save()
        except GitCommandError:
            print('Nothing to commit')
        if not push:
            return
        print("Pushing changes to remote")
        repository.git.push('origin')
项目:k8s    作者:fiaas    | 项目源码 | 文件源码
def git_repo(self):
        """Yield a patched ``release.Repo`` holding three commits and two tags.

        ``rev_parse`` resolves ``heads/master`` and ``CURRENT_TAG`` to the
        last commit and ``PREVIOUS_TAG`` to the one before it.
        ``git.describe`` answers ``CURRENT_TAG`` when called without a
        revision, ``PREVIOUS_TAG`` for a revision of the form
        ``CURRENT_TAG...^``, raises ``GitCommandError`` for any other
        revision ending in ``^``, and ``AssertionError`` for everything else.
        """
        def describe(rev=None, **kwargs):
            print("call to describe(%r, %r)" % (rev, kwargs))
            if rev is None:
                return CURRENT_TAG
            if not rev.endswith("^"):
                raise AssertionError("Test wants to describe something unexpected: rev=%r, kwargs=%r" % (rev, kwargs))
            if rev.startswith(CURRENT_TAG):
                return PREVIOUS_TAG
            raise GitCommandError("describe", "failed")

        with mock.patch("release.Repo", spec=Repo, spec_set=True) as mock_repo:
            first = Commit(mock_repo, _h2b("111111"), message="First commit", parents=tuple())
            second = Commit(mock_repo, _h2b("222222"), message="Second commit", parents=("111111",))
            third = Commit(mock_repo, _h2b("333333"), message="Third commit", parents=("222222",))
            mock_repo.iter_commits.return_value = [first, second, third]

            resolved = {
                "heads/master": third,
                PREVIOUS_TAG: TagObject(mock_repo, _h2b("aaaaaa"), object=second, tag=PREVIOUS_TAG),
                CURRENT_TAG: TagObject(mock_repo, _h2b("bbbbbb"), object=third, tag=CURRENT_TAG),
            }
            mock_repo.rev_parse.side_effect = lambda x: resolved[x]
            mock_repo.git.rev_parse.side_effect = lambda x, **kwargs: x
            mock_repo.git.describe.side_effect = describe
            yield mock_repo
项目:k8s    作者:fiaas    | 项目源码 | 文件源码
def current_tag(self):
        """Return the object ``git describe --all`` resolves to, caching the result.

        A truthy ``self._current_tag`` is returned as-is.  Otherwise the name
        reported by ``describe(all=True)`` is resolved with ``rev_parse`` and
        stored.  Returns ``None`` when the lookup raises ``BadName`` or
        ``GitCommandError``.
        """
        cached = self._current_tag
        if cached:
            return cached
        try:
            described_name = self.repo.git.describe(all=True)
            self._current_tag = self.repo.rev_parse(described_name)
        except (BadName, GitCommandError):
            return None
        return self._current_tag
项目:k8s    作者:fiaas    | 项目源码 | 文件源码
def generate_changelog(self):
        """Use the git log to create a changelog with all changes since the previous tag.

        The previous tag is found by describing the parent of the current
        tag; if git cannot describe it, ``THE_NULL_COMMIT`` is used instead.

        :returns: list of ``(shortened sha, summary)`` tuples for every commit
            in ``previous..current`` that has at most one parent
        """
        try:
            parent_ref = "{}^".format(self.current_tag)
            previous_name = self.repo.git.describe(parent_ref, abbrev=0)
            previous_tag = self.repo.rev_parse(previous_name)
        except GitCommandError:
            previous_tag = THE_NULL_COMMIT
        current = self._resolve_tag(self.current_tag)
        previous = self._resolve_tag(previous_tag)

        entries = []
        for commit in self.repo.iter_commits("{}..{}".format(previous, current)):
            if len(commit.parents) > 1:
                # commits with more than one parent are left out
                continue
            entries.append((self._shorten(commit.hexsha), commit.summary))
        return entries
项目:web3py    作者:web2py    | 项目源码 | 文件源码
def git_pull():
    """Confirm and perform a ``git pull`` of the current application.

    Once the confirmation dialog is accepted, the app's ``origin`` remote is
    fetched and pulled.  Success and every handled failure store a message
    in ``session.flash`` and redirect to the ``site`` page.  When the dialog
    was neither accepted nor cancelled, the app name and the dialog are
    returned for rendering.
    """
    def flash_and_leave(message):
        # Shared exit path: remember the message, then go back to 'site'.
        session.flash = message
        redirect(URL('site'))

    app = get_app()
    if not have_git:
        flash_and_leave(GIT_MISSING)
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if dialog.accepted:
        try:
            app_dir = os.path.join(apath(r=request), app)
            remote = git.Repo(app_dir).remotes.origin
            remote.fetch()
            remote.pull()
            flash_and_leave(T("Application updated via git pull"))

        except git.CheckoutError:
            flash_and_leave(T("Pull failed, certain files could not be checked out. Check logs for details."))
        except git.UnmergedEntriesError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
        except git.GitCommandError:
            flash_and_leave(T(
                "Pull failed, git exited abnormally. See logs for details."))
        except AssertionError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
    elif 'cancel' in request.vars:
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
项目:astrocats    作者:astrocatalogs    | 项目源码 | 文件源码
def git_clone_all_repos(cat):
    """Clone each data repository that is missing locally and check it with git.

    For every folder from ``cat.PATHS.get_all_repo_folders()``: clone it when
    the directory does not exist (depth ``max(cat.args.clone_depth, 1)``),
    delete its ``*.json`` files when ``cat.args.purge_outputs`` is set and
    the folder is an output repository, then run ``git status`` in it and
    log its current SHA.  A failing ``git status`` is logged and re-raised.
    """
    log = cat.log
    log.debug("gitter.git_clone_all_repos()")

    repo_dirs = cat.PATHS.get_all_repo_folders()
    output_dirs = cat.PATHS.get_repo_output_folders()
    for repo in repo_dirs:
        log.info("Repo in: '{}'".format(repo))

        if not os.path.isdir(repo):
            log.debug("Cloning directory...")
            clone(repo, cat.log, depth=max(cat.args.clone_depth, 1))
        else:
            log.info("Directory exists.")

        if cat.args.purge_outputs and repo in output_dirs:
            json_pattern = os.path.join(repo, '*.json')
            for json_file in glob(json_pattern):
                os.remove(json_file)

        git_runner = git.cmd.Git(repo)
        try:
            git_runner.status()
        except git.GitCommandError:
            log.error("Repository does not exist!")
            raise

        # Get the initial git SHA
        initial_sha = get_sha(repo)
        log.debug("Current SHA: '{}'".format(initial_sha))
项目:slugiot-client    作者:slugiot    | 项目源码 | 文件源码
def git_pull():
    """Confirm and perform a ``git pull`` of the current application.

    Once the confirmation dialog is accepted, the app's ``origin`` remote is
    fetched and pulled.  Success and every handled failure store a message
    in ``session.flash`` and redirect to the ``site`` page.  When the dialog
    was neither accepted nor cancelled, the app name and the dialog are
    returned for rendering.
    """
    def flash_and_leave(message):
        # Shared exit path: remember the message, then go back to 'site'.
        session.flash = message
        redirect(URL('site'))

    app = get_app()
    if not have_git:
        flash_and_leave(GIT_MISSING)
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if dialog.accepted:
        try:
            app_dir = os.path.join(apath(r=request), app)
            remote = git.Repo(app_dir).remotes.origin
            remote.fetch()
            remote.pull()
            flash_and_leave(T("Application updated via git pull"))

        except git.CheckoutError:
            flash_and_leave(T("Pull failed, certain files could not be checked out. Check logs for details."))
        except git.UnmergedEntriesError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
        except git.GitCommandError:
            flash_and_leave(T(
                "Pull failed, git exited abnormally. See logs for details."))
        except AssertionError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
    elif 'cancel' in request.vars:
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
项目:airflow    作者:apache-airflow    | 项目源码 | 文件源码
def git_version(version):
    """
    Return a version suffix identifying the state of the git repo in ``.git``.

    The result is one of:

    * ``''`` when gitpython cannot be imported or the repository cannot be opened;
    * ``'.dev0+{sha}.dirty'`` when the working tree has uncommitted changes;
    * ``'.release:{version}+{sha}'`` when HEAD is exactly on a tag equal to
      ``version``;
    * ``'.dev0+{sha}'`` when ``git describe`` finds no exactly matching tag;
    * ``'no_git_version'`` when the repository object is falsy.

    ``sha`` is the hex SHA of the HEAD commit.

    :param version: the release version the tag on HEAD is expected to equal
    :raises AssertionError: if HEAD is on a matching tag that differs from
        ``version``
    """
    try:
        import git
        repo = git.Repo('.git')
    except ImportError:
        # NOTE(review): assumes `logger` is a standard logging.Logger, where
        # warn() is a deprecated alias of warning() - confirm.
        logger.warning('gitpython not found: Cannot compute the git version.')
        return ''
    except Exception:
        logger.warning('Git repo not found: Cannot compute the git version.')
        return ''
    if not repo:
        return 'no_git_version'

    sha = repo.head.commit.hexsha
    if repo.is_dirty():
        return '.dev0+{sha}.dirty'.format(sha=sha)
    # commit is clean
    # is it release of `version` ?
    try:
        tag = repo.git.describe(
            match='[0-9]*', exact_match=True,
            tags=True, dirty=True)
    except git.GitCommandError:
        return '.dev0+{sha}'.format(sha=sha)
    # Explicit raise instead of `assert`, so the check is not stripped when
    # Python runs with -O; the exception type and payload are unchanged.
    if tag != version:
        raise AssertionError((tag, version))
    return '.release:{version}+{sha}'.format(version=version,
                                             sha=sha)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def clone(self, raise_for_error=True):
        """Clone ``self.remote_url`` into ``self.local_path``.

        :param raise_for_error: when false, a git failure with exit status
            128 is reported through the return value instead of raised
        :returns: ``True`` on success, ``False`` for a tolerated status-128 failure
        :raises git.GitCommandError: for every other git failure
        """
        try:
            git.Repo.clone_from(self.remote_url, self.local_path)
        except git.GitCommandError as e:
            tolerated = e.status == 128 and not raise_for_error
            if not tolerated:
                raise
            return False
        return True


# Safe operations only, can be used without acquiring a lock first.
# Specialized subclasses with more operations defined are built by using one of the lock functions.
项目:Anubis    作者:KawashiroNitori    | 项目源码 | 文件源码
def get():
    """Return ``git describe`` output for the repository containing the anubis package.

    The repository root is taken to be two directory levels above
    ``anubis.__file__``.  Returns ``'unknown'`` (after logging the error)
    when that directory is not a valid git repository or the git command
    fails.
    """
    try:
        return git.Repo(path.dirname(path.dirname(anubis.__file__))).git.describe(always=True, dirty=True)
    except (git.InvalidGitRepositoryError, git.GitCommandError) as e:
        # Log message typo fixed: "respository" -> "repository".
        _logger.error('Failed to get repository: %s', repr(e))
        return 'unknown'
项目:StuffShare    作者:StuffShare    | 项目源码 | 文件源码
def git_pull():
    """Confirm and perform a ``git pull`` of the current application.

    Once the confirmation dialog is accepted, the app's ``origin`` remote is
    fetched and pulled.  Success and every handled failure store a message
    in ``session.flash`` and redirect to the ``site`` page.  When the dialog
    was neither accepted nor cancelled, the app name and the dialog are
    returned for rendering.
    """
    def flash_and_leave(message):
        # Shared exit path: remember the message, then go back to 'site'.
        session.flash = message
        redirect(URL('site'))

    app = get_app()
    if not have_git:
        flash_and_leave(GIT_MISSING)
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if dialog.accepted:
        try:
            app_dir = os.path.join(apath(r=request), app)
            remote = git.Repo(app_dir).remotes.origin
            remote.fetch()
            remote.pull()
            flash_and_leave(T("Application updated via git pull"))

        except git.CheckoutError:
            flash_and_leave(T("Pull failed, certain files could not be checked out. Check logs for details."))
        except git.UnmergedEntriesError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
        except git.GitCommandError:
            flash_and_leave(T(
                "Pull failed, git exited abnormally. See logs for details."))
        except AssertionError:
            flash_and_leave(T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again."))
    elif 'cancel' in request.vars:
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
项目:zazu    作者:stopthatcow    | 项目源码 | 文件源码
def test_clone_error(mocker, git_repo):
    """``repo clone`` exits with an error when the underlying git clone fails."""
    clone_failure = git.GitCommandError('clone', 'Foo')
    mocker.patch('git.Repo.clone_from', side_effect=clone_failure)
    workdir = git_repo.working_tree_dir
    with zazu.util.cd(workdir):
        cli_args = ['repo', 'clone', 'http://foo/bar/baz.git']
        result = click.testing.CliRunner().invoke(zazu.cli.cli, cli_args)
        assert result.exit_code != 0
        assert result.exception
    git.Repo.clone_from.assert_called_once()