Python pytest 模块,importorskip() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pytest.importorskip()

项目:xarray-simlab    作者:benbovy    | 项目源码 | 文件源码
def test_dot_graph_no_filename(tmpdir, model):
    ipydisp = pytest.importorskip('IPython.display')

    # Map from format extension to expected return type.
    result_types = {
        'png': ipydisp.Image,
        'jpeg': ipydisp.Image,
        'dot': type(None),
        'pdf': type(None),
        'svg': ipydisp.SVG,
    }
    for format in result_types:
        before = tmpdir.listdir()
        result = dot_graph(model, filename=None, format=format)
        # We shouldn't write any files if filename is None.
        after = tmpdir.listdir()
        assert before == after
        assert isinstance(result, result_types[format])
项目:xarray-simlab    作者:benbovy    | 项目源码 | 文件源码
def test_filenames_and_formats(model):
    ipydisp = pytest.importorskip('IPython.display')

    # Test with a variety of user provided args
    filenames = ['modelpdf', 'model.pdf', 'model.pdf', 'modelpdf',
                 'model.pdf.svg']
    formats = ['svg', None, 'svg', None, None]
    targets = ['modelpdf.svg', 'model.pdf', 'model.pdf.svg', 'modelpdf.png',
               'model.pdf.svg']

    result_types = {
        'png': ipydisp.Image,
        'jpeg': ipydisp.Image,
        'dot': type(None),
        'pdf': type(None),
        'svg': ipydisp.SVG,
    }

    for filename, format, target in zip(filenames, formats, targets):
        expected_result_type = result_types[target.split('.')[-1]]
        result = dot_graph(model, filename=filename, format=format)
        assert os.path.isfile(target)
        assert isinstance(result, expected_result_type)
        _ensure_not_exists(target)
项目:xarray-simlab    作者:benbovy    | 项目源码 | 文件源码
def test_visualize(self, model):
        pytest.importorskip('graphviz')
        ipydisp = pytest.importorskip('IPython.display')

        result = model.visualize()
        assert isinstance(result, ipydisp.Image)

        result = model.visualize(show_inputs=True)
        assert isinstance(result, ipydisp.Image)

        result = model.visualize(show_variables=True)
        assert isinstance(result, ipydisp.Image)

        result = model.visualize(
            show_only_variable=('quantity', 'quantity'))
        assert isinstance(result, ipydisp.Image)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def spawn(self, cmd, expect_timeout=10.0):
        """Run a command using pexpect.

        The pexpect child is returned.
        """
        pexpect = pytest.importorskip("pexpect", "3.0")
        if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
            pytest.skip("pypy-64 bit not supported")
        if sys.platform == "darwin":
            pytest.xfail("pexpect does not work reliably on darwin?!")
        if sys.platform.startswith("freebsd"):
            pytest.xfail("pexpect does not work reliably on freebsd")
        logfile = self.tmpdir.join("spawn.out").open("wb")
        child = pexpect.spawn(cmd, logfile=logfile)
        self.request.addfinalizer(logfile.close)
        child.timeout = expect_timeout
        return child
