Python nose.tools 模块,assert_equals() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用nose.tools.assert_equals()

项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_package_update(self):
        idf.plugin.create_country_codes()
        helpers.call_action(
            'package_create', name='test_package',
            custom_text='this is my custom text', country_code='uk',
            resources=[{
                'url': 'http://test.com/',
                'custom_resource_text': 'my custom resource',
            }])
        result = helpers.call_action(
            'package_update',
            name='test_package',
            custom_text='this is my updated text',
            country_code='ie',
            resources=[{
                'url': 'http://test.com/',
                'custom_resource_text': 'updated custom resource',
            }]
        )
        nt.assert_equals('this is my updated text', result['custom_text'])
        nt.assert_equals([u'ie'], result['country_code'])
        nt.assert_equals('updated custom resource',
                         result['resources'][0]['custom_resource_text'])
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_returns_no_tracking_when_no_execution_id_is_found(self):
        self.engine.cwlogs.get_log_events.side_effect = CloudWatchStreamDoesNotExist()
        workflow_state = {e: core.STATE_UNKNOWN for e in self.workflow_events}
        execution_id = "transaction-id-123"

        expected = {
            "events_defined": workflow_state,
            "events_received": [],
            "tracking_summary": {
                "last_received_event": None,
                "subscribers": [],
                "execution_path": self.engine \
                                      ._generate_execution_path(workflow_state)
            }
        }
        actual = self.engine.track(self.workflow_id, self.execution_id)
        nt.assert_equals(expected, actual)
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_successfully_gets_log_events(self):
        mocked_timestamp = 1476826208 * 1000
        mocked_message = '{"foo": "bar"}'
        mocked_events = {
            "events": [
                {
                    "timestamp": mocked_timestamp,
                    "message": mocked_message
                }
            ],
            "nextForwardToken": None
        }
        self.logs.cwlogs.get_log_events.return_value = mocked_events
        expected = [{
            "timestamp": utils.format_datetime(datetime.fromtimestamp(mocked_timestamp / 1000)),
            "data": json.loads(mocked_message)
        }]
        actual = self.logs.get_log_events(self.log_group, self.log_stream)
        nt.assert_equals(expected, actual)
项目:tidml    作者:tidchile    | 项目源码 | 文件源码
def test_simple_engine():
    engine = Engine({
        'datasource': {
            'class': TestDataSource,
            'params': {
                'csv': test_data_file,
            },
        },
        'algorithm': {
            'class': TestSimpleAlgorithm,
            'params': {
                'model.pickle': '~/.tidml/tests/model.pkl',  # default built-in
            },
        },
    })
    engine.train()
    models = engine.load_models()
    prediction = engine.predict(models, 3)
    nt.assert_equals(prediction, 6)
项目:tidml    作者:tidchile    | 项目源码 | 文件源码
def test_multiple_algorithms_engine():
    engine = Engine({
        'datasource': TestEmptyDataSource,
        'algorithms': {
            'algo1': {
                'class': TestMultiAlgorithm,
                'params': {'p': 'A'},
            },
            'algo2': {
                'class': TestMultiAlgorithm,
                'params': {'p': 'B'},
            },
        },
        'serving': TestIdentityServing,
    })
    models = {
        'algo1': object(),
        'algo2': object(),
    }
    nt.assert_equals(engine.predict(models, None), {
        'algo1': 'A',
        'algo2': 'B',
    })
项目:isf    作者:w3h    | 项目源码 | 文件源码
def test_ipv4_async():
    global FLAG
    FLAG = Value('i', 0)
    nma = nmap.PortScannerAsync()

    def callback_result(host, scan_result):
        global FLAG
        FLAG.value = 1

    nma.scan(hosts='127.0.0.1',
             arguments='-p 22 -Pn',
             callback=callback_result)

    while nma.still_scanning():
        nma.wait(2)

    assert_equals(FLAG.value, 1)
项目:isf    作者:w3h    | 项目源码 | 文件源码
def test_ipv6_async():
    global FLAG
    FLAG = Value('i', 0)
    nma = nmap.PortScannerAsync()

    def callback_result(host, scan_result):
        global FLAG
        FLAG.value = 1

    nma.scan(hosts='::1',
             arguments='-6 -p 22 -Pn',
             callback=callback_result)

    while nma.still_scanning():
        nma.wait(2)

    assert_equals(FLAG.value, 1)
