Python mock.patch 模块,object() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用mock.patch.object()

项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_verify_user(self):
        """
            Verifying a user listed as unverified in a group succeeds and
            leaves the group holding a full user entry (id and username).
        """
        manager = self.__get_dummy_object()
        self.__set_config(manager, {"foogroup": {"unverified": ["@foouser"]}})

        # Replace the private load/save config methods with mocks.
        with patch.object(manager, "_UserManager__load_config"):
            with patch.object(manager, "_UserManager__save_config"):
                verified = manager.verify_user(1337, "@foouser", "foogroup")
                self.assertTrue(verified)

                wanted = {
                    "foogroup": {
                        "users": [{"id": 1337, "username": "@foouser"}]
                    }
                }
                self.assertEqual(manager.config, wanted)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_add_user_already_in_grp(self):
        """
            add_user reports a falsy result when the username is already
            listed (as unverified) in the target group.
        """
        manager = self.__get_dummy_object()
        self.__set_config(manager, {"foogroup": {"unverified": ["@foouser"]}})

        # Replace the private load/save config methods with mocks.
        with patch.object(manager, "_UserManager__load_config"):
            with patch.object(manager, "_UserManager__save_config"):
                added = manager.add_user("@foouser", "foogroup")
                self.assertFalse(added)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_add_user_verified(self):
        """
            add_user with both a username and a user_id, starting from an
            empty config, stores a full user entry in the group.
        """
        manager = self.__get_dummy_object()
        self.__set_config(manager, {})

        # Replace the private load/save config methods with mocks.
        with patch.object(manager, "_UserManager__load_config"):
            with patch.object(manager, "_UserManager__save_config"):
                added = manager.add_user("@foouser", "foogroup", user_id=1337)
                self.assertTrue(added)

                wanted = {
                    "foogroup": {
                        "users": [{"id": 1337, "username": "@foouser"}]
                    }
                }
                self.assertEqual(manager.config, wanted)
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def make_test_handler(testcase, *args, **kwargs):
    """
    Create a TestHandler bound to the lifetime of ``testcase``.

    The handler's ``close`` is registered with ``testcase.addCleanup`` so
    it is closed when the test case cleans up.

    Parameters
    ----------
    testcase: unittest.TestCase
        The test case that will use the handler.
    *args, **kwargs
        Passed straight through to the TestHandler constructor.

    Returns
    -------
    handler: logbook.TestHandler
        The newly created handler.
    """
    test_handler = TestHandler(*args, **kwargs)
    testcase.addCleanup(test_handler.close)
    return test_handler
项目:wagtail-sharing    作者:cfpb    | 项目源码 | 文件源码
def setUp(self):
        """
        Patch wagtail's root ``urlpatterns`` with three dummy routes and
        reload ``wagtailsharing.urls`` so it is rebuilt against them.

        The patch is undone via ``addCleanup``; the rebuilt patterns are
        exposed as ``self.urlpatterns``.
        """
        def test_view():
            pass  # pragma: no cover

        # Route every pattern to the placeholder view defined above
        # (previously ``url`` itself was passed as the view and
        # ``test_view`` was never used).
        root_patterns = [
            url(r'^foo/$', test_view, name='foo'),
            url(r'^((?:[\w\-]+/)*)$', test_view, name='wagtail_serve'),
            url(r'^bar/$', test_view, name='bar'),
        ]

        self.patcher = patch.object(
            wagtail.wagtailcore.urls,
            'urlpatterns',
            root_patterns
        )
        self.patcher.start()
        self.addCleanup(self.patcher.stop)

        # Re-import so the module-level urlpatterns see the patched value.
        reload(wagtailsharing.urls)
        self.urlpatterns = wagtailsharing.urls.urlpatterns
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def test_replace_loggroup_section(mocked_configparser_class):
    """
    replace_loggroup_section drops the log-groups section from CONFIG and
    leaves the CLI favorites section present on the mocked parser.
    """
    user_dir = api_utils.user_config_dir(cli.lecli.__name__)
    # Make sure the config directory exists when no config file is present.
    if not os.path.exists(api_utils.CONFIG_FILE_PATH) \
            and not os.path.exists(user_dir):
        os.makedirs(user_dir)

    section = api_utils.LOGGROUPS_SECTION
    parser_mock = mocked_configparser_class.return_value
    parser_mock.add_section(section)

    fake_items = [('test-log-group-favs', ["test-log-key1", "test-log-key2"])]
    with patch.object(api_utils.CONFIG, 'items', return_value=fake_items):
        api_utils.replace_loggroup_section()
        assert not api_utils.CONFIG.has_section(section)
        assert parser_mock.has_section(api_utils.CLI_FAVORITES_SECTION)

    # Best-effort cleanup of whatever the call wrote.
    try:
        os.remove(api_utils.CONFIG_FILE_PATH)
        os.rmdir(user_dir)
    except OSError:
        pass
