Python unittest 模块,skip() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用unittest.skip()

项目:PyExPool    作者:eXascaleInfolab    | 项目源码 | 文件源码
def allocDelayProg(size, duration):
    """Python program as str that allocates object of the specified size
    in Gigabytes and then waits for the specified time

    size  - allocation size, bytes; None to skip allocation
    duration  - execution duration, sec; None to skip the sleep

    return  - Python program as str
    """
    return """from __future__ import print_function, division  # Required for stderr output, must be the first import
import sys
import time
import array

if {size} is not None:
    a = array.array('b', [0])
    asize = sys.getsizeof(a)
    # Note: allocate at least empty size, i.e. empty list
    buffer = a * int(max({size} - asize + 1, 0))
    #if _DEBUG_TRACE:
    # print(''.join(('  allocDelayProg(), allocated ', str(sys.getsizeof(buffer))
    #   , ' bytes for ', str({duration}),' sec')), file=sys.stderr)
if {duration} is not None:
    time.sleep({duration})
""".format(size=size, duration=duration)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG."""
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid             # noqa
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            cur = self.conn.cursor()
            cur.execute("select typname from pg_type where typname = 'uuid'")
            has = cur.fetchone()
        finally:
            self.conn.rollback()

        if has:
            return f(self)
        else:
            return self.skipTest("uuid type not available on the server")

    return skip_if_no_uuid_
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG."""
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid             # noqa
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            cur = self.conn.cursor()
            cur.execute("select typname from pg_type where typname = 'uuid'")
            has = cur.fetchone()
        finally:
            self.conn.rollback()

        if has:
            return f(self)
        else:
            return self.skipTest("uuid type not available on the server")

    return skip_if_no_uuid_
项目:mooq    作者:jeremyarr    | 项目源码 | 文件源码
def test_callback_is_run(self):
        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue",
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = self.fake_callback)
        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                    msg="fake_message",
                                    routing_key="fake_routing_key")
        await self.WHEN_ProcessEventsNTimes(2)

        self.THEN_CallbackReceivesMessage("fake_message")



    # @unittest.skip("skipped")
项目:mooq    作者:jeremyarr    | 项目源码 | 文件源码
def test_callback_is_not_run_if_routing_key_mismatch(self):
        fake_callback = Mock()
        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue",
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = fake_callback)

        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                    msg="fake_message",
                                    routing_key="another_routing_key")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackIsNotRun(fake_callback)


    # @unittest.skip("skipped")
项目:mooq    作者:jeremyarr    | 项目源码 | 文件源码
def test_callback_run_if_multiple_routing_keys(self):
        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue",
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key","fake_routing_key2"],
                                      callback = self.fake_callback)

        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                        msg="fake_message",
                                        routing_key="fake_routing_key2")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackReceivesMessage("fake_message")


    # @unittest.skip("skipped")
项目:mooq    作者:jeremyarr    | 项目源码 | 文件源码
def test_callback_run_if_two_exclusive_queues_registered(self):
        await self.GIVEN_ConsumerRegistered(queue_name = None,
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = self.fake_callback)

        await self.GIVEN_ConsumerRegistered(queue_name = None,
                                      exchange_name="fake_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = self.fake_callback2)

        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                    msg="fake_message",
                                    routing_key="fake_routing_key")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackReceivesMessage("fake_message")
        self.THEN_Callback2ReceivesMessage("fake_message")


    # @unittest.skip("skipped")
项目:mooq    作者:jeremyarr    | 项目源码 | 文件源码
def test_routes_to_all_consumer_queues(self):
        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue1",
                                      exchange_name="fake_exch",
                                      exchange_type="fanout",
                                      routing_keys="",
                                      callback = self.fake_callback)

        await self.GIVEN_ConsumerRegistered(queue_name="fake_consumer_queue2",
                                      exchange_name="fake_exch",
                                      exchange_type="fanout",
                                      routing_keys="",
                                      callback = self.fake_callback2)

        await self.GIVEN_MessagePublished(exchange_name="fake_exch",
                                    msg="fake_msg",
                                    routing_key="")

        await self.WHEN_ProcessEventsNTimes(1)

        self.THEN_CallbackReceivesMessage("fake_msg")
        self.THEN_Callback2ReceivesMessage("fake_msg")


