Python unittest 模块,mock() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用unittest.mock()

项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_event_registration(self):
        """Tests that events register correctly."""
        # Get the event controller
        events = self.abode.events
        self.assertIsNotNone(events)

        # Create mock callback
        callback = Mock()

        # Test that a valid event registers
        self.assertTrue(
            events.add_event_callback(TIMELINE.ALARM_GROUP, callback))

        # Test that no event group returns false
        self.assertFalse(events.add_event_callback(None, callback))

        # Test that an invalid event throws exception
        with self.assertRaises(abodepy.AbodeException):
            events.add_event_callback("lol", callback)
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_timeline_registration(self):
        """Tests that timeline events register correctly."""
        # Get the event controller
        events = self.abode.events
        self.assertIsNotNone(events)

        # Create mock callback
        callback = Mock()

        # Test that a valid timeline event registers
        self.assertTrue(
            events.add_timeline_callback(
                TIMELINE.CAPTURE_IMAGE, callback))

        # Test that no timeline event returns false
        self.assertFalse(events.add_timeline_callback(None, callback))

        # Test that an invalid timeline event string throws exception
        with self.assertRaises(abodepy.AbodeException):
            events.add_timeline_callback("lol", callback)

        # Test that an invalid timeline event dict throws exception
        with self.assertRaises(abodepy.AbodeException):
            events.add_timeline_callback({"lol": "lol"}, callback)
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_multi_events_callback(self):
        """Tests that multiple event updates callback correctly."""
        # Get the event controller
        events = self.abode.events
        self.assertIsNotNone(events)

        # Create mock callback
        callback = Mock()

        # Register our events
        self.assertTrue(
            events.add_event_callback(
                [TIMELINE.ALARM_GROUP, TIMELINE.CAPTURE_GROUP],
                callback))

        # Call our events callback method and trigger a capture group event
        # pylint: disable=protected-access
        event_json = json.loads(IRCAMERA.timeline_event())
        events._on_timeline_update(event_json)

        # Ensure our callback was called
        callback.assert_called_with(event_json)
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_multi_timeline_callback(self):
        """Tests that multiple timeline updates callback correctly."""
        # Get the event controller
        events = self.abode.events
        self.assertIsNotNone(events)

        # Create mock callback
        callback = Mock()

        # Register our events
        self.assertTrue(
            events.add_timeline_callback(
                [TIMELINE.CAPTURE_IMAGE, TIMELINE.OPENED], callback))

        # Call our events callback method and trigger a capture group event
        # pylint: disable=protected-access
        event_json = json.loads(IRCAMERA.timeline_event())
        events._on_timeline_update(event_json)

        # Ensure our callback was called
        callback.assert_called_with(event_json)
项目:abodepy    作者:MisterWil    | 项目源码 | 文件源码
def tests_automations_callback(self):
        """Tests that automation updates callback correctly."""
        # Get the event controller
        events = self.abode.events
        self.assertIsNotNone(events)

        # Create mock callbacks
        automation_callback = Mock()

        # Register our events
        self.assertTrue(
            events.add_event_callback(
                TIMELINE.AUTOMATION_EDIT_GROUP, automation_callback))

        # Call our events callback method and trigger a capture group event
        # pylint: disable=protected-access
        events._on_automation_update('{}')

        # Our capture callback should get one, but our alarm should not
        automation_callback.assert_called_with('{}')
项目:azure-python-devtools    作者:Azure    | 项目源码 | 文件源码
def test_subscription_recording_processor_for_request(self):
        replaced_subscription_id = str(uuid.uuid4())
        rp = SubscriptionRecordingProcessor(replaced_subscription_id)

        uri_templates = ['https://management.azure.com/subscriptions/{}/providers/Microsoft.ContainerRegistry/'
                         'checkNameAvailability?api-version=2017-03-01',
                         'https://graph.windows.net/{}/applications?api-version=1.6']

        for template in uri_templates:
            mock_sub_id = str(uuid.uuid4())
            mock_request = mock.Mock()
            mock_request.uri = template.format(mock_sub_id)
            mock_request.body = self._mock_subscription_request_body(mock_sub_id)

            rp.process_request(mock_request)
            self.assertEqual(mock_request.uri, template.format(replaced_subscription_id))
            self.assertEqual(mock_request.body,
                             self._mock_subscription_request_body(replaced_subscription_id))
