Python pytest 模块,fail() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用pytest.fail()

项目:ufodiff    作者:source-foundry    | 项目源码 | 文件源码
def test_ufodiff_delta_class_instantiation_branch_with_ufo_filter():
    make_testing_branch()

    try:
        deltaobj = Delta('.', ['Font-Regular.ufo'], is_branch_test=True, compare_branch_name='testing_branch')
        assert deltaobj.is_commit_test is False
        assert deltaobj.is_branch_test is True
        assert deltaobj.compare_branch_name == "testing_branch"
        assert deltaobj.commit_number == "0"
        assert len(deltaobj.ufo_directory_list) == 1
        assert deltaobj.ufo_directory_list[0] == "Font-Regular.ufo"
    except Exception as e:
        delete_testing_branch()
        pytest.fail(e)

    delete_testing_branch()
项目:shopify_python    作者:Shopify    | 项目源码 | 文件源码
def _runTest(self):
        self._linter.check([self._test_file.module])

        expected_messages, expected_text = self._get_expected()
        received_messages, received_text = self._get_received()

        if expected_messages != received_messages:
            msg = ['Wrong results for file "%s":' % (self._test_file.base)]
            missing, unexpected = multiset_difference(expected_messages,
                                                      received_messages)
            if missing:
                msg.append('\nExpected in testdata:')
                msg.extend(' %3d: %s' % msg for msg in sorted(missing))
            if unexpected:
                msg.append('\nUnexpected in testdata:')
                msg.extend(' %3d: %s' % msg for msg in sorted(unexpected))
            pytest.fail('\n'.join(msg))
        self._check_output_text(expected_messages, expected_text, received_text)
项目:robograph    作者:csparpa    | 项目源码 | 文件源码
def test_connect():
    n1 = WithParameters(a=1, b=2)
    n2 = WithParameters(c=1, d=2)
    g = graph.Graph('testgraph', [n1, n2])
    not_included = WithParameters(e=1, f=2)

    # Errors when trying to link nodes coming from out of the graph
    with pytest.raises(exceptions.NodeConnectionError):
        g.connect(n1, not_included, 'blabla')
        pytest.fail()
    with pytest.raises(exceptions.NodeConnectionError):
        g.connect(not_included, n2, 'blabla')
        pytest.fail()

    # Now the correct procedure
    assert n2.output_label is None
    assert len(g.nxgraph.edges()) == 0
    g.connect(n1, n2, 'label')
    assert n2.output_label == 'label'
    assert len(g.nxgraph.edges()) == 1
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def pytest_runtest_item(self, item):
        lines1 = self.get_open_files()
        yield
        if hasattr(sys, "pypy_version_info"):
            gc.collect()
        lines2 = self.get_open_files()

        new_fds = set([t[0] for t in lines2]) - set([t[0] for t in lines1])
        leaked_files = [t for t in lines2 if t[0] in new_fds]
        if leaked_files:
            error = []
            error.append("***** %s FD leakage detected" % len(leaked_files))
            error.extend([str(f) for f in leaked_files])
            error.append("*** Before:")
            error.extend([str(f) for f in lines1])
            error.append("*** After:")
            error.extend([str(f) for f in lines2])
            error.append(error[0])
            error.append("*** function %s:%s: %s " % item.location)
            pytest.fail("\n".join(error), pytrace=False)


# XXX copied from execnet's conftest.py - needs to be merged
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _addexcinfo(self, rawexcinfo):
        # unwrap potential exception info (see twisted trial support below)
        rawexcinfo = getattr(rawexcinfo, '_rawexcinfo', rawexcinfo)
        try:
            excinfo = _pytest._code.ExceptionInfo(rawexcinfo)
        except TypeError:
            try:
                try:
                    l = traceback.format_exception(*rawexcinfo)
                    l.insert(0, "NOTE: Incompatible Exception Representation, "
                        "displaying natively:\n\n")
                    pytest.fail("".join(l), pytrace=False)
                except (pytest.fail.Exception, KeyboardInterrupt):
                    raise
                except:
                    pytest.fail("ERROR: Unknown Incompatible Exception "
                        "representation:\n%r" %(rawexcinfo,), pytrace=False)
            except KeyboardInterrupt:
                raise
            except pytest.fail.Exception:
                excinfo = _pytest._code.ExceptionInfo()
        self.__dict__.setdefault('_excinfo', []).append(excinfo)
