Python unittest.mock 模块,PropertyMock() 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 unittest.mock.PropertyMock()。

项目:astropy-bot    作者:astropy    | 项目源码 | 文件源码
def setup_method(self, method):
        """Create and start every patcher used by the tests.

        Each patcher is kept on ``self`` as ``patch_*`` and the mock returned
        by ``start()`` is kept under its own attribute. Nothing is stopped
        here.
        """
        # RepoHandler: plain method patches.
        self.patch_repo_config = patch.object(RepoHandler, 'get_config_value')
        self.autoclose_stale = self.patch_repo_config.start()

        self.patch_open_pull_requests = patch.object(
            RepoHandler, 'open_pull_requests')
        self.open_pull_requests = self.patch_open_pull_requests.start()

        # PullRequestHandler: these two are replaced with PropertyMock.
        self.patch_labels = patch.object(
            PullRequestHandler, 'labels', new_callable=PropertyMock)
        self.labels = self.patch_labels.start()

        self.patch_last_commit_date = patch.object(
            PullRequestHandler, 'last_commit_date', new_callable=PropertyMock)
        self.last_commit_date = self.patch_last_commit_date.start()

        # PullRequestHandler: plain method patches.
        self.patch_find_comments = patch.object(
            PullRequestHandler, 'find_comments')
        self.find_comments = self.patch_find_comments.start()

        self.patch_submit_comment = patch.object(
            PullRequestHandler, 'submit_comment')
        self.submit_comment = self.patch_submit_comment.start()

        self.patch_close = patch.object(PullRequestHandler, 'close')
        self.close = self.patch_close.start()
项目:blobxfer    作者:Azure    | 项目源码 | 文件源码
def test_worker_thread_transfer():
    """Run the transfer worker over two queued items, the second of which fails.

    ``termination_check`` is patched with a PropertyMock yielding
    False, False, True. ``_process_transfer`` must be called twice and exactly
    one exception must be recorded.
    """
    uploader = ops.Uploader(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())

    # Two identical queue entries; each is a 4-tuple of the MagicMock class.
    for _ in range(2):
        uploader._transfer_queue.put((mock.MagicMock,) * 4)

    # First call succeeds, second call raises.
    uploader._process_transfer = mock.MagicMock(
        side_effect=[None, Exception()])

    target = 'blobxfer.operations.upload.Uploader.termination_check'
    with mock.patch(target, new_callable=mock.PropertyMock) as tc_prop:
        tc_prop.side_effect = [False, False, True]
        uploader._worker_thread_transfer()
        assert uploader._process_transfer.call_count == 2
        assert len(uploader._exceptions) == 1