# @unittest.skip("skipped")
项目:mooq    作者:jeremyarr    | 项目源码 | 文件源码
def test_direct_runs_callback(self):
        await self.GIVEN_ProducerRegistered(exchange_name="fake_direct_exch",
                                      exchange_type="direct")

        await self.GIVEN_ConsumerRegistered(queue_name="fake_direct_consumer_queue",
                                      exchange_name="fake_direct_exch",
                                      exchange_type="direct",
                                      routing_keys=["fake_routing_key"],
                                      callback = self.fake_callback)

        await self.GIVEN_MessagePublished(exchange_name="fake_direct_exch",
                                    msg="fake_message",
                                    routing_key="fake_routing_key")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackReceivesMessage("fake_message")

    # @unittest.skip("skipped")
项目:mooq    作者:jeremyarr    | 项目源码 | 文件源码
def test_topic_runs_callback(self):
        await self.GIVEN_ProducerRegistered(exchange_name="fake_topic_exch",
                                      exchange_type="topic")

        await self.GIVEN_ConsumerRegistered(queue_name="fake_topic_consumer_queue",
                                      exchange_name="fake_topic_exch",
                                      exchange_type="topic",
                                      routing_keys=["*.red","ball.*"],
                                      callback = self.fake_callback)

        await self.GIVEN_MessagePublished(exchange_name="fake_topic_exch",
                                    msg="fake_message",
                                    routing_key="apple.red")

        await self.WHEN_ProcessEventsOnce()

        self.THEN_CallbackReceivesMessage("fake_message")

    # @unittest.skip("skipped")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_skipping(self):
        class Foo(unittest.TestCase):
            def test_skip_me(self):
                self.skipTest("skip")
        events = []
        result = LoggingResult(events)
        test = Foo("test_skip_me")
        test.run(result)
        self.assertEqual(events, ['startTest', 'addSkip', 'stopTest'])
        self.assertEqual(result.skipped, [(test, "skip")])

        # Try letting setUp skip the test now.
        class Foo(unittest.TestCase):
            def setUp(self):
                self.skipTest("testing")
            def test_nothing(self): pass
        events = []
        result = LoggingResult(events)
        test = Foo("test_nothing")
        test.run(result)
        self.assertEqual(events, ['startTest', 'addSkip', 'stopTest'])
        self.assertEqual(result.skipped, [(test, "testing")])
        self.assertEqual(result.testsRun, 1)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_decorated_skip(self):
        def decorator(func):
            def inner(*a):
                return func(*a)
            return inner

        class Foo(unittest.TestCase):
            @decorator
            @unittest.skip('testing')
            def test_1(self):
                pass

        result = unittest.TestResult()
        test = Foo("test_1")
        suite = unittest.TestSuite([test])
        suite.run(result)
        self.assertEqual(result.skipped, [(test, "testing")])
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_class_not_setup_or_torndown_when_skipped(self):
        class Test(unittest.TestCase):
            classSetUp = False
            tornDown = False
            @classmethod
            def setUpClass(cls):
                Test.classSetUp = True
            @classmethod
            def tearDownClass(cls):
                Test.tornDown = True
            def test_one(self):
                pass

        Test = unittest.skip("hop")(Test)
        self.runTests(Test)
        self.assertFalse(Test.classSetUp)
        self.assertFalse(Test.tornDown)
项目:son-mano-framework    作者:sonata-nfv    | 项目源码 | 文件源码
def test_notification_pub_sub_mix(self):
        """
        Test notification messaging pattern mixed with basic pub/sub calls.
        """
        self.m.register_notification_endpoint(self._simple_subscribe_cbf1, "test.notification1")
        self.m.subscribe(self._simple_subscribe_cbf1, "test.notification2")
        time.sleep(0.5)  # give broker some time to register subscriptions
        # send publish to notify endpoint
        self.m.publish("test.notification1", "my-notification1")
        self.assertEqual(self.wait_for_messages()[0], "my-notification1")
        # send notify to subscribe endpoint
        self.m.notify("test.notification2", "my-notification2")
        #res = self.wait_for_messages(n_messages=2)
        self.assertTrue(self.wait_for_particular_messages("my-notification1"))
        self.assertTrue(self.wait_for_particular_messages("my-notification2"))

    #@unittest.skip("disabled")
