Python pytest 模块,exit() 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用pytest.exit()

项目:taf    作者:taf3    | 项目源码 | 文件源码
def pytest_sessionstart(session):
    """Validate choice-style command line options and create the environment.

    A logger is attached to ``session.config.ctlogger`` first.  The options
    are then verified one by one; the first option holding an unsupported
    value is logged as an error and reported through ``pytest.exit``.  When
    all options are accepted, an ``Environment`` built from the options is
    stored on ``session.config.env``.

    Args:
        session:  pytest session object whose ``config`` is read and updated

    """
    config = session.config
    config.ctlogger = loggers.module_logger("conftest")
    config.ctlogger.debug("Session start...")

    # (option name, accepted values), verified in exactly this order.
    accepted_values = (
        ("setup_scope", ["session", "module", "class", "function"]),
        ("call_check", ["none", "complete", "fast", "sanity_check_only"]),
        ("teardown_check", ["none", "complete", "fast", "sanity_check_only"]),
        ("fail_ctrl", ["stop", "restart", "ignore"]),
    )
    for option_name, choices in accepted_values:
        if getattr(config.option, option_name) not in choices:
            problem = "Incorrect --%s option." % option_name
            config.ctlogger.error(problem)
            pytest.exit(problem)

    # Define environment
    config.env = common3.Environment(config.option)
项目:taf    作者:taf3    | 项目源码 | 文件源码
def check(self):
        """Verify that the switch is operational and its Ports table is not empty.

        Notes:
            This is a mandatory method for all environment classes.

        Returns:
            The value returned by ``self.waiton()``, or None when ``self.status``
            is falsy and the check is skipped.

        """
        if not self.status:
            self.class_logger.info("Skip switch id:%s(%s) check because it's has Off status." % (self.id, self.name))
            return None

        result = self.waiton()

        # An empty Ports table either stops the whole run or fails the
        # current test, depending on the fail_ctrl option.
        ports_missing = self.ui.get_table_ports() == []
        if ports_missing and self.opts.fail_ctrl == 'stop':
            self.class_logger.debug("Exit switch check. Ports table is empty!")
            pytest.exit('Ports table is empty!')
        elif ports_missing:
            self.class_logger.debug("Fail switch check. Ports table is empty!")
            pytest.fail('Ports table is empty!')
        return result
项目:taf    作者:taf3    | 项目源码 | 文件源码
def env_init(self, request):
        """Validate command line options and hand back the environment.

        The options are verified in a fixed order; the first one with an
        unsupported value is logged as an error and reported through
        ``pytest.exit``.

        Args:
            request(pytest.request): pytest request

        Returns:
            testlib.common3.Environment: the ``request.config.env`` object, with
                its ``testenv_checkstatus`` attribute set to False

        """
        config = request.config
        # (option name, accepted values), verified in exactly this order.
        accepted_values = (
            ("setup_scope", {"session", "module", "class", "function"}),
            ("call_check", {"none", "complete", "fast", "sanity_check_only"}),
            ("teardown_check", {"none", "complete", "fast", "sanity_check_only"}),
            ("fail_ctrl", {"stop", "restart", "ignore"}),
        )
        for option_name, choices in accepted_values:
            if getattr(config.option, option_name) not in choices:
                problem = "Incorrect --%s option." % option_name
                config.ctlogger.error(problem)
                pytest.exit(problem)

        environment = config.env
        environment.testenv_checkstatus = False
        return environment
项目:stepler    作者:Mirantis    | 项目源码 | 文件源码
def pytest_configure(config):
    """Validate every step function unless the steps checker is disabled.

    Each attribute of each step class whose name is listed in ``STEPS`` is
    unwrapped and passed to ``StepValidator``.  All reported errors are
    collected and, if there are any, the run is stopped through
    ``pytest.exit`` with the errors listed in the message.
    """
    if config.option.disable_steps_checker:
        config.warn('P1', 'Step consistency checker is disabled!')
        return

    problems = []
    for step_cls in _get_step_classes():
        step_names = [name for name in dir(step_cls) if name in STEPS]
        for name in step_names:
            unbound = six.get_unbound_function(getattr(step_cls, name))
            plain_func = utils.get_unwrapped_func(unbound)
            problems.extend(StepValidator(plain_func).validate())

    if problems:
        pytest.exit('Some steps are not consistent!\n' + '\n'.join(problems))
