Python requests.structures 模块,CaseInsensitiveDict() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.structures.CaseInsensitiveDict()

项目:eater    作者:alexhayes    | 项目源码 | 文件源码
def test_data_error_raised():
    class Person(Model):
        name = StringType(required=True, min_length=4)

    class PersonAPI(HTTPEater):
        request_cls = Person
        response_cls = Person
        url = 'http://example.com/person'

    api = PersonAPI(name='John')

    with pytest.raises(DataError):
        with requests_mock.Mocker() as mock:
            mock.get(
                api.url,
                json={'name': 'Joh'},
                headers=CaseInsensitiveDict({
                    'Content-Type': 'application/json'
                })
            )
            api()
项目:eater    作者:alexhayes    | 项目源码 | 文件源码
def test_non_json_content_response():
    class GetPersonAPI(HTTPEater):
        request_cls = Model
        response_cls = Model
        url = 'http://example.com/'

    api = GetPersonAPI()

    with requests_mock.Mocker() as mock:
        mock.get(
            'http://example.com/',
            text='Hello world',
            headers=CaseInsensitiveDict({
                'Content-Type': 'text/plain'
            })
        )
        with pytest.raises(NotImplementedError):
            api()
项目:jumpserver-python-sdk    作者:jumpserver    | 项目源码 | 文件源码
def __init__(self, url, method='get', data=None, params=None, headers=None,
                 content_type='application/json', app_name=''):
        self.url = url
        self.method = method
        self.params = params or {}
        self.result = None

        if not isinstance(headers, dict):
            headers = {}
        self.headers = CaseInsensitiveDict(headers)

        self.headers['Content-Type'] = content_type
        if data is None:
            data = {}
        self.data = json.dumps(data)

        if 'User-Agent' not in self.headers:
            if app_name:
                self.headers['User-Agent'] = _USER_AGENT + '/' + app_name
            else:
                self.headers['User-Agent'] = _USER_AGENT
项目:rets    作者:opendoor-labs    | 项目源码 | 文件源码
def test_guess_mime_type():
    # Can guess from URL extension
    assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.jpg'}))
    assert 'image/png' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.png'}))
    assert 'application/pdf' == _guess_mime_type(CaseInsensitiveDict({'Location': 'http://cdn.rets.com/1.pdf'}))

    # Can guess from content type if extension is missing
    assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({
        'Location': 'http://cdn.rets.com/1',
        'Content-Type': 'image/jpeg',
    }))

    # Can guess from content type
    assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Content-Type': 'image/jpeg'}))
    assert 'image/jpeg' == _guess_mime_type(CaseInsensitiveDict({'Content-Type': 'image/jpeg;charset=US-ASCII'}))

    assert None == _guess_mime_type(CaseInsensitiveDict({'Content-Type': ''}))
    assert None == _guess_mime_type(CaseInsensitiveDict())
项目:msgen    作者:MicrosoftGenomics    | 项目源码 | 文件源码
def handle_response(cls, response_code, response_data):
        """
        Handles service response.
        Returns the message, status or workflow id, if applicable
        """

        client_error = response_code >= 400 and response_code < 500
        success = response_code >= 200 and response_code < 300

        workflow = CaseInsensitiveDict({"Message":"", "Id": 0, "Status": 0})

        if client_error or success:
            try:
                response_content = json.loads(response_data)
                workflow.update(response_content)
            except ValueError:
                workflow["Message"] = "Invalid response content"
        else:
            workflow["Message"] = "{0}: Internal server error".format(response_code)

        return workflow
项目:deb-python-requests-kerberos    作者:openstack    | 项目源码 | 文件源码
def __init__(self, response):
        super(SanitizedResponse, self).__init__()
        self.status_code = response.status_code
        self.encoding = response.encoding
        self.raw = response.raw
        self.reason = response.reason
        self.url = response.url
        self.request = response.request
        self.connection = response.connection
        self._content_consumed = True

        self._content = ""
        self.cookies = cookiejar_from_dict({})
        self.headers = CaseInsensitiveDict()
        self.headers['content-length'] = '0'
        for header in ('date', 'server'):
            if header in response.headers:
                self.headers[header] = response.headers[header]