项目:son-mano-framework    作者:sonata-nfv    | 项目源码 | 文件源码
def test_double_subscriptions(self):
        """
        Ensure that messages are delivered to all subscriptions of a topic.
        (e.g. identifies queue setup problems)
        :return:
        """
        self.m.subscribe(self._simple_subscribe_cbf1, "test.interleave")
        self.m.subscribe(self._simple_subscribe_cbf2, "test.interleave")
        time.sleep(0.5)
        # send publish to notify endpoint
        self.m.publish("test.interleave", "my-notification1")
        # enusre that it is received by each subscription
        self.assertTrue(self.wait_for_particular_messages("my-notification1", buffer=0))
        self.assertTrue(self.wait_for_particular_messages("my-notification1", buffer=1))

    #@unittest.skip("disabled")
项目:zamia-prolog    作者:gooofy    | 项目源码 | 文件源码
def test_strings(self):

        clause = self.parser.parse_line_clause_body('X is \'bar\', S is format_str(\'test %d %s foo\', 42, X)')
        solutions = self.rt.search(clause)
        self.assertEqual (solutions[0]['S'].s, 'test 42 bar foo')

        clause = self.parser.parse_line_clause_body('X is \'foobar\', sub_string(X, 0, 2, _, Y)')
        solutions = self.rt.search(clause)
        self.assertEqual (solutions[0]['Y'].s, 'fo')

        clause = self.parser.parse_line_clause_body('atom_chars(foo, X), atom_chars(Y, "bar").')
        solutions = self.rt.search(clause)
        self.assertEqual (solutions[0]['X'].s, 'foo')
        self.assertEqual (solutions[0]['Y'].name, 'bar')

    # @unittest.skip("temporarily disabled")
项目:zamia-prolog    作者:gooofy    | 项目源码 | 文件源码
def test_setz_multi(self):

        # self.rt.set_trace(True)

        clause = self.parser.parse_line_clause_body('I is ias2, setz(ias (I, a, _), a), setz(ias (I, b, _), b), setz(ias (I, c, _), c), ias(I, X, Y). ')
        solutions = self.rt.search(clause)
        logging.debug(repr(solutions))
        self.assertEqual (len(solutions), 3)
        self.assertEqual (solutions[0]['X'].name, 'a')
        self.assertEqual (solutions[0]['Y'].name, 'a')
        self.assertEqual (solutions[1]['X'].name, 'b')
        self.assertEqual (solutions[1]['Y'].name, 'b')
        self.assertEqual (solutions[2]['X'].name, 'c')
        self.assertEqual (solutions[2]['Y'].name, 'c')

    # @unittest.skip("temporarily disabled")
项目:zamia-prolog    作者:gooofy    | 项目源码 | 文件源码
def test_custom_builtins(self):

        global recorded_moves

        self.parser.compile_file('samples/hanoi2.pl', UNITTEST_MODULE)

        clause = self.parser.parse_line_clause_body('move(3,left,right,center)')
        logging.debug('clause: %s' % clause)

        # register our custom builtin
        recorded_moves = []
        self.rt.register_builtin('record_move', record_move)

        solutions = self.rt.search(clause)
        logging.debug('solutions: %s' % repr(solutions))
        self.assertEqual (len(solutions), 1)

        self.assertEqual (len(recorded_moves), 7)

    #@unittest.skip("temporarily disabled")
项目:zamia-prolog    作者:gooofy    | 项目源码 | 文件源码
def test_double_negation(self):

        self.parser.compile_file('samples/not_test.pl', UNITTEST_MODULE)

        clause = self.parser.parse_line_clause_body('not(not(chancellor(helmut_kohl))).')
        logging.debug('clause: %s' % clause)
        solutions = self.rt.search(clause, {})
        logging.debug('solutions: %s' % repr(solutions))
        self.assertEqual (len(solutions), 1)

        clause = self.parser.parse_line_clause_body('not(not(chancellor(angela_merkel))).')
        logging.debug('clause: %s' % clause)
        solutions = self.rt.search(clause, {})
        logging.debug('solutions: %s' % repr(solutions))
        self.assertEqual (len(solutions), 1)

        clause = self.parser.parse_line_clause_body('not(not(chancellor(X))).')
        logging.debug('clause: %s' % clause)
        solutions = self.rt.search(clause, {})
        logging.debug('solutions: %s' % repr(solutions))
        self.assertEqual (len(solutions), 2)

    # @unittest.skip("temporarily disabled")
