Python warnings 模块,catch_warnings() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用warnings.catch_warnings()

项目:deprecation    作者:briancurtin    | 项目源码 | 文件源码
def fail_if_not_removed(method):
    """Decorate a test method to track removal of deprecated code

    This decorator catches :class:`~deprecation.UnsupportedWarning`
    warnings that occur during testing and causes unittests to fail,
    making it easier to keep track of when code should be removed.

    :raises: :class:`AssertionError` if an
             :class:`~deprecation.UnsupportedWarning`
             is raised while running the test method.
    """
    def _inner(*args, **kwargs):
        with warnings.catch_warnings(record=True) as caught_warnings:
            warnings.simplefilter("always")
            rv = method(*args, **kwargs)

        for warning in caught_warnings:
            if warning.category == UnsupportedWarning:
                raise AssertionError(
                    ("%s uses a function that should be removed: %s" %
                     (method, str(warning.message))))
        return rv
    return _inner
项目:deprecation    作者:briancurtin    | 项目源码 | 文件源码
def test_DeprecatedWarning_not_raised(self):
        ret_val = "lololol"

        class Test(object):
            @deprecation.deprecated(deprecated_in="2.0",
                                    removed_in="3.0",
                                    current_version="1.0")
            def method(self):
                """method docstring"""
                return ret_val

        with warnings.catch_warnings(record=True):
            # If a warning is raised it'll be an exception, so we'll fail.
            warnings.simplefilter("error")

            sot = Test()
            self.assertEqual(sot.method(), ret_val)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def cache_from_source(path, debug_override=None):
    """**DEPRECATED**

    Given the path to a .py file, return the path to its .pyc file.

    The .py file does not need to exist; this simply returns the path to the
    .pyc file calculated as if the .py file were imported.

    If debug_override is not None, then it must be a boolean and is used in
    place of sys.flags.optimize.

    If sys.implementation.cache_tag is None then NotImplementedError is raised.

    """
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        return util.cache_from_source(path, debug_override)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_security_dates_warning(self):

        # Build an asset with an end_date
        eq_end = pd.Timestamp('2012-01-01', tz='UTC')
        equity_asset = Equity(1, symbol="TESTEQ", end_date=eq_end)

        # Catch all warnings
        with warnings.catch_warnings(record=True) as w:
            # Cause all warnings to always be triggered
            warnings.simplefilter("always")
            equity_asset.security_start_date
            equity_asset.security_end_date
            equity_asset.security_name
            # Verify the warning
            self.assertEqual(3, len(w))
            for warning in w:
                self.assertTrue(issubclass(warning.category,
                                           DeprecationWarning))
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test_analogsignal_shortening_warning(self):
        nio = NeuralynxIO(self.sn, use_cache='never')

        with reset_warning_registry():
            with warnings.catch_warnings(record=True) as w:
                seg = Segment('testsegment')
                nio.read_ncs(os.path.join(self.sn, 'Tet3a.ncs'), seg)

                self.assertGreater(len(w), 0)
                self.assertTrue(issubclass(w[0].category, UserWarning))
                self.assertTrue('Analogsignalarray was shortened due to gap in'
                                ' recorded data  of file'
                                in str(w[0].message))


# This class is copied from
# 'http://bugs.python.org/file40031/reset_warning_registry.py' by Eli Collins
# and is related to http://bugs.python.org/issue21724 Python<3.4
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test_analogsignal_shortening_warning(self):
        nio = NeuralynxIO(self.sn, use_cache='never')

        with reset_warning_registry():
            with warnings.catch_warnings(record=True) as w:
                seg = Segment('testsegment')
                nio.read_ncs(os.path.join(self.sn, 'Tet3a.ncs'), seg)

                self.assertGreater(len(w), 0)
                self.assertTrue(issubclass(w[0].category, UserWarning))
                self.assertTrue('Analogsignalarray was shortened due to gap in'
                                ' recorded data  of file'
                                in str(w[0].message))