项目:ucsmsdk_samples    作者:CiscoUcs    | 项目源码 | 文件源码
def test_valid_server_pool_add_slot(query_mock, add_mo_mock, commit_mock):
    """
    server_pool_add_slot with default parameters returns a
    ComputePooledSlot carrying the chassis and slot ids as strings.
    """
    query_mock.return_value = ComputePool(
        parent_mo_or_dn="org-root", name="test-pool")
    add_mo_mock.return_value = True
    commit_mock.return_value = True
    ucs = UcsHandle('169.254.1.1', 'admin', 'password')

    # Scenario: default parameters only.
    pooled_slot = server_pool_add_slot(ucs, 1, 8)

    # The right object type comes back ...
    assert isinstance(pooled_slot, ComputePooledSlot)
    # ... and it carries the ids we passed in, stringified.
    assert pooled_slot.chassis_id == str(1)
    assert pooled_slot.slot_id == str(8)


# Patch UcsHandle.commit to simulate ucsm interaction w/o real ucsm
项目:nameko-amqp-retry    作者:nameko    | 项目源码 | 文件源码
def test_non_backoff_exception(
        self, container_factory, rabbit_config, rpc_proxy
    ):
        """ An ordinary exception raised by an RPC method reaches the
        caller as a RemoteError carrying the exception's type name.
        """
        class Boom(Exception):
            pass

        class Service(object):
            name = "service"

            @rpc
            def method(self):
                raise Boom()

        service_container = container_factory(Service, rabbit_config)
        service_container.start()

        with pytest.raises(RemoteError) as raised:
            rpc_proxy.service.method()
        assert raised.value.exc_type == "Boom"
项目:nameko-amqp-retry    作者:nameko    | 项目源码 | 文件源码
def container(self, container_factory):
        """
        Build (without starting) a container for a service whose three
        RPC methods all raise Backoff and differ only in their
        ``expected_exceptions`` declaration.
        """

        class Service(object):
            name = "service"

            @rpc
            def nothing_expected(self):
                raise Backoff()

            @rpc(expected_exceptions=self.UserException)
            def something_expected(self):
                raise Backoff()

            @rpc(expected_exceptions=(self.UserException, Backoff))
            def backoff_expected(self):
                raise Backoff()

        return container_factory(Service, {'AMQP_URI': 'memory://localhost'})
项目:nameko-amqp-retry    作者:nameko    | 项目源码 | 文件源码
def entrypoint_tracker():
    """
    Return a fresh CallTracker.

    The tracker records each ``track(**call)`` invocation as a dict in
    ``calls``; its length is the number of recorded calls, and the
    ``get_results`` / ``get_exceptions`` helpers pull the ``'result'`` and
    ``'exc_info'`` entries out of every recorded call.
    """

    class CallTracker(object):

        def __init__(self):
            self.calls = []

        def __len__(self):
            return len(self.calls)

        def track(self, **call):
            # Keep the keyword arguments of this call as one record.
            self.calls.append(call)

        def _collect(self, key):
            # Gather one field from every recorded call, in call order.
            values = []
            for record in self.calls:
                values.append(record[key])
            return values

        def get_results(self):
            return self._collect('result')

        def get_exceptions(self):
            return self._collect('exc_info')

    tracker = CallTracker()
    return tracker
项目:nameko-amqp-retry    作者:nameko    | 项目源码 | 文件源码
def container(self, container_factory, rabbit_config, counter):
        """
        Start and return a container for a service with two RPC methods,
        ``slow`` and ``quick``. Each raises its own backoff exception while
        the matching counter's ``increment()`` result is at most 1, and
        returns its own name otherwise.
        """

        class Service(object):
            name = "service"

            backoff = BackoffPublisher()

            @rpc
            def slow(self):
                attempts = counter["slow"].increment()
                if attempts <= 1:
                    raise SlowBackoff()
                return "slow"

            @rpc
            def quick(self):
                attempts = counter["quick"].increment()
                if attempts <= 1:
                    raise QuickBackoff()
                return "quick"

        started = container_factory(Service, rabbit_config)
        started.start()
        return started