项目:ubuntu-standalone-builder    作者:OddBloke    | 项目源码 | 文件源码
def test_binary_hook_sequence_is_lower_than_030(
            self, write_cloud_config_in_memory, monkeypatch):
        # That's the lowest sequence of disks and we want to filter before that
        filter_template = '-- binary hook filter:{} --'
        hook_filter = 'some*glob'
        monkeypatch.setattr(
            generate_build_config,
            "BINARY_HOOK_FILTER_CONTENT", filter_template)
        output = write_cloud_config_in_memory(binary_hook_filter=hook_filter)
        cloud_config = yaml.load(output)
        expected_content = filter_template.format(hook_filter)
        for stanza in cloud_config['write_files']:
            content = base64.b64decode(stanza['content']).decode('utf-8')
            if content == expected_content:
                break
        else:
            pytest.fail('Binary hook filter not correctly included.')
        path = stanza['path'].rsplit('/')[-1]
        assert path < '030-some-file.binary'
项目:srctools    作者:TeamSpen210    | 项目源码 | 文件源码
def assert_vec(vec, x, y, z, msg=''):
    """Asserts that Vec is equal to (x,y,z)."""
    # Don't show in pytest tracebacks.
    __tracebackhide__ = True

    # Ignore slight variations
    if not vec.x == pytest.approx(x):
        failed = 'x'
    elif not vec.y == pytest.approx(y):
        failed = 'y'
    elif not vec.z == pytest.approx(z):
        failed = 'z'
    else:
        # Success!
        return

    new_msg = "{!r} != ({}, {}, {})".format(vec, failed, x, y, z)
    if msg:
        new_msg += ': ' + msg
    pytest.fail(new_msg)
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def test_collect(self):
        from dallinger.experiments import Bartlett1932
        exp = Bartlett1932()
        existing_uuid = "12345-12345-12345-12345"
        data = exp.collect(existing_uuid, recruiter=u'bots')
        assert isinstance(data, dallinger.data.Data)

        dataless_uuid = "ed9e7ddd-3e97-452d-9e34-fee5d432258e"
        dallinger.data.register(dataless_uuid, 'https://bogus-url.com/something')

        try:
            data = exp.collect(dataless_uuid, recruiter=u'bots')
        except RuntimeError:
            # This is expected for an already registered UUID with no accessible data
            pass
        else:
            pytest.fail('Did not raise RuntimeError for existing UUID')

        # In debug mode an unknown UUID fails
        unknown_uuid = "c85d5086-2fa7-4baf-9103-e142b9170cca"
        with pytest.raises(RuntimeError):
            data = exp.collect(unknown_uuid, mode=u'debug', recruiter=u'bots')
项目:aiomongo    作者:ZeoAlliance    | 项目源码 | 文件源码
def test_index_2dsphere(self, test_db):
        assert 'geo_2dsphere' == await test_db.test.create_index([('geo', GEOSPHERE)])

        for dummy, info in (await test_db.test.index_information()).items():
            field, idx_type = info['key'][0]
            if field == 'geo' and idx_type == '2dsphere':
                break
        else:
            pytest.fail('2dsphere index not found.')

        poly = {'type': 'Polygon',
                'coordinates': [[[40, 5], [40, 6], [41, 6], [41, 5], [40, 5]]]}
        query = {'geo': {'$within': {'$geometry': poly}}}

        # This query will error without a 2dsphere index.
        async with test_db.test.find(query) as cursor:
            async for _ in cursor:
                pass
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_forbidden(self, client, media_type):
        headers = {'Accept': media_type}

        expected_body = {
            'title': 'Request denied',
            'description': ('You do not have write permissions for this '
                            'queue.'),
            'link': {
                'text': 'Documentation related to this error',
                'href': 'http://example.com/api/rbac',
                'rel': 'help',
            },
        }

        response = client.simulate_post(path='/fail', headers=headers)

        assert response.status == falcon.HTTP_403
        assert response.json == expected_body
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_epic_fail_json(self, client):
        headers = {'Accept': 'application/json'}

        expected_body = {
            'title': 'Internet crashed',
            'description': 'Catastrophic weather event due to climate change.',
            'code': 8733224,
            'link': {
                'text': 'Drill baby drill!',
                'href': 'http://example.com/api/climate',
                'rel': 'help',
            },
        }

        response = client.simulate_put('/fail', headers=headers)

        assert response.status == falcon.HTTP_792
        assert response.json == expected_body
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_epic_fail_xml(self, client, media_type):
        headers = {'Accept': media_type}

        expected_body = ('<?xml version="1.0" encoding="UTF-8"?>' +
                         '<error>' +
                         '<title>Internet crashed</title>' +
                         '<description>' +
                         'Catastrophic weather event due to climate change.' +
                         '</description>' +
                         '<code>8733224</code>' +
                         '<link>' +
                         '<text>Drill baby drill!</text>' +
                         '<href>http://example.com/api/climate</href>' +
                         '<rel>help</rel>' +
                         '</link>' +
                         '</error>')

        response = client.simulate_put(path='/fail', headers=headers)

        assert response.status == falcon.HTTP_792
        try:
            et.fromstring(response.content.decode('utf-8'))
        except ValueError:
            pytest.fail()
        assert response.text == expected_body