项目:knit    作者:dask    | 项目源码 | 文件源码
def test_hdfs_home():
    hdfs3 = pytest.importorskip('hdfs3')
    hdfs = hdfs3.HDFileSystem()
    d = '/tmp/test'
    try:
        hdfs.mkdir(d)
        k = Knit(nn='localhost', rm='localhost', nn_port=8020, rm_port=8088,
                 replication_factor=1, hdfs_home=d)

        env_zip = k.create_env(env_name='dev', packages=['python=2.7'], remove=True)
        k.start('env', files=[env_zip], memory=128)

        assert d + '/.knitDeps' in hdfs.ls(d, False)
        assert d + "/.knitDeps/knit-1.0-SNAPSHOT.jar" in hdfs.ls(d + '/.knitDeps', False)
        assert d + "/.knitDeps/dev.zip" in hdfs.ls(d + '/.knitDeps', False)
        if not k.wait_for_completion(30):
            k.kill()

    finally:
        hdfs.rm(d, True)
        k.kill()
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_load_empty_table_arrow(self, con, empty_table):
        pd = pytest.importorskip("pandas")
        pa = pytest.importorskip("pyarrow")
        skip_if_no_arrow_loader(con)

        data = [(1, 1.1, 'a'),
                (2, 2.2, '2'),
                (3, 3.3, '3')]

        df = pd.DataFrame(data, columns=list('abc')).astype({
            'a': 'int32',
            'b': 'float32'
        })

        table = pa.Table.from_pandas(df, preserve_index=False)
        con.load_table(empty_table, table, method='arrow')
        result = sorted(con.execute("select * from {}".format(empty_table)))
        self.check_empty_insert(result, data)
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_load_columnar_pandas_all(self, con, all_types_table):
        pd = pytest.importorskip("pandas")
        import numpy as np

        data = pd.DataFrame({
            "boolean_": [True, False],
            "smallint_": np.array([0, 1], dtype=np.int8),
            "int_": np.array([0, 1], dtype=np.int32),
            "bigint_": np.array([0, 1], dtype=np.int64),
            "float_": np.array([0, 1], dtype=np.float32),
            "double_": np.array([0, 1], dtype=np.float64),
            "varchar_": ["a", "b"],
            "text_": ['a', 'b'],
            "time_": [datetime.time(0, 11, 59), datetime.time(13)],
            "timestamp_": [pd.Timestamp("2016"), pd.Timestamp("2017")],
            "date_": [datetime.date(2016, 1, 1), datetime.date(2017, 1, 1)],
        }, columns=['boolean_', 'smallint_', 'int_', 'bigint_', 'float_',
                    'double_', 'varchar_', 'text_', 'time_', 'timestamp_',
                    'date_'])
        con.load_table_columnar(all_types_table, data, preserve_index=False)
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_load_table_columnar_arrow_all(self, con, all_types_table):
        pa = pytest.importorskip("pyarrow")
        skip_if_no_arrow_loader(con)

        names = ['boolean_', 'smallint_', 'int_', 'bigint_',
                 'float_', 'double_', 'varchar_', 'text_',
                 'time_', 'timestamp_', 'date_']

        columns = [pa.array([True, False, None], type=pa.bool_()),
                   pa.array([1, 0, None]).cast(pa.int16()),
                   pa.array([1, 0, None]).cast(pa.int32()),
                   pa.array([1, 0, None]),
                   pa.array([1.0, 1.1, None]).cast(pa.float32()),
                   pa.array([1.0, 1.1, None]),
                   # no fixed-width string
                   pa.array(['a', 'b', None]),
                   pa.array(['a', 'b', None]),
                   (pa.array([1, 2, None]).cast(pa.int32())
                    .cast(pa.time32('s'))),
                   pa.array([datetime.datetime(2016, 1, 1, 12, 12, 12),
                             datetime.datetime(2017, 1, 1), None]),
                   pa.array([datetime.date(2016, 1, 1),
                             datetime.date(2017, 1, 1), None])]
        table = pa.Table.from_arrays(columns, names=names)
        con.load_table_arrow(all_types_table, table)
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_load_table_creates(self, con, not_a_table):
        pd = pytest.importorskip("pandas")
        import numpy as np

        data = pd.DataFrame({
            "boolean_": [True, False],
            "smallint_cast": np.array([0, 1], dtype=np.int8),
            "smallint_": np.array([0, 1], dtype=np.int16),
            "int_": np.array([0, 1], dtype=np.int32),
            "bigint_": np.array([0, 1], dtype=np.int64),
            "float_": np.array([0, 1], dtype=np.float32),
            "double_": np.array([0, 1], dtype=np.float64),
            "varchar_": ["a", "b"],
            "text_": ['a', 'b'],
            "time_": [datetime.time(0, 11, 59), datetime.time(13)],
            "timestamp_": [pd.Timestamp("2016"), pd.Timestamp("2017")],
            "date_": [datetime.date(2016, 1, 1), datetime.date(2017, 1, 1)],
        }, columns=['boolean_', 'smallint_', 'int_', 'bigint_', 'float_',
                    'double_', 'varchar_', 'text_', 'time_', 'timestamp_',
                    'date_'])
        con.load_table(not_a_table, data, create=True)