项目:zamia-prolog    作者:gooofy    | 项目源码 | 文件源码
def test_parse_line_clauses(self):

        line = 'time_span(TE) :- date_time_stamp(+(D, 1.0)).'

        tree = self.parser.parse_line_clauses(line)
        logging.debug (unicode(tree[0].body))
        self.assertEqual (tree[0].body.name, 'date_time_stamp')
        self.assertEqual (tree[0].head.name, 'time_span')

        line = 'time_span(tomorrow, TS, TE) :- context(currentTime, T), stamp_date_time(T, date(Y, M, D, H, Mn, S, "local")), date_time_stamp(date(Y, M, +(D, 1.0), 0.0, 0.0, 0.0, "local"), TS), date_time_stamp(date(Y, M, +(D, 1.0), 23.0, 59.0, 59.0, "local"), TE).'

        tree = self.parser.parse_line_clauses(line)
        logging.debug (unicode(tree[0].body))
        self.assertEqual (tree[0].head.name, 'time_span')
        self.assertEqual (tree[0].body.name, 'and')
        self.assertEqual (len(tree[0].body.args), 4)

    # @unittest.skip("temporarily disabled")
项目:zamia-prolog    作者:gooofy    | 项目源码 | 文件源码
def test_cut(self):

        self.parser.compile_file('samples/cut_test.pl', UNITTEST_MODULE)

        # self.rt.set_trace(True)

        clause = self.parser.parse_line_clause_body(u'bar(R, X)')
        logging.debug(u'clause: %s' % clause)
        solutions = self.rt.search(clause)
        logging.debug('solutions: %s' % repr(solutions))
        self.assertEqual (len(solutions), 4)
        self.assertEqual (solutions[0]['R'].s, "one")
        self.assertEqual (solutions[1]['R'].s, "two")
        self.assertEqual (solutions[2]['R'].s, "many")
        self.assertEqual (solutions[3]['R'].s, "many")

    # @unittest.skip("temporarily disabled")
项目:sptgraph    作者:epfl-lts2    | 项目源码 | 文件源码
def test_create_signal_graph(self):
        node_signal = sptgraph.create_node_signal(gen_signal(), 'baseID', 'layer', False)

        for is_directed in [True, False]:
            g = utils.networkx_to_graphlab(gen_graph(is_directed))
            sg = sptgraph.merge_signal_on_graph(g, node_signal, 'baseID', 'layer', use_fast=False, verbose=False)

            # Node 5 is never activated
            self.assertEqual(4, len(sg.vertices))

            if not is_directed:
                self.assertEqual(8, len(sg.edges))
            else:
                self.assertEqual(4, len(sg.edges))

            actual_columns = set(sg.vertices.column_names())
            expected_columns = {'layers', 'node_weight', '__id'}
            self.assertItemsEqual(expected_columns, actual_columns)

    # @unittest.skip('Skipping aggregate layers')
项目:sptgraph    作者:epfl-lts2    | 项目源码 | 文件源码
def test_aggregate_layers(self):
        signal = gl.SFrame(gen_signal())
        nb_layers = signal['layer'].max() + 1  # starts at 0

        # Python 'slow'
        original = sptgraph.create_node_signal(signal, 'baseID', 'layer', False)
        # Fast c++ version
        res = sptgraph_fast.aggregate_layers(signal, 'baseID', 'layer', nb_layers)

        # Transform output to compare
        l1 = original['layers'].apply(int)
        l2 = res['layers'].apply(utils.reform_layer_int_from_blocks)
        m = l1 == l2
        self.assertTrue(m.all(), 'Layers should be equal')

    # @unittest.skip('Skipping rebuilt_bitset')
项目:sptgraph    作者:epfl-lts2    | 项目源码 | 文件源码
def test_directed_no_self_edge(self):
        # Directed no self-edge
        g = sptgraph.create_spatio_temporal_graph(gen_graph(True), gen_signal(), False, verbose=False)

        # The graph has only 1 connected component
        h = components.find_connected_components(g)
        cc = components.create_component_sframe(h)
        comps = components.extract_components(h, cc)
        self.assertEqual(1, len(comps))

        # We remove the edge (7, 12) to create 2 weakly connected components
        nodes = g.vertices
        edges = g.edges.add_row_number('eid')
        to_remove = g.get_edges(7, 12)
        edges = edges[edges['eid'] != to_remove['eid'][0]]
        g = gl.SGraph(nodes, edges)

        h = components.find_connected_components(g)
        cc = components.create_component_sframe(h)
        comps = components.extract_components(h, cc)
        self.assertEqual(2, len(comps))

    # @unittest.skip('Skipping undirected self edge')