项目:aiotools    作者:achimnol    | 项目源码 | 文件源码
def test_actxmgr_exception_nested():

    @aiotools.actxmgr
    async def simple_ctx(msg):
        yield msg

    try:
        async with simple_ctx('hello') as msg1:
            async with simple_ctx('world') as msg2:
                assert msg1 == 'hello'
                assert msg2 == 'world'
                raise IndexError('bomb1')
    except BaseException as exc:
        assert isinstance(exc, IndexError)
        assert 'bomb1' == exc.args[0]
    else:
        pytest.fail()
项目:aiotools    作者:achimnol    | 项目源码 | 文件源码
def test_actxmgr_exception_chained():

    @aiotools.actxmgr
    async def simple_ctx(msg):
        try:
            await asyncio.sleep(0)
            yield msg
        except Exception as e:
            await asyncio.sleep(0)
            # exception is chained
            raise ValueError('bomb2') from e

    try:
        async with simple_ctx('hello') as msg:
            assert msg == 'hello'
            raise IndexError('bomb1')
    except BaseException as exc:
        assert isinstance(exc, ValueError)
        assert 'bomb2' == exc.args[0]
        assert isinstance(exc.__cause__, IndexError)
        assert 'bomb1' == exc.__cause__.args[0]
    else:
        pytest.fail()
项目:utils    作者:rapydo    | 项目源码 | 文件源码
def test():

    t = timestamp_from_string("423423423")

    d = datetime(1983, 6, 2, 17, 37, 3, tzinfo=pytz.utc)
    assert t == d

    try:
        t = timestamp_from_string("asd")
    except ValueError:
        pass
    else:
        pytest.fail("This call should raise a ValueError")

    t = timestamp_from_string("0")
    d = datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)
    assert t == d

    t = timestamp_from_string("-1")
    d = datetime(1969, 12, 31, 23, 59, 59, tzinfo=pytz.utc)
    assert t == d
项目:utils    作者:rapydo    | 项目源码 | 文件源码
def test():

    packages = list_all()
    assert packages is not None
    assert isinstance(packages, list)
    assert len(packages) > 0

    found = False
    for pkg in packages:
        if pkg._key == "rapydo-utils":
            found = True

    if not found:
        pytest.fail("rapydo.utils not found in the list of packages")

    ver = check_version("rapydo-utils")
    assert ver is not None

    ver = check_version("rapydo-blabla")
    assert ver is None

    install("docker-compose")
项目:parglare    作者:igordejanovic    | 项目源码 | 文件源码
def test_repeatable_one_zero_rr_conflicts():
    """
    Check that translations of B+ and B* don't produce R/R conflict.
    """
    grammar = """
    S: A B+ C;
    S: A B* D;
    A:; B:; C:; D:;
    """
    g = Grammar.from_string(grammar, _no_check_recognizers=True)

    # Check if parser construction raises exception
    try:
        Parser(g)
    except RRConflicts:
        pytest.fail("R/R conflicts not expected here.")
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def test_exceptions_in_server_bubble_up(self):
        """
        The callbacks thrown in the server callback bubble up to the caller.
        """
        class SentinelException(Exception):
            pass

        def server_callback(*args):
            raise SentinelException()

        def client_callback(*args):  # pragma: nocover
            pytest.fail("Should not be called")

        client = self._client_connection(callback=client_callback, data=None)
        server = self._server_connection(callback=server_callback, data=None)

        with pytest.raises(SentinelException):
            handshake_in_memory(client, server)
