Python sh 模块,ErrorReturnCode() 实例源码

我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用sh.ErrorReturnCode()

项目:gilt    作者:metacloud    | 项目源码 | 文件源码
def _has_commit(version, debug=False):
    """
    Determine a version is a local git commit sha or not.

    :param version: A string containing the branch/tag/sha to be determined.
    :param debug: An optional bool to toggle debug output.
    :return: bool
    """
    if _has_tag(version, debug) or _has_branch(version, debug):
        return False
    cmd = sh.git.bake('cat-file', '-e', version)
    try:
        util.run_command(cmd, debug=debug)
        return True
    except sh.ErrorReturnCode:
        return False
项目:kubelib    作者:safarijv    | 项目源码 | 文件源码
def delete_path(self, path_or_fn, cascade=True):
        """Simple kubectl delete wrapper.

        :param path_or_fn: Path or filename of yaml resource descriptions
        """
        LOG.info('(-) kubectl delete -f %s', path_or_fn)
        try:
            self.kubectl.delete(
                '-f', path_or_fn,
                '--namespace={}'.format(self.config.namespace),
                '--context={}'.format(self.config.context),
                '--cascade={}'.format('true' if cascade else 'false')
            )
        except sh.ErrorReturnCode:
            return False
        return True
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def update():
    for organization in get_organization_list():
        if not os.path.exists(os.path.join(get_env_path(organization),
                                           '.git')):
            info("Skip %s" % organization)
            continue

        info("Updating %s ..." % organization)
        os.chdir(get_env_path(organization))

        if not git('remote').strip():
            info("No remotes are set for this organization, skipping")
            continue

        try:
            vgit('pull')
        except ErrorReturnCode:
            fatal("Unable to update the organizations.")

        run_galaxy_install(organization)

    success("All the organizations have been updated.")
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def _create_archive(self, archive_path, export_dir):
        """
        Create the final archive of all the exported project files.

        Args:
            archive_path: Path to the ``tar.xz`` archive.
            export_dir: Path to the directory containing the files to export.
        """
        try:
            logger.info('Creating archive %s', archive_path)
            create_archive = tar.bake(create=True, xz=True, verbose=True,
                                      file=archive_path, directory=export_dir,
                                      _fg=True, _out=sys.stdout,
                                      _err=sys.stderr)
            create_archive(self._project_name)
        except ErrorReturnCode as e:
            raise CommandError('Failed to archive project - %s' % e)
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def _gen_html(self, lcov_info_path):
        """
        Generate an LCOV HTML report.

        Returns the directory containing the HTML report.
        """
        from sh import genhtml, ErrorReturnCode

        lcov_html_dir = self.project_path('s2e-last', 'lcov')
        try:
            genhtml(lcov_info_path, output_directory=lcov_html_dir,
                    _out=sys.stdout, _err=sys.stderr, _fg=True)
        except ErrorReturnCode as e:
            raise CommandError(e)

        return lcov_html_dir
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def _update_s2e_sources(self):
        """
        Update all of the S2E repositories with repo.
        """
        repo = sh.Command(self.install_path('bin', 'repo'))

        # cd into the S2E source directory
        orig_dir = os.getcwd()
        os.chdir(self.source_path('s2e'))

        try:
            logger.info('Updating S2E')
            repo.sync(_out=sys.stdout, _err=sys.stderr, _fg=True)
        except ErrorReturnCode as e:
            raise CommandError(e)
        finally:
            # Change back to the original directory
            os.chdir(orig_dir)

        # Success!
        logger.success('Updated S2E')
项目:cookiecutter-django-app    作者:edx    | 项目源码 | 文件源码
def test_readme(cookies):
    """The generated README.rst file should pass some sanity checks and validate as a PyPI long description."""
    extra_context = {'repo_name': 'helloworld'}
    with bake_in_temp_dir(cookies, extra_context=extra_context) as result:

        readme_file = result.project.join('README.rst')
        readme_lines = [x.strip() for x in readme_file.readlines(cr=False)]
        assert 'helloworld' in readme_lines
        assert 'The full documentation is at https://helloworld.readthedocs.org.' in readme_lines
        setup_path = str(result.project.join('setup.py'))
        try:
            sh.python(setup_path, 'check', restructuredtext=True, strict=True)
        except sh.ErrorReturnCode as exc:
            pytest.fail(str(exc))
项目:cookiecutter-django-app    作者:edx    | 项目源码 | 文件源码
def check_quality(result):
    """Run quality tests on the given generated output."""
    for dirpath, _dirnames, filenames in os.walk(str(result.project)):
        pylintrc = str(result.project.join('pylintrc'))
        for filename in filenames:
            name = os.path.join(dirpath, filename)
            if not name.endswith('.py'):
                continue
            try:
                sh.pylint(name, rcfile=pylintrc)
                sh.pylint(name, py3k=True)
                sh.pycodestyle(name)
                if filename != 'setup.py':
                    sh.pydocstyle(name)
                sh.isort(name, check_only=True)
            except sh.ErrorReturnCode as exc:
                pytest.fail(str(exc))

    tox_ini = result.project.join('tox.ini')
    docs_build_dir = result.project.join('docs/_build')
    try:
        # Sanity check the generated Makefile
        sh.make('help')
        # quality check docs
        sh.doc8(result.project.join("README.rst"), ignore_path=docs_build_dir, config=tox_ini)
        sh.doc8(result.project.join("docs"), ignore_path=docs_build_dir, config=tox_ini)
    except sh.ErrorReturnCode as exc:
        pytest.fail(str(exc))