项目:nameko-amqp-retry    作者:nameko    | 项目源码 | 文件源码
def container(self, container_factory, rabbit_config, counter):
        """
        Start and return a container for a service whose single RPC
        method raises a Backoff subclass with a negative schedule entry.
        """

        class Service(object):
            name = "service"

            backoff = BackoffPublisher()

            @rpc
            def bad(self):
                # Backoff variant whose schedule holds one negative value.
                class BadBackoff(Backoff):
                    schedule = (-10, )
                raise BadBackoff()

        started = container_factory(Service, rabbit_config)
        started.start()
        return started
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_noop_model_direct_update(self):
        """ update() calls that change nothing must not reach the session. """
        model = TestUpdateModel.create(count=5, text='monkey')

        # No arguments at all.
        with patch.object(self.session, 'execute') as mock_execute:
            model.update()
        assert mock_execute.call_count == 0

        # Same value as the one already stored.
        with patch.object(self.session, 'execute') as mock_execute:
            model.update(count=5)
        assert mock_execute.call_count == 0

        # NOTE(review): the two updates below are not followed by a
        # call-count assertion -- confirm whether that is intentional.
        with patch.object(self.session, 'execute') as mock_execute:
            model.update(partition=model.partition)

        with patch.object(self.session, 'execute') as mock_execute:
            model.update(cluster=model.cluster)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_noop_model_assignation_update(self):
        """ Re-assigning identical values and saving must not hit the session. """
        # Create the row, then read it back so no hidden variable cache
        # effect can influence the outcome.
        created = TestUpdateModel.create(count=5, text='monkey')
        fetched = TestUpdateModel.get(partition=created.partition,
                                      cluster=created.cluster)

        # Plain save with nothing touched.
        with patch.object(self.session, 'execute') as mock_execute:
            fetched.save()
        assert mock_execute.call_count == 0

        # Same value for a regular column.
        with patch.object(self.session, 'execute') as mock_execute:
            fetched.count = 5
            fetched.save()
        assert mock_execute.call_count == 0

        # Same value for the partition column.
        with patch.object(self.session, 'execute') as mock_execute:
            fetched.partition = created.partition
            fetched.save()
        assert mock_execute.call_count == 0

        # Same value for the cluster column.
        with patch.object(self.session, 'execute') as mock_execute:
            fetched.cluster = created.cluster
            fetched.save()
        assert mock_execute.call_count == 0
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_only_set_values_is_updated(self):
        """
        Saving an instance after its row was deleted writes only the
        values that were set on it.
        @since 3.9
        @jira_ticket PYTHON-657
        @expected_result the non updated column is None and the
        updated column has the set value

        @test_category object_mapper
        """

        ModelWithDefault.create(id=1, mf={1: 1}, dummy=1).save()

        fetched = ModelWithDefault.filter(id=1).first()
        # Remove the row, then write back through the stale instance.
        ModelWithDefault.objects(id=1).delete()
        fetched.mf = {1: 2}
        udt = UDT(age=1, mf={2: 3})
        udt_default = UDT(age=1, mf={2: 3})
        fetched.udt = udt
        fetched.udt_default = udt_default
        fetched.save()

        expected = {
            'id': 1,
            'dummy': None,
            'mf': {1: 2},
            "udt": udt,
            "udt_default": udt_default,
        }
        self.assertEqual(ModelWithDefault.get()._as_dict(), expected)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_speculative_and_timeout(self):
        """
        Test to ensure the timeout is honored when using speculative execution
        @since 3.10
        @jira_ticket PYTHON-750
        @expected_result speculative retries are scheduled every fixed period,
        during the maximum period of the timeout.

        @test_category metadata
        """
        # send_msg is mocked so no messages are sent; otherwise a response
        # might arrive and we would not know how many hosts were queried.
        with patch.object(Connection, "send_msg", return_value=100) as mocked_send_msg:

            query = SimpleStatement(
                "INSERT INTO test3rf.test (k, v) VALUES (0, 1);",
                is_idempotent=True)

            # The OperationTimedOut ends up stored on the response future.
            # Calling session.execute would raise it, so wait on the
            # future's event directly instead.
            future = self.session.execute_async(
                query, execution_profile='spec_ep_brr_lim', timeout=2.2)
            future._event.wait(4)
            self.assertIsInstance(future._final_exception, OperationTimedOut)

            # This is because 2.2 / 0.4 + 1 = 6
            self.assertEqual(len(future.attempted_hosts), 6)
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_get_genesis_block_whenCalled_thenCreatesAndReturnsBlockWithGenesisTransactions(self):
        """get_genesis_block builds exactly one Block from the two genesis transactions."""
        genesis_transactions = [
            Transaction(
                "0",
                "03dd1e57d05d9cab1d8d9b727568ad951ac2d9ecd082bc36f69e021b8427812924",
                500000,
                0
            ),
            Transaction(
                "0",
                "03dd1e3defd36c8c0c7282ca1a324851efdb15f742cac0c5b258ef7b290ece9e5d",
                500000,
                0
            ),
        ]

        # Blockchain.__init__ is replaced so constructing the subject runs no setup.
        with patch.object(Blockchain, '__init__', return_value=None):
            with patch('crankycoin.blockchain.Block') as patched_Block:
                patched_Block._calculate_block_hash.return_value = "mock_block_hash"
                subject = Blockchain()

                subject.get_genesis_block()

                patched_Block.assert_called_once_with(0, genesis_transactions, 0, 0, 0)
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_check_hash_and_hash_pattern_whenBlockHasValidHashAndPattern_thenReturnsTrue(self):
        """_check_hash_and_hash_pattern returns None for the mocked block below."""
        tx = Mock(Transaction)
        tx.source = "from"
        tx.timestamp = 1498923800
        tx.destination = "to"
        tx.amount = 50
        tx.signature = "signature"
        tx.tx_hash = "transaction_hash"

        block = Mock(Block)

        # Skip real construction and pin the difficulty to 4.
        with patch.object(Blockchain, '__init__', return_value=None):
            with patch.object(Blockchain, 'calculate_hash_difficulty', return_value=4):
                block.current_hash = "0000_valid_block_hash"
                block.index = 35
                block.previous_hash = "0000_valid_previous_hash"
                block.transactions = [tx]
                block.nonce = 37
                block.timestamp = 12341234
                subject = Blockchain()

                outcome = subject._check_hash_and_hash_pattern(block)

                self.assertIsNone(outcome)
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_check_hash_and_hash_pattern_whenBlockHasInvalidHash_thenReturnsFalse(self):
        """
        _check_hash_and_hash_pattern raises InvalidHash mentioning
        "Block Hash Mismatch" for the mocked block below.

        Blockchain.__init__ is patched out so the subject is built without
        running its real constructor.
        """
        mock_block = Mock(Block)

        transaction = Mock(Transaction)
        transaction.source = "from"
        transaction.timestamp = 1498923800
        transaction.destination = "to"
        transaction.amount = 50
        transaction.signature = "signature"
        transaction.tx_hash = "transaction_hash"

        with patch.object(Blockchain, '__init__', return_value=None):
            mock_block.current_hash = "0000_valid_block_hash"
            mock_block.index = 35
            mock_block.previous_hash = "0000_valid_previous_hash"
            mock_block.transactions = [transaction]
            mock_block.nonce = 37
            mock_block.timestamp = 12341234
            subject = Blockchain()

            with self.assertRaises(InvalidHash) as context:
                subject._check_hash_and_hash_pattern(mock_block)
            # The message check must sit outside the assertRaises block:
            # anything placed after the raising call inside it never runs.
            self.assertIn("Block Hash Mismatch", str(context.exception))
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_check_hash_and_hash_pattern_whenBlockHasInvalidPattern_thenReturnsFalse(self):
        """
        _check_hash_and_hash_pattern raises InvalidHash mentioning
        "Incompatible Block Hash" for the mocked block below.

        Blockchain.__init__ is patched out and calculate_hash_difficulty is
        pinned to 4.
        """
        mock_block = Mock(Block)

        transaction = Mock(Transaction)
        transaction.source = "from"
        transaction.timestamp = 1498923800
        transaction.destination = "to"
        transaction.amount = 50
        transaction.signature = "signature"
        transaction.tx_hash = "transaction_hash"

        with patch.object(Blockchain, '__init__', return_value=None), \
                patch.object(Blockchain, 'calculate_hash_difficulty', return_value=4):
            mock_block.current_hash = "invalid_block_hash"
            mock_block.index = 35
            mock_block.previous_hash = "0000_valid_previous_hash"
            mock_block.transactions = [transaction]
            mock_block.nonce = 37
            mock_block.timestamp = 12341234
            subject = Blockchain()

            with self.assertRaises(InvalidHash) as context:
                subject._check_hash_and_hash_pattern(mock_block)
            # The message check must sit outside the assertRaises block:
            # anything placed after the raising call inside it never runs.
            self.assertIn("Incompatible Block Hash", str(context.exception))
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_mine_block_whenOneTransaction_andIncorrectTransactionHash_thenReturnsNone(self):
        """mine_block returns None after popping the single queued transaction and the terminating None."""
        tx = Mock(Transaction)
        tx.source = "from"
        tx.timestamp = 1498923800
        tx.destination = "to"
        tx.amount = 25
        tx.fee = 0.1
        tx.signature = "signature"
        tx.tx_hash = "incorrect_transaction_hash"

        tip = Mock(Block)
        tip.index = 35
        tip.current_hash = "latest_block_hash"

        # The pop mock yields the transaction first, then None.
        with patch.object(Blockchain, '__init__', return_value=None):
            with patch.object(Blockchain, 'get_latest_block', return_value=tip):
                with patch.object(Blockchain, 'pop_next_unconfirmed_transaction',
                                  side_effect=[tx, None]) as patched_pop:
                    subject = Blockchain()

                    mined = subject.mine_block("reward_address")

                    self.assertIsNone(mined)
                    self.assertEqual(patched_pop.call_count, 2)
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_mine_block_whenOneDuplicateTransaction_thenReturnsNone(self):
        """
        mine_block returns None when find_duplicate_transactions reports a
        block id for the only queued transaction.

        Blockchain.__init__, get_latest_block,
        pop_next_unconfirmed_transaction and find_duplicate_transactions
        are all patched on the class.
        """
        transaction = Mock(Transaction)
        transaction.source = "from"
        transaction.timestamp = 1498923800
        transaction.destination = "to"
        transaction.amount = 25
        transaction.fee = 0.1
        transaction.signature = "signature"
        transaction.tx_hash = "transaction_hash"

        block_id_with_same_transaction = 38
        latest_block = Mock(Block)
        latest_block.index = 35
        latest_block.current_hash = "latest_block_hash"
        with patch.object(Blockchain, '__init__', return_value=None), \
                patch.object(Blockchain, 'get_latest_block', return_value=latest_block), \
                patch.object(Blockchain, 'pop_next_unconfirmed_transaction', side_effect=[transaction, None]) as patched_pop_next_unconfirmed_transaction, \
                patch.object(Blockchain, 'find_duplicate_transactions', return_value=block_id_with_same_transaction) as patched_find_duplicate_transactions:
            subject = Blockchain()

            resp = subject.mine_block("reward_address")

            self.assertIsNone(resp)
            self.assertEqual(patched_pop_next_unconfirmed_transaction.call_count, 2)
            # Was misspelled ``asssert_called_once_with``: on a mock that
            # just creates a child attribute and verifies nothing.
            patched_find_duplicate_transactions.assert_called_once_with("transaction_hash")
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_push_unconfirmed_transaction_thenPushesTransactionAndReturnsNone(self):
        """push_unconfirmed_transaction returns a truthy value and leaves one item on the queue."""
        pending = {
            'from': 'from',
            'timestamp': 1498923800,
            'to': 'to',
            'amount': 1,
            'signature': 'signature_one',
            'hash': "transaction_hash_one"
        }
        # Blockchain.__init__ is patched out; the queue is attached by hand.
        with patch.object(Blockchain, '__init__', return_value=None):
            subject = Blockchain()
            subject.unconfirmed_transactions = Queue()

            pushed = subject.push_unconfirmed_transaction(pending)

            self.assertTrue(pushed)
            self.assertEqual(subject.unconfirmed_transactions.qsize(), 1)
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_broadcast_transaction_thenBroadcastsToAllNodes(self):
        """broadcast_transaction posts the serialized transaction to each of the three known nodes."""
        # Every node is expected to receive the same JSON body.
        body = {'transaction': '{"amount": 0, "destination": "destination", "fee": 0, "signature": null, "source": "source", "timestamp": 1508823223, "tx_hash": null}'}

        with patch.object(FullNode, '__init__', return_value=None):
            with patch.object(FullNode, 'request_nodes_from_all') as patched_request_nodes_from_all:
                with patch("crankycoin.time.time", return_value="1508823223"):
                    with patch("crankycoin.requests.post") as patched_post:

                        tx = Transaction("source", "destination", 0, 0)
                        node = FullNode("127.0.0.1", "reward_address")
                        node.full_nodes = {"127.0.0.1", "127.0.0.2", "127.0.0.3"}

                        node.broadcast_transaction(tx)

                        patched_request_nodes_from_all.assert_called_once()
                        expected_posts = [
                            call("http://127.0.0.1:30013/transactions", json=body),
                            call("http://127.0.0.2:30013/transactions", json=body),
                            call("http://127.0.0.3:30013/transactions", json=body),
                        ]
                        # full_nodes is a set, so the posts may come in any order.
                        patched_post.assert_has_calls(expected_posts, any_order=True)
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_broadcast_transaction_whenRequestException_thenFailsGracefully(self):
        """broadcast_transaction still attempts every node when each post raises RequestException."""
        # Every node is expected to be sent the same JSON body.
        body = {'transaction': '{"amount": 0, "destination": "destination", "fee": 0, "signature": null, "source": "source", "timestamp": 1508823223, "tx_hash": null}'}

        with patch.object(FullNode, '__init__', return_value=None):
            with patch.object(FullNode, 'request_nodes_from_all') as patched_request_nodes_from_all:
                with patch("crankycoin.time.time", return_value="1508823223"):
                    with patch("crankycoin.requests.post",
                               side_effect=requests.exceptions.RequestException()) as patched_post:

                        tx = Transaction("source", "destination", 0, 0)
                        node = FullNode("127.0.0.1", "reward_address")
                        node.full_nodes = {"127.0.0.1", "127.0.0.2", "127.0.0.3"}

                        node.broadcast_transaction(tx)

                        patched_request_nodes_from_all.assert_called_once()
                        expected_posts = [
                            call("http://127.0.0.1:30013/transactions", json=body),
                            call("http://127.0.0.2:30013/transactions", json=body),
                            call("http://127.0.0.3:30013/transactions", json=body),
                        ]
                        # full_nodes is a set, so the posts may come in any order.
                        patched_post.assert_has_calls(expected_posts, any_order=True)
