Python mock 模块,MagicMock() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用mock.MagicMock()

项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def test_cli_with_login_username_only(self, mocked_method):
        mocked = mock.MagicMock()
        mocked.login.return_value = {"Status": "Login Succeeded"}
        mocked.push.return_value = [
            {"stream": "In process"},
            {"status": "Successfully pushed"}]
        mocked_method.return_value = mocked
        with FakeProjectDirectory() as tmpdir:
            add_sh_fake_config(tmpdir)
            runner = CliRunner()
            result = runner.invoke(
                cli, ["dev", "--version", "test", "--apikey", "apikey"])
            assert result.exit_code == 0
            mocked.login.assert_called_with(
                email=None, password=' ',
                reauth=False, registry='registry', username='apikey')
            mocked.push.assert_called_with(
                'registry/user/project:test', decode=True,
                insecure_registry=False, stream=True)
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_transform_to_recordstore(self):
        # simply verify that the transform method is called first, then
        # rdd to recordstore
        kafka_stream = MagicMock(name='kafka_stream')
        transformed_stream = MagicMock(name='transformed_stream')
        kafka_stream.transform.return_value = transformed_stream
        MonMetricsKafkaProcessor.transform_to_recordstore(
            kafka_stream)

        # TODO(someone) figure out why these asserts now fail
        # transformed_stream_expected = call.foreachRDD(
        #     MonMetricsKafkaProcessor.rdd_to_recordstore
        # ).call_list()
        # kafka_stream_expected = call.transform(
        #     MonMetricsKafkaProcessor.store_offset_ranges
        # ).call_list()
        # self.assertEqual(kafka_stream_expected, kafka_stream.mock_calls)
        # self.assertEqual(transformed_stream_expected,
        #                  transformed_stream.mock_calls)
