Python git 模块,Git() 实例源码

我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用git.Git()

项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def setupTableContextMenu( self, m, addMenu ):
        """Add the Git entries to the table context menu.

        The superclass implementation runs first.  Afterwards a 'Status'
        section (three diff entries plus annotate) and a 'Git Actions'
        section (stage, unstage, revert, rename, delete) are appended.

        m       - menu object; addSection() and addSeparator() are called on it
        addMenu - callable used as addMenu( m, label, handler, enabler[, icon] )
        """
        super().setupTableContextMenu( m, addMenu )

        actions = self.ui_actions
        # all three diff entries share the same icon
        diff_icon = 'toolbar_images/diff.png'

        m.addSection( T_('Status') )
        addMenu( m, T_('Diff HEAD vs. Working'),
                 actions.tableActionGitDiffHeadVsWorking,
                 actions.enablerGitDiffHeadVsWorking,
                 diff_icon )
        addMenu( m, T_('Diff HEAD vs. Staged'),
                 actions.tableActionGitDiffHeadVsStaged,
                 actions.enablerGitDiffHeadVsStaged,
                 diff_icon )
        addMenu( m, T_('Diff Staged vs. Working'),
                 actions.tableActionGitDiffStagedVsWorking,
                 actions.enablerGitDiffStagedVsWorking,
                 diff_icon )
        m.addSeparator()
        addMenu( m, T_('Annotate'),
                 actions.tableActionGitAnnotate_Bg,
                 actions.enablerTableGitAnnotate )

        m.addSection( T_('Git Actions') )
        addMenu( m, T_('Stage'),
                 actions.tableActionGitStage_Bg,
                 actions.enablerGitFilesStage,
                 'toolbar_images/include.png' )
        addMenu( m, T_('Unstage'),
                 actions.tableActionGitUnstage_Bg,
                 actions.enablerGitFilesUnstage,
                 'toolbar_images/exclude.png' )
        addMenu( m, T_('Revert'),
                 actions.tableActionGitRevert_Bg,
                 actions.enablerGitFilesRevert,
                 'toolbar_images/revert.png' )
        m.addSeparator()
        # rename and delete are enabled by the table view's "files exist" check
        addMenu( m, T_('Rename…'),
                 actions.tableActionGitRename_Bg,
                 self.main_window.table_view.enablerTableFilesExists )
        addMenu( m, T_('Delete…'),
                 actions.tableActionGitDelete_Bg,
                 self.main_window.table_view.enablerTableFilesExists )
项目:pyt    作者:python-security    | 项目源码 | 文件源码
def clone(self):
        """Clone repo and update path to match the current one.

        ``self.directory`` is derived from the last component of ``self.URL``
        (trailing slashes ignored, a ``.git`` suffix removed).  The repository
        is cloned with ``git clone`` unless an entry of that name already
        exists in the current working directory.

        Afterwards ``self.path`` is prefixed with ``self.directory``; a single
        leading ``/`` is dropped first.  When ``self.path`` is None,
        ``self._find_entry_path()`` is called instead.
        """
        # Ignore trailing slashes so 'https://host/repo/' still yields 'repo'.
        repo_name = self.URL.rstrip('/').split('/')[-1]
        # Only the '.git' suffix is removed: git keeps other dotted parts
        # (e.g. 'my.repo') in the name of the directory it creates.
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-len('.git')]
        self.directory = repo_name

        if not os.path.exists(self.directory):
            git.Git().clone(self.URL)

        if self.path is None:
            self._find_entry_path()
        else:
            # startswith() is safe for an empty path, unlike self.path[0].
            if self.path.startswith('/'):
                self.path = self.path[1:]
            self.path = os.path.join(self.directory, self.path)