项目:pip-update-requirements    作者:alanhamlett    | 项目源码 | 文件源码
def setUp(self):
        # disable logging while testing
        logging.disable(logging.CRITICAL)

        self.patched = {}
        if hasattr(self, 'patch_these'):
            for patch_this in self.patch_these:
                namespace = patch_this[0] if isinstance(patch_this, (list, set)) else patch_this

                patcher = mock.patch(namespace)
                mocked = patcher.start()
                mocked.reset_mock()
                self.patched[namespace] = mocked

                if isinstance(patch_this, (list, set)) and len(patch_this) > 0:
                    retval = patch_this[1]
                    if callable(retval):
                        retval = retval()
                    mocked.return_value = retval
项目:python-ogn-client    作者:glidernet    | 项目源码 | 文件源码
def test_reset_kill_reconnect(self):
        client = AprsClient(aprs_user='testuser', aprs_filter='')
        client.connect()

        # .run() should be allowed to execute after .connect()
        mock_callback = mock.MagicMock(
            side_effect=lambda raw_msg: client.disconnect())

        self.assertFalse(client._kill)
        client.run(callback=mock_callback, autoreconnect=True)

        # After .disconnect(), client._kill should be True
        self.assertTrue(client._kill)
        self.assertEqual(mock_callback.call_count, 1)

        # After we reconnect, .run() should be able to run again
        mock_callback.reset_mock()
        client.connect()
        client.run(callback=mock_callback, autoreconnect=True)
        self.assertEqual(mock_callback.call_count, 1)
项目:pyconnectome    作者:neurospin    | 项目源码 | 文件源码
def test_normal_execution(self, mock_isfile, mock_glob):
        """ Test the normal behaviour of the function.
        """
        # Set the mocked function returned values.
        mock_isfile.side_effect = [True]
        mock_glob.return_value = ["/my/path/mock_output"]

        # Test execution
        fslreorient2std(**self.kwargs)

        self.assertEqual([
            mock.call(["which", "fslreorient2std"],
                      env={}, stderr=-1, stdout=-1),
            mock.call(["fslreorient2std",
                      self.kwargs["input_image"],
                      self.kwargs["output_image"]],
                      cwd=None, env={}, stderr=-1, stdout=-1)],
            self.mock_popen.call_args_list)
        self.assertEqual(len(self.mock_env.call_args_list), 1)
项目:smbus2    作者:kplindegaard    | 项目源码 | 文件源码
def mock_ioctl(fd, command, msg):
    print("Mocking ioctl")
    assert fd == MOCK_FD
    assert command is not None

    # Reproduce ioctl read operations
    if command == I2C_SMBUS and msg.read_write == I2C_SMBUS_READ:
        offset = msg.command
        if msg.size == I2C_SMBUS_BYTE_DATA:
            msg.data.contents.byte = test_buffer[offset]
        elif msg.size == I2C_SMBUS_WORD_DATA:
            msg.data.contents.word = test_buffer[offset+1]*256 + test_buffer[offset]
        elif msg.size == I2C_SMBUS_I2C_BLOCK_DATA:
            for k in range(msg.data.contents.byte):
                msg.data.contents.block[k+1] = test_buffer[offset+k]

# Override open, close and ioctl with our mock functions
项目:Securitas    作者:ArrenH    | 项目源码 | 文件源码
def test_checkOtp(self, mock):
        reply = {"requestId": "123", "status": "0000", "statusMessage": "Success", "credentialId": "",
                 "credentialType": "STANDARD_OTP", "authContext": {"params": {"Key": "authLevel.level", "Value": 1}}}

        mock.SymantecUserServices.checkOtp.return_value = Mock()
        mock.checkOtp.return_value.hash.return_value = reply

        response = symantec_package.lib.userService.SymantecUserServices.checkOut("checkOutyou")
        self.assertTrue(response.hash() != reply)

        response = symantec_package.lib.userService.SymantecUserServices.checkOtp("PARAMS")

        self.assertTrue((response.hash()) == reply)

        self.assertTrue(response.hash()["status"] == "0000")
        self.assertTrue(response.hash()['requestId'] == "123")
        self.assertTrue(response.hash()['statusMessage'] == "Success")
        self.assertTrue(response.hash()['credentialId'] == "")
        self.assertTrue(response.hash()['credentialType'] == "STANDARD_OTP")
        self.assertTrue(response.hash()['authContext']['params']['Key'] == "authLevel.level")
        self.assertTrue(response.hash()['authContext']['params']['Value'] == 1)

        pass
