Python rfc822 模块,formatdate() 实例源码

我们从Python开源项目中,提取了以下4个代码示例,用于说明如何使用rfc822.formatdate()。注意:rfc822 是 Python 2 的标准库模块,在 Python 3 中已被移除,对应功能请使用 email.utils.formatdate()。

项目:cloud-ml-sdk    作者:XiaoMi    | 项目源码 | 文件源码
def __set_headers(self):
    """Return a fresh dict holding the default request headers.

    The mapping carries a JSON content type, the current time formatted
    by ``rfc822.formatdate``, a request id obtained from
    ``utils.request_id()`` and an empty Content-MD5 placeholder.
    """
    return {
        configs.CONTENT_TYPE: "application/json; charset=UTF-8",
        configs.DATE: rfc822.formatdate(time.time()),
        configs.REQUEST_ID: utils.request_id(),
        configs.CONTENT_MD5: "",
    }
项目:lantern-detection    作者:gongxijun    | 项目源码 | 文件源码
def send_headers(self):
    """Assert, process, and send the HTTP response message-headers.

    Fills in the framing and default headers that the application did
    not supply (Transfer-Encoding, Connection, Date, Server), then
    writes the status line followed by every header through
    ``self.sendall``.

    Raises:
        TypeError: if a header key or value cannot be concatenated
            into the output because it is not a string.
    """
    hkeys = [key.lower() for key, value in self.outheaders]
    status = int(self.status[:3])

    if status == 413:
        # Request Entity Too Large. Close conn to avoid garbage.
        self.close_connection = True
    elif "content-length" not in hkeys:
        # "All 1xx (informational), 204 (no content),
        # and 304 (not modified) responses MUST NOT
        # include a message-body." So no point chunking.
        if status < 200 or status in (204, 205, 304):
            pass
        else:
            if self.response_protocol == 'HTTP/1.1':
                # Use the chunked transfer-coding
                self.chunked_write = True
                self.outheaders.append(("Transfer-Encoding", "chunked"))
            else:
                # Closing the conn is the only way to determine len.
                self.close_connection = True

    if "connection" not in hkeys:
        if self.response_protocol == 'HTTP/1.1':
            if self.close_connection:
                self.outheaders.append(("Connection", "close"))
        else:
            if not self.close_connection:
                self.outheaders.append(("Connection", "Keep-Alive"))

    if "date" not in hkeys:
        self.outheaders.append(("Date", rfc822.formatdate()))

    server = self.connection.server

    if "server" not in hkeys:
        self.outheaders.append(("Server", server.version))

    buf = [server.protocol, " ", self.status, "\r\n"]
    # An explicit loop (rather than a list comprehension) keeps ``k`` and
    # ``v`` in scope for the error handler on every Python version.
    for k, v in self.outheaders:
        try:
            buf.append(k + ": " + v + "\r\n")
        except TypeError:
            # Report which part of the header is at fault; the 1-tuple
            # keeps ``%`` from unpacking a tuple-valued key or value.
            if not isinstance(k, str):
                raise TypeError(
                    "WSGI response header key %r is not a string." % (k,))
            if not isinstance(v, str):
                raise TypeError(
                    "WSGI response header value %r is not a string." % (v,))
            raise
    buf.append("\r\n")
    self.sendall("".join(buf))