项目:dask-searchcv    作者:dask    | 项目源码 | 文件源码
def test_visualize():
    pytest.importorskip('graphviz')

    X, y = make_classification(n_samples=100, n_classes=2, flip_y=.2,
                               random_state=0)
    clf = SVC(random_state=0)
    grid = {'C': [.1, .5, .9]}
    gs = dcv.GridSearchCV(clf, grid).fit(X, y)

    assert hasattr(gs, 'dask_graph_')

    with tmpdir() as d:
        gs.visualize(filename=os.path.join(d, 'mydask'))
        assert os.path.exists(os.path.join(d, 'mydask.png'))

    # Doesn't work if not fitted
    gs = dcv.GridSearchCV(clf, grid)
    with pytest.raises(NotFittedError):
        gs.visualize()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def spawn(self, cmd, expect_timeout=10.0):
        """Run a command using pexpect.

        The pexpect child is returned.
        """
        pexpect = pytest.importorskip("pexpect", "3.0")
        if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
            pytest.skip("pypy-64 bit not supported")
        if sys.platform == "darwin":
            pytest.xfail("pexpect does not work reliably on darwin?!")
        if sys.platform.startswith("freebsd"):
            pytest.xfail("pexpect does not work reliably on freebsd")
        logfile = self.tmpdir.join("spawn.out").open("wb")
        child = pexpect.spawn(cmd, logfile=logfile)
        self.request.addfinalizer(logfile.close)
        child.timeout = expect_timeout
        return child
项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def test_batched_successful_call_explicit_loop(framework_aio):
    '''
    batched calls really happen in batches
    '''
    # Trollius doesn't come with this, so won't work on py2
    pytest.importorskip('asyncio.test_utils')
    from asyncio.test_utils import TestLoop

    def time_gen():
        yield
        yield
    new_loop = TestLoop(time_gen)
    calls = []

    def foo(*args, **kw):
        calls.append((args, kw))

    txa = txaio.with_config(loop=new_loop)

    batched = txa.make_batched_timer(5)

    batched.call_later(1, foo, "first call")
    new_loop.advance_time(2.0)
    new_loop._run_once()
    assert len(calls) == 1
项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def test_create_future_success_explicit_loop(framework):
    """
    process events on alternate loop= for create_future later
    """
    pytest.importorskip('asyncio')
    if txaio.using_twisted:
        pytest.skip()

    import asyncio
    alt_loop = asyncio.new_event_loop()
    txa = txaio.with_config(loop=alt_loop)

    f = txa.create_future_success('some result')

    results = []
    f.add_done_callback(lambda r: results.append(r.result()))

    # run_once() runs the txaio.config.loop so we shouldn't get any
    # results until we spin alt_loop
    assert results == []
    run_once()
    assert results == []
    with replace_loop(alt_loop):
        run_once()
    assert results == ['some result']
项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def test_explicit_reactor_coroutine(framework):
    """
    If we set an event-loop, Futures + Tasks should use it.
    """
    pytest.importorskip('asyncio')
    if txaio.using_twisted:
        pytest.skip()

    from asyncio import coroutine

    @coroutine
    def some_coroutine():
        yield 'nothing'

    with patch.object(txaio.config, 'loop') as fake_loop:
        txaio.as_future(some_coroutine)

        assert len(fake_loop.method_calls) == 2
        c = fake_loop.method_calls[1]
        assert c[0] == 'call_soon'
项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def test_log_converter(handler, framework):
    pytest.importorskip("twisted.logger")
    # this checks that we can convert a plain Twisted Logger calling
    # failure() into a traceback on our observers.
    from twisted.logger import Logger
    from txaio.tx import _LogObserver

    out = six.StringIO()
    observer = _LogObserver(out)
    logger = Logger(observer=observer)

    try:
        raise RuntimeError("failed on purpose")
    except:
        logger.failure(None)

    output = out.getvalue()
    assert "failed on purpose" in output
    assert "Traceback" in output