项目:skybeard-2    作者:LanceMaverick    | 项目源码 | 文件源码
def download_beard(beard_name, upgrade):
    """Clone a beard into ``beard_cache`` or update an existing clone.

    The record returned by ``find_record(beard_name)`` supplies the
    ``git_url`` to clone from.

    Args:
        beard_name: name of the beard; also the directory name inside
            ``beard_cache``.
        upgrade: when the directory already exists, pull from ``origin`` if
            true, otherwise leave the existing clone untouched.
    """
    beard_details = find_record(beard_name)
    repo_dir = join("beard_cache", beard_name)
    print("Attempting to clone from {}...".format(beard_details['git_url']))
    try:
        # makedirs doubles as the "already downloaded?" check: it raises
        # FileExistsError when the directory is already there.
        os.makedirs(repo_dir)
        # clone_from is a classmethod; instantiating git.Repo() first would
        # require the current working directory to be a git repository.
        git.Repo.clone_from(beard_details['git_url'], repo_dir)
        print("Done!")
    except FileExistsError:
        repo = git.Repo(repo_dir)
        if upgrade:
            print("Updating repo")
            # Should be origin, since no-one should add another remote.
            repo.remotes.origin.pull()
            print("Done!")
        else:
            print("Repo already exists. Nothing to do.")
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def git_clone(repo_url, ref='master'):
    '''
    Clone a git repository into a fresh temporary directory and check out
    the requested reference.

    :params repo_url - URL of git repo to clone
    :params ref - branch, commit or reference in the repo to clone

    Returns a path to the cloned repo

    Raises source_exceptions.GitLocationException when repo_url is empty or
    when cloning, fetching or checking out fails.
    '''
    import shutil

    if repo_url == '':
        raise source_exceptions.GitLocationException(repo_url)

    # Make git fail instead of prompting on the terminal.
    os.environ['GIT_TERMINAL_PROMPT'] = '0'
    _tmp_dir = tempfile.mkdtemp(prefix='armada')

    try:
        repo = Repo.clone_from(repo_url, _tmp_dir)
        repo.remotes.origin.fetch(ref)
        g = Git(repo.working_dir)
        g.checkout('FETCH_HEAD')
    except Exception:
        # The caller never learns the temp path on failure, so remove it
        # here rather than leaking one directory per failed clone.
        shutil.rmtree(_tmp_dir, ignore_errors=True)
        raise source_exceptions.GitLocationException(repo_url)

    return _tmp_dir
项目:got    作者:mrozekma    | 项目源码 | 文件源码
def getCloneURL(self, name):
        """Return the clone URL for repository ``name``.

        When ``self.clone_url`` is set, the result of
        ``self.getCloneURLFromPattern(name)`` is returned directly.  Otherwise
        the URL is ``self.url`` joined with ``name``; it is probed with
        ``git ls-remote`` while the environment from
        ``makeGitEnvironment(self)`` is in effect.

        Raises:
            RuntimeError: when ``git ls-remote`` fails; carries git's stderr.
        """
        if self.clone_url is not None:
            return self.getCloneURLFromPattern(name)

        # Nothing stops 'name' from escaping the path specified by self.url, like '../../../foo'. I can't see a problem with allowing it other than that it's weird, and allowing normal subdirectory traversal could be useful, so not currently putting any restrictions on 'name'
        candidate = f"{self.url}/{name}"
        try:
            saved_env = dict(os.environ)
            os.environ.update(makeGitEnvironment(self))
            try:
                git.Git().ls_remote(candidate)
            finally:
                # Put the process environment back exactly as it was.
                os.environ.clear()
                os.environ.update(saved_env)
        except git.GitCommandError as exc:
            message = exc.stderr
            # Try to strip off the formatting GitCommandError puts on stderr
            found = re.search("stderr: '(.*)'$", message)
            raise RuntimeError(found.group(1) if found else message)
        return candidate

# Patch stashy's AuthenticationException to print the server's message (mostly for issue #16, detecting a captcha check)
项目:nautilus-git    作者:bil-elmoussaoui    | 项目源码 | 文件源码
def __init__(self, git_uri, window):
        """Create the Git wrapper, its watchdog and the location UI.

        git_uri is handed to Git(); window is stored on the instance.
        The UI is loaded from the 'location.ui' resource and its signals are
        connected to the handlers of this object before the widgets are built.
        """
        self._window = window
        self._git = Git(git_uri)

        # Watch the git directory and refresh when the watchdog says so.
        self._watchdog = watchdog = WatchDog(self._git.dir)
        watchdog.connect("refresh", self._refresh)

        self._builder = builder = Gtk.Builder()
        builder.add_from_resource('/com/nautilus/git/ui/location.ui')

        signal_handlers = {
            "open_remote_clicked": self._open_remote_browser,
            "compare_commits_clicked": self._compare_commits,
            "popover_clicked": self._trigger_popover,
            "branch_clicked": self._update_branch
        }
        builder.connect_signals(signal_handlers)
        self._build_widgets()