项目:blobxfer    作者:Azure    | 项目源码 | 文件源码
def test_worker_thread_upload(ts):
    """Run the upload worker over two queued items, the second of which fails.

    ``termination_check`` is patched with a PropertyMock yielding four False
    values and then True. ``_process_upload_descriptor`` must be called twice
    and exactly one exception must be recorded.
    """
    uploader = ops.Uploader(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    uploader._general_options.concurrency.transfer_threads = 1

    # len(_transfer_set) reports 5 on the first call and 0 afterwards.
    uploader._transfer_set = mock.MagicMock()
    uploader._transfer_set.__len__.side_effect = [5, 0, 0, 0]

    for _ in range(2):
        uploader._upload_queue.put(mock.MagicMock)

    # First descriptor is processed cleanly, the second raises.
    uploader._process_upload_descriptor = mock.MagicMock(
        side_effect=[None, Exception()])

    target = 'blobxfer.operations.upload.Uploader.termination_check'
    with mock.patch(target, new_callable=mock.PropertyMock) as tc_prop:
        tc_prop.side_effect = [False, False, False, False, True]
        uploader._worker_thread_upload()
        assert uploader._process_upload_descriptor.call_count == 2
        assert len(uploader._exceptions) == 1
项目:floto    作者:babbel    | 项目源码 | 文件源码
def test_is_timer_task_completed_multiple_pages(self, page1_response, page2_response, dt1, dt2,
            dt3, mocker):
        """Timer completion is detected when the TimerStarted event is on page 2.

        Page 1 holds TimerFired and DecisionTaskCompleted, page 2 holds
        TimerStarted. A next page must be reported before the lookup and not
        after it.
        """
        timer_fired = {
            'eventId': 3,
            'eventType': 'TimerFired',
            'eventTimestamp': dt3,
            'timerFiredEventAttributes': {'timerId': 't_id'},
        }
        decision_completed = {
            'eventId': 2,
            'eventType': 'DecisionTaskCompleted',
            'eventTimestamp': dt2,
        }
        timer_started = {
            'eventId': 1,
            'eventType': 'TimerStarted',
            'eventTimestamp': dt1,
            'timerStartedEventAttributes': {'timerId': 't_id'},
        }
        page1_response['events'] = [timer_fired, decision_completed]
        page2_response['events'] = [timer_started]

        # Page 2 is the last page: drop any continuation token.
        page2_response.pop('nextPageToken', None)

        swf_stub = SwfMock()
        swf_stub.pages['page2'] = page2_response
        mocker.patch(
            'floto.api.Swf.client',
            new_callable=PropertyMock,
            return_value=swf_stub,
        )

        history = floto.History(
            domain='d', task_list='tl', response=page1_response)
        assert history._has_next_event_page()
        assert history.is_timer_task_completed('t_id')
        assert not history._has_next_event_page()
项目:floto    作者:babbel    | 项目源码 | 文件源码
def test_get_id_previous_started_multi_page(self, page1_response, page2_response, dt1, dt2,
            dt3, mocker):
        """The previous DecisionTaskStarted event is found on the second page.

        Looking up the event before the first page-1 event (id 3) must return
        id 1, which only exists in page 2.
        """
        latest_started = {
            'eventId': 3,
            'eventType': 'DecisionTaskStarted',
            'eventTimestamp': dt3,
        }
        decision_completed = {
            'eventId': 2,
            'eventType': 'DecisionTaskCompleted',
            'eventTimestamp': dt2,
        }
        earliest_started = {
            'eventId': 1,
            'eventType': 'DecisionTaskStarted',
            'eventTimestamp': dt1,
        }
        page1_response['events'] = [latest_started, decision_completed]
        page2_response['events'] = [earliest_started]

        swf_stub = SwfMock()
        swf_stub.pages['page2'] = page2_response
        mocker.patch(
            'floto.api.Swf.client',
            new_callable=PropertyMock,
            return_value=swf_stub,
        )

        history = floto.History(
            domain='d', task_list='tl', response=page1_response)
        previous_id = history.get_id_previous_started(
            page1_response['events'][0])
        assert previous_id == 1
项目:floto    作者:babbel    | 项目源码 | 文件源码
def test_get_id_activity_task_event_multi_page(self, page1_response, page2_response, 
            dt1, dt2, dt3, mocker):
        """The activity id is resolved from a scheduled event on the second page.

        The failed event on page 1 points at scheduledEventId 1, which is the
        ActivityTaskScheduled event stored in page 2.
        """
        failed_event = {
            'eventId': 3,
            'eventType': 'ActivityTaskFailed',
            'eventTimestamp': dt3,
            'activityTaskFailedEventAttributes': {'scheduledEventId': 1},
        }
        completed_event = {
            'eventId': 2,
            'eventType': 'DecisionTaskCompleted',
            'eventTimestamp': dt2,
        }
        scheduled_event = {
            'eventId': 1,
            'eventType': 'ActivityTaskScheduled',
            'eventTimestamp': dt1,
            'activityTaskScheduledEventAttributes': {'activityId': 'a_id'},
        }

        page1_response['events'] = [failed_event, completed_event]
        page2_response['events'] = [scheduled_event]

        swf_stub = SwfMock()
        swf_stub.pages['page2'] = page2_response
        mocker.patch(
            'floto.api.Swf.client',
            new_callable=PropertyMock,
            return_value=swf_stub,
        )

        history = floto.History(
            domain='d', task_list='tl', response=page1_response)
        assert history.get_id_activity_task_event(failed_event) == 'a_id'
项目:floto    作者:babbel    | 项目源码 | 文件源码
def test_register_type_does_not_raise(self, mocker):
        """``register_type`` must not propagate a TypeAlreadyExistsFault error.

        The patched client raises a ``ClientError`` with that code from
        ``register_workflow_type``; the call is still expected to have been
        made once with the workflow type's swf attributes.
        """
        already_exists = botocore.exceptions.ClientError(
            error_response={'Error': {'Code': 'TypeAlreadyExistsFault'}},
            operation_name="op_name",
        )

        # Throwaway client class whose register_workflow_type always raises.
        register_mock = Mock(side_effect=already_exists)
        client_cls = type(
            "ClientMock", (object,), {"register_workflow_type": register_mock})
        mocker.patch(
            'floto.api.Swf.client',
            new_callable=PropertyMock,
            return_value=client_cls(),
        )

        swf = floto.api.Swf()
        workflow_type = floto.api.WorkflowType(
            domain='test_domain',
            name='my_workflow_type',
            version='v1',
        )
        expected_kwargs = workflow_type.swf_attributes
        swf.register_type(workflow_type)
        swf.client.register_workflow_type.assert_called_once_with(
            **expected_kwargs)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_submit_run(self, mock_requests):
        """``submit_run`` posts the payload once and returns the run id.

        ``mock_requests`` stands in for the requests module: its ``post``
        response reports status 200 and a JSON body of ``{'run_id': '1'}``.
        """
        mock_requests.codes.ok = 200
        mock_requests.post.return_value.json.return_value = {'run_id': '1'}
        # status_code is installed as a property on the response mock's type.
        status_code_mock = mock.PropertyMock(return_value=200)
        type(mock_requests.post.return_value).status_code = status_code_mock
        payload = {
            'notebook_task': NOTEBOOK_TASK,
            'new_cluster': NEW_CLUSTER,
        }
        run_id = self.hook.submit_run(payload)

        # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
        self.assertEqual(run_id, '1')
        mock_requests.post.assert_called_once_with(
            submit_run_endpoint(HOST),
            json={
                'notebook_task': NOTEBOOK_TASK,
                'new_cluster': NEW_CLUSTER,
            },
            auth=(LOGIN, PASSWORD),
            headers=USER_AGENT_HEADER,
            timeout=self.hook.timeout_seconds)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_get_run_state(self, mock_requests):
        """``get_run_state`` issues one GET and returns the expected ``RunState``.

        ``mock_requests`` stands in for the requests module: its ``get``
        response reports status 200 and a JSON body of ``GET_RUN_RESPONSE``.
        """
        mock_requests.codes.ok = 200
        mock_requests.get.return_value.json.return_value = GET_RUN_RESPONSE
        # status_code is installed as a property on the response mock's type.
        status_code_mock = mock.PropertyMock(return_value=200)
        type(mock_requests.get.return_value).status_code = status_code_mock

        run_state = self.hook.get_run_state(RUN_ID)

        # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
        self.assertEqual(run_state, RunState(
            LIFE_CYCLE_STATE,
            RESULT_STATE,
            STATE_MESSAGE))
        mock_requests.get.assert_called_once_with(
            get_run_endpoint(HOST),
            json={'run_id': RUN_ID},
            auth=(LOGIN, PASSWORD),
            headers=USER_AGENT_HEADER,
            timeout=self.hook.timeout_seconds)
项目:bigworldgraph    作者:majdigital    | 项目源码 | 文件源码
def test_task_functions(self, input_patch, output_patch):
        """Exercise the RelationMergingTask helpers with mocked input and output.

        ``workflow_resources`` is patched with a PropertyMock that returns an
        empty dict while the task is built and the helper checks run.
        """
        # new_callable must be the PropertyMock class itself, not an instance;
        # the value the property yields is then set through return_value.
        with mock.patch(
            "bwg.tasks.relation_merging.RelationMergingTask.workflow_resources",
            new_callable=mock.PropertyMock
        ) as workflow_mock:
            task_config = {
                "CORPUS_ENCODING": "",
                "RELATION_MERGING_OUTPUT_PATH": ""
            }

            output_patch.return_value = MockOutput()
            input_patch.return_value = (
                MockInput(RELATION_MERGING_TASK["input"][0]),
                MockInput(RELATION_MERGING_TASK["input"][0])
            )
            workflow_mock.return_value = {}

            task = bwg.tasks.relation_merging.RelationMergingTask(task_config=task_config)

            # Testing
            self._test_get_relations_from_sentence_json()
            self._test_is_relevant_article(task)
            self._test_is_relevant_sentence(task)
项目:bigworldgraph    作者:majdigital    | 项目源码 | 文件源码
def test_task_functions(self, input_patch, output_patch):
        """Exercise the DependencyParseTask helpers with mocked input and output.

        ``workflow_resources`` is patched with a PropertyMock that returns a
        dict holding a ``MockParser`` under ``"dependency_parser"``.
        """
        # new_callable must be the PropertyMock class itself, not an instance;
        # the value the property yields is then set through return_value.
        with mock.patch(
                "bwg.tasks.dependency_parsing.DependencyParseTask.workflow_resources",
                new_callable=mock.PropertyMock
        ) as workflow_mock:
            task_config = {
                "STANFORD_DEPENDENCY_MODEL_PATH": "",
                "STANFORD_CORENLP_MODELS_PATH": "",
                "CORPUS_ENCODING": "",
                "DEPENDENCY_OUTPUT_PATH": ""
            }

            output_patch.return_value = MockOutput()
            input_patch.return_value = MockInput(DEPENDENCY_TASK["input"])
            workflow_mock.return_value = {
                "dependency_parser": MockParser()
            }

            task = bwg.tasks.dependency_parsing.DependencyParseTask(task_config=task_config)

            # Testing
            self._test_task(task)
            self._test_dependency_parse(task)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments(self):
        """LowestBalanceFirstMethod with a 100.00 total across four statements."""
        method = LowestBalanceFirstMethod(Decimal('100.00'))

        # (minimum_payment, principal) for each mocked statement, in order.
        figures = (
            ('2.00', '10.00'),
            ('5.00', '25.00'),
            ('2.00', '1234.56'),
            ('7.00', '3.00'),
        )
        statements = []
        for min_payment, principal in figures:
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            statements.append(stmt)

        expected = [Decimal(v) for v in ('2.00', '5.00', '2.00', '91.00')]
        assert method.find_payments(statements) == expected
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments_total_too_low(self):
        """LowestBalanceFirstMethod with a 3.00 total must raise TypeError."""
        method = LowestBalanceFirstMethod(Decimal('3.00'))

        # (minimum_payment, principal) for each mocked statement, in order.
        figures = (
            ('2.00', '10.00'),
            ('5.00', '25.00'),
            ('2.00', '1234.56'),
            ('7.00', '3.00'),
        )
        statements = []
        for min_payment, principal in figures:
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            statements.append(stmt)

        with pytest.raises(TypeError):
            method.find_payments(statements)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments(self):
        """HighestBalanceFirstMethod with a 100.00 total across four statements."""
        method = HighestBalanceFirstMethod(Decimal('100.00'))

        # (minimum_payment, principal) for each mocked statement, in order.
        figures = (
            ('2.00', '10.00'),
            ('5.00', '25.00'),
            ('2.00', '1234.56'),
            ('7.00', '3.00'),
        )
        statements = []
        for min_payment, principal in figures:
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            statements.append(stmt)

        expected = [Decimal(v) for v in ('2.00', '5.00', '86.00', '7.00')]
        assert method.find_payments(statements) == expected
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments(self):
        """HighestInterestRateFirstMethod with a 100.00 total across four statements."""
        method = HighestInterestRateFirstMethod(Decimal('100.00'))

        # (minimum_payment, apr, principal) for each mocked statement, in order.
        figures = (
            ('2.00', '0.0100', '10.00'),
            ('5.00', '0.0200', '25.00'),
            ('2.00', '0.0800', '1234.56'),
            ('7.00', '0.0300', '3.00'),
        )
        statements = []
        for min_payment, apr, principal in figures:
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).apr = PropertyMock(return_value=Decimal(apr))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            statements.append(stmt)

        expected = [Decimal(v) for v in ('2.00', '5.00', '86.00', '7.00')]
        assert method.find_payments(statements) == expected
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments_total_too_low(self):
        """HighestInterestRateFirstMethod with a 3.00 total must raise TypeError."""
        method = HighestInterestRateFirstMethod(Decimal('3.00'))

        # (minimum_payment, principal) for each mocked statement, in order.
        figures = (
            ('2.00', '10.00'),
            ('5.00', '25.00'),
            ('2.00', '1234.56'),
            ('7.00', '3.00'),
        )
        statements = []
        for min_payment, principal in figures:
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            statements.append(stmt)

        with pytest.raises(TypeError):
            method.find_payments(statements)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments(self):
        """LowestInterestRateFirstMethod with a 100.00 total across four statements."""
        method = LowestInterestRateFirstMethod(Decimal('100.00'))

        # (minimum_payment, apr, principal) for each mocked statement, in order.
        figures = (
            ('2.00', '0.0100', '10.00'),
            ('5.00', '0.0200', '25.00'),
            ('2.00', '0.0800', '1234.56'),
            ('7.00', '0.0300', '3.00'),
        )
        statements = []
        for min_payment, apr, principal in figures:
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).apr = PropertyMock(return_value=Decimal(apr))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            statements.append(stmt)

        expected = [Decimal(v) for v in ('86.00', '5.00', '2.00', '7.00')]
        assert method.find_payments(statements) == expected
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_find_payments_total_too_low(self):
        """LowestInterestRateFirstMethod with a 3.00 total must raise TypeError."""
        method = LowestInterestRateFirstMethod(Decimal('3.00'))

        # (minimum_payment, principal) for each mocked statement, in order.
        figures = (
            ('2.00', '10.00'),
            ('5.00', '25.00'),
            ('2.00', '1234.56'),
            ('7.00', '3.00'),
        )
        statements = []
        for min_payment, principal in figures:
            stmt = Mock(spec_set=CCStatement)
            type(stmt).minimum_payment = PropertyMock(
                return_value=Decimal(min_payment))
            type(stmt).principal = PropertyMock(
                return_value=Decimal(principal))
            statements.append(stmt)

        with pytest.raises(TypeError):
            method.find_payments(statements)
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_next_with_transactions(self):
        """``next_with_transactions`` builds the next CCStatement from this one.

        The patched ``CCStatement`` class must be called once with the same
        interest and payment objects, the current end balance, the next
        billing period and the supplied transactions.
        """
        period = Mock(spec_set=_BillingPeriod)
        following_period = Mock(spec_set=_BillingPeriod)
        type(period).next_period = PropertyMock(return_value=following_period)
        interest = Mock(spec_set=_InterestCalculation)
        min_formula = Mock(spec_set=_MinPaymentFormula)

        stmt = CCStatement(
            interest,
            Decimal('1.23'),
            min_formula,
            period,
            end_balance=Decimal('1.50'),
            interest_amt=Decimal('0.27'),
        )

        next_stmt = Mock(spec_set=CCStatement)
        with patch('biweeklybudget.interest.CCStatement', autospec=True) as m:
            m.return_value = next_stmt
            result = stmt.next_with_transactions({'foo': 'bar'})

        assert result == next_stmt
        expected_calls = [
            call(
                interest,
                Decimal('1.50'),
                min_formula,
                following_period,
                transactions={'foo': 'bar'},
            ),
        ]
        assert m.mock_calls == expected_calls
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_pay(self):
        """``pay`` delegates to ``next_with_transactions`` keyed by payment date.

        The next billing period's ``payment_date`` is 2017-01-15, so the
        patched method must receive ``{date(2017, 1, 15): Decimal('98.76')}``.
        """
        period = Mock(spec_set=_BillingPeriod)
        following_period = Mock(spec_set=_BillingPeriod)
        type(following_period).payment_date = PropertyMock(
            return_value=date(2017, 1, 15))
        type(period).next_period = PropertyMock(return_value=following_period)
        interest = Mock(spec_set=_InterestCalculation)
        min_formula = Mock(spec_set=_MinPaymentFormula)

        stmt = CCStatement(
            interest,
            Decimal('1.23'),
            min_formula,
            period,
            end_balance=Decimal('1.50'),
            interest_amt=Decimal('0.27'),
        )

        next_stmt = Mock(spec_set=CCStatement)
        target = 'biweeklybudget.interest.CCStatement.next_with_transactions'
        with patch(target, autospec=True) as m:
            m.return_value = next_stmt
            result = stmt.pay(Decimal('98.76'))

        assert result == next_stmt
        # autospec on a method records the instance as the first argument.
        expected_calls = [
            call(stmt, {date(2017, 1, 15): Decimal('98.76')}),
        ]
        assert m.mock_calls == expected_calls