项目:plumeria    作者:sk89q    | 项目源码 | 文件源码
def __init__(self, flow_expiration=60 * 15, flow_cache_size=1000):
        """
        Create a new access manager for managing authorizations.

        Parameters
        ----------
        flow_expiration: int
            The number of seconds to keep pending authorization flows in memory
        flow_cache_size: int
            The maximum number of pending authorization flows to keep in memory at a time

        """
        self.store = None
        self.endpoints = CaseInsensitiveDict()
        self.state_cache = TTLCache(maxsize=flow_cache_size,
                                    ttl=flow_expiration)  # type: Dict[Tuple[str, str], Tuple[Endpoint, str, str]]
        self.random = random.SystemRandom()
        self.redirect_uri = "http://localhost/oauth2/callback/"
项目:TornadoWeb    作者:VxCoder    | 项目源码 | 文件源码
def __init__(self, method, url,
                 data=None,
                 params=None,
                 headers=None,
                 app_name=''):
        self.method = method
        self.url = url
        self.data = _convert_request_body(data)
        self.params = params or {}

        if not isinstance(headers, CaseInsensitiveDict):
            self.headers = CaseInsensitiveDict(headers)
        else:
            self.headers = headers

        # tell requests not to add 'Accept-Encoding: gzip, deflate' by default
        if 'Accept-Encoding' not in self.headers:
            self.headers['Accept-Encoding'] = None

        if 'User-Agent' not in self.headers:
            if app_name:
                self.headers['User-Agent'] = _USER_AGENT + '/' + app_name
            else:
                self.headers['User-Agent'] = _USER_AGENT
项目:ws-backend-community    作者:lavalamp-    | 项目源码 | 文件源码
def __deserialize_requests_response(self, to_deserialize):
        """
        Create and return a Response based on the contents of the given dictionary.
        :param to_deserialize: The dictionary to create the Response from.
        :return: A Response created by the contents of to_deserialize.
        """
        to_return = Response()
        to_return.status_code = to_deserialize["status_code"]
        to_return.headers = CaseInsensitiveDict(to_deserialize["headers"])
        to_return.encoding = to_deserialize["encoding"]
        to_return._content = to_deserialize["content"]
        to_return._content_consumed = True
        to_return.reason = to_deserialize["reason"]
        to_return.url = to_deserialize["url"]
        to_return.request = self.decode(to_deserialize["request"])
        return to_return
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def prepare_response(self, cached):
        """Verify our vary headers match and construct a real urllib3
        HTTPResponse object.
        """
        # Special case the '*' Vary value as it means we cannot actually
        # determine if the cached response is suitable for this request.
        if "*" in cached.get("vary", {}):
            return

        body_raw = cached["response"].pop("body")

        headers = CaseInsensitiveDict(data=cached['response']['headers'])
        if headers.get('transfer-encoding', '') == 'chunked':
            headers.pop('transfer-encoding')

        cached['response']['headers'] = headers

        try:
            body = io.BytesIO(body_raw)
        except TypeError:
            # This can happen if cachecontrol serialized to v1 format (pickle)
            # using Python 2. A Python 2 str(byte string) will be unpickled as
            # a Python 3 str (unicode string), which will cause the above to
            # fail with:
            #
            #     TypeError: 'str' does not support the buffer interface
            body = io.BytesIO(body_raw.encode('utf8'))

        return HTTPResponse(
            body=body,
            preload_content=False,
            **cached["response"]
        )
项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def prepare_request(self, request):
        cookies = request.cookies or {}

        # Bootstrap CookieJar.
        if not isinstance(cookies, cookielib.CookieJar):
            cookies = cookiejar_from_dict(cookies)

        # Merge with session cookies
        merged_cookies = merge_cookies(
            merge_cookies(RequestsCookieJar(), self.cookies), cookies)

        # Set environment's basic authentication if not explicitly set.
        # auth = request.auth
        # if self.trust_env and not auth and not self.auth:
        #     auth = get_netrc_auth(request.url)

        p = PreparedRequest()
        p.prepare(
            method=request.method.upper(),
            url=request.url,
            files=request.files,
            data=request.data,
            json=request.json,
            headers=merge_setting(request.headers,
                self.headers, dict_class=CaseInsensitiveDict),
            params=merge_setting(request.params, self.params),
            auth=merge_setting(request.auth, self.auth),
            cookies=merged_cookies,
            hooks=merge_hooks(request.hooks, self.hooks),
        )
        return p
