Python os 模块,EX_OK 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用os.EX_OK

项目:concierge    作者:9seconds    | 项目源码 | 文件源码
def manage_events(self, notify):
        """Process batches of events from ``notify`` until interrupted.

        Every iteration reads a batch of events, filters it with
        ``self.filter_events`` (using the base name of ``self.source_path``)
        and calls ``self.output()`` when any event survives the filtering.

        Returns:
            ``os.EX_OK`` as soon as ``notify.read()`` is interrupted by a
            ``KeyboardInterrupt``; there is no other way out of the loop.
        """
        source_name = os.path.basename(self.source_path)

        while True:
            try:
                raw_events = notify.read()
            except KeyboardInterrupt:
                return os.EX_OK

            LOG.debug("Caught %d events", len(raw_events))

            relevant = self.filter_events(source_name, raw_events)
            described = self.describe_events(relevant)
            LOG.debug("Got %d events after filtration: %s",
                      len(described), described)

            if relevant:
                self.output()

            LOG.info("Config was managed. Going to the next loop.")
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def __str__(self):
        """Render the test case as a one-row table.

        The row holds the case name, the project name, the duration and
        PASS/FAIL (derived from ``is_successful()``). When ``project_name``
        or ``case_name`` is falsy an error is logged and the parent class
        ``__str__`` is returned instead.
        """
        try:
            assert self.project_name
            assert self.case_name
            if self.is_successful() == TestCase.EX_OK:
                verdict = 'PASS'
            else:
                verdict = 'FAIL'
            table = prettytable.PrettyTable(
                header_style='upper', padding_width=5,
                field_names=['test case', 'project', 'duration',
                             'result'])
            row = [self.case_name, self.project_name,
                   self.get_duration(), verdict]
            table.add_row(row)
            return table.get_string()
        except AssertionError:
            self.__logger.error("We cannot print invalid objects")
            return super(TestCase, self).__str__()
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def run_tier(self, tier):
        """Run all tests of one tier and return the overall result.

        A tier without tests sets the overall result to ``Result.EX_ERROR``.
        A failed test of the "functest" project does the same.

        Raises:
            BlockingTestFailed: a failed test reports itself as blocking.
        """
        tier_name = tier.get_name()
        tests = tier.get_tests()
        if not tests:
            LOGGER.info("There are no supported test cases in this tier "
                        "for the given scenario")
            self.overall_result = Result.EX_ERROR
            return self.overall_result

        LOGGER.info("Running tier '%s'", tier_name)
        for test in tests:
            self.run_test(test)
            executed = self.executed_test_cases[test.get_name()]
            if executed.is_successful() == testcase.TestCase.EX_OK:
                continue
            LOGGER.error("The test case '%s' failed.", test.get_name())
            if test.get_project() == "functest":
                self.overall_result = Result.EX_ERROR
            if test.is_blocking():
                raise BlockingTestFailed(
                    "The test case {} failed and is blocking".format(
                        test.get_name()))
        return self.overall_result
