Python vcr 模块,VCR 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用vcr.VCR

项目:goodreads-api-client-python    作者:mdzhang    | 项目源码 | 文件源码
def make_vcr():
    cassette_library_dir = os.path.join(os.path.dirname(__file__),
                                        'fixtures',
                                        'cassettes')
    return VCR(cassette_library_dir=cassette_library_dir,
               filter_query_parameters=[
                   'key',
                   'oauth_consumer_key',
                   'oauth_nonce',
                   'oauth_signature_method',
                   'oauth_timestamp',
                   'oauth_token',
                   'oauth_version',
                   'oauth_signature',
               ],
               record_mode='new_episodes')
项目:IGitt    作者:GitMateIO    | 项目源码 | 文件源码
def vcr(self):
        """
        Returns a new vcrpy instance.
        """
        cassettes_dir = join(dirname(getfile(self.__class__)), 'cassettes')
        kwargs = {
            'record_mode': getattr(self, 'vcrpy_record_mode', 'once'),
            'cassette_library_dir': cassettes_dir,
            'match_on': ['method', 'scheme', 'host', 'port', 'path', 'query'],
            'filter_query_parameters': FILTER_QUERY_PARAMS,
            'filter_post_data_parameters': FILTER_QUERY_PARAMS,
            'before_record_response': IGittTestCase.remove_link_headers,
            'filter_headers': ['Link'],
        }
        kwargs.update(self.vcr_options)
        return VCR(**kwargs)
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def _post_recording_scrub(self):
        """ Perform post-recording cleanup on the YAML file that can't be accomplished with the
        VCR recording hooks. """
        src_path = self.cassette_path
        rg_name = getattr(self, 'resource_group', None)
        rg_original = getattr(self, 'resource_group_original', None)

        t = tempfile.NamedTemporaryFile('r+')
        with open(src_path, 'r') as f:
            for line in f:
                # scrub resource group names
                if rg_original and rg_name != rg_original:
                    line = line.replace(rg_name, rg_original)
                # omit bearer tokens
                if 'authorization:' not in line.lower():
                    t.write(line)
        t.seek(0)
        with open(src_path, 'w') as f:
            for line in t:
                f.write(line)
        t.close()

    # COMMAND METHODS
项目:azure-batch-cli-extensions    作者:Azure    | 项目源码 | 文件源码
def _post_recording_scrub(self):
        """ Perform post-recording cleanup on the YAML file that can't be accomplished with the
        VCR recording hooks. """
        src_path = self.cassette_path
        rg_name = getattr(self, 'resource_group', None)
        rg_original = getattr(self, 'resource_group_original', None)

        t = tempfile.NamedTemporaryFile('r+')
        with open(src_path, 'r') as f:
            for line in f:
                # scrub resource group names
                if rg_name != rg_original:
                    line = line.replace(rg_name, rg_original)
                # omit bearer tokens
                if 'authorization:' not in line.lower():
                    t.write(line)
        t.seek(0)
        with open(src_path, 'w') as f:
            for line in t:
                f.write(line)
        t.close()

    # COMMAND METHODS
项目:azure-python-devtools    作者:Azure    | 项目源码 | 文件源码
def __init__(self,  # pylint: disable=too-many-arguments
                 method_name, config_file=None, recording_dir=None, recording_name=None, recording_processors=None,
                 replay_processors=None, recording_patches=None, replay_patches=None):
        super(ReplayableTest, self).__init__(method_name)

        self.recording_processors = recording_processors or []
        self.replay_processors = replay_processors or []

        self.recording_patches = recording_patches or []
        self.replay_patches = replay_patches or []

        self.config = TestConfig(config_file=config_file)

        self.disable_recording = False

        test_file_path = inspect.getfile(self.__class__)
        recording_dir = recording_dir or os.path.join(os.path.dirname(test_file_path), 'recordings')
        self.is_live = self.config.record_mode

        self.vcr = vcr.VCR(
            cassette_library_dir=recording_dir,
            before_record_request=self._process_request_recording,
            before_record_response=self._process_response_recording,
            decode_compressed_response=True,
            record_mode='once' if not self.is_live else 'all',
            filter_headers=self.FILTER_HEADERS
        )
        self.vcr.register_matcher('query', self._custom_request_query_matcher)

        self.recording_file = os.path.join(
            recording_dir,
            '{}.yaml'.format(recording_name or method_name)
        )
        if self.is_live and os.path.exists(self.recording_file):
            os.remove(self.recording_file)

        self.in_recording = self.is_live or not os.path.exists(self.recording_file)
        self.test_resources_count = 0
        self.original_env = os.environ.copy()