项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def default_headers():
    """
    :rtype: requests.structures.CaseInsensitiveDict
    """
    return CaseInsensitiveDict({
        'User-Agent': default_user_agent(),
        'Accept-Encoding': ', '.join(('gzip', 'deflate')),
        'Accept': '*/*',
        # 'Connection': 'keep-alive',
        'Connection': 'close',
    })
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _parse_items(self):
        """Parse `args.items` into `args.headers`, `args.data`, `args.params`,
         and `args.files`.

        """
        self.args.headers = CaseInsensitiveDict()
        self.args.data = ParamDict() if self.args.form else OrderedDict()
        self.args.files = OrderedDict()
        self.args.params = ParamDict()

        try:
            parse_items(items=self.args.items,
                        headers=self.args.headers,
                        data=self.args.data,
                        files=self.args.files,
                        params=self.args.params)
        except ParseError as e:
            if self.args.traceback:
                raise
            self.error(e.message)

        if self.args.files and not self.args.form:
            # `http url @/path/to/file`
            file_fields = list(self.args.files.keys())
            if file_fields != ['']:
                self.error(
                    'Invalid file fields (perhaps you meant --form?): %s'
                    % ','.join(file_fields))

            fn, fd = self.args.files['']
            self.args.files = {}

            self._body_from_file(fd)

            if 'Content-Type' not in self.args.headers:
                mime, encoding = mimetypes.guess_type(fn, strict=False)
                if mime:
                    content_type = mime
                    if encoding:
                        content_type = '%s; charset=%s' % (mime, encoding)
                    self.args.headers['Content-Type'] = content_type