项目:redisrwlock    作者:veshboo    | 项目源码 | 文件源码
def test_option_repeat_interval(self):
        """Exercise the --retry (-r) and --interval (-i) options."""
        # Runs with --retry: read the given number of lines, then kill -INT.
        # Each entry is (extra arguments, number of lines to see).
        interrupted_runs = (
            ([], 2),
            (['-i', '1'], 4),
        )
        for extra_args, line_limit in interrupted_runs:
            cmd, output = runCmdOutput(['-p', '7788', '-r'] + extra_args,
                                       wait=False, limit=line_limit)
            cmd.send_signal(signal.SIGINT)
            self.assertEqual(cmd.wait(), 1)
            cmd.stdout.close()

        # Each entry is (arguments, expected exit status):
        #  * invalid --interval option argument (int > 0)
        #  * --interval option argument ignored if no --retry
        completed_runs = (
            (['-p', '7788', '-i', '0'], os.EX_USAGE),
            (['-p', '7788', '-i', '1000'], os.EX_OK),
        )
        for arguments, expected_status in completed_runs:
            cmd, output = runCmdOutput(arguments)
            self.assertEqual(cmd.returncode, expected_status)
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def _instancecheck_impl(self, value, info: Info) -> InfoMsg:
        """Check that ``value`` names a file that can be written to.

        Accepted values are:

        * ``"-"`` when ``self.allow_std`` is set,
        * an existing regular file that is writable,
        * when ``self.allow_non_existent`` is set, a path that does not exist
          yet but whose parent directory exists and is both searchable and
          writable.

        In every accepted case ``self.constraint`` (if not ``None``) must also
        hold for the user-expanded value.

        Returns:
            ``info.wrap(True)`` when the value is accepted, otherwise the
            result of ``info.errormsg(...)``.
        """
        if not isinstance(value, str) or value == "":
            return info.errormsg(self)
        value = os.path.expanduser(value)
        if self.allow_std and value == "-" and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        if os.path.exists(value):
            if os.path.isfile(value) and os.access(os.path.abspath(value), os.W_OK)\
                    and (self.constraint is None or self.constraint(value)):
                return info.wrap(True)
            return info.errormsg(self)
        if not self.allow_non_existent:
            return info.errormsg(self, "File doesn't exist")
        dir_name = os.path.dirname(os.path.abspath(value))
        # os.X_OK (search permission) is the access() mode that is needed
        # here; os.EX_OK is a process exit status that merely equals
        # os.F_OK (0) and therefore only re-checked existence.
        if os.path.exists(dir_name) and os.access(dir_name, os.X_OK | os.W_OK) \
                and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        return info.errormsg(self)
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def test_main_list(monkeypatch, capsys, mocked_sysexit, mocked_configure):
    """``--list`` prints an inventory with the task's host and user."""
    server_id = pytest.faux.gen_uuid()
    host = pytest.faux.gen_alphanumeric()
    username = pytest.faux.gen_alphanumeric()
    initiator_id = pytest.faux.gen_uuid()

    discovery = task.ServerDiscoveryTask(
        server_id, host, username, initiator_id).create()

    monkeypatch.setenv(process.ENV_ENTRY_POINT, "server_discovery")
    monkeypatch.setenv(process.ENV_TASK_ID, str(discovery._id))
    monkeypatch.setattr("sys.argv", ["progname", "--list"])

    exit_code = inventory.main()
    assert exit_code == os.EX_OK

    mocked_sysexit.assert_not_called()

    printed, _ = capsys.readouterr()
    listing = json.loads(printed)
    assert listing["new"]["hosts"] == [host]
    assert listing["_meta"]["hostvars"][host]["ansible_user"] == username
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def test_main_host_ok(monkeypatch, capsys, mocked_sysexit, mocked_configure):
    """``--host <host>`` prints the variables of the task's host."""
    server_id = pytest.faux.gen_uuid()
    host = pytest.faux.gen_alphanumeric()
    username = pytest.faux.gen_alphanumeric()
    initiator_id = pytest.faux.gen_uuid()

    discovery = task.ServerDiscoveryTask(
        server_id, host, username, initiator_id).create()

    monkeypatch.setenv(process.ENV_ENTRY_POINT, "server_discovery")
    monkeypatch.setenv(process.ENV_TASK_ID, str(discovery._id))
    monkeypatch.setattr("sys.argv", ["progname", "--host", host])

    exit_code = inventory.main()
    assert exit_code == os.EX_OK

    mocked_sysexit.assert_not_called()

    printed, _ = capsys.readouterr()
    host_vars = json.loads(printed)
    assert host_vars["ansible_user"] == username
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def run(self):
        """Run ``self.path`` through the shell and capture its output.

        Does nothing when ``self.finished`` is set. Otherwise the started
        process is stored in ``self.process`` and its decoded (UTF-8) output
        in ``self.stdout`` / ``self.stderr``.

        Raises:
            RuntimeError: the program finished with a non-zero exit code.
        """
        if self.finished:
            return

        self.process = subprocess.Popen(
            [str(self.path)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True
        )

        LOG.info("Run %s. Pid %d", self.path, self.process.pid)
        # communicate() drains both pipes while waiting for the child;
        # wait() followed by read() can deadlock as soon as the child writes
        # more than an OS pipe buffer can hold.
        raw_stdout, raw_stderr = self.process.communicate()
        logmethod = LOG.info if self.process.returncode == os.EX_OK \
            else LOG.warning
        logmethod("%s has been finished. Exit code %s",
                  self.path, self.process.returncode)

        self.stdout = raw_stdout.decode("utf-8")
        self.stderr = raw_stderr.decode("utf-8")

        if self.process.returncode != os.EX_OK:
            # The template has to be formatted explicitly: exceptions do not
            # interpolate their extra positional arguments.
            raise RuntimeError(
                "Program {0} has been finished with exit code {1}".format(
                    self.path, self.process.returncode))
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
async def get_package_version(prefix, connection, package_name):
    """Query the installed version of a Debian package and print it.

    Declared ``async`` because the body awaits ``connection.run``; a plain
    ``def`` containing ``await`` does not compile.

    Args:
        prefix: text put in front of the printed line.
        connection: object whose awaitable ``run(command)`` yields a result
            with ``exit_status``, ``stdout`` and ``stderr``.
        package_name: name of the package; shell-quoted before use.
    """
    command = "dpkg-query --showformat='${Version}' --show %s" % shlex.quote(
        package_name)
    result = await connection.run(command)

    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}package (failed {1}): {2} - {3}".format(
                prefix, result.exit_status, package_name, result.stderr.strip()
            )
        )
    else:
        click.echo(
            "{0}package (ok): {1}=={2}".format(
                prefix, package_name, result.stdout.strip()
            )
        )
项目:aegea    作者:kislyuk    | 项目源码 | 文件源码
def call(self, cmd, **kwargs):
        """Run ``cmd`` and check its outcome against a list of expectations.

        Args:
            cmd: command handed to ``subprocess.Popen``.
            **kwargs: forwarded to ``subprocess.Popen``, except for:

                * ``expect`` -- list of dicts, each with a ``return_codes``
                  list and optional ``stdout`` / ``stderr`` regular
                  expressions; the default accepts exit code ``os.EX_OK``
                  with any output.
                * ``stdin`` -- used as the child's stdin (default: a pipe).

        Returns:
            ``self.SubprocessResult(stdout, stderr, return_code)`` with the
            decoded output.

        Raises:
            subprocess.CalledProcessError: no expectation matched; the decoded
                output is attached as ``stdout`` and ``stderr``.
        """
        print('Running "{}"'.format(cmd), file=sys.stderr)
        expect = kwargs.pop("expect", [dict(return_codes=[os.EX_OK], stdout=None, stderr=None)])
        # Pop stdin instead of merely reading it: leaving it in kwargs would
        # hand the keyword to Popen twice and raise TypeError.
        stdin = kwargs.pop("stdin", subprocess.PIPE)
        process = subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE, **kwargs)
        out, err = process.communicate()
        return_code = process.poll()
        # sys.stdin may be replaced by an object without a usable encoding
        # (e.g. when input is captured), so fall back to UTF-8.
        encoding = getattr(sys.stdin, "encoding", None) or "utf-8"
        out = out.decode(encoding)
        err = err.decode(encoding)

        def match(return_code, out, err, expected):
            exit_ok = return_code in expected["return_codes"]
            # A missing/None pattern becomes "", which matches any output.
            stdout_ok = re.search(expected.get("stdout") or "", out)
            stderr_ok = re.search(expected.get("stderr") or "", err)
            return exit_ok and stdout_ok and stderr_ok
        if not any(match(return_code, out, err, exp) for exp in expect):
            print(err)
            e = subprocess.CalledProcessError(return_code, cmd, output=out)
            e.stdout, e.stderr = out, err
            raise e
        return self.SubprocessResult(out, err, return_code)