项目:pytestlab    作者:sangoma    | 项目源码 | 文件源码
def exit_gracefully(signum, frame):
    """Stop the pytest run through ``pytest.exit``.

    The ``(signum, frame)`` signature matches a signal handler; neither
    argument is used.  Presumably installed for SIGTERM, as the message
    suggests - confirm at the registration site.
    """
    reason = 'Interrupting from SIGTERM'
    raise pytest.exit(reason)
项目:agent-python-pytest    作者:reportportal    | 项目源码 | 文件源码
def _stop_if_necessary(self):
        """Report one queued error, if any, and stop the run unless errors are ignored.

        A single ``(type, value, traceback)`` entry is taken from
        ``self._errors`` without blocking.  It is printed with
        ``traceback.print_exception`` and, when ``self.ignore_errors`` is falsy,
        the exception value is passed to ``pytest.exit``.  An empty queue is
        not an error: the method simply returns.
        """
        try:
            exc_type, exc_value, exc_tb = self._errors.get_nowait()
        except queue.Empty:
            return

        traceback.print_exception(exc_type, exc_value, exc_tb)
        sys.stderr.flush()
        if self.ignore_errors:
            return
        pytest.exit(exc_value)
项目:taf    作者:taf3    | 项目源码 | 文件源码
def get(self, init_start=False, retry_count=5):
        """Check the OVS Controller, restarting it after a failure when allowed.

        The first attempt either starts the controller or waits for it; every
        following attempt restarts it.  When the last allowed attempt fails,
        the collected error message is passed to ``pytest.exit`` (fail_ctrl is
        "stop") or to ``pytest.fail`` (any other fail_ctrl value).

        Args:
            init_start(bool):  Start the controller on the first attempt instead of waiting for it
            retry_count(int):  Number of attempts; forced to 1 unless fail_ctrl is "restart"

        """
        # If fail_ctrl != "restart", restart retries won't be performed
        if self.opts.fail_ctrl != "restart":
            retry_count = 1

        attempt = 1
        initial_attempt = True
        while attempt <= retry_count:
            try:
                if not initial_attempt:
                    self.restart()
                elif init_start:
                    self.start()
                else:
                    self.waiton()
            except Exception:
                self.class_logger.info("Error while checking Ovs Controller %s:%s..." % (self.ipaddr, self.port))
                attempt += 1
                initial_attempt = False
                trace_lines = traceback.format_exception(*sys.exc_info())
                message = "Error while checking Ovs Controller %s:%s:\n%s" % (self.ipaddr, self.port, "".join(trace_lines))
                sys.stderr.write(message)
                sys.stderr.flush()
                self.class_logger.log(loggers.levels['ERROR'], message)
                # All allowed attempts are used up: give up.
                if attempt >= retry_count + 1:
                    sys.stderr.write("Could not restart Ovs Controller for %s times. Something goes wrong \n" % (retry_count, ))
                    sys.stderr.flush()
                    if self.opts.fail_ctrl == "stop":
                        pytest.exit(message)
                    else:
                        pytest.fail(message)
            else:
                # The controller answered; nothing more to do.
                return
项目:taf    作者:taf3    | 项目源码 | 文件源码
def get(self, init_start=False, retry_count=1):
        """Get or start linux host instance.

        Args:
            init_start(bool):  Start the host instead of only waiting for it
            retry_count(int):  Accepted but not used by this method

        Returns:
            None or raise an exception.

        Notes:
            A KeyboardInterrupt is logged, followed by ``self.sanitize()`` and
            ``pytest.exit``.  Any other exception is logged, written to stderr
            together with its traceback and reported through ``pytest.fail``.

        """
        # Restart is not implemented for lhosts, so retries make no sense and
        # retry_count is deliberately ignored.
        try:
            bring_up = self.start if init_start else self.waiton
            bring_up()
        except KeyboardInterrupt:
            message = "KeyboardInterrupt while checking device {0}({1})...".format(
                self.name, self.ipaddr)
            self.class_logger.info(message)
            self.sanitize()
            pytest.exit(message)
        except Exception:
            self.class_logger.error(
                "Error while checking device %s(%s)...", self.name, self.ipaddr)
            trace_text = "".join(traceback.format_exception(*sys.exc_info()))
            message = "Error while checking device {0}({1}):\n{2}".format(
                self.name, self.ipaddr, trace_text)
            sys.stderr.write(message)
            sys.stderr.flush()
            pytest.fail(message)