项目:tfs    作者:devopshq    | 项目源码 | 文件源码
def __init__(self, data=None, tfs=None):
        super().__init__(data, tfs)
        self._fields = CaseInsensitiveDict(self._data['fields'])
        self._system_prefix = 'System.'
        self.id = self._data['id']
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def __init__(self, content, encoding):
        self.encoding = encoding
        headers = {}
        # Split into header section (if any) and the content
        if b'\r\n\r\n' in content:
            first, self.content = _split_on_find(content, b'\r\n\r\n')
            if first != b'':
                headers = _header_parser(first.lstrip(), encoding)
        else:
            raise ImproperBodyPartContentException(
                'content does not contain CR-LF-CR-LF'
            )
        self.headers = CaseInsensitiveDict(headers)
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def __init__(self, status_code=None, url=None, orig_url=None, headers=CaseInsensitiveDict(),
                 content='', cookies=None, error=None, traceback=None, save=None, js_script_result=None, time=0):
        if cookies is None:
            cookies = {}
        self.status_code = status_code
        self.url = url
        self.orig_url = orig_url
        self.headers = headers
        self.content = content
        self.cookies = cookies
        self.error = error
        self.traceback = traceback
        self.save = save
        self.js_script_result = js_script_result
        self.time = time
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def rebuild_response(r):
    response = Response(
        status_code=r.get('status_code', 599),
        url=r.get('url', ''),
        headers=CaseInsensitiveDict(r.get('headers', {})),
        content=r.get('content', ''),
        cookies=r.get('cookies', {}),
        error=r.get('error'),
        traceback=r.get('traceback'),
        time=r.get('time', 0),
        orig_url=r.get('orig_url', r.get('url', '')),
        js_script_result=r.get('js_script_result'),
        save=r.get('save'),
    )
    return response
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def __init__(self, content, encoding):
        self.encoding = encoding
        headers = {}
        # Split into header section (if any) and the content
        if b'\r\n\r\n' in content:
            first, self.content = _split_on_find(content, b'\r\n\r\n')
            if first != b'':
                headers = _header_parser(first.lstrip(), encoding)
        else:
            raise ImproperBodyPartContentException(
                'content does not contain CR-LF-CR-LF'
            )
        self.headers = CaseInsensitiveDict(headers)
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_docstring_example(self):
        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        assert cid['aCCEPT'] == 'application/json'
        assert list(cid) == ['Accept']
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_len(self):
        cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
        cid['A'] = 'a'
        assert len(cid) == 2
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_getitem(self):
        cid = CaseInsensitiveDict({'Spam': 'blueval'})
        assert cid['spam'] == 'blueval'
        assert cid['SPAM'] == 'blueval'
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_fixes_649(self):
        """__setitem__ should behave case-insensitively."""
        cid = CaseInsensitiveDict()
        cid['spam'] = 'oneval'
        cid['Spam'] = 'twoval'
        cid['sPAM'] = 'redval'
        cid['SPAM'] = 'blueval'
        assert cid['spam'] == 'blueval'
        assert cid['SPAM'] == 'blueval'
        assert list(cid.keys()) == ['SPAM']
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_delitem(self):
        cid = CaseInsensitiveDict()
        cid['Spam'] = 'someval'
        del cid['sPam']
        assert 'spam' not in cid
        assert len(cid) == 0
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_get(self):
        cid = CaseInsensitiveDict()
        cid['spam'] = 'oneval'
        cid['SPAM'] = 'blueval'
        assert cid.get('spam') == 'blueval'
        assert cid.get('SPAM') == 'blueval'
        assert cid.get('sPam') == 'blueval'
        assert cid.get('notspam', 'default') == 'default'
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_update(self):
        cid = CaseInsensitiveDict()
        cid['spam'] = 'blueval'
        cid.update({'sPam': 'notblueval'})
        assert cid['spam'] == 'notblueval'
        cid = CaseInsensitiveDict({'Foo': 'foo', 'BAr': 'bar'})
        cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
        assert len(cid) == 2
        assert cid['foo'] == 'anotherfoo'
        assert cid['bar'] == 'anotherbar'
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_update_retains_unchanged(self):
        cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
        cid.update({'foo': 'newfoo'})
        assert cid['bar'] == 'bar'
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_iter(self):
        cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
        keys = frozenset(['Spam', 'Eggs'])
        assert frozenset(iter(cid)) == keys
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_equality(self):
        cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
        othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
        assert cid == othercid
        del othercid['spam']
        assert cid != othercid
        assert cid == {'spam': 'blueval', 'eggs': 'redval'}
        assert cid != object()
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_lower_items(self):
        cid = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
        lowerkeyset = frozenset(['accept', 'user-agent'])
        assert keyset == lowerkeyset
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_preserve_key_case(self):
        cid = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        keyset = frozenset(['Accept', 'user-Agent'])
        assert frozenset(i[0] for i in cid.items()) == keyset
        assert frozenset(cid.keys()) == keyset
        assert frozenset(cid) == keyset
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_preserve_last_key_case(self):
        cid = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        cid.update({'ACCEPT': 'application/json'})
        cid['USER-AGENT'] = 'requests'
        keyset = frozenset(['ACCEPT', 'USER-AGENT'])
        assert frozenset(i[0] for i in cid.items()) == keyset
        assert frozenset(cid.keys()) == keyset
        assert frozenset(cid) == keyset
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def test_copy(self):
        cid = CaseInsensitiveDict({
            'Accept': 'application/json',
            'user-Agent': 'requests',
        })
        cid_copy = cid.copy()
        assert cid == cid_copy
        cid['changed'] = True
        assert cid != cid_copy
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def build_response(self):
        request = self.calls[-1].args[0]
        r = requests.Response()

        try:
            r.status_code = int(self.redirects.pop(0))
        except IndexError:
            r.status_code = 200

        r.headers = CaseInsensitiveDict({'Location': '/'})
        r.raw = self._build_raw()
        r.request = request
        return r
项目:http-observatory    作者:mozilla    | 项目源码 | 文件源码
def parse_http_equiv_headers(html: str) -> CaseInsensitiveDict:
    http_equiv_headers = CaseInsensitiveDict()

    # Try to parse the HTML
    try:
        soup = bs(html, 'html.parser')
    except:
        return http_equiv_headers

    # Find all the meta tags
    metas = soup.find_all('meta')

    for meta in metas:
        if meta.has_attr('http-equiv') and meta.has_attr('content'):
            # Add support for multiple CSP policies specified via http-equiv
            # See issue: https://github.com/mozilla/http-observatory/issues/266
            # Note that this is so far only done for CSP and not for other types
            # of http-equiv
            if (meta.get('http-equiv', '').lower().strip() == 'content-security-policy' and
               'Content-Security-Policy' in http_equiv_headers):
                http_equiv_headers['Content-Security-Policy'] += '; ' + meta.get('content')
            else:
                http_equiv_headers[meta.get('http-equiv')] = meta.get('content')

        # Technically not HTTP Equiv, but I'm treating it that way
        elif meta.get('name', '').lower().strip() == 'referrer' and meta.has_attr('content'):
            http_equiv_headers['Referrer-Policy'] = meta.get('content')

    return http_equiv_headers