项目:aegea    作者:kislyuk    | 项目源码 | 文件源码
def grep(args):
    """Print log events matching the filters given in ``args``.

    The filter is built from ``args.log_group`` plus the optional
    ``log_stream``, ``pattern``, ``start_time`` and ``end_time`` attributes.
    With ``args.follow`` set the query is repeated once per second and the
    function never returns.

    Returns:
        A ``SystemExit`` instance (returned, not raised) carrying
        ``os.EX_OK`` if at least one event was printed, else
        ``os.EX_DATAERR``.
    """
    filter_args = {"logGroupName": args.log_group}
    if args.log_stream:
        filter_args["logStreamNames"] = [args.log_stream]
    if args.pattern:
        filter_args["filterPattern"] = args.pattern
    if args.start_time:
        filter_args["startTime"] = int(timestamp(args.start_time) * 1000)
    if args.end_time:
        filter_args["endTime"] = int(timestamp(args.end_time) * 1000)

    num_results = 0
    while True:
        for event in paginate(clients.logs.get_paginator("filter_log_events"), **filter_args):
            # Only events carrying both fields can be printed.
            if "timestamp" in event and "message" in event:
                print(str(Timestamp(event["timestamp"])), event["message"])
                num_results += 1
        if not args.follow:
            exit_code = os.EX_OK if num_results > 0 else os.EX_DATAERR
            return SystemExit(exit_code)
        time.sleep(1)
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def build(config, docs, **kwargs):
    """Build ``docs`` and log the outcome of every document.

    Returns:
        ``os.EX_OK`` when ``docbuild`` reports overall success; otherwise a
        string describing the problem (no pubdir configured, an error from
        one of the two preparation steps, or a failed build).
    """
    if not config.pubdir:
        return ERR_NEEDPUBDIR + "to --build"

    # Both preparation steps hand back a (ready, error) pair; stop at the
    # first one that is not ready.
    ready, error = builddir_setup(config)
    if ready:
        ready, error = prepare_docs_build_mode(config, docs)
    if not ready:
        return error

    buildsuccess, results = docbuild(config, docs, **kwargs)
    for position, (buildcode, source) in enumerate(results, 1):
        if buildcode:
            template = "success (%d of %d) available in %s"
        else:
            template = "FAILURE (%d of %d) available in %s"
        logger.info(template, position, len(results), source.working.dirname)

    if not buildsuccess:
        return "Build failed, see logging output in %s." % (config.builddir,)
    return os.EX_OK
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def publish(config, docs, **kwargs):
    """Build ``docs`` and swap every built document into its output dir.

    ``config.build`` is forced to ``True`` before building. If the build
    does not return ``os.EX_OK`` its result is passed straight back.
    Otherwise each working directory is swapped with the output directory,
    leftover working directories are removed, and the working parents plus
    ``config.builddir`` are handed to ``post_publish_cleanup``.

    Returns:
        ``os.EX_OK`` on success, else whatever ``build`` returned.
    """
    config.build = True
    status = build(config, docs, **kwargs)
    if status != os.EX_OK:
        return status

    for position, source in enumerate(docs, 1):
        logger.info("Publishing (%d of %d) to %s.",
                    position, len(docs), source.output.dirname)
        # -- swapdirs must raise an error if there are problems
        #
        swapdirs(source.working.dirname, source.output.dirname)
        if not os.path.isdir(source.working.dirname):
            continue
        logger.debug("%s removing old directory %s",
                     source.stem, source.working.dirname)
        shutil.rmtree(source.working.dirname)

    # De-duplicate the per-document working dirs before cleaning up.
    workingdirs = list({doc.dtworkingdir for doc in docs})
    workingdirs.append(config.builddir)
    post_publish_cleanup(workingdirs)
    return os.EX_OK
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def test_summary_longnames(self):
        """The summary truncates long name lists unless verbose is set."""
        c = self.config
        names = self.publishDocumentsWithLongNames(5)

        def summarize():
            # Run the summary into a fresh buffer and hand back its text.
            buffer = io.StringIO()
            status = tldp.driver.summary(c, file=buffer)
            self.assertEqual(status, os.EX_OK)
            return buffer.getvalue()

        data = summarize()
        self.assertTrue('and 4 more' in data)

        c.verbose = True
        data = summarize()
        for name in names:
            self.assertTrue(name in data)
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def test_run(self):
        """Publish a mix of documents through ``tldp.driver.run``."""
        c = self.config
        ex = example.ex_linuxdoc
        self.add_published('Published-HOWTO', ex)
        self.add_new('New-HOWTO', ex)
        self.add_stale('Stale-HOWTO', ex)
        self.add_orphan('Orphan-HOWTO', ex)
        self.add_broken('Broken-HOWTO', ex)

        new_howto_path = opj(self.tempdir, 'sources', 'New-HOWTO.sgml')
        argv = self.argv
        argv += ['--publish', 'stale', 'Orphan-HOWTO', new_howto_path]

        self.assertEqual(tldp.driver.run(argv), os.EX_OK)

        inv = tldp.inventory.Inventory(c.pubdir, c.sourcedir)
        published_count = len(inv.published.keys())
        broken_count = len(inv.broken.keys())
        self.assertEqual(4, published_count)
        self.assertEqual(1, broken_count)
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def is_successful(self):
        """Translate ``result`` and ``criteria`` into a status code.

        When neither value is a string the test passes if
        ``result >= criteria``. Otherwise the legacy convention applies: a
        warning is logged and only ``result == 'PASS'`` counts as success
        (``criteria`` is not read in that case).

        Returns:
            TestCase.EX_OK if the test passed.
            TestCase.EX_TESTCASE_FAILED otherwise, including when
            ``criteria`` is falsy or ``result`` is ``None``.
        """
        try:
            assert self.criteria
            assert self.result is not None
            uses_strings = (isinstance(self.result, str) or
                            isinstance(self.criteria, str))
            if uses_strings:
                # Backward compatibility
                # It must be removed as soon as TestCase subclasses
                # stop setting result = 'PASS' or 'FAIL'.
                self.__logger.warning(
                    "Please update result which must be an int!")
                if self.result == 'PASS':
                    return TestCase.EX_OK
            elif self.result >= self.criteria:
                return TestCase.EX_OK
        except AssertionError:
            self.__logger.error("Please run test before checking the results")
        return TestCase.EX_TESTCASE_FAILED
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def push_to_db(self):
        """Publish the outcome of the test case to the results DB.

        ``project_name``, ``case_name``, ``start_time`` and ``stop_time``
        must be truthy; the published verdict is 'PASS' or 'FAIL' depending
        on ``is_successful()``. Any exception (a failed precondition
        included) is logged and reported as a push error.

        Returns:
            TestCase.EX_OK if results were pushed to DB.
            TestCase.EX_PUSH_TO_DB_ERROR otherwise.
        """
        try:
            assert self.project_name
            assert self.case_name
            assert self.start_time
            assert self.stop_time
            if self.is_successful() == TestCase.EX_OK:
                pub_result = 'PASS'
            else:
                pub_result = 'FAIL'
            pushed = ft_utils.push_results_to_db(
                self.project_name, self.case_name, self.start_time,
                self.stop_time, pub_result, self.details)
            if not pushed:
                self.__logger.error("The results cannot be pushed to DB")
                return TestCase.EX_PUSH_TO_DB_ERROR
            self.__logger.info(
                "The results were successfully pushed to DB")
            return TestCase.EX_OK
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("The results cannot be pushed to DB")
            return TestCase.EX_PUSH_TO_DB_ERROR
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def _run(self, args):  # pylint: disable=no-self-use
        """Run one test case via ``run_tests`` and describe the outcome.

        Args:
            args: mapping providing ``testcase`` and ``task_id``.

        Returns:
            ``{'result': {...}}`` where the inner dict holds the task id, the
            test case name, the environment info and 'PASS' or 'FAIL'. The
            verdict is 'PASS' only when the command returns ``os.EX_OK``; an
            exception while running the command is logged and yields 'FAIL'.
        """

        case_name = args.get('testcase')
        self._update_logging_ini(args.get('task_id'))

        # Start from FAIL so that a crash of the command is reported as a
        # failure instead of touching an unbound ``runner`` afterwards.
        result = 'FAIL'
        try:
            cmd = "run_tests -t {}".format(case_name)
            runner = ft_utils.execute_command(cmd)
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Running test case %s failed!", case_name)
        else:
            if runner == os.EX_OK:
                result = 'PASS'

        env_info = {
            'installer': CONST.__getattribute__('INSTALLER_TYPE'),
            'scenario': CONST.__getattribute__('DEPLOY_SCENARIO'),
            'build_tag': CONST.__getattribute__('BUILD_TAG'),
            'ci_loop': CONST.__getattribute__('CI_LOOP')
        }
        result = {
            'task_id': args.get('task_id'),
            'testcase': case_name,
            'env_info': env_info,
            'result': result
        }

        return {'result': result}
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def __init__(self):
        """Initialise the bookkeeping, the flags and the tier builder.

        The tiers are built from the ``INSTALLER_TYPE`` and
        ``DEPLOY_SCENARIO`` values of ``CONST`` and from the
        ``ci/testcases.yaml`` resource of the ``functest`` package.
        """
        self.executed_test_cases = {}
        self.overall_result = Result.EX_OK
        self.clean_flag = True
        self.report_flag = False

        installer = CONST.__getattribute__('INSTALLER_TYPE')
        scenario = CONST.__getattribute__('DEPLOY_SCENARIO')
        testcases_file = pkg_resources.resource_filename(
            'functest', 'ci/testcases.yaml')
        self._tiers = tb.TierBuilder(installer, scenario, testcases_file)
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def main(self, **kwargs):
        """Entry point of class Runner

        Keyword Args:
            noclean: when given, ``clean_flag`` becomes its negation.
            report: when given, stored in ``report_flag``.
            test: name of a tier, of a test case, or "all". When the key is
                absent every tier is run.

        Returns:
            ``Result.EX_ERROR`` straight away for an unknown test/tier name,
            otherwise ``self.overall_result``.
        """
        if 'noclean' in kwargs:
            self.clean_flag = not kwargs['noclean']
        if 'report' in kwargs:
            self.report_flag = kwargs['report']
        try:
            if 'test' in kwargs:
                self.source_rc_file()
                LOGGER.debug("Test args: %s", kwargs['test'])
                if self._tiers.get_tier(kwargs['test']):
                    self.run_tier(self._tiers.get_tier(kwargs['test']))
                elif self._tiers.get_test(kwargs['test']):
                    result = self.run_test(
                        self._tiers.get_test(kwargs['test']))
                    if result != testcase.TestCase.EX_OK:
                        LOGGER.error("The test case '%s' failed.",
                                     kwargs['test'])
                        self.overall_result = Result.EX_ERROR
                elif kwargs['test'] == "all":
                    self.run_all()
                else:
                    LOGGER.error("Unknown test case or tier '%s', or not "
                                 "supported by the given scenario '%s'.",
                                 kwargs['test'],
                                 CONST.__getattribute__('DEPLOY_SCENARIO'))
                    LOGGER.debug("Available tiers are:\n\n%s",
                                 self._tiers)
                    return Result.EX_ERROR
            else:
                self.run_all()
        except BlockingTestFailed:
            # The failure is already logged and recorded by run_tier().
            pass
        except Exception:  # pylint: disable=broad-except
            LOGGER.exception("Failures when running testcase(s)")
            self.overall_result = Result.EX_ERROR
        if 'test' not in kwargs:
            # Everything was run without an explicit selection: summarise
            # all tiers instead of failing on the missing 'test' key.
            self.summary()
        elif not self._tiers.get_test(kwargs['test']):
            self.summary(self._tiers.get_tier(kwargs['test']))
        LOGGER.info("Execution exit value: %s", self.overall_result)
        return self.overall_result
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def summary(self, tier=None):
        """Log the deployment description and the functest report.

        Args:
            tier: restrict the report to this tier; when falsy every tier
                known to ``self._tiers`` is reported.
        """
        env_table = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['env var', 'value'])
        for env_var in ['INSTALLER_TYPE', 'DEPLOY_SCENARIO', 'BUILD_TAG',
                        'CI_LOOP']:
            env_table.add_row([env_var, CONST.__getattribute__(env_var)])
        LOGGER.info("Deployment description:\n\n%s\n", env_table)

        report = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'tier',
                         'duration', 'result'])

        def skipped_row(test, owning_tier):
            # Row used for tests that have no executed test case.
            return [test.get_name(), test.get_project(),
                    owning_tier.get_name(), "00:00", "SKIP"]

        tiers = [tier] if tier else self._tiers.get_tiers()
        for each_tier in tiers:
            for test in each_tier.get_tests():
                try:
                    test_case = self.executed_test_cases[test.get_name()]
                except KeyError:
                    report.add_row(skipped_row(test, each_tier))
                    continue
                if test_case.is_successful() == test_case.EX_OK:
                    verdict = 'PASS'
                else:
                    verdict = 'FAIL'
                report.add_row(
                    [test_case.case_name, test_case.project_name,
                     self._tiers.get_tier_name(test_case.case_name),
                     test_case.get_duration(), verdict])
            for test in each_tier.get_skipped_test():
                report.add_row(skipped_row(test, each_tier))
        LOGGER.info("FUNCTEST REPORT:\n\n%s\n", report)