项目:isf    作者:w3h    | 项目源码 | 文件源码
def test_multipe_osmatch():
    assert('osmatch' in nm['127.0.0.1'])
    assert('portused' in nm['127.0.0.1'])

    for osm in nm['127.0.0.1']['osmatch']:
        assert('accuracy' in osm)
        assert('line' in osm)
        assert('name' in osm)
        assert('osclass' in osm)
        assert('accuracy' in osm['osclass'][0])
        assert('cpe' in osm['osclass'][0])
        assert('osfamily' in osm['osclass'][0])
        assert('osgen' in osm['osclass'][0])
        assert('type' in osm['osclass'][0])
        assert('vendor' in osm['osclass'][0])


# def test_host_and_port_as_unicode():
#     # nosetests -x -s nmap/test_nmap.py:test_port_as_unicode
#     # Covers bug : https://bitbucket.org/xael/python-nmap/issues/9/can-not-pass-ports-with-unicode-string-at
#     nma = nm.scan(hosts=u'127.0.0.1', ports=u'22')
#     assert_equals(nma['nmap']['scaninfo']['error'], '')
项目:Python-Gantt    作者:asisrai23    | 项目源码 | 文件源码
def test_Resources():
    rANO = gantt.Resource('ANO')
    rANO.add_vacations(
        dfrom=datetime.date(2015, 2, 2), 
        dto=datetime.date(2015, 2, 4) 
        )
    # test global vacations
    assert_equals(rANO.is_available(datetime.date(2015, 1, 1)), False)
    # test resource vacations
    assert_equals(rANO.is_available(datetime.date(2015, 2, 1)), True)
    assert_equals(rANO.is_available(datetime.date(2015, 2, 2)), False)
    assert_equals(rANO.is_available(datetime.date(2015, 2, 3)), False)
    assert_equals(rANO.is_available(datetime.date(2015, 2, 3)), False)
    assert_equals(rANO.is_available(datetime.date(2015, 2, 5)), True)

    # Second resource
    rJLS = gantt.Resource('JLS')
    return
项目:Repobot    作者:Desgard    | 项目源码 | 文件源码
def test_latex_completions():
    from IPython.core.latex_symbols import latex_symbols
    import random
    ip = get_ipython()
    # Test some random unicode symbols
    keys = random.sample(latex_symbols.keys(), 10)
    for k in keys:
        text, matches = ip.complete(k)
        nt.assert_equal(len(matches),1)
        nt.assert_equal(text, k)
        nt.assert_equal(matches[0], latex_symbols[k])
    # Test a more complex line
    text, matches = ip.complete(u'print(\\alpha')
    nt.assert_equals(text, u'\\alpha')
    nt.assert_equals(matches[0], latex_symbols['\\alpha'])
    # Test multiple matching latex symbols
    text, matches = ip.complete(u'\\al')
    nt.assert_in('\\alpha', matches)
    nt.assert_in('\\aleph', matches)
项目:birdseye-server    作者:DramaticLiberty    | 项目源码 | 文件源码
def test_image_to_obs(self, mock_ps, mock_dl):
        session = jobs.db_session()
        session.query(bm.Observation).delete()
        session.commit()
        obs = session.query(bm.Observation).all()
        nt.assert_equals(obs, [])

        teardown_singleton_pubsub()
        mock_dl.return_value = [(7.0, 'mockingbird')]

        jobs.image_to_observation(self.file_path_gps, self.file_path)

        obs = session.query(bm.Observation).all()

        # TODO: test better what's in this obs
        nt.assert_equals(len(obs), 1)

        mock_dl.assert_called_once_with(self.file_path)
        mock_ps.assert_called_once_with()
        # as long as we return the repr of geometry, we cannot do this assert:
        # mock_ps().publish.assert_called_once_with(obs[0].as_public_dict())
        nt.assert_equals(mock_ps().publish.call_count, 1)
项目:bayip    作者:recall704    | 项目源码 | 文件源码
def test_ipv4_async():
    global FLAG
    FLAG = Value('i', 0)
    nma = nmap.PortScannerAsync()

    def callback_result(host, scan_result):
        global FLAG
        FLAG.value = 1

    nma.scan(hosts='127.0.0.1',
             arguments='-p 22 -Pn',
             callback=callback_result)

    while nma.still_scanning():
        nma.wait(2)

    assert_equals(FLAG.value, 1)