项目:lti    作者:pylti    | 项目源码 | 文件源码
def __init__(self, consumer_key=None, consumer_secret=None, params=None,
                 launch_url=None, launch_headers=None):

        super(ToolProvider, self).__init__(consumer_key, consumer_secret,
                                           params=params)
        self.outcome_requests = []
        self._last_outcome_request = None
        self.launch_url = launch_url
        self.launch_headers = launch_headers or CaseInsensitiveDict()
        if 'Content-Type' not in self.launch_headers:
            self.launch_headers['Content-Type'] = CONTENT_TYPE_FORM_URLENCODED
项目:OldSpeak    作者:0rbitAeolian    | 项目源码 | 文件源码
def __init__(
            self,
            headers,
            data=None,
            status_code=None,
            oauth_token=None,
            response=None):
        self.headers = CaseInsensitiveDict(headers)
        self.data = data
        self.status_code = status_code
        self.oauth_token = oauth_token
        self.response = response
        self.cookies = OrderedDict(response.cookies.items())
        self.logger = get_logger(__name__)
项目:OldSpeak    作者:0rbitAeolian    | 项目源码 | 文件源码
def make_headers(self, headers=None, cookies=None):
        headers = CaseInsensitiveDict(
            isinstance(headers, dict) and headers or {})
        if 'Content-Type' not in headers:
            headers['Content-Type'] = 'application/json'

        if self.oauth_token:
            headers['Authorization'] = 'oauth2: {}'.format(self.oauth_token)

        return dict(headers)
项目:fleece    作者:racker    | 项目源码 | 文件源码
def format_event(event, context):
    output_context = {
        'aws_request_id': context.aws_request_id,
        'client_context': context.client_context,
        'function_name': context.function_name,
        'function_version': context.function_version,
        'get_remaining_time_in_millis': context.get_remaining_time_in_millis(),
        'invoked_function_arn': context.invoked_function_arn,
        'log_group_name': context.log_group_name,
        'log_stream_name': context.log_stream_name,
        'memory_limit_in_mb': context.memory_limit_in_mb,
    }

    hashed_event = Hasher(event)

    request = {
        'requested-at': datetime.datetime.utcnow().isoformat(),
        'context': output_context,
        'operation': hashed_event['operation'],
        'requestor': hashed_event['parameters']['requestor'],
        'body': hashed_event['parameters']['request']['body'],
        'path': hashed_event['parameters']['request']['path'],
        'querystring': hashed_event['parameters']['request']['querystring'],
        'header': CaseInsensitiveDict(
            hashed_event['parameters']['request']['header']),
        'gateway': hashed_event['parameters']['gateway'],
    }

    return request
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        parsed_url = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if parsed_url.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        stats = os.stat(pathname)
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        resp.headers = CaseInsensitiveDict({
            "Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
            "Content-Length": stats.st_size,
            "Last-Modified": modified,
        })

        resp.raw = LocalFSResponse(open(pathname, "rb"))
        resp.close = resp.raw.close

        return resp
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def send(self, request, stream=None, timeout=None, verify=None, cert=None,
             proxies=None):
        parsed_url = urlparse.urlparse(request.url)

        # We only work for requests with a host of localhost
        if parsed_url.netloc.lower() != "localhost":
            raise InvalidURL("Invalid URL %r: Only localhost is allowed" %
                request.url)

        real_url = urlparse.urlunparse(parsed_url[:1] + ("",) + parsed_url[2:])
        pathname = url_to_path(real_url)

        resp = Response()
        resp.status_code = 200
        resp.url = real_url

        stats = os.stat(pathname)
        modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
        resp.headers = CaseInsensitiveDict({
            "Content-Type": mimetypes.guess_type(pathname)[0] or "text/plain",
            "Content-Length": stats.st_size,
            "Last-Modified": modified,
        })

        resp.raw = LocalFSResponse(open(pathname, "rb"))
        resp.close = resp.raw.close

        return resp