项目:crankycoin    作者:cranklin    | 项目源码 | 文件源码
def test_request_block_whenIndexIsNumeric_thenRequestsCorrectBlockFromNode(self):
        """request_block fetches /block/29 from the peer and returns a block built from the JSON reply."""
        reply = Mock()
        reply.status_code = 200
        reply.json.return_value = '{"nonce": 12345, "index": 29, "transactions": [], "timestamp": 1234567890, "current_hash": "current_hash", "previous_hash": "previous_hash"}'

        with patch.object(FullNode, '__init__', return_value=None):
            with patch("crankycoin.node.Block.current_hash",
                       new_callable=PropertyMock) as patched_block_current_hash:
                with patch("crankycoin.requests.get", return_value=reply) as patched_get:
                    # Block.current_hash is a patched property; fix its value.
                    patched_block_current_hash.return_value = "current_hash"
                    node = FullNode("127.0.0.1", "reward_address")

                    fetched = node.request_block("127.0.0.2", "30013", 29)

                    self.assertIsNotNone(fetched)
                    self.assertEqual(fetched.index, 29)
                    self.assertEqual(fetched.transactions, [])
                    self.assertEqual(fetched.previous_hash, "previous_hash")
                    self.assertEqual(fetched.current_hash, "current_hash")
                    self.assertEqual(fetched.timestamp, 1234567890)
                    self.assertEqual(fetched.nonce, 12345)
                    patched_get.assert_called_once_with('http://127.0.0.2:30013/block/29')