项目:PluginManager    作者:mandeeps708    | 项目源码 | 文件源码
def install(self, plugin):
        """Install a GitHub plugin by cloning it.

        Returns True after a fresh clone, False when ``self.isInstalled``
        reports the plugin as already installed.
        """
        print("Installing...", plugin.name)
        import git

        # Nothing to do when the plugin is already in place.
        if self.isInstalled(plugin):
            print("Plugin already installed!")
            return False

        # Clone the GitHub repository via Plugin URL to install_dir and
        # with depth=1 (shallow clone).
        git.Repo.clone_from(plugin.baseurl, self.install_dir, depth=1)
        print("Done!")
        return True
项目:Githeat    作者:AmmsA    | 项目源码 | 文件源码
def main(argv=None):
    """ Execute the application.

    Arguments are taken from sys.argv by default.

    Returns the exit status: 0 after a successful run, 1 when creating the
    Git object for the current working directory raises one of the handled
    git errors.

    """
    args = _cmdline(argv)
    logger.start(args.logging_level)
    logger.info("executing githeat")

    try:
        g = Git(os.getcwd())
    except (InvalidGitRepositoryError, GitCommandError, GitCommandNotFound):
        print("Are you sure you're in an initialized git directory?")
        # Signal the failure instead of reporting success with status 0.
        return 1

    githeat = Githeat(g, **vars(args))
    githeat.run()

    logger.info("successful completion")
    return 0

# Make it executable.
项目:openedoo    作者:openedoo    | 项目源码 | 文件源码
def install_git_(url=None):
    """Clone ``url`` into the ``modules/`` directory.

    Args:
        url: repository URL to clone.

    Returns:
        The string "your url is None" when no URL is given; otherwise a dict
        with a single 'message' key describing success or failure.
    """
    # Validate before touching the working directory, so an early return
    # cannot leave the process inside 'modules/'.
    if url is None:
        return "your url is None"

    previous_dir = os.getcwd()
    os.chdir('modules/')
    try:
        git.Git().clone(url)
        message = {'message':'your modul had installed'}
    except Exception:
        message = {"message":"install failed"}
    finally:
        # Always go back to where we started, whatever happened above.
        os.chdir(previous_dir)
    return message