项目:ldap2pg    作者:dalibo    | 项目源码 | 文件源码
def test_custom_yaml():
    from sh import ErrorReturnCode, chmod, ldap2pg, rm

    LDAP2PG_CONFIG = 'my-test-ldap2pg.yml'
    rm('-f', LDAP2PG_CONFIG)
    with pytest.raises(ErrorReturnCode):
        ldap2pg(_env=dict(os.environ, LDAP2PG_CONFIG=LDAP2PG_CONFIG))

    yaml = YAML_FMT % os.environ
    with open(LDAP2PG_CONFIG, 'w') as fo:
        fo.write(yaml)

    # Purge env from value set in file. Other are reads from ldaprc.
    blacklist = ('LDAPURI', 'LDAPHOST', 'LDAPPORT', 'LDAPPASSWORD')
    ldapfree_env = dict(
        (k, v)
        for k, v in os.environ.items()
        if k not in blacklist
    )

    # Ensure world readable password is denied
    with pytest.raises(ErrorReturnCode):
        ldap2pg(config=LDAP2PG_CONFIG, _env=ldapfree_env)

    # And that fixing file mode do the trick.
    chmod('0600', LDAP2PG_CONFIG)
    ldap2pg('--config', LDAP2PG_CONFIG, _env=ldapfree_env)
项目:gilt    作者:metacloud    | 项目源码 | 文件源码
def _has_tag(version, debug=False):
    """
    Determine a version is a local git tag name or not.

    :param version: A string containing the branch/tag/sha to be determined.
    :param debug: An optional bool to toggle debug output.
    :return: bool
    """
    cmd = sh.git.bake('show-ref', '--verify', '--quiet',
                      "refs/tags/{}".format(version))
    try:
        util.run_command(cmd, debug=debug)
        return True
    except sh.ErrorReturnCode:
        return False
项目:gilt    作者:metacloud    | 项目源码 | 文件源码
def _has_branch(version, debug=False):
    """
    Determine a version is a local git branch name or not.

    :param version: A string containing the branch/tag/sha to be determined.
    :param debug: An optional bool to toggle debug output.
    :return: bool
    """
    cmd = sh.git.bake('show-ref', '--verify', '--quiet',
                      "refs/heads/{}".format(version))
    try:
        util.run_command(cmd, debug=debug)
        return True
    except sh.ErrorReturnCode:
        return False
项目:cookiecutter-django-reactjs    作者:genomics-geek    | 项目源码 | 文件源码
def test_flake8_compliance(cookies):
    """generated project should pass flake8"""
    result = cookies.bake()

    try:
        sh.flake8(str(result.project))
    except sh.ErrorReturnCode as e:
        pytest.fail(e)
项目:keras-image-captioning    作者:danieljl    | 项目源码 | 文件源码
def _wait_running_commands(self):
        for search_index, running_command in self.running_commands:
            training_label = self.training_label(search_index)
            logging('Waiting {} to finish..'.format(training_label))
            try:
                running_command.wait()
            except sh.ErrorReturnCode as e:
                logging('{} returned a non-zero code!'.format(training_label))
            except:
                traceback.print_exc(file=sys.stderr)
项目:rootfetch    作者:zmap    | 项目源码 | 文件源码
def _find_java_home(self):
        if self._java_home:
            return self._java_home

        # First check if the enviornment variable is set.
        java_home = os.environ.get("JAVA_HOME", None)
        if java_home:
            return java_home

        # On OS X, there's a magical command that gives you $JAVA_HOME
        if sys.platform == "darwin":
            try:
                cmd = sh.Command("/usr/libexec/java_home")
                return cmd().strip()
            except sh.ErrorReturnCode:
                pass

        # If only one Java is installed in the default Linux JVM folder, use
        # that
        if sys.platform in {"linux", "linux2"}:
            if os.path.isdir(self.DEFAULT_LINUX_JVM):
                javas = os.listdir(self.DEFAULT_LINUX_JVM)
                if len(javas) == 1:
                    return javas[0]

        # Give up
        return None
项目:kubelib    作者:safarijv    | 项目源码 | 文件源码
def describe(self, name):
        """Return a yaml-ish text blob.

        Not helpful for automation, very helpful for humans.
        """
        try:
            return self.kubectl.describe(
                self.url_type,
                name,
                '--context={}'.format(self.config.context),
                '--namespace={}'.format(self.config.namespace)
            )
        except sh.ErrorReturnCode as err:
            logging.error("Unexpected response: %s", err)