项目:pg2kinesis    作者:handshake    | 项目源码 | 文件源码
def test__enter__(slot):
    """
    __enter__ returns the slot itself, requests two connections, and only
    invokes the keepalive sender once a keepalive window is set.
    """
    # ``side_effect`` (singular) is the mock keyword that supplies one
    # return value per call; the former ``side_effects`` spelling merely
    # set an unused attribute, so both calls got the same default mock.
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]) as mock_gc, \
            patch.object(slot, '_send_keepalive') as mock_ka:
        assert slot == slot.__enter__(), 'Returns itself'
        assert mock_gc.call_count == 2

        assert call.set_isolation_level(0) in slot._normal_conn.method_calls, 'make sure we are in autocommit'
        assert call.cursor() in slot._repl_conn.method_calls, 'we opened a cursor'

        assert not mock_ka.called, "with no window we didn't start keep alive"

    slot._keepalive_window = 1
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]) as mock_gc, \
            patch.object(slot, '_send_keepalive') as mock_ka:
        slot.__enter__()
        assert mock_ka.called, " we started up keepalive"
项目:pg2kinesis    作者:handshake    | 项目源码 | 文件源码
def test_delete_slot(slot):
    """
    delete_slot swallows a ProgrammingError whose pgcode is
    UNDEFINED_OBJECT and propagates other errors; in every case the drop
    is requested for 'pg2kinesis'.
    """
    # pgcode == UNDEFINED_OBJECT: delete_slot completes without raising.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.UNDEFINED_OBJECT):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
        slot.delete_slot()
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')

    # Any other pgcode: the ProgrammingError propagates.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=-1):
        pe = psycopg2.ProgrammingError()
        # NOTE(review): this configures create_replication_slot although the
        # assertions concern drop_replication_slot -- confirm which was meant.
        slot._repl_cursor.create_replication_slot = Mock(side_effect=pe)
        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.delete_slot()
        # These checks used to sit inside the raises block after the
        # raising call, where they could never execute. pgcode is read
        # while the property patch is still active.
        slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
        assert e_info.value.pgcode == -1

    slot._repl_cursor.create_replication_slot = Mock(side_effect=Exception)
    with pytest.raises(Exception):
        slot.delete_slot()
    # Moved out of the raises block for the same reason as above.
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
项目:pg2kinesis    作者:handshake    | 项目源码 | 文件源码
def test__init__():
    """
    StreamWriter tolerates a ResourceInUseException from create_stream
    (and then waits for the stream to exist) but re-raises any other
    ClientError.
    """
    mock_client = Mock()
    with patch.object(boto3, 'client', return_value=mock_client):
        error_response = {'Error': {'Code': 'ResourceInUseException'}}
        mock_client.create_stream = Mock(side_effect=ClientError(error_response, 'create_stream'))

        StreamWriter('blah')
        assert mock_client.create_stream.called
        assert call.get_waiter('stream_exists') in mock_client.method_calls, "We handled stream existence"

        error_response = {'Error': {'Code': 'Something else'}}
        mock_client.create_stream = Mock(side_effect=ClientError(error_response, 'create_stream'))

        # Clear the calls recorded during the first scenario.
        mock_client.reset_mock()
        with pytest.raises(ClientError):
            StreamWriter('blah')
        # These checks used to sit inside the raises block after the
        # raising call, where they could never execute.
        assert mock_client.create_stream.called
        assert call.get_waiter('stream_exists') not in mock_client.method_calls, "never reached"
