Python pytest 模块,raises() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用pytest.raises()

项目:ln2sql    作者:FerreroJeremy    | 项目源码 | 文件源码
def noKeywordFoundExceptionQueries():
    noKeyword = [
        {
            'input': 'Affiche moi.',  # No keyword found in sentence!
            'database': DATABASE_PATH + 'ecole.sql',
            'language': LANG_PATH + 'french.csv'
        },
        {
            'input': 'Affiche moi les étudiants',  # No keyword found in sentence!
            'database': DATABASE_PATH + 'ecole.sql',
            'language': LANG_PATH + 'french.csv'
        }
    ]
    for test in noKeyword:
        with pytest.raises(Exception):
            Ln2sql(test['database'], test['language']).get_query(test['input'])
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_hooks_are_run_even_if_there_was_an_exception(wrapper):
    class Bar(object):

        def will_fail(self):
            assert ctx.before_call_count == 1
            raise Exception('heh')

    bar = Bar()
    ctx = ControllableContextManager()
    wrapper(
        cls_to_wrap=Bar, method_to_wrap_name='will_fail', context_manager=ctx,
        is_cls_method=False)
    with pytest.raises(Exception) as excinfo:
        bar.will_fail()
    assert str(excinfo.value) == 'heh'
    assert ctx.before_call_count == 1
    assert ctx.after_call_count == 1
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_when_inner_enter_fails(self):
        ControllableContextManager.reset_events()
        outer = ControllableContextManager()
        inner = ControllableContextManager(fail_in_method='__enter__')

        with pytest.raises(ControllableContextManager.TestException):
            self.run_nested_context_managers(outer, inner)
        expected_calls = self.get_recorded_calls()

        assert expected_calls == [
            (outer, '__enter__'), (inner, '__enter__'), (outer, '__exit__')
        ]

        ControllableContextManager.reset_events()
        with pytest.raises(ControllableContextManager.TestException):
            with multi_context_manager([outer, inner]):
                pass
        assert expected_calls == self.get_recorded_calls()
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_when_outer_enter_fails(self):
        ControllableContextManager.reset_events()
        outer = ControllableContextManager(fail_in_method='__enter__')
        inner = ControllableContextManager()

        with pytest.raises(ControllableContextManager.TestException):
            self.run_nested_context_managers(outer, inner)

        expected_calls = self.get_recorded_calls()
        assert expected_calls == [(outer, '__enter__')]

        ControllableContextManager.reset_events()
        with pytest.raises(ControllableContextManager.TestException):
            with multi_context_manager([outer, inner]):
                pass
        assert expected_calls == self.get_recorded_calls()
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_when_code_inside_context_managers_fails(self):
        ControllableContextManager.reset_events()
        outer = ControllableContextManager()
        inner = ControllableContextManager()

        def fail():
            raise NotImplementedError('hi')

        with pytest.raises(NotImplementedError):
            self.run_nested_context_managers(outer, inner, fn=fail)
        expected_calls = self.get_recorded_calls()
        assert expected_calls == [
            (outer, '__enter__'), (inner, '__enter__'),
            (inner, '__exit__'), (outer, '__exit__')
        ]

        ControllableContextManager.reset_events()
        with pytest.raises(NotImplementedError):
            with multi_context_manager([outer, inner]):
                fail()

        assert expected_calls == self.get_recorded_calls()
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_three_managers_middle_one_fails(self):
        """ this is like raising the LimitViolationError """
        ControllableContextManager.reset_events()
        outer = ControllableContextManager()
        middle = ControllableContextManager(fail_in_method='__exit__')
        inner = ControllableContextManager()

        ControllableContextManager.reset_events()
        with pytest.raises(ControllableContextManager.TestException):
            with outer:
                with middle:
                    with inner:
                        pass
        expected_calls = self.get_recorded_calls()
        assert expected_calls == [
            (outer, '__enter__'), (middle, '__enter__'), (inner, '__enter__'),
            (inner, '__exit__'), (middle, '__exit__'), (outer, '__exit__')
        ]
        ControllableContextManager.reset_events()
        with pytest.raises(ControllableContextManager.TestException):
            with multi_context_manager([outer, middle, inner]):
                pass
        assert expected_calls == self.get_recorded_calls()
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_can_specify_typed_limits(db):
    with pytest.raises(LimitViolationError) as excinfo:
        with QueryBatchLimit(write=0, read=3):
            list(Group.objects.all())
            list(Group.objects.all())
            list(Group.objects.all())
            Group.objects.update(name='foo')

    with pytest.raises(LimitViolationError) as excinfo:
        with QueryBatchLimit(read=0):
            list(Group.objects.all())
    assert excinfo.value.context == {}
    assert excinfo.value.actual == '1'
    assert excinfo.value.limit == 0
    assert excinfo.value.name == 'read'

    with pytest.raises(LimitViolationError) as excinfo:
        with QueryBatchLimit(write=1):
            Group.objects.update(name='baz')
            Group.objects.update(name='name')
    assert excinfo.value.context == {}
    assert excinfo.value.actual == '2'
    assert excinfo.value.limit == 1
    assert excinfo.value.name == 'write'
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_has_support_for_elapsed_time_in_template_render(settings):
    settings.PERFORMANCE_LIMITS = {
        'Template.render': {
            'time': {
                'total': 0
            }
        }
    }
    template = loader.get_template('all-group-names.markdown')
    with freeze_time('2016-09-29 15:52:01') as frozen_time:
        class SlowIterable(object):
            def __iter__(self):
                yield 'foo'
                frozen_time.tick(timedelta(seconds=5))
                yield 'bar'

        with pytest.raises(LimitViolationError) as excinfo:
            template.render(context={'groups': SlowIterable()})

    assert excinfo.value.context == {'template': ['all-group-names.markdown']}
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_can_specify_limits_through_settings_for_django_test_client(
        db, settings, client, method, limit, value, cfg_key, items_name,
        view_ctx):
    settings.PERFORMANCE_LIMITS = {
        'django.test.client.Client': {
            cfg_key: {
                'total': limit
            }
        }
    }
    with view_ctx(value=value) as vctx:
        with pytest.raises(LimitViolationError) as excinfo:
            vctx.request(getattr(client, method.lower()))
        assert excinfo.value.context == {
            'Client.request': ['{method} {url}'.format(
                url=vctx.url, method=method)]}
        assert excinfo.value.items_name == items_name, \
            excinfo.value.base_error_msg