项目:snowflake-connector-python    作者:snowflakedb    | 项目源码 | 文件源码
def test_get_s3_file_object_http_400_error():
    """
    Check ``get_file_header`` when ``S3.Object.load`` raises an HTTP 400
    ClientError (apparently what AWS returns once the token has expired):
    no header is returned and the result status is RENEW_TOKEN.
    """
    bad_request = botocore.exceptions.ClientError(
        {'Error': {'Code': u'400', 'Message': 'Bad Request'}},
        operation_name='mock load')

    # S3 object whose load() always raises the 400 error.
    s3object = MagicMock()
    s3object.load = MagicMock(side_effect=bad_request)

    client = Mock()
    client.Object.return_value = s3object
    client.load.return_value = None
    type(client).s3path = PropertyMock(return_value='s3://testbucket/')

    stage_info = {
        u'location': 'sfc-teststage/rwyitestacco/users/1234/',
        u'locationType': 'S3',
    }
    meta = {u'client': client, u'stage_info': stage_info}

    header = SnowflakeS3Util.get_file_header(meta, "/path1/file2.txt")

    assert header is None
    assert meta['result_status'] == ResultStatus.RENEW_TOKEN
项目:snowflake-connector-python    作者:snowflakedb    | 项目源码 | 文件源码
def _init_rest(application, post_requset):
    """Build a ``SnowflakeRestful`` backed by a mocked connection.

    The connection mock exposes ``application`` and the internal application
    name/version as properties. ``post_requset`` replaces the instance's
    ``_post_request`` before it is returned.
    """
    conn = MagicMock()
    conn._login_timeout = 120
    conn.errorhandler = Mock(return_value=None)

    # Install the read-only values as properties on the mock's own type.
    conn_type = type(conn)
    conn_type.application = PropertyMock(return_value=application)
    conn_type._internal_application_name = PropertyMock(
        return_value=CLIENT_NAME)
    conn_type._internal_application_version = PropertyMock(
        return_value=CLIENT_VERSION)

    restful = SnowflakeRestful(
        host='testaccount.snowflakecomputing.com',
        port=443,
        connection=conn,
    )
    restful._post_request = post_requset
    return restful