项目:execnet    作者:pytest-dev    | 项目源码 | 文件源码
def test_gateway_status_busy(self, gw):
        numchannels = gw.remote_status().numchannels
        ch1 = gw.remote_exec("channel.send(1); channel.receive()")
        ch2 = gw.remote_exec("channel.receive()")
        ch1.receive()
        status = gw.remote_status()
        assert status.numexecuting == 2  # number of active execution threads
        assert status.numchannels == numchannels + 2
        ch1.send(None)
        ch2.send(None)
        ch1.waitclose()
        ch2.waitclose()
        for i in range(10):
            status = gw.remote_status()
            if status.numexecuting == 0:
                break
        else:
            pytest.fail("did not get correct remote status")
        # race condition
        assert status.numchannels <= numchannels
项目:execnet    作者:pytest-dev    | 项目源码 | 文件源码
def test_popen_filetracing(self, testdir, monkeypatch, makegateway):
        tmpdir = testdir.tmpdir
        monkeypatch.setenv("TMP", tmpdir)
        monkeypatch.setenv("TEMP", tmpdir)  # windows
        monkeypatch.setenv('EXECNET_DEBUG', "1")
        gw = makegateway("popen")
        #  hack out the debuffilename
        fn = gw.remote_exec(
            "import execnet;channel.send(execnet.gateway_base.fn)"
        ).receive()
        slavefile = py.path.local(fn)
        assert slavefile.check()
        slave_line = "creating slavegateway"
        for line in slavefile.readlines():
            if slave_line in line:
                break
        else:
            py.test.fail("did not find %r in tracefile" % (slave_line,))
        gw.exit()
项目:chalice    作者:aws    | 项目源码 | 文件源码
def test_can_have_same_cors_configurations(self, sample_app):
        @sample_app.route('/cors', methods=['GET'], cors=True)
        def cors():
            pass

        @sample_app.route('/cors', methods=['PUT'], cors=True)
        def same_cors():
            pass

        try:
            validate_routes(sample_app.routes)
        except ValueError:
            pytest.fail(
                'A ValueError was unexpectedly thrown. Applications '
                'may have multiple view functions that share the same '
                'route and CORS configuration.'
            )
项目:chalice    作者:aws    | 项目源码 | 文件源码
def test_can_have_one_cors_configured_and_others_not(self, sample_app):
        @sample_app.route('/cors', methods=['GET'], cors=True)
        def cors():
            pass

        @sample_app.route('/cors', methods=['PUT'])
        def no_cors():
            pass

        try:
            validate_routes(sample_app.routes)
        except ValueError:
            pytest.fail(
                'A ValueError was unexpectedly thrown. Applications '
                'may have multiple view functions that share the same '
                'route but only one is configured for CORS.'
            )
项目:taf    作者:taf3    | 项目源码 | 文件源码
def get_isis_topology(self, instance=None):
        """Get IS-IS topology table.

        Returns:
            list[dict]: List of dictionary with keys: vertex, type, metric, next_hop, interface, parent

        """
        if instance:
            try:
                topology_output = instance.cli_send_command('vtysh -c "show isis topology"').stdout
            except UICmdException as ex:
                self.class_logger.error(ex)
                pytest.fail('Failed to get IS-IS topology, node id {}.'.format(
                    instance.switch.id))
            table = list(self.parse_isis_table_topology(topology_output.strip().splitlines()[4:]))
            for record in table:
                record["interface"] = instance.name_to_portid_map.get(record["interface"])
            return table
        else:
            raise UIException("UI instance isn't specified.")
项目:taf    作者:taf3    | 项目源码 | 文件源码
def get_node_hostname(self, instance=None):
        """Get node's hostname.

        Returns:
            str:  Hostname of node.

        """
        if instance:
            try:
                hostname_output = instance.cli_send_command('hostname').stdout
            except UICmdException as ex:
                self.class_logger.error(ex)
                pytest.fail('Failed to get hostname, node id {}.'.format(
                    instance.switch.id))
            return hostname_output.strip()
        else:
            raise UIException("UI instance isn't specified.")
项目:taf    作者:taf3    | 项目源码 | 文件源码
def is_row_added_to_l2multicast_table(mac_address=None, port_id=None, vlan_id=1, switch_instance=None, result=True):
    """Check if row with specified parameters added to L2Multicast table.

    """
    # Need to wait until entry will be added to L2Multicast table.
    time.sleep(1)
    table = switch_instance.getprop_table("L2Multicast")
    is_entry_added = False
    if table:
        for row in table:
            if (row['macAddress'] == mac_address) and (row['portId'] == port_id) and (row['vlanId'] == vlan_id):
                is_entry_added = True
    if is_entry_added == result:
        return True
    else:
        if is_entry_added is False:
            pytest.fail("Entry is not added to L2Multicast table!")
        if is_entry_added is True:
            pytest.fail("Entry is added to L2Multicast table (should not be)!")