项目:bayip    作者:recall704    | 项目源码 | 文件源码
def test_ipv6_async():
    global FLAG_ipv6
    FLAG_ipv6 = Value('i', 0)
    nma_ipv6 = nmap.PortScannerAsync()

    def callback_result(host, scan_result):
        global FLAG_ipv6
        FLAG_ipv6.value = 1

    nma_ipv6.scan(hosts='::1',
             arguments='-6 -p 22 -Pn',
             callback=callback_result)

    while nma_ipv6.still_scanning():
        nma_ipv6.wait(2)

    assert_equals(FLAG_ipv6.value, 1)
项目:bayip    作者:recall704    | 项目源码 | 文件源码
def test_ipv6_async():
    global FLAG_ipv6
    FLAG_ipv6 = Value('i', 0)
    nma_ipv6 = nmap.PortScannerAsync()

    def callback_result(host, scan_result):
        global FLAG_ipv6
        FLAG_ipv6.value = 1

    nma_ipv6.scan(hosts='::1',
             arguments='-6 -p 22 -Pn',
             callback=callback_result)

    while nma_ipv6.still_scanning():
        nma_ipv6.wait(2)

    assert_equals(FLAG_ipv6.value, 1)
项目:ovirt-system-tests    作者:oVirt    | 项目源码 | 文件源码
def prepare_migration_attachments_ipv6(api):
    engine = api.system_service()
    hosts_service = engine.hosts_service()

    for index, host in enumerate(
            test_utils.hosts_in_cluster_v4(engine, CLUSTER_NAME),
            start=1):
        host_service = hosts_service.host_service(id=host.id)

        ip_address = MIGRATION_NETWORK_IPv6_ADDR.format(index)

        ip_configuration = network_utils_v4.create_static_ip_configuration(
            ipv6_addr=ip_address,
            ipv6_mask=MIGRATION_NETWORK_IPv6_MASK)

        network_utils_v4.modify_ip_config(
            engine, host_service, MIGRATION_NETWORK, ip_configuration)

        actual_address = next(nic for nic in host_service.nics_service().list()
                              if nic.name == VLAN200_IF_NAME).ipv6.address
        nt.assert_equals(IPAddress(actual_address), IPAddress(ip_address))
项目:ovirt-system-tests    作者:oVirt    | 项目源码 | 文件源码
def attach_vm_network_to_host_static_config(api):
    host = test_utils.hosts_in_cluster_v3(api, CLUSTER_NAME)[0]
    ip_configuration = network_utils_v3.create_static_ip_configuration(
        VM_NETWORK_IPv4_ADDR,
        VM_NETWORK_IPv4_MASK,
        VM_NETWORK_IPv6_ADDR,
        VM_NETWORK_IPv6_MASK)

    network_utils_v3.attach_network_to_host(
        api,
        host,
        NIC_NAME,
        VM_NETWORK,
        ip_configuration)

    # TODO: currently ost uses v3 SDK that doesn't report ipv6. once available,
    # verify ipv6 as well.
    nt.assert_equals(
        host.nics.list(name=VLAN_IF_NAME)[0].ip.address,
        VM_NETWORK_IPv4_ADDR)
项目:ovirt-system-tests    作者:oVirt    | 项目源码 | 文件源码
def prepare_migration_attachments_ipv4(api):
    for index, host in enumerate(
            test_utils.hosts_in_cluster_v3(api, CLUSTER_NAME),
            start=1):
        ip_address = MIGRATION_NETWORK_IPv4_ADDR.format(index)

        ip_configuration = network_utils_v3.create_static_ip_configuration(
            ipv4_addr=ip_address,
            ipv4_mask=MIGRATION_NETWORK_IPv4_MASK)

        network_utils_v3.attach_network_to_host(
            api, host, NIC_NAME, MIGRATION_NETWORK, ip_configuration)

        nt.assert_equals(
            host.nics.list(name=VLAN200_IF_NAME)[0].ip.address,
            ip_address)