项目:Securitas    作者:ArrenH    | 项目源码 | 文件源码
def test_poll_in_Push(self, mock):
        reply = {"requestId": "ac123", "status": "6040", "statusMessage": "Mobile push request sent",
                 "pushDetail": {"pushCredentialId": "133709001", "pushSent": True},
                 "transactionId": "RealTransactionId",
                 "authContext": {"params": {"Key": "authLevel.level", "Value": 10}}}

        mock.SymantecServices.authenticateUserWithPushThenPolling.return_value = Mock()
        mock.authenticateUserWithPushThenPolling.return_value.hash.return_value = reply

        response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithNothing()
        self.assertTrue(response.hash() != reply)

        response = symantec_package.lib.allServices.SymantecServices.authenticateUserWithPushThenPolling("Parameters Here!")

        self.assertTrue((response.hash()) == reply)

        self.assertTrue(response.hash()["status"] == "6040")
        self.assertTrue(response.hash()['requestId'] == "ac123")
        self.assertTrue(response.hash()['statusMessage'] == "Mobile push request sent")
        self.assertTrue(response.hash()["pushDetail"]['pushCredentialId'] == "133709001")
        self.assertTrue(response.hash()["pushDetail"]['pushSent'] is True)
        self.assertTrue(response.hash()['transactionId'] == "RealTransactionId")
        self.assertTrue(response.hash()['authContext']['params']['Key'] == "authLevel.level")
        self.assertTrue(response.hash()['authContext']['params']['Value'] == 10)
        pass
项目:Securitas    作者:ArrenH    | 项目源码 | 文件源码
def test_mock_create_user(self, mock_managementservices):
        reply = {'requestId': 'create_123', 'status': '0000', 'statusMessage': 'Success'}
        # Configure the mock to return a response with an OK status code. Also, the mock should have
        # a `json()` method that returns a list of todos.
        mock_managementservices.createUser.return_value = Mock()
        mock_managementservices.createUser.return_value.json.return_value = reply

        # Call the service, which will send a request to the server.
        response = symantec_package.lib.managementService.SymantecManagementServices.createUser("create_123",
                                                                                                "new_user3")

        print(response.json())
        # If the request is sent successfully, then I expect a response to be returned.
        self.assertTrue((response.json()) == reply)
        self.assertTrue((response.json()["status"] == "0000"))
        pass
项目:Securitas    作者:ArrenH    | 项目源码 | 文件源码
def test_mock_add_STANDARDOTP_credential(self, mock_managementservices):
        reply = {'statusMessage': "Success", 'requestId': 'add_otp_cred', 'status': '0000'}

        # Configure the mock to return a response with an OK status code. Also, the mock should have
        # a `json()` method that returns a list of todos.
        mock_managementservices.addCredentialOtp.return_value = Mock()
        mock_managementservices.addCredentialOtp.return_value.json.return_value = reply

        response = symantec_package.lib.managementService.SymantecManagementServices.addCredentialOtp("add_otp_cred", "new_user3",
                                                                               "", "STANDARD_OTP", \
                                                                               "678066")  # change with what's on your device
        print(response.json())
        # If the request is sent successfully, then I expect a response to be returned.
        self.assertTrue((response.json()) == reply)
        self.assertTrue((response.json()["status"] == "0000"))
        pass