项目:snowflake-connector-python    作者:snowflakedb    | 项目源码 | 文件源码
def test_get_s3_file_object_http_400_error():
    """
    Check ``get_file_header`` when ``S3.Object.load`` raises an HTTP 400
    ClientError (apparently what AWS returns once the token has expired):
    no header is returned and the result status is RENEW_TOKEN.
    """
    bad_request = botocore.exceptions.ClientError(
        {'Error': {'Code': u'400', 'Message': 'Bad Request'}},
        operation_name='mock load')

    # S3 object whose load() always raises the 400 error.
    s3object = MagicMock()
    s3object.load = MagicMock(side_effect=bad_request)

    client = Mock()
    client.Object.return_value = s3object
    client.load.return_value = None
    type(client).s3path = PropertyMock(return_value='s3://testbucket/')

    stage_info = {
        u'location': 'sfc-teststage/rwyitestacco/users/1234/',
        u'locationType': 'S3',
    }
    meta = {u'client': client, u'stage_info': stage_info}

    header = SnowflakeS3Util.get_file_header(meta, "/path1/file2.txt")

    assert header is None
    assert meta['result_status'] == ResultStatus.RENEW_TOKEN
项目:snowflake-connector-python    作者:snowflakedb    | 项目源码 | 文件源码
def _init_rest(application, post_requset):
    """Build a ``SnowflakeRestful`` backed by a mocked connection.

    The connection mock exposes ``application`` and the internal application
    name/version as properties. ``post_requset`` replaces the instance's
    ``_post_request`` before it is returned.
    """
    conn = MagicMock()
    conn._login_timeout = 120
    conn.errorhandler = Mock(return_value=None)

    # Install the read-only values as properties on the mock's own type.
    conn_type = type(conn)
    conn_type.application = PropertyMock(return_value=application)
    conn_type._internal_application_name = PropertyMock(
        return_value=CLIENT_NAME)
    conn_type._internal_application_version = PropertyMock(
        return_value=CLIENT_VERSION)

    restful = SnowflakeRestful(
        host='testaccount.snowflakecomputing.com',
        port=443,
        connection=conn,
    )
    restful._post_request = post_requset
    return restful
