Python mock 模块,patch() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用mock.patch()

项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_resource_form_create_valid(self, mock_open):
        """Submit the new-resource form with a valid CSV and expect success.

        ``io.open`` is patched to hand back a buffered stream over
        ``VALID_CSV`` while the ``resource-edit`` form is saved.  The dataset
        is then re-read through ``package_show`` and its first resource must
        carry a ``'success'`` validation status and a validation timestamp.

        :param mock_open: not used in the body; presumably injected by a
            patch decorator that is outside this view.
        """
        dataset = Dataset()
        dataset_id = dataset['id']

        app = self._get_test_app()
        env, response = _get_resource_new_page_as_sysadmin(app, dataset_id)
        form = response.forms['resource-edit']

        files = [('upload', 'valid.csv', VALID_CSV)]
        stream = io.BufferedReader(io.BytesIO(VALID_CSV))

        with mock.patch('io.open', return_value=stream):
            submit_and_follow(app, form, env, 'save', upload_files=files)

        resource = call_action('package_show', id=dataset_id)['resources'][0]

        assert_equals(resource['validation_status'], 'success')
        assert 'validation_timestamp' in resource
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def _test_should_sleep(self, seconds_left, slept):
        """Drive ``Retry._sleep`` with patched ``random.randint``/``time.sleep``.

        :param seconds_left: offset added to ``self.now`` to build the
            deadline passed to ``_sleep``.
        :param slept: value ``_sleep`` must return and must pass to
            ``time.sleep``.
        """
        attempt = 5
        fake_randint = 2
        deadline = self.now + seconds_left
        retry = h_retry.Retry(mock.Mock(), timeout=20, interval=3)

        randint_patch = mock.patch('random.randint',
                                   return_value=fake_randint)
        sleep_patch = mock.patch('time.sleep')
        with randint_patch as m_randint, sleep_patch as m_sleep:
            result = retry._sleep(deadline, attempt, _EX2())

            self.assertEqual(slept, result)
            # The random back-off is expected to be drawn from
            # [1, 2 ** attempt - 1].
            m_randint.assert_called_once_with(1, 2 ** attempt - 1)
            m_sleep.assert_called_once_with(slept)