项目:rstviewer    作者:arne-cl    | 项目源码 | 文件源码
def test_cli_rs3topng():
    """conversion to PNG on the commandline"""
    temp_png = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    temp_png.close()

    # calling `rstviewer -f png input.rs3 output.png` will end the program
    # with sys.exit(0), so we'll have to catch this here.
    with pytest.raises(SystemExit) as serr:
        cli(['-f', 'png', RS3_FILEPATH, temp_png.name])
        out, err = pytest.capsys.readouterr()
        assert err == 0

    with open(temp_png.name, 'r') as png_file:
        png_str = png_file.read()
        os.unlink(temp_png.name)

    # generated images might not be 100% identical, probably
    # because of the font used
    with open(EXPECTED_PNG1, 'r') as expected_png_file:
        ident1 = png_str == expected_png_file.read()

    with open(EXPECTED_PNG2, 'r') as expected_png_file:
        ident2 = png_str == expected_png_file.read()

    assert ident1 or ident2
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def test_abstract_methods_required():

    class Broken(Service):
        """Missing abstract methods."""

    with pytest.raises(TypeError):
        Broken()
项目:ngspicepy    作者:ashwith    | 项目源码 | 文件源码
def test__parse__(self):
        ng.load_netlist(netlists_path + 'dc_ac_check.net')
        with pytest.raises(KeyError):
            ng.run_dc(src='v1', start=0, stop=1, fstep=.1)

        with pytest.raises(ValueError):
            ng.run_dc(src='v1', start=0, stop=1)

        with pytest.raises(ValueError):
            ng.run_dc(src='v1', start=0, stop=1, step=.1,
                      start2=0, stop2=1, step2=.1)

        with pytest.raises(ValueError):
            ng.run_dc(src='v1', start=0, stop=1, step=.1,
                      src2='v2', start2=0, stop2=1)

        with pytest.raises(ValueError):
            ng.run_dc('v1 0 1 0 ')

        with pytest.raises(ValueError):
            ng.run_dc('v1 0 1 1m v2 0 1 0')