项目:monasca-transform    作者:openstack    | 项目源码 | 文件源码
def test_transform_service_heartbeat(self, coordinator):

        # mock coordinator
        fake_kazoo_driver = MagicMock(name="MagicKazooDriver",
                                      spec=KazooDriver)
        coordinator.return_value = fake_kazoo_driver

        # option1
        serv_thread = transform_service.TransformService()
        serv_thread.daemon = True
        serv_thread.start()
        time.sleep(2)

        # option2
        # mocks dont seem to work when spawning a service
        # pid = _spawn_transform_service()
        # time.sleep()
        # os.kill(pid, signal.SIGNAL_SIGTERM)

        fake_kazoo_driver.heartbeat.assert_called_with()
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_make_vif_subnets(self, m_mk_fixed_ip, m_make_vif_subnet):
        subnet_id = mock.sentinel.subnet_id
        ip_address = mock.sentinel.ip_address
        fixed_ip = mock.sentinel.fixed_ip
        subnet = mock.Mock()
        subnets = mock.MagicMock()
        subnets.__contains__.return_value = True
        m_mk_fixed_ip.return_value = fixed_ip
        m_make_vif_subnet.return_value = subnet
        port = {'fixed_ips': [
            {'subnet_id': subnet_id, 'ip_address': ip_address}]}

        self.assertEqual([subnet], ovu._make_vif_subnets(port, subnets))
        m_make_vif_subnet.assert_called_once_with(subnets, subnet_id)
        m_mk_fixed_ip.assert_called_once_with(address=ip_address)
        subnet.ips.objects.append.assert_called_once_with(fixed_ip)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_get_parent_port(self):
        cls = nested_vif.NestedPodVIFDriver
        m_driver = mock.Mock(spec=cls)
        neutron = self.useFixture(k_fix.MockNeutronClient()).client

        node_fixed_ip = mock.sentinel.node_fixed_ip
        pod_status = mock.MagicMock()
        pod_status.__getitem__.return_value = node_fixed_ip

        pod = mock.MagicMock()
        pod.__getitem__.return_value = pod_status
        parent_port = mock.sentinel.parent_port

        m_driver._get_parent_port_by_host_ip.return_value = parent_port

        cls._get_parent_port(m_driver, neutron, pod)
        m_driver._get_parent_port_by_host_ip.assert_called_once()
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_get_subnet(self, m_osv_subnet, m_osv_network):
        neutron = self.useFixture(k_fix.MockNeutronClient()).client

        subnet = mock.MagicMock()
        network = mock.MagicMock()
        subnet_id = mock.sentinel.subnet_id
        network_id = mock.sentinel.network_id

        neutron_subnet = {'network_id': network_id}
        neutron_network = mock.sentinel.neutron_network

        neutron.show_subnet.return_value = {'subnet': neutron_subnet}
        neutron.show_network.return_value = {'network': neutron_network}

        m_osv_subnet.return_value = subnet
        m_osv_network.return_value = network

        ret = default_subnet._get_subnet(subnet_id)

        self.assertEqual(network, ret)
        neutron.show_subnet.assert_called_once_with(subnet_id)
        neutron.show_network.assert_called_once_with(network_id)
        m_osv_subnet.assert_called_once_with(neutron_subnet)
        m_osv_network.assert_called_once_with(neutron_network)
        network.subnets.objects.append.assert_called_once_with(subnet)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_get_in_use_vlan_ids_set(self):
        cls = nested_vlan_vif.NestedVlanPodVIFDriver
        m_driver = mock.Mock(spec=cls)
        neutron = self.useFixture(k_fix.MockNeutronClient()).client

        vlan_ids = set()
        trunk_id = mock.sentinel.trunk_id
        vlan_ids.add('100')

        port = mock.MagicMock()
        port.__getitem__.return_value = '100'
        trunk_obj = mock.MagicMock()
        trunk_obj.__getitem__.return_value = [port]
        trunk = mock.MagicMock()
        trunk.__getitem__.return_value = trunk_obj
        neutron.show_trunk.return_value = trunk
        self.assertEqual(vlan_ids,
                         cls._get_in_use_vlan_ids_set(m_driver, trunk_id))
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_request_vif(self):
        cls = vif_pool.BaseVIFPool
        m_driver = mock.MagicMock(spec=cls)

        pod = get_pod_obj()
        project_id = mock.sentinel.project_id
        subnets = mock.sentinel.subnets
        security_groups = [mock.sentinel.security_groups]
        vif = mock.sentinel.vif

        m_driver._get_port_from_pool.return_value = vif
        oslo_cfg.CONF.set_override('ports_pool_min',
                                   5,
                                   group='vif_pool')
        pool_length = 5
        m_driver._get_pool_size.return_value = pool_length

        self.assertEqual(vif, cls.request_vif(m_driver, pod, project_id,
                                              subnets, security_groups))
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_request_vif_empty_pool(self, m_eventlet):
        cls = vif_pool.BaseVIFPool
        m_driver = mock.MagicMock(spec=cls)

        host_addr = mock.sentinel.host_addr
        pod_status = mock.MagicMock()
        pod_status.__getitem__.return_value = host_addr
        pod = mock.MagicMock()
        pod.__getitem__.return_value = pod_status
        project_id = mock.sentinel.project_id
        subnets = mock.sentinel.subnets
        security_groups = [mock.sentinel.security_groups]
        m_driver._get_port_from_pool.side_effect = (
            exceptions.ResourceNotReady(pod))

        self.assertRaises(exceptions.ResourceNotReady, cls.request_vif,
                          m_driver, pod, project_id, subnets, security_groups)
        m_eventlet.assert_called_once()
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test__get_in_use_ports(self):
        cls = vif_pool.BaseVIFPool
        m_driver = mock.MagicMock(spec=cls)

        kubernetes = self.useFixture(k_fix.MockK8sClient()).client
        pod = get_pod_obj()
        port_id = 'f2c1b73a-6a0c-4dca-b986-0d07d09e0c02'
        versioned_object = jsonutils.dumps({
            'versioned_object.data': {
                'active': True,
                'address': 'fa:16:3e:ef:e6:9f',
                'id': port_id
            }})

        pod['metadata']['annotations'][constants.K8S_ANNOTATION_VIF] = (
            versioned_object)
        items = [pod]
        kubernetes.get.return_value = {'items': items}

        resp = cls._get_in_use_ports(m_driver)

        self.assertEqual(resp, [port_id])
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test__return_ports_to_pool_no_update(self, max_pool, m_sleep):
        cls = vif_pool.NeutronVIFPool
        m_driver = mock.MagicMock(spec=cls)
        neutron = self.useFixture(k_fix.MockNeutronClient()).client

        pool_key = ('node_ip', 'project_id', tuple(['security_group']))
        port_id = mock.sentinel.port_id
        pool_length = 5

        m_driver._recyclable_ports = {port_id: pool_key}
        m_driver._available_ports_pools = {}
        oslo_cfg.CONF.set_override('ports_pool_max',
                                   max_pool,
                                   group='vif_pool')
        oslo_cfg.CONF.set_override('port_debug',
                                   False,
                                   group='kubernetes')
        m_driver._get_ports_by_attrs.return_value = [
            {'id': port_id, 'security_groups': ['security_group']}]
        m_driver._get_pool_size.return_value = pool_length

        self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)

        neutron.update_port.assert_not_called()
        neutron.delete_port.assert_not_called()
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test__return_ports_to_pool_delete_port(self, m_sleep):
        cls = vif_pool.NeutronVIFPool
        m_driver = mock.MagicMock(spec=cls)
        neutron = self.useFixture(k_fix.MockNeutronClient()).client

        pool_key = ('node_ip', 'project_id', tuple(['security_group']))
        port_id = mock.sentinel.port_id
        pool_length = 10
        vif = mock.sentinel.vif

        m_driver._recyclable_ports = {port_id: pool_key}
        m_driver._available_ports_pools = {}
        m_driver._existing_vifs = {port_id: vif}
        oslo_cfg.CONF.set_override('ports_pool_max',
                                   10,
                                   group='vif_pool')
        m_driver._get_ports_by_attrs.return_value = [
            {'id': port_id, 'security_groups': ['security_group_modified']}]
        m_driver._get_pool_size.return_value = pool_length

        self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)

        neutron.update_port.assert_not_called()
        neutron.delete_port.assert_called_once_with(port_id)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test__return_ports_to_pool_delete_exception(self, m_sleep):
        cls = vif_pool.NeutronVIFPool
        m_driver = mock.MagicMock(spec=cls)
        neutron = self.useFixture(k_fix.MockNeutronClient()).client

        pool_key = ('node_ip', 'project_id', tuple(['security_group']))
        port_id = mock.sentinel.port_id
        pool_length = 10
        vif = mock.sentinel.vif

        m_driver._recyclable_ports = {port_id: pool_key}
        m_driver._available_ports_pools = {}
        m_driver._existing_vifs = {port_id: vif}
        oslo_cfg.CONF.set_override('ports_pool_max',
                                   5,
                                   group='vif_pool')
        m_driver._get_ports_by_attrs.return_value = [
            {'id': port_id, 'security_groups': ['security_group_modified']}]
        m_driver._get_pool_size.return_value = pool_length
        neutron.delete_port.side_effect = n_exc.PortNotFoundClient

        self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)

        neutron.update_port.assert_not_called()
        neutron.delete_port.assert_called_once_with(port_id)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test__return_ports_to_pool_delete_key_error(self, m_sleep):
        cls = vif_pool.NeutronVIFPool
        m_driver = mock.MagicMock(spec=cls)
        neutron = self.useFixture(k_fix.MockNeutronClient()).client

        pool_key = ('node_ip', 'project_id', tuple(['security_group']))
        port_id = mock.sentinel.port_id
        pool_length = 10

        m_driver._recyclable_ports = {port_id: pool_key}
        m_driver._available_ports_pools = {}
        m_driver._existing_vifs = {}
        oslo_cfg.CONF.set_override('ports_pool_max',
                                   5,
                                   group='vif_pool')
        m_driver._get_ports_by_attrs.return_value = [
            {'id': port_id, 'security_groups': ['security_group_modified']}]
        m_driver._get_pool_size.return_value = pool_length

        self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)

        neutron.update_port.assert_not_called()
        neutron.delete_port.assert_not_called()
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test__recover_precreated_ports_empty(self, m_get_subnet, m_to_osvif):
        cls = vif_pool.NeutronVIFPool
        m_driver = mock.MagicMock(spec=cls)

        filtered_ports = []
        m_driver._get_ports_by_attrs.return_value = filtered_ports

        oslo_cfg.CONF.set_override('port_debug',
                                   False,
                                   group='kubernetes')

        cls._recover_precreated_ports(m_driver)

        m_driver._get_ports_by_attrs.assert_called_once()
        m_get_subnet.assert_not_called()
        m_to_osvif.assert_not_called()
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test__return_ports_to_pool_no_update(self, max_pool, m_sleep):
        cls = vif_pool.NestedVIFPool
        m_driver = mock.MagicMock(spec=cls)
        neutron = self.useFixture(k_fix.MockNeutronClient()).client

        pool_key = ('node_ip', 'project_id', tuple(['security_group']))
        port_id = mock.sentinel.port_id
        pool_length = 5

        m_driver._recyclable_ports = {port_id: pool_key}
        m_driver._available_ports_pools = {}
        oslo_cfg.CONF.set_override('ports_pool_max',
                                   max_pool,
                                   group='vif_pool')
        oslo_cfg.CONF.set_override('port_debug',
                                   False,
                                   group='kubernetes')
        m_driver._get_ports_by_attrs.return_value = [
            {'id': port_id, 'security_groups': ['security_group']}]
        m_driver._get_pool_size.return_value = pool_length

        self.assertRaises(SystemExit, cls._return_ports_to_pool, m_driver)

        neutron.update_port.assert_not_called()
        neutron.delete_port.assert_not_called()
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_release_vif_parent_not_found(self):
        cls = nested_macvlan_vif.NestedMacvlanPodVIFDriver
        m_driver = mock.Mock(spec=cls)
        neutron = self.useFixture(k_fix.MockNeutronClient()).client

        port_id = lib_utils.get_hash()
        pod = mock.sentinel.pod
        vif = mock.Mock()
        vif.id = port_id

        container_mac = mock.sentinel.mac_address
        container_ip = mock.sentinel.ip_address
        container_port = self._get_fake_port(port_id, container_ip,
                                             container_mac)
        neutron.show_port.return_value = container_port

        m_driver.lock = mock.MagicMock(spec=threading.Lock())
        m_driver._get_parent_port.side_effect = n_exc.NeutronClientException

        self.assertRaises(n_exc.NeutronClientException, cls.release_vif,
                          m_driver, pod, vif)
        neutron.show_port.assert_called_once_with(port_id)
        m_driver._get_parent_port.assert_called_once_with(neutron, pod)
        m_driver._remove_from_allowed_address_pairs.assert_not_called()
        neutron.delete_port.assert_not_called()
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_has_port_changes(self):
        m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
        m_service = mock.MagicMock()
        m_handler._get_service_ports.return_value = [
            {'port': 1, 'name': 'X', 'protocol': 'TCP'},
        ]

        m_lbaas_spec = mock.MagicMock()
        m_lbaas_spec.ports = [
            obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1),
            obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2),
        ]

        ret = h_lbaas.LBaaSSpecHandler._has_port_changes(
            m_handler, m_service, m_lbaas_spec)

        self.assertTrue(ret)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_has_port_changes__no_changes(self):
        m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler)
        m_service = mock.MagicMock()
        m_handler._get_service_ports.return_value = [
            {'port': 1, 'name': 'X', 'protocol': 'TCP'},
            {'port': 2, 'name': 'Y', 'protocol': 'TCP'}
        ]

        m_lbaas_spec = mock.MagicMock()
        m_lbaas_spec.ports = [
            obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1),
            obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2),
        ]

        ret = h_lbaas.LBaaSSpecHandler._has_port_changes(
            m_handler, m_service, m_lbaas_spec)

        self.assertFalse(ret)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_annotate(self, m_patch, m_count):
        m_count.return_value = list(range(1, 5))
        path = '/test'
        annotations = {'a1': 'v1', 'a2': 'v2'}
        resource_version = "123"
        ret = {'metadata': {'annotations': annotations,
                            "resourceVersion": resource_version}}
        data = jsonutils.dumps(ret, sort_keys=True)

        m_resp = mock.MagicMock()
        m_resp.ok = True
        m_resp.json.return_value = ret
        m_patch.return_value = m_resp

        self.assertEqual(annotations, self.client.annotate(
            path, annotations, resource_version=resource_version))
        m_patch.assert_called_once_with(self.base_url + path,
                                        data=data, headers=mock.ANY,
                                        cert=(None, None), verify=False)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_watch(self, m_get):
        path = '/test'
        data = [{'obj': 'obj%s' % i} for i in range(3)]
        lines = [jsonutils.dumps(i) for i in data]

        m_resp = mock.MagicMock()
        m_resp.ok = True
        m_resp.iter_lines.return_value = lines
        m_get.return_value = m_resp

        cycles = 3
        self.assertEqual(
            data * cycles,
            list(itertools.islice(self.client.watch(path),
                                  len(data) * cycles)))

        self.assertEqual(cycles, m_get.call_count)
        self.assertEqual(cycles, m_resp.close.call_count)
        m_get.assert_called_with(self.base_url + path, headers={}, stream=True,
                                 params={'watch': 'true'}, cert=(None, None),
                                 verify=False)