项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def test_txlog_write_binary(handler, framework):
    """
    Writing to a binary stream is supported.
    """
    pytest.importorskip("twisted.logger")
    from txaio.tx import _LogObserver

    out_file = BytesIO()
    observer = _LogObserver(out_file)

    observer({
        "log_format": "hi: {testentry}",
        "testentry": "hello",
        "log_level": observer.to_tx["info"],
        "log_time": 1442890018.002233
    })

    output = out_file.getvalue()
    assert b"hi: hello" in output
项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def test_txlog_write_text(handler, framework_tx):
    """
    Writing to a text stream is supported.
    """
    pytest.importorskip("twisted.logger")
    from txaio.tx import _LogObserver

    out_file = StringIO()
    observer = _LogObserver(out_file)

    observer({
        "log_format": "hi: {testentry}",
        "testentry": "hello",
        "log_level": observer.to_tx["info"],
        "log_time": 1442890018.002233
    })

    output = out_file.getvalue()
    assert u"hi: hello" in output
项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def test_aiolog_write_text(handler, framework_aio):
    """
    Writing to a text stream is supported.
    """
    pytest.importorskip("txaio.aio")
    from txaio.aio import _TxaioFileHandler

    out_file = StringIO()
    observer = _TxaioFileHandler(out_file)

    observer.emit(Log(args={
        "log_message": "hi: {testentry}",
        "testentry": "hello",
        "log_time": 1442890018.002233
    }))

    output = out_file.getvalue()
    assert u"hi: hello" in output
项目:devpi    作者:devpi    | 项目源码 | 文件源码
def test_logger_cfg_yaml(self, tmpdir):
        pytest.importorskip("yaml")
        logger_cfg = tmpdir.join("logging.yml")
        logger_cfg.write(textwrap.dedent("""
            version: 1
            disable_existing_loggers: False

            root:
                handlers: []
                level: WARNING
        """))
        config = make_config(["devpi-server", "--logger-cfg=%s" % logger_cfg])
        configure_logging(config.args)

        assert logging.getLogger().getEffectiveLevel() == logging.WARNING
        assert not logging.getLogger().handlers
项目:indigo    作者:mbdriscoll    | 项目源码 | 文件源码
def test_compat_Zpad(backend, X,Y,Z, P, K):
    pymr = pytest.importorskip('pymr')
    b = backend()

    i_shape = (X, Y, Z, K)
    o_shape = (X+2*P, Y+2*P, Z+2*P, K)

    x = indigo.util.rand64c( *i_shape )

    D0 = pymr.linop.Zpad( o_shape, i_shape, dtype=x.dtype )
    D1 = b.Zpad(o_shape[:3], i_shape[:3], dtype=x.dtype)

    x_indigo = np.asfortranarray(x.reshape((-1,K), order='F'))
    x_pmr = pymr.util.vec(x)
    y_exp = D0 * x_pmr
    y_act = D1 * x_indigo

    y_act = y_act.flatten(order='F')
    npt.assert_equal(y_act, y_exp)
项目:indigo    作者:mbdriscoll    | 项目源码 | 文件源码
def test_compat_Interp(backend, X, Y, Z, RO, PS, K, n, width):
    pymr = pytest.importorskip('pymr')
    b = backend()

    N = (X, Y, Z, K)
    M = (1, RO, PS, K)
    T = (3, RO, PS)

    import scipy.signal as signal
    beta = 0.1234
    kb = signal.kaiser(2 * n + 1, beta)[n:]

    x = indigo.util.rand64c(*N)
    traj = indigo.util.rand64c(*T).real - 0.5

    G0 = pymr.linop.Interp(M, N, traj, width, kb, dtype=x.dtype)
    y_exp = G0 * pymr.util.vec(x)

    G1 = b.Interp(N[:3], traj, width, kb, dtype=x.dtype)
    y_act = G1 * x.reshape((-1,K), order='F')

    y_act = y_act.flatten(order='F')
    npt.assert_allclose( y_act, y_exp, rtol=1e-2)