项目:dsq    作者:baverman    | 项目源码 | 文件源码
def test_sync_manager(manager):
    manager.sync = True

    @manager.task
    def foo(a, b):
        foo.called = True
        return a + b

    assert foo.push(1, 2).ready().value == 3
    assert foo.called

    with pytest.raises(KeyError):
        manager.process(make_task('boo'))

    @manager.task
    def bad():
        raise ZeroDivisionError()

    with pytest.raises(ZeroDivisionError):
        bad.push()
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_insufficient_numpy_write_data(self, x_series_device, seed):
        # Reset the pseudorandom number generator with seed.
        random.seed(seed)

        # Randomly select physical channels to test.
        number_of_channels = random.randint(
            2, len(x_series_device.ao_physical_chans))
        channels_to_test = random.sample(
            x_series_device.ao_physical_chans, number_of_channels)

        with nidaqmx.Task() as task:
            task.ao_channels.add_ao_voltage_chan(
                flatten_channel_string([c.name for c in channels_to_test]),
                max_val=10, min_val=-10)

            number_of_samples = random.randint(1, number_of_channels - 1)
            values_to_test = numpy.float64([
                random.uniform(-10, 10) for _ in range(number_of_samples)])

            with pytest.raises(DaqError) as e:
                task.write(values_to_test, auto_start=True)
            assert e.value.error_code == -200524
项目:nidaqmx-python    作者:ni    | 项目源码 | 文件源码
def test_float_property(self, x_series_device):

        with nidaqmx.Task() as task:
            ai_channel = task.ai_channels.add_ai_voltage_chan(
                x_series_device.ai_physical_chans[0].name, max_val=5)

            # Test property default value.
            assert ai_channel.ai_max == 5

            # Test property setter and getter.
            max_value = 10
            ai_channel.ai_max = max_value
            assert ai_channel.ai_max == max_value

            # Test property deleter. Reading this property will throw an
            # error after being reset.
            del ai_channel.ai_max
            with pytest.raises(DaqError) as e:
                read_value = ai_channel.ai_max
            assert e.value.error_code == -200695
项目:quantiphy    作者:KenKundert    | 项目源码 | 文件源码
def test_simple_scaling():
    Quantity.set_prefs(spacer=None, show_label=None, label_fmt=None, label_fmt_full=None)
    q=Quantity('1kg')
    assert q.render() == '1 kg'
    assert q.render(scale=0.001, show_units=False) == '1'
    with pytest.raises(KeyError, message="Unable to convert between 'fuzz' and 'g'."):
        q.render(scale='fuzz')

    q=Quantity('1', units='g', scale=1000)
    assert q.render() == '1 kg'
    assert q.render(scale=(0.0022046, 'lbs')) == '2.2046 lbs'

    q=Quantity('1', scale=(1000, 'g'))
    assert q.render() == '1 kg'
    assert q.render(scale=lambda v, u: (0.0022046*v, 'lbs')) == '2.2046 lbs'

    def dB(v, u):
        return 20*math.log(v, 10), 'dB'+u

    def adB(v, u):
        return pow(10, v/20), u[2:] if u.startswith('dB') else u

    q=Quantity('-40 dBV', scale=adB)
    assert q.render() == '10 mV'
    assert q.render(scale=dB) == '-40 dBV'