项目:cqlmapper    作者:reddit    | 项目源码 | 文件源码
def test_noop_model_direct_update(self):
        """
        update() calls that change nothing must not reach the session, and
        passing the partition or cluster value raises ValidationError.
        """
        model = TestUpdateModel.create(self.conn, count=5, text='monkey')

        # No field arguments at all.
        with patch.object(self.conn.session, 'execute') as mock_execute:
            model.update(self.conn)
        assert mock_execute.call_count == 0

        # Same value as the one already stored.
        with patch.object(self.conn.session, 'execute') as mock_execute:
            model.update(self.conn, count=5)
        assert mock_execute.call_count == 0

        with self.assertRaises(ValidationError):
            model.update(self.conn, partition=model.partition)

        with self.assertRaises(ValidationError):
            model.update(self.conn, cluster=model.cluster)
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def test0060_update_only_if_parent_is_modified(self):
        'The left and right fields must only be updated if parent is modified'
        with Transaction().start(DB_NAME, USER, context=CONTEXT):
            roots = self.mptt.search([('parent', '=', None)])
            with patch.object(self.mptt, '_update_tree') as update_tree:
                # Writing a field other than parent must not call _update_tree.
                self.mptt.write(roots, {'name': 'Parent Records'})
                self.assertFalse(update_tree.called)

                # Re-parenting the first root's children must call it.
                first_parent, second_parent = roots[:2]
                children = list(first_parent.childs)
                self.mptt.write(children, {'parent': second_parent.id})

                self.assertTrue(update_tree.called)
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def runTest(self):
        """
        Run ``build.main()`` with ``self.build_args`` as ``sys.argv`` and
        fail if any image's result contradicts ``self.excluded_images``.

        An excluded image is expected to have result 'error'; every other
        image in the bad results is expected not to. Unmatched images are
        only logged.
        """
        with patch.object(sys, 'argv', self.build_args):
            LOG.info("Running with args %s", self.build_args)
            bad_results, good_results, unmatched_results = build.main()

        failures = 0
        for image, result in six.iteritems(bad_results):
            # Compare by value: ``is 'error'`` tests object identity, which
            # only holds when the interpreter happens to share the string.
            errored = result == 'error'
            if image in self.excluded_images:
                if errored:
                    continue
                failures = failures + 1
                LOG.warning(">>> Expected image '%s' to fail, please update"
                            " the excluded_images in source file above if the"
                            " image build has been fixed.", image)
            else:
                if not errored:
                    continue
                failures = failures + 1
                LOG.critical(">>> Expected image '%s' to succeed!", image)

        for image in unmatched_results.keys():
            LOG.warning(">>> Image '%s' was not matched", image)

        self.assertEqual(failures, 0, "%d failure(s) occurred" % failures)