项目:kubelib    作者:safarijv    | 项目源码 | 文件源码
def delete(self, name):
        """Delete the named resource.

        TODO: should be easy to rewrite this as a kube api
        delete call instead of going through kubectl.
        """
        try:
            self.kubectl.delete(
                self.url_type,
                name,
                '--context={}'.format(self.config.context),
                '--namespace={}'.format(self.config.namespace)
            )
        except sh.ErrorReturnCode as err:
            logging.error("Unexpected response: %s", err)
项目:reqwire    作者:darvid    | 项目源码 | 文件源码
def pip_install(ctx, *specifiers):
    # type: (click.Context, str) -> None
    try:
        result = sh.pip.install(*specifiers, _err_to_out=True, _iter=True)
        for line in result:
            click.echo(line, nl=False)
    except sh.ErrorReturnCode:
        ctx.abort()
项目:cookiecutter-saas    作者:jayfk    | 项目源码 | 文件源码
def test_flake8_compliance(cookies):
    """generated project should pass flake8"""
    result = cookies.bake()

    #try:
    #    sh.flake8(str(result.project))
    #except sh.ErrorReturnCode as e:
    #    print(e)
    #    pytest.fail(str(e))
项目:hbp-neuromorphic-client    作者:HumanBrainProject    | 项目源码 | 文件源码
def _get_code(self, nmpi_job, job_desc):
        """
        Obtain the code and place it in the working directory.

        If the experiment description is the URL of a Git repository, try to clone it.
        If it is the URL of a zip or .tar.gz archive, download and unpack it.
        Otherwise, the content of "code" is the code: write it to a file.
        """
        url_candidate = urlparse(nmpi_job['code'])
        logger.debug("Get code: %s %s", url_candidate.netloc, url_candidate.path)
        if url_candidate.scheme and url_candidate.path.endswith((".tar.gz", ".zip", ".tgz")):
            self._create_working_directory(job_desc.working_directory)
            target = os.path.join(job_desc.working_directory, os.path.basename(url_candidate.path))
            #urlretrieve(nmpi_job['code'], target) # not working via KIP https proxy
            curl(nmpi_job['code'], '-o', target)
            logger.info("Retrieved file from {} to local target {}".format(nmpi_job['code'], target))
            if url_candidate.path.endswith((".tar.gz", ".tgz")):
                tar("xzf", target, directory=job_desc.working_directory)
            elif url_candidate.path.endswith(".zip"):
                try:
                    # -o for auto-overwrite
                    unzip('-o', target, d=job_desc.working_directory)
                except:
                    logger.error("Could not unzip file {}".format(target))
        else:
            try:
                # Check the "code" field for a git url (clone it into the workdir) or a script (create a file into the workdir)
                # URL: use git clone
                git.clone('--recursive', nmpi_job['code'], job_desc.working_directory)
                logger.info("Cloned repository {}".format(nmpi_job['code']))
            except (sh.ErrorReturnCode_128, sh.ErrorReturnCode):
                # SCRIPT: create file (in the current directory)
                logger.info("The code field appears to be a script.")
                self._create_working_directory(job_desc.working_directory)
                with codecs.open(job_desc.arguments[0], 'w', encoding='utf8') as job_main_script:
                    job_main_script.write(nmpi_job['code'])
项目:photon    作者:metacloud    | 项目源码 | 文件源码
def run(self, resume=1):
        """Execute ansible-playbook using information gathered from config.

        Args:
            resume (int): Used as list index - 1 from which to resume workflow.
        """
        # list index to start working on (for support of --resume)
        try:
            i = int(resume) - 1
        except ValueError:  # generally if passed a non-int
            i = 0

        cmds = self._config.playbook_cmds
        kwargs = {
            '_out': self._print_stdout,
            '_err': self._print_stderr,
            '_env': self._config.env
        }

        for counter, cmd in enumerate(cmds):
            # skip execution until we reach our --resume index
            # using a list slice doesn't work here since we need to be aware of
            # the full list to produce a resume index on failure
            if counter < i:
                continue

            try:
                sh.ansible_playbook(*cmd, **kwargs)
            except (sh.ErrorReturnCode, sh.ErrorReturnCode_1):
                msg = ('An error was encountered during playbook execution. '
                       'Please resolve manually and then use the following '
                       'command to resume execution of this script:\n\n')
                cmd = self._construct_resume_cli(counter + 1)
                print(colorama.Fore.RED + msg + cmd)
                sys.exit(1)