项目:indigo    作者:mbdriscoll    | 项目源码 | 文件源码
def test_compat_conjgrad(backend, N):
    pymr = pytest.importorskip('pymr')
    b = backend()

    A = indigo.util.randM( N, N, 0.5 )
    A = A.H @ A # make positive definite
    y = indigo.util.rand64c( N )
    x0 = np.zeros( N, dtype=np.complex64 )

    A_pmr = pymr.linop.Matrix( A.toarray(), dtype=A.dtype )
    x_exp = pymr.alg.cg(A_pmr, A.H * y, x0, maxiter=40)

    A_indigo = b.SpMatrix(A)
    b.cg(A_indigo, y, x0, maxiter=40)
    x_act = x0.copy()

    npt.assert_allclose(x_act, x_exp, rtol=1e-6)
项目:GSM-scanner    作者:yosriayed    | 项目源码 | 文件源码
def test_unittest_mock_and_fixture(self, testdir):
        pytest.importorskip("unittest.mock")
        testdir.makepyfile("""
            import os.path
            import unittest.mock
            import pytest

            @pytest.fixture
            def inject_me():
                pass

            @unittest.mock.patch.object(os.path, "abspath",
                                        new=unittest.mock.MagicMock)
            def test_hello(inject_me):
                import os
                os.path.abspath("hello")
        """)
        reprec = testdir.inline_run()
        reprec.assertoutcome(passed=1)
项目:GSM-scanner    作者:yosriayed    | 项目源码 | 文件源码
def test_random_report_log_xdist(testdir):
    """xdist calls pytest_runtest_logreport as they are executed by the slaves,
    with nodes from several nodes overlapping, so junitxml must cope with that
    to produce correct reports. #1064
    """
    pytest.importorskip('xdist')
    testdir.makepyfile("""
        import pytest, time
        @pytest.mark.parametrize('i', list(range(30)))
        def test_x(i):
            assert i != 22
    """)
    _, dom = runandparse(testdir, '-n2')
    suite_node = dom.getElementsByTagName("testsuite")[0]
    failed = []
    for case_node in suite_node.getElementsByTagName("testcase"):
        if case_node.getElementsByTagName('failure'):
            failed.append(case_node.getAttributeNode('name').value)

    assert failed == ['test_x[22]']
项目:GSM-scanner    作者:yosriayed    | 项目源码 | 文件源码
def test_preparse_ordering_with_setuptools(testdir, monkeypatch):
    pkg_resources = pytest.importorskip("pkg_resources")
    def my_iter(name):
        assert name == "pytest11"
        class EntryPoint:
            name = "mytestplugin"
            class dist:
                pass
            def load(self):
                class PseudoPlugin:
                    x = 42
                return PseudoPlugin()
        return iter([EntryPoint()])
    monkeypatch.setattr(pkg_resources, 'iter_entry_points', my_iter)
    testdir.makeconftest("""
        pytest_plugins = "mytestplugin",
    """)
    monkeypatch.setenv("PYTEST_PLUGINS", "mytestplugin")
    config = testdir.parseconfig()
    plugin = config.pluginmanager.getplugin("mytestplugin")
    assert plugin.x == 42
项目:GSM-scanner    作者:yosriayed    | 项目源码 | 文件源码
def spawn(self, cmd, expect_timeout=10.0):
        """Run a command using pexpect.

        The pexpect child is returned.
        """
        pexpect = pytest.importorskip("pexpect", "3.0")
        if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
            pytest.skip("pypy-64 bit not supported")
        if sys.platform == "darwin":
            pytest.xfail("pexpect does not work reliably on darwin?!")
        if sys.platform.startswith("freebsd"):
            pytest.xfail("pexpect does not work reliably on freebsd")
        logfile = self.tmpdir.join("spawn.out").open("wb")
        child = pexpect.spawn(cmd, logfile=logfile)
        self.request.addfinalizer(logfile.close)
        child.timeout = expect_timeout
        return child