项目:rakuten-ws    作者:alexandriagroup    | 项目源码 | 文件源码
def pytest_configure(config):
    # register the online marker
    config.addinivalue_line('markers',
                            'online: mark a test that goes online. VCR will automatically be used.')
项目:gitmate-2    作者:GitMateIO    | 项目源码 | 文件源码
def _get_vcr(self):
        testdir = os.path.dirname(inspect.getfile(self.__class__))
        cassettes_dir = os.path.join(testdir, 'cassettes')
        return vcr.VCR(
            record_mode=self.vcrpy_record_mode,
            cassette_library_dir=cassettes_dir,
            match_on=['method', 'scheme', 'host', 'port', 'path'],
            filter_query_parameters=FILTER_QUERY_PARAMS,
            filter_post_data_parameters=FILTER_QUERY_PARAMS,
            before_record_response=GitmateTestCase.remove_link_headers)
项目:k8s    作者:fiaas    | 项目源码 | 文件源码
def get_vcr(filepath):
    return vcr.VCR(
        path_transformer=vcr.VCR.ensure_suffix('.yaml'),
        match_on=("method", "scheme", "host", "port", "path", "query", "body"),
        cassette_library_dir=os.path.splitext(filepath)[0]
    )
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def __init__(self, test_file, test_name, run_live=False, debug=False, debug_vcr=False,
                 skip_setup=False, skip_teardown=False):
        super(VCRTestBase, self).__init__(test_name)
        self.test_name = test_name
        self.recording_dir = find_recording_dir(test_file)
        self.cassette_path = os.path.join(self.recording_dir, '{}.yaml'.format(test_name))
        self.playback = os.path.isfile(self.cassette_path)

        if os.environ.get(ENV_LIVE_TEST, None) == 'True':
            self.run_live = True
        else:
            self.run_live = run_live

        self.skip_setup = skip_setup
        self.skip_teardown = skip_teardown
        self.success = False
        self.exception = None
        self.track_commands = os.environ.get(COMMAND_COVERAGE_CONTROL_ENV, None)
        self._debug = debug

        if not self.playback and ('--buffer' in sys.argv) and not run_live:
            self.exception = CLIError('No recorded result provided for {}.'.format(self.test_name))

        if debug_vcr:
            logging.basicConfig()
            vcr_log = logging.getLogger('vcr')
            vcr_log.setLevel(logging.INFO)
        self.my_vcr = vcr.VCR(
            cassette_library_dir=self.recording_dir,
            before_record_request=self._before_record_request,
            before_record_response=self._before_record_response,
            decode_compressed_response=True
        )
        self.my_vcr.register_matcher('custom', _custom_request_matcher)
        self.my_vcr.match_on = ['custom']