项目:dvhb-backend-cookiecutter    作者:dvhbru    | 项目源码 | 文件源码
def check_project_result(result):
    """
    Method to common project baking verification
    """
    assert result.exit_code == 0
    assert result.exception is None
    assert result.project.isdir()

    # Check project with flake8
    try:
        sh.flake8(str(result.project))
    except sh.ErrorReturnCode as e:
        pytest.fail(e)
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def update():
    for inventory in os.listdir(inventory_path):
        if not os.path.exists(os.path.join(inventory_path, inventory, '.git')):
            info("Skip %s" % inventory)
            continue

        info("Updating %s ..." % inventory)
        os.chdir(os.path.join(inventory_path, inventory))
        try:
            vgit('pull')
        except ErrorReturnCode:
            fatal("Unable to update the inventories.")
    success("All the inventories have been updated.")
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def update_inventory(name, dest_path):
    if not os.path.exists(os.path.join(dest_path, '.git')):
        warning("The %s inventory is not a git repository and has not "
                "been updated." % name)
        return

    click.echo('We will update the %s inventory' % name)
    os.chdir(dest_path)
    try:
        vgit('pull')
    except ErrorReturnCode:
        fatal("Unable to update the inventory %s." % name)
    success("The %s inventory has been updated." % name)
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def install(name, path):
    """
    Install inventories.
    """
    if not name:
        update()
        return

    if not name.isalnum():
        fatal("Your inventory name should only contains alphanumeric "
              "characters.")

    dest_path = os.path.join(inventory_path, name)
    if os.path.exists(dest_path):
        update_inventory(name, dest_path)
        return

    if not path:
        fatal("You must specify a path to a local directory or an URL to a "
              "git repository to install a new inventory.")

    if os.path.exists(path) and os.path.isdir(path):
        if not os.path.exists(os.path.dirname(dest_path)):
            os.mkdir(os.path.dirname(dest_path))
        os.symlink(path, dest_path)
    else:
        click.echo('We will clone %s in %s\n' % (path, dest_path))
        try:
            vgit('clone', path, dest_path)
        except ErrorReturnCode:
            fatal("Unable to install the inventory %s." % name)
    success("The %s inventory has been installed." % name)
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def update_organization(name, dest_path):
    click.echo('We will update the %s organization' % name)
    os.chdir(dest_path)
    if os.path.exists(os.path.join(dest_path, '.git')):
        try:
            vgit('pull')
        except ErrorReturnCode:
            fatal("Unable to update the organization %s." % name)
        success("The %s organization has been updated." % name)

    run_galaxy_install(name)
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def _get_git_root():
    """
    Retrieve the git directory, or prompt to create one if not found
    """
    git_root = None

    try:
        git_root = str(git('rev-parse', '--show-toplevel')).strip()
    except ErrorReturnCode as e:
        if e.exit_code != 128:
            fatal(e.message, e.exit_code)

    if not git_root:
        warning('You must be in a git repository directory to '
                'initialize a new project.')

        if not click.confirm('Do you want to create a new git '
                             'repository here?', default=True):
            fatal('Please run %s' % style('git init', bold=True))

        try:
            vgit('init')
            git_root = os.getcwd()
        except ErrorReturnCode as e:
            fatal('An error occurred when trying to initialize a '
                  'new repo.', e.exit_code)

    if git_root == aeriscloud_path:
        fatal('You cannot init AerisCloud from the AerisCloud directory!')

    return git_root
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def _run_nosetests():
    click.echo('Running unit tests ... ', nl=False)
    nose_bin = os.path.join(aeriscloud_path, 'venv/bin/nosetests')

    errors = 0
    try:
        python(nose_bin, '-v', '--with-id', module_path(),
               _err_to_out=True, _ok_code=[0])
        click.echo('[%s]' % click.style('OK', fg='green'))
    except ErrorReturnCode as e:
        click.echo('[%s]\n' % click.style('FAIL', fg='red'))

        for line in e.stdout.split('\n')[:-2]:
            if line.startswith('#'):
                print(line)
                (id, name, test_file, ellipsis, res) = line.rstrip().split(' ')

                if res == 'ok':
                    res = click.style(res, fg='green', bold=True)
                elif res == 'FAIL':
                    res = click.style(res, fg='red', bold=True)

                line = ' '.join([
                    click.style(id, bold=True, fg='yellow'),
                    click.style(name, fg='blue'),
                    test_file,
                    ellipsis,
                    res
                ])
            elif line.startswith('FAIL:'):
                (_, name, test_file) = line.split(' ')

                line = ' '.join([
                    click.style('FAIL', bold=True, fg='red') + ':',
                    click.style(name, fg='blue'),
                    test_file
                ])

            click.echo('  ' + line)
            errors += 1
    return errors
项目:docl    作者:cloudify-cosmo    | 项目源码 | 文件源码
def _retry(func, *args, **kwargs):
    for _ in range(300):
        try:
            func(*args, **kwargs)
            break
        except sh.ErrorReturnCode:
            sleep(0.1)
    else:
        raise
项目:docl    作者:cloudify-cosmo    | 项目源码 | 文件源码
def install_docker(version=None, container_id=None):
    container_id = container_id or work.last_container_id
    try:
        quiet_docker('exec', container_id, *'which docker'.split(' '))
        logger.info('Docker already installed on container. Doing nothing')
        return
    except sh.ErrorReturnCode:
        pass
    cp(resources.DIR / 'docker.repo', ':/etc/yum.repos.d/docker.repo',
       container_id=container_id)
    version = version or _get_docker_version()
    install_docker_command = 'yum install -y -q docker-engine-{}'.format(
        version)
    docker('exec', container_id, *install_docker_command.split(' '))