# This class is copied from
# 'http://bugs.python.org/file40031/reset_warning_registry.py' by Eli Collins
# and is related to http://bugs.python.org/issue21724 Python<3.4
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_callbacks_work_multiple_times(self):
        """
        Tests that multiple executions of execute on a batch statement
        logs a warning, and that we don't encounter an attribute error.
        @since 3.1
        @jira_ticket PYTHON-445
        @expected_result warning message is logged

        @test_category object_mapper
        """
        call_history = []

        def my_callback(*args, **kwargs):
            call_history.append(args)

        with warnings.catch_warnings(record=True) as w:
            with BatchQuery() as batch:
                batch.add_callback(my_callback)
                batch.execute()
            batch.execute()
        self.assertEqual(len(w), 2)  # package filter setup to warn always
        self.assertRegexpMatches(str(w[0].message), r"^Batch.*multiple.*")
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_disable_multiple_callback_warning(self):
        """
        Tests that multiple executions of a batch statement
        don't log a warning when warn_multiple_exec flag is set, and
        that we don't encounter an attribute error.
        @since 3.1
        @jira_ticket PYTHON-445
        @expected_result warning message is logged

        @test_category object_mapper
        """
        call_history = []

        def my_callback(*args, **kwargs):
            call_history.append(args)

        with patch('cassandra.cqlengine.query.BatchQuery.warn_multiple_exec', False):
            with warnings.catch_warnings(record=True) as w:
                with BatchQuery() as batch:
                    batch.add_callback(my_callback)
                    batch.execute()
                batch.execute()
            self.assertFalse(w)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def assert_warnings(fn, warning_msgs, regex=False):
    """Assert that each of the given warnings are emitted by fn."""

    from .assertions import eq_

    with warnings.catch_warnings(record=True) as log:
        # ensure that nothing is going into __warningregistry__
        warnings.filterwarnings("always")

        result = fn()
    for warning in log:
        popwarn = warning_msgs.pop(0)
        if regex:
            assert re.match(popwarn, str(warning.message))
        else:
            eq_(popwarn, str(warning.message))
    return result
项目:gitnet    作者:networks-lab    | 项目源码 | 文件源码
def test_warnings(self):
        """Are warnings printed at the right time?"""
        # Setting up a more complicated repository
        sub.call(["rm", "-rf", ".git"])
        sub.call(["cp", "-R", "repo_nx.git", ".git"])
        path = os.getcwd()
        nx_log = gitnet.get_log(path)

        # Note: Depending on the environment a warning may or may not be raised. For example, PyCharm uses UTF-8
        #       encoding and thus will not raised the unicode error.
        with warnings.catch_warnings(record=True) as w:
            # Ensure warnings are being shown
            warnings.simplefilter("always")
            # Trigger Warning
            nx_log.detect_dup_emails()
            # Check warning occurs
            self.assertTrue(len(w) == 0 or len(w) == 1)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_undecorated_coroutine(self):
        namespace = exec_test(globals(), locals(), """
        class Test(AsyncTestCase):
            async def test_coro(self):
                pass
        """)

        test_class = namespace['Test']
        test = test_class('test_coro')
        result = unittest.TestResult()

        # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            test.run(result)

        self.assertEqual(len(result.errors), 1)
        self.assertIn("should be decorated", result.errors[0][1])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def twisted_coroutine_fetch(self, url, runner):
        body = [None]

        @gen.coroutine
        def f():
            # This is simpler than the non-coroutine version, but it cheats
            # by reading the body in one blob instead of streaming it with
            # a Protocol.
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
            self.stop_loop()
        self.io_loop.add_callback(f)
        runner()
        return body[0]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_undecorated_coroutine(self):
        namespace = exec_test(globals(), locals(), """
        class Test(AsyncTestCase):
            async def test_coro(self):
                pass
        """)

        test_class = namespace['Test']
        test = test_class('test_coro')
        result = unittest.TestResult()

        # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            test.run(result)

        self.assertEqual(len(result.errors), 1)
        self.assertIn("should be decorated", result.errors[0][1])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def twisted_coroutine_fetch(self, url, runner):
        body = [None]

        @gen.coroutine
        def f():
            # This is simpler than the non-coroutine version, but it cheats
            # by reading the body in one blob instead of streaming it with
            # a Protocol.
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
            self.stop_loop()
        self.io_loop.add_callback(f)
        runner()
        return body[0]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_undecorated_coroutine(self):
        namespace = exec_test(globals(), locals(), """
        class Test(AsyncTestCase):
            async def test_coro(self):
                pass
        """)

        test_class = namespace['Test']
        test = test_class('test_coro')
        result = unittest.TestResult()

        # Silence "RuntimeWarning: coroutine 'test_coro' was never awaited".
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            test.run(result)

        self.assertEqual(len(result.errors), 1)
        self.assertIn("should be decorated", result.errors[0][1])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def twisted_coroutine_fetch(self, url, runner):
        body = [None]

        @gen.coroutine
        def f():
            # This is simpler than the non-coroutine version, but it cheats
            # by reading the body in one blob instead of streaming it with
            # a Protocol.
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
            self.stop_loop()
        self.io_loop.add_callback(f)
        runner()
        return body[0]
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def test_callbacks_work_multiple_times(self):
        """
        Tests that multiple executions of execute on a batch statement
        logs a warning, and that we don't encounter an attribute error.
        @since 3.1
        @jira_ticket PYTHON-445
        @expected_result warning message is logged

        @test_category object_mapper
        """
        call_history = []

        def my_callback(*args, **kwargs):
            call_history.append(args)

        with warnings.catch_warnings(record=True) as w:
            batch = Batch(self.conn)
            batch.add_callback(my_callback)
            batch.execute_batch()
            batch.execute_batch()
        self.assertEqual(len(w), 1)
        self.assertRegexpMatches(str(w[0].message), r"^Batch.*multiple.*")
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def execute(self):
        def showwarning(message, category, filename, fileno, file=None, line=None):
            msg = warnings.formatwarning(message, category, filename, fileno, line)
            self.log.warning(msg.strip())

        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.showwarning = showwarning

            try:
                return self.run()
            except SystemExit:
                raise
            except:
                self.log.critical(traceback.format_exc().strip())
                sys.exit(1)