项目:expan    作者:zalando    | 项目源码 | 文件源码
def test_bayes_factor_delta(self):
        ndecimals = 5
        res = self.getExperiment(['normal_same']).delta(method='bayes_factor', num_iters=2000)

        variants = find_list_of_dicts_element(res['kpis'], 'name', 'normal_same', 'variants')
        aStats   = find_list_of_dicts_element(variants, 'name', 'A', 'delta_statistics')
        self.assertNumericalEqual(aStats['delta'], 0.033053, ndecimals)

        self.assertEqual(aStats['stop'],      True, ndecimals)
        self.assertEqual(aStats['number_of_iterations'], 2000, ndecimals)

        self.assertNumericalEqual(aStats['confidence_interval'][0]['value'], -0.00829, ndecimals)
        self.assertNumericalEqual(aStats['confidence_interval'][1]['value'],  0.07127, ndecimals)

        self.assertEqual(aStats['treatment_sample_size'], 6108)
        self.assertEqual(aStats['control_sample_size'],   3892)

        self.assertNumericalEqual(aStats['treatment_mean'],  0.025219, ndecimals)
        self.assertNumericalEqual(aStats['control_mean'],   -0.007833, ndecimals)

        self.assertNumericalEqual(aStats['statistical_power'], 0.36401, ndecimals)


    # @unittest.skip("sometimes takes too much time")
项目:expan    作者:zalando    | 项目源码 | 文件源码
def test_bayes_precision_delta(self):
        ndecimals = 5
        res = self.getExperiment(['normal_same']).delta(method='bayes_precision', num_iters=2000)

        variants = find_list_of_dicts_element(res['kpis'], 'name', 'normal_same', 'variants')
        aStats   = find_list_of_dicts_element(variants, 'name', 'A', 'delta_statistics')
        self.assertNumericalEqual(aStats['delta'], 0.033053, ndecimals)

        self.assertEqual(aStats['stop'], True, ndecimals)
        self.assertEqual(aStats['number_of_iterations'], 2000, ndecimals)

        self.assertNumericalEqual(aStats['confidence_interval'][0]['value'], -0.00829, ndecimals)
        self.assertNumericalEqual(aStats['confidence_interval'][1]['value'],  0.07127, ndecimals)

        self.assertEqual(aStats['treatment_sample_size'], 6108)
        self.assertEqual(aStats['control_sample_size'],   3892)

        self.assertNumericalEqual(aStats['treatment_mean'],  0.025219, ndecimals)
        self.assertNumericalEqual(aStats['control_mean'],   -0.007833, ndecimals)

        self.assertNumericalEqual(aStats['statistical_power'], 0.36401, ndecimals)


    # @unittest.skip("sometimes takes too much time")
项目:expan    作者:zalando    | 项目源码 | 文件源码
def test_bayes_factor(self):
        """
        Check the Bayes factor function.
        """
        res = es.bayes_factor(self.rand_s1, self.rand_s2, num_iters=2000)

        self.assertEqual       (res['stop'],                  True)
        self.assertAlmostEqual (res['delta'],                -0.15887364780635896)
        value025 = find_list_of_dicts_element(res['confidence_interval'], 'percentile',  2.5, 'value')
        value975 = find_list_of_dicts_element(res['confidence_interval'], 'percentile', 97.5, 'value')
        self.assertAlmostEqual (value025,                     -0.24293384641452503)
        self.assertAlmostEqual (value975,                     -0.075064346336461404)
        self.assertEqual       (res['treatment_sample_size'],  1000)
        self.assertEqual       (res['control_sample_size'],    1000)
        self.assertAlmostEqual (res['treatment_mean'],        -0.045256707490195384)
        self.assertAlmostEqual (res['control_mean'],           0.11361694031616358)

    # @unittest.skip("sometimes takes too much time")
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def test_get_by_user(self):
        response = self.admin_open(self.item_url('FooBarUser'))
        self.assertAPIError(response, 404, 'UserNotFound')

        response = self.admin_open(self.item_url(self.user.username))
        self.assert200(response)  # only Admin has permission

        validator = UsageResponseValidator()
        if not validator.validate_get(response.json):
            self.fail(validator.errors)

        # first user only
        self.assertEqual(len(response.json['data']['ip_usage']), 2)
        # first user only
        self.assertEqual(len(response.json['data']['pd_usage']), 2)

    # @unittest.skip('')