项目:taf    作者:taf3    | 项目源码 | 文件源码
def console_clear_config(self):
        """Clear device configuration using console connection

        """
        cmd = [
            "from xmlrpclib import ServerProxy", "rc = -1",
            "rc = ServerProxy('http://127.0.0.1:8081/RPC2').nb.clearConfig()",
            "print 'clearConfig() returnCode={0}.'.format(rc)"]
        command = "python -c \"" + "; ".join(cmd) + "\""

        output, err, _ = self.telnet.exec_command(command.encode("ascii"))
        if err:
            message = "Cannot perform clearConfig.\nCommand: %s.\nStdout: %s\nStdErr: %s" % (command, output, err)
            self.class_logger.error(message)
            self.db_corruption = True
            pytest.fail(message)
        else:
            if "returnCode=0." in output:
                self.class_logger.debug("ClearConfig finished. StdOut:\n%s" % (output, ))
            else:
                message = "ClearConfig failed. StdOut:\n%s" % (output, )
                self.class_logger.error(message)
                pytest.fail(message)
项目:taf    作者:taf3    | 项目源码 | 文件源码
def check(self):
        """Check if switch is operational using waiton method.

        Notes:
            This mandatory method for all environment classes.

        """
        if not self.status:
            self.class_logger.info("Skip switch id:%s(%s) check because it's has Off status." % (self.id, self.name))
            return
        status = self.waiton()
        # Verify Ports table is not empty
        if self.ui.get_table_ports() == []:
            if self.opts.fail_ctrl == 'stop':
                self.class_logger.debug("Exit switch check. Ports table is empty!")
                pytest.exit('Ports table is empty!')
            else:
                self.class_logger.debug("Fail switch check. Ports table is empty!")
                pytest.fail('Ports table is empty!')
        return status
项目:taf    作者:taf3    | 项目源码 | 文件源码
def test_import_helpers_module():
    """Verify that all modules can be imported within 'helpers' module and classes objects can be created.

    """
    module_name = "helpers"
    try:
        # define test data
        def assertion_error_func():
            """Assertion error.

            """
            assert 0

        from testlib import helpers
        helpers.raises(AssertionError, "", assertion_error_func)
    except ImportError as err:
        pytest.fail("Import failure in '%s' module: %s" % (module_name, err))
项目:taf    作者:taf3    | 项目源码 | 文件源码
def test_import_common3_module(monkeypatch):
    """Verify that all modules can be imported within 'common3' module and 'Cross'/'Environment' objects can be created.

    """

    def fake_get_conf(env_object, path_string):
        """Get config.

        """
        return {'env': []}

    module_name = "common3"
    try:
        from testlib import common3
        common3.Cross(None, None)

        # replace Environment _get_setup method
        monkeypatch.setattr(common3.Environment, "_get_setup", fake_get_conf)
        common3.Environment(FakeOpts())
    except ImportError as err:
        pytest.fail("Import failure in '%s' module: %s" % (module_name, err))
项目:cookiecutter-django-app    作者:edx    | 项目源码 | 文件源码
def test_readme(cookies):
    """The generated README.rst file should pass some sanity checks and validate as a PyPI long description."""
    extra_context = {'repo_name': 'helloworld'}
    with bake_in_temp_dir(cookies, extra_context=extra_context) as result:

        readme_file = result.project.join('README.rst')
        readme_lines = [x.strip() for x in readme_file.readlines(cr=False)]
        assert 'helloworld' in readme_lines
        assert 'The full documentation is at https://helloworld.readthedocs.org.' in readme_lines
        setup_path = str(result.project.join('setup.py'))
        try:
            sh.python(setup_path, 'check', restructuredtext=True, strict=True)
        except sh.ErrorReturnCode as exc:
            pytest.fail(str(exc))