项目:meme_get    作者:memegen    | 项目源码 | 文件源码
def guesscaption(simple=False):
    output = ""
    gf = guessformat()

    print("raw: ")
    for g in gf:
        print(guessline(g, True))
    print()

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for g in gf:
            gl = guessline(g, simple=simple)
            if not isgibber(gl):
                output += gl.replace(" !", "!").replace(" ?", "?") + "\n"

    return output
项目:properties    作者:aranzgeo    | 项目源码 | 文件源码
def test_backwards_compat(self):

        with warnings.catch_warnings(record=True) as w:

            class NewProperty(properties.Property):
                info_text = 'new property'

                def info(self):
                    return self.info_text

            assert len(w) == 2
            assert issubclass(w[0].category, FutureWarning)

            np = NewProperty('')

            assert getattr(np, 'class_info', None) == 'new property'
            assert getattr(np, 'info', None) == 'new property'
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def test_norm_hash_name(self):
        """norm_hash_name()"""
        from itertools import chain
        from passlib.crypto.digest import norm_hash_name, _known_hash_names

        # snapshot warning state, ignore unknown hash warnings
        ctx = warnings.catch_warnings()
        ctx.__enter__()
        self.addCleanup(ctx.__exit__)
        warnings.filterwarnings("ignore", '.*unknown hash')

        # test string types
        self.assertEqual(norm_hash_name(u("MD4")), "md4")
        self.assertEqual(norm_hash_name(b"MD4"), "md4")
        self.assertRaises(TypeError, norm_hash_name, None)

        # test selected results
        for row in chain(_known_hash_names, self.norm_hash_samples):
            for idx, format in enumerate(self.norm_hash_formats):
                correct = row[idx]
                for value in row:
                    result = norm_hash_name(value, format)
                    self.assertEqual(result, correct,
                                     "name=%r, format=%r:" % (value,
                                                              format))
项目:auto_ml    作者:doordash    | 项目源码 | 文件源码
def test_bad_val_in_column_descriptions():
    np.random.seed(0)

    df_titanic_train, df_titanic_test = utils.get_titanic_binary_classification_dataset()

    column_descriptions = {
        'survived': 'output'
        , 'embarked': 'categorical'
        , 'pclass': 'categorical'
        , 'fare': 'this_is_a_bad_value'
    }

    with warnings.catch_warnings(record=True) as w:

        ml_predictor = Predictor(type_of_estimator='classifier', column_descriptions=column_descriptions)
        print('we should be throwing a warning for the user to give them useful feedback')
        assert len(w) == 1