项目:docl    作者:cloudify-cosmo    | 项目源码 | 文件源码
def _get_docker_version():
    try:
        version = quiet_docker.version('-f', '{{.Client.Version}}').strip()
    except sh.ErrorReturnCode as e:
        version = e.stdout.strip()

    # Replacing the -ce in the version with .ce, as the versions in
    # https://yum.dockerproject.org/repo/main/centos/7/Packages/
    # adhere to this notation
    if version.endswith('-ce'):
        version = version.replace('-ce', '.ce')
    return version
项目:docl    作者:cloudify-cosmo    | 项目源码 | 文件源码
def _ssh_setup(container_id, container_ip):
    logger.info('Applying ssh configuration to manager container')
    try:
        known_hosts = path('~/.ssh/known_hosts').expanduser()
        # Known hosts file may not exist
        ssh_keygen('-R', container_ip)
        fingerprint = None
        while not fingerprint:
            fingerprint = ssh_keyscan(
                container_ip).stdout.split('\n')[0].strip()
            time.sleep(0.01)
        if fingerprint and known_hosts.exists():
            current = known_hosts.text()
            prefix = ''
            if not current.endswith('\n'):
                prefix = '\n'
            known_hosts.write_text(
                '{}{}\n'.format(prefix, fingerprint), append=True)
    except sh.ErrorReturnCode:
        pass
    quiet_docker('exec', container_id, 'mkdir', '-p', '/root/.ssh')
    ssh_public_key = ssh_keygen('-y', '-f', configuration.ssh_key_path).strip()
    with tempfile.NamedTemporaryFile() as f:
        f.write(ssh_public_key)
        f.flush()
        quiet_docker.cp(f.name, '{}:/root/.ssh/authorized_keys'.format(
            container_id))
    # due to a bug in docker 17.06, the file keeps ownership and is not
    # chowned to the main container user automatically
    quiet_docker('exec', container_id, 'chown', 'root:root',
                 '/root/.ssh/authorized_keys')
项目:hpc-workload-gen    作者:mikelangelo-project    | 项目源码 | 文件源码
def _get_qstat_job_state(self):
        try:
            self.logger.debug('getting qstat infos')
            ssh_output = self.ssh_host(self.cfg.path_qstat, self.job_id)

            self.logger.debug('qstat output:\n{}'.format(ssh_output))

            if ssh_output != '':
                # slice the header and the last line which is empty
                jobs_displayed = ssh_output.split('\n')[2:-1]
                for job in jobs_displayed:
                    # remove whitespace
                    job_info = ' '.join(job.split())
                    # split into stuff
                    (self.job_id,
                        job_name,
                        job_user,
                        job_time,
                        job_status,
                        job_queue) = job_info.split(' ')
                    self.logger.debug(
                        'job {} has status {}'.format(self.job_id, job_status))

                    return job_status
            else:
                return None

        except ErrorReturnCode as e:
            self.logger.error('\nError in ssh call:\n{}'.format(e))
            print e.stderr
            exit(1)
项目:ffpb    作者:althonos    | 项目源码 | 文件源码
def main(argv=None):
    argv = argv or sys.argv[1:]

    if {'-h', '-help', '--help'}.intersection(argv):
        sh.ffmpeg(help=True, _fg=True)
        return 0

    notifier = ProgressNotifier()

    try:

        sh.ffmpeg(
            sys.argv[1:],
            _in=queue.Queue(),
            _err=notifier,
            _out_bufsize=0,
            _err_bufsize=0,
            #_in_bufsize=0,
            _no_out=True,
            _no_pipe=True,
            _tty_in=True,
            #_fg=True,
            #_bg=True,
        )

    except sh.ErrorReturnCode as err:
        print(notifier.lines[-1])
        return err.exit_code

    else:
        print()
        return 0
项目:openshift-ansible-contrib    作者:openshift    | 项目源码 | 文件源码
def run(self):
        ''' run command '''
        if self.roles is not None:
            print("Roles:\n{0}".format(yaml.dump(self.roles, default_flow_style=False)))
        if self.excludes is not None:
            print("Excludes:\n{0}".format(yaml.dump(self.excludes, default_flow_style=False)))

        starting_dir = '.'
        if self.roles is not None:
            starting_dir = 'roles'

        molecule_dirs = find_dirs(starting_dir, self.excludes, self.roles, 'molecule')

        print("Found:\n{0}".format(yaml.dump(molecule_dirs, default_flow_style=False)))
        base_dir = os.getcwd()

        errors = ""
        warnings = ""

        for role in molecule_dirs:
            role = os.path.dirname(role)
            print("Testing: {0}".format(role))
            os.chdir(role)

            try:
                print(sh.molecule.test())
            except ErrorReturnCode as e:
                print(e.stdout)
                errors += e.stdout

            os.chdir(base_dir)

        if len(warnings) > 0:
            print("Warnings:\n{0}\n".format(warnings))

        if len(errors) > 0:
            print("Errors:\n{0}\n".format(errors))
            sys.exit(1)