项目:py-validate    作者:gfyoung    | 项目源码 | 文件源码
def test_numpy_dtypes(self):
        np = pytest.importorskip("numpy")
        dtypes = np.typecodes["AllInteger"]

        for dtype in dtypes:
            dtype = np.dtype(dtype).type
            shortcuts.check_integer(dtype(1))

        # The integer check is strict. The
        # type must match, even if numerically
        # the number is an integer.
        msg = "Expected an integer but got"
        dtypes = (np.typecodes["AllFloat"] +
                  np.typecodes["Complex"])

        for dtype in dtypes:
            dtype = np.dtype(dtype).type
            assert_raises(TypeError, msg, shortcuts.check_integer, dtype(1))
项目:fleaker    作者:croscon    | 项目源码 | 文件源码
def test_make_instance():
    """Ensure that the schema's make instance works properly."""
    peewee = pytest.importorskip('peewee')

    class User(peewee.Model):
        name = peewee.CharField(max_length=255)

    class UserSchema(Schema):
        name = fields.String()

        class Meta:
            model = User

    data = {'name': 'Bob Blah'}
    user = UserSchema.make_instance(data)

    assert isinstance(user, User)
    assert user.name == data['name']
项目:xarray-simlab    作者:benbovy    | 项目源码 | 文件源码
def test_dot_graph(model, tmpdir):
    ipydisp = pytest.importorskip('IPython.display')

    # Use a name that the shell would interpret specially to ensure that we're
    # not vulnerable to shell injection when interacting with `dot`.
    filename = str(tmpdir.join('$(touch should_not_get_created.txt)'))

    # Map from format extension to expected return type.
    result_types = {
        'png': ipydisp.Image,
        'jpeg': ipydisp.Image,
        'dot': type(None),
        'pdf': type(None),
        'svg': ipydisp.SVG,
    }
    for format in result_types:
        target = '.'.join([filename, format])
        _ensure_not_exists(target)
        try:
            result = dot_graph(model, filename=filename, format=format)

            assert not os.path.exists('should_not_get_created.txt')
            assert os.path.isfile(target)
            assert isinstance(result, result_types[format])
        finally:
            _ensure_not_exists(target)

    # format supported by graphviz but not by IPython
    with pytest.raises(ValueError) as excinfo:
        dot_graph(model, filename=filename, format='ps')
    assert "Unknown format" in str(excinfo.value)
项目:abe-bootstrap    作者:TryCoin-Team    | 项目源码 | 文件源码
def __init__(db, dbtype):
        pytest.importorskip(dbtype)
        import tempfile
        db.installation_dir = py.path.local(tempfile.mkdtemp(prefix='abe-test-'))
        print("Created temporary directory %s" % db.installation_dir)
        try:
            db.server = db.install_server()
        except Exception as e:
            #print("EXCEPTION %s" % e)
            db._delete_tmpdir()
            pytest.skip(e)
            raise
        DB.__init__(db, dbtype, db.get_connect_args())
项目:pyrepl    作者:dajose    | 项目源码 | 文件源码
def pytest_funcarg__child(request):
    try:
        pexpect = pytest.importorskip('pexpect')
    except SyntaxError:
        pytest.skip('pexpect wont work on py3k')
    child = pexpect.spawn(sys.executable, ['-S'], timeout=10)
    if sys.version_info >= (3, ):
        child.logfile = sys.stdout.buffer
    else:
        child.logfile = sys.stdout
    child.sendline('from pyrepl.python_reader import main')
    child.sendline('main()')
    return child