项目:ovirt-system-tests    作者:oVirt    | 项目源码 | 文件源码
def prepare_migration_attachments_ipv4(api):
    engine = api.system_service()
    hosts_service = engine.hosts_service()

    for index, host in enumerate(
            test_utils.hosts_in_cluster_v4(engine, CLUSTER_NAME),
            start=1):
        host_service = hosts_service.host_service(id=host.id)

        ip_address = MIGRATION_NETWORK_IPv4_ADDR.format(index)

        ip_configuration = network_utils_v4.create_static_ip_configuration(
            ipv4_addr=ip_address,
            ipv4_mask=MIGRATION_NETWORK_IPv4_MASK)

        network_utils_v4.attach_network_to_host(
            host_service, NIC_NAME, MIGRATION_NETWORK, ip_configuration)

        actual_address = next(nic for nic in host_service.nics_service().list()
                              if nic.name == VLAN200_IF_NAME).ip.address
        nt.assert_equals(IPAddress(actual_address), IPAddress(ip_address))
项目:blender    作者:gastrodia    | 项目源码 | 文件源码
def test_latex_completions():
    from IPython.core.latex_symbols import latex_symbols
    import random
    ip = get_ipython()
    # Test some random unicode symbols
    keys = random.sample(latex_symbols.keys(), 10)
    for k in keys:
        text, matches = ip.complete(k)
        nt.assert_equal(len(matches),1)
        nt.assert_equal(text, k)
        nt.assert_equal(matches[0], latex_symbols[k])
    # Test a more complex line
    text, matches = ip.complete(u'print(\\alpha')
    nt.assert_equals(text, u'\\alpha')
    nt.assert_equals(matches[0], latex_symbols['\\alpha'])
    # Test multiple matching latex symbols
    text, matches = ip.complete(u'\\al')
    nt.assert_in('\\alpha', matches)
    nt.assert_in('\\aleph', matches)
项目:blender    作者:gastrodia    | 项目源码 | 文件源码
def test_property_docstring_is_in_info_for_detail_level_0():
    class A(object):
        @property
        def foobar(self):
            """This is `foobar` property."""
            pass

    ip.user_ns['a_obj'] = A()
    nt.assert_equals(
        'This is `foobar` property.',
        ip.object_inspect('a_obj.foobar', detail_level=0)['docstring'])

    ip.user_ns['a_cls'] = A
    nt.assert_equals(
        'This is `foobar` property.',
        ip.object_inspect('a_cls.foobar', detail_level=0)['docstring'])
项目:oadoi    作者:Impactstory    | 项目源码 | 文件源码
def test_open_dois(self, test_data):
        (doi, fulltext_url, license) = test_data
        my_pub = pub.lookup_product_by_doi(doi)
        my_pub.recalculate()

        logger.info(u"was looking for {}, got {}\n\n".format(fulltext_url, my_pub.fulltext_url))
        logger.info(u"doi: http://doi.org/{}".format(doi))
        logger.info(u"title: {}".format(my_pub.best_title))
        logger.info(u"evidence: {}\n\n".format(my_pub.evidence))
        if my_pub.error:
            logger.info(my_pub.error)

        assert_not_equals(my_pub.fulltext_url, None)


    # @data(*closed_dois)
    # def test_closed_dois(self, test_data):
    #     (doi, fulltext_url, license) = test_data
    #     my_pub = pub.lookup_product_by_doi(doi)
    #     my_pub.recalculate()
    #
    #     logger.info(u"was looking for {}, got {}\n\n".format(fulltext_url, my_pub.fulltext_url))
    #     logger.info(u"doi: http://doi.org/{}".format(doi))
    #     logger.info(u"title: {}".format(my_pub.best_title))
    #     logger.info(u"evidence: {}\n\n".format(my_pub.evidence))
    #     if my_pub.error:
    #         logger.info(my_pub.error)
    #
    #     assert_equals(my_pub.fulltext_url, None)
    #