项目:poco    作者:shiwaforce    | 项目源码 | 文件源码
def __init__(self, target_dir, url, branch, git_ssh_identity_file=None, force=False):
        """Open or clone the repository at ``target_dir`` and select ``branch``.

        A missing or empty ``target_dir`` is cloned from ``url``.  An existing
        one is opened; if ``GitRepository.is_same_host`` says its origin does
        not match ``url``, the process exits in developer mode, otherwise the
        directory is removed and cloned again.  ``git_ssh_identity_file``
        defaults to ``~/.ssh/id_rsa``.  A ``git.GitCommandError`` is reported
        through ``ColorPrint.exit_after_print_messages``.
        """
        super(GitRepository, self).__init__(target_dir)
        self.branch = branch

        try:
            if url is None:
                ColorPrint.exit_after_print_messages(message="GIT URL is empty")

            identity_file = git_ssh_identity_file
            if identity_file is None:
                identity_file = os.path.expanduser("~/.ssh/id_rsa")

            # NOTE(review): this environment is set on a throwaway Git()
            # object; it presumably does not reach the Repo operations
            # below - confirm against GitPython before relying on it.
            with git.Git().custom_environment(GIT_SSH=identity_file):
                has_content = os.path.exists(target_dir) and os.listdir(target_dir)
                if not has_content:
                    self.repo = git.Repo.clone_from(url=url, to_path=target_dir)
                else:
                    self.repo = git.Repo(target_dir)
                    old_url = self.repo.remotes.origin.url
                    same_host = GitRepository.is_same_host(old_url, url)

                    if not same_host and self.is_developer_mode():
                        ColorPrint.exit_after_print_messages(
                            message="This directory exists with not matching git repository " + target_dir)
                    elif not same_host:
                        # Replace the foreign checkout with a fresh clone.
                        shutil.rmtree(target_dir, onerror=FileUtils.remove_readonly)
                        self.repo = git.Repo.clone_from(url=url, to_path=target_dir)
                self.set_branch(branch=branch, force=force)
        except git.GitCommandError as git_error:
            ColorPrint.exit_after_print_messages(message=git_error.stderr)
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def createProject( self, project ):
        """Return a wb_git_project.GitProject for project, or None on failure.

        None is returned (after logging an error) when the configured git
        executable cannot be found, or when GitProject raises
        InvalidGitRepositoryError.
        """
        git_program = shutil.which( git.Git.GIT_PYTHON_GIT_EXECUTABLE )
        if git_program is None:
            self.app.log.error( '"git" command line tool not found' )
            return None

        try:
            git_project = wb_git_project.GitProject( self.app, project, self )

        except git.exc.InvalidGitRepositoryError as e:
            self.log.error( 'Failed to add Git repo %r' % (project.path,) )
            self.log.error( 'Git error: %s' % (e,) )
            return None

        else:
            return git_project

    #------------------------------------------------------------
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def addProjectPreInitWizardHandler( self, name, url, wc_path ):
        """Log a header naming wc_path and set the status action text for name.

        url is accepted but not used here.
        """
        header = 'Initialise Git repository in %s' % (wc_path,)
        self.log.infoheader( header )

        status_text = T_('Clone %(project)s') % dict( project=name )
        self.setStatusAction( status_text )

    # runs on the background thread
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def addProjectPreCloneWizardHandler( self, name, url, wc_path ):
        """Log a header naming url and wc_path, then set the status action text for name."""
        header = T_('Cloning Git repository %(url)s into %(path)s') % dict( url=url, path=wc_path )
        self.log.infoheader( header )

        status_text = T_('Clone %(project)s') % dict( project=name )
        self.setStatusAction( status_text )

    # runs on the background thread
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def setTopWindow( self, top_window ):
        """Finish setup once the top window is known.

        Records the visible table columns, applies the git program from the
        preferences (when one is set) to GitPython, logs which program will
        be used and initialises the wb_git_project callback server and
        credentials handler.
        """
        super().setTopWindow( top_window )

        model = self.table_view.table_model
        self.all_visible_table_columns = (
            model.col_staged,
            model.col_status,
            model.col_name,
            model.col_date,
            )

        git_prefs = self.app.prefs.git
        if git_prefs.program is not None:
            # override the executable GitPython runs
            git.Git.GIT_PYTHON_GIT_EXECUTABLE = str(git_prefs.program)

        resolved_program = shutil.which( git.Git.GIT_PYTHON_GIT_EXECUTABLE )
        self.log.info( 'Git using program %s' % (resolved_program,) )

        wb_git_project.initCallbackServer( self.app )
        wb_git_project.setCallbackCredentialsHandler( self.getGitCredentials )
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def about( self ):
        """Return a two item list: the GitPython version and the git version.

        The second item is an explanatory message when the configured git
        executable cannot be found.
        """
        if shutil.which( git.Git.GIT_PYTHON_GIT_EXECUTABLE ) is None:
            git_ver = '"git" command line tool not found'

        else:
            # Join whatever number of fields git reports; a fixed
            # '%d.%d.%d' raises TypeError unless there are exactly three.
            version_text = '.'.join( str(part) for part in git.Git().version_info )
            git_ver = 'git %s' % (version_text,)

        return  ['GitPython %s' % (git.__version__,)
                ,git_ver]