项目:CSB    作者:csb-toolbox    | 项目源码 | 文件源码
def skip(reason, condition=None):
    """
    Mark a test case or method for skipping.

    @param reason: message
    @type reason: str
    @param condition: skip only if the specified condition is True
    @type condition: bool/expression
    """
    if isinstance(reason, types.FunctionType):
        raise TypeError('skip: no reason specified')

    if condition is None:
        return unittest.skip(reason)
    else:
        return unittest.skipIf(condition, reason)
项目:nittygriddy    作者:cbourjau    | 项目源码 | 文件源码
def test_download_one_file(self):
        try:
            shutil.rmtree("/tmp/nitty_test")
        except:
            pass
        remote_path = "/alice/cern.ch/user/c/cbourjau/nitty_test/AnalysisResults.root"
        utils.download_file(remote_path,
                            "/tmp/nitty_test/AnalysisResults.root")
        self.assertTrue(os.path.isfile("/tmp/nitty_test/AnalysisResults.root"))
        shutil.rmtree("/tmp/nitty_test")
        # without specifiying dest file name:
        utils.download_file(remote_path,
                            "/tmp/nitty_test/")
        self.assertTrue(os.path.isfile("/tmp/nitty_test/AnalysisResults.root"))
        shutil.rmtree("/tmp/nitty_test")

    # @skip("Skip download test")
项目:datatest    作者:shawnbrown    | 项目源码 | 文件源码
def mandatory(test_item):
    """A decorator to mark whole test cases or individual methods as
    mandatory.  If a mandatory test fails, DataTestRunner will stop
    immediately (this is similar to the ``--failfast`` command line
    argument behavior)::

        @datatest.mandatory
        class TestFileFormat(datatest.DataTestCase):
            def test_columns(self):
                ...
    """
    test_item.__datatest_mandatory__ = True
    return test_item


# The standard unittest.skip decorators are reimplemented to add a
# _wrapped attribute that points to the orignal object so that the
# _sort_key() function can find the proper line number when test_item
# gets wrapped by functools.wraps().
项目:datatest    作者:shawnbrown    | 项目源码 | 文件源码
def skip(reason):
    """A decorator to unconditionally skip a test:

    .. code-block:: python

        @datatest.skip('Not finished collecting raw data.')
        class TestSumTotals(datatest.DataTestCase):
            def test_totals(self):
                ...
    """
    def decorator(test_item):
        if not isinstance(test_item, type):
            orig_item = test_item           # <- Not in unittest.skip()
            @functools.wraps(test_item)
            def skip_wrapper(*args, **kwargs):
                raise unittest.SkipTest(reason)
            test_item = skip_wrapper
            test_item._wrapped = orig_item  # <- Not in unittest.skip()

        test_item.__unittest_skip__ = True
        test_item.__unittest_skip_why__ = reason
        return test_item
    return decorator
项目:pbtranscript    作者:PacificBiosciences    | 项目源码 | 文件源码
def _check_unchunked_files(self, d):
        self.assertEqual(d['$chunk.ref_contigset_id'], self.INPUT_FILES[1])
        self.assertEqual(d['$chunk.ccsset_id'], self.INPUT_FILES[2])