项目:cookiecutter-django-app    作者:edx    | 项目源码 | 文件源码
def check_quality(result):
    """Run quality tests on the given generated output."""
    for dirpath, _dirnames, filenames in os.walk(str(result.project)):
        pylintrc = str(result.project.join('pylintrc'))
        for filename in filenames:
            name = os.path.join(dirpath, filename)
            if not name.endswith('.py'):
                continue
            try:
                sh.pylint(name, rcfile=pylintrc)
                sh.pylint(name, py3k=True)
                sh.pycodestyle(name)
                if filename != 'setup.py':
                    sh.pydocstyle(name)
                sh.isort(name, check_only=True)
            except sh.ErrorReturnCode as exc:
                pytest.fail(str(exc))

    tox_ini = result.project.join('tox.ini')
    docs_build_dir = result.project.join('docs/_build')
    try:
        # Sanity check the generated Makefile
        sh.make('help')
        # quality check docs
        sh.doc8(result.project.join("README.rst"), ignore_path=docs_build_dir, config=tox_ini)
        sh.doc8(result.project.join("docs"), ignore_path=docs_build_dir, config=tox_ini)
    except sh.ErrorReturnCode as exc:
        pytest.fail(str(exc))
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def test_all_iam_templates(template_name):
    """Verify all IAM templates render as proper JSON."""
    *_, service_json = template_name.split('/')
    service, *_ = service_json.split('.')

    items = ['resource1', 'resource2']

    if service == 'rds-db':
        items = {
            'resource1': 'user1',
            'resource2': 'user2',
        }

    try:
        rendered = render_policy_template(
            account_number='',
            app='coreforrest',
            env='dev',
            group='forrest',
            items=items,
            pipeline_settings={
                'lambda': {
                    'vpc_enabled': False,
                },
            },
            region='us-east-1',
            service=service)
    except json.decoder.JSONDecodeError:
        pytest.fail('Bad template: {0}'.format(template_name), pytrace=False)

    assert isinstance(rendered, list)
项目:kubey    作者:bradrf    | 项目源码 | 文件源码
def _release(self, name):
        attr = getattr(self, '_orig_' + name)
        setattr(subprocess, name, attr)
        delattr(self, '_orig_' + name)
        responder = self.responders[name]
        if not responder.is_satisfied:
            pytest.fail('Unsatisfied responder: %s' % responder)
项目:ufodiff    作者:source-foundry    | 项目源码 | 文件源码
def test_ufodiff_delta_class_instantiation_commit():
    try:
        deltaobj = Delta('.', [], is_commit_test=True, commit_number='2')
        assert deltaobj.is_commit_test is True
        assert deltaobj.is_branch_test is False
        assert deltaobj.commit_number == "2"
        assert deltaobj.compare_branch_name is None
        assert len(deltaobj.ufo_directory_list) == 0
    except Exception as e:
        pytest.fail(e)
项目:ufodiff    作者:source-foundry    | 项目源码 | 文件源码
def test_ufodiff_delta_class_instantiation_branch():
    make_testing_branch()

    try:
        deltaobj = Delta('.', [], is_branch_test=True, compare_branch_name='testing_branch')
        assert deltaobj.is_commit_test is False
        assert deltaobj.is_branch_test is True
        assert deltaobj.compare_branch_name == "testing_branch"
        assert deltaobj.commit_number == "0"
        assert len(deltaobj.ufo_directory_list) == 0
    except Exception as e:
        delete_testing_branch()
        pytest.fail(e)

    delete_testing_branch()