项目:service-fabric-cli    作者:Azure    | 项目源码 | 文件源码
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
        """Upload copies files to non-native store correctly with no
        progress"""
        import shutil
        import tempfile
        temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
        temp_src_dir = os.path.dirname(temp_file.name)
        temp_dst_dir = tempfile.mkdtemp()
        shutil_mock = MagicMock()
        shutil_mock.copyfile.return_value = None
        with patch('sfctl.custom_app.shutil', new=shutil_mock):
            sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
            shutil_mock.copyfile.assert_called_once()
        temp_file.close()
        shutil.rmtree(os.path.dirname(temp_file.name))
        shutil.rmtree(temp_dst_dir)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test__power_status_ironic_exception_index_error(self, mock_get_conn):
        mock_connection = mock.MagicMock(spec_set=['get_relays'])
        side_effect = IndexError("Gotcha!")
        mock_connection.get_relays.side_effect = side_effect

        mock_get_conn.return_value = mock_connection
        node = obj_utils.create_test_node(
            self.context,
            driver='fake_iboot_fake',
            driver_info=INFO_DICT)
        info = iboot_power._parse_driver_info(node)
        status = iboot_power._power_status(info)
        self.assertEqual(states.ERROR, status)

        mock_get_conn.assert_called_once_with(info)
        mock_connection.get_relays.assert_called_once_with()
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test__power_status_retries(self, mock_get_conn):
        self.config(max_retry=1, group='iboot')

        mock_connection = mock.MagicMock(spec_set=['get_relays'])
        side_effect = TypeError("Surprise!")
        mock_connection.get_relays.side_effect = side_effect

        mock_get_conn.return_value = mock_connection
        node = obj_utils.create_test_node(
            self.context,
            driver='fake_iboot_fake',
            driver_info=INFO_DICT)
        info = iboot_power._parse_driver_info(node)

        status = iboot_power._power_status(info)
        self.assertEqual(states.ERROR, status)
        mock_get_conn.assert_called_once_with(info)
        self.assertEqual(2, mock_connection.get_relays.call_count)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test_reboot_good(self, mock_switch, mock_power_status,
                         mock_sleep_switch):
        self.config(reboot_delay=3, group='iboot')
        manager = mock.MagicMock(spec_set=['switch', 'sleep'])
        mock_power_status.return_value = states.POWER_ON

        manager.attach_mock(mock_switch, 'switch')
        manager.attach_mock(mock_sleep_switch, 'sleep')
        expected = [mock.call.switch(self.info, False),
                    mock.call.sleep(3),
                    mock.call.switch(self.info, True)]

        with task_manager.acquire(self.context, self.node.uuid) as task:
            task.driver.power.reboot(task)

        self.assertEqual(expected, manager.mock_calls)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test_reboot_bad(self, mock_switch, mock_power_status,
                        mock_sleep_switch):
        self.config(reboot_delay=3, group='iboot')
        manager = mock.MagicMock(spec_set=['switch', 'sleep'])
        mock_power_status.return_value = states.POWER_OFF

        manager.attach_mock(mock_switch, 'switch')
        manager.attach_mock(mock_sleep_switch, 'sleep')
        expected = [mock.call.switch(self.info, False),
                    mock.call.sleep(3),
                    mock.call.switch(self.info, True)]

        with task_manager.acquire(self.context, self.node.uuid) as task:
            self.assertRaises(ironic_exception.PowerStateFailure,
                              task.driver.power.reboot, task)

        self.assertEqual(expected, manager.mock_calls)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test__execute_nm_command(self, addr_mock, raw_mock):
        addr_mock.return_value = ('0x0A', '0x0B')
        raw_mock.return_value = ('0x03 0x04', '')
        fake_data = {'foo': 'bar'}
        fake_command = mock.MagicMock()
        fake_parse = mock.MagicMock()
        fake_command.return_value = ('0x01', '0x02')
        with task_manager.acquire(self.context, self.node.uuid,
                                  shared=False) as task:
            nm_vendor._execute_nm_command(task, fake_data, fake_command,
                                          fake_parse)
            self.assertEqual('single', task.node.driver_info['ipmi_bridging'])
            self.assertEqual('0x0A',
                             task.node.driver_info['ipmi_target_channel'])
            self.assertEqual('0x0B',
                             task.node.driver_info['ipmi_target_address'])
            fake_command.assert_called_once_with(fake_data)
            raw_mock.assert_called_once_with(task, '0x01 0x02')
            fake_parse.assert_called_once_with(['0x03', '0x04'])
