Python warnings 模块,filterwarnings() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用warnings.filterwarnings()

项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def emits_warning_on(db, *messages):
    """Decorator factory: run the decorated test inside ``expect_warnings_on``.

    ``db`` and ``messages`` are forwarded unchanged to
    ``expect_warnings_on``; the existing project documentation describes
    ``messages`` as strings matched against the start of the warning text
    by ``warnings.filterwarnings()``, and an empty ``messages`` as
    squelching every SAWarning failure.

    ``assert_=False`` is always passed, so this decorator does **not**
    verify that the warnings were actually emitted.

    """
    def decorate(fn, *args, **kw):
        # Build the context manager first, then run the test body inside it.
        warning_scope = expect_warnings_on(db, *messages, assert_=False)
        with warning_scope:
            return fn(*args, **kw)

    return decorator(decorate)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def uses_deprecated(*messages):
    """Decorator factory: run the decorated test inside ``expect_deprecated``.

    ``messages`` is forwarded unchanged to ``expect_deprecated``.  Per the
    existing project documentation: with no messages, all
    SADeprecationWarning failures are squelched; otherwise each string is
    matched against the start of the warning text by
    ``warnings.filterwarnings()``.  A function name prefixed with ``//`` is
    rewritten to match the standard wording emitted by the
    ``sqlalchemy.util.deprecated`` decorator.

    ``assert_=False`` is always passed, so this decorator does **not**
    verify that the warnings were actually emitted.

    """
    def decorate(fn, *args, **kw):
        deprecation_scope = expect_deprecated(*messages, assert_=False)
        with deprecation_scope:
            return fn(*args, **kw)

    return decorator(decorate)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def assert_warnings(fn, warning_msgs, regex=False):
    """Call ``fn()`` and compare each warning it emits with ``warning_msgs``.

    Recorded warnings are checked in emission order against the entries of
    ``warning_msgs``, which are consumed from the front with ``pop(0)``
    (the caller's list is mutated).  With ``regex`` true each entry is used
    as a ``re.match`` pattern; otherwise the text must be equal.

    Entries left over in ``warning_msgs`` after the loop are not checked.

    Returns whatever ``fn()`` returned.
    """

    from .assertions import eq_

    with warnings.catch_warnings(record=True) as captured:
        # "always" ensures that nothing is going into __warningregistry__
        warnings.filterwarnings("always")
        outcome = fn()

    for entry in captured:
        expected = warning_msgs.pop(0)
        actual = str(entry.message)
        if not regex:
            eq_(expected, actual)
        else:
            assert re.match(expected, actual)
    return outcome
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def run():  # pragma: no cover
    """Command-line entry point: parse options, set up logging, convert.

    Exits with status 2 when ``parse_options()`` yields no options.
    """

    options, logging_level = parse_options()
    if not options:
        sys.exit(2)

    # Route the package logger to a console stream at the requested level.
    console_handler = logging.StreamHandler()
    logger.setLevel(logging_level)
    logger.addHandler(console_handler)

    show_warnings = logging_level <= WARNING
    if show_warnings:
        # Ensure deprecation warnings get displayed, and send everything
        # raised through the warnings module to the same console handler.
        warnings.filterwarnings('default')
        logging.captureWarnings(True)
        logging.getLogger('py.warnings').addHandler(console_handler)

    # Perform the conversion with the parsed options.
    markdown.markdownFromFile(**options)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def runWithWarningsSuppressed(suppressedWarnings, f, *a, **kw):
    """Run the function C{f}, but with some warnings suppressed.

    @param suppressedWarnings: A list of arguments to pass to filterwarnings.
                               Must be a sequence of 2-tuples (args, kwargs).
    @param f: A callable, followed by its arguments and keyword arguments

    @return: Whatever C{f} returns.  If that is a L{defer.Deferred}, the
             added filters are handed to C{_resetWarningFilters} only once
             it fires; otherwise that happens before returning.
    @raise: Whatever C{f} raises; the added filters are handed to
            C{_resetWarningFilters} before the exception propagates.
    """
    for args, kwargs in suppressedWarnings:
        warnings.filterwarnings(*args, **kwargs)
    # filterwarnings() inserts at the front of warnings.filters by default,
    # so the entries just added are the leading ones.
    # NOTE(review): this assumes no entry passes append=True -- confirm.
    addedFilters = warnings.filters[:len(suppressedWarnings)]
    try:
        result = f(*a, **kw)
    except:
        # Deliberately catches everything: the filters must be cleaned up
        # for any failure, and the exception is always re-raised.  A bare
        # ``raise`` keeps the original traceback and, unlike the old
        # three-argument ``raise``, is valid syntax on Python 3.
        _resetWarningFilters(None, addedFilters)
        raise
    else:
        if isinstance(result, defer.Deferred):
            # Asynchronous result: clean up on success or failure alike.
            result.addBoth(_resetWarningFilters, addedFilters)
        else:
            _resetWarningFilters(None, addedFilters)
        return result
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def test_norm_hash_name(self):
        """norm_hash_name()"""
        from passlib.crypto.digest import norm_hash_name, _known_hash_names
        from itertools import chain

        # Snapshot the warning state (restored via addCleanup), then silence
        # the "unknown hash" warnings for the rest of the test.
        warning_state = warnings.catch_warnings()
        warning_state.__enter__()
        self.addCleanup(warning_state.__exit__)
        warnings.filterwarnings("ignore", '.*unknown hash')

        # Text and bytes inputs are both accepted; None is rejected.
        self.assertEqual(norm_hash_name(u("MD4")), "md4")
        self.assertEqual(norm_hash_name(b"MD4"), "md4")
        self.assertRaises(TypeError, norm_hash_name, None)

        # Every name in a row must normalize to the row's entry at the
        # position of the requested format.
        all_rows = chain(_known_hash_names, self.norm_hash_samples)
        for row in all_rows:
            for position, fmt in enumerate(self.norm_hash_formats):
                expected = row[position]
                for name in row:
                    actual = norm_hash_name(name, fmt)
                    self.assertEqual(actual, expected,
                                     "name=%r, format=%r:" % (name, fmt))
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def test_90_special(self):
        """test marker option & special behavior"""
        # Raw string literal: "\(" and "\)" are regex escapes, not valid
        # Python string escapes, so a plain literal triggers an
        # invalid-escape-sequence warning.  The pattern's runtime value is
        # unchanged.
        warnings.filterwarnings("ignore", r"passing settings to .*.hash\(\) is deprecated")
        handler = self.handler

        # preserve hash if provided
        self.assertEqual(handler.genhash("stub", "!asd"), "!asd")

        # use marker if no hash
        self.assertEqual(handler.genhash("stub", ""), handler.default_marker)
        self.assertEqual(handler.hash("stub"), handler.default_marker)
        self.assertEqual(handler.using().default_marker, handler.default_marker)

        # custom marker, supplied per call or through using()
        self.assertEqual(handler.genhash("stub", "", marker="*xxx"), "*xxx")
        self.assertEqual(handler.hash("stub", marker="*xxx"), "*xxx")
        self.assertEqual(handler.using(marker="*xxx").hash("stub"), "*xxx")

        # reject invalid marker on every entry point
        self.assertRaises(ValueError, handler.genhash, 'stub', "", marker='abc')
        self.assertRaises(ValueError, handler.hash, 'stub', marker='abc')
        self.assertRaises(ValueError, handler.using, marker='abc')
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def test_15_min_verify_time(self):
        """test get_min_verify_time() method"""
        # min verify time is deprecated; keep the warnings quiet here
        warnings.filterwarnings("ignore", category=DeprecationWarning)

        def check_reports_zero(policy):
            # Both the default and the 'admin' category must report 0.
            self.assertEqual(policy.get_min_verify_time(), 0)
            self.assertEqual(policy.get_min_verify_time('admin'), 0)

        default_policy = CryptPolicy()
        check_reports_zero(default_policy)

        # Even after setting min_verify_time, 0 is still expected.
        tuned_policy = default_policy.replace(min_verify_time=.1)
        check_reports_zero(tuned_policy)

    #===================================================================
    # serialization
    #===================================================================
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def setUpWarnings(self):
        """helper to init warning filters before subclass setUp()"""
        if not self.resetWarningState:
            return

        # Enter the reset_warnings() context now and leave it at cleanup.
        warning_scope = reset_warnings()
        warning_scope.__enter__()
        self.addCleanup(warning_scope.__exit__)

        # ignore warnings about PasswordHash features deprecated in 1.7
        # TODO: should be cleaned in 2.0, when support will be dropped.
        #       should be kept until then, so we test the legacy paths.
        legacy_patterns = (
            r"the method .*\.(encrypt|genconfig|genhash)\(\) is deprecated",
            r"the 'vary_rounds' option is deprecated",
        )
        for pattern in legacy_patterns:
            warnings.filterwarnings("ignore", pattern)

    #---------------------------------------------------------------
    # tweak message formatting so longMessage mode is only enabled
    # if msg ends with ":", and turn on longMessage by default.
    #---------------------------------------------------------------
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_pipe_handle(self):
        # Keep one end of an overlapped pipe and close the other right away.
        raw_handle, unused_handle = windows_utils.pipe(overlapped=(True, True))
        _winapi.CloseHandle(unused_handle)

        wrapper = windows_utils.PipeHandle(raw_handle)
        self.assertEqual(wrapper.fileno(), raw_handle)
        self.assertEqual(wrapper.handle, raw_handle)

        # check garbage collection of the wrapper closes the handle
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "",  ResourceWarning)
            del wrapper
            support.gc_collect()

        # Closing the raw handle again must now fail with winerror 6.
        try:
            _winapi.CloseHandle(raw_handle)
        except OSError as exc:
            self.assertEqual(exc.winerror, 6)     # ERROR_INVALID_HANDLE
        else:
            raise RuntimeError('expected ERROR_INVALID_HANDLE')
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def emits_warning_on(db, *messages):
    """Decorator factory: run the decorated test inside ``expect_warnings_on``.

    ``db`` and ``messages`` are forwarded unchanged to
    ``expect_warnings_on``; the existing project documentation describes
    ``messages`` as strings matched against the start of the warning text
    by ``warnings.filterwarnings()``, and an empty ``messages`` as
    squelching every SAWarning failure.

    ``assert_=False`` is always passed, so this decorator does **not**
    verify that the warnings were actually emitted.

    """
    def decorate(fn, *args, **kw):
        # Build the context manager first, then run the test body inside it.
        warning_scope = expect_warnings_on(db, *messages, assert_=False)
        with warning_scope:
            return fn(*args, **kw)

    return decorator(decorate)
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def uses_deprecated(*messages):
    """Decorator factory: run the decorated test inside ``expect_deprecated``.

    ``messages`` is forwarded unchanged to ``expect_deprecated``.  Per the
    existing project documentation: with no messages, all
    SADeprecationWarning failures are squelched; otherwise each string is
    matched against the start of the warning text by
    ``warnings.filterwarnings()``.  A function name prefixed with ``//`` is
    rewritten to match the standard wording emitted by the
    ``sqlalchemy.util.deprecated`` decorator.

    ``assert_=False`` is always passed, so this decorator does **not**
    verify that the warnings were actually emitted.

    """
    def decorate(fn, *args, **kw):
        deprecation_scope = expect_deprecated(*messages, assert_=False)
        with deprecation_scope:
            return fn(*args, **kw)

    return decorator(decorate)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_empty_tuple_index(self):
        # Indexing with an empty tuple gives a view onto the same array ...
        vector = np.array([1, 2, 3])
        assert_equal(vector[()], vector)
        assert_(vector[()].base is vector)

        # ... and on a 0-d array it gives an np.int_ instance.
        zero_dim = np.array(0)
        assert_(isinstance(zero_dim[()], np.int_))

        # Regression, it needs to fall through integer and fancy indexing
        # cases, so need the with statement to ignore the non-integer error.
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', '', DeprecationWarning)

            floats = np.array([1.])
            assert_(isinstance(floats[0.], np.float_))

            boxed = np.array([np.array(1)], dtype=object)
            assert_(isinstance(boxed[0.], np.ndarray))
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_scalar_none_comparison(self):
        # Comparing a scalar with None must give a plain boolean and emit
        # no warning.  The "== None" / "!= None" spellings are the point of
        # the test, so the pep8 complaint about them is ignored.
        with warnings.catch_warnings(record=True) as caught:
            warnings.filterwarnings('always', '', FutureWarning)

            # The datetime64 case is dubious (see the end of the test).
            for scalar in (np.float32(1), np.str_('test'),
                           np.datetime64('NaT')):
                assert_(not scalar == None)

            for scalar in (np.float32(1), np.str_('test'),
                           np.datetime64('NaT')):
                assert_(scalar != None)
        assert_(len(caught) == 0)

        # For documentation purposes, this is why the datetime is dubious.
        # At the time of deprecation this was no behaviour change, but
        # it has to be considered when the deprecations are done.
        assert_(np.equal(np.datetime64('NaT'), None))
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_identity_equality_mismatch(self):
        # Object array holding a NaN, compared element-wise with itself.
        a = np.array([np.nan], dtype=object)
        equality_ufuncs = (np.equal, np.not_equal)
        ordering_ufuncs = (np.less, np.greater, np.less_equal,
                           np.greater_equal)

        # With FutureWarning always shown, both equality ufuncs must warn.
        with warnings.catch_warnings():
            warnings.filterwarnings('always', '', FutureWarning)
            for ufunc in equality_ufuncs:
                assert_warns(FutureWarning, ufunc, a, a)

        # With FutureWarning turned into an error, they must raise it ...
        with warnings.catch_warnings():
            warnings.filterwarnings('error', '', FutureWarning)
            for ufunc in equality_ufuncs:
                assert_raises(FutureWarning, ufunc, a, a)
            # And the other do not warn:
            with np.errstate(invalid='ignore'):
                for ufunc in ordering_ufuncs:
                    ufunc(a, a)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_skip_footer_with_invalid(self):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")

            # First input: the last three rows are incomplete.
            tail_invalid = '1 1\n2 2\n3 3\n4 4\n5  \n6  \n7  \n'
            first_four_rows = np.array(
                [[1., 1.], [2., 2.], [3., 3.], [4., 4.]])

            # Footer too small to get rid of all invalid values
            assert_raises(ValueError, np.genfromtxt,
                          TextIO(tail_invalid), skip_footer=1)
            parsed = np.genfromtxt(
                TextIO(tail_invalid), skip_footer=1, invalid_raise=False)
            assert_equal(parsed, first_four_rows)

            # Skipping three footer lines needs no invalid_raise override.
            parsed = np.genfromtxt(TextIO(tail_invalid), skip_footer=3)
            assert_equal(parsed, first_four_rows)

            # Second input: incomplete rows also appear before the footer.
            scattered_invalid = '1 1\n2  \n3 3\n4 4\n5  \n6 6\n7 7\n'
            parsed = np.genfromtxt(
                TextIO(scattered_invalid), skip_footer=1, invalid_raise=False)
            assert_equal(
                parsed, np.array([[1., 1.], [3., 3.], [4., 4.], [6., 6.]]))
            parsed = np.genfromtxt(
                TextIO(scattered_invalid), skip_footer=3, invalid_raise=False)
            assert_equal(parsed, np.array([[1., 1.], [3., 3.], [4., 4.]]))
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_version_2_0():
    stream = BytesIO()
    # 500 fields with 100-character names: requires more than 2 byte for
    # header
    wide_dtype = [(("%d" % i) * 100, float) for i in range(500)]
    data = np.ones(1000, dtype=wide_dtype)

    # Explicit version (2, 0) first, then let write_array pick the
    # version; the second write is expected to record a UserWarning.
    format.write_array(stream, data, version=(2, 0))
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', UserWarning)
        format.write_array(stream, data)
        assert_(caught[0].category is UserWarning)

    # Read back from the start of the stream and compare.
    stream.seek(0)
    loaded = format.read_array(stream)
    assert_array_equal(data, loaded)

    # 1.0 requested but data cannot be saved this way
    assert_raises(ValueError, format.write_array, stream, data, (1, 0))
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_version_2_0_memmap():
    # 500 fields with 100-character names: requires more than 2 byte for
    # header
    wide_dtype = [(("%d" % i) * 100, float) for i in range(500)]
    data = np.ones(1000, dtype=wide_dtype)
    # NOTE(review): tempfile.mktemp only returns a name and is documented
    # as race-prone; kept because the test needs a path inside tempdir.
    path = tempfile.mktemp('', 'mmap', dir=tempdir)

    # Arguments shared by every "create for writing" call below.
    create_kwargs = dict(mode='w+', dtype=data.dtype, shape=data.shape)

    # 1.0 requested but data cannot be saved this way
    assert_raises(ValueError, format.open_memmap, path,
                  version=(1, 0), **create_kwargs)

    # Explicit version (2, 0): fill the map and drop it.
    mapped = format.open_memmap(path, version=(2, 0), **create_kwargs)
    mapped[...] = data
    del mapped

    # version=None: a UserWarning is expected to be recorded on open.
    with warnings.catch_warnings(record=True) as caught:
        warnings.filterwarnings('always', '', UserWarning)
        mapped = format.open_memmap(path, version=None, **create_kwargs)
        assert_(caught[0].category is UserWarning)
        mapped[...] = data
        del mapped

    # Re-open read-only and compare with what was written.
    mapped = format.open_memmap(path, mode='r')
    assert_array_equal(mapped, data)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_basic(self):
        a = [1, 2, 3]
        # (insertion index/indices, values to insert, expected result)
        list_cases = [
            (0, 1, [1, 1, 2, 3]),
            (3, 1, [1, 2, 3, 1]),
            ([1, 1, 1], [1, 2, 3], [1, 1, 2, 3, 2, 3]),
            (1, [1, 2, 3], [1, 1, 2, 3, 2, 3]),
            ([1, -1, 3], 9, [1, 9, 2, 9, 3, 9]),
            (slice(-1, None, -1), 9, [9, 1, 9, 2, 9, 3]),
            ([-1, 1, 3], [7, 8, 9], [1, 8, 2, 7, 3, 9]),
        ]
        for where, values, expected in list_cases:
            assert_equal(insert(a, where, values), expected)

        # float64 array input, including an empty insertion
        b = np.array([0, 1], dtype=np.float64)
        assert_equal(insert(b, 0, b[0]), [0., 0., 1.])
        assert_equal(insert(b, [], []), b)

        # Bools will be treated differently in the future; the expected
        # future result is [9, 1, 9, 2, 9, 3, 9].  For now a FutureWarning
        # is recorded and the result below is produced.
        with warnings.catch_warnings(record=True) as caught:
            warnings.filterwarnings('always', '', FutureWarning)
            bool_index = np.array([True] * 4)
            assert_equal(insert(a, bool_index, 9), [1, 9, 9, 9, 9, 2, 3])
            assert_(caught[0].category is FutureWarning)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_out_nan(self):
        # Record (and thereby swallow) RuntimeWarnings while checking that
        # np.percentile returns the contents of ``out`` for NaN input.
        with warnings.catch_warnings(record=True):
            warnings.filterwarnings('always', '', RuntimeWarning)
            data = np.ones((3, 4))
            data[2, 1] = np.nan

            # (shape of the ``out`` buffer, positional args after the data)
            scenarios = (
                ((4,), (0, 0)),
                ((3,), (1, 1)),
                ((), (1,)),
            )
            for out_shape, pct_args in scenarios:
                out = np.zeros(out_shape)
                assert_equal(np.percentile(data, *pct_args, out=out), out)
                assert_equal(
                    np.percentile(data, *pct_args,
                                  interpolation='nearest', out=out),
                    out)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_basic(self):
        # 0-d, 1-d and 2-d integer inputs
        scalar = np.array(1)
        pair = np.arange(2)
        grid = np.arange(6).reshape(2, 3)
        assert_equal(np.median(scalar), 1)
        assert_allclose(np.median(pair), 0.5)
        assert_allclose(np.median(grid), 2.5)
        assert_allclose(np.median(grid, axis=0), [1.5,  2.5,  3.5])
        assert_equal(np.median(grid, axis=1), [1, 4])
        assert_allclose(np.median(grid, axis=None), 2.5)

        # Even count: mean of the two middle values.
        samples = np.array([0.0444502, 0.0463301, 0.141249, 0.0606775])
        assert_almost_equal((samples[1] + samples[3]) / 2., np.median(samples))
        # Odd count: the middle value, wherever it sits in the input.
        samples = np.array([0.0463301, 0.0444502, 0.141249])
        assert_equal(samples[0], np.median(samples))
        samples = np.array([0.0444502, 0.141249, 0.0463301])
        assert_equal(samples[-1], np.median(samples))

        # check array scalar result
        assert_equal(np.median(samples).ndim, 0)

        # With a NaN present the result is still 0-d and a RuntimeWarning
        # is expected to be recorded.
        samples[1] = np.nan
        with warnings.catch_warnings(record=True) as caught:
            warnings.filterwarnings('always', '', RuntimeWarning)
            assert_equal(np.median(samples).ndim, 0)
            assert_(caught[0].category is RuntimeWarning)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_inplace_addition_array_type(self):
        # In-place "+=" with an array operand, repeated for every type in
        # self.othertypes; no warning may be recorded.
        for t in self.othertypes:
            with warnings.catch_warnings(record=True) as caught:
                warnings.filterwarnings("always")
                x, y, xm = [arr.astype(t) for arr in self.uint8data]
                mask_before = xm.mask
                operand = arange(10, dtype=t)
                operand[-1] = masked

                x += operand
                xm += operand

                expected = y + operand
                assert_equal(x, expected)
                assert_equal(xm, expected)
                assert_equal(xm.mask, mask_or(mask_before, operand.mask))

                assert_equal(len(caught), 0, "Failed on type=%s." % t)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_inplace_subtraction_array_type(self):
        # In-place "-=" with an array operand, repeated for every type in
        # self.othertypes; no warning may be recorded.
        for t in self.othertypes:
            with warnings.catch_warnings(record=True) as caught:
                warnings.filterwarnings("always")
                x, y, xm = [arr.astype(t) for arr in self.uint8data]
                mask_before = xm.mask
                operand = arange(10, dtype=t)
                operand[-1] = masked

                x -= operand
                xm -= operand

                expected = y - operand
                assert_equal(x, expected)
                assert_equal(xm, expected)
                assert_equal(xm.mask, mask_or(mask_before, operand.mask))

                assert_equal(len(caught), 0, "Failed on type=%s." % t)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_inplace_multiplication_array_type(self):
        # In-place "*=" with an array operand, repeated for every type in
        # self.othertypes; no warning may be recorded.
        for t in self.othertypes:
            with warnings.catch_warnings(record=True) as caught:
                warnings.filterwarnings("always")
                x, y, xm = [arr.astype(t) for arr in self.uint8data]
                mask_before = xm.mask
                operand = arange(10, dtype=t)
                operand[-1] = masked

                x *= operand
                xm *= operand

                expected = y * operand
                assert_equal(x, expected)
                assert_equal(xm, expected)
                assert_equal(xm.mask, mask_or(mask_before, operand.mask))

                assert_equal(len(caught), 0, "Failed on type=%s." % t)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_inplace_floor_division_array_type(self):
        # In-place "//=" with an array operand, repeated for every type in
        # self.othertypes; no warning may be recorded.
        for t in self.othertypes:
            with warnings.catch_warnings(record=True) as caught:
                warnings.filterwarnings("always")
                x, y, xm = [arr.astype(t) for arr in self.uint8data]
                mask_before = xm.mask
                operand = arange(10, dtype=t)
                operand[-1] = masked

                x //= operand
                xm //= operand

                expected = y // operand
                assert_equal(x, expected)
                assert_equal(xm, expected)
                # Besides the combined input masks, positions where the
                # divisor equals zero are expected to be masked too.
                combined_mask = mask_or(mask_before, operand.mask)
                assert_equal(
                    xm.mask,
                    mask_or(combined_mask, (operand == t(0)))
                )

                assert_equal(len(caught), 0, "Failed on type=%s." % t)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def test_issue_3(self):
        """ undefined methods datetime_or_None, date_or_None """
        cursor = self.connections[0].cursor()

        # Dropping a table that may not exist can warn; keep that quiet.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists issue3")
        cursor.execute("create table issue3 (d date, t time, dt datetime, ts timestamp)")
        try:
            all_null = (None, None, None, None)
            cursor.execute("insert into issue3 (d, t, dt, ts) values (%s,%s,%s,%s)", all_null)

            # The date, time and datetime columns come back as None ...
            for query in ("select d from issue3",
                          "select t from issue3",
                          "select dt from issue3"):
                cursor.execute(query)
                self.assertEqual(None, cursor.fetchone()[0])

            # ... while the timestamp column comes back as a datetime.
            cursor.execute("select ts from issue3")
            self.assertTrue(isinstance(cursor.fetchone()[0], datetime.datetime))
        finally:
            cursor.execute("drop table issue3")
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def test_issue_8(self):
        """ Primary Key and Index error when selecting data """
        cursor = self.connections[0].cursor()

        # Dropping a table that may not exist can warn; keep that quiet.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists test")

        create_sql = """CREATE TABLE `test` (`station` int(10) NOT NULL DEFAULT '0', `dh`
datetime NOT NULL DEFAULT '2015-01-01 00:00:00', `echeance` int(1) NOT NULL
DEFAULT '0', `me` double DEFAULT NULL, `mo` double DEFAULT NULL, PRIMARY
KEY (`station`,`dh`,`echeance`)) ENGINE=MyISAM DEFAULT CHARSET=latin1;"""
        cursor.execute(create_sql)
        try:
            # execute() must report 0 both before and after the index is
            # added.
            self.assertEqual(0, cursor.execute("SELECT * FROM test"))
            cursor.execute("ALTER TABLE `test` ADD INDEX `idx_station` (`station`)")
            self.assertEqual(0, cursor.execute("SELECT * FROM test"))
        finally:
            cursor.execute("drop table test")
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def test_issue_13(self):
        """ can't handle large result fields """
        cursor = self.connections[0].cursor()

        # Dropping a table that may not exist can warn; keep that quiet.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists issue13")
        try:
            cursor.execute("create table issue13 (t text)")
            # ticket says 18k
            size = 18*1024
            payload = "x" * size
            cursor.execute("insert into issue13 (t) values (%s)", (payload,))
            cursor.execute("select t from issue13")
            # use assertTrue so that obscenely huge error messages don't print
            fetched = cursor.fetchone()[0]
            self.assertTrue(payload == fetched)
        finally:
            cursor.execute("drop table issue13")
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def test_issue_17(self):
        """could not connect mysql use passwod"""
        primary = self.connections[0]
        settings = self.databases[0]
        host = settings["host"]
        db = settings["db"]
        cursor = primary.cursor()

        # grant access to a table to a user with a password
        try:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                cursor.execute("drop table if exists issue17")
            cursor.execute("create table issue17 (x varchar(32) primary key)")
            cursor.execute("insert into issue17 (x) values ('hello, world!')")
            cursor.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
            primary.commit()

            # Connect as the new user and read the row back.
            secondary = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
            reader = secondary.cursor()
            reader.execute("select x from issue17")
            self.assertEqual("hello, world!", reader.fetchone()[0])
        finally:
            cursor.execute("drop table issue17")
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def disabled_test_issue_54(self):
        cursor = self.connections[0].cursor()

        # Dropping a table that may not exist can warn; keep that quiet.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists issue54")

        # A very large statement: 100000 trivially-true clauses.
        clauses = " and ".join("%d=%d" % (i, i) for i in range(100000))
        big_sql = "select * from issue54 where " + clauses

        try:
            cursor.execute("create table issue54 (id integer primary key)")
            cursor.execute("insert into issue54 (id) values (7)")
            cursor.execute(big_sql)
            self.assertEqual(7, cursor.fetchone()[0])
        finally:
            cursor.execute("drop table issue54")
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def setUp(self):
        super(TestDictCursor, self).setUp()
        conn = self.connections[0]
        self.conn = conn
        cursor = conn.cursor(self.cursor_type)

        # create a table and some data to query
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists dictcursor")
            # include in filterwarnings since for unbuffered dict cursor warning for lack of table
            # will only be propagated at start of next execute() call
            cursor.execute("""CREATE TABLE dictcursor (name char(20), age int , DOB datetime)""")

        rows = [
            ("bob", 21, "1990-02-06 23:04:56"),
            ("jim", 56, "1955-05-09 13:12:45"),
            ("fred", 100, "1911-09-12 01:01:01"),
        ]
        cursor.executemany("insert into dictcursor values (%s,%s,%s)", rows)
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def test_issue_3(self):
        """ undefined methods datetime_or_None, date_or_None """
        cursor = self.connections[0].cursor()

        # Dropping a table that may not exist can warn; keep that quiet.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists issue3")
        cursor.execute("create table issue3 (d date, t time, dt datetime, ts timestamp)")
        try:
            all_null = (None, None, None, None)
            cursor.execute("insert into issue3 (d, t, dt, ts) values (%s,%s,%s,%s)", all_null)

            # The date, time and datetime columns come back as None ...
            for query in ("select d from issue3",
                          "select t from issue3",
                          "select dt from issue3"):
                cursor.execute(query)
                self.assertEqual(None, cursor.fetchone()[0])

            # ... while the timestamp column comes back as a datetime.
            cursor.execute("select ts from issue3")
            self.assertTrue(isinstance(cursor.fetchone()[0], datetime.datetime))
        finally:
            cursor.execute("drop table issue3")
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def test_issue_8(self):
        """ Primary Key and Index error when selecting data """
        cursor = self.connections[0].cursor()

        # Dropping a table that may not exist can warn; keep that quiet.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists test")

        create_sql = """CREATE TABLE `test` (`station` int(10) NOT NULL DEFAULT '0', `dh`
datetime NOT NULL DEFAULT '2015-01-01 00:00:00', `echeance` int(1) NOT NULL
DEFAULT '0', `me` double DEFAULT NULL, `mo` double DEFAULT NULL, PRIMARY
KEY (`station`,`dh`,`echeance`)) ENGINE=MyISAM DEFAULT CHARSET=latin1;"""
        cursor.execute(create_sql)
        try:
            # execute() must report 0 both before and after the index is
            # added.
            self.assertEqual(0, cursor.execute("SELECT * FROM test"))
            cursor.execute("ALTER TABLE `test` ADD INDEX `idx_station` (`station`)")
            self.assertEqual(0, cursor.execute("SELECT * FROM test"))
        finally:
            cursor.execute("drop table test")
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def test_issue_13(self):
        """ can't handle large result fields """
        cursor = self.connections[0].cursor()

        # Dropping a table that may not exist can warn; keep that quiet.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists issue13")
        try:
            cursor.execute("create table issue13 (t text)")
            # ticket says 18k
            size = 18*1024
            payload = "x" * size
            cursor.execute("insert into issue13 (t) values (%s)", (payload,))
            cursor.execute("select t from issue13")
            # use assertTrue so that obscenely huge error messages don't print
            fetched = cursor.fetchone()[0]
            self.assertTrue(payload == fetched)
        finally:
            cursor.execute("drop table issue13")
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def test_issue_17(self):
        """could not connect mysql use passwod"""
        primary = self.connections[0]
        settings = self.databases[0]
        host = settings["host"]
        db = settings["db"]
        cursor = primary.cursor()

        # grant access to a table to a user with a password
        try:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                cursor.execute("drop table if exists issue17")
            cursor.execute("create table issue17 (x varchar(32) primary key)")
            cursor.execute("insert into issue17 (x) values ('hello, world!')")
            cursor.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
            primary.commit()

            # Connect as the new user and read the row back.
            secondary = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
            reader = secondary.cursor()
            reader.execute("select x from issue17")
            self.assertEqual("hello, world!", reader.fetchone()[0])
        finally:
            cursor.execute("drop table issue17")
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def disabled_test_issue_54(self):
        cursor = self.connections[0].cursor()

        # Dropping a table that may not exist can warn; keep that quiet.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists issue54")

        # A very large statement: 100000 trivially-true clauses.
        clauses = " and ".join("%d=%d" % (i, i) for i in range(100000))
        big_sql = "select * from issue54 where " + clauses

        try:
            cursor.execute("create table issue54 (id integer primary key)")
            cursor.execute("insert into issue54 (id) values (7)")
            cursor.execute(big_sql)
            self.assertEqual(7, cursor.fetchone()[0])
        finally:
            cursor.execute("drop table issue54")
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def setUp(self):
        super(TestDictCursor, self).setUp()
        conn = self.connections[0]
        self.conn = conn
        cursor = conn.cursor(self.cursor_type)

        # create a table and some data to query
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            cursor.execute("drop table if exists dictcursor")
            # include in filterwarnings since for unbuffered dict cursor warning for lack of table
            # will only be propagated at start of next execute() call
            cursor.execute("""CREATE TABLE dictcursor (name char(20), age int , DOB datetime)""")

        rows = [
            ("bob", 21, "1990-02-06 23:04:56"),
            ("jim", 56, "1955-05-09 13:12:45"),
            ("fred", 100, "1911-09-12 01:01:01"),
        ]
        cursor.executemany("insert into dictcursor values (%s,%s,%s)", rows)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def expect_warnings(*messages):
    """Ignore SQLAlchemy warnings around the caller's block.

    Written as a single-``yield`` generator.  NOTE(review): no
    context-manager decorator is visible on this definition; presumably it
    is wrapped (e.g. with ``contextlib.contextmanager``) -- confirm.

    ``SAPendingDeprecationWarning`` is always ignored.  With no *messages*,
    every ``SAWarning`` is ignored too; otherwise one ``SAWarning`` filter
    is installed per message, passing it as the ``message`` argument of
    ``warnings.filterwarnings()``.  ``resetwarnings()`` is called from a
    ``finally`` clause after the ``yield``.
    """
    ignore_rules = [
        {'action': 'ignore', 'category': sa_exc.SAPendingDeprecationWarning},
    ]
    if messages:
        for pattern in messages:
            ignore_rules.append({
                'action': 'ignore',
                'message': pattern,
                'category': sa_exc.SAWarning,
            })
    else:
        ignore_rules.append(
            {'action': 'ignore', 'category': sa_exc.SAWarning})

    for rule in ignore_rules:
        warnings.filterwarnings(**rule)
    try:
        yield
    finally:
        resetwarnings()
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def emits_warning_on(db, *warnings):
    """Mark a test as emitting a warning on a specific dialect.

    With no extra arguments, squelches all SAWarning failures.  Otherwise
    pass one or more strings; these will be matched to the root of the
    warning description by warnings.filterwarnings().

    When *db* is a string, the spec built by ``db_spec(db)`` is evaluated
    against ``config._current``; otherwise *db* is unpacked into
    ``_is_excluded()``.  If that check is falsy the test runs as is; if it
    is truthy the test runs wrapped in ``emits_warning(*warnings)``.

    Note that the ``warnings`` parameter (a tuple of message strings)
    shadows the ``warnings`` module inside this function.
    """
    spec = db_spec(db)

    def decorate(fn, *args, **kw):
        # Decide whether the current configuration is the targeted one.
        if isinstance(db, util.string_types):
            targeted = spec(config._current)
        else:
            targeted = _is_excluded(*db)

        if not targeted:
            return fn(*args, **kw)
        return emits_warning(*warnings)(fn)(*args, **kw)

    return decorator(decorate)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def uses_deprecated(*messages):
    """Mark a test as immune from fatal deprecation warnings.

    Called with no arguments, it squelches all SADeprecationWarning
    failures.  Called with one or more strings, each is matched to the root
    of the warning description by warnings.filterwarnings().

    Special case: a function name prefixed with // is re-written as needed
    to match the standard warning verbiage emitted by the
    sqlalchemy.util.deprecated decorator.

    The decorated test body runs inside ``expect_deprecated(*messages)``.
    """
    def decorate(fn, *args, **kw):
        with expect_deprecated(*messages):
            return fn(*args, **kw)

    return decorator(decorate)