项目:pyrate-build    作者:pyrate-build    | 项目源码 | 文件源码
def main():
    """Parse the command line and generate the build file.

    ``argparse`` is used when it can be imported and the ``TESTOLDIMPORTS``
    environment variable is not set; otherwise ``optparse`` handles the
    options.

    Returns:
        ``os.EX_USAGE`` when the optparse path receives more than one
        positional argument, otherwise ``None``. On the optparse path
        ``--version`` exits the process with ``os.EX_OK``.
    """
    version_info = 'pyrate version ' + __version__
    try:
        if os.environ.get('TESTOLDIMPORTS'):
            # Forces the optparse fallback below.
            raise ImportError()
        import argparse
        arg_parser = argparse.ArgumentParser()
        arg_parser.add_argument(
            'build_file', nargs='?', default='build.py',
            help='name of the input file - default: build.py')
        arg_parser.add_argument(
            '-V', '--version', action='version', version=version_info)
        arg_parser.add_argument(
            '-M', '--makefile', action='store_true',
            help='enable makefile mode')
        arg_parser.add_argument(
            '-o', '--output', nargs=1, default=None,
            help='name of output build file')
        args = arg_parser.parse_args()
        if args.output:
            # nargs=1 wraps the value in a one-element list.
            args.output = args.output[0]
        build_file_name = args.build_file
    except ImportError:
        optparse = __import__('optparse')
        opt_parser = optparse.OptionParser(
            usage='pyrate [options] build_file')
        opt_parser.add_option(
            '-V', '--version', action='store_true', help='display version')
        opt_parser.add_option(
            '-M', '--makefile', action='store_true',
            help='enable makefile mode')
        opt_parser.add_option(
            '-o', '--output', default=None,
            help='name of output build file', dest='output')
        (args, positional) = opt_parser.parse_args()
        if len(positional) > 1:
            sys.stderr.write('too many build_file arguments provided! %s\n' % repr(positional))
            return os.EX_USAGE
        elif not positional:
            positional = ['build.py']
        build_file_name = positional[0]
        if args.version:
            sys.stderr.write(version_info + '\n')
            sys.exit(os.EX_OK)

    generate_build_file(build_file_name, args.output, args.makefile)