#@unittest.skip("GMAP disabled")
#class TestScatterContigSetGMAP(ContigSetScatterBase,
#                               pbcommand.testkit.core.PbTestScatterApp):
#    DRIVER_BASE = "python -m pbtranscript.tasks.scatter_contigset_gmap"
#    INPUT_FILES = [  # identical to TestIcePartial inputs
#        op.join(MNT_DATA, "sa3", "nfl.contigset.xml"),
#        "/pbi/dept/secondary/siv/references/rat_UCSC/rat_UCSC.referenceset.xml",
#    ]
#
#    def _check_unchunked_files(self, d):
#        self.assertEqual(d['$chunk.reference_id'], self.INPUT_FILES[1])
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_skipping(self):
        class Foo(unittest.TestCase):
            def test_skip_me(self):
                self.skipTest("skip")
        events = []
        result = LoggingResult(events)
        test = Foo("test_skip_me")
        test.run(result)
        self.assertEqual(events, ['startTest', 'addSkip', 'stopTest'])
        self.assertEqual(result.skipped, [(test, "skip")])

        # Try letting setUp skip the test now.
        class Foo(unittest.TestCase):
            def setUp(self):
                self.skipTest("testing")
            def test_nothing(self): pass
        events = []
        result = LoggingResult(events)
        test = Foo("test_nothing")
        test.run(result)
        self.assertEqual(events, ['startTest', 'addSkip', 'stopTest'])
        self.assertEqual(result.skipped, [(test, "testing")])
        self.assertEqual(result.testsRun, 1)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_skip_doesnt_run_setup(self):
        class Foo(unittest.TestCase):
            wasSetUp = False
            wasTornDown = False
            def setUp(self):
                Foo.wasSetUp = True
            def tornDown(self):
                Foo.wasTornDown = True
            @unittest.skip('testing')
            def test_1(self):
                pass

        result = unittest.TestResult()
        test = Foo("test_1")
        suite = unittest.TestSuite([test])
        suite.run(result)
        self.assertEqual(result.skipped, [(test, "testing")])
        self.assertFalse(Foo.wasSetUp)
        self.assertFalse(Foo.wasTornDown)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_decorated_skip(self):
        def decorator(func):
            def inner(*a):
                return func(*a)
            return inner

        class Foo(unittest.TestCase):
            @decorator
            @unittest.skip('testing')
            def test_1(self):
                pass

        result = unittest.TestResult()
        test = Foo("test_1")
        suite = unittest.TestSuite([test])
        suite.run(result)
        self.assertEqual(result.skipped, [(test, "testing")])
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_gc(self):
        for tp in self._types:
            if not isinstance(tp, type):
                # If tp is a factory rather than a plain type, skip
                continue

            class MySource(tp):
                pass
            class MyObject:
                pass

            # Create a reference cycle through a memoryview object
            b = MySource(tp(b'abc'))
            m = self._view(b)
            o = MyObject()
            b.m = m
            b.o = o
            wr = weakref.ref(o)
            b = m = o = None
            # The cycle must be broken
            gc.collect()
            self.assertTrue(wr() is None, wr())
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_1(self):
        try:
            from sys import getrefcount as grc
        except ImportError:
            return unittest.skip("no sys.getrefcount()")

        f = dll._testfunc_callback_i_if
        f.restype = ctypes.c_int
        f.argtypes = [ctypes.c_int, MyCallback]

        def callback(value):
            #print "called back with", value
            return value

        self.assertEqual(grc(callback), 2)
        cb = MyCallback(callback)

        self.assertTrue(grc(callback) > 2)
        result = f(-10, cb)
        self.assertEqual(result, -18)
        cb = None

        gc.collect()

        self.assertEqual(grc(callback), 2)
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def testSingleDatacenter(self):
        """
        Create a single data center and add check if its switch is up
        by using manually added hosts. Tests especially the
        data center specific addLink method.
        """
        # create network
        self.createNet(nswitches=0, ndatacenter=1, nhosts=2, ndockers=0)
        # setup links
        self.net.addLink(self.dc[0], self.h[0])
        self.net.addLink(self.h[1], self.dc[0])
        # start Mininet network
        self.startNet()
        # check number of running nodes
        self.assertTrue(len(self.getContainernetContainers()) == 0)
        self.assertTrue(len(self.net.hosts) == 2)
        self.assertTrue(len(self.net.switches) == 1)
        # check connectivity by using ping
        self.assertTrue(self.net.ping([self.h[0], self.h[1]]) <= 0.0)
        # stop Mininet network
        self.stopNet()

    #@unittest.skip("disabled to test if CI fails because this is the first test.")
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG."""
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid             # noqa
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            cur = self.conn.cursor()
            cur.execute("select typname from pg_type where typname = 'uuid'")
            has = cur.fetchone()
        finally:
            self.conn.rollback()

        if has:
            return f(self)
        else:
            return self.skipTest("uuid type not available on the server")

    return skip_if_no_uuid_
项目:varapp-backend-py    作者:varapp    | 项目源码 | 文件源码
def test_Xlinked_Norine_BMX(self):
        """This gene has no X-linked variant"""
        db = 'lgueneau'
        ss = samples_selection_factory(db=db,
            groups = {'affected': ['09818','09819'], 'not_affected':['09960','09961']})
        qs = Variant.objects.using(db).filter(gene_symbol__in=['BMX'], chrom='chrX')
        # Using GenotypesFilterXLinked.apply()
        variants = variants_collection_factory(db=db, qs=qs)
        var = GenotypesFilterXLinked(ss).apply(variants).variants
        self.assertEqual(len(var), 0)
        # Using FilterCollection.apply()
        fc = FiltersCollection([GenotypesFilterXLinked(ss)])
        var = fc.apply(initqs=qs, db=db).variants
        self.assertEqual(len(var), 0)


############################
#       COMPOUND HET       #
############################

#@unittest.skip('')
项目:varapp-backend-py    作者:varapp    | 项目源码 | 文件源码
def test_compound_Lucie_FNDC1(self):
        """These 2(3) variants are annotated on different transcripts of the same gene,
        and because of the group_by(transcript_id), the compound was not found.
        Check here that it is fixed.
        """
        ss = samples_selection_factory(db='test',
            groups = {'affected': ['101563','101591'], 'not_affected':['101564','101565']})
        variants = variants_collection_factory(db='test',
            qs=Variant.objects.using('test').filter(gene_symbol='FNDC1'))
        ## To select them directly:
        #variants = variants_collection_factory(
        #    Variants.objects.using('test').filter(start__in=['159653504','159653634']), db='test')
        var = GenotypesFilterCompoundHeterozygous(ss).apply(variants).variants
        self.assertEqual(len(var), 3)


############################
#       FROM REQUEST       #
############################

#@unittest.skip('')
项目:varapp-backend-py    作者:varapp    | 项目源码 | 文件源码
def test_genotypes_Nothing_filter_from_request_with_ss_from_request(self):
        """With a sound grouping passed by url."""
        samples = [('samples', 'A=09818,09819'), ('samples', 'B=09960,09961')]
        filters = [('filter', 'genotype=nothing')]
        request = RequestFactory().get('', samples + filters)
        fc = variant_filters_from_request(request, db='test')
        self.assertIsInstance(fc, FiltersCollection)
        self.assertTrue(len(fc) == len(filters))

        gf = fc['genotype']
        self.assertEqual(sorted(gf.ss.groups.keys()), ['A','B'])
        self.assertEqual(gf.name, 'genotype')
        self.assertEqual(gf.val, 'nothing')
        self.assertEqual(gf.op, '=')
        self.assertIsInstance(gf, GenotypesFilterDoNothing)

    #@unittest.skip('')
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG."""
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            cur = self.conn.cursor()
            cur.execute("select typname from pg_type where typname = 'uuid'")
            has = cur.fetchone()
        finally:
            self.conn.rollback()

        if has:
            return f(self)
        else:
            return self.skipTest("uuid type not available on the server")

    return skip_if_no_uuid_
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG."""
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            cur = self.conn.cursor()
            cur.execute("select typname from pg_type where typname = 'uuid'")
            has = cur.fetchone()
        finally:
            self.conn.rollback()

        if has:
            return f(self)
        else:
            return self.skipTest("uuid type not available on the server")

    return skip_if_no_uuid_
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG."""
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            cur = self.conn.cursor()
            cur.execute("select typname from pg_type where typname = 'uuid'")
            has = cur.fetchone()
        finally:
            self.conn.rollback()

        if has:
            return f(self)
        else:
            return self.skipTest("uuid type not available on the server")

    return skip_if_no_uuid_
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG."""
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            cur = self.conn.cursor()
            cur.execute("select typname from pg_type where typname = 'uuid'")
            has = cur.fetchone()
        finally:
            self.conn.rollback()

        if has:
            return f(self)
        else:
            return self.skipTest("uuid type not available on the server")

    return skip_if_no_uuid_
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG."""
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            cur = self.conn.cursor()
            cur.execute("select typname from pg_type where typname = 'uuid'")
            has = cur.fetchone()
        finally:
            self.conn.rollback()

        if has:
            return f(self)
        else:
            return self.skipTest("uuid type not available on the server")

    return skip_if_no_uuid_
项目:userbase-sns-lambda    作者:fartashh    | 项目源码 | 文件源码
def skip_if_no_uuid(f):
    """Decorator to skip a test if uuid is not supported by Py/PG."""
    @wraps(f)
    def skip_if_no_uuid_(self):
        try:
            import uuid
        except ImportError:
            return self.skipTest("uuid not available in this Python version")

        try:
            cur = self.conn.cursor()
            cur.execute("select typname from pg_type where typname = 'uuid'")
            has = cur.fetchone()
        finally:
            self.conn.rollback()

        if has:
            return f(self)
        else:
            return self.skipTest("uuid type not available on the server")

    return skip_if_no_uuid_