项目:networking-huawei    作者:openstack    | 项目源码 | 文件源码
def test_bind_port(self):

        self.vif_details = {'ovs_hybrid_plug': True}
        network = mock.MagicMock(spec=api.NetworkContext)

        port_context = mock.MagicMock(
            spec=ctx.PortContext, current={'id': 'CURRENT_CONTEXT_ID'},
            segments_to_bind=[self.valid_segment],
            network=network)

        # when port is bound
        self.bind_port(port_context)

        # port_context.
        # then context binding is setup with returned vif_type and valid
        # segment api ID
        port_context.set_binding.assert_called_once_with(
            self.valid_segment[api.ID], 'ovs', self.vif_details)
项目:BioDownloader    作者:biomadeira    | 项目源码 | 文件源码
def test_fetch_from_url_or_retry_post_json(self):
        # mocked requests
        identifier = "1csb, 2pah"
        base_url = c.http_pdbe
        endpoint_url = "api/pdb/entry/summary/"
        response = response_mocker(kwargs={}, base_url=base_url,
                                   endpoint_url=endpoint_url,
                                   content_type='application/octet-stream',
                                   post=True, data=identifier)
        self.fetch_from_url_or_retry = MagicMock(return_value=response)
        url = base_url + endpoint_url + identifier
        r = self.fetch_from_url_or_retry(url, json=True, post=True, data=identifier,
                                         header={'application/octet-stream'},
                                         retry_in=None, wait=0,
                                         n_retries=10, stream=False).json()
        self.assertEqual(r, json.loads('{"data": "some json formatted output"}'))