项目:snowflake-connector-python    作者:snowflakedb    | 项目源码 | 文件源码
def test_get_s3_file_object_http_400_error():
    """
    Check ``get_file_header`` when ``S3.Object.load`` raises an HTTP 400
    ClientError (apparently what AWS returns once the token has expired):
    no header is returned and the result status is RENEW_TOKEN.
    """
    bad_request = botocore.exceptions.ClientError(
        {'Error': {'Code': u'400', 'Message': 'Bad Request'}},
        operation_name='mock load')

    # S3 object whose load() always raises the 400 error.
    s3object = MagicMock()
    s3object.load = MagicMock(side_effect=bad_request)

    client = Mock()
    client.Object.return_value = s3object
    client.load.return_value = None
    type(client).s3path = PropertyMock(return_value='s3://testbucket/')

    stage_info = {
        u'location': 'sfc-teststage/rwyitestacco/users/1234/',
        u'locationType': 'S3',
    }
    meta = {u'client': client, u'stage_info': stage_info}

    header = SnowflakeS3Util.get_file_header(meta, "/path1/file2.txt")

    assert header is None
    assert meta['result_status'] == ResultStatus.RENEW_TOKEN
项目:snowflake-connector-python    作者:snowflakedb    | 项目源码 | 文件源码
def _init_rest(application, post_requset):
    """Build a ``SnowflakeRestful`` backed by a mocked connection.

    The connection mock exposes ``application`` and the internal application
    name/version as properties. ``post_requset`` replaces the instance's
    ``_post_request`` before it is returned.
    """
    conn = MagicMock()
    conn._login_timeout = 120
    conn.errorhandler = Mock(return_value=None)

    # Install the read-only values as properties on the mock's own type.
    conn_type = type(conn)
    conn_type.application = PropertyMock(return_value=application)
    conn_type._internal_application_name = PropertyMock(
        return_value=CLIENT_NAME)
    conn_type._internal_application_version = PropertyMock(
        return_value=CLIENT_VERSION)

    restful = SnowflakeRestful(
        host='testaccount.snowflakecomputing.com',
        port=443,
        connection=conn,
    )
    restful._post_request = post_requset
    return restful
项目:searchConsole    作者:get-data-    | 项目源码 | 文件源码
def setUp(self):
        """Build a mocked 'webmasters' v3 service that returns an empty result.

        The HTTP mock sequence answers twice with status 200: first with the
        contents of ``build_response_data.json`` next to this file, then with
        an empty aggregation response. The service is also wrapped in a
        PropertyMock stored as ``self.mocked_prop``.
        """
        self.clientquery = {'property_uri': 'https://www.example.com/',
                            'siteMode': 'en-us',
                            'clientName': 'Example',
                            'query_date': '2016-09-01'}
        self.emptyresponse = '''{"responseAggregationType": "byPage"}'''
        self.p = (os.path.dirname(os.path.abspath(__file__)))
        self.build_response_data = '%s/build_response_data.json' % (self.p)
        # Read the fixture inside a context manager so the handle is closed
        # rather than left for the garbage collector.
        with open(self.build_response_data, 'rb') as fixture:
            build_response = fixture.read()
        self.http_auth = HttpMockSequence([
            ({'status': '200'}, build_response),
            ({'status': '200'}, self.emptyresponse.encode('UTF-8'))
        ])
        self.service = build('webmasters',
                        'v3',
                        http=self.http_auth,
                        developerKey='mocked_api_key_1234')
        # Mock the service attribute within Apiclient
        self.mocked_prop = PropertyMock(return_value=self.service)
项目:searchConsole    作者:get-data-    | 项目源码 | 文件源码
def setUp(self):
        """Build a mocked 'webmasters' v3 service that answers with HTTP 403.

        The HTTP mock sequence answers twice with status 403: first with the
        contents of ``build_response_data.json`` next to this file, then with
        a JSON "forbidden" error body. The service is also wrapped in a
        PropertyMock stored as ``self.mocked_prop``.
        """
        self.clientquery = {'property_uri': 'https://www.example.com/',
                            'siteMode': 'en-us',
                            'clientName': 'Example',
                            'query_date': '2016-09-01'}
        self.response = '''{"error": {"errors": [{"domain": "global","reason": "forbidden", 
        "message": "User does not have sufficient permission for site 'https://www.example.com/'. See also: https://support.google.com/webmasters/answer/2451999."}],
        "code": 403, 
        "message": "User does not have sufficient permission for site 'https://www.example.com/'. See also: https://support.google.com/webmasters/answer/2451999."}}'''
        self.p = (os.path.dirname(os.path.abspath(__file__)))
        self.build_response_data = '%s/build_response_data.json' % (self.p)
        # Read the fixture inside a context manager so the handle is closed
        # rather than left for the garbage collector.
        with open(self.build_response_data, 'rb') as fixture:
            build_response = fixture.read()
        self.http_auth = HttpMockSequence([
            ({'status': '403'}, build_response),
            ({'status': '403'}, self.response.encode('UTF-8'))
        ])
        self.service = build('webmasters',
                        'v3',
                        http=self.http_auth,
                        developerKey='mocked_api_key_1234')
        # Mock the service attribute within Apiclient
        self.mocked_prop = PropertyMock(return_value=self.service)
项目:km-api    作者:knowmetools    | 项目源码 | 文件源码
def test_get_queryset(km_user_factory, media_resource_factory):
    """
    With a Know Me user supplied through the field's context, the
    queryset should contain exactly that user's media resources.
    """
    km_user = km_user_factory()
    media_resource_factory(km_user=km_user)

    # A second resource created without passing km_user.
    media_resource_factory()

    target = 'know_me.serializers.fields.MediaResourceField.context'
    with mock.patch(target, new_callable=mock.PropertyMock) as context_prop:
        context_prop.return_value = {'km_user': km_user}

        field = fields.MediaResourceField()

        actual = list(field.get_queryset())
        wanted = list(km_user.media_resources.all())

        assert actual == wanted