项目:knit    作者:dask    | 项目源码 | 文件源码
def test_existing_path(k):
    # TODO: is this a good test if we noisily log the upload of files?
    hdfs3 = pytest.importorskip('hdfs3')
    hdfs = hdfs3.HDFileSystem()
    k.hdfs = hdfs
    cmd = 'ls -l'
    hdfs.put(__file__, '/tmp/mytestfile')
    k.start(cmd, files=['hdfs://tmp/mytestfile'])
    wait_for_status(k, 'FINISHED')
    time.sleep(2)
    out = k.logs()
    assert ' mytestfile ' in str(out)
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_select_ipc_parametrized(self, con, stocks, query, parameters):
        pd = pytest.importorskip("pandas")
        import numpy as np
        import pandas.util.testing as tm

        result = con.select_ipc(query, parameters=parameters)
        expected = pd.DataFrame({
            "qty": np.array([100, 100], dtype=np.int32),
            "price": np.array([35.13999938964844, 12.140000343322754],
                              dtype=np.float32)
        })[['qty', 'price']]
        tm.assert_frame_equal(result, expected)
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_select_ipc_first_n(self, con, stocks):
        pytest.importorskip("pandas")
        result = con.select_ipc("select * from stocks", first_n=1)
        assert len(result) == 1
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_select_ipc_pandas(self, con):
        pytest.importorskip("pyarrow")
        with mock.patch.dict('sys.modules', {'pandas': None}):
            with pytest.raises(ImportError) as m:
                con.select_ipc("select * from foo;")

        assert m.match("pandas is required for `select_ipc`")
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_load_empty_table_pandas(self, con, empty_table):
        # TODO: just require arrow and pandas for tests
        pd = pytest.importorskip("pandas")

        data = [(1, 1.1, 'a'),
                (2, 2.2, '2'),
                (3, 3.3, '3')]
        df = pd.DataFrame(data, columns=list('abc'))
        con.load_table(empty_table, df, method='columnar')
        result = sorted(con.execute("select * from {}".format(empty_table)))
        self.check_empty_insert(result, data)
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_load_infer(self, con, empty_table):
        pd = pytest.importorskip("pandas")
        skip_if_no_arrow_loader(con)
        import numpy as np

        data = pd.DataFrame(
            {'a': np.array([0, 1], dtype=np.int32),
             'b': np.array([1.1, 2.2], dtype=np.float32),
             'c': ['a', 'b']}
        )
        con.load_table(empty_table, data)
项目:pymapd    作者:mapd    | 项目源码 | 文件源码
def test_create_table(self, con, not_a_table):
        pd = pytest.importorskip("pandas")
        df = pd.DataFrame({"A": [1, 2], "B": [1., 2.]})
        con.create_table(not_a_table, df)
项目:pdb    作者:antocuni    | 项目源码 | 文件源码
def test_track_with_no_args():
    pytest.importorskip('rpython.translator.tool.reftracker')

    def fn():
        set_trace()
        return 42

    check(fn, """
[NUM] > .*fn()
-> return 42
# track
... SyntaxError:
# c
""")
项目:parsec-cloud    作者:Scille    | 项目源码 | 文件源码
def init_or_skiptest_parsec_postgresql():
    if pytest.config.getoption('--no-postgresql'):
        pytest.skip('`--no-postgresql` option provided')
    module = pytest.importorskip('parsec.backend.postgresql')
    url = postgresql_url()
    await module._init_db(url, force=True)
    return module, url
项目:camisole    作者:prologin    | 项目源码 | 文件源码
def http_request(request):
    if request.param == 'json':
        return request.getfuncargvalue('json_request')
    if request.param == 'msgpack':
        pytest.importorskip('msgpack')
        return request.getfuncargvalue('msgpack_request')
项目:execnet    作者:pytest-dev    | 项目源码 | 文件源码
def execmodel(request):
    if request.param != "thread":
        pytest.importorskip(request.param)
    if sys.platform == "win32":
        pytest.xfail("eventlet/gevent do not work onwin32")
    return get_execmodel(request.param)
项目:execnet    作者:pytest-dev    | 项目源码 | 文件源码
def test_mmap(self, tmpdir, val):
        mmap = pytest.importorskip("mmap").mmap
        p = tmpdir.join("data")
        with p.open("wb") as f:
            f.write(execnet.dumps(val))
        f = p.open("r+b")
        m = mmap(f.fileno(), 0)
        val2 = execnet.load(m)
        assert val == val2
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_porter2():
    from whoosh.lang.porter2 import stem

    plurals = ['caresses', 'flies', 'dies', 'mules', 'denied',
               'died', 'agreed', 'owned', 'humbled', 'sized',
               'meeting', 'stating', 'siezing', 'itemization',
               'sensational', 'traditional', 'reference', 'colonizer',
               'plotted']
    singles = [stem(w) for w in plurals]

    assert singles == ['caress', 'fli', 'die', 'mule', 'deni', 'die',
                       'agre', 'own', 'humbl', 'size', 'meet', 'state',
                       'siez', 'item', 'sensat', 'tradit', 'refer',
                       'colon', 'plot']
    assert stem("bill's") == "bill"
    assert stem("y's") == "y"