项目:ufodiff    作者:source-foundry    | 项目源码 | 文件源码
def test_ufodiff_delta_class_instantiation_commit_with_ufo_filter():
    try:
        deltaobj = Delta('.', ['Font-Regular.ufo'], is_commit_test=True, commit_number='2')
        assert deltaobj.is_commit_test is True
        assert deltaobj.is_branch_test is False
        assert deltaobj.commit_number == "2"
        assert deltaobj.compare_branch_name is None
        assert len(deltaobj.ufo_directory_list) == 1
        assert deltaobj.ufo_directory_list[0] == "Font-Regular.ufo"
    except Exception as e:
        pytest.fail(e)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def error_on_ResourceWarning():
    """This fixture captures ResourceWarning's and reports an "error"
    describing the file handles left open.

    This is shown regardless of how successful the test was, if a test fails
    and leaves files open then those files will be reported.  Ideally, even
    those files should be closed properly after a test failure or exception.

    Since only Python 3 and PyPy3 have ResourceWarning's, this context will
    have no effect when running tests on Python 2 or PyPy.

    Because of autouse=True, this function will be automatically enabled for
    all test_* functions in this module.

    This code is primarily based on the examples found here:
    https://stackoverflow.com/questions/24717027/convert-python-3-resourcewarnings-into-exception
    """
    try:
        ResourceWarning
    except NameError:
        # Python 2, PyPy
        yield
        return
    # Python 3, PyPy3
    with warnings.catch_warnings(record=True) as caught:
        warnings.resetwarnings() # clear all filters
        warnings.simplefilter('ignore') # ignore all
        warnings.simplefilter('always', ResourceWarning) # add filter
        yield # run tests in this context
        gc.collect() # run garbage collection (for pypy3)
        if not caught:
            return
        pytest.fail('The following file descriptors were not closed properly:\n' +
                    '\n'.join((str(warning.message) for warning in caught)),
                    pytrace=False)
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def yocto_id_from_ext4(self, filename):
        try:
            cmd = "debugfs -R 'cat %s' %s| sed -n 's/^%s=//p'" % (self.artifact_info_file, filename, self.artifact_prefix)
            output = subprocess.check_output(cmd, shell=True).strip()
            logger.info("Running: " + cmd + " returned: " + output)
            return output

        except subprocess.CalledProcessError:
            pytest.fail("Unable to read: %s, is it a broken image?" % (filename))

        except Exception, e:
            pytest.fail("Unexpected error trying to read ext4 image: %s, error: %s" % (filename, str(e)))
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def artifact_id_randomize(self, install_image, device_type="vexpress-qemu", specific_image_id=None):

        if specific_image_id:
            imageid = specific_image_id
        else:
            imageid = "mender-%s" % str(random.randint(0, 99999999))

        config_file = r"""%s=%s""" % (self.artifact_prefix, imageid)
        tfile = tempfile.NamedTemporaryFile(delete=False)
        tfile.write(config_file)
        tfile.close()

        try:
            cmd = "debugfs -w -R 'rm %s' %s" % (self.artifact_info_file, install_image)
            logger.info("Running: " + cmd)
            output = subprocess.check_output(cmd, shell=True).strip()
            logger.info("Returned: " + output)

            cmd = ("printf 'cd %s\nwrite %s %s\n' | debugfs -w %s"
                   % (os.path.dirname(self.artifact_info_file),
                      tfile.name, os.path.basename(self.artifact_info_file), install_image))
            logger.info("Running: " + cmd)
            output = subprocess.check_output(cmd, shell=True).strip()
            logger.info("Returned: " + output)

        except subprocess.CalledProcessError:
            pytest.fail("Trying to modify ext4 image failed, probably because it's not a valid image.")

        except Exception, e:
            pytest.fail("Unexpted error trying to modify ext4 image: %s, error: %s" % (install_image, str(e)))

        finally:
            os.remove(tfile.name)

        return imageid
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def verify_reboot_performed(self, max_wait=60*30):
        logger.info("waiting for system to reboot")

        successful_connections = 0
        timeout = time.time() + max_wait

        while time.time() <= timeout:
            try:
                with settings(warn_only=True, abort_exception=FabricFatalException):
                    time.sleep(1)
                    if exists(self.tfile):
                        logger.debug("temp. file still exists, device hasn't rebooted.")
                        continue
                    else:
                        logger.debug("temp. file no longer exists, device has rebooted.")
                        successful_connections += 1

                    # try connecting 10 times before returning
                    if successful_connections <= 9:
                        continue
                    return

            except (BaseException):
                logger.debug("system exit was caught, this is probably because SSH connectivity is broken while the system is rebooting")
                continue

        if time.time() > timeout:
            pytest.fail("Device never rebooted!")
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def common_update_procedure(install_image,
                            regenerate_image_id=True,
                            device_type="vexpress-qemu",
                            broken_image=False,
                            verify_status=True,
                            signed=False,
                            devices=None,
                            scripts=[]):

    with artifact_lock:
        if broken_image:
            artifact_id = "broken_image_" + str(random.randint(0, 999999))
        elif regenerate_image_id:
            artifact_id = Helpers.artifact_id_randomize(install_image)
            logger.debug("randomized image id: " + artifact_id)
        else:
            artifact_id = Helpers.yocto_id_from_ext4(install_image)

        # create atrifact
        with tempfile.NamedTemporaryFile() as artifact_file:
            created_artifact = image.make_artifact(install_image, device_type, artifact_id, artifact_file, signed=signed, scripts=scripts)

            if created_artifact:
                deploy.upload_image(created_artifact)
                if devices is None:
                    devices = list(set([device["device_id"] for device in adm.get_devices_status("accepted")]))
                deployment_id = deploy.trigger_deployment(name="New valid update",
                                                          artifact_name=artifact_id,
                                                          devices=devices)
            else:
                logger.warn("failed to create artifact")
                pytest.fail("error creating artifact")

    # wait until deployment is in correct state
    if verify_status:
        deploy.check_expected_status("inprogress", deployment_id)

    return deployment_id, artifact_id
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def test_ssl_only(self):
        """ make sure we are not exposing any non-ssl connections in production environment """
        done = False
        sleep_time = 2
        # start production environment
        subprocess.call(["./production_test_env.py", "--start"])

        # get all exposed ports from docker

        for _ in range(3):
            exposed_hosts = subprocess.check_output("docker ps | grep %s | grep -o -E '0.0.0.0:[0-9]*'" % ("testprod"), shell=True)

            try:
                for host in exposed_hosts.split():
                    with contextlib.closing(ssl.wrap_socket(socket.socket())) as sock:
                        logging.info("%s: connect to host with TLS" % host)
                        host, port = host.split(":")
                        sock.connect((host, int(port)))
                        done = True
            except:
                sleep_time *= 2
                time.sleep(sleep_time)
                continue

            if done:
                break

        # tear down production env
        subprocess.call(["./production_test_env.py", "--kill"])

        if not done:
            pytest.fail("failed to connect to production env. using SSL")
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def test_token_token_expiration(self):
        """ verify that an expired token is handled correctly (client gets a new, valid one)
            and that deployments are still recieved by the client
        """

        if not env.host_string:
            execute(self.test_token_token_expiration,
                    hosts=get_mender_clients())
            return

        timeout_time = int(time.time()) + 60
        while int(time.time()) < timeout_time:
            with quiet():
                output = run("journalctl -u mender -l --no-pager | grep \"received new authorization data\"")
                time.sleep(1)

            if output.return_code == 0:
                logging.info("mender logs indicate new authorization data available")
                break

        if timeout_time <= int(time.time()):
            pytest.fail("timed out waiting for download retries")


        # this call verifies that the deployment process goes into an "inprogress" state
        # which is only possible when the client has a valid token.
        common_update_procedure(install_image=conftest.get_valid_image())
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def mender_log_contains_aborted_string(self, mender_client_container="mender-client"):
        expected_string = "deployment aborted at the backend"

        for _ in range(60*5):
            with settings(hide('everything'), warn_only=True):
                out = run("journalctl -u mender | grep \"%s\"" % expected_string)
                if out.succeeded:
                    return
                else:
                    time.sleep(2)

        pytest.fail("deployment never aborted.")
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def perform_update(self, mender_client_container="mender-client", fail=False):

        if fail:
            execute(update_image_failed,
                    hosts=get_mender_client_by_container_name(mender_client_container))
        else:
            execute(update_image_successful,
                    install_image=conftest.get_valid_image(),
                    skip_reboot_verification=True,
                    hosts=get_mender_client_by_container_name(mender_client_container))