项目:taf    作者:taf3    | 项目源码 | 文件源码
def pytest_configure(config):
    """Register an ``OnsEnvPlugin`` instance under the name "_onsenv".

    When the ``setup`` option is falsy nothing is registered: the problem is
    logged and the run is stopped through ``pytest.exit``.
    """
    if not config.option.setup:
        config.ctlogger.error("SETUP")
        pytest.exit("SETUP")
    else:
        config.pluginmanager.register(OnsEnvPlugin(), "_onsenv")
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def stop(self):
        """Signal the LogCatcher thread to terminate and wait until it is gone."""
        self._termination_flag.set()

        # Join in short slices so that progress is logged while waiting.
        while True:
            if not self._logger_thread.is_alive():
                break
            self._logger_thread.join(timeout=0.5)
            log.info("Waiting for LogCatcher thread to exit")

        log.info("LogCatcher thread has terminated, bye!")
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def start(self):
        """Spawn the subprocess and wait for it to finish initializing.

        The subprocess output is registered with the log catcher right after
        the spawn.  If initialization is not reported as finished, the
        instance is stopped and the run is ended through ``pytest.exit``.
        """
        self._start_subprocess()
        self._register_stdout_stderr_to_logcatcher()

        initialized = self._wait_for_subprocess_to_finish_init()
        if initialized:
            return

        self.stop()
        pytest.exit("Failed to start `{}` process".format(self.id))
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def stop(self):
        """Stop the ManagedSubprocess instance, escalating signals as needed.

        If the process has already terminated, this is reported through
        ``pytest.exit``.  Otherwise SIGINT and then SIGTERM are sent to the
        process, each followed by a wait of half of ``self._EXIT_TIMEOUT``.
        If both waits time out, the whole process group is killed with
        SIGKILL and the process is waited for without a timeout.
        """
        proc = self._process
        proc.poll()
        if proc.returncode is not None:
            msg_fmt = "`%s` process has already terminated with code `%s`"
            pytest.exit(msg_fmt % (self.id, proc.returncode))
            return

        # Polite requests first, in this order.
        escalation = (
            ("Send SIGINT to `%s` session leader", signal.SIGINT),
            ("Send SIGTERM to `%s` session leader", signal.SIGTERM),
        )
        for announcement, signum in escalation:
            log.info(announcement, self.id)
            proc.send_signal(signum)
            try:
                proc.wait(self._EXIT_TIMEOUT / 2.0)
            except subprocess.TimeoutExpired:
                continue
            break
        else:
            # Neither signal made the process exit in time.
            log.info("Send SIGKILL to all `%s` processess", self.id)
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            log.info("wait() for `%s` session leader to die", self.id)
            proc.wait()

        log.info("`%s` session leader has terminated", self.id)
项目:GSM-scanner    作者:yosriayed    | 项目源码 | 文件源码
def test_exit_propagates(self, testdir):
        """An item raising ``pytest.exit.Exception`` must let it escape ``runitem``."""
        propagated = False
        try:
            testdir.runitem("""
                import pytest
                def test_func():
                    raise pytest.exit.Exception()
            """)
        except pytest.exit.Exception:
            propagated = True

        if not propagated:
            pytest.fail("did not raise")
项目:GSM-scanner    作者:yosriayed    | 项目源码 | 文件源码
def test_pytest_exit():
    """The exception raised by ``pytest.exit`` must be a KeyboardInterrupt."""
    try:
        pytest.exit("hello")
    except pytest.exit.Exception:
        caught = py.code.ExceptionInfo()
        assert caught.errisinstance(KeyboardInterrupt)
项目:stepler    作者:Mirantis    | 项目源码 | 文件源码
def pytest_collection_modifyitems(session, items):
    """Append the idempotent id to test names and optionally validate the ids.

    Every item whose id (as returned by ``get_item_id``) is not None gets
    ``[id-<id>]`` appended to its name.  When the ``check_idempotent_id``
    option is set, items without an id and ids shared by several items are
    printed and the run is stopped through ``pytest.exit``.
    """
    items_by_id = defaultdict(list)
    for item in items:
        test_id = get_item_id(item)
        items_by_id[test_id].append(item)
        if test_id is not None:
            item.name += '[id-{}]'.format(test_id)

    if not session.config.option.check_idempotent_id:
        return

    problems = []
    unidentified = items_by_id.pop(None, [])
    if unidentified:
        problems.append("Tests without idempotent_id:")
        problems.extend('  ' + x.nodeid for x in unidentified)
    for sharing_items in items_by_id.values():
        if len(sharing_items) > 1:
            problems.append("Single idempotent_id for many cases:")
            problems.extend('  ' + x.nodeid for x in sharing_items)

    if problems:
        print('')
        print('\n'.join(problems))
        pytest.exit('Errors with idempotent_id')