项目:openshift-ansible    作者:nearform    | 项目源码 | 文件源码
def run(self):
        ''' run command '''
        if self.roles is not None:
            print("Roles:\n{0}".format(yaml.dump(self.roles, default_flow_style=False)))
        if self.excludes is not None:
            print("Excludes:\n{0}".format(yaml.dump(self.excludes, default_flow_style=False)))

        starting_dir = '.'
        if self.roles is not None:
            starting_dir = 'roles'

        molecule_dirs = find_dirs(starting_dir, self.excludes, self.roles, 'molecule')

        print("Found:\n{0}".format(yaml.dump(molecule_dirs, default_flow_style=False)))
        base_dir = os.getcwd()

        errors = ""
        warnings = ""

        for role in molecule_dirs:
            role = os.path.dirname(role)
            print("Testing: {0}".format(role))
            os.chdir(role)

            try:
                print(sh.molecule.test())
            except ErrorReturnCode as e:
                print(e.stdout)
                errors += e.stdout

            os.chdir(base_dir)

        if len(warnings) > 0:
            print("Warnings:\n{0}\n".format(warnings))

        if len(errors) > 0:
            print("Errors:\n{0}\n".format(errors))
            sys.exit(1)
项目:cookiecutter-django-gulp    作者:valerymelou    | 项目源码 | 文件源码
def test_flake8_compliance(cookies):
    """generated project should pass flake8"""
    result = cookies.bake()

    try:
        sh.flake8(str(result.project))
    except sh.ErrorReturnCode as e:
        pytest.fail(e)
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def git_clone(git_repo_url, git_repo_dir):
    try:
        logger.info('Fetching from %s to %s', git_repo_url, git_repo_dir)
        git.clone(git_repo_url, git_repo_dir, _out=sys.stdout,
                  _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        raise CommandError(e)
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def _decompress(path):
    """
    Decompress a .tar.xz file at the given path.

    The decompressed data will be located in the same directory as ``path``.
    """
    logger.info('Decompressing %s', path)
    try:
        tar(extract=True, xz=True, verbose=True, file=path,
            directory=os.path.dirname(path), _fg=True, _out=sys.stdout,
            _err=sys.stderr)
    except ErrorReturnCode as e:
        raise CommandError(e)
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def _install_dependencies():
    """
    Install S2E's dependencies.

    Only apt-get is supported for now.
    """
    logger.info('Installing S2E dependencies')

    ubuntu_ver = _get_ubuntu_version()
    if not ubuntu_ver:
        return

    install_packages = CONSTANTS['dependencies']['common'] +                    \
                       CONSTANTS['dependencies']['ubuntu_%d' % ubuntu_ver] +    \
                       CONSTANTS['dependencies']['ida']

    try:
        # Enable 32-bit libraries
        dpkg_add_arch = sudo.bake('dpkg', add_architecture=True, _fg=True)
        dpkg_add_arch('i386')

        # Perform apt-get install
        apt_get = sudo.bake('apt-get', _fg=True)
        apt_get.update()
        apt_get.install(install_packages)
    except ErrorReturnCode as e:
        raise CommandError(e)
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def _get_s2e_sources(env_path):
    """
    Download the S2E manifest repository and initialize all of the S2E
    repositories with repo.
    """
    # Download repo
    repo = _get_repo(env_path)

    s2e_source_path = os.path.join(env_path, 'source', 's2e')

    # Create the S2E source directory and cd to it to run repo
    os.mkdir(s2e_source_path)
    orig_dir = os.getcwd()
    os.chdir(s2e_source_path)

    git_url = CONSTANTS['repos']['url']
    git_s2e_repo = CONSTANTS['repos']['s2e']

    try:
        # Now use repo to initialize all the repositories
        logger.info('Fetching %s from %s', git_s2e_repo, git_url)
        repo.init(u='%s/%s' % (git_url, git_s2e_repo), _out=sys.stdout,
                  _err=sys.stderr, _fg=True)
        repo.sync(_out=sys.stdout, _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        # Clean up - remove the half-created S2E environment
        shutil.rmtree(env_path)
        raise CommandError(e)
    finally:
        # Change back to the original directory
        os.chdir(orig_dir)

    # Success!
    logger.success('Fetched %s', git_s2e_repo)
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def _get_project_name(archive):
    """
    Get the project name from the archive.

    The project name is the name of the root directory in the archive.
    """
    try:
        contents = tar(exclude='*/*', list=True, file=archive)
        return os.path.dirname(str(contents))
    except ErrorReturnCode as e:
        raise CommandError('Failed to list archive - %s' % e)
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def handle(self, *args, **options):
        # Exit if the makefile doesn't exist
        makefile = self.env_path('source', 's2e', 'Makefile')
        if not os.path.isfile(makefile):
            raise CommandError('No makefile found in %s' %
                               os.path.dirname(makefile))

        # If the build directory doesn't exist, create it
        build_dir = self.env_path('build', 's2e')
        if not os.path.isdir(build_dir):
            os.mkdir(build_dir)

        # Set up some environment variables
        env_vars = os.environ.copy()
        env_vars['S2EPREFIX'] = self.install_path()

        components = options['components']
        self._make = sh.Command('make').bake(directory=build_dir, file=makefile, _env=env_vars)

        # If the user has specified any components to rebuild, do this before
        # the build
        if components:
            self._rebuild_components(components)

        try:
            # Run make
            if options['debug']:
                logger.info('Building S2E (debug) in %s', build_dir)
                self._make('all-debug', _out=sys.stdout, _err=sys.stderr, _fg=True)
            else:
                logger.info('Building S2E (release) in %s', build_dir)
                self._make('install', _out=sys.stdout, _err=sys.stderr, _fg=True)
        except ErrorReturnCode as e:
            raise CommandError(e)

        return 'S2E built'
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def _invoke_make(self, img_build_dir, rule_names, num_cores, iso_dir=''):
        env = os.environ.copy()
        env['S2E_INSTALL_ROOT'] = self.install_path()
        env['S2E_LINUX_KERNELS_ROOT'] = \
            self.source_path(CONSTANTS['repos']['images']['linux'])
        env['OUTDIR'] = self.image_path()

        if iso_dir:
            env['ISODIR'] = iso_dir

        logger.debug('Invoking makefile with:')
        logger.debug('export S2E_INSTALL_ROOT=%s', env['S2E_INSTALL_ROOT'])
        logger.debug('export S2E_LINUX_KERNELS_ROOT=%s', env['S2E_LINUX_KERNELS_ROOT'])
        logger.debug('export OUTDIR=%s', env['OUTDIR'])
        logger.debug('export ISODIR=%s', env.get('ISODIR', ''))

        if not self._headless:
            env['GRAPHICS'] = ''
        else:
            logger.warn('Image creation will run in headless mode. '
                        'Use --gui to see graphic output for debugging.')

        try:
            make = sh.Command('make').bake(file=os.path.join(img_build_dir,
                                                             'Makefile'),
                                           directory=self.image_path(),
                                           _out=sys.stdout, _err=sys.stderr,
                                           _env=env, _fg=True)

            make_image = make.bake(j=num_cores)
            make_image(rule_names)
        except ErrorReturnCode as e:
            raise CommandError(e)
项目:cookiecutter-saas    作者:jayfk    | 项目源码 | 文件源码
def run_docker_dev_test(path, coverage=False):
    """
    Method to check that docker runs with dev.yml
    """
    try:
        # build django, power up the stack and run the test
        sh.docker_compose(
            "--file", "{}/dev.yml".format(path), "build", "django"
        )
        sh.docker_compose("--file", "{}/dev.yml".format(path), "build")
        if coverage:
            sh.docker_compose(
                "--file", "{}/dev.yml".format(path), "run", "django", "coverage", "run", "manage.py", "test"
            )
            sh.docker_compose(
                "--file", "{}/dev.yml".format(path), "run", "django", "coverage", "xml", "-o", "coverage.xml"
            )
            shutil.copyfile(os.path.join(str(path), ".coverage"), os.path.join(PROJECT_DIR, ".coverage"))
            shutil.copyfile(os.path.join(str(path), "coverage.xml"),
                            os.path.join(PROJECT_DIR, "coverage.xml"))
        else:
            sh.docker_compose(
                "--file", "{}/dev.yml".format(path), "run", "django", "python", "manage.py", "test"
            )

        # test that the development server is running
        sh.docker_compose("--file", "{}/dev.yml".format(path), "up", "-d")
        time.sleep(10)
        curl = sh.curl("-I", "http://localhost:8000/")
        assert "200 OK" in curl
        assert "Server: Werkzeug" in curl

        # since we are running a lot of tests with different configurations,
        # we need to clean up the environment. Stop all running containers,
        # remove them and remove the postgres_data volume.
        sh.docker_compose("--file", "{}/dev.yml".format(path), "stop")
        sh.docker_compose("--file", "{}/dev.yml".format(path), "rm", "-f")
        sh.docker("volume", "rm", "cookiecuttersaastestproject_postgres_data_dev")
    except sh.ErrorReturnCode as e:
        # in case there are errors it's good to have full output of
        # stdout and stderr.
        pytest.fail("STDOUT: {} \n\n\n STDERR: {}".format(
            e.stdout.decode("utf-8"), e.stderr.decode("utf-8"))
        )
项目:AerisCloud    作者:AerisCloud    | 项目源码 | 文件源码
def _run_ansible_lint(organization):
    al_bin = os.path.join(aeriscloud_path, 'venv/bin/ansible-lint')

    env = ansible_env(os.environ.copy())

    if organization:
        environment_files = glob.glob(get_env_path(organization) + '/*.yml')
    else:
        environment_files = glob.glob(organization_path + '/*/*.yml')

    if not environment_files:
        return 0

    args = environment_files + ['-r', os.path.join(ansible_path, 'rules')]

    click.echo('Running ansible-lint ... ', nl=False)

    errors = 0
    try:
        python(al_bin, *args,
               _env=env, _err_to_out=True, _ok_code=[0])
        click.echo('[%s]' % click.style('OK', fg='green'))
    except ErrorReturnCode as e:
        parser = re.compile(
            r'^\[(?P<error_code>[^\]]+)\] (?P<error_message>[^\n]+)\n'
            r'%s(?P<file_name>[^:]+):(?P<line_number>[0-9]+)\n'
            r'Task/Handler: (?P<task_name>[^\n]+)\n\n' % (ansible_path + '/'),
            re.MULTILINE
        )

        click.echo('[%s]\n' % click.style('FAIL', fg='red'))

        last_file = None
        pos = 0
        while pos < len(e.stdout):
            match = parser.match(e.stdout, pos)
            if not match:
                click.secho("Error: %s" % e.stdout)
                errors += 1
                break
            error = match.groupdict()

            if error['file_name'] != last_file:
                click.secho('  Errors in file: %s' % error['file_name'],
                            fg='blue', bold=True)
                last_file = error['file_name']

            click.echo('    line %s task %s: %s %s' % (
                click.style(error['line_number'], fg='green'),
                click.style(error['task_name'], fg='green'),
                click.style(error['error_code'], fg='red'),
                click.style(error['error_message'], fg='red'),
            ))
            errors += 1
            pos = match.end()
    return errors
项目:hpc-workload-gen    作者:mikelangelo-project    | 项目源码 | 文件源码
def submit_job(self):
        """Submit the job to qsub, returns job_id."""
        self.logger.info('Submitting job ...')

        job_script_path = self._get_job_script_path()

        arg_list = self._build_qsub_args()
        arg_list.append(job_script_path)
        self.logger.debug('arg_liste: {}'.format(arg_list))

        try:
            self.logger.debug(
                '{} {}'.format(self.cfg.path_qsub, arg_list)
            )

            # get stating time and convert
            self.time_stamp_jobstart = int(time.time()) * 1000
            self.logger.debug(
                'stat time stamp: {}'.format(self.time_stamp_jobstart)
            )
            # Job submit
            ssh_output = self.ssh_host(self.cfg.path_qsub, *arg_list)

            # searching job id
            for line in ssh_output:
                self.logger.debug('searching for job id in \n{}'.format(line))

                if "hlrs.de" in line:
                    self.logger.debug('possible job id found: {}'.format(line))
                    self.job_id = str(line)
                    if self.cfg.grafana:
                        self.logger.info(
                            'Job performance data at:\n'
                            '{}var-JobId=snapTask-{}-{}&'
                            'from={}&'
                            'to=now'.format(
                                self.cfg.grafana_base_string,
                                self.cfg.user_name,
                                self.job_id.rstrip(),
                                self.time_stamp_jobstart
                            )
                        )
                    return

            self.logger.error(
                'no job id found in \n{}\nexiting!!!'.format(ssh_output)
            )
            exit(1)

        except ErrorReturnCode as e:
            self.logger.error('\nError in ssh call:\n{}'.format(e))
            print e.stderr
            exit(1)
项目:s2e-env    作者:S2E    | 项目源码 | 文件源码
def _get_basic_blocks(self):
        """
        Extract basic block information from the target binary using S2E's IDA
        Pro script.

        This extraction is done within a temporary directory so that we don't
        pollute the file system with temporary idbs and other such things.
        """
        logger.info('Generating basic block information from IDA Pro')

        try:
            with TemporaryDirectory() as temp_dir:
                target_path = self._project_desc['target_path']

                # Copy the binary to the temporary directory. Because projects
                # are created with a symlink to the target program, then IDA
                # Pro will generate the idb and bblist files in the symlinked
                # target's directory. Which is not what we want
                target_name = os.path.basename(target_path)

                temp_target_path = os.path.join(temp_dir, target_name)
                shutil.copyfile(target_path, temp_target_path)

                # Run the IDA Pro extractBasicBlocks script
                env_vars = os.environ.copy()
                env_vars['TVHEADLESS'] = '1'
                # This is required if s2e-env runs inside screen
                env_vars['TERM'] = 'xterm'

                ida = sh.Command(self._ida_path)
                ida('-A', '-B',
                    '-S%s' % self.install_path('bin', 'extractBasicBlocks.py'),
                    temp_target_path, _out=os.devnull, _tty_out=False,
                    _cwd=temp_dir, _env=env_vars)

                # Check that the basic block list file was correctly generated
                bblist_file = os.path.join(temp_dir, '%s.bblist' % target_name)
                if not os.path.isfile(bblist_file):
                    raise CommandError('Failed to generate bblist file for '
                                       '%s' % target_name)

                # Parse the basic block list file
                #
                # to_basic_block takes a 3-tuple read from the bblist file and
                # converts it to a BasicBlock
                to_basic_block = lambda tup: BasicBlock(int(tup[0], 16),
                                                        int(tup[1], 16),
                                                        tup[2])
                with open(bblist_file, 'r') as f:
                    return [to_basic_block(l.rstrip().split(' ')) for l in f]
        except ErrorReturnCode as e:
            raise CommandError(e)