项目:lantern-detection    作者:gongxijun    | 项目源码 | 文件源码
def send_headers(self):
    """Assert, process, and send the HTTP response message-headers.

    Fills in the framing and default headers that the application did
    not supply (Transfer-Encoding, Connection, Date, Server), then
    writes the status line followed by every header through
    ``self.sendall``.

    Raises:
        TypeError: if a header key or value cannot be concatenated
            into the output because it is not a string.
    """
    hkeys = [key.lower() for key, value in self.outheaders]
    status = int(self.status[:3])

    if status == 413:
        # Request Entity Too Large. Close conn to avoid garbage.
        self.close_connection = True
    elif "content-length" not in hkeys:
        # "All 1xx (informational), 204 (no content),
        # and 304 (not modified) responses MUST NOT
        # include a message-body." So no point chunking.
        if status < 200 or status in (204, 205, 304):
            pass
        else:
            if self.response_protocol == 'HTTP/1.1':
                # Use the chunked transfer-coding
                self.chunked_write = True
                self.outheaders.append(("Transfer-Encoding", "chunked"))
            else:
                # Closing the conn is the only way to determine len.
                self.close_connection = True

    if "connection" not in hkeys:
        if self.response_protocol == 'HTTP/1.1':
            if self.close_connection:
                self.outheaders.append(("Connection", "close"))
        else:
            if not self.close_connection:
                self.outheaders.append(("Connection", "Keep-Alive"))

    if "date" not in hkeys:
        self.outheaders.append(("Date", rfc822.formatdate()))

    server = self.connection.server

    if "server" not in hkeys:
        self.outheaders.append(("Server", server.version))

    buf = [server.protocol, " ", self.status, "\r\n"]
    # An explicit loop (rather than a list comprehension) keeps ``k`` and
    # ``v`` in scope for the error handler on every Python version.
    for k, v in self.outheaders:
        try:
            buf.append(k + ": " + v + "\r\n")
        except TypeError:
            # Report which part of the header is at fault; the 1-tuple
            # keeps ``%`` from unpacking a tuple-valued key or value.
            if not isinstance(k, str):
                raise TypeError(
                    "WSGI response header key %r is not a string." % (k,))
            if not isinstance(v, str):
                raise TypeError(
                    "WSGI response header value %r is not a string." % (v,))
            raise
    buf.append("\r\n")
    self.sendall("".join(buf))
项目:viewvc    作者:viewvc    | 项目源码 | 文件源码
def check_freshness(request, mtime=None, etag=None, weak=0):
  """Decide whether the client's cached copy is still fresh.

  Compares the validators sent by the client (read through
  request.server.getenv) with the supplied ETAG and/or MTIME, and emits
  the matching response headers through request.server.

  Arguments:
    request: object exposing .cfg (with .options.generate_etags and
      .options.http_expiration_time) and .server (with getenv,
      addheader and header).
    mtime: modification time compared against the parsed
      If-Modified-Since value, or None.
    etag: unquoted entity tag value, or None.
    weak: if true, ETAG is emitted and compared as a weak validator
      (prefixed with W/).

  Returns a true value when the cached copy is fresh (a
  '304 Not Modified' status has then been sent), and a false value
  otherwise.  Always returns 0 when etag generation is disabled.
  """
  cfg = request.cfg

  # See if we are supposed to disable etags (for debugging, usually)
  if not cfg.options.generate_etags:
    return 0

  request_etag = request_mtime = None
  if etag is not None:
    if weak:
      etag = 'W/"%s"' % etag
    else:
      etag = '"%s"' % etag
    request_etag = request.server.getenv('HTTP_IF_NONE_MATCH')
  if mtime is not None:
    try:
      request_mtime = request.server.getenv('HTTP_IF_MODIFIED_SINCE')
      request_mtime = rfc822.mktime_tz(rfc822.parsedate_tz(request_mtime))
    except Exception:
      # A missing or unparsable header is treated as "no validator";
      # KeyboardInterrupt/SystemExit are deliberately not swallowed.
      request_mtime = None

  # if we have an etag, use that for freshness checking.
  # if not available, then we use the last-modified time.
  # if not available, then the document isn't fresh.
  if etag is not None:
    isfresh = (request_etag == etag)
  elif mtime is not None:
    # Without a usable If-Modified-Since value the document is not
    # fresh; comparing None with a number is an error on Python 3.
    isfresh = (request_mtime is not None and request_mtime >= mtime)
  else:
    isfresh = 0

  # require revalidation after the configured amount of time
  if cfg and cfg.options.http_expiration_time >= 0:
    expiration = rfc822.formatdate(time.time() +
                                   cfg.options.http_expiration_time)
    request.server.addheader('Expires', expiration)
    request.server.addheader('Cache-Control',
                             'max-age=%d' % cfg.options.http_expiration_time)

  if isfresh:
    request.server.header(status='304 Not Modified')
  else:
    if etag is not None:
      request.server.addheader('ETag', etag)
    if mtime is not None:
      request.server.addheader('Last-Modified', rfc822.formatdate(mtime))
  return isfresh