项目:gcdt    作者:glomex    | 项目源码 | 文件源码
def serialize_patch(obj):
    """Convert objects into JSON structures."""
    # Record class and module information for deserialization

    result = {'__class__': obj.__class__.__name__}
    try:
        result['__module__'] = obj.__module__
    except AttributeError:
        pass
    # Convert objects to dictionary representation based on type
    if isinstance(obj, datetime.datetime):
        result['year'] = obj.year
        result['month'] = obj.month
        result['day'] = obj.day
        result['hour'] = obj.hour
        result['minute'] = obj.minute
        result['second'] = obj.second
        result['microsecond'] = obj.microsecond
        return result
    if isinstance(obj, StreamingBody):
        original_text = obj.read()

        # We remove a BOM here if it exists so that it doesn't get reencoded
        # later on into a UTF-16 string, presumably by the json library
        result['body'] = original_text.decode('utf-8-sig')

        obj._raw_stream = BytesIO(original_text)
        obj._amount_read = 0
        return result
    if isinstance(obj, CaseInsensitiveDict):
        result['as_dict'] = dict(obj)
        return result
    raise TypeError('Type not serializable')
项目:rets    作者:opendoor-labs    | 项目源码 | 文件源码
def make_response(status_code: int = 200,
                  content: bytes = b'',
                  headers: dict = None,
                  reason: str = None,
                  encoding: str = None,
                  ) -> Response:
    response = Response()
    response.status_code = status_code
    response._content = content
    response._content_consumed = True
    response.headers = CaseInsensitiveDict(headers or {})
    response.encoding = encoding or get_encoding_from_headers(headers or {})
    response.reason = reason
    return response
项目:rets    作者:opendoor-labs    | 项目源码 | 文件源码
def _guess_mime_type(headers: CaseInsensitiveDict) -> Optional[str]:
    location = headers.get('location')
    if location:
        mime_type, _ = mimetypes.guess_type(location)
        if mime_type:
            return mime_type

    # Parse mime type from content-type header, e.g. 'image/jpeg;charset=US-ASCII' -> 'image/jpeg'
    mime_type, _ = cgi.parse_header(headers.get('content-type', ''))
    return mime_type or None
项目:rets    作者:opendoor-labs    | 项目源码 | 文件源码
def _decode_headers(headers: CaseInsensitiveDict, encoding: str) -> CaseInsensitiveDict:
    return CaseInsensitiveDict({
        k.decode(encoding): v.decode(encoding)
        for k, v in headers.items()
    })
项目:deb-python-dcos    作者:openstack    | 项目源码 | 文件源码
def _pod_response_fixture(headers=None):
    mock_response = mock.create_autospec(requests.Response)

    headers = CaseInsensitiveDict({} if headers is None else headers)
    mock_response.headers = headers

    return mock_response
项目:fulmar    作者:tylderen    | 项目源码 | 文件源码
def __init__(self):
        self.status_code = None
        self.url = None
        self.orig_url = None
        self.headers = CaseInsensitiveDict()
        self.content = ''
        self.cookies = {}
        self.error = None
        self.save = None
        self.js_script_result = None
        self.time = 0
项目:fulmar    作者:tylderen    | 项目源码 | 文件源码
def rebuild_response(r):
    response = Response()
    response.status_code = r.get('status_code', 599)
    response.url = r.get('url', '')
    response.orig_url = r.get('orig_url', response.url)
    response.headers = CaseInsensitiveDict(r.get('headers', {}))
    response.content = r.get('content', '')
    response.cookies = r.get('cookies', {})
    response.error = r.get('error')
    response.time_cost = r.get('time_cost', 0)
    response.js_script_result = r.get('js_script_result')
    return response
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_docstring_example(self):
        cid = CaseInsensitiveDict()
        cid['Accept'] = 'application/json'
        assert cid['aCCEPT'] == 'application/json'
        assert list(cid) == ['Accept']
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_len(self):
        cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
        cid['A'] = 'a'
        assert len(cid) == 2
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def test_getitem(self):
        cid = CaseInsensitiveDict({'Spam': 'blueval'})
        assert cid['spam'] == 'blueval'
        assert cid['SPAM'] == 'blueval'