项目:integration    作者:mendersoftware    | 项目源码 | 文件源码
def wait_for_containers(expected_containers, defined_in):
    for _ in range(60 * 5):
        out = subprocess.check_output("docker-compose -p %s %s ps -q" % (conftest.docker_compose_instance, "-f " + " -f ".join(defined_in)), shell=True)
        if len(out.split()) == expected_containers:
            time.sleep(60)
            return
        else:
            time.sleep(1)

    pytest.fail("timeout: %d containers not running for docker-compose project: %s" % (expected_containers, conftest.docker_compose_instance))
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def get_ursula_by_id(self, ursula_id):
        try:
            ursula = self._ursulas[ursula_id]
        except KeyError:
            pytest.fail("No Ursula with ID {}".format(ursula_id))
        return ursula
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def test_all_ursulas_know_about_all_other_ursulas(ursulas):
    """
    Once launched, all Ursulas know about - and can help locate - all other Ursulas in the network.
    """
    ignorance = []
    for acounter, announcing_ursula in enumerate(blockchain_client._ursulas_on_blockchain):
        for counter, propagating_ursula in enumerate(ursulas):
            if not digest(announcing_ursula) in propagating_ursula.server.storage:
                ignorance.append((counter, acounter))
    if ignorance:
        pytest.fail(str(["{} didn't know about {}".format(counter, acounter) for counter, acounter in ignorance]))
项目:robograph    作者:csparpa    | 项目源码 | 文件源码
def test_execute_fails_with_graphs_with_isles():
    g = make_a_graph_with_isles()
    with pytest.raises(exceptions.GraphExecutionError):
        g.execute()
        pytest.fail()