项目:km-api    作者:knowmetools    | 项目源码 | 文件源码
def test_to_internal_value(media_resource_factory):
    """
    Given the ID of an existing media resource,
    ``.to_internal_value()`` should return that resource.
    """
    resource = media_resource_factory()

    target = 'know_me.serializers.fields.MediaResourceField.context'
    with mock.patch(target, new_callable=mock.PropertyMock) as context_prop:
        context_prop.return_value = {'km_user': resource.km_user}
        field = fields.MediaResourceField()

        found = field.to_internal_value(resource.id)
        assert found == resource
项目:km-api    作者:knowmetools    | 项目源码 | 文件源码
def test_to_internal_value_non_existent(km_user_factory):
    """
    Looking up an ID with no matching media resource should raise a
    ``ValidationError``.
    """
    target = 'know_me.serializers.fields.MediaResourceField.context'
    with mock.patch(target, new_callable=mock.PropertyMock) as context_prop:
        # The user is created while the patch is active, as before.
        context_prop.return_value = {'km_user': km_user_factory()}

        field = fields.MediaResourceField()

        with pytest.raises(ValidationError):
            field.to_internal_value(1)
项目:km-api    作者:knowmetools    | 项目源码 | 文件源码
def test_to_representation(media_resource_factory, serializer_context):
    """
    ``.to_representation()`` should produce the same data as
    ``MediaResourceSerializer`` does for the given media resource.
    """
    resource = media_resource_factory()
    serializer = MediaResourceSerializer(resource, context=serializer_context)

    target = 'know_me.serializers.fields.MediaResourceField.context'
    with mock.patch(target, new_callable=mock.PropertyMock) as context_prop:
        context_prop.return_value = serializer_context

        field = fields.MediaResourceField()

        rendered = field.to_representation(resource)
        assert rendered == serializer.data
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def test_lists_unreadable_android_directory_using_helper_method(self, mock_runner):
        """``_ls_android`` prints only the permission line for an unreadable path.

        The runner's last message is a successful response whose ``data``
        property reports an unreadable, unwritable directory with no files.
        """
        listing = {
            'path': '/foo/bar',
            'readable': False,
            'writable': False,
            'files': {}
        }
        response = mock.Mock()
        response.is_successful.return_value = True
        type(response).data = mock.PropertyMock(return_value=listing)

        mock_runner.return_value.get_last_message.return_value = response

        with capture(_ls_android, ['/foo/bar']) as output:
            pass

        self.assertEqual(output, '\nReadable: No  Writable: No\n')
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def test_prints_ios_environment_via_platform_helpers(self, mock_runner):
        """
        ``_get_ios_environment`` should print the response data as a
        two-column Name/Path table.

        The runner's last message is stubbed with a successful response whose
        ``data`` property is ``{'foo': '/bar'}``.
        """
        response = mock.Mock()
        response.is_successful.return_value = True
        # A property has to live on the type, hence ``type(response)``.
        type(response).data = mock.PropertyMock(return_value={'foo': '/bar'})
        mock_runner.return_value.get_last_message.return_value = response

        with capture(_get_ios_environment) as output:
            pass

        # Leading blank line, header row, separator row, one data row.
        expected_output = (
            '\n'
            'Name    Path\n'
            '------  ------\n'
            'foo     /bar\n'
        )

        self.assertEqual(output, expected_output)
项目:objection    作者:sensepost    | 项目源码 | 文件源码
def test_prints_android_environment_via_platform_helpers(self, mock_runner):
        """
        ``_get_android_environment`` should print the response data as a
        two-column Name/Path table.

        The runner's last message is stubbed with a successful response whose
        ``data`` property is ``{'foo': '/bar'}``.
        """
        response = mock.Mock()
        response.is_successful.return_value = True
        # A property has to live on the type, hence ``type(response)``.
        type(response).data = mock.PropertyMock(return_value={'foo': '/bar'})
        mock_runner.return_value.get_last_message.return_value = response

        with capture(_get_android_environment) as output:
            pass

        # Leading blank line, header row, separator row, one data row.
        expected_output = (
            '\n'
            'Name    Path\n'
            '------  ------\n'
            'foo     /bar\n'
        )

        self.assertEqual(output, expected_output)
项目:aws-ssh    作者:arusahni    | 项目源码 | 文件源码
def test_get_ssh_args_instance_name(env_mock, exit_mock):
    """
    ``cli.get_ssh_args(['fooinst'])`` should return key path, user, and IP.

    ``os.getcwd`` is patched to ``/path/to/cwd``; the environment mock
    returns a project whose ``key_path`` property and whose instance's
    ``ip`` property are PropertyMocks, so that each read can be asserted.
    """
    with patch('os.getcwd', return_value='/path/to/cwd'):
        # Project stub exposing ``key_path`` as a property.
        key_path_mock = PropertyMock(return_value='/path/to/key.pem')
        project = MagicMock()
        type(project).key_path = key_path_mock

        # Instance stub exposing ``ip`` as a property.
        ip_mock = PropertyMock(return_value='0.0.0.0')
        instance = MagicMock()
        type(instance).ip = ip_mock
        instance.get_user_name.return_value = 'test_user'
        project.get_instance.return_value = instance

        env_mock.is_initialized.return_value = True
        env_mock.return_value.find_project.return_value = project

        args = cli.get_ssh_args(['fooinst'])

        # The lookups must have used the patched cwd and the given name.
        env_mock.return_value.find_project.assert_called_with('/path/to/cwd')
        project.get_instance.assert_called_with('fooinst')
        key_path_mock.assert_called_with()
        instance.get_user_name.assert_called_with()
        ip_mock.assert_called_with()

        assert len(args) == 3
        assert args[0] == '/path/to/key.pem'
        assert args[1] == 'test_user'
        assert args[2] == '0.0.0.0'
项目:scar    作者:grycap    | 项目源码 | 文件源码
def test_get_access_key(self, mock_session):
        """
        ``AwsClient().get_access_key()`` should return the ``access_key`` of
        the credentials obtained from the (mocked) session.
        """
        credentials = MagicMock()
        access_key_prop = PropertyMock(return_value='test')
        mock_session.return_value.get_credentials.return_value = credentials
        # A property has to live on the type, hence ``type(credentials)``.
        type(credentials).access_key = access_key_prop

        returned_key = AwsClient().get_access_key()

        self.assertEqual(returned_key, 'test')