项目:pytest-selenium-pdiff    作者:rentlytics    | 项目源码 | 文件源码
def test_screenshot_matches__raises_when_images_are_different_sizes(selenium, tmpdir):
    setup_selenium_session(selenium, tmpdir)

    screenshot_matches(selenium, 'testing-size')

    selenium.set_window_size(1200, 900)
    selenium.get('./tests/fixtures/page1-changed.html')
    selenium.find_element_by_tag_name('body').text.index("It has a second paragraph.")

    with pytest.raises(exceptions.ScreenshotMismatch):
        assert screenshot_matches(selenium, 'testing-size')

    captured_path = os.path.join(settings['PDIFF_PATH'], 'testing-size.captured.png')
    pdiff_path = os.path.join(settings['PDIFF_PATH'], 'testing-size.diff.png')

    assert os.path.exists(captured_path)
    assert os.path.exists(pdiff_path) is False
项目:sstash    作者:realcr    | 项目源码 | 文件源码
def test_value_write_failure_invalid_type():
    """
    Try writing a value not of type bytes. Should raise an exception.
    """
    ins = InnerStash({})

    with pytest.raises(SSValueError):
        ins.write_value(['hello','world'],3)

    with pytest.raises(SSValueError):
        ins.write_value(['hello','world'],"A string!")

    # Try writing a dummy class instance:
    class MyClass:
        pass

    with pytest.raises(SSValueError):
        ins.write_value(['hello','world'],MyClass())
项目:sstash    作者:realcr    | 项目源码 | 文件源码
def test_values_tree_structure():
    """
    Write various values to the tree. Make sure results are as expected
    """

    ins = InnerStash({})
    ins.write_value(['a','b','c'],b'abc')
    ins.write_value(['a','b'],b'ab')
    ins.write_value(['a','b','d'],b'abd')
    ins.write_value(['a','b','d','e'],b'abde')

    assert ins.read_value(['a','b','c']) == b'abc'
    assert ins.read_value(['a','b']) == b'ab'
    assert ins.read_value(['a','b','d']) == b'abd'
    assert ins.read_value(['a','b','d','e']) == b'abde'

    # The key ['a'] was not assigned a value:
    with pytest.raises(SSKeyError):
        ins.read_value(['a'])
项目:sstash    作者:realcr    | 项目源码 | 文件源码
def test_get_children():
    ins = InnerStash({})
    ins.write_value(['a','b','c'],b'abc')
    ins.write_value(['a','b'],b'ab')
    ins.write_value(['a','b','d'],b'abd')
    ins.write_value(['a','b','d','e'],b'abde')
    ins.write_value(['a','b','f'],b'abf')

    assert ins.get_children(['a']) == ['b']
    assert sorted(ins.get_children(['a','b'])) == sorted(['c','d','f'])
    assert ins.get_children(['a','b','d','e']) == []
    assert ins.get_children(['a','b','d']) == ['e']

    # Empty key:
    assert ins.get_children([]) == ['a']

    # Try nonexistent key:
    with pytest.raises(SSKeyError):
        ins.get_children(['r','q'])
项目:sstash    作者:realcr    | 项目源码 | 文件源码
def test_wrong_password(tmp_file_path):
    """
    Test the initialization of a CryptoStash (When stash file doesn't exist)
    """
    # Initialization:
    cs = CryptoStash(tmp_file_path,'my_password',default_num_iterations=1000)

    my_store = {'1':2, '3':4}
    cs.write_store({1:2,3:4})

    # Try to load Crypto Stash with the wrong password:
    with pytest.raises(SSCryptoError):
        cs = CryptoStash(tmp_file_path,'my_password2',
                default_num_iterations=1000)


    # Try to load Crypto Stash with the correct password.
    # Should work correctly:
    cs = CryptoStash(tmp_file_path,'my_password',default_num_iterations=1000)
    cs.read_store()