项目:bitcoin-trading-system    作者:vinicius-ronconi    | 项目源码 | 文件源码
def setUp(self):
        client = mock.MagicMock()
        bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(self.INITIAL_SETUP)

        logging_patch = mock.patch('trading_system.systems.trailing_orders.system.logging')
        self.addCleanup(logging_patch.stop)
        logging_patch.start()

        self.system = TrailingOrders(client, bootstrap)
        self.system.get_pending_orders = mock.MagicMock(return_value=[])

        self.set_next_operation = mock.MagicMock()
        next_operation_patcher = mock.patch(
            'trading_system.systems.trailing_orders.system.TrailingOrders.set_next_operation', self.set_next_operation
        )
        self.addCleanup(next_operation_patcher.stop)
        next_operation_patcher.start()
项目:bitcoin-trading-system    作者:vinicius-ronconi    | 项目源码 | 文件源码
def setUp(self):
        client = mock.MagicMock()
        bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(self.INITIAL_SETUP)

        logging_patch = mock.patch('trading_system.systems.trailing_orders.system.logging')
        self.addCleanup(logging_patch.stop)
        logging_patch.start()

        self.system = TrailingOrders(client, bootstrap)
        self.system.get_pending_orders = mock.MagicMock(return_value=[])

        self.set_next_operation = mock.MagicMock()
        next_operation_patcher = mock.patch(
            'trading_system.systems.trailing_orders.system.TrailingOrders.set_next_operation', self.set_next_operation
        )
        self.addCleanup(next_operation_patcher.stop)
        next_operation_patcher.start()