项目:scar    作者:grycap    | 项目源码 | 文件源码
def test_add_lambda_permissions(self, mock_lambda_client, mock_config, mock_uuid):
        """
        ``AwsClient().add_lambda_permissions('test_bucket')`` should call
        ``add_permission`` on the lambda client exactly as expected.

        ``mock_config.lambda_name`` is stubbed to ``'test_name'`` and
        ``mock_uuid`` to return ``'test_uuid'``; both values must show up in
        the recorded ``add_permission`` call.
        """
        type(mock_config).lambda_name = PropertyMock(return_value='test_name')
        mock_uuid.return_value = 'test_uuid'

        AwsClient().add_lambda_permissions('test_bucket')

        expected_call = call().add_permission(
            Action='lambda:InvokeFunction',
            FunctionName='test_name',
            Principal='s3.amazonaws.com',
            SourceArn='arn:aws:s3:::test_bucket',
            StatementId='test_uuid',
        )
        self.assertEqual(mock_lambda_client.call_count, 1)
        # assertIn prints the expected call and the recorded calls when it
        # fails, whereas assertTrue(... in ...) only says "False is not true".
        self.assertIn(expected_call, mock_lambda_client.mock_calls)
项目:astropy-bot    作者:astropy    | 项目源码 | 文件源码
def setup_method(self, method):
        """
        Create and start the patchers used by the tests of this class.

        Every patcher is kept on ``self`` (``patch_*`` attributes); the
        objects returned by ``start()`` are kept as well, except for the
        ``json`` property patch whose mock is not stored.
        """
        # Closed PR already tested in TestHook.test_invalid()
        open_pr_payload = {
            'state': 'open',
            'user': {'login': 'user'},
            'head': {
                'repo': {'full_name': 'repo'},
                'ref': 'branch',
            },
        }

        # --- build the patchers (nothing is patched yet) ---
        self.patch_repo_config = patch.object(RepoHandler, 'get_config_value')
        self.patch_pr_json = patch.object(PullRequestHandler,
                                          'json',
                                          new_callable=PropertyMock,
                                          return_value=open_pr_payload)
        self.patch_pr_comments = patch.object(PullRequestHandler,
                                              'find_comments',
                                              return_value=[])
        self.patch_pr_labels = patch.object(PullRequestHandler,
                                            'labels',
                                            new_callable=PropertyMock)
        self.patch_submit_comment = patch.object(PullRequestHandler,
                                                 'submit_comment',
                                                 return_value='url')
        self.patch_set_status = patch.object(PullRequestHandler, 'set_status')
        self.patch_check_changelog = patch(
            'changebot.blueprints.pull_request_checker'
            '.check_changelog_consistency')

        # --- activate them, in the same order they were built ---
        self.changelog_cfg = self.patch_repo_config.start()
        self.patch_pr_json.start()
        self.comment_ids = self.patch_pr_comments.start()
        self.labels = self.patch_pr_labels.start()
        self.submit_comment = self.patch_submit_comment.start()
        self.set_status = self.patch_set_status.start()
        self.issues = self.patch_check_changelog.start()
项目:astropy-bot    作者:astropy    | 项目源码 | 文件源码
def test_is_closed(self, state, answer):
        """
        ``self.issue.is_closed`` should be ``answer`` when the issue's JSON
        reports the given ``state``.
        """
        json_target = 'changebot.github.github_api.IssueHandler.json'
        with patch(json_target, new_callable=PropertyMock) as json_prop:
            json_prop.return_value = {'state': state}
            assert self.issue.is_closed is answer
项目:directory-ui-buyer    作者:uktrade    | 项目源码 | 文件源码
def test_company_profile_edit_condition_address_feature_flag_off(
    letter_sent, preverified, expected, settings, sso_request
):
    """
    ``condition_show_address()`` with the Companies House OAuth2 feature
    flag switched off.

    The view's ``company_profile`` is replaced with a dictionary built from
    ``letter_sent`` and ``preverified``; the result must be ``expected``.
    """
    settings.FEATURE_COMPANIES_HOUSE_OAUTH2_ENABLED = False

    view = views.CompanyProfileEditView()
    view.request = sso_request

    profile = {
        'is_verification_letter_sent': letter_sent,
        'verified_with_preverified_enrolment': preverified,
    }
    property_mock = PropertyMock(return_value=profile)

    # ``new_callable`` is given a mock *instance*: patch calls it to create
    # the replacement, so the attribute ends up being the dictionary itself.
    with patch.object(view, 'company_profile', new_callable=property_mock):
        assert view.condition_show_address() is expected
项目:alexa-browser-client    作者:richtier    | 项目源码 | 文件源码
def mock_snowboy(request):
    """
    Patch ``SnowboyWakewordDetector.wakeword_library_import_path`` with a
    PropertyMock returning ``'unittest.mock.Mock'``.

    Yields the object returned by starting the patch, and stops the patch
    once the generator is resumed.
    """
    target = '.'.join([
        'command_lifecycle.wakeword.SnowboyWakewordDetector',
        'wakeword_library_import_path',
    ])
    patcher = patch(target, PropertyMock(return_value='unittest.mock.Mock'))
    started = patcher.start()
    yield started
    patcher.stop()
项目:django-service-objects    作者:mixxorz    | 项目源码 | 文件源码
def build_request(self, method_return_value, FILES_return_value):
        """
        Build a fake request whose ``method`` and ``FILES`` are properties.

        Returns a ``(request, method, FILES)`` tuple: the ``MagicMock``
        request plus the two ``PropertyMock`` objects, so callers can assert
        on how often each attribute was read.
        """
        method_prop = PropertyMock(return_value=method_return_value)
        files_prop = PropertyMock(return_value=FILES_return_value)

        fake_request = MagicMock()
        # Properties have to be attached to the type, not the instance.
        request_type = type(fake_request)
        request_type.method = method_prop
        request_type.FILES = files_prop

        return fake_request, method_prop, files_prop
项目:django-service-objects    作者:mixxorz    | 项目源码 | 文件源码
def test_get_service_input(self):
        """
        ``get_service_input(form)`` should read ``form.cleaned_data`` exactly
        once and return an empty dict when that is what the property yields.
        """
        cleaned_data = PropertyMock(return_value={})
        form = MagicMock()
        # Properties have to be attached to the type, not the instance.
        type(form).cleaned_data = cleaned_data

        rv = MockView().get_service_input(form)

        cleaned_data.assert_called_once_with()
        self.assertEqual({}, rv)