################################################################################
# Externals + helper functions
################################################################################
项目:redisrwlock    作者:veshboo    | 项目源码 | 文件源码
def test_option_help(self):
        """Running with --help exits with ``os.EX_OK``."""
        process, _output = runCmdOutput(['--help'])
        self.assertEqual(process.returncode, os.EX_OK)
项目:redisrwlock    作者:veshboo    | 项目源码 | 文件源码
def test_option_version(self):
        """Running with --version exits with ``os.EX_OK``."""
        process, _output = runCmdOutput(['--version'])
        self.assertEqual(process.returncode, os.EX_OK)
项目:redisrwlock    作者:veshboo    | 项目源码 | 文件源码
def test_logging_config(self):
        """The command succeeds with and without a logging.conf file."""
        topdir = os.path.dirname(os.path.dirname(__file__))

        def assert_run_ok():
            process, _output = runCmdOutput(['-p', '7788'])
            self.assertEqual(process.returncode, os.EX_OK)

        # logging config from default
        os.system('rm %s/logging.conf' % topdir)
        assert_run_ok()

        # logging config from file
        os.system('cp %s/logging.conf.sample %s/logging.conf' %
                  (topdir, topdir))
        assert_run_ok()
项目:do-like-javac    作者:SRI-CSL    | 项目源码 | 文件源码
def parse_args():
    """Parse the global arguments and return them with the command.

    Returns:
        ``(args, cmd, capturer)`` when ``split_args_to_parse`` yields a
        truthy capturer. Otherwise the help text is printed and the process
        exits with ``os.EX_OK``.
    """
    to_parse, cmd, capturer = split_args_to_parse()

    global_argparser = create_argparser()
    args = global_argparser.parse_args(to_parse)

    if not capturer:
        global_argparser.print_help()
        sys.exit(os.EX_OK)

    return args, cmd, capturer
项目:temci    作者:parttimenerd    | 项目源码 | 文件源码
def _instancecheck_impl(self, value, info: Info) -> InfoMsg:
        """Check that ``value`` names a directory that can be written to.

        Accepted are an existing writable directory, or a path that does not
        exist yet but whose parent directory exists and is both searchable
        and writable. In both cases ``self.constraint`` (if not ``None``)
        must also hold for the value.

        Returns:
            ``info.wrap(True)`` when the value is accepted, otherwise the
            result of ``info.errormsg(self)``.
        """
        if not isinstance(value, str):
            return info.errormsg(self)
        if os.path.exists(value):
            if os.path.isdir(value) and os.access(os.path.abspath(value), os.W_OK)\
                    and (self.constraint is None or self.constraint(value)):
                return info.wrap(True)
            return info.errormsg(self)
        dir_name = os.path.dirname(os.path.abspath(value))
        # os.X_OK (search permission) is the access() mode that is needed
        # here; os.EX_OK is a process exit status that merely equals
        # os.F_OK (0) and therefore only re-checked existence.
        if os.path.exists(dir_name) and os.access(dir_name, os.X_OK | os.W_OK) \
                and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        return info.errormsg(self)