项目:py-secretcrypt    作者:Zemanta    | 项目源码 | 文件源码
def test_decrypt(self, mock_import_module):
        mock_crypter_module = mock.MagicMock()
        mock_crypter_module.__name__ = 'secretcrypt.mock_crypter'

        def mock_import_side_effect(*args, **kwargs):
            self.assertEqual(kwargs['package'], secretcrypt.__name__)
            if args[0] == '.mock_crypter':
                return mock_crypter_module
            raise Exception('Importing wrong module')
        mock_import_module.side_effect = mock_import_side_effect

        secret = StrictSecret('mock_crypter:key=value&key2=value2:myciphertext')
        self.assertEqual(secret._decrypt_params, dict(key='value', key2='value2'))
        self.assertEqual(secret._ciphertext, b'myciphertext')

        secret.decrypt()
        secret.decrypt()
        mock_crypter_module.decrypt.assert_called_with(
            b'myciphertext',
            key='value',
            key2='value2',
        )
项目:py-secretcrypt    作者:Zemanta    | 项目源码 | 文件源码
def test_eager_decrypt(self, mock_import_module):
        mock_crypter_module = mock.MagicMock()
        mock_crypter_module.decrypt.side_effect = lambda *args, **kwargs: b'plaintext'
        mock_crypter_module.__name__ = 'secretcrypt.mock_crypter'

        def mock_import_side_effect(*args, **kwargs):
            self.assertEqual(kwargs['package'], secretcrypt.__name__)
            if args[0] == '.mock_crypter':
                return mock_crypter_module
            raise Exception('Importing wrong module')
        mock_import_module.side_effect = mock_import_side_effect

        secret = Secret('mock_crypter:key=value&key2=value2:myciphertext')
        mock_crypter_module.decrypt.assert_called_with(
            b'myciphertext',
            key='value',
            key2='value2',
        )
        mock_crypter_module.reset_mock()
        plaintext = secret.get()
        self.assertEqual(b'plaintext', plaintext)
        mock_crypter_module.assert_not_called()
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_run_with_existing_remote_build_container(self, skipper_runner_run_mock, requests_get_mock):
        requests_response_class_mock = mock.MagicMock(spec='requests.Response')
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb', 'build-container-tag']
        }
        requests_response_mock.status_code = http_client.OK
        requests_get_mock.return_value = requests_response_mock

        command = ['ls', '-l']
        run_params = command
        self._invoke_cli(
            global_params=self.global_params,
            subcmd='run',
            subcmd_params=run_params
        )
        expected_image_name = 'registry.io:5000/build-container-image:build-container-tag'
        skipper_runner_run_mock.assert_called_once_with(command, fqdn_image=expected_image_name, environment=[],
                                                        interactive=False, name=None, net='host', volumes=None,
                                                        workdir=None, use_cache=False)
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_run_with_non_existing_build_container(self, requests_get_mock):
        requests_response_class_mock = mock.MagicMock(spec='requests.Response')
        requests_response_mock = requests_response_class_mock.return_value
        requests_response_mock.json.return_value = {
            'name': 'my_image',
            'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
        }

        requests_get_mock.return_value = requests_response_mock
        command = ['ls', '-l']
        run_params = command
        ret = self._invoke_cli(
            global_params=self.global_params,
            subcmd='run',
            subcmd_params=run_params
        )
        self.assertIsInstance(ret.exception, click.exceptions.ClickException)