项目:lattice_mc    作者:bjmorgan    | 项目源码 | 文件源码
def test_old_tracer_correlation( self ):
        """
        ``old_tracer_correlation`` should be 2.0 when ``sum_dr_squared`` is
        stubbed with 10.0 and ``number_of_jumps`` is 5.
        """
        sim = self.simulation
        sim.atoms = Mock()
        # NOTE(review): the PropertyMock is set on the Mock *instance*, not on
        # its type, so reading the attribute yields the PropertyMock object and
        # only calling it gives 10.0 -- presumably the code under test calls
        # sum_dr_squared(); confirm.
        sim.atoms.sum_dr_squared = PropertyMock( return_value=10.0 )
        sim.number_of_jumps = 5
        self.assertEqual( sim.old_tracer_correlation, 2.0 )
项目:lattice_mc    作者:bjmorgan    | 项目源码 | 文件源码
def test_collective_diffusion_coefficient_per_atom( self ):
        """
        ``collective_diffusion_coefficient_per_atom`` should be 4.0 when the
        collective diffusion coefficient is patched to 8.0 and there are
        2 atoms.
        """
        sim = self.simulation
        cdc_target = 'lattice_mc.simulation.Simulation.collective_diffusion_coefficient'
        with patch( cdc_target, new_callable=PropertyMock ) as mock_cdc:
            mock_cdc.return_value = 8.0
            sim.number_of_atoms = 2
            self.assertEqual( sim.collective_diffusion_coefficient_per_atom, 4.0 )
项目:blobxfer    作者:Azure    | 项目源码 | 文件源码
def test_worker_thread_transfer():
    """
    Run ``SyncCopy._worker_thread_transfer`` over two queued items.

    The descriptor processor succeeds for the first item and raises for the
    second; ``termination_check`` reports ``False, False, True``. Both items
    must have been processed and exactly one exception recorded.
    """
    sync_copy = ops.SyncCopy(*(mock.MagicMock() for _ in range(3)))
    for _ in range(2):
        sync_copy._transfer_queue.put(mock.MagicMock())
    sync_copy._process_synccopy_descriptor = mock.MagicMock(
        side_effect=[None, Exception()])

    tc_target = 'blobxfer.operations.synccopy.SyncCopy.termination_check'
    with mock.patch(
            tc_target, new_callable=mock.PropertyMock) as patched_tc:
        # One value is consumed per read of the property.
        patched_tc.side_effect = [False, False, True]
        sync_copy._worker_thread_transfer()
        assert sync_copy._process_synccopy_descriptor.call_count == 2
        assert len(sync_copy._exceptions) == 1
项目:blobxfer    作者:Azure    | 项目源码 | 文件源码
def test_check_for_uploads_from_md5():
    """
    Run ``Uploader._check_for_uploads_from_md5`` with a stubbed MD5 offload.

    ``pop_done_queue`` yields ``None``, a result, then ``None``, while
    ``termination_check_md5`` reports ``False`` three times and then
    ``True``; the post-MD5 handler must have been called exactly once.
    """
    uploader = ops.Uploader(*(mock.MagicMock() for _ in range(3)))
    uploader._md5_offload = mock.MagicMock()
    uploader._post_md5_skip_on_check = mock.MagicMock()

    tcm_target = 'blobxfer.operations.upload.Uploader.termination_check_md5'
    with mock.patch(
            tcm_target, new_callable=mock.PropertyMock) as patched_tcm:
        # One value is consumed per read of the property.
        patched_tcm.side_effect = [False, False, False, True, True]
        done_queue_results = [None, mock.MagicMock(), None]
        uploader._md5_offload.pop_done_queue.side_effect = done_queue_results

        uploader._check_for_uploads_from_md5()
        assert uploader._post_md5_skip_on_check.call_count == 1
项目:blobxfer    作者:Azure    | 项目源码 | 文件源码
def test_worker_thread_disk():
    """
    Run ``Downloader._worker_thread_disk`` in two scenarios.

    Each scenario queues a single three-element item and lets
    ``termination_check`` report ``False`` and then ``True``. In the first,
    ``_process_data`` succeeds and must be called once; in the second it
    raises and exactly one exception must be recorded.
    """
    tc_target = 'blobxfer.operations.download.Downloader.termination_check'

    # Scenario 1: the data item is processed normally.
    with mock.patch(
            tc_target, new_callable=mock.PropertyMock) as patched_tc:
        downloader = ops.Downloader(
            *(mock.MagicMock() for _ in range(3)))
        downloader._general_options.concurrency.disk_threads = 1

        queued_item = (mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
        downloader._disk_queue = mock.MagicMock()
        downloader._disk_queue.get.side_effect = [queued_item]
        downloader._process_data = mock.MagicMock()
        patched_tc.side_effect = [False, True]

        downloader._worker_thread_disk()
        assert downloader._process_data.call_count == 1

    # Scenario 2: processing raises, and the error is collected.
    with mock.patch(
            tc_target, new_callable=mock.PropertyMock) as patched_tc:
        downloader = ops.Downloader(
            *(mock.MagicMock() for _ in range(3)))
        downloader._general_options.concurrency.disk_threads = 1

        queued_item = (mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
        downloader._disk_queue = mock.MagicMock()
        downloader._disk_queue.get.side_effect = [queued_item]
        downloader._process_data = mock.MagicMock()
        downloader._process_data.side_effect = Exception()
        patched_tc.side_effect = [False, True]

        downloader._worker_thread_disk()
        assert len(downloader._exceptions) == 1
项目:floto    作者:babbel    | 项目源码 | 文件源码
def test_events_by_id_multi_page(self, mocker, page1_response, page2_response):
        """
        Check ``lowest_event_id`` before and after ``get_event(1)`` on a
        ``History`` built from ``page1_response``, with ``Swf.client``
        patched to a ``SwfMock`` that holds ``page2_response`` as 'page2'.
        """
        swf_mock = SwfMock()
        swf_mock.pages['page2'] = page2_response
        # Swf.client is replaced by a property that returns the stub.
        mocker.patch('floto.api.Swf.client', new_callable=PropertyMock, return_value=swf_mock)
        h = floto.History(domain='d', task_list='tl', response=page1_response)
        assert h.lowest_event_id == 2
        # Presumably looking up event 1 pulls in the second page -- confirm
        # against floto.History.
        assert h.get_event(1)
        assert h.lowest_event_id == 1