项目:Securitas    作者:ArrenH    | 项目源码 | 文件源码
def test_mock_update_STANDARDOTP_credential(self, mock_managementservices):
        reply = {'statusMessage': 'Success', 'requestId': 'update_123', 'status': '0000'}

        # Configure the mock to return a response with an OK status code. Also, the mock should have
        # a `json()` method that returns a list of todos.
        mock_managementservices.updateCredential.return_value = Mock()
        mock_managementservices.updateCredential.return_value.json.return_value = reply

        response = symantec_package.lib.managementService.SymantecManagementServices.updateCredential("update_123", "gabe_phone",
                                                                                                      "", "STANDARD_OTP",
                                                                                                      "My personal cell phone")
        print(response.json())
        # If the request is sent successfully, then I expect a response to be returned.
        self.assertTrue((response.json()) == reply)
        self.assertTrue((response.json()["status"] == "0000"))
        pass
项目:Securitas    作者:ArrenH    | 项目源码 | 文件源码
def test_mock_setTemporaryPasswordSMSDelivery(self, mock_managementservices):
        reply = {'status': '0000', 'requestId': 'setTempPWD', 'statusMessage': 'Success', 'temporaryPassword': '998241'}

        # Configure the mock to return a response with an OK status code. Also, the mock should have
        # a `json()` method that returns a list of todos.
        mock_managementservices.setTemporaryPasswordSMSDelivery.return_value = Mock()
        mock_managementservices.setTemporaryPasswordSMSDelivery.return_value.json.return_value = reply

        response = symantec_package.lib.managementService.SymantecManagementServices.setTemporaryPasswordSMSDelivery("setTempPWD",
                                                                                                      "gabe_phone",
                                                                                                      "12313608781",
                                                                                                      "17879481605")
        print(response.json())
        # If the request is sent successfully, then I expect a response to be returned.
        self.assertTrue((response.json()) == reply)
        self.assertTrue((response.json()["status"] == "0000"))
        pass