项目:auto_ml    作者:doordash    | 项目源码 | 文件源码
def test_throws_warning_when_fl_data_equals_df_train():
    df_titanic_train, df_titanic_test = utils.get_titanic_binary_classification_dataset()

    column_descriptions = {
        'survived': 'output'
        , 'embarked': 'categorical'
        , 'pclass': 'categorical'
    }

    ml_predictor = Predictor(type_of_estimator='classifier', column_descriptions=column_descriptions)

    with warnings.catch_warnings(record=True) as w:

        try:
            ml_predictor.train(df_titanic_train, feature_learning=True, fl_data=df_titanic_train)
        except KeyError as e:
            pass
    # We should not be getting to this line- we should be throwing an error above
        for thing in w:
            print(thing)
        assert len(w) == 1
项目:plotnine    作者:has2k1    | 项目源码 | 文件源码
def test_remove_missing():
    df = pd.DataFrame({'a': [1.0, np.NaN, 3, np.inf],
                       'b': [1, 2, 3, 4]})
    df2 = pd.DataFrame({'a': [1.0, 3, np.inf],
                       'b': [1, 3, 4]})
    df3 = pd.DataFrame({'a': [1.0, 3],
                       'b': [1, 3]})

    with warnings.catch_warnings(record=True) as w:
        res = remove_missing(df, na_rm=True, vars=['b'])
        res.equals(df)

        res = remove_missing(df)
        res.equals(df2)

        res = remove_missing(df, na_rm=True, finite=True)
        res.equals(df3)
        assert len(w) == 1
项目:google-drive-trash-cleaner    作者:cfbao    | 项目源码 | 文件源码
def get_credentials(flags):
    """Gets valid user credentials from storage.

    If nothing has been stored, or if the stored credentials are invalid,
    the OAuth2 flow is completed to obtain the new credentials.

    Returns:
        Credentials, the obtained credential.
    """
    store = Storage(flags.credfile)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        credentials = store.get()
    if not credentials or credentials.invalid:
        flow = client.OAuth2WebServerFlow(**CLIENT_CREDENTIAL)
        credentials = tools.run_flow(flow, store, flags)
        print('credential file saved at\n\t' + flags.credfile)
    return credentials
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_loadTestsFromModule__load_tests(self):
        m = types.ModuleType('m')
        class MyTestCase(unittest2.TestCase):
            def test(self):
                pass
        m.testcase_1 = MyTestCase

        load_tests_args = []
        def load_tests(loader, tests, pattern):
            self.assertIsInstance(tests, unittest2.TestSuite)
            load_tests_args.extend((loader, tests, pattern))
            return tests
        m.load_tests = load_tests

        loader = unittest2.TestLoader()
        suite = loader.loadTestsFromModule(m)
        self.assertIsInstance(suite, unittest2.TestSuite)
        self.assertEqual(load_tests_args, [loader, suite, None])
        # With Python 3.5, the undocumented and unofficial use_load_tests is
        # ignored (and deprecated).
        load_tests_args = []
        with warnings.catch_warnings(record=False):
            warnings.simplefilter('never')
            suite = loader.loadTestsFromModule(m, use_load_tests=False)
            self.assertEqual(load_tests_args, [loader, suite, None])
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def test_loadTestsFromModule__use_load_tests_other_bad_keyword(self):
        m = types.ModuleType('m')
        class MyTestCase(unittest.TestCase):
            def test(self):
                pass
        m.testcase_1 = MyTestCase

        load_tests_args = []
        def load_tests(loader, tests, pattern):
            self.assertIsInstance(tests, unittest.TestSuite)
            load_tests_args.extend((loader, tests, pattern))
            return tests
        m.load_tests = load_tests
        loader = unittest.TestLoader()
        with warnings.catch_warnings():
            warnings.simplefilter('never')
            with self.assertRaises(TypeError) as cm:
                loader.loadTestsFromModule(
                    m, use_load_tests=False, very_bad=True, worse=False)
        self.assertEqual(type(cm.exception), TypeError)
        # The error message names the first bad argument alphabetically,
        # however use_load_tests (which sorts first) is ignored.
        self.assertEqual(
            str(cm.exception),
            "loadTestsFromModule() got an unexpected keyword argument 'very_bad'")