项目:knack    作者:Microsoft    | 项目源码 | 文件源码
def __init__(self, cli, method_name, filter_headers=None):
        super(ScenarioTest, self).__init__(cli, method_name)
        self.name_replacer = GeneralNameReplacer()
        self.recording_processors = [LargeRequestBodyProcessor(),
                                     LargeResponseBodyProcessor(),
                                     self.name_replacer]
        self.replay_processors = [LargeResponseBodyReplacer()]
        self.filter_headers = filter_headers or []

        test_file_path = inspect.getfile(self.__class__)
        recordings_dir = find_recording_dir(test_file_path)
        live_test = os.environ.get(ENV_LIVE_TEST, None) == 'True'

        self.vcr = vcr.VCR(
            cassette_library_dir=recordings_dir,
            before_record_request=self._process_request_recording,
            before_record_response=self._process_response_recording,
            decode_compressed_response=True,
            record_mode='once' if not live_test else 'all',
            filter_headers=self.filter_headers
        )
        self.vcr.register_matcher('query', self._custom_request_query_matcher)

        self.recording_file = os.path.join(recordings_dir, '{}.yaml'.format(method_name))
        if live_test and os.path.exists(self.recording_file):
            os.remove(self.recording_file)

        self.in_recording = live_test or not os.path.exists(self.recording_file)
        self.test_resources_count = 0
        self.original_env = os.environ.copy()
项目:azure-batch-cli-extensions    作者:Azure    | 项目源码 | 文件源码
def __init__(self, test_file, test_name, run_live=False, debug=False, debug_vcr=False,
                 skip_setup=False, skip_teardown=False):
        super(VCRTestBase, self).__init__(test_name)
        self.test_name = test_name
        self.recording_dir = os.path.join(os.path.dirname(test_file), 'recordings')
        self.cassette_path = os.path.join(self.recording_dir, '{}.yaml'.format(test_name))
        self.playback = os.path.isfile(self.cassette_path)

        if os.environ.get(LIVE_TEST_CONTROL_ENV, None) == 'True':
            self.run_live = True
        else:
            self.run_live = run_live

        self.skip_setup = skip_setup
        self.skip_teardown = skip_teardown
        self.success = False
        self.exception = None
        self.track_commands = os.environ.get(COMMAND_COVERAGE_CONTROL_ENV, None)
        self._debug = debug

        if not self.playback and ('--buffer' in sys.argv) and not run_live:
            self.exception = CLIError('No recorded result provided for {}.'.format(self.test_name))

        if debug_vcr:
            import logging
            logging.basicConfig()
            vcr_log = logging.getLogger('vcr')
            vcr_log.setLevel(logging.INFO)
        self.my_vcr = vcr.VCR(
            cassette_library_dir=self.recording_dir,
            before_record_request=self._before_record_request,
            before_record_response=self._before_record_response,
            decode_compressed_response=True
        )
        self.my_vcr.register_matcher('custom', _custom_request_matcher)
        self.my_vcr.match_on = ['custom']
项目:foocenter    作者:jctanner    | 项目源码 | 文件源码
def record_requests(request):
    # vcr interceptor

    global REQUESTS
    global REQUESTID

    if type(request) == dict:
        thistype = 'response'
    else:
        REQUESTID += 1
        thistype = 'request'

    if thistype == 'request':
        ydata = {}
        ydata['request'] = {}
        ydata['request']['method'] = request.method
        ydata['request']['uri'] = request.uri
        if request.headers:
            ydata['request']['headers'] = {}
            for x in request.headers.items():
                ydata['request']['headers'][x[0]] = x[1]
        else:
            ydata['request']['headers'] = request.headers
        ydata['request']['body'] = literal(pretty_xml(request.body))
        REQUESTS.append(ydata)
    else:
        ydata = REQUESTS[-1]
        ydata['response'] = request.copy()
        REQUESTS[-1]['response'] = request.copy()
        REQUESTS[-1]['response']['body'] = literal(pretty_xml(request['body']['string']))
        fid = str(REQUESTID).rjust(4, '0')
        fname = os.path.join('/tmp', 'fixtures', '%s.yml' % (fid))
        with open(fname, 'wb') as f:
            yaml.dump(REQUESTS[-1], f, width=1000, indent=2)

    return request