项目:yan    作者:motet-a    | 项目源码 | 文件源码
def main():
    """Run the checkers and exit with a status reflecting the issues.

    With ``list_checkers`` set the checkers are listed and the function
    returns. Otherwise the process exits with ``os.EX_OK`` when no issue has
    level 'error' or 'syntax-error', and with ``os.EX_DATAERR`` otherwise.
    """
    options = get_argument_parser().parse_args()

    if options.list_checkers:
        list_checkers()
        return

    program = Program(options)
    program.check()
    program.print_issues()

    error_levels = ('error', 'syntax-error')
    errors = [issue for issue in program.issues
              if issue.level in error_levels]
    if errors:
        sys.exit(os.EX_DATAERR)
    sys.exit(os.EX_OK)
项目:homemadescripts    作者:helioloureiro    | 项目源码 | 文件源码
def check_if_run():
    """Exit with ``os.EX_OK`` when another live instance owns the PID file.

    Nothing happens when the PID file yields ``None``, when the stored pid
    is not positive or equals the current pid, or when ``/proc/<pid>`` does
    not exist.
    """
    pid = read_file(PIDFILE)
    current_pid = os.getpid()
    if pid is None:
        return

    stored_pid = int(pid)
    if stored_pid <= 0 or stored_pid == current_pid:
        return
    if os.path.exists("/proc/%d" % stored_pid):
        log("[%s] Already running - keepalive done." % time.ctime())
        sys.exit(os.EX_OK)
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def mocked_plugin():
    """Patch the playbook plugin lookup and yield the fake process.

    While the generator is suspended, ``get_playbook_plugins`` is patched so
    that the looked-up plugin's ``execute()`` context manager yields a mock
    with pid 100 and return code ``os.EX_OK``.
    """
    target = "decapod_common.plugins.get_playbook_plugins"
    with unittest.mock.patch(target) as get_plugins:
        fake_process = unittest.mock.MagicMock()
        fake_process.pid = 100
        fake_process.returncode = os.EX_OK

        plugin = unittest.mock.MagicMock()
        plugin.execute.return_value.__enter__.return_value = fake_process

        get_plugins.return_value.get.return_value.return_value = plugin

        yield fake_process
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def test_command_result(proc):
    """A finished command exposes pid, exit code and output."""
    proc.options["-c"] = ""
    outcome = proc.run()

    # Give the command time to finish before inspecting it.
    time.sleep(2)

    assert outcome.pid
    assert outcome.returncode == os.EX_OK
    assert outcome.stdout
    assert outcome.stdin is None
    assert not outcome.alive()
    for render in (str, repr):
        assert render(outcome)
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def test_command_result_running(proc):
    """A command is alive without exit code first and finishes later."""
    proc.options["-c"] = "import time; time.sleep(2)"
    outcome = proc.run()

    # Right after the start the command is still running.
    assert outcome.pid
    assert outcome.returncode is None
    assert outcome.alive()
    for render in (str, repr):
        assert render(outcome)

    time.sleep(3.5)

    # The sleep above outlasts the command.
    assert outcome.pid
    assert outcome.returncode == os.EX_OK
    assert not outcome.alive()
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def execute(self, tsk, stop_ev):
        """Run the plugin process for ``tsk`` until it ends or is stopped.

        The process is polled every 0.5 seconds until it is no longer alive
        or ``stop_ev`` is set; it is then stopped explicitly.

        Raises:
            ChildProcessError: the process ended with a code other than
                ``os.EX_OK``.
        """
        # Submitting a task into the pool is eager, so this callback may
        # start BEFORE the task arrives in self.data; poll until it is
        # there.
        while tsk.id not in self.data:
            time.sleep(0.1)

        plugin = self.get_plugin(tsk)

        with plugin.execute(tsk) as process:
            tsk = tsk.set_executor_data(platform.node(), process.pid)

            LOG.info(
                "Management process for task %s was started. Pid %d",
                tsk, process.pid
            )

            while True:
                if stop_ev.is_set() or not process.alive():
                    break
                stop_ev.wait(0.5)

            process.stop()

            LOG.info(
                "Management process for task %s with PID %d has "
                "stopped with exit code %d",
                tsk, process.pid, process.returncode
            )

            if process.returncode == os.EX_OK:
                return
            raise ChildProcessError(
                "Process exit with code {0}".format(process.returncode))
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def main():
    """Run the Ansible collection once per cluster.

    Exits immediately when ``PATH_ANSIBLE`` is falsy. For every cluster that
    has servers, one server is picked at random, the Ansible command line is
    built for it and executed, and the outcome is logged.
    """
    if not PATH_ANSIBLE:
        sys.exit("Cannot find ansible-playbook executable.")

    logging.basicConfig(
        format="%(asctime)s [%(levelname)-5s] %(message)s",
        level=logging.DEBUG)

    if not os.path.isfile(ANSIBLE_CONFIG_PATH):
        LOG.warning("Cannot find Ansible config at %r", ANSIBLE_CONFIG_PATH)

    for cluster in get_clusters():
        if not cluster["configuration"]:
            LOG.info("Skip cluster %s because it has no servers",
                     cluster["name"])
            continue

        chosen = random.choice(cluster["configuration"])
        server = get_server_by_id(chosen["server_id"])

        commandline = get_ansible_commandline(
            server["username"], server["ip"], cluster["name"])

        LOG.info("Collect from %s", cluster["name"])
        LOG.debug("Execute %s", commandline)

        code = execute(commandline)

        if code != os.EX_OK:
            LOG.warning("Cannot collect from %s: %d", cluster["name"], code)
        else:
            LOG.info("Collected from %s", cluster["name"])
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
async def get_ceph_version(prefix, connection, cluster_name):
    """Query the Ceph version of a cluster and print it.

    Declared ``async`` because the body awaits ``connection.run``; a plain
    ``def`` containing ``await`` does not compile.

    Args:
        prefix: text put in front of the printed line.
        connection: object whose awaitable ``run(command)`` yields a result
            with ``exit_status``, ``stdout`` and ``stderr``.
        cluster_name: name of the cluster; shell-quoted before use.
    """
    command = "sudo -EHn -- ceph --cluster {0} version".format(
        shlex.quote(cluster_name))
    result = await connection.run(command)

    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}ceph-version (failed {1}): {2}".format(
                prefix, result.exit_status, result.stderr.strip()))
    else:
        click.echo(
            "{0}ceph-version (ok): {1}".format(
                prefix, result.stdout.strip()))