项目:codecad    作者:bluecube    | 项目源码 | 文件源码
def _build_program(self):
        code = "\n".join(itertools.chain(self.common_header.pieces,
                                         itertools.chain.from_iterable(cu.pieces for cu in self._compile_units)))
        program = pyopencl.Program(self.context, code)
        with warnings.catch_warnings():
            # Intel OpenCL generates non empty output and that causes warnings
            # from pyopencl. We just silence them.
            warnings.simplefilter("ignore")
            return program.build(options=DEFAULT_COMPILER_OPTIONS)

        # Working around bug in pyopencl.Program.compile in pyopencl
        # (https://lists.tiker.net/pipermail/pyopencl/2015-September/001986.html)
        # TODO: Fix this in OpenCL
        # compiled_units = [unit.compile(self.context, self.common_header.pieces)
        #                   for unit in self._compile_units]
        # return pyopencl.link_program(self.context, compiled_units)
项目:codecad    作者:bluecube    | 项目源码 | 文件源码
def _build_program(self):
        code = "\n".join(itertools.chain(self.common_header.pieces,
                                         itertools.chain.from_iterable(cu.pieces for cu in self._compile_units)))
        program = pyopencl.Program(self.context, code)
        with warnings.catch_warnings():
            # Intel OpenCL generates non empty output and that causes warnings
            # from pyopencl. We just silence them.
            warnings.simplefilter("ignore")
            return program.build(options=DEFAULT_COMPILER_OPTIONS)

        # Working around bug in pyopencl.Program.compile in pyopencl
        # (https://lists.tiker.net/pipermail/pyopencl/2015-September/001986.html)
        # TODO: Fix this in OpenCL
        # compiled_units = [unit.compile(self.context, self.common_header.pieces)
        #                   for unit in self._compile_units]
        # return pyopencl.link_program(self.context, compiled_units)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def test_pipe_handle(self):
        h, _ = windows_utils.pipe(overlapped=(True, True))
        _winapi.CloseHandle(_)
        p = windows_utils.PipeHandle(h)
        self.assertEqual(p.fileno(), h)
        self.assertEqual(p.handle, h)

        # check garbage collection of p closes handle
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", "",  ResourceWarning)
            del p
            support.gc_collect()
        try:
            _winapi.CloseHandle(h)
        except OSError as e:
            self.assertEqual(e.winerror, 6)     # ERROR_INVALID_HANDLE
        else:
            raise RuntimeError('expected ERROR_INVALID_HANDLE')
项目:deprecation    作者:briancurtin    | 项目源码 | 文件源码
def test_warning_raised(self):
        ret_val = "lololol"

        for test in [{"args": {},  # No args just means deprecated
                      "warning": deprecation.DeprecatedWarning},
                     {"args": {"deprecated_in": "1.0",
                               "current_version": "2.0"},
                      "warning": deprecation.DeprecatedWarning},
                     {"args": {"deprecated_in": "1.0",
                               "removed_in": "2.0",
                               "current_version": "2.0"},
                      "warning": deprecation.UnsupportedWarning}]:
            with self.subTest(**test):
                class Test(object):
                    @deprecation.deprecated(**test["args"])
                    def method(self):
                        return ret_val

                with warnings.catch_warnings(record=True) as caught_warnings:
                    warnings.simplefilter("always")

                    sot = Test()
                    self.assertEqual(ret_val, sot.method())

                self.assertEqual(len(caught_warnings), 1)
                self.assertEqual(caught_warnings[0].category, test["warning"])