项目:stepler    作者:Mirantis    | 项目源码 | 文件源码
def pytest_collection_modifyitems(config, items):
    """Detect forbidden calls inside tests unless the checker is disabled.

    For every item the permitted names are ``PERMITTED_CALLS``, ``STEPS`` and
    the item's own fixture names.  All errors reported by ``TestValidator``
    are collected and, if there are any, the run is stopped through
    ``pytest.exit`` with the errors listed in the message.
    """
    if config.option.disable_steps_checker:
        config.warn('P1', 'Permitted calls checker is disabled!')
        return

    violations = []
    for test_item in items:
        allowed = PERMITTED_CALLS + STEPS + test_item.funcargnames
        violations += TestValidator(test_item.function, allowed).validate()

    if not violations:
        return
    pytest.exit("Only steps and fixtures must be called in test!\n" +
                '\n'.join(violations))
项目:stepler    作者:Mirantis    | 项目源码 | 文件源码
def pytest_collection_modifyitems(config, items):
    """Refuse to run destructive tests when no snapshot name is given.

    The run is stopped through ``pytest.exit`` at the first item carrying the
    ``DESTRUCTIVE`` marker, unless a snapshot name was passed or the
    ``force_destructive`` option is set.
    """
    no_snapshot = config.option.snapshot_name is None
    stop_destructive = no_snapshot and not config.option.force_destructive
    for item in items:
        if not (item.get_marker(DESTRUCTIVE) and stop_destructive):
            continue
        pytest.exit("You try to start destructive tests without passing "
                    "--snapshot-name for cloud reverting. Such tests can "
                    "break your cloud. To run destructive tests without "
                    "--snapshot-name you should pass --force-destructive "
                    "argument.")
项目:topotests    作者:FRRouting    | 项目源码 | 文件源码
def pytest_runtest_call():
    """
    Post-setup routine for the 'topology-only' option; the original author
    notes that it must run after setup_module().

    When the option is set, the mininet CLI of the current topology (if any)
    is opened and the run is then stopped through pytest.exit. Without the
    option the function does nothing.
    """
    # pylint: disable=E1101
    # Trust me, 'config' exists.
    if not pytest.config.getoption('--topology-only'):
        return

    topology = get_topogen()
    if topology is not None:
        # Allow user to play with the setup.
        topology.mininet_cli()

    pytest.exit('the topology executed successfully')
项目:topotests    作者:FRRouting    | 项目源码 | 文件源码
def pytest_configure(config):
    """Stop the pytest run when ``diagnose_env()`` returns a falsy result."""
    environment_ok = diagnose_env()
    if environment_ok:
        return
    pytest.exit('enviroment has errors, please read the logs')
项目:allure-python    作者:allure-framework    | 项目源码 | 文件源码
def test_history_id():
    """
    >>> allure_report = getfixture('allure_report')
    >>> assert_that(allure_report,
    ...             has_test_case('test_history_id',
    ...                           has_history_id()
    ...             )
    ... )
    """
    # The body is a no-op: the only statements tied to this test are the
    # doctest lines in the docstring above, and the pytest.exit call below
    # is left commented out.
    pass  # pytest.exit("Just test")