项目:aegea    作者:kislyuk    | 项目源码 | 文件源码
def test_dry_run_commands(self):
        """Dry-run commands succeed or fail with UnauthorizedOperation."""
        unauthorized_ok = [dict(return_codes=[os.EX_OK]),
                           dict(return_codes=[1, os.EX_SOFTWARE], stderr="UnauthorizedOperation")]
        commands = (
            "aegea launch unittest --dry-run --no-verify-ssh-key-pem-file",
            "aegea launch unittest --dry-run --spot --no-verify-ssh-key-pem-file",
            "aegea launch unittest --dry-run --duration-hours 1 --no-verify-ssh-key-pem-file",
            "aegea launch unittest --duration 0.5 --min-mem 6 --cores 2 --dry-run --no-verify --client-token t",
            "aegea build_ami i --dry-run --no-verify-ssh-key-pem-file",
        )
        for command in commands:
            self.call(command, shell=True, expect=unauthorized_ok)
项目:aegea    作者:kislyuk    | 项目源码 | 文件源码
def test_secrets(self):
        """Secrets commands succeed or fail with AccessDenied/NoSuchKey."""
        unauthorized_ok = [dict(return_codes=[os.EX_OK]),
                           dict(return_codes=[1, os.EX_SOFTWARE], stderr="(AccessDenied|NoSuchKey)")]
        commands = (
            "test_secret=test aegea secrets put test_secret --iam-role aegea.launch",
            "aegea secrets put test_secret --generate-ssh-key --iam-role aegea.launch",
            "aegea secrets ls",
            "aegea secrets ls --json",
            "aegea secrets get test_secret --iam-role aegea.launch",
            "aegea secrets delete test_secret --iam-role aegea.launch",
        )
        for command in commands:
            self.call(command, shell=True, expect=unauthorized_ok)
项目:aegea    作者:kislyuk    | 项目源码 | 文件源码
def check_output(self, command, input_data=None, stderr=sys.stderr):
        """Run ``command`` via ``self.exec_command`` and return its decoded stdout.

        Args:
            command: Command line passed unchanged to ``self.exec_command``.
            input_data: Optional data written to the command's stdin stream.
            stderr: Writable text stream that receives the command's decoded
                stderr output (written whether or not the command succeeds).

        Returns:
            The command's stdout decoded as UTF-8.

        Raises:
            Exception: If the exit status differs from ``os.EX_OK``. The
                message contains the ``errno`` symbol for the exit status, or
                ``None`` when the status has no ``errno`` name.
        """
        # ``os.errno`` was never a public attribute and no longer exists on
        # Python 3.7+; use the real ``errno`` module so the failure path raises
        # the intended error instead of an AttributeError.
        import errno

        logger.debug('Running "%s"', command)
        ssh_stdin, ssh_stdout, ssh_stderr = self.exec_command(command)
        if input_data is not None:
            ssh_stdin.write(input_data)
        # Wait for the exit status before draining the output streams.
        exit_code = ssh_stdout.channel.recv_exit_status()
        stderr.write(ssh_stderr.read().decode("utf-8"))
        if exit_code != os.EX_OK:
            raise Exception('Error while running "{}": {}'.format(command, errno.errorcode.get(exit_code)))
        return ssh_stdout.read().decode("utf-8")
项目:jipdate    作者:Linaro    | 项目源码 | 文件源码
def main(argv):
    """Validate the command-line options, then query Jira and/or process a status file.

    ``argv`` is accepted for the caller's convenience but is not consulted
    here: ``parse_args()`` is invoked without arguments.

    Exits with ``os.EX_USAGE`` on an invalid option combination and with
    ``os.EX_OK`` after printing the status file when '-p' is set in query mode.
    """
    parser = get_parser()

    def usage_error(message, show_help=False):
        # Report the problem, optionally print the help text, then exit
        # with EX_USAGE.
        eprint(message)
        if show_help:
            parser.print_help()
        sys.exit(os.EX_USAGE)

    # Publish the parsed arguments on cfg so they are reachable everywhere
    # after this point.
    cfg.args = parser.parse_args()

    # Create the global yml configuration instance, likewise reachable
    # everywhere afterwards.
    cfg.initiate_config()

    # Neither a file nor query mode: nothing to work on.
    if not (cfg.args.file or cfg.args.q):
        usage_error("No file provided and not in query mode\n", show_help=True)

    jira, username = jiralogin.get_jira_instance(cfg.args.t)

    # '-x', '-e' and '-p' are only meaningful in combination with '-q'.
    if (cfg.args.x or cfg.args.e) and not cfg.args.q:
        usage_error("Arguments '-x' and '-e' can only be used together with '-q'")

    if cfg.args.p and not cfg.args.q:
        usage_error("Arguments '-p' can only be used together with '-q'")

    if cfg.args.q:
        # Query mode: the status file comes from get_jira_issues.
        filename = get_jira_issues(jira, username)

        if cfg.args.p:
            # Print-only: show the file and stop here.
            print_status_file(filename)
            sys.exit(os.EX_OK)
    elif cfg.args.file is not None:
        filename = cfg.args.file
    else:
        usage_error("Trying to run script with unsupported configuration. Try using --help.")

    # Open the status file in an editor when one is available, then parse it.
    if get_editor():
        open_editor(filename)
    parse_status_file(jira, filename)
项目:ovirt-system-tests    作者:oVirt    | 项目源码 | 文件源码
def ping_vm0(ovirt_prefix):
    """Assert that ``_ping`` of ``VM0_PING_DEST`` returns ``EX_OK``."""
    ping_status = _ping(ovirt_prefix, VM0_PING_DEST)
    nt.assert_equals(ping_status, EX_OK)