项目:scm-workbench    作者:barry-scott    | 项目源码 | 文件源码
def setupToolBarAtLeft( self, addToolBar, addTool ):
        """Create the 'git logo' tool bar, remember it and add the 'Git' tool to it.

        addToolBar - callable used as addToolBar( title, style=... )
        addTool    - callable used as addTool( toolbar, label, handler )
        """
        logo_style = 'font-size: 20pt; width: 40px; color: #cc0000'
        logo_bar = addToolBar( T_('git logo'), style=logo_style )
        self.all_toolbars.append( logo_bar )

        addTool( logo_bar, 'Git', self.main_window.projectActionSettings )
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def lambda_handler(event, context):
    """Fetch a git repository and upload its top-level files to an S3 bucket.

    Reads ``git_url`` and ``bucket_name`` from
    ``event["ResourceProperties"]``.  A SUCCESS response is sent through
    ``cfnresponse`` once all files are uploaded; any exception results in a
    FAILED response instead.
    """
    print("event.dump = " + json.dumps(event))
    responseData = {}
    # If not valid cloudformation custom resource call
    try:
        # NOTE(review): this clone looks redundant with the init/fetch/pull
        # below - kept as in the original, confirm whether it is needed.
        git.Git().clone(event["ResourceProperties"]["git_url"])

        DIR_NAME = "tmp/git"
        REMOTE_URL = event["ResourceProperties"]["git_url"]

        if os.path.isdir(DIR_NAME):
            shutil.rmtree(DIR_NAME)

        # makedirs also creates the parent "tmp" directory when missing.
        os.makedirs(DIR_NAME)

        repo = git.Repo.init(DIR_NAME)
        origin = repo.create_remote('origin',REMOTE_URL)
        origin.fetch()
        origin.pull(origin.refs[0].remote_head)

        print("---- DONE ----")

        # Foreach Object in the cloned folder, upload to s3 cloned folder.
        bucket = s3.Bucket(event["ResourceProperties"]["bucket_name"])
        for filename in os.listdir(DIR_NAME):
            file_path = os.path.join(DIR_NAME, filename)
            # Directories (for example .git) cannot be read as a file.
            if not os.path.isfile(file_path):
                continue
            # Read each file on its own so one upload never contains the
            # contents of the files processed before it.
            with open(file_path, 'rb') as source:
                bucket.put_object(Key=filename, Body=source.read())

        # Report success once, after every file has been uploaded.
        cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData, ".zip pulled to S3 Bucket!")
    except Exception:
        cfnresponse.send(event, context, cfnresponse.FAILED, responseData, "Bucket Name and Key are all required.")
        print("ERROR")