# have to scrape the publisher pages to find these
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_initialization(self):
        assert_equals(Acl('test').name, 'test')
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_initialization_default(self):
        assert_equals(Acl().name, Acl._default_name)
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_add(self):
        acl = Acl()
        acl.add(Ace())
        assert_equals(len(acl), 1)
        assert_equals(acl[-1]._line_number, 1)
        acl.add(Ace())
        assert_equals(len(acl), 1)
        assert_equals(acl[-1]._line_number, 1)
        acl.add(Ace(logging=2))
        assert_equals(acl[-1]._line_number, 2)
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_remove(self):
        acl = Acl()
        acl.add(Ace())
        acl.remove(Ace())
        assert_equals(len(acl), 0)
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_getitem(self):
        acl = Acl()
        ace = Ace()
        acl.add(ace)
        assert_equals(id(acl[0]), id(ace))
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_eqaulity(self):
        assert_equals(Acl(), Acl())
        acl01 = Acl()
        acl01.add(Ace(logging=2))
        acl02 = Acl()
        acl02.add(Ace(logging=2))
        assert_equals(acl01, acl02)
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_repr(self):
        expected = '<Acl test01 #0>'
        assert_equals(Acl('test01').__repr__(), expected)
项目:nettool    作者:heyglen    | 项目源码 | 文件源码
def test_str(self):
        acl = Acl('test01')
        expected = 'Acl test01 #0'
        assert_equals(acl.__str__(), expected)
        acl.add(Ace(permit=False, network='1.2.3.0/24 4.5.6.0/24'))
        expected = 'Acl test01 #1\n\tdeny ip 1.2.3.0/24 4.5.6.0/24'
        assert_equals(acl.__str__(), expected)
项目:ReGraph    作者:eugeniashurko    | 项目源码 | 文件源码
def test1(self):
        """r2 does not verify the formula."""
        assert_equals(self.hierarchy.check_all_ancestors("g2"),
                      {'g1':
                       {'f1': "['r2']"}})

    # def test2(self):
    #     """after adding an edge all the node are valid"""
    #     self.hie2.add_edge("r2", "a1")
    #     assert_equals(self.hie2.check(),
    #                   {'g1': {'or(not cnt(Region),<1<=Adj>cnt(Agent))': "[]"}})