项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def test_cli_with_custom_login(self, mocked_method):
        mocked = mock.MagicMock()
        mocked.login.return_value = {"Status": "Login Succeeded"}
        mocked.push.return_value = [
            {"stream": "In process"},
            {"status": "Successfully pushed"}]
        mocked_method.return_value = mocked
        with FakeProjectDirectory() as tmpdir:
            add_sh_fake_config(tmpdir)
            runner = CliRunner()
            result = runner.invoke(
                cli, ["dev", "--version", "test", "--username", "user",
                      "--password", "pass", "--email", "mail"])
            assert result.exit_code == 0
            mocked.login.assert_called_with(
                email=u'mail', password=u'pass',
                reauth=False, registry='registry', username=u'user')
            mocked.push.assert_called_with(
                'registry/user/project:test', decode=True,
                insecure_registry=False, stream=True)
项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def test_cli_with_insecure_registry(self, mocked_method):
        mocked = mock.MagicMock()
        mocked.login.return_value = {"Status": "Login Succeeded"}
        mocked.push.return_value = [
            {"stream": "In process"},
            {"status": "Successfully pushed"}]
        mocked_method.return_value = mocked
        with FakeProjectDirectory() as tmpdir:
            add_sh_fake_config(tmpdir)
            runner = CliRunner()
            result = runner.invoke(
                cli, ["dev", "--version", "test", "--insecure"])
            assert result.exit_code == 0
            assert not mocked.login.called
            mocked.push.assert_called_with(
                'registry/user/project:test', decode=True,
                insecure_registry=True, stream=True)
项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def test_cli(self, test_mock, mocked_method):
        mocked = mock.MagicMock()
        mocked.build.return_value = [
            {"stream": "all is ok"},
            {"stream": "Successfully built 12345"}]
        mocked_method.return_value = mocked
        with FakeProjectDirectory() as tmpdir:
            add_scrapy_fake_config(tmpdir)
            add_sh_fake_config(tmpdir)
            add_fake_dockerfile(tmpdir)
            setup_py_path = os.path.join(tmpdir, 'setup.py')
            assert not os.path.isfile(setup_py_path)
            runner = CliRunner()
            result = runner.invoke(cli, ["dev", "-d"])
            assert result.exit_code == 0
            mocked.build.assert_called_with(
                decode=True, path=tmpdir, tag='registry/user/project:1.0')
            assert os.path.isfile(setup_py_path)
            test_mock.assert_called_with("dev", None)