项目:taf    作者:taf3    | 项目源码 | 文件源码
def _connect_to_switch(self, prompt, timeout=20):
        """SSH connect to switch and wait until prompt string appeared.

        For an 'lxc' device the CLI is launched by running ``main.py`` from the
        image directory; if that file does not exist the run is stopped through
        ``pytest.exit``.  For any other device the shell is polled, with a
        10 second pause before each of up to 12 polls, until one of the
        expected prompts shows up.  If none does, a final
        ``action_on_expect`` call is made.

        Args:
            prompt(str | list[str]):  expected CLI prompt, or several acceptable prompts.
            timeout(int):  timeout passed to the prompt wait calls; login and
                the shell itself use ``self.timeout``.

        Returns:
            None

        Examples::

            self._connect_to_switch(prompt="Switch ")

        """
        self.suite_logger.debug("Login on switch with login: {0} and expected prompt is: {1}".format(self.login, prompt))
        self.conn.login(self.login, self.passw, timeout=self.timeout)
        self.suite_logger.debug("Create Shell")
        self.conn.open_shell(raw_output=True)
        self.conn.shell.settimeout(self.timeout)
        # lxc: run command "python main.py"
        if self.devtype == 'lxc':
            self.suite_logger.debug("Launched CLI on LXC")
            cli_start_path = os.path.join(self.build_path, self.img_path)
            cli_main = os.path.join(cli_start_path, 'main.py')
            if os.path.exists(cli_main):
                self.conn.shell_command('cd %s && python main.py -a %s -p %s'
                                        % (cli_start_path, self.ipaddr, self.xmlrpcport), timeout=5, ret_code=False, quiet=True)
            else:
                # Report the path that was actually probed, not a bare "main.py".
                message = "Path to CLI image does not exist: %s" % (cli_main, )
                self.suite_logger.error(message)
                pytest.exit(message)
        else:
            self.suite_logger.debug("Waiting for CLI on Real switch")
            alter = []
            # Add one or few expected prompt(s) and action(s) to alternatives list
            if isinstance(prompt, str):
                prompt = [prompt]
            for single_prompt in prompt:
                alter.append((single_prompt, None, True, False))

            iterations = 12
            for _ in range(iterations):
                time.sleep(10)
                login = self.conn.action_on_connect(self.conn.shell, alternatives=alter, timeout=timeout, is_shell=self.is_shell)
                if any(login.find(str(single_prompt)) != -1 for single_prompt in prompt):
                    break
                self.suite_logger.debug("Waiting, current prompt is {0}".format(login))
            else:
                # No expected prompt was seen in any iteration: make the final
                # expect call.  (This used to be guarded by a condition that
                # could never be true inside range(iterations).)
                self.conn.action_on_expect(self.conn.shell, alternatives=alter, timeout=timeout, is_shell=self.is_shell)
项目:taf    作者:taf3    | 项目源码 | 文件源码
def waiton(self, timeout=DEFAULT_SERVER_WAIT_ON_TIMEOUT):
        """Poll the device with ``self.probe()`` until it reports that it is up.

        Args:
            timeout(int):  Wait timeout in seconds

        Raises:
            Exception:  the timeout elapsed before the device reported being up

        Returns:
            dict:  Status dictionary returned by the last probe call.

        """
        self.class_logger.info("Waiting until device {0}({1}) is up.".format(self.name, self.ipaddr))
        status = None
        deadline = time.time() + timeout
        while True:
            # Progress dots, only when logging to a stream is enabled.
            if loggers.LOG_STREAM:
                sys.stdout.write(".")
                sys.stdout.flush()

            if not time.time() < deadline:
                # Time is elapsed.
                port = self._get_port_for_probe()
                failure = "Timeout exceeded. IP address {0} port {1} doesn't respond.".format(
                    self.ipaddr, port)
                self.class_logger.warning(failure)
                raise Exception(failure)

            # While time isn't elapsed continue probing switch.
            try:
                status = self.probe()
            except KeyboardInterrupt:
                interrupted = "KeyboardInterrupt while checking switch {0}({1})...".format(
                    self.name, self.ipaddr)
                self.class_logger.info(interrupted)
                self.sanitize()
                pytest.exit(interrupted)
            if status["isup"]:
                return status

            time.sleep(0.75)