项目:macos-st-packages    作者:zce    | 项目源码 | 文件源码
def run():  # pragma: no cover
    """Command-line entry point: parse options, set up logging, convert.

    Exits with status 2 when ``parse_options()`` yields no options.
    Otherwise the module logger gets a stream handler at the requested
    level and the parsed options are passed to
    ``markdown.markdownFromFile``.
    """
    options, logging_level = parse_options()
    if not options:
        sys.exit(2)

    # One handler is shared by the module logger and, when enabled below,
    # the logger that receives captured warnings.
    handler = logging.StreamHandler()
    logger.setLevel(logging_level)
    logger.addHandler(handler)

    if logging_level <= WARNING:
        # Ensure deprecation warnings get displayed, and route everything
        # the warnings module emits through logging.
        warnings.filterwarnings('default')
        logging.captureWarnings(True)
        logging.getLogger('py.warnings').addHandler(handler)

    # Run the conversion.
    markdown.markdownFromFile(**options)
项目:AutoML-Challenge    作者:postech-mlg-exbrain    | 项目源码 | 文件源码
def fit(self, X, Y=None):
    """Fit a ``KernelPCA`` preprocessor on *X* and return ``self``.

    *Y* is accepted but not used.  Sparse *X* is cast to ``float64``
    first.  Any warning emitted while fitting is raised as an exception.

    Raises:
        ValueError: if ``alphas_ / lambdas_`` of the fitted model is empty.
    """
    import scipy.sparse
    import sklearn.decomposition

    kpca_params = dict(
        n_components=self.n_components,
        kernel=self.kernel,
        degree=self.degree,
        gamma=self.gamma,
        coef0=self.coef0,
        remove_zero_eig=True,
    )
    self.preprocessor = sklearn.decomposition.KernelPCA(**kpca_params)

    if scipy.sparse.issparse(X):
        X = X.astype(np.float64)

    # Escalate every warning to an error for the duration of the fit only.
    with warnings.catch_warnings():
        warnings.filterwarnings("error")
        self.preprocessor.fit(X)

    # Raise an informative error message; the expression is based on
    # ~line 249 of kernel_pca.py in scikit-learn.
    fitted = self.preprocessor
    if not len(fitted.alphas_ / fitted.lambdas_):
        raise ValueError('KernelPCA removed all features!')
    return self