项目:windflow    作者:hartym    | 项目源码 | 文件源码
def test_rowmethod():
    session = Session()

    with pytest.raises(ValueError) as e:
        ModelStub.my_row_method(0, 1, 2)

    c, s, a = ModelStub.my_row_method(0, session, 1)

    assert c is 0
    assert s is session
    assert a == 1

    i = ModelStub()

    c, s, a = i.my_row_method(session, 2)
    assert c is i
    assert s is session
    assert a == 2
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def test_juju_error_in_results_list(event_loop):
    '''
    Replicate the code that caused
    https://github.com/juju/python-libjuju/issues/67, and verify that
    we get a JujuError instead of passing silently by the failure.

    (We don't raise a JujuAPIError, because the request isn't
    completely invalid -- it's just passing a tag that doesn't exist.)

    This also verifies that we will raise a JujuError any time there
    is an error in one of a list of results.

    '''
    from juju.errors import JujuError
    from juju.client import client

    async with base.CleanModel() as model:
        ann_facade = client.AnnotationsFacade.from_connection(model.connection())

        ann = client.EntityAnnotations(
            entity='badtag',
            annotations={'gui-x': '1', 'gui-y': '1'},
        )
        with pytest.raises(JujuError):
            return await ann_facade.Set([ann])
项目:sappho    作者:lily-mayfield    | 项目源码 | 文件源码
def test_out_of_bounds(self):
        """Test the CameraOutOfBounds exception through
        testing through the default camera behavior movement.

        1. Create a camera
        2. Create a rectangle whose topleft is out-of-bounds
           of the Camera source surface.
        3. Assert exception is raised!

        """

        camera = Camera((800, 600), (1080, 1050), (300, 300))

        out_of_bounds_coord = (2000, 2000)
        out_of_bounds_rect = pygame.Rect(out_of_bounds_coord, [32, 32])

        with pytest.raises(CameraOutOfBounds):
            camera.scroll_to(out_of_bounds_rect)
            camera.update_state(28974329)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def _assert_failure_with_guarantees(
        self,
        name,
        expected_cluster=None,
        expected_database=None,
        expected_environment=None,
        expected_suffixes=None
    ):
        with pytest.raises(ValueError) as e:
            DBSourcedNamespace.create_from_namespace_name_with_guarantees(
                name,
                expected_environment=expected_environment,
                expected_cluster=expected_cluster,
                expected_database=expected_database,
                expected_suffixes=expected_suffixes
            )
            assert "impossible to rectify" in e
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_exceed_max_retry_count(
        self,
        number_sequence_func,
        exp_backoff_with_max_retry_count_policy
    ):
        with pytest.raises(MaxRetryError) as e, mock.patch.object(
            exp_backoff_with_max_retry_count_policy.backoff_policy,
            'next_backoff_delay',
            return_value=0.1
        ) as next_backoff_delay_spy:
            retry_on_condition(
                retry_policy=exp_backoff_with_max_retry_count_policy,
                retry_conditions=[Predicate(self.always_true)],
                func_to_retry=number_sequence_func
            )
        assert number_sequence_func.call_count == self.max_retry_count + 1
        assert e.value.last_result == 4
        assert next_backoff_delay_spy.call_count == self.max_retry_count
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_refresh_queue_no_job(
        self,
        priority_refresh_queue,
        fake_source_name,
        refresh_result
    ):
        assert priority_refresh_queue.peek() == {}
        with pytest.raises(EmptyQueueError) as e:
            priority_refresh_queue.pop(fake_source_name)
        assert fake_source_name in e.value.message

        priority_refresh_queue.add_refreshes_to_queue([refresh_result])
        assert priority_refresh_queue.pop(fake_source_name) == refresh_result
        with pytest.raises(EmptyQueueError) as e:
            priority_refresh_queue.pop(fake_source_name)
        assert fake_source_name in e.value.message
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_missing_name_error(
        self,
        schema_check_command,
        parser,
        source_name
    ):
        with pytest.raises(FakeParserError) as e:
            schema_check_command.process_args(
                self._create_fake_args(
                    schema="{}",
                    source_name=source_name
                ),
                parser
            )
        assert e.value.args
        assert "--namespace must be provided when given a source name as source identifier" in e.value.args[0]
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_avro_schema_with_no_namespace(
        self,
        register_command,
        parser,
        source_name,
        schema_str
    ):
        args = self._create_fake_args(
            source_name=source_name,
            namespace=None,
            avro_schema=schema_str
        )
        with pytest.raises(FakeParserError) as e:
            register_command.run(args, parser)
        assert e.value.args
        assert "--namespace must be provided" in e.value.args[0]
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def _assert_schema_equals_schema_dict(
        self,
        topic_schema,
        schema_obj,
        schema_dict,
        topic_to_check=None
    ):
        assert topic_schema.schema_id == schema_dict['schema_id']
        assert topic_schema.base_schema_id == schema_dict['base_schema_id']
        assert topic_schema.primary_keys == schema_dict['primary_keys']
        assert topic_schema.note == schema_dict['note']
        assert schema_obj.schema_json == schema_dict['schema_json']
        assert schema_obj.status == schema_dict['status']
        if topic_to_check:
            self._assert_topic_equals_topic_dict(
                topic=topic_to_check,
                topic_dict=schema_dict['topic'],
                namespace_name=topic_to_check.source.namespace.name,
                source_name=topic_to_check.source.name,
                is_active=False,
                include_kafka_info=False
            )
        else:
            with pytest.raises(KeyError):
                schema_dict['topic']
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_process_source_and_namespace_args_name_error(
        self,
        command,
        parser,
        source_one_active
    ):
        args = Args(
            source_id=None,
            source_name=source_one_active,
            namespace=None
        )
        with pytest.raises(FakeParserError) as e:
            command.process_source_and_namespace_args(args, parser)
        assert e
        assert e.value.args
        assert "--namespace must be provided" in e.value.args[0]
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_ensure_messages_published_fails_when_overpublished(
        self, topic, messages, producer, topic_offsets
    ):
        for message in messages:
            producer.publish(message)
        producer.flush()

        with pytest.raises(
            PublicationUnensurableError
        ), mock.patch.object(
            data_pipeline.producer,
            'logger'
        ) as mock_logger:
            producer.ensure_messages_published(messages[:2], topic_offsets)

            self._assert_logged_info_correct(
                mock_logger,
                len(messages),
                topic,
                topic_offsets,
                message_count=len(messages[:2])
            )
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_publish_fails_after_retry(self, message, producer):
        # TODO(DATAPIPE-606|clin) investigate better way than mocking response
        with mock.patch.object(
            producer._kafka_producer.kafka_client,
            'send_produce_request',
            side_effect=[FailedPayloadsError]
        ) as mock_send_request, capture_new_messages(
            message.topic
        ) as get_messages, pytest.raises(
            MaxRetryError
        ):
            orig_topic_to_offset_map = self.get_orig_topic_to_offset_map(producer)
            producer.publish(message)
            producer.flush()

            messages = get_messages()
            assert len(messages) == 0
            assert mock_send_request.call_count == self.max_retry_count
            self.assert_new_topic_to_offset_map(
                producer,
                message.topic,
                orig_topic_to_offset_map,
                published_message_count=0
            )