# Make a custom VCR instance so we can inject our spyware into it
项目:user-sync.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def _run(self, args, working_dir, output_filename, request_json_builder = None, live_request_json_builder = None):
        '''
        Runs the test given the command line arguments, and the output filename.
        :type args: list(str) representing the components of the command line argument.
        :c output_filename: str represents the filename of the file to write the command line output to.
        '''
        self.logger.info("%s" % (' '.join(args)))
        self._reset_output_file(output_filename)

        def match_um_requests_on(r1, r2):
            '''
            Custom uri matcher for use with vcrpy. Basically it provides custom matching for user and action UM
            requests, which ignores the org ID portion of the request. Otherwise, the request uri must match exactly.
            :param r1: 
            :param r2: 
            :return: 
            '''
            if re.match(self.test_suite_config['users_request_matcher'], r1.uri):
                if re.match(self.test_suite_config['users_request_matcher'], r2.uri):
                    return True

            if re.match(self.test_suite_config['actions_request_matcher'], r2.uri):
                if re.match(self.test_suite_config['actions_request_matcher'], r2.uri):
                    return True

            return r1.uri == r2.uri

        record_mode = 'all' if self.server_config['live_mode'] else 'none'
        mock_spec = 'proxy' if self.server_config['live_mode'] else 'playback'
        recorder = vcr.VCR(
            record_mode=record_mode,
            match_on=['um_request'],
            # match_on=match_um_requests_on,
            decode_compressed_response=True,
            filter_headers=['authorization']
        )
        recorder.register_matcher('um_request',match_um_requests_on)

        with recorder.use_cassette(self.server_config['cassette_filename']) as cassette:
            service = server.TestService(self.server_config, cassette, request_json_builder)
            service.run()

            with open(output_filename, 'w') as output_file:
                subprocess.call(args,
                                cwd=working_dir,
                                env=dict(os.environ, UMAPI_MOCK=mock_spec),
                                shell=IS_NT_PLATFORM,
                                stdin=None,
                                stdout=output_file,
                                stderr=output_file)
                # p = subprocess.Popen(args, cwd=working_dir, stdout=output_file, stderr=output_file)
                # output_bytes = subprocess.check_output(cmd, cwd=working_dir, shell=True)
                # output_file.write(output_bytes.decode())
                # p.communicate()

            service.stop()

            if live_request_json_builder:
                for stored_request, stored_response in cassette.data:
                    if stored_request.body:
                        live_request_json_builder.extend_with_json_string(stored_request.body)
项目:rakuten-ws    作者:alexandriagroup    | 项目源码 | 文件源码
def use_vcr(request, monkeypatch):
    """
    This fixture is applied automatically to any test using the `online` mark. It will record and playback network
    sessions using VCR.
    The record mode of VCR can be set using the VCR_RECORD_MODE environment variable when running tests.
    """
    if VCR_RECORD_MODE == 'off':
        yield None
    else:
        path_segments = [VCR_CASSETTE_DIR]
        if request.module is not None:
            path_segments.extend(request.module.__name__.split('tests.')[-1].split('.'))
        if request.cls is not None:
            path_segments.append(request.cls.__name__)

        # Take into account the parametrization set by pytest
        # to create many tests from a single function with many parameters.
        request_keywords = request.keywords.keys()
        if 'parametrize' in request_keywords:
            try:
                param_name = [x for x in request_keywords if x.startswith('_with_')][0]
            except IndexError:
                param_name = ''
        else:
            param_name = ''

        path_segments.append(request.function.__name__ + param_name + '.yaml')

        cassette_path = os.path.join(*path_segments)

        filter_query = [('applicationId', 'XXXXXX')]
        filter_headers = [('Authorization', 'ESA XXXXXX')]
        filter_post = []

        online = True
        if vcr.record_mode == 'none':
            online = False
        elif vcr.record_mode == 'once':
            online = not os.path.exists(cassette_path)
        # If we are going online, check the credentials
        if online:
            if os.environ.get("RAKUTEN_APP_ID", None) is None:
                pytest.skip('need credentials to run this test')

        with vcr.use_cassette(path=cassette_path,
                              filter_query_parameters=filter_query,
                              filter_headers=filter_headers,
                              filter_post_data_parameters=filter_post,
                              decode_compressed_response=True) as cassette:
            yield cassette