项目:AutoML-Challenge    作者:postech-mlg-exbrain    | 项目源码 | 文件源码
def fit(self, X, Y=None):
    """Fit a ``FastICA`` preprocessor on *X* and return ``self``.

    *Y* is accepted but not used.  Warnings emitted while fitting are
    turned into exceptions for the duration of the fit.

    Raises:
        ValueError: if ``FastICA.fit`` raises one.  An error mentioning
            "array must not contain infs or NaNs" is replaced by a message
            pointing at the known scikit-learn bug; any other ValueError
            is re-raised unchanged so its message and traceback survive.
    """
    import sklearn.decomposition

    self.preprocessor = sklearn.decomposition.FastICA(
        n_components=self.n_components, algorithm=self.algorithm,
        fun=self.fun, whiten=self.whiten, random_state=self.random_state
    )
    # Make the RuntimeWarning an Exception!
    with warnings.catch_warnings():
        warnings.filterwarnings("error")
        try:
            self.preprocessor.fit(X)
        except ValueError as e:
            # Match on str(e) rather than e.args[0]: the latter fails with
            # IndexError/TypeError when args is empty or not a string.
            if 'array must not contain infs or NaNs' in str(e):
                raise ValueError("Bug in scikit-learn: https://github.com/scikit-learn/scikit-learn/pull/2738")
            # Propagate the original error instead of an empty ValueError,
            # which would discard the reason the fit failed.
            raise

    return self
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def expect_warnings(*messages):
    """Ignore SQLAlchemy warnings around the caller's block.

    Written as a single-``yield`` generator.  NOTE(review): no
    context-manager decorator is visible on this definition; presumably it
    is wrapped (e.g. with ``contextlib.contextmanager``) -- confirm.

    ``SAPendingDeprecationWarning`` is always ignored.  With no *messages*,
    every ``SAWarning`` is ignored too; otherwise one ``SAWarning`` filter
    is installed per message, passing it as the ``message`` argument of
    ``warnings.filterwarnings()``.  ``resetwarnings()`` is called from a
    ``finally`` clause after the ``yield``.
    """
    ignore_rules = [
        {'action': 'ignore', 'category': sa_exc.SAPendingDeprecationWarning},
    ]
    if messages:
        for pattern in messages:
            ignore_rules.append({
                'action': 'ignore',
                'message': pattern,
                'category': sa_exc.SAWarning,
            })
    else:
        ignore_rules.append(
            {'action': 'ignore', 'category': sa_exc.SAWarning})

    for rule in ignore_rules:
        warnings.filterwarnings(**rule)
    try:
        yield
    finally:
        resetwarnings()
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def emits_warning_on(db, *warnings):
    """Mark a test as emitting a warning on a specific dialect.

    With no extra arguments, squelches all SAWarning failures.  Otherwise
    pass one or more strings; these will be matched to the root of the
    warning description by warnings.filterwarnings().

    When *db* is a string, the spec built by ``db_spec(db)`` is evaluated
    against ``config._current``; otherwise *db* is unpacked into
    ``_is_excluded()``.  If that check is falsy the test runs as is; if it
    is truthy the test runs wrapped in ``emits_warning(*warnings)``.

    Note that the ``warnings`` parameter (a tuple of message strings)
    shadows the ``warnings`` module inside this function.
    """
    spec = db_spec(db)

    def decorate(fn, *args, **kw):
        # Decide whether the current configuration is the targeted one.
        if isinstance(db, util.string_types):
            targeted = spec(config._current)
        else:
            targeted = _is_excluded(*db)

        if not targeted:
            return fn(*args, **kw)
        return emits_warning(*warnings)(fn)(*args, **kw)

    return decorator(decorate)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def uses_deprecated(*messages):
    """Mark a test as immune from fatal deprecation warnings.

    Called with no arguments, it squelches all SADeprecationWarning
    failures.  Called with one or more strings, each is matched to the root
    of the warning description by warnings.filterwarnings().

    Special case: a function name prefixed with // is re-written as needed
    to match the standard warning verbiage emitted by the
    sqlalchemy.util.deprecated decorator.

    The decorated test body runs inside ``expect_deprecated(*messages)``.
    """
    def decorate(fn, *args, **kw):
        with expect_deprecated(*messages):
            return fn(*args, **kw)

    return decorator(decorate)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _mkstemp(*args, **kw):
    """Call ``tempfile.mkstemp`` with ``os.open`` temporarily replaced.

    For the duration of the call ``os.open`` is rebound to ``os_open`` so
    that sandboxing is bypassed; whatever ``os.open`` was on entry is put
    back afterwards, even if ``mkstemp`` raises.  All arguments are passed
    through and the result of ``tempfile.mkstemp`` is returned.
    """
    sandboxed_open = os.open
    os.open = os_open  # temporarily bypass sandboxing
    try:
        result = tempfile.mkstemp(*args, **kw)
    finally:
        os.open = sandboxed_open  # and then put it back
    return result


# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def _mkstemp(*args, **kw):
    """Call ``tempfile.mkstemp`` with ``os.open`` temporarily replaced.

    For the duration of the call ``os.open`` is rebound to ``os_open`` so
    that sandboxing is bypassed; whatever ``os.open`` was on entry is put
    back afterwards, even if ``mkstemp`` raises.  All arguments are passed
    through and the result of ``tempfile.mkstemp`` is returned.
    """
    sandboxed_open = os.open
    os.open = os_open  # temporarily bypass sandboxing
    try:
        result = tempfile.mkstemp(*args, **kw)
    finally:
        os.open = sandboxed_open  # and then put it back
    return result


# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _mkstemp(*args, **kw):
    """Call ``tempfile.mkstemp`` with ``os.open`` temporarily replaced.

    For the duration of the call ``os.open`` is rebound to ``os_open`` so
    that sandboxing is bypassed; whatever ``os.open`` was on entry is put
    back afterwards, even if ``mkstemp`` raises.  All arguments are passed
    through and the result of ``tempfile.mkstemp`` is returned.
    """
    sandboxed_open = os.open
    os.open = os_open  # temporarily bypass sandboxing
    try:
        result = tempfile.mkstemp(*args, **kw)
    finally:
        os.open = sandboxed_open  # and then put it back
    return result


# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _mkstemp(*args, **kw):
    """Call ``tempfile.mkstemp`` with ``os.open`` temporarily replaced.

    For the duration of the call ``os.open`` is rebound to ``os_open`` so
    that sandboxing is bypassed; whatever ``os.open`` was on entry is put
    back afterwards, even if ``mkstemp`` raises.  All arguments are passed
    through and the result of ``tempfile.mkstemp`` is returned.
    """
    sandboxed_open = os.open
    os.open = os_open  # temporarily bypass sandboxing
    try:
        result = tempfile.mkstemp(*args, **kw)
    finally:
        os.open = sandboxed_open  # and then put it back
    return result


# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.