项目:libmozdata    作者:mozilla    | 项目源码 | 文件源码
def request_callback(self, request):
        logger.debug('Mock request {} {}'.format(request.method, request.url))
        path = self.build_path(request.method, request.url)

        if os.path.exists(path):
            # Load local file
            logger.info('Using mock file {}'.format(path))
            with gzip.open(path, 'rb') as f:
                response = pickle.load(f)
        else:
            # Build from actual request
            logger.info('Building mock file {}'.format(path))
            response = self.real_request(request)

            # Save in local file for future use
            with gzip.open(path, 'wb') as f:
                # Use old pickle ascii protocol (default)
                # to be compatible with Python 2
                f.write(pickle.dumps(response, protocol=2))

        return (
            response['status'],
            response['headers'],
            response['body'],
        )
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_mkdir_p(self, path_mocker_stopall):
        # mock
        mock_logger_info = path_mocker_stopall.MagicMock(name="mock_logger_info")
        mock_dir_exists = path_mocker_stopall.MagicMock(name="mock_dir_exists")
        mock_path = path_mocker_stopall.MagicMock(name="mock_path")
        # patch
        path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'info', mock_logger_info)
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'dir_exists', mock_dir_exists)
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'Path', mock_path)

        path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

        mock_dir_exists.return_value = True

        # run test
        s_path.mkdir_p(path)

        # assert
        assert mock_logger_info.call_count == 1
        mock_path.assert_called_once_with(path)
        # from scarlett_os.internal.debugger import dump
        mock_path().mkdir.assert_any_call(parents=True, exist_ok=True)
        mock_logger_info.assert_any_call("Verify mkdir_p ran: {}".format(mock_dir_exists.return_value))
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_dir_exists_false(self, path_mocker_stopall):
        # mock
        mock_logger_error = path_mocker_stopall.MagicMock(name="mock_logger_error")
        mock_path = path_mocker_stopall.MagicMock(name="mock_path")
        # patch
        path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'error', mock_logger_error)
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'Path', mock_path)

        path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

        mock_path_instance = mock_path()

        mock_path_instance.is_dir.return_value = False

        # run test
        s_path.dir_exists(path)

        # assert
        assert mock_logger_error.call_count == 1
        assert mock_path_instance.is_dir.call_count == 2
        mock_logger_error.assert_any_call("This is not a dir: {}".format(path))
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_dir_exists_true(self, path_mocker_stopall):
        # mock
        mock_logger_error = path_mocker_stopall.MagicMock(name="mock_logger_error")
        mock_path = path_mocker_stopall.MagicMock(name="mock_path")
        # patch
        path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'error', mock_logger_error)
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'Path', mock_path)

        path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

        mock_path_instance = mock_path()
        #
        mock_path_instance.is_dir.return_value = True

        # run test
        s_path.dir_exists(path)

        # assert
        assert mock_logger_error.call_count == 0
        assert mock_path_instance.is_dir.call_count == 2
        mock_logger_error.assert_not_called()
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_mkdir_if_does_not_exist_false(self, path_mocker_stopall):
        # mock
        mock_mkdir_p = path_mocker_stopall.MagicMock(name="mock_mkdir_p")
        mock_dir_exists = path_mocker_stopall.MagicMock(name="mock_dir_exists", return_value=False)
        # patch
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'mkdir_p', mock_mkdir_p)
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'dir_exists', mock_dir_exists)

        path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

        # run test
        result = s_path.mkdir_if_does_not_exist(path)

        # assert
        assert mock_mkdir_p.call_count == 1
        assert mock_dir_exists.call_count == 1
        assert result == True
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_mkdir_if_does_not_exist_true(self, path_mocker_stopall):
        # mock
        mock_mkdir_p = path_mocker_stopall.MagicMock(name="mock_mkdir_p")
        mock_dir_exists = path_mocker_stopall.MagicMock(name="mock_dir_exists", return_value=True)
        # patch
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'mkdir_p', mock_mkdir_p)
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'dir_exists', mock_dir_exists)

        path = '/home/pi/dev/bossjones-github/scarlett_os/_debug'

        # run test
        result = s_path.mkdir_if_does_not_exist(path)

        # assert
        assert mock_mkdir_p.call_count == 0
        assert mock_dir_exists.call_count == 1
        assert result == False
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_fname_exists_false(self, path_mocker_stopall):
        # mock
        mock_path = path_mocker_stopall.MagicMock(name="mock_path")
        # patch
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'Path', mock_path)

        path = '/home/pi/dev/bossjones-github/scarlett_os/_debug/generator.dot'

        mock_path_instance = mock_path()

        mock_path_instance.exists.return_value = False

        # run test
        result = s_path.fname_exists(path)

        assert mock_path_instance.exists.call_count == 1
        mock_path_instance.exists.assert_called_once_with()
        mock_path.assert_any_call(path)
        assert result == False
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_dir_isWritable(self, path_mocker_stopall):
        # mock
        mock_os_access = path_mocker_stopall.MagicMock(name="mock_os_access")
        mock_os_path_isdir = path_mocker_stopall.MagicMock(name="mock_os_path_isdir")
        # patch
        path_mocker_stopall.patch.object(scarlett_os.internal.path.os, 'access', mock_os_access)
        path_mocker_stopall.patch.object(scarlett_os.internal.path.os.path, 'isdir', mock_os_path_isdir)

        path = 'file:///tmp'

        # patch return values
        mock_os_path_isdir.return_value = True
        mock_os_access.return_value = True

        # run test
        result = s_path.isWritable(path)

        # tests
        mock_os_path_isdir.assert_called_once_with('file:///tmp')
        mock_os_access.assert_called_once_with('file:///tmp', os.W_OK)
        assert result == True
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_unicode_decode_error_isWritable(self, path_mocker_stopall):

        # mock
        mock_os_path_isdir = path_mocker_stopall.MagicMock(name="mock_os_path_isdir")
        mock_os_access = path_mocker_stopall.MagicMock(name="mock_os_access")
        mock_unicode_error_dialog = path_mocker_stopall.MagicMock(name="mock_unicode_error_dialog")
        mock_logger_error = path_mocker_stopall.MagicMock(name="mock_logger_error")

        # patch
        path_mocker_stopall.patch.object(scarlett_os.internal.path.os.path, 'isdir', mock_os_path_isdir)
        path_mocker_stopall.patch.object(scarlett_os.internal.path.os, 'access', mock_os_access)
        path_mocker_stopall.patch.object(scarlett_os.internal.path, 'unicode_error_dialog', mock_unicode_error_dialog)
        path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'error', mock_logger_error)


        path = b'file:///tmp/fake_file'

        # patch return values
        mock_os_path_isdir.side_effect = UnicodeDecodeError('', b'', 1, 0, '')
        s_path.isWritable(path)

        assert mock_unicode_error_dialog.call_count == 1
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def test_unicode_error_dialog(self, path_mocker_stopall):
        # mock
        mock_logger_error = path_mocker_stopall.MagicMock(name="mock_logger_error")
        # patch
        path_mocker_stopall.patch.object(scarlett_os.subprocess.logging.Logger, 'error', mock_logger_error)

        s_path.unicode_error_dialog()

        assert mock_logger_error.call_count == 1

        _message = _("The system's locale that you are using is not UTF-8 capable. "
                     "Unicode support is required for Python3 software like Pitivi. "
                     "Please correct your system settings; if you try to use Pitivi "
                     "with a broken locale, weird bugs will happen.")

        mock_logger_error.assert_any_call(_message)
