Python email.message 模块,add_header() 实例源码

我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用email.message.add_header()

项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:xxNet    作者:drzorm    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:Docker-XX-Net    作者:kuanghy    | 项目源码 | 文件源码
def _get_upload_content(field_storage):
  """Returns an email.Message holding the values of the file transfer.

  It decodes the content of the field storage and creates a new email.Message.

  Args:
    field_storage: cgi.FieldStorage that represents uploaded blob.

  Returns:
    An email.message.Message holding the upload information.
  """
  message = email.message.Message()
  message.add_header(
      'content-transfer-encoding',
      field_storage.headers.getheader('Content-Transfer-Encoding', ''))
  message.set_payload(field_storage.file.read())
  payload = message.get_payload(decode=True)
  return email.message_from_string(payload)
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def _run_test_success(self, upload_data, upload_url):
    """Basic dispatcher request flow."""
    request_path = urlparse.urlparse(upload_url)[2]

    # Get session key from upload url.
    session_key = upload_url.split('/')[-1]

    self.environ['PATH_INFO'] = request_path
    self.environ['CONTENT_TYPE'] = (
        'multipart/form-data; boundary="================1234=="')
    status, _, response_body, forward_environ, forward_body = (
        self.run_dispatcher(upload_data))

    self.assertEquals('200 OK', status)
    self.assertEquals('Forwarded successfully.', response_body)

    self.assertNotEquals(None, forward_environ)

    # These must NOT be unicode strings.
    self.assertIsInstance(forward_environ['PATH_INFO'], str)
    if 'QUERY_STRING' in forward_environ:
      self.assertIsInstance(forward_environ['QUERY_STRING'], str)
    self.assertRegexpMatches(forward_environ['CONTENT_TYPE'],
                             r'multipart/form-data; boundary="[^"]+"')
    self.assertEquals(len(forward_body), int(forward_environ['CONTENT_LENGTH']))
    self.assertIn(constants.FAKE_IS_ADMIN_HEADER, forward_environ)
    self.assertEquals('1', forward_environ[constants.FAKE_IS_ADMIN_HEADER])

    new_request = email.message_from_string(
        'Content-Type: %s\n\n%s' % (forward_environ['CONTENT_TYPE'],
                                    forward_body))
    (upload,) = new_request.get_payload()
    self.assertEquals('message/external-body', upload.get_content_type())

    message = email.message.Message()
    message.add_header('Content-Type', upload['Content-Type'])
    blob_key = message.get_param('blob-key')
    blob_contents = blobstore.BlobReader(blob_key).read()
    self.assertEquals('value', blob_contents)

    self.assertRaises(datastore_errors.EntityNotFoundError,
                      datastore.Get,
                      session_key)

    return upload, forward_environ, forward_body