项目:cbapi-examples    作者:cbcommunity    | 项目源码 | 文件源码
def main(argv):
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.url or not opts.token or not opts.sensor:
        print "Missing required param; run with --help for usage"
        sys.exit(-1)

    cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify, ignore_system_proxy=True)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        sensor = cb.sensor(opts.sensor)

    pprint.pprint(sensor)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self, period):
        # Quiet the warning about GTK Tooltip deprecation
        import warnings
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=DeprecationWarning)
            self.figure = pyplot.figure()

        self.figure.canvas.set_window_title('REDHAWK Speedometer')

        # Create a line graph of throughput over time
        self.axes = self.figure.add_subplot(111)
        self.axes.set_xlabel('Time')
        self.axes.set_ylabel('Throughput (Bps)')

        self.x = []
        self.y = []
        self.line, = self.axes.plot(self.x, self.y)
        self.axes.set_xlim(xmax=period)

        self.figure.subplots_adjust(bottom=0.2)

        sliderax = self.figure.add_axes([0.2, 0.05, 0.6, 0.05])
        min_size = 1024
        max_size = 8*1024*1024
        default_size = 1*1024*1024
        self.slider = matplotlib.widgets.Slider(sliderax, 'Transfer size', min_size, max_size,
                                                default_size, '%.0f')

        self.figure.show()
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_propertyInitialization(self):
        """
        Tests for the correct initialization of 'property' kind properties
        based on whether command line is set, and overrides via launch().
        """
        # First, test with defaults
        comp = sb.launch('sdr/dom/components/property_init/property_init.spd.xml')
        self.assertFalse('initial' in comp.cmdline_args)
        self.assertFalse('cmdline' in comp.initialize_props)
        comp.releaseObject()

        # Test with overrides
        comp = sb.launch('sdr/dom/components/property_init/property_init.spd.xml',
                         properties={'cmdline':'override', 'initial':'override'})
        self.assertFalse('initial' in comp.cmdline_args)
        self.assertFalse('cmdline' in comp.initialize_props)
        self.assertEquals('override', comp.cmdline)
        self.assertEquals('override', comp.initial)
        comp.releaseObject()

        # Test with overrides in deprecated 'execparams' and 'configure' arguments
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', DeprecationWarning)
            comp = sb.launch('sdr/dom/components/property_init/property_init.spd.xml',
                             execparams={'cmdline':'override'}, configure={'initial':'override'})
        self.assertFalse('initial' in comp.cmdline_args)
        self.assertFalse('cmdline' in comp.initialize_props)
        self.assertEquals('override', comp.cmdline)
        self.assertEquals('override', comp.initial)
        comp.releaseObject()

        # Test with misplaced command line property in deprecated 'configure' argument
        comp = sb.launch('sdr/dom/components/property_init/property_init.spd.xml',
                         configure={'cmdline':'override'})
        self.assertFalse('initial' in comp.cmdline_args)
        self.assertFalse('cmdline' in comp.initialize_props)
        self.assertEquals('override', comp.cmdline)
        comp.releaseObject()
项目:npstreams    作者:LaurentRDC    | 项目源码 | 文件源码
def test_ignore_nan(self):
        """ Test that NaNs are handled correctly """
        stream = [np.random.random(size = (16,12)) for _ in range(5)]
        for s in stream:
            s[randint(0, 15), randint(0,11)] = np.nan

        with catch_warnings():
            simplefilter('ignore')
            from_iaverage = last(iaverage(stream, ignore_nan = True))  
        from_numpy = np.nanmean(np.dstack(stream), axis = 2)
        self.assertTrue(np.allclose(from_iaverage, from_numpy))
项目:npstreams    作者:LaurentRDC    | 项目源码 | 文件源码
def test_ddof(self):
        """ Test that the ddof parameter is equivalent to numpy's """
        stream = [np.random.random((16, 7, 3)) for _ in range(10)]
        stack = np.stack(stream, axis = -1)

        with catch_warnings():
            simplefilter('ignore')
            for axis in (0, 1, 2, None):
                for ddof in range(4):
                    with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)):
                        from_numpy = np.var(stack, axis = axis, ddof = ddof)
                        from_ivar = last(ivar(stream, axis = axis, ddof = ddof))
                        self.assertSequenceEqual(from_numpy.shape, from_ivar.shape)
                        self.assertTrue(np.allclose(from_ivar, from_numpy))
项目:npstreams    作者:LaurentRDC    | 项目源码 | 文件源码
def test_against_numpy_std(self):
        stream = [np.random.random((16, 7, 3)) for _ in range(10)]
        stack = np.stack(stream, axis = -1)

        with catch_warnings():
            simplefilter('ignore')
            for axis in (0, 1, 2, None):
                for ddof in range(4):
                    with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)):
                        from_numpy = np.std(stack, axis = axis, ddof = ddof)
                        from_ivar = last(istd(stream, axis = axis, ddof = ddof))
                        self.assertSequenceEqual(from_numpy.shape, from_ivar.shape)
                        self.assertTrue(np.allclose(from_ivar, from_numpy))