项目:pentagon    作者:reactiveops    | 项目源码 | 文件源码
def __git_init(self):
        """Set up git in the repository directory.

        Runs ``Repo.init`` on ``self._repository_directory`` when
        ``self._git_repo`` is falsy, otherwise clones ``self._git_repo``
        into that directory.  The result of the call made is returned.
        """
        if not self._git_repo:
            return Repo.init(self._repository_directory)

        return Git().clone(self._git_repo, self._repository_directory)
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def clone(self):
        """Clone the source repository and bring it to the state to build.

        A positive ``clone_depth`` triggers a shallow clone of the source
        branch; otherwise a full clone is made.  With a target set, the pull
        request is merged; without one, ``self.commit_hash`` is checked out
        (after unshallowing when the commit is not present).  Submodules are
        updated when a ``.gitmodules`` file exists.
        """
        ctx = self.context
        clone_path = ctx.clone_path
        source_repo = ctx.source['repository']['full_name']

        if ctx.clone_depth > 0:
            # Use shallow clone to speed up
            shallow_options = {
                'depth': ctx.clone_depth,
                'branch': ctx.source['branch']['name'],
            }
            if ctx.type == 'commit':
                # ci retry on commit
                shallow_options['no_single_branch'] = True
            bitbucket.clone(source_repo, clone_path, **shallow_options)
        else:
            # Full clone for ci retry in single commit
            bitbucket.clone(source_repo, clone_path)

        gitcmd = git.Git(clone_path)
        if self.context.target:
            self._merge_pull_request(gitcmd)
        else:
            # Push to branch or ci retry comment on some commit
            commit_present = self.is_commit_exists(gitcmd, self.commit_hash)
            if not commit_present:
                logger.info('Unshallowing a shallow cloned repository')
                logger.info('%s', gitcmd.fetch('--unshallow'))
            logger.info('Checkout commit %s', self.commit_hash)
            gitcmd.checkout(self.commit_hash)

        if os.path.exists(os.path.join(clone_path, '.gitmodules')):
            logger.info('%s', gitcmd.submodule('update', '--init', '--recursive'))
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def get_conflicted_files(repo_path):
        """Return the output of ``git diff --name-only --diff-filter=U``.

        The command runs with ``repo_path`` as working directory.  None is
        returned (and the error logged) when git reports a command error.
        """
        runner = git.Git(repo_path)
        try:
            conflicted = runner.diff('--name-only', '--diff-filter=U')
        except git.GitCommandError:
            logger.exception('Error get conflicted files by git diff command')
            return None
        else:
            return conflicted
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def clone_repository(self, full_name, path, **kwargs):
        """Clone the repository named ``full_name`` into ``path``.

        The URL comes from ``self.get_git_url``; extra keyword arguments are
        passed on to the ``git clone`` call, whose result is returned.
        """
        source_url = self.get_git_url(full_name)
        runner = git.Git()
        return runner.clone(source_url, path, **kwargs)
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def clone_repository(self, full_name, path, **kwargs):
        """Run ``git clone`` for ``full_name`` with ``path`` as destination.

        ``self.get_git_url`` supplies the URL to clone; any keyword arguments
        are forwarded to the clone call and its result is returned.
        """
        remote_url = self.get_git_url(full_name)
        git_runner = git.Git()
        return git_runner.clone(remote_url, path, **kwargs)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def load_package_by_file(self):
        """
        Load the score package from a zip archive in the repository path.

        The archive name is the sanitised package name plus ".zip".  When the
        archive exists it is extracted into the package path and loading
        continues with load_package_by_local_repository().

        :return: result of load_package_by_local_repository(), or False when
                 the archive does not exist
        """
        score_package_file = get_valid_filename(self.__score_package)
        logging.debug(self.__repository_path)
        package_file = osp.join(self.__repository_path, score_package_file+".zip")
        logging.debug('load_package_by_file '+str(package_file))

        if not osp.isfile(package_file):
            return False

        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(package_file, 'r') as package_zip:
            # file exists
            logging.debug("load local package file and package file exist")
            # make sure the extraction target is there
            os.makedirs(self.__package_path, exist_ok=True)
            package_zip.extractall(self.__package_path)

        # continue with the extracted files
        return self.load_package_by_local_repository()
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def load_package_by_remote(self):
        """
        Clone the score package from the remote repository.

        The URL is built from the score base and the package name.  When the
        deploy key file exists it is used for ssh through GIT_SSH_COMMAND.
        After cloning, the configured branch is checked out when it differs
        from the configured master branch.

        :return: True after the clone (and optional checkout) completed
        """
        # score package clone from remote repository
        repository_url = self.__score_base + ':' + self.__score_package + '.git'

        logging.debug("git Path :"+self.__package_path)
        git_cmd = Git(os.getcwd())
        # check deploy key
        if os.path.exists(conf.DEFAULT_SCORE_REPOSITORY_KEY):
            st = os.stat(conf.DEFAULT_SCORE_REPOSITORY_KEY)
            # restrict to owner read/write when group or others can read
            if bool(st.st_mode & stat.S_IRGRP or st.st_mode & stat.S_IROTH):
                os.chmod(conf.DEFAULT_SCORE_REPOSITORY_KEY, 0o600)
            # NOTE(review): host key checking is disabled here - confirm
            # that this is acceptable for the deployment.
            ssh_cmd = 'ssh -o StrictHostKeyChecking=no -i '+conf.DEFAULT_SCORE_REPOSITORY_KEY
            logging.debug("SSH KEY COMMAND : "+ssh_cmd)
            # custom_environment() is a context manager and does nothing
            # unless entered; update_environment() sets the variable for
            # the following calls made through this Git object.
            git_cmd.update_environment(GIT_SSH_COMMAND=ssh_cmd)

        logging.debug(f"load_package_by_remote repository_url({repository_url}) package_path({self.__package_path})")
        self.__package_repository = Repo._clone(git_cmd, repository_url, self.__package_path, GitCmdObjectDB, None)
        logging.debug(f"load_package_by_remote result({self.__package_repository})")

        if conf.DEFAULT_SCORE_BRANCH != conf.DEFAULT_SCORE_BRANCH_MASTER:
            self.__package_repository.git.checkout(conf.DEFAULT_SCORE_BRANCH)

        return True
项目:craftbeerpi3    作者:Manuel83    | 项目源码 | 文件源码
def checkout_tag(self,name):
        """Hard-reset the repository in './', fetch origin and check out name.

        A notification is sent through cbpi afterwards.  Returns an empty
        body with status 204.
        """
        repository = Repo('./')
        # drop local modifications before switching
        repository.git.reset('--hard')
        repository.remotes.origin.fetch()

        Git('./').checkout(name)
        cbpi.notify("Checkout successful", "Please restart the system")
        return ('', 204)