项目:ovirt-system-tests    作者:oVirt    | 项目源码 | 文件源码
def restore_vm0_networking(ovirt_prefix):
    """Reboot the guest OS of VM0 when it no longer answers ping.

    This is a pseudo-test: networking may be broken after a resume, and
    rebooting the guest is how it gets revived.  Only the guest OS is
    rebooted -- the VM as a whole is not restarted -- so that later tests
    can still detect failures caused by the resume.  Using a guest OS other
    than Cirros might be a better long-term answer.
    """
    # Nothing to repair when the VM already answers.
    if _ping(ovirt_prefix, VM0_PING_DEST) == EX_OK:
        return

    vm_host = _vm_host(ovirt_prefix, VM0_NAME)
    libvirt_uri = 'qemu+tls://%s/system' % vm_host.name()
    reboot_command = [
        'virsh', '-c', libvirt_uri, 'reboot', '--mode', 'acpi', VM0_NAME,
    ]
    reboot = vm_host.ssh(reboot_command)
    nt.assert_equals(reboot.code, EX_OK)
    # There is no explicit wait for the ssh server inside the VM to come up;
    # the intervening tests (notably *_recovery) and the repeated connection
    # attempts made by host.ssh calls are expected to give it enough time.
项目:ovirt-system-tests    作者:oVirt    | 项目源码 | 文件源码
def ping_vm0(ovirt_prefix):
    """Assert that ``_ping`` of ``VM0_PING_DEST`` returns ``EX_OK``."""
    ping_status = _ping(ovirt_prefix, VM0_PING_DEST)
    nt.assert_equals(ping_status, EX_OK)
项目:ovirt-system-tests    作者:oVirt    | 项目源码 | 文件源码
def restore_vm0_networking(ovirt_prefix):
    """Reboot the guest OS of VM0 when it no longer answers ping.

    This is a pseudo-test: networking may be broken after a resume, and
    rebooting the guest is how it gets revived.  Only the guest OS is
    rebooted -- the VM as a whole is not restarted -- so that later tests
    can still detect failures caused by the resume.  Using a guest OS other
    than Cirros might be a better long-term answer.
    """
    # Nothing to repair when the VM already answers.
    if _ping(ovirt_prefix, VM0_PING_DEST) == EX_OK:
        return

    vm_host = _vm_host(ovirt_prefix, VM0_NAME)
    libvirt_uri = 'qemu+tls://%s/system' % vm_host.name()
    reboot_command = [
        'virsh', '-c', libvirt_uri, 'reboot', '--mode', 'acpi', VM0_NAME,
    ]
    reboot = vm_host.ssh(reboot_command)
    nt.assert_equals(reboot.code, EX_OK)
    # There is no explicit wait for the ssh server inside the VM to come up;
    # the intervening tests (notably *_recovery) and the repeated connection
    # attempts made by host.ssh calls are expected to give it enough time.
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def script(config, docs, **kwargs):
    """Print a script wrapped in ``preamble``/``postamble`` for the given docs.

    Output goes to ``kwargs['file']`` when supplied, otherwise to
    ``sys.stdout``.  All keyword arguments are forwarded to ``docbuild``.

    Returns:
        The error from ``prepare_docs_script_mode`` when preparation fails;
        ``os.EX_OK`` when ``docbuild`` reports success; otherwise the string
        "Script generation failed.".
    """
    ready, error = prepare_docs_script_mode(config, docs)
    if not ready:
        return error

    out = kwargs.get('file', sys.stdout)
    print(preamble, file=out)
    buildsuccess, results = docbuild(config, docs, **kwargs)
    print(postamble, file=out)

    # Each result pairs a per-document flag with its source; a falsy flag
    # is logged as a failed generation.
    for succeeded, source in results:
        if succeeded:
            continue
        logger.error("Could not generate script for %s", source.stem)

    return os.EX_OK if buildsuccess else "Script generation failed."
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def test_run_detail(self):
        # Register one document per fixture helper, all built from the
        # linuxdoc example, then run the driver with --detail.
        fixtures = (
            (self.add_published, 'Published-HOWTO'),
            (self.add_new, 'New-HOWTO'),
            (self.add_stale, 'Stale-HOWTO'),
            (self.add_orphan, 'Orphan-HOWTO'),
            (self.add_broken, 'Broken-HOWTO'),
        )
        for register, stem in fixtures:
            register(stem, example.ex_linuxdoc)
        # The flag is appended to the very list object held in self.argv.
        cli_args = self.argv
        cli_args.append('--detail')
        status = tldp.driver.run(cli_args)
        self.assertEqual(status, os.EX_OK)
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def test_run_doctypes(self):
        # Running the driver with only --doctypes must return EX_OK.
        cli_args = ['--doctypes']
        status = tldp.driver.run(cli_args)
        self.assertEqual(status, os.EX_OK)
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def test_show_statustypes(self):
        # Capture what show_statustypes writes, check it returns EX_OK, and
        # verify that the stypes entry of every status type is in the output.
        captured = io.StringIO()
        outcome = tldp.driver.show_statustypes(Namespace(), file=captured)
        self.assertEqual(outcome, os.EX_OK)
        text = captured.getvalue()
        for status in status_types:
            self.assertTrue(stypes[status] in text)
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def test_run_statustypes(self):
        # Running the driver with only --statustypes must return EX_OK.
        cli_args = ['--statustypes']
        status = tldp.driver.run(cli_args)
        self.assertEqual(status, os.EX_OK)
项目:python-tldp    作者:tLDP    | 项目源码 | 文件源码
def test_run_summary(self):
        # Register one document per fixture helper, all built from the
        # linuxdoc example, then run the driver with --summary.
        fixtures = (
            (self.add_published, 'Published-HOWTO'),
            (self.add_new, 'New-HOWTO'),
            (self.add_stale, 'Stale-HOWTO'),
            (self.add_orphan, 'Orphan-HOWTO'),
            (self.add_broken, 'Broken-HOWTO'),
        )
        for register, stem in fixtures:
            register(stem, example.ex_linuxdoc)
        # The flag is appended to the very list object held in self.argv.
        cli_args = self.argv
        cli_args.append('--summary')
        status = tldp.driver.run(cli_args)
        self.assertEqual(status, os.EX_OK)