项目:kafka-spark-influx-csv-analysis    作者:bwsw    | 项目源码 | 文件源码
def __get_result_set_from_mock(mock, query):
        result_wrapper = Mock()
        influx_points = []

        result = re.search(r"(\d{19}).*?(\d{19})", query)
        start, end = result.groups()

        # extract range
        points = list(filter(lambda point: point["time"] > int(start) and point["time"] < int(end),
                             mock.points))

        country_result = re.search(r"\"country\"=\'(\w+)\'", query)
        if country_result:
            country = country_result.groups()[0]
            points = list(filter(lambda point: point["tags"]["country"] == country, points))

        for point in points:
            d = {**point["fields"], **point.get("tags", {})}
            d["time"] = datetime.utcfromtimestamp(point["time"] // 1000000000).strftime('%Y-%m-%dT%H:%M:%SZ')
            influx_points.append(d)

        result_wrapper.get_points.return_value = influx_points
        return result_wrapper
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_get_all(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name
            ))
        ) as mock:
            ret = getattr(nb, self.name).all()
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, list))
            self.assertTrue(isinstance(ret[0], self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_filter(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name
            ))
        ) as mock:
            ret = getattr(nb, self.name).filter(pk=1)
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, list))
            self.assertTrue(isinstance(ret[0], self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/?pk=1'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_get(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name[:-1]
            ))
        ) as mock:
            ret = getattr(nb, self.name).get(1)
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, self.ret))
            self.assertTrue(isinstance(str(ret), str))
            self.assertTrue(isinstance(dict(ret), dict))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/1/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_delete(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name[:-1]
            ))
        ) as mock, patch('pynetbox.lib.query.requests.delete') as delete:
            ret = getattr(nb, self.name).get(1)
            self.assertTrue(ret.delete())
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/1/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
            delete.assert_called_with(
                'http://localhost:8000/api/{}/{}/1/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=AUTH_HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_create_device_bulk(self, mock):
        data = [
            {
                'name': 'test-device',
                'site': 1,
                'device_type': 1,
                'device_role': 1,
            },
            {
                'name': 'test-device1',
                'site': 1,
                'device_type': 1,
                'device_role': 1,
            },
        ]
        ret = nb.devices.create(data)
        self.assertTrue(ret)
        self.assertTrue(len(ret), 2)
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_get_all(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name
            ))
        ) as mock:
            ret = getattr(nb, self.name).all()
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, list))
            self.assertTrue(isinstance(ret[0], self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_get(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name[:-1]
            ))
        ) as mock:
            ret = getattr(nb, self.name).get(1)
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/1/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_get_all(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name
            ))
        ) as mock:
            ret = getattr(nb, self.name).all()
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, list))
            self.assertTrue(isinstance(ret[0], self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_filter(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name
            ))
        ) as mock:
            ret = getattr(nb, self.name).filter(pk=1)
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, list))
            self.assertTrue(isinstance(ret[0], self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/?pk=1'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_get(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name[:-1]
            ))
        ) as mock:
            ret = getattr(nb, self.name).get(1)
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/1/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_get_all(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name
            ))
        ) as mock:
            ret = getattr(nb, self.name).all()
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, list))
            self.assertTrue(isinstance(ret[0], self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_get(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name[:-1]
            ))
        ) as mock:
            ret = getattr(nb, self.name).get(1)
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/1/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_get_all(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name
            ))
        ) as mock:
            ret = getattr(nb, self.name).all()
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, list))
            self.assertTrue(isinstance(ret[0], self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def test_filter(self):
        with patch(
            'pynetbox.lib.query.requests.get',
            return_value=Response(fixture='{}/{}.json'.format(
                self.app,
                self.name
            ))
        ) as mock:
            ret = getattr(nb, self.name).filter(pk=1)
            self.assertTrue(ret)
            self.assertTrue(isinstance(ret, list))
            self.assertTrue(isinstance(ret[0], self.ret))
            mock.assert_called_with(
                'http://localhost:8000/api/{}/{}/?pk=1'.format(
                    self.app,
                    self.name.replace('_', '-')
                ),
                headers=HEADERS
            )
项目:CatLaser2    作者:adafruit    | 项目源码 | 文件源码
def test_update_no_active_player_makes_next_active(self):
        # Setup, add some players to waiting list.
        laser_players = players.Players(1)
        laser_players.add_player('192.168.0.1')
        # Check none are yet active.
        self.assertEqual(None, laser_players.active_player())
        # Setup, create mock callbacks that will remember how they were called.
        start_active = unittest.mock.Mock()
        end_active = unittest.mock.Mock()
        # Call update with 1 second of time elapsed.
        laser_players.update(0.5, start_active, end_active)
        # Check start active was called with the first in line IP, and that the
        # currently active player is the first one added (with 1 second of playtime
        # left).
        start_active.assert_called_once_with('192.168.0.1')
        end_active.assert_not_called()
        self.assertEqual(('192.168.0.1', 1), laser_players.active_player())
项目:CatLaser2    作者:adafruit    | 项目源码 | 文件源码
def test_update_moves_to_next_after_playetime_elapsed(self):
        # Setup, add two players and make the first active.
        laser_players = players.Players(1)
        laser_players.add_player('192.168.0.1')
        laser_players.add_player('192.168.0.2')
        start_active = unittest.mock.Mock()
        end_active = unittest.mock.Mock()
        laser_players.update(1, start_active, end_active)
        start_active.reset_mock()
        end_active.reset_mock()
        # Update with two seconds of time so the first player time is up, then
        # check next player is active.
        laser_players.update(2, start_active, end_active)
        end_active.assert_called_once_with('192.168.0.1')
        start_active.assert_called_once_with('192.168.0.2')
        self.assertEqual(('192.168.0.2', 1), laser_players.active_player())
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def test_uploading_syslog(self):
        '''Tests syslog upload command'''
        syslog_uploader_cli = ["syslog_uploader", "-c", self._ndr_config_file, TEST_SYSLOG_DATA]
        with unittest.mock.patch.object(sys, 'argv', syslog_uploader_cli):
            ndr.tools.syslog_uploader.main()

        # Make sure there's only one file in the queue
        outbound_queue = os.listdir(self._ncc.outgoing_upload_spool)
        self.assertEqual(len(outbound_queue), 1)
        this_msg = self._ncc.outgoing_upload_spool + "/" + outbound_queue[0]

        loaded_msg = ndr.IngestMessage.verify_and_load_message(
            self._ncc, this_msg, only_accept_cn="ndr_test_suite")
        os.remove(this_msg)

        self.assertEqual(loaded_msg.message_type, ndr.IngestMessageTypes.SYSLOG_UPLOAD)
        syslog = ndr.SyslogUploadMessage().from_message(loaded_msg)
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def test_uploading_status(self):
        '''Tests uploading status messages'''
        status_uploader_cli = ["status_uploader", "-c", self._ndr_config_file]
        with unittest.mock.patch.object(sys, 'argv', status_uploader_cli):
            ndr.tools.status.main()

        # Make sure there's only one file in the queue
        outbound_queue = os.listdir(self._ncc.outgoing_upload_spool)
        self.assertEqual(len(outbound_queue), 1)
        this_msg = self._ncc.outgoing_upload_spool + "/" + outbound_queue[0]

        loaded_msg = ndr.IngestMessage.verify_and_load_message(
            self._ncc, this_msg, only_accept_cn="ndr_test_suite")
        os.remove(this_msg)

        self.assertEqual(loaded_msg.message_type, ndr.IngestMessageTypes.STATUS)
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def test_alert_tester(self):
        '''Tests alert tester messages'''
        alert_tester_cli = ["alert_tester", "-c", self._ndr_config_file]
        with unittest.mock.patch.object(sys, 'argv', alert_tester_cli):
            ndr.tools.alert_tester.main()

        # Make sure there's only one file in the queue
        outbound_queue = os.listdir(self._ncc.outgoing_upload_spool)
        self.assertEqual(len(outbound_queue), 1)
        this_msg = self._ncc.outgoing_upload_spool + "/" + outbound_queue[0]

        loaded_msg = ndr.IngestMessage.verify_and_load_message(
            self._ncc, this_msg, only_accept_cn="ndr_test_suite")
        os.remove(this_msg)

        self.assertEqual(loaded_msg.message_type, ndr.IngestMessageTypes.TEST_ALERT)
项目:ndr    作者:SecuredByTHEM    | 项目源码 | 文件源码
def test_pcap_processing(self):
        '''Tests uploading and processing a pcap message'''
        alert_tester_cli = ["pcap_processor", "-k", "-c", self._ndr_config_file, TSHARK_PCAP]
        with unittest.mock.patch.object(sys, 'argv', alert_tester_cli):
            ndr.tools.pcap_to_traffic_report.main()

        # Make sure there's only one file in the queue
        outbound_queue = os.listdir(self._ncc.outgoing_upload_spool)
        self.assertEqual(len(outbound_queue), 1)
        this_msg = self._ncc.outgoing_upload_spool + "/" + outbound_queue[0]

        loaded_msg = ndr.IngestMessage.verify_and_load_message(
            self._ncc, this_msg, only_accept_cn="ndr_test_suite")
        os.remove(this_msg)

        self.assertEqual(loaded_msg.message_type, ndr.IngestMessageTypes.TRAFFIC_REPORT)
项目:fierce    作者:mschwager    | 项目源码 | 文件源码
def test_recursive_query_basic_failure(self):
        resolver = dns.resolver.Resolver()
        domain = dns.name.from_text('example.com.')
        record_type = 'NS'

        with unittest.mock.patch.object(fierce, 'query', return_value=None) as mock_method:
            result = fierce.recursive_query(resolver, domain, record_type=record_type)

        expected = [
            unittest.mock.call(resolver, 'example.com.', record_type),
            unittest.mock.call(resolver, 'com.', record_type),
            unittest.mock.call(resolver, '', record_type),
        ]

        mock_method.assert_has_calls(expected)
        self.assertIsNone(result)
项目:fierce    作者:mschwager    | 项目源码 | 文件源码
def test_recursive_query_long_domain_failure(self):
        resolver = dns.resolver.Resolver()
        domain = dns.name.from_text('sd1.sd2.example.com.')
        record_type = 'NS'

        with unittest.mock.patch.object(fierce, 'query', return_value=None) as mock_method:
            result = fierce.recursive_query(resolver, domain, record_type=record_type)

        expected = [
            unittest.mock.call(resolver, 'sd1.sd2.example.com.', record_type),
            unittest.mock.call(resolver, 'sd2.example.com.', record_type),
            unittest.mock.call(resolver, 'example.com.', record_type),
            unittest.mock.call(resolver, 'com.', record_type),
            unittest.mock.call(resolver, '', record_type),
        ]

        mock_method.assert_has_calls(expected)
        self.assertIsNone(result)
项目:fierce    作者:mschwager    | 项目源码 | 文件源码
def test_recursive_query_basic_success(self):
        resolver = dns.resolver.Resolver()
        domain = dns.name.from_text('example.com.')
        record_type = 'NS'
        good_response = unittest.mock.MagicMock()
        side_effect = [
            None,
            good_response,
            None,
        ]

        with unittest.mock.patch.object(fierce, 'query', side_effect=side_effect) as mock_method:
            result = fierce.recursive_query(resolver, domain, record_type=record_type)

        expected = [
            unittest.mock.call(resolver, 'example.com.', record_type),
            unittest.mock.call(resolver, 'com.', record_type),
        ]

        mock_method.assert_has_calls(expected)
        self.assertEqual(result, good_response)