项目:ExtensionCrawler    作者:logicalhacking    | 项目源码 | 文件源码
def get_add_date(git_path, filename):
    """Return the date of the first commit of a file, or None on any error.

    The author dates from ``git log --follow --reverse`` are read and the
    first line is parsed with dateutil.
    """
    try:
        # The temporary Git object is dropped as soon as the call returns;
        # the collection below then runs without it being referenced.
        history = git.Git(git_path).log("--follow", "--format=%aD",
                                        "--reverse", filename)
        add_date_string = history.splitlines()[0]
        gc.collect()
        logging.info(filename + " was added on " + add_date_string)
        return dateutil.parser.parse(add_date_string)
    except Exception as err:
        logging.debug("Exception during git log for " + filename + ":\n" +
                      (str(err)))
        return None
项目:ExtensionCrawler    作者:logicalhacking    | 项目源码 | 文件源码
def hackish_pull_list_changed_files(git_path):
    """Run ``git pull`` in git_path and return the file names it reports.

    This is a hack using the git binary - faster but not as safe as
    GitPython.  The names are taken from the pull output lines of the form
    `` <name> | <stat>``; both sides of a ``=>`` entry are included.

    Raises:
        Exception: when HEAD is detached or the repository is dirty.
    """
    git_repo = git.Repo(git_path)
    logging.info(" HEAD: " + str(git_repo.head.commit))
    logging.info("   is detached: " + str(git_repo.head.is_detached))
    logging.info("   is dirty: " + str(git_repo.is_dirty()))
    if git_repo.head.is_detached:
        raise Exception("Detached head")
    if git_repo.is_dirty():
        raise Exception("Dirty repository")
    # Let go of the Repo object before running the pull.
    del git_repo
    gc.collect()

    git_obj = git.Git(git_path)
    pull_lines = git_obj.pull().splitlines()
    del git_obj
    gc.collect()

    files = set()
    for line in pull_lines:
        match = re.search(r'^ (.+) \| .*$', line)
        if match is None:
            continue
        files.update(part.strip() for part in match.group(1).split('=>'))
    return list(files)
项目:SSMA    作者:secrary    | 项目源码 | 文件源码
def download_yara_rules_git():
    """Clone the Yara-Rules/rules repository from GitHub with ``git clone``.

    No destination is passed, so git chooses the directory.
    """
    rules_repo_url = "https://github.com/Yara-Rules/rules"
    git.Git().clone(rules_repo_url)
项目:package-manager    作者:bro    | 项目源码 | 文件源码
def git_clone_shallow(git_url, dst_path):
    """Clone git_url into dst_path with depth 1 and return the git.Repo.

    The clone is made with ``--no-single-branch``; tags are fetched
    afterwards.
    """
    shallow = {'depth': 1}
    git.Git().clone(git_url, dst_path, '--no-single-branch', **shallow)

    cloned = git.Repo(dst_path)
    cloned.git.fetch(tags=True)
    return cloned
项目:nautilus-git    作者:bil-elmoussaoui    | 项目源码 | 文件源码
def __init__(self, git_uri):
        """Create the Git wrapper, its watchdog and the page UI.

        git_uri is handed to Git().  The UI is loaded from the 'page.ui'
        resource before the widgets are built.
        """
        self._git = Git(git_uri)

        # Watch the git directory and refresh when the watchdog says so.
        self._watchdog = watchdog = WatchDog(self._git.dir)
        watchdog.connect("refresh", self._refresh)

        self._builder = builder = Gtk.Builder()
        builder.add_from_resource('/com/nautilus/git/ui/page.ui')
        self._build_widgets()
项目:log-with-git    作者:iesugrace    | 项目源码 | 文件源码
def setup(dataDir):
        """Prepare the 'xml' directory under dataDir and bind it to XmlStorage.

        The directory is created when missing.  XmlStorage.dataDir is set to
        it and XmlStorage.git to a Git object for it; that object is
        returned.
        """
        xml_dir = os.path.join(dataDir, 'xml')
        os.makedirs(xml_dir, exist_ok=True)

        XmlStorage.dataDir = xml_dir
        XmlStorage.git = Git(xml_dir)
        return XmlStorage.git