项目:taf    作者:taf3    | 项目源码 | 文件源码
def waiton(self, timeout=90):
        """Wait until switch is fully operational.

        The switch is probed repeatedly, with a 0.75 second pause between
        probes, until it reports being up with type "switchpp" or until the
        timeout elapses.

        Args:
            timeout(int):  Wait timeout in seconds

        Raises:
            SwitchException:  the device did not respond in time, or a running
                switchpp reported empty Platform properties

        Returns:
            dict:  Status dictionary from probe method or raise an exception.

        """
        status = None
        message = "Waiting until switch %s(%s) is up." % (self.name, self.ipaddr)
        self.class_logger.info(message)
        stop_flag = False
        end_time = time.time() + timeout
        while not stop_flag:
            if loggers.LOG_STREAM:
                sys.stdout.write(".")
                sys.stdout.flush()
            if time.time() < end_time:
                # While time isn't elapsed continue probing switch.
                try:
                    status = self.probe()
                except KeyboardInterrupt:
                    message = "KeyboardInterrupt while checking switch %s(%s)..." % (self.name, self.ipaddr)
                    self.class_logger.info(message)
                    self.sanitize()
                    pytest.exit(message)
                if status["isup"] and status["type"] == "switchpp":
                    stop_flag = True
                    if status['prop'] == {}:
                        # If type == switchpp but prop == empty dict then Platform table return incorrect data.
                        message = "Found running switchpp on %s but Platform data is corrupted." % (self.ipaddr, )
                        self.class_logger.warning(message)
                        raise SwitchException(message)
                    self.class_logger.info("Switch instance on %s(%s) is OK." % (self.name, self.ipaddr))
            else:
                # Time is elapsed.
                # status is still None when the deadline passed before the
                # first probe (e.g. timeout <= 0); treat that as "no response"
                # instead of failing with a TypeError on None.
                if status is not None and status["isup"] and status["type"] != "switchpp":
                    message = ("Port %s on host %s is opened but doesn't response queries." +
                               " %s Check your environment!") % (self.port, self.ipaddr, self.waiton_err_message)
                else:
                    port = self._get_port_for_probe()
                    message = "Timeout exceeded. IP address %s port %s doesn't respond" % (self.ipaddr, port)
                self.class_logger.warning(message)
                raise SwitchException(message)
            if not stop_flag:
                time.sleep(0.75)
        return status
项目:taf    作者:taf3    | 项目源码 | 文件源码
def get(self, init_start=False, retry_count=7):
        """Get or start switch instance.

        The first attempt either starts the switch or waits for it; every
        following attempt restarts it with ``self.default_restart_type``.

        Args:
            init_start(bool):  Perform switch start operation or not
            retry_count(int):  Number of attempts to start(restart) switch;
                forced to 1 unless fail_ctrl is "restart"

        Returns:
            None or raise an exception.

        Notes:
            Also self.opts.fail_ctrl attribute affects logic of this method.
            fail_ctrl is set in py.test command line options (read py.test --help for more information).
            When the last attempt fails, the failure is reported through
            ``pytest.fail`` if fail_ctrl is "ignore" and through ``pytest.exit``
            otherwise.  A KeyboardInterrupt always ends in ``pytest.exit``.

        """
        # If fail_ctrl != "restart", restart retries won't be performed
        if self.opts.fail_ctrl != "restart":
            retry_count = 1

        # attempt is 1-based, so inside the except branch it equals the number
        # of failed attempts so far.
        for attempt in range(1, retry_count + 1):
            try:
                if attempt == 1:
                    if init_start:
                        self.start()
                    else:
                        self.waiton()
                else:
                    self.restart(mode=self.default_restart_type)
                break
            except KeyboardInterrupt:
                message = "KeyboardInterrupt while checking switch %s(%s)..." % (self.name, self.ipaddr)
                self.class_logger.info(message)
                self.sanitize()
                pytest.exit(message)
            except Exception:
                self.class_logger.warning("Error while checking switch %s(%s)..." % (self.name, self.ipaddr))
                traceback_message = traceback.format_exception(*sys.exc_info())
                message = "Error while checking switch %s(%s):\n%s" % (self.name, self.ipaddr, "".join(traceback_message))
                sys.stderr.write(message)
                sys.stderr.flush()
                self.class_logger.error(message)
                if attempt > 4:
                    self.class_logger.warning(
                        "Could not complete switch start method for the fourth time. Trying to "
                        "reset the DB...")
                    self.db_corruption = True
                # The last allowed attempt has failed.  The previous check
                # compared against retry_count + 1, a value the counter could
                # never reach, so exhausted retries were silently ignored.
                if attempt >= retry_count:
                    message = "Could not complete start switch method after {0} retries. Something " \
                              "went wrong...\n".format(retry_count)
                    sys.stderr.write(message)
                    sys.stderr.flush()
                    self.class_logger.error(message)
                    if self.opts.fail_ctrl != "ignore":
                        pytest.exit(message)
                    else:
                        pytest.fail(message)