#def test_pystemmer():
#    Stemmer = pytest.importorskip("Stemmer")
#
#    ana = (analysis.RegexTokenizer()
#           | analysis.LowercaseFilter()
#           | analysis.PyStemmerFilter())
#    schema = fields.Schema(text=fields.TEXT(analyzer=ana))
#    st = RamStorage()
#
#    ix = st.create_index(schema)
#    with ix.writer() as w:
#        w.add_document(text=u("rains falling strangely"))
#
#    ix = st.open_index()
#    with ix.writer() as w:
#        w.add_document(text=u("pains stalling strongly"))
#
#    ix = st.open_index()
#    with ix.reader() as r:
#        assert (list(r.field_terms("text"))
#                == ["fall", "pain", "rain", "stall", "strang", "strong"])
项目:crick    作者:jcrist    | 项目源码 | 文件源码
def test_skew(x, bias):
    stats = pytest.importorskip('scipy.stats')
    s = SummaryStats()
    s.update(x)
    res = s.skew(bias=bias)
    sol = stats.skew(x[~np.isnan(x)], bias=bias) if len(x) else np.nan
    np.testing.assert_allclose(res, sol, rtol=RTOL, atol=ATOL)
项目:crick    作者:jcrist    | 项目源码 | 文件源码
def test_kurt(x, bias, fisher):
    stats = pytest.importorskip('scipy.stats')
    s = SummaryStats()
    s.update(x)

    res = s.kurt(bias=bias, fisher=fisher)
    if len(x):
        sol = stats.kurtosis(x[~np.isnan(x)], bias=bias, fisher=fisher)
    else:
        sol = np.nan
    np.testing.assert_allclose(res, sol, rtol=RTOL, atol=ATOL)
项目:ml-utils    作者:LinxiFan    | 项目源码 | 文件源码
def test_import():
    mylib = pytest.importorskip("weird-lib")
    print(mylib)
项目:dask-searchcv    作者:dask    | 项目源码 | 文件源码
def test_scheduler_param(scheduler, n_jobs, get):
    if scheduler == 'multiprocessing':
        mp = pytest.importorskip('dask.multiprocessing')
        get = mp.get

    assert _normalize_scheduler(scheduler, n_jobs) is get

    X, y = make_classification(n_samples=100, n_features=10, random_state=0)
    gs = dcv.GridSearchCV(MockClassifier(), {'foo_param': [0, 1, 2]}, cv=3,
                          scheduler=scheduler, n_jobs=n_jobs)
    gs.fit(X, y)
项目:nengo_spa    作者:nengo    | 项目源码 | 文件源码
def test_prob_cleanup(rng):
    pytest.importorskip('scipy')

    assert 1.0 > prob_cleanup(0.7, 64, 10000) > 0.9999
    assert 0.9999 > prob_cleanup(0.6, 64, 10000) > 0.999
    assert 0.99 > prob_cleanup(0.5, 64, 1000) > 0.9

    assert 0.999 > prob_cleanup(0.4, 128, 1000) > 0.997
    assert 0.99 > prob_cleanup(0.4, 128, 10000) > 0.97
    assert 0.9 > prob_cleanup(0.4, 128, 100000) > 0.8
项目:nengo_spa    作者:nengo    | 项目源码 | 文件源码
def assert_noexceptions(nb_file, tmpdir):
    plt = pytest.importorskip('matplotlib.pyplot')
    pytest.importorskip("IPython", minversion="1.0")
    pytest.importorskip("jinja2")
    from nengo.utils.ipython import export_py
    nb = load_example(nb_file)
    pyfile = "%s.py" % tmpdir.join(os.path.basename(nb_file))
    export_py(nb, pyfile)
    execfile(pyfile, {})
    plt.close('all')