项目:iagitup    作者:gdamdam    | 项目源码 | 文件源码
def repo_download(github_repo_url):
    """Fetch a repository's GitHub API data and clone it locally.

    The clone goes to ``~/.iagitup/downloads/<repo>``; an existing folder of
    that name is removed first.

    Args:
        github_repo_url: repository URL whose 4th and 5th ``/``-separated
            fields are the user and the repository name.

    Returns:
        Tuple of (API data parsed from JSON, path of the cloned folder).

    Raises:
        ValueError: when the API request does not answer with status 200,
            or when the URL has no user/repository fields.
        SystemExit: with status 1 when the clone fails.
    """
    download_dir = os.path.expanduser('~/.iagitup/downloads')
    mkdirs(os.path.expanduser('~/.iagitup'))
    mkdirs(download_dir)

    # parsing url to initialize the github api url and get the repo_data;
    # a trailing slash or extra path segments must not break the unpacking
    gh_user, gh_repo = github_repo_url.rstrip('/').split('/')[3:5]
    gh_api_url = "https://api.github.com/repos/{}/{}".format(gh_user,gh_repo)

    # delete the temp directory if exists
    repo_folder = '{}/{}'.format(download_dir,gh_repo)
    if os.path.exists(repo_folder):
        shutil.rmtree(repo_folder)

    # get the data from GitHub api
    req = requests.get(gh_api_url)
    if req.status_code != 200:
        raise ValueError('Error occurred while downloading: {}'.format(github_repo_url))

    gh_repo_data = json.loads(req.text)
    # download the repo from github
    try:
        git.Git().clone(gh_repo_data['clone_url'],repo_folder)
    except Exception as e:
        print('Error occurred while downloading: {}'.format(github_repo_url))
        print(str(e))
        # same effect as exit(1), without relying on the site-provided name
        raise SystemExit(1)

    return gh_repo_data, repo_folder


# get descripton from readme md
项目:octario    作者:redhat-openstack    | 项目源码 | 文件源码
def __get_branch_url_from_git(self, path):
        """Gets repo_url and branch from local git directory.

        The URL is taken from the first of the remotes 'rhos', 'patches',
        'origin' that exists.  The branch is the active branch; when that
        cannot be determined (TypeError), the .gitreview file and finally
        the decorations in ``git log`` are used instead.

        Raises:
            NotValidGitRepoException: If there is no git directory found or
                none of the expected remotes exists.

        Args:
            path (:obj:`str`): Path to the git directory.

        Returns:
            tuple: repo URL and branch name from local git directory
        """
        repo_url = None
        branch_name = None

        try:
            g = git.Git(path)
            # Ask git for the remotes once instead of once per check.
            remotes = g.remote().split('\n')
            LOG.debug('Found remotes {}'.format(','.join(remotes)))
            for remote_name in ('rhos', 'patches', 'origin'):
                if remote_name in remotes:
                    repo_url = g.config("--get", "remote.{}.url".format(remote_name))
                    LOG.debug('Using Remote {}'.format(remote_name))
                    break
            if repo_url is None:
                raise exceptions.NotValidGitRepoException(path)

            repo = git.Repo(path)
            branch = repo.active_branch
            branch_name = branch.name
        except (git.exc.InvalidGitRepositoryError,
                git.exc.GitCommandNotFound,
                git.exc.GitCommandError) as git_ex:
            LOG.error(git_ex)
            raise exceptions.NotValidGitRepoException(path)
        except TypeError:
            # If not found directly from git, try to get the branch name
            # from .gitreview file which should be pointing to the proper
            # branch.
            LOG.debug('Fallback method to get branch name from .gitreview')
            branch_name = self.__get_branch_from_gitreview(path)
            if not branch_name:
                # Git repo is most likely in detached state
                # Fallback method of getting branch name, much slower
                LOG.debug('HEAD detached, fallback method to get branch name')
                head_branch = os.linesep.join(
                    [s for s in g.log('--pretty=%d').splitlines()
                     if s and "tag:" not in s])
                # keep what follows the last '/' and drop the final character
                r_index = head_branch.rfind("/")
                branch_name = head_branch[r_index + 1:-1]

        LOG.debug('Component repository: %s' % repo_url)
        LOG.debug('Component branch: %s' % branch_name)

        return repo_url, branch_name