Python werkzeug.datastructures 模块,Headers() 实例源码

我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用werkzeug.datastructures.Headers()

项目:oclubs    作者:SHSIDers    | 项目源码 | 文件源码
def download_xlsx(filename, info):
    """Create an xlsx file from ``info`` and return it as a download.

    :param filename: file name advertised in ``Content-Disposition``.
    :param info: iterable of rows, each row an iterable of cell values.
    :return: a ``Response`` carrying the workbook as an attachment.
    """
    buf = StringIO()
    book = xlsxwriter.Workbook(buf, {'in_memory': True})
    sheet = book.add_worksheet()

    # Cell (r, c) receives the c-th value of the r-th row.
    for r, cells in enumerate(info):
        for c, cell in enumerate(cells):
            sheet.write(r, c, cell)
    book.close()

    # Rewind so the whole workbook is read back out of the buffer.
    buf.seek(0)
    payload = buf.read()

    disposition = Headers()
    disposition.set('Content-Disposition', 'attachment', filename=filename)
    xlsx_mimetype = ('application/vnd.openxmlformats-'
                     'officedocument.spreadsheetml.sheet')
    return Response(payload, mimetype=xlsx_mimetype, headers=disposition)
项目:sqlAdvistorConsole    作者:huangyiminghappy    | 项目源码 | 文件源码
def __init__(self, response=None, **kwargs):
        """Build the response and attach permissive CORS headers.

        When neither ``mimetype`` nor ``contenttype`` is given and the body
        starts with ``<?xml``, the mimetype defaults to ``application/xml``.
        ``Access-Control-Allow-Origin`` and ``Access-Control-Allow-Methods``
        are always added to the response headers.

        :param response: response body handed to the parent initializer.
        :param kwargs: keyword arguments forwarded to the parent initializer;
                       a caller-supplied ``headers`` value is kept and
                       extended instead of being discarded.
        """
        if 'mimetype' not in kwargs and 'contenttype' not in kwargs:
            # ``response`` defaults to None and may be something other than
            # text; such bodies cannot be sniffed with ``startswith('<?xml')``
            # and are simply treated as "not XML".
            try:
                looks_like_xml = response.startswith('<?xml')
            except (AttributeError, TypeError):
                looks_like_xml = False
            if looks_like_xml:
                kwargs['mimetype'] = 'application/xml'

        origin = ('Access-Control-Allow-Origin', '*')
        methods = ('Access-Control-Allow-Methods', 'HEAD, OPTIONS, GET, POST, DELETE, PUT')

        # Copy whatever the caller passed (nothing, a dict, a list of pairs
        # or a Headers object) into a fresh Headers so the CORS entries can
        # be appended without losing the caller's headers.
        headers = Headers(kwargs.get('headers'))
        headers.add(*origin)
        headers.add(*methods)
        kwargs['headers'] = headers
        super(MyResponse, self).__init__(response, **kwargs)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Emits a ``WSGIWarning`` for each problem found in the status string,
        the header list and ``exc_info``, then wraps the headers in a
        ``Headers`` object and passes them to ``self.check_headers``.

        :param status: status string, expected to be three digits, a space
                       and a status explanation.
        :param headers: list of ``(name, value)`` string tuples.
        :param exc_info: ``None`` or an exception info tuple.
        :return: ``(status_code, headers)`` with the status code as an int
                 and the headers as a ``Headers`` object.
        :raises ValueError: if the status code is not numeric (raised by
                            ``int()`` after the warning was issued).
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # Interpolate the offending value; the placeholder used to be
            # emitted verbatim as "%r".
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks are intentional: subclasses are reported too.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:craton    作者:openstack    | 项目源码 | 文件源码
def type_convert(self, obj):
        """Convert multi-value input to the types named in the schema.

        ``None`` is returned as ``None``; plain dicts and lists (anything
        that is not a ``MultiDict``) are returned untouched; ``Headers`` are
        wrapped in a ``MultiDict`` first.  For every key, the ``type`` found
        under that key in ``self.validator.schema['properties']`` selects
        the converter; unknown types fall back to the first value as is.

        :param obj: ``None``, a dict/list, a ``Headers`` or a ``MultiDict``.
        :return: ``None``, ``obj`` itself, or a dict of converted values.
        """
        if obj is None:
            return None
        if isinstance(obj, (dict, list)) and not isinstance(obj, MultiDict):
            return obj
        if isinstance(obj, Headers):
            obj = MultiDict(obj)

        def first_value(values):
            return values[0]

        # Each converter receives the full value list and uses its head.
        converters = {
            'integer': lambda values: int(values[0]),
            'boolean': lambda values: values[0].lower() not in ['n', 'no',
                                                                'false', '',
                                                                '0'],
            'null': lambda values: None,
            'number': lambda values: float(values[0]),
            'string': first_value,
        }

        def convert_array(item_type, values):
            # Arrays convert every element with the converter of the item type.
            convert = converters.get(item_type, first_value)
            return [convert([element]) for element in values]

        converted = {}
        for key, values in obj.lists():
            spec = self.validator.schema['properties'].get(key, {})
            declared = spec.get('type')
            if declared == 'array':
                element_type = spec.get('items', {}).get('type')
                converted[key] = convert_array(element_type, values)
                continue
            converted[key] = converters.get(declared, first_value)(values)
        return converted
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Emits a ``WSGIWarning`` for each problem found in the status string,
        the header list and ``exc_info``, then wraps the headers in a
        ``Headers`` object and passes them to ``self.check_headers``.

        :param status: status string, expected to be three digits, a space
                       and a status explanation.
        :param headers: list of ``(name, value)`` string tuples.
        :param exc_info: ``None`` or an exception info tuple.
        :return: ``(status_code, headers)`` with the status code as an int
                 and the headers as a ``Headers`` object.
        :raises ValueError: if the status code is not numeric (raised by
                            ``int()`` after the warning was issued).
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # Interpolate the offending value; the placeholder used to be
            # emitted verbatim as "%r".
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks are intentional: subclasses are reported too.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Emits a ``WSGIWarning`` for each problem found in the status string,
        the header list and ``exc_info``, then wraps the headers in a
        ``Headers`` object and passes them to ``self.check_headers``.

        :param status: status string, expected to be three digits, a space
                       and a status explanation.
        :param headers: list of ``(name, value)`` string tuples.
        :param exc_info: ``None`` or an exception info tuple.
        :return: ``(status_code, headers)`` with the status code as an int
                 and the headers as a ``Headers`` object.
        :raises ValueError: if the status code is not numeric (raised by
                            ``int()`` after the warning was issued).
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # Interpolate the offending value; the placeholder used to be
            # emitted verbatim as "%r".
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks are intentional: subclasses are reported too.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Emits a ``WSGIWarning`` for each problem found in the status string,
        the header list and ``exc_info``, then wraps the headers in a
        ``Headers`` object and passes them to ``self.check_headers``.

        :param status: status string, expected to be three digits, a space
                       and a status explanation.
        :param headers: list of ``(name, value)`` string tuples.
        :param exc_info: ``None`` or an exception info tuple.
        :return: ``(status_code, headers)`` with the status code as an int
                 and the headers as a ``Headers`` object.
        :raises ValueError: if the status code is not numeric (raised by
                            ``int()`` after the warning was issued).
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # Interpolate the offending value; the placeholder used to be
            # emitted verbatim as "%r".
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks are intentional: subclasses are reported too.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Emits a ``WSGIWarning`` for each problem found in the status string,
        the header list and ``exc_info``, then wraps the headers in a
        ``Headers`` object and passes them to ``self.check_headers``.

        :param status: status string, expected to be three digits, a space
                       and a status explanation.
        :param headers: list of ``(name, value)`` string tuples.
        :param exc_info: ``None`` or an exception info tuple.
        :return: ``(status_code, headers)`` with the status code as an int
                 and the headers as a ``Headers`` object.
        :raises ValueError: if the status code is not numeric (raised by
                            ``int()`` after the warning was issued).
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # Interpolate the offending value; the placeholder used to be
            # emitted verbatim as "%r".
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks are intentional: subclasses are reported too.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:harbour-sailfinder    作者:DylanVanAssche    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Emits a ``WSGIWarning`` for each problem found in the status string,
        the header list and ``exc_info``, then wraps the headers in a
        ``Headers`` object and passes them to ``self.check_headers``.

        :param status: status string, expected to be three digits, a space
                       and a status explanation.
        :param headers: list of ``(name, value)`` string tuples.
        :param exc_info: ``None`` or an exception info tuple.
        :return: ``(status_code, headers)`` with the status code as an int
                 and the headers as a ``Headers`` object.
        :raises ValueError: if the status code is not numeric (raised by
                            ``int()`` after the warning was issued).
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # Interpolate the offending value; the placeholder used to be
            # emitted verbatim as "%r".
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks are intentional: subclasses are reported too.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:arithmancer    作者:google    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:arithmancer    作者:google    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:arithmancer    作者:google    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Emits a ``WSGIWarning`` for each problem found in the status string,
        the header list and ``exc_info``, then wraps the headers in a
        ``Headers`` object and passes them to ``self.check_headers``.

        :param status: status string, expected to be three digits, a space
                       and a status explanation.
        :param headers: list of ``(name, value)`` string tuples.
        :param exc_info: ``None`` or an exception info tuple.
        :return: ``(status_code, headers)`` with the status code as an int
                 and the headers as a ``Headers`` object.
        :raises ValueError: if the status code is not numeric (raised by
                            ``int()`` after the warning was issued).
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # Interpolate the offending value; the placeholder used to be
            # emitted verbatim as "%r".
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks are intentional: subclasses are reported too.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Collect the header block of a multipart part into a ``Headers`` object.

    Lines are pulled from *iterable* one at a time until an empty line is
    reached, so whatever follows the header block is left unread for the
    caller.  Every line that is consumed has to be newline terminated.

    :param iterable: iterable of strings that are newline terminated
    :raises ValueError: if a consumed line is not newline terminated
    """
    pairs = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # An empty line closes the header block.
            break
        if pairs and text[0] in ' \t':
            # Leading whitespace marks a continuation of the previous header.
            name, so_far = pairs[-1]
            pairs[-1] = (name, so_far + '\n ' + text[1:])
            continue
        name, colon, value = text.partition(':')
        if colon:
            # Lines without a colon are silently skipped.
            pairs.append((name.strip(), value.strip()))

    # The list is private to this call, so it is handed over directly.
    return Headers(pairs)
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Run the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the outgoing headers in a ``Headers``
        object, lets ``self.fix_headers`` adjust them, and forwards the
        result to the real ``start_response`` as a WSGI list.
        """
        def fixing_start_response(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, fixing_start_response)
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Emits a ``WSGIWarning`` for each problem found in the status string,
        the header list and ``exc_info``, then wraps the headers in a
        ``Headers`` object and passes them to ``self.check_headers``.

        :param status: status string, expected to be three digits, a space
                       and a status explanation.
        :param headers: list of ``(name, value)`` string tuples.
        :param exc_info: ``None`` or an exception info tuple.
        :return: ``(status_code, headers)`` with the status code as an int
                 and the headers as a ``Headers`` object.
        :raises ValueError: if the status code is not numeric (raised by
                            ``int()`` after the warning was issued).
        """
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # Interpolate the offending value; the placeholder used to be
            # emitted verbatim as "%r".
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks are intentional: subclasses are reported too.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Read the header section of a multipart part from `iterable`.

    Lines are consumed one by one until an empty line is found, which
    marks the end of the headers; the iterable is left positioned after
    that line so the caller can keep reading from it.

    :param iterable: iterable of strings that are newline terminated
    :return: a :class:`Headers` object with the collected headers.
    :raises ValueError: if a line is reported as not terminated.
    """
    collected = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # empty line: the header section is over
            break
        if collected and text[0] in ' \t':
            # leading whitespace continues the previous header's value
            name, value = collected.pop()
            collected.append((name, value + '\n ' + text[1:]))
            continue
        name, colon, value = text.partition(':')
        if colon:
            # lines without a colon are silently ignored
            collected.append((name.strip(), value.strip()))

    # the list is handed straight to Headers; nothing else holds on to it.
    return Headers(collected)
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Call the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the headers in :class:`Headers`,
        passes them through ``self.fix_headers`` and forwards the result,
        converted by ``to_wsgi_list()``, to the real `start_response`.
        """
        def _start_response_with_fixes(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, _start_response_with_fixes)
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Each problem that is detected is reported through ``warn`` as a
        :class:`WSGIWarning`.

        :param status: the status string, expected to be three digits, a
                       space and a status explanation.
        :param headers: the response headers, expected to be a plain list
                        of 2-item ``(name, value)`` tuples of strings.
        :param exc_info: ``None`` or a tuple.
        :return: a ``(status_code, headers)`` tuple with the status code as
                 an integer and the headers wrapped in :class:`Headers`.
        """
        # NOTE(review): stacklevel=3 presumably points the warning at the
        # application code that called start_response -- confirm with caller.
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # The offending value is interpolated into the message; the
            # 1-tuple keeps the % formatting safe for any status value.
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        # A non-numeric status code has been warned about above but still
        # makes int() raise ValueError here.
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks (not isinstance) are deliberate: subclasses of
        # list / tuple / str are reported as well.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            # A malformed item still fails at this unpacking after the
            # warning above has been issued.
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Read the header section of a multipart part from `iterable`.

    Lines are consumed one by one until an empty line is found, which
    marks the end of the headers; the iterable is left positioned after
    that line so the caller can keep reading from it.

    :param iterable: iterable of strings that are newline terminated
    :return: a :class:`Headers` object with the collected headers.
    :raises ValueError: if a line is reported as not terminated.
    """
    collected = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # empty line: the header section is over
            break
        if collected and text[0] in ' \t':
            # leading whitespace continues the previous header's value
            name, value = collected.pop()
            collected.append((name, value + '\n ' + text[1:]))
            continue
        name, colon, value = text.partition(':')
        if colon:
            # lines without a colon are silently ignored
            collected.append((name.strip(), value.strip()))

    # the list is handed straight to Headers; nothing else holds on to it.
    return Headers(collected)
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Call the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the headers in :class:`Headers`,
        passes them through ``self.fix_headers`` and forwards the result,
        converted by ``to_wsgi_list()``, to the real `start_response`.
        """
        def _start_response_with_fixes(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, _start_response_with_fixes)
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Each problem that is detected is reported through ``warn`` as a
        :class:`WSGIWarning`.

        :param status: the status string, expected to be three digits, a
                       space and a status explanation.
        :param headers: the response headers, expected to be a plain list
                        of 2-item ``(name, value)`` tuples of strings.
        :param exc_info: ``None`` or a tuple.
        :return: a ``(status_code, headers)`` tuple with the status code as
                 an integer and the headers wrapped in :class:`Headers`.
        """
        # NOTE(review): stacklevel=3 presumably points the warning at the
        # application code that called start_response -- confirm with caller.
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # The offending value is interpolated into the message; the
            # 1-tuple keeps the % formatting safe for any status value.
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        # A non-numeric status code has been warned about above but still
        # makes int() raise ValueError here.
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks (not isinstance) are deliberate: subclasses of
        # list / tuple / str are reported as well.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            # A malformed item still fails at this unpacking after the
            # warning above has been issued.
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def parse_multipart_headers(iterable):
    """Read the header section of a multipart part from `iterable`.

    Lines are consumed one by one until an empty line is found, which
    marks the end of the headers; the iterable is left positioned after
    that line so the caller can keep reading from it.

    :param iterable: iterable of strings that are newline terminated
    :return: a :class:`Headers` object with the collected headers.
    :raises ValueError: if a line is reported as not terminated.
    """
    collected = []
    for raw in iterable:
        text, terminated = _line_parse(to_native(raw))
        if not terminated:
            raise ValueError('unexpected end of line in multipart header')
        if not text:
            # empty line: the header section is over
            break
        if collected and text[0] in ' \t':
            # leading whitespace continues the previous header's value
            name, value = collected.pop()
            collected.append((name, value + '\n ' + text[1:]))
            continue
        name, colon, value = text.partition(':')
        if colon:
            # lines without a colon are silently ignored
            collected.append((name.strip(), value.strip()))

    # the list is handed straight to Headers; nothing else holds on to it.
    return Headers(collected)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def run_fixed(self, environ, start_response):
        """Call the wrapped app with a header-fixing ``start_response``.

        The replacement callable wraps the headers in :class:`Headers`,
        passes them through ``self.fix_headers`` and forwards the result,
        converted by ``to_wsgi_list()``, to the real `start_response`.
        """
        def _start_response_with_fixes(status, headers, exc_info=None):
            fixed = Headers(headers)
            self.fix_headers(environ, fixed, status)
            wsgi_headers = fixed.to_wsgi_list()
            return start_response(status, wsgi_headers, exc_info)

        wrapped_app = self.app
        return wrapped_app(environ, _start_response_with_fixes)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def check_start_response(self, status, headers, exc_info):
        """Lint the arguments an application passed to ``start_response``.

        Each problem that is detected is reported through ``warn`` as a
        :class:`WSGIWarning`.

        :param status: the status string, expected to be three digits, a
                       space and a status explanation.
        :param headers: the response headers, expected to be a plain list
                        of 2-item ``(name, value)`` tuples of strings.
        :param exc_info: ``None`` or a tuple.
        :return: a ``(status_code, headers)`` tuple with the status code as
                 an integer and the headers wrapped in :class:`Headers`.
        """
        # NOTE(review): stacklevel=3 presumably points the warning at the
        # application code that called start_response -- confirm with caller.
        check_string('status', status)
        status_code = status.split(None, 1)[0]
        if len(status_code) != 3 or not status_code.isdigit():
            warn(WSGIWarning('Status code must be three digits'), stacklevel=3)
        if len(status) < 4 or status[3] != ' ':
            # The offending value is interpolated into the message; the
            # 1-tuple keeps the % formatting safe for any status value.
            warn(WSGIWarning('Invalid value for status %r.  Valid '
                             'status strings are three digits, a space '
                             'and a status explanation' % (status,)),
                 stacklevel=3)
        # A non-numeric status code has been warned about above but still
        # makes int() raise ValueError here.
        status_code = int(status_code)
        if status_code < 100:
            warn(WSGIWarning('status code < 100 detected'), stacklevel=3)

        # Exact type checks (not isinstance) are deliberate: subclasses of
        # list / tuple / str are reported as well.
        if type(headers) is not list:
            warn(WSGIWarning('header list is not a list'), stacklevel=3)
        for item in headers:
            if type(item) is not tuple or len(item) != 2:
                warn(WSGIWarning('Headers must be 2-item tuples'),
                     stacklevel=3)
            # A malformed item still fails at this unpacking after the
            # warning above has been issued.
            name, value = item
            if type(name) is not str or type(value) is not str:
                warn(WSGIWarning('header items must be strings'),
                     stacklevel=3)
            if name.lower() == 'status':
                warn(WSGIWarning('The status header is not supported due to '
                                 'conflicts with the CGI spec.'),
                     stacklevel=3)

        if exc_info is not None and not isinstance(exc_info, tuple):
            warn(WSGIWarning('invalid value for exc_info'), stacklevel=3)

        headers = Headers(headers)
        self.check_headers(headers)

        return status_code, headers
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_slicing(self):
        """Full-slice assignment replaces every entry of the storage."""
        # there's nothing wrong with these being native strings
        # Headers doesn't care about the data types
        storage = self.storage_class()
        initial = (
            ('X-Foo-Poo', 'bleh'),
            ('Content-Type', 'application/whocares'),
            ('X-Forwarded-For', '192.168.0.123'),
        )
        for name, value in initial:
            storage.set(name, value)

        # keep only the X- prefixed entries
        x_only = [(name, value) for name, value in storage
                  if name.startswith(u'X-')]
        storage[:] = x_only

        expected = [
            ('X-Foo-Poo', 'bleh'),
            ('X-Forwarded-For', '192.168.0.123')
        ]
        self.assert_equal(list(storage), expected)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_remove_entity_headers(self):
        """Only ``Date`` is expected to survive ``remove_entity_headers``.

        The same input is checked both as a plain list of tuples and as a
        ``Headers`` object.
        """
        timestamp = http.http_date()
        as_list = [
            ('Date', timestamp),
            ('Content-Type', 'text/html'),
            ('Content-Length', '0'),
        ]
        # built before the list is modified below
        as_headers = datastructures.Headers(as_list)

        http.remove_entity_headers(as_list)
        assert as_list == [('Date', timestamp)]

        http.remove_entity_headers(as_headers)
        expected = datastructures.Headers([(u'Date', timestamp)])
        self.assert_equal(as_headers, expected)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_remove_hop_by_hop_headers(self):
        """Only ``Foo`` is expected to survive ``remove_hop_by_hop_headers``.

        The same input is checked both as a plain list of tuples and as a
        ``Headers`` object.
        """
        as_list = [
            ('Connection', 'closed'),
            ('Foo', 'bar'),
            ('Keep-Alive', 'wtf'),
        ]
        # built before the list is modified below
        as_headers = datastructures.Headers(as_list)

        http.remove_hop_by_hop_headers(as_list)
        assert as_list == [('Foo', 'bar')]

        http.remove_hop_by_hop_headers(as_headers)
        expected = datastructures.Headers([('Foo', 'bar')])
        assert as_headers == expected
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_cookie_unicode_dumping(self):
        """Check that a non-ASCII cookie value survives a dump/parse cycle.

        A cookie holding the SNOWMAN character is dumped, stored as a
        ``Set-Cookie`` header and parsed back again.
        """
        val = http.dump_cookie('foo', u'\N{SNOWMAN}')
        h = datastructures.Headers()
        h.add('Set-Cookie', val)
        # \342\230\203 are the octal escapes of the snowman's UTF-8 bytes
        # (E2 98 83).
        self.assert_equal(h['Set-Cookie'], 'foo="\\342\\230\\203"; Path=/')

        # parsing the header value must give back the original character
        cookies = http.parse_cookie(h['Set-Cookie'])
        self.assert_equal(cookies['foo'], u'\N{SNOWMAN}')