项目:transfert    作者:rbernand    | 项目源码 | 文件源码
def test_move(tmpdir, storages):
    f = tmpdir.join('alpha')
    f_http = Resource(storages['http']('/LICENSE'))
    f_file = Resource(storages['file'](f.strpath))
    f_sftp = Resource(storages['sftp']('/gamma'))
    f_ftp = Resource(storages['ftp']('/beta'))
    assert f_http.exists()
    delete_files(f_file, f_sftp, f_ftp)
    assert not f_file.exists()\
        and not f_sftp.exists()\
        and not f_ftp.exists()
    with pytest.raises(transfert.exceptions.ReadOnlyResourceException):
        transfert.actions.move(f_http, f_ftp, size=100)
    assert f_ftp.exists() and f_http.exists()
    transfert.actions.move(f_ftp, f_sftp, size=100)
    assert not f_ftp.exists() and f_sftp.exists()
    transfert.actions.move(f_sftp, f_file, size=100)
    assert not f_sftp.exists() and f_file.exists()
项目:ln2sql    作者:FerreroJeremy    | 项目源码 | 文件源码
def noTableFoundExceptionQueries():
    noTable = [
        {
            'input': 'Quel est le nom des reservation ?',  # No table name found in sentence!
            'database': DATABASE_PATH + 'ecole.sql',
            'language': LANG_PATH + 'french.csv'
        }
    ]
    for test in noTable:
        with pytest.raises(Exception):
            Ln2sql(test['database'], test['language']).get_query(test['input'])