项目:cube_browser    作者:SciTools    | 项目源码 | 文件源码
def test_slider_slice_warning(self):
        index = 3
        kwargs = dict(grid_longitude=index)
        coords = ('time', 'grid_latitude')
        plot = Plot2D(self.cube, self.axes, coords=coords)
        plot.alias(wibble=2)
        wmsg = ("expected to be called with alias 'wibble' for dimension 2, "
                "rather than with 'grid_longitude'")
        with warnings.catch_warnings():
            # Cause all warnings to raise an exception.
            warnings.simplefilter('error')
            with self.assertRaisesRegexp(UserWarning, wmsg):
                plot(**kwargs)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_auto_deltas_fail_warn(self):
        with warnings.catch_warnings(record=True) as ws:
            warnings.simplefilter('always')
            loader = BlazeLoader()
            expr = bz.data(self.df, dshape=self.dshape)
            from_blaze(
                expr,
                loader=loader,
                no_deltas_rule=no_deltas_rules.warn,
                missing_values=self.missing_values,
            )
        self.assertEqual(len(ws), 1)
        w = ws[0].message
        self.assertIsInstance(w, NoDeltasWarning)
        self.assertIn(str(expr), str(w))
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_warning_from_explicit_topic(self, valid_message_data):
        data_with_topic = self._make_message_data(
            valid_message_data,
            topic=str('explicit_topic')
        )
        with warnings.catch_warnings(record=True) as w:
            self.message_class(**data_with_topic)
            assert len(w) == 1
            assert "Passing in topics explicitly is deprecated." in w[0].message
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_specify_contains_pii_triggers_warnings(self, valid_message_data):
        message_data = self._make_message_data(
            valid_message_data,
            contains_pii=False
        )
        with warnings.catch_warnings(record=True) as warns:
            self.message_class(**message_data)
            assert len(warns) == 1
            contains_pii_warning = warns[0]
            assert issubclass(contains_pii_warning.category, DeprecationWarning)
            assert ('contains_pii is deprecated. Please stop passing it in.'
                    in contains_pii_warning.message)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_specify_keys_triggers_warnings(self, valid_message_data):
        message_data = self._make_message_data(
            valid_message_data,
            keys=(1, 2)
        )
        with warnings.catch_warnings(record=True) as warns:
            self.message_class(**message_data)
            assert len(warns) == 1
            keys_warning = warns[0]
            assert issubclass(keys_warning.category, DeprecationWarning)
            assert ('Passing in keys explicitly is deprecated.'
                    in keys_warning.message)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
        """An unordered sequence specific comparison. It asserts that
        actual_seq and expected_seq have the same element counts.
        Equivalent to::

            self.assertEqual(Counter(iter(actual_seq)),
                             Counter(iter(expected_seq)))

        Asserts that each element has the same count in both sequences.
        Example:
            - [0, 1, 1] and [1, 0, 1] compare equal.
            - [0, 0, 1] and [0, 1] compare unequal.
        """
        first_seq, second_seq = list(actual_seq), list(expected_seq)
        with warnings.catch_warnings():
            if sys.py3kwarning:
                # Silence Py3k warning raised during the sorting
                for _msg in ["(code|dict|type) inequality comparisons",
                             "builtin_function_or_method order comparisons",
                             "comparing unequal types"]:
                    warnings.filterwarnings("ignore", _msg, DeprecationWarning)
            try:
                first = collections.Counter(first_seq)
                second = collections.Counter(second_seq)
            except TypeError:
                # Handle case with unhashable elements
                differences = _count_diff_all_purpose(first_seq, second_seq)
            else:
                if first == second:
                    return
                differences = _count_diff_hashable(first_seq, second_seq)

        if differences:
            standardMsg = 'Element counts were not equal:\n'
            lines = ['First has %d, Second has %d:  %r' % diff for diff in differences]
            diffMsg = '\n'.join(lines)
            standardMsg = self._truncateMessage(standardMsg, diffMsg)
            msg = self._formatMessage(msg, standardMsg)
            self.fail(msg)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def initiate_send(self):
        while self.producer_fifo and self.connected:
            first = self.producer_fifo[0]
            # handle empty string/buffer or None entry
            if not first:
                del self.producer_fifo[0]
                if first is None:
                    self.handle_close()
                    return

            # handle classic producer behavior
            obs = self.ac_out_buffer_size
            try:
                with catch_warnings():
                    if py3kwarning:
                        filterwarnings("ignore", ".*buffer", DeprecationWarning)
                    data = buffer(first, 0, obs)
            except TypeError:
                data = first.more()
                if data:
                    self.producer_fifo.appendleft(data)
                else:
                    del self.producer_fifo[0]
                continue

            # send the data
            try:
                num_sent = self.send(data)
            except socket.error:
                self.handle_error()
                return

            if num_sent:
                if num_sent < len(data) or obs < len(first):
                    self.producer_fifo[0] = first[num_sent:]
                else:
                    del self.producer_fifo[0]
            # we tried to send some actual data
            return
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _sorted(iterable):
    with warnings.catch_warnings():
        if _sys.py3kwarning:
            warnings.filterwarnings("ignore", "comparing unequal types "
                                    "not supported", DeprecationWarning)
        return sorted(iterable)