项目:mentos    作者:daskos    | 项目源码 | 文件源码
def test_enforcing_wait_time_sleeps(mocker):
    """
    The first enforce() call does not sleep; the second sleeps for the
    value returned by the policy's sleep_func.
    """
    # Already-resolved future handed back by the mocked gen.sleep.
    finished = concurrent.Future()
    finished.set_result(None)
    sleep_mock = mocker.patch.object(retry.gen, 'sleep')
    sleep_mock.return_value = finished

    def in_the_future(_):
        return 60

    policy = retry.RetryPolicy(try_limit=3, sleep_func=in_the_future)
    pending_request = Mock()

    yield policy.enforce(pending_request)
    assert sleep_mock.called is False

    yield policy.enforce(pending_request)
    sleep_mock.assert_called_once_with(60)
项目:Brightside    作者:BrighterCommand    | 项目源码 | 文件源码
def test_logging_a_handler(self):
        """
        Given that I have a handler decorated for logging
        When I call that handler
        Then I should receive logs indicating the call and return of the handler
        * N.b. This is an example of using decorators to extend the Brightside pipeline
        """
        command = MyCommand()
        command_handler = MyCommandHandler()
        self._subscriber_registry.register(MyCommand, lambda: command_handler)

        test_logger = logging.getLogger("tests.handlers_testdoubles")
        with patch.object(test_logger, 'log') as mock_log:
            self._commandProcessor.send(command)

        # One DEBUG entry on the way in, one on the way out.
        expected_calls = [
            call(logging.DEBUG, "Entering handle " + str(command)),
            call(logging.DEBUG, "Exiting handle " + str(command)),
        ]
        mock_log.assert_has_calls(expected_calls)
项目:emory-baggins    作者:emory-libraries    | 项目源码 | 文件源码
def test_data_files(self, lsdibag):
        """
        data_files() includes the item's pdf and ocr file plus every file
        returned by image_files() and page_text_files().
        """
        with patch.object(lsdibag, 'image_files') as mock_imgfiles:
            with patch.object(lsdibag, 'page_text_files') as mock_txtfiles:

                mock_imgfiles.return_value = [
                    '001.tif', '002.tif', '003.tif'
                ]
                # The comma after '003.txt' matters: without it Python joins
                # the adjacent literals into the single name '003.txt001.pos'.
                mock_txtfiles.return_value = [
                    '001.txt', '002.txt', '003.txt',
                    '001.pos', '002.pos', '003.pos'
                ]

                datafiles = lsdibag.data_files()

                # should include pdf and ocr file
                assert lsdibag.item.pdf in datafiles
                assert lsdibag.item.ocr_file in datafiles
                # should include all image and text files
                for imgfile in mock_imgfiles.return_value:
                    assert imgfile in datafiles
                for txtfile in mock_txtfiles.return_value:
                    assert txtfile in datafiles
项目:bitstore    作者:datahq    | 项目源码 | 文件源码
def setUp(self):
        """Patch the module's collaborators and install test storage settings."""
        # Registered up front so every patch started below gets stopped again.
        self.addCleanup(patch.stopall)

        # Replace the request object and the services referenced by the module.
        self.request = patch.object(module, 'request').start()
        self.services = patch.object(module, 'services').start()

        # Keep a copy of the configuration as it was before the overrides.
        self.original_config = dict(module.config)

        self.bucket = 'buckbuck'
        test_settings = (
            ('STORAGE_BUCKET_NAME', self.bucket),
            ('STORAGE_ACCESS_KEY_ID', ''),
            ('STORAGE_SECRET_ACCESS_KEY', ''),
            ('ACCESS_KEY_EXPIRES_IN', ''),
            ('STORAGE_PATH_PATTERN', '{owner}/{dataset}/{path}'),
        )
        for setting, value in test_settings:
            module.config[setting] = value

        self.s3 = boto3.client('s3')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def patch(self, method):
        """Patch ``method`` on ``self.obj`` and return the replacement mock.

        The patcher's ``stop`` is registered as a cleanup, so the patch is
        undone when the test's cleanups run.
        """
        patcher = patch.object(self.obj, method)
        replacement = patcher.start()
        # Registered only after a successful start, as a stop needs a started patcher.
        self.addCleanup(patcher.stop)
        return replacement
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_has_access_is_in_group(self):
        """
            Call has_access for a user that is reported as a group member
        """
        test_user, manager_factory = self.__get_test_instance(
            "@foouser", 1337, group="foogroup")
        manager = manager_factory.return_value
        manager.user_is_in_group.return_value = True
        # NOTE(review): the value returned by has_access is never asserted.
        with patch.object(test_user, "save"):
            test_user.has_access("foogroup")