项目:dcos-autoscaler    作者:julienstroheker    | 项目源码 | 文件源码
def test_filter_get_health_to_get_only_stateless_nodes(self, MockCluster):
        # Mock
        cluster = MockCluster()
        payload = {}
        metrics = {"totalCPU": 0, "totalMEM": 0,
                   "usedCPU": 0, "usedMEM": 0,
                   "ratioCPU": 0, "ratioMEM": 0,
                   "nbNodes": 0}
        with open('./test/mockupDown.json') as json_data:
            payload = json.load(json_data)
        cluster.get_health.return_value = payload
        # Act
        response = cluster.filter_stateless(metrics, cluster.get_health())
        # Assert
        #assert_equals(response['nbNodes'], 4)
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_package_create(self):
        result = helpers.call_action('package_create', name='test_package',
                                     custom_text='this is my custom text')
        nt.assert_equals('this is my custom text', result['custom_text'])
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_package_update(self):
        helpers.call_action('package_create', name='test_package',
                            custom_text='this is my custom text')
        result = helpers.call_action('package_update', name='test_package',
                                     custom_text='this is my updated text')
        nt.assert_equals('this is my updated text', result['custom_text'])
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_package_show(self):
        helpers.call_action('package_create', name='test_package',
                            custom_text='this is my custom text')
        result = helpers.call_action('package_show', name_or_id='test_package')
        nt.assert_equals('this is my custom text', result['custom_text'])
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_package_create(self):
        idf.plugin_v4.create_country_codes()
        result = helpers.call_action('package_create', name='test_package',
                                     custom_text='this is my custom text',
                                     country_code='uk')
        nt.assert_equals('this is my custom text', result['custom_text'])
        nt.assert_equals([u'uk'], result['country_code'])
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_package_update(self):
        idf.plugin_v4.create_country_codes()
        helpers.call_action('package_create', name='test_package',
                            custom_text='this is my custom text',
                            country_code='uk')
        result = helpers.call_action('package_update', name='test_package',
                                     custom_text='this is my updated text',
                                     country_code='ie')
        nt.assert_equals('this is my updated text', result['custom_text'])
        nt.assert_equals([u'ie'], result['country_code'])
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_package_show(self):
        idf.plugin.create_country_codes()
        helpers.call_action(
            'package_create', name='test_package',
            custom_text='this is my custom text', country_code='uk',
            resources=[{
                'url': 'http://test.com/',
                'custom_resource_text': 'my custom resource',
            }]
        )
        result = helpers.call_action('package_show', name_or_id='test_package')
        nt.assert_equals('my custom resource',
                         result['resources'][0]['custom_resource_text'])
        nt.assert_equals('my custom resource',
                         result['resources'][0]['custom_resource_text'])
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_upgrade_from_sha_with_wrong_password_fails_to_upgrade(self):
        user = factories.User()
        password = u'testpassword'
        user_obj = model.User.by_name(user['name'])

        old_hash = self._set_password(password)
        user_obj._password = old_hash
        user_obj.save()

        nt.assert_false(user_obj.validate_password('wrongpass'))
        nt.assert_equals(old_hash, user_obj.password)
        nt.assert_false(pbkdf2_sha512.identify(user_obj.password))
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_upgrade_from_pbkdf2_with_less_rounds(self):
        '''set up a pbkdf key with less than the default rounds

        If the number of default_rounds is increased in a later version of
        passlib, ckan should upgrade the password hashes for people without
        involvement from users'''
        user = factories.User()
        password = u'testpassword'
        user_obj = model.User.by_name(user['name'])

        # setup hash with salt/rounds less than the default
        old_hash = pbkdf2_sha512.encrypt(password, salt_size=2, rounds=10)
        user_obj._password = old_hash
        user_obj.save()

        nt.assert_true(user_obj.validate_password(password.encode('utf-8')))
        # check that the hash has been updated
        nt.assert_not_equals(old_hash, user_obj.password)
        new_hash = pbkdf2_sha512.from_string(user_obj.password)

        nt.assert_true(pbkdf2_sha512.default_rounds > 10)
        nt.assert_equals(pbkdf2_sha512.default_rounds, new_hash.rounds)

        nt.assert_true(pbkdf2_sha512.default_salt_size, 2)
        nt.assert_equals(pbkdf2_sha512.default_salt_size,
                         len(new_hash.salt))
        nt.assert_true(pbkdf2_sha512.verify(password, user_obj.password))
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def test_upgrade_from_pbkdf2_fails_with_wrong_password(self):
        user = factories.User()
        password = u'testpassword'
        user_obj = model.User.by_name(user['name'])

        # setup hash with salt/rounds less than the default

        old_hash = pbkdf2_sha512.encrypt(password, salt_size=2, rounds=10)
        user_obj._password = old_hash
        user_obj.save()

        nt.assert_false(user_obj.validate_password('wrong_pass'))
        # check that the hash has _not_ been updated
        nt.assert_equals(old_hash, user_obj.password)
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def assert_inode_equal(self, a, b):
        nt.assert_equals(os.stat(a).st_ino, os.stat(b).st_ino,
                         "%r and %r do not reference the same indoes" %(a, b))
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def assert_content_equal(self, a, b):
        with open(a) as a_f:
            with open(b) as b_f:
                nt.assert_equals(a_f.read(), b_f.read())
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_config_successfully_validates(self):
        ''' Test config is validated for a correct config '''
        config_path = config_dir + "/valid.yaml"
        Engine.validate_config(config_path)
        nt.assert_equals(True, True)
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_lambdas_are_setup(self):
        ''' Tests that aws lambdas are created or updated
        from lambdas defined in the config '''
        lambda_names = [l['name'] for l in self.test_config['lambdas'] or []]
        expected = {name: ANY for name in lambda_names}
        actual = self.engine.setup_lambdas()
        nt.assert_equals(expected, actual)
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_streams_and_subscriptions_are_setup(self):
        ''' Tests that streams are created and lambdas
        are subscribed successfully to these streams '''
        lambda_mappings = self.engine.setup_lambdas()
        subscriptions = self.test_config['subscriptions'] or []
        stream_names = [s['event'] for s in subscriptions]
        subscribers = []
        for ss in subscriptions:
            for sb in ss['subscribers'] or []:
                subscribers.append(sb)
        expected = {name: ANY for name in stream_names}
        actual = self.engine.setup_streams_and_subscriptions(lambda_mappings)
        nt.assert_equals(expected, actual)
        nt.assert_equals(len(stream_names), self.engine.kinesis.get_or_create_stream.call_count)
        nt.assert_equals(len(subscribers), self.engine.awslambda.subscribe_to_stream.call_count)
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_tracker_is_setup(self, generate_code_mock, write_mock):
        ''' Tests that the tracker lambda is created along with its
        log group and that it is subscribed to all streams in the workflow '''
        workflow_id = "test_workflow"
        stream_arns = [
            "arn:aws:kinesis:eu-west-1:xxxxxxxxxxxx:stream/TestEvent1",
            "arn:aws:kinesis:eu-west-1:xxxxxxxxxxxx:stream/TestEvent2",
            "arn:aws:kinesis:eu-west-1:xxxxxxxxxxxx:stream/TestEvent3"
        ]
        self.engine.setup_tracker(workflow_id, stream_arns)
        nt.assert_equals(1, self.engine.awslambda.create_or_update_function.call_count)
        nt.assert_equals(len(stream_arns), self.engine.awslambda.subscribe_to_stream.call_count)
        nt.assert_equals(1, self.engine.cwlogs.create_log_group.call_count)
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_workflows_are_setup(self, generate_code_mock, write_mock):
        ''' Tests that all workflows as defined in the config are created along
        with their trackers '''
        lambda_mappings = self.engine.setup_lambdas()
        stream_mappings = self.engine.setup_streams_and_subscriptions(lambda_mappings)
        workflows = self.test_config.get('workflows') or []
        num_workflows = len(workflows)
        self.engine.setup_workflows(stream_mappings)
        num_log_groups_created = self.engine.cwlogs.create_log_group.call_count
        nt.assert_equals(num_workflows, num_log_groups_created)
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_publish_is_successful(self, cwlogs_mock, kinesis_mock, lambda_mock):
        ''' Test data is successfully published to a stream '''
        config_path = config_dir + "/valid.yaml"
        engine = Engine(config_path)
        engine.publish("test_stream", "test_data")
        num_publishes = engine.kinesis.publish.call_count
        nt.assert_equals(1, num_publishes)
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_workflow_successfully_tracks_on_successful_execution(self):
        # Mock so that all events defined are received (and therefore logged)
        mocked_logged_events = [
            {
                "timestamp": "2016-10-09T23:11:00Z",
                "data": {
                    "event_name": e,
                    "execution_id": self.execution_id
                }

            } for e in self.workflow_events
        ]
        self.engine.cwlogs.get_log_events.return_value = mocked_logged_events

        expected_workflow_state = {e: core.STATE_RECEIVED for e in self.workflow_events}
        expected_last_received_event = self.workflow_events[len(self.workflow_events)-1]
        expected_subscribers = [ss['subscribers'] for ss in self.test_config['subscriptions'] \
                if ss['event'] == expected_last_received_event][0]
        expected = {
            "events_defined": expected_workflow_state,
            "events_received": mocked_logged_events,
            "tracking_summary": {
                "last_received_event": expected_last_received_event,
                "subscribers": expected_subscribers,
                "execution_path": self.engine \
                                      ._generate_execution_path(expected_workflow_state)
            }
        }
        actual = self.engine.track(self.workflow_id, self.execution_id)
        nt.assert_equals(expected, actual)
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def test_workflow_successfully_tracks_on_failed_execution(self):
        # Mock so that all events defined except the last one
        # are received (and therefore logged)
        mocked_logged_events = [
            {
                "timestamp": "2016-10-09T23:11:00Z",
                "data": {
                    "event_name": e,
                    "execution_id": self.execution_id
                }

            } for e in self.workflow_events[:-1] # exclude last event
        ]
        self.engine.cwlogs.get_log_events.return_value = mocked_logged_events

        expected_workflow_state = {e: core.STATE_RECEIVED for e in self.workflow_events[:-1]}
        expected_workflow_state[self.workflow_events[-1:][0]] = core.STATE_UNKNOWN
        expected_last_received_event = self.workflow_events[len(self.workflow_events[:-1])-1]
        expected_subscribers = [ss['subscribers'] for ss in self.test_config['subscriptions'] \
                if ss['event'] == expected_last_received_event][0]
        expected = {
            "events_defined": expected_workflow_state,
            "events_received": mocked_logged_events,
            "tracking_summary": {
                "last_received_event": expected_last_received_event,
                "subscribers": expected_subscribers,
                "execution_path": self.engine \
                                      ._generate_execution_path(expected_workflow_state)
            }
        }
        actual = self.engine.track(self.workflow_id, self.execution_id)
        nt.assert_equals(expected, actual)