项目:service-fabric-cli    作者:Azure    | 项目源码 | 文件源码
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
        """Upload copies files to non-native store correctly with no
        progress

        A temporary source directory holding one file and an empty
        destination directory are created, ``sfctl.custom_app.shutil`` is
        replaced by a mock, and ``copyfile`` must be called exactly once.
        Both directories are removed even when the call or the assertion
        fails.
        """
        import shutil
        import tempfile
        temp_src_dir = tempfile.mkdtemp()
        temp_dst_dir = tempfile.mkdtemp()
        try:
            # The named temporary file is deleted automatically when the
            # ``with`` block exits.
            with tempfile.NamedTemporaryFile(dir=temp_src_dir):
                shutil_mock = MagicMock()
                shutil_mock.copyfile.return_value = None
                with patch('sfctl.custom_app.shutil', new=shutil_mock):
                    sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
                    shutil_mock.copyfile.assert_called_once()
        finally:
            # Best-effort cleanup so a failing test does not leak directories
            # and a cleanup error does not mask the real failure.
            shutil.rmtree(temp_src_dir, ignore_errors=True)
            shutil.rmtree(temp_dst_dir, ignore_errors=True)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test_reboot_and_finish_deploy_force_reboot(self, power_action_mock,
                                                   get_pow_state_mock):
        """Check ``reboot_and_finish_deploy`` when an OOB reboot is forced.

        With ``deploy_forces_oob_reboot`` set in the node's driver info, the
        power action mock must have been called with POWER_OFF and then
        POWER_ON, the network interface must be switched from provisioning
        to tenant networks, and the power-state getter must not be called.

        :param power_action_mock: mock whose ``call_args_list`` is compared
            against the expected power calls.
        :param get_pow_state_mock: mock asserted to be never called.
        """
        driver_info = self.node.driver_info
        driver_info['deploy_forces_oob_reboot'] = True
        self.node.driver_info = driver_info
        self.node.save()
        self.config(group='ansible',
                    post_deploy_get_power_state_retry_interval=0)
        self.node.provision_state = states.DEPLOYING
        self.node.save()

        with task_manager.acquire(self.context, self.node.uuid) as task:
            with mock.patch.object(task.driver, 'network') as net_mock:
                self.driver.reboot_and_finish_deploy(task)

                remove_prov = net_mock.remove_provisioning_network
                remove_prov.assert_called_once_with(task)
                configure_tenant = net_mock.configure_tenant_networks
                configure_tenant.assert_called_once_with(task)

            # Each entry is a 1-tuple holding the positional-args tuple,
            # which compares equal to the recorded call object.
            expected_power_calls = [
                ((task, power_state),)
                for power_state in (states.POWER_OFF, states.POWER_ON)
            ]
            self.assertEqual(expected_power_calls,
                             power_action_mock.call_args_list)
        get_pow_state_mock.assert_not_called()
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_requires_usergroup_no_acc(self):
        """
            Test requires usergroup decorator if the user has no access:
            with ``User.has_access`` returning False, calling the decorated
            handler must yield None.
        """
        with patch("ownbot.auth.User") as user_cls:
            user_cls.return_value.has_access.return_value = False

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)
                return True

            result = my_command_handler(
                Mock(spec=Bot), self.__get_dummy_update())

            self.assertIsNone(result)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_requires_usergroup_acc(self):
        """
            Test requires usergroup decorator if the user has access:
            with ``User.has_access`` returning True, the decorated handler's
            own return value (True) must come back to the caller.
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update"):
            # Configure the attribute the sibling no-access test uses
            # (``has_access``).  The previous spelling ``has_acces`` set up
            # an unrelated attribute, so the test only passed because an
            # unconfigured mock call result is truthy.
            user_mock.return_value.has_access.return_value = True

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)
                return True

            bot_mock = Mock(spec=Bot)
            # ``Update`` is patched above, so this yields a mock instance.
            update_mock = Update(1337)
            called = my_command_handler(bot_mock, update_mock)

            self.assertTrue(called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_requires_usergroup_self(self):
        """
            Test requires usergroup decorator with self as first argument:
            with ``User.has_access`` returning True, a handler that takes
            ``self`` before ``bot`` and ``update`` must still return True.
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update"):
            # Configure the attribute the sibling no-access test uses
            # (``has_access``).  The previous spelling ``has_acces`` set up
            # an unrelated attribute, so the test only passed because an
            # unconfigured mock call result is truthy.
            user_mock.return_value.has_access.return_value = True

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(self, bot, update):
                """Dummy command handler"""
                print(self, bot, update)
                return True

            bot_mock = Mock(spec=Bot)
            # ``Update`` is patched above, so this yields a mock instance.
            update_mock = Update(1337)
            called = my_command_handler(None, bot_mock, update_mock)

            self.assertTrue(called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_assign_first_to(self):
        """
            Test assign first to decorator: when the user manager reports
            the group as empty, ``group_is_empty`` must have been consulted
            and ``save`` must have been called on the user instance.
        """
        with patch("ownbot.auth.User") as user_cls,\
                patch("test_auth.Update"),\
                patch("ownbot.auth.UserManager") as usrmgr_cls:

            user_instance = user_cls.return_value
            group_is_empty = usrmgr_cls.return_value.group_is_empty
            group_is_empty.return_value = True

            @ownbot.auth.assign_first_to("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)

            # ``Update`` is patched above, so ``Update(1337)`` is a mock.
            my_command_handler(Mock(spec=Bot), Update(1337))

            self.assertTrue(group_is_empty.called)
            self.assertTrue(user_instance.save.called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_assign_first_to_with_self(self):
        """
            Test assign first to decorator with self as first argument:
            same expectations as the plain variant, but the handler takes
            ``self`` before ``bot`` and ``update``.
        """
        with patch("ownbot.auth.User") as user_cls,\
                patch("test_auth.Update"),\
                patch("ownbot.auth.UserManager") as usrmgr_cls:

            user_instance = user_cls.return_value
            group_is_empty = usrmgr_cls.return_value.group_is_empty
            group_is_empty.return_value = True

            @ownbot.auth.assign_first_to("foo")
            def my_command_handler(self, bot, update):
                """Dummy command handler"""
                print(self, bot, update)

            # ``Update`` is patched above, so ``Update(1337)`` is a mock.
            my_command_handler(None, Mock(spec=Bot), Update(1337))

            self.assertTrue(group_is_empty.called)
            self.assertTrue(user_instance.save.called)
项目:networking-huawei    作者:openstack    | 项目源码 | 文件源码
def test_default_security_group_rest_callback(self):
        """Call ``create_security_group`` against three mocked REST replies.

        ``get_security_group`` on the security-group DB object is patched to
        return ``security_group``; ``requests.request`` is then patched in
        turn with responses built for the ``all_good``, ``no_content`` and
        ``not_implemented`` status codes.

        NOTE(review): any exception, including a failed assertion, is
        swallowed by the handler below, so this test cannot fail as written.
        """
        status_names = ('all_good', 'no_content', 'not_implemented')
        try:
            db_patch = mock.patch.object(self.securityGroupDb,
                                         'get_security_group',
                                         return_value=security_group)
            with db_patch:
                for status_name in status_names:
                    status_code = getattr(requests.codes, status_name)
                    resp = self._mock_req_resp(status_code)
                    with mock.patch('requests.request', return_value=resp):
                        self.secGroupSub.create_security_group(
                            None, None, None, **test_sg_create)

        except Exception:
            pass
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_resource_form_create_invalid(self, mock_open):
        """Submit the new-resource form with an invalid CSV.

        ``io.open`` is patched to return a stream over ``INVALID_CSV`` while
        the form is saved; the response body must then mention the
        validation failure and the specific missing-value error.

        :param mock_open: not used in the body; presumably injected by a
            patch decorator that is outside this view.
        """
        dataset = Dataset()

        app = self._get_test_app()
        env, response = _get_resource_new_page_as_sysadmin(app, dataset['id'])
        form = response.forms['resource-edit']

        files = [('upload', 'invalid.csv', INVALID_CSV)]
        stream = io.BufferedReader(io.BytesIO(INVALID_CSV))

        with mock.patch('io.open', return_value=stream):
            response = webtest_submit(
                form, 'save', upload_files=files, extra_environ=env)

        body = response.body
        expected_fragments = (
            'validation',
            'missing-value',
            'Row 2 has a missing value in column 4',
        )
        for fragment in expected_fragments:
            assert_in(fragment, body)
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_resource_form_update_valid(self, mock_open):
        """Update an existing resource with a valid CSV and expect success.

        The dataset starts with one URL-only resource.  ``io.open`` is
        patched to return a stream over ``VALID_CSV`` while the edit form is
        saved; the re-read resource must have a ``'success'`` validation
        status and a validation timestamp.

        :param mock_open: not used in the body; presumably injected by a
            patch decorator that is outside this view.
        """
        dataset = Dataset(resources=[{'url': 'https://example.com/data.csv'}])
        dataset_id = dataset['id']

        app = self._get_test_app()
        env, response = _get_resource_update_page_as_sysadmin(
            app, dataset_id, dataset['resources'][0]['id'])
        form = response.forms['resource-edit']

        files = [('upload', 'valid.csv', VALID_CSV)]
        stream = io.BufferedReader(io.BytesIO(VALID_CSV))

        with mock.patch('io.open', return_value=stream):
            submit_and_follow(app, form, env, 'save', upload_files=files)

        resource = call_action('package_show', id=dataset_id)['resources'][0]

        assert_equals(resource['validation_status'], 'success')
        assert 'validation_timestamp' in resource
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_resource_form_update_invalid(self, mock_open):
        """Update an existing resource with an invalid CSV.

        The dataset starts with one URL-only resource.  ``io.open`` is
        patched to return a stream over ``INVALID_CSV`` while the edit form
        is saved; the response body must report the validation failure.

        :param mock_open: not used in the body; presumably injected by a
            patch decorator that is outside this view.
        """
        dataset = Dataset(resources=[{'url': 'https://example.com/data.csv'}])

        app = self._get_test_app()
        env, response = _get_resource_update_page_as_sysadmin(
            app, dataset['id'], dataset['resources'][0]['id'])
        form = response.forms['resource-edit']

        files = [('upload', 'invalid.csv', INVALID_CSV)]
        stream = io.BufferedReader(io.BytesIO(INVALID_CSV))

        with mock.patch('io.open', return_value=stream):
            response = webtest_submit(
                form, 'save', upload_files=files, extra_environ=env)

        body = response.body
        expected_fragments = (
            'validation',
            'missing-value',
            'Row 2 has a missing value in column 4',
        )
        for fragment in expected_fragments:
            assert_in(fragment, body)
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_delete_file_not_deleted_if_resources_first(self, mock_open):
        """Delete an uploaded file that sits directly under ``resources``.

        A fake filesystem is installed, a file is created at
        ``/doesnt_exist/resources/<resource_id>`` and
        ``get_local_upload_path`` is patched to return that path.  After
        ``delete_local_uploaded_file`` runs, the file must be gone while the
        ``resources`` directory must still exist.

        The fake filesystem is torn down in a ``finally`` clause so a failing
        assertion cannot leave it installed for later tests.

        :param mock_open: not used in the body; presumably injected by a
            patch decorator that is outside this view.
        """

        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}'.format(resource_id)

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        try:
            patcher.fs.CreateFile(path)

            assert os.path.exists(path)
            with mock.patch('ckanext.validation.utils.get_local_upload_path',
                            return_value=path):
                delete_local_uploaded_file(resource_id)

            assert not os.path.exists(path)
            assert os.path.exists('/doesnt_exist/resources')
        finally:
            patcher.tearDown()
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_delete_passes_if_os_exeception(self, mock_open):
        """``delete_local_uploaded_file`` must not propagate an ``OSError``.

        A fake filesystem is installed and a file is created at a path built
        from three slices of the resource id.  ``os.remove`` (as seen from
        ``ckanext.validation.utils``) is patched to raise ``OSError``; the
        test passes as long as the delete call returns without raising.

        The fake filesystem is torn down in a ``finally`` clause so an
        unexpected exception cannot leave it installed for later tests.

        :param mock_open: not used in the body; presumably injected by a
            patch decorator that is outside this view.
        """

        resource_id = str(uuid.uuid4())
        path = '/doesnt_exist/resources/{}/{}/{}'.format(
            resource_id[0:3], resource_id[3:6], resource_id[6:]
        )

        patcher = fake_filesystem_unittest.Patcher()
        patcher.setUp()
        try:
            patcher.fs.CreateFile(path)

            assert os.path.exists(path)
            with mock.patch('ckanext.validation.utils.os.remove',
                            side_effect=OSError):

                delete_local_uploaded_file(resource_id)
        finally:
            patcher.tearDown()
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_validation_fails_on_upload(self, mock_open):
        """``resource_create`` with an invalid CSV upload must be rejected.

        ``io.open`` is patched to return a stream over ``INVALID_CSV``; the
        action must raise ``t.ValidationError`` whose error dict has a
        ``'validation'`` key and whose text names the missing-value error.

        :param mock_open: not used in the body; presumably injected by a
            patch decorator that is outside this view.
        """
        csv_file = StringIO.StringIO()
        csv_file.write(INVALID_CSV)
        mock_upload = MockFieldStorage(csv_file, 'invalid.csv')

        dataset = factories.Dataset()

        stream = io.BufferedReader(io.BytesIO(INVALID_CSV))
        create_args = {
            'package_id': dataset['id'],
            'format': 'CSV',
            'upload': mock_upload,
        }

        with mock.patch('io.open', return_value=stream), \
                assert_raises(t.ValidationError) as e:
            call_action('resource_create', **create_args)

        error = e.exception
        message = str(error)
        assert 'validation' in error.error_dict
        assert 'missing-value' in message
        assert 'Row 2 has a missing value in column 4' in message
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_validation_passes_on_upload(self, mock_open):
        """``resource_create`` with a valid CSV upload must succeed.

        ``io.open`` is patched to return a stream over ``VALID_CSV``; the
        created resource must have a ``'success'`` validation status and a
        validation timestamp.  The upload is deliberately still named
        ``'invalid.csv'``, exactly as in the original test.

        :param mock_open: not used in the body; presumably injected by a
            patch decorator that is outside this view.
        """
        csv_file = StringIO.StringIO()
        csv_file.write(VALID_CSV)
        mock_upload = MockFieldStorage(csv_file, 'invalid.csv')

        dataset = factories.Dataset()

        stream = io.BufferedReader(io.BytesIO(VALID_CSV))
        create_args = {
            'package_id': dataset['id'],
            'format': 'CSV',
            'upload': mock_upload,
        }

        with mock.patch('io.open', return_value=stream):
            resource = call_action('resource_create', **create_args)

        assert_equals(resource['validation_status'], 'success')
        assert 'validation_timestamp' in resource
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def test_get_swift_hash_env(self, mock_config, mock_service_name):
        """``get_swift_hash`` must consult the Juju UUID environment variables.

        With the config mock returning None, ``os.environ.get`` (as seen from
        ``lib.swift_context``) is patched and must be asked for
        ``JUJU_MODEL_UUID`` and then ``JUJU_ENV_UUID``.  The returned hash
        must equal the content written to ``SWIFT_HASH_FILE``.

        The hash file lives in a private temporary directory instead of a
        path from the deprecated, race-prone ``tempfile.mktemp()``.  The
        module-level ``SWIFT_HASH_FILE`` is restored and the directory is
        removed afterwards so the test leaves no state behind.

        :param mock_config: config mock, set to return None; must end up
            called.
        :param mock_service_name: service-name mock, set to ``"testsvc"``.
        """
        import os
        import shutil

        mock_config.return_value = None
        mock_service_name.return_value = "testsvc"

        original_hash_file = swift_context.SWIFT_HASH_FILE
        tmpdir = tempfile.mkdtemp()
        # The file itself must not exist yet, as with the old mktemp() path.
        tmpfile = os.path.join(tmpdir, 'swift-hash')
        try:
            swift_context.SWIFT_HASH_FILE = tmpfile
            with mock.patch(
                    'lib.swift_context.os.environ.get') as mock_env_get:
                mock_env_get.return_value = str(uuid.uuid4())
                hash_ = swift_context.get_swift_hash()
                mock_env_get.assert_has_calls([
                    mock.call('JUJU_MODEL_UUID'),
                    mock.call('JUJU_ENV_UUID',
                              mock_env_get.return_value)
                ])

            with open(tmpfile, 'r') as fd:
                self.assertEqual(hash_, fd.read())

            self.assertTrue(mock_config.called)
        finally:
            swift_context.SWIFT_HASH_FILE = original_hash_file
            shutil.rmtree(tmpdir, ignore_errors=True)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_get_log_message(
        self,
        log_consumer_instance,
        publish_log_messages,
        log_message,
        log_topic
    ):
        """Publish one log message and read it back through the consumer.

        ``yelp_kafka.discovery.get_region_cluster`` is patched to return the
        cluster config from ``get_config()``.  The single message fetched
        with a blocking ``get_message`` is checked by ``ConsumerAsserter``.
        """
        cluster_patch = mock.patch(
            'yelp_kafka.discovery.get_region_cluster',
            return_value=get_config().cluster_config
        )
        with cluster_patch, log_consumer_instance as consumer:
            publish_log_messages(log_topic, log_message, count=1)
            asserter = ConsumerAsserter(
                consumer=consumer,
                expected_message=log_message
            )
            received = consumer.get_message(blocking=True, timeout=TIMEOUT)
            asserter.assert_messages([received], expected_count=1)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_base_consumer_without_cluster_name(
        self,
        topic,
        consumer_init_kwargs
    ):
        """Check how the cluster config is built for this ``BaseConsumer``.

        Reading ``_region_cluster_config`` must not call
        ``get_kafka_cluster``, and must initialise ``ClusterConfig`` exactly
        once with the broker list and zookeeper taken from ``get_config()``.
        """
        discovery_patch = mock.patch('yelp_kafka.discovery.get_kafka_cluster')
        config_init_patch = mock.patch(
            'kafka_utils.util.config.ClusterConfig.__init__',
            return_value=None
        )
        with discovery_patch as mock_get_kafka_cluster:
            with config_init_patch as mock_cluster_config_init:
                consumer = BaseConsumer(
                    topic_to_consumer_topic_state_map={topic: None},
                    auto_offset_reset='largest',
                    **consumer_init_kwargs
                )
                # Bare attribute access on purpose: reading it is what
                # triggers construction of the cluster config.
                consumer._region_cluster_config
                assert mock_get_kafka_cluster.call_count == 0

                config = get_config()
                expected_init_kwargs = dict(
                    type='standard',
                    name='data_pipeline',
                    broker_list=config.kafka_broker_list,
                    zookeeper=config.kafka_zookeeper
                )
                mock_cluster_config_init.assert_called_once_with(
                    **expected_init_kwargs
                )
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_setup_connections(
        self,
        base_path,
        refresh_batch,
        cluster
    ):
        """``setup_connections`` must build one ``TransactionManager``.

        The manager class under ``base_path`` and the batch's
        ``get_connection_set_from_cluster`` are both mocked.  The manager
        must be created once with ``cluster`` as cluster and replica names
        and the mocked getter as ``connection_set_getter``.
        """
        manager_patch = mock.patch(base_path + '.TransactionManager')
        getter_patch = mock.patch.object(
            refresh_batch,
            'get_connection_set_from_cluster'
        )
        with manager_patch as mock_manager, getter_patch as mock_get_conn:
            refresh_batch.setup_connections()

            expected_kwargs = dict(
                cluster_name=cluster,
                ro_replica_name=cluster,
                rw_replica_name=cluster,
                connection_set_getter=mock_get_conn
            )
            mock_manager.assert_called_once_with(**expected_kwargs)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_after_row_processing(self, refresh_batch, write_session, rw_conn):
        """Check the unlock and throttle steps that follow row processing.

        With ``throttle_to_replication`` and ``_wait_for_throughput`` mocked,
        ``unlock_tables`` must roll back, execute ``UNLOCK TABLES`` and
        commit once each on the write session.  ``throttle_throughput`` must
        call each mocked helper once and leave the rows-per-second cap at
        its default.
        """
        throttle_patch = mock.patch.object(
            refresh_batch,
            'throttle_to_replication'
        )
        wait_patch = mock.patch.object(
            refresh_batch,
            '_wait_for_throughput',
            return_value=None
        )
        with throttle_patch as throttle_mock:
            with wait_patch as mock_wait:
                # count can be anything since self.avg_throughput_cap is set
                # to None
                refresh_batch.unlock_tables(write_session)
                refresh_batch.throttle_throughput(count=0)

        assert write_session.rollback.call_count == 1
        write_session.execute.assert_called_once_with('UNLOCK TABLES')
        assert write_session.commit.call_count == 1
        throttle_mock.assert_called_once_with(rw_conn)
        assert mock_wait.call_count == 1
        default_cap = refresh_batch.DEFAULT_AVG_ROWS_PER_SECOND_CAP
        assert refresh_batch.avg_rows_per_second_cap == default_cap
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_create_table_from_src_table(
        self,
        refresh_batch,
        fake_original_table,
        fake_new_table,
        show_table_query,
        write_session
    ):
        """``create_table_from_src_table`` must issue both expected queries.

        ``_execute_query`` is autospec-mocked and its ``fetchone`` result is
        set to ``['test_db', fake_original_table]``.  The mock must have been
        called, in any order, with the show-table query and with the new
        table statement, each on ``write_session``.
        """
        execute_patch = mock.patch.object(
            refresh_batch,
            '_execute_query',
            autospec=True
        )
        with execute_patch as mock_execute:
            fetched_row = ['test_db', fake_original_table]
            mock_execute.return_value.fetchone.return_value = fetched_row

            refresh_batch.create_table_from_src_table(write_session)

            expected_calls = [
                mock.call(write_session, query)
                for query in (show_table_query, fake_new_table)
            ]
            mock_execute.assert_has_calls(expected_calls, any_order=True)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_log_error_on_exception(self, message):
        """A failing ``clog.log_line`` must be reported via ``logger.error``.

        ``log_line`` is patched to raise ``RandomException`` and the module
        logger is mocked.  After ``ClogWriter().publish(message)`` the first
        ``logger.error`` call must carry the "Failed to scribe message" text
        for this message.
        """
        clog_writer_module = data_pipeline._clog_writer
        log_line_patch = mock.patch.object(
            clog_writer_module.clog,
            'log_line',
            side_effect=RandomException()
        )
        logger_patch = mock.patch.object(clog_writer_module, 'logger')

        with log_line_patch as mock_log_line, logger_patch as mock_logger:
            ClogWriter().publish(message)

        expected_error = "Failed to scribe message - {}".format(str(message))
        first_error_call = mock_logger.error.call_args_list[0]

        assert mock_log_line.called
        assert first_error_call == mock.call(expected_error)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_meteorite_on_off(
        self,
        create_message,
        registered_schema,
        producer,
        enable_meteorite,
        expected_call_count
    ):
        """Check ``StatsCounter.process`` calls against ``enable_meteorite``.

        ``StatsCounter.process`` is autospec-mocked; after publishing one
        message with ``producer.enable_meteorite`` set as given, its call
        count must equal ``expected_call_count``.
        """
        stats_counter_cls = data_pipeline.tools.meteorite_wrappers.StatsCounter
        process_patch = mock.patch.object(
            stats_counter_cls,
            'process',
            autospec=True
        )
        with process_patch as mock_stats_counter:
            producer.enable_meteorite = enable_meteorite
            producer.publish(create_message(registered_schema, timeslot=1.0))
            assert mock_stats_counter.call_count == expected_call_count
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_sensu_process_called_once_inside_window(
        self,
        create_message,
        registered_schema,
        producer,
        message_count
    ):
        """Publishing one message repeatedly must hit Sensu only once.

        ``SensuTTLAlerter.process`` is autospec-mocked.  With
        ``producer.enable_sensu`` on, the same message is published
        ``message_count`` times and the mock must record exactly one call.
        """
        alerter_cls = data_pipeline.tools.sensu_ttl_alerter.SensuTTLAlerter
        process_patch = mock.patch.object(
            alerter_cls,
            'process',
            autospec=True,
            return_value=None
        )
        with process_patch as mock_sensu_ttl_process:
            producer.enable_sensu = True
            repeated_message = create_message(registered_schema, timeslot=1.0)
            for _ in range(message_count):
                producer.publish(repeated_message)
            assert mock_sensu_ttl_process.call_count == 1
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_ensure_messages_published_fails_when_overpublished(
        self, topic, messages, producer, topic_offsets
    ):
        """Ensuring fewer messages than were published must fail and log.

        All ``messages`` are published and flushed.  Asking the producer to
        ensure only the first two must raise
        ``PublicationUnensurableError``, and the mocked module logger is
        then checked by ``_assert_logged_info_correct``.

        The logging check used to sit inside the ``pytest.raises`` block
        after the raising call, so it never ran.  It now runs after the
        expected exception is caught, while the logger is still patched.
        """
        for message in messages:
            producer.publish(message)
        producer.flush()

        with mock.patch.object(
            data_pipeline.producer,
            'logger'
        ) as mock_logger:
            # Only the raising call belongs inside pytest.raises.
            with pytest.raises(PublicationUnensurableError):
                producer.ensure_messages_published(messages[:2], topic_offsets)

            self._assert_logged_info_correct(
                mock_logger,
                len(messages),
                topic,
                topic_offsets,
                message_count=len(messages[:2])
            )
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def test_publish_fails_after_retry(self, message, producer):
        """Publishing must give up with ``MaxRetryError`` and publish nothing.

        ``send_produce_request`` on the Kafka client is patched with a
        ``FailedPayloadsError`` side effect.  Publish plus flush must raise
        ``MaxRetryError``; afterwards no new message may be captured on the
        topic, the request mock must have been called
        ``self.max_retry_count`` times, and the topic offsets must be
        unchanged.

        The post-conditions used to sit inside the ``pytest.raises`` block
        after the raising calls, so they never ran.  Only publish and flush
        are inside ``pytest.raises`` now.
        """
        # TODO(DATAPIPE-606|clin) investigate better way than mocking response
        with mock.patch.object(
            producer._kafka_producer.kafka_client,
            'send_produce_request',
            side_effect=[FailedPayloadsError]
        ) as mock_send_request, capture_new_messages(
            message.topic
        ) as get_messages:
            orig_topic_to_offset_map = self.get_orig_topic_to_offset_map(producer)

            with pytest.raises(MaxRetryError):
                producer.publish(message)
                producer.flush()

            messages = get_messages()
            assert len(messages) == 0
            assert mock_send_request.call_count == self.max_retry_count
            self.assert_new_topic_to_offset_map(
                producer,
                message.topic,
                orig_topic_to_offset_map,
                published_message_count=0
            )
项目:bitcoin-trading-system    作者:vinicius-ronconi    | 项目源码 | 文件源码
def setUp(self):
        """Build ``self.system`` with logging and ``set_next_operation`` mocked.

        The system module's ``logging`` is patched before the
        ``TrailingOrders`` instance is created.  ``get_pending_orders`` is
        replaced on the instance with a mock returning an empty list, and
        ``TrailingOrders.set_next_operation`` is patched with
        ``self.set_next_operation``.  Both patches are stopped via cleanups.
        """
        fake_client = mock.MagicMock()
        fake_bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(
            self.INITIAL_SETUP)

        logging_patcher = mock.patch(
            'trading_system.systems.trailing_orders.system.logging')
        self.addCleanup(logging_patcher.stop)
        logging_patcher.start()

        self.system = TrailingOrders(fake_client, fake_bootstrap)
        self.system.get_pending_orders = mock.MagicMock(return_value=[])

        self.set_next_operation = mock.MagicMock()
        set_next_patcher = mock.patch(
            'trading_system.systems.trailing_orders.system.TrailingOrders.set_next_operation',
            new=self.set_next_operation,
        )
        self.addCleanup(set_next_patcher.stop)
        set_next_patcher.start()
项目:bitcoin-trading-system    作者:vinicius-ronconi    | 项目源码 | 文件源码
def _setup_operation(self, next_operation):
        """Create ``self.system`` configured to start with *next_operation*.

        The bootstrap is built from a ``TrailingOrderSetup`` that combines
        the class-level start/stop/reversal/stop-loss values with fixed
        operational cost (0.2) and profit (0.5).  The system module's
        ``logging`` is patched for the rest of the test.

        :param next_operation: value passed through as the setup's
            ``next_operation``.
        """
        fake_client = mock.Mock()
        make_bootstrap = TrailingOrdersFactory().make_fake_bootstrap
        setup = TrailingOrderSetup.make(
            next_operation=next_operation,
            start_value=self.START_VALUE,
            stop_value=self.STOP_VALUE,
            reversal=self.REVERSAL,
            stop_loss=self.STOP_LOSS,
            operational_cost=0.2,
            profit=0.5,
        )
        fake_bootstrap = make_bootstrap(setup)

        logging_patcher = mock.patch(
            'trading_system.systems.trailing_orders.system.logging')
        self.addCleanup(logging_patcher.stop)
        logging_patcher.start()

        self.system = TrailingOrders(fake_client, fake_bootstrap)
项目:bitcoin-trading-system    作者:vinicius-ronconi    | 项目源码 | 文件源码
def setUp(self):
        """Build ``self.system`` with logging and ``set_next_operation`` mocked.

        The system module's ``logging`` is patched before the
        ``TrailingOrders`` instance is created.  ``get_pending_orders`` is
        replaced on the instance with a mock returning an empty list, and
        ``TrailingOrders.set_next_operation`` is patched with
        ``self.set_next_operation``.  Both patches are stopped via cleanups.
        """
        fake_client = mock.MagicMock()
        fake_bootstrap = factory.TrailingOrdersFactory().make_fake_bootstrap(
            self.INITIAL_SETUP)

        logging_patcher = mock.patch(
            'trading_system.systems.trailing_orders.system.logging')
        self.addCleanup(logging_patcher.stop)
        logging_patcher.start()

        self.system = TrailingOrders(fake_client, fake_bootstrap)
        self.system.get_pending_orders = mock.MagicMock(return_value=[])

        self.set_next_operation = mock.MagicMock()
        set_next_patcher = mock.patch(
            'trading_system.systems.trailing_orders.system.TrailingOrders.set_next_operation',
            new=self.set_next_operation,
        )
        self.addCleanup(set_next_patcher.stop)
        set_next_patcher.start()
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push(self, skipper_runner_run_mock, requests_get_mock):
        """``push`` must tag, push and remove the image when the tag is new.

        The mocked registry reply lists tags that do not include
        ``1234567``; the runner mock must then see docker ``tag``, ``push``
        and ``rmi`` in that order.

        :param skipper_runner_run_mock: mock for the command runner; its
            side effect supplies the successive return codes.
        :param requests_get_mock: mock whose return value is the fake
            registry response.
        """
        skipper_runner_run_mock.side_effect = [0, 0]

        with mock.patch('requests.Response', autospec=True) as response_cls:
            registry_response = response_cls.return_value
            registry_response.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = registry_response

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )

        local_image = 'my_image:1234567'
        registry_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', local_image, registry_image]),
            mock.call(['docker', 'push', registry_image]),
            mock.call(['docker', 'rmi', registry_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_already_in_registry(self, skipper_runner_run_mock, requests_get_mock):
        """Check the docker calls when the registry already lists the tag.

        The mocked registry reply already contains tag ``1234567``; the
        runner mock is checked for a docker ``tag`` call followed by a
        docker ``rmi`` call.

        :param skipper_runner_run_mock: mock for the command runner; its
            side effect supplies the successive return codes.
        :param requests_get_mock: mock whose return value is the fake
            registry response.
        """
        skipper_runner_run_mock.side_effect = [0, 0]

        with mock.patch('requests.Response', autospec=True) as response_cls:
            registry_response = response_cls.return_value
            registry_response.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb', "1234567"]
            }
            requests_get_mock.return_value = registry_response

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )

        local_image = 'my_image:1234567'
        registry_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', local_image, registry_image]),
            mock.call(['docker', 'rmi', registry_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_already_in_registry_with_force(self, skipper_runner_run_mock, requests_get_mock):
        """``push --force`` must tag, push and remove the image.

        The CLI is invoked with ``--force`` against a mocked registry reply;
        the runner mock must see docker ``tag``, ``push`` and ``rmi`` in
        that order.

        :param skipper_runner_run_mock: mock for the command runner; its
            side effect supplies the successive return codes.
        :param requests_get_mock: mock whose return value is the fake
            registry response.
        """
        skipper_runner_run_mock.side_effect = [0, 0]

        with mock.patch('requests.Response', autospec=True) as response_cls:
            registry_response = response_cls.return_value
            registry_response.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = registry_response

        self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image', "--force"]
        )

        local_image = 'my_image:1234567'
        registry_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', local_image, registry_image]),
            mock.call(['docker', 'push', registry_image]),
            mock.call(['docker', 'rmi', registry_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_fail(self, skipper_runner_run_mock, requests_get_mock):
        """A failing docker push must make the CLI exit with code 1.

        The runner mock returns 0 for the first command and 1 for the
        second; the CLI result must have exit code 1 and the runner must
        have seen docker ``tag`` followed by docker ``push``.

        :param skipper_runner_run_mock: mock for the command runner; its
            side effect supplies the successive return codes.
        :param requests_get_mock: mock whose return value is the fake
            registry response.
        """
        skipper_runner_run_mock.side_effect = [0, 1]

        with mock.patch('requests.Response', autospec=True) as response_cls:
            registry_response = response_cls.return_value
            registry_response.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = registry_response

        result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )
        self.assertEqual(result.exit_code, 1)

        local_image = 'my_image:1234567'
        registry_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', local_image, registry_image]),
            mock.call(['docker', 'push', registry_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_rmi_fail(self, skipper_runner_run_mock, requests_get_mock):
        """A failing docker rmi must not fail the push command.

        The runner mock returns 0, 0 and then 1; the CLI result must still
        have exit code 0 and the runner must have seen docker ``tag``,
        ``push`` and ``rmi`` in that order.

        :param skipper_runner_run_mock: mock for the command runner; its
            side effect supplies the successive return codes.
        :param requests_get_mock: mock whose return value is the fake
            registry response.
        """
        skipper_runner_run_mock.side_effect = [0, 0, 1]

        with mock.patch('requests.Response', autospec=True) as response_cls:
            registry_response = response_cls.return_value
            registry_response.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = registry_response

        result = self._invoke_cli(
            global_params=self.global_params,
            subcmd='push',
            subcmd_params=['my_image']
        )
        self.assertEqual(result.exit_code, 0)

        local_image = 'my_image:1234567'
        registry_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', local_image, registry_image]),
            mock.call(['docker', 'push', registry_image]),
            mock.call(['docker', 'rmi', registry_image]),
        ])
项目:skipper    作者:Stratoscale    | 项目源码 | 文件源码
def test_push_with_defaults_from_config_file(self, skipper_runner_run_mock, requests_get_mock):
        """``push`` must work with defaults from ``config.load_defaults()``.

        The CLI is invoked with ``defaults`` rather than explicit global
        parameters; the runner mock must see docker ``tag``, ``push`` and
        ``rmi`` in that order.

        :param skipper_runner_run_mock: mock for the command runner; its
            side effect supplies the successive return codes.
        :param requests_get_mock: mock whose return value is the fake
            registry response.
        """
        skipper_runner_run_mock.side_effect = [0, 0]

        with mock.patch('requests.Response', autospec=True) as response_cls:
            registry_response = response_cls.return_value
            registry_response.json.return_value = {
                'name': 'my_image',
                'tags': ['latest', 'aaaaaaa', 'bbbbbbb']
            }
            requests_get_mock.return_value = registry_response

        self._invoke_cli(
            defaults=config.load_defaults(),
            subcmd='push',
            subcmd_params=['my_image']
        )

        local_image = 'my_image:1234567'
        registry_image = 'registry.io:5000/my_image:1234567'
        skipper_runner_run_mock.assert_has_calls([
            mock.call(['docker', 'tag', local_image, registry_image]),
            mock.call(['docker', 'push', registry_image]),
            mock.call(['docker', 'rmi', registry_image]),
        ])
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def test_get_retry_logging(log_stream):
    """Check the log output produced while a GET call keeps receiving HTTP 429.

    ``Session.get`` is patched to always return a 429 ``MockResponse`` with a
    ``Retry-After: 3`` header; ``make_call`` must raise ``UnavailableError``
    and the captured log must equal the expected text exactly.
    """
    stream, logger = log_stream
    conn_kwargs = dict(mock_connection_params, logger=logger)
    expected_log = """UMAPI timeout...service unavailable (code 429 on try 1)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 2)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 3)
UMAPI timeout...giving up after 3 attempts (6 seconds).
"""
    with mock.patch("umapi_client.connection.requests.Session.get") as mock_get:
        mock_get.return_value = MockResponse(429, headers={"Retry-After": "3"})
        conn = Connection(**conn_kwargs)
        with pytest.raises(UnavailableError):
            conn.make_call("")
        stream.flush()
        log = stream.getvalue()  # kept in a local so `pytest -l` shows the exact log
        assert log == expected_log


# log_stream fixture defined in conftest.py
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def test_post_retry_logging(log_stream):
    """Check the log output produced while a POST call keeps receiving HTTP 429.

    ``Session.post`` is patched to always return a 429 ``MockResponse`` with a
    ``Retry-After: 3`` header; ``make_call`` (given a body) must raise
    ``UnavailableError`` and the captured log must equal the expected text.
    """
    stream, logger = log_stream
    conn_kwargs = dict(mock_connection_params, logger=logger)
    expected_log = """UMAPI timeout...service unavailable (code 429 on try 1)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 2)
Next retry in 3 seconds...
UMAPI timeout...service unavailable (code 429 on try 3)
UMAPI timeout...giving up after 3 attempts (6 seconds).
"""
    with mock.patch("umapi_client.connection.requests.Session.post") as mock_post:
        mock_post.return_value = MockResponse(429, headers={"Retry-After": "3"})
        conn = Connection(**conn_kwargs)
        with pytest.raises(UnavailableError):
            conn.make_call("", [3, 5])
        stream.flush()
        log = stream.getvalue()  # kept in a local so `pytest -l` shows the exact log
        assert log == expected_log
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_plug(self, mock_plug):
        """Check that ``os_vif.plug`` forwards the VIF and instance info to the plugin.

        ``mock_plug`` is presumably supplied by a ``mock.patch`` decorator on
        the plugin's ``plug`` method that is not visible here -- TODO confirm.
        """
        noop_ext = extension.Extension(name="noop",
                                       entry_point="os-vif",
                                       plugin=NoOpPlugin,
                                       obj=None)
        # Patchers are only activated when entered by the ``with`` below.
        names_patch = mock.patch('stevedore.extension.ExtensionManager.names',
                                 return_value=['foobar'])
        getitem_patch = mock.patch(
            'stevedore.extension.ExtensionManager.__getitem__',
            return_value=noop_ext)

        vif = mock.MagicMock()
        vif.plugin_name = 'noop'
        info = mock.sentinel.info

        with names_patch, getitem_patch:
            os_vif.initialize()
            os_vif.plug(vif, info)
            mock_plug.assert_called_once_with(vif, info)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_unplug(self, mock_unplug):
        """Check that ``os_vif.unplug`` forwards the VIF and instance info to the plugin.

        ``mock_unplug`` is presumably supplied by a ``mock.patch`` decorator on
        the plugin's ``unplug`` method that is not visible here -- TODO confirm.
        """
        demo_ext = extension.Extension(name="demo",
                                       entry_point="os-vif",
                                       plugin=NoOpPlugin,
                                       obj=None)
        # Patchers are only activated when entered by the ``with`` below.
        names_patch = mock.patch('stevedore.extension.ExtensionManager.names',
                                 return_value=['foobar'])
        getitem_patch = mock.patch(
            'stevedore.extension.ExtensionManager.__getitem__',
            return_value=demo_ext)

        vif = mock.MagicMock()
        vif.plugin_name = 'noop'
        info = mock.sentinel.info

        with names_patch, getitem_patch:
            os_vif.initialize()
            os_vif.unplug(vif, info)
            mock_unplug.assert_called_once_with(vif, info)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_wrap_consumer(self, m_retry_type, m_logging_type):
        """Check that ``_wrap_consumer`` wraps the consumer in retry, then logging.

        ``m_retry_type`` and ``m_logging_type`` are presumably the patched
        handler classes injected by decorators not shown here -- TODO confirm.
        """
        # ``mock.sentinel`` hands back the same object for a given attribute
        # name, so the sentinels can be referenced directly wherever needed.
        m_retry_type.return_value = mock.sentinel.retry_handler
        m_logging_type.return_value = mock.sentinel.logging_handler

        with mock.patch.object(h_dis.EventPipeline, '__init__'):
            pipeline = h_pipeline.ControllerPipeline(mock.sentinel.thread_group)
            wrapped = pipeline._wrap_consumer(mock.sentinel.consumer)

        self.assertEqual(mock.sentinel.logging_handler, wrapped)
        m_logging_type.assert_called_with(mock.sentinel.retry_handler)
        m_retry_type.assert_called_with(mock.sentinel.consumer,
                                        exceptions=mock.ANY)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_annotate_diff_resource_vers_no_conflict(self, m_patch, m_count):
        """Annotate succeeds on the second PATCH after a resource-version conflict.

        The first PATCH response is a conflict and the second is OK, while
        ``client.get`` returns an object that already carries the requested
        annotations under a newer resource version. ``m_patch`` and
        ``m_count`` are presumably injected by ``mock.patch`` decorators not
        shown here -- TODO confirm.
        """
        m_count.return_value = list(range(1, 5))
        path = '/test'
        annotations = {'a1': 'v1', 'a2': 'v2'}
        resource_version, new_resource_version = "123", "456"

        def build_obj(version):
            # Object carrying the annotations at the given resource version.
            return {'metadata': {
                'annotations': annotations,
                'resourceVersion': version}}

        conflicting_obj = build_obj(resource_version)
        good_obj = build_obj(new_resource_version)
        conflicting_data = jsonutils.dumps(conflicting_obj, sort_keys=True)
        good_data = jsonutils.dumps(good_obj, sort_keys=True)

        conflict_resp = mock.MagicMock(
            ok=False, status_code=requests.codes.conflict)
        good_resp = mock.MagicMock(ok=True)
        # NOTE(review): the successful response echoes ``conflicting_obj``
        # rather than ``good_obj``; both carry the same annotations -- confirm
        # whether this is intentional.
        good_resp.json.return_value = conflicting_obj
        m_patch.side_effect = [conflict_resp, good_resp]

        with mock.patch.object(self.client, 'get') as m_get:
            m_get.return_value = good_obj
            returned = self.client.annotate(
                path, annotations, resource_version=resource_version)
            self.assertEqual(annotations, returned)

        url = self.base_url + path
        shared_kwargs = dict(headers=mock.ANY, cert=(None, None), verify=False)
        m_patch.assert_has_calls([
            mock.call(url, data=conflicting_data, **shared_kwargs),
            mock.call(url, data=good_data, **shared_kwargs)])
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_annotate_diff_resource_vers_no_annotation(self, m_patch, m_count):
        """Annotate retries with the new resource version when the object has no annotations.

        The first PATCH response is a conflict; ``client.get`` then returns an
        object with a newer resource version and no annotations, and the
        second PATCH (carrying the annotations at the new version) succeeds.
        ``m_patch`` and ``m_count`` are presumably injected by ``mock.patch``
        decorators not shown here -- TODO confirm.
        """
        m_count.return_value = list(range(1, 5))
        path = '/test'
        annotations = {'a1': 'v1', 'a2': 'v2'}
        annotating_resource_version = '123'
        annotating_obj = {'metadata': {
            'annotations': annotations,
            'resourceVersion': annotating_resource_version}}
        annotating_data = jsonutils.dumps(annotating_obj, sort_keys=True)

        new_resource_version = '456'
        new_obj = {'metadata': {
            'resourceVersion': new_resource_version}}

        # Build the resolution object independently: a shallow ``.copy()`` of
        # ``annotating_obj`` would share the nested 'metadata' dict, so setting
        # the new resource version would silently rewrite ``annotating_obj``.
        resolution_obj = {'metadata': {
            'annotations': annotations,
            'resourceVersion': new_resource_version}}
        resolution_data = jsonutils.dumps(resolution_obj, sort_keys=True)

        m_resp_conflict = mock.MagicMock()
        m_resp_conflict.ok = False
        m_resp_conflict.status_code = requests.codes.conflict
        m_resp_good = mock.MagicMock()
        m_resp_good.ok = True
        m_resp_good.json.return_value = resolution_obj
        # First PATCH conflicts, second one goes through.
        m_patch.side_effect = (m_resp_conflict, m_resp_good)

        with mock.patch.object(self.client, 'get') as m_get:
            m_get.return_value = new_obj
            self.assertEqual(annotations, self.client.annotate(
                path, annotations,
                resource_version=annotating_resource_version))

        m_patch.assert_has_calls([
            mock.call(self.base_url + path,
                      data=annotating_data,
                      headers=mock.ANY,
                      cert=(None, None), verify=False),
            mock.call(self.base_url + path,
                      data=resolution_data,
                      headers=mock.ANY,
                      cert=(None, None), verify=False)])
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_annotate_diff_resource_vers_conflict(self, m_patch, m_count):
        """Annotate raises when the server object holds different annotations.

        Every PATCH response is a conflict and ``client.get`` returns an
        object whose annotations differ from the requested ones; the call must
        raise ``K8sClientException`` after exactly one PATCH. ``m_patch`` and
        ``m_count`` are presumably injected by ``mock.patch`` decorators not
        shown here -- TODO confirm.
        """
        m_count.return_value = list(range(1, 5))
        path = '/test'
        annotations = {'a1': 'v1', 'a2': 'v2'}
        resource_version, new_resource_version = "123", "456"

        conflicting_data = jsonutils.dumps(
            {'metadata': {
                'annotations': annotations,
                'resourceVersion': resource_version}},
            sort_keys=True)
        actual_obj = {'metadata': {
            'annotations': {'a1': 'v2'},
            'resourceVersion': new_resource_version}}

        m_patch.return_value = mock.MagicMock(
            ok=False, status_code=requests.codes.conflict)

        with mock.patch.object(self.client, 'get') as m_get:
            m_get.return_value = actual_obj
            self.assertRaises(exc.K8sClientException,
                              self.client.annotate,
                              path, annotations,
                              resource_version=resource_version)

        expected_kwargs = dict(data=conflicting_data,
                               headers=mock.ANY,
                               cert=(None, None), verify=False)
        m_patch.assert_called_once_with(self.base_url + path,
                                        **expected_kwargs)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_run(self, m_count):
        """Check that ``Async._run`` hands the queued event to the handler once.

        ``m_count`` is presumably a patched counter injected by a decorator
        not shown here -- TODO confirm.
        """
        m_count.return_value = [1]
        m_handler = mock.Mock()

        # Queue stub: reports empty and always yields the same sentinel event.
        m_queue = mock.Mock()
        m_queue.empty.return_value = True
        m_queue.get.return_value = mock.sentinel.event

        async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock(),
                                      queue_depth=1)

        with mock.patch('time.sleep'):
            async_handler._run(mock.sentinel.group, m_queue)

        m_handler.assert_called_once_with(mock.sentinel.event)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def test_run_empty(self, m_count):
        """Check that ``Async._run`` handles each queued event until the queue raises ``Empty``.

        ``m_count`` is presumably a patched counter injected by a decorator
        not shown here -- TODO confirm.
        """
        m_count.return_value = list(range(5))
        events = [mock.sentinel.event1, mock.sentinel.event2]
        m_handler = mock.Mock()

        # Queue stub: yields both events, then signals exhaustion.
        m_queue = mock.Mock()
        m_queue.empty.return_value = True
        m_queue.get.side_effect = events + [six_queue.Empty()]

        async_handler = h_async.Async(m_handler, mock.Mock(), mock.Mock())

        with mock.patch('time.sleep'):
            async_handler._run(mock.sentinel.group, m_queue)

        expected_calls = [mock.call(event) for event in events]
        m_handler.assert_has_calls(expected_calls)
        self.assertEqual(len(events), m_handler.call_count)
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def test__prepare_variables_configdrive_file(self, mem_req_mock):
        """Check ``_prepare_variables`` when the configdrive is given as raw content.

        The content must be written to a ``<uuid>.cndrive`` file under the
        configured tempdir and reported as a 'file' type configdrive.
        ``mem_req_mock`` is unused here; presumably a decorator not shown in
        this excerpt patches the memory-requirement helper -- TODO confirm.
        """
        instance_info = self.node.instance_info
        instance_info['configdrive'] = 'fake-content'
        self.node.instance_info = instance_info
        self.node.save()
        self.config(tempdir='/path/to/tmpfiles')

        image_vars = {"url": "http://image",
                      "validate_certs": "yes",
                      "source": "fake-image",
                      "mem_req": 2000,
                      "disk_format": "qcow2",
                      "checksum": "md5:checksum"}
        configdrive_vars = {
            'type': 'file',
            'location': '/path/to/tmpfiles/%s.cndrive' % self.node.uuid}
        expected = {"image": image_vars, 'configdrive': configdrive_vars}

        # ``create=True`` lets the patch add ``open`` to the module namespace
        # even if the module does not define that name itself.
        with mock.patch.object(ansible_deploy, 'open', mock.mock_open(),
                               create=True) as open_mock:
            with task_manager.acquire(self.context, self.node.uuid) as task:
                actual = ansible_deploy._prepare_variables(task)
                self.assertEqual(expected, actual)
            # Checked after the task is released, while ``open`` is still patched.
            open_mock.assert_has_calls((
                mock.call('/path/to/tmpfiles/%s.cndrive' % self.node.uuid,
                          'w'),
                mock.call().__enter__(),
                mock.call().write('fake-content'),
                mock.call().__exit__(None, None, None)))