def test_has_access_is_not_in_group(self):
        """
            Call has_access for a user that is reported as not being a
            group member and whose verification fails
        """
        test_user, manager_factory = self.__get_test_instance(
            "@foouser", 1337, group="bargroup")
        manager = manager_factory.return_value
        manager.user_is_in_group.return_value = False
        manager.verify_user.return_value = False
        # NOTE(review): the value returned by has_access is never asserted.
        with patch.object(test_user, "save"):
            test_user.has_access("foogroup")
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def __get_dummy_object():
        """Builds a usermanager instance while os.mkdir is patched out."""
        mkdir_patch = patch("os.mkdir")
        with mkdir_patch:
            dummy_manager = UserManager()
        return dummy_manager
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def __set_config(usrmgr, config):
        """Assigns config to the passed usermanager while its private
        save-config method is patched."""
        save_patch = patch.object(usrmgr, "_UserManager__save_config")
        with save_patch:
            usrmgr.config = config
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_get_set_config(self):
        """
            The config attribute should hand back what was assigned to it
        """
        manager = self.__get_dummy_object()
        expected = {"hello": {}, "world": {}}

        # Assign with the private save method patched.
        with patch.object(manager, "_UserManager__save_config"):
            manager.config = expected

        # Read back with the private load method patched.
        with patch.object(manager, "_UserManager__load_config"):
            stored = manager.config
            self.assertEqual(stored, expected)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_userid_is_not_verified_grp(self):
        """
            A user id that is absent from the group's users is not verified
        """
        manager = self.__get_dummy_object()
        self.__set_config(
            manager,
            {"foogroup": {"users": [{"id": 1337, "username": "@foo"}]}})

        with patch.object(manager, "_UserManager__load_config"):
            self.assertFalse(
                manager.userid_is_verified_in_group("foogroup", 1234))
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_username_is_verified_grp(self):
        """
            A username listed in the group's users is verified
        """
        manager = self.__get_dummy_object()
        self.__set_config(
            manager,
            {"foogroup": {"users": [{"id": 1337, "username": "@foo"}]}})

        with patch.object(manager, "_UserManager__load_config"):
            self.assertTrue(
                manager.username_is_verified_in_group("foogroup", "@foo"))
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_usernm_is_not_verified_grp(self):
        """
            A username missing from the group's users is not verified
        """
        manager = self.__get_dummy_object()
        self.__set_config(
            manager,
            {"foogroup": {"users": [{"id": 1337, "username": "@foo"}]}})

        with patch.object(manager, "_UserManager__load_config"):
            self.assertFalse(
                manager.username_is_verified_in_group("foogroup", "@bar"))
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_user_is_unverified_grp(self):
        """
            A username on the group's unverified list is reported as unverified
        """
        manager = self.__get_dummy_object()
        self.__set_config(manager, {"foogroup": {"unverified": ["@foo"]}})

        with patch.object(manager, "_UserManager__load_config"):
            self.assertTrue(
                manager.user_is_unverified_in_group("foogroup", "@foo"))
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_user_is_not_unverified_grp(self):
        """
            A username absent from the group's unverified list is not
            reported as unverified
        """
        manager = self.__get_dummy_object()
        self.__set_config(manager, {"foogroup": {"unverified": ["@foo"]}})

        with patch.object(manager, "_UserManager__load_config"):
            self.assertFalse(
                manager.user_is_unverified_in_group("foogroup", "@bar"))
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_user_is_in_group_username(self):
        """
            Group membership check when only the username is given
        """
        manager = self.__get_dummy_object()
        self.__set_config(manager, {"foogroup": {"unverified": ["@foo"]}})

        with patch.object(manager, "_UserManager__load_config"):
            self.assertTrue(
                manager.user_is_in_group("foogroup", username="@foo"))
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_user_is_in_group_userid(self):
        """
            Group membership check when only the user id is given
        """
        manager = self.__get_dummy_object()
        self.__set_config(
            manager,
            {"foogroup": {"users": [{"id": 1337, "username": "@foo"}]}})

        with patch.object(manager, "_UserManager__load_config"):
            self.assertTrue(
                manager.user_is_in_group("foogroup", user_id=1337))
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_verify_user_no_verify(self):
        """
            Test that verify_user gives a falsy result when the passed user
            is not on the group's unverified list (the list is empty here)
        """
        usrmgr = self.__get_dummy_object()
        config = {"foogroup": {"unverified": []}}
        self.__set_config(usrmgr, config)

        with patch.object(usrmgr, "_UserManager__load_config"):
            result = usrmgr.verify_user(1337, "@foouser", "foogroup")
            self.assertFalse(result)