项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def test_cli_custom_version(self, test_mock, mocked_method):
        mocked = mock.MagicMock()
        mocked.build.return_value = [
            {"stream": "all is ok"},
            {"stream": "Successfully built 12345"}]
        mocked_method.return_value = mocked
        with FakeProjectDirectory() as tmpdir:
            add_scrapy_fake_config(tmpdir)
            add_sh_fake_config(tmpdir)
            add_fake_dockerfile(tmpdir)
            runner = CliRunner()
            result = runner.invoke(cli, ["dev", "--version", "test"])
            assert result.exit_code == 0
            mocked.build.assert_called_with(
                decode=True, path=tmpdir, tag='registry/user/project:test')
            test_mock.assert_called_with("dev", "test")
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_request_backend(self, options, config):
        driver = self._get_driver(options, config)
        driver._relay_request = mock.MagicMock()
        response = requests.Response()
        response.status_code = requests.codes.ok
        driver._relay_request.return_value = response

        context = mock.MagicMock()
        data_dict = {}
        obj_name = 'object'
        action = 'TEST'

        code, message = driver._request_backend(context, data_dict, obj_name,
                                                action)

        self.assertEqual(requests.codes.ok, code)
        self.assertEqual({'message': None}, message)
        driver._relay_request.assert_called()
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_create_resource(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'network'
        res_data = {
            'id': 'network-123',
            'tenant_id': ATTR_NOT_SPECIFIED,
        }
        driver._request_backend.return_value = (status_code, res_data)

        response = driver._create_resource(res_type,
                                           context,
                                           {res_type: res_data})

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'CREATE')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_get_resource(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'network'
        res_data = {
            'id': 'network-123',
        }
        fields = ['id']
        driver._request_backend.return_value = (status_code, res_data)

        response = driver._get_resource(res_type,
                                        context, res_data['id'], fields)

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'READ')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_update_resource(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'network'
        res_data = {
            'id': 'network-123',
        }
        driver._request_backend.return_value = (status_code, res_data)

        response = driver._update_resource(res_type, context, res_data['id'],
                                           {res_type: res_data})

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'UPDATE')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_list_resource(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'network'
        res_data = {
            'id': 'network-123',
        }
        fields = ['id']
        driver._request_backend.return_value = (status_code, [res_data])

        response = driver._list_resource(res_type, context, None, fields)

        self.assertEqual(response, [res_data])
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'READALL')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_add_router_interface(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'router'
        res_data = {
            'id': 'router-123',
        }
        res_id = res_data['id']
        driver._request_backend.return_value = (status_code, res_data)

        response = driver.add_router_interface(context, res_id, res_data)

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'ADDINTERFACE')
项目:networking-opencontrail    作者:openstack    | 项目源码 | 文件源码
def test_remove_router_interface(self, options, config):
        driver = self._get_driver(options, config)
        driver._request_backend = mock.MagicMock()
        status_code = requests.codes.ok
        context = mock.MagicMock()
        res_type = 'router'
        res_data = {
            'id': 'router-123',
        }
        res_id = res_data['id']
        driver._request_backend.return_value = (status_code, res_data)

        response = driver.remove_router_interface(context, res_id, res_data)

        self.assertEqual(response, res_data)
        driver._request_backend.assert_called_with(context, mock.ANY,
                                                   res_type, 'DELINTERFACE')
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_prepared_query_not_found(self):
        session = self.make_session()
        pool = session._pools.get.return_value
        connection = Mock(spec=Connection)
        pool.borrow_connection.return_value = (connection, 1)

        rf = self.make_response_future(session)
        rf.send_request()

        session.cluster._prepared_statements = MagicMock(dict)
        prepared_statement = session.cluster._prepared_statements.__getitem__.return_value
        prepared_statement.query_string = "SELECT * FROM foobar"
        prepared_statement.keyspace = "FooKeyspace"
        rf._connection.keyspace = "FooKeyspace"

        result = Mock(spec=PreparedQueryNotFound, info='a' * 16)
        rf._set_result(None, None, None, result)

        self.assertTrue(session.submit.call_args)
        args, kwargs = session.submit.call_args
        self.assertEqual(rf._reprepare, args[-5])
        self.assertIsInstance(args[-4], PrepareMessage)
        self.assertEqual(args[-4].query, "SELECT * FROM foobar")
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_prepared_query_not_found_bad_keyspace(self):
        session = self.make_session()
        pool = session._pools.get.return_value
        connection = Mock(spec=Connection)
        pool.borrow_connection.return_value = (connection, 1)

        rf = self.make_response_future(session)
        rf.send_request()

        session.cluster._prepared_statements = MagicMock(dict)
        prepared_statement = session.cluster._prepared_statements.__getitem__.return_value
        prepared_statement.query_string = "SELECT * FROM foobar"
        prepared_statement.keyspace = "FooKeyspace"
        rf._connection.keyspace = "BarKeyspace"

        result = Mock(spec=PreparedQueryNotFound, info='a' * 16)
        rf._set_result(None, None, None, result)
        self.assertRaises(ValueError, rf.result)
项目:automatron    作者:madflojo    | 项目源码 | 文件源码
def runTest(self, mock_template, mock_yaml, mock_isdir, mock_isfile, mock_open):
        ''' Execute test '''
        self.config = mock.MagicMock(spec_set={'runbook_path' : '/path/'})
        mock_isfile.return_value = True
        mock_isdir.return_value = True
        mock_template = mock.MagicMock(**{
            'render.return_value' : """
                '*':
                  - book
                'target':
                  - book1
                  - book2
            """
        })
        mock_yaml.return_value = {'*':['book'], 'target':['book1', 'book2']}
        mock_open.return_value = mock.MagicMock(spec=file)
        result = cache_runbooks(self.config, self.logger)
        self.assertEqual(result.keys(), ['*', 'target'], "Expected dictionary keys not found")
        self.assertTrue(mock_open.called, "open not called")
        self.assertTrue(mock_yaml.called, "yaml.safe_load not called")
        self.assertTrue(mock_isdir.called, "os.path.isdir not called")
        self.assertEqual(mock_isfile.call_count, 4,
            "mock_open.call_count {0} is not 4".format(mock_open.call_count))
项目:automatron    作者:madflojo    | 项目源码 | 文件源码
def runTest(self, mock_run, mock_put, mock_local, mock_hide, mock_env, mock_set_env):
        ''' Execute test '''
        # Set mock_env to empty dict
        mock_env = mock.MagicMock(spec={})
        mock_set_env.return_value = mock_env
        mock_local.return_value = mock.MagicMock(**{ 'succeeded': True})
        mock_run.return_value = mock.MagicMock(**{ 'succeeded': True})
        mock_put = True
        mock_hide = True
        action = {
            'type' : 'cmd',
            'execute_from' : 'remote',
            'cmd' : "bash"
        }
        results = execute_runbook(action, self.target, self.config, self.logger)
        self.assertTrue(results)
        self.assertFalse(self.logger.warn.called)
        self.assertTrue(mock_local.called)
        mock_local.assert_called_with("bash", capture=True)