项目:fccforensics    作者:RagtagOpen    | 项目源码 | 文件源码
def index_worker(self, queue, size=200):
        actions = []
        indexed = 0
        while True:
            item = queue.get()
            if item is None:
                break
            id_submission, analysis = item

            doc = {
                '_index': 'fcc-comments',
                '_type': 'document',
                '_op_type': 'update',
                '_id': id_submission,
                'doc': {'analysis': analysis},
            }
            actions.append(doc)

            if len(actions) == size:
                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    try:
                        response = bulk(self.es, actions)
                        indexed += response[0]
                        print('\tanalyzed %s/%s\t%s%%' % (indexed, self.limit,
                            int(indexed / self.limit * 100)))
                        actions = []
                    except ConnectionTimeout:
                        print('error indexing: connection timeout')

        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            response = bulk(self.es, actions)
            indexed += response[0]
            print('indexed %s' % (indexed))
项目:fccforensics    作者:RagtagOpen    | 项目源码 | 文件源码
def bulk_index(self, queue, size=20):

        actions = []
        indexed = 0
        ids = set()
        while True:
            item = queue.get()
            if item is None:
                break
            doc_id = item

            doc = {
                '_index': 'fcc-comments',
                '_type': 'document',
                '_op_type': 'update',
                '_id': doc_id,
                'doc': {'analysis.sentiment_sig_terms_ordered': True},
            }
            actions.append(doc)
            ids.add(doc_id)

            if len(actions) == size:
                with warnings.catch_warnings():
                    warnings.simplefilter('ignore')
                    try:
                        response = bulk(self.es, actions)
                        indexed += response[0]
                        if not indexed % 200:
                            print('\tindexed %s/%s\t%s%%' % (indexed, self.limit,
                                int(indexed / self.limit * 100)))
                        actions = []
                    except ConnectionTimeout:
                        print('error indexing: connection timeout')

        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            response = bulk(self.es, actions)
            indexed += response[0]
            print('indexed %s' % (indexed))
        ids = list(ids)
        #print('%s\n%s' % (len(ids), ' '.join(ids))
项目:whatstyle    作者:mikr    | 项目源码 | 文件源码
def make_table(self, *args, **kwargs):
        # type: (*Any, **Any) -> str
        with warnings.catch_warnings():
            # PendingDeprecationWarning acknowledged.
            warnings.simplefilter("ignore")
            table = super(HtmlMultiDiff, self).make_table(*args, **kwargs)

            def unescape_zeroonetwo(m):
                return unichr(ord(m.group(1)) - ord('0'))

            table = re.sub('\x02([012])', unescape_zeroonetwo, table)
            return table
项目:giphycat    作者:kstrauser    | 项目源码 | 文件源码
def get_random_giphy(phrase):
    """Return the URL of a random GIF related to the phrase, if possible"""
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')

        giphy = giphypop.Giphy()

    results = giphy.search_list(phrase=phrase, limit=100)

    if not results:
        raise ValueError('There were no results for that phrase')

    return random.choice(results).media_url
项目:psola    作者:jcreinhold    | 项目源码 | 文件源码
def test():
    """
    little test, NOT FOR REAL USE, put target filename as first and only argument
    will plot the pitch and strength v. time to screen
    safety is not guaranteed.
    """
    # imports specific to this test
    import sys
    import warnings
    from scipy.io import wavfile
    import matplotlib.pyplot as plt
    from psola.experiment_config import ExperimentConfig

    # get the data and do the estimation
    filename = sys.argv[1]    # filename is first command line arg
    cfg = ExperimentConfig()  # use default settings
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")  # ignore annoying WavFileWarning
        fs, data = wavfile.read(filename)
    pitch, t, strength = pitch_estimation(data, fs, cfg)

    # Plot estimated pitch and strength of pitch values
    f, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(16,9))
    ax1.plot(t, pitch); ax1.set_title('Pitch v. Time'); ax1.set_ylabel('Freq (Hz)')
    ax2.plot(t, strength); ax2.set_title('Strength v. Time'); ax1.set_ylabel('Strength')
    plt.show()