项目:ln2sql    作者:FerreroJeremy    | 项目源码 | 文件源码
def noLinkForJoinFoundExceptionQueries():   
    noLink = [
        {
            'input': "Quel est le professeur qui enseigne la matière SVT ?",
            # There is at least column `matiere` that is unreachable from table `PROFESSEUR`!
            'database': DATABASE_PATH + 'ecole.sql',
            'language': LANG_PATH + 'french.csv'
        },
        {
            'input': "compte les salle des élève",
            # There is at least column `salle` that is unreachable from table `ELEVE`!
            'database': DATABASE_PATH + 'ecole.sql',
            'language': LANG_PATH + 'french.csv'
        }
    ]
    for test in noLink:
        with pytest.raises(Exception):
            Ln2sql(test['database'], test['language']).get_query(test['input'])
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_report_can_deal_with_single_anonymous_result_not_with_more():
    report = WorstReport()
    report.handle_results_collected(
        signal=None, sender=WithId('foo'),
        results=[9], context={})
    assert list(report.data.keys()) == ['foo']
    assert get_value_and_context(report, 'foo')[0] == 9
    with pytest.raises(TypeError) as excinfo:
        report.handle_results_collected(
            signal=None, sender=WithId('foo'),
            results=[1, 2], context={})
    assert 'Duplicate result name(s): \'\'' == str(excinfo.value)
    assert list(report.data.keys()) == ['foo']
    assert get_value_and_context(report, 'foo')[0] == 9
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_can_limit_elapsed_seconds(seconds):
    with freeze_time('2016-09-22 15:57:01') as frozen_time:
        with pytest.raises(LimitViolationError) as excinfo:
            with TimeLimit(total=0):
                frozen_time.tick(timedelta(seconds=seconds))
    assert excinfo.value.base_error_msg == \
        'Too many ({}) total elapsed seconds (limit: 0)'.format(seconds)
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_signals_all_listeners_reports_first_failure(self, collector_cls):
        collector = collector_cls()

        class MyException(Exception):
            pass

        def get_mock_listener():
            m = Mock()
            m.return_value = None
            return m

        first_attached_handler = get_mock_listener()

        def second_handler_that_will_fail(*a, **kw):
            raise MyException('foo')
        last_attached_handler = get_mock_listener()

        listeners = [
            first_attached_handler, second_handler_that_will_fail,
            last_attached_handler]
        for l in listeners:
            results_collected.connect(l)
        try:
            with pytest.raises(MyException) as excinfo:
                with collector:
                    pass
            assert str(excinfo.value).endswith('foo')
            method_name_in_stacktrace = 'second_handler_that_will_fail'
            assert method_name_in_stacktrace in str(excinfo.value)
            assert first_attached_handler.called
            assert last_attached_handler.called
        finally:
            for l in listeners:
                results_collected.disconnect(l)
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_signal_handler_error_doesnt_hide_orig_error(self, collector_cls):
        collector = collector_cls()
        failing_signal_handler = Mock(side_effect=Exception('handler error'))
        results_collected.connect(failing_signal_handler)
        try:
            with pytest.raises(Exception) as excinfo:
                with collector:
                    raise Exception('actual code error')
            assert str(excinfo.value) == 'actual code error'
            assert failing_signal_handler.called
        finally:
            results_collected.disconnect(failing_signal_handler)
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_when_above_the_limit_there_is_an_error(
            self, limit_cls_and_name, limit, value):
        limit_cls, name = limit_cls_and_name
        assert limit < value, 'test pre-req'
        limit_obj = limit_cls(**{name: limit})
        result = NameValueResult(name, value)
        with pytest.raises(LimitViolationError) as excinfo:
            limit_obj.handle_results(
                results=[result], context=None)
        assert excinfo.value.limit_obj == limit_obj
        assert excinfo.value.result == result
        assert excinfo.value.actual == str(value)
        assert not excinfo.value.context
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_without_collector_id_cannot_be_settings_based(self, limit_cls):
        with pytest.raises(TypeError) as excinfo:
            limit_cls(settings_based=True)
        assert 'Can only be settings based when collector_id is provided.' == \
            str(excinfo.value)
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_limits_can_be_used_as_template_tags(sample_template):
    with freeze_time('2016-11-02 10:33:00') as frozen_time:
        too_slow = SlowRendering(frozen_time=frozen_time, render_in_seconds=5)
        too_slow_context = template.context.Context(
            {'slow_rendering': too_slow})
        with pytest.raises(LimitViolationError) as excinfo:
            rendered_content = sample_template.render(too_slow_context)
            print(rendered_content)  # to get debug info
        assert excinfo.value.actual == '5.0'
        assert excinfo.value.limit == 3
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_cannot_construct_from_list_of_duplicates(dotted_paths):
    with pytest.raises(DuplicateNamesError):
        UniqueNamedClassRegistry(dotted_paths)
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_fails_when_class_has_no_such_method_as_to_wrap(wrapper):
    with pytest.raises(AttributeError):
        wrapper(
            cls_to_wrap=object, method_to_wrap_name='no_such_method',
            context_manager=None, is_cls_method=False)
    with pytest.raises(AttributeError):
        wrapper(
            cls_to_wrap=object, method_to_wrap_name='no_such_method',
            context_manager=None, is_cls_method=True)
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_integration_test_with_db(db):
    with pytest.raises(LimitViolationError) as excinfo:
        with override_current_context() as ctx:
            with QueryBatchLimit(total=2) as limit:
                ctx.enter(key='some', value='context')
                list(Group.objects.all())
                Group.objects.update(name='bar')
                Group.objects.create(name='group')
    assert excinfo.value.context == {'some': ['context']}
    assert excinfo.value.limit_obj == limit
    assert excinfo.value.limit == 2
    assert excinfo.value.name == 'total'
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_has_support_for_number_of_queries_in_templates(db, settings):
    settings.PERFORMANCE_LIMITS = {
        'Template.render': {
            'queries': {
                'total': 0
            }
        }
    }
    template = loader.get_template('all-group-names.markdown')
    with pytest.raises(LimitViolationError) as excinfo:
        template.render(context={'groups': Group.objects.all()})

    assert excinfo.value.context == {'template': ['all-group-names.markdown']}
项目:django-performance-testing    作者:PaesslerAG    | 项目源码 | 文件源码
def test_exiting_a_not_entered_context_is_an_error(self):
        ctx = Context()
        with pytest.raises(ValueError) as excinfo:
            ctx.exit(key='no such key', value='some value')
        assert str(excinfo.value) == \
            'cannot exit not entered context - key {!r} mismatch'.format(
                'no such key')

        ctx.enter(key='key', value='enter value')
        with pytest.raises(ValueError) as excinfo:
            ctx.exit(key='key', value='exit value')
        assert str(excinfo.value) == \
            'cannot exit not entered context - value mismatch ' \
            '(exit: {!r}, enter: {!r})'.format(
                'exit value', 'enter value')
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def test_required_config(config):
    with pytest.raises(TypeError) as excinfo:
        Fake.from_config(**config)
    message = excinfo.value.args[0]
    assert 'missing required config keys' in message
    assert 'from Fake' in message